00001 #include "NetDataReader/DimRpcReader.h" 00002 #include "ClientErrHandler/ClientErrHandler.h" 00003 #include "DistBossUtil/AutoEnlargeBuffer.h" 00004 #include "DistBossUtil/DistBossCode.h" 00005 #include "IRawFile/RawFileExceptions.h" 00006 #include "dic.hxx" 00007 #include <iostream> 00008 00009 pthread_mutex_t DimRpcReader::m_rpcLock = PTHREAD_MUTEX_INITIALIZER; 00010 00011 DimRpcReader::DimRpcReader(ReaderArgType& name) 00012 { 00013 if ( name.empty() ) { 00014 throw RawExMessage("[NetDataReader] The name of DistBoss EvtServer was not set!"); 00015 } 00016 00017 ClientErrHandler::registerInstance(); 00018 00019 m_buffer = new AutoEnlargeBuffer(128*1024); 00020 00021 m_rpc = new DimRpcInfo(name.c_str(), 5, DistBossCode::ServerTimeout); 00022 } 00023 00024 DimRpcReader::~DimRpcReader() 00025 { 00026 delete m_rpc; 00027 } 00028 00029 const uint32_t* DimRpcReader::nextEvent() 00030 { 00031 static int nn = 0; 00032 ++nn; 00033 00034 int theCode = DistBossCode::GetEvent; 00035 00036 for ( int i = 1; i < 7; ++i ) { 00037 pthread_mutex_lock( &m_rpcLock ); 00038 00039 m_outCode = theCode; 00040 m_rpc->setData(m_outCode); 00041 int size = m_rpc->getSize(); 00042 void* data = m_rpc->getData(); 00043 m_buffer->copy(data, size); 00044 00045 pthread_mutex_unlock( &m_rpcLock ); 00046 00047 if ( size > 4 ) { 00048 return (const uint32_t*)m_buffer->data(); 00049 } 00050 else if ( size == 4 ) { 00051 m_inCode = *((const uint32_t*)m_buffer->data()); 00052 if ( m_inCode == DistBossCode::NoMoreEvents ) { 00053 throw RawExMessage("[NetDataReader] Reach the end, no more events left."); 00054 } 00055 else if ( m_inCode == DistBossCode::ServerTimeout) { 00056 if ( i < 6 ) { 00057 int sec = i; 00058 std::cout << "[NetDataReader] Event " << nn << " timeout. Sleep " << sec << "s before retry." << std::endl; 00059 sleep(sec); 00060 std::cout << "[NetDataReader] Event " << nn << " now retry time " << i << " ..." << std::endl; 00061 theCode = DistBossCode::RetryEvent; 00062 continue; 00063 } 00064 else { 00065 throw RawExMessage("[NetDataReader] Failed to retry server. Stop this client!"); 00066 } 00067 } 00068 else if ( m_inCode == DistBossCode::ServerError ) { 00069 throw RawExMessage("[NetDataReader] DistBossServer ERROR !!!"); 00070 } 00071 else { 00072 throw RawExMessage("[NetDataReader] Unknown server code !!!"); 00073 } 00074 } 00075 else { 00076 throw RawExMessage("[NetDataReader] Invalid data from server !!!"); 00077 } 00078 00079 break; 00080 } 00081 00082 return 0; 00083 } 00084 00085 const uint32_t* DimRpcReader::currentEvent() const 00086 { 00087 return (const uint32_t*)m_buffer->data(); 00088 } 00089 00090 uint32_t DimRpcReader::runNo() 00091 { 00092 //FIXME: this is a place holder for runNo() 00093 //fill it in the future 00094 return 0xFFFFFFFF; 00095 } 00096 00097 std::string DimRpcReader::currentFile() 00098 { 00099 pthread_mutex_lock( &m_rpcLock ); 00100 00101 m_outCode = DistBossCode::GetFileName; 00102 m_rpc->setData(m_outCode); 00103 // should check status code here, correct it in future 00104 std::string fname((char*)m_rpc->getData()); 00105 00106 pthread_mutex_unlock( &m_rpcLock ); 00107 00108 return fname; 00109 } 00110 00111 uint32_t DimRpcReader::stat() 00112 { 00113 return 0; 00114 }