00001 #include "NetDataWriter/DimRpcWriter.h" 00002 #include "ClientErrHandler/ClientErrHandler.h" 00003 #include "DistBossUtil/DistBossCode.h" 00004 #include "IRawFile/RawFileExceptions.h" 00005 #include "dic.hxx" 00006 #include <iostream> 00007 00008 pthread_mutex_t DimRpcWriter::m_rpcLock = PTHREAD_MUTEX_INITIALIZER; 00009 00010 DimRpcWriter::DimRpcWriter(WriterArgType& name) 00011 : m_inCode(0) 00012 { 00013 if ( name.empty() ) { 00014 throw RawExMessage("[NetDataWriter] The name of DistBoss Server was not set!"); 00015 } 00016 00017 ClientErrHandler::registerInstance(); 00018 00019 m_rpc = new DimRpcInfo(name.c_str(), 5, DistBossCode::ServerTimeout); 00020 } 00021 00022 DimRpcWriter::~DimRpcWriter() 00023 { 00024 delete m_rpc; 00025 } 00026 00027 int DimRpcWriter::writeEvent(void *pevt, int size) 00028 { 00029 static int nn = 0; 00030 ++nn; 00031 00032 for ( int i = 1; i < 7; ++i ) { 00033 pthread_mutex_lock( &m_rpcLock ); 00034 00035 m_rpc->setData(pevt, size); 00036 int inCode = m_rpc->getInt(); 00037 00038 pthread_mutex_unlock( &m_rpcLock ); 00039 00040 00041 if ( inCode != DistBossCode::StatusSuccess) { 00042 if ( inCode == DistBossCode::ServerTimeout) { 00043 if ( i < 6 ) { 00044 int sec = i; 00045 std::cout << "[NetDataWriter] Event " << nn << " timeout. Sleep " << sec << "s before retry." << std::endl; 00046 sleep(sec); 00047 std::cout << "[NetDataWriter] Event " << nn << " now retry time " << i << " ..." << std::endl; 00048 continue; 00049 } 00050 else { 00051 m_inCode = inCode; 00052 throw RawExMessage("[NetDataWriter] Failed to retry server. Stop this client!"); 00053 } 00054 } 00055 else if ( inCode == DistBossCode::ServerError) { 00056 m_inCode = inCode; 00057 throw RawExMessage("[NetDataWriter] Received server ERROR code!"); 00058 } 00059 else { 00060 m_inCode = inCode; 00061 throw RawExMessage("[NetDataWriter] Unknown server code!"); 00062 } 00063 } 00064 00065 if ( size == 4 && *((int*)pevt) == DistBossCode::StatusFinalize) { 00066 throw ReachEndOfFileList(); 00067 } 00068 00069 break; 00070 } 00071 00072 return 0; 00073 } 00074 00075 int DimRpcWriter::close() 00076 { 00077 return 0; 00078 } 00079 00080 int DimRpcWriter::stat() 00081 { 00082 return m_inCode; 00083 }