00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include <iostream>
00016
00017
00018 template <typename Writer, int PoolSize>
00019 PthrWriterBufPool<Writer, PoolSize>::PthrWriterBufPool(typename Writer::WriterArgType& arg)
00020 : m_inLoop(-1),
00021 m_outLoop(-1)
00022 {
00023 if ( PoolSize < 2 ) {
00024 std::cout << "[PthrWriterBufPool] The PoolSize of buffer must > 1" << std::endl;
00025 exit(1);
00026 }
00027
00028
00029 sem_init(&m_semIn, 0, (PoolSize-1));
00030 sem_init(&m_semOut, 0, 0);
00031 sem_init(&m_semFinalize, 0, 0);
00032
00033 for ( int i = 0; i < PoolSize; ++i ) {
00034
00035 m_buf[i] = new AutoEnlargeBuffer(128*1024);
00036 }
00037
00038 m_writer = new Writer(arg);
00039
00040
00041 pthread_create(&m_tid, NULL, thread_writing, (void*)this);
00042 }
00043
00044 template <typename Writer, int PoolSize>
00045 PthrWriterBufPool<Writer, PoolSize>::~PthrWriterBufPool()
00046 {
00047
00048 sem_wait(&m_semFinalize);
00049
00050 for ( int i = 0; i < PoolSize; ++i ) {
00051 delete m_buf[i];
00052 }
00053
00054 sem_destroy(&m_semIn);
00055 sem_destroy(&m_semOut);
00056 sem_destroy(&m_semFinalize);
00057
00058 delete m_writer;
00059 }
00060
00061 template <typename Writer, int PoolSize>
00062 int PthrWriterBufPool<Writer, PoolSize>::writeEvent(void* pevt, int size)
00063 {
00064 if ( m_writer->stat() != 0 ) return m_writer->stat();
00065
00066 sem_wait(&m_semIn);
00067
00068 int index = (++m_inLoop) % PoolSize;
00069
00070 m_buf[index]->copy( pevt, size );
00071
00072 sem_post(&m_semOut);
00073
00074 return 0;
00075 }
00076
00077 template <typename Writer, int PoolSize>
00078 int PthrWriterBufPool<Writer, PoolSize>::stat()
00079 {
00080 return m_writer->stat();
00081 }
00082
00083 template <typename Writer, int PoolSize>
00084 void* PthrWriterBufPool<Writer, PoolSize>::thread_writing(void* arg)
00085 {
00086 PthrWriterBufPool<Writer, PoolSize>* pthis = (PthrWriterBufPool<Writer, PoolSize>*)arg;
00087
00088 uint32_t index;
00089 AutoEnlargeBuffer* pbuf;
00090
00091
00092 while ( true ) {
00093
00094 sem_post(&(pthis->m_semIn));
00095
00096 sem_wait(&(pthis->m_semOut));
00097
00098 index = (++(pthis->m_outLoop)) % PoolSize;
00099
00100 pbuf = pthis->m_buf[index];
00101
00102 try {
00103 pthis->m_writer->writeEvent( pbuf->data(), pbuf->size() );
00104 }
00105 catch (ReachEndOfFileList& e) {
00106
00107 break;
00108 }
00109 catch (RawExMessage& e) {
00110 e.print();
00111 break;
00112 }
00113 catch ( ... ) {
00114 std::cout << "[PthrWriterBufPool] Catch unexpected exception !" << std::endl;
00115 exit(1);
00116 }
00117 }
00118
00119
00120 pthis->m_writer->close();
00121
00122 sem_post(&(pthis->m_semFinalize));
00123
00124 pthread_exit(NULL);
00125 }