00001 #ifndef DISTBOSS_READER_RPC_CC
00002 #define DISTBOSS_READER_RPC_CC
00003
00004 #include "DistBossUtil/DistBossCode.h"
00005 #include "IRawFile/RawFileExceptions.h"
00006 #include <iostream>
00007
00008 template<class Reader>
00009 ReaderRpc<Reader>::ReaderRpc(const std::string& svrName, const std::vector<std::string>& fnames, int evtMax)
00010 : DimRpc(svrName.c_str(), "I", "I"),
00011 m_evtDone(0),
00012 m_evtMax(evtMax),
00013 m_stopFlag(0)
00014 {
00015 sem_init(&m_sem, 0, 0);
00016 m_freader = new PthrReaderBufPool<Reader>( fnames );
00017
00018 m_svrName = svrName.substr( svrName.find_last_of('/')+1 );
00019 }
00020
00021 template<class Reader>
00022 ReaderRpc<Reader>::~ReaderRpc()
00023 {
00024 sem_destroy(&m_sem);
00025
00026 delete m_freader;
00027
00028 for (std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.begin(); it != m_evtBak.end(); ++it) {
00029 delete it->second;
00030 }
00031
00032 std::cout << "[" << m_svrName << "] Terminated. Total events processed: " << m_evtDone << std::endl;
00033 }
00034
00035 template<class Reader>
00036 int ReaderRpc<Reader>::wait_to_end()
00037 {
00038 sem_wait(&m_sem);
00039
00040 int nClients = dis_get_n_clients(itsIdOut);
00041
00042 int i = 0;
00043 while ( nClients > 0 ) {
00044
00045
00046
00047 if ( ++i > 5 ) break;
00048 sleep(1);
00049 nClients = dis_get_n_clients(itsIdOut);
00050 }
00051
00052 return nClients;
00053 }
00054
00055 template<class Reader>
00056 void ReaderRpc<Reader>::rpcHandler()
00057 {
00058 int clientId = DimServer::getClientId();
00059 m_inCode = getInt();
00060
00061 switch (m_inCode) {
00062
00063
00064 case (DistBossCode::GetEvent) :
00065
00066 {
00067 try {
00068 if ( m_stopFlag != 0 ) {
00069 throw RawExMessage(("["+ m_svrName+"] Server stopflag is set, now waiting clients to exit!").c_str());
00070 }
00071 m_pEvt = m_freader->nextEvent();
00072 }
00073 catch ( RawFileException& e ) {
00074 if ( ++m_stopFlag < 3 ) e.print();
00075 m_outCode = DistBossCode::NoMoreEvents;
00076 setData( (int&)m_outCode );
00077 if ( m_stopFlag == 1 ) sem_post(&m_sem);
00078 return;
00079 }
00080
00081 setData((void*)m_pEvt, (m_pEvt[1]*4) );
00082
00083 std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.find( clientId );
00084 if ( it == m_evtBak.end() ) {
00085 m_evtBak[clientId] = new AutoEnlargeBuffer();
00086 }
00087 m_evtBak[clientId]->copy((void*)m_pEvt, (m_pEvt[1]*4) );
00088
00089 ++m_evtDone;
00090
00091 if ( m_evtMax >= 0 && m_evtDone >= m_evtMax ) {
00092 ++m_stopFlag;
00093 std::cout << "[" << m_svrName << "] We have reach the EvtMax!" << std::endl;
00094 sem_post(&m_sem);
00095 }
00096 }
00097
00098 break;
00099
00100
00101 case (DistBossCode::RetryEvent) :
00102
00103 {
00104
00105 m_pEvt = (uint32_t*)m_evtBak[clientId]->data();
00106 setData((void*)m_pEvt, (m_pEvt[1]*4) );
00107 }
00108 break;
00109
00110
00111 case (DistBossCode::GetFileName) :
00112
00113 {
00114 std::string fname = m_freader->currentFile();
00115 setData( (void*)fname.c_str(), (fname.length()+1) );
00116 }
00117 break;
00118
00119
00120 case (DistBossCode::ClientError) :
00121
00122 std::cout << "[" << m_svrName << "] Error code from client["
00123 << clientId << "]: "
00124 << DimServer::getClientName()
00125 << std::endl;
00126 break;
00127
00128
00129 default :
00130
00131 std::cout << "[" << m_svrName << "] Unknown code(0x"
00132 << std::hex << m_inCode << std::dec
00133 << ") from client[" << clientId << "]: "
00134 << DimServer::getClientName()
00135 << std::endl;
00136 }
00137 }
00138
00139 #endif