source: Evidence/DColl.cc@ 215

Last change on this file since 215 was 213, checked in by ogrimm, 15 years ago
History read from file if not found in memory
File size: 17.1 KB
Line 
1/********************************************************************\
2
3 Central data collector of the Evidence Control System
4
5 - DColl subscribes to all services given by the configuration
6 server and writes these to the data file at every update.
7 - One data file per day is generated.
8 - A history of events is kept within a ring buffer for each service. Each
9 entry will be the result of a conversion to double of the text
10 written to the data file. Only if the new value has changed by a
11 minimum amount it will be added to the ring buffer. The history is
12 available via an rpc call.
13 - The history buffers are written to disk at program termination and
14 are tried to be read when adding a service.
15 - The command 'DColl/Log' writes the associated string to the log
16 file specified in the configuration.
17
18 Oliver Grimm, May 2010
19
20\********************************************************************/
21
22#define SERVER_NAME "DColl"
23
24#define MIN_HIST_SIZE 1024 // Minimum history buffer in bytes (> 3*sizeof(int) !)
25
26#include "Evidence.h"
27
28#include <string>
29#include <sstream>
30#include <vector>
31#include <iomanip>
32
33#include <math.h>
34#include <float.h>
35#include <sys/stat.h>
36#include <ctype.h>
37#include <sys/types.h>
38#include <regex.h>
39
40using namespace std;
41
42//
43// Class declaration
44//
45class DataHandler: public DimRpc, public DimClient, public DimBrowser,
46 public EvidenceServer {
47
48 struct Item {
49 DimStampedInfo *DataItem;
50 char *Buffer;
51 unsigned int HistSize;
52 int Next;
53 double LastValue;
54 double MinAbsChange;
55 };
56 vector<struct Item> List;
57
58 DimCommand *LogCommand;
59
60 FILE *DataFile;
61 FILE *LogFile;
62 char *Filename;
63 float DataSizekB, LogSizekB;
64 int DataSizeLastUpdate, LogSizeLastUpdate;
65 char *DataDir;
66 DimService *LogSizeService, *DataSizeService, *DataFilename;
67 string HistDir;
68 int SizeUpdateDelay;
69 int TimeForNextFile;
70 int RollOver;
71
72 int RegExCount;
73 regex_t *RegEx;
74 double *MinChange;
75 unsigned int *HistSize;
76
77 void infoHandler();
78 void rpcHandler();
79 void commandHandler();
80 void AddService(string);
81 void RemoveService(string);
82 off_t FileSize(FILE *);
83 FILE *OpenHistFile(string, const char *);
84
85 public:
86 DataHandler();
87 ~DataHandler();
88};
89
90//
91// Constructor
92//
93DataHandler::DataHandler(): DimRpc("ServiceHistory", "C", "C"), EvidenceServer(SERVER_NAME) {
94
95 // Initialization to prevent freeing unallocated memory
96 DataFile = NULL;
97 LogFile = NULL;
98 Filename = NULL;
99
100 LogSizeService = NULL;
101 DataSizeService = NULL;
102
103 DataSizeLastUpdate = 0;
104 LogSizeLastUpdate = 0;
105 TimeForNextFile = 0;
106
107 // Request configuration data
108 DataDir = GetConfig("datadir");
109 SizeUpdateDelay = atoi(GetConfig("sizeupdate"));
110 HistDir = GetConfig("histdir");
111 RollOver = atoi(GetConfig("rollover"));
112
113 // Open log file
114 char *Logname = GetConfig("logfile");
115 if ((LogFile = fopen(Logname, "a")) == NULL) {
116 State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
117 }
118
119 // Provide logging command
120 LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
121
122 // Create services for file sizes and data file name
123 DataSizekB = 0;
124 DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
125
126 LogSizekB = FileSize(LogFile)/1024.0;
127 LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
128
129 DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) "");
130
131 // Count how many minimum change regular expressions are present
132 char *Change = GetConfig("items");
133 RegExCount = 0;
134 char *Token = strtok(Change, "\t ");
135 while (Token != NULL) {
136 RegExCount++;
137 Token = strtok(NULL, "\t ");
138 }
139
140 // Allocate memory for regular expressions, minimum change and history size
141 RegEx = new regex_t [RegExCount];
142 MinChange = new double [RegExCount];
143 HistSize = new unsigned int [RegExCount];
144
145 // Compile regular expressions and extract minimum change and history size
146 int Pos = 0;
147 for (int i=0; i<RegExCount; i++) {
148 int Len = strlen(Change+Pos) + 1;
149 Token = strtok(Change + Pos, ": \t");
150
151 int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
152 if (Ret != 0) {
153 char ErrMsg[200];
154 regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
155 RegExCount--;
156 i--;
157 State(ERROR, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
158 }
159 else {
160 if ((Token=strtok(NULL, ": \t")) != NULL) MinChange[i] = atof(Token);
161 else MinChange[i] = 0;
162
163 if ((Token=strtok(NULL, "")) != NULL) HistSize[i] = atoi(Token)*1024;
164 else HistSize[i] = 0;
165 }
166 Pos += Len;
167 }
168
169 // Subscribe to list of servers at DIS_DNS
170 AddService("DIS_DNS/SERVER_LIST");
171}
172
173//
174// Destructor: Close files and free memory
175//
176DataHandler::~DataHandler() {
177
178 // Delete all DIM subscriptions
179 while (List.size() != 0) RemoveService(List[0].DataItem->getName());
180
181 delete LogCommand;
182 delete DataFilename;
183 delete[] Filename;
184
185 delete LogSizeService;
186 delete DataSizeService;
187
188 // Close files
189 if (LogFile != NULL) if (fclose(LogFile) != 0) {
190 State(ERROR, "Could not close log file (%s)", strerror(errno));
191 }
192 if (DataFile != NULL && fclose(DataFile) != 0) {
193 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
194 }
195
196 // Free memory for regular expressions handling
197 for (int i=0; i<RegExCount; i++) {
198 regfree(&RegEx[i]);
199 }
200 delete[] MinChange;
201 delete[] RegEx;
202}
203
204//
205// Implementation of data handling
206//
207// DIM ensures infoHandler() is called serialized, therefore
208// no mutex is needed to serialize writing to the file
209void DataHandler::infoHandler() {
210
211 static const int WrapMark[] = {0, -1};
212 static const int EndMark[] = {0, 0};
213
214 DimInfo *Info = getInfo();
215
216 // Check if service available
217 if (!ServiceOK(Info)) return;
218
219 //
220 // ====== Part A: Handle service subscriptions ===
221 //
222 // Services are added here, removal only in constructor.
223
224 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
225 if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {
226 char *Token = strtok(Info->getString(), "+-!@");
227 while (Token != NULL) {
228 AddService(string(Token)+"/SERVICE_LIST"); // 'add' also for '-' and '!'
229 Token = strtok(NULL, "|"); // Skip server IP address
230 Token = strtok(NULL, "@");
231 }
232 return;
233 }
234
235 // If service is SERVICE_LIST of any server, scan all services.
236 // Subscribe to all services (but not to commands and RPCs)
237 if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
238 char *Name = strtok(Info->getString(), "+-!|");
239 while (Name != NULL) {
240 // Check if item is a service
241 char *Type = strtok(NULL, "\n");
242 if (Type == NULL) return; // for safety, should not happen
243 if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) AddService(Name); // 'add' also for '-' and '!'
244 Name = strtok(NULL, "|");
245 }
246 return;
247 }
248
249 //
250 // ====== Part B: Handle opening data files ===
251 //
252
253 // If it is time to open new data file, close the current one
254 if (time(NULL) >= TimeForNextFile) {
255 if (DataFile != NULL && fclose(DataFile) != 0) {
256 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
257 }
258 DataFile = NULL;
259 }
260
261 // Open new data file if necessary
262 if (DataFile == NULL) {
263 time_t Time = time(NULL);
264 struct tm *T = localtime(&Time);
265
266 // Get time structure with date rollover
267 if (T->tm_hour >= RollOver) T->tm_mday++;
268 if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
269
270 // Create direcory if not existing (ignore error if already existing)
271 char *Dir;
272 if (asprintf(&Dir, "%s/%d%02d",DataDir, T->tm_year+1900, T->tm_mon + 1) == -1) {
273 State(FATAL, "asprintf() failed, could not create direcory name");
274 }
275 if(mkdir(Dir, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) {
276 State(FATAL, "Could not create directory '%s' (%s)", Dir, strerror(errno));
277 }
278
279 // Create filename
280 free(Filename);
281 if (asprintf(&Filename, "%s/%d%02d%02d.slow", Dir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == 1) {
282 State(FATAL, "asprintf() failed, could not create filename");
283 }
284 free(Dir);
285
286 // Open file
287 if ((DataFile = fopen(Filename, "a")) == NULL) {
288 State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
289 }
290 else State(INFO, "Opened data file '%s'", Filename);
291 DataFilename->updateService(Filename);
292
293 // Calculate time for next file opening
294 T->tm_sec = 0;
295 T->tm_min = 0;
296 T->tm_hour = RollOver;
297 TimeForNextFile = mktime(T);
298 }
299
300 //
301 // ====== Part C: Handle writing to data file ===
302 //
303
304 // Identify index of service
305 int Service;
306 for (Service=0; Service<List.size(); Service++) if (Info == List[Service].DataItem) break;
307 if (Service == List.size()) return; // Service not found
308
309 // If negative value for absolute change, do not write to file
310 if (List[Service].MinAbsChange >= 0) {
311 // Write data header
312 time_t RawTime = Info->getTimestamp();
313 struct tm *TM = localtime(&RawTime);
314
315 fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
316
317 // Translate data into ASCII
318 char *Text = EvidenceServer::ToString(Info);
319
320 if (Text != NULL) {
321 // Replace new line by '\' and all other control characters by white space
322 for (int i=0; i<strlen(Text); i++) {
323 if (Text[i] == '\n') Text[i] = '\\';
324 else if (iscntrl(Text[i])) Text[i] = ' ';
325 }
326
327 // Write to file
328 fprintf(DataFile, "%s\n", Text);
329
330 free(Text);
331 }
332 else fprintf(DataFile, "Cannot interpret format identifier\n");
333
334 // Terminate if error because otherwise infinite loop might result as
335 // next call to this infoHandler() will try to (re-)open file
336 if(ferror(DataFile)) {
337 fclose(DataFile);
338 DataFile = NULL;
339 State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
340 }
341
342 // Update datafile size service (not every time to avoid infinite loop)
343 if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
344 fflush(DataFile); // not continuously to reduce load
345
346 DataSizekB = FileSize(DataFile)/1024.0;
347 DataSizeService->updateService();
348 DataSizeLastUpdate = time(NULL);
349 }
350 } // Check for disk writing
351
352 //
353 // ====== Part D: Handle history service ===
354 //
355
356 if (Info->getSize() == 0) return;
357
358 // Check if data should be added to history buffer
359 if (strcmp(Info->getFormat(),"C") != 0 && strlen(Info->getFormat())==1) {
360 // Calculate sum of all number in array
361 char *Text = EvidenceServer::ToString(Info);
362 char *Token = strtok(Text, " ");
363 double Sum = 0;
364 while (Token != NULL) {
365 Sum += atof(Token);
366 Token = strtok(NULL, " ");
367 }
368 free(Text);
369 // Minimum change?
370 if (fabs(Sum-List[Service].LastValue) < fabs(List[Service].MinAbsChange)) return;
371 List[Service].LastValue = Sum;
372 }
373
374 // Check if data fits into buffer
375 if (List[Service].HistSize < Info->getSize() + 5*sizeof(int)) return;
376
377 int Size = Info->getSize() + 4*sizeof(int), Next = List[Service].Next;
378 void *WrapPos = NULL;
379 char *Buffer = List[Service].Buffer;
380 int Oldest = *(int *) Buffer;
381
382 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
383 if (Next + Size >= List[Service].HistSize) {
384 WrapPos = Buffer + Next;
385 Next = 4;
386 }
387
388 // Adapt pointer to oldest entry
389 while ((Oldest < Next + Size) &&
390 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
391 // Check for wrap-around
392 if (memcmp(Buffer + Oldest, WrapMark, sizeof(WrapMark)) == 0) {
393 Oldest = 4;
394 continue;
395 }
396 // Check if end marker reached, then only one event fits buffer
397 if (memcmp(Buffer + Oldest, EndMark, sizeof(EndMark)) == 0) {
398 Oldest = Next;
399 break;
400 }
401 // Move to next entry
402 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
403 }
404 // Update pointer in buffer
405 *(int *) Buffer = Oldest;
406
407 // Write wrap mark if necessary
408 if (WrapPos != NULL) memcpy(WrapPos, WrapMark, sizeof(WrapMark));
409
410 // Copy data into ring buffer
411 *((int *) (Buffer + Next)) = Info->getTimestamp();
412 *((int *) (Buffer + Next + sizeof(int))) = Info->getSize();
413 memcpy(Buffer + Next + 2*sizeof(int), Info->getData(), Info->getSize());
414
415 // Adjust pointer for next entry and write end marker to buffer
416 Next += Info->getSize() + 2*sizeof(int);
417 memcpy(Buffer + Next, EndMark, sizeof(EndMark));
418
419 List[Service].Next = Next;
420}
421
422//
423// Implementation of log writing
424//
425void DataHandler::commandHandler() {
426
427 if (getCommand() != LogCommand || LogFile == NULL) return;
428
429 // Replace all carriage-return by line feed and non-printable characters
430 char *Text = getCommand()->getString();
431 for (unsigned int i=0; i<strlen(Text); i++) {
432 if (Text[i] == '\r') Text[i] = '\n';
433 if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
434 }
435
436 time_t RawTime = time(NULL);
437 struct tm *TM = localtime(&RawTime);
438
439 fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
440 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
441 TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
442
443 fflush(LogFile);
444
445 // If error close file (otherwise infinite loop because State() also writes to log)
446 if(ferror(LogFile)) {
447 fclose(LogFile);
448 LogFile = NULL;
449 State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
450 }
451
452 // Update logfile size service
453 if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
454 LogSizekB = FileSize(LogFile)/1024.0;
455 LogSizeService->updateService();
456 LogSizeLastUpdate = time(NULL);
457 }
458}
459
460
461//
462// Implementation of history buffer distribution
463//
464void DataHandler::rpcHandler() {
465
466 // Search for history buffer
467 for (int i=0; i<List.size(); i++) {
468 if (strcmp(List[i].DataItem->getName(), getString()) == 0) {
469 setData((void *) List[i].Buffer, List[i].HistSize);
470 return;
471 }
472 }
473
474 // Try to open history file if not found in memory
475 FILE *File = OpenHistFile(getString(), "rb");
476 if (File == NULL) {
477 setData(NULL, 0);
478 return;
479 }
480
481 // Read history file
482 off_t Size = FileSize(File);
483 if (Size != -1) {
484 char *Buffer = new char [Size-sizeof(int)];
485 fseek(File, sizeof(int), SEEK_SET);
486 fread(Buffer, sizeof(char), Size-sizeof(int), File);
487 if (ferror(File) != 0) {
488 State(WARN, "Error reading history file '%s' in rpcHandler()", getString());
489 setData(NULL, 0); // Default response
490 }
491 else setData((void *) Buffer, Size);
492 delete[] Buffer;
493 }
494
495 if (fclose(File) != 0) State(WARN, "Error closing history file '%s' in rpcHandler()", getString());
496}
497
498
499//
500// Add service to watch list
501//
502void DataHandler::AddService(string Name) {
503
504 // Check if already subscribed to this service
505 for (int i=0; i<List.size(); i++) {
506 if (Name == List[i].DataItem->getName()) return;
507 }
508
509 // Set minimum required change by comparing to regular expressions
510 struct Item New;
511 New.MinAbsChange = 0;
512 New.HistSize = 0;
513 for (int i=0; i<RegExCount; i++) {
514 if (regexec(&RegEx[i], Name.c_str(), (size_t) 0, NULL, 0) == 0) {
515 New.MinAbsChange = MinChange[i];
516 if (HistSize[i] != 0) New.HistSize = HistSize[i];
517 }
518 }
519
520 // At least 3*sizeof(int)
521 if (New.HistSize < MIN_HIST_SIZE) New.HistSize = MIN_HIST_SIZE;
522
523 // Create history service
524 New.Buffer = new char [New.HistSize];
525 memset(New.Buffer, 0, New.HistSize);
526 *(int *) New.Buffer = 4;
527 New.Next = 4;
528 New.LastValue = DBL_MAX;
529
530 // Load history buffer from file if existing
531 FILE *File = OpenHistFile(Name, "rb");
532 if (File != NULL) {
533 // Only load if current buffer size is equal or larger
534 if (FileSize(File) <= New.HistSize*sizeof(char)+sizeof(New.Next) && FileSize(File) != -1) {
535 fread(&New.Next, sizeof(New.Next), 1, File);
536 fread(New.Buffer, sizeof(char), New.HistSize, File);
537 if (ferror(File) != 0) State(WARN, "Error reading history file '%s' in AddService()", Name.c_str());
538 if (fclose(File) != 0) State(WARN, "Error closing history file '%s' in AddService()", Name.c_str());;
539 }
540 }
541
542 // Subscribe to service
543 New.DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
544
545 // Add item to list
546 List.push_back(New);
547}
548
549
550//
551// Remove service from watch list
552//
553void DataHandler::RemoveService(string Name) {
554
555 // Find service index
556 vector<struct Item>::iterator E;
557 for (E=List.begin(); E<List.end(); ++E) if (Name == (*E).DataItem->getName()) {
558 // Delete subscription first so handler and not called anymore
559 delete (*E).DataItem;
560
561 // Save history buffer
562 FILE *File = OpenHistFile(Name, "wb");
563 if (File != NULL) {
564 fwrite(&(*E).Next, sizeof((*E).Next), 1, File);
565 fwrite((*E).Buffer, sizeof(char), (*E).HistSize, File);
566 if (ferror(File) != 0) State(WARN, "Error writing history file '%s' in RemoveService()", Name.c_str());
567 if (fclose(File) != 0) State(WARN, "Error closing history file '%s' in RemoveService()", Name.c_str());;
568 }
569
570 // Delete history service and free memory
571 delete[] (*E).Buffer;
572 List.erase(E);
573 }
574}
575
576
577//
578// Determine size of file in kB
579//
580off_t DataHandler::FileSize(FILE *File) {
581
582 struct stat FileStatus;
583
584 if (fstat(fileno(File), &FileStatus) == -1) {
585 State(WARN, "Could not determine size of file (%s)", strerror(errno));
586 return -1;
587 }
588
589 return FileStatus.st_size;
590}
591
592//
593// Replace all '/' by '_' in string
594//
595FILE *DataHandler::OpenHistFile(string Service, const char *Mode) {
596
597 for (int i=0; i<Service.size(); i++) if (Service[i] == '/') Service[i] = '_';
598 return fopen((HistDir + "/" + Service).c_str(), Mode);
599}
600
601//
602// Main program
603//
604int main() {
605
606 // Static ensures calling of destructor by exit()
607 static DataHandler Data;
608
609 // Sleep until signal caught
610 pause();
611}
Note: See TracBrowser for help on using the repository browser.