source: Evidence/DColl/DColl.cc @ 142

Last change on this file since 142 was 142, checked in by ogrimm, 11 years ago
Various updates
File size: 12.3 KB
Line 
1/********************************************************************\
2
3  Central data collector of the Evidence Control System
4 
5  - DColl subscribes to all services given by the configuration
6    server and writes these to the data file at every update.
7  - One data file per day is generated, with roll-over at 13:00 local time.
8  - For each service, it creates a new service with '.hist' appended
9        that contains a history of events kept within a ring buffer. Each
10        entry will be the result of a conversion to double of the text
11        written to the data file. Only if the new value has changed by a
12        minimum amout it will be added to the ring buffer.
13  - The command 'DColl/Log' writes the associated string to the log
14    file specified in the configuration.
15 
16  Oliver Grimm, December 2009
17
18\********************************************************************/
19
20#define SERVER_NAME "DColl"
21
22#define DATE_ROLLOVER 12 // localtime hour after which next date is used
23
24#include "../Evidence.h"
25#include <math.h>
26#include <float.h>
27#include <sys/stat.h>
28#include <ctype.h>
29#include <sys/types.h>
30#include <regex.h>
31
32//
33// Class declaration
34//
35class DataHandler:      public DimClient, public DimBrowser,
36                                        public EvidenceServer {
37
38        struct Item {
39          DimStampedInfo *DataItem;
40      DimService *HistService;
41          struct EvidenceHistoryItem *HistBuffer;
42          int HistPointer;
43          double LastValue;
44          double MinAbsChange;
45        } *List;
46
47        DimCommand *LogCommand;
48               
49    unsigned int NumItems;
50    FILE *DataFile;
51    FILE *LogFile;
52        float DataSizekB, LogSizekB;
53        int DataSizeLastUpdate, LogSizeLastUpdate;
54        char *DataDir;
55    DimService *LogSizeService, *DataSizeService;
56    int HistSize;
57        int SizeUpdateDelay;
58        int TimeForNextFile;
59       
60        int RegExCount;
61        regex_t *RegEx;
62        double *MinChange;
63       
64    void infoHandler();
65    void commandHandler();
66        void AddService(char *);
67        float FileSize(FILE *);
68           
69  public:
70    DataHandler();
71    ~DataHandler();
72}; 
73
74//
75// Constructor
76//
77DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
78
79  // Initialization to prevent freeing unallocated memory
80  DataFile = NULL;
81  LogFile = NULL;
82  List = NULL;
83  LogSizeService = NULL;
84  DataSizeService = NULL;
85 
86  DataSizeLastUpdate = 0;
87  LogSizeLastUpdate = 0;
88  TimeForNextFile = 0;
89
90  // Request configuration data
91  char *Change = GetConfig(SERVER_NAME " minchange");
92  DataDir = GetConfig(SERVER_NAME " datadir");
93  char *Logname = GetConfig(SERVER_NAME " logfile");
94  SizeUpdateDelay = atoi(GetConfig(SERVER_NAME " sizeupdate"));
95  HistSize = atoi(GetConfig(SERVER_NAME " histsize"));
96  if (HistSize < 1) HistSize = 1; // Minimum one items
97   
98  // Open log file
99  if ((LogFile = fopen(Logname, "a")) == NULL) {
100    State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
101  }
102
103  // Provide logging command   
104  LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
105             
106  // Create services for file sizes
107  DataSizekB = 0;
108  DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
109 
110  LogSizekB = FileSize(LogFile);
111  LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
112
113  // Count how many minimum change value regular expressions are present
114  RegExCount = 0;
115  char *Token = strtok(Change, "\t ");
116  while (Token != NULL) {
117        RegExCount++;
118        Token = strtok(NULL, "\t ");
119  }
120 
121  // Allocate memory for regular expressions and minimum change values
122  RegEx = new regex_t[RegExCount];
123  MinChange = new double [RegExCount];
124
125  // Compile regular expressions
126  int Pos = 0;
127  for (int i=0; i<RegExCount; i++) {
128    int Len = strlen(Change+Pos) + 1;
129    Token = strtok(Change + Pos, ": \t");
130
131    int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
132        if (Ret != 0) {
133          char ErrMsg[200];
134          regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
135          State(WARN, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
136        }
137        else {
138          if ((Token=strtok(NULL, "")) != NULL) MinChange[i] = atof(Token);
139          else MinChange[i] = 0;
140        }
141        Pos += Len;
142  }
143
144  // Subscribe to list of servers at DIS_DNS
145  AddService((char *) "DIS_DNS/SERVER_LIST");
146
147  DimClient::sendCommand("DColl/Log", SERVER_NAME" *** Logging started ***");
148}
149
150//
151// Destructor: Close files and free memory
152//
153DataHandler::~DataHandler() {
154
155  // Delete DIM services and command first so handlers and not called anymore
156  for (int i=0; i<NumItems; i++) {
157        delete List[i].HistService;
158    delete List[i].DataItem;
159    delete[] List[i].HistBuffer;
160  }
161  free(List);
162
163  //delete LogSizeService; // These create segmentation faults?!
164  //delete DataSizeService;
165
166  delete LogCommand;
167
168  // Close files
169  if (LogFile != NULL) if (fclose(LogFile) != 0) {
170        State(ERROR, "Could not close log file (%s)", strerror(errno));
171  }
172  if (DataFile != NULL && fclose(DataFile) != 0) {
173        State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
174  }
175
176  // Free memory for regular expressions handling
177  for (int i=0; i<RegExCount; i++) {
178    regfree(&RegEx[i]);
179  }
180  delete[] MinChange;
181  delete[] RegEx;
182}
183
184//
185// Implementation of data handling
186//
187// DIM ensures infoHandler() is called serialized, therefore
188// no mutex is needed to serialize writing to the file
189void DataHandler::infoHandler() {
190
191  DimInfo *Info = getInfo();
192
193  // Check if service available
194  if (Info->getSize()==strlen(NO_LINK)+1 && strcmp(Info->getString(), NO_LINK)==0) return;
195
196  // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
197  if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {   
198        char *Token = strtok(Info->getString(), "+-!|@");       
199        while (Token != NULL) {
200          char *Buf;
201          if (MakeString(&Buf, "%s/SERVICE_LIST", Token) != -1) {
202            AddService(Buf);
203                free(Buf);
204          }
205          else State(ERROR, "MakeString() failed for server %s", Token);
206         
207          Token = strtok(NULL, "|");
208          Token = strtok(NULL, "+-!|@");       
209        }       
210        return;
211  }
212
213  // If service is SERVICE_LIST of any server, scan all services.
214  // Subscribe to all services (but not to commands and RPCs)
215  if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
216        char *Name = strtok(Info->getString(), "+-!|");
217        while (Name != NULL) {
218          char *Type = strtok(NULL, "\n");
219          if (Type == NULL) return; // for safety, should not happen
220      if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) {
221                AddService(Name);
222          }
223          Name = strtok(NULL, "+-!|");
224        }
225        return;
226  }
227
228  // If it is time to open new data file, close the current one
229  if (time(NULL) >= TimeForNextFile) {
230        if (DataFile != NULL && fclose(DataFile) != 0) {
231          State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
232        }
233        DataFile = NULL;
234  }
235 
236  // Open new data file if necessary
237  if (DataFile == NULL) {
238        time_t Time = time(NULL);
239        struct tm *T = localtime(&Time);
240       
241        if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++;
242        if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
243
244        char *Filename;
245        if (MakeString(&Filename, "%s/%d%02d%02d.slow", DataDir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == -1) State(FATAL, "Could not create filename, MakeString() failed");
246        if ((DataFile = fopen(Filename, "a")) == NULL) {
247      State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
248        }
249        else State(INFO, "Opened data file '%s'", Filename);
250        free(Filename);
251       
252        // Calculate time for next file opening
253        T->tm_sec = 0;
254        T->tm_min = 0;
255        T->tm_hour = DATE_ROLLOVER;
256        TimeForNextFile = mktime(T);
257  }
258   
259  // Identify index of service
260  int Service; 
261  for (Service=0; Service<NumItems; Service++) if (Info == List[Service].DataItem) break;
262  if (Service == NumItems) return;  // Service not found
263
264  // If negative value for absolute change, ignore this entry
265  if (List[Service].MinAbsChange < 0) return;
266
267  // Write data header
268  time_t RawTime = Info->getTimestamp();
269  struct tm *TM = localtime(&RawTime);
270
271  fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
272
273  // Translate data into ASCII
274  char *Text = EvidenceServer::ToString(Info);
275  if (Text != NULL) {
276        // Replace all control characters by white space
277        for (int i=0; i<strlen(Text); i++) if (iscntrl(Text[i])) Text[i] = ' ';
278       
279        // Write to file
280    fprintf(DataFile, "%s\n", Text);
281       
282        // Add to history buffer if change large enough
283        if ((fabs(atof(Text)-List[Service].LastValue) > List[Service].MinAbsChange)) {
284          List[Service].HistBuffer[List[Service].HistPointer].Seconds = Info->getTimestamp();
285      List[Service].HistBuffer[List[Service].HistPointer].Value = atof(Text);
286      List[Service].HistService->updateService();
287          List[Service].HistPointer++;
288          if (List[Service].HistPointer >= HistSize) List[Service].HistPointer = 0;
289          List[Service].LastValue = atof(Text);
290        }
291        free(Text);
292  }
293  else fprintf(DataFile, "Cannot interpret format identifier\n");
294 
295  fflush(DataFile);
296
297  // Terminate if error because otherwise infinite loop might result as
298  // next call to this infoHandler() will try to (re-)open file
299  if(ferror(DataFile)) {
300    fclose(DataFile);
301        DataFile = NULL;
302        State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
303  }
304
305  // Update datafile size service
306  if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
307        DataSizekB = FileSize(DataFile);
308        DataSizeService->updateService();
309        DataSizeLastUpdate = time(NULL);
310  }
311}
312
313//
314// Implementation of log writing
315//
316void DataHandler::commandHandler() {
317 
318  if (getCommand() != LogCommand || LogFile == NULL) return;
319
320  // Replace all carriage-return by line feed and non-printable characters
321  char *Text = getCommand()->getString();
322  for (unsigned int i=0; i<strlen(Text); i++) {
323    if (Text[i] == '\r') Text[i] = '\n';
324        if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
325  }
326 
327  time_t RawTime = time(NULL);
328  struct tm *TM = localtime(&RawTime);
329
330  fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
331                TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
332                TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
333
334  fflush(LogFile);
335 
336  // If error close file (otherwise infinite loop because State() also writes to log)
337  if(ferror(LogFile)) {
338    fclose(LogFile);
339        LogFile = NULL;
340    State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
341  }
342   
343  // Update logfile size service
344  if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
345        LogSizekB = FileSize(LogFile);
346        LogSizeService->updateService();
347        LogSizeLastUpdate = time(NULL);
348  }
349}
350
351
352//
353// Add service to watch list
354//
355void DataHandler::AddService(char *Name) {
356
357  // Do not subscribe to history services (otherwise infinite loop)
358  if (strstr(Name, ".hist") != NULL) return;
359
360  // Check if already subscribed to this service
361  for (int i=0; i<NumItems; i++) {
362        if(strcmp(Name, List[i].DataItem->getName()) == 0) return;
363  }
364
365  // Increase capacity of item list                     
366  struct Item *New = (struct Item *) realloc(List, (NumItems+1)*sizeof(struct Item));
367  if (New != NULL) List = New;
368  else {
369        State(ERROR, "Could not allocate memory for item list, service '' not added (%s)", Name, strerror(errno));
370        return;
371  }
372 
373  // Set minimum required change by comparing to regular expressions
374  List[NumItems].MinAbsChange = 0;
375  for (int i=0; i<RegExCount; i++) {
376    if (regexec(&RegEx[i], Name, (size_t) 0, NULL, 0) == 0) {
377          List[NumItems].MinAbsChange = MinChange[i];
378        }
379  }
380 
381  List[NumItems].LastValue = DBL_MAX;
382               
383  // Create history service
384  List[NumItems].HistBuffer = new struct EvidenceHistoryItem [HistSize];
385  memset(List[NumItems].HistBuffer, 0, HistSize*sizeof(EvidenceHistoryItem));
386  List[NumItems].HistPointer = 0;
387
388  char *Buf;
389  if (MakeString(&Buf, "%s.hist", Name) == -1) {
390    State(ERROR, "Could not create history service for '%s', MakeString() failed", Name);
391  }
392  else {
393        List[NumItems].HistService = new DimService (Buf, (char *) "C",
394                                          List[NumItems].HistBuffer, HistSize*sizeof(EvidenceHistoryItem));
395    free(Buf);
396  }
397
398  // Subscribe to service
399  List[NumItems].DataItem = new DimStampedInfo(Name, NO_LINK, this);
400
401  // Increase number only after all set-up
402  NumItems++;
403}
404
405//
406// Determine size of file in kB
407//
408float DataHandler::FileSize(FILE *File) {
409
410  struct stat FileStatus;
411
412  if (fstat(fileno(File), &FileStatus) == -1) {
413     State(WARN, "Could not determine size of file (%s)", strerror(errno));
414         return -1;
415  }
416
417  return (float) FileStatus.st_size/1024;
418}
419
420//         
421// Main program
422//
423int main() {
424       
425  // Static ensures calling of destructor by exit()
426  static DataHandler Data;
427 
428  // Sleep until signal caught
429  pause();
430}
Note: See TracBrowser for help on using the repository browser.