#include <DimRpcReader.h>
Public Types | |
typedef const std::string | ReaderArgType |
Public Member Functions | |
DimRpcReader (ReaderArgType &name) | |
virtual | ~DimRpcReader () |
const uint32_t * | nextEvent () |
const uint32_t * | currentEvent () const |
uint32_t | runNo () |
std::string | currentFile () |
uint32_t | stat () |
Private Member Functions | |
DimRpcReader () | |
Private Attributes | |
int | m_inCode |
int | m_outCode |
AutoEnlargeBuffer * | m_buffer |
DimRpcInfo * | m_rpc |
Static Private Attributes | |
static pthread_mutex_t | m_rpcLock = PTHREAD_MUTEX_INITIALIZER |
Definition at line 11 of file DimRpcReader.h.
typedef const std::string DimRpcReader::ReaderArgType |
Definition at line 15 of file DimRpcReader.h.
DimRpcReader::DimRpcReader | ( | ReaderArgType & | name | ) |
Definition at line 11 of file DimRpcReader.cxx.
References m_buffer, m_rpc, ClientErrHandler::registerInstance(), and DistBossCode::ServerTimeout.
00012 { 00013 if ( name.empty() ) { 00014 throw RawExMessage("[NetDataReader] The name of DistBoss EvtServer was not set!"); 00015 } 00016 00017 ClientErrHandler::registerInstance(); 00018 00019 m_buffer = new AutoEnlargeBuffer(128*1024); 00020 00021 m_rpc = new DimRpcInfo(name.c_str(), 5, DistBossCode::ServerTimeout); 00022 }
DimRpcReader::~DimRpcReader | ( | ) | [virtual] |
Definition at line 24 of file DimRpcReader.cxx.
References m_rpc.
00025 { 00026 delete m_rpc; 00027 }
DimRpcReader::DimRpcReader | ( | ) | [private] |
const uint32_t * DimRpcReader::currentEvent | ( | ) | const |
std::string DimRpcReader::currentFile | ( | ) |
Definition at line 97 of file DimRpcReader.cxx.
References fname, DistBossCode::GetFileName, m_outCode, m_rpc, m_rpcLock, and deljobs::string.
00098 { 00099 pthread_mutex_lock( &m_rpcLock ); 00100 00101 m_outCode = DistBossCode::GetFileName; 00102 m_rpc->setData(m_outCode); 00103 // should check status code here, correct it in future 00104 std::string fname((char*)m_rpc->getData()); 00105 00106 pthread_mutex_unlock( &m_rpcLock ); 00107 00108 return fname; 00109 }
const uint32_t * DimRpcReader::nextEvent | ( | ) |
Definition at line 29 of file DimRpcReader.cxx.
References AutoEnlargeBuffer::copy(), AutoEnlargeBuffer::data(), DistBossCode::GetEvent, genRecEmupikp::i, m_buffer, m_inCode, m_outCode, m_rpc, m_rpcLock, DistBossCode::NoMoreEvents, DistBossCode::RetryEvent, DistBossCode::ServerError, DistBossCode::ServerTimeout, and delete_small_size::size.
00030 { 00031 static int nn = 0; 00032 ++nn; 00033 00034 int theCode = DistBossCode::GetEvent; 00035 00036 for ( int i = 1; i < 7; ++i ) { 00037 pthread_mutex_lock( &m_rpcLock ); 00038 00039 m_outCode = theCode; 00040 m_rpc->setData(m_outCode); 00041 int size = m_rpc->getSize(); 00042 void* data = m_rpc->getData(); 00043 m_buffer->copy(data, size); 00044 00045 pthread_mutex_unlock( &m_rpcLock ); 00046 00047 if ( size > 4 ) { 00048 return (const uint32_t*)m_buffer->data(); 00049 } 00050 else if ( size == 4 ) { 00051 m_inCode = *((const uint32_t*)m_buffer->data()); 00052 if ( m_inCode == DistBossCode::NoMoreEvents ) { 00053 throw RawExMessage("[NetDataReader] Reach the end, no more events left."); 00054 } 00055 else if ( m_inCode == DistBossCode::ServerTimeout) { 00056 if ( i < 6 ) { 00057 int sec = i; 00058 std::cout << "[NetDataReader] Event " << nn << " timeout. Sleep " << sec << "s before retry." << std::endl; 00059 sleep(sec); 00060 std::cout << "[NetDataReader] Event " << nn << " now retry time " << i << " ..." << std::endl; 00061 theCode = DistBossCode::RetryEvent; 00062 continue; 00063 } 00064 else { 00065 throw RawExMessage("[NetDataReader] Failed to retry server. Stop this client!"); 00066 } 00067 } 00068 else if ( m_inCode == DistBossCode::ServerError ) { 00069 throw RawExMessage("[NetDataReader] DistBossServer ERROR !!!"); 00070 } 00071 else { 00072 throw RawExMessage("[NetDataReader] Unknown server code !!!"); 00073 } 00074 } 00075 else { 00076 throw RawExMessage("[NetDataReader] Invalid data from server !!!"); 00077 } 00078 00079 break; 00080 } 00081 00082 return 0; 00083 }
uint32_t DimRpcReader::runNo | ( | ) |
Definition at line 90 of file DimRpcReader.cxx.
00091 { 00092 //FIXME: this is a place holder for runNo() 00093 //fill it in the future 00094 return 0xFFFFFFFF; 00095 }
uint32_t DimRpcReader::stat | ( | ) |
AutoEnlargeBuffer* DimRpcReader::m_buffer [private] |
Definition at line 36 of file DimRpcReader.h.
Referenced by currentEvent(), DimRpcReader(), and nextEvent().
int DimRpcReader::m_inCode [private] |
int DimRpcReader::m_outCode [private] |
DimRpcInfo* DimRpcReader::m_rpc [private] |
Definition at line 38 of file DimRpcReader.h.
Referenced by currentFile(), DimRpcReader(), nextEvent(), and ~DimRpcReader().
pthread_mutex_t DimRpcReader::m_rpcLock = PTHREAD_MUTEX_INITIALIZER [static, private] |