00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include <iostream>
00016
00017
00018 template <typename Reader, int PoolSize>
00019 PthrReaderBufPool<Reader, PoolSize>::PthrReaderBufPool(typename Reader::ReaderArgType& arg)
00020 : m_inLoop(-1),
00021 m_outLoop(-1),
00022 m_RawFileException(0)
00023 {
00024 if ( PoolSize < 2 ) {
00025 std::cout << "[PthrReaderBufPool] The PoolSize of buffer must > 1" << std::endl;
00026 exit(1);
00027 }
00028
00029
00030 sem_init(&m_semIn, 0, (PoolSize-1));
00031 sem_init(&m_semOut, 0, 0);
00032 pthread_mutex_init(&m_lock, NULL);
00033
00034 sem_init(&m_semSyn, 0, 0);
00035
00036 for ( int i = 0; i < PoolSize; ++i ) {
00037
00038 m_buf[i] = new AutoEnlargeBuffer(128*1024);
00039 }
00040
00041 m_reader = new Reader(arg);
00042
00043
00044 pthread_create(&m_tid, NULL, thread_filling, (void*)this);
00045 }
00046
00047 template <typename Reader, int PoolSize>
00048 PthrReaderBufPool<Reader, PoolSize>::~PthrReaderBufPool()
00049 {
00050 for ( int i = 0; i < PoolSize; ++i ) {
00051 delete m_buf[i];
00052 }
00053
00054 delete m_RawFileException;
00055
00056 sem_destroy(&m_semIn);
00057 sem_destroy(&m_semOut);
00058
00059 delete m_reader;
00060 }
00061
00062 template <typename Reader, int PoolSize>
00063 const uint32_t* PthrReaderBufPool<Reader, PoolSize>::nextEvent()
00064 {
00065 if ( m_outLoop < 0 ) sem_post(&m_semSyn);
00066
00067
00068 sem_post(&m_semIn);
00069
00070 sem_wait(&m_semOut);
00071
00072 pthread_mutex_lock(&m_lock);
00073 int inLoop = m_inLoop;
00074 pthread_mutex_unlock(&m_lock);
00075
00076 if ( m_RawFileException != 0 && m_outLoop >= inLoop ) {
00077 if ( dynamic_cast<RawExMessage*>(m_RawFileException) ) {
00078 throw RawExMessage(*(RawExMessage*)m_RawFileException);
00079 }
00080 if ( dynamic_cast<ReachEndOfFileList*>(m_RawFileException) ) {
00081 throw ReachEndOfFileList(*(ReachEndOfFileList*)m_RawFileException);
00082 }
00083 }
00084
00085 ++m_outLoop;
00086
00087 const uint32_t* pevt = (uint32_t*)m_buf[ m_outLoop%PoolSize ]->data();
00088 return pevt;
00089 }
00090
00091 template <typename Reader, int PoolSize>
00092 const uint32_t* PthrReaderBufPool<Reader, PoolSize>::currentEvent() const
00093 {
00094
00095 const uint32_t* pevt = (uint32_t*)m_buf[ m_outLoop%PoolSize ]->data();
00096 return pevt;
00097 }
00098
00099 template <typename Reader, int PoolSize>
00100 uint32_t PthrReaderBufPool<Reader, PoolSize>::runNo()
00101 {
00102 return m_reader->runNo();
00103 }
00104
00105 template <typename Reader, int PoolSize>
00106 std::string PthrReaderBufPool<Reader, PoolSize>::currentFile()
00107 {
00108 return m_reader->currentFile();
00109 }
00110
00111 template <typename Reader, int PoolSize>
00112 uint32_t PthrReaderBufPool<Reader, PoolSize>::stat()
00113 {
00114 return m_reader->stat();
00115 }
00116
00117 template <typename Reader, int PoolSize>
00118 void* PthrReaderBufPool<Reader, PoolSize>::thread_filling(void* arg)
00119 {
00120 PthrReaderBufPool<Reader, PoolSize>* pthis = (PthrReaderBufPool<Reader, PoolSize>*)arg;
00121
00122 uint32_t index;
00123 const uint32_t* pevt;
00124
00125 sem_wait(&(pthis->m_semSyn));
00126
00127 while ( true ) {
00128
00129 sem_wait(&(pthis->m_semIn));
00130
00131 try {
00132 pevt = pthis->m_reader->nextEvent();
00133 }
00134 catch (RawExMessage& e) {
00135 pthis->m_RawFileException = new RawExMessage(e);
00136 break;
00137 }
00138 catch (ReachEndOfFileList& e) {
00139 pthis->m_RawFileException = new ReachEndOfFileList(e);
00140 break;
00141 }
00142 catch ( ... ) {
00143 std::cout << "[PthrReaderBufPool] Catch unexpected exception !" << std::endl;
00144 exit(1);
00145 }
00146
00147 pthread_mutex_lock(&(pthis->m_lock));
00148 index = (++(pthis->m_inLoop)) % PoolSize;
00149 pthread_mutex_unlock(&(pthis->m_lock));
00150
00151 pthis->m_buf[index]->copy( (void*)pevt, pevt[1]*sizeof(uint32_t) );
00152
00153
00154 sem_post(&(pthis->m_semOut));
00155 }
00156
00157 sem_post(&(pthis->m_semOut));
00158
00159 pthread_exit(NULL);
00160 }