source: trunk/FACT++/src/EventBuilderWrapper.h@ 10939

Last change on this file since 10939 was 10938, checked in by tbretz, 13 years ago
Send some dim messages.
File size: 11.5 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4/*
5#if BOOST_VERSION < 104400
6#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
7#undef BOOST_HAS_RVALUE_REFS
8#endif
9#endif
10#include <boost/thread.hpp>
11
12using namespace std;
13*/
14
15#include <boost/date_time/posix_time/posix_time_types.hpp>
16
17#include "EventBuilder.h"
18
// Entry points provided by the C event builder.
//
// StartEvtBuild is the function handed to the event-builder thread in
// EventBuilderWrapper::Start().
//
// CloseRunFile closes run 'runId' (all runs if runId is 0).
// Returns 0 if the close was scheduled, >0 if the run was already closed
// and <0 if the run does not exist.
extern "C" void StartEvtBuild();
extern "C" int  CloseRunFile(uint32_t runId, uint32_t closeTime);
23
24class EventBuilderWrapper
25{
26public:
27 // FIXME
28 static EventBuilderWrapper *This;
29
30private:
31 boost::thread fThread;
32
33 enum CommandStates_t // g_runStat
34 {
35 kAbort = -2, // quit as soon as possible ('abort')
36 kExit = -1, // stop reading, quit when buffered events done ('exit')
37 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
38 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
39 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
40 kModeFlush = 10, // read data from camera, but skip them ('flush')
41 kModeTest = 20, // read data and process them, but do not write to disk ('test')
42 kModeFlag = 30, // read data, process and write all to disk ('flag')
43 kModeRun = 40, // read data, process and write selected to disk ('run')
44 };
45
46 MessageImp &fMsg;
47
48 enum
49 {
50 kCurrent = 0,
51 kTotal = 1
52 };
53
54 bool fFitsFormat;
55
56 uint32_t fMaxRun;
57 uint32_t fNumEvts[2];
58
59 DimDescribedService fDimFiles;
60 DimDescribedService fDimRuns;
61 DimDescribedService fDimEvents;
62 DimDescribedService fDimCurrentEvent;
63
64public:
65 EventBuilderWrapper(MessageImp &msg) : fMsg(msg),
66 fFitsFormat(false), fMaxRun(0),
67 fDimFiles ("FAD_CONTROL/FILES", "X:1", ""),
68 fDimRuns ("FAD_CONTROL/RUNS", "I:1", ""),
69 fDimEvents("FAD_CONTROL/EVENTS", "I:3", ""),
70 fDimCurrentEvent("FAD_CONTROL/CURRENT_EVENT", "I:1", "")
71 {
72 if (This)
73 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
74
75 This = this;
76
77 memset(fNumEvts, 0, sizeof(fNumEvts));
78
79 Update(fDimRuns, uint32_t(0));
80 Update(fDimCurrentEvent, uint32_t(0));
81 Update(fDimEvents, fNumEvts);
82 }
83 ~EventBuilderWrapper()
84 {
85 Abort();
86 // FIXME: Used timed_join and abort afterwards
87 // What's the maximum time the eb need to abort?
88 fThread.join();
89 //fMsg.Info("EventBuilder stopped.");
90
91 for (vector<DataFileImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
92 delete *it;
93 }
94
95 void Update(ostringstream &msg, int severity)
96 {
97 fMsg.Update(msg, severity);
98 }
99
100 bool IsThreadRunning()
101 {
102 return !fThread.timed_join(boost::posix_time::microseconds(0));
103 }
104
105 void SetMaxMemory(unsigned int mb) const
106 {
107 if (mb*1000000<GetUsedMemory())
108 {
109 // fMsg.Warn("...");
110 return;
111 }
112
113 g_maxMem = mb*1000000;
114 }
115
116 void Start(const vector<tcp::endpoint> &addr)
117 {
118 if (IsThreadRunning())
119 {
120 fMsg.Warn("Start - EventBuilder still running");
121 return;
122 }
123
124 int cnt = 0;
125 for (size_t i=0; i<40; i++)
126 {
127 if (addr[i]==tcp::endpoint())
128 {
129 g_port[i].sockDef = -1;
130 continue;
131 }
132
133 // -1: if entry shell not be used
134 // 0: event builder will connect but ignore the events
135 // 1: event builder will connect and build events
136 g_port[i].sockDef = 1;
137
138 g_port[i].sockAddr.sin_family = AF_INET;
139 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr[i].address().to_v4().to_ulong());
140 g_port[i].sockAddr.sin_port = htons(addr[i].port());
141
142 cnt++;
143 }
144
145// g_maxBoards = cnt;
146 g_actBoards = cnt;
147
148 g_runStat = kModeRun;
149
150 fMsg.Message("Starting EventBuilder thread");
151
152 fThread = boost::thread(StartEvtBuild);
153 }
154 void Abort()
155 {
156 fMsg.Message("Signal abort to EventBuilder thread...");
157 g_runStat = kAbort;
158 }
159
160 void Exit()
161 {
162 fMsg.Message("Signal exit to EventBuilder thread...");
163 g_runStat = kExit;
164 }
165
166 /*
167 void Wait()
168 {
169 fThread.join();
170 fMsg.Message("EventBuilder stopped.");
171 }*/
172
173 void Hybernate() const { g_runStat = kHybernate; }
174 void Sleep() const { g_runStat = kSleep; }
175 void FlushMode() const { g_runStat = kModeFlush; }
176 void TestMode() const { g_runStat = kModeTest; }
177 void FlagMode() const { g_runStat = kModeFlag; }
178 void RunMode() const { g_runStat = kModeRun; }
179
180 // FIXME: To be removed
181 void SetMode(int mode) const { g_runStat = mode; }
182
183 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
184 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0; }
185 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
186
187 size_t GetUsedMemory() const { return gi_usedMem; }
188
189 virtual int CloseOpenFiles() { CloseRunFile(0, 0); return 0; }
190
191
192 /*
193 struct OpenFileToDim
194 {
195 int code;
196 char fileName[FILENAME_MAX];
197 };
198
199 SignalRunOpened(runid, filename);
200 // Send num open files
201 // Send runid, (more info about the run?), filename via dim
202
203 SignalEvtWritten(runid);
204 // Send num events written of newest file
205
206 SignalRunClose(runid);
207 // Send new num open files
208 // Send empty file-name if no file is open
209
210 */
211
212 // -------------- Mapped event builder callbacks ------------------
213
214 class DataFileImp
215 {
216 uint32_t fRunId;
217
218 public:
219 DataFileImp(uint32_t id) : fRunId(id) { }
220
221 virtual bool Write() = 0;
222 virtual bool Close() = 0;
223
224 uint32_t GetRunId() const { return fRunId; }
225 };
226
227
228 class DataFileRaw : public DataFileImp
229 {
230 public:
231 DataFileRaw(uint32_t id) : DataFileImp(id) { }
232 ~DataFileRaw() { Close(); }
233
234 virtual bool Write() { return true; }
235 virtual bool Close() { return true; }
236 };
237
238 class DataFileFits : public DataFileImp
239 {
240 public:
241 DataFileFits(uint32_t id) :DataFileImp(id) { }
242 ~DataFileFits() { Close(); }
243
244 virtual bool Write() { return true; }
245 virtual bool Close() { return true; }
246 };
247
248 vector<DataFileImp*> fFiles;
249
250 template<class T>
251 void Update(DimDescribedService &svc, const T &data) const
252 {
253 cout << "Update: " << svc.getName() << " (" << sizeof(T) << ")" << endl;
254 svc.setData(const_cast<T*>(&data), sizeof(T));
255 svc.updateService();
256 }
257
258 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
259 {
260 // Check if file already exists...
261 DataFileImp *file = NULL;
262 try
263 {
264 file = fFitsFormat ?
265 static_cast<DataFileImp*>(new DataFileFits(runid)) :
266 static_cast<DataFileImp*>(new DataFileRaw(runid));
267 }
268 catch (const exception &e)
269 {
270 return 0;
271 }
272
273 cout << "OPEN_FILE #" << runid << " (" << file << ")" << endl;
274 cout << " Ver= " << h->Version << endl;
275 cout << " Typ= " << h->RunType << endl;
276 cout << " Nb = " << h->NBoard << endl;
277 cout << " Np = " << h->NPix << endl;
278 cout << " NTm= " << h->NTm << endl;
279 cout << " roi= " << h->Nroi << endl;
280
281 fFiles.push_back(file);
282
283 if (runid>fMaxRun)
284 {
285 fMaxRun = runid;
286 fNumEvts[kCurrent] = 0;
287
288 Update(fDimRuns, fMaxRun);
289 Update(fDimEvents, fNumEvts);
290 Update(fDimCurrentEvent, uint32_t(0));
291 }
292
293 Update(fDimFiles, fFiles.size());
294
295// fDimFiles.setData(fFiles.size());
296// fDimFiles.update();
297
298 return reinterpret_cast<FileHandle_t>(file);
299 }
300
301 int runWrite(FileHandle_t handler, EVENT *e, size_t)
302 {
303 DataFileImp *file = reinterpret_cast<DataFileImp*>(handler);
304
305 cout << "WRITE_EVENT " << file->GetRunId() << endl;
306
307 cout << " Evt=" << e->EventNum << endl;
308 cout << " Typ=" << e->TriggerType << endl;
309 cout << " roi=" << e->Roi << endl;
310 cout << " trg=" << e->SoftTrig << endl;
311 cout << " tim=" << e->PCTime << endl;
312
313 if (!file->Write())
314 return -1;
315
316 if (file->GetRunId()==fMaxRun)
317 {
318 Update(fDimCurrentEvent, e->EventNum);
319 fNumEvts[kCurrent]++;
320 }
321
322 fNumEvts[kTotal]++;
323 Update(fDimEvents, fNumEvts);
324
325 // ===> SignalEvtWritten(runid);
326 // Send num events written of newest file
327
328 /* close run runId (all all runs if runId=0) */
329 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
330 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
331
332 return 0;
333 }
334
335 int runClose(FileHandle_t handler, RUN_TAIL *, size_t)
336 {
337 DataFileImp *file = reinterpret_cast<DataFileImp*>(handler);
338
339 const vector<DataFileImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
340 if (it==fFiles.end())
341 {
342 ostringstream str;
343 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
344 fMsg.Fatal(str);
345 return -1;
346 }
347
348 ostringstream str;
349 str << "CLOSE_RUN requested for " << file->GetRunId() << " (" << file << ")" <<endl;
350 fMsg.Debug(str);
351
352 fFiles.erase(it);
353
354 Update(fDimFiles, fFiles.size());
355
356 //fDimFiles.setData(fFiles.size());
357 //fDimFiles.update();
358
359 const bool rc = file->Close();
360 if (!rc)
361 {
362 // Error message
363 }
364
365 delete file;
366
367 // ==> SignalRunClose(runid);
368 // Send new num open files
369 // Send empty file-name if no file is open
370
371 return rc ? 0 : -1;
372 }
373};
374
375EventBuilderWrapper *EventBuilderWrapper::This = 0;
376
377// ----------- Event builder callbacks implementation ---------------
378extern "C"
379{
380 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
381 {
382 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
383 }
384
385 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
386 {
387 return EventBuilderWrapper::This->runWrite(fileId, event, len);
388 }
389
390 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
391 {
392 return EventBuilderWrapper::This->runClose(fileId, runth, len);
393 }
394
395 void factOut(int severity, int err, char *message)
396 {
397 //replace(message, message+strlen(message), '\n', ' ');
398
399 // FIXME: Make the output to the console stream thread-safe
400 ostringstream str;
401 str << "EventBuilder(";
402 if (err<0)
403 str << "---";
404 else
405 str << err;
406 str << "): " << message;
407 EventBuilderWrapper::This->Update(str, severity);
408 }
409
410 void factStat(int severity, int err, char* message )
411 {
412 static string last;
413 if (message==last)
414 return;
415
416 if (err!=-1)
417 factOut(severity, err, message);
418 else
419 {
420 ostringstream str("Status: ");
421 str << message;
422 EventBuilderWrapper::This->Update(str, severity);
423 }
424
425 last = message;
426 }
427
428 /*
429 void message(int severity, const char *msg)
430 {
431 EventBuilderWrapper::This->Update(msg, severity);
432 }*/
433}
434
435#endif
Note: See TracBrowser for help on using the repository browser.