source: trunk/FACT++/src/dataLogger.cc@ 10993

Last change on this file since 10993 was 10993, checked in by tbretz, 13 years ago
Added a const-qualifier.
File size: 90.0 KB
Line 
1//****************************************************************
2/** @class DataLogger
3
 @brief Logs all messages and infos exchanged between the services

 This is the main logging facility.
 It derives from StateMachineDim and DimServiceInfoList. The first parent is here to enforce
 a state machine behaviour, while the second one is meant to make the dataLogger receive
 the dim services to which it has subscribed.
10 The possible states and transitions of the machine are:
11 \dot
12 digraph datalogger {
13 node [shape=record, fontname=Helvetica, fontsize=10];
14 e [label="Error" color="red"];
15 r [label="Ready"]
16 d [label="NightlyOpen"]
17 w [label="WaitingRun"]
18 l [label="Logging"]
19 b [label="BadNightlyconfig" color="red"]
20 c [label="BadRunConfig" color="red"]
21
22 e -> r
23 r -> e
24 r -> d
25 r -> b
26 d -> w
27 d -> r
28 w -> r
29 l -> r
30 l -> w
31 b -> d
32 w -> c
33 w -> l
34 b -> r
35 c -> r
36 c -> l
37 }
38 \enddot
39 */
40 //****************************************************************
41#include "Dim.h"
42#include "Event.h"
43#include "Time.h"
44#include "StateMachineDim.h"
45#include "WindowLog.h"
46#include "Configuration.h"
47#include "ServiceList.h"
48#include "Converter.h"
49#include "MessageImp.h"
50#include "LocalControl.h"
51#include "DimDescriptionService.h"
52
53#include "Description.h"
54
55//#include "DimServiceInfoList.h"
56#include "DimNetwork.h"
57//for getting stat of opened files
58#include <unistd.h>
59//for getting disk free space
60#include <sys/statvfs.h>
61//for getting files sizes
62#include <sys/stat.h>
63
64#include <fstream>
65#include <mutex>
66
67#include <boost/bind.hpp>
68#if BOOST_VERSION < 104400
69#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
70#undef BOOST_HAS_RVALUE_REFS
71#endif
72#endif
73#include <boost/thread.hpp>
74
75#ifdef HAVE_FITS
76#include "Fits.h"
77#endif
78
79//Dim structures
/// Writing statistics of the data logger, as distributed to its clients.
/// Plain data: three consecutive long values and nothing else.
struct DataLoggerStats
{
    /// sum of the sizes of all tracked files minus their sizes before the logger opened them
    long sizeWritten;
    /// free space on the disk hosting the nightly folder, -1 if it could not be retrieved
    long freeSpace;
    /// current writing rate
    long writingRate;
};
/// Counters distributed by the data logger: active subscriptions and open FITS files.
struct NumSubAndFitsType
{
    /// number of services the logger is currently subscribed to
    int numSubscriptions;
    /// number of FITS files currently open
    int numOpenFits;
};
/// Record distributed whenever a file is opened by the data logger.
struct OpenFileToDim
{
    /// numerical code describing the opened file
    int code;
    /// name of the opened file, stored in a fixed-size buffer of FILENAME_MAX characters
    char fileName[FILENAME_MAX];
};
96
97///Run number record. Used to keep track of which run numbers are still active
98struct RunNumberType {
99#ifdef RUN_LOGS
100 ///the run number log file
101 shared_ptr<ofstream> logFile;
102#endif
103 ///the run number report file
104 shared_ptr<ofstream> reportFile;
105#ifdef HAVE_FITS
106 ///the run number group fits file
107 shared_ptr<CCfits::FITS> runFitsFile;
108#endif
109#ifdef RUN_LOGS
110 ///the log filename
111 string logName;
112#endif
113 ///the report filename
114 string reportName;
115 ///the actual run number
116 uint32_t runNumber;
117 ///the time at which the run number was received
118 Time time;
119 ///list of opened fits used to create the fits grouping when the run ends
120 map<string, vector<string> > openedFits;
121 ///default constructor
122 RunNumberType()
123 {
124#ifdef RUN_LOGS
125 logFile = shared_ptr<ofstream>(new ofstream());
126#endif
127 reportFile = shared_ptr<ofstream>(new ofstream());
128#ifdef HAVE_FITS
129 runFitsFile = shared_ptr<CCfits::FITS>();
130#endif
131 runNumber = 0;
132 }
133 ///default destructor
134 ~RunNumberType()
135 {
136
137 }
138
139 void addServiceToOpenedFits(const string& fileName, const string& serviceName)
140 {
141 //most likely I should add this service name.
142 //the only case for which I should not add it is if a service disapeared, hence the file was closed
143 //and reopened again. Unlikely to happen, but well it may
144
145 if (find(openedFits[fileName].begin(), openedFits[fileName].end(),
146 serviceName)==openedFits[fileName].end())
147 openedFits[fileName].push_back(serviceName);
148 }
149};
150///Dim subscription type. Stores all the relevant info to handle a Dim subscription
151struct SubscriptionType
152{
153#ifdef HAVE_FITS
154 ///Nightly FITS output file
155 Fits nightlyFile;
156 ///run-specific FITS output file
157 Fits runFile;
158#endif
159 ///the actual dimInfo pointer
160 shared_ptr<DimStampedInfo> dimInfo;
161 ///the server
162 string server;
163 ///the service
164 string service;
165 ///the converter for outputting the data according to the format
166 shared_ptr<Converter> fConv;
167 ///the current run number used by this subscription
168 uint32_t runNumber;
169 ///time of the latest received event
170 Time lastReceivedEvent;
171 ///whether or not the fits buffer was allocated already
172 bool fitsBufferAllocated;
173
174 ///Dim info constructor
175 SubscriptionType(DimStampedInfo* info=NULL)
176 {
177 dimInfo = shared_ptr<DimStampedInfo>(info);
178 fConv = shared_ptr<Converter>();
179 runNumber = 0;
180 lastReceivedEvent = Time::None;
181 fitsBufferAllocated = false;
182 }
183
184 ///default destructor
185 ~SubscriptionType()
186 {
187 }
188};
189
// NOTE(review): the second base class has no access specifier and is therefore
// inherited privately (class default) - confirm that this is intended.
class DataLogger : public StateMachineDim, DimServiceInfoList
{
public:
    /// The list of existing states specific to the DataLogger
    // NOTE(review): as written this declares a data member named localstates_t
    // of an unnamed enum type, not a type name - presumably a typedef was intended.
    enum
    {
        kSM_NightlyOpen = 20, ///< Nightly file opened and writing
        kSM_WaitingRun = 30, ///< waiting for the run number to open the run file
        kSM_Logging = 40, ///< both files opened and writing
        kSM_BadNightlyConfig = 0x101, ///< the folder specified for Nightly logging does not exist or has bad permissions
        kSM_BadRunConfig = 0x102, ///< the folder specified for the run logging does not exist or has wrong permissions or no run number
        kSM_WriteError = 0x103, ///< Denotes that an error occurred while writing a file (text or fits).
    } localstates_t;

    /// Sets up the states and transitions of the machine, named DATA_LOGGER
    DataLogger(ostream &out);
    ~DataLogger();

    /// Applies the given configuration to the data logger
    bool SetConfiguration(Configuration& conf);

private:
    /************************************************
     * MEMBER VARIABLES
     ************************************************/
    /// ofstream for the nightly log file
    ofstream fNightlyLogFile;
    /// ofstream for the nightly report file
    ofstream fNightlyReportFile;
    /// base path of the nightly files
    string fNightlyFilePath;
    /// base path of the run files
    string fRunFilePath;
    /// run numbers currently known, oldest first
    list<RunNumberType> fRunNumber;
    /// old run numbers time-out delay (in minutes)
    long fRunNumberTimeout;
    /// previous run number. to check if changed while logging
    int fPreviousRunNumber;
    /// Current Service Quality
    int fQuality;
    /// Modified Julian Date
    double fMjD;
    /// for obtaining the name of the existing services
//    ServiceList fServiceList;
    /// server name -> (service name -> subscription)
    typedef map<const string, map<string, SubscriptionType> > SubscriptionsListType;
    /// All the services to which we have subscribed, sorted by server name.
    SubscriptionsListType fServiceSubscriptions;
    /// full name of the nightly log file
    string fFullNightlyLogFileName;
    /// full name of the nightly report file
    string fFullNightlyReportFileName;
    /// variable to track when the statistics were last calculated
    Time fPreviousStatsUpdateTime;
    /// variable to track when the old run numbers were last checked
    Time fPreviousOldRunNumberCheck;
    /// boolean to know whether we should close and reopen daily files or not
    bool fDailyFileDayChangedAlready;

private:
    /***************************************************
     * DIM INFO HANDLER
     ***************************************************/
    // overloading of DIM's infoHandler function
    void infoHandler();

    /***************************************************
     * TRANSITION FUNCTIONS
     ***************************************************/
    /// Reporting method for the services info received
    void ReportPlease(DimInfo* I, SubscriptionType& sub);

    /// Common implementation for the configuration of a file path
    int ConfigureFileName(string &target, const string &type, const EventImp &evt);
    /// Configuration of the nightly file path
    int ConfigureNightlyFileName(const Event& evt);
    /// Configuration of the run file path
    int ConfigureRunFileName(const Event& evt);
    /// DEPREC - configuration of the run number
    int ConfigureRunNumber(const Event& evt);
    /// print the current state of the dataLogger
    int PrintStatePlease(const Event& evt);
    /// checks whether or not the current info being treated is a run number
    void CheckForRunNumber(DimInfo* I);
    /// start transition
    int StartPlease();
    /// from waiting to logging transition
    int StartRunPlease();
    /// from logging to waiting transition
    int StopRunPlease();
    /// stop and reset transition
    int GoToReadyPlease();
    /// from NightlyOpen to waiting transition
    int NightlyToWaitRunPlease();
    /// from writing to error
    std::string SetCurrentState(int state, const char *txt="", const std::string &cmd="");
#ifdef HAVE_FITS
    /// Open fits files
    void OpenFITSFilesPlease(SubscriptionType& sub, RunNumberType* cRunNumber);
    /// Write data to FITS files
    void WriteToFITS(SubscriptionType& sub);
    /// Allocate the buffers required for fits
    void AllocateFITSBuffers(SubscriptionType& sub);
#endif//has_fits

    /***************************************
     * DIM SERVICES PROVIDED BY THE DATA LOGGER
     ***************************************/
    /// monitoring notification loop
    void ServicesMonitoring();
    /// notifies through the given service that a file was opened
    inline void NotifyOpenedFile(const string &name, int type, DimDescribedService* service);
    /// variables for computing statistics
    DataLoggerStats fStatVar;
    /// mutex to make sure that the Stats are not accessed while updating
//    mutex fStatsMutex;
    /// services notification thread
//    boost::thread fMonitoringThread;
    /// end of the monitoring
//    bool fContinueMonitoring;
    /// stores the size of each file that is or was open
    map<string, long> fFileSizesMap;
    /// total size of the opened files BEFORE they were opened by the logger
    long fBaseSizeNightly;
    long fPreviousSize;
    long fBaseSizeRun;
    /// Services for opened files, number of subscriptions/fits files and statistics
    DimDescribedService* fOpenedNightlyFiles;
    DimDescribedService* fOpenedRunFiles;
    DimDescribedService* fNumSubAndFits;
    DimDescribedService* fStatsMonitoring;
    /// data distributed through fNumSubAndFits
    NumSubAndFitsType fNumSubAndFitsData;
    /// Small function for calculating the total size written so far
    bool calculateTotalSizeWritten(DataLoggerStats& statVar, bool isPrinting);

    /***************************************************
     * DATA LOGGER's CONFIGURATION STUFF
     ***************************************************/
    /// black/white listing. Entries are "SERVER/", "SERVER/SERVICE" or "/SERVICE"
    set<string> fBlackList;
    set<string> fWhiteList;
    /// list of services to be grouped
    set<string> fGrouping;
    /// configuration flags
    bool fDebugIsOn;
    float fStatsPeriodDuration;
    bool fOpenedFilesIsOn;
    bool fNumSubAndFitsIsOn;
    // functions for controlling the services behavior
    int SetDebugOnOff(const Event& evt);
    int SetStatsPeriod(const Event& evt);
    int SetOpenedFilesOnOff(const Event& evt);
    int SetNumSubsAndFitsOnOff(const Event& evt);
    int SetRunTimeoutDelay(const Event& evt);

