source: trunk/FACT++/src/dataLogger.cc@ 10994

Last change on this file since 10994 was 10994, checked in by tbretz, 13 years ago
The public functions don't need to be public.
File size: 90.0 KB
Line 
1//****************************************************************
2/** @class DataLogger
3
4 @brief Logs all messages and infos exchanged between the services
5
6 This is the main logging class facility.
7 It derives from StateMachineDim and DimServiceInfoList. The first parent is here to enforce
8 a state machine behaviour, while the second one is meant to make the dataLogger receive
9 the dim services to which it has subscribed.
10 The possible states and transitions of the machine are:
11 \dot
12 digraph datalogger {
13 node [shape=record, fontname=Helvetica, fontsize=10];
14 e [label="Error" color="red"];
15 r [label="Ready"]
16 d [label="NightlyOpen"]
17 w [label="WaitingRun"]
18 l [label="Logging"]
19 b [label="BadNightlyconfig" color="red"]
20 c [label="BadRunConfig" color="red"]
21
22 e -> r
23 r -> e
24 r -> d
25 r -> b
26 d -> w
27 d -> r
28 w -> r
29 l -> r
30 l -> w
31 b -> d
32 w -> c
33 w -> l
34 b -> r
35 c -> r
36 c -> l
37 }
38 \enddot
39 */
40 //****************************************************************
41#include "Dim.h"
42#include "Event.h"
43#include "Time.h"
44#include "StateMachineDim.h"
45#include "WindowLog.h"
46#include "Configuration.h"
47#include "ServiceList.h"
48#include "Converter.h"
49#include "MessageImp.h"
50#include "LocalControl.h"
51#include "DimDescriptionService.h"
52
53#include "Description.h"
54
55//#include "DimServiceInfoList.h"
56#include "DimNetwork.h"
57//for getting stat of opened files
58#include <unistd.h>
59//for getting disk free space
60#include <sys/statvfs.h>
61//for getting files sizes
62#include <sys/stat.h>
63
64#include <fstream>
65#include <mutex>
66
67#include <boost/bind.hpp>
68#if BOOST_VERSION < 104400
69#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
70#undef BOOST_HAS_RVALUE_REFS
71#endif
72#endif
73#include <boost/thread.hpp>
74
75#ifdef HAVE_FITS
76#include "Fits.h"
77#endif
78
79//Dim structures
80///Distributes the writing statistics
81struct DataLoggerStats {
82 long sizeWritten;
83 long freeSpace;
84 long writingRate;
85};
///Number of opened subscriptions and of opened fits files, as distributed by the data logger
struct NumSubAndFitsType
{
    ///number of services currently subscribed to
    int numSubscriptions;
    ///number of fits files currently open
    int numOpenFits;
};
///Record used to distribute which files were opened
struct OpenFileToDim
{
    ///integer code sent along with the file name
    int code;
    ///name of the opened file, in a fixed-size buffer of FILENAME_MAX characters
    char fileName[FILENAME_MAX];
};
96
97///Run number record. Used to keep track of which run numbers are still active
98struct RunNumberType {
99#ifdef RUN_LOGS
100 ///the run number log file
101 shared_ptr<ofstream> logFile;
102#endif
103 ///the run number report file
104 shared_ptr<ofstream> reportFile;
105#ifdef HAVE_FITS
106 ///the run number group fits file
107 shared_ptr<CCfits::FITS> runFitsFile;
108#endif
109#ifdef RUN_LOGS
110 ///the log filename
111 string logName;
112#endif
113 ///the report filename
114 string reportName;
115 ///the actual run number
116 uint32_t runNumber;
117 ///the time at which the run number was received
118 Time time;
119 ///list of opened fits used to create the fits grouping when the run ends
120 map<string, vector<string> > openedFits;
121 ///default constructor
122 RunNumberType()
123 {
124#ifdef RUN_LOGS
125 logFile = shared_ptr<ofstream>(new ofstream());
126#endif
127 reportFile = shared_ptr<ofstream>(new ofstream());
128#ifdef HAVE_FITS
129 runFitsFile = shared_ptr<CCfits::FITS>();
130#endif
131 runNumber = 0;
132 }
133 ///default destructor
134 ~RunNumberType()
135 {
136
137 }
138
139 void addServiceToOpenedFits(const string& fileName, const string& serviceName)
140 {
141 //most likely I should add this service name.
142 //the only case for which I should not add it is if a service disapeared, hence the file was closed
143 //and reopened again. Unlikely to happen, but well it may
144
145 if (find(openedFits[fileName].begin(), openedFits[fileName].end(),
146 serviceName)==openedFits[fileName].end())
147 openedFits[fileName].push_back(serviceName);
148 }
149};
150///Dim subscription type. Stores all the relevant info to handle a Dim subscription
151struct SubscriptionType
152{
153#ifdef HAVE_FITS
154 ///Nightly FITS output file
155 Fits nightlyFile;
156 ///run-specific FITS output file
157 Fits runFile;
158#endif
159 ///the actual dimInfo pointer
160 shared_ptr<DimStampedInfo> dimInfo;
161 ///the server
162 string server;
163 ///the service
164 string service;
165 ///the converter for outputting the data according to the format
166 shared_ptr<Converter> fConv;
167 ///the current run number used by this subscription
168 uint32_t runNumber;
169 ///time of the latest received event
170 Time lastReceivedEvent;
171 ///whether or not the fits buffer was allocated already
172 bool fitsBufferAllocated;
173
174 ///Dim info constructor
175 SubscriptionType(DimStampedInfo* info=NULL)
176 {
177 dimInfo = shared_ptr<DimStampedInfo>(info);
178 fConv = shared_ptr<Converter>();
179 runNumber = 0;
180 lastReceivedEvent = Time::None;
181 fitsBufferAllocated = false;
182 }
183
184 ///default destructor
185 ~SubscriptionType()
186 {
187 }
188};
189
190class DataLogger : public StateMachineDim, DimServiceInfoList
191{
192public:
193 /// The list of existing states specific to the DataLogger
194 enum
195 {
196 kSM_NightlyOpen = 20, ///< Nightly file openned and writing
197 kSM_WaitingRun = 30, ///< waiting for the run number to open the run file
198 kSM_Logging = 40, ///< both files openned and writing
199 kSM_BadNightlyConfig = 0x101, ///< the folder specified for Nightly logging does not exist or has bad permissions
200 kSM_BadRunConfig = 0x102, ///< the folder specified for the run logging does not exist or has wrong permissions or no run number
201 kSM_WriteError = 0x103, ///< Denotes that an error occured while writing a file (text or fits).
202 } localstates_t;
203
204 DataLogger(ostream &out);
205 ~DataLogger();
206
207 bool SetConfiguration(Configuration& conf);
208
209private:
210 /************************************************
211 * MEMBER VARIABLES
212 ************************************************/
213 /// ofstream for the NightlyLogfile
214 ofstream fNightlyLogFile;
215 /// ofstream for the Nightly report file
216 ofstream fNightlyReportFile;
217 /// base path of the Nightlyfile
218 string fNightlyFilePath;
219 ///base path of the run file
220 string fRunFilePath;
221 ///run numbers
222 list<RunNumberType> fRunNumber;
223 ///old run numbers time-out delay (in minutes)
224 long fRunNumberTimeout;
225 ///previous run number. to check if changed while logging
226 int fPreviousRunNumber;
227 ///Current Service Quality
228 int fQuality;
229 ///Modified Julian Date
230 double fMjD;
231 ///for obtaining the name of the existing services
232// ServiceList fServiceList;
233 typedef map<const string, map<string, SubscriptionType> > SubscriptionsListType;
234 ///All the services to which we have subscribed to, sorted by server name.
235 SubscriptionsListType fServiceSubscriptions;
236 ///full name of the nightly log file
237 string fFullNightlyLogFileName;
238 ///full name of the nightly report file
239 string fFullNightlyReportFileName;
240 ///variable to track when the statistic were last calculated
241 Time fPreviousStatsUpdateTime;
242 Time fPreviousOldRunNumberCheck;
243 ///boolean to know whether we should close and reopen daily files or not
244 bool fDailyFileDayChangedAlready;
245
246private:
247 /***************************************************
248 * DIM INFO HANDLER
249 ***************************************************/
250 //overloading of DIM's infoHandler function
251 void infoHandler();
252
253 /***************************************************
254 * TRANSITION FUNCTIONS
255 ***************************************************/
256 ///Reporting method for the services info received
257 void ReportPlease(DimInfo* I, SubscriptionType& sub);
258
259 int ConfigureFileName(string &target, const string &type, const EventImp &evt);
260 ///Configuration of the nightly file path
261 int ConfigureNightlyFileName(const Event& evt);
262 ///Configuration fo the file name
263 int ConfigureRunFileName(const Event& evt);
264 ///DEPREC - configuration of the run number
265 int ConfigureRunNumber(const Event& evt);
266 ///print the current state of the dataLogger
267 int PrintStatePlease(const Event& evt);
268 ///checks whether or not the current info being treated is a run number
269 void CheckForRunNumber(DimInfo* I);
270 /// start transition
271 int StartPlease();
272 ///from waiting to logging transition
273 int StartRunPlease();
274 /// from logging to waiting transition
275 int StopRunPlease();
276 ///stop and reset transition
277 int GoToReadyPlease();
278 ///from NightlyOpen to waiting transition
279 int NightlyToWaitRunPlease();
280 /// from writing to error
281 std::string SetCurrentState(int state, const char *txt="", const std::string &cmd="");
282#ifdef HAVE_FITS
283 ///Open fits files
284 void OpenFITSFilesPlease(SubscriptionType& sub, RunNumberType* cRunNumber);
285 ///Write data to FITS files
286 void WriteToFITS(SubscriptionType& sub);
287 ///Allocate the buffers required for fits
288 void AllocateFITSBuffers(SubscriptionType& sub);
289#endif//has_fits
290
291 /***************************************
292 * DIM SERVICES PROVIDED BY THE DATA LOGGER
293 ***************************************/
294 ///monitoring notification loop
295 void ServicesMonitoring();
296 inline void NotifyOpenedFile(const string &name, int type, DimDescribedService* service);
297 ///variables for computing statistics
298 DataLoggerStats fStatVar;
299 ///mutex to make sure that the Stats are not accessed while updating
300// mutex fStatsMutex;
301 ///services notification thread
302// boost::thread fMonitoringThread;
303 ///end of the monitoring
304// bool fContinueMonitoring;
305 ///stores the size of each file that is or was open
306 map<string, long> fFileSizesMap;
307 ///total size of the opened files BEFORE they were opened by the logger
308 long fBaseSizeNightly;
309 long fPreviousSize;
310 long fBaseSizeRun;
311 ///Service for opened files
312 DimDescribedService* fOpenedNightlyFiles;
313 DimDescribedService* fOpenedRunFiles;
314 DimDescribedService* fNumSubAndFits;
315 DimDescribedService* fStatsMonitoring;
316 NumSubAndFitsType fNumSubAndFitsData;
317 ///Small function for calculating the total size written so far
318 bool calculateTotalSizeWritten(DataLoggerStats& statVar, bool isPrinting);
319
320 /***************************************************
321 * DATA LOGGER's CONFIGURATION STUFF
322 ***************************************************/
323 ///black/white listing
324 set<string> fBlackList;
325 set<string> fWhiteList;
326 ///list of services to be grouped
327 set<string> fGrouping;
328 ///configuration flags
329 bool fDebugIsOn;
330 float fStatsPeriodDuration;
331 bool fOpenedFilesIsOn;
332 bool fNumSubAndFitsIsOn;
333 //functions for controlling the services behavior
334 int SetDebugOnOff(const Event& evt);
335 int SetStatsPeriod(const Event& evt);
336 int SetOpenedFilesOnOff(const Event& evt);
337 int SetNumSubsAndFitsOnOff(const Event& evt);
338 int SetRunTimeoutDelay(const Event& evt);
339
340 ///boolean to prevent DIM update while desctructing the dataLogger
341 bool fDestructing;
342 /***************************************************
343 * UTILITIES
344 ***************************************************/
345 ///vectors to keep track of opened Fits files, for grouping purposes.
346 map<string, vector<string> > fOpenedNightlyFits;
347 ///creates a group fits file based on a list of files to be grouped
348 void CreateFitsGrouping(map<string, vector<string> >& filesToGroup, int runNumber);
349
350 bool OpenStream(shared_ptr<ofstream> stream, const string &filename);
351 ///Open the relevant text files related to a particular run
352 int OpenRunFile(RunNumberType& run);
353 ///add a new run number
354 void AddNewRunNumber(int64_t newRun, Time time);
355 ///removes the oldest run number, and close the relevant files.
356 void RemoveOldestRunNumber();
357 ///retrieves the size of a file
358 off_t GetFileSize(const string&);
359 ///Get the digits of year, month and day for filenames and paths
360 void GetYearMonthDayForFiles(unsigned short& year, unsigned short& month, unsigned short& day);
361 ///Appends the relevant year month day to a given path
362 void AppendYearMonthDaytoPath(string& path);
363 ///Form the files path
364 string CompileFileName(const string &path, const string &service, const string & extension, const Time &time=Time());
365 ///Form the files path
366 string CompileFileName(const string &path, uint32_t run, const string &service, const string & extension, const Time &time=Time());
367 ///Check whether service is in black and/or white list
368 bool ShouldSubscribe(const string& server, const string& service);
369 ///Subscribe to a given server and service
370 DimStampedInfo* SubscribeToPlease(const string& server, const string& service);
371 ///Open a text file and checks for ofstream status
372 bool OpenTextFilePlease(ofstream& stream, const string& name);
373 ///Check if a dir is . and returns the actual string corresponding to .
374// string CheckIfDirIsDot(const string& dir);
375 ///Remembers the size of newly opened files. for statistic purposes
376 bool RememberFileOrigSizePlease(string& fileName, bool nightly);
377 ///Checks if the input osftream is in error state, and if so close it.
378 void CheckForOfstreamError(ofstream& out);
379 ///Checks if a given path exist
380 bool DoesPathExist(string path);
381 ///Check if the statistics service should be updated, and if so, do it
382 void UpdateStatisticsService();
383 ///Check if old run numbers can be trimmed, and if so, do it
384 void TrimOldRunNumbers();
385 ///Create a given directory
386 bool CreateDirectory(string path);
387 /***************************************************
388 * INHERITED FROM DIMSERVICEINFOLIST
389 ***************************************************/
390 ///Add a new service subscription
391 void AddService(const string&, const string&, const string&, bool);
392 ///Remove a given service subscription
393 void RemoveService(const string&, const string&, bool);
394 ///Remove all the services associated with a given server
395 void RemoveAllServices(const string&);
396}; //DataLogger
397
398// --------------------------------------------------------------------------
399//
400//! Check if a given path exists
401//! @param path the path to be checked
402//! @return whether or not the creation has been successfull
403//
404bool DataLogger::CreateDirectory(string path)
405{
406 //remove last '/', if present
407 if (path[path.size()-1] == '/')
408 path = path.substr(0, path.size()-1);
409
410 //create boost path
411 const boost::filesystem::path fullPath = boost::filesystem::system_complete(boost::filesystem::path(path));
412
413 //if path does not exist, check if upper levels exist already
414 if (boost::filesystem::exists(fullPath))
415 {
416 //if path already exist, make sure it does not designate a file (filenames cannot be checked if they do not exist)
417 if (boost::filesystem::is_directory(fullPath))
418 return true;
419
420 Error("Path to be created contains a file name: '" + path + "'");
421 return false;
422 }
423
424 if (path.size() <= 1)
425 {//we're hitting "/", which SHOULD have existed...
426 Error("Something unexpected happened while creating a path");
427 }
428 CreateDirectory(path.substr(0, path.find_last_of('/')));
429
430 //path does not exist, and upper level have been created already by recusrion.
431 const mode_t rightsMask = S_IRWXU | S_IXGRP | S_IRGRP | S_IXOTH | S_IROTH; //everybody read, owner writes
432
433 const int returnValue = mkdir(path.c_str(), rightsMask);
434
435 if (returnValue != 0)
436 {
437 ostringstream str;
438 str << "Could not create directory " << path << " mkdir error code: " << errno;
439 Error(str.str());
440 return false;
441 }
442
443 return true;
444}
445// --------------------------------------------------------------------------
446//
447//! Check if a given path exists
448//! @param path the path to be checked
449//! @return whether or not the given path exists
450//
451bool DataLogger::DoesPathExist(string path)
452{
453 const boost::filesystem::path fullPath = boost::filesystem::system_complete(boost::filesystem::path(path));
454
455 if (!boost::filesystem::exists(fullPath))
456 return false;
457
458 if (!boost::filesystem::is_directory(fullPath))
459 {
460 Error("Path given for checking '" + path + "' designate a file name. Please provide a path name only");
461 return false;
462 }
463
464 if (access(path.c_str(), R_OK|W_OK|X_OK) != 0)
465 {
466 Error("Missing read, write or execute permissions on directory '" + path + "'");
467 return false;
468 }
469
470 return true;
471}
472// --------------------------------------------------------------------------
473//
474//! Add a new service subscription
475//! @param server the server for which the subscription should be created
476//! @param service the service for which the subscription should be created
477//! @param isCmd whether this is a Dim Command or not. Commands are not logged
478//
479void DataLogger::AddService(const string& server, const string& service, const string&, bool isCmd)
480{
481 //dataLogger does not subscribe to commands
482 if (isCmd)
483 return;
484
485 //check the given subscription against black and white lists
486 if (!ShouldSubscribe(server, service))
487 return;
488
489 map<string, SubscriptionType> &list = fServiceSubscriptions[server];
490
491 if (list.find(service) != list.end())
492 {
493 Error("Service " + server + "/" + service + " is already in the dataLogger's list. ignoring its update.");
494 return;
495 }
496
497 list[service].dimInfo = shared_ptr<DimStampedInfo>(SubscribeToPlease(server, service));
498 list[service].server = server;
499 list[service].service = service;
500 fNumSubAndFitsData.numSubscriptions++;
501 if (fDebugIsOn)
502 Debug("Added subscription to " + server + "/" + service);
503}
504// --------------------------------------------------------------------------
505//
506//! Remove a given service subscription
507//! @param server the server for which the subscription should be removed
508//! @param service the service that should be removed
509//! @param isCmd whether or not this is a command
510//
511void DataLogger::RemoveService(const string& server, const string& service, bool isCmd)
512{
513 if (isCmd)
514 return;
515 if (fServiceSubscriptions[server].erase(service) != 1)
516 {
517 ostringstream str;
518 str << "Subscription " << server << "/" << service << "could not be removed as it is not present";
519 Error(str.str());
520 return;
521 }
522 fNumSubAndFitsData.numSubscriptions--;
523 if (fDebugIsOn)
524 {
525 Debug("Removed subscription to " + server + "/" + service);
526 }
527}
528// --------------------------------------------------------------------------
529//
530//! Remove all the services associated with a given server
531//! @param server the server for which all the services should be removed
532//
533void DataLogger::RemoveAllServices(const string& server)
534{
535 fNumSubAndFitsData.numSubscriptions -= fServiceSubscriptions[server].size();
536 fServiceSubscriptions[server].clear();
537 if (fDebugIsOn)
538 {
539 Debug("Removed all subscriptions to " + server + "/");
540 }
541}
542// --------------------------------------------------------------------------
543//
544//! Checks if the given ofstream is in error state and if so, close it
545//! @param out the ofstream that should be checked
546//
547void DataLogger::CheckForOfstreamError(ofstream& out)
548{
549 if (out.good())
550 return;
551
552 Error("An error occured while writing to a text file. Closing it");
553 if (out.is_open())
554 out.close();
555 SetCurrentState(kSM_WriteError);
556}
557// --------------------------------------------------------------------------
558//
559//! Checks the size on disk of a given size, and remembers it in the relevant member variable
560//! @param fileName the file for which the size on disk should be retrieved
561//! @param nightly whether this is a run or nightly file, so that its size is added to the correct member variable
562//
563bool DataLogger::RememberFileOrigSizePlease(string& fileName, bool nightly)
564{
565 //get the size of the file we're about to open
566 if (fFileSizesMap.find(fileName) != fFileSizesMap.end())
567 return false;
568
569 if (nightly)
570 fBaseSizeNightly += GetFileSize(fileName);
571 else
572 fBaseSizeRun += GetFileSize(fileName);
573 fFileSizesMap[fileName] = 0;
574 return true;
575}
576
577// --------------------------------------------------------------------------
578//
579//! Open a text file and checks for error code
580//! @param stream the ofstream for which the file should be opened
581//! @name the file name
582//
583bool DataLogger::OpenTextFilePlease(ofstream& stream, const string& name)
584{
585 Info("Opening: "+name);
586
587 errno = 0;
588 stream.open(name.c_str(), ios_base::out | ios_base::app);
589 if (!stream)
590 {
591 ostringstream str;
592 str << "Trying to open file " << name << ": " << strerror(errno) << " (errno=" << errno << ")";
593 Error(str);
594 return false;
595 }
596
597 return true;
598}
599
600// --------------------------------------------------------------------------
601//
602//! Create a new dim subscription to a given server and service
603//! @param server the server name
604//! @param service the service name
605//
606DimStampedInfo* DataLogger::SubscribeToPlease(const string& server, const string& service)
607{
608 if (fDebugIsOn)
609 {
610 ostringstream str;
611 str << "Subscribing to service " << server << "/" << service;
612 Debug(str);
613 }
614 return new DimStampedInfo((server + "/" + service).c_str(), const_cast<char*>(""), this);
615}
616// --------------------------------------------------------------------------
617//
618//! Check whether a service should be subscribed to, based on the black/white list entries
619//! @param server the server name associated with the service being checked
620//! @param service the service name associated with the service being checked
621//
622bool DataLogger::ShouldSubscribe(const string& server, const string& service)
623{
624 if (fWhiteList.size()>0 &&
625 (fWhiteList.find(server + "/") == fWhiteList.end()) &&
626 (fWhiteList.find(server + "/" + service) == fWhiteList.end()) &&
627 (fWhiteList.find("/" + service) == fWhiteList.end()))
628 return false;
629
630 if ((fBlackList.find(server + "/") != fBlackList.end()) ||
631 (fBlackList.find(server + "/" + service) != fBlackList.end()) ||
632 (fBlackList.find("/" + service) != fBlackList.end()))
633 return false;
634
635 return true;
636}
637// --------------------------------------------------------------------------
638//
639//! Compiles a file name
640//! @param path the base path where to put the file
641//! @param time the time at which the file is created
642//! @param service the service name, if any
643//! @param extension the extension to add, if any
644//
645string DataLogger::CompileFileName(const string &path, const string &service, const string & extension, const Time &time)
646{
647 ostringstream str;
648 //calculate time suitable for naming files.
649 const Time ftime(time-boost::posix_time::time_duration(12,0,0));
650
651 //output it
652 str << path << Time::fmt("/%Y/%m/%d") << ftime;
653
654 //check if target directory exist
655 if (!DoesPathExist(str.str()))
656 CreateDirectory(str.str());
657
658 //output base of file name
659 str << Time::fmt("/%Y_%m_%d") << ftime;
660
661 //output service name
662 if (!service.empty())
663 str << "_" << service;
664
665 //output appropriate extension
666 if (!extension.empty())
667 str << "." << extension;
668
669 return str.str();
670}
671// --------------------------------------------------------------------------
672//
673//! Compiles a file name
674//! @param path the base path where to put the file
675//! @param time the time at which the file is created
676//! @param run the run number
677//! @param service the service name, if any
678//! @param extension the extension to add, if any
679//
680string DataLogger::CompileFileName(const string &path, uint32_t run, const string &service, const string & extension, const Time &time)
681{
682 ostringstream str;
683 //calculate suitable time for naming files and output it
684 str << path << Time::fmt("/%Y/%m/%d") << (time-boost::posix_time::time_duration(12,0,0));
685
686 //check if target directory exist
687 if (!DoesPathExist(str.str()))
688 CreateDirectory(str.str());
689
690 //output base of file name
691 str << '/' << setfill('0') << setw(8) << run;
692
693 //output service name
694 if (!service.empty())
695 str << "_" << service;
696
697 //output appropriate extension
698 if (!extension.empty())
699 str << "." << extension;
700 return str.str();
701}
702
703// --------------------------------------------------------------------------
704//
705//!retrieves the size on disk of a file
706//! @param fileName the full file name for which the size on disk should be retrieved
707//! @return the size of the file on disk, in bytes. 0 if the file does not exist or if an error occured
708//
709off_t DataLogger::GetFileSize(const string& fileName)
710{
711 errno = 0;
712 struct stat st;
713 if (!stat(fileName.c_str(), &st))
714 return st.st_size;
715
716 if (errno != 0 && errno != 2)//ignoring error #2: no such file or directory is not an error for new files
717 {
718 ostringstream str;
719 str << "Unable to stat " << fileName << ". Reason: " << strerror(errno) << " [" << errno << "]";
720 Error(str);
721 }
722
723 return 0;
724}
725// --------------------------------------------------------------------------
726//
727//! Removes the oldest run number and closes the fits files that should be closed
728//! Also creates the fits grouping file
729//
730void DataLogger::RemoveOldestRunNumber()
731{
732 if (fDebugIsOn)
733 {
734 ostringstream str;
735 str << "Removing run number " << fRunNumber.front().runNumber;
736 Debug(str);
737 }
738 CreateFitsGrouping(fRunNumber.front().openedFits, fRunNumber.front().runNumber);
739
740 //crawl through the subscriptions to see if there are still corresponding fits files opened.
741 for (SubscriptionsListType::iterator x=fServiceSubscriptions.begin();
742 x!=fServiceSubscriptions.end(); x++)
743 for (map<string, SubscriptionType>::iterator y=x->second.begin();
744 y!=x->second.end(); y++)
745 if (y->second.runFile.fRunNumber == fRunNumber.front().runNumber && y->second.runFile.IsOpen())
746 {
747 y->second.runFile.Close();
748
749 Info("Closed: "+y->second.runFile.fFileName);
750 }
751 //if a grouping file is on, decrease the number of opened fits manually
752 if (fRunNumber.front().runFitsFile)
753 (fNumSubAndFitsData.numOpenFits)--;
754 //remove the entry
755 fRunNumber.pop_front();
756}
757
758// --------------------------------------------------------------------------
759//
760//! Calculate the total number of written bytes since the logger was started
761//! @param statVar the data structure that should be updated
762//! @param shouldWarn whether or not error messages should be outputted
763//! @param isPrinting whether this function was called from the PRINT command or not. If so, displays relevant information
764//
765bool DataLogger::calculateTotalSizeWritten(DataLoggerStats& statVar, bool isPrinting)
766{
767//mutex
768// if (!isPrinting)
769// fStatsMutex.lock();
770#ifdef HAVE_FITS
771 if (isPrinting)
772 {
773 ostringstream str;
774 str << "There are " << fNumSubAndFitsData.numOpenFits << " FITS files open:";
775 Message(str);
776 }
777
778 ///TODO the grouping file is dealt with several times. This should not be a problem but well, better to fix it I guess.
779 for (SubscriptionsListType::const_iterator x=fServiceSubscriptions.begin();
780 x!=fServiceSubscriptions.end(); x++)
781 {
782 for (map<string, SubscriptionType>::const_iterator y=x->second.begin();
783 y!=x->second.end(); y++)
784 {
785 if (y->second.runFile.IsOpen())
786 {
787 fFileSizesMap[y->second.runFile.fFileName] = y->second.runFile.GetWrittenSize();
788 if (isPrinting)
789 Message("-> "+y->second.runFile.fFileName);
790 }
791 if (y->second.nightlyFile.IsOpen())
792 {
793 fFileSizesMap[y->second.nightlyFile.fFileName] = y->second.nightlyFile.GetWrittenSize();
794 if (isPrinting)
795 Message("-> "+y->second.nightlyFile.fFileName);
796 }
797 }
798 }
799#else
800 if (isPrinting)
801 Message("FITS output disabled at compilation");
802#endif
803 //gather log and report files sizes on disk
804 if (fNightlyLogFile.is_open())
805 fFileSizesMap[fFullNightlyLogFileName] = GetFileSize(fFullNightlyLogFileName);
806 if (fNightlyReportFile.is_open())
807 fFileSizesMap[fFullNightlyReportFileName] = GetFileSize(fFullNightlyReportFileName);
808 for (list<RunNumberType>::iterator it = fRunNumber.begin(); it != fRunNumber.end(); it++)
809 {
810 if (it->reportFile->is_open())
811 fFileSizesMap[it->reportName] = GetFileSize(it->reportName);
812#ifdef RUN_LOGS
813 if (it->logFile->is_open())
814 fFileSizesMap[it->logName] = GetFileSize(it->logName);
815#endif
816 }
817
818 bool shouldWarn = false;
819 struct statvfs vfs;
820 if (!statvfs(fNightlyFilePath.c_str(), &vfs))
821 statVar.freeSpace = vfs.f_bsize*vfs.f_bavail;
822 else
823 {
824 ostringstream str;
825 str << "Unable to retrieve stats for " << fNightlyFilePath << ". Reason: " << strerror(errno) << " [" << errno << "]";
826 if (!shouldWarn)
827 Error(str);
828 statVar.freeSpace = -1;
829 }
830 //sum up all the file sizes. past and present
831 statVar.sizeWritten = 0;
832 for (map<string, long>::const_iterator it=fFileSizesMap.begin(); it != fFileSizesMap.end(); it++)
833 statVar.sizeWritten += it->second;
834 statVar.sizeWritten -= fBaseSizeNightly;
835 statVar.sizeWritten -= fBaseSizeRun;
836
837//mutex
838// if (!isPrinting)
839// fStatsMutex.unlock();
840
841 return shouldWarn;
842}
843
844// --------------------------------------------------------------------------
845//
846//! Monitor the number of opened files and total size written, and distributes this data through a Dim service
847//
848//
849/*
850void DataLogger::ServicesMonitoring()
851{
852 struct statvfs vfs;
853 if (!statvfs(fNightlyFilePath.c_str(), &vfs))
854 fStatVar.freeSpace = vfs.f_bsize*vfs.f_bavail;
855 else
856 fStatVar.freeSpace = -1;
857
858 DimDescribedService srvc ("DATA_LOGGER/STATS", "X:3", fStatVar, "Add description here");
859 fPreviousSize = 0;
860 //loop-wait for broadcast
861 while (fContinueMonitoring)
862 {
863 if (fStatsPeriodDuration == 0.0f)
864 {
865 sleep(0.1f);
866 continue;
867 }
868
869 sleep(fStatsPeriodDuration);
870
871
872 fStatsMutex.lock();
873
874 if (fStatVar.writingRate != 0) //if data has been written
875 {
876 srvc.updateService();
877
878 if(fDebugIsOn)
879 {
880 ostringstream str;
881 str << "Size written: " << fStatVar.sizeWritten/1000 << " kB; writing rate: ";
882 str << fStatVar.writingRate/1000 << " kB/s; free space: ";
883 str << fStatVar.freeSpace/(1000*1000) << " MB";
884 Debug(str);
885 }
886 }
887 fStatsMutex.unlock();
888 }
889}
890*/
891// --------------------------------------------------------------------------
892//
893//! Default constructor. The name of the machine is given DATA_LOGGER
894//! and the state is set to kSM_Ready at the end of the function.
895//
896//!Setup the allows states, configs and transitions for the data logger
897//
898DataLogger::DataLogger(ostream &out) : StateMachineDim(out, "DATA_LOGGER")
899{
900 //initialize member data
901 fNightlyFilePath = ".";
902 fRunFilePath = ".";
903
904 //Give a name to this machine's specific states
905 AddStateName(kSM_NightlyOpen, "NightlyFileOpen", "The summary files for the night are open.");
906 AddStateName(kSM_WaitingRun, "WaitForRun", "The summary files for the night are open and we wait for a run to be started.");
907 AddStateName(kSM_Logging, "Logging", "The summary files for the night and the files for a single run are open.");
908 AddStateName(kSM_BadNightlyConfig, "ErrNightlyFolder", "The folder for the nighly summary files is invalid.");
909 AddStateName(kSM_BadRunConfig, "ErrRunFolder", "The folder for the run files is invalid.");
910 AddStateName(kSM_WriteError, "ErrWrite", "An error occured while writing to a file.");
911
912 // Add the possible transitions for this machine
913 AddEvent(kSM_NightlyOpen, "START", kSM_Ready, kSM_BadNightlyConfig)
914 (boost::bind(&DataLogger::StartPlease, this))
915 ("Start the nightly logging. Nightly file location must be specified already");
916
917 AddEvent(kSM_Ready, "STOP", kSM_NightlyOpen, kSM_WaitingRun, kSM_Logging, kSM_WriteError)
918 (boost::bind(&DataLogger::GoToReadyPlease, this))
919 ("Stop all data logging, close all files.");
920
921 AddEvent(kSM_Logging, "START_RUN", kSM_WaitingRun, kSM_BadRunConfig)
922 (boost::bind(&DataLogger::StartRunPlease, this))
923 ("Start the run logging. Run file location must be specified already.");
924
925 AddEvent(kSM_WaitingRun, "STOP_RUN", kSM_Logging)
926 (boost::bind(&DataLogger::StopRunPlease, this))
927 ("Wait for a run to be started, open run-files as soon as a run number arrives.");
928
929 AddEvent(kSM_Ready, "RESET", kSM_Error, kSM_BadNightlyConfig, kSM_BadRunConfig, kSM_WriteError)
930 (boost::bind(&DataLogger::GoToReadyPlease, this))
931 ("Transition to exit error states. Closes the any open file.");
932
933 AddEvent(kSM_WaitingRun, "WAIT_FOR_RUN_NUMBER", kSM_NightlyOpen)
934 (boost::bind(&DataLogger::NightlyToWaitRunPlease, this))
935 ("Go to waiting for run number state. In this state with any received run-number a new file is opened.");
936
937 // Add the possible configurations for this machine
938 AddEvent("SET_NIGHTLY_FOLDER", "C", kSM_Ready, kSM_BadNightlyConfig)
939 (boost::bind(&DataLogger::ConfigureNightlyFileName, this, _1))
940 ("Configure the base folder for the nightly files."
941 "|Path[string]:Absolute or relative path name where the nightly files should be stored.");
942
943 AddEvent("SET_RUN_FOLDER", "C", kSM_Ready, kSM_BadNightlyConfig, kSM_NightlyOpen, kSM_WaitingRun, kSM_BadRunConfig)
944 (boost::bind(&DataLogger::ConfigureRunFileName, this, _1))
945 ("Configure the base folder for the run files."
946 "|Path[string]:Absolute or relative path name where the run files should be stored.");
947
948 AddEvent("SET_RUN_NUMBER", "X", kSM_Ready, kSM_NightlyOpen, kSM_WaitingRun, kSM_BadRunConfig, kSM_Logging)
949 (boost::bind(&DataLogger::ConfigureRunNumber, this, _1))
950 ("Configure the run number. Cannot be done in logging state");
951
952 // Provide a print command
953 AddEvent("PRINT")
954 (boost::bind(&DataLogger::PrintStatePlease, this, _1))
955 ("Print information about the internal status of the data logger.");
956
957 OpenFileToDim fToDim;
958 fToDim.code = 0;
959 fToDim.fileName[0] = '\0';
960
961 fOpenedNightlyFiles = new DimDescribedService(GetName() + "/FILENAME_NIGHTLY", "I:1;C", fToDim,
962 "Path and base name which is used to compile the filenames for the nightly files."
963 "|Type[int]:type of open files (1=log, 2=rep, 4=fits)"
964 "|Name[string]:path and base file name");
965
966 fOpenedRunFiles = new DimDescribedService(GetName() + "/FILENAME_RUN", "I:1;C", fToDim,
967 "Path and base name which is used to compile the filenames for the run files."
968 "|Type[int]:type of open files (1=log, 2=rep, 4=fits)"
969 "|Name[string]:path and base file name");
970
971 fNumSubAndFitsData.numSubscriptions = 0;
972 fNumSubAndFitsData.numOpenFits = 0;
973 fNumSubAndFits = new DimDescribedService(GetName() + "/NUM_SUBS", "I:2", fNumSubAndFitsData,
974 "Shows number of services to which the data logger is currently subscribed and the total number of open files."
975 "|Subscriptions[int]:number of dim services to which the data logger is currently subscribed."
976 "|NumOpenFiles[int]:number of files currently open by the data logger");
977
978 //services parameters
979 fDebugIsOn = false;
980 fStatsPeriodDuration = 1.0f;
981 fOpenedFilesIsOn = true;
982 fNumSubAndFitsIsOn = true;
983
984 // provide services control commands
985 AddEvent("SET_DEUG_MODE", "B:1", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready)
986 (boost::bind(&DataLogger::SetDebugOnOff, this, _1))
987 ("Switch debug mode on or off. Debug mode prints ifnormation about every service written to a file."
988 "|Enable[bool]:Enable of disable debug mode (yes/no).");
989
990 AddEvent("SET_STATISTICS_UPDATE_INTERVAL", "F", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready)
991 (boost::bind(&DataLogger::SetStatsPeriod, this, _1))
992 ("Interval in which the data-logger statistics service (STATS) is updated."
993 "|Interval[s]:Floating point value in seconds.");
994
995 AddEvent("ENABLE_FILENAME_SERVICES", "B:1", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready)
996 (boost::bind(&DataLogger::SetOpenedFilesOnOff ,this, _1))
997 ("Switch service which distributes information about the open files on or off."
998 "|Enable[bool]:Enable of disable filename services (yes/no).");
999
1000 AddEvent("ENABLE_NUMSUBS_SERVICE", "B:1", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready)
1001 (boost::bind(&DataLogger::SetNumSubsAndFitsOnOff, this, _1))
1002 ("Switch the service which distributes information about the number of subscriptions and open files on or off."
1003 "|Enable[bool]:Enable of disable NUM_SUBS service (yes/no).");
1004
1005 AddEvent("SET_RUN_TIMEOUT", "L:1", kSM_Ready, kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun)
1006 (boost::bind(&DataLogger::SetRunTimeoutDelay, this, _1))
1007 ("Set the timeout delay for old run numbers."
1008 "|timeout[min]:Time out in minutes after which files for expired runs are closed.");
1009
1010 fDestructing = false;
1011
1012 //start the monitoring service
1013 fStatVar.sizeWritten = 0;
1014 fStatVar.freeSpace = 0;
1015 fStatVar.writingRate = 0;
1016 fPreviousStatsUpdateTime = Time().Mjd();
1017 fPreviousOldRunNumberCheck = Time().Mjd();
1018 fPreviousSize = 0;
1019
1020 struct statvfs vfs;
1021 if (!statvfs(fNightlyFilePath.c_str(), &vfs))
1022 fStatVar.freeSpace = vfs.f_bsize*vfs.f_bavail;
1023 else
1024 fStatVar.freeSpace = -1;
1025
1026 fStatsMonitoring = new DimDescribedService(GetName() + "/STATS", "X:3", fStatVar, "Add description here");
1027
1028//mutex
1029// fContinueMonitoring = true;
1030// fMonitoringThread = boost::thread(boost::bind(&DataLogger::ServicesMonitoring, this));
1031 fBaseSizeNightly = 0;
1032 fBaseSizeRun = 0;
1033 fDailyFileDayChangedAlready = true;
1034 fRunNumberTimeout = 1;
1035 if(fDebugIsOn)
1036 {
1037 Debug("DataLogger Init Done.");
1038 }
1039}
1040
1041// --------------------------------------------------------------------------
1042//
1043//! Destructor
1044//
1045DataLogger::~DataLogger()
1046{
1047 if (fDebugIsOn)
1048 {
1049 Debug("DataLogger destruction starts");
1050 }
1051 fDestructing = true;
1052 //first let's go to the ready state
1053 GoToReadyPlease();
1054 //release the services subscriptions
1055 fServiceSubscriptions.clear();
1056 //exit the monitoring loop
1057//mutex
1058// fContinueMonitoring = false;
1059// fMonitoringThread.join();
1060 //clear any remaining run number (should remain only one)
1061 while (fRunNumber.size() > 0)
1062 {
1063 RemoveOldestRunNumber();
1064 }
1065
1066 delete fOpenedNightlyFiles;
1067 delete fOpenedRunFiles;
1068 delete fNumSubAndFits;
1069
1070 if (fDebugIsOn)
1071 {
1072 Debug("DataLogger desctruction ends");
1073 }
1074}
1075// --------------------------------------------------------------------------
1076//
1077//! checks if the statistic service should be updated, and if so, do it
1078//
1079void DataLogger::UpdateStatisticsService()
1080{
1081 //update the fits files sizes
1082 const Time cTime = Time();
1083
1084 if ((fStatsPeriodDuration == 0) || ((cTime - fPreviousStatsUpdateTime).total_seconds() < fStatsPeriodDuration))
1085 return;
1086
1087 calculateTotalSizeWritten(fStatVar, false);
1088 fStatVar.writingRate = (fStatVar.sizeWritten - fPreviousSize)/((cTime - fPreviousStatsUpdateTime).total_seconds());
1089 fPreviousSize = fStatVar.sizeWritten;
1090 fPreviousStatsUpdateTime = cTime;
1091 //update the service. No need to check if data has been written, because some must have been, otherwise we would not have hit this piece of code
1092 fStatsMonitoring->updateService();
1093
1094 if(fDebugIsOn)
1095 {
1096 ostringstream str;
1097 str << "Size written: " << fStatVar.sizeWritten/1000 << " kB; writing rate: ";
1098 str << fStatVar.writingRate/1000 << " kB/s; free space: ";
1099 str << fStatVar.freeSpace/(1000*1000) << " MB";
1100 Debug(str);
1101 }
1102}
1103// --------------------------------------------------------------------------
1104//
1105//! checks if old run numbers should be trimmed and if so, do it
1106//
1107void DataLogger::TrimOldRunNumbers()
1108{
1109 const Time cTime = Time();
1110
1111 if ((cTime - fPreviousOldRunNumberCheck).total_seconds() < fRunNumberTimeout*60)
1112 return;
1113
1114 while (fRunNumber.size() > 1 && (cTime - fRunNumber.back().time) > boost::posix_time::minutes(fRunNumberTimeout))
1115 {
1116 RemoveOldestRunNumber();
1117 }
1118 fPreviousOldRunNumberCheck = cTime;
1119}
1120// --------------------------------------------------------------------------
1121//
1122//! Inherited from DimInfo. Handles all the Infos to which we subscribed, and log them
1123//
1124void DataLogger::infoHandler()
1125{
1126 // Make sure getTimestamp is called _before_ getTimestampMillisecs
1127 if (fDestructing)
1128 return;
1129
1130 DimInfo* I = getInfo();
1131
1132 if (I==NULL)
1133 return;
1134 //check if the service pointer corresponds to something that we subscribed to
1135 //this is a fix for a bug that provides bad Infos when a server starts
1136 bool found = false;
1137 SubscriptionsListType::iterator x;
1138 map<string, SubscriptionType>::iterator y;
1139 for (x=fServiceSubscriptions.begin(); x != fServiceSubscriptions.end(); x++)
1140 {//find current service is subscriptions
1141 for (y=x->second.begin(); y!=x->second.end();y++)
1142 if ((y->second.dimInfo).get() == I)
1143 {
1144 found = true;
1145 break;
1146 }
1147 if (found)
1148 break;
1149 }
1150 if (!found)
1151 {
1152 DimServiceInfoList::infoHandler();
1153 return;
1154 }
1155 if (I->getSize() <= 0)
1156 return;
1157
1158 // Make sure that getTimestampMillisecs is NEVER called before
1159 // getTimestamp is properly called
1160 // check that the message has been updated by something, i.e. must be different from its initial value
1161 if (I->getTimestamp() == 0)
1162 return;
1163
1164 // FIXME: Here we have to check if we have received the
1165 // service with the run-number.
1166 // CheckForRunNumber(I); has been removed because we have to
1167 // subscribe to this service anyway and hence we have the pointer
1168 // (no need to check for the name)
1169
1170 ReportPlease(I, y->second);
1171
1172 //update the fits files sizes
1173 UpdateStatisticsService();
1174
1175 //remove old run numbers
1176 TrimOldRunNumbers();
1177}
1178
1179bool DataLogger::OpenStream(shared_ptr<ofstream> stream, const string &filename)
1180{
1181 Info("Opening: "+filename);
1182
1183 if (stream->is_open())
1184 {
1185 ostringstream str;
1186 str << filename << " was already open when trying to open it.";
1187 Error(str);
1188 return false;
1189 }
1190
1191 errno = 0;
1192 stream->open(filename.c_str(), ios_base::out | ios_base::app);
1193 if (errno != 0)
1194 {
1195 ostringstream str;
1196 str << "Unable to open " << filename << ": " << strerror(errno) << " (errno=" << errno << ")";
1197 Error(str);
1198 return false;
1199 }
1200
1201 if (!stream->is_open())
1202 {
1203 ostringstream str;
1204 str << "File " << filename << " not open as it ought to be.";
1205 Error(str);
1206 return false;
1207 }
1208
1209 return true;
1210}
1211
1212// --------------------------------------------------------------------------
1213//
1214//! Open the text files associated with the given run number
1215//! @param run the run number to be dealt with
1216//
1217int DataLogger::OpenRunFile(RunNumberType& run)
1218{
1219#ifdef RUN_LOGS
1220 // open log file
1221 run.logName = CompileFileName(fRunFilePath, run.runNumber, "", "log");
1222 if (!OpenStream(run.logFile, run.logName))
1223 return -1;
1224#endif
1225
1226 // open report file
1227 run.reportName = CompileFileName(fRunFilePath, run.runNumber, "", "rep");
1228 if (!OpenStream(run.reportFile, run.reportName))
1229 return -1;
1230
1231 //get the size of the newly opened file.
1232#ifdef RUN_LOGS
1233 RememberFileOrigSizePlease(run.logName, false);
1234#endif
1235 RememberFileOrigSizePlease(run.reportName, false);
1236
1237 //TODO this notification scheme might be messed up now.... fix it !
1238 const string baseFileName = CompileFileName(fRunFilePath, run.runNumber, "", "");
1239 NotifyOpenedFile(baseFileName, 3, fOpenedRunFiles);
1240 run.openedFits.clear();
1241 return 0;
1242}
1243// --------------------------------------------------------------------------
1244//
1245//! Add a new active run number
1246//! @param newRun the new run number
1247//! @param time the time at which the new run number was issued
1248//
1249void DataLogger::AddNewRunNumber(int64_t newRun, Time time)
1250{
1251
1252 if (newRun > 0xffffffff)
1253 {
1254 Error("New run number too large, out of range. Ignoring.");
1255 return;
1256 }
1257 if (fDebugIsOn)
1258 {
1259 ostringstream str;
1260 str << "Adding new run number " << newRun << " that was issued on " << time;
1261 Debug(str);
1262 }
1263 //Add new run number to run number list
1264 fRunNumber.push_back(RunNumberType());
1265 fRunNumber.back().runNumber = uint32_t(newRun);
1266 fRunNumber.back().time = time;
1267
1268 ostringstream str;
1269 str << "The new run number is: " << fRunNumber.back().runNumber;
1270 Message(str);
1271
1272 if (GetCurrentState() != kSM_Logging)
1273 return;
1274 //open the log and report files
1275 OpenRunFile(fRunNumber.back());
1276}
1277// --------------------------------------------------------------------------
1278//
1279//! Checks whether or not the current info is a run number.
1280//! If so, then remember it. A run number is required to open the run-log file
1281//! @param I
1282//! the current DimInfo
1283//
1284void DataLogger::CheckForRunNumber(DimInfo* I)
1285{
1286 if (strstr(I->getName(), "SET_RUN_NUMBER") != NULL)
1287 {//assumes that the run number is an integer
1288 //check if some run number entries can be deleted leave one so that two remain after adding the new one
1289 AddNewRunNumber(I->getLonglong(), Time(I->getTimestamp(), I->getTimestampMillisecs()*1000));
1290 }
1291}
1292
1293// --------------------------------------------------------------------------
1294//
1295//! write infos to log files.
1296//! @param I
1297//! The current DimInfo
1298//! @param sub
1299//! The dataLogger's subscription corresponding to this DimInfo
1300//
1301void DataLogger::ReportPlease(DimInfo* I, SubscriptionType& sub)
1302{
1303
1304
1305 //should we log or report this info ? (i.e. is it a message ?)
1306 bool isItaReport = ((strstr(I->getName(), "Message") == NULL) && (strstr(I->getName(), "MESSAGE") == NULL));
1307 if (I->getFormat()[0] == 'C')
1308 isItaReport = false;
1309
1310 if (!fNightlyReportFile.is_open())
1311 return;
1312
1313 //Check whether we should close and reopen daily text files or not
1314 //This should work in any case base of the following:
1315 // - fDailyFileDayChangedAlready is initialized to true. So if the dataLogger is started around noon, no file will be closed
1316 // - fDailyFileDayChangedAlready is set to false if (time != 12), so the file will be closed and reopened only if the logger runs since before noon (which is the required behavior)
1317 //This only applies to text files. Fits are closed and reopened based on the last and current service received time.
1318 //this was not applicable to text files, because as they gather several services, we have no guarantee that the received time will be greater than the previous one,
1319 //which could lead to several close/reopen instead of only one.
1320 if (Time().h() == 12 && !fDailyFileDayChangedAlready)
1321 {
1322 if (fDebugIsOn)
1323 Debug("Its Noon! Closing and reopening daily text files");
1324
1325 fNightlyLogFile.close();
1326 fNightlyReportFile.close();
1327
1328 Info("Closed: "+fFullNightlyLogFileName);
1329 Info("Closed: "+fFullNightlyReportFileName);
1330
1331 fFullNightlyLogFileName = CompileFileName(fNightlyFilePath, "", "log");
1332 OpenTextFilePlease(fNightlyLogFile, fFullNightlyLogFileName);
1333 //FIXME: Handle return code properly!
1334
1335 fFullNightlyReportFileName = CompileFileName(fNightlyFilePath, "", "rep");
1336 OpenTextFilePlease(fNightlyReportFile, fFullNightlyReportFileName);
1337 //FIXME: Handle return code properly!
1338
1339 fDailyFileDayChangedAlready = true;
1340 }
1341 if (Time().h() != 12 && fDailyFileDayChangedAlready)
1342 fDailyFileDayChangedAlready = false;
1343
1344 //create the converter for that service
1345 if ((!sub.fConv.get()) && isItaReport)
1346 {
1347 //trick the converter in case of 'C'. why do I do this ? well simple: the converter checks that the right number
1348 //of bytes was written. because I skip 'C' with fits, the bytes will not be allocated, hence the "size copied ckeck"
1349 //of the converter will fail, hence throwing an exception.
1350 string fakeFormat(I->getFormat());
1351 if (fakeFormat[fakeFormat.size()-1] == 'C')
1352 fakeFormat = fakeFormat.substr(0, fakeFormat.size()-1);
1353 sub.fConv = shared_ptr<Converter>(new Converter(Out(), I->getFormat()));
1354 if (!sub.fConv)
1355 {
1356 ostringstream str;
1357 str << "Couldn't properly parse the format... service " << sub.dimInfo->getName() << " ignored.";
1358 Error(str);
1359 return;
1360 }
1361 }
1362 //construct the header
1363 ostringstream header;
1364 const Time cTime(I->getTimestamp(), I->getTimestampMillisecs()*1000);
1365 fQuality = I->getQuality();
1366 fMjD = cTime.Mjd();
1367
1368 //figure out which run file should be used
1369 ofstream* targetRunFile = NULL;
1370 RunNumberType* cRunNumber = NULL;
1371 if (GetCurrentState() == kSM_Logging)
1372 {
1373 list<RunNumberType>::reverse_iterator rit;
1374 for (rit=fRunNumber.rbegin(); rit!=fRunNumber.rend(); rit++)
1375 {
1376 if (rit->time < cTime) //this is the run number that we want to use
1377 {
1378 //Find something better to convert iterator to pointer than the ugly line below....
1379 cRunNumber = &(*rit);
1380 sub.runNumber = rit->runNumber;
1381#ifdef RUN_LOGS
1382 targetRunFile = isItaReport ? (rit->reportFile).get() : (rit->logFile).get();
1383#else
1384 targetRunFile = isItaReport ? (rit->reportFile).get() : NULL;
1385#endif
1386 break;
1387 }
1388 }
1389 if (rit == fRunNumber.rend() && fRunNumber.size() != 0)
1390 {
1391 ostringstream str;
1392 str << "Could not find an appropriate run number for info coming at time: " << cTime;
1393 Error(str);
1394 Error("Active run numbers: ");
1395 for (rit=fRunNumber.rbegin(); rit != fRunNumber.rend(); rit++)
1396 {
1397 str.str("");
1398 str << rit->runNumber;
1399 Error(str);
1400 }
1401
1402 }
1403 }
1404
1405 if (isItaReport)
1406 {
1407 //write text header
1408 header << I->getName() << " " << fQuality << " ";
1409 header << cTime.Y() << " " << cTime.M() << " " << cTime.D() << " ";
1410 header << cTime.h() << " " << cTime.m() << " " << cTime.s() << " ";
1411 header << cTime.ms() << " " << I->getTimestamp() << " ";
1412
1413 string text;
1414 try
1415 {
1416 text = sub.fConv->GetString(I->getData(), I->getSize());
1417 }
1418 catch (const runtime_error &e)
1419 {
1420 ostringstream str;
1421 str << "Parsing service " << sub.dimInfo->getName();
1422 str << " failed: " << e.what();
1423 Error(str);
1424 return;
1425 }
1426
1427 if (text.empty())
1428 {
1429 ostringstream str;
1430 str << "Service " << sub.dimInfo->getName() << " sent an empty string";
1431 Info(str);
1432 return;
1433 }
1434 //replace bizarre characters by white space
1435 replace(text.begin(), text.end(), '\n', '\\');
1436 replace_if(text.begin(), text.end(), ptr_fun<int, int>(&iscntrl), ' ');
1437
1438 //write entry to Nightly report
1439 if (fNightlyReportFile.is_open())
1440 {
1441 fNightlyReportFile << header.str() << text << endl;
1442 CheckForOfstreamError(fNightlyReportFile);
1443 }
1444 //write entry to run-report
1445 if (targetRunFile && targetRunFile->is_open())
1446 {
1447 *targetRunFile << header.str() << text << endl;
1448 CheckForOfstreamError(*targetRunFile);
1449 }
1450 }
1451 else
1452 {//write entry to both Nightly and run logs
1453 ostringstream msg;
1454 msg << I->getName() << ": " << I->getString();
1455
1456 if (fNightlyLogFile.is_open())
1457 {
1458 MessageImp(fNightlyLogFile).Write(cTime, msg.str().c_str(), fQuality);
1459 CheckForOfstreamError(fNightlyLogFile);
1460 }
1461 if (targetRunFile && targetRunFile->is_open())
1462 {
1463 MessageImp(*targetRunFile).Write(cTime, msg.str().c_str(), fQuality);
1464 CheckForOfstreamError(*targetRunFile);
1465 }
1466 }
1467
1468#ifdef HAVE_FITS
1469 if (isItaReport)
1470 {
1471 //check if the last received event was before noon and if current one is after noon.
1472 //if so, close the file so that it gets reopened.
1473 if (sub.nightlyFile.IsOpen())
1474 if ((sub.lastReceivedEvent != Time::None) && (sub.lastReceivedEvent.h() < 12) && (cTime.h() >= 12))
1475 {
1476 sub.nightlyFile.Close();
1477 }
1478 sub.lastReceivedEvent = cTime;
1479 if (!sub.nightlyFile.IsOpen() || !sub.runFile.IsOpen() || sub.runNumber != sub.runFile.fRunNumber)
1480 OpenFITSFilesPlease(sub, cRunNumber);
1481 WriteToFITS(sub);
1482 }
1483#endif
1484
1485}
1486
1487// --------------------------------------------------------------------------
1488//
1489//! print the dataLogger's current state. invoked by the PRINT command
1490//! @param evt
1491//! the current event. Not used by the method
1492//! @returns
1493//! the new state. Which, in that case, is the current state
1494//!
1495int DataLogger::PrintStatePlease(const Event& )
1496{
1497 Message("------------------------------------------");
1498 Message("------- DATA LOGGER CURRENT STATE --------");
1499 Message("------------------------------------------");
1500
1501 //print the path configuration
1502 Message("Nightly Path: " + boost::filesystem::system_complete(boost::filesystem::path(fNightlyFilePath)).directory_string());
1503 Message("Run Path: " + boost::filesystem::system_complete(boost::filesystem::path(fRunFilePath)).directory_string());
1504
1505 //print active run numbers
1506 ostringstream str;
1507 str << "Active Run Numbers:";
1508 for (list<RunNumberType>::const_iterator it=fRunNumber.begin(); it!=fRunNumber.end(); it++)
1509 str << " " << it->runNumber;
1510 if (fRunNumber.size()==0)
1511 str << " <none>";
1512 Message(str);
1513 //timeout value
1514 str.str("");
1515 str << "Timeout delay for old run numbers: " << fRunNumberTimeout << " minute(s)";
1516 Message(str);
1517
1518 //print all the open files.
1519 Message("------------ OPENED FILES ----------------");
1520 if (fNightlyLogFile.is_open())
1521 Message("Nightly log-file: OPEN");
1522
1523 if (fNightlyReportFile.is_open())
1524 Message("Nightly report-file: OPEN");
1525
1526 for (list<RunNumberType>::const_iterator it=fRunNumber.begin(); it!=fRunNumber.end(); it++)
1527 {
1528#ifdef RUN_LOGS
1529 if (it->logFile->is_open())
1530 Message("Run log-file: " + it->logName + " (OPEN)");
1531#endif
1532 if (it->reportFile->is_open())
1533 Message("Run report-file: " + it->reportName + " (OPEN)");
1534 }
1535
1536 DataLoggerStats statVar;
1537 /*const bool statWarning =*/ calculateTotalSizeWritten(statVar, true);
1538
1539 Message("----------------- STATS ------------------");
1540 str.str("");
1541 str << "Total Size written: " << statVar.sizeWritten << " bytes.";
1542 Message(str);
1543 str.str("");
1544 str << "Disk free space: " << statVar.freeSpace << " bytes.";
1545 Message(str);
1546 str.str("");
1547 str << "Statistics are updated every " << fStatsPeriodDuration << " seconds";
1548 if (fStatsPeriodDuration != 0)
1549 Message(str);
1550 else
1551 Message("Statistics updates are currently disabled");
1552
1553 Message("------------ DIM SUBSCRIPTIONS -----------");
1554 str.str("");
1555 str << "There are " << fNumSubAndFitsData.numSubscriptions << " active DIM subscriptions.";
1556 Message(str);
1557 for (map<const string, map<string, SubscriptionType> >::const_iterator it=fServiceSubscriptions.begin(); it!= fServiceSubscriptions.end();it++)
1558 {
1559 Message("Server "+it->first);
1560 for (map<string, SubscriptionType>::const_iterator it2=it->second.begin(); it2!=it->second.end(); it2++)
1561 Message(" -> "+it2->first);
1562 }
1563 Message("--------------- BLOCK LIST ---------------");
1564 for (set<string>::const_iterator it=fBlackList.begin(); it != fBlackList.end(); it++)
1565 Message(" -> "+*it);
1566 if (fBlackList.size()==0)
1567 Message(" <empty>");
1568
1569 Message("--------------- ALLOW LIST ---------------");
1570 for (set<string>::const_iterator it=fWhiteList.begin(); it != fWhiteList.end(); it++)
1571 Message(" -> "+*it);
1572 if (fWhiteList.size()==0)
1573 Message(" <empty>");
1574
1575 Message("-------------- GROUPING LIST -------------");
1576 Message("The following servers and/or services will");
1577 Message("be grouped into a single fits file:");
1578 for (set<string>::const_iterator it=fGrouping.begin(); it != fGrouping.end(); it++)
1579 Message(" -> "+*it);
1580 if (fGrouping.size()==0)
1581 Message(" <no grouping>");
1582
1583 Message("------------------------------------------");
1584 Message("-------- END OF DATA LOGGER STATE --------");
1585 Message("------------------------------------------");
1586
1587 return GetCurrentState();
1588}
1589
1590// --------------------------------------------------------------------------
1591//
1592//! turn debug mode on and off
1593//! @param evt
1594//! the current event. contains the instruction string: On, Off, on, off, ON, OFF, 0 or 1
1595//! @returns
1596//! the new state. Which, in that case, is the current state
1597//!
1598int DataLogger::SetDebugOnOff(const Event& evt)
1599{
1600 const bool backupDebug = fDebugIsOn;
1601
1602 fDebugIsOn = evt.GetBool();
1603
1604 if (fDebugIsOn == backupDebug)
1605 Message("Debug mode was already in the requested state.");
1606
1607 ostringstream str;
1608 str << "Debug mode is now " << fDebugIsOn;
1609 Message(str);
1610
1611 return GetCurrentState();
1612}
1613// --------------------------------------------------------------------------
1614//
1615//! set the statistics update period duration. 0 disables the statistics
1616//! @param evt
1617//! the current event. contains the new duration.
1618//! @returns
1619//! the new state. Which, in that case, is the current state
1620//!
1621int DataLogger::SetStatsPeriod(const Event& evt)
1622{
1623 const float backupDuration = fStatsPeriodDuration;
1624
1625 fStatsPeriodDuration = evt.GetFloat();
1626
1627 if (fStatsPeriodDuration < 0)
1628 {
1629 Error("Statistics period duration should be greater than zero. Discarding provided value.");
1630 fStatsPeriodDuration = backupDuration;
1631 return GetCurrentState();
1632 }
1633 if (!finite(fStatsPeriodDuration))// != fStatsPeriodDuration)
1634 {
1635 Error("Provided duration does not appear to be a valid float. Discarding it.");
1636 fStatsPeriodDuration = backupDuration;
1637 return GetCurrentState();
1638 }
1639 if (backupDuration == fStatsPeriodDuration)
1640 Warn("Statistics period not modified. Supplied value already in use.");
1641
1642 if (fStatsPeriodDuration == 0.0f)
1643 Message("Statistics are now OFF");
1644 else
1645 {
1646 ostringstream str;
1647 str << "Statistics period is now " << fStatsPeriodDuration << " seconds";
1648 Message(str);
1649 }
1650
1651 return GetCurrentState();
1652}
1653// --------------------------------------------------------------------------
1654//
1655//! set the opened files service on or off.
1656//! @param evt
1657//! the current event. contains the instruction string. similar to setdebugonoff
1658//! @returns
1659//! the new state. Which, in that case, is the current state
1660//!
1661int DataLogger::SetOpenedFilesOnOff(const Event& evt)
1662{
1663 const bool backupOpened = fOpenedFilesIsOn;
1664
1665 fOpenedFilesIsOn = evt.GetBool();
1666
1667 if (fOpenedFilesIsOn == backupOpened)
1668 Message("Opened files service mode was already in the requested state.");
1669
1670 ostringstream str;
1671 str << "Opened files service mode is now " << fOpenedFilesIsOn;
1672 Message(str);
1673
1674 return GetCurrentState();
1675}
1676
1677// --------------------------------------------------------------------------
1678//
1679//! set the number of subscriptions and opened fits on and off
1680//! @param evt
1681//! the current event. contains the instruction string. similar to setdebugonoff
1682//! @returns
1683//! the new state. Which, in that case, is the current state
1684//!
1685int DataLogger::SetNumSubsAndFitsOnOff(const Event& evt)
1686{
1687 const bool backupSubs = fNumSubAndFitsIsOn;
1688
1689 fNumSubAndFitsIsOn = evt.GetBool();
1690
1691 if (fNumSubAndFitsIsOn == backupSubs)
1692 Message("Number of subscriptions service mode was already in the requested state");
1693
1694 ostringstream str;
1695 str << "Number of subscriptions service mode is now " << fNumSubAndFitsIsOn;
1696 Message(str);
1697
1698 return GetCurrentState();
1699}
1700// --------------------------------------------------------------------------
1701//
1702//! set the timeout delay for old run numbers
1703//! @param evt
1704//! the current event. contains the timeout delay long value
1705//! @returns
1706//! the new state. Which, in that case, is the current state
1707//!
1708int DataLogger::SetRunTimeoutDelay(const Event& evt)
1709{
1710 const long backupTimeout = fRunNumberTimeout;
1711 fRunNumberTimeout = evt.GetXtra();
1712
1713 if (fRunNumberTimeout == 0)
1714 {
1715 fRunNumberTimeout = backupTimeout;
1716 Error("Timeout delays for old run numbers must be greater than 0. Ignored.");
1717 return GetCurrentState();
1718 }
1719
1720 if (fRunNumberTimeout == backupTimeout)
1721 Message("New timeout for old run numbers is same value as previous one.");
1722
1723 ostringstream str;
1724 str << "Timeout delay for old run numbers is now " << fRunNumberTimeout;
1725 Message(str);
1726
1727 return GetCurrentState();
1728}
1729
1730int DataLogger::ConfigureFileName(string &target, const string &type, const EventImp &evt)
1731{
1732 if (!evt.GetText())
1733 {
1734 Error("Empty "+type+" folder given. Please specify a valid path.");
1735 return GetCurrentState();
1736 }
1737
1738 const string givenPath = evt.GetText();
1739 if (!DoesPathExist(givenPath))
1740 {
1741 Error("Provided "+type+" path '"+givenPath+"' is not a valid folder. Ignored");
1742 return GetCurrentState();
1743 }
1744
1745 Message("New "+type+" folder: "+givenPath);
1746
1747 target = givenPath;
1748
1749 return GetCurrentState();
1750}
1751
1752// --------------------------------------------------------------------------
1753//
1754//! Sets the path to use for the Nightly log file.
1755//! @param evt
1756//! the event transporting the path
1757//! @returns
1758//! currently only the current state.
1759//
1760int DataLogger::ConfigureNightlyFileName(const Event& evt)
1761{
1762 return ConfigureFileName(fNightlyFilePath, "nightly", evt);
1763}
1764
1765// --------------------------------------------------------------------------
1766//
1767//! Sets the path to use for the run log file.
1768//! @param evt
1769//! the event transporting the path
1770//! @returns
1771//! currently only the current state
1772int DataLogger::ConfigureRunFileName(const Event& evt)
1773{
1774 return ConfigureFileName(fRunFilePath, "run", evt);
1775}
1776
1777// --------------------------------------------------------------------------
1778//
1779//! Sets the run number.
1780//! @param evt
1781//! the event transporting the run number
1782//! @returns
1783//! currently only the current state
1784int DataLogger::ConfigureRunNumber(const Event& evt)
1785{
1786 AddNewRunNumber(evt.GetXtra(), evt.GetTime());
1787 return GetCurrentState();
1788}
1789// --------------------------------------------------------------------------
1790//
1791//! Notifies the DIM service that a particular file was opened
1792//! @ param name the base name of the opened file, i.e. without path nor extension.
1793//! WARNING: use string instead of string& because I pass values that do not convert to string&.
1794//! this is not a problem though because file are not opened so often.
1795//! @ param type the type of the opened file. 0 = none open, 1 = log, 2 = text, 4 = fits
1796inline void DataLogger::NotifyOpenedFile(const string &name, int type, DimDescribedService* service)
1797{
1798 if (!fOpenedFilesIsOn)
1799 return;
1800
1801 if (fDebugIsOn)
1802 {
1803 ostringstream str;
1804 str << "Updating " << service->getName() << " file '" << name << "' (type=" << type << ")";
1805 Debug(str);
1806
1807 str.str("");
1808 str << "Num subs: " << fNumSubAndFitsData.numSubscriptions << " Num open FITS: " << fNumSubAndFitsData.numOpenFits;
1809 Debug(str);
1810 }
1811
1812 if (name.size()+1 > FILENAME_MAX)
1813 {
1814 Error("Provided file name '" + name + "' is longer than allowed file name length.");
1815 return;
1816 }
1817
1818 OpenFileToDim fToDim;
1819 fToDim.code = type;
1820 memcpy(fToDim.fileName, name.c_str(), name.size()+1);
1821
1822 service->setData(reinterpret_cast<void*>(&fToDim), name.size()+1+sizeof(int));
1823 service->setQuality(0);
1824 service->updateService();
1825}
1826// --------------------------------------------------------------------------
1827//
1828//! Implements the Start transition.
1829//! Concatenates the given path for the Nightly file and the filename itself (based on the day),
1830//! and tries to open it.
1831//! @returns
1832//! kSM_NightlyOpen if success, kSM_BadNightlyConfig if failure
1833int DataLogger::StartPlease()
1834{
1835 if (fDebugIsOn)
1836 {
1837 Debug("Starting...");
1838 }
1839 fFullNightlyLogFileName = CompileFileName(fNightlyFilePath, "", "log");
1840 if (!OpenTextFilePlease(fNightlyLogFile, fFullNightlyLogFileName))
1841 return kSM_BadNightlyConfig;
1842
1843
1844 fFullNightlyReportFileName = CompileFileName(fNightlyFilePath, "", "rep");
1845 if (!OpenTextFilePlease(fNightlyReportFile, fFullNightlyReportFileName))
1846 return kSM_BadNightlyConfig;
1847
1848 //get the size of the newly opened file.
1849 fBaseSizeNightly = GetFileSize(fFullNightlyLogFileName);
1850 fBaseSizeNightly += GetFileSize(fFullNightlyReportFileName);
1851 fFileSizesMap.clear();
1852 fBaseSizeRun = 0;
1853 fPreviousSize = 0;
1854
1855 //notify that a new file has been opened.
1856 const string baseFileName = CompileFileName(fNightlyFilePath, "", "");
1857 NotifyOpenedFile(baseFileName, 3, fOpenedNightlyFiles);
1858
1859 fOpenedNightlyFits.clear();
1860
1861 return kSM_NightlyOpen;
1862}
1863
1864#ifdef HAVE_FITS
1865// --------------------------------------------------------------------------
1866//
1867//! open if required a the FITS files corresponding to a given subscription
1868//! @param sub
1869//! the current DimInfo subscription being examined
1870void DataLogger::OpenFITSFilesPlease(SubscriptionType& sub, RunNumberType* cRunNumber)
1871{
1872 string serviceName(sub.dimInfo->getName());
1873
1874 //if run number has changed, reopen a new fits file with the correct run number.
1875 if (sub.runFile.IsOpen() && sub.runFile.fRunNumber != sub.runNumber)
1876 {
1877 sub.runFile.Close();
1878 Info("Closed: "+sub.runFile.fFileName+" (new run number)");
1879 }
1880
1881 //we must check if we should group this service subscription to a single fits file before we replace the / by _
1882 bool hasGrouping = false;
1883 if (!sub.runFile.IsOpen() && (GetCurrentState() == kSM_Logging))
1884 {//will we find this service in the grouping list ?
1885 for (set<string>::const_iterator it=fGrouping.begin(); it!=fGrouping.end(); it++)
1886 {
1887 if (serviceName.find(*it) != string::npos)
1888 {
1889 hasGrouping = true;
1890 break;
1891 }
1892 }
1893 }
1894 hasGrouping = true;
1895 for (unsigned int i=0;i<serviceName.size(); i++)
1896 {
1897 if (serviceName[i] == '/')
1898 {
1899 serviceName[i] = '_';
1900 break;
1901 }
1902 }
1903 //we open the NightlyFile anyway, otherwise this function shouldn't have been called.
1904 if (!sub.nightlyFile.IsOpen())
1905 {
1906 string partialName = CompileFileName(fNightlyFilePath, serviceName, "fits");
1907
1908 const string fileNameOnly = partialName.substr(partialName.find_last_of('/')+1, partialName.size());
1909 if (!sub.fitsBufferAllocated)
1910 AllocateFITSBuffers(sub);
1911 //get the size of the file we're about to open
1912 if (RememberFileOrigSizePlease(partialName, true))//and remember that the file was opened (i.e. not an update)
1913 fOpenedNightlyFits[fileNameOnly].push_back(serviceName);
1914
1915 ostringstream str;
1916 str << "Opening: " << partialName << " (Nfits=" << fNumSubAndFitsData.numOpenFits << ")";
1917 Info(str);
1918
1919 if (!sub.nightlyFile.Open(partialName, serviceName, NULL, &fNumSubAndFitsData.numOpenFits, this, 0))
1920 {
1921 SetCurrentState(kSM_WriteError);
1922 return;
1923 }
1924 //notify the opening
1925 const string baseFileName = CompileFileName(fNightlyFilePath, "", "");
1926 NotifyOpenedFile(baseFileName, 7, fOpenedNightlyFiles);
1927 if (fNumSubAndFitsIsOn)
1928 fNumSubAndFits->updateService();
1929 }
1930 //do the actual file open
1931 if (!sub.runFile.IsOpen() && (GetCurrentState() == kSM_Logging) && sub.runNumber != 0)
1932 {//buffer for the run file have already been allocated when doing the Nightly file
1933 string fileNameOnly;
1934 string partialName;
1935 if (hasGrouping)
1936 {
1937 partialName = CompileFileName(fRunFilePath, sub.runNumber, "", "fits");
1938 fileNameOnly = partialName.substr(partialName.find_last_of('/')+1, partialName.size());
1939 }
1940 else
1941 {
1942 partialName = CompileFileName(fRunFilePath, sub.runNumber, serviceName, "fits");
1943 fileNameOnly = partialName.substr(partialName.find_last_of('/')+1, partialName.size());
1944 }
1945 //get the size of the file we're about to open
1946 if (RememberFileOrigSizePlease(partialName, false))//and remember that the file was opened (i.e. not an update)
1947 cRunNumber->openedFits[fileNameOnly].push_back(serviceName);
1948 else
1949 if (hasGrouping)
1950 {
1951 cRunNumber->addServiceToOpenedFits(fileNameOnly, serviceName);
1952 }
1953
1954 if (hasGrouping && (!cRunNumber->runFitsFile.get()))
1955 try
1956 {
1957 cRunNumber->runFitsFile = shared_ptr<CCfits::FITS>(new CCfits::FITS(partialName, CCfits::RWmode::Write));
1958 (fNumSubAndFitsData.numOpenFits)++;
1959 }
1960 catch (CCfits::FitsException e)
1961 {
1962 ostringstream str;
1963 str << "Open FITS file " << partialName << ": " << e.message();
1964 Error(str);
1965 cRunNumber->runFitsFile = shared_ptr<CCfits::FITS>();//NULL;
1966 }
1967
1968 const string baseFileName = CompileFileName(fRunFilePath, sub.runNumber, "", "");
1969 NotifyOpenedFile(baseFileName, 7, fOpenedRunFiles);// + '_' + serviceName, 4);
1970
1971 ostringstream str;
1972 str << "Opening: " << partialName << " (Nfits=" << fNumSubAndFitsData.numOpenFits << ")";
1973 Info(str);
1974
1975 if (hasGrouping)
1976 {
1977 if (!sub.runFile.Open(partialName, serviceName, (cRunNumber->runFitsFile).get(), &fNumSubAndFitsData.numOpenFits, this, sub.runNumber))
1978 {
1979 SetCurrentState(kSM_WriteError);
1980 return;
1981 }
1982 }
1983 else
1984 {
1985 if (sub.runFile.Open(partialName, serviceName, NULL, &fNumSubAndFitsData.numOpenFits, this, sub.runNumber))
1986 {
1987 SetCurrentState(kSM_WriteError);
1988 return;
1989 }
1990 }
1991 if (fNumSubAndFitsIsOn)
1992 fNumSubAndFits->updateService();
1993 }
1994}
1995// --------------------------------------------------------------------------
1996//
1997//! Allocates the required memory for a given pair of fits files (nightly and run)
1998//! @param sub the subscription of interest.
1999//
2000void DataLogger::AllocateFITSBuffers(SubscriptionType& sub)
2001{
2002 //Init the time columns of the file
2003 Description dateDesc(string("Time"), string("Modified Julian Date"), string("MjD"));
2004 sub.nightlyFile.AddStandardColumn(dateDesc, "1D", &fMjD, sizeof(double));
2005 sub.runFile.AddStandardColumn(dateDesc, "1D", &fMjD, sizeof(double));
2006
2007 Description QoSDesc("Qos", "Quality of service", "None");
2008 sub.nightlyFile.AddStandardColumn(QoSDesc, "1J", &fQuality, sizeof(int));
2009 sub.runFile.AddStandardColumn(QoSDesc, "1J", &fQuality, sizeof(int));
2010
2011 const Converter::FormatList flist = sub.fConv->GetList();
2012 // Compilation failed
2013 if (!sub.fConv->valid())
2014 {
2015 Error("Compilation of format string failed.");
2016 return;
2017 }
2018
2019 //we've got a nice structure describing the format of this service's messages.
2020 //Let's create the appropriate FITS columns
2021 int size = sub.dimInfo->getSize();
2022
2023 vector<string> dataFormatsLocal;
2024 for (unsigned int i=0;i<flist.size()-1;i++)
2025 {
2026 ostringstream dataQualifier;
2027
2028 dataQualifier << flist[i].second.first;
2029 switch (flist[i].first.first->name()[0])
2030 {
2031 case 'c':
2032 case 'C':
2033 dataQualifier.str("S");
2034 break;
2035 case 's':
2036 dataQualifier << "I";
2037 break;
2038 case 'i':
2039 case 'I':
2040 dataQualifier << "J";
2041 break;
2042 case 'l':
2043 case 'L':
2044 dataQualifier << "J";
2045 break;
2046 case 'f':
2047 case 'F':
2048 dataQualifier << "E";
2049 break;
2050 case 'd':
2051 case 'D':
2052 dataQualifier << "D";
2053 break;
2054 case 'x':
2055 case 'X':
2056 dataQualifier << "K";
2057 break;
2058 case 'S':
2059 size--;
2060 //for strings, the number of elements I get is wrong. Correct it
2061 dataQualifier.str(""); //clear
2062 dataQualifier << size << "A";
2063 break;
2064
2065 default:
2066 Fatal("THIS SHOULD NEVER BE REACHED. dataLogger.cc ln 1198.");
2067 };
2068 //we skip the variable length strings for now (in fits only)
2069 if (dataQualifier.str() != "S")
2070 dataFormatsLocal.push_back(dataQualifier.str());
2071 }
2072 sub.nightlyFile.InitDataColumns(GetDescription(sub.server, sub.service), dataFormatsLocal, sub.dimInfo->getData(), size);
2073 sub.runFile.InitDataColumns(GetDescription(sub.server, sub.service), dataFormatsLocal, sub.dimInfo->getData(), size);
2074 sub.fitsBufferAllocated = true;
2075}
2076// --------------------------------------------------------------------------
2077//
2078//! write a dimInfo data to its corresponding FITS files
2079//
2080void DataLogger::WriteToFITS(SubscriptionType& sub)
2081{
2082 //nightly File status (open or not) already checked
2083 if (sub.nightlyFile.IsOpen())
2084 {
2085 if (!sub.nightlyFile.Write(sub.fConv.get()))
2086 SetCurrentState(kSM_WriteError);
2087 }
2088
2089 if (sub.runFile.IsOpen())
2090 {
2091 if (!sub.runFile.Write(sub.fConv.get()))
2092 SetCurrentState(kSM_WriteError);
2093 }
2094}
2095#endif //if has_fits
2096
2097std::string DataLogger::SetCurrentState(int state, const char *txt, const std::string &cmd)
2098{
2099 if (state == kSM_WriteError && GetCurrentState() == kSM_WriteError)
2100 return "";
2101 return StateMachineImp::SetCurrentState(state, txt, cmd);
2102}
2103// --------------------------------------------------------------------------
2104//
2105//! Implements the StartRun transition.
2106//! Concatenates the given path for the run file and the filename itself (based on the run number),
2107//! and tries to open it.
2108//! @returns
2109//! kSM_Logging if success, kSM_BadRunConfig if failure.
2110int DataLogger::StartRunPlease()
2111{
2112 if (fDebugIsOn)
2113 {
2114 Debug("Starting Run Logging...");
2115 }
2116 //open all the relevant run-files. i.e. all the files associated with run numbers.
2117 for (list<RunNumberType>::iterator it=fRunNumber.begin(); it != fRunNumber.end(); it++)
2118 OpenRunFile(*it);
2119
2120 return kSM_Logging;
2121}
2122
2123#ifdef HAVE_FITS
2124// --------------------------------------------------------------------------
2125//
2126//! Create a fits group file with all the run-fits that were written (either daily or run)
2127//! @param filesToGroup a map of filenames mapping to table names to be grouped (i.e. a
2128//! single file can contain several tables to group
2129//! @param runNumber the run number that should be used for grouping. 0 means nightly group
2130//
2131void DataLogger::CreateFitsGrouping(map<string, vector<string> > & filesToGroup, int runNumber)
2132{
2133 if (fDebugIsOn)
2134 {
2135 ostringstream str;
2136 str << "Creating fits group for ";
2137 if (runNumber != 0)
2138 str << "run files";
2139 else
2140 str << "nightly files";
2141 Debug(str);
2142 }
2143 //create the FITS group corresponding to the ending run.
2144 CCfits::FITS* groupFile;
2145 unsigned int numFilesToGroup = 0;
2146 for (map<string, vector<string> >::const_iterator it=filesToGroup.begin(); it != filesToGroup.end(); it++)
2147 {
2148 numFilesToGroup += it->second.size();
2149 }
2150 if (fDebugIsOn)
2151 {
2152 ostringstream str;
2153 str << "There are " << numFilesToGroup << " tables to group";
2154 Debug(str);
2155 }
2156 if (numFilesToGroup <= 1)
2157 {
2158 filesToGroup.clear();
2159 return;
2160 }
2161 string groupName;
2162 if (runNumber != 0)
2163 groupName = CompileFileName(fRunFilePath, runNumber, "", "fits");
2164 else
2165 groupName = CompileFileName(fNightlyFilePath, "", "fits");
2166
2167 Info("Creating FITS group in: "+groupName);
2168
2169 CCfits::Table* groupTable;
2170 const int maxCharLength = 50;//FILENAME_MAX;
2171 try
2172 {
2173 groupFile = new CCfits::FITS(groupName, CCfits::RWmode::Write);
2174 //setup the column names
2175 ostringstream pathTypeName;
2176 pathTypeName << maxCharLength << "A";
2177 vector<string> names;
2178 vector<string> dataTypes;
2179 names.push_back("MEMBER_XTENSION");
2180 dataTypes.push_back("8A");
2181 names.push_back("MEMBER_URI_TYPE");
2182 dataTypes.push_back("3A");
2183 names.push_back("MEMBER_LOCATION");
2184 dataTypes.push_back(pathTypeName.str());
2185 names.push_back("MEMBER_NAME");
2186 dataTypes.push_back(pathTypeName.str());
2187
2188 groupTable = groupFile->addTable("GROUPING", numFilesToGroup, names, dataTypes);
2189//TODO handle the case when the logger was stopped and restarted during the same day, i.e. the grouping file must be updated
2190 }
2191 catch (CCfits::FitsException e)
2192 {
2193 ostringstream str;
2194 str << "Creating FITS table GROUPING in " << groupName << ": " << e.message();
2195 Error(str);
2196 return;
2197 }
2198
2199 //CCfits seems to be buggy somehow: can't use the column's function "write": it create a compilation error: maybe strings were not thought about.
2200 //use cfitsio routines instead
2201 groupTable->makeThisCurrent();
2202 //create appropriate buffer.
2203 const unsigned int n = 8 + 3 + 2*maxCharLength + 1; //+1 for trailling character
2204
2205 vector<unsigned char> realBuffer;
2206 realBuffer.resize(n);
2207 unsigned char* fitsBuffer = &realBuffer[0];
2208 memset(fitsBuffer, 0, n);
2209
2210 char* startOfExtension = reinterpret_cast<char*>(fitsBuffer);
2211 char* startOfURI = reinterpret_cast<char*>(&fitsBuffer[8]);
2212 char* startOfLocation = reinterpret_cast<char*>(&fitsBuffer[8 + 3]);
2213 char* startOfName = reinterpret_cast<char*>(&fitsBuffer[8+3+maxCharLength]);
2214
2215 strcpy(startOfExtension, "BINTABLE");
2216 strcpy(startOfURI, "URL");
2217
2218 int i=1;
2219 for (map<string, vector<string> >::const_iterator it=filesToGroup.begin(); it!=filesToGroup.end(); it++)
2220 for (vector<string>::const_iterator jt=it->second.begin(); jt != it->second.end(); jt++, i++)
2221 {
2222 strcpy(startOfLocation, it->first.c_str());
2223 strcpy(startOfName, jt->c_str());
2224
2225 if (fDebugIsOn)
2226 {
2227 ostringstream str;
2228 str << "Grouping " << it->first << " " << *jt;
2229 Debug(str);
2230 }
2231
2232 int status = 0;
2233 fits_write_tblbytes(groupFile->fitsPointer(), i, 1, 8+3+2*maxCharLength, fitsBuffer, &status);
2234 if (status)
2235 {
2236 char text[30];//max length of cfitsio error strings (from doc)
2237 fits_get_errstatus(status, text);
2238 ostringstream str;
2239 str << "Writing FITS row " << i << " in " << groupName << ": " << text << " (file_write_tblbytes, rc=" << status << ")";
2240 Error(str);
2241 // FIXME: What to do in case of error?
2242 }
2243 }
2244
2245 filesToGroup.clear();
2246 delete groupFile;
2247}
2248#endif //HAVE_FITS
2249
2250// --------------------------------------------------------------------------
2251//
2252//! Implements the StopRun transition.
2253//! Attempts to close the run file.
2254//! @returns
2255//! kSM_WaitingRun if success, kSM_FatalError otherwise
2256int DataLogger::StopRunPlease()
2257{
2258
2259 if (fDebugIsOn)
2260 {
2261 Debug("Stopping Run Logging...");
2262 }
2263 for (list<RunNumberType>::const_iterator it=fRunNumber.begin(); it != fRunNumber.end(); it++)
2264 {
2265#ifdef RUN_LOGS
2266 if (!it->logFile->is_open() || !it->reportFile->is_open())
2267#else
2268 if (!it->reportFile->is_open())
2269#endif
2270 return kSM_FatalError;
2271#ifdef RUN_LOGS
2272 it->logFile->close();
2273#endif
2274 it->reportFile->close();
2275 }
2276
2277#ifdef HAVE_FITS
2278 for (SubscriptionsListType::iterator i = fServiceSubscriptions.begin(); i != fServiceSubscriptions.end(); i++)
2279 for (map<string, SubscriptionType>::iterator j = i->second.begin(); j != i->second.end(); j++)
2280 {
2281 if (j->second.runFile.IsOpen())
2282 j->second.runFile.Close();
2283 }
2284#endif
2285 NotifyOpenedFile("", 0, fOpenedRunFiles);
2286 if (fNumSubAndFitsIsOn)
2287 fNumSubAndFits->updateService();
2288
2289 while (fRunNumber.size() > 0)
2290 {
2291 RemoveOldestRunNumber();
2292 }
2293
2294 return kSM_WaitingRun;
2295}
2296// --------------------------------------------------------------------------
2297//
2298//! Implements the Stop and Reset transitions.
2299//! Attempts to close any openned file.
2300//! @returns
2301//! kSM_Ready
2302int DataLogger::GoToReadyPlease()
2303{
2304 if (fDebugIsOn)
2305 {
2306 Debug("Going to the Ready state...");
2307 }
2308 if (GetCurrentState() == kSM_Logging)
2309 StopRunPlease();
2310
2311 if (fNightlyLogFile.is_open())
2312 fNightlyLogFile.close();
2313 if (fNightlyReportFile.is_open())
2314 fNightlyReportFile.close();
2315
2316#ifdef HAVE_FITS
2317 for (SubscriptionsListType::iterator i = fServiceSubscriptions.begin(); i != fServiceSubscriptions.end(); i++)
2318 for (map<string, SubscriptionType>::iterator j = i->second.begin(); j != i->second.end(); j++)
2319 {
2320 if (j->second.nightlyFile.IsOpen())
2321 j->second.nightlyFile.Close();;
2322 }
2323#endif
2324 if (GetCurrentState() == kSM_Logging ||
2325 GetCurrentState() == kSM_WaitingRun ||
2326 GetCurrentState() == kSM_NightlyOpen)
2327 {
2328 NotifyOpenedFile("", 0, fOpenedNightlyFiles);
2329 if (fNumSubAndFitsIsOn)
2330 fNumSubAndFits->updateService();
2331 }
2332#ifdef HAVE_FITS
2333 CreateFitsGrouping(fOpenedNightlyFits, 0);
2334#endif
2335 return kSM_Ready;
2336}
2337// --------------------------------------------------------------------------
2338//
2339//! Implements the transition towards kSM_WaitingRun
2340//! Does nothing really.
2341//! @returns
2342//! kSM_WaitingRun
2343int DataLogger::NightlyToWaitRunPlease()
2344{
2345 if (fDebugIsOn)
2346 {
2347 Debug("Going to Wait Run Number state...");
2348 }
2349 return kSM_WaitingRun;
2350}
2351// --------------------------------------------------------------------------
2352//
2353//! Setup Logger's configuration from a Configuration object
2354//! @param conf the configuration object that should be used
2355//!
2356bool DataLogger::SetConfiguration(Configuration& conf)
2357{
2358 fDebugIsOn = conf.Get<bool>("debug");
2359
2360 //Set the block or allow list
2361 fBlackList.clear();
2362 fWhiteList.clear();
2363
2364 //Adding entries that should ALWAYS be ignored
2365 //fBlackList.insert("DATA_LOGGER/");
2366 fBlackList.insert("/SERVICE_LIST");
2367 fBlackList.insert("DIS_DNS/");
2368
2369 //set the black list
2370 if (conf.Has("block"))
2371 {
2372 const vector<string> vec = conf.Get<vector<string>>("block");
2373
2374 fBlackList.insert(vec.begin(), vec.end());
2375 }
2376
2377 //set the white list
2378 if (conf.Has("allow"))
2379 {
2380 const vector<string> vec = conf.Get<vector<string>>("allow");
2381 fWhiteList.insert(vec.begin(), vec.end());
2382 }
2383
2384 //Set the grouping
2385 if (conf.Has("group"))
2386 {
2387 const vector<string> vec = conf.Get<vector<string>>("group");
2388 fGrouping.insert(vec.begin(), vec.end());
2389 }
2390
2391 //set the old run numbers timeout delay
2392 if (conf.Has("runtimeout"))
2393 {
2394 const long timeout = conf.Get<long>("runtimeout");
2395 if (timeout <= 0)
2396 {
2397 Error("Time out delay for old run numbers should be greater than 0 minute");
2398 return false;
2399 }
2400 fRunNumberTimeout = timeout;
2401 }
2402 return true;
2403}
2404
2405// --------------------------------------------------------------------------
2406int RunDim(Configuration &conf)
2407{
2408 WindowLog wout;
2409
2410 //log.SetWindow(stdscr);
2411 if (conf.Has("log"))
2412 if (!wout.OpenLogFile(conf.Get<string>("log")))
2413 wout << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
2414
2415 // Start io_service.Run to use the StateMachineImp::Run() loop
2416 // Start io_service.run to only use the commandHandler command detaching
2417 DataLogger logger(wout);
2418 if (!logger.SetConfiguration(conf))
2419 return -1;
2420
2421 logger.Run(true);
2422
2423 return 0;
2424}
2425// --------------------------------------------------------------------------
2426void RunThread(DataLogger* logger)
2427{
2428 // This is necessary so that the StateMachine Thread can signal the
2429 // Readline thread to exit
2430 logger->Run(true);
2431 Readline::Stop();
2432}
2433// --------------------------------------------------------------------------
2434template<class T>
2435int RunShell(Configuration &conf)
2436{
2437 static T shell(conf.GetName().c_str(), conf.Get<int>("console")!=1);
2438
2439 WindowLog &win = shell.GetStreamIn();
2440 WindowLog &wout = shell.GetStreamOut();
2441
2442 if (conf.Has("log"))
2443 if (!wout.OpenLogFile(conf.Get<string>("log")))
2444 win << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
2445
2446 DataLogger logger(wout);
2447
2448 if (!logger.SetConfiguration(conf))
2449 return -1;
2450
2451 shell.SetReceiver(logger);
2452
2453 boost::thread t(boost::bind(RunThread, &logger));
2454
2455 shell.Run(); // Run the shell
2456
2457 logger.Stop();
2458
2459 //Wait until the StateMachine has finished its thread
2460 //before returning and destroyinng the dim objects which might
2461 //still be in use.
2462 t.join();
2463
2464 return 0;
2465}
2466
2467/*
2468 Extract usage clause(s) [if any] for SYNOPSIS.
2469 Translators: "Usage" and "or" here are patterns (regular expressions) which
2470 are used to match the usage synopsis in program output. An example from cp
2471 (GNU coreutils) which contains both strings:
2472 Usage: cp [OPTION]... [-T] SOURCE DEST
2473 or: cp [OPTION]... SOURCE... DIRECTORY
2474 or: cp [OPTION]... -t DIRECTORY SOURCE...
2475 */
//! Prints the description of the program and its usage synopsis to cout.
void PrintUsage()
{
    const char* const usage =
        "\n"
        "The data logger connects to all available Dim services and "
        "writes them to ascii and fits files.\n"
        "\n"
        "The default is that the program is started without user interaction. "
        "All actions are supposed to arrive as DimCommands. Using the -c "
        "option, a local shell can be initialized. With h or help a short "
        "help message about the usage can be brought to the screen.\n"
        "\n"
        "Usage: dataLogger [-c type] [OPTIONS]\n"
        " or: dataLogger [OPTIONS]\n";

    //the text is followed by an empty line
    cout << usage << endl;
}
2492// --------------------------------------------------------------------------
2493void PrintHelp()
2494{
2495 /* Additional help text which is printed after the configuration
2496 options goes here */
2497 cout <<
2498 "\n"
2499 "If the allow list has any element, only the servers and/or services "
2500 "specified in the list will be used for subscription. The black list "
2501 "will disable service subscription and has higher priority than the "
2502 "allow list. If the allow list is not present by default all services "
2503 "will be subscribed."
2504 "\n"
2505 "For example, block=DIS_DNS/ will skip all the services offered by "
2506 "the DIS_DNS server, while block=/SERVICE_LIST will skip all the "
2507 "SERVICE_LIST services offered by any server and DIS_DNS/SERVICE_LIST "
2508 "will skip DIS_DNS/SERVICE_LIST.\n"
2509 << endl;
2510}
2511
2512// --------------------------------------------------------------------------
2513void SetupConfiguration(Configuration &conf)
2514{
2515 const string n = conf.GetName()+".log";
2516
2517 po::options_description configp("Program options");
2518 configp.add_options()
2519 ("dns", var<string>("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
2520 ("log,l", var<string>(n), "Write log-file")
2521 ("console,c", var<int>(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)")
2522 ;
2523
2524 po::options_description configs("DataLogger options");
2525 configs.add_options()
2526 ("block,b", vars<string>(), "Black-list of services")
2527 ("allow,a", vars<string>(), "White-list of services")
2528 ("debug", po_bool(), "Debug mode. Print clear text of received service reports to log-stream")
2529 ("group,g", vars<string>(), "Grouping of services into a single run-Fits")
2530 ("runtimeout", var<long>(), "Time out delay for old run numbers")
2531 ;
2532
2533 conf.AddEnv("dns", "DIM_DNS_NODE");
2534
2535 conf.AddOptions(configp);
2536 conf.AddOptions(configs);
2537}
2538// --------------------------------------------------------------------------
2539int main(int argc, const char* argv[])
2540{
2541 Configuration conf(argv[0]);
2542 conf.SetPrintUsage(PrintUsage);
2543 SetupConfiguration(conf);
2544
2545 po::variables_map vm;
2546 try
2547 {
2548 vm = conf.Parse(argc, argv);
2549 }
2550#if BOOST_VERSION > 104000
2551 catch (po::multiple_occurrences &e)
2552 {
2553 cerr << "Program options invalid due to: " << e.what() << " of '" << e.get_option_name() << "'." << endl;
2554 return -1;
2555 }
2556#endif
2557 catch (exception& e)
2558 {
2559 cerr << "Program options invalid due to: " << e.what() << endl;
2560 return -1;
2561 }
2562
2563 if (conf.HasVersion() || conf.HasPrint())
2564 return -1;
2565
2566 if (conf.HasHelp())
2567 {
2568 PrintHelp();
2569 return -1;
2570 }
2571
2572 Dim::Setup(conf.Get<string>("dns"));
2573
2574// try
2575 {
2576 // No console access at all
2577 if (!conf.Has("console"))
2578 return RunDim(conf);
2579
2580 // Console access w/ and w/o Dim
2581 if (conf.Get<int>("console")==0)
2582 return RunShell<LocalShell>(conf);
2583 else
2584 return RunShell<LocalConsole>(conf);
2585 }
2586/* catch (exception& e)
2587 {
2588 cerr << "Exception: " << e.what() << endl;
2589 return -1;
2590 }*/
2591
2592 return 0;
2593}
Note: See TracBrowser for help on using the repository browser.