/home/bes3soft/bes3soft/Boss/7.0.2/dist/7.0.2/DistBoss/DistBossUtil/DistBossUtil-00-00-04/DistBossUtil/template/PthrReaderBufPool.cc

Go to the documentation of this file.
00001 /**********************************************************
00002   +-> 1      2      3      4      5      6  ...  PoolSize
00003   |               ahead current  next               |
00004   |_________________________________________________|
00005  
00006   0, a looped buffer pool
00007   1, we set semIn as (PoolSize-1), so to protect the
00008   current event buffer not be changed by thread_filling
00009   2, we can't update the current event buffer until we
00010   move to the next event(call nextEvent()).
00011   3, the ahead event buffer is ready to update when we
00012   at the beginning of current event.
00013 **********************************************************/
00014 
00015 #include <iostream>
00016 
00017 
00018 template <typename Reader, int PoolSize>
00019 PthrReaderBufPool<Reader, PoolSize>::PthrReaderBufPool(typename Reader::ReaderArgType& arg)
00020    : m_inLoop(-1),
00021      m_outLoop(-1),
00022      m_RawFileException(0)
00023 {
00024    if ( PoolSize < 2 ) {
00025       std::cout << "[PthrReaderBufPool] The PoolSize of buffer must > 1" << std::endl;
00026       exit(1);
00027    }
00028 
00029    // use(PoolSize-1) to protect the current event buffer
00030    sem_init(&m_semIn,  0, (PoolSize-1));
00031    sem_init(&m_semOut, 0, 0);
00032    pthread_mutex_init(&m_lock, NULL);
00033 
00034    sem_init(&m_semSyn, 0, 0);
00035 
00036    for ( int i = 0; i < PoolSize; ++i ) {
00037       // buffer in the loop for each event
00038       m_buf[i] = new AutoEnlargeBuffer(128*1024);
00039    }
00040 
00041    m_reader = new Reader(arg);
00042 
00043    // create an individual thread to fill to buffer
00044    pthread_create(&m_tid, NULL, thread_filling, (void*)this);
00045 }
00046 
00047 template <typename Reader, int PoolSize>
00048 PthrReaderBufPool<Reader, PoolSize>::~PthrReaderBufPool()
00049 {
00050    for ( int i = 0; i < PoolSize; ++i ) {
00051       delete m_buf[i];
00052    }
00053 
00054    delete m_RawFileException;
00055 
00056    sem_destroy(&m_semIn);
00057    sem_destroy(&m_semOut);
00058 
00059    delete m_reader;
00060 }
00061 
00062 template <typename Reader, int PoolSize>
00063 const uint32_t* PthrReaderBufPool<Reader, PoolSize>::nextEvent()
00064 {
00065    if ( m_outLoop < 0 ) sem_post(&m_semSyn);
00066 
00067    // the ahead event buffer is ready for update now
00068    sem_post(&m_semIn);
00069    // waiting for a ready to use event
00070    sem_wait(&m_semOut);
00071 
00072    pthread_mutex_lock(&m_lock);
00073    int inLoop = m_inLoop;
00074    pthread_mutex_unlock(&m_lock);
00075 
00076    if ( m_RawFileException != 0 && m_outLoop >= inLoop ) {
00077       if ( dynamic_cast<RawExMessage*>(m_RawFileException) ) {
00078          throw RawExMessage(*(RawExMessage*)m_RawFileException);
00079       }
00080       if ( dynamic_cast<ReachEndOfFileList*>(m_RawFileException) ) {
00081          throw ReachEndOfFileList(*(ReachEndOfFileList*)m_RawFileException);
00082       }
00083    }
00084 
00085    ++m_outLoop;
00086 
00087    const uint32_t* pevt = (uint32_t*)m_buf[ m_outLoop%PoolSize ]->data();
00088    return pevt;
00089 }
00090 
00091 template <typename Reader, int PoolSize>
00092 const uint32_t* PthrReaderBufPool<Reader, PoolSize>::currentEvent() const
00093 {
00094    //can't be called before any nextEvent() call
00095    const uint32_t* pevt = (uint32_t*)m_buf[ m_outLoop%PoolSize ]->data();
00096    return pevt;
00097 }
00098 
00099 template <typename Reader, int PoolSize>
00100 uint32_t PthrReaderBufPool<Reader, PoolSize>::runNo()
00101 {
00102    return m_reader->runNo();
00103 }
00104 
00105 template <typename Reader, int PoolSize>
00106 std::string PthrReaderBufPool<Reader, PoolSize>::currentFile()
00107 {
00108    return m_reader->currentFile();
00109 }
00110 
00111 template <typename Reader, int PoolSize>
00112 uint32_t PthrReaderBufPool<Reader, PoolSize>::stat()
00113 {
00114    return m_reader->stat();
00115 }
00116 
00117 template <typename Reader, int PoolSize>
00118 void* PthrReaderBufPool<Reader, PoolSize>::thread_filling(void* arg)
00119 {
00120    PthrReaderBufPool<Reader, PoolSize>* pthis = (PthrReaderBufPool<Reader, PoolSize>*)arg;
00121 
00122    uint32_t        index;
00123    const uint32_t* pevt;
00124 
00125    sem_wait(&(pthis->m_semSyn));
00126 
00127    while ( true ) {
00128       // waiting for a buffer that ready to update
00129       sem_wait(&(pthis->m_semIn));
00130 
00131       try {
00132          pevt = pthis->m_reader->nextEvent();
00133       }
00134       catch (RawExMessage& e) {
00135          pthis->m_RawFileException = new RawExMessage(e);
00136          break;
00137       }
00138       catch (ReachEndOfFileList& e) {
00139          pthis->m_RawFileException = new ReachEndOfFileList(e);
00140          break;
00141       }
00142       catch ( ... ) {
00143          std::cout << "[PthrReaderBufPool] Catch unexpected exception !" << std::endl;
00144          exit(1);
00145       }
00146 
00147       pthread_mutex_lock(&(pthis->m_lock));
00148       index = (++(pthis->m_inLoop)) % PoolSize;
00149       pthread_mutex_unlock(&(pthis->m_lock));
00150 
00151       pthis->m_buf[index]->copy( (void*)pevt, pevt[1]*sizeof(uint32_t) );
00152 
00153       // post an event that ready to use
00154       sem_post(&(pthis->m_semOut));
00155    }
00156 
00157    sem_post(&(pthis->m_semOut));
00158 
00159    pthread_exit(NULL);
00160 }

Generated on Tue Nov 29 22:58:02 2016 for BOSS_7.0.2 by  doxygen 1.4.7