/********************************************************************\ History server of the Evidence Control System - Subscribes to all services and keeps a ring buffer for each service. - Data added to the buffer only if changed by minimum amount given by configuration information. - The history is available via an rpc call. - The history buffers are written to disk at program termination and are tried to be read when adding a service. - The buffer should hold at least REQUEST_NUM entries (of the size currently handled in infoHandler()), but should not be larger than MAX_SIZE_KB. Oliver Grimm, June 2010 \********************************************************************/ #define SERVER_NAME "History" #include "Evidence.h" #include #include #include #include #include #include #include using namespace std; const int MIN_SIZE_KB = 50; // Min and max buffersize in kByte (> 3*sizeof(int) !) const string DEFAULT_MAX_SIZE_KB = "2000"; const string DEFAULT_NUM_ENTRIES = "1000"; // Number of entries in each history buffer // // Class declaration // class History: public DimRpc, public DimClient, public EvidenceServer { struct Item { DimStampedInfo *DataItem; vector Buffer; int Next; double LastValue; double MinAbsChange; string Format; }; map Map; DimInfo *ServerList; char *Directory; string Change; void infoHandler(); void rpcHandler(); void AddService(string, const char *); void RemoveService(string); off_t FileSize(FILE *); FILE *OpenFile(string, const char *); public: History(char *); ~History(); }; // Constructor History::History(char *Dir): DimRpc("ServiceHistory", "C", "C"), EvidenceServer(SERVER_NAME), Directory(Dir) { // Get/initialize configuration Change = GetConfig("minchange", " "); GetConfig("maxsize_kb", DEFAULT_MAX_SIZE_KB); GetConfig("numentries", DEFAULT_NUM_ENTRIES); // Subscribe to top-level server list ServerList = new DimInfo((char *) "DIS_DNS/SERVER_LIST", NO_LINK, this); } // Destructor deletes all DIM subscriptions History::~History() { delete ServerList; while (Map.size() != 0) RemoveService((*Map.begin()).first); } // Implementation of data handling void History::infoHandler() { DimInfo *I = getInfo(); // Check if service available if (!ServiceOK(I)) return; // ====== Part A: Handle service subscriptions === // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services if (strcmp(I->getName(), "DIS_DNS/SERVER_LIST") == 0) { char *Token = strtok(I->getString(), "+-!@"); while (Token != NULL) { AddService(string(Token)+"/SERVICE_LIST", "C"); // 'add' also for '-' and '!' Token = strtok(NULL, "|"); // Skip server IP address Token = strtok(NULL, "@"); } return; } // If service is SERVICE_LIST, scan and subscribe/unsubscribe to services if (strstr(I->getName(), "/SERVICE_LIST") != NULL) { // Bug fix for empty SERVICE_LIST if (strlen(I->getString()) == 0) { string Tmp(I->getName()); RemoveService(I->getName()); AddService(Tmp.c_str(), (char *) "C"); return; } char *Type, *Name = strtok(I->getString(), "+-!|"); while (Name != NULL) { // Only consider DIM services (not commands and RPCs) if (((Type = strtok(NULL, "\n")) != NULL) && (strstr(Type, "|CMD") == NULL) && (strstr(Type, "|RPC") == NULL)) { if (*I->getString() == '-' || *I->getString() == '!') RemoveService(Name); else { Type[strlen(Type)-1] = '\0'; // Isolate service format AddService(Name, Type); } } Name = strtok(NULL, "|"); } return; } // ====== Part B: Handle history service === char *Service = I->getName(); // Check if service known and ignore empty or illegal time stamped service if (Map.count(Service) == 0 || I->getSize()==0 || I->getTimestamp()<=0) return; // Resize buffer if necessary int NEntries = atoi(GetConfig("numentries").c_str()); if (Map[Service].Buffer.size() < NEntries*I->getSize()) { if (NEntries*I->getSize() < atoi(GetConfig("maxsize_kb").c_str())*1024) { Map[Service].Buffer.resize(NEntries*I->getSize()); } } // If data is number of single type, check minumum change before adding to history if (strcmp(I->getFormat(), "C") != 0) { // Calculate sum of all number in array istringstream Text(EvidenceServer::ToString(I->getFormat(), I->getData(), I->getSize())); double Num, Sum = 0; while (Text.good()) { Text >> Num; Sum += fabs(Num); } // Minimum change? if (fabs(Sum-Map[Service].LastValue) < fabs(Map[Service].MinAbsChange)) return; Map[Service].LastValue = Sum; } // Check if data fits into buffer if (Map[Service].Buffer.size() < I->getSize() + sizeof(int)+ 2*sizeof(EvidenceHistory::Item)) return; int Size = I->getSize() + 2*sizeof(EvidenceHistory::Item), Next = Map[Service].Next; void *WrapPos = NULL; char *Buffer = &Map[Service].Buffer[0]; int Oldest = *(int *) Buffer; // Check if buffer wrap-around (write wrap mark after Oldest is adjusted) if (Next + Size >= Map[Service].Buffer.size()) { WrapPos = Buffer + Next; Next = sizeof(int); } // Adapt pointer to oldest entry while ((Oldest < Next + Size) && (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) { // Check for wrap-around if (memcmp(Buffer + Oldest, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)) == 0) { Oldest = sizeof(int); continue; } // Check if end marker reached, then only one event fits buffer if (memcmp(Buffer + Oldest, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)) == 0) { Oldest = Next; break; } // Move to next entry Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int); } // Update pointer in buffer *(int *) Buffer = Oldest; // Write wrap mark if necessary if (WrapPos != NULL) memcpy(WrapPos, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)); // Copy data into ring buffer *((int *) (Buffer + Next)) = I->getTimestamp(); *((int *) (Buffer + Next + sizeof(int))) = I->getSize(); memcpy(Buffer + Next + 2*sizeof(int), I->getData(), I->getSize()); // Adjust pointer for next entry and write end marker to buffer Next += I->getSize() + sizeof(EvidenceHistory::Item); memcpy(Buffer + Next, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)); Map[Service].Next = Next; } // Implementation of history buffer distribution void History::rpcHandler() { string Name = ToString((char *) "C", getData(), getSize()); // Search for history buffer in memory if (Map.count(Name) == 1) { char *Buffer = new char [Map[Name].Format.size()+1+Map[Name].Buffer.size()]; strcpy(Buffer, Map[Name].Format.c_str()); memcpy(Buffer+Map[Name].Format.size()+1, &Map[Name].Buffer[0], Map[Name].Buffer.size()); setData((void *) Buffer, Map[Name].Format.size()+1+Map[Name].Buffer.size()); delete[] Buffer; return; } // Try to open history file if not found in memory FILE *File = OpenFile(Name, "rb"); if (File == NULL) { setData(NULL, 0); return; } // Read history file and send to client (data will contain format string and history) off_t Size = FileSize(File); if (Size != -1) { char *Buffer = new char [Size-sizeof(int)]; fseek(File, sizeof(int), SEEK_SET); // Skip 'Next' pointer if ((fread(Buffer, sizeof(char), Size-sizeof(int), File) != Size-sizeof(int)) || (ferror(File) != 0)) { Message(WARN, "Error reading history file '%s' in rpcHandler()", Name.c_str()); setData(NULL, 0); // Default response } else setData((void *) Buffer, Size); delete[] Buffer; } if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in rpcHandler()", Name.c_str()); } // // Add service to watch list // void History::AddService(string Name, const char *Format) { // Return if already subscribed to this service if (Map.count(Name) != 0) return; // Create new service subscription Map[Name].LastValue = DBL_MAX; Map[Name].Format = Format; Map[Name].MinAbsChange = 0.0; // Set minimum required change if given in configuratrion size_t Pos = Change.find(Name+":"); if (Pos != string::npos) Map[Name].MinAbsChange = atof(Change.c_str() + Pos + Name.size() + 1); // Load history buffer from file if existing FILE *File = OpenFile(Name, "rb"); off_t Size; if (File != NULL && (Size = FileSize(File)) != -1) { // If current buffer too small, resize if (Size > Map[Name].Buffer.size()) Map[Name].Buffer.resize(Size); // Read next pointer fread(&Map[Name].Next, sizeof(Map[Name].Next), 1, File); // Skip format string while (fgetc(File) != 0 && feof(File) == 0) {} // Read buffer fread(&Map[Name].Buffer[0], sizeof(char), Size, File); if (ferror(File) != 0) { Message(WARN, "Error reading history file '%s' in AddService()", Name.c_str()); Map[Name].Buffer.clear(); } if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in AddService()", Name.c_str());; } // If no buffer loaded, allocate empty buffer if (Map[Name].Buffer.empty()) { Map[Name].Buffer.resize(MIN_SIZE_KB*1024); memset(&Map[Name].Buffer[0], 0, Map[Name].Buffer.size()); *(int *) &Map[Name].Buffer[0] = 4; Map[Name].Next = 4; } // Subscribe to service Map[Name].DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this); } // // Remove service from watch list // void History::RemoveService(string Name) { // Check if actually subscribed to service if (Map.count(Name) == 0) return; // Delete subscription first so handler and not called anymore delete Map[Name].DataItem; // Save history buffer FILE *File = OpenFile(Name, "wb"); if (File != NULL) { fwrite(&Map[Name].Next, sizeof(Map[Name].Next), 1, File); // Next pointer fwrite(Map[Name].Format.c_str(), Map[Name].Format.size()+1, 1, File); // Format fwrite(&Map[Name].Buffer[0], sizeof(char), Map[Name].Buffer.size(), File); // Buffer // If error, try to delete (possibly erroneous) file if (ferror(File) != 0) { if (remove(Name.c_str()) == -1) Message(WARN, "Error writing history file '%s' in RemoveService(), could also not delete file", Name.c_str()); else Message(WARN, "Error writing history file '%s' in RemoveService(), deleted file", Name.c_str()); } if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in RemoveService()", Name.c_str());; } Map.erase(Name); } // // Determine size of file in kB // off_t History::FileSize(FILE *File) { struct stat FileStatus; if (fstat(fileno(File), &FileStatus) == -1) { Message(WARN, "Could not determine size of file (%s)", strerror(errno)); return -1; } return FileStatus.st_size; } // // Open file for service history // FILE *History::OpenFile(string Service, const char *Mode) { // Create directory if not yet existing if(mkdir(Directory, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) return NULL; // Replace all '/' and non-graphical characters by '_' in string and open file for (int i=0; i\n", argv[0]); exit(EXIT_FAILURE); } // Static ensures calling of destructor by exit() static History Hist(argv[1]); // Sleep until signal caught while (!Hist.ExitRequest) pause(); }