source: Evidence/History.cc@ 226

Last change on this file since 226 was 224, checked in by ogrimm, 14 years ago
Message severity encoded now using standard DIM structure, other updates
File size: 11.2 KB
Line 
1/********************************************************************\
2
3 History server of the Evidence Control System
4
5 - Subscribes to all services and keeps a ring buffer for each service.
6 - Data added to the buffer only if changed by minimum amount given by
7 configuration information.
8 - The history is available via an rpc call.
9 - The history buffers are written to disk at program termination; when a
10 service is added, the server tries to read its buffer back from disk.
11 - The buffer should hold at least REQUEST_NUM entries (of the size currently
12 handled in infoHandler()), but should not be larger than MAX_SIZE_KB.
13 Oliver Grimm, June 2010
14
15\********************************************************************/
16
17#define SERVER_NAME "History"
18#include "Evidence.h"
19
20const int MIN_SIZE_KB = 50; // Minimum and maximum history buffer in kByte (> 3*sizeof(int) !)
21const int MAX_SIZE_KB = 2000;
22const int REQUEST_NUM = 1000; // Requested number of entries in each history buffer
23
24#include <string>
25#include <vector>
26#include <math.h>
27#include <float.h>
28#include <sys/stat.h>
29
30using namespace std;
31
32//
33// Class declaration
34//
35class History: public DimRpc, public DimClient, public EvidenceServer {
36
37 struct Item {
38 DimStampedInfo *DataItem;
39 vector <char> Buffer;
40 int Next;
41 double LastValue;
42 double MinAbsChange;
43 string Format;
44 };
45 vector<struct Item> List;
46
47 DimInfo *ServerList;
48 char *Directory;
49 char *Change;
50
51 void infoHandler();
52 void rpcHandler();
53 void AddService(string, const char *);
54 void RemoveService(string);
55 off_t FileSize(FILE *);
56 FILE *OpenFile(string, const char *);
57
58 public:
59 History(char *);
60 ~History();
61};
62
63
64// Constructor
65History::History(char *Dir): DimRpc("ServiceHistory", "C", "C"),
66 EvidenceServer(SERVER_NAME),
67 Directory(Dir) {
68
69 // List of minimum required change for addition to history buffer
70 Change = GetConfig("minchange", " ");
71
72 // Subscribe to top-level server list
73 ServerList = new DimInfo((char *) "DIS_DNS/SERVER_LIST", NO_LINK, this);
74}
75
76
77// Destructor deletes all DIM subscriptions
78History::~History() {
79
80 delete ServerList;
81 while (List.size() != 0) RemoveService(List[0].DataItem->getName());
82}
83
84
85// Implementation of data handling
86void History::infoHandler() {
87
88 DimInfo *I = getInfo();
89
90 // Check if service available
91 if (!ServiceOK(I)) return;
92
93 // ====== Part A: Handle service subscriptions ===
94
95 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
96 if (strcmp(I->getName(), "DIS_DNS/SERVER_LIST") == 0) {
97 char *Token = strtok(I->getString(), "+-!@");
98 while (Token != NULL) {
99 AddService(string(Token)+"/SERVICE_LIST", "C"); // 'add' also for '-' and '!'
100 Token = strtok(NULL, "|"); // Skip server IP address
101 Token = strtok(NULL, "@");
102 }
103 return;
104 }
105
106 // If service is SERVICE_LIST, scan and subscribe/unsubscribe to services
107 if (strstr(I->getName(), "/SERVICE_LIST") != NULL) {
108 char *Type, *Name = strtok(I->getString(), "+-!|");
109 while (Name != NULL) {
110 // Only consider DIM services (not commands and RPCs)
111 if (((Type = strtok(NULL, "\n")) != NULL) &&
112 (strstr(Type, "|CMD") == NULL) && (strstr(Type, "|RPC") == NULL)) {
113 if (*I->getString() == '-' || *I->getString() == '!') RemoveService(Name);
114 else {
115 Type[strlen(Type)-1] = '\0'; // Isolate service format
116 AddService(Name, Type);
117 }
118 }
119 Name = strtok(NULL, "|");
120 }
121 return;
122 }
123
124 // ====== Part B: Handle history service ===
125
126 // Identify index of service
127 int Service;
128 for (Service=0; Service<List.size(); Service++) if (I == List[Service].DataItem) break;
129 if (Service == List.size()) return; // Service not found
130
131 // Ignore empty services
132 if (I->getSize()==0 || I->getTimestamp()==0) return;
133
134 // Resize buffer if necessary
135 if (List[Service].Buffer.size() < REQUEST_NUM*I->getSize()) {
136 if (REQUEST_NUM*I->getSize() < MAX_SIZE_KB*1024) List[Service].Buffer.resize(REQUEST_NUM*I->getSize());
137 }
138
139 // If data is number of single type, a minumum change might be requested before addind to history
140 if (strcmp(I->getFormat(),"C") != 0 && strlen(I->getFormat())==1) {
141 // Calculate sum of all number in array
142 char *Text = EvidenceServer::ToString(I);
143 char *Token = strtok(Text, " ");
144 double Sum = 0;
145 while (Token != NULL) {
146 Sum += atof(Token);
147 Token = strtok(NULL, " ");
148 }
149 free(Text);
150
151 // Minimum change?
152 if (fabs(Sum-List[Service].LastValue) < fabs(List[Service].MinAbsChange)) return;
153 List[Service].LastValue = Sum;
154 }
155
156 // Check if data fits into buffer
157 if (List[Service].Buffer.size() < I->getSize() + sizeof(int)+ 2*sizeof(EvidenceHistory::Item)) return;
158
159 int Size = I->getSize() + 2*sizeof(EvidenceHistory::Item), Next = List[Service].Next;
160 void *WrapPos = NULL;
161 char *Buffer = &List[Service].Buffer[0];
162 int Oldest = *(int *) Buffer;
163
164 // Check if buffer wrap-around (write wrap mark after Oldest is adjusted)
165 if (Next + Size >= List[Service].Buffer.size()) {
166 WrapPos = Buffer + Next;
167 Next = sizeof(int);
168 }
169
170 // Adapt pointer to oldest entry
171 while ((Oldest < Next + Size) &&
172 (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) {
173 // Check for wrap-around
174 if (memcmp(Buffer + Oldest, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)) == 0) {
175 Oldest = sizeof(int);
176 continue;
177 }
178 // Check if end marker reached, then only one event fits buffer
179 if (memcmp(Buffer + Oldest, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)) == 0) {
180 Oldest = Next;
181 break;
182 }
183 // Move to next entry
184 Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int);
185 }
186 // Update pointer in buffer
187 *(int *) Buffer = Oldest;
188
189 // Write wrap mark if necessary
190 if (WrapPos != NULL) memcpy(WrapPos, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark));
191
192 // Copy data into ring buffer
193 *((int *) (Buffer + Next)) = I->getTimestamp();
194 *((int *) (Buffer + Next + sizeof(int))) = I->getSize();
195 memcpy(Buffer + Next + 2*sizeof(int), I->getData(), I->getSize());
196
197 // Adjust pointer for next entry and write end marker to buffer
198 Next += I->getSize() + sizeof(EvidenceHistory::Item);
199 memcpy(Buffer + Next, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark));
200
201 List[Service].Next = Next;
202}
203
204
205// Implementation of history buffer distribution
206void History::rpcHandler() {
207
208 // Search for history buffer
209 for (int i=0; i<List.size(); i++) {
210 if (strcmp(List[i].DataItem->getName(), getString()) == 0) {
211 char *Buffer = new char [List[i].Format.size()+1+List[i].Buffer.size()];
212 strcpy(Buffer, List[i].Format.c_str());
213 memcpy(Buffer+List[i].Format.size()+1, &List[i].Buffer[0], List[i].Buffer.size());
214// setData((void *) &List[i].Buffer[0], List[i].Buffer.size());
215 setData((void *) Buffer, List[i].Format.size()+1+List[i].Buffer.size());
216 delete[] Buffer;
217 return;
218 }
219 }
220
221 // Try to open history file if not found in memory
222 FILE *File = OpenFile(getString(), "rb");
223 if (File == NULL) {
224 setData(NULL, 0);
225 return;
226 }
227
228 // Read history file and send to client (data will contain format string and history)
229 off_t Size = FileSize(File);
230 if (Size != -1) {
231 char *Buffer = new char [Size-sizeof(int)];
232 fseek(File, sizeof(int), SEEK_SET); // Skip 'Next' pointer
233 if ((fread(Buffer, sizeof(char), Size-sizeof(int), File) != Size-sizeof(int)) || (ferror(File) != 0)) {
234 Message(WARN, "Error reading history file '%s' in rpcHandler()", getString());
235 setData(NULL, 0); // Default response
236 }
237 else setData((void *) Buffer, Size);
238 delete[] Buffer;
239 }
240
241 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in rpcHandler()", getString());
242}
243
244
245//
246// Add service to watch list
247//
248void History::AddService(string Name, const char *Format) {
249
250 // Check if already subscribed to this service
251 for (int i=0; i<List.size(); i++) {
252 if (Name == List[i].DataItem->getName()) return;
253 }
254
255 struct Item New;
256 New.LastValue = DBL_MAX;
257 New.Format = Format;
258
259 // Set minimum required change if given in configuratrion
260 char *Pnt = strstr(Change, Name.c_str());
261
262 if (Pnt != NULL && *(Pnt+Name.size()) == ':') New.MinAbsChange = atof(Pnt+Name.size()+1);
263 else New.MinAbsChange = 0;
264
265 // Load history buffer from file if existing
266 FILE *File = OpenFile(Name, "rb");
267 off_t Size;
268
269 if (File != NULL && (Size = FileSize(File)) != -1) {
270
271 // If current buffer too small, resize
272 if (Size > New.Buffer.size()) New.Buffer.resize(Size);
273
274 // Read next pointer
275 fread(&New.Next, sizeof(New.Next), 1, File);
276 // Skip format string
277 while (fgetc(File) != 0 && feof(File) == 0) {}
278 // Read buffer
279 fread(&New.Buffer[0], sizeof(char), Size, File);
280
281 if (ferror(File) != 0) {
282 Message(WARN, "Error reading history file '%s' in AddService()", Name.c_str());
283 New.Buffer.clear();
284 }
285 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in AddService()", Name.c_str());;
286 }
287
288 // If no buffer loaded, allocate empty buffer
289 if (New.Buffer.empty()) {
290 New.Buffer.resize(MIN_SIZE_KB*1024);
291 memset(&New.Buffer[0], 0, New.Buffer.size());
292 *(int *) &New.Buffer[0] = 4;
293 New.Next = 4;
294 }
295
296 // Subscribe to service
297 New.DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this);
298
299 List.push_back(New);
300}
301
302
303//
304// Remove service from watch list
305//
306void History::RemoveService(string Name) {
307
308 // Find service index
309 vector<struct Item>::iterator E;
310 for (E=List.begin(); E<List.end(); ++E) if (Name == (*E).DataItem->getName()) {
311 // Delete subscription first so handler and not called anymore
312 delete (*E).DataItem;
313
314 // Save history buffer
315 FILE *File = OpenFile(Name, "wb");
316 if (File != NULL) {
317 fwrite(&(*E).Next, sizeof((*E).Next), 1, File); // Next pointer
318 fwrite((*E).Format.c_str(), (*E).Format.size()+1, 1, File); // Format
319 fwrite(&(*E).Buffer[0], sizeof(char), (*E).Buffer.size(), File); // Buffer
320
321 // If error, try to delete (possibly erroneous) file
322 if (ferror(File) != 0) {
323 if (remove(Name.c_str()) == -1) Message(WARN, "Error writing history file '%s' in RemoveService(), could also not delete file", Name.c_str());
324 else Message(WARN, "Error writing history file '%s' in RemoveService(), deleted file", Name.c_str());
325 }
326 if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in RemoveService()", Name.c_str());;
327 }
328
329 List.erase(E);
330 }
331}
332
333//
334// Determine size of file in bytes
335//
336off_t History::FileSize(FILE *File) {
337
338 struct stat FileStatus;
339
340 if (fstat(fileno(File), &FileStatus) == -1) {
341 Message(WARN, "Could not determine size of file (%s)", strerror(errno));
342 return -1;
343 }
344
345 return FileStatus.st_size;
346}
347
348//
349// Open file for service history
350//
351FILE *History::OpenFile(string Service, const char *Mode) {
352
353 // Create directory if not yet existing
354 if(mkdir(Directory, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) return NULL;
355
356 // Replace all '/' and non-graphical characters by '_' in string and open file
357 for (int i=0; i<Service.size(); i++) {
358 if (Service[i] == '/') Service[i] = '_';
359 if (isgraph(Service[i]) == 0) Service[i] = '_';
360 }
361
362 return fopen((string(Directory) + "/" + Service).c_str(), Mode);
363}
364
365//
366// Main program
367//
368int main(int argc, char *argv[]) {
369
370 if (argc != 2) {
371 printf("Usage: %s <History-Directory>\n", argv[0]);
372 exit(EXIT_FAILURE);
373 }
374
375 // Static ensures calling of destructor by exit()
376 static History Hist(argv[1]);
377
378 // Sleep until signal caught
379 pause();
380}
Note: See TracBrowser for help on using the repository browser.