/********************************************************************\ Central data collector of the Evidence Control System - DColl subscribes to all services given by the configuration server and writes these to the data file at every update. - One data file per day is generated, with roll-over at 13:00 local time. - For each service, it creates a new service with '.hist' appended that contains a history of events kept within a ring buffer. Each entry will be the result of a conversion to double of the text written to the data file. Only if the new value has changed by a minimum amout it will be added to the ring buffer. - The history buffers are written to disk at program termination and are tired to be read when adding a service. - The command 'DColl/Log' writes the associated string to the log file specified in the configuration. Oliver Grimm, December 2009 \********************************************************************/ #define SERVER_NAME "DColl" #define DATE_ROLLOVER 12 // localtime hour after which next date is used #include "Evidence.h" #include #include #include #include #include #include #include #include #include #include using namespace std; // // Class declaration // class DataHandler: public DimClient, public DimBrowser, public EvidenceServer { struct Item { DimStampedInfo *DataItem; DimService *HistService; char *Buffer; int Next; double LastValue; double MinAbsChange; }; vector List; DimCommand *LogCommand; FILE *DataFile; FILE *LogFile; char *Filename; float DataSizekB, LogSizekB; int DataSizeLastUpdate, LogSizeLastUpdate; char *DataDir; DimService *LogSizeService, *DataSizeService, *DataFilename; string HistDir; int HistSize; int SizeUpdateDelay; int TimeForNextFile; int RegExCount; regex_t *RegEx; double *MinChange; void infoHandler(); void commandHandler(); void AddService(string); float FileSize(FILE *); public: DataHandler(); ~DataHandler(); }; // // Constructor // DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) { // Initialization to prevent freeing unallocated memory DataFile = NULL; LogFile = NULL; Filename = NULL; LogSizeService = NULL; DataSizeService = NULL; DataSizeLastUpdate = 0; LogSizeLastUpdate = 0; TimeForNextFile = 0; // Request configuration data char *Change = GetConfig(SERVER_NAME " minchange"); DataDir = GetConfig(SERVER_NAME " datadir"); char *Logname = GetConfig(SERVER_NAME " logfile"); SizeUpdateDelay = atoi(GetConfig(SERVER_NAME " sizeupdate")); HistSize = atoi(GetConfig(SERVER_NAME " histsize")); if (HistSize < 3*sizeof(int)) HistSize = 3*sizeof(int); HistDir = GetConfig(SERVER_NAME " histdir"); // Open log file if ((LogFile = fopen(Logname, "a")) == NULL) { State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno)); } // Provide logging command LogCommand = new DimCommand("DColl/Log", (char *) "C", this); // Create services for file sizes and data file name DataSizekB = 0; DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB); LogSizekB = FileSize(LogFile); LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB); DataFilename = new DimService(SERVER_NAME "/CurrentFile", (char *) ""); // Count how many minimum change value regular expressions are present RegExCount = 0; char *Token = strtok(Change, "\t "); while (Token != NULL) { RegExCount++; Token = strtok(NULL, "\t "); } // Allocate memory for regular expressions and minimum change values RegEx = new regex_t[RegExCount]; MinChange = new double [RegExCount]; // Compile regular expressions int Pos = 0; for (int i=0; igetName(); for (int j=0; jgetName(), "DIS_DNS/SERVER_LIST") == 0) { char *Token = strtok(Info->getString(), "@"); while (Token != NULL) { if (isalpha(*Token) == 0) Token++; // Name can start with +,-,! AddService(string(Token)+"/SERVICE_LIST"); Token = strtok(NULL, "|"); Token = strtok(NULL, "|@"); // ???? Why needed ????? } return; } // If service is SERVICE_LIST of any server, scan all services. // Subscribe to all services (but not to commands and RPCs) if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) { char *Name = strtok(Info->getString(), "|"); while (Name != NULL) { char *Type = strtok(NULL, "\n"); if (Type == NULL) return; // for safety, should not happen if (isalpha(*Name) == 0) Name++; // Name can start with +,-,! if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) { AddService(Name); } Name = strtok(NULL, "|"); } return; } // // ====== Part B: Handle opening data files === // // If it is time to open new data file, close the current one if (time(NULL) >= TimeForNextFile) { if (DataFile != NULL && fclose(DataFile) != 0) { State(ERROR, "Error: Could not close data file (%s)", strerror(errno)); } DataFile = NULL; } // Open new data file if necessary if (DataFile == NULL) { time_t Time = time(NULL); struct tm *T = localtime(&Time); // Get time structure with date rollover if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++; if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename"); // Create direcory if not existing (ignore error if already existing) char *Dir; if (asprintf(&Dir, "%s/%d%02d",DataDir, T->tm_year+1900, T->tm_mon + 1) == -1) { State(FATAL, "asprintf() failed, could not create direcory name"); } if(mkdir(Dir, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) { State(FATAL, "Could not create direcory '%s' (%s)", Dir, strerror(errno)); } // Create filename free(Filename); if (asprintf(&Filename, "%s/%d%02d%02d.slow", Dir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == 1) { State(FATAL, "asprintf() failed, could not create filename"); } free(Dir); // Open file if ((DataFile = fopen(Filename, "a")) == NULL) { State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno)); } else State(INFO, "Opened data file '%s'", Filename); DataFilename->updateService(Filename); // Calculate time for next file opening T->tm_sec = 0; T->tm_min = 0; T->tm_hour = DATE_ROLLOVER; TimeForNextFile = mktime(T); } // // ====== Part C: Handle writing to data file === // // Identify index of service int Service; for (Service=0; Service= 0) { // Write data header time_t RawTime = Info->getTimestamp(); struct tm *TM = localtime(&RawTime); fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp()); // Translate data into ASCII char *Text = EvidenceServer::ToString(Info); if (Text != NULL) { // Replace all control characters by white space for (int i=0; i SizeUpdateDelay) { fflush(DataFile); // not continuously to reduce load DataSizekB = FileSize(DataFile); DataSizeService->updateService(); DataSizeLastUpdate = time(NULL); } } // Check for MinAbsChange // // ====== Part D: Handle history service === // if (Info->getSize() == 0) return; // Check if data should be added to history buffer char *Text = EvidenceServer::ToString(Info); if (Text != NULL && strcmp(Info->getFormat(),"C") != 0 && fabs(atof(Text)-List[Service].LastValue) < fabs(List[Service].MinAbsChange)) { free(Text); return; } free(Text); // Check if data fits into buffer if (HistSize < Info->getSize() + 5*sizeof(int)) return; int Size = Info->getSize() + 4*sizeof(int), Next = List[Service].Next; void *WrapPos = NULL; char *Buffer = List[Service].Buffer; int Oldest = *(int *) Buffer; // Check if buffer wrap-around (write wrap mark after Oldest is adjusted) if (Next + Size >= HistSize) { WrapPos = Buffer + Next; Next = 4; } // Adapt pointer to oldest entry while ((Oldest < Next + Size) && (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) { // Check for wrap-around if (memcmp(Buffer + Oldest, WrapMark, sizeof(WrapMark)) == 0) { Oldest = 4; continue; } // Check if end marker reached, then only one event fits buffer if (memcmp(Buffer + Oldest, EndMark, sizeof(EndMark)) == 0) { Oldest = Next; break; } // Move to next entry Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int); } // Update pointer in buffer *(int *) Buffer = Oldest; // Write wrap mark if necessary if (WrapPos != NULL) memcpy(WrapPos, WrapMark, sizeof(WrapMark)); // Copy data into ring buffer *((int *) (Buffer + Next)) = Info->getTimestamp(); *((int *) (Buffer + Next + sizeof(int))) = Info->getSize(); memcpy(Buffer + Next + 2*sizeof(int), Info->getData(), Info->getSize()); // Adjust pointer for next entry and write end marker to buffer Next += Info->getSize() + 2*sizeof(int); memcpy(Buffer + Next, EndMark, sizeof(EndMark)); List[Service].Next = Next; } // // Implementation of log writing // void DataHandler::commandHandler() { if (getCommand() != LogCommand || LogFile == NULL) return; // Replace all carriage-return by line feed and non-printable characters char *Text = getCommand()->getString(); for (unsigned int i=0; itm_mday, TM->tm_mon+1, TM->tm_year+1900, TM->tm_hour, TM->tm_min, TM->tm_sec, Text); fflush(LogFile); // If error close file (otherwise infinite loop because State() also writes to log) if(ferror(LogFile)) { fclose(LogFile); LogFile = NULL; State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno)); } // Update logfile size service if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) { LogSizekB = FileSize(LogFile); LogSizeService->updateService(); LogSizeLastUpdate = time(NULL); } } // // Add service to watch list // void DataHandler::AddService(string Name) { // Do not subscribe to history services (otherwise infinite loop) if (Name.find(".hist") != string::npos) return; // Check if already subscribed to this service for (int i=0; igetName()) return; } struct Item New; // Set minimum required change by comparing to regular expressions New.MinAbsChange = 0; for (int i=0; igetName(); for (int j=0; j