Changeset 10923
- Timestamp:
- 06/07/11 22:18:46 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilderWrapper.h
r10861 r10923 16 16 17 17 #include "EventBuilder.h" 18 19 extern "C" { 20 extern void StartEvtBuild(); 21 extern int CloseRunFile(uint32_t runId, uint32_t closeTime); 22 } 18 23 19 24 class EventBuilderWrapper … … 41 46 MessageImp &fMsg; 42 47 48 DimDescribedService fDimFiles; 49 43 50 public: 44 EventBuilderWrapper(MessageImp &msg) : fMsg(msg) 51 EventBuilderWrapper(MessageImp &msg) : fMsg(msg), 52 fDimFiles("FAD_CONTROL/FILES", "X:1", "") 45 53 { 46 54 if (This) … … 48 56 49 57 This = this; 58 59 fFitsFormat = false; 50 60 } 51 61 ~EventBuilderWrapper() … … 56 66 fThread.join(); 57 67 //fMsg.Info("EventBuilder stopped."); 58 } 59 60 void Update(const char *msg, int severity) 68 69 for (vector<DataFileImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++) 70 delete *it; 71 } 72 73 void Update(ostringstream &msg, int severity) 61 74 { 62 75 fMsg.Update(msg, severity); … … 79 92 } 80 93 81 void Start(const vector< string> &addr)94 void Start(const vector<tcp::endpoint> &addr) 82 95 { 83 96 if (IsThreadRunning()) … … 87 100 } 88 101 102 int cnt = 0; 103 for (size_t i=0; i<40; i++) 104 { 105 if (addr[i]==tcp::endpoint()) 106 { 107 g_port[i].sockDef = -1; 108 continue; 109 } 110 111 // -1: if entry shell not be used 112 // 0: event builder will connect but ignore the events 113 // 1: event builder will connect and build events 114 g_port[i].sockDef = 1; 115 116 g_port[i].sockAddr.sin_family = AF_INET; 117 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr[i].address().to_v4().to_ulong()); 118 g_port[i].sockAddr.sin_port = htons(addr[i].port()); 119 120 cnt++; 121 } 122 123 // g_maxBoards = cnt; 124 g_actBoards = cnt; 125 126 g_runStat = kModeRun; 127 89 128 fMsg.Message("Starting EventBuilder thread"); 90 91 for (size_t i=0; i<addr.size(); i++)92 {93 memset(g_ip[i].addr, 0, sizeof(g_ip[i].addr));94 95 const size_t pos = addr[i].find_first_of(':');96 97 const string a = addr[i].substr(0, pos);98 const string p = addr[i].substr(pos+1);99 100 strcpy(g_ip[i].addr, "127.0.0.1");101 g_ip[i].port = atoi(p.c_str());102 }103 104 g_maxBoards = addr.size();105 g_actBoards = addr.size();106 107 g_runStat = kModeRun;108 129 109 130 fThread = boost::thread(StartEvtBuild); … … 144 165 size_t GetUsedMemory() const { return gi_usedMem; } 145 166 167 /* 168 struct OpenFileToDim 169 { 170 int code; 171 char fileName[FILENAME_MAX]; 172 }; 173 174 SignalRunOpened(runid, filename); 175 // Send num open files 176 // Send runid, (more info about the run?), filename via dim 177 178 SignalEvtWritten(runid); 179 // Send num events written of newest file 180 181 SignalRunClose(runid); 182 // Send new num open files 183 // Send empty file-name if no file is open 184 185 */ 146 186 147 187 // -------------- Mapped event builder callbacks ------------------ 148 188 149 int runOpen(uint32_t runid, RUN_HEAD *h, size_t) 150 { 189 class DataFileImp 190 { 191 uint32_t fRunId; 192 193 public: 194 DataFileImp(uint32_t id) : fRunId(id) { } 195 196 virtual bool Write() = 0; 197 virtual bool Close() = 0; 198 199 uint32_t GetRunId() const { return fRunId; } 200 }; 201 202 203 class DataFileRaw : public DataFileImp 204 { 205 public: 206 DataFileRaw(uint32_t id) : DataFileImp(id) { } 207 ~DataFileRaw() { Close(); } 208 209 virtual bool Write() { return true; } 210 virtual bool Close() { return true; } 211 }; 212 213 class DataFileFits : public DataFileImp 214 { 215 public: 216 DataFileFits(uint32_t id) :DataFileImp(id) { } 217 ~DataFileFits() { Close(); } 218 219 virtual bool Write() { return true; } 220 virtual bool Close() { return true; } 221 }; 222 223 vector<DataFileImp*> fFiles; 224 225 bool fFitsFormat; 226 227 template<class T> 228 void Update(DimDescribedService &svc, const T &data) const 229 { 230 cout << "Update: " << svc.getName() << " (" << sizeof(T) << ")" << endl; 231 svc.setData(const_cast<T*>(&data), sizeof(T)); 232 svc.updateService(); 233 } 234 235 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t) 236 { 237 // Check if file already exists... 151 238 cout << "OPEN_FILE #" << runid << endl; 152 239 cout << " Ver= " << h->Version << endl; … … 157 244 cout << " roi= " << h->Nroi << endl; 158 245 159 return 0; 160 } 161 162 int runWrite(int, EVENT *e, size_t) 163 { 164 cout << "WRITE_EVENT" << endl; 246 DataFileImp *file = NULL; 247 try 248 { 249 file = fFitsFormat ? 250 static_cast<DataFileImp*>(new DataFileFits(runid)) : 251 static_cast<DataFileImp*>(new DataFileRaw(runid)); 252 } 253 catch (const exception &e) 254 { 255 return 0; 256 } 257 258 fFiles.push_back(file); 259 260 Update(fDimFiles, fFiles.size()); 261 262 // fDimFiles.setData(fFiles.size()); 263 // fDimFiles.update(); 264 265 return reinterpret_cast<FileHandle_t>(file); 266 } 267 268 int runWrite(FileHandle_t handler, EVENT *e, size_t) 269 { 270 DataFileImp *file = reinterpret_cast<DataFileImp*>(handler); 271 272 cout << "WRITE_EVENT " << file->GetRunId() << endl; 165 273 166 274 cout << " Evt=" << e->EventNum << endl; … … 170 278 cout << " tim=" << e->PCTime << endl; 171 279 280 if (!file->Write()) 281 return -1; 282 283 // ===> SignalEvtWritten(runid); 284 // Send num events written of newest file 285 286 /* close run runId (all all runs if runId=0) */ 287 /* return: 0=close scheduled / >0 already closed / <0 does not exist */ 288 //CloseRunFile(file->GetRunId(), time(NULL)+2) ; 289 172 290 return 0; 173 291 } 174 292 175 int runClose(int, RUN_TAIL *, size_t) 176 { 177 cout << "CLOSE_RUN" << endl; 178 return 0; 293 int runClose(FileHandle_t handler, RUN_TAIL *, size_t) 294 { 295 DataFileImp *file = reinterpret_cast<DataFileImp*>(handler); 296 297 cout << "CLOSE_RUN " << file->GetRunId() << endl; 298 299 fFiles.erase(find(fFiles.begin(), fFiles.end(), file)); 300 301 Update(fDimFiles, fFiles.size()); 302 303 //fDimFiles.setData(fFiles.size()); 304 //fDimFiles.update(); 305 306 const bool rc = file->Close(); 307 if (!rc) 308 { 309 // Error message 310 } 311 312 delete file; 313 314 // ==> SignalRunClose(runid); 315 // Send new num open files 316 // Send empty file-name if no file is open 317 318 return rc ? 0 : -1; 179 319 } 180 320 }; … … 185 325 extern "C" 186 326 { 187 int runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)327 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len) 188 328 { 189 329 return EventBuilderWrapper::This->runOpen(irun, runhd, len); 190 330 } 191 331 192 int runWrite( int fileId, EVENT *event, size_t len)332 int runWrite(FileHandle_t fileId, EVENT *event, size_t len) 193 333 { 194 334 return EventBuilderWrapper::This->runWrite(fileId, event, len); 195 335 } 196 336 197 int runClose( int fileId, RUN_TAIL *runth, size_t len)337 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len) 198 338 { 199 339 return EventBuilderWrapper::This->runClose(fileId, runth, len); 200 340 } 201 341 342 void factOut(int severity, int err, char *message) 343 { 344 //replace(message, message+strlen(message), '\n', ' '); 345 346 // FIXME: Make the output to the console stream thread-safe 347 ostringstream str; 348 str << "EventBuilder("; 349 if (err<0) 350 str << "---"; 351 else 352 str << err; 353 str << "): " << message; 354 EventBuilderWrapper::This->Update(str, severity); 355 } 356 357 void factStat(int severity, int err, char* message ) 358 { 359 static string last; 360 if (message==last) 361 return; 362 363 if (err!=-1) 364 factOut(severity, err, message); 365 else 366 { 367 ostringstream str("Status: "); 368 str << message; 369 EventBuilderWrapper::This->Update(str, severity); 370 } 371 372 last = message; 373 } 374 375 /* 202 376 void message(int severity, const char *msg) 203 377 { 204 378 EventBuilderWrapper::This->Update(msg, severity); 205 } 379 }*/ 206 380 } 207 381
Note:
See TracChangeset
for help on using the changeset viewer.