source: Evidence/History.cc@ 827

Last change on this file since 827 was 253, checked in by ogrimm, 14 years ago
Added command ResetAlarm, Evidence servers now always safely translate a DIM string into a C string, added documentation, replaced several vectors my maps
File size: 11.4 KB
Line 
1/********************************************************************\
2
3 History server of the Evidence Control System
4
5 - Subscribes to all services and keeps a ring buffer for each service.
6 - Data added to the buffer only if changed by minimum amount given by
7 configuration information.
8 - The history is available via an rpc call.
9 - The history buffers are written to disk at program termination and
10 are tried to be read when adding a service.
11 - The buffer should hold at least REQUEST_NUM entries (of the size currently
12 handled in infoHandler()), but should not be larger than MAX_SIZE_KB.
13 Oliver Grimm, June 2010
14
15\********************************************************************/
16
17#define SERVER_NAME "History"
18#include "Evidence.h"
19
20#include <string>
21#include <sstream>
22#include <map>
23#include <math.h>
24#include <float.h>
25#include <sys/stat.h>
26#include <sys/time.h>
27
28using namespace std;
29
30const int MIN_SIZE_KB = 50; // Min and max buffersize in kByte (> 3*sizeof(int) !)
31const string DEFAULT_MAX_SIZE_KB = "2000";
32const string DEFAULT_NUM_ENTRIES = "1000"; // Number of entries in each history buffer
33
34//
35// Class declaration
36//
37class History: public DimRpc, public DimClient, public EvidenceServer {
38
39 struct Item {
40 DimStampedInfo *DataItem;
41 vector <char> Buffer;
42 int Next;
43 double LastValue;
44 double MinAbsChange;
45 string Format;
46 };
47 map<string, struct Item> Map;
48
49 DimInfo *ServerList;
50 char *Directory;
51 string Change;
52
53 void infoHandler();
54 void rpcHandler();
55 void AddService(string, const char *);
56 void RemoveService(string);
57 off_t FileSize(FILE *);
58 FILE *OpenFile(string, const char *);
59
60 public:
61 History(char *);
62 ~History();
63};
64
65
66// Constructor
67History::History(char *Dir): DimRpc("ServiceHistory", "C", "C"),
68 EvidenceServer(SERVER_NAME),
69 Directory(Dir) {
70
71 // Get/initialize configuration
72 Change = GetConfig("minchange", " ");
73 GetConfig("maxsize_kb", DEFAULT_MAX_SIZE_KB);
74 GetConfig("numentries", DEFAULT_NUM_ENTRIES);
75
76 // Subscribe to top-level server list
77 ServerList = new DimInfo((char *) "DIS_DNS/SERVER_LIST", NO_LINK, this);
78}
79
80
81// Destructor deletes all DIM subscriptions
82History::~History() {
83
84 delete ServerList;
85 while (Map.size() != 0) RemoveService((*Map.begin()).first);
86}
87
88
89// Implementation of data handling
90void History::infoHandler() {
91
92 DimInfo *I = getInfo();
93
94 // Check if service available
95 if (!ServiceOK(I)) return;
96
97 // ====== Part A: Handle service subscriptions ===
98
99 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
100 if (strcmp(I->getName(), "DIS_DNS/SERVER_LIST") == 0) {
101 char *Token = strtok(I->getString(), "+-!@");
102 while (Token != NULL) {
103 AddService(string(Token)+"/SERVICE_LIST", "C"); // 'add' also for '-' and '!'
104 Token = strtok(NULL, "|"); // Skip server IP address
105 Token = strtok(NULL, "@");
106 }
107 return;
108 }
109
110 // If service is SERVICE_LIST, scan and subscribe/unsubscribe to services
111 if (strstr(I->getName(), "/SERVICE_LIST") != NULL) {
112
113 // Bug fix for empty SERVICE_LIST
114 if (strlen(I->getString()) == 0) {
115 string Tmp(I->getName());
116 RemoveService(I->getName());
117 AddService(Tmp.c_str(), (char *) "C");
118 return;
119 }
120
121 char *Type, *Name = strtok(I->getString(), "+-!|");
122 while (Name != NULL) {
123 // Only consider DIM services (not commands and RPCs)
124 if (((Type = strtok(NULL, "\n")) != NULL) &&
125 (strstr(Type, "|CMD") == NULL) && (strstr(Type, "|RPC") == NULL)) {
126 if (*I->getString() == '-' || *I->getString() == '!') RemoveService(Name);
127 else {
128 Type[strlen(Type)-1] = '\0'; // Isolate service format
129 AddService(Name, Type);
130 }
131 }
132 Name = strtok(NULL, "|");
133 }
134 return;
135 }
136
137 // ====== Part B: Handle history service ===
138
139 char *Service = I->getName();
140
141 // Check if service known and ignore empty or illegal time stamped service
142 if (Map.count(Service) == 0 || I->getSize()==0 || I->getTimestamp()<=0) return;
143
144 // Resize buffer if necessary
145 int NEntries = atoi(GetConfig("numentries").c_str());
146 if (Map[Service].Buffer.size() < NEntries*I->getSize()) {
147 if (NEntries*I->getSize() < atoi(GetConfig("maxsize_kb").c_str())*1024) {
148 Map[Service].Buffer.resize(NEntries*I->getSize());
149 }
150 }
151
152 // If data is number of single type, check minumum change before adding to history
153 if (strcmp(I->getFormat(), "C") != 0) {
154 // Calculate sum of all number in array
155 istringstream Text(EvidenceServer::ToString(I->getFormat(), I->getData(), I->getSize()));
156 double Num, Sum = 0;
157 while (Text.good()) {
158 Text >> Num;
159 Sum += fabs(Num);
160 }
161
162 // Minimum change?
163 if (fabs(Sum-Map[Service].LastValue) < fabs(Map[Service].MinAbsChange)) return;
164 Map[Service].LastValue = Sum;
165 }
166
167 // Check if data fits into buffer
168 if (Map[Service].Buffer.size() < I->getSize() + sizeof(int)+ 2*sizeof(EvidenceHistory::Item)) return;
169
170 int Size = I->getSize() + 2*sizeof(EvidenceHistory::Item), Next = Map[Service].Next;
171 void *WrapPos = NULL;
172 char *Buffer = &Map[Service].Buffer[0];
173 int Oldest = *(int *) Buffer;
174
175 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
176 if (Next + Size >= Map[Service].Buffer.size()) {
177 WrapPos = Buffer + Next;
178 Next = sizeof(int);
179 }
180
181 // Adapt pointer to oldest entry
182 while ((Oldest < Next + Size) &&
183 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
184 // Check for wrap-around
185 if (memcmp(Buffer + Oldest, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)) == 0) {
186 Oldest = sizeof(int);
187 continue;
188 }
189 // Check if end marker reached, then only one event fits buffer
190 if (memcmp(Buffer + Oldest, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)) == 0) {
191 Oldest = Next;
192 break;
193 }
194 // Move to next entry
195 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
196 }
197 // Update pointer in buffer
198 *(int *) Buffer = Oldest;
199
200 // Write wrap mark if necessary
201 if (WrapPos != NULL) memcpy(WrapPos, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark));
202
203 // Copy data into ring buffer
204 *((int *) (Buffer + Next)) = I->getTimestamp();
205 *((int *) (Buffer + Next + sizeof(int))) = I->getSize();
206 memcpy(Buffer + Next + 2*sizeof(int), I->getData(), I->getSize());
207
208 // Adjust pointer for next entry and write end marker to buffer
209 Next += I->getSize() + sizeof(EvidenceHistory::Item);
210 memcpy(Buffer + Next, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark));
211
212 Map[Service].Next = Next;
213}
214
215
216// Implementation of history buffer distribution
217void History::rpcHandler() {
218
219 string Name = ToString((char *) "C", getData(), getSize());
220
221 // Search for history buffer in memory
222 if (Map.count(Name) == 1) {
223 char *Buffer = new char [Map[Name].Format.size()+1+Map[Name].Buffer.size()];
224 strcpy(Buffer, Map[Name].Format.c_str());
225 memcpy(Buffer+Map[Name].Format.size()+1, &Map[Name].Buffer[0], Map[Name].Buffer.size());
226 setData((void *) Buffer, Map[Name].Format.size()+1+Map[Name].Buffer.size());
227 delete[] Buffer;
228 return;
229 }
230
231 // Try to open history file if not found in memory
232 FILE *File = OpenFile(Name, "rb");
233 if (File == NULL) {
234 setData(NULL, 0);
235 return;
236 }
237
238 // Read history file and send to client (data will contain format string and history)
239 off_t Size = FileSize(File);
240 if (Size != -1) {
241 char *Buffer = new char [Size-sizeof(int)];
242 fseek(File, sizeof(int), SEEK_SET); // Skip 'Next' pointer
243 if ((fread(Buffer, sizeof(char), Size-sizeof(int), File) != Size-sizeof(int)) || (ferror(File) != 0)) {
244 Message(WARN, "Error reading history file '%s' in rpcHandler()", Name.c_str());
245 setData(NULL, 0); // Default response
246 }
247 else setData((void *) Buffer, Size);
248 delete[] Buffer;
249 }
250
251 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in rpcHandler()", Name.c_str());
252}
253
254
255//
256// Add service to watch list
257//
258void History::AddService(string Name, const char *Format) {
259
260 // Return if already subscribed to this service
261 if (Map.count(Name) != 0) return;
262
263 // Create new service subscription
264 Map[Name].LastValue = DBL_MAX;
265 Map[Name].Format = Format;
266 Map[Name].MinAbsChange = 0.0;
267
268 // Set minimum required change if given in configuratrion
269 size_t Pos = Change.find(Name+":");
270 if (Pos != string::npos) Map[Name].MinAbsChange = atof(Change.c_str() + Pos + Name.size() + 1);
271
272 // Load history buffer from file if existing
273 FILE *File = OpenFile(Name, "rb");
274 off_t Size;
275
276 if (File != NULL && (Size = FileSize(File)) != -1) {
277
278 // If current buffer too small, resize
279 if (Size > Map[Name].Buffer.size()) Map[Name].Buffer.resize(Size);
280
281 // Read next pointer
282 fread(&Map[Name].Next, sizeof(Map[Name].Next), 1, File);
283 // Skip format string
284 while (fgetc(File) != 0 && feof(File) == 0) {}
285 // Read buffer
286 fread(&Map[Name].Buffer[0], sizeof(char), Size, File);
287
288 if (ferror(File) != 0) {
289 Message(WARN, "Error reading history file '%s' in AddService()", Name.c_str());
290 Map[Name].Buffer.clear();
291 }
292 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in AddService()", Name.c_str());;
293 }
294
295 // If no buffer loaded, allocate empty buffer
296 if (Map[Name].Buffer.empty()) {
297 Map[Name].Buffer.resize(MIN_SIZE_KB*1024);
298 memset(&Map[Name].Buffer[0], 0, Map[Name].Buffer.size());
299 *(int *) &Map[Name].Buffer[0] = 4;
300 Map[Name].Next = 4;
301 }
302
303 // Subscribe to service
304 Map[Name].DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
305}
306
307
308//
309// Remove service from watch list
310//
311void History::RemoveService(string Name) {
312
313 // Check if actually subscribed to service
314 if (Map.count(Name) == 0) return;
315
316 // Delete subscription first so handler and not called anymore
317 delete Map[Name].DataItem;
318
319 // Save history buffer
320 FILE *File = OpenFile(Name, "wb");
321 if (File != NULL) {
322 fwrite(&Map[Name].Next, sizeof(Map[Name].Next), 1, File); // Next pointer
323 fwrite(Map[Name].Format.c_str(), Map[Name].Format.size()+1, 1, File); // Format
324 fwrite(&Map[Name].Buffer[0], sizeof(char), Map[Name].Buffer.size(), File); // Buffer
325
326 // If error, try to delete (possibly erroneous) file
327 if (ferror(File) != 0) {
328 if (remove(Name.c_str()) == -1) Message(WARN, "Error writing history file '%s' in RemoveService(), could also not delete file", Name.c_str());
329 else Message(WARN, "Error writing history file '%s' in RemoveService(), deleted file", Name.c_str());
330 }
331 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in RemoveService()", Name.c_str());;
332 }
333
334 Map.erase(Name);
335}
336
337//
338// Determine size of file in kB
339//
340off_t History::FileSize(FILE *File) {
341
342 struct stat FileStatus;
343
344 if (fstat(fileno(File), &FileStatus) == -1) {
345 Message(WARN, "Could not determine size of file (%s)", strerror(errno));
346 return -1;
347 }
348
349 return FileStatus.st_size;
350}
351
352//
353// Open file for service history
354//
355FILE *History::OpenFile(string Service, const char *Mode) {
356
357 // Create directory if not yet existing
358 if(mkdir(Directory, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) return NULL;
359
360 // Replace all '/' and non-graphical characters by '_' in string and open file
361 for (int i=0; i<Service.size(); i++) {
362 if (Service[i] == '/') Service[i] = '_';
363 if (isgraph(Service[i]) == 0) Service[i] = '_';
364 }
365
366 return fopen((string(Directory) + "/" + Service).c_str(), Mode);
367}
368
369//
370// Main program
371//
372int main(int argc, char *argv[]) {
373
374 if (argc != 2) {
375 printf("Usage: %s <History-Directory>\n", argv[0]);
376 exit(EXIT_FAILURE);
377 }
378
379 // Static ensures calling of destructor by exit()
380 static History Hist(argv[1]);
381
382 // Sleep until signal caught
383 while (!Hist.ExitRequest) pause();
384}
Note: See TracBrowser for help on using the repository browser.