00001 #ifndef DISTBOSS_WRITER_RPC_CC
00002 #define DISTBOSS_WRITER_RPC_CC
00003
00004 #include "DistBossUtil/DistBossCode.h"
00005 #include <iostream>
00006
00007 template<class Writer>
00008 WriterRpc<Writer>::WriterRpc(const std::string& svrName, const std::string& fname)
00009 : DimRpc(svrName.c_str(), "I", "I"),
00010 m_evtDone(0)
00011 {
00012 m_writer = new PthrWriterBufPool<Writer>(fname);
00013
00014 m_svrName = svrName.substr( svrName.find_last_of('/')+1 );
00015 }
00016
00017 template<class Writer>
00018 WriterRpc<Writer>::~WriterRpc()
00019 {
00020 if ( m_writer->stat() == 0 ) {
00021 while ( m_evtBak.size() > 0 ) {
00022 clearBak(-1);
00023 }
00024 if ( dis_get_n_clients(itsIdOut) > 0 ) {
00025 std::cout << "[" << m_svrName << "] PROBLEMS @ TERMINATING. FORCE TO QUIT" << std::endl;
00026 }
00027 int code = DistBossCode::StatusFinalize;
00028 m_writer->writeEvent(&code, 4);
00029 }
00030
00031 delete m_writer;
00032
00033 for (std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.begin(); it != m_evtBak.end(); ++it) {
00034 delete it->second;
00035 }
00036
00037 std::cout << "[" << m_svrName << "] Terminated. Total events processed: " << m_evtDone << std::endl;
00038 }
00039
00040 template<class Writer>
00041 int WriterRpc<Writer>::wait_to_end()
00042 {
00043 int nClients = 0;
00044 int i = 0;
00045
00046 do {
00047
00048
00049
00050 if ( ++i > 5 ) break;
00051 sleep(2);
00052 nClients = dis_get_n_clients(itsIdOut);
00053 }
00054 while ( nClients > 0 );
00055
00056 return nClients;
00057 }
00058
00059 template<class Writer>
00060 void WriterRpc<Writer>::rpcHandler()
00061 {
00062 m_size = getSize();
00063 m_pEvt = getData();
00064
00065 int clientId = DimServer::getClientId();
00066 int outCode = DistBossCode::StatusSuccess;
00067
00068 if ( m_size == 4 ) {
00069 unsigned int inCode = *(unsigned int*)m_pEvt;
00070
00071 if ( inCode == DistBossCode::StatusFinalize ) {
00072 std::cout << "[" << m_svrName << "] Client [" << DimServer::getClientName() << "] finalized" << std::endl;
00073 clearBak(clientId);
00074 if ( dis_get_n_clients(itsIdOut) <= 1 ) {
00075 m_writer->writeEvent(m_pEvt, m_size);
00076 }
00077 }
00078 else if ( inCode == DistBossCode::ClientReady ) {
00079 std::cout << "[" << m_svrName << "] Client [" << DimServer::getClientName() << "] connected" << std::endl;
00080 }
00081 }
00082 else if ( m_size > 4 ) {
00083 std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.find( clientId );
00084 if ( it == m_evtBak.end() ) {
00085 m_evtBak[clientId] = new AutoEnlargeBuffer();
00086 m_evtBak[clientId]->copy(m_pEvt, m_size );
00087 }
00088
00089 void* pbak = m_evtBak[clientId]->data();
00090 if ( *(int*)pbak != *(int*)m_pEvt ) {
00091 m_writer->writeEvent(pbak, m_evtBak[clientId]->size());
00092 ++m_evtDone;
00093 }
00094
00095 m_evtBak[clientId]->copy(m_pEvt, m_size );
00096 }
00097 else {
00098 outCode = DistBossCode::ServerTimeout;
00099 }
00100
00101 setData(outCode);
00102 }
00103
00104 template<class Writer>
00105 void WriterRpc<Writer>::clearBak(int clientId)
00106 {
00107 std::map<int, AutoEnlargeBuffer*>::iterator it = (clientId<0) ? m_evtBak.begin() : m_evtBak.find( clientId );
00108 if ( it != m_evtBak.end() ) {
00109 m_writer->writeEvent( it->second->data(), it->second->size() );
00110 ++m_evtDone;
00111
00112 delete it->second;
00113 m_evtBak.erase(it);
00114 }
00115 }
00116
00117 #endif