source: fact/Evidence/DColl.cc@ 18123

Last change on this file since 18123 was 17915, checked in by ogrimm, 10 years ago
Removed erroneous static declaration in History.cc that made a variable grow in size indefinitely. The data collector has new a service indicating its subscriptions (as the history server)
File size: 11.8 KB
Line 
1/********************************************************************\
2
3 Central data collector of the Evidence Control System
4
5 - DColl subscribes to all services given by the configuration
6 server and writes these to the data file at every update.
7 - One data file per day is generated.
8 - The command 'DColl/Log' writes the associated string to the log file
9
10 Oliver Grimm, June 2010
11
12\********************************************************************/
13
14#define SERVER_NAME "DColl"
15#define LOG_FILENAME "Evidence.log"
16
17#include "Evidence.h"
18
19#include <string>
20#include <sstream>
21#include <iostream>
22#include <vector>
23#include <sys/stat.h>
24#include <regex.h>
25
26using namespace std;
27
28//
29// Class declaration
30//
31class DataHandler: public DimClient, public EvidenceServer {
32
33 struct Item {
34 DimStampedInfo *DataItem;
35 bool Exclude;
36 };
37 vector<struct Item> List;
38
39 DimCommand *LogCommand;
40 DimInfo *ServerList;
41
42 FILE *DataFile;
43 FILE *LogFile;
44 char *Filename;
45 float DataSizeMB, LogSizeMB;
46 int DataSizeLastUpdate, LogSizeLastUpdate;
47 DimService *LogSizeService, *DataSizeService, *DataFilename, *SubscriptionService;
48
49 int TimeForNextFile;
50
51 vector<regex_t> RegEx;
52
53 void infoHandler();
54 void commandHandler();
55 void AddService(string);
56 void RemoveService(string);
57 off_t FileSize(FILE *);
58
59 public:
60 DataHandler();
61 ~DataHandler();
62};
63
64//
65// Constructor
66//
67DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
68
69 // Initialization to prevent freeing unallocated memory
70 DataFile = NULL;
71 LogFile = NULL;
72 Filename = NULL;
73
74 LogSizeService = NULL;
75 DataSizeService = NULL;
76
77 DataSizeLastUpdate = 0;
78 LogSizeLastUpdate = 0;
79 TimeForNextFile = 0;
80
81 // Request configuration data / initialise for later non-blocking updates
82 GetConfig("basedir");
83 GetConfig("sizeupdate");
84 GetConfig("rollover");
85
86 // Open log file
87 if ((LogFile = fopen((GetConfig("basedir")+"/"+LOG_FILENAME).c_str(), "a")) == NULL) {
88 Message(FATAL, "Could not open log file (%s)", strerror(errno));
89 }
90
91 // Create services for file sizes and data file name
92 DataSizeMB = 0;
93 DataSizeService = new DimService(SERVER_NAME "/DataSizeMB", DataSizeMB);
94
95 LogSizeMB = FileSize(LogFile)/1024.0/1024.0;
96 LogSizeService = new DimService(SERVER_NAME "/LogSizeMB", LogSizeMB);
97
98 DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) "");
99
100 // Compile regular expressions
101 regex_t R;
102 vector<string> Exclude = Tokenize(GetConfig("exclude"), " \t");
103 for (int i=0; i<Exclude.size(); i++) {
104 int Ret = regcomp(&R, Exclude[i].c_str(), REG_EXTENDED|REG_NOSUB);
105 if (Ret != 0) {
106 char Err[200];
107 regerror(Ret, &R, Err, sizeof(Err));
108 Message(ERROR, "Error compiling regular expression '%s' (%s)", Exclude[i].c_str(), Err);
109 }
110 else RegEx.push_back(R);
111 }
112
113 // Provide logging command
114 LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
115
116 // Create services for information about subscribed services
117 SubscriptionService = new DimService(SERVER_NAME "/Subscriptions", "C", NULL, 0);
118
119 // Subscribe to top-level server list (not via AddService() due to thread issue)
120 ServerList = new DimInfo((char *) "DIS_DNS/SERVER_LIST", NO_LINK, this);
121}
122
123//
124// Destructor: Close files and free memory
125//
126DataHandler::~DataHandler() {
127
128 // Delete all DIM subscriptions
129 delete ServerList;
130 while (List.size() != 0) RemoveService(List[0].DataItem->getName());
131
132 delete LogCommand;
133 delete DataFilename;
134 delete[] Filename;
135
136 delete LogSizeService;
137 delete DataSizeService;
138 delete SubscriptionService;
139
140 // Close files
141 if (LogFile != NULL) if (fclose(LogFile) != 0) {
142 Message(ERROR, "Could not close log file (%s)", strerror(errno));
143 }
144 if (DataFile != NULL && fclose(DataFile) != 0) {
145 Message(ERROR, "Error: Could not close data file (%s)", strerror(errno));
146 }
147
148 // Free memory for regular expressions handling
149 for (int i=0; i<RegEx.size(); i++) regfree(&RegEx[i]);
150}
151
152//
153// Implementation of data handling
154//
155// DIM ensures infoHandler() is called serialized, therefore
156// no mutex is needed to serialize writing to the file
157void DataHandler::infoHandler() {
158
159 static string SubscriptionList;
160
161 // Check if service available
162 DimInfo *I = getInfo();
163 if (!ServiceOK(I)) return;
164
165 //
166 // ====== Part A: Handle service subscriptions ===
167 //
168 // Services are added here, removal only in destructor.
169
170 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
171 if (strcmp(I->getName(), "DIS_DNS/SERVER_LIST") == 0) {
172 char *Token = strtok(I->getString(), "+-!@");
173 while (Token != NULL) {
174 AddService(string(Token)+"/SERVICE_LIST"); // 'add' also for '-' and '!'
175 Token = strtok(NULL, "|"); // Skip server IP address
176 Token = strtok(NULL, "@");
177 }
178 return;
179 }
180
181 // If service is SERVICE_LIST of any server, scan all services.
182 // Subscribe to all services (but not to commands and RPCs)
183 if (strstr(I->getName(), "/SERVICE_LIST") != NULL) {
184
185 // Bug fix for empty SERVICE_LIST
186 if (strlen(I->getString()) == 0) {
187 string Tmp(I->getName());
188 RemoveService(I->getName());
189 AddService(Tmp.c_str());
190 return;
191 }
192
193 char *Name = strtok(I->getString(), "+-!|");
194 while (Name != NULL) {
195 // Check if item is a service
196 char *Type = strtok(NULL, "\n");
197 if (Type == NULL) return; // for safety, should not happen
198 if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) AddService(Name); // 'add' also for '-' and '!'
199 Name = strtok(NULL, "|");
200 }
201
202 // Update service subscription list
203 stringstream Stream;
204
205 for (unsigned int i=0; i<List.size(); i++) Stream << List[i].DataItem->getName() << '|';
206
207 SubscriptionList = Stream.str();
208 SubscriptionService->updateService((void *) SubscriptionList.c_str(), SubscriptionList.size()+1); // Note that Subscription is a static variable
209
210 return;
211 }
212
213 //
214 // ====== Part B: Handle opening of data files ===
215 //
216
217 // If it is time to open new data file, close the current one
218 if (time(NULL) >= TimeForNextFile) {
219 if (DataFile != NULL && fclose(DataFile) != 0) {
220 Message(ERROR, "Error: Could not close data file (%s)", strerror(errno));
221 }
222 DataFile = NULL;
223 }
224
225 // Open new data file if necessary
226 if (DataFile == NULL) {
227 time_t Time = time(NULL);
228 struct tm *T = localtime(&Time);
229
230 // Get time structure with date rollover
231 if (T->tm_hour >= atoi(GetConfig("rollover").c_str())) T->tm_mday++;
232 if (mktime(T) == -1) Message(ERROR, "mktime() failed, check filename");
233
234 // Create direcory if not existing (ignore error if already existing)
235 char *Dir;
236 if (asprintf(&Dir, "%s/%d", GetConfig("basedir").c_str(), T->tm_year+1900) == -1) {
237 Message(FATAL, "asprintf() failed, could not create direcory name");
238 }
239 if(mkdir(Dir, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) {
240 Message(FATAL, "Could not create directory '%s' (%s)", Dir, strerror(errno));
241 }
242
243 // Create filename
244 free(Filename);
245 if (asprintf(&Filename, "%s/%d%02d%02d.slow", Dir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == 1) {
246 Message(FATAL, "asprintf() failed, could not create filename");
247 }
248 free(Dir);
249
250 // Open file
251 if ((DataFile = fopen(Filename, "a")) == NULL) {
252 Message(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
253 }
254 else Message(INFO, "Opened data file '%s'", Filename);
255 DataFilename->updateService(Filename);
256
257 // Calculate time for next file opening
258 T->tm_sec = 0;
259 T->tm_min = 0;
260 T->tm_hour = atoi(GetConfig("rollover").c_str());
261 TimeForNextFile = mktime(T);
262 }
263
264 //
265 // ====== Part C: Handle writing to data file ===
266 //
267
268 // Identify index of service
269 for (int Service=0; Service<List.size(); Service++) if (I == List[Service].DataItem) {
270
271 // Service excluded from writing or contains no data?
272 if (List[Service].Exclude || I->getSize()==0) return;
273
274 // Write data header
275 time_t RawTime = I->getTimestamp();
276 struct tm *TM = localtime(&RawTime);
277
278 fprintf(DataFile, "%s %d %d %d %d %d %d %d %d %d ", I->getName(), I->getQuality(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, I->getTimestampMillisecs(), I->getTimestamp());
279
280 // Translate data into ASCII
281 string Text = EvidenceServer::ToString(I->getFormat(), I->getData(), I->getSize());
282
283 if (!Text.empty()) {
284 // Replace new line by '\' and all other control characters by white space
285 for (int i=0; i<Text.size(); i++) {
286 if (Text[i] == '\n') Text[i] = '\\';
287 else if (iscntrl(Text[i])) Text[i] = ' ';
288 }
289 // Write to file
290 fprintf(DataFile, "%s\n", Text.c_str());
291 }
292 else fprintf(DataFile, "Cannot interpret service format\n");
293
294 // Terminate if error because otherwise infinite loop might result as
295 // next call to this infoHandler() will try to (re-)open file
296 if(ferror(DataFile) != 0) {
297 if (fclose(DataFile) != 0) Message(ERROR, "Error closing data file (%s)", strerror(errno));
298 DataFile = NULL;
299 Message(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
300 }
301
302 // Update datafile size service (not every time to avoid infinite loop)
303 if (time(NULL) - DataSizeLastUpdate > max(atoi(GetConfig("sizeupdate").c_str()), 1)) {
304 fflush(DataFile); // not continuously to reduce load
305
306 DataSizeMB = FileSize(DataFile)/1024.0/1024.0;
307 DataSizeService->updateService();
308 DataSizeLastUpdate = time(NULL);
309 }
310 } // Check for disk writing
311}
312
313//
314// Implementation of log writing
315//
316void DataHandler::commandHandler() {
317
318 // Translate text safely to string
319 string Text = ToString((char *) "C", getCommand()->getData(), getCommand()->getSize());
320
321 // Safety check
322 if (getCommand() != LogCommand || LogFile == NULL || Text.empty()) return;
323
324 // Replace all carriage-return by line feed and non-printable characters
325 for (unsigned int i=0; i<Text.size(); i++) {
326 if (Text[i] == '\r') Text[i] = '\n';
327 if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
328 }
329
330 time_t RawTime = time(NULL);
331 struct tm *TM = localtime(&RawTime);
332
333 fprintf(LogFile, "%.2d/%.2d/%4d %.2d:%.2d:%2.d %s (ID %d): %s\n",
334 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
335 TM->tm_hour, TM->tm_min, TM->tm_sec, getClientName(), getClientId(), Text.c_str());
336 fflush(LogFile);
337
338 // If error close file (otherwise infinite loop because Message() also writes to log)
339 if(ferror(LogFile)) {
340 fclose(LogFile);
341 LogFile = NULL;
342 Message(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
343 }
344
345 // Update logfile size service (not every time to avoid infinite loop)
346 if (time(NULL) - LogSizeLastUpdate > atoi(GetConfig("sizeupdate").c_str())) {
347 LogSizeMB = FileSize(LogFile)/1024.0/1024.0;
348 LogSizeService->updateService();
349 LogSizeLastUpdate = time(NULL);
350 }
351}
352
353
354//
355// Add service to watch list
356//
357void DataHandler::AddService(string Name) {
358
359 struct Item New;
360
361 // Check if already subscribed to this service
362 for (int i=0; i<List.size(); i++) {
363 if (Name == List[i].DataItem->getName()) return;
364 }
365
366 // Should service be ignored?
367 New.Exclude = false;
368 for (int i=0; i<RegEx.size(); i++) {
369 if (regexec(&RegEx[i], Name.c_str(), (size_t) 0, NULL, 0) == 0) New.Exclude = true;
370 }
371
372 // Subscribe to service
373 New.DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
374
375 List.push_back(New);
376}
377
378
379//
380// Remove service from watch list
381//
382void DataHandler::RemoveService(string Name) {
383
384 // Find service index
385 vector<struct Item>::iterator E;
386 for (E=List.begin(); E<List.end(); ++E) if (Name == (*E).DataItem->getName()) {
387 delete (*E).DataItem;
388 List.erase(E);
389 }
390}
391
392//
393// Determine size of file in kB
394//
395off_t DataHandler::FileSize(FILE *File) {
396
397 struct stat FileStatus;
398
399 if (fstat(fileno(File), &FileStatus) == -1) {
400 Message(WARN, "Could not determine size of file (%s)", strerror(errno));
401 return -1;
402 }
403 return FileStatus.st_size;
404}
405
406
407//
408// Main program
409//
410int main() {
411
412 dic_disable_padding();
413 dis_disable_padding();
414
415 // Static ensures calling of destructor by exit()
416 static DataHandler DataInstance;
417
418 // Sleep until exit requested
419 while (!DataInstance.ExitRequest) pause();
420}
Note: See TracBrowser for help on using the repository browser.