#include <PthrWriterBufPool.h>
Inheritance diagram for PthrWriterBufPool< Writer, PoolSize >:
Public Member Functions | |
PthrWriterBufPool (typename Writer::WriterArgType &arg) | |
virtual | ~PthrWriterBufPool () |
int | writeEvent (void *pevt, int size) |
int | stat () |
Private Member Functions | |
PthrWriterBufPool () | |
Static Private Member Functions | |
static void * | thread_writing (void *arg) |
Private Attributes | |
int | m_inLoop |
int | m_outLoop |
AutoEnlargeBuffer * | m_buf [PoolSize] |
Writer * | m_writer |
pthread_t | m_tid |
sem_t | m_semIn |
sem_t | m_semOut |
sem_t | m_semFinalize |
Definition at line 11 of file PthrWriterBufPool.h.
PthrWriterBufPool< Writer, PoolSize >::PthrWriterBufPool | ( | typename Writer::WriterArgType & | arg | ) |
Definition at line 19 of file PthrWriterBufPool.cc.
References genRecEmupikp::i, PthrWriterBufPool< Writer, PoolSize >::m_buf, PthrWriterBufPool< Writer, PoolSize >::m_semFinalize, PthrWriterBufPool< Writer, PoolSize >::m_semIn, PthrWriterBufPool< Writer, PoolSize >::m_semOut, PthrWriterBufPool< Writer, PoolSize >::m_tid, PthrWriterBufPool< Writer, PoolSize >::m_writer, and PthrWriterBufPool< Writer, PoolSize >::thread_writing().
00020 : m_inLoop(-1), 00021 m_outLoop(-1) 00022 { 00023 if ( PoolSize < 2 ) { 00024 std::cout << "[PthrWriterBufPool] The PoolSize of buffer must > 1" << std::endl; 00025 exit(1); 00026 } 00027 00028 // use(PoolSize-1) to protect the current event buffer 00029 sem_init(&m_semIn, 0, (PoolSize-1)); 00030 sem_init(&m_semOut, 0, 0); 00031 sem_init(&m_semFinalize, 0, 0); 00032 00033 for ( int i = 0; i < PoolSize; ++i ) { 00034 // buffer in the loop for each event 00035 m_buf[i] = new AutoEnlargeBuffer(128*1024); 00036 } 00037 00038 m_writer = new Writer(arg); 00039 00040 // create an individual thread for writing 00041 pthread_create(&m_tid, NULL, thread_writing, (void*)this); 00042 }
PthrWriterBufPool< Writer, PoolSize >::~PthrWriterBufPool | ( | ) | [virtual] |
Definition at line 45 of file PthrWriterBufPool.cc.
References genRecEmupikp::i, PthrWriterBufPool< Writer, PoolSize >::m_buf, PthrWriterBufPool< Writer, PoolSize >::m_semFinalize, PthrWriterBufPool< Writer, PoolSize >::m_semIn, PthrWriterBufPool< Writer, PoolSize >::m_semOut, and PthrWriterBufPool< Writer, PoolSize >::m_writer.
00046 { 00047 // wait for the end of thread_writing 00048 sem_wait(&m_semFinalize); 00049 00050 for ( int i = 0; i < PoolSize; ++i ) { 00051 delete m_buf[i]; 00052 } 00053 00054 sem_destroy(&m_semIn); 00055 sem_destroy(&m_semOut); 00056 sem_destroy(&m_semFinalize); 00057 00058 delete m_writer; 00059 }
PthrWriterBufPool< Writer, PoolSize >::PthrWriterBufPool | ( | ) | [private] |
int PthrWriterBufPool< Writer, PoolSize >::stat | ( | ) | [virtual] |
Implements IRawWriter.
Definition at line 78 of file PthrWriterBufPool.cc.
References PthrWriterBufPool< Writer, PoolSize >::m_writer.
Referenced by WriterRpc< Writer >::~WriterRpc().
00079 { 00080 return m_writer->stat(); 00081 }
void * PthrWriterBufPool< Writer, PoolSize >::thread_writing | ( | void * | arg | ) | [static, private] |
Definition at line 84 of file PthrWriterBufPool.cc.
References AutoEnlargeBuffer::data(), PthrWriterBufPool< Writer, PoolSize >::m_buf, PthrWriterBufPool< Writer, PoolSize >::m_outLoop, PthrWriterBufPool< Writer, PoolSize >::m_semFinalize, PthrWriterBufPool< Writer, PoolSize >::m_semIn, PthrWriterBufPool< Writer, PoolSize >::m_semOut, PthrWriterBufPool< Writer, PoolSize >::m_writer, RawExMessage::print(), and AutoEnlargeBuffer::size().
Referenced by PthrWriterBufPool< Writer, PoolSize >::PthrWriterBufPool().
00085 { 00086 PthrWriterBufPool<Writer, PoolSize>* pthis = (PthrWriterBufPool<Writer, PoolSize>*)arg; 00087 00088 uint32_t index; 00089 AutoEnlargeBuffer* pbuf; 00090 00091 00092 while ( true ) { 00093 // the ahead event buffer is ready for update now 00094 sem_post(&(pthis->m_semIn)); 00095 // waiting for an event that ready to use 00096 sem_wait(&(pthis->m_semOut)); 00097 00098 index = (++(pthis->m_outLoop)) % PoolSize; 00099 00100 pbuf = pthis->m_buf[index]; 00101 00102 try { 00103 pthis->m_writer->writeEvent( pbuf->data(), pbuf->size() ); 00104 } 00105 catch (ReachEndOfFileList& e) { 00106 //std::cout << "[PthrWriterBufPool] Finalized Successfully!" << std::endl; 00107 break; 00108 } 00109 catch (RawExMessage& e) { 00110 e.print(); 00111 break; 00112 } 00113 catch ( ... ) { 00114 std::cout << "[PthrWriterBufPool] Catch unexpected exception !" << std::endl; 00115 exit(1); 00116 } 00117 } 00118 00119 00120 pthis->m_writer->close(); 00121 00122 sem_post(&(pthis->m_semFinalize)); 00123 00124 pthread_exit(NULL); 00125 }
int PthrWriterBufPool< Writer, PoolSize >::writeEvent | ( | void * | pevt, | |
int | size | |||
) | [virtual] |
Implements IRawWriter.
Definition at line 62 of file PthrWriterBufPool.cc.
References AutoEnlargeBuffer::copy(), PthrWriterBufPool< Writer, PoolSize >::m_buf, PthrWriterBufPool< Writer, PoolSize >::m_inLoop, PthrWriterBufPool< Writer, PoolSize >::m_semIn, PthrWriterBufPool< Writer, PoolSize >::m_semOut, and PthrWriterBufPool< Writer, PoolSize >::m_writer.
Referenced by WriterRpc< Writer >::clearBak(), EventWriter::execute(), EventWriter::finalize(), EventWriter::initialize(), WriterRpc< Writer >::rpcHandler(), and WriterRpc< Writer >::~WriterRpc().
00063 { 00064 if ( m_writer->stat() != 0 ) return m_writer->stat(); 00065 00066 sem_wait(&m_semIn); 00067 00068 int index = (++m_inLoop) % PoolSize; 00069 00070 m_buf[index]->copy( pevt, size ); 00071 00072 sem_post(&m_semOut); 00073 00074 return 0; 00075 }
AutoEnlargeBuffer* PthrWriterBufPool< Writer, PoolSize >::m_buf[PoolSize] [private] |
int PthrWriterBufPool< Writer, PoolSize >::m_inLoop [private] |
Definition at line 31 of file PthrWriterBufPool.h.
Referenced by PthrWriterBufPool< Writer, PoolSize >::writeEvent().
int PthrWriterBufPool< Writer, PoolSize >::m_outLoop [private] |
Definition at line 32 of file PthrWriterBufPool.h.
Referenced by PthrWriterBufPool< Writer, PoolSize >::thread_writing().
sem_t PthrWriterBufPool< Writer, PoolSize >::m_semFinalize [private] |
Definition at line 41 of file PthrWriterBufPool.h.
Referenced by PthrWriterBufPool< Writer, PoolSize >::PthrWriterBufPool(), PthrWriterBufPool< Writer, PoolSize >::thread_writing(), and PthrWriterBufPool< Writer, PoolSize >::~PthrWriterBufPool().
sem_t PthrWriterBufPool< Writer, PoolSize >::m_semIn [private] |
sem_t PthrWriterBufPool< Writer, PoolSize >::m_semOut [private] |
pthread_t PthrWriterBufPool< Writer, PoolSize >::m_tid [private] |
Definition at line 37 of file PthrWriterBufPool.h.
Referenced by PthrWriterBufPool< Writer, PoolSize >::PthrWriterBufPool().
Writer* PthrWriterBufPool< Writer, PoolSize >::m_writer [private] |
Definition at line 35 of file PthrWriterBufPool.h.
Referenced by PthrWriterBufPool< Writer, PoolSize >::PthrWriterBufPool(), PthrWriterBufPool< Writer, PoolSize >::stat(), PthrWriterBufPool< Writer, PoolSize >::thread_writing(), PthrWriterBufPool< Writer, PoolSize >::writeEvent(), and PthrWriterBufPool< Writer, PoolSize >::~PthrWriterBufPool().