source: fact/Evidence/History.cc@ 18233

Last change on this file since 18233 was 17915, checked in by ogrimm, 10 years ago
Removed erroneous static declaration in History.cc that made a variable grow in size indefinitely. The data collector has new a service indicating its subscriptions (as the history server)
File size: 13.9 KB
Line 
1/********************************************************************\
2
3 History server of the Evidence Control System
4
5 - Subscribes to all services and keeps a ring buffer for each service.
6 - Data added to the buffer only if changed by minimum amount given by
7 configuration information.
8 - The history is available via an rpc call.
9 - The history buffers are written to disk at program termination and
10 are tried to be read when adding a service.
11 - The buffer should hold at least REQUEST_NUM entries (of the size currently
12 handled in infoHandler()), but should not be larger than MAX_SIZE_KB.
13 Oliver Grimm, June 2010
14
15\********************************************************************/
16
17#define SERVER_NAME "History"
18#include "Evidence.h"
19
20#include <string>
21#include <sstream>
22#include <map>
23#include <math.h>
24#include <float.h>
25#include <limits>
26#include <sys/stat.h>
27#include <sys/time.h>
28
29using namespace std;
30
31//
32// Class declaration
33//
34class History: public DimRpc, public DimClient, public EvidenceServer {
35
36 static const int MIN_SIZE_KB = 50; // Min and max buffersize in kByte (> 3*sizeof(int) !)
37 static const string DEFAULT_MAX_SIZE_KB;
38 static const string DEFAULT_NUM_ENTRIES; // Number of entries in each history buffer
39
40 struct Item {
41 DimStampedInfo *DataItem;
42 vector <char> Buffer;
43 int Next;
44 double LastValue;
45 double MinAbsChange;
46 pair <unsigned int, unsigned int> Range;
47 string Format;
48 time_t LastSave;
49 };
50 map<string, struct Item> Map;
51
52 DimInfo *ServerList;
53 DimService *SubscriptionService;
54 char *Directory;
55
56 void infoHandler();
57 void rpcHandler();
58 void AddService(string, const char *);
59 void RemoveService(string);
60 void SaveHistory(string);
61 off_t FileSize(FILE *);
62 FILE *OpenFile(string, const char *);
63
64 public:
65 History(char *);
66 ~History();
67};
68
69// Initialize non-integral constants (cannot be inside class declaration)
70const string History::DEFAULT_MAX_SIZE_KB = "2000";
71const string History::DEFAULT_NUM_ENTRIES = "1000";
72static const double MIN_SAVE_PERIOD = 0.5; // Minimum period between saving history buffers in hours
73
74
75// Constructor
76History::History(char *Dir): DimRpc("ServiceHistory", "C", "C"),
77 EvidenceServer(SERVER_NAME),
78 Directory(Dir) {
79
80 // Get/initialize configuration
81 GetConfig("minchange", " ");
82 GetConfig("maxsize_kb", DEFAULT_MAX_SIZE_KB);
83 GetConfig("numentries", DEFAULT_NUM_ENTRIES);
84 GetConfig("saveperiod", "1");
85 GetConfig("exclude", "");
86
87 // Create services for information about subscribed services
88 SubscriptionService = new DimService(SERVER_NAME "/Subscriptions", "C", NULL, 0);
89
90 // Subscribe to top-level server list
91 ServerList = new DimInfo((char *) "DIS_DNS/SERVER_LIST", NO_LINK, this);
92}
93
94
95// Destructor deletes all DIM subscriptions
96History::~History() {
97
98 delete ServerList;
99 while (Map.size() != 0) RemoveService((*Map.begin()).first);
100
101 delete SubscriptionService;
102}
103
104
105// Implementation of data handling
106void History::infoHandler() {
107
108 static string List;
109
110 DimInfo *I = getInfo();
111
112 // Check if service available
113 if (!ServiceOK(I)) return;
114
115 // ====== Part A: Handle service subscriptions ===
116
117 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
118 if (strcmp(I->getName(), "DIS_DNS/SERVER_LIST") == 0) {
119 char *Token = strtok(I->getString(), "+-!@");
120 while (Token != NULL) {
121 AddService(string(Token)+"/SERVICE_LIST", "C"); // 'add' also for '-' and '!'
122 Token = strtok(NULL, "|"); // Skip server IP address
123 Token = strtok(NULL, "@");
124 }
125 return;
126 }
127
128 // If service is SERVICE_LIST, scan and subscribe/unsubscribe to services
129 if (strstr(I->getName(), "/SERVICE_LIST") != NULL) {
130
131 // Bug fix for empty SERVICE_LIST
132 if (strlen(I->getString()) == 0) {
133 string Tmp(I->getName());
134 RemoveService(I->getName());
135 AddService(Tmp.c_str(), (char *) "C");
136 return;
137 }
138
139 char *Type, *Name = strtok(I->getString(), "+-!|");
140 while (Name != NULL) {
141 // Only consider DIM services (not commands and RPCs)
142 if (((Type = strtok(NULL, "\n")) != NULL) &&
143 (strstr(Type, "|CMD") == NULL) && (strstr(Type, "|RPC") == NULL)) {
144 if (*I->getString() == '-' || *I->getString() == '!') RemoveService(Name);
145 else {
146 Type[strlen(Type)-1] = '\0'; // Isolate service format
147 AddService(Name, Type);
148 }
149 }
150 Name = strtok(NULL, "|");
151 }
152
153 // Update service subscription list
154 stringstream Stream;
155
156 for (map<string, struct Item>::const_iterator i=Map.begin(); i!=Map.end(); i++) {
157 Stream << i->first << ':' << i->second.MinAbsChange << '|';
158 }
159 List = Stream.str();
160 SubscriptionService->updateService((void *) List.c_str(), List.size()+1); // Note that List is a static variable
161
162 return;
163 }
164
165 // ====== Part B: Handle history service ===
166
167 char *Service = I->getName();
168
169 // Check if service known and ignore empty or illegal time stamped service
170 if (Map.count(Service) == 0 || I->getSize()==0 || I->getTimestamp()<=0) return;
171
172 // Save history buffers periodically (in case of program crash)
173 if (time(NULL)-Map[Service].LastSave > max(atof(GetConfig("saveperiod").c_str()), MIN_SAVE_PERIOD)*3600) {
174 Map[Service].LastSave = time(NULL);
175 SaveHistory(Service);
176 }
177
178 // Resize buffer if necessary
179 unsigned long TargetSize = atoi(GetConfig("numentries").c_str()) * I->getSize();
180
181 if (Map[Service].Buffer.size() < TargetSize && TargetSize < atoi(GetConfig("maxsize_kb").c_str())*1024) {
182 Map[Service].Buffer.resize(TargetSize);
183 }
184
185 // Check minumum change before adding to history
186 if (Map[Service].MinAbsChange != 0) {
187 double Sum = 0;
188 unsigned int Count = 0;
189
190 // Add values within given range
191 vector<string> Values = Tokenize(EvidenceServer::ToString(I->getFormat(), I->getData(), I->getSize()));
192
193 for (unsigned int i=0; i<Values.size(); i++) {
194 if ((i < Map[Service].Range.first) || (i > Map[Service].Range.second)) continue;
195
196 Sum += atof(Values[i].c_str());
197 Count++;
198 }
199
200 // Minimum change of average value?
201 if (Count>0 && (fabs(Sum-Map[Service].LastValue)/Count < fabs(Map[Service].MinAbsChange))) return;
202 Map[Service].LastValue = Sum;
203 }
204
205 // Check if data fits into buffer
206 if (Map[Service].Buffer.size() < I->getSize() + sizeof(int)+ 2*sizeof(EvidenceHistory::Item)) return;
207
208 int Size = I->getSize() + 2*sizeof(EvidenceHistory::Item), Next = Map[Service].Next;
209 void *WrapPos = NULL;
210 char *Buffer = &Map[Service].Buffer[0];
211 int Oldest = *(int *) Buffer;
212
213 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
214 if (Next + Size >= Map[Service].Buffer.size()) {
215 WrapPos = Buffer + Next;
216 Next = sizeof(int);
217 }
218
219 // Adapt pointer to oldest entry
220 while ((Oldest < Next + Size) &&
221 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
222 // Check for wrap-around
223 if (memcmp(Buffer + Oldest, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)) == 0) {
224 Oldest = sizeof(int);
225 continue;
226 }
227 // Check if end marker reached, then only one event fits buffer
228 if (memcmp(Buffer + Oldest, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)) == 0) {
229 Oldest = Next;
230 break;
231 }
232 // Move to next entry
233 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
234 }
235 // Update pointer in buffer
236 *(int *) Buffer = Oldest;
237
238 // Write wrap mark if necessary
239 if (WrapPos != NULL) memcpy(WrapPos, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark));
240
241 // Copy data into ring buffer
242 *((int *) (Buffer + Next)) = I->getTimestamp();
243 *((int *) (Buffer + Next + sizeof(int))) = I->getSize();
244 memcpy(Buffer + Next + 2*sizeof(int), I->getData(), I->getSize());
245
246 // Adjust pointer for next entry and write end marker to buffer
247 Next += I->getSize() + sizeof(EvidenceHistory::Item);
248 memcpy(Buffer + Next, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark));
249
250 Map[Service].Next = Next;
251}
252
253
254// Implementation of history buffer distribution
255void History::rpcHandler() {
256
257 string Name = ToString((char *) "C", getData(), getSize());
258
259 // Search for history buffer in memory
260 if (Map.count(Name) == 1) {
261 char *Buffer = new char [Map[Name].Format.size()+1+Map[Name].Buffer.size()];
262 strcpy(Buffer, Map[Name].Format.c_str());
263 memcpy(Buffer+Map[Name].Format.size()+1, &Map[Name].Buffer[0], Map[Name].Buffer.size());
264 setData((void *) Buffer, Map[Name].Format.size()+1+Map[Name].Buffer.size());
265 delete[] Buffer;
266 return;
267 }
268
269 // Try to open history file if not found in memory
270 FILE *File = OpenFile(Name, "rb");
271 if (File == NULL) {
272 setData(NULL, 0);
273 return;
274 }
275
276 // Read history file and send to client (data will contain format string and history)
277 off_t Size = FileSize(File);
278 if (Size != -1) {
279 char *Buffer = new char [Size-sizeof(int)];
280 fseek(File, sizeof(int), SEEK_SET); // Skip 'Next' pointer
281 if ((fread(Buffer, sizeof(char), Size-sizeof(int), File) != Size-sizeof(int)) || (ferror(File) != 0)) {
282 Message(WARN, "Error reading history file '%s' in rpcHandler()", Name.c_str());
283 setData(NULL, 0); // Default response
284 }
285 else setData((void *) Buffer, Size);
286 delete[] Buffer;
287 }
288
289 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in rpcHandler()", Name.c_str());
290}
291
292
293//
294// Add service to watch list
295//
296void History::AddService(string Name, const char *Format) {
297
298 // Return if already subscribed to this service or if excluded
299 if (Map.count(Name) != 0 || GetConfig("exclude").find(Name) != string::npos) return;
300
301 // Create new service subscription
302 Map[Name].LastValue = DBL_MAX;
303 Map[Name].Format = Format;
304 Map[Name].MinAbsChange = 0.0;
305 Map[Name].Range = pair <unsigned int, unsigned int> (0, numeric_limits<unsigned int>::max());
306 Map[Name].LastSave = 0.0;
307
308 // Set minimum required change if given in configuratrion
309 string Change = GetConfig("minchange");
310 size_t Pos = Change.find(Name+":");
311
312 if (Pos != string::npos) {
313 vector<string> Parts = Tokenize(Change.substr(Pos, Change.find(' ', Pos)-Pos), ":");
314
315 // Check if index range is given as well
316 if (Parts.size() == 2) Map[Name].MinAbsChange = atof(Parts[1].c_str());
317 else if (Parts.size() == 3) {
318 Map[Name].MinAbsChange = atof(Parts[2].c_str());
319
320 vector<string> Range = Tokenize(Parts[1], "-");
321 if (Range.size() == 1) Map[Name].Range = pair <unsigned int, unsigned int> (atoi(Range[0].c_str()), atoi(Range[0].c_str()));
322 else if (Range.size() == 2) Map[Name].Range = pair <unsigned int, unsigned int> (atoi(Range[0].c_str()), atoi(Range[1].c_str()));
323 }
324 }
325
326 SendToLog("Subscribing to service '%s': MinAbsChange %f, Index range %u-%u\n", Name.c_str(), Map[Name].MinAbsChange, Map[Name].Range.first, Map[Name].Range.second);
327
328 // Load history buffer from file if existing
329 FILE *File = OpenFile(Name, "rb");
330 off_t Size;
331
332 if (File != NULL && (Size = FileSize(File)) != -1) {
333
334 // If current buffer too small, resize
335 if (Size > Map[Name].Buffer.size()) Map[Name].Buffer.resize(Size);
336
337 // Read next pointer
338 fread(&Map[Name].Next, sizeof(Map[Name].Next), 1, File);
339 // Skip format string
340 while (fgetc(File) != 0 && feof(File) == 0) {}
341 // Read buffer
342 fread(&Map[Name].Buffer[0], sizeof(char), Size, File);
343
344 if (ferror(File) != 0) {
345 Message(WARN, "Error reading history file '%s' in AddService()", Name.c_str());
346 Map[Name].Buffer.clear();
347 }
348 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in AddService()", Name.c_str());;
349 }
350
351 // If no buffer loaded, allocate empty buffer
352 if (Map[Name].Buffer.empty()) {
353 Map[Name].Buffer.resize(MIN_SIZE_KB*1024);
354 memset(&Map[Name].Buffer[0], 0, Map[Name].Buffer.size());
355 *(int *) &Map[Name].Buffer[0] = 4;
356 Map[Name].Next = 4;
357 }
358
359 // Subscribe to service
360 Map[Name].DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
361}
362
363
364//
365// Remove service from watch list
366//
367void History::RemoveService(string Name) {
368
369 // Check if actually subscribed to service
370 if (Map.count(Name) == 0) return;
371
372 SendToLog("Unsubscribing service '%s'", Name.c_str());
373
374 // Delete subscription first so handler and not called anymore
375 delete Map[Name].DataItem;
376
377 // Save history buffer
378 SaveHistory(Name);
379
380 Map.erase(Name);
381}
382
383
384//
385// Save history buffer to file
386//
387void History::SaveHistory(string Name) {
388
389 FILE *File = OpenFile(Name, "wb");
390
391 if (File != NULL) {
392 fwrite(&Map[Name].Next, sizeof(Map[Name].Next), 1, File); // Next pointer
393 fwrite(Map[Name].Format.c_str(), Map[Name].Format.size()+1, 1, File); // Format
394 fwrite(&Map[Name].Buffer[0], sizeof(char), Map[Name].Buffer.size(), File); // Buffer
395
396 // If error, try to delete (possibly erroneous) file
397 if (ferror(File) != 0) {
398 if (remove(Name.c_str()) == -1) Message(WARN, "Error writing history file '%s', could also not delete file", Name.c_str());
399 else Message(WARN, "Error writing history file '%s', deleted file", Name.c_str());
400 }
401 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s'", Name.c_str());
402 }
403 else Message(WARN, "Could not open history file '%s' for writing", Name.c_str());;
404}
405
406
407//
408// Determine size of file in kB
409//
410off_t History::FileSize(FILE *File) {
411
412 struct stat FileStatus;
413
414 if (fstat(fileno(File), &FileStatus) == -1) {
415 Message(WARN, "Could not determine size of file (%s)", strerror(errno));
416 return -1;
417 }
418
419 return FileStatus.st_size;
420}
421
422//
423// Open file for service history
424//
425FILE *History::OpenFile(string Service, const char *Mode) {
426
427 // Create directory if not yet existing
428 if(mkdir(Directory, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) return NULL;
429
430 // Replace all '/' and non-graphical characters by '_' in string and open file
431 for (int i=0; i<Service.size(); i++) {
432 if (Service[i] == '/') Service[i] = '_';
433 if (isgraph(Service[i]) == 0) Service[i] = '_';
434 }
435
436 return fopen((string(Directory) + "/" + Service).c_str(), Mode);
437}
438
439//
440// Main program
441//
442int main(int argc, char *argv[]) {
443
444 if (argc != 2) {
445 printf("Usage: %s <History-Directory>\n", argv[0]);
446 exit(EXIT_FAILURE);
447 }
448
449 dic_disable_padding();
450 dis_disable_padding();
451
452 // Static ensures calling of destructor by exit()
453 static History Hist(argv[1]);
454
455 // Sleep until signal caught
456 while (!Hist.ExitRequest) pause();
457}
Note: See TracBrowser for help on using the repository browser.