ReaderRpc< Reader > Class Template Reference

#include <ReaderRpc.h>

Inheritance diagram for ReaderRpc< Reader >:

RpcInterface List of all members.

Public Member Functions

 ReaderRpc (const std::string &svrName, const std::vector< std::string > &fnames, int evtMax)
virtual ~ReaderRpc ()
int wait_to_end ()

Private Member Functions

void rpcHandler ()
 ReaderRpc ()

Private Attributes

int m_evtDone
int m_evtMax
int m_stopFlag
int m_inCode
int m_outCode
const uint32_t * m_pEvt
PthrReaderBufPool< Reader > * m_freader
sem_t m_sem
std::string m_svrName
std::map< int, AutoEnlargeBuffer * > m_evtBak

Detailed Description

template<class Reader>
class ReaderRpc< Reader >

Definition at line 13 of file ReaderRpc.h.


Constructor & Destructor Documentation

template<class Reader>
ReaderRpc< Reader >::ReaderRpc ( const std::string svrName,
const std::vector< std::string > &  fnames,
int  evtMax 
)

Definition at line 9 of file ReaderRpc.cc.

References ReaderRpc< Reader >::m_freader, ReaderRpc< Reader >::m_sem, and ReaderRpc< Reader >::m_svrName.

00010    : DimRpc(svrName.c_str(), "I", "I"),
00011      m_evtDone(0),
00012      m_evtMax(evtMax),
00013      m_stopFlag(0)
00014 {
00015    sem_init(&m_sem, 0, 0);
00016    m_freader = new PthrReaderBufPool<Reader>( fnames );
00017 
00018    m_svrName = svrName.substr( svrName.find_last_of('/')+1 );
00019 }

template<class Reader>
ReaderRpc< Reader >::~ReaderRpc (  )  [virtual]

Definition at line 22 of file ReaderRpc.cc.

References ReaderRpc< Reader >::m_evtBak, ReaderRpc< Reader >::m_evtDone, ReaderRpc< Reader >::m_freader, ReaderRpc< Reader >::m_sem, and ReaderRpc< Reader >::m_svrName.

00023 {
00024    sem_destroy(&m_sem);
00025 
00026    delete m_freader;
00027 
00028    for (std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.begin(); it != m_evtBak.end(); ++it) {
00029       delete it->second;
00030    }
00031 
00032    std::cout << "[" << m_svrName << "] Terminated. Total events processed: " << m_evtDone << std::endl;
00033 }

template<class Reader>
ReaderRpc< Reader >::ReaderRpc (  )  [private]


Member Function Documentation

template<class Reader>
void ReaderRpc< Reader >::rpcHandler (  )  [private]

Definition at line 56 of file ReaderRpc.cc.

References EvtCyclic3::c_str(), DistBossCode::ClientError, PthrReaderBufPool< Reader, PoolSize >::currentFile(), fname, DistBossCode::GetEvent, DistBossCode::GetFileName, ReaderRpc< Reader >::m_evtBak, ReaderRpc< Reader >::m_evtDone, ReaderRpc< Reader >::m_evtMax, ReaderRpc< Reader >::m_freader, ReaderRpc< Reader >::m_inCode, ReaderRpc< Reader >::m_outCode, ReaderRpc< Reader >::m_pEvt, ReaderRpc< Reader >::m_sem, ReaderRpc< Reader >::m_stopFlag, ReaderRpc< Reader >::m_svrName, PthrReaderBufPool< Reader, PoolSize >::nextEvent(), DistBossCode::NoMoreEvents, RawFileException::print(), DistBossCode::RetryEvent, and deljobs::string.

