#include <WriterRpc.h>
Inheritance diagram for WriterRpc< Writer >:
Public Member Functions | |
WriterRpc (const std::string &svrName, const std::string &fname) | |
virtual | ~WriterRpc () |
int | wait_to_end () |
Private Member Functions | |
void | rpcHandler () |
void | clearBak (int clientId) |
WriterRpc () | |
Private Attributes | |
int | m_evtDone |
int | m_inCode |
int | m_outCode |
int | m_size |
void * | m_pEvt |
PthrWriterBufPool< Writer > * | m_writer |
std::string | m_svrName |
std::map< int, AutoEnlargeBuffer * > | m_evtBak |
Definition at line 11 of file WriterRpc.h.
WriterRpc< Writer >::WriterRpc | ( | const std::string & | svrName, | |
const std::string & | fname | |||
) |
Definition at line 8 of file WriterRpc.cc.
References WriterRpc< Writer >::m_svrName, and WriterRpc< Writer >::m_writer.
00009 : DimRpc(svrName.c_str(), "I", "I"), 00010 m_evtDone(0) 00011 { 00012 m_writer = new PthrWriterBufPool<Writer>(fname); 00013 00014 m_svrName = svrName.substr( svrName.find_last_of('/')+1 ); 00015 }
WriterRpc< Writer >::~WriterRpc | ( | ) | [virtual] |
Definition at line 18 of file WriterRpc.cc.
References WriterRpc< Writer >::clearBak(), WriterRpc< Writer >::m_evtBak, WriterRpc< Writer >::m_evtDone, WriterRpc< Writer >::m_svrName, WriterRpc< Writer >::m_writer, PthrWriterBufPool< Writer, PoolSize >::stat(), DistBossCode::StatusFinalize, and PthrWriterBufPool< Writer, PoolSize >::writeEvent().
00019 { 00020 if ( m_writer->stat() == 0 ) { 00021 while ( m_evtBak.size() > 0 ) { 00022 clearBak(-1); 00023 } 00024 if ( dis_get_n_clients(itsIdOut) > 0 ) { 00025 std::cout << "[" << m_svrName << "] PROBLEMS @ TERMINATING. FORCE TO QUIT" << std::endl; 00026 } 00027 int code = DistBossCode::StatusFinalize; 00028 m_writer->writeEvent(&code, 4); 00029 } 00030 00031 delete m_writer; 00032 00033 for (std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.begin(); it != m_evtBak.end(); ++it) { 00034 delete it->second; 00035 } 00036 00037 std::cout << "[" << m_svrName << "] Terminated. Total events processed: " << m_evtDone << std::endl; 00038 }
void WriterRpc< Writer >::clearBak | ( | int | clientId | ) | [private] |
Definition at line 105 of file WriterRpc.cc.
References WriterRpc< Writer >::m_evtBak, WriterRpc< Writer >::m_evtDone, WriterRpc< Writer >::m_writer, and PthrWriterBufPool< Writer, PoolSize >::writeEvent().
Referenced by WriterRpc< Writer >::rpcHandler(), and WriterRpc< Writer >::~WriterRpc().
00106 { 00107 std::map<int, AutoEnlargeBuffer*>::iterator it = (clientId<0) ? m_evtBak.begin() : m_evtBak.find( clientId ); 00108 if ( it != m_evtBak.end() ) { 00109 m_writer->writeEvent( it->second->data(), it->second->size() ); 00110 ++m_evtDone; 00111 00112 delete it->second; 00113 m_evtBak.erase(it); 00114 } 00115 }
void WriterRpc< Writer >::rpcHandler | ( | ) | [private] |
Definition at line 60 of file WriterRpc.cc.
References WriterRpc< Writer >::clearBak(), DistBossCode::ClientReady, WriterRpc< Writer >::m_evtBak, WriterRpc< Writer >::m_evtDone, WriterRpc< Writer >::m_pEvt, WriterRpc< Writer >::m_size, WriterRpc< Writer >::m_svrName, WriterRpc< Writer >::m_writer, DistBossCode::ServerTimeout, delete_small_size::size, DistBossCode::StatusFinalize, DistBossCode::StatusSuccess, and PthrWriterBufPool< Writer, PoolSize >::writeEvent().
00061 { 00062 m_size = getSize(); 00063 m_pEvt = getData(); 00064 00065 int clientId = DimServer::getClientId(); 00066 int outCode = DistBossCode::StatusSuccess; 00067 00068 if ( m_size == 4 ) { 00069 unsigned int inCode = *(unsigned int*)m_pEvt; 00070 00071 if ( inCode == DistBossCode::StatusFinalize ) { 00072 std::cout << "[" << m_svrName << "] Client [" << DimServer::getClientName() << "] finalized" << std::endl; 00073 clearBak(clientId); 00074 if ( dis_get_n_clients(itsIdOut) <= 1 ) { 00075 m_writer->writeEvent(m_pEvt, m_size); 00076 } 00077 } 00078 else if ( inCode == DistBossCode::ClientReady ) { 00079 std::cout << "[" << m_svrName << "] Client [" << DimServer::getClientName() << "] connected" << std::endl; 00080 } 00081 } 00082 else if ( m_size > 4 ) { 00083 std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.find( clientId ); 00084 if ( it == m_evtBak.end() ) { 00085 m_evtBak[clientId] = new AutoEnlargeBuffer(); 00086 m_evtBak[clientId]->copy(m_pEvt, m_size ); 00087 } 00088 00089 void* pbak = m_evtBak[clientId]->data(); 00090 if ( *(int*)pbak != *(int*)m_pEvt ) { 00091 m_writer->writeEvent(pbak, m_evtBak[clientId]->size()); 00092 ++m_evtDone; 00093 } 00094 00095 m_evtBak[clientId]->copy(m_pEvt, m_size ); 00096 } 00097 else { 00098 outCode = DistBossCode::ServerTimeout; 00099 } 00100 00101 setData(outCode); 00102 }
int WriterRpc< Writer >::wait_to_end | ( | ) | [virtual] |
Implements RpcInterface.
Definition at line 41 of file WriterRpc.cc.
References genRecEmupikp::i.
00042 { 00043 int nClients = 0; 00044 int i = 0; 00045 00046 do { 00047 // wait for all the clients dis-connecting to this server 00048 // and force the server to stop when the total waiting time 00049 // is greater than 10s(5*1s) 00050 if ( ++i > 5 ) break; 00051 sleep(2); 00052 nClients = dis_get_n_clients(itsIdOut); 00053 } 00054 while ( nClients > 0 ); 00055 00056 return nClients; 00057 }
std::map<int, AutoEnlargeBuffer*> WriterRpc< Writer >::m_evtBak [private] |
Definition at line 44 of file WriterRpc.h.
Referenced by WriterRpc< Writer >::clearBak(), WriterRpc< Writer >::rpcHandler(), and WriterRpc< Writer >::~WriterRpc().
int WriterRpc< Writer >::m_evtDone [private] |
Definition at line 32 of file WriterRpc.h.
Referenced by WriterRpc< Writer >::clearBak(), WriterRpc< Writer >::rpcHandler(), and WriterRpc< Writer >::~WriterRpc().
int WriterRpc< Writer >::m_inCode [private] |
Definition at line 34 of file WriterRpc.h.
int WriterRpc< Writer >::m_outCode [private] |
Definition at line 35 of file WriterRpc.h.
std::string WriterRpc< Writer >::m_svrName [private] |
Definition at line 42 of file WriterRpc.h.
Referenced by WriterRpc< Writer >::rpcHandler(), WriterRpc< Writer >::WriterRpc(), and WriterRpc< Writer >::~WriterRpc().
PthrWriterBufPool<Writer>* WriterRpc< Writer >::m_writer [private] |
Definition at line 40 of file WriterRpc.h.
Referenced by WriterRpc< Writer >::clearBak(), WriterRpc< Writer >::rpcHandler(), WriterRpc< Writer >::WriterRpc(), and WriterRpc< Writer >::~WriterRpc().