    /// boolean to prevent DIM update while destructing the dataLogger
    bool fDestructing;
    /***************************************************
     * UTILITIES
     ***************************************************/
    /// vectors to keep track of opened Fits files, for grouping purposes.
    map<string, vector<string> > fOpenedNightlyFits;
    /// creates a group fits file based on a list of files to be grouped
    void CreateFitsGrouping(map<string, vector<string> >& filesToGroup, int runNumber);

    /// Open the given stream on the given file name
    bool OpenStream(shared_ptr<ofstream> stream, const string &filename);
    /// Open the relevant text files related to a particular run
    int OpenRunFile(RunNumberType& run);
    /// add a new run number
    void AddNewRunNumber(int64_t newRun, Time time);
    /// removes the oldest run number, and close the relevant files.
    void RemoveOldestRunNumber();
    /// retrieves the size of a file
    off_t GetFileSize(const string&);
    /// Get the digits of year, month and day for filenames and paths
    void GetYearMonthDayForFiles(unsigned short& year, unsigned short& month, unsigned short& day);
    /// Appends the relevant year month day to a given path
    void AppendYearMonthDaytoPath(string& path);
    /// Form the files path (nightly files)
    string CompileFileName(const string &path, const string &service, const string & extension, const Time &time=Time());
    /// Form the files path (run files)
    string CompileFileName(const string &path, uint32_t run, const string &service, const string & extension, const Time &time=Time());
    /// Check whether service is in black and/or white list
    bool ShouldSubscribe(const string& server, const string& service);
    /// Subscribe to a given server and service
    DimStampedInfo* SubscribeToPlease(const string& server, const string& service);
    /// Open a text file and checks for ofstream status
    bool OpenTextFilePlease(ofstream& stream, const string& name);
    /// Check if a dir is . and returns the actual string corresponding to .
//    string CheckIfDirIsDot(const string& dir);
    /// Remembers the size of newly opened files. for statistic purposes
    bool RememberFileOrigSizePlease(string& fileName, bool nightly);
    /// Checks if the input ofstream is in error state, and if so close it.
    void CheckForOfstreamError(ofstream& out);
    /// Checks if a given path exist
    bool DoesPathExist(string path);
    /// Check if the statistics service should be updated, and if so, do it
    void UpdateStatisticsService();
    /// Check if old run numbers can be trimmed, and if so, do it
    void TrimOldRunNumbers();
public:
    /// Create a given directory
    bool CreateDirectory(string path);
    /***************************************************
     * INHERITED FROM DIMSERVICEINFOLIST
     ***************************************************/
    /// Add a new service subscription
    void AddService(const string&, const string&, const string&, bool);
    /// Remove a given service subscription
    void RemoveService(const string&, const string&, bool);
    /// Remove all the services associated with a given server
    void RemoveAllServices(const string&);
}; //DataLogger
398
399// --------------------------------------------------------------------------
400//
401//! Check if a given path exists
402//! @param path the path to be checked
403//! @return whether or not the creation has been successfull
404//
405bool DataLogger::CreateDirectory(string path)
406{
407 //remove last '/', if present
408 if (path[path.size()-1] == '/')
409 path = path.substr(0, path.size()-1);
410
411 //create boost path
412 const boost::filesystem::path fullPath = boost::filesystem::system_complete(boost::filesystem::path(path));
413
414 //if path does not exist, check if upper levels exist already
415 if (boost::filesystem::exists(fullPath))
416 {
417 //if path already exist, make sure it does not designate a file (filenames cannot be checked if they do not exist)
418 if (boost::filesystem::is_directory(fullPath))
419 return true;
420
421 Error("Path to be created contains a file name: '" + path + "'");
422 return false;
423 }
424
425 if (path.size() <= 1)
426 {//we're hitting "/", which SHOULD have existed...
427 Error("Something unexpected happened while creating a path");
428 }
429 CreateDirectory(path.substr(0, path.find_last_of('/')));
430
431 //path does not exist, and upper level have been created already by recusrion.
432 const mode_t rightsMask = S_IRWXU | S_IXGRP | S_IRGRP | S_IXOTH | S_IROTH; //everybody read, owner writes
433
434 const int returnValue = mkdir(path.c_str(), rightsMask);
435
436 if (returnValue != 0)
437 {
438 ostringstream str;
439 str << "Could not create directory " << path << " mkdir error code: " << errno;
440 Error(str.str());
441 return false;
442 }
443
444 return true;
445}
446// --------------------------------------------------------------------------
447//
448//! Check if a given path exists
449//! @param path the path to be checked
450//! @return whether or not the given path exists
451//
452bool DataLogger::DoesPathExist(string path)
453{
454 const boost::filesystem::path fullPath = boost::filesystem::system_complete(boost::filesystem::path(path));
455
456 if (!boost::filesystem::exists(fullPath))
457 return false;
458
459 if (!boost::filesystem::is_directory(fullPath))
460 {
461 Error("Path given for checking '" + path + "' designate a file name. Please provide a path name only");
462 return false;
463 }
464
465 if (access(path.c_str(), R_OK|W_OK|X_OK) != 0)
466 {
467 Error("Missing read, write or execute permissions on directory '" + path + "'");
468 return false;
469 }
470
471 return true;
472}
473// --------------------------------------------------------------------------
474//
475//! Add a new service subscription
476//! @param server the server for which the subscription should be created
477//! @param service the service for which the subscription should be created
478//! @param isCmd whether this is a Dim Command or not. Commands are not logged
479//
480void DataLogger::AddService(const string& server, const string& service, const string&, bool isCmd)
481{
482 //dataLogger does not subscribe to commands
483 if (isCmd)
484 return;
485
486 //check the given subscription against black and white lists
487 if (!ShouldSubscribe(server, service))
488 return;
489
490 map<string, SubscriptionType> &list = fServiceSubscriptions[server];
491
492 if (list.find(service) != list.end())
493 {
494 Error("Service " + server + "/" + service + " is already in the dataLogger's list. ignoring its update.");
495 return;
496 }
497
498 list[service].dimInfo = shared_ptr<DimStampedInfo>(SubscribeToPlease(server, service));
499 list[service].server = server;
500 list[service].service = service;
501 fNumSubAndFitsData.numSubscriptions++;
502 if (fDebugIsOn)
503 Debug("Added subscription to " + server + "/" + service);
504}
505// --------------------------------------------------------------------------
506//
507//! Remove a given service subscription
508//! @param server the server for which the subscription should be removed
509//! @param service the service that should be removed
510//! @param isCmd whether or not this is a command
511//
512void DataLogger::RemoveService(const string& server, const string& service, bool isCmd)
513{
514 if (isCmd)
515 return;
516 if (fServiceSubscriptions[server].erase(service) != 1)
517 {
518 ostringstream str;
519 str << "Subscription " << server << "/" << service << "could not be removed as it is not present";
520 Error(str.str());
521 return;
522 }
523 fNumSubAndFitsData.numSubscriptions--;
524 if (fDebugIsOn)
525 {
526 Debug("Removed subscription to " + server + "/" + service);
527 }
528}
529// --------------------------------------------------------------------------
530//
531//! Remove all the services associated with a given server
532//! @param server the server for which all the services should be removed
533//
534void DataLogger::RemoveAllServices(const string& server)
535{
536 fNumSubAndFitsData.numSubscriptions -= fServiceSubscriptions[server].size();
537 fServiceSubscriptions[server].clear();
538 if (fDebugIsOn)
539 {
540 Debug("Removed all subscriptions to " + server + "/");
541 }
542}
543// --------------------------------------------------------------------------
544//
545//! Checks if the given ofstream is in error state and if so, close it
546//! @param out the ofstream that should be checked
547//
548void DataLogger::CheckForOfstreamError(ofstream& out)
549{
550 if (out.good())
551 return;
552
553 Error("An error occured while writing to a text file. Closing it");
554 if (out.is_open())
555 out.close();
556 SetCurrentState(kSM_WriteError);
557}
558// --------------------------------------------------------------------------
559//
560//! Checks the size on disk of a given size, and remembers it in the relevant member variable
561//! @param fileName the file for which the size on disk should be retrieved
562//! @param nightly whether this is a run or nightly file, so that its size is added to the correct member variable
563//
564bool DataLogger::RememberFileOrigSizePlease(string& fileName, bool nightly)
565{
566 //get the size of the file we're about to open
567 if (fFileSizesMap.find(fileName) != fFileSizesMap.end())
568 return false;
569
570 if (nightly)
571 fBaseSizeNightly += GetFileSize(fileName);
572 else
573 fBaseSizeRun += GetFileSize(fileName);
574 fFileSizesMap[fileName] = 0;
575 return true;
576}
577
578// --------------------------------------------------------------------------
579//
580//! Open a text file and checks for error code
581//! @param stream the ofstream for which the file should be opened
582//! @name the file name
583//
584bool DataLogger::OpenTextFilePlease(ofstream& stream, const string& name)
585{
586 Info("Opening: "+name);
587
588 errno = 0;
589 stream.open(name.c_str(), ios_base::out | ios_base::app);
590 if (!stream)
591 {
592 ostringstream str;
593 str << "Trying to open file " << name << ": " << strerror(errno) << " (errno=" << errno << ")";
594 Error(str);
595 return false;
596 }
597
598 return true;
599}
600
601// --------------------------------------------------------------------------
602//
603//! Create a new dim subscription to a given server and service
604//! @param server the server name
605//! @param service the service name
606//
607DimStampedInfo* DataLogger::SubscribeToPlease(const string& server, const string& service)
608{
609 if (fDebugIsOn)
610 {
611 ostringstream str;
612 str << "Subscribing to service " << server << "/" << service;
613 Debug(str);
614 }
615 return new DimStampedInfo((server + "/" + service).c_str(), const_cast<char*>(""), this);
616}
617// --------------------------------------------------------------------------
618//
619//! Check whether a service should be subscribed to, based on the black/white list entries
620//! @param server the server name associated with the service being checked
621//! @param service the service name associated with the service being checked
622//
623bool DataLogger::ShouldSubscribe(const string& server, const string& service)
624{
625 if (fWhiteList.size()>0 &&
626 (fWhiteList.find(server + "/") == fWhiteList.end()) &&
627 (fWhiteList.find(server + "/" + service) == fWhiteList.end()) &&
628 (fWhiteList.find("/" + service) == fWhiteList.end()))
629 return false;
630
631 if ((fBlackList.find(server + "/") != fBlackList.end()) ||
632 (fBlackList.find(server + "/" + service) != fBlackList.end()) ||
633 (fBlackList.find("/" + service) != fBlackList.end()))
634 return false;
635
636 return true;
637}
638// --------------------------------------------------------------------------
639//
640//! Compiles a file name
641//! @param path the base path where to put the file
642//! @param time the time at which the file is created
643//! @param service the service name, if any
644//! @param extension the extension to add, if any
645//
646string DataLogger::CompileFileName(const string &path, const string &service, const string & extension, const Time &time)
647{
648 ostringstream str;
649 //calculate time suitable for naming files.
650 const Time ftime(time-boost::posix_time::time_duration(12,0,0));
651
652 //output it
653 str << path << Time::fmt("/%Y/%m/%d") << ftime;
654
655 //check if target directory exist
656 if (!DoesPathExist(str.str()))
657 CreateDirectory(str.str());
658
659 //output base of file name
660 str << Time::fmt("/%Y_%m_%d") << ftime;
661
662 //output service name
663 if (!service.empty())
664 str << "_" << service;
665
666 //output appropriate extension
667 if (!extension.empty())
668 str << "." << extension;
669
670 return str.str();
671}
672// --------------------------------------------------------------------------
673//
674//! Compiles a file name
675//! @param path the base path where to put the file
676//! @param time the time at which the file is created
677//! @param run the run number
678//! @param service the service name, if any
679//! @param extension the extension to add, if any
680//
681string DataLogger::CompileFileName(const string &path, uint32_t run, const string &service, const string & extension, const Time &time)
682{
683 ostringstream str;
684 //calculate suitable time for naming files and output it
685 str << path << Time::fmt("/%Y/%m/%d") << (time-boost::posix_time::time_duration(12,0,0));
686
687 //check if target directory exist
688 if (!DoesPathExist(str.str()))
689 CreateDirectory(str.str());
690
691 //output base of file name
692 str << '/' << setfill('0') << setw(8) << run;
693
694 //output service name
695 if (!service.empty())
696 str << "_" << service;
697
698 //output appropriate extension
699 if (!extension.empty())
700 str << "." << extension;
701 return str.str();
702}
703
704// --------------------------------------------------------------------------
705//
706//!retrieves the size on disk of a file
707//! @param fileName the full file name for which the size on disk should be retrieved
708//! @return the size of the file on disk, in bytes. 0 if the file does not exist or if an error occured
709//
710off_t DataLogger::GetFileSize(const string& fileName)
711{
712 errno = 0;
713 struct stat st;
714 if (!stat(fileName.c_str(), &st))
715 return st.st_size;
716
717 if (errno != 0 && errno != 2)//ignoring error #2: no such file or directory is not an error for new files
718 {
719 ostringstream str;
720 str << "Unable to stat " << fileName << ". Reason: " << strerror(errno) << " [" << errno << "]";
721 Error(str);
722 }
723
724 return 0;
725}
726// --------------------------------------------------------------------------
727//
728//! Removes the oldest run number and closes the fits files that should be closed
729//! Also creates the fits grouping file
730//
731void DataLogger::RemoveOldestRunNumber()
732{
733 if (fDebugIsOn)
734 {
735 ostringstream str;
736 str << "Removing run number " << fRunNumber.front().runNumber;
737 Debug(str);
738 }
739 CreateFitsGrouping(fRunNumber.front().openedFits, fRunNumber.front().runNumber);
740
741 //crawl through the subscriptions to see if there are still corresponding fits files opened.
742 for (SubscriptionsListType::iterator x=fServiceSubscriptions.begin();
743 x!=fServiceSubscriptions.end(); x++)
744 for (map<string, SubscriptionType>::iterator y=x->second.begin();
745 y!=x->second.end(); y++)
746 if (y->second.runFile.fRunNumber == fRunNumber.front().runNumber && y->second.runFile.IsOpen())
747 {
748 y->second.runFile.Close();
749
750 Info("Closed: "+y->second.runFile.fFileName);
751 }
752 //if a grouping file is on, decrease the number of opened fits manually
753 if (fRunNumber.front().runFitsFile)
754 (fNumSubAndFitsData.numOpenFits)--;
755 //remove the entry
756 fRunNumber.pop_front();
757}
758
759// --------------------------------------------------------------------------
760//
761//! Calculate the total number of written bytes since the logger was started
762//! @param statVar the data structure that should be updated
763//! @param shouldWarn whether or not error messages should be outputted
764//! @param isPrinting whether this function was called from the PRINT command or not. If so, displays relevant information
765//
766bool DataLogger::calculateTotalSizeWritten(DataLoggerStats& statVar, bool isPrinting)
767{
768//mutex
769// if (!isPrinting)
770// fStatsMutex.lock();
771#ifdef HAVE_FITS
772 if (isPrinting)
773 {
774 ostringstream str;
775 str << "There are " << fNumSubAndFitsData.numOpenFits << " FITS files open:";
776 Message(str);
777 }
778
779 ///TODO the grouping file is dealt with several times. This should not be a problem but well, better to fix it I guess.
780 for (SubscriptionsListType::const_iterator x=fServiceSubscriptions.begin();
781 x!=fServiceSubscriptions.end(); x++)
782 {
783 for (map<string, SubscriptionType>::const_iterator y=x->second.begin();
784 y!=x->second.end(); y++)
785 {
786 if (y->second.runFile.IsOpen())
787 {
788 fFileSizesMap[y->second.runFile.fFileName] = y->second.runFile.GetWrittenSize();
789 if (isPrinting)
790 Message("-> "+y->second.runFile.fFileName);
791 }
792 if (y->second.nightlyFile.IsOpen())
793 {
794 fFileSizesMap[y->second.nightlyFile.fFileName] = y->second.nightlyFile.GetWrittenSize();
795 if (isPrinting)
796 Message("-> "+y->second.nightlyFile.fFileName);
797 }
798 }
799 }
800#else
801 if (isPrinting)
802 Message("FITS output disabled at compilation");
803#endif
804 //gather log and report files sizes on disk
805 if (fNightlyLogFile.is_open())
806 fFileSizesMap[fFullNightlyLogFileName] = GetFileSize(fFullNightlyLogFileName);
807 if (fNightlyReportFile.is_open())
808 fFileSizesMap[fFullNightlyReportFileName] = GetFileSize(fFullNightlyReportFileName);
809 for (list<RunNumberType>::iterator it = fRunNumber.begin(); it != fRunNumber.end(); it++)
810 {
811 if (it->reportFile->is_open())
812 fFileSizesMap[it->reportName] = GetFileSize(it->reportName);
813#ifdef RUN_LOGS
814 if (it->logFile->is_open())
815 fFileSizesMap[it->logName] = GetFileSize(it->logName);
816#endif
817 }
818
819 bool shouldWarn = false;
820 struct statvfs vfs;
821 if (!statvfs(fNightlyFilePath.c_str(), &vfs))
822 statVar.freeSpace = vfs.f_bsize*vfs.f_bavail;
823 else
824 {
825 ostringstream str;
826 str << "Unable to retrieve stats for " << fNightlyFilePath << ". Reason: " << strerror(errno) << " [" << errno << "]";
827 if (!shouldWarn)
828 Error(str);
829 statVar.freeSpace = -1;
830 }
831 //sum up all the file sizes. past and present
832 statVar.sizeWritten = 0;
833 for (map<string, long>::const_iterator it=fFileSizesMap.begin(); it != fFileSizesMap.end(); it++)
834 statVar.sizeWritten += it->second;
835 statVar.sizeWritten -= fBaseSizeNightly;
836 statVar.sizeWritten -= fBaseSizeRun;
837
838//mutex
839// if (!isPrinting)
840// fStatsMutex.unlock();
841
842 return shouldWarn;
843}
844
845// --------------------------------------------------------------------------
846//
847//! Monitor the number of opened files and total size written, and distributes this data through a Dim service
848//
849//
850/*
851void DataLogger::ServicesMonitoring()
852{
853 struct statvfs vfs;
854 if (!statvfs(fNightlyFilePath.c_str(), &vfs))
855 fStatVar.freeSpace = vfs.f_bsize*vfs.f_bavail;
856 else
857 fStatVar.freeSpace = -1;
858
859 DimDescribedService srvc ("DATA_LOGGER/STATS", "X:3", fStatVar, "Add description here");
860 fPreviousSize = 0;
861 //loop-wait for broadcast
862 while (fContinueMonitoring)
863 {
864 if (fStatsPeriodDuration == 0.0f)
865 {
866 sleep(0.1f);
867 continue;
868 }
869
870 sleep(fStatsPeriodDuration);
871
872
873 fStatsMutex.lock();
874
875 if (fStatVar.writingRate != 0) //if data has been written
876 {
877 srvc.updateService();
878
879 if(fDebugIsOn)
880 {
881 ostringstream str;
882 str << "Size written: " << fStatVar.sizeWritten/1000 << " kB; writing rate: ";
883 str << fStatVar.writingRate/1000 << " kB/s; free space: ";
884 str << fStatVar.freeSpace/(1000*1000) << " MB";
885 Debug(str);
886 }
887 }
888 fStatsMutex.unlock();
889 }
890}
891*/
892// --------------------------------------------------------------------------
893//
894//! Default constructor. The name of the machine is given DATA_LOGGER
895//! and the state is set to kSM_Ready at the end of the function.
896//
897//!Setup the allows states, configs and transitions for the data logger
898//
899DataLogger::DataLogger(ostream &out) : StateMachineDim(out, "DATA_LOGGER")
900{
901 //initialize member data
902 fNightlyFilePath = ".";
903 fRunFilePath = ".";
904
905 //Give a name to this machine's specific states
906 AddStateName(kSM_NightlyOpen, "NightlyFileOpen", "The summary files for the night are open.");
907 AddStateName(kSM_WaitingRun, "WaitForRun", "The summary files for the night are open and we wait for a run to be started.");
908 AddStateName(kSM_Logging, "Logging", "The summary files for the night and the files for a single run are open.");
909 AddStateName(kSM_BadNightlyConfig, "ErrNightlyFolder", "The folder for the nighly summary files is invalid.");
910 AddStateName(kSM_BadRunConfig, "ErrRunFolder", "The folder for the run files is invalid.");
911 AddStateName(kSM_WriteError, "ErrWrite", "An error occured while writing to a file.");
912
913 // Add the possible transitions for this machine
914 AddEvent(kSM_NightlyOpen, "START", kSM_Ready, kSM_BadNightlyConfig)
915 (boost::bind(&DataLogger::StartPlease, this))
916 ("Start the nightly logging. Nightly file location must be specified already");
917
918 AddEvent(kSM_Ready, "STOP", kSM_NightlyOpen, kSM_WaitingRun, kSM_Logging, kSM_WriteError)
919 (boost::bind(&DataLogger::GoToReadyPlease, this))
920 ("Stop all data logging, close all files.");
921
922 AddEvent(kSM_Logging, "START_RUN", kSM_WaitingRun, kSM_BadRunConfig)
923 (boost::bind(&DataLogger::StartRunPlease, this))
924 ("Start the run logging. Run file location must be specified already.");
925
926 AddEvent(kSM_WaitingRun, "STOP_RUN", kSM_Logging)
927 (boost::bind(&DataLogger::StopRunPlease, this))
928 ("Wait for a run to be started, open run-files as soon as a run number arrives.");
929
930 AddEvent(kSM_Ready, "RESET", kSM_Error, kSM_BadNightlyConfig, kSM_BadRunConfig, kSM_WriteError)
931 (boost::bind(&DataLogger::GoToReadyPlease, this))
932 ("Transition to exit error states. Closes the any open file.");
933
934 AddEvent(kSM_WaitingRun, "WAIT_FOR_RUN_NUMBER", kSM_NightlyOpen)
935 (boost::bind(&DataLogger::NightlyToWaitRunPlease, this))
936 ("Go to waiting for run number state. In this state with any received run-number a new file is opened.");
937
938 // Add the possible configurations for this machine
939 AddEvent("SET_NIGHTLY_FOLDER", "C", kSM_Ready, kSM_BadNightlyConfig)
940 (boost::bind(&DataLogger::ConfigureNightlyFileName, this, _1))
941 ("Configure the base folder for the nightly files."
942 "|Path[string]:Absolute or relative path name where the nightly files should be stored.");
943
944 AddEvent("SET_RUN_FOLDER", "C", kSM_Ready, kSM_BadNightlyConfig, kSM_NightlyOpen, kSM_WaitingRun, kSM_BadRunConfig)
945 (boost::bind(&DataLogger::ConfigureRunFileName, this, _1))
946 ("Configure the base folder for the run files."
947 "|Path[string]:Absolute or relative path name where the run files should be stored.");
948
949 AddEvent("SET_RUN_NUMBER", "X", kSM_Ready, kSM_NightlyOpen, kSM_WaitingRun, kSM_BadRunConfig, kSM_Logging)
950 (boost::bind(&DataLogger::ConfigureRunNumber, this, _1))
951 ("Configure the run number. Cannot be done in logging state");
952
953 // Provide a print command
954 AddEvent("PRINT")
955 (boost::bind(&DataLogger::PrintStatePlease, this, _1))
956 ("Print information about the internal status of the data logger.");
957
958 OpenFileToDim fToDim;
959 fToDim.code = 0;
960 fToDim.fileName[0] = '\0';
961
962 fOpenedNightlyFiles = new DimDescribedService(GetName() + "/FILENAME_NIGHTLY", "I:1;C", fToDim,
963 "Path and base name which is used to compile the filenames for the nightly files."
964 "|Type[int]:type of open files (1=log, 2=rep, 4=fits)"
965 "|Name[string]:path and base file name");
966
967 fOpenedRunFiles = new DimDescribedService(GetName() + "/FILENAME_RUN", "I:1;C", fToDim,
968 "Path and base name which is used to compile the filenames for the run files."
969 "|Type[int]:type of open files (1=log, 2=rep, 4=fits)"
970 "|Name[string]:path and base file name");
971
972 fNumSubAndFitsData.numSubscriptions = 0;
973 fNumSubAndFitsData.numOpenFits = 0;
974 fNumSubAndFits = new DimDescribedService(GetName() + "/NUM_SUBS", "I:2", fNumSubAndFitsData,
975 "Shows number of services to which the data logger is currently subscribed and the total number of open files."
976 "|Subscriptions[int]:number of dim services to which the data logger is currently subscribed."
977 "|NumOpenFiles[int]:number of files currently open by the data logger");
978
979 //services parameters
980 fDebugIsOn = false;
981 fStatsPeriodDuration = 1.0f;
982 fOpenedFilesIsOn = true;
983 fNumSubAndFitsIsOn = true;
984
985 // provide services control commands
986 AddEvent("SET_DEUG_MODE", "B:1", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready)
987 (boost::bind(&DataLogger::SetDebugOnOff, this, _1))
988 ("Switch debug mode on or off. Debug mode prints ifnormation about every service written to a file."
989 "|Enable[bool]:Enable of disable debug mode (yes/no).");
990
991 AddEvent("SET_STATISTICS_UPDATE_INTERVAL", "F", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready)
992 (boost::bind(&DataLogger::SetStatsPeriod, this, _1))
993 ("Interval in which the data-logger statistics service (STATS) is updated."
994 "|Interval[s]:Floating point value in seconds.");
995
996 AddEvent("ENABLE_FILENAME_SERVICES", "B:1", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready)
997 (boost::bind(&DataLogger::SetOpenedFilesOnOff ,this, _1))
998 ("Switch service which distributes information about the open files on or off."
999 "|Enable[bool]:Enable of disable filename services (yes/no).");
1000
1001 AddEvent("ENABLE_NUMSUBS_SERVICE", "B:1", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready)
1002 (boost::bind(&DataLogger::SetNumSubsAndFitsOnOff, this, _1))
1003 ("Switch the service which distributes information about the number of subscriptions and open files on or off."
1004 "|Enable[bool]:Enable of disable NUM_SUBS service (yes/no).");
1005
1006 AddEvent("SET_RUN_TIMEOUT", "L:1", kSM_Ready, kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun)
1007 (boost::bind(&DataLogger::SetRunTimeoutDelay, this, _1))
1008 ("Set the timeout delay for old run numbers."
1009 "|timeout[min]:Time out in minutes after which files for expired runs are closed.");
1010
1011 fDestructing = false;
1012
1013 //start the monitoring service
1014 fStatVar.sizeWritten = 0;
1015 fStatVar.freeSpace = 0;
1016 fStatVar.writingRate = 0;
1017 fPreviousStatsUpdateTime = Time().Mjd();
1018 fPreviousOldRunNumberCheck = Time().Mjd();
1019 fPreviousSize = 0;
1020
1021 struct statvfs vfs;
1022 if (!statvfs(fNightlyFilePath.c_str(), &vfs))
1023 fStatVar.freeSpace = vfs.f_bsize*vfs.f_bavail;
1024 else
1025 fStatVar.freeSpace = -1;
1026
1027 fStatsMonitoring = new DimDescribedService(GetName() + "/STATS", "X:3", fStatVar, "Add description here");
1028
1029//mutex
1030// fContinueMonitoring = true;
1031// fMonitoringThread = boost::thread(boost::bind(&DataLogger::ServicesMonitoring, this));
1032 fBaseSizeNightly = 0;
1033 fBaseSizeRun = 0;
1034 fDailyFileDayChangedAlready = true;
1035 fRunNumberTimeout = 1;
1036 if(fDebugIsOn)
1037 {
1038 Debug("DataLogger Init Done.");
1039 }
1040}
1041
1042// --------------------------------------------------------------------------
1043//
1044//! Destructor
1045//
1046DataLogger::~DataLogger()
1047{
1048 if (fDebugIsOn)
1049 {
1050 Debug("DataLogger destruction starts");
1051 }
1052 fDestructing = true;
1053 //first let's go to the ready state
1054 GoToReadyPlease();
1055 //release the services subscriptions
1056 fServiceSubscriptions.clear();
1057 //exit the monitoring loop
1058//mutex
1059// fContinueMonitoring = false;
1060// fMonitoringThread.join();
1061 //clear any remaining run number (should remain only one)
1062 while (fRunNumber.size() > 0)
1063 {
1064 RemoveOldestRunNumber();
1065 }
1066
1067 delete fOpenedNightlyFiles;
1068 delete fOpenedRunFiles;
1069 delete fNumSubAndFits;
1070
1071 if (fDebugIsOn)
1072 {
1073 Debug("DataLogger desctruction ends");
1074 }
1075}
1076// --------------------------------------------------------------------------
1077//
1078//! checks if the statistic service should be updated, and if so, do it
1079//
1080void DataLogger::UpdateStatisticsService()
1081{
1082 //update the fits files sizes
1083 const Time cTime = Time();
1084
1085 if ((fStatsPeriodDuration == 0) || ((cTime - fPreviousStatsUpdateTime).total_seconds() < fStatsPeriodDuration))
1086 return;
1087
1088 calculateTotalSizeWritten(fStatVar, false);
1089 fStatVar.writingRate = (fStatVar.sizeWritten - fPreviousSize)/((cTime - fPreviousStatsUpdateTime).total_seconds());
1090 fPreviousSize = fStatVar.sizeWritten;
1091 fPreviousStatsUpdateTime = cTime;
1092 //update the service. No need to check if data has been written, because some must have been, otherwise we would not have hit this piece of code
1093 fStatsMonitoring->updateService();
1094
1095 if(fDebugIsOn)
1096 {
1097 ostringstream str;
1098 str << "Size written: " << fStatVar.sizeWritten/1000 << " kB; writing rate: ";
1099 str << fStatVar.writingRate/1000 << " kB/s; free space: ";
1100 str << fStatVar.freeSpace/(1000*1000) << " MB";
1101 Debug(str);
1102 }
1103}
1104// --------------------------------------------------------------------------
1105//
1106//! checks if old run numbers should be trimmed and if so, do it
1107//
1108void DataLogger::TrimOldRunNumbers()
1109{
1110 const Time cTime = Time();
1111
1112 if ((cTime - fPreviousOldRunNumberCheck).total_seconds() < fRunNumberTimeout*60)
1113 return;
1114
1115 while (fRunNumber.size() > 1 && (cTime - fRunNumber.back().time) > boost::posix_time::minutes(fRunNumberTimeout))
1116 {
1117 RemoveOldestRunNumber();
1118 }
1119 fPreviousOldRunNumberCheck = cTime;
1120}
1121// --------------------------------------------------------------------------
1122//
1123//! Inherited from DimInfo. Handles all the Infos to which we subscribed, and log them
1124//
1125void DataLogger::infoHandler()
1126{
1127 // Make sure getTimestamp is called _before_ getTimestampMillisecs
1128 if (fDestructing)
1129 return;
1130
1131 DimInfo* I = getInfo();
1132
1133 if (I==NULL)
1134 return;
1135 //check if the service pointer corresponds to something that we subscribed to
1136 //this is a fix for a bug that provides bad Infos when a server starts
1137 bool found = false;
1138 SubscriptionsListType::iterator x;
1139 map<string, SubscriptionType>::iterator y;
1140 for (x=fServiceSubscriptions.begin(); x != fServiceSubscriptions.end(); x++)
1141 {//find current service is subscriptions
1142 for (y=x->second.begin(); y!=x->second.end();y++)
1143 if ((y->second.dimInfo).get() == I)
1144 {
1145 found = true;
1146 break;
1147 }
1148 if (found)
1149 break;
1150 }
1151 if (!found)
1152 {
1153 DimServiceInfoList::infoHandler();
1154 return;
1155 }
1156 if (I->getSize() <= 0)
1157 return;
1158
1159 // Make sure that getTimestampMillisecs is NEVER called before
1160 // getTimestamp is properly called
1161 // check that the message has been updated by something, i.e. must be different from its initial value
1162 if (I->getTimestamp() == 0)
1163 return;
1164
1165 // FIXME: Here we have to check if we have received the
1166 // service with the run-number.
1167 // CheckForRunNumber(I); has been removed because we have to
1168 // subscribe to this service anyway and hence we have the pointer
1169 // (no need to check for the name)
1170
1171 ReportPlease(I, y->second);
1172
1173 //update the fits files sizes
1174 UpdateStatisticsService();
1175
1176 //remove old run numbers
1177 TrimOldRunNumbers();
1178}
1179
1180bool DataLogger::OpenStream(shared_ptr<ofstream> stream, const string &filename)
1181{
1182 Info("Opening: "+filename);
1183
1184 if (stream->is_open())
1185 {
1186 ostringstream str;
1187 str << filename << " was already open when trying to open it.";
1188 Error(str);
1189 return false;
1190 }
1191
1192 errno = 0;
1193 stream->open(filename.c_str(), ios_base::out | ios_base::app);
1194 if (errno != 0)
1195 {
1196 ostringstream str;
1197 str << "Unable to open " << filename << ": " << strerror(errno) << " (errno=" << errno << ")";
1198 Error(str);
1199 return false;
1200 }
1201
1202 if (!stream->is_open())
1203 {
1204 ostringstream str;
1205 str << "File " << filename << " not open as it ought to be.";
1206 Error(str);
1207 return false;
1208 }
1209
1210 return true;
1211}
1212
1213// --------------------------------------------------------------------------
1214//
1215//! Open the text files associated with the given run number
1216//! @param run the run number to be dealt with
1217//
1218int DataLogger::OpenRunFile(RunNumberType& run)
1219{
1220#ifdef RUN_LOGS
1221 // open log file
1222 run.logName = CompileFileName(fRunFilePath, run.runNumber, "", "log");
1223 if (!OpenStream(run.logFile, run.logName))
1224 return -1;
1225#endif
1226
1227 // open report file
1228 run.reportName = CompileFileName(fRunFilePath, run.runNumber, "", "rep");
1229 if (!OpenStream(run.reportFile, run.reportName))
1230 return -1;
1231
1232 //get the size of the newly opened file.
1233#ifdef RUN_LOGS
1234 RememberFileOrigSizePlease(run.logName, false);
1235#endif
1236 RememberFileOrigSizePlease(run.reportName, false);
1237
1238 //TODO this notification scheme might be messed up now.... fix it !
1239 const string baseFileName = CompileFileName(fRunFilePath, run.runNumber, "", "");
1240 NotifyOpenedFile(baseFileName, 3, fOpenedRunFiles);
1241 run.openedFits.clear();
1242 return 0;
1243}
1244// --------------------------------------------------------------------------
1245//
1246//! Add a new active run number
1247//! @param newRun the new run number
1248//! @param time the time at which the new run number was issued
1249//
1250void DataLogger::AddNewRunNumber(int64_t newRun, Time time)
1251{
1252
1253 if (newRun > 0xffffffff)
1254 {
1255 Error("New run number too large, out of range. Ignoring.");
1256 return;
1257 }
1258 if (fDebugIsOn)
1259 {
1260 ostringstream str;
1261 str << "Adding new run number " << newRun << " that was issued on " << time;
1262 Debug(str);
1263 }
1264 //Add new run number to run number list
1265 fRunNumber.push_back(RunNumberType());
1266 fRunNumber.back().runNumber = uint32_t(newRun);
1267 fRunNumber.back().time = time;
1268
1269 ostringstream str;
1270 str << "The new run number is: " << fRunNumber.back().runNumber;
1271 Message(str);
1272
1273 if (GetCurrentState() != kSM_Logging)
1274 return;
1275 //open the log and report files
1276 OpenRunFile(fRunNumber.back());
1277}
1278// --------------------------------------------------------------------------
1279//
1280//! Checks whether or not the current info is a run number.
1281//! If so, then remember it. A run number is required to open the run-log file
1282//! @param I
1283//! the current DimInfo
1284//
1285void DataLogger::CheckForRunNumber(DimInfo* I)
1286{
1287 if (strstr(I->getName(), "SET_RUN_NUMBER") != NULL)
1288 {//assumes that the run number is an integer
1289 //check if some run number entries can be deleted leave one so that two remain after adding the new one
1290 AddNewRunNumber(I->getLonglong(), Time(I->getTimestamp(), I->getTimestampMillisecs()*1000));
1291 }
1292}
1293
1294// --------------------------------------------------------------------------
1295//
1296//! write infos to log files.
1297//! @param I
1298//! The current DimInfo
1299//! @param sub
1300//! The dataLogger's subscription corresponding to this DimInfo
1301//
1302void DataLogger::ReportPlease(DimInfo* I, SubscriptionType& sub)
1303{
1304
1305
1306 //should we log or report this info ? (i.e. is it a message ?)
1307 bool isItaReport = ((strstr(I->getName(), "Message") == NULL) && (strstr(I->getName(), "MESSAGE") == NULL));
1308 if (I->getFormat()[0] == 'C')
1309 isItaReport = false;
1310
1311 if (!fNightlyReportFile.is_open())
1312 return;
1313
1314 //Check whether we should close and reopen daily text files or not
1315 //This should work in any case base of the following:
1316 // - fDailyFileDayChangedAlready is initialized to true. So if the dataLogger is started around noon, no file will be closed
1317 // - fDailyFileDayChangedAlready is set to false if (time != 12), so the file will be closed and reopened only if the logger runs since before noon (which is the required behavior)
1318 //This only applies to text files. Fits are closed and reopened based on the last and current service received time.
1319 //this was not applicable to text files, because as they gather several services, we have no guarantee that the received time will be greater than the previous one,
1320 //which could lead to several close/reopen instead of only one.
1321 if (Time().h() == 12 && !fDailyFileDayChangedAlready)
1322 {
1323 if (fDebugIsOn)
1324 Debug("Its Noon! Closing and reopening daily text files");
1325
1326 fNightlyLogFile.close();
1327 fNightlyReportFile.close();
1328
1329 Info("Closed: "+fFullNightlyLogFileName);
1330 Info("Closed: "+fFullNightlyReportFileName);
1331
1332 fFullNightlyLogFileName = CompileFileName(fNightlyFilePath, "", "log");
1333 OpenTextFilePlease(fNightlyLogFile, fFullNightlyLogFileName);
1334 //FIXME: Handle return code properly!
1335
1336 fFullNightlyReportFileName = CompileFileName(fNightlyFilePath, "", "rep");
1337 OpenTextFilePlease(fNightlyReportFile, fFullNightlyReportFileName);
1338 //FIXME: Handle return code properly!
1339
1340 fDailyFileDayChangedAlready = true;
1341 }
1342 if (Time().h() != 12 && fDailyFileDayChangedAlready)
1343 fDailyFileDayChangedAlready = false;
1344
1345 //create the converter for that service
1346 if ((!sub.fConv.get()) && isItaReport)
1347 {
1348 //trick the converter in case of 'C'. why do I do this ? well simple: the converter checks that the right number
1349 //of bytes was written. because I skip 'C' with fits, the bytes will not be allocated, hence the "size copied ckeck"
1350 //of the converter will fail, hence throwing an exception.
1351 string fakeFormat(I->getFormat());
1352 if (fakeFormat[fakeFormat.size()-1] == 'C')
1353 fakeFormat = fakeFormat.substr(0, fakeFormat.size()-1);
1354 sub.fConv = shared_ptr<Converter>(new Converter(Out(), I->getFormat()));
1355 if (!sub.fConv)
1356 {
1357 ostringstream str;
1358 str << "Couldn't properly parse the format... service " << sub.dimInfo->getName() << " ignored.";
1359 Error(str);
1360 return;
1361 }
1362 }
1363 //construct the header
1364 ostringstream header;
1365 const Time cTime(I->getTimestamp(), I->getTimestampMillisecs()*1000);
1366 fQuality = I->getQuality();
1367 fMjD = cTime.Mjd();
1368
1369 //figure out which run file should be used
1370 ofstream* targetRunFile = NULL;
1371 RunNumberType* cRunNumber = NULL;
1372 if (GetCurrentState() == kSM_Logging)
1373 {
1374 list<RunNumberType>::reverse_iterator rit;
1375 for (rit=fRunNumber.rbegin(); rit!=fRunNumber.rend(); rit++)
1376 {
1377 if (rit->time < cTime) //this is the run number that we want to use
1378 {
1379 //Find something better to convert iterator to pointer than the ugly line below....
1380 cRunNumber = &(*rit);
1381 sub.runNumber = rit->runNumber;
1382#ifdef RUN_LOGS
1383 targetRunFile = isItaReport ? (rit->reportFile).get() : (rit->logFile).get();
1384#else
1385 targetRunFile = isItaReport ? (rit->reportFile).get() : NULL;
1386#endif
1387 break;
1388 }
1389 }
1390 if (rit == fRunNumber.rend() && fRunNumber.size() != 0)
1391 {
1392 ostringstream str;
1393 str << "Could not find an appropriate run number for info coming at time: " << cTime;
1394 Error(str);
1395 Error("Active run numbers: ");
1396 for (rit=fRunNumber.rbegin(); rit != fRunNumber.rend(); rit++)
1397 {
1398 str.str("");
1399 str << rit->runNumber;
1400 Error(str);
1401 }
1402
1403 }
1404 }
1405
1406 if (isItaReport)
1407 {
1408 //write text header
1409 header << I->getName() << " " << fQuality << " ";
1410 header << cTime.Y() << " " << cTime.M() << " " << cTime.D() << " ";
1411 header << cTime.h() << " " << cTime.m() << " " << cTime.s() << " ";
1412 header << cTime.ms() << " " << I->getTimestamp() << " ";
1413
1414 string text;
1415 try
1416 {
1417 text = sub.fConv->GetString(I->getData(), I->getSize());
1418 }
1419 catch (const runtime_error &e)
1420 {
1421 ostringstream str;
1422 str << "Parsing service " << sub.dimInfo->getName();
1423 str << " failed: " << e.what();
1424 Error(str);
1425 return;
1426 }
1427
1428 if (text.empty())
1429 {
1430 ostringstream str;
1431 str << "Service " << sub.dimInfo->getName() << " sent an empty string";
1432 Info(str);
1433 return;
1434 }
1435 //replace bizarre characters by white space
1436 replace(text.begin(), text.end(), '\n', '\\');
1437 replace_if(text.begin(), text.end(), ptr_fun<int, int>(&iscntrl), ' ');
1438
1439 //write entry to Nightly report
1440 if (fNightlyReportFile.is_open())
1441 {
1442 fNightlyReportFile << header.str() << text << endl;
1443 CheckForOfstreamError(fNightlyReportFile);
1444 }
1445 //write entry to run-report
1446 if (targetRunFile && targetRunFile->is_open())
1447 {
1448 *targetRunFile << header.str() << text << endl;
1449 CheckForOfstreamError(*targetRunFile);
1450 }
1451 }
1452 else
1453 {//write entry to both Nightly and run logs
1454 ostringstream msg;
1455 msg << I->getName() << ": " << I->getString();
1456
1457 if (fNightlyLogFile.is_open())
1458 {
1459 MessageImp(fNightlyLogFile).Write(cTime, msg.str().c_str(), fQuality);
1460 CheckForOfstreamError(fNightlyLogFile);
1461 }
1462 if (targetRunFile && targetRunFile->is_open())
1463 {
1464 MessageImp(*targetRunFile).Write(cTime, msg.str().c_str(), fQuality);
1465 CheckForOfstreamError(*targetRunFile);
1466 }
1467 }
1468
1469#ifdef HAVE_FITS
1470 if (isItaReport)
1471 {
1472 //check if the last received event was before noon and if current one is after noon.
1473 //if so, close the file so that it gets reopened.
1474 if (sub.nightlyFile.IsOpen())
1475 if ((sub.lastReceivedEvent != Time::None) && (sub.lastReceivedEvent.h() < 12) && (cTime.h() >= 12))
1476 {
1477 sub.nightlyFile.Close();
1478 }
1479 sub.lastReceivedEvent = cTime;
1480 if (!sub.nightlyFile.IsOpen() || !sub.runFile.IsOpen() || sub.runNumber != sub.runFile.fRunNumber)
1481 OpenFITSFilesPlease(sub, cRunNumber);
1482 WriteToFITS(sub);
1483 }
1484#endif
1485
1486}
1487
1488// --------------------------------------------------------------------------
1489//
1490//! print the dataLogger's current state. invoked by the PRINT command
1491//! @param evt
1492//! the current event. Not used by the method
1493//! @returns
1494//! the new state. Which, in that case, is the current state
1495//!
1496int DataLogger::PrintStatePlease(const Event& )
1497{
1498 Message("------------------------------------------");
1499 Message("------- DATA LOGGER CURRENT STATE --------");
1500 Message("------------------------------------------");
1501
1502 //print the path configuration
1503 Message("Nightly Path: " + boost::filesystem::system_complete(boost::filesystem::path(fNightlyFilePath)).directory_string());
1504 Message("Run Path: " + boost::filesystem::system_complete(boost::filesystem::path(fRunFilePath)).directory_string());
1505
1506 //print active run numbers
1507 ostringstream str;
1508 str << "Active Run Numbers:";
1509 for (list<RunNumberType>::const_iterator it=fRunNumber.begin(); it!=fRunNumber.end(); it++)
1510 str << " " << it->runNumber;
1511 if (fRunNumber.size()==0)
1512 str << " <none>";
1513 Message(str);
1514 //timeout value
1515 str.str("");
1516 str << "Timeout delay for old run numbers: " << fRunNumberTimeout << " minute(s)";
1517 Message(str);
1518
1519 //print all the open files.
1520 Message("------------ OPENED FILES ----------------");
1521 if (fNightlyLogFile.is_open())
1522 Message("Nightly log-file: OPEN");
1523
1524 if (fNightlyReportFile.is_open())
1525 Message("Nightly report-file: OPEN");
1526
1527 for (list<RunNumberType>::const_iterator it=fRunNumber.begin(); it!=fRunNumber.end(); it++)
1528 {
1529#ifdef RUN_LOGS
1530 if (it->logFile->is_open())
1531 Message("Run log-file: " + it->logName + " (OPEN)");
1532#endif
1533 if (it->reportFile->is_open())
1534 Message("Run report-file: " + it->reportName + " (OPEN)");
1535 }
1536
1537 DataLoggerStats statVar;
1538 /*const bool statWarning =*/ calculateTotalSizeWritten(statVar, true);
1539
1540 Message("----------------- STATS ------------------");
1541 str.str("");
1542 str << "Total Size written: " << statVar.sizeWritten << " bytes.";
1543 Message(str);
1544 str.str("");
1545 str << "Disk free space: " << statVar.freeSpace << " bytes.";
1546 Message(str);
1547 str.str("");
1548 str << "Statistics are updated every " << fStatsPeriodDuration << " seconds";
1549 if (fStatsPeriodDuration != 0)
1550 Message(str);
1551 else
1552 Message("Statistics updates are currently disabled");
1553
1554 Message("------------ DIM SUBSCRIPTIONS -----------");
1555 str.str("");
1556 str << "There are " << fNumSubAndFitsData.numSubscriptions << " active DIM subscriptions.";
1557 Message(str);
1558 for (map<const string, map<string, SubscriptionType> >::const_iterator it=fServiceSubscriptions.begin(); it!= fServiceSubscriptions.end();it++)
1559 {
1560 Message("Server "+it->first);
1561 for (map<string, SubscriptionType>::const_iterator it2=it->second.begin(); it2!=it->second.end(); it2++)
1562 Message(" -> "+it2->first);
1563 }
1564 Message("--------------- BLOCK LIST ---------------");
1565 for (set<string>::const_iterator it=fBlackList.begin(); it != fBlackList.end(); it++)
1566 Message(" -> "+*it);
1567 if (fBlackList.size()==0)
1568 Message(" <empty>");
1569
1570 Message("--------------- ALLOW LIST ---------------");
1571 for (set<string>::const_iterator it=fWhiteList.begin(); it != fWhiteList.end(); it++)
1572 Message(" -> "+*it);
1573 if (fWhiteList.size()==0)
1574 Message(" <empty>");
1575
1576 Message("-------------- GROUPING LIST -------------");
1577 Message("The following servers and/or services will");
1578 Message("be grouped into a single fits file:");
1579 for (set<string>::const_iterator it=fGrouping.begin(); it != fGrouping.end(); it++)
1580 Message(" -> "+*it);
1581 if (fGrouping.size()==0)
1582 Message(" <no grouping>");
1583
1584 Message("------------------------------------------");
1585 Message("-------- END OF DATA LOGGER STATE --------");
1586 Message("------------------------------------------");
1587
1588 return GetCurrentState();
1589}
1590
1591// --------------------------------------------------------------------------
1592//
1593//! turn debug mode on and off
1594//! @param evt
1595//! the current event. contains the instruction string: On, Off, on, off, ON, OFF, 0 or 1
1596//! @returns
1597//! the new state. Which, in that case, is the current state
1598//!
1599int DataLogger::SetDebugOnOff(const Event& evt)
1600{
1601 const bool backupDebug = fDebugIsOn;
1602
1603 fDebugIsOn = evt.GetBool();
1604
1605 if (fDebugIsOn == backupDebug)
1606 Message("Debug mode was already in the requested state.");
1607
1608 ostringstream str;
1609 str << "Debug mode is now " << fDebugIsOn;
1610 Message(str);
1611
1612 return GetCurrentState();
1613}
1614// --------------------------------------------------------------------------
1615//
1616//! set the statistics update period duration. 0 disables the statistics
1617//! @param evt
1618//! the current event. contains the new duration.
1619//! @returns
1620//! the new state. Which, in that case, is the current state
1621//!
1622int DataLogger::SetStatsPeriod(const Event& evt)
1623{
1624 const float backupDuration = fStatsPeriodDuration;
1625
1626 fStatsPeriodDuration = evt.GetFloat();
1627
1628 if (fStatsPeriodDuration < 0)
1629 {
1630 Error("Statistics period duration should be greater than zero. Discarding provided value.");
1631 fStatsPeriodDuration = backupDuration;
1632 return GetCurrentState();
1633 }
1634 if (!finite(fStatsPeriodDuration))// != fStatsPeriodDuration)
1635 {
1636 Error("Provided duration does not appear to be a valid float. Discarding it.");
1637 fStatsPeriodDuration = backupDuration;
1638 return GetCurrentState();
1639 }
1640 if (backupDuration == fStatsPeriodDuration)
1641 Warn("Statistics period not modified. Supplied value already in use.");
1642
1643 if (fStatsPeriodDuration == 0.0f)
1644 Message("Statistics are now OFF");
1645 else
1646 {
1647 ostringstream str;
1648 str << "Statistics period is now " << fStatsPeriodDuration << " seconds";
1649 Message(str);
1650 }
1651
1652 return GetCurrentState();
1653}
1654// --------------------------------------------------------------------------
1655//
1656//! set the opened files service on or off.
1657//! @param evt
1658//! the current event. contains the instruction string. similar to setdebugonoff
1659//! @returns
1660//! the new state. Which, in that case, is the current state
1661//!
1662int DataLogger::SetOpenedFilesOnOff(const Event& evt)
1663{
1664 const bool backupOpened = fOpenedFilesIsOn;
1665
1666 fOpenedFilesIsOn = evt.GetBool();
1667
1668 if (fOpenedFilesIsOn == backupOpened)
1669 Message("Opened files service mode was already in the requested state.");
1670
1671 ostringstream str;
1672 str << "Opened files service mode is now " << fOpenedFilesIsOn;
1673 Message(str);
1674
1675 return GetCurrentState();
1676}
1677
1678// --------------------------------------------------------------------------
1679//
1680//! set the number of subscriptions and opened fits on and off
1681//! @param evt
1682//! the current event. contains the instruction string. similar to setdebugonoff
1683//! @returns
1684//! the new state. Which, in that case, is the current state
1685//!
1686int DataLogger::SetNumSubsAndFitsOnOff(const Event& evt)
1687{
1688 const bool backupSubs = fNumSubAndFitsIsOn;
1689
1690 fNumSubAndFitsIsOn = evt.GetBool();
1691
1692 if (fNumSubAndFitsIsOn == backupSubs)
1693 Message("Number of subscriptions service mode was already in the requested state");
1694
1695 ostringstream str;
1696 str << "Number of subscriptions service mode is now " << fNumSubAndFitsIsOn;
1697 Message(str);
1698
1699 return GetCurrentState();
1700}
1701// --------------------------------------------------------------------------
1702//
1703//! set the timeout delay for old run numbers
1704//! @param evt
1705//! the current event. contains the timeout delay long value
1706//! @returns
1707//! the new state. Which, in that case, is the current state
1708//!
1709int DataLogger::SetRunTimeoutDelay(const Event& evt)
1710{
1711 const long backupTimeout = fRunNumberTimeout;
1712 fRunNumberTimeout = evt.GetXtra();
1713
1714 if (fRunNumberTimeout == 0)
1715 {
1716 fRunNumberTimeout = backupTimeout;
1717 Error("Timeout delays for old run numbers must be greater than 0. Ignored.");
1718 return GetCurrentState();
1719 }
1720
1721 if (fRunNumberTimeout == backupTimeout)
1722 Message("New timeout for old run numbers is same value as previous one.");
1723
1724 ostringstream str;
1725 str << "Timeout delay for old run numbers is now " << fRunNumberTimeout;
1726 Message(str);
1727
1728 return GetCurrentState();
1729}
1730
1731int DataLogger::ConfigureFileName(string &target, const string &type, const EventImp &evt)
1732{
1733 if (!evt.GetText())
1734 {
1735 Error("Empty "+type+" folder given. Please specify a valid path.");
1736 return GetCurrentState();
1737 }
1738
1739 const string givenPath = evt.GetText();
1740 if (!DoesPathExist(givenPath))
1741 {
1742 Error("Provided "+type+" path '"+givenPath+"' is not a valid folder. Ignored");
1743 return GetCurrentState();
1744 }
1745
1746 Message("New "+type+" folder: "+givenPath);
1747
1748 target = givenPath;
1749
1750 return GetCurrentState();
1751}
1752
1753// --------------------------------------------------------------------------
1754//
1755//! Sets the path to use for the Nightly log file.
1756//! @param evt
1757//! the event transporting the path
1758//! @returns
1759//! currently only the current state.
1760//
1761int DataLogger::ConfigureNightlyFileName(const Event& evt)
1762{
1763 return ConfigureFileName(fNightlyFilePath, "nightly", evt);
1764}
1765
1766// --------------------------------------------------------------------------
1767//
1768//! Sets the path to use for the run log file.
1769//! @param evt
1770//! the event transporting the path
1771//! @returns
1772//! currently only the current state
1773int DataLogger::ConfigureRunFileName(const Event& evt)
1774{
1775 return ConfigureFileName(fRunFilePath, "run", evt);
1776}
1777
1778// --------------------------------------------------------------------------
1779//
1780//! Sets the run number.
1781//! @param evt
1782//! the event transporting the run number
1783//! @returns
1784//! currently only the current state
1785int DataLogger::ConfigureRunNumber(const Event& evt)
1786{
1787 AddNewRunNumber(evt.GetXtra(), evt.GetTime());
1788 return GetCurrentState();
1789}
1790// --------------------------------------------------------------------------
1791//
1792//! Notifies the DIM service that a particular file was opened
1793//! @ param name the base name of the opened file, i.e. without path nor extension.
1794//! WARNING: use string instead of string& because I pass values that do not convert to string&.
1795//! this is not a problem though because file are not opened so often.
1796//! @ param type the type of the opened file. 0 = none open, 1 = log, 2 = text, 4 = fits
1797inline void DataLogger::NotifyOpenedFile(const string &name, int type, DimDescribedService* service)
1798{
1799 if (!fOpenedFilesIsOn)
1800 return;
1801
1802 if (fDebugIsOn)
1803 {
1804 ostringstream str;
1805 str << "Updating " << service->getName() << " file '" << name << "' (type=" << type << ")";
1806 Debug(str);
1807
1808 str.str("");
1809 str << "Num subs: " << fNumSubAndFitsData.numSubscriptions << " Num open FITS: " << fNumSubAndFitsData.numOpenFits;
1810 Debug(str);
1811 }
1812
1813 if (name.size()+1 > FILENAME_MAX)
1814 {
1815 Error("Provided file name '" + name + "' is longer than allowed file name length.");
1816 return;
1817 }
1818
1819 OpenFileToDim fToDim;
1820 fToDim.code = type;
1821 memcpy(fToDim.fileName, name.c_str(), name.size()+1);
1822
1823 service->setData(reinterpret_cast<void*>(&fToDim), name.size()+1+sizeof(int));
1824 service->setQuality(0);
1825 service->updateService();
1826}
1827// --------------------------------------------------------------------------
1828//
1829//! Implements the Start transition.
1830//! Concatenates the given path for the Nightly file and the filename itself (based on the day),
1831//! and tries to open it.
1832//! @returns
1833//! kSM_NightlyOpen if success, kSM_BadNightlyConfig if failure
1834int DataLogger::StartPlease()
1835{
1836 if (fDebugIsOn)
1837 {
1838 Debug("Starting...");
1839 }
1840 fFullNightlyLogFileName = CompileFileName(fNightlyFilePath, "", "log");
1841 if (!OpenTextFilePlease(fNightlyLogFile, fFullNightlyLogFileName))
1842 return kSM_BadNightlyConfig;
1843
1844
1845 fFullNightlyReportFileName = CompileFileName(fNightlyFilePath, "", "rep");
1846 if (!OpenTextFilePlease(fNightlyReportFile, fFullNightlyReportFileName))
1847 return kSM_BadNightlyConfig;
1848
1849 //get the size of the newly opened file.
1850 fBaseSizeNightly = GetFileSize(fFullNightlyLogFileName);
1851 fBaseSizeNightly += GetFileSize(fFullNightlyReportFileName);
1852 fFileSizesMap.clear();
1853 fBaseSizeRun = 0;
1854 fPreviousSize = 0;
1855
1856 //notify that a new file has been opened.
1857 const string baseFileName = CompileFileName(fNightlyFilePath, "", "");
1858 NotifyOpenedFile(baseFileName, 3, fOpenedNightlyFiles);
1859
1860 fOpenedNightlyFits.clear();
1861
1862 return kSM_NightlyOpen;
1863}
1864
1865#ifdef HAVE_FITS
1866// --------------------------------------------------------------------------
1867//
1868//! open if required a the FITS files corresponding to a given subscription
1869//! @param sub
1870//! the current DimInfo subscription being examined
1871void DataLogger::OpenFITSFilesPlease(SubscriptionType& sub, RunNumberType* cRunNumber)
1872{
1873 string serviceName(sub.dimInfo->getName());
1874
1875 //if run number has changed, reopen a new fits file with the correct run number.
1876 if (sub.runFile.IsOpen() && sub.runFile.fRunNumber != sub.runNumber)
1877 {
1878 sub.runFile.Close();
1879 Info("Closed: "+sub.runFile.fFileName+" (new run number)");
1880 }
1881
1882 //we must check if we should group this service subscription to a single fits file before we replace the / by _
1883 bool hasGrouping = false;
1884 if (!sub.runFile.IsOpen() && (GetCurrentState() == kSM_Logging))
1885 {//will we find this service in the grouping list ?
1886 for (set<string>::const_iterator it=fGrouping.begin(); it!=fGrouping.end(); it++)
1887 {
1888 if (serviceName.find(*it) != string::npos)
1889 {
1890 hasGrouping = true;
1891 break;
1892 }
1893 }
1894 }
1895 hasGrouping = true;
1896 for (unsigned int i=0;i<serviceName.size(); i++)
1897 {
1898 if (serviceName[i] == '/')
1899 {
1900 serviceName[i] = '_';
1901 break;
1902 }
1903 }
1904 //we open the NightlyFile anyway, otherwise this function shouldn't have been called.
1905 if (!sub.nightlyFile.IsOpen())
1906 {
1907 string partialName = CompileFileName(fNightlyFilePath, serviceName, "fits");
1908
1909 const string fileNameOnly = partialName.substr(partialName.find_last_of('/')+1, partialName.size());
1910 if (!sub.fitsBufferAllocated)
1911 AllocateFITSBuffers(sub);
1912 //get the size of the file we're about to open
1913 if (RememberFileOrigSizePlease(partialName, true))//and remember that the file was opened (i.e. not an update)
1914 fOpenedNightlyFits[fileNameOnly].push_back(serviceName);
1915
1916 ostringstream str;
1917 str << "Opening: " << partialName << " (Nfits=" << fNumSubAndFitsData.numOpenFits << ")";
1918 Info(str);
1919
1920 if (!sub.nightlyFile.Open(partialName, serviceName, NULL, &fNumSubAndFitsData.numOpenFits, this, 0))
1921 {
1922 SetCurrentState(kSM_WriteError);
1923 return;
1924 }
1925 //notify the opening
1926 const string baseFileName = CompileFileName(fNightlyFilePath, "", "");
1927 NotifyOpenedFile(baseFileName, 7, fOpenedNightlyFiles);
1928 if (fNumSubAndFitsIsOn)
1929 fNumSubAndFits->updateService();
1930 }
1931 //do the actual file open
1932 if (!sub.runFile.IsOpen() && (GetCurrentState() == kSM_Logging) && sub.runNumber != 0)
1933 {//buffer for the run file have already been allocated when doing the Nightly file
1934 string fileNameOnly;
1935 string partialName;
1936 if (hasGrouping)
1937 {
1938 partialName = CompileFileName(fRunFilePath, sub.runNumber, "", "fits");
1939 fileNameOnly = partialName.substr(partialName.find_last_of('/')+1, partialName.size());
1940 }
1941 else
1942 {
1943 partialName = CompileFileName(fRunFilePath, sub.runNumber, serviceName, "fits");
1944 fileNameOnly = partialName.substr(partialName.find_last_of('/')+1, partialName.size());
1945 }
1946 //get the size of the file we're about to open
1947 if (RememberFileOrigSizePlease(partialName, false))//and remember that the file was opened (i.e. not an update)
1948 cRunNumber->openedFits[fileNameOnly].push_back(serviceName);
1949 else
1950 if (hasGrouping)
1951 {
1952 cRunNumber->addServiceToOpenedFits(fileNameOnly, serviceName);
1953 }
1954
1955 if (hasGrouping && (!cRunNumber->runFitsFile.get()))
1956 try
1957 {
1958 cRunNumber->runFitsFile = shared_ptr<CCfits::FITS>(new CCfits::FITS(partialName, CCfits::RWmode::Write));
1959 (fNumSubAndFitsData.numOpenFits)++;
1960 }
1961 catch (CCfits::FitsException e)
1962 {
1963 ostringstream str;
1964 str << "Open FITS file " << partialName << ": " << e.message();
1965 Error(str);
1966 cRunNumber->runFitsFile = shared_ptr<CCfits::FITS>();//NULL;
1967 }
1968
1969 const string baseFileName = CompileFileName(fRunFilePath, sub.runNumber, "", "");
1970 NotifyOpenedFile(baseFileName, 7, fOpenedRunFiles);// + '_' + serviceName, 4);
1971
1972 ostringstream str;
1973 str << "Opening: " << partialName << " (Nfits=" << fNumSubAndFitsData.numOpenFits << ")";
1974 Info(str);
1975
1976 if (hasGrouping)
1977 {
1978 if (!sub.runFile.Open(partialName, serviceName, (cRunNumber->runFitsFile).get(), &fNumSubAndFitsData.numOpenFits, this, sub.runNumber))
1979 {
1980 SetCurrentState(kSM_WriteError);
1981 return;
1982 }
1983 }
1984 else
1985 {
1986 if (sub.runFile.Open(partialName, serviceName, NULL, &fNumSubAndFitsData.numOpenFits, this, sub.runNumber))
1987 {
1988 SetCurrentState(kSM_WriteError);
1989 return;
1990 }
1991 }
1992 if (fNumSubAndFitsIsOn)
1993 fNumSubAndFits->updateService();
1994 }
1995}
1996// --------------------------------------------------------------------------
1997//
1998//! Allocates the required memory for a given pair of fits files (nightly and run)
1999//! @param sub the subscription of interest.
2000//
2001void DataLogger::AllocateFITSBuffers(SubscriptionType& sub)
2002{
2003 //Init the time columns of the file
2004 Description dateDesc(string("Time"), string("Modified Julian Date"), string("MjD"));
2005 sub.nightlyFile.AddStandardColumn(dateDesc, "1D", &fMjD, sizeof(double));
2006 sub.runFile.AddStandardColumn(dateDesc, "1D", &fMjD, sizeof(double));
2007
2008 Description QoSDesc("Qos", "Quality of service", "None");
2009 sub.nightlyFile.AddStandardColumn(QoSDesc, "1J", &fQuality, sizeof(int));
2010 sub.runFile.AddStandardColumn(QoSDesc, "1J", &fQuality, sizeof(int));
2011
2012 const Converter::FormatList flist = sub.fConv->GetList();
2013 // Compilation failed
2014 if (!sub.fConv->valid())
2015 {
2016 Error("Compilation of format string failed.");
2017 return;
2018 }
2019
2020 //we've got a nice structure describing the format of this service's messages.
2021 //Let's create the appropriate FITS columns
2022 int size = sub.dimInfo->getSize();
2023
2024 vector<string> dataFormatsLocal;
2025 for (unsigned int i=0;i<flist.size()-1;i++)
2026 {
2027 ostringstream dataQualifier;
2028
2029 dataQualifier << flist[i].second.first;
2030 switch (flist[i].first.first->name()[0])
2031 {
2032 case 'c':
2033 case 'C':
2034 dataQualifier.str("S");
2035 break;
2036 case 's':
2037 dataQualifier << "I";
2038 break;
2039 case 'i':
2040 case 'I':
2041 dataQualifier << "J";
2042 break;
2043 case 'l':
2044 case 'L':
2045 dataQualifier << "J";
2046 break;
2047 case 'f':
2048 case 'F':
2049 dataQualifier << "E";
2050 break;
2051 case 'd':
2052 case 'D':
2053 dataQualifier << "D";
2054 break;
2055 case 'x':
2056 case 'X':
2057 dataQualifier << "K";
2058 break;
2059 case 'S':
2060 size--;
2061 //for strings, the number of elements I get is wrong. Correct it
2062 dataQualifier.str(""); //clear
2063 dataQualifier << size << "A";
2064 break;
2065
2066 default:
2067 Fatal("THIS SHOULD NEVER BE REACHED. dataLogger.cc ln 1198.");
2068 };
2069 //we skip the variable length strings for now (in fits only)
2070 if (dataQualifier.str() != "S")
2071 dataFormatsLocal.push_back(dataQualifier.str());
2072 }
2073 sub.nightlyFile.InitDataColumns(GetDescription(sub.server, sub.service), dataFormatsLocal, sub.dimInfo->getData(), size);
2074 sub.runFile.InitDataColumns(GetDescription(sub.server, sub.service), dataFormatsLocal, sub.dimInfo->getData(), size);
2075 sub.fitsBufferAllocated = true;
2076}
2077// --------------------------------------------------------------------------
2078//
2079//! write a dimInfo data to its corresponding FITS files
2080//
2081void DataLogger::WriteToFITS(SubscriptionType& sub)
2082{
2083 //nightly File status (open or not) already checked
2084 if (sub.nightlyFile.IsOpen())
2085 {
2086 if (!sub.nightlyFile.Write(sub.fConv.get()))
2087 SetCurrentState(kSM_WriteError);
2088 }
2089
2090 if (sub.runFile.IsOpen())
2091 {
2092 if (!sub.runFile.Write(sub.fConv.get()))
2093 SetCurrentState(kSM_WriteError);
2094 }
2095}
2096#endif //if has_fits
2097
2098std::string DataLogger::SetCurrentState(int state, const char *txt, const std::string &cmd)
2099{
2100 if (state == kSM_WriteError && GetCurrentState() == kSM_WriteError)
2101 return "";
2102 return StateMachineImp::SetCurrentState(state, txt, cmd);
2103}
2104// --------------------------------------------------------------------------
2105//
2106//! Implements the StartRun transition.
2107//! Concatenates the given path for the run file and the filename itself (based on the run number),
2108//! and tries to open it.
2109//! @returns
2110//! kSM_Logging if success, kSM_BadRunConfig if failure.
2111int DataLogger::StartRunPlease()
2112{
2113 if (fDebugIsOn)
2114 {
2115 Debug("Starting Run Logging...");
2116 }
2117 //open all the relevant run-files. i.e. all the files associated with run numbers.
2118 for (list<RunNumberType>::iterator it=fRunNumber.begin(); it != fRunNumber.end(); it++)
2119 OpenRunFile(*it);
2120
2121 return kSM_Logging;
2122}
2123
2124#ifdef HAVE_FITS
2125// --------------------------------------------------------------------------
2126//
2127//! Create a fits group file with all the run-fits that were written (either daily or run)
2128//! @param filesToGroup a map of filenames mapping to table names to be grouped (i.e. a
2129//! single file can contain several tables to group
2130//! @param runNumber the run number that should be used for grouping. 0 means nightly group
2131//
2132void DataLogger::CreateFitsGrouping(map<string, vector<string> > & filesToGroup, int runNumber)
2133{
2134 if (fDebugIsOn)
2135 {
2136 ostringstream str;
2137 str << "Creating fits group for ";
2138 if (runNumber != 0)
2139 str << "run files";
2140 else
2141 str << "nightly files";
2142 Debug(str);
2143 }
2144 //create the FITS group corresponding to the ending run.
2145 CCfits::FITS* groupFile;
2146 unsigned int numFilesToGroup = 0;
2147 for (map<string, vector<string> >::const_iterator it=filesToGroup.begin(); it != filesToGroup.end(); it++)
2148 {
2149 numFilesToGroup += it->second.size();
2150 }
2151 if (fDebugIsOn)
2152 {
2153 ostringstream str;
2154 str << "There are " << numFilesToGroup << " tables to group";
2155 Debug(str);
2156 }
2157 if (numFilesToGroup <= 1)
2158 {
2159 filesToGroup.clear();
2160 return;
2161 }
2162 string groupName;
2163 if (runNumber != 0)
2164 groupName = CompileFileName(fRunFilePath, runNumber, "", "fits");
2165 else
2166 groupName = CompileFileName(fNightlyFilePath, "", "fits");
2167
2168 Info("Creating FITS group in: "+groupName);
2169
2170 CCfits::Table* groupTable;
2171 const int maxCharLength = 50;//FILENAME_MAX;
2172 try
2173 {
2174 groupFile = new CCfits::FITS(groupName, CCfits::RWmode::Write);
2175 //setup the column names
2176 ostringstream pathTypeName;
2177 pathTypeName << maxCharLength << "A";
2178 vector<string> names;
2179 vector<string> dataTypes;
2180 names.push_back("MEMBER_XTENSION");
2181 dataTypes.push_back("8A");
2182 names.push_back("MEMBER_URI_TYPE");
2183 dataTypes.push_back("3A");
2184 names.push_back("MEMBER_LOCATION");
2185 dataTypes.push_back(pathTypeName.str());
2186 names.push_back("MEMBER_NAME");
2187 dataTypes.push_back(pathTypeName.str());
2188
2189 groupTable = groupFile->addTable("GROUPING", numFilesToGroup, names, dataTypes);
2190//TODO handle the case when the logger was stopped and restarted during the same day, i.e. the grouping file must be updated
2191 }
2192 catch (CCfits::FitsException e)
2193 {
2194 ostringstream str;
2195 str << "Creating FITS table GROUPING in " << groupName << ": " << e.message();
2196 Error(str);
2197 return;
2198 }
2199
2200 //CCfits seems to be buggy somehow: can't use the column's function "write": it create a compilation error: maybe strings were not thought about.
2201 //use cfitsio routines instead
2202 groupTable->makeThisCurrent();
2203 //create appropriate buffer.
2204 const unsigned int n = 8 + 3 + 2*maxCharLength + 1; //+1 for trailling character
2205
2206 vector<unsigned char> realBuffer;
2207 realBuffer.resize(n);
2208 unsigned char* fitsBuffer = &realBuffer[0];
2209 memset(fitsBuffer, 0, n);
2210
2211 char* startOfExtension = reinterpret_cast<char*>(fitsBuffer);
2212 char* startOfURI = reinterpret_cast<char*>(&fitsBuffer[8]);
2213 char* startOfLocation = reinterpret_cast<char*>(&fitsBuffer[8 + 3]);
2214 char* startOfName = reinterpret_cast<char*>(&fitsBuffer[8+3+maxCharLength]);
2215
2216 strcpy(startOfExtension, "BINTABLE");
2217 strcpy(startOfURI, "URL");
2218
2219 int i=1;
2220 for (map<string, vector<string> >::const_iterator it=filesToGroup.begin(); it!=filesToGroup.end(); it++)
2221 for (vector<string>::const_iterator jt=it->second.begin(); jt != it->second.end(); jt++, i++)
2222 {
2223 strcpy(startOfLocation, it->first.c_str());
2224 strcpy(startOfName, jt->c_str());
2225
2226 if (fDebugIsOn)
2227 {
2228 ostringstream str;
2229 str << "Grouping " << it->first << " " << *jt;
2230 Debug(str);
2231 }
2232
2233 int status = 0;
2234 fits_write_tblbytes(groupFile->fitsPointer(), i, 1, 8+3+2*maxCharLength, fitsBuffer, &status);
2235 if (status)
2236 {
2237 char text[30];//max length of cfitsio error strings (from doc)
2238 fits_get_errstatus(status, text);
2239 ostringstream str;
2240 str << "Writing FITS row " << i << " in " << groupName << ": " << text << " (file_write_tblbytes, rc=" << status << ")";
2241 Error(str);
2242 // FIXME: What to do in case of error?
2243 }
2244 }
2245
2246 filesToGroup.clear();
2247 delete groupFile;
2248}
2249#endif //HAVE_FITS
2250
2251// --------------------------------------------------------------------------
2252//
2253//! Implements the StopRun transition.
2254//! Attempts to close the run file.
2255//! @returns
2256//! kSM_WaitingRun if success, kSM_FatalError otherwise
2257int DataLogger::StopRunPlease()
2258{
2259
2260 if (fDebugIsOn)
2261 {
2262 Debug("Stopping Run Logging...");
2263 }
2264 for (list<RunNumberType>::const_iterator it=fRunNumber.begin(); it != fRunNumber.end(); it++)
2265 {
2266#ifdef RUN_LOGS
2267 if (!it->logFile->is_open() || !it->reportFile->is_open())
2268#else
2269 if (!it->reportFile->is_open())
2270#endif
2271 return kSM_FatalError;
2272#ifdef RUN_LOGS
2273 it->logFile->close();
2274#endif
2275 it->reportFile->close();
2276 }
2277
2278#ifdef HAVE_FITS
2279 for (SubscriptionsListType::iterator i = fServiceSubscriptions.begin(); i != fServiceSubscriptions.end(); i++)
2280 for (map<string, SubscriptionType>::iterator j = i->second.begin(); j != i->second.end(); j++)
2281 {
2282 if (j->second.runFile.IsOpen())
2283 j->second.runFile.Close();
2284 }
2285#endif
2286 NotifyOpenedFile("", 0, fOpenedRunFiles);
2287 if (fNumSubAndFitsIsOn)
2288 fNumSubAndFits->updateService();
2289
2290 while (fRunNumber.size() > 0)
2291 {
2292 RemoveOldestRunNumber();
2293 }
2294
2295 return kSM_WaitingRun;
2296}
2297// --------------------------------------------------------------------------
2298//
2299//! Implements the Stop and Reset transitions.
2300//! Attempts to close any openned file.
2301//! @returns
2302//! kSM_Ready
2303int DataLogger::GoToReadyPlease()
2304{
2305 if (fDebugIsOn)
2306 {
2307 Debug("Going to the Ready state...");
2308 }
2309 if (GetCurrentState() == kSM_Logging)
2310 StopRunPlease();
2311
2312 if (fNightlyLogFile.is_open())
2313 fNightlyLogFile.close();
2314 if (fNightlyReportFile.is_open())
2315 fNightlyReportFile.close();
2316
2317#ifdef HAVE_FITS
2318 for (SubscriptionsListType::iterator i = fServiceSubscriptions.begin(); i != fServiceSubscriptions.end(); i++)
2319 for (map<string, SubscriptionType>::iterator j = i->second.begin(); j != i->second.end(); j++)
2320 {
2321 if (j->second.nightlyFile.IsOpen())
2322 j->second.nightlyFile.Close();;
2323 }
2324#endif
2325 if (GetCurrentState() == kSM_Logging ||
2326 GetCurrentState() == kSM_WaitingRun ||
2327 GetCurrentState() == kSM_NightlyOpen)
2328 {
2329 NotifyOpenedFile("", 0, fOpenedNightlyFiles);
2330 if (fNumSubAndFitsIsOn)
2331 fNumSubAndFits->updateService();
2332 }
2333#ifdef HAVE_FITS
2334 CreateFitsGrouping(fOpenedNightlyFits, 0);
2335#endif
2336 return kSM_Ready;
2337}
2338// --------------------------------------------------------------------------
2339//
2340//! Implements the transition towards kSM_WaitingRun
2341//! Does nothing really.
2342//! @returns
2343//! kSM_WaitingRun
2344int DataLogger::NightlyToWaitRunPlease()
2345{
2346 if (fDebugIsOn)
2347 {
2348 Debug("Going to Wait Run Number state...");
2349 }
2350 return kSM_WaitingRun;
2351}
2352// --------------------------------------------------------------------------
2353//
2354//! Setup Logger's configuration from a Configuration object
2355//! @param conf the configuration object that should be used
2356//!
2357bool DataLogger::SetConfiguration(Configuration& conf)
2358{
2359 fDebugIsOn = conf.Get<bool>("debug");
2360
2361 //Set the block or allow list
2362 fBlackList.clear();
2363 fWhiteList.clear();
2364
2365 //Adding entries that should ALWAYS be ignored
2366 //fBlackList.insert("DATA_LOGGER/");
2367 fBlackList.insert("/SERVICE_LIST");
2368 fBlackList.insert("DIS_DNS/");
2369
2370 //set the black list
2371 if (conf.Has("block"))
2372 {
2373 const vector<string> vec = conf.Get<vector<string>>("block");
2374
2375 fBlackList.insert(vec.begin(), vec.end());
2376 }
2377
2378 //set the white list
2379 if (conf.Has("allow"))
2380 {
2381 const vector<string> vec = conf.Get<vector<string>>("allow");
2382 fWhiteList.insert(vec.begin(), vec.end());
2383 }
2384
2385 //Set the grouping
2386 if (conf.Has("group"))
2387 {
2388 const vector<string> vec = conf.Get<vector<string>>("group");
2389 fGrouping.insert(vec.begin(), vec.end());
2390 }
2391
2392 //set the old run numbers timeout delay
2393 if (conf.Has("runtimeout"))
2394 {
2395 const long timeout = conf.Get<long>("runtimeout");
2396 if (timeout <= 0)
2397 {
2398 Error("Time out delay for old run numbers should be greater than 0 minute");
2399 return false;
2400 }
2401 fRunNumberTimeout = timeout;
2402 }
2403 return true;
2404}
2405
2406// --------------------------------------------------------------------------
2407int RunDim(Configuration &conf)
2408{
2409 WindowLog wout;
2410
2411 //log.SetWindow(stdscr);
2412 if (conf.Has("log"))
2413 if (!wout.OpenLogFile(conf.Get<string>("log")))
2414 wout << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
2415
2416 // Start io_service.Run to use the StateMachineImp::Run() loop
2417 // Start io_service.run to only use the commandHandler command detaching
2418 DataLogger logger(wout);
2419 if (!logger.SetConfiguration(conf))
2420 return -1;
2421
2422 logger.Run(true);
2423
2424 return 0;
2425}
2426// --------------------------------------------------------------------------
2427void RunThread(DataLogger* logger)
2428{
2429 // This is necessary so that the StateMachine Thread can signal the
2430 // Readline thread to exit
2431 logger->Run(true);
2432 Readline::Stop();
2433}
2434// --------------------------------------------------------------------------
2435template<class T>
2436int RunShell(Configuration &conf)
2437{
2438 static T shell(conf.GetName().c_str(), conf.Get<int>("console")!=1);
2439
2440 WindowLog &win = shell.GetStreamIn();
2441 WindowLog &wout = shell.GetStreamOut();
2442
2443 if (conf.Has("log"))
2444 if (!wout.OpenLogFile(conf.Get<string>("log")))
2445 win << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
2446
2447 DataLogger logger(wout);
2448
2449 if (!logger.SetConfiguration(conf))
2450 return -1;
2451
2452 shell.SetReceiver(logger);
2453
2454 boost::thread t(boost::bind(RunThread, &logger));
2455
2456 shell.Run(); // Run the shell
2457
2458 logger.Stop();
2459
2460 //Wait until the StateMachine has finished its thread
2461 //before returning and destroyinng the dim objects which might
2462 //still be in use.
2463 t.join();
2464
2465 return 0;
2466}
2467
2468/*
2469 Extract usage clause(s) [if any] for SYNOPSIS.
2470 Translators: "Usage" and "or" here are patterns (regular expressions) which
2471 are used to match the usage synopsis in program output. An example from cp
2472 (GNU coreutils) which contains both strings:
2473 Usage: cp [OPTION]... [-T] SOURCE DEST
2474 or: cp [OPTION]... SOURCE... DIRECTORY
2475 or: cp [OPTION]... -t DIRECTORY SOURCE...
2476 */
// Prints the description of the program and its usage synopsis to the
// standard output, followed by an empty line.
void PrintUsage()
{
    const char* const usage =
        "\n"
        "The data logger connects to all available Dim services and "
        "writes them to ascii and fits files.\n"
        "\n"
        "The default is that the program is started without user interaction. "
        "All actions are supposed to arrive as DimCommands. Using the -c "
        "option, a local shell can be initialized. With h or help a short "
        "help message about the usage can be brought to the screen.\n"
        "\n"
        "Usage: dataLogger [-c type] [OPTIONS]\n"
        " or: dataLogger [OPTIONS]\n";

    std::cout << usage << std::endl;
}
// --------------------------------------------------------------------------
//
//! Prints, to the standard output, the additional help text explaining
//! how the allow and black lists work.
//
void PrintHelp()
{
    /* Additional help text which is printed after the configuration
     options goes here */
    const char* const help =
        "\n"
        "If the allow list has any element, only the servers and/or services "
        "specified in the list will be used for subscription. The black list "
        "will disable service subscription and has higher priority than the "
        "allow list. If the allow list is not present by default all services "
        "will be subscribed."
        "\n"
        "For example, block=DIS_DNS/ will skip all the services offered by "
        "the DIS_DNS server, while block=/SERVICE_LIST will skip all the "
        "SERVICE_LIST services offered by any server and DIS_DNS/SERVICE_LIST "
        "will skip DIS_DNS/SERVICE_LIST.\n";

    std::cout << help << std::endl;
}
2512
2513// --------------------------------------------------------------------------
2514void SetupConfiguration(Configuration &conf)
2515{
2516 const string n = conf.GetName()+".log";
2517
2518 po::options_description configp("Program options");
2519 configp.add_options()
2520 ("dns", var<string>("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
2521 ("log,l", var<string>(n), "Write log-file")
2522 ("console,c", var<int>(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)")
2523 ;
2524
2525 po::options_description configs("DataLogger options");
2526 configs.add_options()
2527 ("block,b", vars<string>(), "Black-list of services")
2528 ("allow,a", vars<string>(), "White-list of services")
2529 ("debug", po_bool(), "Debug mode. Print clear text of received service reports to log-stream")
2530 ("group,g", vars<string>(), "Grouping of services into a single run-Fits")
2531 ("runtimeout", var<long>(), "Time out delay for old run numbers")
2532 ;
2533
2534 conf.AddEnv("dns", "DIM_DNS_NODE");
2535
2536 conf.AddOptions(configp);
2537 conf.AddOptions(configs);
2538}
2539// --------------------------------------------------------------------------
2540int main(int argc, const char* argv[])
2541{
2542 Configuration conf(argv[0]);
2543 conf.SetPrintUsage(PrintUsage);
2544 SetupConfiguration(conf);
2545
2546 po::variables_map vm;
2547 try
2548 {
2549 vm = conf.Parse(argc, argv);
2550 }
2551#if BOOST_VERSION > 104000
2552 catch (po::multiple_occurrences &e)
2553 {
2554 cerr << "Program options invalid due to: " << e.what() << " of '" << e.get_option_name() << "'." << endl;
2555 return -1;
2556 }
2557#endif
2558 catch (exception& e)
2559 {
2560 cerr << "Program options invalid due to: " << e.what() << endl;
2561 return -1;
2562 }
2563
2564 if (conf.HasVersion() || conf.HasPrint())
2565 return -1;
2566
2567 if (conf.HasHelp())
2568 {
2569 PrintHelp();
2570 return -1;
2571 }
2572
2573 Dim::Setup(conf.Get<string>("dns"));
2574
2575// try
2576 {
2577 // No console access at all
2578 if (!conf.Has("console"))
2579 return RunDim(conf);
2580
2581 // Console access w/ and w/o Dim
2582 if (conf.Get<int>("console")==0)
2583 return RunShell<LocalShell>(conf);
2584 else
2585 return RunShell<LocalConsole>(conf);
2586 }
2587/* catch (exception& e)
2588 {
2589 cerr << "Exception: " << e.what() << endl;
2590 return -1;
2591 }*/
2592
2593 return 0;
2594}
Note: See TracBrowser for help on using the repository browser.