source: Evidence/DColl.cc@ 151

Last change on this file since 151 was 151, checked in by ogrimm, 12 years ago
DColl publishes also current data file name
File size: 12.5 KB
Line 
1/********************************************************************\
2
3 Central data collector of the Evidence Control System
4
5 - DColl subscribes to all services given by the configuration
6 server and writes these to the data file at every update.
7 - One data file per day is generated, with roll-over at 12:00 local time (DATE_ROLLOVER).
8 - For each service, it creates a new service with '.hist' appended
9 that contains a history of events kept within a ring buffer. Each
10 entry will be the result of a conversion to double of the text
11 written to the data file. Only if the new value has changed by a
12 minimum amount it will be added to the ring buffer.
13 - The command 'DColl/Log' writes the associated string to the log
14 file specified in the configuration.
15
16 Oliver Grimm, December 2009
17
18\********************************************************************/
19
20#define SERVER_NAME "DColl"
21
22#define DATE_ROLLOVER 12 // localtime hour after which next date is used
23
24#include "Evidence.h"
25
26#include <math.h>
27#include <float.h>
28#include <sys/stat.h>
29#include <ctype.h>
30#include <sys/types.h>
31#include <regex.h>
32
33//
34// Class declaration
35//
36class DataHandler: public DimClient, public DimBrowser,
37 public EvidenceServer {
38
39 struct Item {
40 DimStampedInfo *DataItem;
41 DimService *HistService;
42 struct EvidenceHistoryItem *HistBuffer;
43 int HistPointer;
44 double LastValue;
45 double MinAbsChange;
46 } *List;
47
48 DimCommand *LogCommand;
49
50 unsigned int NumItems;
51 char *Filename;
52 FILE *DataFile;
53 FILE *LogFile;
54 float DataSizekB, LogSizekB;
55 int DataSizeLastUpdate, LogSizeLastUpdate;
56 char *DataDir;
57 DimService *LogSizeService, *DataSizeService, *DataFilename;
58 int HistSize;
59 int SizeUpdateDelay;
60 int TimeForNextFile;
61
62 int RegExCount;
63 regex_t *RegEx;
64 double *MinChange;
65
66 void infoHandler();
67 void commandHandler();
68 void AddService(char *);
69 float FileSize(FILE *);
70
71 public:
72 DataHandler();
73 ~DataHandler();
74};
75
76//
77// Constructor
78//
79DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
80
81 // Initialization to prevent freeing unallocated memory
82 Filename = NULL;
83 DataFile = NULL;
84 LogFile = NULL;
85 List = NULL;
86 LogSizeService = NULL;
87 DataSizeService = NULL;
88
89 DataSizeLastUpdate = 0;
90 LogSizeLastUpdate = 0;
91 TimeForNextFile = 0;
92
93 // Request configuration data
94 char *Change = GetConfig(SERVER_NAME " minchange");
95 DataDir = GetConfig(SERVER_NAME " datadir");
96 char *Logname = GetConfig(SERVER_NAME " logfile");
97 SizeUpdateDelay = atoi(GetConfig(SERVER_NAME " sizeupdate"));
98 HistSize = atoi(GetConfig(SERVER_NAME " histsize"));
99 if (HistSize < 1) HistSize = 1; // Minimum one items
100
101 // Open log file
102 if ((LogFile = fopen(Logname, "a")) == NULL) {
103 State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
104 }
105
106 // Provide logging command
107 LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
108
109 // Create services for file sizes and data file name
110 DataSizekB = 0;
111 DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
112
113 LogSizekB = FileSize(LogFile);
114 LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
115
116 DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) "");
117
118 // Count how many minimum change value regular expressions are present
119 RegExCount = 0;
120 char *Token = strtok(Change, "\t ");
121 while (Token != NULL) {
122 RegExCount++;
123 Token = strtok(NULL, "\t ");
124 }
125
126 // Allocate memory for regular expressions and minimum change values
127 RegEx = new regex_t[RegExCount];
128 MinChange = new double [RegExCount];
129
130 // Compile regular expressions
131 int Pos = 0;
132 for (int i=0; i<RegExCount; i++) {
133 int Len = strlen(Change+Pos) + 1;
134 Token = strtok(Change + Pos, ": \t");
135
136 int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
137 if (Ret != 0) {
138 char ErrMsg[200];
139 regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
140 State(WARN, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
141 }
142 else {
143 if ((Token=strtok(NULL, "")) != NULL) MinChange[i] = atof(Token);
144 else MinChange[i] = 0;
145 }
146 Pos += Len;
147 }
148
149 // Subscribe to list of servers at DIS_DNS
150 AddService((char *) "DIS_DNS/SERVER_LIST");
151
152 DimClient::sendCommand("DColl/Log", SERVER_NAME" *** Logging started ***");
153}
154
155//
156// Destructor: Close files and free memory
157//
158DataHandler::~DataHandler() {
159
160 // Delete DIM services and command first so handlers and not called anymore
161 for (int i=0; i<NumItems; i++) {
162 delete List[i].HistService;
163 delete List[i].DataItem;
164 delete[] List[i].HistBuffer;
165 }
166 free(List);
167
168 delete DataFilename;
169 //delete LogSizeService; // These create segmentation faults?!
170 //delete DataSizeService;
171
172 delete LogCommand;
173 free(Filename);
174
175 // Close files
176 if (LogFile != NULL) if (fclose(LogFile) != 0) {
177 State(ERROR, "Could not close log file (%s)", strerror(errno));
178 }
179 if (DataFile != NULL && fclose(DataFile) != 0) {
180 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
181 }
182
183 // Free memory for regular expressions handling
184 for (int i=0; i<RegExCount; i++) {
185 regfree(&RegEx[i]);
186 }
187 delete[] MinChange;
188 delete[] RegEx;
189}
190
191//
192// Implementation of data handling
193//
194// DIM ensures infoHandler() is called serialized, therefore
195// no mutex is needed to serialize writing to the file
196void DataHandler::infoHandler() {
197
198 DimInfo *Info = getInfo();
199
200 // Check if service available
201 if (Info->getSize()==strlen(NO_LINK)+1 && strcmp(Info->getString(), NO_LINK)==0) return;
202
203 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
204 if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {
205 char *Token = strtok(Info->getString(), "+-!|@");
206 while (Token != NULL) {
207 char *Buf;
208 if (MakeString(&Buf, "%s/SERVICE_LIST", Token) != -1) {
209 AddService(Buf);
210 free(Buf);
211 }
212 else State(ERROR, "MakeString() failed for server %s", Token);
213
214 Token = strtok(NULL, "|");
215 Token = strtok(NULL, "+-!|@");
216 }
217 return;
218 }
219
220 // If service is SERVICE_LIST of any server, scan all services.
221 // Subscribe to all services (but not to commands and RPCs)
222 if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
223 char *Name = strtok(Info->getString(), "+-!|");
224 while (Name != NULL) {
225 char *Type = strtok(NULL, "\n");
226 if (Type == NULL) return; // for safety, should not happen
227 if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) {
228 AddService(Name);
229 }
230 Name = strtok(NULL, "+-!|");
231 }
232 return;
233 }
234
235 // If it is time to open new data file, close the current one
236 if (time(NULL) >= TimeForNextFile) {
237 if (DataFile != NULL && fclose(DataFile) != 0) {
238 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
239 }
240 DataFile = NULL;
241 }
242
243 // Open new data file if necessary
244 if (DataFile == NULL) {
245 time_t Time = time(NULL);
246 struct tm *T = localtime(&Time);
247
248 if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++;
249 if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
250
251 free(Filename);
252 if (MakeString(&Filename, "%s/%d%02d%02d.slow", DataDir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == -1) State(FATAL, "Could not create filename, MakeString() failed");
253 if ((DataFile = fopen(Filename, "a")) == NULL) {
254 State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
255 }
256 else State(INFO, "Opened data file '%s'", Filename);
257 DataFilename->updateService(Filename);
258
259 // Calculate time for next file opening
260 T->tm_sec = 0;
261 T->tm_min = 0;
262 T->tm_hour = DATE_ROLLOVER;
263 TimeForNextFile = mktime(T);
264 }
265
266 // Identify index of service
267 int Service;
268 for (Service=0; Service<NumItems; Service++) if (Info == List[Service].DataItem) break;
269 if (Service == NumItems) return; // Service not found
270
271 // If negative value for absolute change, ignore this entry
272 if (List[Service].MinAbsChange < 0) return;
273
274 // Write data header
275 time_t RawTime = Info->getTimestamp();
276 struct tm *TM = localtime(&RawTime);
277
278 fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
279
280 // Translate data into ASCII
281 char *Text = EvidenceServer::ToString(Info);
282 if (Text != NULL) {
283 // Replace all control characters by white space
284 for (int i=0; i<strlen(Text); i++) if (iscntrl(Text[i])) Text[i] = ' ';
285
286 // Write to file
287 fprintf(DataFile, "%s\n", Text);
288
289 // Add to history buffer if change large enough
290 if ((fabs(atof(Text)-List[Service].LastValue) > List[Service].MinAbsChange)) {
291 List[Service].HistBuffer[List[Service].HistPointer].Seconds = Info->getTimestamp();
292 List[Service].HistBuffer[List[Service].HistPointer].Value = atof(Text);
293 List[Service].HistService->updateService();
294 List[Service].HistPointer++;
295 if (List[Service].HistPointer >= HistSize) List[Service].HistPointer = 0;
296 List[Service].LastValue = atof(Text);
297 }
298 free(Text);
299 }
300 else fprintf(DataFile, "Cannot interpret format identifier\n");
301
302 fflush(DataFile);
303
304 // Terminate if error because otherwise infinite loop might result as
305 // next call to this infoHandler() will try to (re-)open file
306 if(ferror(DataFile)) {
307 fclose(DataFile);
308 DataFile = NULL;
309 State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
310 }
311
312 // Update datafile size service
313 if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
314 DataSizekB = FileSize(DataFile);
315 DataSizeService->updateService();
316 DataSizeLastUpdate = time(NULL);
317 }
318}
319
320//
321// Implementation of log writing
322//
323void DataHandler::commandHandler() {
324
325 if (getCommand() != LogCommand || LogFile == NULL) return;
326
327 // Replace all carriage-return by line feed and non-printable characters
328 char *Text = getCommand()->getString();
329 for (unsigned int i=0; i<strlen(Text); i++) {
330 if (Text[i] == '\r') Text[i] = '\n';
331 if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
332 }
333
334 time_t RawTime = time(NULL);
335 struct tm *TM = localtime(&RawTime);
336
337 fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
338 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
339 TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
340
341 fflush(LogFile);
342
343 // If error close file (otherwise infinite loop because State() also writes to log)
344 if(ferror(LogFile)) {
345 fclose(LogFile);
346 LogFile = NULL;
347 State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
348 }
349
350 // Update logfile size service
351 if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
352 LogSizekB = FileSize(LogFile);
353 LogSizeService->updateService();
354 LogSizeLastUpdate = time(NULL);
355 }
356}
357
358
359//
360// Add service to watch list
361//
362void DataHandler::AddService(char *Name) {
363
364 // Do not subscribe to history services (otherwise infinite loop)
365 if (strstr(Name, ".hist") != NULL) return;
366
367 // Check if already subscribed to this service
368 for (int i=0; i<NumItems; i++) {
369 if(strcmp(Name, List[i].DataItem->getName()) == 0) return;
370 }
371
372 // Increase capacity of item list
373 struct Item *New = (struct Item *) realloc(List, (NumItems+1)*sizeof(struct Item));
374 if (New != NULL) List = New;
375 else {
376 State(ERROR, "Could not allocate memory for item list, service '' not added (%s)", Name, strerror(errno));
377 return;
378 }
379
380 // Set minimum required change by comparing to regular expressions
381 List[NumItems].MinAbsChange = 0;
382 for (int i=0; i<RegExCount; i++) {
383 if (regexec(&RegEx[i], Name, (size_t) 0, NULL, 0) == 0) {
384 List[NumItems].MinAbsChange = MinChange[i];
385 }
386 }
387
388 List[NumItems].LastValue = DBL_MAX;
389
390 // Create history service
391 List[NumItems].HistBuffer = new struct EvidenceHistoryItem [HistSize];
392 memset(List[NumItems].HistBuffer, 0, HistSize*sizeof(EvidenceHistoryItem));
393 List[NumItems].HistPointer = 0;
394
395 char *Buf;
396 if (MakeString(&Buf, "%s.hist", Name) == -1) {
397 State(ERROR, "Could not create history service for '%s', MakeString() failed", Name);
398 }
399 else {
400 List[NumItems].HistService = new DimService (Buf, (char *) "C",
401 List[NumItems].HistBuffer, HistSize*sizeof(EvidenceHistoryItem));
402 free(Buf);
403 }
404
405 // Subscribe to service
406 List[NumItems].DataItem = new DimStampedInfo(Name, NO_LINK, this);
407
408 // Increase number only after all set-up
409 NumItems++;
410}
411
412//
413// Determine size of file in kB
414//
415float DataHandler::FileSize(FILE *File) {
416
417 struct stat FileStatus;
418
419 if (fstat(fileno(File), &FileStatus) == -1) {
420 State(WARN, "Could not determine size of file (%s)", strerror(errno));
421 return -1;
422 }
423
424 return (float) FileStatus.st_size/1024;
425}
426
427//
428// Main program
429//
430int main() {
431
432 // Static ensures calling of destructor by exit()
433 static DataHandler Data;
434
435 // Sleep until signal caught
436 pause();
437}
Note: See TracBrowser for help on using the repository browser.