/home/bes3soft/bes3soft/Boss/7.0.2/dist/7.0.2/DistBoss/DistBossServer/DistBossServer-00-00-04/DistBossServer/template/ReaderRpc.cc

Go to the documentation of this file.
00001 #ifndef DISTBOSS_READER_RPC_CC
00002 #define DISTBOSS_READER_RPC_CC
00003 
00004 #include "DistBossUtil/DistBossCode.h"
00005 #include "IRawFile/RawFileExceptions.h"
00006 #include <iostream>
00007 
00008 template<class Reader>
00009 ReaderRpc<Reader>::ReaderRpc(const std::string& svrName, const std::vector<std::string>& fnames, int evtMax)
00010    : DimRpc(svrName.c_str(), "I", "I"),
00011      m_evtDone(0),
00012      m_evtMax(evtMax),
00013      m_stopFlag(0)
00014 {
00015    sem_init(&m_sem, 0, 0);
00016    m_freader = new PthrReaderBufPool<Reader>( fnames );
00017 
00018    m_svrName = svrName.substr( svrName.find_last_of('/')+1 );
00019 }
00020 
00021 template<class Reader>
00022 ReaderRpc<Reader>::~ReaderRpc()
00023 {
00024    sem_destroy(&m_sem);
00025 
00026    delete m_freader;
00027 
00028    for (std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.begin(); it != m_evtBak.end(); ++it) {
00029       delete it->second;
00030    }
00031 
00032    std::cout << "[" << m_svrName << "] Terminated. Total events processed: " << m_evtDone << std::endl;
00033 }
00034 
00035 template<class Reader>
00036 int ReaderRpc<Reader>::wait_to_end()
00037 {
00038    sem_wait(&m_sem);
00039 
00040    int nClients = dis_get_n_clients(itsIdOut);
00041 
00042    int i = 0;
00043    while ( nClients > 0 ) {
00044       // wait for all the clients dis-connecting to this server
00045       // and force the server to stop when the total waiting time
00046       // is greater than 5s(5*1s)
00047       if ( ++i > 5 ) break;
00048       sleep(1);
00049       nClients = dis_get_n_clients(itsIdOut);
00050    }
00051 
00052    return nClients;
00053 }
00054 
00055 template<class Reader>
00056 void ReaderRpc<Reader>::rpcHandler()
00057 {
00058    int clientId = DimServer::getClientId();
00059    m_inCode = getInt();
00060 
00061    switch (m_inCode) {
00062 
00063       //--------------------------------------------------------
00064       case (DistBossCode::GetEvent) :
00065       //--------------------------------------------------------
00066          {
00067             try {
00068                if ( m_stopFlag != 0 ) {
00069                   throw RawExMessage(("["+ m_svrName+"] Server stopflag is set, now waiting clients to exit!").c_str());
00070                }
00071                m_pEvt = m_freader->nextEvent();
00072             }
00073             catch ( RawFileException& e ) {
00074                if ( ++m_stopFlag < 3 ) e.print();
00075                m_outCode = DistBossCode::NoMoreEvents;
00076                setData( (int&)m_outCode );
00077                if ( m_stopFlag == 1 ) sem_post(&m_sem);
00078                return;
00079             }
00080 
00081             setData((void*)m_pEvt, (m_pEvt[1]*4) );
00082 
00083             std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.find( clientId );
00084             if ( it == m_evtBak.end() ) {
00085                m_evtBak[clientId] = new AutoEnlargeBuffer();
00086             }
00087             m_evtBak[clientId]->copy((void*)m_pEvt, (m_pEvt[1]*4) );
00088 
00089             ++m_evtDone;
00090 
00091             if ( m_evtMax >= 0 && m_evtDone >= m_evtMax ) {
00092                ++m_stopFlag;
00093                std::cout << "[" << m_svrName << "] We have reach the EvtMax!" << std::endl;
00094                sem_post(&m_sem);
00095             }
00096          }
00097 
00098          break;
00099 
00100       //--------------------------------------------------------
00101       case (DistBossCode::RetryEvent) :
00102       //--------------------------------------------------------
00103          {
00104             //std::cout << "[" << m_svrName << "] RESEND event to: " << DimServer::getClientName() << std::endl;
00105             m_pEvt = (uint32_t*)m_evtBak[clientId]->data();
00106             setData((void*)m_pEvt, (m_pEvt[1]*4) );
00107          }
00108          break;
00109 
00110       //--------------------------------------------------------
00111       case (DistBossCode::GetFileName) :
00112       //--------------------------------------------------------
00113          {
00114             std::string fname = m_freader->currentFile();
00115             setData( (void*)fname.c_str(), (fname.length()+1) );
00116          }
00117          break;
00118 
00119       //--------------------------------------------------------
00120       case (DistBossCode::ClientError) :
00121       //--------------------------------------------------------
00122          std::cout << "[" << m_svrName << "] Error code from client["
00123                    << clientId << "]: "
00124                    << DimServer::getClientName()
00125                    << std::endl;
00126          break;
00127 
00128       //--------------------------------------------------------
00129       default :
00130       //--------------------------------------------------------
00131          std::cout << "[" << m_svrName << "] Unknown code(0x"
00132                    << std::hex << m_inCode << std::dec
00133                    << ") from client[" << clientId << "]: " 
00134                    << DimServer::getClientName()
00135                    << std::endl;
00136    }
00137 }
00138 
00139 #endif

Generated on Tue Nov 29 22:58:02 2016 for BOSS_7.0.2 by  doxygen 1.4.7