Changeset 142 for Evidence/DColl


Ignore:
Timestamp:
01/13/10 12:47:33 (15 years ago)
Author:
ogrimm
Message:
Various updates
File:
1 edited

Legend:

Unmodified
Added
Removed
  • Evidence/DColl/DColl.cc

    r127 r142  
    55  - DColl subscribes to all services given by the configuration
    66    server and writes these to the data file at every update.
    7   - For each service, it creates a new service with '.hist' appended that
    8     contains a history of events kept within a ring buffer. Each entry will
    9         be the result of a conversion to double of the text written to the data file.
     7  - One data file per day is generated, with roll-over at 13:00 local time.
     8  - For each service, it creates a new service with '.hist' appended
     9        that contains a history of events kept within a ring buffer. Each
     10        entry will be the result of a conversion to double of the text
     11        written to the data file. Only if the new value has changed by a
     12        minimum amout it will be added to the ring buffer.
    1013  - The command 'DColl/Log' writes the associated string to the log
    1114    file specified in the configuration.
     
    1720#define SERVER_NAME "DColl"
    1821
     22#define DATE_ROLLOVER 12 // localtime hour after which next date is used
     23
    1924#include "../Evidence.h"
    20 
    21 #define NO_LINK "__&DIM&NOLINK&__" // for checking if DIMserver is alive
     25#include <math.h>
     26#include <float.h>
     27#include <sys/stat.h>
     28#include <ctype.h>
     29#include <sys/types.h>
     30#include <regex.h>
    2231
    2332//
    2433// Class declaration
    2534//
    26 class DataHandler:      public DimClient, public DimCommand,
    27                                         public DimBrowser, public EvidenceServer {
    28   private:
    29     pthread_mutex_t DataMutex;
    30     pthread_mutex_t LogMutex;
    31 
     35class DataHandler:      public DimClient, public DimBrowser,
     36                                        public EvidenceServer {
     37
     38        struct Item {
     39          DimStampedInfo *DataItem;
     40      DimService *HistService;
     41          struct EvidenceHistoryItem *HistBuffer;
     42          int HistPointer;
     43          double LastValue;
     44          double MinAbsChange;
     45        } *List;
     46
     47        DimCommand *LogCommand;
     48               
    3249    unsigned int NumItems;
    3350    FILE *DataFile;
    3451    FILE *LogFile;
    35     char *Filename;
    36 
    37     DimStampedInfo **DataItem;
    38     DimService **HistService;       
     52        float DataSizekB, LogSizekB;
     53        int DataSizeLastUpdate, LogSizeLastUpdate;
     54        char *DataDir;
     55    DimService *LogSizeService, *DataSizeService;
    3956    int HistSize;
    40     struct EvidenceHistoryItem **HistBuffer;
    41         int *HistPointer;
    42        
     57        int SizeUpdateDelay;
     58        int TimeForNextFile;
     59       
     60        int RegExCount;
     61        regex_t *RegEx;
     62        double *MinChange;
     63       
    4364    void infoHandler();
    4465    void commandHandler();
    45     bool ReallocMem(void *&, int);
     66        void AddService(char *);
     67        float FileSize(FILE *);
    4668           
    4769  public:
     
    5375// Constructor
    5476//
    55 DataHandler::DataHandler(): DimCommand((char *) "DColl/Log", (char *) "C"), EvidenceServer(SERVER_NAME) {
     77DataHandler::DataHandler(): EvidenceServer(SERVER_NAME) {
    5678
    5779  // Initialization to prevent freeing unallocated memory
    5880  DataFile = NULL;
    59   Filename = NULL;
    60 
    61   DataItem = NULL;
    62   HistService = NULL;
    63   HistBuffer = NULL;
    64   HistPointer = NULL;
    65  
     81  LogFile = NULL;
     82  List = NULL;
     83  LogSizeService = NULL;
     84  DataSizeService = NULL;
     85 
     86  DataSizeLastUpdate = 0;
     87  LogSizeLastUpdate = 0;
     88  TimeForNextFile = 0;
     89
    6690  // Request configuration data
    67   char *Items = GetConfig(SERVER_NAME " items");
    68   char *DataDir = GetConfig(SERVER_NAME " datadir");
     91  char *Change = GetConfig(SERVER_NAME " minchange");
     92  DataDir = GetConfig(SERVER_NAME " datadir");
    6993  char *Logname = GetConfig(SERVER_NAME " logfile");
     94  SizeUpdateDelay = atoi(GetConfig(SERVER_NAME " sizeupdate"));
    7095  HistSize = atoi(GetConfig(SERVER_NAME " histsize"));
    7196  if (HistSize < 1) HistSize = 1; // Minimum one items
    7297   
    73   // Create mutex for thread synchronization
    74   if ((errno=pthread_mutex_init(&DataMutex, NULL)) != 0) {
    75     Msg(FATAL, "pthread_mutex_init() failed for data file (%s)", strerror(errno));
    76   }
    77   if ((errno=pthread_mutex_init(&LogMutex, NULL)) != 0) {
    78     Msg(FATAL, "pthread_mutex_init() failed for log file (%s)", strerror(errno));
    79   }
    80 
    81   // Interpret DIM services to observe and prepare history buffer
    82   char *Buf, *ServiceName, *Format;
    83   int NumServices;
    84   NumItems = 0;
    85   char *NextToken = strtok(Items, " \t");
    86   while (NextToken != NULL) {
    87     NumServices = getServices(NextToken);
    88     for (int j=0; j<NumServices; j++)  {
    89           if (getNextService(ServiceName, Format) == DimSERVICE) {
    90 
    91             // Check if already subsubed to this service
    92                 for (int i=0; i<NumItems; i++) {
    93                   if(strcmp(ServiceName, DataItem[i]->getName()) == 0) continue;
    94                 }
    95                
    96             // Increase capacity of arrays
    97             if (!ReallocMem((void *&) DataItem, NumItems+1)) continue;
    98             if (!ReallocMem((void *&) HistService, NumItems+1)) continue;
    99             if (!ReallocMem((void *&) HistBuffer, NumItems+1)) continue;
    100             if (!ReallocMem((void *&) HistPointer, NumItems+1)) continue;
    101 
    102                 // Subscribe to service
    103         DataItem[NumItems] = new DimStampedInfo(ServiceName, (char *) NO_LINK, this);
    104 
    105         // Create history service
    106         HistBuffer[NumItems] = new struct EvidenceHistoryItem [HistSize];
    107                 memset(HistBuffer[NumItems], 0, HistSize*sizeof(EvidenceHistoryItem));
    108                 HistPointer[NumItems] = 0;
    109 
    110         if (asprintf(&Buf, "%s.hist", ServiceName) == -1) {
    111           Msg(ERROR, "Could not create Hist service for %s because asprintf() failed", ServiceName);
    112         }
    113         else {
    114                   HistService[NumItems] = new DimService (Buf, (char *) "C",
    115                                                                 HistBuffer[NumItems], HistSize*sizeof(EvidenceHistoryItem));
    116           free(Buf);
    117                 }
    118                 NumItems++;
    119           }
    120         }
    121     NextToken = strtok(NULL, " \t");
    122   }
    123 
    124   // Open data file
    125   time_t rawtime = time(NULL);
    126   struct tm * timeinfo = gmtime(&rawtime);
    127   if(timeinfo->tm_hour >= 13) rawtime += 12*60*60;
    128   timeinfo = gmtime(&rawtime);
    129 
    130   if (asprintf(&Filename, "%s/%d%02d%02d.slow", DataDir, timeinfo->tm_year+1900, timeinfo->tm_mon+1, timeinfo->tm_mday) == -1) {
    131     Filename = NULL;
    132     Msg(FATAL, "Could not create filename with asprintf()(%s)", strerror(errno));
    133   }
    134   if ((DataFile = fopen(Filename, "a")) == NULL) {
    135     Msg(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
    136   }
    137  
    13898  // Open log file
    13999  if ((LogFile = fopen(Logname, "a")) == NULL) {
    140     Msg(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
    141   }
    142 
     100    State(FATAL, "Could not open log file '%s' (%s)", Logname, strerror(errno));
     101  }
     102
     103  // Provide logging command   
     104  LogCommand = new DimCommand("DColl/Log", (char *) "C", this);
     105             
     106  // Create services for file sizes
     107  DataSizekB = 0;
     108  DataSizeService = new DimService(SERVER_NAME "/DataSizekB", DataSizekB);
     109 
     110  LogSizekB = FileSize(LogFile);
     111  LogSizeService = new DimService(SERVER_NAME "/LogSizekB", LogSizekB);
     112
     113  // Count how many minimum change value regular expressions are present
     114  RegExCount = 0;
     115  char *Token = strtok(Change, "\t ");
     116  while (Token != NULL) {
     117        RegExCount++;
     118        Token = strtok(NULL, "\t ");
     119  }
     120 
     121  // Allocate memory for regular expressions and minimum change values
     122  RegEx = new regex_t[RegExCount];
     123  MinChange = new double [RegExCount];
     124
     125  // Compile regular expressions
     126  int Pos = 0;
     127  for (int i=0; i<RegExCount; i++) {
     128    int Len = strlen(Change+Pos) + 1;
     129    Token = strtok(Change + Pos, ": \t");
     130
     131    int Ret = regcomp(&RegEx[i], Token, REG_EXTENDED|REG_NOSUB);
     132        if (Ret != 0) {
     133          char ErrMsg[200];
     134          regerror(Ret, &RegEx[i], ErrMsg, sizeof(ErrMsg));
     135          State(WARN, "Error compiling regular expression '%s' (%s)", Token, ErrMsg);
     136        }
     137        else {
     138          if ((Token=strtok(NULL, "")) != NULL) MinChange[i] = atof(Token);
     139          else MinChange[i] = 0;
     140        }
     141        Pos += Len;
     142  }
     143
     144  // Subscribe to list of servers at DIS_DNS
     145  AddService((char *) "DIS_DNS/SERVER_LIST");
     146
     147  DimClient::sendCommand("DColl/Log", SERVER_NAME" *** Logging started ***");
    143148}
    144149
     
    148153DataHandler::~DataHandler() {
    149154
     155  // Delete DIM services and command first so handlers and not called anymore
     156  for (int i=0; i<NumItems; i++) {
     157        delete List[i].HistService;
     158    delete List[i].DataItem;
     159    delete[] List[i].HistBuffer;
     160  }
     161  free(List);
     162
     163  //delete LogSizeService; // These create segmentation faults?!
     164  //delete DataSizeService;
     165
     166  delete LogCommand;
     167
    150168  // Close files
    151   if (LogFile != NULL && fclose(LogFile) != 0) {
    152         Msg(ERROR, "Could not close log file (%s)", strerror(errno));
     169  if (LogFile != NULL) if (fclose(LogFile) != 0) {
     170        State(ERROR, "Could not close log file (%s)", strerror(errno));
    153171  }
    154172  if (DataFile != NULL && fclose(DataFile) != 0) {
    155         Msg(ERROR, "Error: Could not close data file (%s)", strerror(errno));
    156   }     
    157 
    158   // Free all memory
    159   for (int i=0; i<NumItems; i++) {
    160     delete[] HistBuffer[i];
    161     delete DataItem[i];
    162   }
    163 
    164   free(Filename);
    165   free(HistService);
    166   free(HistPointer);
    167   free(HistBuffer);
    168   free(DataItem);
    169  
    170   // Destroy mutex
    171   pthread_mutex_destroy (&LogMutex);
    172   pthread_mutex_destroy (&DataMutex);
     173        State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
     174  }
     175
     176  // Free memory for regular expressions handling
     177  for (int i=0; i<RegExCount; i++) {
     178    regfree(&RegEx[i]);
     179  }
     180  delete[] MinChange;
     181  delete[] RegEx;
    173182}
    174183
     
    176185// Implementation of data handling
    177186//
    178 // More than one infoHandler() might run in parallel, therefore
    179 // the mutex mechanism is used to serialize writing to the file
     187// DIM ensures infoHandler() is called serialized, therefore
     188// no mutex is needed to serialize writing to the file
    180189void DataHandler::infoHandler() {
    181  
     190
    182191  DimInfo *Info = getInfo();
    183192
    184   // Check if service actually available, if it contains data and if data file is open
     193  // Check if service available
    185194  if (Info->getSize()==strlen(NO_LINK)+1 && strcmp(Info->getString(), NO_LINK)==0) return;
    186   if (Info->getSize() == 0 || DataFile == NULL) return;
    187 
     195
     196  // If service is DIS_DNS/SERVER_LIST, subscribe to all SERVICE_LIST services
     197  if (strcmp(Info->getName(), "DIS_DNS/SERVER_LIST") == 0) {   
     198        char *Token = strtok(Info->getString(), "+-!|@");       
     199        while (Token != NULL) {
     200          char *Buf;
     201          if (MakeString(&Buf, "%s/SERVICE_LIST", Token) != -1) {
     202            AddService(Buf);
     203                free(Buf);
     204          }
     205          else State(ERROR, "MakeString() failed for server %s", Token);
     206         
     207          Token = strtok(NULL, "|");
     208          Token = strtok(NULL, "+-!|@");       
     209        }       
     210        return;
     211  }
     212
     213  // If service is SERVICE_LIST of any server, scan all services.
     214  // Subscribe to all services (but not to commands and RPCs)
     215  if (strstr(Info->getName(), "/SERVICE_LIST") != NULL) {
     216        char *Name = strtok(Info->getString(), "+-!|");
     217        while (Name != NULL) {
     218          char *Type = strtok(NULL, "\n");
     219          if (Type == NULL) return; // for safety, should not happen
     220      if (strstr(Type, "|CMD")==NULL && strstr(Type, "|RPC")==NULL) {
     221                AddService(Name);
     222          }
     223          Name = strtok(NULL, "+-!|");
     224        }
     225        return;
     226  }
     227
     228  // If it is time to open new data file, close the current one
     229  if (time(NULL) >= TimeForNextFile) {
     230        if (DataFile != NULL && fclose(DataFile) != 0) {
     231          State(ERROR, "Error: Could not close data file (%s)", strerror(errno));
     232        }
     233        DataFile = NULL;
     234  }
     235 
     236  // Open new data file if necessary
     237  if (DataFile == NULL) {
     238        time_t Time = time(NULL);
     239        struct tm *T = localtime(&Time);
     240       
     241        if(T->tm_hour >= DATE_ROLLOVER) T->tm_mday++;
     242        if (mktime(T) == -1) State(ERROR, "mktime() failed, check filename");
     243
     244        char *Filename;
     245        if (MakeString(&Filename, "%s/%d%02d%02d.slow", DataDir, T->tm_year+1900, T->tm_mon+1, T->tm_mday) == -1) State(FATAL, "Could not create filename, MakeString() failed");
     246        if ((DataFile = fopen(Filename, "a")) == NULL) {
     247      State(FATAL, "Could not open data file '%s' (%s)", Filename, strerror(errno));
     248        }
     249        else State(INFO, "Opened data file '%s'", Filename);
     250        free(Filename);
     251       
     252        // Calculate time for next file opening
     253        T->tm_sec = 0;
     254        T->tm_min = 0;
     255        T->tm_hour = DATE_ROLLOVER;
     256        TimeForNextFile = mktime(T);
     257  }
     258   
    188259  // Identify index of service
    189260  int Service; 
    190   for (Service=0; Service<NumItems; Service++) if (Info == DataItem[Service]) break;
    191   if (Service == NumItems) return;  // Not found: should never happen
    192 
    193   pthread_mutex_lock(&DataMutex);
    194  
     261  for (Service=0; Service<NumItems; Service++) if (Info == List[Service].DataItem) break;
     262  if (Service == NumItems) return;  // Service not found
     263
     264  // If negative value for absolute change, ignore this entry
     265  if (List[Service].MinAbsChange < 0) return;
     266
    195267  // Write data header
    196268  time_t RawTime = Info->getTimestamp();
     
    199271  fprintf(DataFile, "%s %d %d %d %d %d %d %d %lu ", Info->getName(), TM->tm_year+1900, TM->tm_mon+1, TM->tm_mday, TM->tm_hour, TM->tm_min, TM->tm_sec, Info->getTimestampMillisecs(), Info->getTimestamp());
    200272
    201   // Translate info data into ASCII and write to file and to history buffer
     273  // Translate data into ASCII
    202274  char *Text = EvidenceServer::ToString(Info);
    203275  if (Text != NULL) {
     276        // Replace all control characters by white space
     277        for (int i=0; i<strlen(Text); i++) if (iscntrl(Text[i])) Text[i] = ' ';
     278       
     279        // Write to file
    204280    fprintf(DataFile, "%s\n", Text);
    205281       
    206         HistBuffer[Service][HistPointer[Service]].Seconds = Info->getTimestamp();
    207     HistBuffer[Service][HistPointer[Service]].Value = atof(Text);
    208     HistService[Service]->updateService();
    209         HistPointer[Service]++;
    210         if (HistPointer[Service] >= HistSize) HistPointer[Service] = 0;
    211 
     282        // Add to history buffer if change large enough
     283        if ((fabs(atof(Text)-List[Service].LastValue) > List[Service].MinAbsChange)) {
     284          List[Service].HistBuffer[List[Service].HistPointer].Seconds = Info->getTimestamp();
     285      List[Service].HistBuffer[List[Service].HistPointer].Value = atof(Text);
     286      List[Service].HistService->updateService();
     287          List[Service].HistPointer++;
     288          if (List[Service].HistPointer >= HistSize) List[Service].HistPointer = 0;
     289          List[Service].LastValue = atof(Text);
     290        }
    212291        free(Text);
    213292  }
     
    215294 
    216295  fflush(DataFile);
    217   if(ferror(DataFile)) Msg(ERROR, "Error writing to data file (%s)", strerror(errno));
    218 
    219   pthread_mutex_unlock(&DataMutex);
    220 }
    221 
    222 //
    223 // Implementation of log writing (the only command is 'DColl/Log')
     296
     297  // Terminate if error because otherwise infinite loop might result as
     298  // next call to this infoHandler() will try to (re-)open file
     299  if(ferror(DataFile)) {
     300    fclose(DataFile);
     301        DataFile = NULL;
     302        State(FATAL, "Error writing to data file, closed file (%s)", strerror(errno));
     303  }
     304
     305  // Update datafile size service
     306  if (time(NULL) - DataSizeLastUpdate > SizeUpdateDelay) {
     307        DataSizekB = FileSize(DataFile);
     308        DataSizeService->updateService();
     309        DataSizeLastUpdate = time(NULL);
     310  }
     311}
     312
     313//
     314// Implementation of log writing
    224315//
    225316void DataHandler::commandHandler() {
    226 
    227   if (LogFile == NULL) return;  // Handler might be called before file is opened
    228317 
    229   pthread_mutex_lock(&LogMutex);
    230 
     318  if (getCommand() != LogCommand || LogFile == NULL) return;
     319
     320  // Replace all carriage-return by line feed and non-printable characters
     321  char *Text = getCommand()->getString();
     322  for (unsigned int i=0; i<strlen(Text); i++) {
     323    if (Text[i] == '\r') Text[i] = '\n';
     324        if(isprint(Text[i])==0 && isspace(Text[i])==0) Text[i] = ' ';
     325  }
     326 
    231327  time_t RawTime = time(NULL);
    232328  struct tm *TM = localtime(&RawTime);
     
    234330  fprintf(LogFile, "%d/%d/%d %d:%d:%d: %s\n",
    235331                TM->tm_mday, TM->tm_mon+1, TM->tm_year+1900,
    236                 TM->tm_hour, TM->tm_min, TM->tm_sec, getString());
     332                TM->tm_hour, TM->tm_min, TM->tm_sec, Text);
    237333
    238334  fflush(LogFile);
    239   if(ferror(LogFile)) Msg(ERROR, "Error writing to log file (%s)", strerror(errno));
    240   pthread_mutex_unlock(&LogMutex);
    241 }
    242 
    243 
    244 //
    245 // Implementation of memory re-allocation for pointer arrays
    246 //
    247 bool DataHandler::ReallocMem(void *&Memory, int Size) {
    248 
    249   void *NewMem = realloc(Memory, Size*sizeof(void *));
    250 
    251   if (NewMem != NULL) {
    252     Memory = NewMem;
    253         return true;
    254   }
     335 
     336  // If error close file (otherwise infinite loop because State() also writes to log)
     337  if(ferror(LogFile)) {
     338    fclose(LogFile);
     339        LogFile = NULL;
     340    State(ERROR, "Error writing to log file, closing file (%s)", strerror(errno));
     341  }
     342   
     343  // Update logfile size service
     344  if (time(NULL) - LogSizeLastUpdate > SizeUpdateDelay) {
     345        LogSizekB = FileSize(LogFile);
     346        LogSizeService->updateService();
     347        LogSizeLastUpdate = time(NULL);
     348  }
     349}
     350
     351
     352//
     353// Add service to watch list
     354//
     355void DataHandler::AddService(char *Name) {
     356
     357  // Do not subscribe to history services (otherwise infinite loop)
     358  if (strstr(Name, ".hist") != NULL) return;
     359
     360  // Check if already subscribed to this service
     361  for (int i=0; i<NumItems; i++) {
     362        if(strcmp(Name, List[i].DataItem->getName()) == 0) return;
     363  }
     364
     365  // Increase capacity of item list                     
     366  struct Item *New = (struct Item *) realloc(List, (NumItems+1)*sizeof(struct Item));
     367  if (New != NULL) List = New;
    255368  else {
    256         Msg(ERROR, "Could not allocate memory ()", strerror(errno));
    257         return false;
    258   }
     369        State(ERROR, "Could not allocate memory for item list, service '' not added (%s)", Name, strerror(errno));
     370        return;
     371  }
     372 
     373  // Set minimum required change by comparing to regular expressions
     374  List[NumItems].MinAbsChange = 0;
     375  for (int i=0; i<RegExCount; i++) {
     376    if (regexec(&RegEx[i], Name, (size_t) 0, NULL, 0) == 0) {
     377          List[NumItems].MinAbsChange = MinChange[i];
     378        }
     379  }
     380 
     381  List[NumItems].LastValue = DBL_MAX;
     382               
     383  // Create history service
     384  List[NumItems].HistBuffer = new struct EvidenceHistoryItem [HistSize];
     385  memset(List[NumItems].HistBuffer, 0, HistSize*sizeof(EvidenceHistoryItem));
     386  List[NumItems].HistPointer = 0;
     387
     388  char *Buf;
     389  if (MakeString(&Buf, "%s.hist", Name) == -1) {
     390    State(ERROR, "Could not create history service for '%s', MakeString() failed", Name);
     391  }
     392  else {
     393        List[NumItems].HistService = new DimService (Buf, (char *) "C",
     394                                          List[NumItems].HistBuffer, HistSize*sizeof(EvidenceHistoryItem));
     395    free(Buf);
     396  }
     397
     398  // Subscribe to service
     399  List[NumItems].DataItem = new DimStampedInfo(Name, NO_LINK, this);
     400
     401  // Increase number only after all set-up
     402  NumItems++;
     403}
     404
     405//
     406// Determine size of file in kB
     407//
     408float DataHandler::FileSize(FILE *File) {
     409
     410  struct stat FileStatus;
     411
     412  if (fstat(fileno(File), &FileStatus) == -1) {
     413     State(WARN, "Could not determine size of file (%s)", strerror(errno));
     414         return -1;
     415  }
     416
     417  return (float) FileStatus.st_size/1024;
    259418}
    260419
     
    267426  static DataHandler Data;
    268427 
    269   pause();        // Sleep until signal caught
    270 }
     428  // Sleep until signal caught
     429  pause();
     430}
Note: See TracChangeset for help on using the changeset viewer.