DimRpcReader Class Reference

#include <DimRpcReader.h>

List of all members.

Public Types

typedef const std::string ReaderArgType

Public Member Functions

 DimRpcReader (ReaderArgType &name)
virtual ~DimRpcReader ()
const uint32_t * nextEvent ()
const uint32_t * currentEvent () const
uint32_t runNo ()
std::string currentFile ()
uint32_t stat ()

Private Member Functions

 DimRpcReader ()

Private Attributes

int m_inCode
int m_outCode
AutoEnlargeBufferm_buffer
DimRpcInfo * m_rpc

Static Private Attributes

static pthread_mutex_t m_rpcLock = PTHREAD_MUTEX_INITIALIZER


Detailed Description

Definition at line 11 of file DimRpcReader.h.


Member Typedef Documentation

typedef const std::string DimRpcReader::ReaderArgType

Definition at line 15 of file DimRpcReader.h.


Constructor & Destructor Documentation

DimRpcReader::DimRpcReader ( ReaderArgType name  ) 

Definition at line 11 of file DimRpcReader.cxx.

References m_buffer, m_rpc, ClientErrHandler::registerInstance(), and DistBossCode::ServerTimeout.

00012 {
00013    if ( name.empty() ) {
00014       throw RawExMessage("[NetDataReader] The name of DistBoss EvtServer was not set!");
00015    }
00016 
00017    ClientErrHandler::registerInstance();
00018 
00019    m_buffer = new AutoEnlargeBuffer(128*1024);
00020 
00021    m_rpc = new DimRpcInfo(name.c_str(), 5, DistBossCode::ServerTimeout);
00022 }

DimRpcReader::~DimRpcReader (  )  [virtual]

Definition at line 24 of file DimRpcReader.cxx.

References m_rpc.

00025 {
00026    delete m_rpc;
00027 }

DimRpcReader::DimRpcReader (  )  [private]


Member Function Documentation

const uint32_t * DimRpcReader::currentEvent (  )  const

Definition at line 85 of file DimRpcReader.cxx.

References AutoEnlargeBuffer::data(), and m_buffer.

00086 {
00087    return (const uint32_t*)m_buffer->data();
00088 }

std::string DimRpcReader::currentFile (  ) 

Definition at line 97 of file DimRpcReader.cxx.

References fname, DistBossCode::GetFileName, m_outCode, m_rpc, m_rpcLock, and deljobs::string.

00098 {
00099    pthread_mutex_lock( &m_rpcLock );
00100 
00101    m_outCode = DistBossCode::GetFileName;
00102    m_rpc->setData(m_outCode);
00103    // should check status code here, correct it in future
00104    std::string fname((char*)m_rpc->getData());
00105 
00106    pthread_mutex_unlock( &m_rpcLock );
00107 
00108    return fname;
00109 }

const uint32_t * DimRpcReader::nextEvent (  ) 

Definition at line 29 of file DimRpcReader.cxx.

References AutoEnlargeBuffer::copy(), AutoEnlargeBuffer::data(), DistBossCode::GetEvent, genRecEmupikp::i, m_buffer, m_inCode, m_outCode, m_rpc, m_rpcLock, DistBossCode::NoMoreEvents, DistBossCode::RetryEvent, DistBossCode::ServerError, DistBossCode::ServerTimeout, and delete_small_size::size.

00030 {
00031    static int nn = 0;
00032    ++nn;
00033 
00034    int theCode = DistBossCode::GetEvent;
00035 
00036    for ( int i = 1; i < 7; ++i ) {
00037       pthread_mutex_lock( &m_rpcLock );
00038 
00039       m_outCode = theCode;
00040       m_rpc->setData(m_outCode);
00041       int size = m_rpc->getSize();
00042       void* data = m_rpc->getData();
00043       m_buffer->copy(data, size);
00044 
00045       pthread_mutex_unlock( &m_rpcLock );
00046 
00047       if ( size > 4 ) {
00048          return (const uint32_t*)m_buffer->data();
00049       }
00050       else if ( size == 4 ) {
00051          m_inCode = *((const uint32_t*)m_buffer->data());
00052          if ( m_inCode == DistBossCode::NoMoreEvents ) {
00053             throw RawExMessage("[NetDataReader] Reach the end, no more events left.");
00054          }
00055          else if ( m_inCode == DistBossCode::ServerTimeout) {
00056             if ( i < 6 ) {
00057                int sec = i;
00058                std::cout << "[NetDataReader] Event " << nn << " timeout. Sleep " << sec << "s before retry." << std::endl;
00059                sleep(sec);
00060                std::cout << "[NetDataReader] Event " << nn << " now retry time " << i << " ..." << std::endl;
00061                theCode = DistBossCode::RetryEvent;
00062                continue;
00063             }
00064             else {
00065                throw RawExMessage("[NetDataReader] Failed to retry server. Stop this client!");
00066             }
00067          }
00068          else if ( m_inCode == DistBossCode::ServerError ) {
00069             throw RawExMessage("[NetDataReader] DistBossServer ERROR !!!");
00070          }
00071          else {
00072             throw RawExMessage("[NetDataReader] Unknown server code !!!");
00073          }
00074       }
00075       else {
00076          throw RawExMessage("[NetDataReader] Invalid data from server !!!");
00077       }
00078 
00079       break;
00080    }
00081 
00082    return 0;
00083 }

uint32_t DimRpcReader::runNo (  ) 

Definition at line 90 of file DimRpcReader.cxx.

00091 {
00092     //FIXME: this is a place holder for runNo()
00093     //fill it in the future
00094     return 0xFFFFFFFF;
00095 }

uint32_t DimRpcReader::stat (  ) 

Definition at line 111 of file DimRpcReader.cxx.

00112 {
00113    return 0;
00114 }


Member Data Documentation

AutoEnlargeBuffer* DimRpcReader::m_buffer [private]

Definition at line 36 of file DimRpcReader.h.

Referenced by currentEvent(), DimRpcReader(), and nextEvent().

int DimRpcReader::m_inCode [private]

Definition at line 34 of file DimRpcReader.h.

Referenced by nextEvent().

int DimRpcReader::m_outCode [private]

Definition at line 35 of file DimRpcReader.h.

Referenced by currentFile(), and nextEvent().

DimRpcInfo* DimRpcReader::m_rpc [private]

Definition at line 38 of file DimRpcReader.h.

Referenced by currentFile(), DimRpcReader(), nextEvent(), and ~DimRpcReader().

pthread_mutex_t DimRpcReader::m_rpcLock = PTHREAD_MUTEX_INITIALIZER [static, private]

Definition at line 40 of file DimRpcReader.h.

Referenced by currentFile(), and nextEvent().


Generated on Tue Nov 29 23:18:18 2016 for BOSS_7.0.2 by  doxygen 1.4.7