#include <PthrWriterBufPool.h>
Inheritance diagram for PthrWriterBufPool< Writer, PoolSize >:
Public Member Functions | |
PthrWriterBufPool (typename Writer::WriterArgType &arg) | |
PthrWriterBufPool (typename Writer::WriterArgType &arg) | |
int | stat () |
int | stat () |
int | writeEvent (void *pevt, int size) |
int | writeEvent (void *pevt, int size) |
virtual | ~PthrWriterBufPool () |
virtual | ~PthrWriterBufPool () |
Private Member Functions | |
PthrWriterBufPool () | |
PthrWriterBufPool () | |
Static Private Member Functions | |
void * | thread_writing (void *arg) |
void * | thread_writing (void *arg) |
Private Attributes | |
AutoEnlargeBuffer * | m_buf [PoolSize] |
AutoEnlargeBuffer * | m_buf [PoolSize] |
int | m_inLoop |
int | m_outLoop |
sem_t | m_semFinalize |
sem_t | m_semIn |
sem_t | m_semOut |
pthread_t | m_tid |
Writer * | m_writer |
Writer * | m_writer |
|
00020 : m_inLoop(-1), 00021 m_outLoop(-1) 00022 { 00023 if ( PoolSize < 2 ) { 00024 std::cout << "[PthrWriterBufPool] The PoolSize of buffer must > 1" << std::endl; 00025 exit(1); 00026 } 00027 00028 // use(PoolSize-1) to protect the current event buffer 00029 sem_init(&m_semIn, 0, (PoolSize-1)); 00030 sem_init(&m_semOut, 0, 0); 00031 sem_init(&m_semFinalize, 0, 0); 00032 00033 for ( int i = 0; i < PoolSize; ++i ) { 00034 // buffer in the loop for each event 00035 m_buf[i] = new AutoEnlargeBuffer(128*1024); 00036 } 00037 00038 m_writer = new Writer(arg); 00039 00040 // create an individual thread for writing 00041 pthread_create(&m_tid, NULL, thread_writing, (void*)this); 00042 }
|
|
00046 { 00047 // wait for the end of thread_writing 00048 sem_wait(&m_semFinalize); 00049 00050 for ( int i = 0; i < PoolSize; ++i ) { 00051 delete m_buf[i]; 00052 } 00053 00054 sem_destroy(&m_semIn); 00055 sem_destroy(&m_semOut); 00056 sem_destroy(&m_semFinalize); 00057 00058 delete m_writer; 00059 }
|
|
|
|
|
|
|
|
|
|
Implements IRawWriter. |
|
Implements IRawWriter. 00079 { 00080 return m_writer->stat(); 00081 }
|
|
|
|
00085 { 00086 PthrWriterBufPool<Writer, PoolSize>* pthis = (PthrWriterBufPool<Writer, PoolSize>*)arg; 00087 00088 uint32_t index; 00089 AutoEnlargeBuffer* pbuf; 00090 00091 00092 while ( true ) { 00093 // the ahead event buffer is ready for update now 00094 sem_post(&(pthis->m_semIn)); 00095 // waiting for an event that ready to use 00096 sem_wait(&(pthis->m_semOut)); 00097 00098 index = (++(pthis->m_outLoop)) % PoolSize; 00099 00100 pbuf = pthis->m_buf[index]; 00101 00102 try { 00103 pthis->m_writer->writeEvent( pbuf->data(), pbuf->size() ); 00104 } 00105 catch (ReachEndOfFileList& e) { 00106 //std::cout << "[PthrWriterBufPool] Finalized Successfully!" << std::endl; 00107 break; 00108 } 00109 catch (RawExMessage& e) { 00110 e.print(); 00111 break; 00112 } 00113 catch ( ... ) { 00114 std::cout << "[PthrWriterBufPool] Catch unexpected exception !" << std::endl; 00115 exit(1); 00116 } 00117 } 00118 00119 00120 pthis->m_writer->close(); 00121 00122 sem_post(&(pthis->m_semFinalize)); 00123 00124 pthread_exit(NULL); 00125 }
|
|
Implements IRawWriter. |
|
Implements IRawWriter. 00063 { 00064 if ( m_writer->stat() != 0 ) return m_writer->stat(); 00065 00066 sem_wait(&m_semIn); 00067 00068 int index = (++m_inLoop) % PoolSize; 00069 00070 m_buf[index]->copy( pevt, size ); 00071 00072 sem_post(&m_semOut); 00073 00074 return 0; 00075 }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|