/********************************************************************\

  Central data collector of the Evidence Control System

  - DColl subscribes to all services given by the configuration
    server and writes these to the data file at every update.
  - One data file per day is generated, with roll-over at 12:00
    local time (see DATE_ROLLOVER).
  - For each service, it creates a new service with '.hist' appended
    that contains a history of events kept within a ring buffer. Each
    entry will be the result of a conversion to double of the text
    written to the data file. Only if the new value has changed by a
    minimum amount will it be added to the ring buffer.
  - The history buffers are written to disk at program termination,
    and an attempt is made to read them back when a service is added.
  - The command 'DColl/Log' writes the associated string to the log
    file specified in the configuration.

  Oliver Grimm, December 2009

\********************************************************************/

#define SERVER_NAME "DColl"

#define DATE_ROLLOVER 12 // localtime hour after which next date is used
#define MIN_HIST_SIZE 1024 // Minimum history buffer in bytes (> 3*sizeof(int) !)
#include "Evidence.h" #include #include #include #include #include #include #include #include #include #include using namespace std; // // Class declaration // class DataHandler: public DimClient, public DimBrowser, public EvidenceServer { struct Item { DimStampedInfo *DataItem; DimService *HistService; char *Buffer; unsigned int HistSize; int Next; double LastValue; double MinAbsChange; }; vector List; DimCommand *LogCommand; FILE *DataFile; FILE *LogFile; char *Filename; float DataSizekB, LogSizekB; int DataSizeLastUpdate, LogSizeLastUpdate; char *DataDir; DimService *LogSizeService, *DataSizeService, *DataFilename; string HistDir; int SizeUpdateDelay; int TimeForNextFile; int RegExCount; regex_t *RegEx; double *MinChange; unsigned int *HistSize; void infoHandler(); void commandHandler(); void AddService(string); void RemoveService(string); float FileSize(FILE *); public: DataHandler(); ~DataHandler(); }; // // Constructor // DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) { // Initialization to prevent freeing unallocated memory DataFile = NULL; LogFile = NULL; Filename = NULL; LogSizeService = NULL; DataSizeService = NULL; DataSizeLastUpdate = 0; LogSizeLastUpdate = 0; TimeForNextFile = 0; // Request configuration data DataDir = GetConfig("datadir"); SizeUpdateDelay = atoi(GetConfig("sizeupdate")); HistDir = GetConfig("histdir"); // Open log file char *Logname = GetConfig("logfile"); if ((LogFile = fopen(Logname, "a")) == NULL) { State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno)); } // Provide logging command LogCommand = new DimCommand("DColl/Log", (char *) "C", this); // Create services for file sizes and data file name DataSizekB = 0; DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB); LogSizekB = FileSize(LogFile); LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB); DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) ""); // Count how many minimum change value regular 
expressions are present char *Change = GetConfig("items"); RegExCount = 0; char *Token = strtok(Change, "\t "); while (Token != NULL) { RegExCount++; Token = strtok(NULL, "\t "); } // Allocate memory for regular expressions, minimum change and history size RegEx = new regex_t[RegExCount]; MinChange = new double [RegExCount]; HistSize = new unsigned int [RegExCount]; // Compile regular expressions and extract minimum change and history size int Pos = 0; for (int i=0; igetName()); delete LogCommand; delete DataFilename; delete[] Filename; //delete LogSizeService; // These create segmentation faults?! //delete DataSizeService; // Close files if (LogFile != NULL) if (fclose(LogFile) != 0) { State(ERROR, "Could not close log file (%s)", strerror(errno)); } if (DataFile != NULL && fclose(DataFile) != 0) { State(ERROR, "Error: Could not close data file (%s)", strerror(errno)); } // Free memory for regular expressions handling for (int i=0; igetName(), "DIS_DNS/SERVER_LIST") == 0) { char *Token = strtok(Info->getString(), "+-!@"); while (Token != NULL) { if (*Info->getString()=='-' || *Info->getString()=='!') RemoveService(string(Token)+"/SERVICE_LIST"); else AddService(string(Token)+"/SERVICE_LIST"); Token = strtok(NULL, "|"); // Skip server IP address Token = strtok(NULL, "@"); } return; } // If service is SERVICE_LIST of any server, scan all services. 
// Subscribe to all services (but not to commands and RPCs) if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) { char *Name = strtok(Info->getString(), "+-!|"); while (Name != NULL) { // Check if item is a service char *Type = strtok(NULL, "\n"); if (Type == NULL) return; // for safety, should not happen if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) { // Add or remove service if (*Info->getString()=='-' || *Info->getString()=='!') RemoveService(Name); else AddService(Name); } Name = strtok(NULL, "|"); } return; } // // ====== Part B: Handle opening data files === // // If it is time to open new data file, close the current one if (time(NULL) >= TimeForNextFile) { if (DataFile != NULL && fclose(DataFile) != 0) { State(ERROR, "Error: Could not close data file (%s)", strerror(errno)); } DataFile = NULL; } // Open new data file if necessary if (DataFile == NULL) { time_t Time = time(NULL); struct tm *T = localtime(&Time); // Get time structure with date rollover if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++; if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename"); // Create direcory if not existing (ignore error if already existing) char *Dir; if (asprintf(&Dir, "%s/%d%02d",DataDir, T->tm_year+1900, T->tm_mon + 1) == -1) { State(FATAL, "asprintf() failed, could not create direcory name"); } if(mkdir(Dir, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) { State(FATAL, "Could not create directory '%s' (%s)", Dir, strerror(errno)); } // Create filename free(Filename); if (asprintf(&Filename, "%s/%d%02d%02d.slow", Dir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == 1) { State(FATAL, "asprintf() failed, could not create filename"); } free(Dir); // Open file if ((DataFile = fopen(Filename, "a")) == NULL) { State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno)); } else State(INFO, "Opened data file '%s'", Filename); DataFilename->updateService(Filename); // Calculate time for next file opening T->tm_sec = 0; T->tm_min = 0; T->tm_hour 
= DATE_ROLLOVER; TimeForNextFile = mktime(T); } // // ====== Part C: Handle writing to data file === // // Identify index of service int Service; for (Service=0; Service= 0) { // Write data header time_t RawTime = Info->getTimestamp(); struct tm *TM = localtime(&RawTime); fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp()); // Translate data into ASCII char *Text = EvidenceServer::ToString(Info); if (Text != NULL) { // Replace all control characters by white space for (int i=0; i SizeUpdateDelay) { fflush(DataFile); // not continuously to reduce load DataSizekB = FileSize(DataFile); DataSizeService->updateService(); DataSizeLastUpdate = time(NULL); } } // Check for disk writing // // ====== Part D: Handle history service === // if (Info->getSize() == 0) return; // Check if data should be added to history buffer if (strcmp(Info->getFormat(),"C") != 0 && strlen(Info->getFormat())==1) { // Calculate sum of all number in array char *Text = EvidenceServer::ToString(Info); char *Token = strtok(Text, " "); double Sum = 0; while (Token != NULL) { Sum += atof(Token); Token = strtok(NULL, " "); } free(Text); // Minimum change? 
if (fabs(Sum-List[Service].LastValue) < fabs(List[Service].MinAbsChange)) return; List[Service].LastValue = Sum; } // Check if data fits into buffer if (List[Service].HistSize < Info->getSize() + 5*sizeof(int)) return; int Size = Info->getSize() + 4*sizeof(int), Next = List[Service].Next; void *WrapPos = NULL; char *Buffer = List[Service].Buffer; int Oldest = *(int *) Buffer; // Check if buffer wrap-around (write wrap mark after Oldest is adjusted) if (Next + Size >= List[Service].HistSize) { WrapPos = Buffer + Next; Next = 4; } // Adapt pointer to oldest entry while ((Oldest < Next + Size) && (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) { // Check for wrap-around if (memcmp(Buffer + Oldest, WrapMark, sizeof(WrapMark)) == 0) { Oldest = 4; continue; } // Check if end marker reached, then only one event fits buffer if (memcmp(Buffer + Oldest, EndMark, sizeof(EndMark)) == 0) { Oldest = Next; break; } // Move to next entry Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int); } // Update pointer in buffer *(int *) Buffer = Oldest; // Write wrap mark if necessary if (WrapPos != NULL) memcpy(WrapPos, WrapMark, sizeof(WrapMark)); // Copy data into ring buffer *((int *) (Buffer + Next)) = Info->getTimestamp(); *((int *) (Buffer + Next + sizeof(int))) = Info->getSize(); memcpy(Buffer + Next + 2*sizeof(int), Info->getData(), Info->getSize()); // Adjust pointer for next entry and write end marker to buffer Next += Info->getSize() + 2*sizeof(int); memcpy(Buffer + Next, EndMark, sizeof(EndMark)); List[Service].Next = Next; } // // Implementation of log writing // void DataHandler::commandHandler() { if (getCommand() != LogCommand || LogFile == NULL) return; // Replace all carriage-return by line feed and non-printable characters char *Text = getCommand()->getString(); for (unsigned int i=0; itm_mday, TM->tm_mon+1, TM->tm_year+1900, TM->tm_hour, TM->tm_min, TM->tm_sec, Text); fflush(LogFile); // If error close file (otherwise infinite loop because 
State() also writes to log) if(ferror(LogFile)) { fclose(LogFile); LogFile = NULL; State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno)); } // Update logfile size service if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) { LogSizekB = FileSize(LogFile); LogSizeService->updateService(); LogSizeLastUpdate = time(NULL); } } // // Add service to watch list // void DataHandler::AddService(string Name) { // Do not subscribe to history services (otherwise infinite loop) if (Name.find(".hist") != string::npos) return; // Check if already subscribed to this service for (int i=0; igetName()) return; } // Set minimum required change by comparing to regular expressions struct Item New; New.MinAbsChange = 0; New.HistSize = 0; for (int i=0; igetName(); for (int j=0; j::iterator E; for (E=List.begin(); EgetName()) { // Delete subscription first so handler and not called anymore delete (*E).DataItem; // Save history buffer (replace '/' by '_') string Name = (*E).HistService->getName(); for (int j=0; j