source: Evidence/DColl.cc@ 209

Last change on this file since 209 was 209, checked in by ogrimm, 15 years ago
Service histories now available via DimRpc from DColl
File size: 16.2 KB
Line 
1/********************************************************************\
2
3 Central data collector of the Evidence Control System
4
5 - DColl subscribes to all services given by the configuration
6 server and writes these to the data file at every update.
7 - One data file per day is generated.
8 - A history of events is kept within a ring buffer for each service. Each
9 entry will be the result of a conversion to double of the text
10 written to the data file. Only if the new value has changed by a
11 minimum amount it will be added to the ring buffer. The history is
12 available via an rpc call.
13 - The history buffers are written to disk at program termination and
14 are read back (if available) when adding a service.
15 - The command 'DColl/Log' writes the associated string to the log
16 file specified in the configuration.
17
18 Oliver Grimm, May 2010
19
20\********************************************************************/
21
22#define SERVER_NAME "DColl"
23
24#define MIN_HIST_SIZE 1024 // Minimum history buffer in bytes (> 3*sizeof(int) !)
25
26#include "Evidence.h"
27
28#include <string>
29#include <sstream>
30#include <vector>
31#include <iomanip>
32
33#include <math.h>
34#include <float.h>
35#include <sys/stat.h>
36#include <ctype.h>
37#include <sys/types.h>
38#include <regex.h>
39
40using namespace std;
41
42//
43// Class declaration
44//
45class DataHandler: public DimRpc, public DimClient, public DimBrowser,
46 public EvidenceServer {
47
48 struct Item {
49 DimStampedInfo *DataItem;
50 char *Buffer;
51 unsigned int HistSize;
52 int Next;
53 double LastValue;
54 double MinAbsChange;
55 };
56 vector<struct Item> List;
57
58 DimCommand *LogCommand;
59
60 FILE *DataFile;
61 FILE *LogFile;
62 char *Filename;
63 float DataSizekB, LogSizekB;
64 int DataSizeLastUpdate, LogSizeLastUpdate;
65 char *DataDir;
66 DimService *LogSizeService, *DataSizeService, *DataFilename;
67 string HistDir;
68 int SizeUpdateDelay;
69 int TimeForNextFile;
70 int RollOver;
71
72 int RegExCount;
73 regex_t *RegEx;
74 double *MinChange;
75 unsigned int *HistSize;
76
77 void infoHandler();
78 void rpcHandler();
79 void commandHandler();
80 void AddService(string);
81 void RemoveService(string);
82 float FileSize(FILE *);
83 FILE *OpenHistFile(string, const char *);
84
85 public:
86 DataHandler();
87 ~DataHandler();
88};
89
90//
91// Constructor
92//
93DataHandler::DataHandler(): DimRpc("ServiceHistory", "C", "C"), EvidenceServer(SERVER_NAME) {
94
95 // Initialization to prevent freeing unallocated memory
96 DataFile = NULL;
97 LogFile = NULL;
98 Filename = NULL;
99
100 LogSizeService = NULL;
101 DataSizeService = NULL;
102
103 DataSizeLastUpdate = 0;
104 LogSizeLastUpdate = 0;
105 TimeForNextFile = 0;
106
107 // Request configuration data
108 DataDir = GetConfig("datadir");
109 SizeUpdateDelay = atoi(GetConfig("sizeupdate"));
110 HistDir = GetConfig("histdir");
111 RollOver = atoi(GetConfig("rollover"));
112
113 // Open log file
114 char *Logname = GetConfig("logfile");
115 if ((LogFile = fopen(Logname, "a")) == NULL) {
116 State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
117 }
118
119 // Provide logging command
120 LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
121
122 // Create services for file sizes and data file name
123 DataSizekB = 0;
124 DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
125
126 LogSizekB = FileSize(LogFile);
127 LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
128
129 DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) "");
130
131 // Count how many minimum change regular expressions are present
132 char *Change = GetConfig("items");
133 RegExCount = 0;
134 char *Token = strtok(Change, "\t ");
135 while (Token != NULL) {
136 RegExCount++;
137 Token = strtok(NULL, "\t ");
138 }
139
140 // Allocate memory for regular expressions, minimum change and history size
141 RegEx = new regex_t [RegExCount];
142 MinChange = new double [RegExCount];
143 HistSize = new unsigned int [RegExCount];
144
145 // Compile regular expressions and extract minimum change and history size
146 int Pos = 0;
147 for (int i=0; i<RegExCount; i++) {
148 int Len = strlen(Change+Pos) + 1;
149 Token = strtok(Change + Pos, ": \t");
150
151 int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
152 if (Ret != 0) {
153 char ErrMsg[200];
154 regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
155 RegExCount--;
156 i--;
157 State(ERROR, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
158 }
159 else {
160 if ((Token=strtok(NULL, ": \t")) != NULL) MinChange[i] = atof(Token);
161 else MinChange[i] = 0;
162
163 if ((Token=strtok(NULL, "")) != NULL) HistSize[i] = atoi(Token)*1024;
164 else HistSize[i] = 0;
165 }
166 Pos += Len;
167 }
168
169 // Subscribe to list of servers at DIS_DNS
170 AddService("DIS_DNS/SERVER_LIST");
171}
172
173//
174// Destructor: Close files and free memory
175//
176DataHandler::~DataHandler() {
177
178 // Delete all DIM subscriptions
179 while (List.size() != 0) RemoveService(List[0].DataItem->getName());
180
181 delete LogCommand;
182 delete DataFilename;
183 delete[] Filename;
184
185 delete LogSizeService;
186 delete DataSizeService;
187
188 // Close files
189 if (LogFile != NULL) if (fclose(LogFile) != 0) {
190 State(ERROR, "Could not close log file (%s)", strerror(errno));
191 }
192 if (DataFile != NULL && fclose(DataFile) != 0) {
193 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
194 }
195
196 // Free memory for regular expressions handling
197 for (int i=0; i<RegExCount; i++) {
198 regfree(&RegEx[i]);
199 }
200 delete[] MinChange;
201 delete[] RegEx;
202}
203
204//
205// Implementation of data handling
206//
207// DIM ensures infoHandler() is called serialized, therefore
208// no mutex is needed to serialize writing to the file
209void DataHandler::infoHandler() {
210
211 static const int WrapMark[] = {0, -1};
212 static const int EndMark[] = {0, 0};
213
214 DimInfo *Info = getInfo();
215
216 // Check if service available
217 if (!ServiceOK(Info)) return;
218
219 //
220 // ====== Part A: Handle service subscriptions ===
221 //
222
223 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
224 if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {
225 char *Token = strtok(Info->getString(), "+-!@");
226 while (Token != NULL) {
227 if (*Info->getString()=='-' || *Info->getString()=='!') RemoveService(string(Token)+"/SERVICE_LIST");
228 else AddService(string(Token)+"/SERVICE_LIST");
229 Token = strtok(NULL, "|"); // Skip server IP address
230 Token = strtok(NULL, "@");
231 }
232 return;
233 }
234
235 // If service is SERVICE_LIST of any server, scan all services.
236 // Subscribe to all services (but not to commands and RPCs)
237 if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
238 char *Name = strtok(Info->getString(), "+-!|");
239 while (Name != NULL) {
240 // Check if item is a service
241 char *Type = strtok(NULL, "\n");
242 if (Type == NULL) return; // for safety, should not happen
243 if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) {
244 // Add or remove service
245 if (*Info->getString()=='-' || *Info->getString()=='!') RemoveService(Name);
246 else AddService(Name);
247 }
248 Name = strtok(NULL, "|");
249 }
250 return;
251 }
252
253 //
254 // ====== Part B: Handle opening data files ===
255 //
256
257 // If it is time to open new data file, close the current one
258 if (time(NULL) >= TimeForNextFile) {
259 if (DataFile != NULL && fclose(DataFile) != 0) {
260 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
261 }
262 DataFile = NULL;
263 }
264
265 // Open new data file if necessary
266 if (DataFile == NULL) {
267 time_t Time = time(NULL);
268 struct tm *T = localtime(&Time);
269
270 // Get time structure with date rollover
271 if (T->tm_hour >= RollOver) T->tm_mday++;
272 if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
273
274 // Create direcory if not existing (ignore error if already existing)
275 char *Dir;
276 if (asprintf(&Dir, "%s/%d%02d",DataDir, T->tm_year+1900, T->tm_mon + 1) == -1) {
277 State(FATAL, "asprintf() failed, could not create direcory name");
278 }
279 if(mkdir(Dir, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) {
280 State(FATAL, "Could not create directory '%s' (%s)", Dir, strerror(errno));
281 }
282
283 // Create filename
284 free(Filename);
285 if (asprintf(&Filename, "%s/%d%02d%02d.slow", Dir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == 1) {
286 State(FATAL, "asprintf() failed, could not create filename");
287 }
288 free(Dir);
289
290 // Open file
291 if ((DataFile = fopen(Filename, "a")) == NULL) {
292 State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
293 }
294 else State(INFO, "Opened data file '%s'", Filename);
295 DataFilename->updateService(Filename);
296
297 // Calculate time for next file opening
298 T->tm_sec = 0;
299 T->tm_min = 0;
300 T->tm_hour = RollOver;
301 TimeForNextFile = mktime(T);
302 }
303
304 //
305 // ====== Part C: Handle writing to data file ===
306 //
307
308 // Identify index of service
309 int Service;
310 for (Service=0; Service<List.size(); Service++) if (Info == List[Service].DataItem) break;
311 if (Service == List.size()) return; // Service not found
312
313 // If negative value for absolute change, do not write to file
314 if (List[Service].MinAbsChange >= 0) {
315 // Write data header
316 time_t RawTime = Info->getTimestamp();
317 struct tm *TM = localtime(&RawTime);
318
319 fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
320
321 // Translate data into ASCII
322 char *Text = EvidenceServer::ToString(Info);
323
324 if (Text != NULL) {
325 // Replace new line by '\' and all other control characters by white space
326 for (int i=0; i<strlen(Text); i++) {
327 if (Text[i] == '\n') Text[i] = '\\';
328 else if (iscntrl(Text[i])) Text[i] = ' ';
329 }
330
331 // Write to file
332 fprintf(DataFile, "%s\n", Text);
333
334 free(Text);
335 }
336 else fprintf(DataFile, "Cannot interpret format identifier\n");
337
338 // Terminate if error because otherwise infinite loop might result as
339 // next call to this infoHandler() will try to (re-)open file
340 if(ferror(DataFile)) {
341 fclose(DataFile);
342 DataFile = NULL;
343 State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
344 }
345
346 // Update datafile size service (not every time to avoid infinite loop)
347 if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
348 fflush(DataFile); // not continuously to reduce load
349
350 DataSizekB = FileSize(DataFile);
351 DataSizeService->updateService();
352 DataSizeLastUpdate = time(NULL);
353 }
354 } // Check for disk writing
355
356 //
357 // ====== Part D: Handle history service ===
358 //
359
360 if (Info->getSize() == 0) return;
361
362 // Check if data should be added to history buffer
363 if (strcmp(Info->getFormat(),"C") != 0 && strlen(Info->getFormat())==1) {
364 // Calculate sum of all number in array
365 char *Text = EvidenceServer::ToString(Info);
366 char *Token = strtok(Text, " ");
367 double Sum = 0;
368 while (Token != NULL) {
369 Sum += atof(Token);
370 Token = strtok(NULL, " ");
371 }
372 free(Text);
373 // Minimum change?
374 if (fabs(Sum-List[Service].LastValue) < fabs(List[Service].MinAbsChange)) return;
375 List[Service].LastValue = Sum;
376 }
377
378 // Check if data fits into buffer
379 if (List[Service].HistSize < Info->getSize() + 5*sizeof(int)) return;
380
381 int Size = Info->getSize() + 4*sizeof(int), Next = List[Service].Next;
382 void *WrapPos = NULL;
383 char *Buffer = List[Service].Buffer;
384 int Oldest = *(int *) Buffer;
385
386 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
387 if (Next + Size >= List[Service].HistSize) {
388 WrapPos = Buffer + Next;
389 Next = 4;
390 }
391
392 // Adapt pointer to oldest entry
393 while ((Oldest < Next + Size) &&
394 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
395 // Check for wrap-around
396 if (memcmp(Buffer + Oldest, WrapMark, sizeof(WrapMark)) == 0) {
397 Oldest = 4;
398 continue;
399 }
400 // Check if end marker reached, then only one event fits buffer
401 if (memcmp(Buffer + Oldest, EndMark, sizeof(EndMark)) == 0) {
402 Oldest = Next;
403 break;
404 }
405 // Move to next entry
406 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
407 }
408 // Update pointer in buffer
409 *(int *) Buffer = Oldest;
410
411 // Write wrap mark if necessary
412 if (WrapPos != NULL) memcpy(WrapPos, WrapMark, sizeof(WrapMark));
413
414 // Copy data into ring buffer
415 *((int *) (Buffer + Next)) = Info->getTimestamp();
416 *((int *) (Buffer + Next + sizeof(int))) = Info->getSize();
417 memcpy(Buffer + Next + 2*sizeof(int), Info->getData(), Info->getSize());
418
419 // Adjust pointer for next entry and write end marker to buffer
420 Next += Info->getSize() + 2*sizeof(int);
421 memcpy(Buffer + Next, EndMark, sizeof(EndMark));
422
423 List[Service].Next = Next;
424}
425
426//
427// Implementation of log writing
428//
429void DataHandler::commandHandler() {
430
431 if (getCommand() != LogCommand || LogFile == NULL) return;
432
433 // Replace all carriage-return by line feed and non-printable characters
434 char *Text = getCommand()->getString();
435 for (unsigned int i=0; i<strlen(Text); i++) {
436 if (Text[i] == '\r') Text[i] = '\n';
437 if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
438 }
439
440 time_t RawTime = time(NULL);
441 struct tm *TM = localtime(&RawTime);
442
443 fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
444 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
445 TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
446
447 fflush(LogFile);
448
449 // If error close file (otherwise infinite loop because State() also writes to log)
450 if(ferror(LogFile)) {
451 fclose(LogFile);
452 LogFile = NULL;
453 State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
454 }
455
456 // Update logfile size service
457 if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
458 LogSizekB = FileSize(LogFile);
459 LogSizeService->updateService();
460 LogSizeLastUpdate = time(NULL);
461 }
462}
463
464
465//
466// Implementation of history buffer distribution
467//
468void DataHandler::rpcHandler() {
469
470 // Search for history buffer
471 for (int i=0; i<List.size(); i++) {
472 if (strcmp(List[i].DataItem->getName(), getString()) == 0) {
473 setData((void *) List[i].Buffer, List[i].HistSize);
474 return;
475 }
476 }
477
478 // Default response
479 setData(NULL, 0);
480}
481
482
483//
484// Add service to watch list
485//
486void DataHandler::AddService(string Name) {
487
488 // Check if already subscribed to this service
489 for (int i=0; i<List.size(); i++) {
490 if (Name == List[i].DataItem->getName()) return;
491 }
492
493 // Set minimum required change by comparing to regular expressions
494 struct Item New;
495 New.MinAbsChange = 0;
496 New.HistSize = 0;
497 for (int i=0; i<RegExCount; i++) {
498 if (regexec(&RegEx[i], Name.c_str(), (size_t) 0, NULL, 0) == 0) {
499 New.MinAbsChange = MinChange[i];
500 if (HistSize[i] != 0) New.HistSize = HistSize[i];
501 }
502 }
503
504 // At least 3*sizeof(int)
505 if (New.HistSize < MIN_HIST_SIZE) New.HistSize = MIN_HIST_SIZE;
506
507 // Create history service
508 New.Buffer = new char [New.HistSize];
509 memset(New.Buffer, 0, New.HistSize);
510 *(int *) New.Buffer = 4;
511 New.Next = 4;
512 New.LastValue = DBL_MAX;
513
514 // Load history buffer from file if existing
515 FILE *File = OpenHistFile(Name, "rb");
516 if (File != NULL) {
517 // Only load if current buffer size is equal or larger
518 if (FileSize(File) <= New.HistSize*sizeof(char)+sizeof(New.Next) && FileSize(File) != -1) {
519 fread(&New.Next, sizeof(New.Next), 1, File);
520 fread(New.Buffer, sizeof(char), New.HistSize, File);
521 fclose(File);
522 }
523 }
524
525 // Subscribe to service
526 New.DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
527
528 // Add item to list
529 List.push_back(New);
530}
531
532
533//
534// Remove service from watch list
535//
536void DataHandler::RemoveService(string Name) {
537
538 // Find service index
539 vector<struct Item>::iterator E;
540 for (E=List.begin(); E<List.end(); ++E) if (Name == (*E).DataItem->getName()) {
541 // Delete subscription first so handler and not called anymore
542 delete (*E).DataItem;
543
544 // Save history buffer
545 FILE *File = OpenHistFile(Name, "wb");
546 if (File != NULL) {
547 fwrite(&(*E).Next, sizeof((*E).Next), 1, File);
548 fwrite((*E).Buffer, sizeof(char), (*E).HistSize, File);
549 fclose(File);
550 }
551
552 // Delete history service and free memory
553 delete[] (*E).Buffer;
554 List.erase(E);
555 }
556}
557
558
559//
560// Determine size of file in kB
561//
562float DataHandler::FileSize(FILE *File) {
563
564 struct stat FileStatus;
565
566 if (fstat(fileno(File), &FileStatus) == -1) {
567 State(WARN, "Could not determine size of file (%s)", strerror(errno));
568 return -1;
569 }
570
571 return (float) FileStatus.st_size/1024;
572}
573
574//
575// Replace all '/' by '_' in string
576//
577FILE *DataHandler::OpenHistFile(string Service, const char *Mode) {
578
579 for (int i=0; i<Service.size(); i++) if (Service[i] == '/') Service[i] = '_';
580 return fopen((HistDir + "/" + Service).c_str(), Mode);
581}
582
583//
584// Main program
585//
586int main() {
587
588 // Static ensures calling of destructor by exit()
589 static DataHandler Data;
590
591 // Sleep until signal caught
592 pause();
593}
Note: See TracBrowser for help on using the repository browser.