source: Evidence/DColl.cc @ 152

Last change on this file since 152 was 152, checked in by ogrimm, 11 years ago
Updates
File size: 13.3 KB
Line 
/********************************************************************\

  Central data collector of the Evidence Control System

  - DColl subscribes to all services given by the configuration
    server and writes these to the data file at every update.
  - One data file per day is generated, with roll-over at 12:00 local
    time (see DATE_ROLLOVER).
  - For each service, it creates a new service with '.hist' appended
    that contains a history of events kept within a ring buffer. Each
    entry will be the result of a conversion to double of the text
    written to the data file. Only if the new value has changed by a
    minimum amount will it be added to the ring buffer.
  - The history buffers are written to disk at program termination, and
    an attempt is made to read them back when a service is added.
  - The command 'DColl/Log' writes the associated string to the log
    file specified in the configuration.

  Oliver Grimm, December 2009

\********************************************************************/
21
22#define SERVER_NAME "DColl"
23
24#define DATE_ROLLOVER 12 // localtime hour after which next date is used
25
26#include "Evidence.h"
27
28#include <string>
29#include <sstream>
30#include <vector>
31#include <iomanip>
32
33#include <math.h>
34#include <float.h>
35#include <sys/stat.h>
36#include <ctype.h>
37#include <sys/types.h>
38#include <regex.h>
39
40//
41// Class declaration
42//
43class DataHandler:      public DimClient, public DimBrowser,
44                                        public EvidenceServer {
45
46        struct Item {
47          DimStampedInfo *DataItem;
48      DimService *HistService;
49          struct EvidenceHistoryItem *HistBuffer;
50          int HistPointer;
51          double LastValue;
52          double MinAbsChange;
53        };
54        vector<struct Item> List;
55       
56        DimCommand *LogCommand;
57               
58    FILE *DataFile;
59    FILE *LogFile;
60        char *Filename;
61        float DataSizekB, LogSizekB;
62        int DataSizeLastUpdate, LogSizeLastUpdate;
63        char *DataDir;
64    DimService *LogSizeService, *DataSizeService, *DataFilename;
65        string HistDir;
66    int HistSize;
67        int SizeUpdateDelay;
68        int TimeForNextFile;
69       
70        int RegExCount;
71        regex_t *RegEx;
72        double *MinChange;
73       
74    void infoHandler();
75    void commandHandler();
76        void AddService(string);
77        float FileSize(FILE *);
78           
79  public:
80    DataHandler();
81    ~DataHandler();
82}; 
83
84//
85// Constructor
86//
87DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
88
89  // Initialization to prevent freeing unallocated memory
90  DataFile = NULL;
91  LogFile = NULL;
92  Filename = NULL;
93 
94  LogSizeService = NULL;
95  DataSizeService = NULL;
96 
97  DataSizeLastUpdate = 0;
98  LogSizeLastUpdate = 0;
99  TimeForNextFile = 0;
100
101  // Request configuration data
102  char *Change = GetConfig(SERVER_NAME " minchange");
103
104  DataDir = GetConfig(SERVER_NAME " datadir");
105
106  char *Logname = GetConfig(SERVER_NAME " logfile");
107
108  SizeUpdateDelay = atoi(GetConfig(SERVER_NAME " sizeupdate"));
109
110  HistSize = atoi(GetConfig(SERVER_NAME " histsize"));
111  if (HistSize < 1) HistSize = 1; // Minimum one items
112
113  HistDir = GetConfig(SERVER_NAME " histdir");
114   
115  // Open log file
116  if ((LogFile = fopen(Logname, "a")) == NULL) {
117    State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
118  }
119
120  // Provide logging command   
121  LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
122             
123  // Create services for file sizes and data file name
124  DataSizekB = 0;
125  DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
126 
127  LogSizekB = FileSize(LogFile);
128  LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
129
130  DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) "");
131
132  // Count how many minimum change value regular expressions are present
133  RegExCount = 0;
134  char *Token = strtok(Change, "\t ");
135  while (Token != NULL) {
136        RegExCount++;
137        Token = strtok(NULL, "\t ");
138  }
139 
140  // Allocate memory for regular expressions and minimum change values
141  RegEx = new regex_t[RegExCount];
142  MinChange = new double [RegExCount];
143
144  // Compile regular expressions
145  int Pos = 0;
146  for (int i=0; i<RegExCount; i++) {
147    int Len = strlen(Change+Pos) + 1;
148    Token = strtok(Change + Pos, ": \t");
149
150    int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
151        if (Ret != 0) {
152          char ErrMsg[200];
153          regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
154          State(WARN, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
155        }
156        else {
157          if ((Token=strtok(NULL, "")) != NULL) MinChange[i] = atof(Token);
158          else MinChange[i] = 0;
159        }
160        Pos += Len;
161  }
162
163  // Subscribe to list of servers at DIS_DNS
164  AddService("DIS_DNS/SERVER_LIST");
165
166  DimClient::sendCommand("DColl/Log", SERVER_NAME" *** Logging started ***");
167}
168
169//
170// Destructor: Close files and free memory
171//
172DataHandler::~DataHandler() {
173
174  DimClient::sendCommand("DColl/Log", SERVER_NAME" *** Logging stopped ***");
175
176  // Delete DIM subscriptions and command first so handlers and not called anymore
177  for (int i=0; i<List.size(); i++) {
178    delete List[i].DataItem;
179  }
180  delete LogCommand;
181 
182  // Save history buffers to files (replace '/' by '_')
183  for (int i=0; i<List.size(); i++) {
184    string Name = List[i].HistService->getName();
185        for (int j=0; j<Name.size(); j++) if (Name[j] == '/') Name[j] = '_';
186    FILE *File = fopen((HistDir + "/" + Name).c_str(), "wb");
187        if (File != NULL) {
188      fwrite(&List[i].HistPointer, sizeof(List[i].HistPointer), 1, File);
189      fwrite(List[i].HistBuffer, sizeof(EvidenceHistoryItem), HistSize, File);
190      fclose(File);
191        }
192        delete List[i].HistService;
193    delete[] List[i].HistBuffer;
194  }
195
196  delete DataFilename;
197  delete[] Filename;
198 
199  //delete LogSizeService; // These create segmentation faults?!
200  //delete DataSizeService;
201
202  // Close files
203  if (LogFile != NULL) if (fclose(LogFile) != 0) {
204        State(ERROR, "Could not close log file (%s)", strerror(errno));
205  }
206  if (DataFile != NULL && fclose(DataFile) != 0) {
207        State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
208  }
209
210  // Free memory for regular expressions handling
211  for (int i=0; i<RegExCount; i++) {
212    regfree(&RegEx[i]);
213  }
214  delete[] MinChange;
215  delete[] RegEx;
216}
217
218//
219// Implementation of data handling
220//
221// DIM ensures infoHandler() is called serialized, therefore
222// no mutex is needed to serialize writing to the file
223void DataHandler::infoHandler() {
224
225  DimInfo *Info = getInfo();
226
227  // Check if service available
228  if (Info->getSize()==strlen(NO_LINK)+1 && strcmp(Info->getString(), NO_LINK)==0) return;
229
230  // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
231  if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {   
232        char *Token = strtok(Info->getString(), "@");   
233        while (Token != NULL) {
234          if (isalpha(*Token) == 0) Token++; // Name can start with +,-,!
235         
236          AddService(string(Token)+"/SERVICE_LIST");     
237         
238          Token = strtok(NULL, "|");
239          Token = strtok(NULL, "|@");   // ???? Why needed ?????
240        }       
241        return;
242  }
243
244  // If service is SERVICE_LIST of any server, scan all services.
245  // Subscribe to all services (but not to commands and RPCs)
246  if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
247
248        char *Name = strtok(Info->getString(), "|");
249        while (Name != NULL) {
250          char *Type = strtok(NULL, "\n");
251          if (Type == NULL) return; // for safety, should not happen
252          if (isalpha(*Name) == 0) Name++; // Name can start with +,-,!
253      if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) {
254                AddService(Name);
255          }
256          Name = strtok(NULL, "|");
257        }
258        return;
259  }
260
261  // If it is time to open new data file, close the current one
262  if (time(NULL) >= TimeForNextFile) {
263        if (DataFile != NULL && fclose(DataFile) != 0) {
264          State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
265        }
266        DataFile = NULL;
267  }
268 
269  // Open new data file if necessary
270  if (DataFile == NULL) {
271        time_t Time = time(NULL);
272        struct tm *T = localtime(&Time);
273        ostringstream Buf;
274       
275        // Generate file name from date
276        if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++;
277        if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
278        Buf << DataDir << "/" << T->tm_year+1900 << setw(2) << setfill('0') << T->tm_mon+1 << T->tm_mday << ".slow";
279       
280        // Copy filename to permanent buffer
281        delete[] Filename;
282        Filename = new char [Buf.str().size()+1];
283        strcpy(Filename, Buf.str().c_str());
284       
285        // Open file
286        if ((DataFile = fopen(Filename, "a")) == NULL) {
287      State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
288        }
289        else State(INFO, "Opened data file '%s'", Filename);
290        DataFilename->updateService(Filename);
291       
292        // Calculate time for next file opening
293        T->tm_sec = 0;
294        T->tm_min = 0;
295        T->tm_hour = DATE_ROLLOVER;
296        TimeForNextFile = mktime(T);
297  }
298   
299  // Identify index of service
300  int Service; 
301  for (Service=0; Service<List.size(); Service++) if (Info == List[Service].DataItem) break;
302  if (Service == List.size()) return;  // Service not found
303
304  // If negative value for absolute change, ignore this entry
305  if (List[Service].MinAbsChange < 0) return;
306
307  // Write data header
308  time_t RawTime = Info->getTimestamp();
309  struct tm *TM = localtime(&RawTime);
310
311  fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
312
313  // Translate data into ASCII
314  char *Text = EvidenceServer::ToString(Info);
315  if (Text != NULL) {
316        // Replace all control characters by white space
317        for (int i=0; i<strlen(Text); i++) if (iscntrl(Text[i])) Text[i] = ' ';
318       
319        // Write to file
320    fprintf(DataFile, "%s\n", Text);
321       
322        // Add to history buffer if change large enough
323        if ((fabs(atof(Text)-List[Service].LastValue) > List[Service].MinAbsChange)) {
324          List[Service].HistBuffer[List[Service].HistPointer].Seconds = Info->getTimestamp();
325      List[Service].HistBuffer[List[Service].HistPointer].Value = atof(Text);
326      List[Service].HistService->updateService();
327          List[Service].HistPointer++;
328          if (List[Service].HistPointer >= HistSize) List[Service].HistPointer = 0;
329          List[Service].LastValue = atof(Text);
330        }
331        free(Text);
332  }
333  else fprintf(DataFile, "Cannot interpret format identifier\n");
334 
335  fflush(DataFile);
336
337  // Terminate if error because otherwise infinite loop might result as
338  // next call to this infoHandler() will try to (re-)open file
339  if(ferror(DataFile)) {
340    fclose(DataFile);
341        DataFile = NULL;
342        State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
343  }
344
345  // Update datafile size service
346  if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
347        DataSizekB = FileSize(DataFile);
348        DataSizeService->updateService();
349        DataSizeLastUpdate = time(NULL);
350  }
351}
352
353//
354// Implementation of log writing
355//
356void DataHandler::commandHandler() {
357 
358  if (getCommand() != LogCommand || LogFile == NULL) return;
359
360  // Replace all carriage-return by line feed and non-printable characters
361  char *Text = getCommand()->getString();
362  for (unsigned int i=0; i<strlen(Text); i++) {
363    if (Text[i] == '\r') Text[i] = '\n';
364        if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
365  }
366 
367  time_t RawTime = time(NULL);
368  struct tm *TM = localtime(&RawTime);
369
370  fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
371                TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
372                TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
373
374  fflush(LogFile);
375 
376  // If error close file (otherwise infinite loop because State() also writes to log)
377  if(ferror(LogFile)) {
378    fclose(LogFile);
379        LogFile = NULL;
380    State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
381  }
382   
383  // Update logfile size service
384  if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
385        LogSizekB = FileSize(LogFile);
386        LogSizeService->updateService();
387        LogSizeLastUpdate = time(NULL);
388  }
389}
390
391
392//
393// Add service to watch list
394//
395void DataHandler::AddService(string Name) {
396
397  // Do not subscribe to history services (otherwise infinite loop)
398  if (Name.find(".hist") != string::npos) return;
399
400  // Check if already subscribed to this service
401  for (int i=0; i<List.size(); i++) {
402        if (Name == List[i].DataItem->getName()) return;
403  }
404 
405  // Create new entry in item list
406  struct Item New;
407
408  // Set minimum required change by comparing to regular expressions
409  New.MinAbsChange = 0;
410  for (int i=0; i<RegExCount; i++) {
411    if (regexec(&RegEx[i], Name.c_str(), (size_t) 0, NULL, 0) == 0) {
412          New.MinAbsChange = MinChange[i];
413        }
414  }
415               
416  // Create history service
417  New.HistBuffer = new struct EvidenceHistoryItem [HistSize];
418  memset(New.HistBuffer, 0, HistSize*sizeof(EvidenceHistoryItem));
419  New.HistPointer = 0;
420  New.LastValue = DBL_MAX;
421  New.HistService = new DimService ((Name+".hist").c_str(), (char *) "C",
422                                          New.HistBuffer, HistSize*sizeof(EvidenceHistoryItem));
423       
424  // Load history buffer from file if existing
425  string Filename = New.HistService->getName();
426  for (int j=0; j<Filename.size(); j++) if (Filename[j] == '/') Filename[j] = '_';
427  FILE *File = fopen((HistDir + "/" + Filename).c_str(), "rb");
428  if (File != NULL) {
429    fread(&New.HistPointer, sizeof(New.HistPointer), 1, File);
430    fread(New.HistBuffer, sizeof(EvidenceHistoryItem), HistSize, File);
431    fclose(File);
432  }
433 
434  // Subscribe to service
435  New.DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
436
437  // Add item to list
438  List.push_back(New);
439}
440
441//
442// Determine size of file in kB
443//
444float DataHandler::FileSize(FILE *File) {
445
446  struct stat FileStatus;
447
448  if (fstat(fileno(File), &FileStatus) == -1) {
449     State(WARN, "Could not determine size of file (%s)", strerror(errno));
450         return -1;
451  }
452
453  return (float) FileStatus.st_size/1024;
454}
455
456//         
457// Main program
458//
459int main() {
460       
461  // Static ensures calling of destructor by exit()
462  static DataHandler Data;
463 
464  // Sleep until signal caught
465  pause();
466}
Note: See TracBrowser for help on using the repository browser.