source: Evidence/History.cc@ 241

Last change on this file since 241 was 232, checked in by ogrimm, 14 years ago
Made Lock()/Unlock() public, automatic configuration tracking for Bridge
File size: 11.2 KB
Line 
1/********************************************************************\
2
3 History server of the Evidence Control System
4
5 - Subscribes to all services and keeps a ring buffer for each service.
6 - Data added to the buffer only if changed by minimum amount given by
7 configuration information.
8 - The history is available via an rpc call.
9 - The history buffers are written to disk at program termination and
10 are tried to be read when adding a service.
11 - The buffer should hold at least REQUEST_NUM entries (of the size currently
12 handled in infoHandler()), but should not be larger than MAX_SIZE_KB.
13 Oliver Grimm, June 2010
14
15\********************************************************************/
16
17#define SERVER_NAME "History"
18#include "Evidence.h"
19
20const int MIN_SIZE_KB = 50; // Minimum and maximum history buffer in kByte (> 3*sizeof(int) !)
21const int MAX_SIZE_KB = 2000;
22const int REQUEST_NUM = 1000; // Requested number of entries in each history buffer
23
24#include <string>
25#include <sstream>
26#include <map>
27#include <math.h>
28#include <float.h>
29#include <sys/stat.h>
30#include <sys/time.h>
31
32using namespace std;
33
34//
35// Class declaration
36//
37class History: public DimRpc, public DimClient, public EvidenceServer {
38
39 struct Item {
40 DimStampedInfo *DataItem;
41 vector <char> Buffer;
42 int Next;
43 double LastValue;
44 double MinAbsChange;
45 string Format;
46 };
47 map<string, struct Item> Map;
48
49 DimInfo *ServerList;
50 char *Directory;
51 string Change;
52
53 void infoHandler();
54 void rpcHandler();
55 void AddService(string, const char *);
56 void RemoveService(string);
57 off_t FileSize(FILE *);
58 FILE *OpenFile(string, const char *);
59
60 public:
61 History(char *);
62 ~History();
63};
64
65
66// Constructor
67History::History(char *Dir): DimRpc("ServiceHistory", "C", "C"),
68 EvidenceServer(SERVER_NAME),
69 Directory(Dir) {
70
71 // Map of minimum required change for addition to history buffer
72 Change = GetConfig("minchange", " ");
73
74 // Subscribe to top-level server list
75 ServerList = new DimInfo((char *) "DIS_DNS/SERVER_LIST", NO_LINK, this);
76}
77
78
79// Destructor deletes all DIM subscriptions
80History::~History() {
81
82 delete ServerList;
83 while (Map.size() != 0) RemoveService((*Map.begin()).first);
84}
85
86
87// Implementation of data handling
88void History::infoHandler() {
89
90 DimInfo *I = getInfo();
91
92 // Check if service available
93 if (!ServiceOK(I)) return;
94
95 // ====== Part A: Handle service subscriptions ===
96
97 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
98 if (strcmp(I->getName(), "DIS_DNS/SERVER_LIST") == 0) {
99 char *Token = strtok(I->getString(), "+-!@");
100 while (Token != NULL) {
101 AddService(string(Token)+"/SERVICE_LIST", "C"); // 'add' also for '-' and '!'
102 Token = strtok(NULL, "|"); // Skip server IP address
103 Token = strtok(NULL, "@");
104 }
105 return;
106 }
107
108 // If service is SERVICE_LIST, scan and subscribe/unsubscribe to services
109 if (strstr(I->getName(), "/SERVICE_LIST") != NULL) {
110
111 // Bug fix for empty SERVICE_LIST
112 if (strlen(I->getString()) == 0) {
113 string Tmp(I->getName());
114 RemoveService(I->getName());
115 AddService(Tmp.c_str(), (char *) "C");
116 return;
117 }
118
119 char *Type, *Name = strtok(I->getString(), "+-!|");
120 while (Name != NULL) {
121 // Only consider DIM services (not commands and RPCs)
122 if (((Type = strtok(NULL, "\n")) != NULL) &&
123 (strstr(Type, "|CMD") == NULL) && (strstr(Type, "|RPC") == NULL)) {
124 if (*I->getString() == '-' || *I->getString() == '!') RemoveService(Name);
125 else {
126 Type[strlen(Type)-1] = '\0'; // Isolate service format
127 AddService(Name, Type);
128 }
129 }
130 Name = strtok(NULL, "|");
131 }
132 return;
133 }
134
135 // ====== Part B: Handle history service ===
136
137 char *Service = I->getName();
138
139 // Check if service known and ignore empty or illegal time stamped service
140 if (Map.count(Service) == 0 || I->getSize()==0 || I->getTimestamp()<=0) return;
141
142 // Resize buffer if necessary
143 if (Map[Service].Buffer.size() < REQUEST_NUM*I->getSize()) {
144 if (REQUEST_NUM*I->getSize() < MAX_SIZE_KB*1024) Map[Service].Buffer.resize(REQUEST_NUM*I->getSize());
145 }
146
147 // If data is number of single type, check minumum change before adding to history
148 if (strcmp(I->getFormat(), "C") != 0) {
149 // Calculate sum of all number in array
150 istringstream Text(EvidenceServer::ToString(I->getFormat(), I->getData(), I->getSize()));
151 double Num, Sum = 0;
152 while (Text.good()) {
153 Text >> Num;
154 Sum += fabs(Num);
155 }
156
157 // Minimum change?
158 if (fabs(Sum-Map[Service].LastValue) < fabs(Map[Service].MinAbsChange)) return;
159 Map[Service].LastValue = Sum;
160 }
161
162 // Check if data fits into buffer
163 if (Map[Service].Buffer.size() < I->getSize() + sizeof(int)+ 2*sizeof(EvidenceHistory::Item)) return;
164
165 int Size = I->getSize() + 2*sizeof(EvidenceHistory::Item), Next = Map[Service].Next;
166 void *WrapPos = NULL;
167 char *Buffer = &Map[Service].Buffer[0];
168 int Oldest = *(int *) Buffer;
169
170 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
171 if (Next + Size >= Map[Service].Buffer.size()) {
172 WrapPos = Buffer + Next;
173 Next = sizeof(int);
174 }
175
176 // Adapt pointer to oldest entry
177 while ((Oldest < Next + Size) &&
178 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
179 // Check for wrap-around
180 if (memcmp(Buffer + Oldest, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)) == 0) {
181 Oldest = sizeof(int);
182 continue;
183 }
184 // Check if end marker reached, then only one event fits buffer
185 if (memcmp(Buffer + Oldest, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)) == 0) {
186 Oldest = Next;
187 break;
188 }
189 // Move to next entry
190 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
191 }
192 // Update pointer in buffer
193 *(int *) Buffer = Oldest;
194
195 // Write wrap mark if necessary
196 if (WrapPos != NULL) memcpy(WrapPos, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark));
197
198 // Copy data into ring buffer
199 *((int *) (Buffer + Next)) = I->getTimestamp();
200 *((int *) (Buffer + Next + sizeof(int))) = I->getSize();
201 memcpy(Buffer + Next + 2*sizeof(int), I->getData(), I->getSize());
202
203 // Adjust pointer for next entry and write end marker to buffer
204 Next += I->getSize() + sizeof(EvidenceHistory::Item);
205 memcpy(Buffer + Next, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark));
206
207 Map[Service].Next = Next;
208}
209
210
211// Implementation of history buffer distribution
212void History::rpcHandler() {
213
214 char *Name = getString();
215
216 // Search for history buffer in memory
217 if (Map.count(Name) == 1) {
218 char *Buffer = new char [Map[Name].Format.size()+1+Map[Name].Buffer.size()];
219 strcpy(Buffer, Map[Name].Format.c_str());
220 memcpy(Buffer+Map[Name].Format.size()+1, &Map[Name].Buffer[0], Map[Name].Buffer.size());
221 setData((void *) Buffer, Map[Name].Format.size()+1+Map[Name].Buffer.size());
222 delete[] Buffer;
223 return;
224 }
225
226 // Try to open history file if not found in memory
227 FILE *File = OpenFile(Name, "rb");
228 if (File == NULL) {
229 setData(NULL, 0);
230 return;
231 }
232
233 // Read history file and send to client (data will contain format string and history)
234 off_t Size = FileSize(File);
235 if (Size != -1) {
236 char *Buffer = new char [Size-sizeof(int)];
237 fseek(File, sizeof(int), SEEK_SET); // Skip 'Next' pointer
238 if ((fread(Buffer, sizeof(char), Size-sizeof(int), File) != Size-sizeof(int)) || (ferror(File) != 0)) {
239 Message(WARN, "Error reading history file '%s' in rpcHandler()", Name);
240 setData(NULL, 0); // Default response
241 }
242 else setData((void *) Buffer, Size);
243 delete[] Buffer;
244 }
245
246 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in rpcHandler()", Name);
247}
248
249
250//
251// Add service to watch list
252//
253void History::AddService(string Name, const char *Format) {
254
255 // Return if already subscribed to this service
256 if (Map.count(Name) != 0) return;
257
258 // Create new service subscription
259 Map[Name].LastValue = DBL_MAX;
260 Map[Name].Format = Format;
261 Map[Name].MinAbsChange = 0.0;
262
263 // Set minimum required change if given in configuratrion
264 size_t Pos = Change.find(Name+":");
265 if (Pos != string::npos) Map[Name].MinAbsChange = atof(Change.c_str() + Pos + Name.size() + 1);
266
267 // Load history buffer from file if existing
268 FILE *File = OpenFile(Name, "rb");
269 off_t Size;
270
271 if (File != NULL && (Size = FileSize(File)) != -1) {
272
273 // If current buffer too small, resize
274 if (Size > Map[Name].Buffer.size()) Map[Name].Buffer.resize(Size);
275
276 // Read next pointer
277 fread(&Map[Name].Next, sizeof(Map[Name].Next), 1, File);
278 // Skip format string
279 while (fgetc(File) != 0 && feof(File) == 0) {}
280 // Read buffer
281 fread(&Map[Name].Buffer[0], sizeof(char), Size, File);
282
283 if (ferror(File) != 0) {
284 Message(WARN, "Error reading history file '%s' in AddService()", Name.c_str());
285 Map[Name].Buffer.clear();
286 }
287 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in AddService()", Name.c_str());;
288 }
289
290 // If no buffer loaded, allocate empty buffer
291 if (Map[Name].Buffer.empty()) {
292 Map[Name].Buffer.resize(MIN_SIZE_KB*1024);
293 memset(&Map[Name].Buffer[0], 0, Map[Name].Buffer.size());
294 *(int *) &Map[Name].Buffer[0] = 4;
295 Map[Name].Next = 4;
296 }
297
298 // Subscribe to service
299 Map[Name].DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
300}
301
302
303//
304// Remove service from watch list
305//
306void History::RemoveService(string Name) {
307
308 // Check if actually subscribed to service
309 if (Map.count(Name) == 0) return;
310
311 // Delete subscription first so handler and not called anymore
312 delete Map[Name].DataItem;
313
314 // Save history buffer
315 FILE *File = OpenFile(Name, "wb");
316 if (File != NULL) {
317 fwrite(&Map[Name].Next, sizeof(Map[Name].Next), 1, File); // Next pointer
318 fwrite(Map[Name].Format.c_str(), Map[Name].Format.size()+1, 1, File); // Format
319 fwrite(&Map[Name].Buffer[0], sizeof(char), Map[Name].Buffer.size(), File); // Buffer
320
321 // If error, try to delete (possibly erroneous) file
322 if (ferror(File) != 0) {
323 if (remove(Name.c_str()) == -1) Message(WARN, "Error writing history file '%s' in RemoveService(), could also not delete file", Name.c_str());
324 else Message(WARN, "Error writing history file '%s' in RemoveService(), deleted file", Name.c_str());
325 }
326 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in RemoveService()", Name.c_str());;
327 }
328
329 Map.erase(Name);
330}
331
332//
333// Determine size of file in kB
334//
335off_t History::FileSize(FILE *File) {
336
337 struct stat FileStatus;
338
339 if (fstat(fileno(File), &FileStatus) == -1) {
340 Message(WARN, "Could not determine size of file (%s)", strerror(errno));
341 return -1;
342 }
343
344 return FileStatus.st_size;
345}
346
347//
348// Open file for service history
349//
350FILE *History::OpenFile(string Service, const char *Mode) {
351
352 // Create directory if not yet existing
353 if(mkdir(Directory, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) return NULL;
354
355 // Replace all '/' and non-graphical characters by '_' in string and open file
356 for (int i=0; i<Service.size(); i++) {
357 if (Service[i] == '/') Service[i] = '_';
358 if (isgraph(Service[i]) == 0) Service[i] = '_';
359 }
360
361 return fopen((string(Directory) + "/" + Service).c_str(), Mode);
362}
363
364//
365// Main program
366//
367int main(int argc, char *argv[]) {
368
369 if (argc != 2) {
370 printf("Usage: %s <History-Directory>\n", argv[0]);
371 exit(EXIT_FAILURE);
372 }
373
374 // Static ensures calling of destructor by exit()
375 static History Hist(argv[1]);
376
377 // Sleep until signal caught
378 while (!Hist.ExitRequest) pause();
379}
Note: See TracBrowser for help on using the repository browser.