source: Evidence/DColl.cc@ 163

Last change on this file since 163 was 160, checked in by ogrimm, 15 years ago
Data written into one directory per month
File size: 15.2 KB
Line 
1/********************************************************************\
2
3 Central data collector of the Evidence Control System
4
5 - DColl subscribes to all services given by the configuration
6 server and writes these to the data file at every update.
7 - One data file per day is generated, with roll-over at 12:00 local time (see DATE_ROLLOVER).
8 - For each service, it creates a new service with '.hist' appended
9 that contains a history of events kept within a ring buffer. Each
10 entry will be the result of a conversion to double of the text
11 written to the data file. Only if the new value has changed by a
12 minimum amount it will be added to the ring buffer.
13 - The history buffers are written to disk at program termination and
14 an attempt is made to read them back when adding a service.
15 - The command 'DColl/Log' writes the associated string to the log
16 file specified in the configuration.
17
18 Oliver Grimm, December 2009
19
20\********************************************************************/
21
22#define SERVER_NAME "DColl"
23
24#define DATE_ROLLOVER 12 // localtime hour after which next date is used
25
26#include "Evidence.h"
27
28#include <string>
29#include <sstream>
30#include <vector>
31#include <iomanip>
32
33#include <math.h>
34#include <float.h>
35#include <sys/stat.h>
36#include <ctype.h>
37#include <sys/types.h>
38#include <regex.h>
39
40using namespace std;
41
42//
43// Class declaration
44//
45class DataHandler: public DimClient, public DimBrowser,
46 public EvidenceServer {
47
48 struct Item {
49 DimStampedInfo *DataItem;
50 DimService *HistService;
51 char *Buffer;
52 int Next;
53 double LastValue;
54 double MinAbsChange;
55 };
56 vector<struct Item> List;
57
58 DimCommand *LogCommand;
59
60 FILE *DataFile;
61 FILE *LogFile;
62 char *Filename;
63 float DataSizekB, LogSizekB;
64 int DataSizeLastUpdate, LogSizeLastUpdate;
65 char *DataDir;
66 DimService *LogSizeService, *DataSizeService, *DataFilename;
67 string HistDir;
68 int HistSize;
69 int SizeUpdateDelay;
70 int TimeForNextFile;
71
72 int RegExCount;
73 regex_t *RegEx;
74 double *MinChange;
75
76 void infoHandler();
77 void commandHandler();
78 void AddService(string);
79 float FileSize(FILE *);
80
81 public:
82 DataHandler();
83 ~DataHandler();
84};
85
86//
87// Constructor
88//
89DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
90
91 // Initialization to prevent freeing unallocated memory
92 DataFile = NULL;
93 LogFile = NULL;
94 Filename = NULL;
95
96 LogSizeService = NULL;
97 DataSizeService = NULL;
98
99 DataSizeLastUpdate = 0;
100 LogSizeLastUpdate = 0;
101 TimeForNextFile = 0;
102
103 // Request configuration data
104 char *Change = GetConfig(SERVER_NAME " minchange");
105 DataDir = GetConfig(SERVER_NAME " datadir");
106 char *Logname = GetConfig(SERVER_NAME " logfile");
107 SizeUpdateDelay = atoi(GetConfig(SERVER_NAME " sizeupdate"));
108 HistSize = atoi(GetConfig(SERVER_NAME " histsize"));
109 if (HistSize < 3*sizeof(int)) HistSize = 3*sizeof(int);
110 HistDir = GetConfig(SERVER_NAME " histdir");
111
112 // Open log file
113 if ((LogFile = fopen(Logname, "a")) == NULL) {
114 State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
115 }
116
117 // Provide logging command
118 LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
119
120 // Create services for file sizes and data file name
121 DataSizekB = 0;
122 DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
123
124 LogSizekB = FileSize(LogFile);
125 LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
126
127 DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) "");
128
129 // Count how many minimum change value regular expressions are present
130 RegExCount = 0;
131 char *Token = strtok(Change, "\t ");
132 while (Token != NULL) {
133 RegExCount++;
134 Token = strtok(NULL, "\t ");
135 }
136
137 // Allocate memory for regular expressions and minimum change values
138 RegEx = new regex_t[RegExCount];
139 MinChange = new double [RegExCount];
140
141 // Compile regular expressions
142 int Pos = 0;
143 for (int i=0; i<RegExCount; i++) {
144 int Len = strlen(Change+Pos) + 1;
145 Token = strtok(Change + Pos, ": \t");
146
147 int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
148 if (Ret != 0) {
149 char ErrMsg[200];
150 regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
151 State(WARN, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
152 }
153 else {
154 if ((Token=strtok(NULL, "")) != NULL) MinChange[i] = atof(Token);
155 else MinChange[i] = 0;
156 }
157 Pos += Len;
158 }
159
160 // Subscribe to list of servers at DIS_DNS
161 AddService("DIS_DNS/SERVER_LIST");
162}
163
164//
165// Destructor: Close files and free memory
166//
167DataHandler::~DataHandler() {
168
169 // Delete DIM subscriptions and command first so handlers and not called anymore
170 for (int i=0; i<List.size(); i++) {
171 delete List[i].DataItem;
172 }
173 delete LogCommand;
174
175 // Save history buffers to files (replace '/' by '_')
176 State(INFO, "Writing history buffers to files");
177
178 for (int i=0; i<List.size(); i++) {
179 string Name = List[i].HistService->getName();
180 for (int j=0; j<Name.size(); j++) if (Name[j] == '/') Name[j] = '_';
181 FILE *File = fopen((HistDir + "/" + Name).c_str(), "wb");
182 if (File != NULL) {
183 fwrite(&List[i].Next, sizeof(List[i].Next), 1, File);
184 fwrite(List[i].Buffer, sizeof(char), HistSize, File);
185 fclose(File);
186 }
187 delete List[i].HistService;
188 delete[] List[i].Buffer;
189 }
190
191 delete DataFilename;
192 delete[] Filename;
193
194 //delete LogSizeService; // These create segmentation faults?!
195 //delete DataSizeService;
196
197 // Close files
198 if (LogFile != NULL) if (fclose(LogFile) != 0) {
199 State(ERROR, "Could not close log file (%s)", strerror(errno));
200 }
201 if (DataFile != NULL && fclose(DataFile) != 0) {
202 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
203 }
204
205 // Free memory for regular expressions handling
206 for (int i=0; i<RegExCount; i++) {
207 regfree(&RegEx[i]);
208 }
209 delete[] MinChange;
210 delete[] RegEx;
211}
212
213//
214// Implementation of data handling
215//
216// DIM ensures infoHandler() is called serialized, therefore
217// no mutex is needed to serialize writing to the file
218void DataHandler::infoHandler() {
219
220 static const int WrapMark[] = {0, -1};
221 static const int EndMark[] = {0, 0};
222
223 DimInfo *Info = getInfo();
224
225 // Check if service available
226 if (!ServiceOK(Info)) return;
227
228 //
229 // ====== Part A: Handle service subscriptions ===
230 //
231
232 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
233 if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {
234 char *Token = strtok(Info->getString(), "@");
235 while (Token != NULL) {
236 if (isalpha(*Token) == 0) Token++; // Name can start with +,-,!
237
238 AddService(string(Token)+"/SERVICE_LIST");
239
240 Token = strtok(NULL, "|");
241 Token = strtok(NULL, "|@"); // ???? Why needed ?????
242 }
243 return;
244 }
245
246 // If service is SERVICE_LIST of any server, scan all services.
247 // Subscribe to all services (but not to commands and RPCs)
248 if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
249
250 char *Name = strtok(Info->getString(), "|");
251 while (Name != NULL) {
252 char *Type = strtok(NULL, "\n");
253 if (Type == NULL) return; // for safety, should not happen
254 if (isalpha(*Name) == 0) Name++; // Name can start with +,-,!
255 if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) {
256 AddService(Name);
257 }
258 Name = strtok(NULL, "|");
259 }
260 return;
261 }
262
263 //
264 // ====== Part B: Handle opening data files ===
265 //
266
267 // If it is time to open new data file, close the current one
268 if (time(NULL) >= TimeForNextFile) {
269 if (DataFile != NULL && fclose(DataFile) != 0) {
270 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
271 }
272 DataFile = NULL;
273 }
274
275 // Open new data file if necessary
276 if (DataFile == NULL) {
277 time_t Time = time(NULL);
278 struct tm *T = localtime(&Time);
279
280 // Get time structure with date rollover
281 if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++;
282 if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
283
284 // Create direcory if not existing (ignore error if already existing)
285 char *Dir;
286 if (asprintf(&Dir, "%s/%d%02d",DataDir, T->tm_year+1900, T->tm_mon + 1) == -1) {
287 State(FATAL, "asprintf() failed, could not create direcory name");
288 }
289 if(mkdir(Dir, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) {
290 State(FATAL, "Could not create direcory '%s' (%s)", Dir, strerror(errno));
291 }
292
293 // Create filename
294 free(Filename);
295 if (asprintf(&Filename, "%s/%d%02d%02d.slow", Dir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == 1) {
296 State(FATAL, "asprintf() failed, could not create filename");
297 }
298 free(Dir);
299
300 // Open file
301 if ((DataFile = fopen(Filename, "a")) == NULL) {
302 State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
303 }
304 else State(INFO, "Opened data file '%s'", Filename);
305 DataFilename->updateService(Filename);
306
307 // Calculate time for next file opening
308 T->tm_sec = 0;
309 T->tm_min = 0;
310 T->tm_hour = DATE_ROLLOVER;
311 TimeForNextFile = mktime(T);
312 }
313
314 //
315 // ====== Part C: Handle writing to data file ===
316 //
317
318 // Identify index of service
319 int Service;
320 for (Service=0; Service<List.size(); Service++) if (Info == List[Service].DataItem) break;
321 if (Service == List.size()) return; // Service not found
322
323 // If negative value for absolute change, do not write to file
324 if (List[Service].MinAbsChange >= 0) {
325
326 // Write data header
327 time_t RawTime = Info->getTimestamp();
328 struct tm *TM = localtime(&RawTime);
329
330 fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
331
332 // Translate data into ASCII
333 char *Text = EvidenceServer::ToString(Info);
334
335 if (Text != NULL) {
336 // Replace all control characters by white space
337 for (int i=0; i<strlen(Text); i++) if (iscntrl(Text[i])) Text[i] = ' ';
338
339 // Write to file
340 fprintf(DataFile, "%s\n", Text);
341
342 free(Text);
343 }
344 else fprintf(DataFile, "Cannot interpret format identifier\n");
345
346 // Terminate if error because otherwise infinite loop might result as
347 // next call to this infoHandler() will try to (re-)open file
348 if(ferror(DataFile)) {
349 fclose(DataFile);
350 DataFile = NULL;
351 State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
352 }
353
354 // Update datafile size service (not every time to avoid infinite loop)
355 if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
356 fflush(DataFile); // not continuously to reduce load
357
358 DataSizekB = FileSize(DataFile);
359 DataSizeService->updateService();
360 DataSizeLastUpdate = time(NULL);
361 }
362
363 } // Check for MinAbsChange
364
365 //
366 // ====== Part D: Handle history service ===
367 //
368
369 if (Info->getSize() == 0) return;
370
371 // Check if data should be added to history buffer
372 char *Text = EvidenceServer::ToString(Info);
373 if (Text != NULL && strcmp(Info->getFormat(),"C") != 0
374 && fabs(atof(Text)-List[Service].LastValue) < fabs(List[Service].MinAbsChange)) {
375 free(Text);
376 return;
377 }
378 free(Text);
379
380 // Check if data fits into buffer
381 if (HistSize < Info->getSize() + 5*sizeof(int)) return;
382
383 int Size = Info->getSize() + 4*sizeof(int), Next = List[Service].Next;
384 void *WrapPos = NULL;
385 char *Buffer = List[Service].Buffer;
386 int Oldest = *(int *) Buffer;
387
388 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
389 if (Next + Size >= HistSize) {
390 WrapPos = Buffer + Next;
391 Next = 4;
392 }
393
394 // Adapt pointer to oldest entry
395 while ((Oldest < Next + Size) &&
396 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
397 // Check for wrap-around
398 if (memcmp(Buffer + Oldest, WrapMark, sizeof(WrapMark)) == 0) {
399 Oldest = 4;
400 continue;
401 }
402 // Check if end marker reached, then only one event fits buffer
403 if (memcmp(Buffer + Oldest, EndMark, sizeof(EndMark)) == 0) {
404 Oldest = Next;
405 break;
406 }
407 // Move to next entry
408 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
409 }
410 // Update pointer in buffer
411 *(int *) Buffer = Oldest;
412
413 // Write wrap mark if necessary
414 if (WrapPos != NULL) memcpy(WrapPos, WrapMark, sizeof(WrapMark));
415
416 // Copy data into ring buffer
417 *((int *) (Buffer + Next)) = Info->getTimestamp();
418 *((int *) (Buffer + Next + sizeof(int))) = Info->getSize();
419 memcpy(Buffer + Next + 2*sizeof(int), Info->getData(), Info->getSize());
420
421 // Adjust pointer for next entry and write end marker to buffer
422 Next += Info->getSize() + 2*sizeof(int);
423 memcpy(Buffer + Next, EndMark, sizeof(EndMark));
424
425 List[Service].Next = Next;
426}
427
428//
429// Implementation of log writing
430//
431void DataHandler::commandHandler() {
432
433 if (getCommand() != LogCommand || LogFile == NULL) return;
434
435 // Replace all carriage-return by line feed and non-printable characters
436 char *Text = getCommand()->getString();
437 for (unsigned int i=0; i<strlen(Text); i++) {
438 if (Text[i] == '\r') Text[i] = '\n';
439 if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
440 }
441
442 time_t RawTime = time(NULL);
443 struct tm *TM = localtime(&RawTime);
444
445 fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
446 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
447 TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
448
449 fflush(LogFile);
450
451 // If error close file (otherwise infinite loop because State() also writes to log)
452 if(ferror(LogFile)) {
453 fclose(LogFile);
454 LogFile = NULL;
455 State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
456 }
457
458 // Update logfile size service
459 if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
460 LogSizekB = FileSize(LogFile);
461 LogSizeService->updateService();
462 LogSizeLastUpdate = time(NULL);
463 }
464}
465
466
467//
468// Add service to watch list
469//
470void DataHandler::AddService(string Name) {
471
472 // Do not subscribe to history services (otherwise infinite loop)
473 if (Name.find(".hist") != string::npos) return;
474
475 // Check if already subscribed to this service
476 for (int i=0; i<List.size(); i++) {
477 if (Name == List[i].DataItem->getName()) return;
478 }
479
480 struct Item New;
481
482 // Set minimum required change by comparing to regular expressions
483 New.MinAbsChange = 0;
484 for (int i=0; i<RegExCount; i++) {
485 if (regexec(&RegEx[i], Name.c_str(), (size_t) 0, NULL, 0) == 0) {
486 New.MinAbsChange = MinChange[i];
487 }
488 }
489
490 // Create history service
491 New.Buffer = new char [HistSize];
492 memset(New.Buffer, 0, HistSize);
493 *(int *) New.Buffer = 4;
494 New.Next = 4;
495 New.LastValue = DBL_MAX;
496 New.HistService = new DimService ((Name+".hist").c_str(), (char *) "C",
497 New.Buffer, HistSize);
498
499 // Load history buffer from file if existing
500 string Filename = New.HistService->getName();
501 for (int j=0; j<Filename.size(); j++) if (Filename[j] == '/') Filename[j] = '_';
502 FILE *File = fopen((HistDir + "/" + Filename).c_str(), "rb");
503 if (File != NULL) {
504 // Only load if current buffer size if equal or larger
505 if (FileSize(File) <= HistSize*sizeof(char)+sizeof(New.Next) && FileSize(File) != -1) {
506 fread(&New.Next, sizeof(New.Next), 1, File);
507 fread(New.Buffer, sizeof(char), HistSize, File);
508 fclose(File);
509 }
510 }
511
512 // Subscribe to service
513 New.DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
514
515 // Add item to list
516 List.push_back(New);
517}
518
519//
520// Determine size of file in kB
521//
522float DataHandler::FileSize(FILE *File) {
523
524 struct stat FileStatus;
525
526 if (fstat(fileno(File), &FileStatus) == -1) {
527 State(WARN, "Could not determine size of file (%s)", strerror(errno));
528 return -1;
529 }
530
531 return (float) FileStatus.st_size/1024;
532}
533
534//
535// Main program
536//
537int main() {
538
539 // Static ensures calling of destructor by exit()
540 static DataHandler Data;
541
542 // Sleep until signal caught
543 pause();
544}
Note: See TracBrowser for help on using the repository browser.