#include <PthrReaderBufPool.h>
Inheritance diagram for PthrReaderBufPool< Reader, PoolSize >:
Public Member Functions | |
PthrReaderBufPool (typename Reader::ReaderArgType &arg) | |
virtual | ~PthrReaderBufPool () |
const uint32_t * | nextEvent () |
const uint32_t * | currentEvent () const |
uint32_t | runNo () |
std::string | currentFile () |
uint32_t | stat () |
Private Member Functions | |
PthrReaderBufPool () | |
Static Private Member Functions | |
static void * | thread_filling (void *arg) |
Private Attributes | |
int | m_inLoop |
int | m_outLoop |
AutoEnlargeBuffer * | m_buf [PoolSize] |
Reader * | m_reader |
pthread_t | m_tid |
sem_t | m_semIn |
sem_t | m_semOut |
pthread_mutex_t | m_lock |
sem_t | m_semSyn |
RawFileException * | m_RawFileException |
Definition at line 13 of file PthrReaderBufPool.h.
PthrReaderBufPool< Reader, PoolSize >::PthrReaderBufPool | ( | typename Reader::ReaderArgType & | arg | ) |
Definition at line 19 of file PthrReaderBufPool.cc.
References genRecEmupikp::i, PthrReaderBufPool< Reader, PoolSize >::m_buf, PthrReaderBufPool< Reader, PoolSize >::m_lock, PthrReaderBufPool< Reader, PoolSize >::m_reader, PthrReaderBufPool< Reader, PoolSize >::m_semIn, PthrReaderBufPool< Reader, PoolSize >::m_semOut, PthrReaderBufPool< Reader, PoolSize >::m_semSyn, PthrReaderBufPool< Reader, PoolSize >::m_tid, and PthrReaderBufPool< Reader, PoolSize >::thread_filling().
00020 : m_inLoop(-1), 00021 m_outLoop(-1), 00022 m_RawFileException(0) 00023 { 00024 if ( PoolSize < 2 ) { 00025 std::cout << "[PthrReaderBufPool] The PoolSize of buffer must > 1" << std::endl; 00026 exit(1); 00027 } 00028 00029 // use(PoolSize-1) to protect the current event buffer 00030 sem_init(&m_semIn, 0, (PoolSize-1)); 00031 sem_init(&m_semOut, 0, 0); 00032 pthread_mutex_init(&m_lock, NULL); 00033 00034 sem_init(&m_semSyn, 0, 0); 00035 00036 for ( int i = 0; i < PoolSize; ++i ) { 00037 // buffer in the loop for each event 00038 m_buf[i] = new AutoEnlargeBuffer(128*1024); 00039 } 00040 00041 m_reader = new Reader(arg); 00042 00043 // create an individual thread to fill to buffer 00044 pthread_create(&m_tid, NULL, thread_filling, (void*)this); 00045 }
PthrReaderBufPool< Reader, PoolSize >::~PthrReaderBufPool | ( | ) | [virtual] |
Definition at line 48 of file PthrReaderBufPool.cc.
References genRecEmupikp::i, PthrReaderBufPool< Reader, PoolSize >::m_buf, PthrReaderBufPool< Reader, PoolSize >::m_RawFileException, PthrReaderBufPool< Reader, PoolSize >::m_reader, PthrReaderBufPool< Reader, PoolSize >::m_semIn, and PthrReaderBufPool< Reader, PoolSize >::m_semOut.
00049 { 00050 for ( int i = 0; i < PoolSize; ++i ) { 00051 delete m_buf[i]; 00052 } 00053 00054 delete m_RawFileException; 00055 00056 sem_destroy(&m_semIn); 00057 sem_destroy(&m_semOut); 00058 00059 delete m_reader; 00060 }
PthrReaderBufPool< Reader, PoolSize >::PthrReaderBufPool | ( | ) | [private] |
const uint32_t * PthrReaderBufPool< Reader, PoolSize >::currentEvent | ( | ) | const [inline, virtual] |
Implements IRawReader.
Definition at line 92 of file PthrReaderBufPool.cc.
References PthrReaderBufPool< Reader, PoolSize >::m_buf, and PthrReaderBufPool< Reader, PoolSize >::m_outLoop.
00093 { 00094 //can't be called before any nextEvent() call 00095 const uint32_t* pevt = (uint32_t*)m_buf[ m_outLoop%PoolSize ]->data(); 00096 return pevt; 00097 }
std::string PthrReaderBufPool< Reader, PoolSize >::currentFile | ( | ) | [inline, virtual] |
Implements IRawReader.
Definition at line 106 of file PthrReaderBufPool.cc.
References PthrReaderBufPool< Reader, PoolSize >::m_reader.
Referenced by main(), and ReaderRpc< Reader >::rpcHandler().
00107 { 00108 return m_reader->currentFile(); 00109 }
const uint32_t * PthrReaderBufPool< Reader, PoolSize >::nextEvent | ( | ) | [inline, virtual] |
Implements IRawReader.
Definition at line 63 of file PthrReaderBufPool.cc.
References PthrReaderBufPool< Reader, PoolSize >::m_buf, PthrReaderBufPool< Reader, PoolSize >::m_inLoop, PthrReaderBufPool< Reader, PoolSize >::m_lock, PthrReaderBufPool< Reader, PoolSize >::m_outLoop, PthrReaderBufPool< Reader, PoolSize >::m_RawFileException, PthrReaderBufPool< Reader, PoolSize >::m_semIn, PthrReaderBufPool< Reader, PoolSize >::m_semOut, and PthrReaderBufPool< Reader, PoolSize >::m_semSyn.
Referenced by main(), and ReaderRpc< Reader >::rpcHandler().
00064 { 00065 if ( m_outLoop < 0 ) sem_post(&m_semSyn); 00066 00067 // the ahead event buffer is ready for update now 00068 sem_post(&m_semIn); 00069 // waiting for a ready to use event 00070 sem_wait(&m_semOut); 00071 00072 pthread_mutex_lock(&m_lock); 00073 int inLoop = m_inLoop; 00074 pthread_mutex_unlock(&m_lock); 00075 00076 if ( m_RawFileException != 0 && m_outLoop >= inLoop ) { 00077 if ( dynamic_cast<RawExMessage*>(m_RawFileException) ) { 00078 throw RawExMessage(*(RawExMessage*)m_RawFileException); 00079 } 00080 if ( dynamic_cast<ReachEndOfFileList*>(m_RawFileException) ) { 00081 throw ReachEndOfFileList(*(ReachEndOfFileList*)m_RawFileException); 00082 } 00083 } 00084 00085 ++m_outLoop; 00086 00087 const uint32_t* pevt = (uint32_t*)m_buf[ m_outLoop%PoolSize ]->data(); 00088 return pevt; 00089 }
uint32_t PthrReaderBufPool< Reader, PoolSize >::runNo | ( | ) | [inline, virtual] |
Implements IRawReader.
Definition at line 100 of file PthrReaderBufPool.cc.
References PthrReaderBufPool< Reader, PoolSize >::m_reader.
00101 { 00102 return m_reader->runNo(); 00103 }
uint32_t PthrReaderBufPool< Reader, PoolSize >::stat | ( | ) | [inline, virtual] |
Implements IRawReader.
Definition at line 112 of file PthrReaderBufPool.cc.
References PthrReaderBufPool< Reader, PoolSize >::m_reader.
00113 { 00114 return m_reader->stat(); 00115 }
void * PthrReaderBufPool< Reader, PoolSize >::thread_filling | ( | void * | arg | ) | [static, private] |
Definition at line 118 of file PthrReaderBufPool.cc.
References AutoEnlargeBuffer::copy(), PthrReaderBufPool< Reader, PoolSize >::m_buf, PthrReaderBufPool< Reader, PoolSize >::m_inLoop, PthrReaderBufPool< Reader, PoolSize >::m_lock, PthrReaderBufPool< Reader, PoolSize >::m_RawFileException, PthrReaderBufPool< Reader, PoolSize >::m_reader, PthrReaderBufPool< Reader, PoolSize >::m_semIn, PthrReaderBufPool< Reader, PoolSize >::m_semOut, and PthrReaderBufPool< Reader, PoolSize >::m_semSyn.
Referenced by PthrReaderBufPool< Reader, PoolSize >::PthrReaderBufPool().
00119 { 00120 PthrReaderBufPool<Reader, PoolSize>* pthis = (PthrReaderBufPool<Reader, PoolSize>*)arg; 00121 00122 uint32_t index; 00123 const uint32_t* pevt; 00124 00125 sem_wait(&(pthis->m_semSyn)); 00126 00127 while ( true ) { 00128 // waiting for a buffer that ready to update 00129 sem_wait(&(pthis->m_semIn)); 00130 00131 try { 00132 pevt = pthis->m_reader->nextEvent(); 00133 } 00134 catch (RawExMessage& e) { 00135 pthis->m_RawFileException = new RawExMessage(e); 00136 break; 00137 } 00138 catch (ReachEndOfFileList& e) { 00139 pthis->m_RawFileException = new ReachEndOfFileList(e); 00140 break; 00141 } 00142 catch ( ... ) { 00143 std::cout << "[PthrReaderBufPool] Catch unexpected exception !" << std::endl; 00144 exit(1); 00145 } 00146 00147 pthread_mutex_lock(&(pthis->m_lock)); 00148 index = (++(pthis->m_inLoop)) % PoolSize; 00149 pthread_mutex_unlock(&(pthis->m_lock)); 00150 00151 pthis->m_buf[index]->copy( (void*)pevt, pevt[1]*sizeof(uint32_t) ); 00152 00153 // post an event that ready to use 00154 sem_post(&(pthis->m_semOut)); 00155 } 00156 00157 sem_post(&(pthis->m_semOut)); 00158 00159 pthread_exit(NULL); 00160 }
AutoEnlargeBuffer* PthrReaderBufPool< Reader, PoolSize >::m_buf[PoolSize] [private] |
Definition at line 38 of file PthrReaderBufPool.h.
Referenced by PthrReaderBufPool< Reader, PoolSize >::currentEvent(), PthrReaderBufPool< Reader, PoolSize >::nextEvent(), PthrReaderBufPool< Reader, PoolSize >::PthrReaderBufPool(), PthrReaderBufPool< Reader, PoolSize >::thread_filling(), and PthrReaderBufPool< Reader, PoolSize >::~PthrReaderBufPool().
int PthrReaderBufPool< Reader, PoolSize >::m_inLoop [private] |
Definition at line 36 of file PthrReaderBufPool.h.
Referenced by PthrReaderBufPool< Reader, PoolSize >::nextEvent(), and PthrReaderBufPool< Reader, PoolSize >::thread_filling().
pthread_mutex_t PthrReaderBufPool< Reader, PoolSize >::m_lock [private] |
Definition at line 45 of file PthrReaderBufPool.h.
Referenced by PthrReaderBufPool< Reader, PoolSize >::nextEvent(), PthrReaderBufPool< Reader, PoolSize >::PthrReaderBufPool(), and PthrReaderBufPool< Reader, PoolSize >::thread_filling().
int PthrReaderBufPool< Reader, PoolSize >::m_outLoop [private] |
Definition at line 37 of file PthrReaderBufPool.h.
Referenced by PthrReaderBufPool< Reader, PoolSize >::currentEvent(), and PthrReaderBufPool< Reader, PoolSize >::nextEvent().
RawFileException* PthrReaderBufPool< Reader, PoolSize >::m_RawFileException [private] |
Definition at line 50 of file PthrReaderBufPool.h.
Referenced by PthrReaderBufPool< Reader, PoolSize >::nextEvent(), PthrReaderBufPool< Reader, PoolSize >::thread_filling(), and PthrReaderBufPool< Reader, PoolSize >::~PthrReaderBufPool().
Reader* PthrReaderBufPool< Reader, PoolSize >::m_reader [private] |
Definition at line 40 of file PthrReaderBufPool.h.
Referenced by PthrReaderBufPool< Reader, PoolSize >::currentFile(), PthrReaderBufPool< Reader, PoolSize >::PthrReaderBufPool(), PthrReaderBufPool< Reader, PoolSize >::runNo(), PthrReaderBufPool< Reader, PoolSize >::stat(), PthrReaderBufPool< Reader, PoolSize >::thread_filling(), and PthrReaderBufPool< Reader, PoolSize >::~PthrReaderBufPool().
sem_t PthrReaderBufPool< Reader, PoolSize >::m_semIn [private] |
sem_t PthrReaderBufPool< Reader, PoolSize >::m_semOut [private] |
sem_t PthrReaderBufPool< Reader, PoolSize >::m_semSyn [private] |
Definition at line 47 of file PthrReaderBufPool.h.
Referenced by PthrReaderBufPool< Reader, PoolSize >::nextEvent(), PthrReaderBufPool< Reader, PoolSize >::PthrReaderBufPool(), and PthrReaderBufPool< Reader, PoolSize >::thread_filling().
pthread_t PthrReaderBufPool< Reader, PoolSize >::m_tid [private] |
Definition at line 42 of file PthrReaderBufPool.h.
Referenced by PthrReaderBufPool< Reader, PoolSize >::PthrReaderBufPool().