00001 #include "DistBossServer/DistBossServer.h"
00002 #include "DistBossServer/WriterRpc.h"
00003 #include "DistBossServer/ReaderRpc.h"
00004 #include "RootFile/RootFileWriter.h"
00005 #include "RawFile/RawFileReader.h"
00006 #include "IRawFile/RawFileExceptions.h"
00007 #include <sstream>
00008 #include <iostream>
00009 #include <unistd.h>
00010
00011 const std::string DistBossServer::m_serverName = DistBossServer::autoServerName();
00012 const std::string DistBossServer::m_svc[7] = {
00013 m_serverName + "/RawEvtSvc",
00014 m_serverName + "/SimSeedSvc",
00015 m_serverName + "/RootEvtSvc",
00016 m_serverName + "/WriteDst",
00017 m_serverName + "/WriteRec",
00018 m_serverName + "/WriteTuple",
00019 m_serverName + "/WriteHist"
00020 };
00021
00022
00023 std::string DistBossServer::autoServerName()
00024 {
00025 std::stringstream spid;
00026 spid << getpid();
00027
00028 char hname[64];
00029 gethostname(hname, 64);
00030
00031 std::string name("/DistBoss/P");
00032 name += spid.str() + "@" + hname;
00033
00034 return name;
00035 }
00036
00037 DistBossServer::DistBossServer(const std::string& jobOpt)
00038 : m_optMgr(jobOpt)
00039 {
00040 if ( m_optMgr.evtMax() == 0 ) {
00041 std::cout << "[DistBossServer] EvtMax = 0, exit now!" << std::endl;
00042 exit(0);
00043 }
00044
00045 std::cout << "[DistBossServer] Starting server: " << m_serverName << std::endl;
00046
00047 switch ( m_optMgr.inputType() ) {
00048 case ( 0 ) :
00049 m_readerRpc = new ReaderRpc<RawFileReader>(m_svc[0], m_optMgr.inputFiles(), m_optMgr.evtMax());
00050 break;
00051 case ( 1 ) :
00052 std::cout << "[DistBossServer] Simulation is not supported by DistBoss yet!" << std::endl;
00053 exit(1);
00054 case ( 2 ) :
00055
00056
00057 std::cout << "[DistBossServer] Root input files are not supported by DistBoss yet!" << std::endl;
00058 exit(1);
00059 default :
00060 assert( false );
00061 }
00062
00063 const std::vector<int>& outputs = m_optMgr.outputTypes();
00064 for ( std::vector<int>::const_iterator it = outputs.begin(); it != outputs.end(); ++it ) {
00065 switch ( *it ) {
00066 case ( 3 ) :
00067 m_writerRpcs.push_back(new WriterRpc<RootFileWriter>(m_svc[3], m_optMgr.dstFile()));
00068 break;
00069 case ( 4 ) :
00070 m_writerRpcs.push_back(new WriterRpc<RootFileWriter>(m_svc[4], m_optMgr.recFile()));
00071 break;
00072 case ( 5 ) :
00073
00074 break;
00075 case ( 6 ) :
00076
00077 break;
00078 default :
00079 assert(false);
00080 }
00081 }
00082
00083 m_optMgr.clientOptsTemplate( m_serverName );
00084
00085 m_exitHandler = new ServerExitHandler();
00086 m_errorHandler = new ServerErrorHandler();
00087 }
00088
00089 DistBossServer::~DistBossServer()
00090 {
00091 delete m_readerRpc;
00092
00093 for ( unsigned int i = 0; i < m_writerRpcs.size(); ++i ) {
00094 delete m_writerRpcs[i];
00095 }
00096
00097 std::cout << "[DistBossServer] Server stopped." << std::endl;
00098
00099 delete m_exitHandler;
00100 }
00101
00102 int DistBossServer::run()
00103 {
00104 DimServer::addExitHandler( m_exitHandler );
00105 DimServer::addErrorHandler( m_errorHandler );
00106
00107 DimServer::start(m_serverName.c_str());
00108
00109
00110 int nClients = m_readerRpc->wait_to_end();
00111
00112 for ( unsigned int i = 0; i < m_writerRpcs.size(); ++i ) {
00113 nClients = m_writerRpcs[i]->wait_to_end();
00114 }
00115
00116 return nClients;
00117 }