source: Evidence/DColl.cc @ 151

Last change on this file since 151 was 151, checked in by ogrimm, 11 years ago
DColl publishes also current data file name
File size: 12.5 KB
Line 
1/********************************************************************\
2
3  Central data collector of the Evidence Control System
4 
5  - DColl subscribes to all services given by the configuration
6    server and writes these to the data file at every update.
7  - One data file per day is generated, with roll-over at 13:00 local time.
8  - For each service, it creates a new service with '.hist' appended
9        that contains a history of events kept within a ring buffer. Each
10        entry will be the result of a conversion to double of the text
11        written to the data file. Only if the new value has changed by a
12        minimum amout it will be added to the ring buffer.
13  - The command 'DColl/Log' writes the associated string to the log
14    file specified in the configuration.
15 
16  Oliver Grimm, December 2009
17
18\********************************************************************/
19
20#define SERVER_NAME "DColl"
21
22#define DATE_ROLLOVER 12 // localtime hour after which next date is used
23
24#include "Evidence.h"
25
26#include <math.h>
27#include <float.h>
28#include <sys/stat.h>
29#include <ctype.h>
30#include <sys/types.h>
31#include <regex.h>
32
33//
34// Class declaration
35//
36class DataHandler:      public DimClient, public DimBrowser,
37                                        public EvidenceServer {
38
39        struct Item {
40          DimStampedInfo *DataItem;
41      DimService *HistService;
42          struct EvidenceHistoryItem *HistBuffer;
43          int HistPointer;
44          double LastValue;
45          double MinAbsChange;
46        } *List;
47
48        DimCommand *LogCommand;
49               
50    unsigned int NumItems;
51        char *Filename;
52    FILE *DataFile;
53    FILE *LogFile;
54        float DataSizekB, LogSizekB;
55        int DataSizeLastUpdate, LogSizeLastUpdate;
56        char *DataDir;
57    DimService *LogSizeService, *DataSizeService, *DataFilename;
58    int HistSize;
59        int SizeUpdateDelay;
60        int TimeForNextFile;
61       
62        int RegExCount;
63        regex_t *RegEx;
64        double *MinChange;
65       
66    void infoHandler();
67    void commandHandler();
68        void AddService(char *);
69        float FileSize(FILE *);
70           
71  public:
72    DataHandler();
73    ~DataHandler();
74}; 
75
76//
77// Constructor
78//
79DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
80
81  // Initialization to prevent freeing unallocated memory
82  Filename = NULL;
83  DataFile = NULL;
84  LogFile = NULL;
85  List = NULL;
86  LogSizeService = NULL;
87  DataSizeService = NULL;
88 
89  DataSizeLastUpdate = 0;
90  LogSizeLastUpdate = 0;
91  TimeForNextFile = 0;
92
93  // Request configuration data
94  char *Change = GetConfig(SERVER_NAME " minchange");
95  DataDir = GetConfig(SERVER_NAME " datadir");
96  char *Logname = GetConfig(SERVER_NAME " logfile");
97  SizeUpdateDelay = atoi(GetConfig(SERVER_NAME " sizeupdate"));
98  HistSize = atoi(GetConfig(SERVER_NAME " histsize"));
99  if (HistSize < 1) HistSize = 1; // Minimum one items
100   
101  // Open log file
102  if ((LogFile = fopen(Logname, "a")) == NULL) {
103    State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
104  }
105
106  // Provide logging command   
107  LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
108             
109  // Create services for file sizes and data file name
110  DataSizekB = 0;
111  DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
112 
113  LogSizekB = FileSize(LogFile);
114  LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
115
116  DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) "");
117
118  // Count how many minimum change value regular expressions are present
119  RegExCount = 0;
120  char *Token = strtok(Change, "\t ");
121  while (Token != NULL) {
122        RegExCount++;
123        Token = strtok(NULL, "\t ");
124  }
125 
126  // Allocate memory for regular expressions and minimum change values
127  RegEx = new regex_t[RegExCount];
128  MinChange = new double [RegExCount];
129
130  // Compile regular expressions
131  int Pos = 0;
132  for (int i=0; i<RegExCount; i++) {
133    int Len = strlen(Change+Pos) + 1;
134    Token = strtok(Change + Pos, ": \t");
135
136    int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
137        if (Ret != 0) {
138          char ErrMsg[200];
139          regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
140          State(WARN, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
141        }
142        else {
143          if ((Token=strtok(NULL, "")) != NULL) MinChange[i] = atof(Token);
144          else MinChange[i] = 0;
145        }
146        Pos += Len;
147  }
148
149  // Subscribe to list of servers at DIS_DNS
150  AddService((char *) "DIS_DNS/SERVER_LIST");
151
152  DimClient::sendCommand("DColl/Log", SERVER_NAME" *** Logging started ***");
153}
154
155//
156// Destructor: Close files and free memory
157//
158DataHandler::~DataHandler() {
159
160  // Delete DIM services and command first so handlers and not called anymore
161  for (int i=0; i<NumItems; i++) {
162        delete List[i].HistService;
163    delete List[i].DataItem;
164    delete[] List[i].HistBuffer;
165  }
166  free(List);
167
168  delete DataFilename;
169  //delete LogSizeService; // These create segmentation faults?!
170  //delete DataSizeService;
171
172  delete LogCommand;
173  free(Filename);
174
175  // Close files
176  if (LogFile != NULL) if (fclose(LogFile) != 0) {
177        State(ERROR, "Could not close log file (%s)", strerror(errno));
178  }
179  if (DataFile != NULL && fclose(DataFile) != 0) {
180        State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
181  }
182
183  // Free memory for regular expressions handling
184  for (int i=0; i<RegExCount; i++) {
185    regfree(&RegEx[i]);
186  }
187  delete[] MinChange;
188  delete[] RegEx;
189}
190
191//
192// Implementation of data handling
193//
194// DIM ensures infoHandler() is called serialized, therefore
195// no mutex is needed to serialize writing to the file
196void DataHandler::infoHandler() {
197
198  DimInfo *Info = getInfo();
199
200  // Check if service available
201  if (Info->getSize()==strlen(NO_LINK)+1 && strcmp(Info->getString(), NO_LINK)==0) return;
202
203  // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
204  if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {   
205        char *Token = strtok(Info->getString(), "+-!|@");       
206        while (Token != NULL) {
207          char *Buf;
208          if (MakeString(&Buf, "%s/SERVICE_LIST", Token) != -1) {
209            AddService(Buf);
210                free(Buf);
211          }
212          else State(ERROR, "MakeString() failed for server %s", Token);
213         
214          Token = strtok(NULL, "|");
215          Token = strtok(NULL, "+-!|@");       
216        }       
217        return;
218  }
219
220  // If service is SERVICE_LIST of any server, scan all services.
221  // Subscribe to all services (but not to commands and RPCs)
222  if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
223        char *Name = strtok(Info->getString(), "+-!|");
224        while (Name != NULL) {
225          char *Type = strtok(NULL, "\n");
226          if (Type == NULL) return; // for safety, should not happen
227      if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) {
228                AddService(Name);
229          }
230          Name = strtok(NULL, "+-!|");
231        }
232        return;
233  }
234
235  // If it is time to open new data file, close the current one
236  if (time(NULL) >= TimeForNextFile) {
237        if (DataFile != NULL && fclose(DataFile) != 0) {
238          State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
239        }
240        DataFile = NULL;
241  }
242 
243  // Open new data file if necessary
244  if (DataFile == NULL) {
245        time_t Time = time(NULL);
246        struct tm *T = localtime(&Time);
247       
248        if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++;
249        if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
250
251        free(Filename);
252        if (MakeString(&Filename, "%s/%d%02d%02d.slow", DataDir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == -1) State(FATAL, "Could not create filename, MakeString() failed");
253        if ((DataFile = fopen(Filename, "a")) == NULL) {
254      State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
255        }
256        else State(INFO, "Opened data file '%s'", Filename);
257        DataFilename->updateService(Filename);
258       
259        // Calculate time for next file opening
260        T->tm_sec = 0;
261        T->tm_min = 0;
262        T->tm_hour = DATE_ROLLOVER;
263        TimeForNextFile = mktime(T);
264  }
265   
266  // Identify index of service
267  int Service; 
268  for (Service=0; Service<NumItems; Service++) if (Info == List[Service].DataItem) break;
269  if (Service == NumItems) return;  // Service not found
270
271  // If negative value for absolute change, ignore this entry
272  if (List[Service].MinAbsChange < 0) return;
273
274  // Write data header
275  time_t RawTime = Info->getTimestamp();
276  struct tm *TM = localtime(&RawTime);
277
278  fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
279
280  // Translate data into ASCII
281  char *Text = EvidenceServer::ToString(Info);
282  if (Text != NULL) {
283        // Replace all control characters by white space
284        for (int i=0; i<strlen(Text); i++) if (iscntrl(Text[i])) Text[i] = ' ';
285       
286        // Write to file
287    fprintf(DataFile, "%s\n", Text);
288       
289        // Add to history buffer if change large enough
290        if ((fabs(atof(Text)-List[Service].LastValue) > List[Service].MinAbsChange)) {
291          List[Service].HistBuffer[List[Service].HistPointer].Seconds = Info->getTimestamp();
292      List[Service].HistBuffer[List[Service].HistPointer].Value = atof(Text);
293      List[Service].HistService->updateService();
294          List[Service].HistPointer++;
295          if (List[Service].HistPointer >= HistSize) List[Service].HistPointer = 0;
296          List[Service].LastValue = atof(Text);
297        }
298        free(Text);
299  }
300  else fprintf(DataFile, "Cannot interpret format identifier\n");
301 
302  fflush(DataFile);
303
304  // Terminate if error because otherwise infinite loop might result as
305  // next call to this infoHandler() will try to (re-)open file
306  if(ferror(DataFile)) {
307    fclose(DataFile);
308        DataFile = NULL;
309        State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
310  }
311
312  // Update datafile size service
313  if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
314        DataSizekB = FileSize(DataFile);
315        DataSizeService->updateService();
316        DataSizeLastUpdate = time(NULL);
317  }
318}
319
320//
321// Implementation of log writing
322//
323void DataHandler::commandHandler() {
324 
325  if (getCommand() != LogCommand || LogFile == NULL) return;
326
327  // Replace all carriage-return by line feed and non-printable characters
328  char *Text = getCommand()->getString();
329  for (unsigned int i=0; i<strlen(Text); i++) {
330    if (Text[i] == '\r') Text[i] = '\n';
331        if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
332  }
333 
334  time_t RawTime = time(NULL);
335  struct tm *TM = localtime(&RawTime);
336
337  fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
338                TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
339                TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
340
341  fflush(LogFile);
342 
343  // If error close file (otherwise infinite loop because State() also writes to log)
344  if(ferror(LogFile)) {
345    fclose(LogFile);
346        LogFile = NULL;
347    State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
348  }
349   
350  // Update logfile size service
351  if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
352        LogSizekB = FileSize(LogFile);
353        LogSizeService->updateService();
354        LogSizeLastUpdate = time(NULL);
355  }
356}
357
358
359//
360// Add service to watch list
361//
362void DataHandler::AddService(char *Name) {
363
364  // Do not subscribe to history services (otherwise infinite loop)
365  if (strstr(Name, ".hist") != NULL) return;
366
367  // Check if already subscribed to this service
368  for (int i=0; i<NumItems; i++) {
369        if(strcmp(Name, List[i].DataItem->getName()) == 0) return;
370  }
371
372  // Increase capacity of item list                     
373  struct Item *New = (struct Item *) realloc(List, (NumItems+1)*sizeof(struct Item));
374  if (New != NULL) List = New;
375  else {
376        State(ERROR, "Could not allocate memory for item list, service '' not added (%s)", Name, strerror(errno));
377        return;
378  }
379 
380  // Set minimum required change by comparing to regular expressions
381  List[NumItems].MinAbsChange = 0;
382  for (int i=0; i<RegExCount; i++) {
383    if (regexec(&RegEx[i], Name, (size_t) 0, NULL, 0) == 0) {
384          List[NumItems].MinAbsChange = MinChange[i];
385        }
386  }
387 
388  List[NumItems].LastValue = DBL_MAX;
389               
390  // Create history service
391  List[NumItems].HistBuffer = new struct EvidenceHistoryItem [HistSize];
392  memset(List[NumItems].HistBuffer, 0, HistSize*sizeof(EvidenceHistoryItem));
393  List[NumItems].HistPointer = 0;
394
395  char *Buf;
396  if (MakeString(&Buf, "%s.hist", Name) == -1) {
397    State(ERROR, "Could not create history service for '%s', MakeString() failed", Name);
398  }
399  else {
400        List[NumItems].HistService = new DimService (Buf, (char *) "C",
401                                          List[NumItems].HistBuffer, HistSize*sizeof(EvidenceHistoryItem));
402    free(Buf);
403  }
404
405  // Subscribe to service
406  List[NumItems].DataItem = new DimStampedInfo(Name, NO_LINK, this);
407
408  // Increase number only after all set-up
409  NumItems++;
410}
411
412//
413// Determine size of file in kB
414//
415float DataHandler::FileSize(FILE *File) {
416
417  struct stat FileStatus;
418
419  if (fstat(fileno(File), &FileStatus) == -1) {
420     State(WARN, "Could not determine size of file (%s)", strerror(errno));
421         return -1;
422  }
423
424  return (float) FileStatus.st_size/1024;
425}
426
427//         
428// Main program
429//
430int main() {
431       
432  // Static ensures calling of destructor by exit()
433  static DataHandler Data;
434 
435  // Sleep until signal caught
436  pause();
437}
Note: See TracBrowser for help on using the repository browser.