source: fact/Evidence/History.cc@ 16323

Last change on this file since 16323 was 12940, checked in by ogrimm, 13 years ago
History handling in Edd faster, improved drag and drop functionality, version of Edd for La Palma
File size: 13.6 KB
Line 
1/********************************************************************\
2
3 History server of the Evidence Control System
4
5 - Subscribes to all services and keeps a ring buffer for each service.
6 - Data added to the buffer only if changed by minimum amount given by
7 configuration information.
8 - The history is available via an rpc call.
9 - The history buffers are written to disk at program termination and
10 are tried to be read when adding a service.
11 - The buffer should hold at least REQUEST_NUM entries (of the size currently
12 handled in infoHandler()), but should not be larger than MAX_SIZE_KB.
13 Oliver Grimm, June 2010
14
15\********************************************************************/
16
17#define SERVER_NAME "History"
18#include "Evidence.h"
19
20#include <string>
21#include <sstream>
22#include <map>
23#include <math.h>
24#include <float.h>
25#include <limits>
26#include <sys/stat.h>
27#include <sys/time.h>
28
29using namespace std;
30
31const int MIN_SIZE_KB = 50; // Min and max buffersize in kByte (> 3*sizeof(int) !)
32const string DEFAULT_MAX_SIZE_KB = "2000";
33const string DEFAULT_NUM_ENTRIES = "1000"; // Number of entries in each history buffer
34const double MIN_SAVE_PERDIOD = 0.5; // Minimum period between saving history buffers in hours
35
36//
37// Class declaration
38//
39class History: public DimRpc, public DimClient, public EvidenceServer {
40
41 struct Item {
42 DimStampedInfo *DataItem;
43 vector <char> Buffer;
44 int Next;
45 double LastValue;
46 double MinAbsChange;
47 pair <unsigned int, unsigned int> Range;
48 string Format;
49 time_t LastSave;
50 };
51 map<string, struct Item> Map;
52
53 DimInfo *ServerList;
54 DimService *Service;
55 char *Directory;
56
57 void infoHandler();
58 void rpcHandler();
59 void AddService(string, const char *);
60 void RemoveService(string);
61 void SaveHistory(string);
62 off_t FileSize(FILE *);
63 FILE *OpenFile(string, const char *);
64
65 public:
66 History(char *);
67 ~History();
68};
69
70
71// Constructor
72History::History(char *Dir): DimRpc("ServiceHistory", "C", "C"),
73 EvidenceServer(SERVER_NAME),
74 Directory(Dir) {
75 // Get/initialize configuration
76 GetConfig("minchange", " ");
77 GetConfig("maxsize_kb", DEFAULT_MAX_SIZE_KB);
78 GetConfig("numentries", DEFAULT_NUM_ENTRIES);
79 GetConfig("saveperiod", "1");
80 GetConfig("exclude", "");
81
82 // Create services for information about subscribed services
83 Service = new DimService(SERVER_NAME "/Subscriptions", "C", NULL, 0);
84
85 // Subscribe to top-level server list
86 ServerList = new DimInfo((char *) "DIS_DNS/SERVER_LIST", NO_LINK, this);
87}
88
89
90// Destructor deletes all DIM subscriptions
91History::~History() {
92
93 delete ServerList;
94 while (Map.size() != 0) RemoveService((*Map.begin()).first);
95
96 delete Service;
97}
98
99
100// Implementation of data handling
101void History::infoHandler() {
102
103 static string List;
104
105 DimInfo *I = getInfo();
106
107 // Check if service available
108 if (!ServiceOK(I)) return;
109
110 // ====== Part A: Handle service subscriptions ===
111
112 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
113 if (strcmp(I->getName(), "DIS_DNS/SERVER_LIST") == 0) {
114 char *Token = strtok(I->getString(), "+-!@");
115 while (Token != NULL) {
116 AddService(string(Token)+"/SERVICE_LIST", "C"); // 'add' also for '-' and '!'
117 Token = strtok(NULL, "|"); // Skip server IP address
118 Token = strtok(NULL, "@");
119 }
120 return;
121 }
122
123 // If service is SERVICE_LIST, scan and subscribe/unsubscribe to services
124 if (strstr(I->getName(), "/SERVICE_LIST") != NULL) {
125
126 // Bug fix for empty SERVICE_LIST
127 if (strlen(I->getString()) == 0) {
128 string Tmp(I->getName());
129 RemoveService(I->getName());
130 AddService(Tmp.c_str(), (char *) "C");
131 return;
132 }
133
134 char *Type, *Name = strtok(I->getString(), "+-!|");
135 while (Name != NULL) {
136 // Only consider DIM services (not commands and RPCs)
137 if (((Type = strtok(NULL, "\n")) != NULL) &&
138 (strstr(Type, "|CMD") == NULL) && (strstr(Type, "|RPC") == NULL)) {
139 if (*I->getString() == '-' || *I->getString() == '!') RemoveService(Name);
140 else {
141 Type[strlen(Type)-1] = '\0'; // Isolate service format
142 AddService(Name, Type);
143 }
144 }
145 Name = strtok(NULL, "|");
146 }
147
148 // Update service subscription list
149 static stringstream Stream;
150
151 for (map<string, struct Item>::const_iterator i=Map.begin(); i!=Map.end(); i++) {
152 Stream << i->first << ':' << i->second.MinAbsChange << '|';
153 }
154 List = Stream.str();
155 Service->updateService((void *) List.c_str(), List.size()+1);
156
157 return;
158 }
159
160 // ====== Part B: Handle history service ===
161
162 char *Service = I->getName();
163
164 // Check if service known and ignore empty or illegal time stamped service
165 if (Map.count(Service) == 0 || I->getSize()==0 || I->getTimestamp()<=0) return;
166
167 // Save history buffers periodically (in case of program crash)
168 if (time(NULL)-Map[Service].LastSave > max(atof(GetConfig("saveperiod").c_str()),MIN_SAVE_PERDIOD)*3600) {
169 Map[Service].LastSave = time(NULL);
170 SaveHistory(Service);
171 }
172
173 // Resize buffer if necessary
174 int NEntries = atoi(GetConfig("numentries").c_str());
175 if (Map[Service].Buffer.size() < NEntries*I->getSize()) {
176 if (NEntries*I->getSize() < atoi(GetConfig("maxsize_kb").c_str())*1024) {
177 Map[Service].Buffer.resize(NEntries*I->getSize());
178 }
179 }
180
181 // Check minumum change before adding to history
182 if (Map[Service].MinAbsChange != 0) {
183 double Sum = 0;
184 unsigned int Count = 0;
185
186 // Add values within given range
187 vector<string> Values = Tokenize(EvidenceServer::ToString(I->getFormat(), I->getData(), I->getSize()));
188
189 for (unsigned int i=0; i<Values.size(); i++) {
190 if ((i < Map[Service].Range.first) || (i > Map[Service].Range.second)) continue;
191
192 Sum += atof(Values[i].c_str());
193 Count++;
194 }
195
196 // Minimum change of average value?
197 if (Count>0 && (fabs(Sum-Map[Service].LastValue)/Count < fabs(Map[Service].MinAbsChange))) return;
198 Map[Service].LastValue = Sum;
199 }
200
201 // Check if data fits into buffer
202 if (Map[Service].Buffer.size() < I->getSize() + sizeof(int)+ 2*sizeof(EvidenceHistory::Item)) return;
203
204 int Size = I->getSize() + 2*sizeof(EvidenceHistory::Item), Next = Map[Service].Next;
205 void *WrapPos = NULL;
206 char *Buffer = &Map[Service].Buffer[0];
207 int Oldest = *(int *) Buffer;
208
209 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
210 if (Next + Size >= Map[Service].Buffer.size()) {
211 WrapPos = Buffer + Next;
212 Next = sizeof(int);
213 }
214
215 // Adapt pointer to oldest entry
216 while ((Oldest < Next + Size) &&
217 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
218 // Check for wrap-around
219 if (memcmp(Buffer + Oldest, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)) == 0) {
220 Oldest = sizeof(int);
221 continue;
222 }
223 // Check if end marker reached, then only one event fits buffer
224 if (memcmp(Buffer + Oldest, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)) == 0) {
225 Oldest = Next;
226 break;
227 }
228 // Move to next entry
229 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
230 }
231 // Update pointer in buffer
232 *(int *) Buffer = Oldest;
233
234 // Write wrap mark if necessary
235 if (WrapPos != NULL) memcpy(WrapPos, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark));
236
237 // Copy data into ring buffer
238 *((int *) (Buffer + Next)) = I->getTimestamp();
239 *((int *) (Buffer + Next + sizeof(int))) = I->getSize();
240 memcpy(Buffer + Next + 2*sizeof(int), I->getData(), I->getSize());
241
242 // Adjust pointer for next entry and write end marker to buffer
243 Next += I->getSize() + sizeof(EvidenceHistory::Item);
244 memcpy(Buffer + Next, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark));
245
246 Map[Service].Next = Next;
247}
248
249
250// Implementation of history buffer distribution
251void History::rpcHandler() {
252
253 string Name = ToString((char *) "C", getData(), getSize());
254
255 // Search for history buffer in memory
256 if (Map.count(Name) == 1) {
257 char *Buffer = new char [Map[Name].Format.size()+1+Map[Name].Buffer.size()];
258 strcpy(Buffer, Map[Name].Format.c_str());
259 memcpy(Buffer+Map[Name].Format.size()+1, &Map[Name].Buffer[0], Map[Name].Buffer.size());
260 setData((void *) Buffer, Map[Name].Format.size()+1+Map[Name].Buffer.size());
261 delete[] Buffer;
262 return;
263 }
264
265 // Try to open history file if not found in memory
266 FILE *File = OpenFile(Name, "rb");
267 if (File == NULL) {
268 setData(NULL, 0);
269 return;
270 }
271
272 // Read history file and send to client (data will contain format string and history)
273 off_t Size = FileSize(File);
274 if (Size != -1) {
275 char *Buffer = new char [Size-sizeof(int)];
276 fseek(File, sizeof(int), SEEK_SET); // Skip 'Next' pointer
277 if ((fread(Buffer, sizeof(char), Size-sizeof(int), File) != Size-sizeof(int)) || (ferror(File) != 0)) {
278 Message(WARN, "Error reading history file '%s' in rpcHandler()", Name.c_str());
279 setData(NULL, 0); // Default response
280 }
281 else setData((void *) Buffer, Size);
282 delete[] Buffer;
283 }
284
285 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in rpcHandler()", Name.c_str());
286}
287
288
289//
290// Add service to watch list
291//
292void History::AddService(string Name, const char *Format) {
293
294 // Return if already subscribed to this service or if excluded
295 if (Map.count(Name) != 0 || GetConfig("exclude").find(Name) != string::npos) return;
296
297 // Create new service subscription
298 Map[Name].LastValue = DBL_MAX;
299 Map[Name].Format = Format;
300 Map[Name].MinAbsChange = 0.0;
301 Map[Name].Range = pair <unsigned int, unsigned int> (0, numeric_limits<unsigned int>::max());
302 Map[Name].LastSave = 0.0;
303
304 // Set minimum required change if given in configuratrion
305 string Change = GetConfig("minchange");
306 size_t Pos = Change.find(Name+":");
307
308 if (Pos != string::npos) {
309 vector<string> Parts = Tokenize(Change.substr(Pos, Change.find(' ', Pos)-Pos), ":");
310
311 // Check if index range is given as well
312 if (Parts.size() == 2) Map[Name].MinAbsChange = atof(Parts[1].c_str());
313 else if (Parts.size() == 3) {
314 Map[Name].MinAbsChange = atof(Parts[2].c_str());
315
316 vector<string> Range = Tokenize(Parts[1], "-");
317 if (Range.size() == 1) Map[Name].Range = pair <unsigned int, unsigned int> (atoi(Range[0].c_str()), atoi(Range[0].c_str()));
318 else if (Range.size() == 2) Map[Name].Range = pair <unsigned int, unsigned int> (atoi(Range[0].c_str()), atoi(Range[1].c_str()));
319 }
320 }
321
322 SendToLog("Subscribing to service '%s': MinAbsChange %f, Index range %u-%u\n", Name.c_str(), Map[Name].MinAbsChange, Map[Name].Range.first, Map[Name].Range.second);
323
324 // Load history buffer from file if existing
325 FILE *File = OpenFile(Name, "rb");
326 off_t Size;
327
328 if (File != NULL && (Size = FileSize(File)) != -1) {
329
330 // If current buffer too small, resize
331 if (Size > Map[Name].Buffer.size()) Map[Name].Buffer.resize(Size);
332
333 // Read next pointer
334 fread(&Map[Name].Next, sizeof(Map[Name].Next), 1, File);
335 // Skip format string
336 while (fgetc(File) != 0 && feof(File) == 0) {}
337 // Read buffer
338 fread(&Map[Name].Buffer[0], sizeof(char), Size, File);
339
340 if (ferror(File) != 0) {
341 Message(WARN, "Error reading history file '%s' in AddService()", Name.c_str());
342 Map[Name].Buffer.clear();
343 }
344 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in AddService()", Name.c_str());;
345 }
346
347 // If no buffer loaded, allocate empty buffer
348 if (Map[Name].Buffer.empty()) {
349 Map[Name].Buffer.resize(MIN_SIZE_KB*1024);
350 memset(&Map[Name].Buffer[0], 0, Map[Name].Buffer.size());
351 *(int *) &Map[Name].Buffer[0] = 4;
352 Map[Name].Next = 4;
353 }
354
355 // Subscribe to service
356 Map[Name].DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
357}
358
359
360//
361// Remove service from watch list
362//
363void History::RemoveService(string Name) {
364
365 // Check if actually subscribed to service
366 if (Map.count(Name) == 0) return;
367
368 SendToLog("Unsubscribing service '%s'", Name.c_str());
369
370 // Delete subscription first so handler and not called anymore
371 delete Map[Name].DataItem;
372
373 // Save history buffer
374 SaveHistory(Name);
375
376 Map.erase(Name);
377}
378
379
380//
381// Save history buffer to file
382//
383void History::SaveHistory(string Name) {
384
385 FILE *File = OpenFile(Name, "wb");
386
387 if (File != NULL) {
388 fwrite(&Map[Name].Next, sizeof(Map[Name].Next), 1, File); // Next pointer
389 fwrite(Map[Name].Format.c_str(), Map[Name].Format.size()+1, 1, File); // Format
390 fwrite(&Map[Name].Buffer[0], sizeof(char), Map[Name].Buffer.size(), File); // Buffer
391
392 // If error, try to delete (possibly erroneous) file
393 if (ferror(File) != 0) {
394 if (remove(Name.c_str()) == -1) Message(WARN, "Error writing history file '%s', could also not delete file", Name.c_str());
395 else Message(WARN, "Error writing history file '%s', deleted file", Name.c_str());
396 }
397 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s'", Name.c_str());
398 }
399 else Message(WARN, "Could not open history file '%s' for writing", Name.c_str());;
400}
401
402
403//
404// Determine size of file in kB
405//
406off_t History::FileSize(FILE *File) {
407
408 struct stat FileStatus;
409
410 if (fstat(fileno(File), &FileStatus) == -1) {
411 Message(WARN, "Could not determine size of file (%s)", strerror(errno));
412 return -1;
413 }
414
415 return FileStatus.st_size;
416}
417
418//
419// Open file for service history
420//
421FILE *History::OpenFile(string Service, const char *Mode) {
422
423 // Create directory if not yet existing
424 if(mkdir(Directory, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) return NULL;
425
426 // Replace all '/' and non-graphical characters by '_' in string and open file
427 for (int i=0; i<Service.size(); i++) {
428 if (Service[i] == '/') Service[i] = '_';
429 if (isgraph(Service[i]) == 0) Service[i] = '_';
430 }
431
432 return fopen((string(Directory) + "/" + Service).c_str(), Mode);
433}
434
435//
436// Main program
437//
438int main(int argc, char *argv[]) {
439
440 if (argc != 2) {
441 printf("Usage: %s <History-Directory>\n", argv[0]);
442 exit(EXIT_FAILURE);
443 }
444
445 dic_disable_padding();
446 dis_disable_padding();
447
448 // Static ensures calling of destructor by exit()
449 static History Hist(argv[1]);
450
451 // Sleep until signal caught
452 while (!Hist.ExitRequest) pause();
453}
Note: See TracBrowser for help on using the repository browser.