source: Evidence/DColl.cc@ 145

Last change on this file since 145 was 145, checked in by ogrimm, 15 years ago
Updates
File size: 12.3 KB
Line 
1/********************************************************************\
2
3 Central data collector of the Evidence Control System
4
5 - DColl subscribes to all services given by the configuration
6 server and writes these to the data file at every update.
7 - One data file per day is generated, with roll-over at 12:00 local time (see DATE_ROLLOVER).
8 - For each service, it creates a new service with '.hist' appended
9 that contains a history of events kept within a ring buffer. Each
10 entry will be the result of a conversion to double of the text
11 written to the data file. Only if the new value has changed by a
12 minimum amount will it be added to the ring buffer.
13 - The command 'DColl/Log' writes the associated string to the log
14 file specified in the configuration.
15
16 Oliver Grimm, December 2009
17
18\********************************************************************/
19
20#define SERVER_NAME "DColl"
21
22#define DATE_ROLLOVER 12 // localtime hour after which next date is used
23
24#include "Evidence.h"
25
26#include <math.h>
27#include <float.h>
28#include <sys/stat.h>
29#include <ctype.h>
30#include <sys/types.h>
31#include <regex.h>
32
33//
34// Class declaration
35//
36class DataHandler: public DimClient, public DimBrowser,
37 public EvidenceServer {
38
39 struct Item {
40 DimStampedInfo *DataItem;
41 DimService *HistService;
42 struct EvidenceHistoryItem *HistBuffer;
43 int HistPointer;
44 double LastValue;
45 double MinAbsChange;
46 } *List;
47
48 DimCommand *LogCommand;
49
50 unsigned int NumItems;
51 FILE *DataFile;
52 FILE *LogFile;
53 float DataSizekB, LogSizekB;
54 int DataSizeLastUpdate, LogSizeLastUpdate;
55 char *DataDir;
56 DimService *LogSizeService, *DataSizeService;
57 int HistSize;
58 int SizeUpdateDelay;
59 int TimeForNextFile;
60
61 int RegExCount;
62 regex_t *RegEx;
63 double *MinChange;
64
65 void infoHandler();
66 void commandHandler();
67 void AddService(char *);
68 float FileSize(FILE *);
69
70 public:
71 DataHandler();
72 ~DataHandler();
73};
74
75//
76// Constructor
77//
78DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
79
80 // Initialization to prevent freeing unallocated memory
81 DataFile = NULL;
82 LogFile = NULL;
83 List = NULL;
84 LogSizeService = NULL;
85 DataSizeService = NULL;
86
87 DataSizeLastUpdate = 0;
88 LogSizeLastUpdate = 0;
89 TimeForNextFile = 0;
90
91 // Request configuration data
92 char *Change = GetConfig(SERVER_NAME " minchange");
93 DataDir = GetConfig(SERVER_NAME " datadir");
94 char *Logname = GetConfig(SERVER_NAME " logfile");
95 SizeUpdateDelay = atoi(GetConfig(SERVER_NAME " sizeupdate"));
96 HistSize = atoi(GetConfig(SERVER_NAME " histsize"));
97 if (HistSize < 1) HistSize = 1; // Minimum one items
98
99 // Open log file
100 if ((LogFile = fopen(Logname, "a")) == NULL) {
101 State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
102 }
103
104 // Provide logging command
105 LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
106
107 // Create services for file sizes
108 DataSizekB = 0;
109 DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
110
111 LogSizekB = FileSize(LogFile);
112 LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
113
114 // Count how many minimum change value regular expressions are present
115 RegExCount = 0;
116 char *Token = strtok(Change, "\t ");
117 while (Token != NULL) {
118 RegExCount++;
119 Token = strtok(NULL, "\t ");
120 }
121
122 // Allocate memory for regular expressions and minimum change values
123 RegEx = new regex_t[RegExCount];
124 MinChange = new double [RegExCount];
125
126 // Compile regular expressions
127 int Pos = 0;
128 for (int i=0; i<RegExCount; i++) {
129 int Len = strlen(Change+Pos) + 1;
130 Token = strtok(Change + Pos, ": \t");
131
132 int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
133 if (Ret != 0) {
134 char ErrMsg[200];
135 regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
136 State(WARN, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
137 }
138 else {
139 if ((Token=strtok(NULL, "")) != NULL) MinChange[i] = atof(Token);
140 else MinChange[i] = 0;
141 }
142 Pos += Len;
143 }
144
145 // Subscribe to list of servers at DIS_DNS
146 AddService((char *) "DIS_DNS/SERVER_LIST");
147
148 DimClient::sendCommand("DColl/Log", SERVER_NAME" *** Logging started ***");
149}
150
151//
152// Destructor: Close files and free memory
153//
154DataHandler::~DataHandler() {
155
156 // Delete DIM services and command first so handlers and not called anymore
157 for (int i=0; i<NumItems; i++) {
158 delete List[i].HistService;
159 delete List[i].DataItem;
160 delete[] List[i].HistBuffer;
161 }
162 free(List);
163
164 //delete LogSizeService; // These create segmentation faults?!
165 //delete DataSizeService;
166
167 delete LogCommand;
168
169 // Close files
170 if (LogFile != NULL) if (fclose(LogFile) != 0) {
171 State(ERROR, "Could not close log file (%s)", strerror(errno));
172 }
173 if (DataFile != NULL && fclose(DataFile) != 0) {
174 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
175 }
176
177 // Free memory for regular expressions handling
178 for (int i=0; i<RegExCount; i++) {
179 regfree(&RegEx[i]);
180 }
181 delete[] MinChange;
182 delete[] RegEx;
183}
184
185//
186// Implementation of data handling
187//
188// DIM ensures infoHandler() is called serialized, therefore
189// no mutex is needed to serialize writing to the file
190void DataHandler::infoHandler() {
191
192 DimInfo *Info = getInfo();
193
194 // Check if service available
195 if (Info->getSize()==strlen(NO_LINK)+1 && strcmp(Info->getString(), NO_LINK)==0) return;
196
197 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
198 if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {
199 char *Token = strtok(Info->getString(), "+-!|@");
200 while (Token != NULL) {
201 char *Buf;
202 if (MakeString(&Buf, "%s/SERVICE_LIST", Token) != -1) {
203 AddService(Buf);
204 free(Buf);
205 }
206 else State(ERROR, "MakeString() failed for server %s", Token);
207
208 Token = strtok(NULL, "|");
209 Token = strtok(NULL, "+-!|@");
210 }
211 return;
212 }
213
214 // If service is SERVICE_LIST of any server, scan all services.
215 // Subscribe to all services (but not to commands and RPCs)
216 if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
217 char *Name = strtok(Info->getString(), "+-!|");
218 while (Name != NULL) {
219 char *Type = strtok(NULL, "\n");
220 if (Type == NULL) return; // for safety, should not happen
221 if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) {
222 AddService(Name);
223 }
224 Name = strtok(NULL, "+-!|");
225 }
226 return;
227 }
228
229 // If it is time to open new data file, close the current one
230 if (time(NULL) >= TimeForNextFile) {
231 if (DataFile != NULL && fclose(DataFile) != 0) {
232 State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
233 }
234 DataFile = NULL;
235 }
236
237 // Open new data file if necessary
238 if (DataFile == NULL) {
239 time_t Time = time(NULL);
240 struct tm *T = localtime(&Time);
241
242 if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++;
243 if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
244
245 char *Filename;
246 if (MakeString(&Filename, "%s/%d%02d%02d.slow", DataDir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == -1) State(FATAL, "Could not create filename, MakeString() failed");
247 if ((DataFile = fopen(Filename, "a")) == NULL) {
248 State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
249 }
250 else State(INFO, "Opened data file '%s'", Filename);
251 free(Filename);
252
253 // Calculate time for next file opening
254 T->tm_sec = 0;
255 T->tm_min = 0;
256 T->tm_hour = DATE_ROLLOVER;
257 TimeForNextFile = mktime(T);
258 }
259
260 // Identify index of service
261 int Service;
262 for (Service=0; Service<NumItems; Service++) if (Info == List[Service].DataItem) break;
263 if (Service == NumItems) return; // Service not found
264
265 // If negative value for absolute change, ignore this entry
266 if (List[Service].MinAbsChange < 0) return;
267
268 // Write data header
269 time_t RawTime = Info->getTimestamp();
270 struct tm *TM = localtime(&RawTime);
271
272 fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
273
274 // Translate data into ASCII
275 char *Text = EvidenceServer::ToString(Info);
276 if (Text != NULL) {
277 // Replace all control characters by white space
278 for (int i=0; i<strlen(Text); i++) if (iscntrl(Text[i])) Text[i] = ' ';
279
280 // Write to file
281 fprintf(DataFile, "%s\n", Text);
282
283 // Add to history buffer if change large enough
284 if ((fabs(atof(Text)-List[Service].LastValue) > List[Service].MinAbsChange)) {
285 List[Service].HistBuffer[List[Service].HistPointer].Seconds = Info->getTimestamp();
286 List[Service].HistBuffer[List[Service].HistPointer].Value = atof(Text);
287 List[Service].HistService->updateService();
288 List[Service].HistPointer++;
289 if (List[Service].HistPointer >= HistSize) List[Service].HistPointer = 0;
290 List[Service].LastValue = atof(Text);
291 }
292 free(Text);
293 }
294 else fprintf(DataFile, "Cannot interpret format identifier\n");
295
296 fflush(DataFile);
297
298 // Terminate if error because otherwise infinite loop might result as
299 // next call to this infoHandler() will try to (re-)open file
300 if(ferror(DataFile)) {
301 fclose(DataFile);
302 DataFile = NULL;
303 State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
304 }
305
306 // Update datafile size service
307 if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
308 DataSizekB = FileSize(DataFile);
309 DataSizeService->updateService();
310 DataSizeLastUpdate = time(NULL);
311 }
312}
313
314//
315// Implementation of log writing
316//
317void DataHandler::commandHandler() {
318
319 if (getCommand() != LogCommand || LogFile == NULL) return;
320
321 // Replace all carriage-return by line feed and non-printable characters
322 char *Text = getCommand()->getString();
323 for (unsigned int i=0; i<strlen(Text); i++) {
324 if (Text[i] == '\r') Text[i] = '\n';
325 if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
326 }
327
328 time_t RawTime = time(NULL);
329 struct tm *TM = localtime(&RawTime);
330
331 fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
332 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
333 TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
334
335 fflush(LogFile);
336
337 // If error close file (otherwise infinite loop because State() also writes to log)
338 if(ferror(LogFile)) {
339 fclose(LogFile);
340 LogFile = NULL;
341 State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
342 }
343
344 // Update logfile size service
345 if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
346 LogSizekB = FileSize(LogFile);
347 LogSizeService->updateService();
348 LogSizeLastUpdate = time(NULL);
349 }
350}
351
352
353//
354// Add service to watch list
355//
356void DataHandler::AddService(char *Name) {
357
358 // Do not subscribe to history services (otherwise infinite loop)
359 if (strstr(Name, ".hist") != NULL) return;
360
361 // Check if already subscribed to this service
362 for (int i=0; i<NumItems; i++) {
363 if(strcmp(Name, List[i].DataItem->getName()) == 0) return;
364 }
365
366 // Increase capacity of item list
367 struct Item *New = (struct Item *) realloc(List, (NumItems+1)*sizeof(struct Item));
368 if (New != NULL) List = New;
369 else {
370 State(ERROR, "Could not allocate memory for item list, service '' not added (%s)", Name, strerror(errno));
371 return;
372 }
373
374 // Set minimum required change by comparing to regular expressions
375 List[NumItems].MinAbsChange = 0;
376 for (int i=0; i<RegExCount; i++) {
377 if (regexec(&RegEx[i], Name, (size_t) 0, NULL, 0) == 0) {
378 List[NumItems].MinAbsChange = MinChange[i];
379 }
380 }
381
382 List[NumItems].LastValue = DBL_MAX;
383
384 // Create history service
385 List[NumItems].HistBuffer = new struct EvidenceHistoryItem [HistSize];
386 memset(List[NumItems].HistBuffer, 0, HistSize*sizeof(EvidenceHistoryItem));
387 List[NumItems].HistPointer = 0;
388
389 char *Buf;
390 if (MakeString(&Buf, "%s.hist", Name) == -1) {
391 State(ERROR, "Could not create history service for '%s', MakeString() failed", Name);
392 }
393 else {
394 List[NumItems].HistService = new DimService (Buf, (char *) "C",
395 List[NumItems].HistBuffer, HistSize*sizeof(EvidenceHistoryItem));
396 free(Buf);
397 }
398
399 // Subscribe to service
400 List[NumItems].DataItem = new DimStampedInfo(Name, NO_LINK, this);
401
402 // Increase number only after all set-up
403 NumItems++;
404}
405
406//
407// Determine size of file in kB
408//
409float DataHandler::FileSize(FILE *File) {
410
411 struct stat FileStatus;
412
413 if (fstat(fileno(File), &FileStatus) == -1) {
414 State(WARN, "Could not determine size of file (%s)", strerror(errno));
415 return -1;
416 }
417
418 return (float) FileStatus.st_size/1024;
419}
420
421//
422// Main program
423//
424int main() {
425
426 // Static ensures calling of destructor by exit()
427 static DataHandler Data;
428
429 // Sleep until signal caught
430 pause();
431}
Note: See TracBrowser for help on using the repository browser.