source: Evidence/DColl.cc@ 216

Last change on this file since 216 was 216, checked in by ogrimm, 15 years ago
Changed service 'Status' to 'Message' for clarity, added client information in log file entries
File size: 17.5 KB
Line 
/********************************************************************\

  Central data collector of the Evidence Control System

  - DColl subscribes to all services given by the configuration
    server and writes these to the data file at every update.
  - One data file per day is generated.
  - A history of events is kept within a ring buffer for each service. Each
    entry will be the result of a conversion to double of the text
    written to the data file. Only if the new value has changed by a
    minimum amount will it be added to the ring buffer. The history is
    available via an rpc call.
  - The history buffers are written to disk at program termination and
    an attempt is made to read them back when adding a service.
  - The command 'DColl/Log' writes the associated string to the log file

  Oliver Grimm, May 2010

\********************************************************************/
20
21#define SERVER_NAME "DColl"
22
23#define MIN_HIST_SIZE 1024 // Minimum history buffer in bytes (> 3*sizeof(int) !)
24#define LOG_FILENAME "Evidence.log"
25
26#include "Evidence.h"
27
28#include <string>
29#include <sstream>
30#include <vector>
31#include <iomanip>
32
33#include <math.h>
34#include <float.h>
35#include <sys/stat.h>
36#include <ctype.h>
37#include <sys/types.h>
38#include <regex.h>
39
40using namespace std;
41
42//
43// Class declaration
44//
45class DataHandler: public DimRpc, public DimClient, public DimBrowser,
46 public EvidenceServer {
47
48 struct Item {
49 DimStampedInfo *DataItem;
50 char *Buffer;
51 unsigned int HistSize;
52 int Next;
53 double LastValue;
54 double MinAbsChange;
55 };
56 vector<struct Item> List;
57
58 DimCommand *LogCommand;
59
60 FILE *DataFile;
61 FILE *LogFile;
62 char *Filename;
63 float DataSizeMB, LogSizeMB;
64 int DataSizeLastUpdate, LogSizeLastUpdate;
65 char *BaseDir;
66 DimService *LogSizeService, *DataSizeService, *DataFilename;
67 int SizeUpdateDelay;
68 int TimeForNextFile;
69 int RollOver;
70
71 int RegExCount;
72 regex_t *RegEx;
73 double *MinChange;
74 unsigned int *HistSize;
75
76 void infoHandler();
77 void rpcHandler();
78 void commandHandler();
79 void AddService(string);
80 void RemoveService(string);
81 off_t FileSize(FILE *);
82 FILE *OpenHistFile(string, const char *);
83
84 public:
85 DataHandler();
86 ~DataHandler();
87};
88
89//
90// Constructor
91//
92DataHandler::DataHandler(): DimRpc("ServiceHistory", "C", "C"), EvidenceServer(SERVER_NAME) {
93
94 // Initialization to prevent freeing unallocated memory
95 DataFile = NULL;
96 LogFile = NULL;
97 Filename = NULL;
98
99 LogSizeService = NULL;
100 DataSizeService = NULL;
101
102 DataSizeLastUpdate = 0;
103 LogSizeLastUpdate = 0;
104 TimeForNextFile = 0;
105
106 // Request configuration data
107 BaseDir = GetConfig("basedir");
108 SizeUpdateDelay = atoi(GetConfig("sizeupdate"));
109 RollOver = atoi(GetConfig("rollover"));
110
111 // Open log file
112 if ((LogFile = fopen((string(BaseDir)+"/"+LOG_FILENAME).c_str(), "a")) == NULL) {
113 Message(FATAL, "Could not open log file (%s)", strerror(errno));
114 }
115
116 // Provide logging command
117 LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
118
119 // Create services for file sizes and data file name
120 DataSizeMB = 0;
121 DataSizeService = new DimService(SERVER_NAME "/DataSizeMB", DataSizeMB);
122
123 LogSizeMB = FileSize(LogFile)/1024.0/1024.0;
124 LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizeMB);
125
126 DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) "");
127
128 // Count how many minimum change regular expressions are present
129 char *Change = GetConfig("items");
130 RegExCount = 0;
131 char *Token = strtok(Change, "\t ");
132 while (Token != NULL) {
133 RegExCount++;
134 Token = strtok(NULL, "\t ");
135 }
136
137 // Allocate memory for regular expressions, minimum change and history size
138 RegEx = new regex_t [RegExCount];
139 MinChange = new double [RegExCount];
140 HistSize = new unsigned int [RegExCount];
141
142 // Compile regular expressions and extract minimum change and history size
143 int Pos = 0;
144 for (int i=0; i<RegExCount; i++) {
145 int Len = strlen(Change+Pos) + 1;
146 Token = strtok(Change + Pos, ": \t");
147
148 int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
149 if (Ret != 0) {
150 char ErrMsg[200];
151 regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
152 RegExCount--;
153 i--;
154 Message(ERROR, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
155 }
156 else {
157 if ((Token=strtok(NULL, ": \t")) != NULL) MinChange[i] = atof(Token);
158 else MinChange[i] = 0;
159
160 if ((Token=strtok(NULL, "")) != NULL) HistSize[i] = atoi(Token)*1024;
161 else HistSize[i] = 0;
162 }
163 Pos += Len;
164 }
165
166 // Subscribe to list of servers at DIS_DNS
167 AddService("DIS_DNS/SERVER_LIST");
168}
169
170//
171// Destructor: Close files and free memory
172//
173DataHandler::~DataHandler() {
174
175 // Delete all DIM subscriptions
176 while (List.size() != 0) RemoveService(List[0].DataItem->getName());
177
178 delete LogCommand;
179 delete DataFilename;
180 delete[] Filename;
181
182 delete LogSizeService;
183 delete DataSizeService;
184
185 // Close files
186 if (LogFile != NULL) if (fclose(LogFile) != 0) {
187 Message(ERROR, "Could not close log file (%s)", strerror(errno));
188 }
189 if (DataFile != NULL && fclose(DataFile) != 0) {
190 Message(ERROR, "Error: Could not close data file (%s)", strerror(errno));
191 }
192
193 // Free memory for regular expressions handling
194 for (int i=0; i<RegExCount; i++) {
195 regfree(&RegEx[i]);
196 }
197 delete[] MinChange;
198 delete[] RegEx;
199}
200
201//
202// Implementation of data handling
203//
204// DIM ensures infoHandler() is called serialized, therefore
205// no mutex is needed to serialize writing to the file
206void DataHandler::infoHandler() {
207
208 DimInfo *Info = getInfo();
209
210 // Check if service available
211 if (!ServiceOK(Info)) return;
212
213 //
214 // ====== Part A: Handle service subscriptions ===
215 //
216 // Services are added here, removal only in constructor.
217
218 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
219 if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {
220 char *Token = strtok(Info->getString(), "+-!@");
221 while (Token != NULL) {
222 AddService(string(Token)+"/SERVICE_LIST"); // 'add' also for '-' and '!'
223 Token = strtok(NULL, "|"); // Skip server IP address
224 Token = strtok(NULL, "@");
225 }
226 return;
227 }
228
229 // If service is SERVICE_LIST of any server, scan all services.
230 // Subscribe to all services (but not to commands and RPCs)
231 if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
232 char *Name = strtok(Info->getString(), "+-!|");
233 while (Name != NULL) {
234 // Check if item is a service
235 char *Type = strtok(NULL, "\n");
236 if (Type == NULL) return; // for safety, should not happen
237 if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) AddService(Name); // 'add' also for '-' and '!'
238 Name = strtok(NULL, "|");
239 }
240 return;
241 }
242
243 //
244 // ====== Part B: Handle opening data files ===
245 //
246
247 // If it is time to open new data file, close the current one
248 if (time(NULL) >= TimeForNextFile) {
249 if (DataFile != NULL && fclose(DataFile) != 0) {
250 Message(ERROR, "Error: Could not close data file (%s)", strerror(errno));
251 }
252 DataFile = NULL;
253 }
254
255 // Open new data file if necessary
256 if (DataFile == NULL) {
257 time_t Time = time(NULL);
258 struct tm *T = localtime(&Time);
259
260 // Get time structure with date rollover
261 if (T->tm_hour >= RollOver) T->tm_mday++;
262 if (mktime(T) == -1) Message(ERROR, "mktime() failed, check filename");
263
264 // Create direcory if not existing (ignore error if already existing)
265 char *Dir;
266 if (asprintf(&Dir, "%s/%d%02d", BaseDir, T->tm_year+1900, T->tm_mon + 1) == -1) {
267 Message(FATAL, "asprintf() failed, could not create direcory name");
268 }
269 if(mkdir(Dir, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) {
270 Message(FATAL, "Could not create directory '%s' (%s)", Dir, strerror(errno));
271 }
272
273 // Create filename
274 free(Filename);
275 if (asprintf(&Filename, "%s/%d%02d%02d.slow", Dir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == 1) {
276 Message(FATAL, "asprintf() failed, could not create filename");
277 }
278 free(Dir);
279
280 // Open file
281 if ((DataFile = fopen(Filename, "a")) == NULL) {
282 Message(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
283 }
284 else Message(INFO, "Opened data file '%s'", Filename);
285 DataFilename->updateService(Filename);
286
287 // Calculate time for next file opening
288 T->tm_sec = 0;
289 T->tm_min = 0;
290 T->tm_hour = RollOver;
291 TimeForNextFile = mktime(T);
292 }
293
294 //
295 // ====== Part C: Handle writing to data file ===
296 //
297
298 // Identify index of service
299 int Service;
300 for (Service=0; Service<List.size(); Service++) if (Info == List[Service].DataItem) break;
301 if (Service == List.size()) return; // Service not found
302
303 // If negative value for absolute change, do not write to file
304 if (List[Service].MinAbsChange >= 0) {
305 // Write data header
306 time_t RawTime = Info->getTimestamp();
307 struct tm *TM = localtime(&RawTime);
308
309 fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
310
311 // Translate data into ASCII
312 char *Text = EvidenceServer::ToString(Info);
313
314 if (Text != NULL) {
315 // Replace new line by '\' and all other control characters by white space
316 for (int i=0; i<strlen(Text); i++) {
317 if (Text[i] == '\n') Text[i] = '\\';
318 else if (iscntrl(Text[i])) Text[i] = ' ';
319 }
320
321 // Write to file
322 fprintf(DataFile, "%s\n", Text);
323
324 free(Text);
325 }
326 else fprintf(DataFile, "Cannot interpret format identifier\n");
327
328 // Terminate if error because otherwise infinite loop might result as
329 // next call to this infoHandler() will try to (re-)open file
330 if(ferror(DataFile)) {
331 fclose(DataFile);
332 DataFile = NULL;
333 Message(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
334 }
335
336 // Update datafile size service (not every time to avoid infinite loop)
337 if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
338 fflush(DataFile); // not continuously to reduce load
339
340 DataSizeMB = FileSize(DataFile)/1024.0/1024.0;
341 DataSizeService->updateService();
342 DataSizeLastUpdate = time(NULL);
343 }
344 } // Check for disk writing
345
346 //
347 // ====== Part D: Handle history service ===
348 //
349
350 if (Info->getSize()==0 || Info->getTimestamp()==0) return;
351
352 // Check if data should be added to history buffer
353 if (strcmp(Info->getFormat(),"C") != 0 && strlen(Info->getFormat())==1) {
354 // Calculate sum of all number in array
355 char *Text = EvidenceServer::ToString(Info);
356 char *Token = strtok(Text, " ");
357 double Sum = 0;
358 while (Token != NULL) {
359 Sum += atof(Token);
360 Token = strtok(NULL, " ");
361 }
362 free(Text);
363 // Minimum change?
364 if (fabs(Sum-List[Service].LastValue) < fabs(List[Service].MinAbsChange)) return;
365 List[Service].LastValue = Sum;
366 }
367
368 // Check if data fits into buffer
369 if (List[Service].HistSize < Info->getSize() + sizeof(int)+ 2*sizeof(EvidenceHistory::Item)) return;
370
371 int Size = Info->getSize() + 2*sizeof(EvidenceHistory::Item), Next = List[Service].Next;
372 void *WrapPos = NULL;
373 char *Buffer = List[Service].Buffer;
374 int Oldest = *(int *) Buffer;
375
376 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
377 if (Next + Size >= List[Service].HistSize) {
378 WrapPos = Buffer + Next;
379 Next = 4;
380 }
381
382 // Adapt pointer to oldest entry
383 while ((Oldest < Next + Size) &&
384 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
385 // Check for wrap-around
386 if (memcmp(Buffer + Oldest, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)) == 0) {
387 Oldest = 4;
388 continue;
389 }
390 // Check if end marker reached, then only one event fits buffer
391 if (memcmp(Buffer + Oldest, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)) == 0) {
392 Oldest = Next;
393 break;
394 }
395 // Move to next entry
396 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
397 }
398 // Update pointer in buffer
399 *(int *) Buffer = Oldest;
400
401 // Write wrap mark if necessary
402 if (WrapPos != NULL) memcpy(WrapPos, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark));
403
404 // Copy data into ring buffer
405 *((int *) (Buffer + Next)) = Info->getTimestamp();
406 *((int *) (Buffer + Next + sizeof(int))) = Info->getSize();
407 memcpy(Buffer + Next + 2*sizeof(int), Info->getData(), Info->getSize());
408
409 // Adjust pointer for next entry and write end marker to buffer
410 Next += Info->getSize() + sizeof(EvidenceHistory::Item);
411 memcpy(Buffer + Next, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark));
412
413 List[Service].Next = Next;
414}
415
416//
417// Implementation of log writing
418//
419void DataHandler::commandHandler() {
420
421 if (getCommand() != LogCommand || LogFile == NULL) return;
422
423 // Replace all carriage-return by line feed and non-printable characters
424 char *Text = getCommand()->getString();
425 for (unsigned int i=0; i<strlen(Text); i++) {
426 if (Text[i] == '\r') Text[i] = '\n';
427 if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
428 }
429
430 time_t RawTime = time(NULL);
431 struct tm *TM = localtime(&RawTime);
432
433 fprintf(LogFile, "%2d/%2d/%4d %2d:%2d:%2d %s (ID %d): %s\n",
434 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
435 TM->tm_hour, TM->tm_min, TM->tm_sec, getClientName(), getClientId(), Text);
436
437 fflush(LogFile);
438
439 // If error close file (otherwise infinite loop because Message() also writes to log)
440 if(ferror(LogFile)) {
441 fclose(LogFile);
442 LogFile = NULL;
443 Message(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
444 }
445
446 // Update logfile size service
447 if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
448 LogSizeMB = FileSize(LogFile)/1024.0/1024.0;
449 LogSizeService->updateService();
450 LogSizeLastUpdate = time(NULL);
451 }
452}
453
454
455//
456// Implementation of history buffer distribution
457//
458void DataHandler::rpcHandler() {
459
460 // Search for history buffer
461 for (int i=0; i<List.size(); i++) {
462 if (strcmp(List[i].DataItem->getName(), getString()) == 0) {
463 setData((void *) List[i].Buffer, List[i].HistSize);
464 return;
465 }
466 }
467
468 // Try to open history file if not found in memory
469 FILE *File = OpenHistFile(getString(), "rb");
470 if (File == NULL) {
471 setData(NULL, 0);
472 return;
473 }
474
475 // Read history file
476 off_t Size = FileSize(File);
477 if (Size != -1) {
478 char *Buffer = new char [Size-sizeof(int)];
479 fseek(File, sizeof(int), SEEK_SET);
480 fread(Buffer, sizeof(char), Size-sizeof(int), File);
481 if (ferror(File) != 0) {
482 Message(WARN, "Error reading history file '%s' in rpcHandler()", getString());
483 setData(NULL, 0); // Default response
484 }
485 else setData((void *) Buffer, Size);
486 delete[] Buffer;
487 }
488
489 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in rpcHandler()", getString());
490}
491
492
493//
494// Add service to watch list
495//
496void DataHandler::AddService(string Name) {
497
498 // Check if already subscribed to this service
499 for (int i=0; i<List.size(); i++) {
500 if (Name == List[i].DataItem->getName()) return;
501 }
502
503 // Set minimum required change by comparing to regular expressions
504 struct Item New;
505 New.MinAbsChange = 0;
506 New.HistSize = 0;
507 for (int i=0; i<RegExCount; i++) {
508 if (regexec(&RegEx[i], Name.c_str(), (size_t) 0, NULL, 0) == 0) {
509 New.MinAbsChange = MinChange[i];
510 if (HistSize[i] != 0) New.HistSize = HistSize[i];
511 }
512 }
513
514 // At least 3*sizeof(int)
515 if (New.HistSize < MIN_HIST_SIZE) New.HistSize = MIN_HIST_SIZE;
516
517 // Create history service
518 New.Buffer = new char [New.HistSize];
519 memset(New.Buffer, 0, New.HistSize);
520 *(int *) New.Buffer = 4;
521 New.Next = 4;
522 New.LastValue = DBL_MAX;
523
524 // Load history buffer from file if existing
525 FILE *File = OpenHistFile(Name, "rb");
526 if (File != NULL) {
527 // Only load if current buffer size is equal or larger
528 if (FileSize(File) <= New.HistSize*sizeof(char)+sizeof(New.Next) && FileSize(File) != -1) {
529 fread(&New.Next, sizeof(New.Next), 1, File);
530 fread(New.Buffer, sizeof(char), New.HistSize, File);
531 if (ferror(File) != 0) Message(WARN, "Error reading history file '%s' in AddService()", Name.c_str());
532 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in AddService()", Name.c_str());;
533 }
534 }
535
536 // Subscribe to service
537 New.DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
538
539 // Add item to list
540 List.push_back(New);
541}
542
543
544//
545// Remove service from watch list
546//
547void DataHandler::RemoveService(string Name) {
548
549 // Find service index
550 vector<struct Item>::iterator E;
551 for (E=List.begin(); E<List.end(); ++E) if (Name == (*E).DataItem->getName()) {
552 // Delete subscription first so handler and not called anymore
553 delete (*E).DataItem;
554
555 // Save history buffer
556 FILE *File = OpenHistFile(Name, "wb");
557 if (File != NULL) {
558 fwrite(&(*E).Next, sizeof((*E).Next), 1, File);
559 fwrite((*E).Buffer, sizeof(char), (*E).HistSize, File);
560 if (ferror(File) != 0) Message(WARN, "Error writing history file '%s' in RemoveService()", Name.c_str());
561 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in RemoveService()", Name.c_str());;
562 }
563
564 // Delete history service and free memory
565 delete[] (*E).Buffer;
566 List.erase(E);
567 }
568}
569
//
// Determine size of file in bytes (returns -1 on error)
//
573off_t DataHandler::FileSize(FILE *File) {
574
575 struct stat FileStatus;
576
577 if (fstat(fileno(File), &FileStatus) == -1) {
578 Message(WARN, "Could not determine size of file (%s)", strerror(errno));
579 return -1;
580 }
581
582 return FileStatus.st_size;
583}
584
585//
586// Open file for service history
587//
588FILE *DataHandler::OpenHistFile(string Service, const char *Mode) {
589
590 string Dir = string(BaseDir) + "/Histories/";
591
592 // Create directory if not yet existing
593 if(mkdir(Dir.c_str(), S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) return NULL;
594
595 // Replace all '/' by '_' in string and open file
596 for (int i=0; i<Service.size(); i++) if (Service[i] == '/') Service[i] = '_';
597 return fopen((Dir + Service).c_str(), Mode);
598}
599
600//
601// Main program
602//
603int main() {
604
605 // Static ensures calling of destructor by exit()
606 static DataHandler Data;
607
608 // Sleep until signal caught
609 pause();
610}
Note: See TracBrowser for help on using the repository browser.