#include <ReaderRpc.h>
Inheritance diagram for ReaderRpc< Reader >:
Public Member Functions | |
ReaderRpc (const std::string &svrName, const std::vector< std::string > &fnames, int evtMax) | |
virtual | ~ReaderRpc () |
int | wait_to_end () |
Private Member Functions | |
void | rpcHandler () |
ReaderRpc () | |
Private Attributes | |
int | m_evtDone |
int | m_evtMax |
int | m_stopFlag |
int | m_inCode |
int | m_outCode |
const uint32_t * | m_pEvt |
PthrReaderBufPool< Reader > * | m_freader |
sem_t | m_sem |
std::string | m_svrName |
std::map< int, AutoEnlargeBuffer * > | m_evtBak |
Definition at line 13 of file ReaderRpc.h.
ReaderRpc< Reader >::ReaderRpc | ( | const std::string & | svrName, | |
const std::vector< std::string > & | fnames, | |||
int | evtMax | |||
) |
Definition at line 9 of file ReaderRpc.cc.
References ReaderRpc< Reader >::m_freader, ReaderRpc< Reader >::m_sem, and ReaderRpc< Reader >::m_svrName.
00010 : DimRpc(svrName.c_str(), "I", "I"), 00011 m_evtDone(0), 00012 m_evtMax(evtMax), 00013 m_stopFlag(0) 00014 { 00015 sem_init(&m_sem, 0, 0); 00016 m_freader = new PthrReaderBufPool<Reader>( fnames ); 00017 00018 m_svrName = svrName.substr( svrName.find_last_of('/')+1 ); 00019 }
Definition at line 22 of file ReaderRpc.cc.
References ReaderRpc< Reader >::m_evtBak, ReaderRpc< Reader >::m_evtDone, ReaderRpc< Reader >::m_freader, ReaderRpc< Reader >::m_sem, and ReaderRpc< Reader >::m_svrName.
00023 { 00024 sem_destroy(&m_sem); 00025 00026 delete m_freader; 00027 00028 for (std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.begin(); it != m_evtBak.end(); ++it) { 00029 delete it->second; 00030 } 00031 00032 std::cout << "[" << m_svrName << "] Terminated. Total events processed: " << m_evtDone << std::endl; 00033 }
void ReaderRpc< Reader >::rpcHandler | ( | ) | [private] |
Definition at line 56 of file ReaderRpc.cc.
References EvtCyclic3::c_str(), DistBossCode::ClientError, PthrReaderBufPool< Reader, PoolSize >::currentFile(), fname, DistBossCode::GetEvent, DistBossCode::GetFileName, ReaderRpc< Reader >::m_evtBak, ReaderRpc< Reader >::m_evtDone, ReaderRpc< Reader >::m_evtMax, ReaderRpc< Reader >::m_freader, ReaderRpc< Reader >::m_inCode, ReaderRpc< Reader >::m_outCode, ReaderRpc< Reader >::m_pEvt, ReaderRpc< Reader >::m_sem, ReaderRpc< Reader >::m_stopFlag, ReaderRpc< Reader >::m_svrName, PthrReaderBufPool< Reader, PoolSize >::nextEvent(), DistBossCode::NoMoreEvents, RawFileException::print(), DistBossCode::RetryEvent, and deljobs::string.
00057 { 00058 int clientId = DimServer::getClientId(); 00059 m_inCode = getInt(); 00060 00061 switch (m_inCode) { 00062 00063 //-------------------------------------------------------- 00064 case (DistBossCode::GetEvent) : 00065 //-------------------------------------------------------- 00066 { 00067 try { 00068 if ( m_stopFlag != 0 ) { 00069 throw RawExMessage(("["+ m_svrName+"] Server stopflag is set, now waiting clients to exit!").c_str()); 00070 } 00071 m_pEvt = m_freader->nextEvent(); 00072 } 00073 catch ( RawFileException& e ) { 00074 if ( ++m_stopFlag < 3 ) e.print(); 00075 m_outCode = DistBossCode::NoMoreEvents; 00076 setData( (int&)m_outCode ); 00077 if ( m_stopFlag == 1 ) sem_post(&m_sem); 00078 return; 00079 } 00080 00081 setData((void*)m_pEvt, (m_pEvt[1]*4) ); 00082 00083 std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.find( clientId ); 00084 if ( it == m_evtBak.end() ) { 00085 m_evtBak[clientId] = new AutoEnlargeBuffer(); 00086 } 00087 m_evtBak[clientId]->copy((void*)m_pEvt, (m_pEvt[1]*4) ); 00088 00089 ++m_evtDone; 00090 00091 if ( m_evtMax >= 0 && m_evtDone >= m_evtMax ) { 00092 ++m_stopFlag; 00093 std::cout << "[" << m_svrName << "] We have reach the EvtMax!" << std::endl; 00094 sem_post(&m_sem); 00095 } 00096 } 00097 00098 break; 00099 00100 //-------------------------------------------------------- 00101 case (DistBossCode::RetryEvent) : 00102 //-------------------------------------------------------- 00103 { 00104 //std::cout << "[" << m_svrName << "] RESEND event to: " << DimServer::getClientName() << std::endl; 00105 m_pEvt = (uint32_t*)m_evtBak[clientId]->data(); 00106 setData((void*)m_pEvt, (m_pEvt[1]*4) ); 00107 } 00108 break; 00109 00110 //-------------------------------------------------------- 00111 case (DistBossCode::GetFileName) : 00112 //-------------------------------------------------------- 00113 { 00114 std::string fname = m_freader->currentFile(); 00115 setData( (void*)fname.c_str(), (fname.length()+1) ); 00116 } 00117 break; 00118 00119 //-------------------------------------------------------- 00120 case (DistBossCode::ClientError) : 00121 //-------------------------------------------------------- 00122 std::cout << "[" << m_svrName << "] Error code from client[" 00123 << clientId << "]: " 00124 << DimServer::getClientName() 00125 << std::endl; 00126 break; 00127 00128 //-------------------------------------------------------- 00129 default : 00130 //-------------------------------------------------------- 00131 std::cout << "[" << m_svrName << "] Unknown code(0x" 00132 << std::hex << m_inCode << std::dec 00133 << ") from client[" << clientId << "]: " 00134 << DimServer::getClientName() 00135 << std::endl; 00136 } 00137 }
int ReaderRpc< Reader >::wait_to_end | ( | ) | [virtual] |
Implements RpcInterface.
Definition at line 36 of file ReaderRpc.cc.
References genRecEmupikp::i, and ReaderRpc< Reader >::m_sem.
00037 { 00038 sem_wait(&m_sem); 00039 00040 int nClients = dis_get_n_clients(itsIdOut); 00041 00042 int i = 0; 00043 while ( nClients > 0 ) { 00044 // wait for all the clients dis-connecting to this server 00045 // and force the server to stop when the total waiting time 00046 // is greater than 5s(5*1s) 00047 if ( ++i > 5 ) break; 00048 sleep(1); 00049 nClients = dis_get_n_clients(itsIdOut); 00050 } 00051 00052 return nClients; 00053 }
std::map<int, AutoEnlargeBuffer*> ReaderRpc< Reader >::m_evtBak [private] |
Definition at line 47 of file ReaderRpc.h.
Referenced by ReaderRpc< Reader >::rpcHandler(), and ReaderRpc< Reader >::~ReaderRpc().
Definition at line 32 of file ReaderRpc.h.
Referenced by ReaderRpc< Reader >::rpcHandler(), and ReaderRpc< Reader >::~ReaderRpc().
PthrReaderBufPool<Reader>* ReaderRpc< Reader >::m_freader [private] |
Definition at line 41 of file ReaderRpc.h.
Referenced by ReaderRpc< Reader >::ReaderRpc(), ReaderRpc< Reader >::rpcHandler(), and ReaderRpc< Reader >::~ReaderRpc().
Definition at line 43 of file ReaderRpc.h.
Referenced by ReaderRpc< Reader >::ReaderRpc(), ReaderRpc< Reader >::rpcHandler(), ReaderRpc< Reader >::wait_to_end(), and ReaderRpc< Reader >::~ReaderRpc().
int ReaderRpc< Reader >::m_stopFlag [private] |
std::string ReaderRpc< Reader >::m_svrName [private] |
Definition at line 45 of file ReaderRpc.h.
Referenced by ReaderRpc< Reader >::ReaderRpc(), ReaderRpc< Reader >::rpcHandler(), and ReaderRpc< Reader >::~ReaderRpc().