source: trunk/FACT++/src/EventBuilderWrapper.h@ 10934

Last change on this file since 10934 was 10923, checked in by tbretz, 13 years ago
Adapted to latest version of code; first implementation of an interface to write files.
File size: 9.9 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4/*
5#if BOOST_VERSION < 104400
6#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
7#undef BOOST_HAS_RVALUE_REFS
8#endif
9#endif
10#include <boost/thread.hpp>
11
12using namespace std;
13*/
14
15#include <boost/date_time/posix_time/posix_time_types.hpp>
16
17#include "EventBuilder.h"
18
// Entry points of the event builder, declared with C linkage.
extern "C" {
    // Thread function of the event builder; EventBuilderWrapper::Start()
    // hands it to boost::thread.
    extern void StartEvtBuild();
    // NOTE(review): according to the comment next to the (disabled) call in
    // EventBuilderWrapper::runWrite this closes run runId (all runs if
    // runId==0) and returns 0 if the close was scheduled, >0 if the run was
    // already closed and <0 if it does not exist -- confirm against the
    // event builder sources.
    extern int CloseRunFile(uint32_t runId, uint32_t closeTime);
}
23
24class EventBuilderWrapper
25{
26public:
27 // FIXME
28 static EventBuilderWrapper *This;
29
30private:
31 boost::thread fThread;
32
33 enum CommandStates_t // g_runStat
34 {
35 kAbort = -2, // quit as soon as possible ('abort')
36 kExit = -1, // stop reading, quit when buffered events done ('exit')
37 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
38 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
39 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
40 kModeFlush = 10, // read data from camera, but skip them ('flush')
41 kModeTest = 20, // read data and process them, but do not write to disk ('test')
42 kModeFlag = 30, // read data, process and write all to disk ('flag')
43 kModeRun = 40, // read data, process and write selected to disk ('run')
44 };
45
46 MessageImp &fMsg;
47
48 DimDescribedService fDimFiles;
49
50public:
51 EventBuilderWrapper(MessageImp &msg) : fMsg(msg),
52 fDimFiles("FAD_CONTROL/FILES", "X:1", "")
53 {
54 if (This)
55 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
56
57 This = this;
58
59 fFitsFormat = false;
60 }
61 ~EventBuilderWrapper()
62 {
63 Abort();
64 // FIXME: Used timed_join and abort afterwards
65 // What's the maximum time the eb need to abort?
66 fThread.join();
67 //fMsg.Info("EventBuilder stopped.");
68
69 for (vector<DataFileImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
70 delete *it;
71 }
72
73 void Update(ostringstream &msg, int severity)
74 {
75 fMsg.Update(msg, severity);
76 }
77
78 bool IsThreadRunning()
79 {
80 return !fThread.timed_join(boost::posix_time::microseconds(0));
81 }
82
83 void SetMaxMemory(unsigned int mb) const
84 {
85 if (mb*1000000<GetUsedMemory())
86 {
87 // fMsg.Warn("...");
88 return;
89 }
90
91 g_maxMem = mb*1000000;
92 }
93
94 void Start(const vector<tcp::endpoint> &addr)
95 {
96 if (IsThreadRunning())
97 {
98 fMsg.Warn("Start - EventBuilder still running");
99 return;
100 }
101
102 int cnt = 0;
103 for (size_t i=0; i<40; i++)
104 {
105 if (addr[i]==tcp::endpoint())
106 {
107 g_port[i].sockDef = -1;
108 continue;
109 }
110
111 // -1: if entry shell not be used
112 // 0: event builder will connect but ignore the events
113 // 1: event builder will connect and build events
114 g_port[i].sockDef = 1;
115
116 g_port[i].sockAddr.sin_family = AF_INET;
117 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr[i].address().to_v4().to_ulong());
118 g_port[i].sockAddr.sin_port = htons(addr[i].port());
119
120 cnt++;
121 }
122
123// g_maxBoards = cnt;
124 g_actBoards = cnt;
125
126 g_runStat = kModeRun;
127
128 fMsg.Message("Starting EventBuilder thread");
129
130 fThread = boost::thread(StartEvtBuild);
131 }
132 void Abort()
133 {
134 fMsg.Message("Signal abort to EventBuilder thread...");
135 g_runStat = kAbort;
136 }
137
138 void Exit()
139 {
140 fMsg.Message("Signal exit to EventBuilder thread...");
141 g_runStat = kExit;
142 }
143
144 /*
145 void Wait()
146 {
147 fThread.join();
148 fMsg.Message("EventBuilder stopped.");
149 }*/
150
151 void Hybernate() const { g_runStat = kHybernate; }
152 void Sleep() const { g_runStat = kSleep; }
153 void FlushMode() const { g_runStat = kModeFlush; }
154 void TestMode() const { g_runStat = kModeTest; }
155 void FlagMode() const { g_runStat = kModeFlag; }
156 void RunMode() const { g_runStat = kModeRun; }
157
158 // FIXME: To be removed
159 void SetMode(int mode) const { g_runStat = mode; }
160
161 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
162 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0; }
163 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
164
165 size_t GetUsedMemory() const { return gi_usedMem; }
166
167 /*
168 struct OpenFileToDim
169 {
170 int code;
171 char fileName[FILENAME_MAX];
172 };
173
174 SignalRunOpened(runid, filename);
175 // Send num open files
176 // Send runid, (more info about the run?), filename via dim
177
178 SignalEvtWritten(runid);
179 // Send num events written of newest file
180
181 SignalRunClose(runid);
182 // Send new num open files
183 // Send empty file-name if no file is open
184
185 */
186
187 // -------------- Mapped event builder callbacks ------------------
188
189 class DataFileImp
190 {
191 uint32_t fRunId;
192
193 public:
194 DataFileImp(uint32_t id) : fRunId(id) { }
195
196 virtual bool Write() = 0;
197 virtual bool Close() = 0;
198
199 uint32_t GetRunId() const { return fRunId; }
200 };
201
202
203 class DataFileRaw : public DataFileImp
204 {
205 public:
206 DataFileRaw(uint32_t id) : DataFileImp(id) { }
207 ~DataFileRaw() { Close(); }
208
209 virtual bool Write() { return true; }
210 virtual bool Close() { return true; }
211 };
212
213 class DataFileFits : public DataFileImp
214 {
215 public:
216 DataFileFits(uint32_t id) :DataFileImp(id) { }
217 ~DataFileFits() { Close(); }
218
219 virtual bool Write() { return true; }
220 virtual bool Close() { return true; }
221 };
222
223 vector<DataFileImp*> fFiles;
224
225 bool fFitsFormat;
226
227 template<class T>
228 void Update(DimDescribedService &svc, const T &data) const
229 {
230 cout << "Update: " << svc.getName() << " (" << sizeof(T) << ")" << endl;
231 svc.setData(const_cast<T*>(&data), sizeof(T));
232 svc.updateService();
233 }
234
235 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
236 {
237 // Check if file already exists...
238 cout << "OPEN_FILE #" << runid << endl;
239 cout << " Ver= " << h->Version << endl;
240 cout << " Typ= " << h->RunType << endl;
241 cout << " Nb = " << h->NBoard << endl;
242 cout << " Np = " << h->NPix << endl;
243 cout << " NTm= " << h->NTm << endl;
244 cout << " roi= " << h->Nroi << endl;
245
246 DataFileImp *file = NULL;
247 try
248 {
249 file = fFitsFormat ?
250 static_cast<DataFileImp*>(new DataFileFits(runid)) :
251 static_cast<DataFileImp*>(new DataFileRaw(runid));
252 }
253 catch (const exception &e)
254 {
255 return 0;
256 }
257
258 fFiles.push_back(file);
259
260 Update(fDimFiles, fFiles.size());
261
262// fDimFiles.setData(fFiles.size());
263// fDimFiles.update();
264
265 return reinterpret_cast<FileHandle_t>(file);
266 }
267
268 int runWrite(FileHandle_t handler, EVENT *e, size_t)
269 {
270 DataFileImp *file = reinterpret_cast<DataFileImp*>(handler);
271
272 cout << "WRITE_EVENT " << file->GetRunId() << endl;
273
274 cout << " Evt=" << e->EventNum << endl;
275 cout << " Typ=" << e->TriggerType << endl;
276 cout << " roi=" << e->Roi << endl;
277 cout << " trg=" << e->SoftTrig << endl;
278 cout << " tim=" << e->PCTime << endl;
279
280 if (!file->Write())
281 return -1;
282
283 // ===> SignalEvtWritten(runid);
284 // Send num events written of newest file
285
286 /* close run runId (all all runs if runId=0) */
287 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
288 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
289
290 return 0;
291 }
292
293 int runClose(FileHandle_t handler, RUN_TAIL *, size_t)
294 {
295 DataFileImp *file = reinterpret_cast<DataFileImp*>(handler);
296
297 cout << "CLOSE_RUN " << file->GetRunId() << endl;
298
299 fFiles.erase(find(fFiles.begin(), fFiles.end(), file));
300
301 Update(fDimFiles, fFiles.size());
302
303 //fDimFiles.setData(fFiles.size());
304 //fDimFiles.update();
305
306 const bool rc = file->Close();
307 if (!rc)
308 {
309 // Error message
310 }
311
312 delete file;
313
314 // ==> SignalRunClose(runid);
315 // Send new num open files
316 // Send empty file-name if no file is open
317
318 return rc ? 0 : -1;
319 }
320};
321
// Definition of the instance pointer; null as long as no
// EventBuilderWrapper has been constructed.
// NOTE(review): this is a non-inline definition inside a header. Including
// the header from more than one translation unit would define the symbol
// twice -- confirm that it is included exactly once.
EventBuilderWrapper *EventBuilderWrapper::This = 0;
323
324// ----------- Event builder callbacks implementation ---------------
325extern "C"
326{
327 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
328 {
329 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
330 }
331
332 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
333 {
334 return EventBuilderWrapper::This->runWrite(fileId, event, len);
335 }
336
337 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
338 {
339 return EventBuilderWrapper::This->runClose(fileId, runth, len);
340 }
341
342 void factOut(int severity, int err, char *message)
343 {
344 //replace(message, message+strlen(message), '\n', ' ');
345
346 // FIXME: Make the output to the console stream thread-safe
347 ostringstream str;
348 str << "EventBuilder(";
349 if (err<0)
350 str << "---";
351 else
352 str << err;
353 str << "): " << message;
354 EventBuilderWrapper::This->Update(str, severity);
355 }
356
357 void factStat(int severity, int err, char* message )
358 {
359 static string last;
360 if (message==last)
361 return;
362
363 if (err!=-1)
364 factOut(severity, err, message);
365 else
366 {
367 ostringstream str("Status: ");
368 str << message;
369 EventBuilderWrapper::This->Update(str, severity);
370 }
371
372 last = message;
373 }
374
375 /*
376 void message(int severity, const char *msg)
377 {
378 EventBuilderWrapper::This->Update(msg, severity);
379 }*/
380}
381
382#endif
Note: See TracBrowser for help on using the repository browser.