source: fact/Evidence/History.cc@ 11014

Last change on this file since 11014 was 10996, checked in by ogrimm, 13 years ago
History allows to exclude services
File size: 11.4 KB
Line 
1/********************************************************************\
2
3 History server of the Evidence Control System
4
5 - Subscribes to all services and keeps a ring buffer for each service.
6 - Data added to the buffer only if changed by minimum amount given by
7 configuration information.
8 - The history is available via an rpc call.
9 - The history buffers are written to disk when a service is removed
10 (including at program termination) and are read back, if present, when a service is added.
11 - The buffer should hold at least 'numentries' entries (of the size currently
12 handled in infoHandler()), but should not be larger than 'maxsize_kb' kByte.
13 Oliver Grimm, June 2010
14
15\********************************************************************/
16
17#define SERVER_NAME "History"
18#include "Evidence.h"
19
20#include <string>
21#include <sstream>
22#include <map>
23#include <math.h>
24#include <float.h>
25#include <sys/stat.h>
26#include <sys/time.h>
27
28using namespace std;
29
30const int MIN_SIZE_KB = 50; // Min and max buffersize in kByte (> 3*sizeof(int) !)
31const string DEFAULT_MAX_SIZE_KB = "2000";
32const string DEFAULT_NUM_ENTRIES = "1000"; // Number of entries in each history buffer
33
34//
35// Class declaration
36//
37class History: public DimRpc, public DimClient, public EvidenceServer {
38
39 struct Item {
40 DimStampedInfo *DataItem;
41 vector <char> Buffer;
42 int Next;
43 double LastValue;
44 double MinAbsChange;
45 string Format;
46 };
47 map<string, struct Item> Map;
48
49 DimInfo *ServerList;
50 char *Directory;
51 string Change;
52
53 void infoHandler();
54 void rpcHandler();
55 void AddService(string, const char *);
56 void RemoveService(string);
57 off_t FileSize(FILE *);
58 FILE *OpenFile(string, const char *);
59
60 public:
61 History(char *);
62 ~History();
63};
64
65
66// Constructor
67History::History(char *Dir): DimRpc("ServiceHistory", "C", "C"),
68 EvidenceServer(SERVER_NAME),
69 Directory(Dir) {
70
71 // Get/initialize configuration
72 Change = GetConfig("minchange", " ");
73 GetConfig("maxsize_kb", DEFAULT_MAX_SIZE_KB);
74 GetConfig("numentries", DEFAULT_NUM_ENTRIES);
75 GetConfig("exclude", "");
76
77 // Subscribe to top-level server list
78 ServerList = new DimInfo((char *) "DIS_DNS/SERVER_LIST", NO_LINK, this);
79}
80
81
82// Destructor deletes all DIM subscriptions
83History::~History() {
84
85 delete ServerList;
86 while (Map.size() != 0) RemoveService((*Map.begin()).first);
87}
88
89
90// Implementation of data handling
91void History::infoHandler() {
92
93 DimInfo *I = getInfo();
94
95 // Check if service available
96 if (!ServiceOK(I)) return;
97
98 // ====== Part A: Handle service subscriptions ===
99
100 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
101 if (strcmp(I->getName(), "DIS_DNS/SERVER_LIST") == 0) {
102 char *Token = strtok(I->getString(), "+-!@");
103 while (Token != NULL) {
104 AddService(string(Token)+"/SERVICE_LIST", "C"); // 'add' also for '-' and '!'
105 Token = strtok(NULL, "|"); // Skip server IP address
106 Token = strtok(NULL, "@");
107 }
108 return;
109 }
110
111 // If service is SERVICE_LIST, scan and subscribe/unsubscribe to services
112 if (strstr(I->getName(), "/SERVICE_LIST") != NULL) {
113
114 // Bug fix for empty SERVICE_LIST
115 if (strlen(I->getString()) == 0) {
116 string Tmp(I->getName());
117 RemoveService(I->getName());
118 AddService(Tmp.c_str(), (char *) "C");
119 return;
120 }
121
122 char *Type, *Name = strtok(I->getString(), "+-!|");
123 while (Name != NULL) {
124 // Only consider DIM services (not commands and RPCs)
125 if (((Type = strtok(NULL, "\n")) != NULL) &&
126 (strstr(Type, "|CMD") == NULL) && (strstr(Type, "|RPC") == NULL)) {
127 if (*I->getString() == '-' || *I->getString() == '!') RemoveService(Name);
128 else {
129 Type[strlen(Type)-1] = '\0'; // Isolate service format
130 AddService(Name, Type);
131 }
132 }
133 Name = strtok(NULL, "|");
134 }
135 return;
136 }
137
138 // ====== Part B: Handle history service ===
139
140 char *Service = I->getName();
141
142 // Check if service known and ignore empty or illegal time stamped service
143 if (Map.count(Service) == 0 || I->getSize()==0 || I->getTimestamp()<=0) return;
144
145 // Resize buffer if necessary
146 int NEntries = atoi(GetConfig("numentries").c_str());
147 if (Map[Service].Buffer.size() < NEntries*I->getSize()) {
148 if (NEntries*I->getSize() < atoi(GetConfig("maxsize_kb").c_str())*1024) {
149 Map[Service].Buffer.resize(NEntries*I->getSize());
150 }
151 }
152
153 // If data is number of single type, check minumum change before adding to history
154 if (strcmp(I->getFormat(), "C") != 0) {
155 // Calculate sum of all number in array
156 istringstream Text(EvidenceServer::ToString(I->getFormat(), I->getData(), I->getSize()));
157 double Num, Sum = 0;
158 while (Text.good()) {
159 Text >> Num;
160 Sum += fabs(Num);
161 }
162
163 // Minimum change?
164 if (fabs(Sum-Map[Service].LastValue) < fabs(Map[Service].MinAbsChange)) return;
165 Map[Service].LastValue = Sum;
166 }
167
168 // Check if data fits into buffer
169 if (Map[Service].Buffer.size() < I->getSize() + sizeof(int)+ 2*sizeof(EvidenceHistory::Item)) return;
170
171 int Size = I->getSize() + 2*sizeof(EvidenceHistory::Item), Next = Map[Service].Next;
172 void *WrapPos = NULL;
173 char *Buffer = &Map[Service].Buffer[0];
174 int Oldest = *(int *) Buffer;
175
176 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
177 if (Next + Size >= Map[Service].Buffer.size()) {
178 WrapPos = Buffer + Next;
179 Next = sizeof(int);
180 }
181
182 // Adapt pointer to oldest entry
183 while ((Oldest < Next + Size) &&
184 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
185 // Check for wrap-around
186 if (memcmp(Buffer + Oldest, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)) == 0) {
187 Oldest = sizeof(int);
188 continue;
189 }
190 // Check if end marker reached, then only one event fits buffer
191 if (memcmp(Buffer + Oldest, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)) == 0) {
192 Oldest = Next;
193 break;
194 }
195 // Move to next entry
196 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
197 }
198 // Update pointer in buffer
199 *(int *) Buffer = Oldest;
200
201 // Write wrap mark if necessary
202 if (WrapPos != NULL) memcpy(WrapPos, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark));
203
204 // Copy data into ring buffer
205 *((int *) (Buffer + Next)) = I->getTimestamp();
206 *((int *) (Buffer + Next + sizeof(int))) = I->getSize();
207 memcpy(Buffer + Next + 2*sizeof(int), I->getData(), I->getSize());
208
209 // Adjust pointer for next entry and write end marker to buffer
210 Next += I->getSize() + sizeof(EvidenceHistory::Item);
211 memcpy(Buffer + Next, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark));
212
213 Map[Service].Next = Next;
214}
215
216
217// Implementation of history buffer distribution
218void History::rpcHandler() {
219
220 string Name = ToString((char *) "C", getData(), getSize());
221
222 // Search for history buffer in memory
223 if (Map.count(Name) == 1) {
224 char *Buffer = new char [Map[Name].Format.size()+1+Map[Name].Buffer.size()];
225 strcpy(Buffer, Map[Name].Format.c_str());
226 memcpy(Buffer+Map[Name].Format.size()+1, &Map[Name].Buffer[0], Map[Name].Buffer.size());
227 setData((void *) Buffer, Map[Name].Format.size()+1+Map[Name].Buffer.size());
228 delete[] Buffer;
229 return;
230 }
231
232 // Try to open history file if not found in memory
233 FILE *File = OpenFile(Name, "rb");
234 if (File == NULL) {
235 setData(NULL, 0);
236 return;
237 }
238
239 // Read history file and send to client (data will contain format string and history)
240 off_t Size = FileSize(File);
241 if (Size != -1) {
242 char *Buffer = new char [Size-sizeof(int)];
243 fseek(File, sizeof(int), SEEK_SET); // Skip 'Next' pointer
244 if ((fread(Buffer, sizeof(char), Size-sizeof(int), File) != Size-sizeof(int)) || (ferror(File) != 0)) {
245 Message(WARN, "Error reading history file '%s' in rpcHandler()", Name.c_str());
246 setData(NULL, 0); // Default response
247 }
248 else setData((void *) Buffer, Size);
249 delete[] Buffer;
250 }
251
252 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in rpcHandler()", Name.c_str());
253}
254
255
256//
257// Add service to watch list
258//
259void History::AddService(string Name, const char *Format) {
260
261 // Return if already subscribed to this service or if excluded
262 if (Map.count(Name) != 0 || GetConfig("exclude").find(Name) != string::npos) return;
263
264 // Create new service subscription
265 Map[Name].LastValue = DBL_MAX;
266 Map[Name].Format = Format;
267 Map[Name].MinAbsChange = 0.0;
268
269 // Set minimum required change if given in configuratrion
270 size_t Pos = Change.find(Name+":");
271 if (Pos != string::npos) Map[Name].MinAbsChange = atof(Change.c_str() + Pos + Name.size() + 1);
272
273 // Load history buffer from file if existing
274 FILE *File = OpenFile(Name, "rb");
275 off_t Size;
276
277 if (File != NULL && (Size = FileSize(File)) != -1) {
278
279 // If current buffer too small, resize
280 if (Size > Map[Name].Buffer.size()) Map[Name].Buffer.resize(Size);
281
282 // Read next pointer
283 fread(&Map[Name].Next, sizeof(Map[Name].Next), 1, File);
284 // Skip format string
285 while (fgetc(File) != 0 && feof(File) == 0) {}
286 // Read buffer
287 fread(&Map[Name].Buffer[0], sizeof(char), Size, File);
288
289 if (ferror(File) != 0) {
290 Message(WARN, "Error reading history file '%s' in AddService()", Name.c_str());
291 Map[Name].Buffer.clear();
292 }
293 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in AddService()", Name.c_str());;
294 }
295
296 // If no buffer loaded, allocate empty buffer
297 if (Map[Name].Buffer.empty()) {
298 Map[Name].Buffer.resize(MIN_SIZE_KB*1024);
299 memset(&Map[Name].Buffer[0], 0, Map[Name].Buffer.size());
300 *(int *) &Map[Name].Buffer[0] = 4;
301 Map[Name].Next = 4;
302 }
303
304 // Subscribe to service
305 Map[Name].DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
306}
307
308
309//
310// Remove service from watch list
311//
312void History::RemoveService(string Name) {
313
314 // Check if actually subscribed to service
315 if (Map.count(Name) == 0) return;
316
317 // Delete subscription first so handler and not called anymore
318 delete Map[Name].DataItem;
319
320 // Save history buffer
321 FILE *File = OpenFile(Name, "wb");
322 if (File != NULL) {
323 fwrite(&Map[Name].Next, sizeof(Map[Name].Next), 1, File); // Next pointer
324 fwrite(Map[Name].Format.c_str(), Map[Name].Format.size()+1, 1, File); // Format
325 fwrite(&Map[Name].Buffer[0], sizeof(char), Map[Name].Buffer.size(), File); // Buffer
326
327 // If error, try to delete (possibly erroneous) file
328 if (ferror(File) != 0) {
329 if (remove(Name.c_str()) == -1) Message(WARN, "Error writing history file '%s' in RemoveService(), could also not delete file", Name.c_str());
330 else Message(WARN, "Error writing history file '%s' in RemoveService(), deleted file", Name.c_str());
331 }
332 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in RemoveService()", Name.c_str());;
333 }
334
335 Map.erase(Name);
336}
337
338//
339// Determine size of file in kB
340//
341off_t History::FileSize(FILE *File) {
342
343 struct stat FileStatus;
344
345 if (fstat(fileno(File), &FileStatus) == -1) {
346 Message(WARN, "Could not determine size of file (%s)", strerror(errno));
347 return -1;
348 }
349
350 return FileStatus.st_size;
351}
352
353//
354// Open file for service history
355//
356FILE *History::OpenFile(string Service, const char *Mode) {
357
358 // Create directory if not yet existing
359 if(mkdir(Directory, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) return NULL;
360
361 // Replace all '/' and non-graphical characters by '_' in string and open file
362 for (int i=0; i<Service.size(); i++) {
363 if (Service[i] == '/') Service[i] = '_';
364 if (isgraph(Service[i]) == 0) Service[i] = '_';
365 }
366
367 return fopen((string(Directory) + "/" + Service).c_str(), Mode);
368}
369
370//
371// Main program
372//
373int main(int argc, char *argv[]) {
374
375 if (argc != 2) {
376 printf("Usage: %s <History-Directory>\n", argv[0]);
377 exit(EXIT_FAILURE);
378 }
379
380 // Static ensures calling of destructor by exit()
381 static History Hist(argv[1]);
382
383 // Sleep until signal caught
384 while (!Hist.ExitRequest) pause();
385}
Note: See TracBrowser for help on using the repository browser.