source: Evidence/DColl/DColl.cc@ 132

Last change on this file since 132 was 127, checked in by ogrimm, 15 years ago
First commit of Evidence control system core components
File size: 7.9 KB
Line 
1/********************************************************************\
2
3 Central data collector of the Evidence Control System
4
5 - DColl subscribes to all services given by the configuration
6 server and writes these to the data file at every update.
7 - For each service, it creates a new service with '.hist' appended that
8 contains a history of events kept within a ring buffer. Each entry will
9 be the result of a conversion to double of the text written to the data file.
10 - The command 'DColl/Log' writes the associated string to the log
11 file specified in the configuration.
12
13 Oliver Grimm, December 2009
14
15\********************************************************************/
16
17#define SERVER_NAME "DColl"
18
19#include "../Evidence.h"
20
21#define NO_LINK "__&DIM&NOLINK&__" // for checking if DIMserver is alive
22
23//
24// Class declaration
25//
26class DataHandler: public DimClient, public DimCommand,
27 public DimBrowser, public EvidenceServer {
28 private:
29 pthread_mutex_t DataMutex;
30 pthread_mutex_t LogMutex;
31
32 unsigned int NumItems;
33 FILE *DataFile;
34 FILE *LogFile;
35 char *Filename;
36
37 DimStampedInfo **DataItem;
38 DimService **HistService;
39 int HistSize;
40 struct EvidenceHistoryItem **HistBuffer;
41 int *HistPointer;
42
43 void infoHandler();
44 void commandHandler();
45 bool ReallocMem(void *&, int);
46
47 public:
48 DataHandler();
49 ~DataHandler();
50};
51
52//
53// Constructor
54//
55DataHandler::DataHandler(): DimCommand((char *) "DColl/Log", (char *) "C"), EvidenceServer(SERVER_NAME) {
56
57 // Initialization to prevent freeing unallocated memory
58 DataFile = NULL;
59 Filename = NULL;
60
61 DataItem = NULL;
62 HistService = NULL;
63 HistBuffer = NULL;
64 HistPointer = NULL;
65
66 // Request configuration data
67 char *Items = GetConfig(SERVER_NAME " items");
68 char *DataDir = GetConfig(SERVER_NAME " datadir");
69 char *Logname = GetConfig(SERVER_NAME " logfile");
70 HistSize = atoi(GetConfig(SERVER_NAME " histsize"));
71 if (HistSize < 1) HistSize = 1; // Minimum one items
72
73 // Create mutex for thread synchronization
74 if ((errno=pthread_mutex_init(&DataMutex, NULL)) != 0) {
75 Msg(FATAL, "pthread_mutex_init() failed for data file (%s)", strerror(errno));
76 }
77 if ((errno=pthread_mutex_init(&LogMutex, NULL)) != 0) {
78 Msg(FATAL, "pthread_mutex_init() failed for log file (%s)", strerror(errno));
79 }
80
81 // Interpret DIM services to observe and prepare history buffer
82 char *Buf, *ServiceName, *Format;
83 int NumServices;
84 NumItems = 0;
85 char *NextToken = strtok(Items, " \t");
86 while (NextToken != NULL) {
87 NumServices = getServices(NextToken);
88 for (int j=0; j<NumServices; j++) {
89 if (getNextService(ServiceName, Format) == DimSERVICE) {
90
91 // Check if already subsubed to this service
92 for (int i=0; i<NumItems; i++) {
93 if(strcmp(ServiceName, DataItem[i]->getName()) == 0) continue;
94 }
95
96 // Increase capacity of arrays
97 if (!ReallocMem((void *&) DataItem, NumItems+1)) continue;
98 if (!ReallocMem((void *&) HistService, NumItems+1)) continue;
99 if (!ReallocMem((void *&) HistBuffer, NumItems+1)) continue;
100 if (!ReallocMem((void *&) HistPointer, NumItems+1)) continue;
101
102 // Subscribe to service
103 DataItem[NumItems] = new DimStampedInfo(ServiceName, (char *) NO_LINK, this);
104
105 // Create history service
106 HistBuffer[NumItems] = new struct EvidenceHistoryItem [HistSize];
107 memset(HistBuffer[NumItems], 0, HistSize*sizeof(EvidenceHistoryItem));
108 HistPointer[NumItems] = 0;
109
110 if (asprintf(&Buf, "%s.hist", ServiceName) == -1) {
111 Msg(ERROR, "Could not create Hist service for %s because asprintf() failed", ServiceName);
112 }
113 else {
114 HistService[NumItems] = new DimService (Buf, (char *) "C",
115 HistBuffer[NumItems], HistSize*sizeof(EvidenceHistoryItem));
116 free(Buf);
117 }
118 NumItems++;
119 }
120 }
121 NextToken = strtok(NULL, " \t");
122 }
123
124 // Open data file
125 time_t rawtime = time(NULL);
126 struct tm * timeinfo = gmtime(&rawtime);
127 if(timeinfo->tm_hour >= 13) rawtime += 12*60*60;
128 timeinfo = gmtime(&rawtime);
129
130 if (asprintf(&Filename, "%s/%d%02d%02d.slow", DataDir, timeinfo->tm_year+1900, timeinfo->tm_mon+1, timeinfo->tm_mday) == -1) {
131 Filename = NULL;
132 Msg(FATAL, "Could not create filename with asprintf()(%s)", strerror(errno));
133 }
134 if ((DataFile = fopen(Filename, "a")) == NULL) {
135 Msg(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
136 }
137
138 // Open log file
139 if ((LogFile = fopen(Logname, "a")) == NULL) {
140 Msg(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
141 }
142
143}
144
145//
146// Destructor: Close files and free memory
147//
148DataHandler::~DataHandler() {
149
150 // Close files
151 if (LogFile != NULL && fclose(LogFile) != 0) {
152 Msg(ERROR, "Could not close log file (%s)", strerror(errno));
153 }
154 if (DataFile != NULL && fclose(DataFile) != 0) {
155 Msg(ERROR, "Error: Could not close data file (%s)", strerror(errno));
156 }
157
158 // Free all memory
159 for (int i=0; i<NumItems; i++) {
160 delete[] HistBuffer[i];
161 delete DataItem[i];
162 }
163
164 free(Filename);
165 free(HistService);
166 free(HistPointer);
167 free(HistBuffer);
168 free(DataItem);
169
170 // Destroy mutex
171 pthread_mutex_destroy (&LogMutex);
172 pthread_mutex_destroy (&DataMutex);
173}
174
175//
176// Implementation of data handling
177//
178// More than one infoHandler() might run in parallel, therefore
179// the mutex mechanism is used to serialize writing to the file
180void DataHandler::infoHandler() {
181
182 DimInfo *Info = getInfo();
183
184 // Check if service actually available, if it contains data and if data file is open
185 if (Info->getSize()==strlen(NO_LINK)+1 && strcmp(Info->getString(), NO_LINK)==0) return;
186 if (Info->getSize() == 0 || DataFile == NULL) return;
187
188 // Identify index of service
189 int Service;
190 for (Service=0; Service<NumItems; Service++) if (Info == DataItem[Service]) break;
191 if (Service == NumItems) return; // Not found: should never happen
192
193 pthread_mutex_lock(&DataMutex);
194
195 // Write data header
196 time_t RawTime = Info->getTimestamp();
197 struct tm *TM = localtime(&RawTime);
198
199 fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
200
201 // Translate info data into ASCII and write to file and to history buffer
202 char *Text = EvidenceServer::ToString(Info);
203 if (Text != NULL) {
204 fprintf(DataFile, "%s\n", Text);
205
206 HistBuffer[Service][HistPointer[Service]].Seconds = Info->getTimestamp();
207 HistBuffer[Service][HistPointer[Service]].Value = atof(Text);
208 HistService[Service]->updateService();
209 HistPointer[Service]++;
210 if (HistPointer[Service] >= HistSize) HistPointer[Service] = 0;
211
212 free(Text);
213 }
214 else fprintf(DataFile, "Cannot interpret format identifier\n");
215
216 fflush(DataFile);
217 if(ferror(DataFile)) Msg(ERROR, "Error writing to data file (%s)", strerror(errno));
218
219 pthread_mutex_unlock(&DataMutex);
220}
221
222//
223// Implementation of log writing (the only command is 'DColl/Log')
224//
225void DataHandler::commandHandler() {
226
227 if (LogFile == NULL) return; // Handler might be called before file is opened
228
229 pthread_mutex_lock(&LogMutex);
230
231 time_t RawTime = time(NULL);
232 struct tm *TM = localtime(&RawTime);
233
234 fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
235 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
236 TM->tm_hour, TM->tm_min, TM->tm_sec, getString());
237
238 fflush(LogFile);
239 if(ferror(LogFile)) Msg(ERROR, "Error writing to log file (%s)", strerror(errno));
240 pthread_mutex_unlock(&LogMutex);
241}
242
243
244//
245// Implementation of memory re-allocation for pointer arrays
246//
247bool DataHandler::ReallocMem(void *&Memory, int Size) {
248
249 void *NewMem = realloc(Memory, Size*sizeof(void *));
250
251 if (NewMem != NULL) {
252 Memory = NewMem;
253 return true;
254 }
255 else {
256 Msg(ERROR, "Could not allocate memory ()", strerror(errno));
257 return false;
258 }
259}
260
261//
262// Main program
263//
264int main() {
265
266 // Static ensures calling of destructor by exit()
267 static DataHandler Data;
268
269 pause(); // Sleep until signal caught
270}
Note: See TracBrowser for help on using the repository browser.