source: Evidence/DColl.cc@ 173

Last change on this file since 173 was 173, checked in by daqct3, 15 years ago
Update to regular expression matching
File size: 16.2 KB
Line 
/********************************************************************\

  Central data collector of the Evidence Control System

  - DColl subscribes to all services given by the configuration
    server and writes these to the data file at every update.
  - One data file per day is generated, with roll-over at
    DATE_ROLLOVER (currently 12:00) local time.
  - For each service, it creates a new service with '.hist' appended
    that contains a history of events kept within a ring buffer. Each
    entry will be the result of a conversion to double of the text
    written to the data file. Only if the new value has changed by a
    minimum amount will it be added to the ring buffer.
  - The history buffers are written to disk at program termination and
    an attempt is made to read them back when adding a service.
  - The command 'DColl/Log' writes the associated string to the log
    file specified in the configuration.

  Oliver Grimm, December 2009

\********************************************************************/
21
// Name passed to EvidenceServer; also used as prefix of the configuration
// items and of the services published by this program
#define SERVER_NAME "DColl"

// Local-time hour from which on the date of the next day is used
#define DATE_ROLLOVER 12

// Minimum history buffer in bytes (> 3*sizeof(int) !)
#define MIN_HIST_SIZE 1024
26
27#include "Evidence.h"
28
29#include <string>
30#include <sstream>
31#include <vector>
32#include <iomanip>
33
34#include <math.h>
35#include <float.h>
36#include <sys/stat.h>
37#include <ctype.h>
38#include <sys/types.h>
39#include <regex.h>
40
41using namespace std;
42
43//
44// Class declaration
45//
46class DataHandler: public DimClient, public DimBrowser,
47 public EvidenceServer {
48
49 struct Item {
50 DimStampedInfo *DataItem;
51 DimService *HistService;
52 char *Buffer;
53 unsigned int HistSize;
54 int Next;
55 double LastValue;
56 double MinAbsChange;
57 };
58 vector<struct Item> List;
59
60 DimCommand *LogCommand;
61
62 FILE *DataFile;
63 FILE *LogFile;
64 char *Filename;
65 float DataSizekB, LogSizekB;
66 int DataSizeLastUpdate, LogSizeLastUpdate;
67 char *DataDir;
68 DimService *LogSizeService, *DataSizeService, *DataFilename;
69 string HistDir;
70 int SizeUpdateDelay;
71 int TimeForNextFile;
72
73 int RegExCount;
74 regex_t *RegEx;
75 double *MinChange;
76 unsigned int *HistSize;
77
78 void infoHandler();
79 void commandHandler();
80 void AddService(string);
81 void RemoveService(string);
82 float FileSize(FILE *);
83
84 public:
85 DataHandler();
86 ~DataHandler();
87};
88
89//
90// Constructor
91//
92DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
93
94 // Initialization to prevent freeing unallocated memory
95 DataFile = NULL;
96 LogFile = NULL;
97 Filename = NULL;
98
99 LogSizeService = NULL;
100 DataSizeService = NULL;
101
102 DataSizeLastUpdate = 0;
103 LogSizeLastUpdate = 0;
104 TimeForNextFile = 0;
105
106 // Request configuration data
107 char *Change = GetConfig(SERVER_NAME " items");
108 DataDir = GetConfig(SERVER_NAME " datadir");
109 char *Logname = GetConfig(SERVER_NAME " logfile");
110 SizeUpdateDelay = atoi(GetConfig(SERVER_NAME " sizeupdate"));
111 HistDir = GetConfig(SERVER_NAME " histdir");
112
113 // Open log file
114 if ((LogFile = fopen(Logname, "a")) == NULL) {
115 State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
116 }
117
118 // Provide logging command
119 LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
120
121 // Create services for file sizes and data file name
122 DataSizekB = 0;
123 DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
124
125 LogSizekB = FileSize(LogFile);
126 LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
127
128 DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) "");
129
130 // Count how many minimum change value regular expressions are present
131 RegExCount = 0;
132 char *Token = strtok(Change, "\t ");
133 while (Token != NULL) {
134 RegExCount++;
135 Token = strtok(NULL, "\t ");
136 }
137
138 // Allocate memory for regular expressions, minimum change and history size
139 RegEx = new regex_t[RegExCount];
140 MinChange = new double [RegExCount];
141 HistSize = new unsigned int [RegExCount];
142
143 // Compile regular expressions and extract minimum change and history size
144 int Pos = 0;
145 for (int i=0; i<RegExCount; i++) {
146 int Len = strlen(Change+Pos) + 1;
147 Token = strtok(Change + Pos, ": \t");
148
149 int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
150 if (Ret != 0) {
151 char ErrMsg[200];
152 regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
153 State(FATAL, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
154 }
155 else {
156 if ((Token=strtok(NULL, ": \t")) != NULL) MinChange[i] = atof(Token);
157 else MinChange[i] = 0;
158
159 if ((Token=strtok(NULL, "")) != NULL) HistSize[i] = atoi(Token)*1024;
160 else HistSize[i] = 0;
161 }
162 Pos += Len;
163 }
164
165 // Subscribe to list of servers at DIS_DNS
166 AddService("DIS_DNS/SERVER_LIST");
167}
168
169//
170// Destructor: Close files and free memory
171//
172DataHandler::~DataHandler() {
173
174 // Delete all DIM subscriptions
175 while (List.size() != 0) RemoveService(List[0].DataItem->getName());
176
177 delete LogCommand;
178 delete DataFilename;
179 delete[] Filename;
180
181 //delete LogSizeService; // These create segmentation faults?!
182 //delete DataSizeService;
183
184 // Close files
185 if (LogFile != NULL) if (fclose(LogFile) != 0) {
186 State(ERROR, "Could not close log file (%s)", strerror(errno));
187 }
188 if (DataFile != NULL && fclose(DataFile) != 0) {
189 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
190 }
191
192 // Free memory for regular expressions handling
193 for (int i=0; i<RegExCount; i++) {
194 regfree(&RegEx[i]);
195 }
196 delete[] MinChange;
197 delete[] RegEx;
198}
199
200//
201// Implementation of data handling
202//
203// DIM ensures infoHandler() is called serialized, therefore
204// no mutex is needed to serialize writing to the file
205void DataHandler::infoHandler() {
206
207 static const int WrapMark[] = {0, -1};
208 static const int EndMark[] = {0, 0};
209
210 DimInfo *Info = getInfo();
211
212 // Check if service available
213 if (!ServiceOK(Info)) return;
214
215 //
216 // ====== Part A: Handle service subscriptions ===
217 //
218
219 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
220 if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {
221 char *Token = strtok(Info->getString(), "+-!@");
222 while (Token != NULL) {
223 if (*Info->getString()=='-' || *Info->getString()=='!') RemoveService(string(Token)+"/SERVICE_LIST");
224 else AddService(string(Token)+"/SERVICE_LIST");
225 Token = strtok(NULL, "|"); // Skip server IP address
226 Token = strtok(NULL, "@");
227 }
228 return;
229 }
230
231 // If service is SERVICE_LIST of any server, scan all services.
232 // Subscribe to all services (but not to commands and RPCs)
233 if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
234 char *Name = strtok(Info->getString(), "+-!|");
235 while (Name != NULL) {
236 // Check if item is a service
237 char *Type = strtok(NULL, "\n");
238 if (Type == NULL) return; // for safety, should not happen
239 if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) {
240 // Add or remove service
241 if (*Info->getString()=='-' || *Info->getString()=='!') RemoveService(Name);
242 else AddService(Name);
243 }
244 Name = strtok(NULL, "|");
245 }
246 return;
247 }
248
249 //
250 // ====== Part B: Handle opening data files ===
251 //
252
253 // If it is time to open new data file, close the current one
254 if (time(NULL) >= TimeForNextFile) {
255 if (DataFile != NULL && fclose(DataFile) != 0) {
256 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
257 }
258 DataFile = NULL;
259 }
260
261 // Open new data file if necessary
262 if (DataFile == NULL) {
263 time_t Time = time(NULL);
264 struct tm *T = localtime(&Time);
265
266 // Get time structure with date rollover
267 if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++;
268 if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
269
270 // Create direcory if not existing (ignore error if already existing)
271 char *Dir;
272 if (asprintf(&Dir, "%s/%d%02d",DataDir, T->tm_year+1900, T->tm_mon + 1) == -1) {
273 State(FATAL, "asprintf() failed, could not create direcory name");
274 }
275 if(mkdir(Dir, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) {
276 State(FATAL, "Could not create direcory '%s' (%s)", Dir, strerror(errno));
277 }
278
279 // Create filename
280 free(Filename);
281 if (asprintf(&Filename, "%s/%d%02d%02d.slow", Dir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == 1) {
282 State(FATAL, "asprintf() failed, could not create filename");
283 }
284 free(Dir);
285
286 // Open file
287 if ((DataFile = fopen(Filename, "a")) == NULL) {
288 State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
289 }
290 else State(INFO, "Opened data file '%s'", Filename);
291 DataFilename->updateService(Filename);
292
293 // Calculate time for next file opening
294 T->tm_sec = 0;
295 T->tm_min = 0;
296 T->tm_hour = DATE_ROLLOVER;
297 TimeForNextFile = mktime(T);
298 }
299
300 //
301 // ====== Part C: Handle writing to data file ===
302 //
303
304 // Identify index of service
305 int Service;
306 for (Service=0; Service<List.size(); Service++) if (Info == List[Service].DataItem) break;
307 if (Service == List.size()) return; // Service not found
308
309 // If negative value for absolute change, do not write to file
310 if (List[Service].MinAbsChange >= 0) {
311 // Write data header
312 time_t RawTime = Info->getTimestamp();
313 struct tm *TM = localtime(&RawTime);
314
315 fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
316
317 // Translate data into ASCII
318 char *Text = EvidenceServer::ToString(Info);
319
320 if (Text != NULL) {
321 // Replace all control characters by white space
322 for (int i=0; i<strlen(Text); i++) if (iscntrl(Text[i])) Text[i] = ' ';
323
324 // Write to file
325 fprintf(DataFile, "%s\n", Text);
326
327 free(Text);
328 }
329 else fprintf(DataFile, "Cannot interpret format identifier\n");
330
331 // Terminate if error because otherwise infinite loop might result as
332 // next call to this infoHandler() will try to (re-)open file
333 if(ferror(DataFile)) {
334 fclose(DataFile);
335 DataFile = NULL;
336 State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
337 }
338
339 // Update datafile size service (not every time to avoid infinite loop)
340 if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
341 fflush(DataFile); // not continuously to reduce load
342
343 DataSizekB = FileSize(DataFile);
344 DataSizeService->updateService();
345 DataSizeLastUpdate = time(NULL);
346 }
347 } // Check for disk writing
348
349 //
350 // ====== Part D: Handle history service ===
351 //
352
353 if (Info->getSize() == 0) return;
354
355 // Check if data should be added to history buffer
356 if (strcmp(Info->getFormat(),"C") != 0 && strlen(Info->getFormat())==1) {
357 // Calculate sum of all number in array
358 char *Text = EvidenceServer::ToString(Info);
359 char *Token = strtok(Text, " ");
360 double Sum = 0;
361 while (Token != NULL) {
362 Sum += atof(Token);
363 Token = strtok(NULL, " ");
364 }
365 free(Text);
366 // Minimum change?
367 if (fabs(Sum-List[Service].LastValue) < fabs(List[Service].MinAbsChange)) return;
368 List[Service].LastValue = Sum;
369 }
370
371 // Check if data fits into buffer
372 if (List[Service].HistSize < Info->getSize() + 5*sizeof(int)) return;
373
374 int Size = Info->getSize() + 4*sizeof(int), Next = List[Service].Next;
375 void *WrapPos = NULL;
376 char *Buffer = List[Service].Buffer;
377 int Oldest = *(int *) Buffer;
378
379 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
380 if (Next + Size >= List[Service].HistSize) {
381 WrapPos = Buffer + Next;
382 Next = 4;
383 }
384
385 // Adapt pointer to oldest entry
386 while ((Oldest < Next + Size) &&
387 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
388 // Check for wrap-around
389 if (memcmp(Buffer + Oldest, WrapMark, sizeof(WrapMark)) == 0) {
390 Oldest = 4;
391 continue;
392 }
393 // Check if end marker reached, then only one event fits buffer
394 if (memcmp(Buffer + Oldest, EndMark, sizeof(EndMark)) == 0) {
395 Oldest = Next;
396 break;
397 }
398 // Move to next entry
399 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
400 }
401 // Update pointer in buffer
402 *(int *) Buffer = Oldest;
403
404 // Write wrap mark if necessary
405 if (WrapPos != NULL) memcpy(WrapPos, WrapMark, sizeof(WrapMark));
406
407 // Copy data into ring buffer
408 *((int *) (Buffer + Next)) = Info->getTimestamp();
409 *((int *) (Buffer + Next + sizeof(int))) = Info->getSize();
410 memcpy(Buffer + Next + 2*sizeof(int), Info->getData(), Info->getSize());
411
412 // Adjust pointer for next entry and write end marker to buffer
413 Next += Info->getSize() + 2*sizeof(int);
414 memcpy(Buffer + Next, EndMark, sizeof(EndMark));
415
416 List[Service].Next = Next;
417}
418
419//
420// Implementation of log writing
421//
422void DataHandler::commandHandler() {
423
424 if (getCommand() != LogCommand || LogFile == NULL) return;
425
426 // Replace all carriage-return by line feed and non-printable characters
427 char *Text = getCommand()->getString();
428 for (unsigned int i=0; i<strlen(Text); i++) {
429 if (Text[i] == '\r') Text[i] = '\n';
430 if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
431 }
432
433 time_t RawTime = time(NULL);
434 struct tm *TM = localtime(&RawTime);
435
436 fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
437 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
438 TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
439
440 fflush(LogFile);
441
442 // If error close file (otherwise infinite loop because State() also writes to log)
443 if(ferror(LogFile)) {
444 fclose(LogFile);
445 LogFile = NULL;
446 State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
447 }
448
449 // Update logfile size service
450 if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
451 LogSizekB = FileSize(LogFile);
452 LogSizeService->updateService();
453 LogSizeLastUpdate = time(NULL);
454 }
455}
456
457
458//
459// Add service to watch list
460//
461void DataHandler::AddService(string Name) {
462
463 // Do not subscribe to history services (otherwise infinite loop)
464 if (Name.find(".hist") != string::npos) return;
465
466 // Check if already subscribed to this service
467 for (int i=0; i<List.size(); i++) {
468 if (Name == List[i].DataItem->getName()) return;
469 }
470
471 struct Item New;
472
473 // Set minimum required change by comparing to regular expressions
474 New.MinAbsChange = 0;
475 New.HistSize = 0;
476 for (int i=0; i<RegExCount; i++) {
477 if (regexec(&RegEx[i], Name.c_str(), (size_t) 0, NULL, 0) == 0) {
478 New.MinAbsChange = MinChange[i];
479 if (HistSize[i] != 0) New.HistSize = HistSize[i];
480 }
481 }
482
483 // At least 3*sizeof(int)
484 if (New.HistSize < MIN_HIST_SIZE) New.HistSize = MIN_HIST_SIZE;
485
486 // Create history service
487 New.Buffer = new char [New.HistSize];
488 memset(New.Buffer, 0, New.HistSize);
489 *(int *) New.Buffer = 4;
490 New.Next = 4;
491 New.LastValue = DBL_MAX;
492 New.HistService = new DimService ((Name+".hist").c_str(), (char *) "C",
493 New.Buffer, New.HistSize);
494
495 // Load history buffer from file if existing
496 string Filename = New.HistService->getName();
497 for (int j=0; j<Filename.size(); j++) if (Filename[j] == '/') Filename[j] = '_';
498 FILE *File = fopen((HistDir + "/" + Filename).c_str(), "rb");
499 if (File != NULL) {
500 // Only load if current buffer size if equal or larger
501 if (FileSize(File) <= New.HistSize*sizeof(char)+sizeof(New.Next) && FileSize(File) != -1) {
502 fread(&New.Next, sizeof(New.Next), 1, File);
503 fread(New.Buffer, sizeof(char), New.HistSize, File);
504 fclose(File);
505 }
506 }
507
508 // Subscribe to service
509 New.DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
510
511 // Add item to list
512 List.push_back(New);
513}
514
515
516//
517// Remove service from watch list
518//
519void DataHandler::RemoveService(string Name) {
520
521 // Find service index
522 vector<struct Item>::iterator E;
523 for (E=List.begin(); E<List.end(); ++E) if (Name == (*E).DataItem->getName()) {
524 // Delete subscription first so handler and not called anymore
525 delete (*E).DataItem;
526
527 // Save history buffer (replace '/' by '_')
528 string Name = (*E).HistService->getName();
529 for (int j=0; j<Name.size(); j++) if (Name[j] == '/') Name[j] = '_';
530 FILE *File = fopen((HistDir + "/" + Name).c_str(), "wb");
531 if (File != NULL) {
532 fwrite(&(*E).Next, sizeof((*E).Next), 1, File);
533 fwrite((*E).Buffer, sizeof(char), (*E).HistSize, File);
534 fclose(File);
535 }
536
537 // Delete history service and free memory
538 delete (*E).HistService;
539 delete[] (*E).Buffer;
540 List.erase(E);
541 }
542}
543
544
545//
546// Determine size of file in kB
547//
548float DataHandler::FileSize(FILE *File) {
549
550 struct stat FileStatus;
551
552 if (fstat(fileno(File), &FileStatus) == -1) {
553 State(WARN, "Could not determine size of file (%s)", strerror(errno));
554 return -1;
555 }
556
557 return (float) FileStatus.st_size/1024;
558}
559
560//
561// Main program
562//
563int main() {
564
565 // Static ensures calling of destructor by exit()
566 static DataHandler Data;
567
568 // Sleep until signal caught
569 pause();
570}
Note: See TracBrowser for help on using the repository browser.