/********************************************************************\ History server of the Evidence Control System - Subscribes to all services and keeps a ring buffer for each service. - Data added to the buffer only if changed by minimum amount given by configuration information. - The history is available via an rpc call. - The history buffers are written to disk at program termination and are tried to be read when adding a service. - The buffer should hold at least REQUEST_NUM entries (of the size currently handled in infoHandler()), but should not be larger than MAX_SIZE_KB. Oliver Grimm, June 2010 \********************************************************************/ #define SERVER_NAME "History" #include "Evidence.h" const int MIN_SIZE_KB = 50; // Minimum and maximum history buffer in kByte (> 3*sizeof(int) !) const int MAX_SIZE_KB = 2000; const int REQUEST_NUM = 1000; // Requested number of entries in each history buffer #include #include #include #include #include using namespace std; // // Class declaration // class History: public DimRpc, public DimClient, public EvidenceServer { struct Item { DimStampedInfo *DataItem; vector Buffer; int Next; double LastValue; double MinAbsChange; string Format; }; vector List; DimInfo *ServerList; char *Directory; char *Change; void infoHandler(); void rpcHandler(); void AddService(string, const char *); void RemoveService(string); off_t FileSize(FILE *); FILE *OpenFile(string, const char *); public: History(char *); ~History(); }; // Constructor History::History(char *Dir): DimRpc("ServiceHistory", "C", "C"), EvidenceServer(SERVER_NAME), Directory(Dir) { // List of minimum required change for addition to history buffer Change = GetConfig("minchange", " "); // Subscribe to top-level server list ServerList = new DimInfo((char *) "DIS_DNS/SERVER_LIST", NO_LINK, this); } // Destructor deletes all DIM subscriptions History::~History() { delete ServerList; while (List.size() != 0) RemoveService(List[0].DataItem->getName()); } // Implementation of data handling void History::infoHandler() { DimInfo *I = getInfo(); // Check if service available if (!ServiceOK(I)) return; // ====== Part A: Handle service subscriptions === // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services if (strcmp(I->getName(), "DIS_DNS/SERVER_LIST") == 0) { char *Token = strtok(I->getString(), "+-!@"); while (Token != NULL) { AddService(string(Token)+"/SERVICE_LIST", "C"); // 'add' also for '-' and '!' Token = strtok(NULL, "|"); // Skip server IP address Token = strtok(NULL, "@"); } return; } // If service is SERVICE_LIST, scan and subscribe/unsubscribe to services if (strstr(I->getName(), "/SERVICE_LIST") != NULL) { char *Type, *Name = strtok(I->getString(), "+-!|"); while (Name != NULL) { // Only consider DIM services (not commands and RPCs) if (((Type = strtok(NULL, "\n")) != NULL) && (strstr(Type, "|CMD") == NULL) && (strstr(Type, "|RPC") == NULL)) { if (*I->getString() == '-' || *I->getString() == '!') RemoveService(Name); else { Type[strlen(Type)-1] = '\0'; // Isolate service format AddService(Name, Type); } } Name = strtok(NULL, "|"); } return; } // ====== Part B: Handle history service === // Identify index of service int Service; for (Service=0; ServicegetSize()==0 || I->getTimestamp()==0) return; // Resize buffer if necessary if (List[Service].Buffer.size() < REQUEST_NUM*I->getSize()) { if (REQUEST_NUM*I->getSize() < MAX_SIZE_KB*1024) List[Service].Buffer.resize(REQUEST_NUM*I->getSize()); } // If data is number of single type, a minumum change might be requested before addind to history if (strcmp(I->getFormat(),"C") != 0 && strlen(I->getFormat())==1) { // Calculate sum of all number in array char *Text = EvidenceServer::ToString(I); char *Token = strtok(Text, " "); double Sum = 0; while (Token != NULL) { Sum += atof(Token); Token = strtok(NULL, " "); } free(Text); // Minimum change? if (fabs(Sum-List[Service].LastValue) < fabs(List[Service].MinAbsChange)) return; List[Service].LastValue = Sum; } // Check if data fits into buffer if (List[Service].Buffer.size() < I->getSize() + sizeof(int)+ 2*sizeof(EvidenceHistory::Item)) return; int Size = I->getSize() + 2*sizeof(EvidenceHistory::Item), Next = List[Service].Next; void *WrapPos = NULL; char *Buffer = &List[Service].Buffer[0]; int Oldest = *(int *) Buffer; // Check if buffer wrap-around (write wrap mark after Oldest is adjusted) if (Next + Size >= List[Service].Buffer.size()) { WrapPos = Buffer + Next; Next = sizeof(int); } // Adapt pointer to oldest entry while ((Oldest < Next + Size) && (Oldest + *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int) > Next)) { // Check for wrap-around if (memcmp(Buffer + Oldest, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)) == 0) { Oldest = sizeof(int); continue; } // Check if end marker reached, then only one event fits buffer if (memcmp(Buffer + Oldest, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)) == 0) { Oldest = Next; break; } // Move to next entry Oldest += *((int *) (Buffer + Oldest) + 1) + 2*sizeof(int); } // Update pointer in buffer *(int *) Buffer = Oldest; // Write wrap mark if necessary if (WrapPos != NULL) memcpy(WrapPos, &EvidenceHistory::WrapMark, sizeof(EvidenceHistory::WrapMark)); // Copy data into ring buffer *((int *) (Buffer + Next)) = I->getTimestamp(); *((int *) (Buffer + Next + sizeof(int))) = I->getSize(); memcpy(Buffer + Next + 2*sizeof(int), I->getData(), I->getSize()); // Adjust pointer for next entry and write end marker to buffer Next += I->getSize() + sizeof(EvidenceHistory::Item); memcpy(Buffer + Next, &EvidenceHistory::EndMark, sizeof(EvidenceHistory::EndMark)); List[Service].Next = Next; } // Implementation of history buffer distribution void History::rpcHandler() { // Search for history buffer for (int i=0; igetName(), getString()) == 0) { char *Buffer = new char [List[i].Format.size()+1+List[i].Buffer.size()]; strcpy(Buffer, List[i].Format.c_str()); memcpy(Buffer+List[i].Format.size()+1, &List[i].Buffer[0], List[i].Buffer.size()); // setData((void *) &List[i].Buffer[0], List[i].Buffer.size()); setData((void *) Buffer, List[i].Format.size()+1+List[i].Buffer.size()); delete[] Buffer; return; } } // Try to open history file if not found in memory FILE *File = OpenFile(getString(), "rb"); if (File == NULL) { setData(NULL, 0); return; } // Read history file and send to client (data will contain format string and history) off_t Size = FileSize(File); if (Size != -1) { char *Buffer = new char [Size-sizeof(int)]; fseek(File, sizeof(int), SEEK_SET); // Skip 'Next' pointer if ((fread(Buffer, sizeof(char), Size-sizeof(int), File) != Size-sizeof(int)) || (ferror(File) != 0)) { Message(WARN, "Error reading history file '%s' in rpcHandler()", getString()); setData(NULL, 0); // Default response } else setData((void *) Buffer, Size); delete[] Buffer; } if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in rpcHandler()", getString()); } // // Add service to watch list // void History::AddService(string Name, const char *Format) { // Check if already subscribed to this service for (int i=0; igetName()) return; } struct Item New; New.LastValue = DBL_MAX; New.Format = Format; // Set minimum required change if given in configuratrion char *Pnt = strstr(Change, Name.c_str()); if (Pnt != NULL && *(Pnt+Name.size()) == ':') New.MinAbsChange = atof(Pnt+Name.size()+1); else New.MinAbsChange = 0; // Load history buffer from file if existing FILE *File = OpenFile(Name, "rb"); off_t Size; if (File != NULL && (Size = FileSize(File)) != -1) { // If current buffer too small, resize if (Size > New.Buffer.size()) New.Buffer.resize(Size); // Read next pointer fread(&New.Next, sizeof(New.Next), 1, File); // Skip format string while (fgetc(File) != 0 && feof(File) == 0) {} // Read buffer fread(&New.Buffer[0], sizeof(char), Size, File); if (ferror(File) != 0) { Message(WARN, "Error reading history file '%s' in AddService()", Name.c_str()); New.Buffer.clear(); } if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in AddService()", Name.c_str());; } // If no buffer loaded, allocate empty buffer if (New.Buffer.empty()) { New.Buffer.resize(MIN_SIZE_KB*1024); memset(&New.Buffer[0], 0, New.Buffer.size()); *(int *) &New.Buffer[0] = 4; New.Next = 4; } // Subscribe to service New.DataItem = new DimStampedInfo(Name.c_str(), NO_LINK, this); List.push_back(New); } // // Remove service from watch list // void History::RemoveService(string Name) { // Find service index vector::iterator E; for (E=List.begin(); EgetName()) { // Delete subscription first so handler and not called anymore delete (*E).DataItem; // Save history buffer FILE *File = OpenFile(Name, "wb"); if (File != NULL) { fwrite(&(*E).Next, sizeof((*E).Next), 1, File); // Next pointer fwrite((*E).Format.c_str(), (*E).Format.size()+1, 1, File); // Format fwrite(&(*E).Buffer[0], sizeof(char), (*E).Buffer.size(), File); // Buffer // If error, try to delete (possibly erroneous) file if (ferror(File) != 0) { if (remove(Name.c_str()) == -1) Message(WARN, "Error writing history file '%s' in RemoveService(), could also not delete file", Name.c_str()); else Message(WARN, "Error writing history file '%s' in RemoveService(), deleted file", Name.c_str()); } if (fclose(File) != 0) Message(WARN, "Error closing history file '%s' in RemoveService()", Name.c_str());; } List.erase(E); } } // // Determine size of file in kB // off_t History::FileSize(FILE *File) { struct stat FileStatus; if (fstat(fileno(File), &FileStatus) == -1) { Message(WARN, "Could not determine size of file (%s)", strerror(errno)); return -1; } return FileStatus.st_size; } // // Open file for service history // FILE *History::OpenFile(string Service, const char *Mode) { // Create directory if not yet existing if(mkdir(Directory, S_IRWXU|S_IRWXG)==-1 && errno!=EEXIST) return NULL; // Replace all '/' and non-graphical characters by '_' in string and open file for (int i=0; i\n", argv[0]); exit(EXIT_FAILURE); } // Static ensures calling of destructor by exit() static History Hist(argv[1]); // Sleep until signal caught pause(); }