00057 {
00058    int clientId = DimServer::getClientId();
00059    m_inCode = getInt();
00060 
00061    switch (m_inCode) {
00062 
00063       //--------------------------------------------------------
00064       case (DistBossCode::GetEvent) :
00065       //--------------------------------------------------------
00066          {
00067             try {
00068                if ( m_stopFlag != 0 ) {
00069                   throw RawExMessage(("["+ m_svrName+"] Server stopflag is set, now waiting clients to exit!").c_str());
00070                }
00071                m_pEvt = m_freader->nextEvent();
00072             }
00073             catch ( RawFileException& e ) {
00074                if ( ++m_stopFlag < 3 ) e.print();
00075                m_outCode = DistBossCode::NoMoreEvents;
00076                setData( (int&)m_outCode );
00077                if ( m_stopFlag == 1 ) sem_post(&m_sem);
00078                return;
00079             }
00080 
00081             setData((void*)m_pEvt, (m_pEvt[1]*4) );
00082 
00083             std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.find( clientId );
00084             if ( it == m_evtBak.end() ) {
00085                m_evtBak[clientId] = new AutoEnlargeBuffer();
00086             }
00087             m_evtBak[clientId]->copy((void*)m_pEvt, (m_pEvt[1]*4) );
00088 
00089             ++m_evtDone;
00090 
00091             if ( m_evtMax >= 0 && m_evtDone >= m_evtMax ) {
00092                ++m_stopFlag;
00093                std::cout << "[" << m_svrName << "] We have reach the EvtMax!" << std::endl;
00094                sem_post(&m_sem);
00095             }
00096          }
00097 
00098          break;
00099 
00100       //--------------------------------------------------------
00101       case (DistBossCode::RetryEvent) :
00102       //--------------------------------------------------------
00103          {
00104             //std::cout << "[" << m_svrName << "] RESEND event to: " << DimServer::getClientName() << std::endl;
00105             m_pEvt = (uint32_t*)m_evtBak[clientId]->data();
00106             setData((void*)m_pEvt, (m_pEvt[1]*4) );
00107          }
00108          break;
00109 
00110       //--------------------------------------------------------
00111       case (DistBossCode::GetFileName) :
00112       //--------------------------------------------------------
00113          {
00114             std::string fname = m_freader->currentFile();
00115             setData( (void*)fname.c_str(), (fname.length()+1) );
00116          }
00117          break;
00118 
00119       //--------------------------------------------------------
00120       case (DistBossCode::ClientError) :
00121       //--------------------------------------------------------
00122          std::cout << "[" << m_svrName << "] Error code from client["
00123                    << clientId << "]: "
00124                    << DimServer::getClientName()
00125                    << std::endl;
00126          break;
00127 
00128       //--------------------------------------------------------
00129       default :
00130       //--------------------------------------------------------
00131          std::cout << "[" << m_svrName << "] Unknown code(0x"
00132                    << std::hex << m_inCode << std::dec
00133                    << ") from client[" << clientId << "]: " 
00134                    << DimServer::getClientName()
00135                    << std::endl;
00136    }
00137 }

template<class Reader>
int ReaderRpc< Reader >::wait_to_end (  )  [virtual]

Implements RpcInterface.

Definition at line 36 of file ReaderRpc.cc.

References genRecEmupikp::i, and ReaderRpc< Reader >::m_sem.

00037 {
00038    sem_wait(&m_sem);
00039 
00040    int nClients = dis_get_n_clients(itsIdOut);
00041 
00042    int i = 0;
00043    while ( nClients > 0 ) {
00044       // wait for all the clients dis-connecting to this server
00045       // and force the server to stop when the total waiting time
00046       // is greater than 5s(5*1s)
00047       if ( ++i > 5 ) break;
00048       sleep(1);
00049       nClients = dis_get_n_clients(itsIdOut);
00050    }
00051 
00052    return nClients;
00053 }


Member Data Documentation

template<class Reader>
std::map<int, AutoEnlargeBuffer*> ReaderRpc< Reader >::m_evtBak [private]

Definition at line 47 of file ReaderRpc.h.

Referenced by ReaderRpc< Reader >::rpcHandler(), and ReaderRpc< Reader >::~ReaderRpc().

template<class Reader>
int ReaderRpc< Reader >::m_evtDone [private]

Definition at line 32 of file ReaderRpc.h.

Referenced by ReaderRpc< Reader >::rpcHandler(), and ReaderRpc< Reader >::~ReaderRpc().

template<class Reader>
int ReaderRpc< Reader >::m_evtMax [private]

Definition at line 33 of file ReaderRpc.h.

Referenced by ReaderRpc< Reader >::rpcHandler().

template<class Reader>
PthrReaderBufPool<Reader>* ReaderRpc< Reader >::m_freader [private]

Definition at line 41 of file ReaderRpc.h.

Referenced by ReaderRpc< Reader >::ReaderRpc(), ReaderRpc< Reader >::rpcHandler(), and ReaderRpc< Reader >::~ReaderRpc().

template<class Reader>
int ReaderRpc< Reader >::m_inCode [private]

Definition at line 37 of file ReaderRpc.h.

Referenced by ReaderRpc< Reader >::rpcHandler().

template<class Reader>
int ReaderRpc< Reader >::m_outCode [private]

Definition at line 38 of file ReaderRpc.h.

Referenced by ReaderRpc< Reader >::rpcHandler().

template<class Reader>
const uint32_t* ReaderRpc< Reader >::m_pEvt [private]

Definition at line 39 of file ReaderRpc.h.

Referenced by ReaderRpc< Reader >::rpcHandler().

template<class Reader>
sem_t ReaderRpc< Reader >::m_sem [private]

Definition at line 43 of file ReaderRpc.h.

Referenced by ReaderRpc< Reader >::ReaderRpc(), ReaderRpc< Reader >::rpcHandler(), ReaderRpc< Reader >::wait_to_end(), and ReaderRpc< Reader >::~ReaderRpc().

template<class Reader>
int ReaderRpc< Reader >::m_stopFlag [private]

Definition at line 35 of file ReaderRpc.h.

Referenced by ReaderRpc< Reader >::rpcHandler().

template<class Reader>
std::string ReaderRpc< Reader >::m_svrName [private]

Definition at line 45 of file ReaderRpc.h.

Referenced by ReaderRpc< Reader >::ReaderRpc(), ReaderRpc< Reader >::rpcHandler(), and ReaderRpc< Reader >::~ReaderRpc().


Generated on Tue Nov 29 23:20:47 2016 for BOSS_7.0.2 by  doxygen 1.4.7