source: trunk/FACT++/src/dataLogger.cc@ 10991

Last change on this file since 10991 was 10991, checked in by tbretz, 13 years ago
Fixed a typo in a command description.
File size: 90.2 KB
Line 
1//****************************************************************
2/** @class DataLogger
3
4 @brief Logs all message and infos between the services
5
6 This is the main logging class facility.
7 It derives from StateMachineDim and DimInfoHandler. the first parent is here to enforce
8 a state machine behaviour, while the second one is meant to make the dataLogger receive
9 dim services to which it subscribed from.
10 The possible states and transitions of the machine are:
11 \dot
12 digraph datalogger {
13 node [shape=record, fontname=Helvetica, fontsize=10];
14 e [label="Error" color="red"];
15 r [label="Ready"]
16 d [label="NightlyOpen"]
17 w [label="WaitingRun"]
18 l [label="Logging"]
19 b [label="BadNightlyconfig" color="red"]
20 c [label="BadRunConfig" color="red"]
21
22 e -> r
23 r -> e
24 r -> d
25 r -> b
26 d -> w
27 d -> r
28 w -> r
29 l -> r
30 l -> w
31 b -> d
32 w -> c
33 w -> l
34 b -> r
35 c -> r
36 c -> l
37 }
38 \enddot
39 */
40 //****************************************************************
41#include "Dim.h"
42#include "Event.h"
43#include "Time.h"
44#include "StateMachineDim.h"
45#include "WindowLog.h"
46#include "Configuration.h"
47#include "ServiceList.h"
48#include "Converter.h"
49#include "MessageImp.h"
50#include "LocalControl.h"
51#include "DimDescriptionService.h"
52
53#include "Description.h"
54
55//#include "DimServiceInfoList.h"
56#include "DimNetwork.h"
57//for getting stat of opened files
58#include <unistd.h>
59//for getting disk free space
60#include <sys/statvfs.h>
61//for getting files sizes
62#include <sys/stat.h>
63
64#include <fstream>
65#include <mutex>
66
67#include <boost/bind.hpp>
68#if BOOST_VERSION < 104400
69#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
70#undef BOOST_HAS_RVALUE_REFS
71#endif
72#endif
73#include <boost/thread.hpp>
74
75#ifdef HAVE_FITS
76#include "Fits.h"
77#endif
78
79//Dim structures
/// Writing statistics of the data logger, meant to be distributed as a Dim service.
struct DataLoggerStats
{
    /// total size written by the logger
    long sizeWritten;
    /// free space left on the output device (-1 if it could not be retrieved)
    long freeSpace;
    /// current writing rate (NOTE(review): unit not visible here, presumably bytes per second - confirm)
    long writingRate;
};
/// Number of active subscriptions and of currently opened FITS files,
/// meant to be distributed as a Dim service.
struct NumSubAndFitsType
{
    /// number of services the logger is subscribed to
    int numSubscriptions;
    /// number of FITS files currently open
    int numOpenFits;
};
/// Notification record telling which file was opened, meant to be distributed as a Dim service.
struct OpenFileToDim
{
    /// code describing the opened file (NOTE(review): meaning of the values is defined by the sender - confirm)
    int code;
    /// name of the opened file, stored in a fixed-size buffer
    char fileName[FILENAME_MAX];
};
96
97///Run number record. Used to keep track of which run numbers are still active
98struct RunNumberType {
99#ifdef RUN_LOGS
100 ///the run number log file
101 shared_ptr<ofstream> logFile;
102#endif
103 ///the run number report file
104 shared_ptr<ofstream> reportFile;
105#ifdef HAVE_FITS
106 ///the run number group fits file
107 shared_ptr<CCfits::FITS> runFitsFile;
108#endif
109#ifdef RUN_LOGS
110 ///the log filename
111 string logName;
112#endif
113 ///the report filename
114 string reportName;
115 ///the actual run number
116 uint32_t runNumber;
117 ///the time at which the run number was received
118 Time time;
119 ///list of opened fits used to create the fits grouping when the run ends
120 map<string, vector<string> > openedFits;
121 ///default constructor
122 RunNumberType()
123 {
124#ifdef RUN_LOGS
125 logFile = shared_ptr<ofstream>(new ofstream());
126#endif
127 reportFile = shared_ptr<ofstream>(new ofstream());
128#ifdef HAVE_FITS
129 runFitsFile = shared_ptr<CCfits::FITS>();
130#endif
131 runNumber = 0;
132 }
133 ///default destructor
134 ~RunNumberType()
135 {
136
137 }
138
139 void addServiceToOpenedFits(const string& fileName, const string& serviceName)
140 {
141 //most likely I should add this service name.
142 //the only case for which I should not add it is if a service disapeared, hence the file was closed
143 //and reopened again. Unlikely to happen, but well it may
144
145 if (find(openedFits[fileName].begin(), openedFits[fileName].end(),
146 serviceName)==openedFits[fileName].end())
147 openedFits[fileName].push_back(serviceName);
148 }
149};
150///Dim subscription type. Stores all the relevant info to handle a Dim subscription
151struct SubscriptionType
152{
153#ifdef HAVE_FITS
154 ///Nightly FITS output file
155 Fits nightlyFile;
156 ///run-specific FITS output file
157 Fits runFile;
158#endif
159 ///the actual dimInfo pointer
160 shared_ptr<DimStampedInfo> dimInfo;
161 ///the server
162 string server;
163 ///the service
164 string service;
165 ///the converter for outputting the data according to the format
166 shared_ptr<Converter> fConv;
167 ///the current run number used by this subscription
168 uint32_t runNumber;
169 ///time of the latest received event
170 Time lastReceivedEvent;
171 ///whether or not the fits buffer was allocated already
172 bool fitsBufferAllocated;
173
174 ///Dim info constructor
175 SubscriptionType(DimStampedInfo* info=NULL)
176 {
177 dimInfo = shared_ptr<DimStampedInfo>(info);
178 fConv = shared_ptr<Converter>();
179 runNumber = 0;
180 lastReceivedEvent = Time::None;
181 fitsBufferAllocated = false;
182 }
183
184 ///default destructor
185 ~SubscriptionType()
186 {
187 }
188};
189
190class DataLogger : public StateMachineDim, DimServiceInfoList
191{
192public:
193 /// The list of existing states specific to the DataLogger
194 enum
195 {
196 kSM_NightlyOpen = 20, ///< Nightly file openned and writing
197 kSM_WaitingRun = 30, ///< waiting for the run number to open the run file
198 kSM_Logging = 40, ///< both files openned and writing
199 kSM_BadNightlyConfig = 0x101, ///< the folder specified for Nightly logging does not exist or has bad permissions
200 kSM_BadRunConfig = 0x102, ///< the folder specified for the run logging does not exist or has wrong permissions or no run number
201 kSM_WriteError = 0x103, ///< Denotes that an error occured while writing a file (text or fits).
202 } localstates_t;
203
204 DataLogger(ostream &out);
205 ~DataLogger();
206
207 bool SetConfiguration(Configuration& conf);
208
209private:
210 /************************************************
211 * MEMBER VARIABLES
212 ************************************************/
213 /// ofstream for the NightlyLogfile
214 ofstream fNightlyLogFile;
215 /// ofstream for the Nightly report file
216 ofstream fNightlyReportFile;
217 /// base path of the Nightlyfile
218 string fNightlyFilePath;
219 ///base path of the run file
220 string fRunFilePath;
221 ///run numbers
222 list<RunNumberType> fRunNumber;
223 ///old run numbers time-out delay (in minutes)
224 long fRunNumberTimeout;
225 ///previous run number. to check if changed while logging
226 int fPreviousRunNumber;
227 ///Current Service Quality
228 int fQuality;
229 ///Modified Julian Date
230 double fMjD;
231 ///for obtaining the name of the existing services
232// ServiceList fServiceList;
233 typedef map<const string, map<string, SubscriptionType> > SubscriptionsListType;
234 ///All the services to which we have subscribed to, sorted by server name.
235 SubscriptionsListType fServiceSubscriptions;
236 ///full name of the nightly log file
237 string fFullNightlyLogFileName;
238 ///full name of the nightly report file
239 string fFullNightlyReportFileName;
240 ///variable to track when the statistic were last calculated
241 Time fPreviousStatsUpdateTime;
242 Time fPreviousOldRunNumberCheck;
243 ///boolean to know whether we should close and reopen daily files or not
244 bool fDailyFileDayChangedAlready;
245
246private:
247 /***************************************************
248 * DIM INFO HANDLER
249 ***************************************************/
250 //overloading of DIM's infoHandler function
251 void infoHandler();
252
253 /***************************************************
254 * TRANSITION FUNCTIONS
255 ***************************************************/
256 ///Reporting method for the services info received
257 void ReportPlease(DimInfo* I, SubscriptionType& sub);
258
259 int ConfigureFileName(string &target, const string &type, const EventImp &evt);
260 ///Configuration of the nightly file path
261 int ConfigureNightlyFileName(const Event& evt);
262 ///Configuration fo the file name
263 int ConfigureRunFileName(const Event& evt);
264 ///DEPREC - configuration of the run number
265 int ConfigureRunNumber(const Event& evt);
266 ///print the current state of the dataLogger
267 int PrintStatePlease(const Event& evt);
268 ///checks whether or not the current info being treated is a run number
269 void CheckForRunNumber(DimInfo* I);
270 /// start transition
271 int StartPlease();
272 ///from waiting to logging transition
273 int StartRunPlease();
274 /// from logging to waiting transition
275 int StopRunPlease();
276 ///stop and reset transition
277 int GoToReadyPlease();
278 ///from NightlyOpen to waiting transition
279 int NightlyToWaitRunPlease();
280 /// from writing to error
281 std::string SetCurrentState(int state, const char *txt="", const std::string &cmd="");
282#ifdef HAVE_FITS
283 ///Open fits files
284 void OpenFITSFilesPlease(SubscriptionType& sub, RunNumberType* cRunNumber);
285 ///Write data to FITS files
286 void WriteToFITS(SubscriptionType& sub);
287 ///Allocate the buffers required for fits
288 void AllocateFITSBuffers(SubscriptionType& sub);
289#endif//has_fits
290
291 /***************************************
292 * DIM SERVICES PROVIDED BY THE DATA LOGGER
293 ***************************************/
294 ///monitoring notification loop
295 void ServicesMonitoring();
296 inline void NotifyOpenedFile(const string &name, int type, DimDescribedService* service);
297 ///variables for computing statistics
298 DataLoggerStats fStatVar;
299 ///mutex to make sure that the Stats are not accessed while updating
300// mutex fStatsMutex;
301 ///services notification thread
302// boost::thread fMonitoringThread;
303 ///end of the monitoring
304// bool fContinueMonitoring;
305 ///stores the size of each file that is or was open
306 map<string, long> fFileSizesMap;
307 ///total size of the opened files BEFORE they were opened by the logger
308 long fBaseSizeNightly;
309 long fPreviousSize;
310 long fBaseSizeRun;
311 ///Service for opened files
312 DimDescribedService* fOpenedNightlyFiles;
313 DimDescribedService* fOpenedRunFiles;
314 DimDescribedService* fNumSubAndFits;
315 DimDescribedService* fStatsMonitoring;
316 NumSubAndFitsType fNumSubAndFitsData;
317 ///Small function for calculating the total size written so far
318 bool calculateTotalSizeWritten(DataLoggerStats& statVar, bool isPrinting);
319
320 /***************************************************
321 * DATA LOGGER's CONFIGURATION STUFF
322 ***************************************************/
323 ///black/white listing
324 set<string> fBlackList;
325 set<string> fWhiteList;
326 ///list of services to be grouped
327 set<string> fGrouping;
328 ///configuration flags
329 bool fDebugIsOn;
330 float fStatsPeriodDuration;
331 bool fOpenedFilesIsOn;
332 bool fNumSubAndFitsIsOn;
333 //functions for controlling the services behavior
334 int SetDebugOnOff(const Event& evt);
335 int SetStatsPeriod(const Event& evt);
336 int SetOpenedFilesOnOff(const Event& evt);
337 int SetNumSubsAndFitsOnOff(const Event& evt);
338 int SetRunTimeoutDelay(const Event& evt);
339
340 ///boolean to prevent DIM update while desctructing the dataLogger
341 bool fDestructing;
342 /***************************************************
343 * UTILITIES
344 ***************************************************/
345 ///vectors to keep track of opened Fits files, for grouping purposes.
346 map<string, vector<string> > fOpenedNightlyFits;
347 ///creates a group fits file based on a list of files to be grouped
348 void CreateFitsGrouping(map<string, vector<string> >& filesToGroup, int runNumber);
349
350 bool OpenStream(shared_ptr<ofstream> stream, const string &filename);
351 ///Open the relevant text files related to a particular run
352 int OpenRunFile(RunNumberType& run);
353 ///add a new run number
354 void AddNewRunNumber(int64_t newRun, Time time);
355 ///removes the oldest run number, and close the relevant files.
356 void RemoveOldestRunNumber();
357 ///retrieves the size of a file
358 off_t GetFileSize(const string&);
359 ///Get the digits of year, month and day for filenames and paths
360 void GetYearMonthDayForFiles(unsigned short& year, unsigned short& month, unsigned short& day);
361 ///Appends the relevant year month day to a given path
362 void AppendYearMonthDaytoPath(string& path);
363 ///Form the files path
364 string CompileFileName(const string &path, const string &service, const string & extension, const Time &time=Time());
365 ///Form the files path
366 string CompileFileName(const string &path, uint32_t run, const string &service, const string & extension, const Time &time=Time());
367 ///Check whether service is in black and/or white list
368 bool ShouldSubscribe(const string& server, const string& service);
369 ///Subscribe to a given server and service
370 DimStampedInfo* SubscribeToPlease(const string& server, const string& service);
371 ///Open a text file and checks for ofstream status
372 void OpenTextFilePlease(ofstream& stream, const string& name);
373 ///Check if a dir is . and returns the actual string corresponding to .
374// string CheckIfDirIsDot(const string& dir);
375 ///Remembers the size of newly opened files. for statistic purposes
376 bool RememberFileOrigSizePlease(string& fileName, bool nightly);
377 ///Checks if the input osftream is in error state, and if so close it.
378 void CheckForOfstreamError(ofstream& out);
379 ///Checks if a given path exist
380 bool DoesPathExist(string path);
381 ///Check if the statistics service should be updated, and if so, do it
382 void UpdateStatisticsService();
383 ///Check if old run numbers can be trimmed, and if so, do it
384 void TrimOldRunNumbers();
385public:
386 ///Create a given directory
387 bool CreateDirectory(string path);
388 /***************************************************
389 * INHERITED FROM DIMSERVICEINFOLIST
390 ***************************************************/
391 ///Add a new service subscription
392 void AddService(const string&, const string&, const string&, bool);
393 ///Remove a given service subscription
394 void RemoveService(const string&, const string&, bool);
395 ///Remove all the services associated with a given server
396 void RemoveAllServices(const string&);
397}; //DataLogger
398
399// --------------------------------------------------------------------------
400//
401//! Check if a given path exists
402//! @param path the path to be checked
403//! @return whether or not the creation has been successfull
404//
405bool DataLogger::CreateDirectory(string path)
406{
407 //remove last '/', if present
408 if (path[path.size()-1] == '/')
409 path = path.substr(0, path.size()-1);
410
411 //create boost path
412 const boost::filesystem::path fullPath = boost::filesystem::system_complete(boost::filesystem::path(path));
413
414 //if path does not exist, check if upper levels exist already
415 if (boost::filesystem::exists(fullPath))
416 {
417 //if path already exist, make sure it does not designate a file (filenames cannot be checked if they do not exist)
418 if (boost::filesystem::is_directory(fullPath))
419 return true;
420
421 Error("Path to be created contains a file name: '" + path + "'");
422 return false;
423 }
424
425 if (path.size() <= 1)
426 {//we're hitting "/", which SHOULD have existed...
427 Error("Something unexpected happened while creating a path");
428 }
429 CreateDirectory(path.substr(0, path.find_last_of('/')));
430
431 //path does not exist, and upper level have been created already by recusrion.
432 const mode_t rightsMask = S_IRWXU | S_IXGRP | S_IRGRP | S_IXOTH | S_IROTH; //everybody read, owner writes
433
434 const int returnValue = mkdir(path.c_str(), rightsMask);
435
436 if (returnValue != 0)
437 {
438 ostringstream str;
439 str << "Could not create directory " << path << " mkdir error code: " << errno;
440 Error(str.str());
441 return false;
442 }
443
444 return true;
445}
446// --------------------------------------------------------------------------
447//
448//! Check if a given path exists
449//! @param path the path to be checked
450//! @return whether or not the given path exists
451//
452bool DataLogger::DoesPathExist(string path)
453{
454 const boost::filesystem::path fullPath = boost::filesystem::system_complete(boost::filesystem::path(path));
455
456 if (!boost::filesystem::exists(fullPath))
457 return false;
458
459 if (!boost::filesystem::is_directory(fullPath))
460 {
461 Error("Path given for checking '" + path + "' designate a file name. Please provide a path name only");
462 return false;
463 }
464
465 if (access(path.c_str(), R_OK|W_OK|X_OK) != 0)
466 {
467 Error("Missing read, write or execute permissions on directory '" + path + "'");
468 return false;
469 }
470
471 return true;
472}
473// --------------------------------------------------------------------------
474//
475//! Add a new service subscription
476//! @param server the server for which the subscription should be created
477//! @param service the service for which the subscription should be created
478//! @param isCmd whether this is a Dim Command or not. Commands are not logged
479//
480void DataLogger::AddService(const string& server, const string& service, const string&, bool isCmd)
481{
482 //dataLogger does not subscribe to commands
483 if (isCmd)
484 return;
485
486 //check the given subscription against black and white lists
487 if (!ShouldSubscribe(server, service))
488 return;
489
490 map<string, SubscriptionType> &list = fServiceSubscriptions[server];
491
492 if (list.find(service) != list.end())
493 {
494 Error("Service " + server + "/" + service + " is already in the dataLogger's list. ignoring its update.");
495 return;
496 }
497
498 list[service].dimInfo = shared_ptr<DimStampedInfo>(SubscribeToPlease(server, service));
499 list[service].server = server;
500 list[service].service = service;
501 fNumSubAndFitsData.numSubscriptions++;
502 if (fDebugIsOn)
503 Debug("Added subscription to " + server + "/" + service);
504}
505// --------------------------------------------------------------------------
506//
507//! Remove a given service subscription
508//! @param server the server for which the subscription should be removed
509//! @param service the service that should be removed
510//! @param isCmd whether or not this is a command
511//
512void DataLogger::RemoveService(const string& server, const string& service, bool isCmd)
513{
514 if (isCmd)
515 return;
516 if (fServiceSubscriptions[server].erase(service) != 1)
517 {
518 ostringstream str;
519 str << "Subscription " << server << "/" << service << "could not be removed as it is not present";
520 Error(str.str());
521 return;
522 }
523 fNumSubAndFitsData.numSubscriptions--;
524 if (fDebugIsOn)
525 {
526 Debug("Removed subscription to " + server + "/" + service);
527 }
528}
529// --------------------------------------------------------------------------
530//
531//! Remove all the services associated with a given server
532//! @param server the server for which all the services should be removed
533//
534void DataLogger::RemoveAllServices(const string& server)
535{
536 fNumSubAndFitsData.numSubscriptions -= fServiceSubscriptions[server].size();
537 fServiceSubscriptions[server].clear();
538 if (fDebugIsOn)
539 {
540 Debug("Removed all subscriptions to " + server + "/");
541 }
542}
543// --------------------------------------------------------------------------
544//
545//! Checks if the given ofstream is in error state and if so, close it
546//! @param out the ofstream that should be checked
547//
548void DataLogger::CheckForOfstreamError(ofstream& out)
549{
550 if (out.good())
551 return;
552
553 Error("An error occured while writing to a text file. Closing it");
554 if (out.is_open())
555 out.close();
556 SetCurrentState(kSM_WriteError);
557}
558// --------------------------------------------------------------------------
559//
560//! Checks the size on disk of a given size, and remembers it in the relevant member variable
561//! @param fileName the file for which the size on disk should be retrieved
562//! @param nightly whether this is a run or nightly file, so that its size is added to the correct member variable
563//
564bool DataLogger::RememberFileOrigSizePlease(string& fileName, bool nightly)
565{
566 //get the size of the file we're about to open
567 if (fFileSizesMap.find(fileName) != fFileSizesMap.end())
568 return false;
569
570 if (nightly)
571 fBaseSizeNightly += GetFileSize(fileName);
572 else
573 fBaseSizeRun += GetFileSize(fileName);
574 fFileSizesMap[fileName] = 0;
575 return true;
576}
577
578// --------------------------------------------------------------------------
579//
580//! Open a text file and checks for error code
581//! @param stream the ofstream for which the file should be opened
582//! @name the file name
583//
584void DataLogger::OpenTextFilePlease(ofstream& stream, const string& name)
585{
586 errno = 0;
587 stream.open(name.c_str(), ios_base::out | ios_base::app);
588 if (!stream)
589 {
590 ostringstream str;
591 str << "Open file " << name << ": " << strerror(errno) << " (errno=" << errno << ")";
592 Error(str);
593 }
594}
595// --------------------------------------------------------------------------
596//
597//! Create a new dim subscription to a given server and service
598//! @param server the server name
599//! @param service the service name
600//
601DimStampedInfo* DataLogger::SubscribeToPlease(const string& server, const string& service)
602{
603 if (fDebugIsOn)
604 {
605 ostringstream str;
606 str << "Subscribing to service " << server << "/" << service;
607 Debug(str);
608 }
609 return new DimStampedInfo((server + "/" + service).c_str(), const_cast<char*>(""), this);
610}
611// --------------------------------------------------------------------------
612//
613//! Check whether a service should be subscribed to, based on the black/white list entries
614//! @param server the server name associated with the service being checked
615//! @param service the service name associated with the service being checked
616//
617bool DataLogger::ShouldSubscribe(const string& server, const string& service)
618{
619 if (fWhiteList.size()>0 &&
620 (fWhiteList.find(server + "/") == fWhiteList.end()) &&
621 (fWhiteList.find(server + "/" + service) == fWhiteList.end()) &&
622 (fWhiteList.find("/" + service) == fWhiteList.end()))
623 return false;
624
625 if ((fBlackList.find(server + "/") != fBlackList.end()) ||
626 (fBlackList.find(server + "/" + service) != fBlackList.end()) ||
627 (fBlackList.find("/" + service) != fBlackList.end()))
628 return false;
629
630 return true;
631}
632// --------------------------------------------------------------------------
633//
634//! Compiles a file name
635//! @param path the base path where to put the file
636//! @param time the time at which the file is created
637//! @param service the service name, if any
638//! @param extension the extension to add, if any
639//
640string DataLogger::CompileFileName(const string &path, const string &service, const string & extension, const Time &time)
641{
642 ostringstream str;
643 //calculate time suitable for naming files.
644 const Time ftime(time-boost::posix_time::time_duration(12,0,0));
645
646 //output it
647 str << path << Time::fmt("/%Y/%m/%d") << ftime;
648
649 //check if target directory exist
650 if (!DoesPathExist(str.str()))
651 CreateDirectory(str.str());
652
653 //output base of file name
654 str << Time::fmt("/%Y_%m_%d") << ftime;
655
656 //output service name
657 if (!service.empty())
658 str << "_" << service;
659
660 //output appropriate extension
661 if (!extension.empty())
662 str << "." << extension;
663
664 return str.str();
665}
666// --------------------------------------------------------------------------
667//
668//! Compiles a file name
669//! @param path the base path where to put the file
670//! @param time the time at which the file is created
671//! @param run the run number
672//! @param service the service name, if any
673//! @param extension the extension to add, if any
674//
675string DataLogger::CompileFileName(const string &path, uint32_t run, const string &service, const string & extension, const Time &time)
676{
677 ostringstream str;
678 //calculate suitable time for naming files and output it
679 str << path << Time::fmt("/%Y/%m/%d") << (time-boost::posix_time::time_duration(12,0,0));
680
681 //check if target directory exist
682 if (!DoesPathExist(str.str()))
683 CreateDirectory(str.str());
684
685 //output base of file name
686 str << '/' << setfill('0') << setw(8) << run;
687
688 //output service name
689 if (!service.empty())
690 str << "_" << service;
691
692 //output appropriate extension
693 if (!extension.empty())
694 str << "." << extension;
695 return str.str();
696}
697
698// --------------------------------------------------------------------------
699//
700//!retrieves the size on disk of a file
701//! @param fileName the full file name for which the size on disk should be retrieved
702//! @return the size of the file on disk, in bytes. 0 if the file does not exist or if an error occured
703//
704off_t DataLogger::GetFileSize(const string& fileName)
705{
706 errno = 0;
707 struct stat st;
708 if (!stat(fileName.c_str(), &st))
709 return st.st_size;
710
711 if (errno != 0 && errno != 2)//ignoring error #2: no such file or directory is not an error for new files
712 {
713 ostringstream str;
714 str << "Unable to stat " << fileName << ". Reason: " << strerror(errno) << " [" << errno << "]";
715 Error(str);
716 }
717
718 return 0;
719}
720// --------------------------------------------------------------------------
721//
722//! Removes the oldest run number and closes the fits files that should be closed
723//! Also creates the fits grouping file
724//
725void DataLogger::RemoveOldestRunNumber()
726{
727 if (fDebugIsOn)
728 {
729 ostringstream str;
730 str << "Removing run number " << fRunNumber.front().runNumber;
731 Debug(str);
732 }
733 CreateFitsGrouping(fRunNumber.front().openedFits, fRunNumber.front().runNumber);
734
735 //crawl through the subscriptions to see if there are still corresponding fits files opened.
736 for (SubscriptionsListType::iterator x=fServiceSubscriptions.begin();
737 x!=fServiceSubscriptions.end(); x++)
738 for (map<string, SubscriptionType>::iterator y=x->second.begin();
739 y!=x->second.end(); y++)
740 if (y->second.runFile.fRunNumber == fRunNumber.front().runNumber && y->second.runFile.IsOpen())
741 {
742 if (fDebugIsOn)
743 {
744 ostringstream str;
745 str << "Closing Fits run file " << y->second.runFile.fFileName;
746 Debug(str);
747 }
748 y->second.runFile.Close();
749 }
750 //if a grouping file is on, decrease the number of opened fits manually
751 if (fRunNumber.front().runFitsFile)
752 (fNumSubAndFitsData.numOpenFits)--;
753 //remove the entry
754 fRunNumber.pop_front();
755}
756
757// --------------------------------------------------------------------------
758//
759//! Calculate the total number of written bytes since the logger was started
760//! @param statVar the data structure that should be updated
761//! @param shouldWarn whether or not error messages should be outputted
762//! @param isPrinting whether this function was called from the PRINT command or not. If so, displays relevant information
763//
764bool DataLogger::calculateTotalSizeWritten(DataLoggerStats& statVar, bool isPrinting)
765{
766//mutex
767// if (!isPrinting)
768// fStatsMutex.lock();
769#ifdef HAVE_FITS
770 if (isPrinting)
771 {
772 ostringstream str;
773 str << "There are " << fNumSubAndFitsData.numOpenFits << " FITS files open:";
774 Message(str);
775 }
776
777 ///TODO the grouping file is dealt with several times. This should not be a problem but well, better to fix it I guess.
778 for (SubscriptionsListType::const_iterator x=fServiceSubscriptions.begin();
779 x!=fServiceSubscriptions.end(); x++)
780 {
781 for (map<string, SubscriptionType>::const_iterator y=x->second.begin();
782 y!=x->second.end(); y++)
783 {
784 if (y->second.runFile.IsOpen())
785 {
786 fFileSizesMap[y->second.runFile.fFileName] = y->second.runFile.GetWrittenSize();
787 if (isPrinting)
788 Message("-> "+y->second.runFile.fFileName);
789 }
790 if (y->second.nightlyFile.IsOpen())
791 {
792 fFileSizesMap[y->second.nightlyFile.fFileName] = y->second.nightlyFile.GetWrittenSize();
793 if (isPrinting)
794 Message("-> "+y->second.nightlyFile.fFileName);
795 }
796 }
797 }
798#else
799 if (isPrinting)
800 Message("FITS output disabled at compilation");
801#endif
802 //gather log and report files sizes on disk
803 if (fNightlyLogFile.is_open())
804 fFileSizesMap[fFullNightlyLogFileName] = GetFileSize(fFullNightlyLogFileName);
805 if (fNightlyReportFile.is_open())
806 fFileSizesMap[fFullNightlyReportFileName] = GetFileSize(fFullNightlyReportFileName);
807 for (list<RunNumberType>::iterator it = fRunNumber.begin(); it != fRunNumber.end(); it++)
808 {
809 if (it->reportFile->is_open())
810 fFileSizesMap[it->reportName] = GetFileSize(it->reportName);
811#ifdef RUN_LOGS
812 if (it->logFile->is_open())
813 fFileSizesMap[it->logName] = GetFileSize(it->logName);
814#endif
815 }
816
817 bool shouldWarn = false;
818 struct statvfs vfs;
819 if (!statvfs(fNightlyFilePath.c_str(), &vfs))
820 statVar.freeSpace = vfs.f_bsize*vfs.f_bavail;
821 else
822 {
823 ostringstream str;
824 str << "Unable to retrieve stats for " << fNightlyFilePath << ". Reason: " << strerror(errno) << " [" << errno << "]";
825 if (!shouldWarn)
826 Error(str);
827 statVar.freeSpace = -1;
828 }
829 //sum up all the file sizes. past and present
830 statVar.sizeWritten = 0;
831 for (map<string, long>::const_iterator it=fFileSizesMap.begin(); it != fFileSizesMap.end(); it++)
832 statVar.sizeWritten += it->second;
833 statVar.sizeWritten -= fBaseSizeNightly;
834 statVar.sizeWritten -= fBaseSizeRun;
835
836//mutex
837// if (!isPrinting)
838// fStatsMutex.unlock();
839
840 return shouldWarn;
841}
842
843// --------------------------------------------------------------------------
844//
845//! Monitor the number of opened files and total size written, and distributes this data through a Dim service
846//
847//
848/*
849void DataLogger::ServicesMonitoring()
850{
851 struct statvfs vfs;
852 if (!statvfs(fNightlyFilePath.c_str(), &vfs))
853 fStatVar.freeSpace = vfs.f_bsize*vfs.f_bavail;
854 else
855 fStatVar.freeSpace = -1;
856
857 DimDescribedService srvc ("DATA_LOGGER/STATS", "X:3", fStatVar, "Add description here");
858 fPreviousSize = 0;
859 //loop-wait for broadcast
860 while (fContinueMonitoring)
861 {
862 if (fStatsPeriodDuration == 0.0f)
863 {
864 sleep(0.1f);
865 continue;
866 }
867
868 sleep(fStatsPeriodDuration);
869
870
871 fStatsMutex.lock();
872
873 if (fStatVar.writingRate != 0) //if data has been written
874 {
875 srvc.updateService();
876
877 if(fDebugIsOn)
878 {
879 ostringstream str;
880 str << "Size written: " << fStatVar.sizeWritten/1000 << " kB; writing rate: ";
881 str << fStatVar.writingRate/1000 << " kB/s; free space: ";
882 str << fStatVar.freeSpace/(1000*1000) << " MB";
883 Debug(str);
884 }
885 }
886 fStatsMutex.unlock();
887 }
888}
889*/
890// --------------------------------------------------------------------------
891//
892//! Default constructor. The name of the machine is given DATA_LOGGER
893//! and the state is set to kSM_Ready at the end of the function.
894//
895//!Setup the allows states, configs and transitions for the data logger
896//
897DataLogger::DataLogger(ostream &out) : StateMachineDim(out, "DATA_LOGGER")
898{
899 //initialize member data
900 fNightlyFilePath = ".";
901 fRunFilePath = ".";
902
903 //Give a name to this machine's specific states
904 AddStateName(kSM_NightlyOpen, "NightlyFileOpen", "The summary files for the night are open.");
905 AddStateName(kSM_WaitingRun, "WaitForRun", "The summary files for the night are open and we wait for a run to be started.");
906 AddStateName(kSM_Logging, "Logging", "The summary files for the night and the files for a single run are open.");
907 AddStateName(kSM_BadNightlyConfig, "ErrNightlyFolder", "The folder for the nighly summary files is invalid.");
908 AddStateName(kSM_BadRunConfig, "ErrRunFolder", "The folder for the run files is invalid.");
909 AddStateName(kSM_WriteError, "ErrWrite", "An error occured while writing to a file.");
910
911 // Add the possible transitions for this machine
912 AddEvent(kSM_NightlyOpen, "START", kSM_Ready, kSM_BadNightlyConfig)
913 (boost::bind(&DataLogger::StartPlease, this))
914 ("Start the nightly logging. Nightly file location must be specified already");
915
916 AddEvent(kSM_Ready, "STOP", kSM_NightlyOpen, kSM_WaitingRun, kSM_Logging, kSM_WriteError)
917 (boost::bind(&DataLogger::GoToReadyPlease, this))
918 ("Stop all data logging, close all files.");
919
920 AddEvent(kSM_Logging, "START_RUN", kSM_WaitingRun, kSM_BadRunConfig)
921 (boost::bind(&DataLogger::StartRunPlease, this))
922 ("Start the run logging. Run file location must be specified already.");
923
924 AddEvent(kSM_WaitingRun, "STOP_RUN", kSM_Logging)
925 (boost::bind(&DataLogger::StopRunPlease, this))
926 ("Wait for a run to be started, open run-files as soon as a run number arrives.");
927
928 AddEvent(kSM_Ready, "RESET", kSM_Error, kSM_BadNightlyConfig, kSM_BadRunConfig, kSM_WriteError)
929 (boost::bind(&DataLogger::GoToReadyPlease, this))
930 ("Transition to exit error states. Closes the any open file.");
931
932 AddEvent(kSM_WaitingRun, "WAIT_FOR_RUN_NUMBER", kSM_NightlyOpen)
933 (boost::bind(&DataLogger::NightlyToWaitRunPlease, this))
934 ("Go to waiting for run number state. In this state with any received run-number a new file is opened.");
935
936 // Add the possible configurations for this machine
937 AddEvent("SET_NIGHTLY_FOLDER", "C", kSM_Ready, kSM_BadNightlyConfig)
938 (boost::bind(&DataLogger::ConfigureNightlyFileName, this, _1))
939 ("Configure the base folder for the nightly files."
940 "|Path[string]:Absolute or relative path name where the nightly files should be stored.");
941
942 AddEvent("SET_RUN_FOLDER", "C", kSM_Ready, kSM_BadNightlyConfig, kSM_NightlyOpen, kSM_WaitingRun, kSM_BadRunConfig)
943 (boost::bind(&DataLogger::ConfigureRunFileName, this, _1))
944 ("Configure the base folder for the run files."
945 "|Path[string]:Absolute or relative path name where the run files should be stored.");
946
947 AddEvent("SET_RUN_NUMBER", "X", kSM_Ready, kSM_NightlyOpen, kSM_WaitingRun, kSM_BadRunConfig, kSM_Logging)
948 (boost::bind(&DataLogger::ConfigureRunNumber, this, _1))
949 ("Configure the run number. Cannot be done in logging state");
950
951 // Provide a print command
952 AddEvent("PRINT")
953 (boost::bind(&DataLogger::PrintStatePlease, this, _1))
954 ("Print information about the internal status of the data logger.");
955
956 OpenFileToDim fToDim;
957 fToDim.code = 0;
958 fToDim.fileName[0] = '\0';
959
960 fOpenedNightlyFiles = new DimDescribedService(GetName() + "/FILENAME_NIGHTLY", "I:1;C", fToDim,
961 "Path and base name which is used to compile the filenames for the nightly files."
962 "|Type[int]:type of open files (1=log, 2=rep, 4=fits)"
963 "|Name[string]:path and base file name");
964
965 fOpenedRunFiles = new DimDescribedService(GetName() + "/FILENAME_RUN", "I:1;C", fToDim,
966 "Path and base name which is used to compile the filenames for the run files."
967 "|Type[int]:type of open files (1=log, 2=rep, 4=fits)"
968 "|Name[string]:path and base file name");
969
970 fNumSubAndFitsData.numSubscriptions = 0;
971 fNumSubAndFitsData.numOpenFits = 0;
972 fNumSubAndFits = new DimDescribedService(GetName() + "/NUM_SUBS", "I:2", fNumSubAndFitsData,
973 "Shows number of services to which the data logger is currently subscribed and the total number of open files."
974 "|Subscriptions[int]:number of dim services to which the data logger is currently subscribed."
975 "|NumOpenFiles[int]:number of files currently open by the data logger");
976
977 //services parameters
978 fDebugIsOn = false;
979 fStatsPeriodDuration = 1.0f;
980 fOpenedFilesIsOn = true;
981 fNumSubAndFitsIsOn = true;
982
983 // provide services control commands
984 AddEvent("SET_DEUG_MODE", "B:1", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready)
985 (boost::bind(&DataLogger::SetDebugOnOff, this, _1))
986 ("Switch debug mode on or off. Debug mode prints ifnormation about every service written to a file."
987 "|Enable[bool]:Enable of disable debug mode (yes/no).");
988
989 AddEvent("SET_STATISTICS_UPDATE_INTERVAL", "F", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready)
990 (boost::bind(&DataLogger::SetStatsPeriod, this, _1))
991 ("Interval in which the data-logger statistics service (STATS) is updated."
992 "|Interval[s]:Floating point value in seconds.");
993
994 AddEvent("ENABLE_FILENAME_SERVICES", "B:1", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready)
995 (boost::bind(&DataLogger::SetOpenedFilesOnOff ,this, _1))
996 ("Switch service which distributes information about the open files on or off."
997 "|Enable[bool]:Enable of disable filename services (yes/no).");
998
999 AddEvent("ENABLE_NUMSUBS_SERVICE", "B:1", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready)
1000 (boost::bind(&DataLogger::SetNumSubsAndFitsOnOff, this, _1))
1001 ("Switch the service which distributes information about the number of subscriptions and open files on or off."
1002 "|Enable[bool]:Enable of disable NUM_SUBS service (yes/no).");
1003
1004 AddEvent("SET_RUN_TIMEOUT", "L:1", kSM_Ready, kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun)
1005 (boost::bind(&DataLogger::SetRunTimeoutDelay, this, _1))
1006 ("Set the timeout delay for old run numbers."
1007 "|timeout[min]:Time out in minutes after which files for expired runs are closed.");
1008
1009 fDestructing = false;
1010
1011 //start the monitoring service
1012 fStatVar.sizeWritten = 0;
1013 fStatVar.freeSpace = 0;
1014 fStatVar.writingRate = 0;
1015 fPreviousStatsUpdateTime = Time().Mjd();
1016 fPreviousOldRunNumberCheck = Time().Mjd();
1017 fPreviousSize = 0;
1018
1019 struct statvfs vfs;
1020 if (!statvfs(fNightlyFilePath.c_str(), &vfs))
1021 fStatVar.freeSpace = vfs.f_bsize*vfs.f_bavail;
1022 else
1023 fStatVar.freeSpace = -1;
1024
1025 fStatsMonitoring = new DimDescribedService(GetName() + "/STATS", "X:3", fStatVar, "Add description here");
1026
1027//mutex
1028// fContinueMonitoring = true;
1029// fMonitoringThread = boost::thread(boost::bind(&DataLogger::ServicesMonitoring, this));
1030 fBaseSizeNightly = 0;
1031 fBaseSizeRun = 0;
1032 fDailyFileDayChangedAlready = true;
1033 fRunNumberTimeout = 1;
1034 if(fDebugIsOn)
1035 {
1036 Debug("DataLogger Init Done.");
1037 }
1038}
1039
1040// --------------------------------------------------------------------------
1041//
1042//! Destructor
1043//
1044DataLogger::~DataLogger()
1045{
1046 if (fDebugIsOn)
1047 {
1048 Debug("DataLogger destruction starts");
1049 }
1050 fDestructing = true;
1051 //first let's go to the ready state
1052 GoToReadyPlease();
1053 //release the services subscriptions
1054 fServiceSubscriptions.clear();
1055 //exit the monitoring loop
1056//mutex
1057// fContinueMonitoring = false;
1058// fMonitoringThread.join();
1059 //clear any remaining run number (should remain only one)
1060 while (fRunNumber.size() > 0)
1061 {
1062 RemoveOldestRunNumber();
1063 }
1064
1065 delete fOpenedNightlyFiles;
1066 delete fOpenedRunFiles;
1067 delete fNumSubAndFits;
1068
1069 if (fDebugIsOn)
1070 {
1071 Debug("DataLogger desctruction ends");
1072 }
1073}
1074// --------------------------------------------------------------------------
1075//
1076//! checks if the statistic service should be updated, and if so, do it
1077//
1078void DataLogger::UpdateStatisticsService()
1079{
1080 //update the fits files sizes
1081 const Time cTime = Time();
1082
1083 if ((fStatsPeriodDuration == 0) || ((cTime - fPreviousStatsUpdateTime).total_seconds() < fStatsPeriodDuration))
1084 return;
1085
1086 calculateTotalSizeWritten(fStatVar, false);
1087 fStatVar.writingRate = (fStatVar.sizeWritten - fPreviousSize)/((cTime - fPreviousStatsUpdateTime).total_seconds());
1088 fPreviousSize = fStatVar.sizeWritten;
1089 fPreviousStatsUpdateTime = cTime;
1090 //update the service. No need to check if data has been written, because some must have been, otherwise we would not have hit this piece of code
1091 fStatsMonitoring->updateService();
1092
1093 if(fDebugIsOn)
1094 {
1095 ostringstream str;
1096 str << "Size written: " << fStatVar.sizeWritten/1000 << " kB; writing rate: ";
1097 str << fStatVar.writingRate/1000 << " kB/s; free space: ";
1098 str << fStatVar.freeSpace/(1000*1000) << " MB";
1099 Debug(str);
1100 }
1101}
1102// --------------------------------------------------------------------------
1103//
1104//! checks if old run numbers should be trimmed and if so, do it
1105//
1106void DataLogger::TrimOldRunNumbers()
1107{
1108 const Time cTime = Time();
1109
1110 if ((cTime - fPreviousOldRunNumberCheck).total_seconds() < fRunNumberTimeout*60)
1111 return;
1112
1113 while (fRunNumber.size() > 1 && (cTime - fRunNumber.back().time) > boost::posix_time::minutes(fRunNumberTimeout))
1114 {
1115 RemoveOldestRunNumber();
1116 }
1117 fPreviousOldRunNumberCheck = cTime;
1118}
1119// --------------------------------------------------------------------------
1120//
1121//! Inherited from DimInfo. Handles all the Infos to which we subscribed, and log them
1122//
1123void DataLogger::infoHandler()
1124{
1125 // Make sure getTimestamp is called _before_ getTimestampMillisecs
1126 if (fDestructing)
1127 return;
1128
1129 DimInfo* I = getInfo();
1130
1131 if (I==NULL)
1132 return;
1133 //check if the service pointer corresponds to something that we subscribed to
1134 //this is a fix for a bug that provides bad Infos when a server starts
1135 bool found = false;
1136 SubscriptionsListType::iterator x;
1137 map<string, SubscriptionType>::iterator y;
1138 for (x=fServiceSubscriptions.begin(); x != fServiceSubscriptions.end(); x++)
1139 {//find current service is subscriptions
1140 for (y=x->second.begin(); y!=x->second.end();y++)
1141 if ((y->second.dimInfo).get() == I)
1142 {
1143 found = true;
1144 break;
1145 }
1146 if (found)
1147 break;
1148 }
1149 if (!found)
1150 {
1151 DimServiceInfoList::infoHandler();
1152 return;
1153 }
1154 if (I->getSize() <= 0)
1155 return;
1156
1157 // Make sure that getTimestampMillisecs is NEVER called before
1158 // getTimestamp is properly called
1159 // check that the message has been updated by something, i.e. must be different from its initial value
1160 if (I->getTimestamp() == 0)
1161 return;
1162
1163 // FIXME: Here we have to check if we have received the
1164 // service with the run-number.
1165 // CheckForRunNumber(I); has been removed because we have to
1166 // subscribe to this service anyway and hence we have the pointer
1167 // (no need to check for the name)
1168
1169 ReportPlease(I, y->second);
1170
1171 //update the fits files sizes
1172 UpdateStatisticsService();
1173
1174 //remove old run numbers
1175 TrimOldRunNumbers();
1176}
1177
1178bool DataLogger::OpenStream(shared_ptr<ofstream> stream, const string &filename)
1179{
1180 if (stream->is_open())
1181 {
1182 ostringstream str;
1183 str << filename << " was already open when trying to open it.";
1184 Error(str);
1185 return false;
1186 }
1187
1188 errno = 0;
1189 stream->open(filename.c_str(), ios_base::out | ios_base::app);
1190 if (errno != 0)
1191 {
1192 ostringstream str;
1193 str << "Unable to open " << filename << ": " << strerror(errno) << " (errno=" << errno << ")";
1194 Error(str);
1195 return false;
1196 }
1197
1198 if (!stream->is_open())
1199 {
1200 ostringstream str;
1201 str << "File " << filename << " not open as it ought to be.";
1202 Error(str);
1203 return false;
1204 }
1205
1206 return true;
1207}
1208
1209// --------------------------------------------------------------------------
1210//
1211//! Open the text files associated with the given run number
1212//! @param run the run number to be dealt with
1213//
1214int DataLogger::OpenRunFile(RunNumberType& run)
1215{
1216#ifdef RUN_LOGS
1217 // open log file
1218 run.logName = CompileFileName(fRunFilePath, run.runNumber, "", "log");
1219 if (!OpenStream(run.logFile, run.logName))
1220 return -1;
1221#endif
1222
1223 // open report file
1224 run.reportName = CompileFileName(fRunFilePath, run.runNumber, "", "rep");
1225 if (!OpenStream(run.reportFile, run.reportName))
1226 return -1;
1227
1228 //get the size of the newly opened file.
1229#ifdef RUN_LOGS
1230 RememberFileOrigSizePlease(run.logName, false);
1231#endif
1232 RememberFileOrigSizePlease(run.reportName, false);
1233
1234 //TODO this notification scheme might be messed up now.... fix it !
1235 const string baseFileName = CompileFileName(fRunFilePath, run.runNumber, "", "");
1236 NotifyOpenedFile(baseFileName, 3, fOpenedRunFiles);
1237 run.openedFits.clear();
1238 return 0;
1239}
1240// --------------------------------------------------------------------------
1241//
1242//! Add a new active run number
1243//! @param newRun the new run number
1244//! @param time the time at which the new run number was issued
1245//
1246void DataLogger::AddNewRunNumber(int64_t newRun, Time time)
1247{
1248
1249 if (newRun > 0xffffffff)
1250 {
1251 Error("New run number too large, out of range. Ignoring.");
1252 return;
1253 }
1254 if (fDebugIsOn)
1255 {
1256 ostringstream str;
1257 str << "Adding new run number " << newRun << " that was issued on " << time;
1258 Debug(str);
1259 }
1260 //Add new run number to run number list
1261 fRunNumber.push_back(RunNumberType());
1262 fRunNumber.back().runNumber = uint32_t(newRun);
1263 fRunNumber.back().time = time;
1264
1265 ostringstream str;
1266 str << "The new run number is: " << fRunNumber.back().runNumber;
1267 Message(str);
1268
1269 if (GetCurrentState() != kSM_Logging)
1270 return;
1271 //open the log and report files
1272 OpenRunFile(fRunNumber.back());
1273}
1274// --------------------------------------------------------------------------
1275//
1276//! Checks whether or not the current info is a run number.
1277//! If so, then remember it. A run number is required to open the run-log file
1278//! @param I
1279//! the current DimInfo
1280//
1281void DataLogger::CheckForRunNumber(DimInfo* I)
1282{
1283 if (strstr(I->getName(), "SET_RUN_NUMBER") != NULL)
1284 {//assumes that the run number is an integer
1285 //check if some run number entries can be deleted leave one so that two remain after adding the new one
1286 AddNewRunNumber(I->getLonglong(), Time(I->getTimestamp(), I->getTimestampMillisecs()*1000));
1287 }
1288}
1289
1290// --------------------------------------------------------------------------
1291//
1292//! write infos to log files.
1293//! @param I
1294//! The current DimInfo
1295//! @param sub
1296//! The dataLogger's subscription corresponding to this DimInfo
1297//
1298void DataLogger::ReportPlease(DimInfo* I, SubscriptionType& sub)
1299{
1300
1301
1302 //should we log or report this info ? (i.e. is it a message ?)
1303 bool isItaReport = ((strstr(I->getName(), "Message") == NULL) && (strstr(I->getName(), "MESSAGE") == NULL));
1304 if (I->getFormat()[0] == 'C')
1305 isItaReport = false;
1306
1307 if (!fNightlyReportFile.is_open())
1308 return;
1309
1310 //Check whether we should close and reopen daily text files or not
1311 //This should work in any case base of the following:
1312 // - fDailyFileDayChangedAlready is initialized to true. So if the dataLogger is started around noon, no file will be closed
1313 // - fDailyFileDayChangedAlready is set to false if (time != 12), so the file will be closed and reopened only if the logger runs since before noon (which is the required behavior)
1314 //This only applies to text files. Fits are closed and reopened based on the last and current service received time.
1315 //this was not applicable to text files, because as they gather several services, we have no guarantee that the received time will be greater than the previous one,
1316 //which could lead to several close/reopen instead of only one.
1317 if (Time().h() == 12 && !fDailyFileDayChangedAlready)
1318 {
1319 if (fDebugIsOn)
1320 Debug("Its Noon! Closing and reopening daily text files");
1321
1322 fNightlyLogFile.close();
1323 fNightlyReportFile.close();
1324
1325 fFullNightlyLogFileName = CompileFileName(fNightlyFilePath, "", "log");
1326 OpenTextFilePlease(fNightlyLogFile, fFullNightlyLogFileName);
1327
1328 fFullNightlyReportFileName = CompileFileName(fNightlyFilePath, "", "rep");
1329 OpenTextFilePlease(fNightlyReportFile, fFullNightlyReportFileName);
1330
1331 fDailyFileDayChangedAlready = true;
1332 }
1333 if (Time().h() != 12 && fDailyFileDayChangedAlready)
1334 fDailyFileDayChangedAlready = false;
1335
1336 //create the converter for that service
1337 if ((!sub.fConv.get()) && isItaReport)
1338 {
1339 //trick the converter in case of 'C'. why do I do this ? well simple: the converter checks that the right number
1340 //of bytes was written. because I skip 'C' with fits, the bytes will not be allocated, hence the "size copied ckeck"
1341 //of the converter will fail, hence throwing an exception.
1342 string fakeFormat(I->getFormat());
1343 if (fakeFormat[fakeFormat.size()-1] == 'C')
1344 fakeFormat = fakeFormat.substr(0, fakeFormat.size()-1);
1345 sub.fConv = shared_ptr<Converter>(new Converter(Out(), I->getFormat()));
1346 if (!sub.fConv)
1347 {
1348 ostringstream str;
1349 str << "Couldn't properly parse the format... service " << sub.dimInfo->getName() << " ignored.";
1350 Error(str);
1351 return;
1352 }
1353 }
1354 //construct the header
1355 ostringstream header;
1356 const Time cTime(I->getTimestamp(), I->getTimestampMillisecs()*1000);
1357 fQuality = I->getQuality();
1358 fMjD = cTime.Mjd();
1359
1360 //figure out which run file should be used
1361 ofstream* targetRunFile = NULL;
1362 RunNumberType* cRunNumber = NULL;
1363 if (GetCurrentState() == kSM_Logging)
1364 {
1365 list<RunNumberType>::reverse_iterator rit;
1366 for (rit=fRunNumber.rbegin(); rit!=fRunNumber.rend(); rit++)
1367 {
1368 if (rit->time < cTime) //this is the run number that we want to use
1369 {
1370 //Find something better to convert iterator to pointer than the ugly line below....
1371 cRunNumber = &(*rit);
1372 sub.runNumber = rit->runNumber;
1373#ifdef RUN_LOGS
1374 targetRunFile = isItaReport ? (rit->reportFile).get() : (rit->logFile).get();
1375#else
1376 targetRunFile = isItaReport ? (rit->reportFile).get() : NULL;
1377#endif
1378 break;
1379 }
1380 }
1381 if (rit == fRunNumber.rend() && fRunNumber.size() != 0)
1382 {
1383 ostringstream str;
1384 str << "Could not find an appropriate run number for info coming at time: " << cTime;
1385 Error(str);
1386 Error("Active run numbers: ");
1387 for (rit=fRunNumber.rbegin(); rit != fRunNumber.rend(); rit++)
1388 {
1389 str.str("");
1390 str << rit->runNumber;
1391 Error(str);
1392 }
1393
1394 }
1395 }
1396
1397 if (isItaReport)
1398 {
1399 //write text header
1400 header << I->getName() << " " << fQuality << " ";
1401 header << cTime.Y() << " " << cTime.M() << " " << cTime.D() << " ";
1402 header << cTime.h() << " " << cTime.m() << " " << cTime.s() << " ";
1403 header << cTime.ms() << " " << I->getTimestamp() << " ";
1404
1405 string text;
1406 try
1407 {
1408 text = sub.fConv->GetString(I->getData(), I->getSize());
1409 }
1410 catch (const runtime_error &e)
1411 {
1412 ostringstream str;
1413 str << "Parsing service " << sub.dimInfo->getName();
1414 str << " failed: " << e.what();
1415 Error(str);
1416 return;
1417 }
1418
1419 if (text.empty())
1420 {
1421 ostringstream str;
1422 str << "Service " << sub.dimInfo->getName() << " sent an empty string";
1423 Info(str);
1424 return;
1425 }
1426 //replace bizarre characters by white space
1427 replace(text.begin(), text.end(), '\n', '\\');
1428 replace_if(text.begin(), text.end(), ptr_fun<int, int>(&iscntrl), ' ');
1429
1430 //write entry to Nightly report
1431 if (fNightlyReportFile.is_open())
1432 {
1433 fNightlyReportFile << header.str() << text << endl;
1434 CheckForOfstreamError(fNightlyReportFile);
1435 }
1436 //write entry to run-report
1437 if (targetRunFile && targetRunFile->is_open())
1438 {
1439 *targetRunFile << header.str() << text << endl;
1440 CheckForOfstreamError(*targetRunFile);
1441 }
1442 }
1443 else
1444 {//write entry to both Nightly and run logs
1445 ostringstream msg;
1446 msg << I->getName() << ": " << I->getString();
1447
1448 if (fNightlyLogFile.is_open())
1449 {
1450 MessageImp(fNightlyLogFile).Write(cTime, msg.str().c_str(), fQuality);
1451 CheckForOfstreamError(fNightlyLogFile);
1452 }
1453 if (targetRunFile && targetRunFile->is_open())
1454 {
1455 MessageImp(*targetRunFile).Write(cTime, msg.str().c_str(), fQuality);
1456 CheckForOfstreamError(*targetRunFile);
1457 }
1458 }
1459
1460#ifdef HAVE_FITS
1461 if (isItaReport)
1462 {
1463 //check if the last received event was before noon and if current one is after noon.
1464 //if so, close the file so that it gets reopened.
1465 if (sub.nightlyFile.IsOpen())
1466 if ((sub.lastReceivedEvent != Time::None) && (sub.lastReceivedEvent.h() < 12) && (cTime.h() >= 12))
1467 {
1468 sub.nightlyFile.Close();
1469 }
1470 sub.lastReceivedEvent = cTime;
1471 if (!sub.nightlyFile.IsOpen() || !sub.runFile.IsOpen() || sub.runNumber != sub.runFile.fRunNumber)
1472 OpenFITSFilesPlease(sub, cRunNumber);
1473 WriteToFITS(sub);
1474 }
1475#endif
1476
1477}
1478
1479// --------------------------------------------------------------------------
1480//
1481//! print the dataLogger's current state. invoked by the PRINT command
1482//! @param evt
1483//! the current event. Not used by the method
1484//! @returns
1485//! the new state. Which, in that case, is the current state
1486//!
1487int DataLogger::PrintStatePlease(const Event& )
1488{
1489 Message("------------------------------------------");
1490 Message("------- DATA LOGGER CURRENT STATE --------");
1491 Message("------------------------------------------");
1492
1493 //print the path configuration
1494 Message("Nightly Path: " + boost::filesystem::system_complete(boost::filesystem::path(fNightlyFilePath)).directory_string());
1495 Message("Run Path: " + boost::filesystem::system_complete(boost::filesystem::path(fRunFilePath)).directory_string());
1496
1497 //print active run numbers
1498 ostringstream str;
1499 str << "Active Run Numbers:";
1500 for (list<RunNumberType>::const_iterator it=fRunNumber.begin(); it!=fRunNumber.end(); it++)
1501 str << " " << it->runNumber;
1502 if (fRunNumber.size()==0)
1503 str << " <none>";
1504 Message(str);
1505 //timeout value
1506 str.str("");
1507 str << "Timeout delay for old run numbers: " << fRunNumberTimeout << " minute(s)";
1508 Message(str);
1509
1510 //print all the open files.
1511 Message("------------ OPENED FILES ----------------");
1512 if (fNightlyLogFile.is_open())
1513 Message("Nightly log-file: OPEN");
1514
1515 if (fNightlyReportFile.is_open())
1516 Message("Nightly report-file: OPEN");
1517
1518 for (list<RunNumberType>::const_iterator it=fRunNumber.begin(); it!=fRunNumber.end(); it++)
1519 {
1520#ifdef RUN_LOGS
1521 if (it->logFile->is_open())
1522 Message("Run log-file: " + it->logName + " (OPEN)");
1523#endif
1524 if (it->reportFile->is_open())
1525 Message("Run report-file: " + it->reportName + " (OPEN)");
1526 }
1527
1528 DataLoggerStats statVar;
1529 /*const bool statWarning =*/ calculateTotalSizeWritten(statVar, true);
1530
1531 Message("----------------- STATS ------------------");
1532 str.str("");
1533 str << "Total Size written: " << statVar.sizeWritten << " bytes.";
1534 Message(str);
1535 str.str("");
1536 str << "Disk free space: " << statVar.freeSpace << " bytes.";
1537 Message(str);
1538 str.str("");
1539 str << "Statistics are updated every " << fStatsPeriodDuration << " seconds";
1540 if (fStatsPeriodDuration != 0)
1541 Message(str);
1542 else
1543 Message("Statistics updates are currently disabled");
1544
1545 Message("------------ DIM SUBSCRIPTIONS -----------");
1546 str.str("");
1547 str << "There are " << fNumSubAndFitsData.numSubscriptions << " active DIM subscriptions.";
1548 Message(str);
1549 for (map<const string, map<string, SubscriptionType> >::const_iterator it=fServiceSubscriptions.begin(); it!= fServiceSubscriptions.end();it++)
1550 {
1551 Message("Server "+it->first);
1552 for (map<string, SubscriptionType>::const_iterator it2=it->second.begin(); it2!=it->second.end(); it2++)
1553 Message(" -> "+it2->first);
1554 }
1555 Message("--------------- BLOCK LIST ---------------");
1556 for (set<string>::const_iterator it=fBlackList.begin(); it != fBlackList.end(); it++)
1557 Message(" -> "+*it);
1558 if (fBlackList.size()==0)
1559 Message(" <empty>");
1560
1561 Message("--------------- ALLOW LIST ---------------");
1562 for (set<string>::const_iterator it=fWhiteList.begin(); it != fWhiteList.end(); it++)
1563 Message(" -> "+*it);
1564 if (fWhiteList.size()==0)
1565 Message(" <empty>");
1566
1567 Message("-------------- GROUPING LIST -------------");
1568 Message("The following servers and/or services will");
1569 Message("be grouped into a single fits file:");
1570 for (set<string>::const_iterator it=fGrouping.begin(); it != fGrouping.end(); it++)
1571 Message(" -> "+*it);
1572 if (fGrouping.size()==0)
1573 Message(" <no grouping>");
1574
1575 Message("------------------------------------------");
1576 Message("-------- END OF DATA LOGGER STATE --------");
1577 Message("------------------------------------------");
1578
1579 return GetCurrentState();
1580}
1581
1582// --------------------------------------------------------------------------
1583//
1584//! turn debug mode on and off
1585//! @param evt
1586//! the current event. contains the instruction string: On, Off, on, off, ON, OFF, 0 or 1
1587//! @returns
1588//! the new state. Which, in that case, is the current state
1589//!
1590int DataLogger::SetDebugOnOff(const Event& evt)
1591{
1592 const bool backupDebug = fDebugIsOn;
1593
1594 fDebugIsOn = evt.GetBool();
1595
1596 if (fDebugIsOn == backupDebug)
1597 Message("Debug mode was already in the requested state.");
1598
1599 ostringstream str;
1600 str << "Debug mode is now " << fDebugIsOn;
1601 Message(str);
1602
1603 return GetCurrentState();
1604}
1605// --------------------------------------------------------------------------
1606//
1607//! set the statistics update period duration. 0 disables the statistics
1608//! @param evt
1609//! the current event. contains the new duration.
1610//! @returns
1611//! the new state. Which, in that case, is the current state
1612//!
1613int DataLogger::SetStatsPeriod(const Event& evt)
1614{
1615 const float backupDuration = fStatsPeriodDuration;
1616
1617 fStatsPeriodDuration = evt.GetFloat();
1618
1619 if (fStatsPeriodDuration < 0)
1620 {
1621 Error("Statistics period duration should be greater than zero. Discarding provided value.");
1622 fStatsPeriodDuration = backupDuration;
1623 return GetCurrentState();
1624 }
1625 if (!finite(fStatsPeriodDuration))// != fStatsPeriodDuration)
1626 {
1627 Error("Provided duration does not appear to be a valid float. Discarding it.");
1628 fStatsPeriodDuration = backupDuration;
1629 return GetCurrentState();
1630 }
1631 if (backupDuration == fStatsPeriodDuration)
1632 Warn("Statistics period not modified. Supplied value already in use.");
1633
1634 if (fStatsPeriodDuration == 0.0f)
1635 Message("Statistics are now OFF");
1636 else
1637 {
1638 ostringstream str;
1639 str << "Statistics period is now " << fStatsPeriodDuration << " seconds";
1640 Message(str);
1641 }
1642
1643 return GetCurrentState();
1644}
1645// --------------------------------------------------------------------------
1646//
1647//! set the opened files service on or off.
1648//! @param evt
1649//! the current event. contains the instruction string. similar to setdebugonoff
1650//! @returns
1651//! the new state. Which, in that case, is the current state
1652//!
1653int DataLogger::SetOpenedFilesOnOff(const Event& evt)
1654{
1655 const bool backupOpened = fOpenedFilesIsOn;
1656
1657 fOpenedFilesIsOn = evt.GetBool();
1658
1659 if (fOpenedFilesIsOn == backupOpened)
1660 Message("Opened files service mode was already in the requested state.");
1661
1662 ostringstream str;
1663 str << "Opened files service mode is now " << fOpenedFilesIsOn;
1664 Message(str);
1665
1666 return GetCurrentState();
1667}
1668
1669// --------------------------------------------------------------------------
1670//
1671//! set the number of subscriptions and opened fits on and off
1672//! @param evt
1673//! the current event. contains the instruction string. similar to setdebugonoff
1674//! @returns
1675//! the new state. Which, in that case, is the current state
1676//!
1677int DataLogger::SetNumSubsAndFitsOnOff(const Event& evt)
1678{
1679 const bool backupSubs = fNumSubAndFitsIsOn;
1680
1681 fNumSubAndFitsIsOn = evt.GetBool();
1682
1683 if (fNumSubAndFitsIsOn == backupSubs)
1684 Message("Number of subscriptions service mode was already in the requested state");
1685
1686 ostringstream str;
1687 str << "Number of subscriptions service mode is now " << fNumSubAndFitsIsOn;
1688 Message(str);
1689
1690 return GetCurrentState();
1691}
1692// --------------------------------------------------------------------------
1693//
1694//! set the timeout delay for old run numbers
1695//! @param evt
1696//! the current event. contains the timeout delay long value
1697//! @returns
1698//! the new state. Which, in that case, is the current state
1699//!
1700int DataLogger::SetRunTimeoutDelay(const Event& evt)
1701{
1702 const long backupTimeout = fRunNumberTimeout;
1703 fRunNumberTimeout = evt.GetXtra();
1704
1705 if (fRunNumberTimeout == 0)
1706 {
1707 fRunNumberTimeout = backupTimeout;
1708 Error("Timeout delays for old run numbers must be greater than 0. Ignored.");
1709 return GetCurrentState();
1710 }
1711
1712 if (fRunNumberTimeout == backupTimeout)
1713 Message("New timeout for old run numbers is same value as previous one.");
1714
1715 ostringstream str;
1716 str << "Timeout delay for old run numbers is now " << fRunNumberTimeout;
1717 Message(str);
1718
1719 return GetCurrentState();
1720}
1721
1722int DataLogger::ConfigureFileName(string &target, const string &type, const EventImp &evt)
1723{
1724 if (!evt.GetText())
1725 {
1726 Error("Empty "+type+" folder given. Please specify a valid path.");
1727 return GetCurrentState();
1728 }
1729
1730 const string givenPath = evt.GetText();
1731 if (!DoesPathExist(givenPath))
1732 {
1733 Error("Provided "+type+" path '"+givenPath+"' is not a valid folder. Ignored");
1734 return GetCurrentState();
1735 }
1736
1737 Message("New "+type+" folder specified: "+givenPath);
1738
1739 target = givenPath;
1740
1741 return GetCurrentState();
1742}
1743
1744// --------------------------------------------------------------------------
1745//
1746//! Sets the path to use for the Nightly log file.
1747//! @param evt
1748//! the event transporting the path
1749//! @returns
1750//! currently only the current state.
1751//
1752int DataLogger::ConfigureNightlyFileName(const Event& evt)
1753{
1754 return ConfigureFileName(fNightlyFilePath, "nightly", evt);
1755}
1756
1757// --------------------------------------------------------------------------
1758//
1759//! Sets the path to use for the run log file.
1760//! @param evt
1761//! the event transporting the path
1762//! @returns
1763//! currently only the current state
1764int DataLogger::ConfigureRunFileName(const Event& evt)
1765{
1766 return ConfigureFileName(fRunFilePath, "run", evt);
1767}
1768
1769// --------------------------------------------------------------------------
1770//
1771//! Sets the run number.
1772//! @param evt
1773//! the event transporting the run number
1774//! @returns
1775//! currently only the current state
1776int DataLogger::ConfigureRunNumber(const Event& evt)
1777{
1778 AddNewRunNumber(evt.GetXtra(), evt.GetTime());
1779 return GetCurrentState();
1780}
1781// --------------------------------------------------------------------------
1782//
1783//! Notifies the DIM service that a particular file was opened
1784//! @ param name the base name of the opened file, i.e. without path nor extension.
1785//! WARNING: use string instead of string& because I pass values that do not convert to string&.
1786//! this is not a problem though because file are not opened so often.
1787//! @ param type the type of the opened file. 0 = none open, 1 = log, 2 = text, 4 = fits
1788inline void DataLogger::NotifyOpenedFile(const string &name, int type, DimDescribedService* service)
1789{
1790 if (!fOpenedFilesIsOn)
1791 return;
1792
1793 if (fDebugIsOn)
1794 {
1795 ostringstream str;
1796 str << "Updating " << service->getName() << " file '" << name << "' (type=" << type << ")";
1797 Debug(str);
1798
1799 str.str("");
1800 str << "Num subs: " << fNumSubAndFitsData.numSubscriptions << " Num open FITS: " << fNumSubAndFitsData.numOpenFits;
1801 Debug(str);
1802 }
1803
1804 if (name.size()+1 > FILENAME_MAX)
1805 {
1806 Error("Provided file name '" + name + "' is longer than allowed file name length.");
1807 return;
1808 }
1809
1810 OpenFileToDim fToDim;
1811 fToDim.code = type;
1812 memcpy(fToDim.fileName, name.c_str(), name.size()+1);
1813
1814 service->setData(reinterpret_cast<void*>(&fToDim), name.size()+1+sizeof(int));
1815 service->setQuality(0);
1816 service->updateService();
1817}
1818// --------------------------------------------------------------------------
1819//
1820//! Implements the Start transition.
1821//! Concatenates the given path for the Nightly file and the filename itself (based on the day),
1822//! and tries to open it.
1823//! @returns
1824//! kSM_NightlyOpen if success, kSM_BadNightlyConfig if failure
1825int DataLogger::StartPlease()
1826{
1827 if (fDebugIsOn)
1828 {
1829 Debug("Starting...");
1830 }
1831 fFullNightlyLogFileName = CompileFileName(fNightlyFilePath, "", "log");
1832 OpenTextFilePlease(fNightlyLogFile, fFullNightlyLogFileName);
1833
1834 fFullNightlyReportFileName = CompileFileName(fNightlyFilePath, "", "rep");
1835 OpenTextFilePlease(fNightlyReportFile, fFullNightlyReportFileName);
1836
1837 if (!fNightlyLogFile.is_open() || !fNightlyReportFile.is_open())
1838 {
1839 ostringstream str;
1840 str << "Something went wrong while openning nightly files " << fFullNightlyLogFileName << " and " << fFullNightlyReportFileName;
1841 Error(str);
1842 return kSM_BadNightlyConfig;
1843 }
1844 //get the size of the newly opened file.
1845 fBaseSizeNightly = GetFileSize(fFullNightlyLogFileName);
1846 fBaseSizeNightly += GetFileSize(fFullNightlyReportFileName);
1847 fFileSizesMap.clear();
1848 fBaseSizeRun = 0;
1849 fPreviousSize = 0;
1850
1851 //notify that a new file has been opened.
1852 const string baseFileName = CompileFileName(fNightlyFilePath, "", "");
1853 NotifyOpenedFile(baseFileName, 3, fOpenedNightlyFiles);
1854
1855 fOpenedNightlyFits.clear();
1856
1857 return kSM_NightlyOpen;
1858}
1859
1860#ifdef HAVE_FITS
1861// --------------------------------------------------------------------------
1862//
1863//! open if required a the FITS files corresponding to a given subscription
1864//! @param sub
1865//! the current DimInfo subscription being examined
1866void DataLogger::OpenFITSFilesPlease(SubscriptionType& sub, RunNumberType* cRunNumber)
1867{
1868 string serviceName(sub.dimInfo->getName());
1869
1870 //if run number has changed, reopen a new fits file with the correct run number.
1871 if (sub.runFile.IsOpen() && sub.runFile.fRunNumber != sub.runNumber)
1872 {
1873 if (fDebugIsOn)
1874 Debug("Run number changed. Closing " + sub.runFile.fFileName);
1875 sub.runFile.Close();
1876 }
1877
1878 //we must check if we should group this service subscription to a single fits file before we replace the / by _
1879 bool hasGrouping = false;
1880 if (!sub.runFile.IsOpen() && (GetCurrentState() == kSM_Logging))
1881 {//will we find this service in the grouping list ?
1882 for (set<string>::const_iterator it=fGrouping.begin(); it!=fGrouping.end(); it++)
1883 {
1884 if (serviceName.find(*it) != string::npos)
1885 {
1886 hasGrouping = true;
1887 break;
1888 }
1889 }
1890 }
1891 hasGrouping = true;
1892 for (unsigned int i=0;i<serviceName.size(); i++)
1893 {
1894 if (serviceName[i] == '/')
1895 {
1896 serviceName[i] = '_';
1897 break;
1898 }
1899 }
1900 //we open the NightlyFile anyway, otherwise this function shouldn't have been called.
1901 if (!sub.nightlyFile.IsOpen())
1902 {
1903 string partialName = CompileFileName(fNightlyFilePath, serviceName, "fits");
1904
1905 const string fileNameOnly = partialName.substr(partialName.find_last_of('/')+1, partialName.size());
1906 if (!sub.fitsBufferAllocated)
1907 AllocateFITSBuffers(sub);
1908 //get the size of the file we're about to open
1909 if (RememberFileOrigSizePlease(partialName, true))//and remember that the file was opened (i.e. not an update)
1910 fOpenedNightlyFits[fileNameOnly].push_back(serviceName);
1911
1912 if (!sub.nightlyFile.Open(partialName, serviceName, NULL, &fNumSubAndFitsData.numOpenFits, this, 0))
1913 {
1914 SetCurrentState(kSM_WriteError);
1915 return;
1916 }
1917 //notify the opening
1918 const string baseFileName = CompileFileName(fNightlyFilePath, "", "");
1919 NotifyOpenedFile(baseFileName, 7, fOpenedNightlyFiles);
1920 if (fNumSubAndFitsIsOn)
1921 fNumSubAndFits->updateService();
1922 if (fDebugIsOn)
1923 {
1924 ostringstream str;
1925 str << "Opened Nightly FITS: " << partialName << " and table: " << serviceName << ".current number of opened FITS: " << fNumSubAndFitsData.numOpenFits;
1926 Debug(str);
1927 }
1928 }
1929 //do the actual file open
1930 if (!sub.runFile.IsOpen() && (GetCurrentState() == kSM_Logging) && sub.runNumber != 0)
1931 {//buffer for the run file have already been allocated when doing the Nightly file
1932 string fileNameOnly;
1933 string partialName;
1934 if (hasGrouping)
1935 {
1936 partialName = CompileFileName(fRunFilePath, sub.runNumber, "group", "fits");
1937 fileNameOnly = partialName.substr(partialName.find_last_of('/')+1, partialName.size());
1938 }
1939 else
1940 {
1941 partialName = CompileFileName(fRunFilePath, sub.runNumber, serviceName, "fits");
1942 fileNameOnly = partialName.substr(partialName.find_last_of('/')+1, partialName.size());
1943 }
1944 //get the size of the file we're about to open
1945 if (RememberFileOrigSizePlease(partialName, false))//and remember that the file was opened (i.e. not an update)
1946 cRunNumber->openedFits[fileNameOnly].push_back(serviceName);
1947 else
1948 if (hasGrouping)
1949 {
1950 cRunNumber->addServiceToOpenedFits(fileNameOnly, serviceName);
1951 }
1952
1953 if (hasGrouping && (!cRunNumber->runFitsFile.get()))
1954 try
1955 {
1956 cRunNumber->runFitsFile = shared_ptr<CCfits::FITS>(new CCfits::FITS(partialName, CCfits::RWmode::Write));
1957 (fNumSubAndFitsData.numOpenFits)++;
1958 }
1959 catch (CCfits::FitsException e)
1960 {
1961 ostringstream str;
1962 str << "Could not open FITS Run file " << partialName << " reason: " << e.message();
1963 Error(str);
1964 cRunNumber->runFitsFile = shared_ptr<CCfits::FITS>();//NULL;
1965 }
1966
1967 const string baseFileName = CompileFileName(fRunFilePath, sub.runNumber, "", "");
1968 NotifyOpenedFile(baseFileName, 7, fOpenedRunFiles);// + '_' + serviceName, 4);
1969 if (hasGrouping)
1970 {
1971 if (!sub.runFile.Open(partialName, serviceName, (cRunNumber->runFitsFile).get(), &fNumSubAndFitsData.numOpenFits, this, sub.runNumber))
1972 {
1973 SetCurrentState(kSM_WriteError);
1974 return;
1975 }
1976 }
1977 else
1978 {
1979 if (sub.runFile.Open(partialName, serviceName, NULL, &fNumSubAndFitsData.numOpenFits, this, sub.runNumber))
1980 {
1981 SetCurrentState(kSM_WriteError);
1982 return;
1983 }
1984 }
1985 if (fNumSubAndFitsIsOn)
1986 fNumSubAndFits->updateService();
1987 if (fDebugIsOn)
1988 {
1989 ostringstream str;
1990 str << "Opened Run FITS: " << partialName << " and table: " << serviceName << ".current number of opened FITS: " << fNumSubAndFitsData.numOpenFits;
1991 Debug(str);
1992 }
1993 }
1994}
1995// --------------------------------------------------------------------------
1996//
1997//! Allocates the required memory for a given pair of fits files (nightly and run)
1998//! @param sub the subscription of interest.
1999//
2000void DataLogger::AllocateFITSBuffers(SubscriptionType& sub)
2001{
2002 //Init the time columns of the file
2003 Description dateDesc(string("Time"), string("Modified Julian Date"), string("MjD"));
2004 sub.nightlyFile.AddStandardColumn(dateDesc, "1D", &fMjD, sizeof(double));
2005 sub.runFile.AddStandardColumn(dateDesc, "1D", &fMjD, sizeof(double));
2006
2007 Description QoSDesc("Qos", "Quality of service", "None");
2008 sub.nightlyFile.AddStandardColumn(QoSDesc, "1J", &fQuality, sizeof(int));
2009 sub.runFile.AddStandardColumn(QoSDesc, "1J", &fQuality, sizeof(int));
2010
2011 const Converter::FormatList flist = sub.fConv->GetList();
2012 // Compilation failed
2013 if (!sub.fConv->valid())
2014 {
2015 Error("Compilation of format string failed.");
2016 return;
2017 }
2018
2019 //we've got a nice structure describing the format of this service's messages.
2020 //Let's create the appropriate FITS columns
2021 int size = sub.dimInfo->getSize();
2022
2023 vector<string> dataFormatsLocal;
2024 for (unsigned int i=0;i<flist.size()-1;i++)
2025 {
2026 ostringstream dataQualifier;
2027
2028 dataQualifier << flist[i].second.first;
2029 switch (flist[i].first.first->name()[0])
2030 {
2031 case 'c':
2032 case 'C':
2033 dataQualifier.str("S");
2034 break;
2035 case 's':
2036 dataQualifier << "I";
2037 break;
2038 case 'i':
2039 case 'I':
2040 dataQualifier << "J";
2041 break;
2042 case 'l':
2043 case 'L':
2044 dataQualifier << "J";
2045 break;
2046 case 'f':
2047 case 'F':
2048 dataQualifier << "E";
2049 break;
2050 case 'd':
2051 case 'D':
2052 dataQualifier << "D";
2053 break;
2054 case 'x':
2055 case 'X':
2056 dataQualifier << "K";
2057 break;
2058 case 'S':
2059 size--;
2060 //for strings, the number of elements I get is wrong. Correct it
2061 dataQualifier.str(""); //clear
2062 dataQualifier << size << "A";
2063 break;
2064
2065 default:
2066 Fatal("THIS SHOULD NEVER BE REACHED. dataLogger.cc ln 1198.");
2067 };
2068 //we skip the variable length strings for now (in fits only)
2069 if (dataQualifier.str() != "S")
2070 dataFormatsLocal.push_back(dataQualifier.str());
2071 }
2072 sub.nightlyFile.InitDataColumns(GetDescription(sub.server, sub.service), dataFormatsLocal, sub.dimInfo->getData(), size);
2073 sub.runFile.InitDataColumns(GetDescription(sub.server, sub.service), dataFormatsLocal, sub.dimInfo->getData(), size);
2074 sub.fitsBufferAllocated = true;
2075}
2076// --------------------------------------------------------------------------
2077//
2078//! write a dimInfo data to its corresponding FITS files
2079//
2080void DataLogger::WriteToFITS(SubscriptionType& sub)
2081{
2082 //nightly File status (open or not) already checked
2083 if (sub.nightlyFile.IsOpen())
2084 {
2085 if (!sub.nightlyFile.Write(sub.fConv.get()))
2086 SetCurrentState(kSM_WriteError);
2087 }
2088
2089 if (sub.runFile.IsOpen())
2090 {
2091 if (!sub.runFile.Write(sub.fConv.get()))
2092 SetCurrentState(kSM_WriteError);
2093 }
2094}
2095#endif //if has_fits
2096
2097std::string DataLogger::SetCurrentState(int state, const char *txt, const std::string &cmd)
2098{
2099 if (state == kSM_WriteError && GetCurrentState() == kSM_WriteError)
2100 return "";
2101 return StateMachineImp::SetCurrentState(state, txt, cmd);
2102}
2103// --------------------------------------------------------------------------
2104//
2105//! Implements the StartRun transition.
2106//! Concatenates the given path for the run file and the filename itself (based on the run number),
2107//! and tries to open it.
2108//! @returns
2109//! kSM_Logging if success, kSM_BadRunConfig if failure.
2110int DataLogger::StartRunPlease()
2111{
2112 if (fDebugIsOn)
2113 {
2114 Debug("Starting Run Logging...");
2115 }
2116 //open all the relevant run-files. i.e. all the files associated with run numbers.
2117 for (list<RunNumberType>::iterator it=fRunNumber.begin(); it != fRunNumber.end(); it++)
2118 OpenRunFile(*it);
2119
2120 return kSM_Logging;
2121}
2122
2123#ifdef HAVE_FITS
2124// --------------------------------------------------------------------------
2125//
2126//! Create a fits group file with all the run-fits that were written (either daily or run)
2127//! @param filesToGroup a map of filenames mapping to table names to be grouped (i.e. a
2128//! single file can contain several tables to group
2129//! @param runNumber the run number that should be used for grouping. 0 means nightly group
2130//
2131void DataLogger::CreateFitsGrouping(map<string, vector<string> > & filesToGroup, int runNumber)
2132{
2133 if (fDebugIsOn)
2134 {
2135 ostringstream str;
2136 str << "Creating fits group for ";
2137 if (runNumber != 0)
2138 str << "run files";
2139 else
2140 str << "nightly files";
2141 Debug(str);
2142 }
2143 //create the FITS group corresponding to the ending run.
2144 CCfits::FITS* groupFile;
2145 unsigned int numFilesToGroup = 0;
2146 for (map<string, vector<string> >::const_iterator it=filesToGroup.begin(); it != filesToGroup.end(); it++)
2147 {
2148 numFilesToGroup += it->second.size();
2149 }
2150 if (fDebugIsOn)
2151 {
2152 ostringstream str;
2153 str << "There are " << numFilesToGroup << " tables to group";
2154 Debug(str);
2155 }
2156 if (numFilesToGroup <= 1)
2157 {
2158 filesToGroup.clear();
2159 return;
2160 }
2161 string groupName;
2162 if (runNumber != 0)
2163 groupName = CompileFileName(fRunFilePath, runNumber, "", "fits");
2164 else
2165 groupName = CompileFileName(fNightlyFilePath, "", "fits");
2166
2167 CCfits::Table* groupTable;
2168 int maxCharLength = 50;//FILENAME_MAX;
2169 try
2170 {
2171 groupFile = new CCfits::FITS(groupName, CCfits::RWmode::Write);
2172 //setup the column names
2173 ostringstream pathTypeName;
2174 pathTypeName << maxCharLength << "A";
2175 vector<string> names;
2176 vector<string> dataTypes;
2177 names.push_back("MEMBER_XTENSION");
2178 dataTypes.push_back("8A");
2179 names.push_back("MEMBER_URI_TYPE");
2180 dataTypes.push_back("3A");
2181 names.push_back("MEMBER_LOCATION");
2182 dataTypes.push_back(pathTypeName.str());
2183 names.push_back("MEMBER_NAME");
2184 dataTypes.push_back(pathTypeName.str());
2185
2186 groupTable = groupFile->addTable("GROUPING", numFilesToGroup, names, dataTypes);
2187//TODO handle the case when the logger was stopped and restarted during the same day, i.e. the grouping file must be updated
2188 }
2189 catch (CCfits::FitsException e)
2190 {
2191 ostringstream str;
2192 str << "Could not open or create FITS table GROUPING in file " << groupName << " reason: " << e.message();
2193 Error(str);
2194 return;
2195 }
2196
2197 //CCfits seems to be buggy somehow: can't use the column's function "write": it create a compilation error: maybe strings were not thought about.
2198 //use cfitsio routines instead
2199 groupTable->makeThisCurrent();
2200 //create appropriate buffer.
2201 const unsigned int n = 8 + 3 + 2*maxCharLength + 1; //+1 for trailling character
2202
2203 vector<unsigned char> realBuffer;
2204 realBuffer.resize(n);
2205 unsigned char* fitsBuffer = &realBuffer[0];
2206 memset(fitsBuffer, 0, n);
2207
2208 char* startOfExtension = reinterpret_cast<char*>(fitsBuffer);
2209 char* startOfURI = reinterpret_cast<char*>(&fitsBuffer[8]);
2210 char* startOfLocation = reinterpret_cast<char*>(&fitsBuffer[8 + 3]);
2211 char* startOfName = reinterpret_cast<char*>(&fitsBuffer[8+3+maxCharLength]);
2212
2213 strcpy(startOfExtension, "BINTABLE");
2214 strcpy(startOfURI, "URL");
2215
2216 int i=1;
2217 for (map<string, vector<string> >::const_iterator it=filesToGroup.begin(); it!=filesToGroup.end(); it++)
2218 for (vector<string>::const_iterator jt=it->second.begin(); jt != it->second.end(); jt++, i++)
2219 {
2220 strcpy(startOfLocation, it->first.c_str());
2221 strcpy(startOfName, jt->c_str());
2222
2223 if (fDebugIsOn)
2224 {
2225 ostringstream str;
2226 str << "Grouping " << it->first << " " << *jt;
2227 Debug(str);
2228 }
2229
2230 int status = 0;
2231 fits_write_tblbytes(groupFile->fitsPointer(), i, 1, 8+3+2*maxCharLength, fitsBuffer, &status);
2232 if (status)
2233 {
2234 ostringstream str;
2235 str << "Could not write row #" << i << "In the fits grouping file " << groupName << ". Cfitsio error code: " << status;
2236 Error(str);
2237 }
2238 }
2239
2240 filesToGroup.clear();
2241 delete groupFile;
2242}
2243#endif //HAVE_FITS
2244
2245// --------------------------------------------------------------------------
2246//
2247//! Implements the StopRun transition.
2248//! Attempts to close the run file.
2249//! @returns
2250//! kSM_WaitingRun if success, kSM_FatalError otherwise
2251int DataLogger::StopRunPlease()
2252{
2253
2254 if (fDebugIsOn)
2255 {
2256 Debug("Stopping Run Logging...");
2257 }
2258 for (list<RunNumberType>::const_iterator it=fRunNumber.begin(); it != fRunNumber.end(); it++)
2259 {
2260#ifdef RUN_LOGS
2261 if (!it->logFile->is_open() || !it->reportFile->is_open())
2262#else
2263 if (!it->reportFile->is_open())
2264#endif
2265 return kSM_FatalError;
2266#ifdef RUN_LOGS
2267 it->logFile->close();
2268#endif
2269 it->reportFile->close();
2270 }
2271
2272#ifdef HAVE_FITS
2273 for (SubscriptionsListType::iterator i = fServiceSubscriptions.begin(); i != fServiceSubscriptions.end(); i++)
2274 for (map<string, SubscriptionType>::iterator j = i->second.begin(); j != i->second.end(); j++)
2275 {
2276 if (j->second.runFile.IsOpen())
2277 j->second.runFile.Close();
2278 }
2279#endif
2280 NotifyOpenedFile("", 0, fOpenedRunFiles);
2281 if (fNumSubAndFitsIsOn)
2282 fNumSubAndFits->updateService();
2283
2284 while (fRunNumber.size() > 0)
2285 {
2286 RemoveOldestRunNumber();
2287 }
2288
2289 return kSM_WaitingRun;
2290}
2291// --------------------------------------------------------------------------
2292//
2293//! Implements the Stop and Reset transitions.
2294//! Attempts to close any openned file.
2295//! @returns
2296//! kSM_Ready
2297int DataLogger::GoToReadyPlease()
2298{
2299 if (fDebugIsOn)
2300 {
2301 Debug("Going to the Ready state...");
2302 }
2303 if (GetCurrentState() == kSM_Logging)
2304 StopRunPlease();
2305
2306 if (fNightlyLogFile.is_open())
2307 fNightlyLogFile.close();
2308 if (fNightlyReportFile.is_open())
2309 fNightlyReportFile.close();
2310
2311#ifdef HAVE_FITS
2312 for (SubscriptionsListType::iterator i = fServiceSubscriptions.begin(); i != fServiceSubscriptions.end(); i++)
2313 for (map<string, SubscriptionType>::iterator j = i->second.begin(); j != i->second.end(); j++)
2314 {
2315 if (j->second.nightlyFile.IsOpen())
2316 j->second.nightlyFile.Close();;
2317 }
2318#endif
2319 if (GetCurrentState() == kSM_Logging ||
2320 GetCurrentState() == kSM_WaitingRun ||
2321 GetCurrentState() == kSM_NightlyOpen)
2322 {
2323 NotifyOpenedFile("", 0, fOpenedNightlyFiles);
2324 if (fNumSubAndFitsIsOn)
2325 fNumSubAndFits->updateService();
2326 }
2327#ifdef HAVE_FITS
2328 CreateFitsGrouping(fOpenedNightlyFits, 0);
2329#endif
2330 return kSM_Ready;
2331}
2332// --------------------------------------------------------------------------
2333//
2334//! Implements the transition towards kSM_WaitingRun
2335//! Does nothing really.
2336//! @returns
2337//! kSM_WaitingRun
2338int DataLogger::NightlyToWaitRunPlease()
2339{
2340 if (fDebugIsOn)
2341 {
2342 Debug("Going to Wait Run Number state...");
2343 }
2344 return kSM_WaitingRun;
2345}
2346// --------------------------------------------------------------------------
2347//
2348//! Setup Logger's configuration from a Configuration object
2349//! @param conf the configuration object that should be used
2350//!
2351bool DataLogger::SetConfiguration(Configuration& conf)
2352{
2353 fDebugIsOn = conf.Get<bool>("debug");
2354
2355 //Set the block or allow list
2356 fBlackList.clear();
2357 fWhiteList.clear();
2358
2359 //Adding entries that should ALWAYS be ignored
2360 //fBlackList.insert("DATA_LOGGER/");
2361 fBlackList.insert("/SERVICE_LIST");
2362 fBlackList.insert("DIS_DNS/");
2363
2364 //set the black list
2365 if (conf.Has("block"))
2366 {
2367 const vector<string> vec = conf.Get<vector<string>>("block");
2368
2369 fBlackList.insert(vec.begin(), vec.end());
2370 }
2371
2372 //set the white list
2373 if (conf.Has("allow"))
2374 {
2375 const vector<string> vec = conf.Get<vector<string>>("allow");
2376 fWhiteList.insert(vec.begin(), vec.end());
2377 }
2378
2379 //Set the grouping
2380 if (conf.Has("group"))
2381 {
2382 const vector<string> vec = conf.Get<vector<string>>("group");
2383 fGrouping.insert(vec.begin(), vec.end());
2384 }
2385
2386 //set the old run numbers timeout delay
2387 if (conf.Has("runtimeout"))
2388 {
2389 const long timeout = conf.Get<long>("runtimeout");
2390 if (timeout <= 0)
2391 {
2392 Error("Time out delay for old run numbers should be greater than 0 minute");
2393 return false;
2394 }
2395 fRunNumberTimeout = timeout;
2396 }
2397 return true;
2398}
2399
2400// --------------------------------------------------------------------------
2401int RunDim(Configuration &conf)
2402{
2403 WindowLog wout;
2404
2405 //log.SetWindow(stdscr);
2406 if (conf.Has("log"))
2407 if (!wout.OpenLogFile(conf.Get<string>("log")))
2408 wout << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
2409
2410 // Start io_service.Run to use the StateMachineImp::Run() loop
2411 // Start io_service.run to only use the commandHandler command detaching
2412 DataLogger logger(wout);
2413 if (!logger.SetConfiguration(conf))
2414 return -1;
2415
2416 logger.Run(true);
2417
2418 return 0;
2419}
2420// --------------------------------------------------------------------------
2421void RunThread(DataLogger* logger)
2422{
2423 // This is necessary so that the StateMachine Thread can signal the
2424 // Readline thread to exit
2425 logger->Run(true);
2426 Readline::Stop();
2427}
2428// --------------------------------------------------------------------------
2429template<class T>
2430int RunShell(Configuration &conf)
2431{
2432 static T shell(conf.GetName().c_str(), conf.Get<int>("console")!=1);
2433
2434 WindowLog &win = shell.GetStreamIn();
2435 WindowLog &wout = shell.GetStreamOut();
2436
2437 if (conf.Has("log"))
2438 if (!wout.OpenLogFile(conf.Get<string>("log")))
2439 win << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
2440
2441 DataLogger logger(wout);
2442
2443 if (!logger.SetConfiguration(conf))
2444 return -1;
2445
2446 shell.SetReceiver(logger);
2447
2448 boost::thread t(boost::bind(RunThread, &logger));
2449
2450 shell.Run(); // Run the shell
2451
2452 logger.Stop();
2453
2454 //Wait until the StateMachine has finished its thread
2455 //before returning and destroyinng the dim objects which might
2456 //still be in use.
2457 t.join();
2458
2459 return 0;
2460}
2461
/*
 Extract usage clause(s) [if any] for SYNOPSIS.
 Translators: "Usage" and "or" here are patterns (regular expressions) which
 are used to match the usage synopsis in program output.  An example from cp
 (GNU coreutils) which contains both strings:
  Usage: cp [OPTION]... [-T] SOURCE DEST
    or:  cp [OPTION]... SOURCE... DIRECTORY
    or:  cp [OPTION]... -t DIRECTORY SOURCE...
 */
