source: Evidence/DColl.cc @ 145

Last change on this file since 145 was 145, checked in by ogrimm, 11 years ago
Updates
File size: 12.3 KB
Line 
1/********************************************************************\
2
3  Central data collector of the Evidence Control System
4 
5  - DColl subscribes to all services given by the configuration
6    server and writes these to the data file at every update.
  - One data file per day is generated, with roll-over at 12:00 local time.
8  - For each service, it creates a new service with '.hist' appended
9        that contains a history of events kept within a ring buffer. Each
10        entry will be the result of a conversion to double of the text
        written to the data file. Only if the new value has changed by a
        minimum amount will it be added to the ring buffer.
13  - The command 'DColl/Log' writes the associated string to the log
14    file specified in the configuration.
15 
16  Oliver Grimm, December 2009
17
18\********************************************************************/
19
20#define SERVER_NAME "DColl"
21
22#define DATE_ROLLOVER 12 // localtime hour after which next date is used
23
24#include "Evidence.h"
25
26#include <math.h>
27#include <float.h>
28#include <sys/stat.h>
29#include <ctype.h>
30#include <sys/types.h>
31#include <regex.h>
32
33//
34// Class declaration
35//
36class DataHandler:      public DimClient, public DimBrowser,
37                                        public EvidenceServer {
38
39        struct Item {
40          DimStampedInfo *DataItem;
41      DimService *HistService;
42          struct EvidenceHistoryItem *HistBuffer;
43          int HistPointer;
44          double LastValue;
45          double MinAbsChange;
46        } *List;
47
48        DimCommand *LogCommand;
49               
50    unsigned int NumItems;
51    FILE *DataFile;
52    FILE *LogFile;
53        float DataSizekB, LogSizekB;
54        int DataSizeLastUpdate, LogSizeLastUpdate;
55        char *DataDir;
56    DimService *LogSizeService, *DataSizeService;
57    int HistSize;
58        int SizeUpdateDelay;
59        int TimeForNextFile;
60       
61        int RegExCount;
62        regex_t *RegEx;
63        double *MinChange;
64       
65    void infoHandler();
66    void commandHandler();
67        void AddService(char *);
68        float FileSize(FILE *);
69           
70  public:
71    DataHandler();
72    ~DataHandler();
73}; 
74
75//
76// Constructor
77//
78DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
79
80  // Initialization to prevent freeing unallocated memory
81  DataFile = NULL;
82  LogFile = NULL;
83  List = NULL;
84  LogSizeService = NULL;
85  DataSizeService = NULL;
86 
87  DataSizeLastUpdate = 0;
88  LogSizeLastUpdate = 0;
89  TimeForNextFile = 0;
90
91  // Request configuration data
92  char *Change = GetConfig(SERVER_NAME " minchange");
93  DataDir = GetConfig(SERVER_NAME " datadir");
94  char *Logname = GetConfig(SERVER_NAME " logfile");
95  SizeUpdateDelay = atoi(GetConfig(SERVER_NAME " sizeupdate"));
96  HistSize = atoi(GetConfig(SERVER_NAME " histsize"));
97  if (HistSize < 1) HistSize = 1; // Minimum one items
98   
99  // Open log file
100  if ((LogFile = fopen(Logname, "a")) == NULL) {
101    State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
102  }
103
104  // Provide logging command   
105  LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
106             
107  // Create services for file sizes
108  DataSizekB = 0;
109  DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
110 
111  LogSizekB = FileSize(LogFile);
112  LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
113
114  // Count how many minimum change value regular expressions are present
115  RegExCount = 0;
116  char *Token = strtok(Change, "\t ");
117  while (Token != NULL) {
118        RegExCount++;
119        Token = strtok(NULL, "\t ");
120  }
121 
122  // Allocate memory for regular expressions and minimum change values
123  RegEx = new regex_t[RegExCount];
124  MinChange = new double [RegExCount];
125
126  // Compile regular expressions
127  int Pos = 0;
128  for (int i=0; i<RegExCount; i++) {
129    int Len = strlen(Change+Pos) + 1;
130    Token = strtok(Change + Pos, ": \t");
131
132    int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
133        if (Ret != 0) {
134          char ErrMsg[200];
135          regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
136          State(WARN, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
137        }
138        else {
139          if ((Token=strtok(NULL, "")) != NULL) MinChange[i] = atof(Token);
140          else MinChange[i] = 0;
141        }
142        Pos += Len;
143  }
144
145  // Subscribe to list of servers at DIS_DNS
146  AddService((char *) "DIS_DNS/SERVER_LIST");
147
148  DimClient::sendCommand("DColl/Log", SERVER_NAME" *** Logging started ***");
149}
150
151//
152// Destructor: Close files and free memory
153//
154DataHandler::~DataHandler() {
155
156  // Delete DIM services and command first so handlers and not called anymore
157  for (int i=0; i<NumItems; i++) {
158        delete List[i].HistService;
159    delete List[i].DataItem;
160    delete[] List[i].HistBuffer;
161  }
162  free(List);
163
164  //delete LogSizeService; // These create segmentation faults?!
165  //delete DataSizeService;
166
167  delete LogCommand;
168
169  // Close files
170  if (LogFile != NULL) if (fclose(LogFile) != 0) {
171        State(ERROR, "Could not close log file (%s)", strerror(errno));
172  }
173  if (DataFile != NULL && fclose(DataFile) != 0) {
174        State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
175  }
176
177  // Free memory for regular expressions handling
178  for (int i=0; i<RegExCount; i++) {
179    regfree(&RegEx[i]);
180  }
181  delete[] MinChange;
182  delete[] RegEx;
183}
184
185//
186// Implementation of data handling
187//
188// DIM ensures infoHandler() is called serialized, therefore
189// no mutex is needed to serialize writing to the file
190void DataHandler::infoHandler() {
191
192  DimInfo *Info = getInfo();
193
194  // Check if service available
195  if (Info->getSize()==strlen(NO_LINK)+1 && strcmp(Info->getString(), NO_LINK)==0) return;
196
197  // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
198  if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {   
199        char *Token = strtok(Info->getString(), "+-!|@");       
200        while (Token != NULL) {
201          char *Buf;
202          if (MakeString(&Buf, "%s/SERVICE_LIST", Token) != -1) {
203            AddService(Buf);
204                free(Buf);
205          }
206          else State(ERROR, "MakeString() failed for server %s", Token);
207         
208          Token = strtok(NULL, "|");
209          Token = strtok(NULL, "+-!|@");       
210        }       
211        return;
212  }
213
214  // If service is SERVICE_LIST of any server, scan all services.
215  // Subscribe to all services (but not to commands and RPCs)
216  if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
217        char *Name = strtok(Info->getString(), "+-!|");
218        while (Name != NULL) {
219          char *Type = strtok(NULL, "\n");
220          if (Type == NULL) return; // for safety, should not happen
221      if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) {
222                AddService(Name);
223          }
224          Name = strtok(NULL, "+-!|");
225        }
226        return;
227  }
228
229  // If it is time to open new data file, close the current one
230  if (time(NULL) >= TimeForNextFile) {
231        if (DataFile != NULL && fclose(DataFile) != 0) {
232          State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
233        }
234        DataFile = NULL;
235  }
236 
237  // Open new data file if necessary
238  if (DataFile == NULL) {
239        time_t Time = time(NULL);
240        struct tm *T = localtime(&Time);
241       
242        if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++;
243        if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
244
245        char *Filename;
246        if (MakeString(&Filename, "%s/%d%02d%02d.slow", DataDir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == -1) State(FATAL, "Could not create filename, MakeString() failed");
247        if ((DataFile = fopen(Filename, "a")) == NULL) {
248      State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
249        }
250        else State(INFO, "Opened data file '%s'", Filename);
251        free(Filename);
252       
253        // Calculate time for next file opening
254        T->tm_sec = 0;
255        T->tm_min = 0;
256        T->tm_hour = DATE_ROLLOVER;
257        TimeForNextFile = mktime(T);
258  }
259   
260  // Identify index of service
261  int Service; 
262  for (Service=0; Service<NumItems; Service++) if (Info == List[Service].DataItem) break;
263  if (Service == NumItems) return;  // Service not found
264
265  // If negative value for absolute change, ignore this entry
266  if (List[Service].MinAbsChange < 0) return;
267
268  // Write data header
269  time_t RawTime = Info->getTimestamp();
270  struct tm *TM = localtime(&RawTime);
271
272  fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
273
274  // Translate data into ASCII
275  char *Text = EvidenceServer::ToString(Info);
276  if (Text != NULL) {
277        // Replace all control characters by white space
278        for (int i=0; i<strlen(Text); i++) if (iscntrl(Text[i])) Text[i] = ' ';
279       
280        // Write to file
281    fprintf(DataFile, "%s\n", Text);
282       
283        // Add to history buffer if change large enough
284        if ((fabs(atof(Text)-List[Service].LastValue) > List[Service].MinAbsChange)) {
285          List[Service].HistBuffer[List[Service].HistPointer].Seconds = Info->getTimestamp();
286      List[Service].HistBuffer[List[Service].HistPointer].Value = atof(Text);
287      List[Service].HistService->updateService();
288          List[Service].HistPointer++;
289          if (List[Service].HistPointer >= HistSize) List[Service].HistPointer = 0;
290          List[Service].LastValue = atof(Text);
291        }
292        free(Text);
293  }
294  else fprintf(DataFile, "Cannot interpret format identifier\n");
295 
296  fflush(DataFile);
297
298  // Terminate if error because otherwise infinite loop might result as
299  // next call to this infoHandler() will try to (re-)open file
300  if(ferror(DataFile)) {
301    fclose(DataFile);
302        DataFile = NULL;
303        State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
304  }
305
306  // Update datafile size service
307  if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
308        DataSizekB = FileSize(DataFile);
309        DataSizeService->updateService();
310        DataSizeLastUpdate = time(NULL);
311  }
312}
313
314//
315// Implementation of log writing
316//
317void DataHandler::commandHandler() {
318 
319  if (getCommand() != LogCommand || LogFile == NULL) return;
320
321  // Replace all carriage-return by line feed and non-printable characters
322  char *Text = getCommand()->getString();
323  for (unsigned int i=0; i<strlen(Text); i++) {
324    if (Text[i] == '\r') Text[i] = '\n';
325        if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
326  }
327 
328  time_t RawTime = time(NULL);
329  struct tm *TM = localtime(&RawTime);
330
331  fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
332                TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
333                TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
334
335  fflush(LogFile);
336 
337  // If error close file (otherwise infinite loop because State() also writes to log)
338  if(ferror(LogFile)) {
339    fclose(LogFile);
340        LogFile = NULL;
341    State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
342  }
343   
344  // Update logfile size service
345  if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
346        LogSizekB = FileSize(LogFile);
347        LogSizeService->updateService();
348        LogSizeLastUpdate = time(NULL);
349  }
350}
351
352
353//
354// Add service to watch list
355//
356void DataHandler::AddService(char *Name) {
357
358  // Do not subscribe to history services (otherwise infinite loop)
359  if (strstr(Name, ".hist") != NULL) return;
360
361  // Check if already subscribed to this service
362  for (int i=0; i<NumItems; i++) {
363        if(strcmp(Name, List[i].DataItem->getName()) == 0) return;
364  }
365
366  // Increase capacity of item list                     
367  struct Item *New = (struct Item *) realloc(List, (NumItems+1)*sizeof(struct Item));
368  if (New != NULL) List = New;
369  else {
370        State(ERROR, "Could not allocate memory for item list, service '' not added (%s)", Name, strerror(errno));
371        return;
372  }
373 
374  // Set minimum required change by comparing to regular expressions
375  List[NumItems].MinAbsChange = 0;
376  for (int i=0; i<RegExCount; i++) {
377    if (regexec(&RegEx[i], Name, (size_t) 0, NULL, 0) == 0) {
378          List[NumItems].MinAbsChange = MinChange[i];
379        }
380  }
381 
382  List[NumItems].LastValue = DBL_MAX;
383               
384  // Create history service
385  List[NumItems].HistBuffer = new struct EvidenceHistoryItem [HistSize];
386  memset(List[NumItems].HistBuffer, 0, HistSize*sizeof(EvidenceHistoryItem));
387  List[NumItems].HistPointer = 0;
388
389  char *Buf;
390  if (MakeString(&Buf, "%s.hist", Name) == -1) {
391    State(ERROR, "Could not create history service for '%s', MakeString() failed", Name);
392  }
393  else {
394        List[NumItems].HistService = new DimService (Buf, (char *) "C",
395                                          List[NumItems].HistBuffer, HistSize*sizeof(EvidenceHistoryItem));
396    free(Buf);
397  }
398
399  // Subscribe to service
400  List[NumItems].DataItem = new DimStampedInfo(Name, NO_LINK, this);
401
402  // Increase number only after all set-up
403  NumItems++;
404}
405
406//
407// Determine size of file in kB
408//
409float DataHandler::FileSize(FILE *File) {
410
411  struct stat FileStatus;
412
413  if (fstat(fileno(File), &FileStatus) == -1) {
414     State(WARN, "Could not determine size of file (%s)", strerror(errno));
415         return -1;
416  }
417
418  return (float) FileStatus.st_size/1024;
419}
420
421//         
422// Main program
423//
424int main() {
425       
426  // Static ensures calling of destructor by exit()
427  static DataHandler Data;
428 
429  // Sleep until signal caught
430  pause();
431}
Note: See TracBrowser for help on using the repository browser.