source: Evidence/DColl.cc@ 203

Last change on this file since 203 was 203, checked in by ogrimm, 15 years ago
Config provides service containing all configuration data, config parsing improved
File size: 16.2 KB
Line 
1/********************************************************************\
2
3 Central data collector of the Evidence Control System
4
5 - DColl subscribes to all services given by the configuration
6 server and writes these to the data file at every update.
7 - One data file per day is generated, with roll-over at 12:00 local time (see DATE_ROLLOVER).
8 - For each service, it creates a new service with '.hist' appended
9 that contains a history of events kept within a ring buffer. Each
10 entry will be the result of a conversion to double of the text
11 written to the data file. Only if the new value has changed by a
12 minimum amount it will be added to the ring buffer.
13 - The history buffers are written to disk at program termination and
14 are tried to be read when adding a service.
15 - The command 'DColl/Log' writes the associated string to the log
16 file specified in the configuration.
17
18 Oliver Grimm, December 2009
19
20\********************************************************************/
21
22#define SERVER_NAME "DColl"
23
24#define DATE_ROLLOVER 12 // localtime hour after which next date is used
25#define MIN_HIST_SIZE 1024 // Minimum history buffer in bytes (> 3*sizeof(int) !)
26
27#include "Evidence.h"
28
29#include <string>
30#include <sstream>
31#include <vector>
32#include <iomanip>
33
34#include <math.h>
35#include <float.h>
36#include <sys/stat.h>
37#include <ctype.h>
38#include <sys/types.h>
39#include <regex.h>
40
41using namespace std;
42
43//
44// Class declaration
45//
46class DataHandler: public DimClient, public DimBrowser,
47 public EvidenceServer {
48
49 struct Item {
50 DimStampedInfo *DataItem;
51 DimService *HistService;
52 char *Buffer;
53 unsigned int HistSize;
54 int Next;
55 double LastValue;
56 double MinAbsChange;
57 };
58 vector<struct Item> List;
59
60 DimCommand *LogCommand;
61
62 FILE *DataFile;
63 FILE *LogFile;
64 char *Filename;
65 float DataSizekB, LogSizekB;
66 int DataSizeLastUpdate, LogSizeLastUpdate;
67 char *DataDir;
68 DimService *LogSizeService, *DataSizeService, *DataFilename;
69 string HistDir;
70 int SizeUpdateDelay;
71 int TimeForNextFile;
72
73 int RegExCount;
74 regex_t *RegEx;
75 double *MinChange;
76 unsigned int *HistSize;
77
78 void infoHandler();
79 void commandHandler();
80 void AddService(string);
81 void RemoveService(string);
82 float FileSize(FILE *);
83
84 public:
85 DataHandler();
86 ~DataHandler();
87};
88
89//
90// Constructor
91//
92DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
93
94 // Initialization to prevent freeing unallocated memory
95 DataFile = NULL;
96 LogFile = NULL;
97 Filename = NULL;
98
99 LogSizeService = NULL;
100 DataSizeService = NULL;
101
102 DataSizeLastUpdate = 0;
103 LogSizeLastUpdate = 0;
104 TimeForNextFile = 0;
105
106 // Request configuration data
107 DataDir = GetConfig("datadir");
108 SizeUpdateDelay = atoi(GetConfig("sizeupdate"));
109 HistDir = GetConfig("histdir");
110
111 // Open log file
112 char *Logname = GetConfig("logfile");
113 if ((LogFile = fopen(Logname, "a")) == NULL) {
114 State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
115 }
116
117 // Provide logging command
118 LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
119
120 // Create services for file sizes and data file name
121 DataSizekB = 0;
122 DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
123
124 LogSizekB = FileSize(LogFile);
125 LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
126
127 DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) "");
128
129 // Count how many minimum change value regular expressions are present
130 char *Change = GetConfig("items");
131 RegExCount = 0;
132 char *Token = strtok(Change, "\t ");
133 while (Token != NULL) {
134 RegExCount++;
135 Token = strtok(NULL, "\t ");
136 }
137
138 // Allocate memory for regular expressions, minimum change and history size
139 RegEx = new regex_t[RegExCount];
140 MinChange = new double [RegExCount];
141 HistSize = new unsigned int [RegExCount];
142
143 // Compile regular expressions and extract minimum change and history size
144 int Pos = 0;
145 for (int i=0; i<RegExCount; i++) {
146 int Len = strlen(Change+Pos) + 1;
147 Token = strtok(Change + Pos, ": \t");
148
149 int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
150 if (Ret != 0) {
151 char ErrMsg[200];
152 regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
153 State(FATAL, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
154 }
155 else {
156 if ((Token=strtok(NULL, ": \t")) != NULL) MinChange[i] = atof(Token);
157 else MinChange[i] = 0;
158
159 if ((Token=strtok(NULL, "")) != NULL) HistSize[i] = atoi(Token)*1024;
160 else HistSize[i] = 0;
161 }
162 Pos += Len;
163 }
164
165 // Subscribe to list of servers at DIS_DNS
166 AddService("DIS_DNS/SERVER_LIST");
167}
168
169//
170// Destructor: Close files and free memory
171//
172DataHandler::~DataHandler() {
173
174 // Delete all DIM subscriptions
175 while (List.size() != 0) RemoveService(List[0].DataItem->getName());
176
177 delete LogCommand;
178 delete DataFilename;
179 delete[] Filename;
180
181 //delete LogSizeService; // These create segmentation faults?!
182 //delete DataSizeService;
183
184 // Close files
185 if (LogFile != NULL) if (fclose(LogFile) != 0) {
186 State(ERROR, "Could not close log file (%s)", strerror(errno));
187 }
188 if (DataFile != NULL && fclose(DataFile) != 0) {
189 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
190 }
191
192 // Free memory for regular expressions handling
193 for (int i=0; i<RegExCount; i++) {
194 regfree(&RegEx[i]);
195 }
196 delete[] MinChange;
197 delete[] RegEx;
198}
199
200//
201// Implementation of data handling
202//
203// DIM ensures infoHandler() is called serialized, therefore
204// no mutex is needed to serialize writing to the file
205void DataHandler::infoHandler() {
206
207 static const int WrapMark[] = {0, -1};
208 static const int EndMark[] = {0, 0};
209
210 DimInfo *Info = getInfo();
211
212 // Check if service available
213 if (!ServiceOK(Info)) return;
214
215 //
216 // ====== Part A: Handle service subscriptions ===
217 //
218
219 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
220 if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {
221 char *Token = strtok(Info->getString(), "+-!@");
222 while (Token != NULL) {
223 if (*Info->getString()=='-' || *Info->getString()=='!') RemoveService(string(Token)+"/SERVICE_LIST");
224 else AddService(string(Token)+"/SERVICE_LIST");
225 Token = strtok(NULL, "|"); // Skip server IP address
226 Token = strtok(NULL, "@");
227 }
228 return;
229 }
230
231 // If service is SERVICE_LIST of any server, scan all services.
232 // Subscribe to all services (but not to commands and RPCs)
233 if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
234 char *Name = strtok(Info->getString(), "+-!|");
235 while (Name != NULL) {
236 // Check if item is a service
237 char *Type = strtok(NULL, "\n");
238 if (Type == NULL) return; // for safety, should not happen
239 if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) {
240 // Add or remove service
241 if (*Info->getString()=='-' || *Info->getString()=='!') RemoveService(Name);
242 else AddService(Name);
243 }
244 Name = strtok(NULL, "|");
245 }
246 return;
247 }
248
249 //
250 // ====== Part B: Handle opening data files ===
251 //
252
253 // If it is time to open new data file, close the current one
254 if (time(NULL) >= TimeForNextFile) {
255 if (DataFile != NULL && fclose(DataFile) != 0) {
256 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
257 }
258 DataFile = NULL;
259 }
260
261 // Open new data file if necessary
262 if (DataFile == NULL) {
263 time_t Time = time(NULL);
264 struct tm *T = localtime(&Time);
265
266 // Get time structure with date rollover
267 if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++;
268 if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
269
270 // Create direcory if not existing (ignore error if already existing)
271 char *Dir;
272 if (asprintf(&Dir, "%s/%d%02d",DataDir, T->tm_year+1900, T->tm_mon + 1) == -1) {
273 State(FATAL, "asprintf() failed, could not create direcory name");
274 }
275 if(mkdir(Dir, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) {
276 State(FATAL, "Could not create directory '%s' (%s)", Dir, strerror(errno));
277 }
278
279 // Create filename
280 free(Filename);
281 if (asprintf(&Filename, "%s/%d%02d%02d.slow", Dir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == 1) {
282 State(FATAL, "asprintf() failed, could not create filename");
283 }
284 free(Dir);
285
286 // Open file
287 if ((DataFile = fopen(Filename, "a")) == NULL) {
288 State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
289 }
290 else State(INFO, "Opened data file '%s'", Filename);
291 DataFilename->updateService(Filename);
292
293 // Calculate time for next file opening
294 T->tm_sec = 0;
295 T->tm_min = 0;
296 T->tm_hour = DATE_ROLLOVER;
297 TimeForNextFile = mktime(T);
298 }
299
300 //
301 // ====== Part C: Handle writing to data file ===
302 //
303
304 // Identify index of service
305 int Service;
306 for (Service=0; Service<List.size(); Service++) if (Info == List[Service].DataItem) break;
307 if (Service == List.size()) return; // Service not found
308
309 // If negative value for absolute change, do not write to file
310 if (List[Service].MinAbsChange >= 0) {
311 // Write data header
312 time_t RawTime = Info->getTimestamp();
313 struct tm *TM = localtime(&RawTime);
314
315 fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
316
317 // Translate data into ASCII
318 char *Text = EvidenceServer::ToString(Info);
319
320 if (Text != NULL) {
321 // Replace new line by '\' and all other control characters by white space
322 for (int i=0; i<strlen(Text); i++) {
323 if (Text[i] == '\n') Text[i] = '\\';
324 else if (iscntrl(Text[i])) Text[i] = ' ';
325 }
326
327 // Write to file
328 fprintf(DataFile, "%s\n", Text);
329
330 free(Text);
331 }
332 else fprintf(DataFile, "Cannot interpret format identifier\n");
333
334 // Terminate if error because otherwise infinite loop might result as
335 // next call to this infoHandler() will try to (re-)open file
336 if(ferror(DataFile)) {
337 fclose(DataFile);
338 DataFile = NULL;
339 State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
340 }
341
342 // Update datafile size service (not every time to avoid infinite loop)
343 if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
344 fflush(DataFile); // not continuously to reduce load
345
346 DataSizekB = FileSize(DataFile);
347 DataSizeService->updateService();
348 DataSizeLastUpdate = time(NULL);
349 }
350 } // Check for disk writing
351
352 //
353 // ====== Part D: Handle history service ===
354 //
355
356 if (Info->getSize() == 0) return;
357
358 // Check if data should be added to history buffer
359 if (strcmp(Info->getFormat(),"C") != 0 && strlen(Info->getFormat())==1) {
360 // Calculate sum of all number in array
361 char *Text = EvidenceServer::ToString(Info);
362 char *Token = strtok(Text, " ");
363 double Sum = 0;
364 while (Token != NULL) {
365 Sum += atof(Token);
366 Token = strtok(NULL, " ");
367 }
368 free(Text);
369 // Minimum change?
370 if (fabs(Sum-List[Service].LastValue) < fabs(List[Service].MinAbsChange)) return;
371 List[Service].LastValue = Sum;
372 }
373
374 // Check if data fits into buffer
375 if (List[Service].HistSize < Info->getSize() + 5*sizeof(int)) return;
376
377 int Size = Info->getSize() + 4*sizeof(int), Next = List[Service].Next;
378 void *WrapPos = NULL;
379 char *Buffer = List[Service].Buffer;
380 int Oldest = *(int *) Buffer;
381
382 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
383 if (Next + Size >= List[Service].HistSize) {
384 WrapPos = Buffer + Next;
385 Next = 4;
386 }
387
388 // Adapt pointer to oldest entry
389 while ((Oldest < Next + Size) &&
390 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
391 // Check for wrap-around
392 if (memcmp(Buffer + Oldest, WrapMark, sizeof(WrapMark)) == 0) {
393 Oldest = 4;
394 continue;
395 }
396 // Check if end marker reached, then only one event fits buffer
397 if (memcmp(Buffer + Oldest, EndMark, sizeof(EndMark)) == 0) {
398 Oldest = Next;
399 break;
400 }
401 // Move to next entry
402 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
403 }
404 // Update pointer in buffer
405 *(int *) Buffer = Oldest;
406
407 // Write wrap mark if necessary
408 if (WrapPos != NULL) memcpy(WrapPos, WrapMark, sizeof(WrapMark));
409
410 // Copy data into ring buffer
411 *((int *) (Buffer + Next)) = Info->getTimestamp();
412 *((int *) (Buffer + Next + sizeof(int))) = Info->getSize();
413 memcpy(Buffer + Next + 2*sizeof(int), Info->getData(), Info->getSize());
414
415 // Adjust pointer for next entry and write end marker to buffer
416 Next += Info->getSize() + 2*sizeof(int);
417 memcpy(Buffer + Next, EndMark, sizeof(EndMark));
418
419 List[Service].Next = Next;
420}
421
422//
423// Implementation of log writing
424//
425void DataHandler::commandHandler() {
426
427 if (getCommand() != LogCommand || LogFile == NULL) return;
428
429 // Replace all carriage-return by line feed and non-printable characters
430 char *Text = getCommand()->getString();
431 for (unsigned int i=0; i<strlen(Text); i++) {
432 if (Text[i] == '\r') Text[i] = '\n';
433 if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
434 }
435
436 time_t RawTime = time(NULL);
437 struct tm *TM = localtime(&RawTime);
438
439 fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
440 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
441 TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
442
443 fflush(LogFile);
444
445 // If error close file (otherwise infinite loop because State() also writes to log)
446 if(ferror(LogFile)) {
447 fclose(LogFile);
448 LogFile = NULL;
449 State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
450 }
451
452 // Update logfile size service
453 if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
454 LogSizekB = FileSize(LogFile);
455 LogSizeService->updateService();
456 LogSizeLastUpdate = time(NULL);
457 }
458}
459
460
461//
462// Add service to watch list
463//
464void DataHandler::AddService(string Name) {
465
466 // Do not subscribe to history services (otherwise infinite loop)
467 if (Name.find(".hist") != string::npos) return;
468
469 // Check if already subscribed to this service
470 for (int i=0; i<List.size(); i++) {
471 if (Name == List[i].DataItem->getName()) return;
472 }
473
474 // Set minimum required change by comparing to regular expressions
475 struct Item New;
476 New.MinAbsChange = 0;
477 New.HistSize = 0;
478 for (int i=0; i<RegExCount; i++) {
479 if (regexec(&RegEx[i], Name.c_str(), (size_t) 0, NULL, 0) == 0) {
480 New.MinAbsChange = MinChange[i];
481 if (HistSize[i] != 0) New.HistSize = HistSize[i];
482 }
483 }
484
485 // At least 3*sizeof(int)
486 if (New.HistSize < MIN_HIST_SIZE) New.HistSize = MIN_HIST_SIZE;
487
488 // Create history service
489 New.Buffer = new char [New.HistSize];
490 memset(New.Buffer, 0, New.HistSize);
491 *(int *) New.Buffer = 4;
492 New.Next = 4;
493 New.LastValue = DBL_MAX;
494 New.HistService = new DimService ((Name+".hist").c_str(), (char *) "C",
495 New.Buffer, New.HistSize);
496
497 // Load history buffer from file if existing
498 string Filename = New.HistService->getName();
499 for (int j=0; j<Filename.size(); j++) if (Filename[j] == '/') Filename[j] = '_';
500 FILE *File = fopen((HistDir + "/" + Filename).c_str(), "rb");
501 if (File != NULL) {
502 // Only load if current buffer size if equal or larger
503 if (FileSize(File) <= New.HistSize*sizeof(char)+sizeof(New.Next) && FileSize(File) != -1) {
504 fread(&New.Next, sizeof(New.Next), 1, File);
505 fread(New.Buffer, sizeof(char), New.HistSize, File);
506 fclose(File);
507 }
508 }
509
510 // Subscribe to service
511 New.DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
512
513 // Add item to list
514 List.push_back(New);
515}
516
517
518//
519// Remove service from watch list
520//
521void DataHandler::RemoveService(string Name) {
522
523 // Find service index
524 vector<struct Item>::iterator E;
525 for (E=List.begin(); E<List.end(); ++E) if (Name == (*E).DataItem->getName()) {
526 // Delete subscription first so handler and not called anymore
527 delete (*E).DataItem;
528
529 // Save history buffer (replace '/' by '_')
530 string Name = (*E).HistService->getName();
531 for (int j=0; j<Name.size(); j++) if (Name[j] == '/') Name[j] = '_';
532 FILE *File = fopen((HistDir + "/" + Name).c_str(), "wb");
533 if (File != NULL) {
534 fwrite(&(*E).Next, sizeof((*E).Next), 1, File);
535 fwrite((*E).Buffer, sizeof(char), (*E).HistSize, File);
536 fclose(File);
537 }
538
539 // Delete history service and free memory
540 delete (*E).HistService;
541 delete[] (*E).Buffer;
542 List.erase(E);
543 }
544}
545
546
547//
548// Determine size of file in kB
549//
550float DataHandler::FileSize(FILE *File) {
551
552 struct stat FileStatus;
553
554 if (fstat(fileno(File), &FileStatus) == -1) {
555 State(WARN, "Could not determine size of file (%s)", strerror(errno));
556 return -1;
557 }
558
559 return (float) FileStatus.st_size/1024;
560}
561
562//
563// Main program
564//
565int main() {
566
567 // Static ensures calling of destructor by exit()
568 static DataHandler Data;
569
570 // Sleep until signal caught
571 pause();
572}
Note: See TracBrowser for help on using the repository browser.