source: fact/Evidence/History.cc@ 17120

Last change on this file since 17120 was 17017, checked in by ogrimm, 11 years ago
Fixed bug in History server causing excessive memory allocation (signed instead of unsigned integers were compared)
File size: 13.6 KB
Line 
1/********************************************************************\
2
3 History server of the Evidence Control System
4
5 - Subscribes to all services and keeps a ring buffer for each service.
6 - Data added to the buffer only if changed by minimum amount given by
7 configuration information.
8 - The history is available via an rpc call.
9 - The history buffers are written to disk at program termination, and
10 the server tries to read them back when a service is added.
11 - The buffer should hold at least REQUEST_NUM entries (of the size currently
12 handled in infoHandler()), but should not be larger than MAX_SIZE_KB.
13 Oliver Grimm, June 2010
14
15\********************************************************************/
16
17#define SERVER_NAME "History"
18#include "Evidence.h"
19
20#include <string>
21#include <sstream>
22#include <map>
23#include <math.h>
24#include <float.h>
25#include <limits>
26#include <sys/stat.h>
27#include <sys/time.h>
28
29using namespace std;
30
// Size in kByte of a freshly allocated (empty) history buffer (must be > 3*sizeof(int))
const int MIN_SIZE_KB(50);
// Default handed to GetConfig() for 'maxsize_kb', the upper limit of a buffer in kByte
const string DEFAULT_MAX_SIZE_KB("2000");
// Default handed to GetConfig() for 'numentries', the number of entries per history buffer
const string DEFAULT_NUM_ENTRIES("1000");
// Lower limit in hours for the period between two saves of a history buffer
const double MIN_SAVE_PERDIOD(0.5);
35
36//
37// Class declaration
38//
39class History: public DimRpc, public DimClient, public EvidenceServer {
40
41 struct Item {
42 DimStampedInfo *DataItem;
43 vector <char> Buffer;
44 int Next;
45 double LastValue;
46 double MinAbsChange;
47 pair <unsigned int, unsigned int> Range;
48 string Format;
49 time_t LastSave;
50 };
51 map<string, struct Item> Map;
52
53 DimInfo *ServerList;
54 DimService *Service;
55 char *Directory;
56
57 void infoHandler();
58 void rpcHandler();
59 void AddService(string, const char *);
60 void RemoveService(string);
61 void SaveHistory(string);
62 off_t FileSize(FILE *);
63 FILE *OpenFile(string, const char *);
64
65 public:
66 History(char *);
67 ~History();
68};
69
70
71// Constructor
72History::History(char *Dir): DimRpc("ServiceHistory", "C", "C"),
73 EvidenceServer(SERVER_NAME),
74 Directory(Dir) {
75 // Get/initialize configuration
76 GetConfig("minchange", " ");
77 GetConfig("maxsize_kb", DEFAULT_MAX_SIZE_KB);
78 GetConfig("numentries", DEFAULT_NUM_ENTRIES);
79 GetConfig("saveperiod", "1");
80 GetConfig("exclude", "");
81
82 // Create services for information about subscribed services
83 Service = new DimService(SERVER_NAME "/Subscriptions", "C", NULL, 0);
84
85 // Subscribe to top-level server list
86 ServerList = new DimInfo((char *) "DIS_DNS/SERVER_LIST", NO_LINK, this);
87}
88
89
90// Destructor deletes all DIM subscriptions
91History::~History() {
92
93 delete ServerList;
94 while (Map.size() != 0) RemoveService((*Map.begin()).first);
95
96 delete Service;
97}
98
99
100// Implementation of data handling
101void History::infoHandler() {
102
103 static string List;
104
105 DimInfo *I = getInfo();
106
107 // Check if service available
108 if (!ServiceOK(I)) return;
109
110 // ====== Part A: Handle service subscriptions ===
111
112 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
113 if (strcmp(I->getName(), "DIS_DNS/SERVER_LIST") == 0) {
114 char *Token = strtok(I->getString(), "+-!@");
115 while (Token != NULL) {
116 AddService(string(Token)+"/SERVICE_LIST", "C"); // 'add' also for '-' and '!'
117 Token = strtok(NULL, "|"); // Skip server IP address
118 Token = strtok(NULL, "@");
119 }
120 return;
121 }
122
123 // If service is SERVICE_LIST, scan and subscribe/unsubscribe to services
124 if (strstr(I->getName(), "/SERVICE_LIST") != NULL) {
125
126 // Bug fix for empty SERVICE_LIST
127 if (strlen(I->getString()) == 0) {
128 string Tmp(I->getName());
129 RemoveService(I->getName());
130 AddService(Tmp.c_str(), (char *) "C");
131 return;
132 }
133
134 char *Type, *Name = strtok(I->getString(), "+-!|");
135 while (Name != NULL) {
136 // Only consider DIM services (not commands and RPCs)
137 if (((Type = strtok(NULL, "\n")) != NULL) &&
138 (strstr(Type, "|CMD") == NULL) && (strstr(Type, "|RPC") == NULL)) {
139 if (*I->getString() == '-' || *I->getString() == '!') RemoveService(Name);
140 else {
141 Type[strlen(Type)-1] = '\0'; // Isolate service format
142 AddService(Name, Type);
143 }
144 }
145 Name = strtok(NULL, "|");
146 }
147
148 // Update service subscription list
149 static stringstream Stream;
150
151 for (map<string, struct Item>::const_iterator i=Map.begin(); i!=Map.end(); i++) {
152 Stream << i->first << ':' << i->second.MinAbsChange << '|';
153 }
154 List = Stream.str();
155 Service->updateService((void *) List.c_str(), List.size()+1);
156
157 return;
158 }
159
160 // ====== Part B: Handle history service ===
161
162 char *Service = I->getName();
163
164 // Check if service known and ignore empty or illegal time stamped service
165 if (Map.count(Service) == 0 || I->getSize()==0 || I->getTimestamp()<=0) return;
166
167 // Save history buffers periodically (in case of program crash)
168 if (time(NULL)-Map[Service].LastSave > max(atof(GetConfig("saveperiod").c_str()),MIN_SAVE_PERDIOD)*3600) {
169 Map[Service].LastSave = time(NULL);
170 SaveHistory(Service);
171 }
172
173 // Resize buffer if necessary
174 unsigned long TargetSize = atoi(GetConfig("numentries").c_str()) * I->getSize();
175
176 if (Map[Service].Buffer.size() < TargetSize && TargetSize < atoi(GetConfig("maxsize_kb").c_str())*1024) {
177 Map[Service].Buffer.resize(TargetSize);
178 }
179
180 // Check minumum change before adding to history
181 if (Map[Service].MinAbsChange != 0) {
182 double Sum = 0;
183 unsigned int Count = 0;
184
185 // Add values within given range
186 vector<string> Values = Tokenize(EvidenceServer::ToString(I->getFormat(), I->getData(), I->getSize()));
187
188 for (unsigned int i=0; i<Values.size(); i++) {
189 if ((i < Map[Service].Range.first) || (i > Map[Service].Range.second)) continue;
190
191 Sum += atof(Values[i].c_str());
192 Count++;
193 }
194
195 // Minimum change of average value?
196 if (Count>0 && (fabs(Sum-Map[Service].LastValue)/Count < fabs(Map[Service].MinAbsChange))) return;
197 Map[Service].LastValue = Sum;
198 }
199
200 // Check if data fits into buffer
201 if (Map[Service].Buffer.size() < I->getSize() + sizeof(int)+ 2*sizeof(EvidenceHistory::Item)) return;
202
203 int Size = I->getSize() + 2*sizeof(EvidenceHistory::Item), Next = Map[Service].Next;
204 void *WrapPos = NULL;
205 char *Buffer = &Map[Service].Buffer[0];
206 int Oldest = *(int *) Buffer;
207
208 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
209 if (Next + Size >= Map[Service].Buffer.size()) {
210 WrapPos = Buffer + Next;
211 Next = sizeof(int);
212 }
213
214 // Adapt pointer to oldest entry
215 while ((Oldest < Next + Size) &&
216 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
217 // Check for wrap-around
218 if (memcmp(Buffer + Oldest, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)) == 0) {
219 Oldest = sizeof(int);
220 continue;
221 }
222 // Check if end marker reached, then only one event fits buffer
223 if (memcmp(Buffer + Oldest, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)) == 0) {
224 Oldest = Next;
225 break;
226 }
227 // Move to next entry
228 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
229 }
230 // Update pointer in buffer
231 *(int *) Buffer = Oldest;
232
233 // Write wrap mark if necessary
234 if (WrapPos != NULL) memcpy(WrapPos, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark));
235
236 // Copy data into ring buffer
237 *((int *) (Buffer + Next)) = I->getTimestamp();
238 *((int *) (Buffer + Next + sizeof(int))) = I->getSize();
239 memcpy(Buffer + Next + 2*sizeof(int), I->getData(), I->getSize());
240
241 // Adjust pointer for next entry and write end marker to buffer
242 Next += I->getSize() + sizeof(EvidenceHistory::Item);
243 memcpy(Buffer + Next, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark));
244
245 Map[Service].Next = Next;
246}
247
248
249// Implementation of history buffer distribution
250void History::rpcHandler() {
251
252 string Name = ToString((char *) "C", getData(), getSize());
253
254 // Search for history buffer in memory
255 if (Map.count(Name) == 1) {
256 char *Buffer = new char [Map[Name].Format.size()+1+Map[Name].Buffer.size()];
257 strcpy(Buffer, Map[Name].Format.c_str());
258 memcpy(Buffer+Map[Name].Format.size()+1, &Map[Name].Buffer[0], Map[Name].Buffer.size());
259 setData((void *) Buffer, Map[Name].Format.size()+1+Map[Name].Buffer.size());
260 delete[] Buffer;
261 return;
262 }
263
264 // Try to open history file if not found in memory
265 FILE *File = OpenFile(Name, "rb");
266 if (File == NULL) {
267 setData(NULL, 0);
268 return;
269 }
270
271 // Read history file and send to client (data will contain format string and history)
272 off_t Size = FileSize(File);
273 if (Size != -1) {
274 char *Buffer = new char [Size-sizeof(int)];
275 fseek(File, sizeof(int), SEEK_SET); // Skip 'Next' pointer
276 if ((fread(Buffer, sizeof(char), Size-sizeof(int), File) != Size-sizeof(int)) || (ferror(File) != 0)) {
277 Message(WARN, "Error reading history file '%s' in rpcHandler()", Name.c_str());
278 setData(NULL, 0); // Default response
279 }
280 else setData((void *) Buffer, Size);
281 delete[] Buffer;
282 }
283
284 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in rpcHandler()", Name.c_str());
285}
286
287
288//
289// Add service to watch list
290//
291void History::AddService(string Name, const char *Format) {
292
293 // Return if already subscribed to this service or if excluded
294 if (Map.count(Name) != 0 || GetConfig("exclude").find(Name) != string::npos) return;
295
296 // Create new service subscription
297 Map[Name].LastValue = DBL_MAX;
298 Map[Name].Format = Format;
299 Map[Name].MinAbsChange = 0.0;
300 Map[Name].Range = pair <unsigned int, unsigned int> (0, numeric_limits<unsigned int>::max());
301 Map[Name].LastSave = 0.0;
302
303 // Set minimum required change if given in configuratrion
304 string Change = GetConfig("minchange");
305 size_t Pos = Change.find(Name+":");
306
307 if (Pos != string::npos) {
308 vector<string> Parts = Tokenize(Change.substr(Pos, Change.find(' ', Pos)-Pos), ":");
309
310 // Check if index range is given as well
311 if (Parts.size() == 2) Map[Name].MinAbsChange = atof(Parts[1].c_str());
312 else if (Parts.size() == 3) {
313 Map[Name].MinAbsChange = atof(Parts[2].c_str());
314
315 vector<string> Range = Tokenize(Parts[1], "-");
316 if (Range.size() == 1) Map[Name].Range = pair <unsigned int, unsigned int> (atoi(Range[0].c_str()), atoi(Range[0].c_str()));
317 else if (Range.size() == 2) Map[Name].Range = pair <unsigned int, unsigned int> (atoi(Range[0].c_str()), atoi(Range[1].c_str()));
318 }
319 }
320
321 SendToLog("Subscribing to service '%s': MinAbsChange %f, Index range %u-%u\n", Name.c_str(), Map[Name].MinAbsChange, Map[Name].Range.first, Map[Name].Range.second);
322
323 // Load history buffer from file if existing
324 FILE *File = OpenFile(Name, "rb");
325 off_t Size;
326
327 if (File != NULL && (Size = FileSize(File)) != -1) {
328
329 // If current buffer too small, resize
330 if (Size > Map[Name].Buffer.size()) Map[Name].Buffer.resize(Size);
331
332 // Read next pointer
333 fread(&Map[Name].Next, sizeof(Map[Name].Next), 1, File);
334 // Skip format string
335 while (fgetc(File) != 0 && feof(File) == 0) {}
336 // Read buffer
337 fread(&Map[Name].Buffer[0], sizeof(char), Size, File);
338
339 if (ferror(File) != 0) {
340 Message(WARN, "Error reading history file '%s' in AddService()", Name.c_str());
341 Map[Name].Buffer.clear();
342 }
343 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in AddService()", Name.c_str());;
344 }
345
346 // If no buffer loaded, allocate empty buffer
347 if (Map[Name].Buffer.empty()) {
348 Map[Name].Buffer.resize(MIN_SIZE_KB*1024);
349 memset(&Map[Name].Buffer[0], 0, Map[Name].Buffer.size());
350 *(int *) &Map[Name].Buffer[0] = 4;
351 Map[Name].Next = 4;
352 }
353
354 // Subscribe to service
355 Map[Name].DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
356}
357
358
359//
360// Remove service from watch list
361//
362void History::RemoveService(string Name) {
363
364 // Check if actually subscribed to service
365 if (Map.count(Name) == 0) return;
366
367 SendToLog("Unsubscribing service '%s'", Name.c_str());
368
369 // Delete subscription first so handler and not called anymore
370 delete Map[Name].DataItem;
371
372 // Save history buffer
373 SaveHistory(Name);
374
375 Map.erase(Name);
376}
377
378
379//
380// Save history buffer to file
381//
382void History::SaveHistory(string Name) {
383
384 FILE *File = OpenFile(Name, "wb");
385
386 if (File != NULL) {
387 fwrite(&Map[Name].Next, sizeof(Map[Name].Next), 1, File); // Next pointer
388 fwrite(Map[Name].Format.c_str(), Map[Name].Format.size()+1, 1, File); // Format
389 fwrite(&Map[Name].Buffer[0], sizeof(char), Map[Name].Buffer.size(), File); // Buffer
390
391 // If error, try to delete (possibly erroneous) file
392 if (ferror(File) != 0) {
393 if (remove(Name.c_str()) == -1) Message(WARN, "Error writing history file '%s', could also not delete file", Name.c_str());
394 else Message(WARN, "Error writing history file '%s', deleted file", Name.c_str());
395 }
396 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s'", Name.c_str());
397 }
398 else Message(WARN, "Could not open history file '%s' for writing", Name.c_str());;
399}
400
401
402//
403// Determine size of file in kB
404//
405off_t History::FileSize(FILE *File) {
406
407 struct stat FileStatus;
408
409 if (fstat(fileno(File), &FileStatus) == -1) {
410 Message(WARN, "Could not determine size of file (%s)", strerror(errno));
411 return -1;
412 }
413
414 return FileStatus.st_size;
415}
416
417//
418// Open file for service history
419//
420FILE *History::OpenFile(string Service, const char *Mode) {
421
422 // Create directory if not yet existing
423 if(mkdir(Directory, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) return NULL;
424
425 // Replace all '/' and non-graphical characters by '_' in string and open file
426 for (int i=0; i<Service.size(); i++) {
427 if (Service[i] == '/') Service[i] = '_';
428 if (isgraph(Service[i]) == 0) Service[i] = '_';
429 }
430
431 return fopen((string(Directory) + "/" + Service).c_str(), Mode);
432}
433
434//
435// Main program
436//
437int main(int argc, char *argv[]) {
438
439 if (argc != 2) {
440 printf("Usage: %s <History-Directory>\n", argv[0]);
441 exit(EXIT_FAILURE);
442 }
443
444 dic_disable_padding();
445 dis_disable_padding();
446
447 // Static ensures calling of destructor by exit()
448 static History Hist(argv[1]);
449
450 // Sleep until signal caught
451 while (!Hist.ExitRequest) pause();
452}
Note: See TracBrowser for help on using the repository browser.