Changeset 142 for Evidence/DColl
- Timestamp:
- 01/13/10 12:47:33 (15 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
Evidence/DColl/DColl.cc
r127 r142 5 5 - DColl subscribes to all services given by the configuration 6 6 server and writes these to the data file at every update. 7 - For each service, it creates a new service with '.hist' appended that 8 contains a history of events kept within a ring buffer. Each entry will 9 be the result of a conversion to double of the text written to the data file. 7 - One data file per day is generated, with roll-over at 13:00 local time. 8 - For each service, it creates a new service with '.hist' appended 9 that contains a history of events kept within a ring buffer. Each 10 entry will be the result of a conversion to double of the text 11 written to the data file. Only if the new value has changed by a 12 minimum amout it will be added to the ring buffer. 10 13 - The command 'DColl/Log' writes the associated string to the log 11 14 file specified in the configuration. … … 17 20 #define SERVER_NAME "DColl" 18 21 22 #define DATE_ROLLOVER 12 // localtime hour after which next date is used 23 19 24 #include "../Evidence.h" 20 21 #define NO_LINK "__&DIM&NOLINK&__" // for checking if DIMserver is alive 25 #include <math.h> 26 #include <float.h> 27 #include <sys/stat.h> 28 #include <ctype.h> 29 #include <sys/types.h> 30 #include <regex.h> 22 31 23 32 // 24 33 // Class declaration 25 34 // 26 class DataHandler: public DimClient, public DimCommand, 27 public DimBrowser, public EvidenceServer { 28 private: 29 pthread_mutex_t DataMutex; 30 pthread_mutex_t LogMutex; 31 35 class DataHandler: public DimClient, public DimBrowser, 36 public EvidenceServer { 37 38 struct Item { 39 DimStampedInfo *DataItem; 40 DimService *HistService; 41 struct EvidenceHistoryItem *HistBuffer; 42 int HistPointer; 43 double LastValue; 44 double MinAbsChange; 45 } *List; 46 47 DimCommand *LogCommand; 48 32 49 unsigned int NumItems; 33 50 FILE *DataFile; 34 51 FILE *LogFile; 35 char *Filename;36 37 DimStampedInfo **DataItem;38 DimService * *HistService;52 float DataSizekB, LogSizekB; 53 int DataSizeLastUpdate, LogSizeLastUpdate; 54 char *DataDir; 55 DimService *LogSizeService, *DataSizeService; 39 56 int HistSize; 40 struct EvidenceHistoryItem **HistBuffer; 41 int *HistPointer; 42 57 int SizeUpdateDelay; 58 int TimeForNextFile; 59 60 int RegExCount; 61 regex_t *RegEx; 62 double *MinChange; 63 43 64 void infoHandler(); 44 65 void commandHandler(); 45 bool ReallocMem(void *&, int); 66 void AddService(char *); 67 float FileSize(FILE *); 46 68 47 69 public: … … 53 75 // Constructor 54 76 // 55 DataHandler::DataHandler(): DimCommand((char *) "DColl/Log", (char *) "C"),EvidenceServer(SERVER_NAME) {77 DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) { 56 78 57 79 // Initialization to prevent freeing unallocated memory 58 80 DataFile = NULL; 59 Filename = NULL; 60 61 DataItem = NULL; 62 HistService = NULL; 63 HistBuffer = NULL; 64 HistPointer = NULL; 65 81 LogFile = NULL; 82 List = NULL; 83 LogSizeService = NULL; 84 DataSizeService = NULL; 85 86 DataSizeLastUpdate = 0; 87 LogSizeLastUpdate = 0; 88 TimeForNextFile = 0; 89 66 90 // Request configuration data 67 char * Items = GetConfig(SERVER_NAME " items");68 char *DataDir = GetConfig(SERVER_NAME " datadir");91 char *Change = GetConfig(SERVER_NAME " minchange"); 92 DataDir = GetConfig(SERVER_NAME " datadir"); 69 93 char *Logname = GetConfig(SERVER_NAME " logfile"); 94 SizeUpdateDelay = atoi(GetConfig(SERVER_NAME " sizeupdate")); 70 95 HistSize = atoi(GetConfig(SERVER_NAME " histsize")); 71 96 if (HistSize < 1) HistSize = 1; // Minimum one items 72 97 73 // Create mutex for thread synchronization74 if ((errno=pthread_mutex_init(&DataMutex, NULL)) != 0) {75 Msg(FATAL, "pthread_mutex_init() failed for data file (%s)", strerror(errno));76 }77 if ((errno=pthread_mutex_init(&LogMutex, NULL)) != 0) {78 Msg(FATAL, "pthread_mutex_init() failed for log file (%s)", strerror(errno));79 }80 81 // Interpret DIM services to observe and prepare history buffer82 char *Buf, *ServiceName, *Format;83 int NumServices;84 NumItems = 0;85 char *NextToken = strtok(Items, " \t");86 while (NextToken != NULL) {87 NumServices = getServices(NextToken);88 for (int j=0; j<NumServices; j++) {89 if (getNextService(ServiceName, Format) == DimSERVICE) {90 91 // Check if already subsubed to this service92 for (int i=0; i<NumItems; i++) {93 if(strcmp(ServiceName, DataItem[i]->getName()) == 0) continue;94 }95 96 // Increase capacity of arrays97 if (!ReallocMem((void *&) DataItem, NumItems+1)) continue;98 if (!ReallocMem((void *&) HistService, NumItems+1)) continue;99 if (!ReallocMem((void *&) HistBuffer, NumItems+1)) continue;100 if (!ReallocMem((void *&) HistPointer, NumItems+1)) continue;101 102 // Subscribe to service103 DataItem[NumItems] = new DimStampedInfo(ServiceName, (char *) NO_LINK, this);104 105 // Create history service106 HistBuffer[NumItems] = new struct EvidenceHistoryItem [HistSize];107 memset(HistBuffer[NumItems], 0, HistSize*sizeof(EvidenceHistoryItem));108 HistPointer[NumItems] = 0;109 110 if (asprintf(&Buf, "%s.hist", ServiceName) == -1) {111 Msg(ERROR, "Could not create Hist service for %s because asprintf() failed", ServiceName);112 }113 else {114 HistService[NumItems] = new DimService (Buf, (char *) "C",115 HistBuffer[NumItems], HistSize*sizeof(EvidenceHistoryItem));116 free(Buf);117 }118 NumItems++;119 }120 }121 NextToken = strtok(NULL, " \t");122 }123 124 // Open data file125 time_t rawtime = time(NULL);126 struct tm * timeinfo = gmtime(&rawtime);127 if(timeinfo->tm_hour >= 13) rawtime += 12*60*60;128 timeinfo = gmtime(&rawtime);129 130 if (asprintf(&Filename, "%s/%d%02d%02d.slow", DataDir, timeinfo->tm_year+1900, timeinfo->tm_mon+1, timeinfo->tm_mday) == -1) {131 Filename = NULL;132 Msg(FATAL, "Could not create filename with asprintf()(%s)", strerror(errno));133 }134 if ((DataFile = fopen(Filename, "a")) == NULL) {135 Msg(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));136 }137 138 98 // Open log file 139 99 if ((LogFile = fopen(Logname, "a")) == NULL) { 140 Msg(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno)); 141 } 142 100 State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno)); 101 } 102 103 // Provide logging command 104 LogCommand = new DimCommand("DColl/Log", (char *) "C", this); 105 106 // Create services for file sizes 107 DataSizekB = 0; 108 DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB); 109 110 LogSizekB = FileSize(LogFile); 111 LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB); 112 113 // Count how many minimum change value regular expressions are present 114 RegExCount = 0; 115 char *Token = strtok(Change, "\t "); 116 while (Token != NULL) { 117 RegExCount++; 118 Token = strtok(NULL, "\t "); 119 } 120 121 // Allocate memory for regular expressions and minimum change values 122 RegEx = new regex_t[RegExCount]; 123 MinChange = new double [RegExCount]; 124 125 // Compile regular expressions 126 int Pos = 0; 127 for (int i=0; i<RegExCount; i++) { 128 int Len = strlen(Change+Pos) + 1; 129 Token = strtok(Change + Pos, ": \t"); 130 131 int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB); 132 if (Ret != 0) { 133 char ErrMsg[200]; 134 regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg)); 135 State(WARN, "Error compiling regular expression '%s' (%s)", Token, ErrMsg); 136 } 137 else { 138 if ((Token=strtok(NULL, "")) != NULL) MinChange[i] = atof(Token); 139 else MinChange[i] = 0; 140 } 141 Pos += Len; 142 } 143 144 // Subscribe to list of servers at DIS_DNS 145 AddService((char *) "DIS_DNS/SERVER_LIST"); 146 147 DimClient::sendCommand("DColl/Log", SERVER_NAME" *** Logging started ***"); 143 148 } 144 149 … … 148 153 DataHandler::~DataHandler() { 149 154 155 // Delete DIM services and command first so handlers and not called anymore 156 for (int i=0; i<NumItems; i++) { 157 delete List[i].HistService; 158 delete List[i].DataItem; 159 delete[] List[i].HistBuffer; 160 } 161 free(List); 162 163 //delete LogSizeService; // These create segmentation faults?! 164 //delete DataSizeService; 165 166 delete LogCommand; 167 150 168 // Close files 151 if (LogFile != NULL &&fclose(LogFile) != 0) {152 Msg(ERROR, "Could not close log file (%s)", strerror(errno));169 if (LogFile != NULL) if (fclose(LogFile) != 0) { 170 State(ERROR, "Could not close log file (%s)", strerror(errno)); 153 171 } 154 172 if (DataFile != NULL && fclose(DataFile) != 0) { 155 Msg(ERROR, "Error: Could not close data file (%s)", strerror(errno)); 156 } 157 158 // Free all memory 159 for (int i=0; i<NumItems; i++) { 160 delete[] HistBuffer[i]; 161 delete DataItem[i]; 162 } 163 164 free(Filename); 165 free(HistService); 166 free(HistPointer); 167 free(HistBuffer); 168 free(DataItem); 169 170 // Destroy mutex 171 pthread_mutex_destroy (&LogMutex); 172 pthread_mutex_destroy (&DataMutex); 173 State(ERROR, "Error: Could not close data file (%s)", strerror(errno)); 174 } 175 176 // Free memory for regular expressions handling 177 for (int i=0; i<RegExCount; i++) { 178 regfree(&RegEx[i]); 179 } 180 delete[] MinChange; 181 delete[] RegEx; 173 182 } 174 183 … … 176 185 // Implementation of data handling 177 186 // 178 // More than one infoHandler() might run in parallel, therefore179 // the mutex mechanism is used to serialize writing to the file187 // DIM ensures infoHandler() is called serialized, therefore 188 // no mutex is needed to serialize writing to the file 180 189 void DataHandler::infoHandler() { 181 190 182 191 DimInfo *Info = getInfo(); 183 192 184 // Check if service a ctually available, if it contains data and if data file is open193 // Check if service available 185 194 if (Info->getSize()==strlen(NO_LINK)+1 && strcmp(Info->getString(), NO_LINK)==0) return; 186 if (Info->getSize() == 0 || DataFile == NULL) return; 187 195 196 // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services 197 if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) { 198 char *Token = strtok(Info->getString(), "+-!|@"); 199 while (Token != NULL) { 200 char *Buf; 201 if (MakeString(&Buf, "%s/SERVICE_LIST", Token) != -1) { 202 AddService(Buf); 203 free(Buf); 204 } 205 else State(ERROR, "MakeString() failed for server %s", Token); 206 207 Token = strtok(NULL, "|"); 208 Token = strtok(NULL, "+-!|@"); 209 } 210 return; 211 } 212 213 // If service is SERVICE_LIST of any server, scan all services. 214 // Subscribe to all services (but not to commands and RPCs) 215 if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) { 216 char *Name = strtok(Info->getString(), "+-!|"); 217 while (Name != NULL) { 218 char *Type = strtok(NULL, "\n"); 219 if (Type == NULL) return; // for safety, should not happen 220 if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) { 221 AddService(Name); 222 } 223 Name = strtok(NULL, "+-!|"); 224 } 225 return; 226 } 227 228 // If it is time to open new data file, close the current one 229 if (time(NULL) >= TimeForNextFile) { 230 if (DataFile != NULL && fclose(DataFile) != 0) { 231 State(ERROR, "Error: Could not close data file (%s)", strerror(errno)); 232 } 233 DataFile = NULL; 234 } 235 236 // Open new data file if necessary 237 if (DataFile == NULL) { 238 time_t Time = time(NULL); 239 struct tm *T = localtime(&Time); 240 241 if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++; 242 if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename"); 243 244 char *Filename; 245 if (MakeString(&Filename, "%s/%d%02d%02d.slow", DataDir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == -1) State(FATAL, "Could not create filename, MakeString() failed"); 246 if ((DataFile = fopen(Filename, "a")) == NULL) { 247 State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno)); 248 } 249 else State(INFO, "Opened data file '%s'", Filename); 250 free(Filename); 251 252 // Calculate time for next file opening 253 T->tm_sec = 0; 254 T->tm_min = 0; 255 T->tm_hour = DATE_ROLLOVER; 256 TimeForNextFile = mktime(T); 257 } 258 188 259 // Identify index of service 189 260 int Service; 190 for (Service=0; Service<NumItems; Service++) if (Info == DataItem[Service]) break; 191 if (Service == NumItems) return; // Not found: should never happen 192 193 pthread_mutex_lock(&DataMutex); 194 261 for (Service=0; Service<NumItems; Service++) if (Info == List[Service].DataItem) break; 262 if (Service == NumItems) return; // Service not found 263 264 // If negative value for absolute change, ignore this entry 265 if (List[Service].MinAbsChange < 0) return; 266 195 267 // Write data header 196 268 time_t RawTime = Info->getTimestamp(); … … 199 271 fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp()); 200 272 201 // Translate info data into ASCII and write to file and to history buffer273 // Translate data into ASCII 202 274 char *Text = EvidenceServer::ToString(Info); 203 275 if (Text != NULL) { 276 // Replace all control characters by white space 277 for (int i=0; i<strlen(Text); i++) if (iscntrl(Text[i])) Text[i] = ' '; 278 279 // Write to file 204 280 fprintf(DataFile, "%s\n", Text); 205 281 206 HistBuffer[Service][HistPointer[Service]].Seconds = Info->getTimestamp(); 207 HistBuffer[Service][HistPointer[Service]].Value = atof(Text); 208 HistService[Service]->updateService(); 209 HistPointer[Service]++; 210 if (HistPointer[Service] >= HistSize) HistPointer[Service] = 0; 211 282 // Add to history buffer if change large enough 283 if ((fabs(atof(Text)-List[Service].LastValue) > List[Service].MinAbsChange)) { 284 List[Service].HistBuffer[List[Service].HistPointer].Seconds = Info->getTimestamp(); 285 List[Service].HistBuffer[List[Service].HistPointer].Value = atof(Text); 286 List[Service].HistService->updateService(); 287 List[Service].HistPointer++; 288 if (List[Service].HistPointer >= HistSize) List[Service].HistPointer = 0; 289 List[Service].LastValue = atof(Text); 290 } 212 291 free(Text); 213 292 } … … 215 294 216 295 fflush(DataFile); 217 if(ferror(DataFile)) Msg(ERROR, "Error writing to data file (%s)", strerror(errno)); 218 219 pthread_mutex_unlock(&DataMutex); 220 } 221 222 // 223 // Implementation of log writing (the only command is 'DColl/Log') 296 297 // Terminate if error because otherwise infinite loop might result as 298 // next call to this infoHandler() will try to (re-)open file 299 if(ferror(DataFile)) { 300 fclose(DataFile); 301 DataFile = NULL; 302 State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno)); 303 } 304 305 // Update datafile size service 306 if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) { 307 DataSizekB = FileSize(DataFile); 308 DataSizeService->updateService(); 309 DataSizeLastUpdate = time(NULL); 310 } 311 } 312 313 // 314 // Implementation of log writing 224 315 // 225 316 void DataHandler::commandHandler() { 226 227 if (LogFile == NULL) return; // Handler might be called before file is opened228 317 229 pthread_mutex_lock(&LogMutex); 230 318 if (getCommand() != LogCommand || LogFile == NULL) return; 319 320 // Replace all carriage-return by line feed and non-printable characters 321 char *Text = getCommand()->getString(); 322 for (unsigned int i=0; i<strlen(Text); i++) { 323 if (Text[i] == '\r') Text[i] = '\n'; 324 if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' '; 325 } 326 231 327 time_t RawTime = time(NULL); 232 328 struct tm *TM = localtime(&RawTime); … … 234 330 fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n", 235 331 TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900, 236 TM->tm_hour, TM->tm_min, TM->tm_sec, getString());332 TM->tm_hour, TM->tm_min, TM->tm_sec, Text); 237 333 238 334 fflush(LogFile); 239 if(ferror(LogFile)) Msg(ERROR, "Error writing to log file (%s)", strerror(errno)); 240 pthread_mutex_unlock(&LogMutex); 241 } 242 243 244 // 245 // Implementation of memory re-allocation for pointer arrays 246 // 247 bool DataHandler::ReallocMem(void *&Memory, int Size) { 248 249 void *NewMem = realloc(Memory, Size*sizeof(void *)); 250 251 if (NewMem != NULL) { 252 Memory = NewMem; 253 return true; 254 } 335 336 // If error close file (otherwise infinite loop because State() also writes to log) 337 if(ferror(LogFile)) { 338 fclose(LogFile); 339 LogFile = NULL; 340 State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno)); 341 } 342 343 // Update logfile size service 344 if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) { 345 LogSizekB = FileSize(LogFile); 346 LogSizeService->updateService(); 347 LogSizeLastUpdate = time(NULL); 348 } 349 } 350 351 352 // 353 // Add service to watch list 354 // 355 void DataHandler::AddService(char *Name) { 356 357 // Do not subscribe to history services (otherwise infinite loop) 358 if (strstr(Name, ".hist") != NULL) return; 359 360 // Check if already subscribed to this service 361 for (int i=0; i<NumItems; i++) { 362 if(strcmp(Name, List[i].DataItem->getName()) == 0) return; 363 } 364 365 // Increase capacity of item list 366 struct Item *New = (struct Item *) realloc(List, (NumItems+1)*sizeof(struct Item)); 367 if (New != NULL) List = New; 255 368 else { 256 Msg(ERROR, "Could not allocate memory ()", strerror(errno)); 257 return false; 258 } 369 State(ERROR, "Could not allocate memory for item list, service '' not added (%s)", Name, strerror(errno)); 370 return; 371 } 372 373 // Set minimum required change by comparing to regular expressions 374 List[NumItems].MinAbsChange = 0; 375 for (int i=0; i<RegExCount; i++) { 376 if (regexec(&RegEx[i], Name, (size_t) 0, NULL, 0) == 0) { 377 List[NumItems].MinAbsChange = MinChange[i]; 378 } 379 } 380 381 List[NumItems].LastValue = DBL_MAX; 382 383 // Create history service 384 List[NumItems].HistBuffer = new struct EvidenceHistoryItem [HistSize]; 385 memset(List[NumItems].HistBuffer, 0, HistSize*sizeof(EvidenceHistoryItem)); 386 List[NumItems].HistPointer = 0; 387 388 char *Buf; 389 if (MakeString(&Buf, "%s.hist", Name) == -1) { 390 State(ERROR, "Could not create history service for '%s', MakeString() failed", Name); 391 } 392 else { 393 List[NumItems].HistService = new DimService (Buf, (char *) "C", 394 List[NumItems].HistBuffer, HistSize*sizeof(EvidenceHistoryItem)); 395 free(Buf); 396 } 397 398 // Subscribe to service 399 List[NumItems].DataItem = new DimStampedInfo(Name, NO_LINK, this); 400 401 // Increase number only after all set-up 402 NumItems++; 403 } 404 405 // 406 // Determine size of file in kB 407 // 408 float DataHandler::FileSize(FILE *File) { 409 410 struct stat FileStatus; 411 412 if (fstat(fileno(File), &FileStatus) == -1) { 413 State(WARN, "Could not determine size of file (%s)", strerror(errno)); 414 return -1; 415 } 416 417 return (float) FileStatus.st_size/1024; 259 418 } 260 419 … … 267 426 static DataHandler Data; 268 427 269 pause(); // Sleep until signal caught 270 } 428 // Sleep until signal caught 429 pause(); 430 }
Note:
See TracChangeset
for help on using the changeset viewer.