source: Evidence/DColl/DColl.cc @ 127

Last change on this file since 127 was 127, checked in by ogrimm, 11 years ago
First commit of Evidence control system core components
File size: 7.9 KB
Line 
1/********************************************************************\
2
3  Central data collector of the Evidence Control System
4 
5  - DColl subscribes to all services given by the configuration
6    server and writes these to the data file at every update.
7  - For each service, it creates a new service with '.hist' appended that
8    contains a history of events kept within a ring buffer. Each entry will
9        be the result of a conversion to double of the text written to the data file.
10  - The command 'DColl/Log' writes the associated string to the log
11    file specified in the configuration.
12 
13  Oliver Grimm, December 2009
14
15\********************************************************************/
16
// Name of this server: handed to the EvidenceServer base class and used as
// prefix of the configuration entries requested in the constructor
// NOTE(review): defined ahead of the include - presumably Evidence.h does not
// depend on it, but confirm before moving it
#define SERVER_NAME "DColl"

#include "../Evidence.h"

// Placeholder string given to every DIM subscription; infoHandler() ignores
// an update whose content is exactly this string
#define NO_LINK "__&DIM&NOLINK&__" // for checking if DIMserver is alive
22
23//
24// Class declaration
25//
26class DataHandler:      public DimClient, public DimCommand,
27                                        public DimBrowser, public EvidenceServer {
28  private:
29    pthread_mutex_t DataMutex;
30    pthread_mutex_t LogMutex;
31
32    unsigned int NumItems;
33    FILE *DataFile;
34    FILE *LogFile;
35    char *Filename;
36
37    DimStampedInfo **DataItem;
38    DimService **HistService;       
39    int HistSize;
40    struct EvidenceHistoryItem **HistBuffer;
41        int *HistPointer;
42       
43    void infoHandler();
44    void commandHandler();
45    bool ReallocMem(void *&, int);
46           
47  public:
48    DataHandler();
49    ~DataHandler();
50}; 
51
52//
53// Constructor
54//
55DataHandler::DataHandler(): DimCommand((char *) "DColl/Log", (char *) "C"), EvidenceServer(SERVER_NAME) {
56
57  // Initialization to prevent freeing unallocated memory
58  DataFile = NULL;
59  Filename = NULL;
60
61  DataItem = NULL;
62  HistService = NULL;
63  HistBuffer = NULL;
64  HistPointer = NULL;
65 
66  // Request configuration data
67  char *Items = GetConfig(SERVER_NAME " items");
68  char *DataDir = GetConfig(SERVER_NAME " datadir");
69  char *Logname = GetConfig(SERVER_NAME " logfile");
70  HistSize = atoi(GetConfig(SERVER_NAME " histsize"));
71  if (HistSize < 1) HistSize = 1; // Minimum one items
72   
73  // Create mutex for thread synchronization
74  if ((errno=pthread_mutex_init(&DataMutex, NULL)) != 0) {
75    Msg(FATAL, "pthread_mutex_init() failed for data file (%s)", strerror(errno));
76  }
77  if ((errno=pthread_mutex_init(&LogMutex, NULL)) != 0) {
78    Msg(FATAL, "pthread_mutex_init() failed for log file (%s)", strerror(errno));
79  }
80
81  // Interpret DIM services to observe and prepare history buffer
82  char *Buf, *ServiceName, *Format;
83  int NumServices;
84  NumItems = 0;
85  char *NextToken = strtok(Items, " \t");
86  while (NextToken != NULL) {
87    NumServices = getServices(NextToken);
88    for (int j=0; j<NumServices; j++)  {
89          if (getNextService(ServiceName, Format) == DimSERVICE) {
90
91            // Check if already subsubed to this service
92                for (int i=0; i<NumItems; i++) {
93                  if(strcmp(ServiceName, DataItem[i]->getName()) == 0) continue;
94                }
95               
96            // Increase capacity of arrays
97            if (!ReallocMem((void *&) DataItem, NumItems+1)) continue;
98            if (!ReallocMem((void *&) HistService, NumItems+1)) continue;
99            if (!ReallocMem((void *&) HistBuffer, NumItems+1)) continue;
100            if (!ReallocMem((void *&) HistPointer, NumItems+1)) continue;
101
102                // Subscribe to service
103        DataItem[NumItems] = new DimStampedInfo(ServiceName, (char *) NO_LINK, this);
104
105        // Create history service
106        HistBuffer[NumItems] = new struct EvidenceHistoryItem [HistSize];
107                memset(HistBuffer[NumItems], 0, HistSize*sizeof(EvidenceHistoryItem));
108                HistPointer[NumItems] = 0;
109
110        if (asprintf(&Buf, "%s.hist", ServiceName) == -1) {
111          Msg(ERROR, "Could not create Hist service for %s because asprintf() failed", ServiceName);
112        }
113        else {
114                  HistService[NumItems] = new DimService (Buf, (char *) "C",
115                                                                HistBuffer[NumItems], HistSize*sizeof(EvidenceHistoryItem));
116          free(Buf);
117                }
118                NumItems++;
119          }
120        }
121    NextToken = strtok(NULL, " \t");
122  }
123
124  // Open data file
125  time_t rawtime = time(NULL);
126  struct tm * timeinfo = gmtime(&rawtime);
127  if(timeinfo->tm_hour >= 13) rawtime += 12*60*60;
128  timeinfo = gmtime(&rawtime);
129
130  if (asprintf(&Filename, "%s/%d%02d%02d.slow", DataDir, timeinfo->tm_year+1900, timeinfo->tm_mon+1, timeinfo->tm_mday) == -1) {
131    Filename = NULL;
132    Msg(FATAL, "Could not create filename with asprintf()(%s)", strerror(errno));
133  }
134  if ((DataFile = fopen(Filename, "a")) == NULL) {
135    Msg(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
136  }
137 
138  // Open log file
139  if ((LogFile = fopen(Logname, "a")) == NULL) {
140    Msg(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
141  }
142
143}
144
145//
146// Destructor: Close files and free memory
147//
148DataHandler::~DataHandler() {
149
150  // Close files
151  if (LogFile != NULL && fclose(LogFile) != 0) {
152        Msg(ERROR, "Could not close log file (%s)", strerror(errno));
153  }
154  if (DataFile != NULL && fclose(DataFile) != 0) {
155        Msg(ERROR, "Error: Could not close data file (%s)", strerror(errno));
156  }     
157
158  // Free all memory
159  for (int i=0; i<NumItems; i++) {
160    delete[] HistBuffer[i];
161    delete DataItem[i];
162  }
163
164  free(Filename);
165  free(HistService);
166  free(HistPointer);
167  free(HistBuffer);
168  free(DataItem);
169 
170  // Destroy mutex
171  pthread_mutex_destroy (&LogMutex);
172  pthread_mutex_destroy (&DataMutex);
173}
174
175//
176// Implementation of data handling
177//
178// More than one infoHandler() might run in parallel, therefore
179// the mutex mechanism is used to serialize writing to the file
180void DataHandler::infoHandler() {
181 
182  DimInfo *Info = getInfo();
183
184  // Check if service actually available, if it contains data and if data file is open
185  if (Info->getSize()==strlen(NO_LINK)+1 && strcmp(Info->getString(), NO_LINK)==0) return;
186  if (Info->getSize() == 0 || DataFile == NULL) return;
187
188  // Identify index of service
189  int Service; 
190  for (Service=0; Service<NumItems; Service++) if (Info == DataItem[Service]) break;
191  if (Service == NumItems) return;  // Not found: should never happen
192
193  pthread_mutex_lock(&DataMutex);
194 
195  // Write data header
196  time_t RawTime = Info->getTimestamp();
197  struct tm *TM = localtime(&RawTime);
198
199  fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
200
201  // Translate info data into ASCII and write to file and to history buffer
202  char *Text = EvidenceServer::ToString(Info);
203  if (Text != NULL) {
204    fprintf(DataFile, "%s\n", Text);
205       
206        HistBuffer[Service][HistPointer[Service]].Seconds = Info->getTimestamp();
207    HistBuffer[Service][HistPointer[Service]].Value = atof(Text);
208    HistService[Service]->updateService();
209        HistPointer[Service]++;
210        if (HistPointer[Service] >= HistSize) HistPointer[Service] = 0;
211
212        free(Text);
213  }
214  else fprintf(DataFile, "Cannot interpret format identifier\n");
215 
216  fflush(DataFile);
217  if(ferror(DataFile)) Msg(ERROR, "Error writing to data file (%s)", strerror(errno));
218
219  pthread_mutex_unlock(&DataMutex);
220}
221
222//
223// Implementation of log writing (the only command is 'DColl/Log')
224//
225void DataHandler::commandHandler() {
226
227  if (LogFile == NULL) return;  // Handler might be called before file is opened
228 
229  pthread_mutex_lock(&LogMutex);
230
231  time_t RawTime = time(NULL);
232  struct tm *TM = localtime(&RawTime);
233
234  fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
235                TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
236                TM->tm_hour, TM->tm_min, TM->tm_sec, getString());
237
238  fflush(LogFile);
239  if(ferror(LogFile)) Msg(ERROR, "Error writing to log file (%s)", strerror(errno));
240  pthread_mutex_unlock(&LogMutex);
241}
242
243
244//
245// Implementation of memory re-allocation for pointer arrays
246//
247bool DataHandler::ReallocMem(void *&Memory, int Size) {
248
249  void *NewMem = realloc(Memory, Size*sizeof(void *));
250
251  if (NewMem != NULL) {
252    Memory = NewMem;
253        return true;
254  }
255  else {
256        Msg(ERROR, "Could not allocate memory ()", strerror(errno));
257        return false;
258  }
259}
260
261//         
262// Main program
263//
264int main() {
265       
266  // Static ensures calling of destructor by exit()
267  static DataHandler Data;
268 
269  pause();        // Sleep until signal caught
270}
Note: See TracBrowser for help on using the repository browser.