source: fact/Evidence/History.cc@ 12379

Last change on this file since 12379 was 11088, checked in by ogrimm, 13 years ago
Updates to Edd. History server now periodically saves histories to disk.
File size: 12.0 KB
Line 
1/********************************************************************\
2
3 History server of the Evidence Control System
4
5 - Subscribes to all services and keeps a ring buffer for each service.
6 - Data added to the buffer only if changed by minimum amount given by
7 configuration information.
8 - The history is available via an rpc call.
9 - The history buffers are written to disk at program termination and
10 are tried to be read when adding a service.
11 - The buffer should hold at least REQUEST_NUM entries (of the size currently
12 handled in infoHandler()), but should not be larger than MAX_SIZE_KB.
13 Oliver Grimm, June 2010
14
15\********************************************************************/
16
17#define SERVER_NAME "History"
18#include "Evidence.h"
19
20#include <string>
21#include <sstream>
22#include <map>
23#include <math.h>
24#include <float.h>
25#include <sys/stat.h>
26#include <sys/time.h>
27
28using namespace std;
29
30const int MIN_SIZE_KB = 50; // Min and max buffersize in kByte (> 3*sizeof(int) !)
31const string DEFAULT_MAX_SIZE_KB = "2000";
32const string DEFAULT_NUM_ENTRIES = "1000"; // Number of entries in each history buffer
33const double MIN_SAVE_PERDIOD = 0.5; // Minimum period between saving history buffers in hours
34
35//
36// Class declaration
37//
38class History: public DimRpc, public DimClient, public EvidenceServer {
39
40 struct Item {
41 DimStampedInfo *DataItem;
42 vector <char> Buffer;
43 int Next;
44 double LastValue;
45 double MinAbsChange;
46 string Format;
47 time_t LastSave;
48 };
49 map<string, struct Item> Map;
50
51 DimInfo *ServerList;
52 char *Directory;
53 string Change;
54
55 void infoHandler();
56 void rpcHandler();
57 void AddService(string, const char *);
58 void RemoveService(string);
59 void SaveHistory(string);
60 off_t FileSize(FILE *);
61 FILE *OpenFile(string, const char *);
62
63 public:
64 History(char *);
65 ~History();
66};
67
68
69// Constructor
70History::History(char *Dir): DimRpc("ServiceHistory", "C", "C"),
71 EvidenceServer(SERVER_NAME),
72 Directory(Dir) {
73
74 // Get/initialize configuration
75 Change = GetConfig("minchange", " ");
76 GetConfig("maxsize_kb", DEFAULT_MAX_SIZE_KB);
77 GetConfig("numentries", DEFAULT_NUM_ENTRIES);
78 GetConfig("saveperiod", "1");
79 GetConfig("exclude", "");
80
81 // Subscribe to top-level server list
82 ServerList = new DimInfo((char *) "DIS_DNS/SERVER_LIST", NO_LINK, this);
83}
84
85
86// Destructor deletes all DIM subscriptions
87History::~History() {
88
89 delete ServerList;
90 while (Map.size() != 0) RemoveService((*Map.begin()).first);
91}
92
93
94// Implementation of data handling
95void History::infoHandler() {
96
97 DimInfo *I = getInfo();
98
99 // Check if service available
100 if (!ServiceOK(I)) return;
101
102 // ====== Part A: Handle service subscriptions ===
103
104 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
105 if (strcmp(I->getName(), "DIS_DNS/SERVER_LIST") == 0) {
106 char *Token = strtok(I->getString(), "+-!@");
107 while (Token != NULL) {
108 AddService(string(Token)+"/SERVICE_LIST", "C"); // 'add' also for '-' and '!'
109 Token = strtok(NULL, "|"); // Skip server IP address
110 Token = strtok(NULL, "@");
111 }
112 return;
113 }
114
115 // If service is SERVICE_LIST, scan and subscribe/unsubscribe to services
116 if (strstr(I->getName(), "/SERVICE_LIST") != NULL) {
117
118 // Bug fix for empty SERVICE_LIST
119 if (strlen(I->getString()) == 0) {
120 string Tmp(I->getName());
121 RemoveService(I->getName());
122 AddService(Tmp.c_str(), (char *) "C");
123 return;
124 }
125
126 char *Type, *Name = strtok(I->getString(), "+-!|");
127 while (Name != NULL) {
128 // Only consider DIM services (not commands and RPCs)
129 if (((Type = strtok(NULL, "\n")) != NULL) &&
130 (strstr(Type, "|CMD") == NULL) && (strstr(Type, "|RPC") == NULL)) {
131 if (*I->getString() == '-' || *I->getString() == '!') RemoveService(Name);
132 else {
133 Type[strlen(Type)-1] = '\0'; // Isolate service format
134 AddService(Name, Type);
135 }
136 }
137 Name = strtok(NULL, "|");
138 }
139 return;
140 }
141
142 // ====== Part B: Handle history service ===
143
144 char *Service = I->getName();
145
146 // Check if service known and ignore empty or illegal time stamped service
147 if (Map.count(Service) == 0 || I->getSize()==0 || I->getTimestamp()<=0) return;
148
149 // Save history buffers periodically (in case of program crash)
150 if (time(NULL)-Map[Service].LastSave > max(atof(GetConfig("saveperiod").c_str()),MIN_SAVE_PERDIOD)*3600) {
151 Map[Service].LastSave = time(NULL);
152 SaveHistory(Service);
153 }
154
155 // Resize buffer if necessary
156 int NEntries = atoi(GetConfig("numentries").c_str());
157 if (Map[Service].Buffer.size() < NEntries*I->getSize()) {
158 if (NEntries*I->getSize() < atoi(GetConfig("maxsize_kb").c_str())*1024) {
159 Map[Service].Buffer.resize(NEntries*I->getSize());
160 }
161 }
162
163 // If data is number of single type, check minumum change before adding to history
164 if (strcmp(I->getFormat(), "C") != 0) {
165 // Calculate sum of all number in array
166 istringstream Text(EvidenceServer::ToString(I->getFormat(), I->getData(), I->getSize()));
167 double Num, Sum = 0;
168 while (Text.good()) {
169 Text >> Num;
170 Sum += fabs(Num);
171 }
172
173 // Minimum change?
174 if (fabs(Sum-Map[Service].LastValue) < fabs(Map[Service].MinAbsChange)) return;
175 Map[Service].LastValue = Sum;
176 }
177
178 // Check if data fits into buffer
179 if (Map[Service].Buffer.size() < I->getSize() + sizeof(int)+ 2*sizeof(EvidenceHistory::Item)) return;
180
181 int Size = I->getSize() + 2*sizeof(EvidenceHistory::Item), Next = Map[Service].Next;
182 void *WrapPos = NULL;
183 char *Buffer = &Map[Service].Buffer[0];
184 int Oldest = *(int *) Buffer;
185
186 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
187 if (Next + Size >= Map[Service].Buffer.size()) {
188 WrapPos = Buffer + Next;
189 Next = sizeof(int);
190 }
191
192 // Adapt pointer to oldest entry
193 while ((Oldest < Next + Size) &&
194 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
195 // Check for wrap-around
196 if (memcmp(Buffer + Oldest, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)) == 0) {
197 Oldest = sizeof(int);
198 continue;
199 }
200 // Check if end marker reached, then only one event fits buffer
201 if (memcmp(Buffer + Oldest, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)) == 0) {
202 Oldest = Next;
203 break;
204 }
205 // Move to next entry
206 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
207 }
208 // Update pointer in buffer
209 *(int *) Buffer = Oldest;
210
211 // Write wrap mark if necessary
212 if (WrapPos != NULL) memcpy(WrapPos, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark));
213
214 // Copy data into ring buffer
215 *((int *) (Buffer + Next)) = I->getTimestamp();
216 *((int *) (Buffer + Next + sizeof(int))) = I->getSize();
217 memcpy(Buffer + Next + 2*sizeof(int), I->getData(), I->getSize());
218
219 // Adjust pointer for next entry and write end marker to buffer
220 Next += I->getSize() + sizeof(EvidenceHistory::Item);
221 memcpy(Buffer + Next, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark));
222
223 Map[Service].Next = Next;
224}
225
226
227// Implementation of history buffer distribution
228void History::rpcHandler() {
229
230 string Name = ToString((char *) "C", getData(), getSize());
231
232 // Search for history buffer in memory
233 if (Map.count(Name) == 1) {
234 char *Buffer = new char [Map[Name].Format.size()+1+Map[Name].Buffer.size()];
235 strcpy(Buffer, Map[Name].Format.c_str());
236 memcpy(Buffer+Map[Name].Format.size()+1, &Map[Name].Buffer[0], Map[Name].Buffer.size());
237 setData((void *) Buffer, Map[Name].Format.size()+1+Map[Name].Buffer.size());
238 delete[] Buffer;
239 return;
240 }
241
242 // Try to open history file if not found in memory
243 FILE *File = OpenFile(Name, "rb");
244 if (File == NULL) {
245 setData(NULL, 0);
246 return;
247 }
248
249 // Read history file and send to client (data will contain format string and history)
250 off_t Size = FileSize(File);
251 if (Size != -1) {
252 char *Buffer = new char [Size-sizeof(int)];
253 fseek(File, sizeof(int), SEEK_SET); // Skip 'Next' pointer
254 if ((fread(Buffer, sizeof(char), Size-sizeof(int), File) != Size-sizeof(int)) || (ferror(File) != 0)) {
255 Message(WARN, "Error reading history file '%s' in rpcHandler()", Name.c_str());
256 setData(NULL, 0); // Default response
257 }
258 else setData((void *) Buffer, Size);
259 delete[] Buffer;
260 }
261
262 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in rpcHandler()", Name.c_str());
263}
264
265
266//
267// Add service to watch list
268//
269void History::AddService(string Name, const char *Format) {
270
271 // Return if already subscribed to this service or if excluded
272 if (Map.count(Name) != 0 || GetConfig("exclude").find(Name) != string::npos) return;
273
274 // Create new service subscription
275 Map[Name].LastValue = DBL_MAX;
276 Map[Name].Format = Format;
277 Map[Name].MinAbsChange = 0.0;
278 Map[Name].LastSave = 0.0;
279
280 // Set minimum required change if given in configuratrion
281 size_t Pos = Change.find(Name+":");
282 if (Pos != string::npos) Map[Name].MinAbsChange = atof(Change.c_str() + Pos + Name.size() + 1);
283
284 // Load history buffer from file if existing
285 FILE *File = OpenFile(Name, "rb");
286 off_t Size;
287
288 if (File != NULL && (Size = FileSize(File)) != -1) {
289
290 // If current buffer too small, resize
291 if (Size > Map[Name].Buffer.size()) Map[Name].Buffer.resize(Size);
292
293 // Read next pointer
294 fread(&Map[Name].Next, sizeof(Map[Name].Next), 1, File);
295 // Skip format string
296 while (fgetc(File) != 0 && feof(File) == 0) {}
297 // Read buffer
298 fread(&Map[Name].Buffer[0], sizeof(char), Size, File);
299
300 if (ferror(File) != 0) {
301 Message(WARN, "Error reading history file '%s' in AddService()", Name.c_str());
302 Map[Name].Buffer.clear();
303 }
304 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in AddService()", Name.c_str());;
305 }
306
307 // If no buffer loaded, allocate empty buffer
308 if (Map[Name].Buffer.empty()) {
309 Map[Name].Buffer.resize(MIN_SIZE_KB*1024);
310 memset(&Map[Name].Buffer[0], 0, Map[Name].Buffer.size());
311 *(int *) &Map[Name].Buffer[0] = 4;
312 Map[Name].Next = 4;
313 }
314
315 // Subscribe to service
316 Map[Name].DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
317}
318
319
320//
321// Remove service from watch list
322//
323void History::RemoveService(string Name) {
324
325 // Check if actually subscribed to service
326 if (Map.count(Name) == 0) return;
327
328 // Delete subscription first so handler and not called anymore
329 delete Map[Name].DataItem;
330
331 // Save history buffer
332 SaveHistory(Name);
333
334 Map.erase(Name);
335}
336
337
338//
339// Save history buffer to file
340//
341void History::SaveHistory(string Name) {
342
343 FILE *File = OpenFile(Name, "wb");
344
345 if (File != NULL) {
346 fwrite(&Map[Name].Next, sizeof(Map[Name].Next), 1, File); // Next pointer
347 fwrite(Map[Name].Format.c_str(), Map[Name].Format.size()+1, 1, File); // Format
348 fwrite(&Map[Name].Buffer[0], sizeof(char), Map[Name].Buffer.size(), File); // Buffer
349
350 // If error, try to delete (possibly erroneous) file
351 if (ferror(File) != 0) {
352 if (remove(Name.c_str()) == -1) Message(WARN, "Error writing history file '%s', could also not delete file", Name.c_str());
353 else Message(WARN, "Error writing history file '%s', deleted file", Name.c_str());
354 }
355 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s'", Name.c_str());
356 }
357 else Message(WARN, "Could not open history file '%s' for writing", Name.c_str());;
358}
359
360
361//
362// Determine size of file in kB
363//
364off_t History::FileSize(FILE *File) {
365
366 struct stat FileStatus;
367
368 if (fstat(fileno(File), &FileStatus) == -1) {
369 Message(WARN, "Could not determine size of file (%s)", strerror(errno));
370 return -1;
371 }
372
373 return FileStatus.st_size;
374}
375
376//
377// Open file for service history
378//
379FILE *History::OpenFile(string Service, const char *Mode) {
380
381 // Create directory if not yet existing
382 if(mkdir(Directory, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) return NULL;
383
384 // Replace all '/' and non-graphical characters by '_' in string and open file
385 for (int i=0; i<Service.size(); i++) {
386 if (Service[i] == '/') Service[i] = '_';
387 if (isgraph(Service[i]) == 0) Service[i] = '_';
388 }
389
390 return fopen((string(Directory) + "/" + Service).c_str(), Mode);
391}
392
393//
394// Main program
395//
396int main(int argc, char *argv[]) {
397
398 if (argc != 2) {
399 printf("Usage: %s <History-Directory>\n", argv[0]);
400 exit(EXIT_FAILURE);
401 }
402
403 // Static ensures calling of destructor by exit()
404 static History Hist(argv[1]);
405
406 // Sleep until signal caught
407 while (!Hist.ExitRequest) pause();
408}
Note: See TracBrowser for help on using the repository browser.