/home/bes3soft/bes3soft/Boss/7.0.2/dist/7.0.2/DistBoss/DistBossUtil/DistBossUtil-00-00-04/DistBossUtil/template/PthrWriterBufPool.cc

Go to the documentation of this file.
00001 /**********************************************************
00002   +-> 1      2      3      4      5      6  ...  PoolSize
00003   |               ahead current  next               |
00004   |_________________________________________________|
00005  
00006   0, a looped buffer pool
00007   1, semIn is initialized to (PoolSize-1) in order to protect
00008   the current event buffer from being changed by writeEvent
00009   2, we can't update the current event buffer until we
00010   move to the next event (in thread_writing()).
00011   3, the ahead event buffer is ready to be updated once we
00012   are at the beginning of the current event.
00013 **********************************************************/
00014 
00015 #include <iostream>
00016 
00017 
00018 template <typename Writer, int PoolSize>
00019 PthrWriterBufPool<Writer, PoolSize>::PthrWriterBufPool(typename Writer::WriterArgType& arg)
00020    : m_inLoop(-1),
00021      m_outLoop(-1)
00022 {
00023    if ( PoolSize < 2 ) {
00024       std::cout << "[PthrWriterBufPool] The PoolSize of buffer must > 1" << std::endl;
00025       exit(1);
00026    }
00027 
00028    // use(PoolSize-1) to protect the current event buffer
00029    sem_init(&m_semIn,  0, (PoolSize-1));
00030    sem_init(&m_semOut, 0, 0);
00031    sem_init(&m_semFinalize, 0, 0);
00032 
00033    for ( int i = 0; i < PoolSize; ++i ) {
00034       // buffer in the loop for each event
00035       m_buf[i] = new AutoEnlargeBuffer(128*1024);
00036    }
00037 
00038    m_writer = new Writer(arg);
00039 
00040    // create an individual thread for writing
00041    pthread_create(&m_tid, NULL, thread_writing, (void*)this);
00042 }
00043 
00044 template <typename Writer, int PoolSize>
00045 PthrWriterBufPool<Writer, PoolSize>::~PthrWriterBufPool()
00046 {
00047    // wait for the end of thread_writing
00048    sem_wait(&m_semFinalize);
00049 
00050    for ( int i = 0; i < PoolSize; ++i ) {
00051       delete m_buf[i];
00052    }
00053 
00054    sem_destroy(&m_semIn);
00055    sem_destroy(&m_semOut);
00056    sem_destroy(&m_semFinalize);
00057 
00058    delete m_writer;
00059 }
00060 
00061 template <typename Writer, int PoolSize>
00062 int PthrWriterBufPool<Writer, PoolSize>::writeEvent(void* pevt, int size)
00063 {
00064    if ( m_writer->stat() != 0 ) return m_writer->stat();
00065 
00066    sem_wait(&m_semIn);
00067 
00068    int index = (++m_inLoop) % PoolSize;
00069 
00070    m_buf[index]->copy( pevt, size );
00071 
00072    sem_post(&m_semOut);
00073 
00074    return 0;
00075 }
00076 
00077 template <typename Writer, int PoolSize>
00078 int PthrWriterBufPool<Writer, PoolSize>::stat()
00079 {
00080    return m_writer->stat();
00081 }
00082 
00083 template <typename Writer, int PoolSize>
00084 void* PthrWriterBufPool<Writer, PoolSize>::thread_writing(void* arg)
00085 {
00086    PthrWriterBufPool<Writer, PoolSize>* pthis = (PthrWriterBufPool<Writer, PoolSize>*)arg;
00087 
00088    uint32_t        index;
00089    AutoEnlargeBuffer* pbuf;
00090 
00091 
00092    while ( true ) {
00093       // the ahead event buffer is ready for update now
00094       sem_post(&(pthis->m_semIn));
00095       // waiting for an event that ready to use
00096       sem_wait(&(pthis->m_semOut));
00097 
00098       index = (++(pthis->m_outLoop)) % PoolSize;
00099 
00100       pbuf = pthis->m_buf[index];
00101 
00102       try {
00103          pthis->m_writer->writeEvent( pbuf->data(), pbuf->size() );
00104       }
00105       catch (ReachEndOfFileList& e) {
00106          //std::cout << "[PthrWriterBufPool] Finalized Successfully!" << std::endl;
00107          break;
00108       }
00109       catch (RawExMessage& e) {
00110          e.print();
00111          break;
00112       }
00113       catch ( ... ) {
00114          std::cout << "[PthrWriterBufPool] Catch unexpected exception !" << std::endl;
00115          exit(1);
00116       }
00117    }
00118 
00119 
00120    pthis->m_writer->close();
00121 
00122    sem_post(&(pthis->m_semFinalize));
00123 
00124    pthread_exit(NULL);
00125 }

Generated on Tue Nov 29 22:58:02 2016 for BOSS_7.0.2 by  doxygen 1.4.7