source: fact/Evidence/DColl.cc@ 17087

Last change on this file since 17087 was 12910, checked in by ogrimm, 13 years ago
EvidenceServer::ToString() can handle all DIM formats as long as padding is disabled
File size: 11.2 KB
Line 
1/********************************************************************\
2
3 Central data collector of the Evidence Control System
4
5 - DColl subscribes to all services given by the configuration
6 server and writes these to the data file at every update.
7 - One data file per day is generated.
8 - The command 'DColl/Log' writes the associated string to the log file
9
10 Oliver Grimm, June 2010
11
12\********************************************************************/
13
14#define SERVER_NAME "DColl"
15#define LOG_FILENAME "Evidence.log"
16
17#include "Evidence.h"
18
19#include <string>
20#include <sstream>
21#include <iostream>
22#include <vector>
23#include <sys/stat.h>
24#include <regex.h>
25
26using namespace std;
27
28//
29// Class declaration
30//
31class DataHandler: public DimClient, public EvidenceServer {
32
33 struct Item {
34 DimStampedInfo *DataItem;
35 bool Exclude;
36 };
37 vector<struct Item> List;
38
39 DimCommand *LogCommand;
40 DimInfo *ServerList;
41
42 FILE *DataFile;
43 FILE *LogFile;
44 char *Filename;
45 float DataSizeMB, LogSizeMB;
46 int DataSizeLastUpdate, LogSizeLastUpdate;
47 DimService *LogSizeService, *DataSizeService, *DataFilename;
48 int TimeForNextFile;
49
50 vector<regex_t> RegEx;
51
52 void infoHandler();
53 void commandHandler();
54 void AddService(string);
55 void RemoveService(string);
56 off_t FileSize(FILE *);
57
58 public:
59 DataHandler();
60 ~DataHandler();
61};
62
63//
64// Constructor
65//
66DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
67
68 // Initialization to prevent freeing unallocated memory
69 DataFile = NULL;
70 LogFile = NULL;
71 Filename = NULL;
72
73 LogSizeService = NULL;
74 DataSizeService = NULL;
75
76 DataSizeLastUpdate = 0;
77 LogSizeLastUpdate = 0;
78 TimeForNextFile = 0;
79
80 // Request configuration data / initialise for later non-blocking updates
81 GetConfig("basedir");
82 GetConfig("sizeupdate");
83 GetConfig("rollover");
84
85 // Open log file
86 if ((LogFile = fopen((GetConfig("basedir")+"/"+LOG_FILENAME).c_str(), "a")) == NULL) {
87 Message(FATAL, "Could not open log file (%s)", strerror(errno));
88 }
89
90 // Create services for file sizes and data file name
91 DataSizeMB = 0;
92 DataSizeService = new DimService(SERVER_NAME "/DataSizeMB", DataSizeMB);
93
94 LogSizeMB = FileSize(LogFile)/1024.0/1024.0;
95 LogSizeService = new DimService(SERVER_NAME "/LogSizeMB", LogSizeMB);
96
97 DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) "");
98
99 // Compile regular expressions
100 regex_t R;
101 vector<string> Exclude = Tokenize(GetConfig("exclude"), " \t");
102 for (int i=0; i<Exclude.size(); i++) {
103 int Ret = regcomp(&R, Exclude[i].c_str(), REG_EXTENDED|REG_NOSUB);
104 if (Ret != 0) {
105 char Err[200];
106 regerror(Ret, &R, Err, sizeof(Err));
107 Message(ERROR, "Error compiling regular expression '%s' (%s)", Exclude[i].c_str(), Err);
108 }
109 else RegEx.push_back(R);
110 }
111
112 // Provide logging command
113 LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
114
115 // Subsribe to top-level server list (not via AddService() due to thread issue)
116 ServerList = new DimInfo((char *) "DIS_DNS/SERVER_LIST", NO_LINK, this);
117}
118
119//
120// Destructor: Close files and free memory
121//
122DataHandler::~DataHandler() {
123
124 // Delete all DIM subscriptions
125 delete ServerList;
126 while (List.size() != 0) RemoveService(List[0].DataItem->getName());
127
128 delete LogCommand;
129 delete DataFilename;
130 delete[] Filename;
131
132 delete LogSizeService;
133 delete DataSizeService;
134
135 // Close files
136 if (LogFile != NULL) if (fclose(LogFile) != 0) {
137 Message(ERROR, "Could not close log file (%s)", strerror(errno));
138 }
139 if (DataFile != NULL && fclose(DataFile) != 0) {
140 Message(ERROR, "Error: Could not close data file (%s)", strerror(errno));
141 }
142
143 // Free memory for regular expressions handling
144 for (int i=0; i<RegEx.size(); i++) regfree(&RegEx[i]);
145}
146
147//
148// Implementation of data handling
149//
150// DIM ensures infoHandler() is called serialized, therefore
151// no mutex is needed to serialize writing to the file
152void DataHandler::infoHandler() {
153
154 // Check if service available
155 DimInfo *I = getInfo();
156 if (!ServiceOK(I)) return;
157
158 //
159 // ====== Part A: Handle service subscriptions ===
160 //
161 // Services are added here, removal only in destructor.
162
163 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
164 if (strcmp(I->getName(), "DIS_DNS/SERVER_LIST") == 0) {
165 char *Token = strtok(I->getString(), "+-!@");
166 while (Token != NULL) {
167 AddService(string(Token)+"/SERVICE_LIST"); // 'add' also for '-' and '!'
168 Token = strtok(NULL, "|"); // Skip server IP address
169 Token = strtok(NULL, "@");
170 }
171 return;
172 }
173
174 // If service is SERVICE_LIST of any server, scan all services.
175 // Subscribe to all services (but not to commands and RPCs)
176 if (strstr(I->getName(), "/SERVICE_LIST") != NULL) {
177
178 // Bug fix for empty SERVICE_LIST
179 if (strlen(I->getString()) == 0) {
180 string Tmp(I->getName());
181 RemoveService(I->getName());
182 AddService(Tmp.c_str());
183 return;
184 }
185
186 char *Name = strtok(I->getString(), "+-!|");
187 while (Name != NULL) {
188 // Check if item is a service
189 char *Type = strtok(NULL, "\n");
190 if (Type == NULL) return; // for safety, should not happen
191 if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) AddService(Name); // 'add' also for '-' and '!'
192 Name = strtok(NULL, "|");
193 }
194 return;
195 }
196
197 //
198 // ====== Part B: Handle opening of data files ===
199 //
200
201 // If it is time to open new data file, close the current one
202 if (time(NULL) >= TimeForNextFile) {
203 if (DataFile != NULL && fclose(DataFile) != 0) {
204 Message(ERROR, "Error: Could not close data file (%s)", strerror(errno));
205 }
206 DataFile = NULL;
207 }
208
209 // Open new data file if necessary
210 if (DataFile == NULL) {
211 time_t Time = time(NULL);
212 struct tm *T = localtime(&Time);
213
214 // Get time structure with date rollover
215 if (T->tm_hour >= atoi(GetConfig("rollover").c_str())) T->tm_mday++;
216 if (mktime(T) == -1) Message(ERROR, "mktime() failed, check filename");
217
218 // Create direcory if not existing (ignore error if already existing)
219 char *Dir;
220 if (asprintf(&Dir, "%s/%d", GetConfig("basedir").c_str(), T->tm_year+1900) == -1) {
221 Message(FATAL, "asprintf() failed, could not create direcory name");
222 }
223 if(mkdir(Dir, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) {
224 Message(FATAL, "Could not create directory '%s' (%s)", Dir, strerror(errno));
225 }
226
227 // Create filename
228 free(Filename);
229 if (asprintf(&Filename, "%s/%d%02d%02d.slow", Dir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == 1) {
230 Message(FATAL, "asprintf() failed, could not create filename");
231 }
232 free(Dir);
233
234 // Open file
235 if ((DataFile = fopen(Filename, "a")) == NULL) {
236 Message(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
237 }
238 else Message(INFO, "Opened data file '%s'", Filename);
239 DataFilename->updateService(Filename);
240
241 // Calculate time for next file opening
242 T->tm_sec = 0;
243 T->tm_min = 0;
244 T->tm_hour = atoi(GetConfig("rollover").c_str());
245 TimeForNextFile = mktime(T);
246 }
247
248 //
249 // ====== Part C: Handle writing to data file ===
250 //
251
252 // Identify index of service
253 for (int Service=0; Service<List.size(); Service++) if (I == List[Service].DataItem) {
254
255 // Service excluded from writing or contains no data?
256 if (List[Service].Exclude || I->getSize()==0) return;
257
258 // Write data header
259 time_t RawTime = I->getTimestamp();
260 struct tm *TM = localtime(&RawTime);
261
262 fprintf(DataFile, "%s %d %d %d %d %d %d %d %d %d ", I->getName(), I->getQuality(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, I->getTimestampMillisecs(), I->getTimestamp());
263
264 // Translate data into ASCII
265 string Text = EvidenceServer::ToString(I->getFormat(), I->getData(), I->getSize());
266
267 if (!Text.empty()) {
268 // Replace new line by '\' and all other control characters by white space
269 for (int i=0; i<Text.size(); i++) {
270 if (Text[i] == '\n') Text[i] = '\\';
271 else if (iscntrl(Text[i])) Text[i] = ' ';
272 }
273 // Write to file
274 fprintf(DataFile, "%s\n", Text.c_str());
275 }
276 else fprintf(DataFile, "Cannot interpret service format\n");
277
278 // Terminate if error because otherwise infinite loop might result as
279 // next call to this infoHandler() will try to (re-)open file
280 if(ferror(DataFile) != 0) {
281 if (fclose(DataFile) != 0) Message(ERROR, "Error closing data file (%s)", strerror(errno));
282 DataFile = NULL;
283 Message(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
284 }
285
286 // Update datafile size service (not every time to avoid infinite loop)
287 if (time(NULL) - DataSizeLastUpdate > max(atoi(GetConfig("sizeupdate").c_str()), 1)) {
288 fflush(DataFile); // not continuously to reduce load
289
290 DataSizeMB = FileSize(DataFile)/1024.0/1024.0;
291 DataSizeService->updateService();
292 DataSizeLastUpdate = time(NULL);
293 }
294 } // Check for disk writing
295}
296
297//
298// Implementation of log writing
299//
300void DataHandler::commandHandler() {
301
302 // Translate text safely to string
303 string Text = ToString((char *) "C", getCommand()->getData(), getCommand()->getSize());
304
305 // Safety check
306 if (getCommand() != LogCommand || LogFile == NULL || Text.empty()) return;
307
308 // Replace all carriage-return by line feed and non-printable characters
309 for (unsigned int i=0; i<Text.size(); i++) {
310 if (Text[i] == '\r') Text[i] = '\n';
311 if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
312 }
313
314 time_t RawTime = time(NULL);
315 struct tm *TM = localtime(&RawTime);
316
317 fprintf(LogFile, "%.2d/%.2d/%4d %.2d:%.2d:%2.d %s (ID %d): %s\n",
318 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
319 TM->tm_hour, TM->tm_min, TM->tm_sec, getClientName(), getClientId(), Text.c_str());
320 fflush(LogFile);
321
322 // If error close file (otherwise infinite loop because Message() also writes to log)
323 if(ferror(LogFile)) {
324 fclose(LogFile);
325 LogFile = NULL;
326 Message(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
327 }
328
329 // Update logfile size service (not every time to avoid infinite loop)
330 if (time(NULL) - LogSizeLastUpdate > atoi(GetConfig("sizeupdate").c_str())) {
331 LogSizeMB = FileSize(LogFile)/1024.0/1024.0;
332 LogSizeService->updateService();
333 LogSizeLastUpdate = time(NULL);
334 }
335}
336
337
338//
339// Add service to watch list
340//
341void DataHandler::AddService(string Name) {
342
343 struct Item New;
344
345 // Check if already subscribed to this service
346 for (int i=0; i<List.size(); i++) {
347 if (Name == List[i].DataItem->getName()) return;
348 }
349
350 // Should service be ignored?
351 New.Exclude = false;
352 for (int i=0; i<RegEx.size(); i++) {
353 if (regexec(&RegEx[i], Name.c_str(), (size_t) 0, NULL, 0) == 0) New.Exclude = true;
354 }
355
356 // Subscribe to service
357 New.DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
358
359 List.push_back(New);
360}
361
362
363//
364// Remove service from watch list
365//
366void DataHandler::RemoveService(string Name) {
367
368 // Find service index
369 vector<struct Item>::iterator E;
370 for (E=List.begin(); E<List.end(); ++E) if (Name == (*E).DataItem->getName()) {
371 delete (*E).DataItem;
372 List.erase(E);
373 }
374}
375
376//
377// Determine size of file in kB
378//
379off_t DataHandler::FileSize(FILE *File) {
380
381 struct stat FileStatus;
382
383 if (fstat(fileno(File), &FileStatus) == -1) {
384 Message(WARN, "Could not determine size of file (%s)", strerror(errno));
385 return -1;
386 }
387 return FileStatus.st_size;
388}
389
390
391//
392// Main program
393//
394int main() {
395
396 dic_disable_padding();
397 dis_disable_padding();
398
399 // Static ensures calling of destructor by exit()
400 static DataHandler DataInstance;
401
402 // Sleep until exit requested
403 while (!DataInstance.ExitRequest) pause();
404}
Note: See TracBrowser for help on using the repository browser.