//! Prints the general description and the usage synopsis of the program
//! to cout, followed by an empty line.
void PrintUsage()
{
    static const char usage[] =
        "\n"
        "The data logger connects to all available Dim services and "
        "writes them to ascii and fits files.\n"
        "\n"
        "The default is that the program is started without user interaction. "
        "All actions are supposed to arrive as DimCommands. Using the -c "
        "option, a local shell can be initialized. With h or help a short "
        "help message about the usage can be brought to the screen.\n"
        "\n"
        "Usage: dataLogger [-c type] [OPTIONS]\n"
        " or: dataLogger [OPTIONS]\n";

    cout << usage << endl;
}
// --------------------------------------------------------------------------
//
//! Prints the additional help text, which explains the allow and block
//! lists, to cout, followed by an empty line.
void PrintHelp()
{
    /* Additional help text which is printed after the configuration
     options goes here */
    static const char help[] =
        "\n"
        "If the allow list has any element, only the servers and/or services "
        "specified in the list will be used for subscription. The black list "
        "will disable service subscription and has higher priority than the "
        "allow list. If the allow list is not present by default all services "
        "will be subscribed."
        "\n"
        "For example, block=DIS_DNS/ will skip all the services offered by "
        "the DIS_DNS server, while block=/SERVICE_LIST will skip all the "
        "SERVICE_LIST services offered by any server and DIS_DNS/SERVICE_LIST "
        "will skip DIS_DNS/SERVICE_LIST.\n";

    cout << help << endl;
}
2506
2507// --------------------------------------------------------------------------
2508void SetupConfiguration(Configuration &conf)
2509{
2510 const string n = conf.GetName()+".log";
2511
2512 po::options_description configp("Program options");
2513 configp.add_options()
2514 ("dns", var<string>("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
2515 ("log,l", var<string>(n), "Write log-file")
2516 ("console,c", var<int>(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)")
2517 ;
2518
2519 po::options_description configs("DataLogger options");
2520 configs.add_options()
2521 ("block,b", vars<string>(), "Black-list of services")
2522 ("allow,a", vars<string>(), "White-list of services")
2523 ("debug", po_bool(), "Debug mode. Print clear text of received service reports to log-stream")
2524 ("group,g", vars<string>(), "Grouping of services into a single run-Fits")
2525 ("runtimeout", var<long>(), "Time out delay for old run numbers")
2526 ;
2527
2528 conf.AddEnv("dns", "DIM_DNS_NODE");
2529
2530 conf.AddOptions(configp);
2531 conf.AddOptions(configs);
2532}
2533// --------------------------------------------------------------------------
2534int main(int argc, const char* argv[])
2535{
2536 Configuration conf(argv[0]);
2537 conf.SetPrintUsage(PrintUsage);
2538 SetupConfiguration(conf);
2539
2540 po::variables_map vm;
2541 try
2542 {
2543 vm = conf.Parse(argc, argv);
2544 }
2545#if BOOST_VERSION > 104000
2546 catch (po::multiple_occurrences &e)
2547 {
2548 cerr << "Program options invalid due to: " << e.what() << " of '" << e.get_option_name() << "'." << endl;
2549 return -1;
2550 }
2551#endif
2552 catch (exception& e)
2553 {
2554 cerr << "Program options invalid due to: " << e.what() << endl;
2555 return -1;
2556 }
2557
2558 if (conf.HasVersion() || conf.HasPrint())
2559 return -1;
2560
2561 if (conf.HasHelp())
2562 {
2563 PrintHelp();
2564 return -1;
2565 }
2566
2567 Dim::Setup(conf.Get<string>("dns"));
2568
2569// try
2570 {
2571 // No console access at all
2572 if (!conf.Has("console"))
2573 return RunDim(conf);
2574
2575 // Console access w/ and w/o Dim
2576 if (conf.Get<int>("console")==0)
2577 return RunShell<LocalShell>(conf);
2578 else
2579 return RunShell<LocalConsole>(conf);
2580 }
2581/* catch (exception& e)
2582 {
2583 cerr << "Exception: " << e.what() << endl;
2584 return -1;
2585 }*/
2586
2587 return 0;
2588}
Note: See TracBrowser for help on using the repository browser.