/home/bes3soft/bes3soft/Boss/7.0.2/dist/7.0.2/DistBoss/NetDataReader/NetDataReader-00-00-06/src/DimRpcReader.cxx

Go to the documentation of this file.
00001 #include "NetDataReader/DimRpcReader.h"
00002 #include "ClientErrHandler/ClientErrHandler.h"
00003 #include "DistBossUtil/AutoEnlargeBuffer.h"
00004 #include "DistBossUtil/DistBossCode.h"
00005 #include "IRawFile/RawFileExceptions.h"
00006 #include "dic.hxx"
00007 #include <iostream>
00008 
// Definition of the static mutex member of DimRpcReader. It is locked around
// every RPC request/response exchange in nextEvent() and currentFile().
00009 pthread_mutex_t DimRpcReader::m_rpcLock = PTHREAD_MUTEX_INITIALIZER;
00010 
00011 DimRpcReader::DimRpcReader(ReaderArgType& name)
00012 {
00013    if ( name.empty() ) {
00014       throw RawExMessage("[NetDataReader] The name of DistBoss EvtServer was not set!");
00015    }
00016 
00017    ClientErrHandler::registerInstance();
00018 
00019    m_buffer = new AutoEnlargeBuffer(128*1024);
00020 
00021    m_rpc = new DimRpcInfo(name.c_str(), 5, DistBossCode::ServerTimeout);
00022 }
00023 
00024 DimRpcReader::~DimRpcReader()
00025 {
00026    delete m_rpc;
00027 }
00028 
00029 const uint32_t* DimRpcReader::nextEvent()
00030 {
00031    static int nn = 0;
00032    ++nn;
00033 
00034    int theCode = DistBossCode::GetEvent;
00035 
00036    for ( int i = 1; i < 7; ++i ) {
00037       pthread_mutex_lock( &m_rpcLock );
00038 
00039       m_outCode = theCode;
00040       m_rpc->setData(m_outCode);
00041       int size = m_rpc->getSize();
00042       void* data = m_rpc->getData();
00043       m_buffer->copy(data, size);
00044 
00045       pthread_mutex_unlock( &m_rpcLock );
00046 
00047       if ( size > 4 ) {
00048          return (const uint32_t*)m_buffer->data();
00049       }
00050       else if ( size == 4 ) {
00051          m_inCode = *((const uint32_t*)m_buffer->data());
00052          if ( m_inCode == DistBossCode::NoMoreEvents ) {
00053             throw RawExMessage("[NetDataReader] Reach the end, no more events left.");
00054          }
00055          else if ( m_inCode == DistBossCode::ServerTimeout) {
00056             if ( i < 6 ) {
00057                int sec = i;
00058                std::cout << "[NetDataReader] Event " << nn << " timeout. Sleep " << sec << "s before retry." << std::endl;
00059                sleep(sec);
00060                std::cout << "[NetDataReader] Event " << nn << " now retry time " << i << " ..." << std::endl;
00061                theCode = DistBossCode::RetryEvent;
00062                continue;
00063             }
00064             else {
00065                throw RawExMessage("[NetDataReader] Failed to retry server. Stop this client!");
00066             }
00067          }
00068          else if ( m_inCode == DistBossCode::ServerError ) {
00069             throw RawExMessage("[NetDataReader] DistBossServer ERROR !!!");
00070          }
00071          else {
00072             throw RawExMessage("[NetDataReader] Unknown server code !!!");
00073          }
00074       }
00075       else {
00076          throw RawExMessage("[NetDataReader] Invalid data from server !!!");
00077       }
00078 
00079       break;
00080    }
00081 
00082    return 0;
00083 }
00084 
00085 const uint32_t* DimRpcReader::currentEvent() const
00086 {
00087    return (const uint32_t*)m_buffer->data();
00088 }
00089 
00090 uint32_t DimRpcReader::runNo()
00091 {
00092     //FIXME: this is a place holder for runNo()
00093     //fill it in the future
00094     return 0xFFFFFFFF;
00095 }
00096 
00097 std::string DimRpcReader::currentFile()
00098 {
00099    pthread_mutex_lock( &m_rpcLock );
00100 
00101    m_outCode = DistBossCode::GetFileName;
00102    m_rpc->setData(m_outCode);
00103    // should check status code here, correct it in future
00104    std::string fname((char*)m_rpc->getData());
00105 
00106    pthread_mutex_unlock( &m_rpcLock );
00107 
00108    return fname;
00109 }
00110 
00111 uint32_t DimRpcReader::stat()
00112 {
00113    return 0;
00114 }

Generated on Tue Nov 29 22:58:02 2016 for BOSS_7.0.2 by  doxygen 1.4.7