source: Evidence/DColl.cc@ 152

Last change on this file since 152 was 152, checked in by ogrimm, 12 years ago
Updates
File size: 13.3 KB
Line 
1/********************************************************************\
2
3 Central data collector of the Evidence Control System
4
5 - DColl subscribes to all services given by the configuration
6 server and writes these to the data file at every update.
7 - One data file per day is generated, with roll-over at 13:00 local time.
8 - For each service, it creates a new service with '.hist' appended
9 that contains a history of events kept within a ring buffer. Each
10 entry will be the result of a conversion to double of the text
11 written to the data file. Only if the new value has changed by a
12 minimum amout it will be added to the ring buffer.
13 - The history buffers are written to disk at program termination and
14 are tired to be read when adding a service.
15 - The command 'DColl/Log' writes the associated string to the log
16 file specified in the configuration.
17
18 Oliver Grimm, December 2009
19
20\********************************************************************/
21
22#define SERVER_NAME "DColl"
23
24#define DATE_ROLLOVER 12 // localtime hour after which next date is used
25
26#include "Evidence.h"
27
28#include <string>
29#include <sstream>
30#include <vector>
31#include <iomanip>
32
33#include <math.h>
34#include <float.h>
35#include <sys/stat.h>
36#include <ctype.h>
37#include <sys/types.h>
38#include <regex.h>
39
40//
41// Class declaration
42//
43class DataHandler: public DimClient, public DimBrowser,
44 public EvidenceServer {
45
46 struct Item {
47 DimStampedInfo *DataItem;
48 DimService *HistService;
49 struct EvidenceHistoryItem *HistBuffer;
50 int HistPointer;
51 double LastValue;
52 double MinAbsChange;
53 };
54 vector<struct Item> List;
55
56 DimCommand *LogCommand;
57
58 FILE *DataFile;
59 FILE *LogFile;
60 char *Filename;
61 float DataSizekB, LogSizekB;
62 int DataSizeLastUpdate, LogSizeLastUpdate;
63 char *DataDir;
64 DimService *LogSizeService, *DataSizeService, *DataFilename;
65 string HistDir;
66 int HistSize;
67 int SizeUpdateDelay;
68 int TimeForNextFile;
69
70 int RegExCount;
71 regex_t *RegEx;
72 double *MinChange;
73
74 void infoHandler();
75 void commandHandler();
76 void AddService(string);
77 float FileSize(FILE *);
78
79 public:
80 DataHandler();
81 ~DataHandler();
82};
83
84//
85// Constructor
86//
87DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
88
89 // Initialization to prevent freeing unallocated memory
90 DataFile = NULL;
91 LogFile = NULL;
92 Filename = NULL;
93
94 LogSizeService = NULL;
95 DataSizeService = NULL;
96
97 DataSizeLastUpdate = 0;
98 LogSizeLastUpdate = 0;
99 TimeForNextFile = 0;
100
101 // Request configuration data
102 char *Change = GetConfig(SERVER_NAME " minchange");
103
104 DataDir = GetConfig(SERVER_NAME " datadir");
105
106 char *Logname = GetConfig(SERVER_NAME " logfile");
107
108 SizeUpdateDelay = atoi(GetConfig(SERVER_NAME " sizeupdate"));
109
110 HistSize = atoi(GetConfig(SERVER_NAME " histsize"));
111 if (HistSize < 1) HistSize = 1; // Minimum one items
112
113 HistDir = GetConfig(SERVER_NAME " histdir");
114
115 // Open log file
116 if ((LogFile = fopen(Logname, "a")) == NULL) {
117 State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
118 }
119
120 // Provide logging command
121 LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
122
123 // Create services for file sizes and data file name
124 DataSizekB = 0;
125 DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
126
127 LogSizekB = FileSize(LogFile);
128 LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
129
130 DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) "");
131
132 // Count how many minimum change value regular expressions are present
133 RegExCount = 0;
134 char *Token = strtok(Change, "\t ");
135 while (Token != NULL) {
136 RegExCount++;
137 Token = strtok(NULL, "\t ");
138 }
139
140 // Allocate memory for regular expressions and minimum change values
141 RegEx = new regex_t[RegExCount];
142 MinChange = new double [RegExCount];
143
144 // Compile regular expressions
145 int Pos = 0;
146 for (int i=0; i<RegExCount; i++) {
147 int Len = strlen(Change+Pos) + 1;
148 Token = strtok(Change + Pos, ": \t");
149
150 int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
151 if (Ret != 0) {
152 char ErrMsg[200];
153 regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
154 State(WARN, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
155 }
156 else {
157 if ((Token=strtok(NULL, "")) != NULL) MinChange[i] = atof(Token);
158 else MinChange[i] = 0;
159 }
160 Pos += Len;
161 }
162
163 // Subscribe to list of servers at DIS_DNS
164 AddService("DIS_DNS/SERVER_LIST");
165
166 DimClient::sendCommand("DColl/Log", SERVER_NAME" *** Logging started ***");
167}
168
169//
170// Destructor: Close files and free memory
171//
172DataHandler::~DataHandler() {
173
174 DimClient::sendCommand("DColl/Log", SERVER_NAME" *** Logging stopped ***");
175
176 // Delete DIM subscriptions and command first so handlers and not called anymore
177 for (int i=0; i<List.size(); i++) {
178 delete List[i].DataItem;
179 }
180 delete LogCommand;
181
182 // Save history buffers to files (replace '/' by '_')
183 for (int i=0; i<List.size(); i++) {
184 string Name = List[i].HistService->getName();
185 for (int j=0; j<Name.size(); j++) if (Name[j] == '/') Name[j] = '_';
186 FILE *File = fopen((HistDir + "/" + Name).c_str(), "wb");
187 if (File != NULL) {
188 fwrite(&List[i].HistPointer, sizeof(List[i].HistPointer), 1, File);
189 fwrite(List[i].HistBuffer, sizeof(EvidenceHistoryItem), HistSize, File);
190 fclose(File);
191 }
192 delete List[i].HistService;
193 delete[] List[i].HistBuffer;
194 }
195
196 delete DataFilename;
197 delete[] Filename;
198
199 //delete LogSizeService; // These create segmentation faults?!
200 //delete DataSizeService;
201
202 // Close files
203 if (LogFile != NULL) if (fclose(LogFile) != 0) {
204 State(ERROR, "Could not close log file (%s)", strerror(errno));
205 }
206 if (DataFile != NULL && fclose(DataFile) != 0) {
207 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
208 }
209
210 // Free memory for regular expressions handling
211 for (int i=0; i<RegExCount; i++) {
212 regfree(&RegEx[i]);
213 }
214 delete[] MinChange;
215 delete[] RegEx;
216}
217
218//
219// Implementation of data handling
220//
221// DIM ensures infoHandler() is called serialized, therefore
222// no mutex is needed to serialize writing to the file
223void DataHandler::infoHandler() {
224
225 DimInfo *Info = getInfo();
226
227 // Check if service available
228 if (Info->getSize()==strlen(NO_LINK)+1 && strcmp(Info->getString(), NO_LINK)==0) return;
229
230 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
231 if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {
232 char *Token = strtok(Info->getString(), "@");
233 while (Token != NULL) {
234 if (isalpha(*Token) == 0) Token++; // Name can start with +,-,!
235
236 AddService(string(Token)+"/SERVICE_LIST");
237
238 Token = strtok(NULL, "|");
239 Token = strtok(NULL, "|@"); // ???? Why needed ?????
240 }
241 return;
242 }
243
244 // If service is SERVICE_LIST of any server, scan all services.
245 // Subscribe to all services (but not to commands and RPCs)
246 if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
247
248 char *Name = strtok(Info->getString(), "|");
249 while (Name != NULL) {
250 char *Type = strtok(NULL, "\n");
251 if (Type == NULL) return; // for safety, should not happen
252 if (isalpha(*Name) == 0) Name++; // Name can start with +,-,!
253 if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) {
254 AddService(Name);
255 }
256 Name = strtok(NULL, "|");
257 }
258 return;
259 }
260
261 // If it is time to open new data file, close the current one
262 if (time(NULL) >= TimeForNextFile) {
263 if (DataFile != NULL && fclose(DataFile) != 0) {
264 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
265 }
266 DataFile = NULL;
267 }
268
269 // Open new data file if necessary
270 if (DataFile == NULL) {
271 time_t Time = time(NULL);
272 struct tm *T = localtime(&Time);
273 ostringstream Buf;
274
275 // Generate file name from date
276 if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++;
277 if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
278 Buf << DataDir << "/" << T->tm_year+1900 << setw(2) << setfill('0') << T->tm_mon+1 << T->tm_mday << ".slow";
279
280 // Copy filename to permanent buffer
281 delete[] Filename;
282 Filename = new char [Buf.str().size()+1];
283 strcpy(Filename, Buf.str().c_str());
284
285 // Open file
286 if ((DataFile = fopen(Filename, "a")) == NULL) {
287 State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
288 }
289 else State(INFO, "Opened data file '%s'", Filename);
290 DataFilename->updateService(Filename);
291
292 // Calculate time for next file opening
293 T->tm_sec = 0;
294 T->tm_min = 0;
295 T->tm_hour = DATE_ROLLOVER;
296 TimeForNextFile = mktime(T);
297 }
298
299 // Identify index of service
300 int Service;
301 for (Service=0; Service<List.size(); Service++) if (Info == List[Service].DataItem) break;
302 if (Service == List.size()) return; // Service not found
303
304 // If negative value for absolute change, ignore this entry
305 if (List[Service].MinAbsChange < 0) return;
306
307 // Write data header
308 time_t RawTime = Info->getTimestamp();
309 struct tm *TM = localtime(&RawTime);
310
311 fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
312
313 // Translate data into ASCII
314 char *Text = EvidenceServer::ToString(Info);
315 if (Text != NULL) {
316 // Replace all control characters by white space
317 for (int i=0; i<strlen(Text); i++) if (iscntrl(Text[i])) Text[i] = ' ';
318
319 // Write to file
320 fprintf(DataFile, "%s\n", Text);
321
322 // Add to history buffer if change large enough
323 if ((fabs(atof(Text)-List[Service].LastValue) > List[Service].MinAbsChange)) {
324 List[Service].HistBuffer[List[Service].HistPointer].Seconds = Info->getTimestamp();
325 List[Service].HistBuffer[List[Service].HistPointer].Value = atof(Text);
326 List[Service].HistService->updateService();
327 List[Service].HistPointer++;
328 if (List[Service].HistPointer >= HistSize) List[Service].HistPointer = 0;
329 List[Service].LastValue = atof(Text);
330 }
331 free(Text);
332 }
333 else fprintf(DataFile, "Cannot interpret format identifier\n");
334
335 fflush(DataFile);
336
337 // Terminate if error because otherwise infinite loop might result as
338 // next call to this infoHandler() will try to (re-)open file
339 if(ferror(DataFile)) {
340 fclose(DataFile);
341 DataFile = NULL;
342 State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
343 }
344
345 // Update datafile size service
346 if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
347 DataSizekB = FileSize(DataFile);
348 DataSizeService->updateService();
349 DataSizeLastUpdate = time(NULL);
350 }
351}
352
353//
354// Implementation of log writing
355//
356void DataHandler::commandHandler() {
357
358 if (getCommand() != LogCommand || LogFile == NULL) return;
359
360 // Replace all carriage-return by line feed and non-printable characters
361 char *Text = getCommand()->getString();
362 for (unsigned int i=0; i<strlen(Text); i++) {
363 if (Text[i] == '\r') Text[i] = '\n';
364 if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
365 }
366
367 time_t RawTime = time(NULL);
368 struct tm *TM = localtime(&RawTime);
369
370 fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
371 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
372 TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
373
374 fflush(LogFile);
375
376 // If error close file (otherwise infinite loop because State() also writes to log)
377 if(ferror(LogFile)) {
378 fclose(LogFile);
379 LogFile = NULL;
380 State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
381 }
382
383 // Update logfile size service
384 if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
385 LogSizekB = FileSize(LogFile);
386 LogSizeService->updateService();
387 LogSizeLastUpdate = time(NULL);
388 }
389}
390
391
392//
393// Add service to watch list
394//
395void DataHandler::AddService(string Name) {
396
397 // Do not subscribe to history services (otherwise infinite loop)
398 if (Name.find(".hist") != string::npos) return;
399
400 // Check if already subscribed to this service
401 for (int i=0; i<List.size(); i++) {
402 if (Name == List[i].DataItem->getName()) return;
403 }
404
405 // Create new entry in item list
406 struct Item New;
407
408 // Set minimum required change by comparing to regular expressions
409 New.MinAbsChange = 0;
410 for (int i=0; i<RegExCount; i++) {
411 if (regexec(&RegEx[i], Name.c_str(), (size_t) 0, NULL, 0) == 0) {
412 New.MinAbsChange = MinChange[i];
413 }
414 }
415
416 // Create history service
417 New.HistBuffer = new struct EvidenceHistoryItem [HistSize];
418 memset(New.HistBuffer, 0, HistSize*sizeof(EvidenceHistoryItem));
419 New.HistPointer = 0;
420 New.LastValue = DBL_MAX;
421 New.HistService = new DimService ((Name+".hist").c_str(), (char *) "C",
422 New.HistBuffer, HistSize*sizeof(EvidenceHistoryItem));
423
424 // Load history buffer from file if existing
425 string Filename = New.HistService->getName();
426 for (int j=0; j<Filename.size(); j++) if (Filename[j] == '/') Filename[j] = '_';
427 FILE *File = fopen((HistDir + "/" + Filename).c_str(), "rb");
428 if (File != NULL) {
429 fread(&New.HistPointer, sizeof(New.HistPointer), 1, File);
430 fread(New.HistBuffer, sizeof(EvidenceHistoryItem), HistSize, File);
431 fclose(File);
432 }
433
434 // Subscribe to service
435 New.DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
436
437 // Add item to list
438 List.push_back(New);
439}
440
441//
442// Determine size of file in kB
443//
444float DataHandler::FileSize(FILE *File) {
445
446 struct stat FileStatus;
447
448 if (fstat(fileno(File), &FileStatus) == -1) {
449 State(WARN, "Could not determine size of file (%s)", strerror(errno));
450 return -1;
451 }
452
453 return (float) FileStatus.st_size/1024;
454}
455
456//
457// Main program
458//
459int main() {
460
461 // Static ensures calling of destructor by exit()
462 static DataHandler Data;
463
464 // Sleep until signal caught
465 pause();
466}
Note: See TracBrowser for help on using the repository browser.