Changeset 11766 for trunk/FACT++/src
- Timestamp:
- 08/03/11 21:00:18 (13 years ago)
- Location:
- trunk/FACT++/src
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.c
r11748 r11766 13 13 #include <errno.h> 14 14 #include <unistd.h> 15 #include <sys/types.h> 15 #include <sys/types.h> 16 16 #include <sys/socket.h> 17 17 #include <netinet/in.h> … … 24 24 enum Severity 25 25 { 26 kMessage = 10,///< Just a message, usually obsolete27 kInfo = 20,///< An info telling something which can be interesting to know28 kWarn = 30,///< A warning, things that somehow might result in unexpected or unwanted bahaviour29 kError = 40,///< Error, something unexpected happened, but can still be handled by the program30 kFatal = 50,///< An error which cannot be handled at all happend, the only solution is program termination31 kDebug = 99,///< A message used for debugging only26 kMessage = 10, ///< Just a message, usually obsolete 27 kInfo = 20, ///< An info telling something which can be interesting to know 28 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour 29 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program 30 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination 31 kDebug = 99, ///< A message used for debugging only 32 32 }; 33 33 34 #define MIN_LEN 32 // min #bytes needed to interpret FADheader35 #define MAX_LEN 256*1024 // size of read-buffer per socket34 #define MIN_LEN 32 // min #bytes needed to interpret FADheader 35 #define MAX_LEN 256*1024 // size of read-buffer per socket 36 36 37 37 //#define nanosleep(x,y) 38 38 39 extern FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) ; 40 extern int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) ; 41 extern int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) ; 42 extern void factOut(int severity, int err, char* message ) ; 43 extern void gotNewRun( int runnr, PEVNT_HEADER *headers ); 44 45 46 extern void factStat(GUI_STAT gj) ; 47 48 extern void factStatNew(EVT_STAT gi) ; 49 50 extern int eventCheck( uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event) ; 51 52 extern int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer) ; 53 54 extern void debugHead(int i, int j, void *buf); 55 56 extern void debugRead(int isock, int ibyte, int32_t event,int32_t ftmevt, 57 int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) ; 58 extern void debugStream(int isock, void *buf, int len) ; 59 60 int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt); 61 62 63 int g_maxProc ; 64 int g_maxSize ; 65 int gi_maxSize ; 66 int gi_maxProc ; 67 68 uint g_actTime ; 69 uint g_actUsec ; 70 int g_runStat ; 71 int g_reset ; 72 int g_useFTM ; 73 74 int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX ; 75 size_t g_maxMem ; //maximum memory allowed for buffer 39 extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len); 40 extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len); 41 extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len); 42 //extern int runFinish (uint32_t runnr); 43 44 extern void factOut (int severity, int err, char *message); 45 46 extern void gotNewRun (int runnr, PEVNT_HEADER * headers); 47 48 49 extern void factStat (GUI_STAT gj); 50 51 extern void factStatNew (EVT_STAT gi); 52 53 extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event); 54 55 extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, 56 int8_t * buffer); 57 58 extern void debugHead (int i, int j, void *buf); 59 60 extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, 61 int32_t runnr, int state, uint32_t tsec, 62 uint32_t tusec); 63 extern void debugStream (int isock, void *buf, int len); 64 65 int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt); 66 67 68 int g_maxProc; 69 int g_maxSize; 70 int gi_maxSize; 71 int gi_maxProc; 72 73 uint g_actTime; 74 uint g_actUsec; 75 int g_runStat; 76 int g_reset; 77 int g_useFTM; 78 79 int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX; 80 size_t g_maxMem; //maximum memory allowed for buffer 76 81 77 82 //no longer needed ... 78 int g_maxBoards ;//maximum number of boards to be initialized79 int g_actBoards;83 int g_maxBoards; //maximum number of boards to be initialized 84 int g_actBoards; 80 85 // 81 86 82 FACT_SOCK g_port[NBOARDS] ;// .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"83 84 85 int gi_runStat;86 int gp_runStat;87 int gw_runStat;88 89 int gi_memStat = +1;90 91 uint32_t gi_myRun = 0;92 uint32_t actrun = 0;93 94 95 uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards87 FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd" 88 89 90 int gi_runStat; 91 int gp_runStat; 92 int gw_runStat; 93 94 int gi_memStat = +1; 95 96 uint32_t gi_myRun = 0; 97 uint32_t actrun = 0; 98 99 100 uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards 96 101 97 102 //uint gi_EvtStart= 0 ; … … 104 109 //uint gp_EvtTot = 0 ; 105 110 106 PIX_MAP g_pixMap[NPIX] 107 108 EVT_STAT gi;109 GUI_STAT gj;110 111 EVT_CTRL evtCtrl ;//control of events during processing112 int evtIdx[MAX_EVT*MAX_RUN] ;//index from mBuffer to evtCtrl113 114 WRK_DATA mBuffer[MAX_EVT*MAX_RUN];//local working space115 116 117 118 119 RUN_HEAD actRun;120 121 RUN_CTRL runCtrl[MAX_RUN];122 123 RUN_TAIL runTail[MAX_RUN];111 PIX_MAP g_pixMap[NPIX]; 112 113 EVT_STAT gi; 114 GUI_STAT gj; 115 116 EVT_CTRL evtCtrl; //control of events during processing 117 int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl 118 119 WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space 120 121 122 123 124 RUN_HEAD actRun; 125 126 RUN_CTRL runCtrl[MAX_RUN]; 127 128 RUN_TAIL runTail[MAX_RUN]; 124 129 125 130 … … 129 134 130 135 131 typedef union { 132 int8_t B[MAX_LEN ]; 133 int16_t S[MAX_LEN/2]; 134 int32_t I[MAX_LEN/4]; 135 int64_t L[MAX_LEN/8]; 136 } CNV_FACT ; 137 138 typedef struct { 139 int bufTyp ; //what are we reading at the moment: 0=header 1=data -1=skip ... 140 int32_t bufPos ; //next byte to read to the buffer next 141 int32_t bufLen ; //number of bytes left to read 142 int32_t skip ; //number of bytes skipped before start of event 143 144 int sockStat ; //-1 if socket not yet connected , 99 if not exist 145 int socket ; //contains the sockets 146 struct sockaddr_in SockAddr ; //IP for each socket 147 148 int evtID ; // event ID of event currently read 149 int runID ; // run " 150 int ftmID ; // event ID from FTM 151 uint fadLen ; // FADlength of event currently read 152 int fadVers ; // Version of FAD 153 int ftmTyp ; // trigger type 154 int board ; // boardID (softwareID: 0..40 ) 155 int Port ; 156 157 CNV_FACT *rBuf ; 136 typedef union 137 { 138 int8_t B[MAX_LEN]; 139 int16_t S[MAX_LEN / 2]; 140 int32_t I[MAX_LEN / 4]; 141 int64_t L[MAX_LEN / 8]; 142 } CNV_FACT; 143 144 typedef struct 145 { 146 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ... 147 int32_t bufPos; //next byte to read to the buffer next 148 int32_t bufLen; //number of bytes left to read 149 int32_t skip; //number of bytes skipped before start of event 150 151 int sockStat; //-1 if socket not yet connected , 99 if not exist 152 int socket; //contains the sockets 153 struct sockaddr_in SockAddr; //IP for each socket 154 155 int evtID; // event ID of event currently read 156 int runID; // run " 157 int ftmID; // event ID from FTM 158 uint fadLen; // FADlength of event currently read 159 int fadVers; // Version of FAD 160 int ftmTyp; // trigger type 161 int board; // boardID (softwareID: 0..40 ) 162 int Port; 163 164 CNV_FACT *rBuf; 158 165 159 166 #ifdef EVTDEBUG 160 CNV_FACT *xBuf ;//a copy of rBuf (temporary for debuging)167 CNV_FACT *xBuf; //a copy of rBuf (temporary for debuging) 161 168 #endif 162 169 163 } READ_STRUCT ; 164 165 166 typedef union { 167 int8_t B[2]; 168 int16_t S ; 169 } SHORT_BYTE ; 170 171 172 173 174 175 #define MXSTR 1000 176 char str[MXSTR] ; 177 178 SHORT_BYTE start, stop; 179 180 READ_STRUCT rd[MAX_SOCK] ; //buffer to read IP and afterwards store in mBuffer 170 } READ_STRUCT; 171 172 173 typedef union 174 { 175 int8_t B[2]; 176 int16_t S; 177 } SHORT_BYTE; 178 179 180 181 182 183 #define MXSTR 1000 184 char str[MXSTR]; 185 186 SHORT_BYTE start, stop; 187 188 READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer 181 189 182 190 … … 187 195 /*-----------------------------------------------------------------*/ 188 196 189 190 191 int GenSock(int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) { 197 int 198 runFinish1 (uint32_t runnr) 199 { 200 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)", 201 runnr); 202 factOut (kInfo, 173, str); //but continue anyhow 203 return 0; 204 } 205 206 207 int 208 GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr, 209 READ_STRUCT * rd) 210 { 192 211 /* 193 212 *** generate Address, create sockets and allocates readbuffer for it … … 200 219 */ 201 220 202 int j;203 int optval = 1 ;//activate keepalive204 socklen_t optlen = sizeof(optval);205 206 if (rd->sockStat ==0 ) {//close socket if open207 j=close(rd->socket);208 if (j>0) {209 snprintf(str,MXSTR,"Error closing socket %d | %m",sid);210 factOut(kFatal,771, str );211 } else {212 snprintf(str,MXSTR,"Succesfully closed socket %d",sid);213 factOut(kInfo,771, str );214 }215 }216 217 218 if (flag < 0) {219 free(rd->rBuf) ;//and never open again221 int j; 222 int optval = 1; //activate keepalive 223 socklen_t optlen = sizeof (optval); 224 225 if (rd->sockStat == 0) { //close socket if open 226 j = close (rd->socket); 227 if (j > 0) { 228 snprintf (str, MXSTR, "Error closing socket %d | %m", sid); 229 factOut (kFatal, 771, str); 230 } else { 231 snprintf (str, MXSTR, "Succesfully closed socket %d", sid); 232 factOut (kInfo, 771, str); 233 } 234 } 235 236 237 if (flag < 0) { 238 free (rd->rBuf); //and never open again 220 239 #ifdef EVTDEBUG 221 free(rd->xBuf) ;//and never open again240 free (rd->xBuf); //and never open again 222 241 #endif 223 rd->rBuf = NULL ;224 rd->sockStat = 99 ;225 return 0;226 }227 228 229 if (flag == 0) {//generate address and buffer ...230 rd->Port = port;231 rd->SockAddr.sin_family = sockAddr->sin_family;232 rd->SockAddr.sin_port = htons(port);233 rd->SockAddr.sin_addr = sockAddr->sin_addr;242 rd->rBuf = NULL; 243 rd->sockStat = 99; 244 return 0; 245 } 246 247 248 if (flag == 0) { //generate address and buffer ... 249 rd->Port = port; 250 rd->SockAddr.sin_family = sockAddr->sin_family; 251 rd->SockAddr.sin_port = htons (port); 252 rd->SockAddr.sin_addr = sockAddr->sin_addr; 234 253 235 254 #ifdef EVTDEBUG 236 rd->xBuf = malloc(sizeof(CNV_FACT) );255 rd->xBuf = malloc (sizeof (CNV_FACT)); 237 256 #endif 238 rd->rBuf = malloc(sizeof(CNV_FACT) );239 if ( rd->rBuf == NULL) {240 snprintf(str,MXSTR,"Could not create local buffer %d",sid);241 factOut(kFatal,774, str );242 rd->sockStat = 77;243 return -3;244 }245 }246 247 248 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {249 snprintf(str,MXSTR,"Could not generate socket %d | %m",sid);250 factOut(kFatal,773, str );251 rd->sockStat = 88;252 return -2;253 }254 optval=1;255 if ( setsockopt(rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {256 snprintf(str,MXSTR,"Could not set keepalive %d | %m",sid);257 factOut(kInfo,173, str ) ;//but continue anyhow258 }259 optval=10;//start after 10 seconds260 if ( setsockopt(rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {261 snprintf(str,MXSTR,"Could not set keepidle %d | %m",sid);262 factOut(kInfo,173, str ) ;//but continue anyhow263 }264 optval=10;//do every 10 seconds265 if ( setsockopt(rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {266 snprintf(str,MXSTR,"Could not set keepintvl %d | %m",sid);267 factOut(kInfo,173, str ) ;//but continue anyhow268 }269 optval=2;//close after 2 unsuccessful tries270 if ( setsockopt(rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {271 snprintf(str,MXSTR,"Could not set keepalive probes %d | %m",sid);272 factOut(kInfo,173, str ) ;//but continue anyhow273 }274 275 276 277 snprintf(str,MXSTR,"Successfully generated socket %d ",sid);278 factOut(kInfo,773, str );279 rd->sockStat = -1 ;//try to (re)open socket280 return 0;257 rd->rBuf = malloc (sizeof (CNV_FACT)); 258 if (rd->rBuf == NULL) { 259 snprintf (str, MXSTR, "Could not create local buffer %d", sid); 260 factOut (kFatal, 774, str); 261 rd->sockStat = 77; 262 return -3; 263 } 264 } 265 266 267 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { 268 snprintf (str, MXSTR, "Could not generate socket %d | %m", sid); 269 factOut (kFatal, 773, str); 270 rd->sockStat = 88; 271 return -2; 272 } 273 optval = 1; 274 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) { 275 snprintf (str, MXSTR, "Could not set keepalive %d | %m", sid); 276 factOut (kInfo, 173, str); //but continue anyhow 277 } 278 optval = 10; //start after 10 seconds 279 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) { 280 snprintf (str, MXSTR, "Could not set keepidle %d | %m", sid); 281 factOut (kInfo, 173, str); //but continue anyhow 282 } 283 optval = 10; //do every 10 seconds 284 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) { 285 snprintf (str, MXSTR, "Could not set keepintvl %d | %m", sid); 286 factOut (kInfo, 173, str); //but continue anyhow 287 } 288 optval = 2; //close after 2 unsuccessful tries 289 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) { 290 snprintf (str, MXSTR, "Could not set keepalive probes %d | %m", sid); 291 factOut (kInfo, 173, str); //but continue anyhow 292 } 293 294 295 296 snprintf (str, MXSTR, "Successfully generated socket %d ", sid); 297 factOut (kInfo, 773, str); 298 rd->sockStat = -1; //try to (re)open socket 299 return 0; 281 300 282 301 } /*-----------------------------------------------------------------*/ … … 287 306 288 307 289 int mBufInit() { 308 int 309 mBufInit () 310 { 290 311 // initialize mBuffer (mark all entries as unused\empty) 291 312 292 int i 293 uint32_t actime 294 295 actime = g_actTime + 50000000 296 297 for (i =0; i<MAX_EVT*MAX_RUN; i++) {313 int i; 314 uint32_t actime; 315 316 actime = g_actTime + 50000000; 317 318 for (i = 0; i < MAX_EVT * MAX_RUN; i++) { 298 319 mBuffer[i].evNum = mBuffer[i].nRoi = -1; 299 mBuffer[i].runNum = 0;300 301 evtCtrl.evtBuf[ i] = -1;302 evtCtrl.evtStat[ i] = -1;303 evtCtrl.pcTime[ i] = actime ;//initiate to far future304 } 305 306 307 actRun.FADhead = malloc ( NBOARDS* sizeof(PEVNT_HEADER) );308 309 evtCtrl.frstPtr = 0 310 evtCtrl.lastPtr = 0 311 312 return 0 320 mBuffer[i].runNum = 0; 321 322 evtCtrl.evtBuf[i] = -1; 323 evtCtrl.evtStat[i] = -1; 324 evtCtrl.pcTime[i] = actime; //initiate to far future 325 } 326 327 328 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER)); 329 330 evtCtrl.frstPtr = 0; 331 evtCtrl.lastPtr = 0; 332 333 return 0; 313 334 314 335 } /*-----------------------------------------------------------------*/ … … 317 338 318 339 319 int mBufEvt( int evID, uint runID, int nRoi[], int sk, 320 int fadlen, int trgTyp, int trgNum, int fadNum) { 340 int 341 mBufEvt (int evID, uint runID, int nRoi[], int sk, 342 int fadlen, int trgTyp, int trgNum, int fadNum) 343 { 321 344 // generate a new Event into mBuffer: 322 345 // make sure only complete Event are possible, so 'free' will always work … … 327 350 // < 0 if no space left 328 351 329 struct timeval *tv, atv; 330 tv=&atv; 331 uint32_t tsec, tusec ; 332 333 int i, k, jr, b, evFree ; 334 int headmem=0 ; 335 size_t needmem = 0 ; 336 337 338 b = sk/7 ; 339 340 if (nRoi[0] <0 || nRoi[0] > 1024) { 341 snprintf(str,MXSTR,"illegal nRoi[0]: %d",nRoi[0]) ; 342 factOut(kError, 999, str ) ; 343 gj.badRoiR++ ; 344 gj.badRoi[b]++ ; 345 return -9999 ; 346 } 347 348 for (jr=1; jr<8; jr++) { 349 if ( nRoi[jr] != nRoi[0] ) { 350 snprintf(str,MXSTR,"wrong nRoi[%d]: %d %d",jr,nRoi[jr],nRoi[0]) ; 351 factOut(kError,711, str ) ; 352 gj.badRoiB++ ; 353 gj.badRoi[b]++ ; 354 return -7101 ; 355 } 356 } 357 if ( nRoi[8] < nRoi[0] ) { 358 snprintf(str,MXSTR,"wrong nRoi_TM: %d %d",nRoi[8],nRoi[0]) ; 359 factOut(kError,712, str ) ; 360 gj.badRoiB++ ; 361 gj.badRoi[b]++ ; 362 return -7102 ; 363 } 364 365 366 i = evID % MAX_EVT ; 367 evFree = -1 ; 368 369 for ( k=0; k<MAX_RUN; k++) { 370 if ( mBuffer[i].evNum == evID 371 && mBuffer[i].runNum== runID ) { //event is already registered; 372 // is it ok ???? 373 if ( mBuffer[i].nRoi != nRoi[0] 374 || mBuffer[i].nRoiTM != nRoi[8] ) { 375 snprintf(str,MXSTR,"illegal evt_roi %d %d ; %d %d", 376 nRoi[0],nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM ); 377 factOut(kError,821, str ) ; 378 gj.badRoiE++ ; 379 gj.badRoi[b]++ ; 380 return -8201 ; 352 struct timeval *tv, atv; 353 tv = &atv; 354 uint32_t tsec, tusec; 355 uint oldest; 356 int jold; 357 358 int i, k, jr, b, evFree; 359 int headmem = 0; 360 size_t needmem = 0; 361 362 363 b = sk / 7; 364 365 if (nRoi[0] < 0 || nRoi[0] > 1024) { 366 snprintf (str, MXSTR, "illegal nRoi[0]: %d", nRoi[0]); 367 factOut (kError, 999, str); 368 gj.badRoiR++; 369 gj.badRoi[b]++; 370 return -9999; 371 } 372 373 for (jr = 1; jr < 8; jr++) { 374 if (nRoi[jr] != nRoi[0]) { 375 snprintf (str, MXSTR, "wrong nRoi[%d]: %d %d", jr, nRoi[jr], 376 nRoi[0]); 377 factOut (kError, 711, str); 378 gj.badRoiB++; 379 gj.badRoi[b]++; 380 return -7101; 381 } 382 } 383 if (nRoi[8] < nRoi[0]) { 384 snprintf (str, MXSTR, "wrong nRoi_TM: %d %d", nRoi[8], nRoi[0]); 385 factOut (kError, 712, str); 386 gj.badRoiB++; 387 gj.badRoi[b]++; 388 return -7102; 389 } 390 391 392 i = evID % MAX_EVT; 393 evFree = -1; 394 395 for (k = 0; k < MAX_RUN; k++) { 396 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered; 397 // is it ok ???? 398 if (mBuffer[i].nRoi != nRoi[0] 399 || mBuffer[i].nRoiTM != nRoi[8]) { 400 snprintf (str, MXSTR, "illegal evt_roi %d %d ; %d %d", 401 nRoi[0], nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM); 402 factOut (kError, 821, str); 403 gj.badRoiE++; 404 gj.badRoi[b]++; 405 return -8201; 381 406 } 382 383 407 // count for inconsistencies 384 408 385 if ( mBuffer[i].trgNum != trgNum ) mBuffer[i].Errors[0]++ ; 386 if ( mBuffer[i].fadNum != fadNum ) mBuffer[i].Errors[1]++ ; 387 if ( mBuffer[i].trgTyp != trgTyp ) mBuffer[i].Errors[2]++ ; 409 if (mBuffer[i].trgNum != trgNum) 410 mBuffer[i].Errors[0]++; 411 if (mBuffer[i].fadNum != fadNum) 412 mBuffer[i].Errors[1]++; 413 if (mBuffer[i].trgTyp != trgTyp) 414 mBuffer[i].Errors[2]++; 388 415 389 416 //everything seems fine so far ==> use this slot .... 390 return i ; 391 } 392 if ( evFree < 0 && mBuffer[i].evNum < 0 ) evFree = i ; 393 i += MAX_EVT ; 417 return i; 418 } 419 if (evFree < 0 && mBuffer[i].evNum < 0) 420 evFree = i; 421 i += MAX_EVT; 394 422 } 395 423 396 424 397 425 //event does not yet exist; create it 398 if (evFree < 0 ) {//no space available in ctrl399 snprintf(str,MXSTR,"no control slot to keep event %d",evID);400 factOut(kError,881, str );401 return -1;402 } 403 i = evFree ;//found free entry; use it ...404 405 gettimeofday (tv, NULL);406 tsec = atv.tv_sec 407 tusec = atv.tv_usec ;426 if (evFree < 0) { //no space available in ctrl 427 snprintf (str, MXSTR, "no control slot to keep event %d", evID); 428 factOut (kError, 881, str); 429 return -1; 430 } 431 i = evFree; //found free entry; use it ... 432 433 gettimeofday (tv, NULL); 434 tsec = atv.tv_sec; 435 tusec = atv.tv_usec; 408 436 409 437 //check if runId already registered in runCtrl 410 evFree = -1 ; 411 for (k=0; k<MAX_RUN; k++) { 412 if (runCtrl[k].runId == runID ) { 413 if ( runCtrl[k].roi0 != nRoi[0] 414 || runCtrl[k].roi8 != nRoi[8] ) { 415 snprintf(str,MXSTR,"illegal run_roi %d %d ; %d %d", 416 nRoi[0],nRoi[8],runCtrl[k].roi0,runCtrl[k].roi8 ); 417 factOut(kError,931, str ) ; 418 gj.badRoiR++ ; 419 gj.badRoi[b]++ ; 420 return -9301 ; 438 evFree = -1; 439 oldest = g_actTime + 1000; 440 jold = -1; 441 for (k = 0; k < MAX_RUN; k++) { 442 if (runCtrl[k].runId == runID) { 443 if (runCtrl[k].procId > 0) { //run is closed -> reject 444 snprintf (str, MXSTR, "skip event since run %d finished", runID); 445 factOut (kInfo, 931, str); 446 return -21; 421 447 } 422 goto RUNFOUND ; 423 } 424 else if (evFree < 0 && runCtrl[k].runId == 0 ) evFree = k ; 425 } 426 427 if (evFree <0 ) { 428 snprintf(str,MXSTR,"not able to register the new run %d",runID); 429 factOut(kFatal,883, str ) ; 430 return -1001 ; 448 449 if (runCtrl[k].roi0 != nRoi[0] 450 || runCtrl[k].roi8 != nRoi[8]) { 451 snprintf (str, MXSTR, "illegal run_roi %d %d ; %d %d", 452 nRoi[0], nRoi[8], runCtrl[k].roi0, runCtrl[k].roi8); 453 factOut (kError, 931, str); 454 gj.badRoiR++; 455 gj.badRoi[b]++; 456 return -9301; 457 } 458 goto RUNFOUND; 459 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used 460 evFree = k; 461 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed 462 if (runCtrl[k].closeTime < oldest) { 463 oldest = runCtrl[k].closeTime; 464 jold = k; 465 } 466 } 467 } 468 469 if (evFree < 0 && jold < 0) { 470 snprintf (str, MXSTR, "not able to register the new run %d", runID); 471 factOut (kFatal, 883, str); 472 return -1001; 431 473 } else { 432 snprintf(str,MXSTR,"register new run %d roi: %d %d",runID,nRoi[0],nRoi[8]) ; 433 factOut(kInfo,503, str ) ; 434 runCtrl[evFree].runId = runID ; 435 runCtrl[evFree].roi0 = nRoi[0] ; 436 runCtrl[evFree].roi8 = nRoi[8] ; 437 runCtrl[evFree].fileId = -2 ; 438 runCtrl[evFree].lastEvt= -1 ; 439 runCtrl[evFree].nextEvt= 0; 474 if (evFree < 0) 475 evFree = jold; 476 snprintf (str, MXSTR, "register new run %d(%d) roi: %d %d", runID, 477 evFree, nRoi[0], nRoi[8]); 478 factOut (kInfo, 503, str); 479 runCtrl[evFree].runId = runID; 480 runCtrl[evFree].roi0 = nRoi[0]; 481 runCtrl[evFree].roi8 = nRoi[8]; 482 runCtrl[evFree].fileId = -2; 483 runCtrl[evFree].procId = -2; 484 runCtrl[evFree].lastEvt = -1; 485 runCtrl[evFree].nextEvt = 0; 440 486 runCtrl[evFree].actEvt = 0; 441 runCtrl[evFree]. maxEvt = 999999999 ; //max number events allowed442 runCtrl[evFree]. firstUsec=tusec ;443 runCtrl[evFree].first Time=444 runCtrl[evFree]. lastTime=tsec;445 runCtrl[evFree].closeTime =tsec + 3600*24 ;//max time allowed446 runCtrl[evFree].lastTime = 0 487 runCtrl[evFree].procEvt = 0; 488 runCtrl[evFree].maxEvt = 999999999; //max number events allowed 489 runCtrl[evFree].firstUsec = tusec; 490 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec; 491 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed 492 runCtrl[evFree].lastTime = 0; 447 493 448 494 runTail[evFree].nEventsOk = 449 runTail[evFree].nEventsRej = 450 runTail[evFree].nEventsBad = 451 runTail[evFree].PCtime0 = 452 runTail[evFree].PCtimeX = 0 ; 453 } 454 455 RUNFOUND: 456 457 needmem = sizeof(EVENT) + NPIX*nRoi[0]*2 + NTMARK*nRoi[0]*2; // 458 459 headmem = NBOARDS* sizeof(PEVNT_HEADER) ; 460 461 if ( gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) { 462 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize ; 463 if (gi_memStat>0 ) { 464 gi_memStat = -99 ; 465 snprintf(str,MXSTR,"no memory left to keep event %6d sock %3d",evID,sk) ; 466 factOut(kError,882, str ) ; 467 } else { 468 snprintf(str,MXSTR,"no memory left to keep event %6d sock %3d",evID,sk) ; 469 factOut(kDebug,882, str ) ; 470 } 471 return -11 ; 472 } 473 474 mBuffer[i].FADhead = malloc( headmem ) ; 495 runTail[evFree].nEventsRej = 496 runTail[evFree].nEventsBad = 497 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0; 498 } 499 500 RUNFOUND: 501 502 needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; // 503 504 headmem = NBOARDS * sizeof (PEVNT_HEADER); 505 506 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) { 507 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize; 508 if (gi_memStat > 0) { 509 gi_memStat = -99; 510 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d", 511 evID, sk); 512 factOut (kError, 882, str); 513 } else { 514 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d", 515 evID, sk); 516 factOut (kDebug, 882, str); 517 } 518 return -11; 519 } 520 521 mBuffer[i].FADhead = malloc (headmem); 475 522 if (mBuffer[i].FADhead == NULL) { 476 snprintf(str,MXSTR,"malloc header failed for event %d",evID);477 factOut(kError,882, str );523 snprintf (str, MXSTR, "malloc header failed for event %d", evID); 524 factOut (kError, 882, str); 478 525 return -12; 479 526 } 480 527 481 mBuffer[i].fEvent = malloc( needmem );482 if (mBuffer[i].fEvent 483 snprintf(str,MXSTR,"malloc data failed for event %d",evID);484 factOut(kError,882, str );485 free (mBuffer[i].FADhead);486 mBuffer[i].FADhead = NULL 528 mBuffer[i].fEvent = malloc (needmem); 529 if (mBuffer[i].fEvent == NULL) { 530 snprintf (str, MXSTR, "malloc data failed for event %d", evID); 531 factOut (kError, 882, str); 532 free (mBuffer[i].FADhead); 533 mBuffer[i].FADhead = NULL; 487 534 return -22; 488 535 } 489 536 490 mBuffer[i].buffer = malloc( gi_maxSize );537 mBuffer[i].buffer = malloc (gi_maxSize); 491 538 if (mBuffer[i].buffer == NULL) { 492 snprintf(str,MXSTR,"malloc buffer failed for event %d",evID);493 factOut(kError,882, str );494 free (mBuffer[i].FADhead);495 mBuffer[i].FADhead = NULL 496 free (mBuffer[i].fEvent);497 mBuffer[i].fEvent = NULL 539 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID); 540 factOut (kError, 882, str); 541 free (mBuffer[i].FADhead); 542 mBuffer[i].FADhead = NULL; 543 free (mBuffer[i].fEvent); 544 mBuffer[i].fEvent = NULL; 498 545 return -32; 499 546 } 500 501 502 503 547 //flag all boards as unused 504 mBuffer[i].nBoard = 0 505 for (k =0; k<NBOARDS; k++) {548 mBuffer[i].nBoard = 0; 549 for (k = 0; k < NBOARDS; k++) { 506 550 mBuffer[i].board[k] = -1; 507 551 } 508 552 //flag all pixels as unused 509 for (k =0; k<NPIX; k++) {510 mBuffer[i].fEvent->StartPix[k] = -1 553 for (k = 0; k < NPIX; k++) { 554 mBuffer[i].fEvent->StartPix[k] = -1; 511 555 } 512 556 //flag all TMark as unused 513 for (k=0; k<NTMARK; k++ ) { 514 mBuffer[i].fEvent->StartTM[k] = -1 ; 515 } 516 517 mBuffer[i].fEvent->NumBoards = 0 ; 518 mBuffer[i].fEvent->PCUsec = tusec ; 519 mBuffer[i].fEvent->PCTime = 520 mBuffer[i].pcTime = tsec ; 521 mBuffer[i].nRoi = nRoi[0] ; 522 mBuffer[i].nRoiTM = nRoi[8] ; 523 mBuffer[i].evNum = evID ; 524 mBuffer[i].runNum = runID ; 525 mBuffer[i].fadNum = fadNum; 526 mBuffer[i].trgNum = trgNum; 527 mBuffer[i].trgTyp = trgTyp ; 528 mBuffer[i].evtLen = needmem ; 557 for (k = 0; k < NTMARK; k++) { 558 mBuffer[i].fEvent->StartTM[k] = -1; 559 } 560 561 mBuffer[i].fEvent->NumBoards = 0; 562 mBuffer[i].fEvent->PCUsec = tusec; 563 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec; 564 mBuffer[i].nRoi = nRoi[0]; 565 mBuffer[i].nRoiTM = nRoi[8]; 566 mBuffer[i].evNum = evID; 567 mBuffer[i].runNum = runID; 568 mBuffer[i].fadNum = fadNum; 569 mBuffer[i].trgNum = trgNum; 570 mBuffer[i].trgTyp = trgTyp; 571 mBuffer[i].evtLen = needmem; 529 572 mBuffer[i].Errors[0] = 530 mBuffer[i].Errors[1] =531 mBuffer[i].Errors[2] = 532 mBuffer[i].Errors[3] = 0;533 534 gj.usdMem += needmem + headmem + gi_maxSize;535 if (gj.usdMem > gj.maxMem ) gj.maxMem = gj.usdMem ; 536 537 gj.bufTot++ ;538 if (gj.bufTot > gj.maxEvt ) gj.maxEvt = gj.bufTot;539 540 gj.rateNew++ 573 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0; 574 575 gj.usdMem += needmem + headmem + gi_maxSize; 576 if (gj.usdMem > gj.maxMem) 577 gj.maxMem = gj.usdMem; 578 579 gj.bufTot++; 580 if (gj.bufTot > gj.maxEvt) 581 gj.maxEvt = gj.bufTot; 582 583 gj.rateNew++; 541 584 542 585 //register event in 'active list (reading)' 543 586 544 evtCtrl.evtBuf[ evtCtrl.lastPtr] = i ; 545 evtCtrl.evtStat[ evtCtrl.lastPtr] = 0 ; 546 evtCtrl.pcTime[ evtCtrl.lastPtr] = g_actTime ; 547 evtIdx[i] = evtCtrl.lastPtr ; 548 549 550 snprintf(str,MXSTR,"%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", 551 evID,runID,i,evtCtrl.lastPtr,sk,fadlen,mBuffer[i].pcTime); 552 factOut(kDebug,-11, str ) ; 553 evtCtrl.lastPtr++ ; 554 if (evtCtrl.lastPtr == MAX_EVT*MAX_RUN ) evtCtrl.lastPtr = 0; 555 556 gi.evtGet++ ; 557 558 return i ; 559 587 evtCtrl.evtBuf[evtCtrl.lastPtr] = i; 588 evtCtrl.evtStat[evtCtrl.lastPtr] = 0; 589 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime; 590 evtIdx[i] = evtCtrl.lastPtr; 591 592 593 snprintf (str, MXSTR, 594 "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID, 595 runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime); 596 factOut (kDebug, -11, str); 597 evtCtrl.lastPtr++; 598 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN) 599 evtCtrl.lastPtr = 0; 600 601 gi.evtGet++; 602 603 return i; 604 560 605 } /*-----------------------------------------------------------------*/ 561 606 … … 563 608 564 609 565 int mBufFree(int i) { 610 int 611 mBufFree (int i) 612 { 566 613 //delete entry [i] from mBuffer: 567 614 //(and make sure multiple calls do no harm ....) 568 615 569 int headmem=0 ; 570 int evid ; 571 size_t freemem = 0 ; 572 573 evid = mBuffer[i].evNum ; 574 freemem = mBuffer[i].evtLen ; 575 576 free(mBuffer[i].fEvent ) ; 577 mBuffer[i].fEvent = NULL ; 578 579 free(mBuffer[i].FADhead ) ; 580 mBuffer[i].FADhead = NULL ; 581 582 free(mBuffer[i].buffer ) ; 583 mBuffer[i].buffer = NULL ; 584 585 headmem = NBOARDS* sizeof(PEVNT_HEADER) ; 586 mBuffer[i].evNum = mBuffer[i].nRoi= -1; 587 mBuffer[i].runNum = 0; 588 589 gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize ; 590 gj.bufTot-- ; 591 592 if (gi_memStat < 0 ) { 593 if (gj.usdMem <= 0.75 * gj.maxMem ) gi_memStat = +1 ; 594 } 595 596 597 return 0 ; 598 616 int headmem = 0; 617 int evid; 618 size_t freemem = 0; 619 620 evid = mBuffer[i].evNum; 621 freemem = mBuffer[i].evtLen; 622 623 free (mBuffer[i].fEvent); 624 mBuffer[i].fEvent = NULL; 625 626 free (mBuffer[i].FADhead); 627 mBuffer[i].FADhead = NULL; 628 629 free (mBuffer[i].buffer); 630 mBuffer[i].buffer = NULL; 631 632 headmem = NBOARDS * sizeof (PEVNT_HEADER); 633 mBuffer[i].evNum = mBuffer[i].nRoi = -1; 634 mBuffer[i].runNum = 0; 635 636 gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize; 637 gj.bufTot--; 638 639 if (gi_memStat < 0) { 640 if (gj.usdMem <= 0.75 * gj.maxMem) 641 gi_memStat = +1; 642 } 643 644 645 return 0; 646 599 647 } /*-----------------------------------------------------------------*/ 600 648 601 649 602 void resetEvtStat() { 603 int i ; 604 605 for (i=0; i<MAX_SOCK; i++) gi.numRead[i] = 0 ; 606 607 for (i=0; i<NBOARDS; i++ ) { 608 gi.gotByte[i] = 0 ; 609 gi.gotErr[i] = 0 ; 610 611 } 612 613 gi.evtGet = 0 ; //#new Start of Events read 614 gi.evtTot = 0 ; //#complete Events read 615 gi.evtErr = 0 ; //#Events with Errors 616 gi.evtSkp = 0 ; //#Events incomplete (timeout) 617 618 gi.procTot = 0 ; //#Events processed 619 gi.procErr = 0 ; //#Events showed problem in processing 620 gi.procTrg = 0 ; //#Events accepted by SW trigger 621 gi.procSkp = 0 ; //#Events rejected by SW trigger 622 623 gi.feedTot = 0 ; //#Events used for feedBack system 624 gi.feedErr = 0 ; //#Events rejected by feedBack 625 626 gi.wrtTot = 0 ; //#Events written to disk 627 gi.wrtErr = 0 ; //#Events with write-error 628 629 gi.runOpen = 0 ; //#Runs opened 630 gi.runClose= 0 ; //#Runs closed 631 gi.runErr = 0 ; //#Runs with open/close errors 632 633 return ; 650 void 651 resetEvtStat () 652 { 653 int i; 654 655 for (i = 0; i < MAX_SOCK; i++) 656 gi.numRead[i] = 0; 657 658 for (i = 0; i < NBOARDS; i++) { 659 gi.gotByte[i] = 0; 660 gi.gotErr[i] = 0; 661 662 } 663 664 gi.evtGet = 0; //#new Start of Events read 665 gi.evtTot = 0; //#complete Events read 666 gi.evtErr = 0; //#Events with Errors 667 gi.evtSkp = 0; //#Events incomplete (timeout) 668 669 gi.procTot = 0; //#Events processed 670 gi.procErr = 0; //#Events showed problem in processing 671 gi.procTrg = 0; //#Events accepted by SW trigger 672 gi.procSkp = 0; //#Events rejected by SW trigger 673 674 gi.feedTot = 0; //#Events used for feedBack system 675 gi.feedErr = 0; //#Events rejected by feedBack 676 677 gi.wrtTot = 0; //#Events written to disk 678 gi.wrtErr = 0; //#Events with write-error 679 680 gi.runOpen = 0; //#Runs opened 681 gi.runClose = 0; //#Runs closed 682 gi.runErr = 0; //#Runs with open/close errors 683 684 return; 634 685 } /*-----------------------------------------------------------------*/ 635 686 636 687 637 688 638 void initReadFAD() { 639 return ; 689 void 690 initReadFAD () 691 { 692 return; 640 693 } /*-----------------------------------------------------------------*/ 641 694 642 695 643 696 644 void *readFAD( void *ptr ) { 697 void * 698 readFAD (void *ptr) 699 { 645 700 /* *** main loop reading FAD data and sorting them to complete events */ 646 int head_len,frst_len,numok,numok2,numokx,dest,evID,i,k ; 647 int actBoards = 0, minLen ; 648 int32_t jrd ; 649 uint gi_SecTime ; //time in seconds 650 int boardId, roi[9],drs,px,src,pixS,pixH,pixC,pixR,tmS ; 651 652 int goodhed = 0 ; 653 int errcnt0 = 0 ; 654 655 int sockDef[NBOARDS]; //internal state of sockets 656 int jrdx ; 657 658 659 struct timespec xwait ; 660 661 662 struct timeval *tv, atv; 663 tv=&atv; 664 uint32_t tsec, tusec ; 665 666 667 snprintf(str,MXSTR,"start initializing"); 668 factOut(kInfo,-1, str ) ; 669 670 int cpu = 7 ; //read thread 671 cpu_set_t mask; 701 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k; 702 int actBoards = 0, minLen; 703 int32_t jrd; 704 uint gi_SecTime; //time in seconds 705 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS; 706 707 int goodhed = 0; 708 709 int sockDef[NBOARDS]; //internal state of sockets 710 int jrdx; 711 712 713 struct timespec xwait; 714 715 716 struct timeval *tv, atv; 717 tv = &atv; 718 uint32_t tsec, tusec; 719 720 721 snprintf (str, MXSTR, "start initializing"); 722 factOut (kInfo, -1, str); 723 724 int cpu = 7; //read thread 725 cpu_set_t mask; 672 726 673 727 /* CPU_ZERO initializes all the bits in the mask to zero. */ 674 CPU_ZERO( &mask);728 CPU_ZERO (&mask); 675 729 /* CPU_SET sets only the bit corresponding to cpu. */ 676 cpu = 7;677 CPU_SET( cpu, &mask);730 cpu = 7; 731 CPU_SET (cpu, &mask); 678 732 679 733 /* sched_setaffinity returns 0 in success */ 680 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1) {681 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);682 factOut(kWarn,-1, str );683 }684 685 686 head_len = sizeof(PEVNT_HEADER);687 frst_len = head_len ;//max #bytes to read first: fad_header only, so each event must be longer, even for roi=0688 minLen = head_len ;//min #bytes needed to check header: full header for debug689 690 start.S=0xFB01;691 stop.S= 0x04FE;734 if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { 735 snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu); 736 factOut (kWarn, -1, str); 737 } 738 739 740 head_len = sizeof (PEVNT_HEADER); 741 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0 742 minLen = head_len; //min #bytes needed to check header: full header for debug 743 744 start.S = 0xFB01; 745 stop.S = 0x04FE; 692 746 693 747 /* initialize run control logics */ 694 for (i=0; i<MAX_RUN; i++) { 695 runCtrl[i].runId = 0 ; 696 runCtrl[i].fileId = -2 ; 697 } 698 gi_resetS = gi_resetR = 9; 699 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ; 700 701 START: 702 gettimeofday( tv, NULL); 703 g_actTime = tsec = atv.tv_sec ; 704 g_actUsec = tusec= atv.tv_usec ; 705 gi_myRun = g_actTime ; 706 evtCtrl.frstPtr = 0 ; 707 evtCtrl.lastPtr = 0 ; 708 709 gi_SecTime = g_actTime ; 710 gi_runStat = g_runStat ; 711 gj.readStat= g_runStat ; 712 numok = numok2 = 0 ; 713 714 if ( gi_resetS > 0) { 715 //make sure all sockets are preallocated as 'not exist' 716 for (i=0; i<MAX_SOCK; i++) { 717 rd[i].socket = -1 ; 718 rd[i].sockStat = 99 ; 719 } 720 721 for (k=0; k<NBOARDS; k++) { 722 gi_NumConnect[k]=0; 723 gi.numConn[k] =0; 724 gj.numConn[k] =0; 725 gj.errConn[k] =0; 726 gj.rateBytes[k] =0; 727 gj.totBytes[k] =0; 728 } 729 730 } 731 732 733 if ( gi_resetR > 0) { 734 resetEvtStat(); 735 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0 ; 736 gj.usdMem = gj.maxMem = gj.xxxMem = 0 ; 737 gj.totMem = g_maxMem ; 738 gj.bufNew = gj.bufEvt = 0 ; 739 gj.badRoiE = gj.badRoiR = gj.badRoiB = 740 gj.evtSkip = gj.evtWrite = gj.evtErr = 0 ; 741 742 int b ; 743 for (b=0; b<NBOARDS; b++) gj.badRoi[b]=0 ; 744 745 mBufInit() ; //initialize buffers 746 747 snprintf(str,MXSTR,"end initializing"); 748 factOut(kInfo,-1, str ) ; 749 } 750 751 752 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0 ; 753 754 while (g_runStat >=0 && g_reset ==0 ) { //loop until global variable g_runStat claims stop 755 756 gi_runStat = g_runStat; 757 gj.readStat= g_runStat; 758 gettimeofday( tv, NULL); 759 g_actTime = tsec = atv.tv_sec ; 760 g_actUsec = tusec= atv.tv_usec ; 761 762 763 int b,p,p0,s0,nch; 764 nch = 0 ; 765 for (b=0; b<NBOARDS; b++ ) { 766 k = b*7 ; 767 if ( g_port[b].sockDef != sockDef[b] ) { //something has changed ... 768 nch++ ; 769 gi_NumConnect[ b ] = 0 ; //must close all connections 770 gi.numConn[ b ] = 0; 771 gj.numConn[ b ] = 0; 772 if ( sockDef[b] == 0) s0= 0 ; //sockets to be defined and opened 773 else if (g_port[b].sockDef == 0) s0= -1 ; //sockets to be destroyed 774 else s0= +1 ; //sockets to be closed and reopened 775 776 if (s0 == 0) p0=ntohs(g_port[b].sockAddr.sin_port); 777 else p0=0 ; 778 779 for (p=p0+1; p<p0+8; p++) { 780 GenSock(s0, k, p, &g_port[b].sockAddr, &rd[k]) ; //generate address and socket 781 k++ ; 782 } 783 sockDef[b] = g_port[b].sockDef ; 784 } 785 } 786 787 if (nch > 0 ) { 788 actBoards = 0 ; 789 for (b=0; b<NBOARDS; b++ ) { 790 if ( sockDef[b] > 0 ) actBoards++ ; 791 } 792 } 793 794 795 jrdx = 0; 796 numokx= 0; 797 numok = 0 ; //count number of succesfull actions 798 799 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read 800 b = i / 7 ; 801 if (sockDef[b] > 0) s0= +1 ; 802 else s0= -1 ; 803 804 if (rd[i].sockStat <0 ) { //try to connect if not yet done 805 rd[i].sockStat=connect(rd[i].socket, 806 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ; 807 if (rd[i].sockStat ==0 ) { //successfull ==> 808 if (sockDef[b] > 0) { 809 rd[i].bufTyp = 0 ; // expect a header 810 rd[i].bufLen = frst_len ; // max size to read at begining 811 } else { 812 rd[i].bufTyp = -1 ; // full data to be skipped 813 rd[i].bufLen = MAX_LEN ; //huge for skipping 814 } 815 rd[i].bufPos = 0 ; // no byte read so far 816 rd[i].skip = 0 ; // start empty 817 gi_NumConnect[ b ]++ ; 818 gi.numConn[ b ]++ ; 819 gj.numConn[ b ]++ ; 820 snprintf(str,MXSTR,"+++connect %d %d",b,gi.numConn[ b ]); 821 factOut(kInfo,-1, str ) ; 822 } 823 } 824 825 if (rd[i].sockStat ==0 ) { //we have a connection ==> try to read 826 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full] 827 numok++ ; 828 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT); 829 830 if (jrd >0 ) { 831 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ; 748 for (i = 0; i < MAX_RUN; i++) { 749 runCtrl[i].runId = 0; 750 runCtrl[i].fileId = -2; 751 runCtrl[i].procId = -2; 752 } 753 gi_resetS = gi_resetR = 9; 754 for (i = 0; i < NBOARDS; i++) 755 sockDef[i] = 0; 756 757 START: 758 gettimeofday (tv, NULL); 759 g_actTime = tsec = atv.tv_sec; 760 g_actUsec = tusec = atv.tv_usec; 761 gi_myRun = g_actTime; 762 evtCtrl.frstPtr = 0; 763 evtCtrl.lastPtr = 0; 764 765 gi_SecTime = g_actTime; 766 gi_runStat = g_runStat; 767 gj.readStat = g_runStat; 768 numok = numok2 = 0; 769 770 if (gi_resetS > 0) { 771 //make sure all sockets are preallocated as 'not exist' 772 for (i = 0; i < MAX_SOCK; i++) { 773 rd[i].socket = -1; 774 rd[i].sockStat = 99; 775 } 776 777 for (k = 0; k < NBOARDS; k++) { 778 gi_NumConnect[k] = 0; 779 gi.numConn[k] = 0; 780 gj.numConn[k] = 0; 781 gj.errConn[k] = 0; 782 gj.rateBytes[k] = 0; 783 gj.totBytes[k] = 0; 784 } 785 786 } 787 788 789 if (gi_resetR > 0) { 790 resetEvtStat (); 791 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0; 792 gj.usdMem = gj.maxMem = gj.xxxMem = 0; 793 gj.totMem = g_maxMem; 794 gj.bufNew = gj.bufEvt = 0; 795 gj.badRoiE = gj.badRoiR = gj.badRoiB = 796 gj.evtSkip = gj.evtWrite = gj.evtErr = 0; 797 798 int b; 799 for (b = 0; b < NBOARDS; b++) 800 gj.badRoi[b] = 0; 801 802 mBufInit (); //initialize buffers 803 804 snprintf (str, MXSTR, "end initializing"); 805 factOut (kInfo, -1, str); 806 } 807 808 809 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0; 810 811 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop 812 813 gi_runStat = g_runStat; 814 gj.readStat = g_runStat; 815 gettimeofday (tv, NULL); 816 g_actTime = tsec = atv.tv_sec; 817 g_actUsec = tusec = atv.tv_usec; 818 819 820 int b, p, p0, s0, nch; 821 nch = 0; 822 for (b = 0; b < NBOARDS; b++) { 823 k = b * 7; 824 if (g_port[b].sockDef != sockDef[b]) { //something has changed ... 825 nch++; 826 gi_NumConnect[b] = 0; //must close all connections 827 gi.numConn[b] = 0; 828 gj.numConn[b] = 0; 829 if (sockDef[b] == 0) 830 s0 = 0; //sockets to be defined and opened 831 else if (g_port[b].sockDef == 0) 832 s0 = -1; //sockets to be destroyed 833 else 834 s0 = +1; //sockets to be closed and reopened 835 836 if (s0 == 0) 837 p0 = ntohs (g_port[b].sockAddr.sin_port); 838 else 839 p0 = 0; 840 841 for (p = p0 + 1; p < p0 + 8; p++) { 842 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket 843 k++; 844 } 845 sockDef[b] = g_port[b].sockDef; 846 } 847 } 848 849 if (nch > 0) { 850 actBoards = 0; 851 for (b = 0; b < NBOARDS; b++) { 852 if (sockDef[b] > 0) 853 actBoards++; 854 } 855 } 856 857 858 jrdx = 0; 859 numokx = 0; 860 numok = 0; //count number of succesfull actions 861 862 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read 863 b = i / 7; 864 if (sockDef[b] > 0) 865 s0 = +1; 866 else 867 s0 = -1; 868 869 if (rd[i].sockStat < 0) { //try to connect if not yet done 870 rd[i].sockStat = connect (rd[i].socket, 871 (struct sockaddr *) &rd[i].SockAddr, 872 sizeof (rd[i].SockAddr)); 873 if (rd[i].sockStat == 0) { //successfull ==> 874 if (sockDef[b] > 0) { 875 rd[i].bufTyp = 0; // expect a header 876 rd[i].bufLen = frst_len; // max size to read at begining 877 } else { 878 rd[i].bufTyp = -1; // full data to be skipped 879 rd[i].bufLen = MAX_LEN; //huge for skipping 880 } 881 rd[i].bufPos = 0; // no byte read so far 882 rd[i].skip = 0; // start empty 883 gi_NumConnect[b]++; 884 gi.numConn[b]++; 885 gj.numConn[b]++; 886 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]); 887 factOut (kInfo, -1, str); 888 } 889 } 890 891 if (rd[i].sockStat == 0) { //we have a connection ==> try to read 892 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full] 893 numok++; 894 jrd = 895 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos], 896 rd[i].bufLen, MSG_DONTWAIT); 897 898 if (jrd > 0) { 899 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd); 832 900 #ifdef EVTDEBUG 833 memcpy(&rd[i].xBuf->B[ rd[i].bufPos], &rd[i].rBuf->B[ rd[i].bufPos], jrd) ; 834 snprintf(str,MXSTR,"read sock %3d bytes %5d len %5d first %d %d",i,jrd,rd[i].bufLen, 835 rd[i].rBuf->B[ rd[i].bufPos ], 836 rd[i].rBuf->B[ rd[i].bufPos +1] ); 837 factOut(kDebug,301, str ) ; 901 memcpy (&rd[i].xBuf->B[rd[i].bufPos], 902 &rd[i].rBuf->B[rd[i].bufPos], jrd); 903 snprintf (str, MXSTR, 904 "read sock %3d bytes %5d len %5d first %d %d", i, 905 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos], 906 rd[i].rBuf->B[rd[i].bufPos + 1]); 907 factOut (kDebug, 301, str); 838 908 #endif 839 } 840 841 if (jrd == 0) { //connection has closed ... 842 snprintf(str,MXSTR,"Socket %d closed by FAD",i); 843 factOut(kInfo,441, str ) ; 844 GenSock(s0, i, 0,NULL, &rd[i]) ; 845 gi.gotErr[ b ]++ ; 846 gi_NumConnect[ b ]-- ; 847 gi.numConn[ b ]-- ; 848 gj.numConn[ b ]-- ; 849 850 } else if ( jrd<0 ) { //did not read anything 851 if (errno != EAGAIN && errno != EWOULDBLOCK ) { 852 snprintf(str,MXSTR,"Error Reading from %d | %m",i); 853 factOut(kError,442, str ) ; 854 gi.gotErr[ b ]++ ; 855 } else numok-- ; //else nothing waiting to be read 856 jrd = 0 ; 857 } 858 } else { 859 jrd = 0 ; //did read nothing as requested 860 snprintf(str,MXSTR,"do not read from socket %d %d",i,rd[i].bufLen ) ; 861 factOut(kDebug,301, str ) ; 862 } 863 864 gi.gotByte[ b ] += jrd ; 865 gj.rateBytes[b] += jrd ; 866 867 if (jrd>0) {numokx++ ; jrdx+= jrd; } 868 869 870 if ( rd[i].bufTyp <0 ) { // we are skipping this board ... 909 } 910 911 if (jrd == 0) { //connection has closed ... 912 snprintf (str, MXSTR, "Socket %d closed by FAD", i); 913 factOut (kInfo, 441, str); 914 GenSock (s0, i, 0, NULL, &rd[i]); 915 gi.gotErr[b]++; 916 gi_NumConnect[b]--; 917 gi.numConn[b]--; 918 gj.numConn[b]--; 919 920 } else if (jrd < 0) { //did not read anything 921 if (errno != EAGAIN && errno != EWOULDBLOCK) { 922 snprintf (str, MXSTR, "Error Reading from %d | %m", i); 923 factOut (kError, 442, str); 924 gi.gotErr[b]++; 925 } else 926 numok--; //else nothing waiting to be read 927 jrd = 0; 928 } 929 } else { 930 jrd = 0; //did read nothing as requested 931 snprintf (str, MXSTR, "do not read from socket %d %d", i, 932 rd[i].bufLen); 933 factOut (kDebug, 301, str); 934 } 935 936 gi.gotByte[b] += jrd; 937 gj.rateBytes[b] += jrd; 938 939 if (jrd > 0) { 940 numokx++; 941 jrdx += jrd; 942 } 943 944 945 if (rd[i].bufTyp < 0) { // we are skipping this board ... 871 946 // just do nothing 872 947 #ifdef EVTDEBUG 873 snprintf(str,MXSTR,"skipping %d bytes on socket %d",jrd,i) ; 874 factOut(kInfo,301, str ) ; 948 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd, 949 i); 950 factOut (kInfo, 301, str); 875 951 #endif 876 952 877 } else if ( rd[i].bufTyp >0 ) { // we are reading data ... 878 if ( jrd < rd[i].bufLen ) { //not yet all read 879 rd[i].bufPos += jrd ; //==> prepare for continuation 880 rd[i].bufLen -= jrd ; 881 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data 882 } else { //full dataset read 883 rd[i].bufLen = 0 ; 884 rd[i].bufPos = rd[i].fadLen ; 885 if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0] 886 || rd[i].rBuf->B[ rd[i].bufPos-2] != stop.B[1]) { 887 gi.evtErr++ ; 888 snprintf(str,MXSTR,"wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ", 889 i,rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0], rd[i].rBuf->B[1], rd[i].rBuf->B[ rd[i].bufPos-2], 890 rd[i].rBuf->B[ rd[i].bufPos-1]); 891 factOut(kError,301, str ) ; 892 goto EndBuf ; 953 } else if (rd[i].bufTyp > 0) { // we are reading data ... 954 if (jrd < rd[i].bufLen) { //not yet all read 955 rd[i].bufPos += jrd; //==> prepare for continuation 956 rd[i].bufLen -= jrd; 957 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data 958 } else { //full dataset read 959 rd[i].bufLen = 0; 960 rd[i].bufPos = rd[i].fadLen; 961 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0] 962 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) { 963 gi.evtErr++; 964 snprintf (str, MXSTR, 965 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ", 966 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0], 967 rd[i].rBuf->B[1], 968 rd[i].rBuf->B[rd[i].bufPos - 2], 969 rd[i].rBuf->B[rd[i].bufPos - 1]); 970 factOut (kError, 301, str); 971 goto EndBuf; 893 972 894 973 #ifdef EVTDEBUG 895 } else { 896 snprintf(str,MXSTR,"good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d", 897 i,rd[i].fadLen, 898 rd[i].rBuf->B[ 0 ], rd[i].rBuf->B[ 1 ], start.B[1],start.B[0], 899 rd[i].rBuf->B[ rd[i].bufPos-2], rd[i].rBuf->B[ rd[i].bufPos-1], stop.B[1], stop.B[0]); 900 factOut(kDebug,301, str ) ; 974 } else { 975 snprintf (str, MXSTR, 976 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d", 977 i, rd[i].fadLen, rd[i].rBuf->B[0], 978 rd[i].rBuf->B[1], start.B[1], start.B[0], 979 rd[i].rBuf->B[rd[i].bufPos - 2], 980 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1], 981 stop.B[0]); 982 factOut (kDebug, 301, str); 901 983 #endif 902 } 903 904 if (jrd>0) debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event 905 906 //we have a complete buffer, copy to WORK area 907 int jr ; 908 roi[0] = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ; 909 for (jr=0; jr<9; jr++) { 910 roi[jr] = ntohs(rd[i].rBuf->S[ head_len/2 + 2 + jr*(roi[0]+4) ]) ; 911 } 912 //get index into mBuffer for this event (create if needed) 913 914 int actid; 915 if (g_useFTM >0) actid = rd[i].evtID ; 916 else actid = rd[i].ftmID ; 917 918 evID = mBufEvt( rd[i].evtID, rd[i].runID, roi, i, 919 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID, rd[i].evtID ) ; 920 921 if (evID <-1000) { 922 goto EndBuf ; //not usable board/event/run --> skip it 923 } 924 if (evID < 0) { //no space left, retry later 984 } 985 986 if (jrd > 0) 987 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event 988 989 //we have a complete buffer, copy to WORK area 990 int jr; 991 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]); 992 for (jr = 0; jr < 9; jr++) { 993 roi[jr] = 994 ntohs (rd[i]. 995 rBuf->S[head_len / 2 + 2 + jr * (roi[0] + 4)]); 996 } 997 //get index into mBuffer for this event (create if needed) 998 999 int actid; 1000 if (g_useFTM > 0) 1001 actid = rd[i].evtID; 1002 else 1003 actid = rd[i].ftmID; 1004 1005 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i, 1006 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID, 1007 rd[i].evtID); 1008 1009 if (evID < -1000) { 1010 goto EndBuf; //not usable board/event/run --> skip it 1011 } 1012 if (evID < 0) { //no space left, retry later 925 1013 #ifdef EVTDEBUG 926 if ( rd[i].bufLen != 0) {927 snprintf(str,MXSTR,"something screwed up");928 factOut(kFatal, 1, str );929 }1014 if (rd[i].bufLen != 0) { 1015 snprintf (str, MXSTR, "something screwed up"); 1016 factOut (kFatal, 1, str); 1017 } 930 1018 #endif 931 xwait.tv_sec = 0; 932 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec 933 nanosleep( &xwait , NULL ) ; 934 goto EndBuf1 ; //hope there is free space next round 935 } 936 937 938 //we have a valid entry in mBuffer[]; fill it 1019 xwait.tv_sec = 0; 1020 xwait.tv_nsec = 10000000; // sleep for ~10 msec 1021 nanosleep (&xwait, NULL); 1022 goto EndBuf1; //hope there is free space next round 1023 } 1024 //we have a valid entry in mBuffer[]; fill it 939 1025 940 1026 #ifdef EVTDEBUG 941 int xchk = memcmp(&rd[i].xBuf->B[0], &rd[i].rBuf->B[0], rd[i].fadLen ) ; 942 if (xchk != 0) { 943 snprintf(str,MXSTR,"ERROR OVERWRITE %d %d on port %d",xchk,rd[i].fadLen,i) ; 944 factOut(kFatal, 1, str ) ; 945 946 uint iq; 947 for (iq=0; iq < rd[i].fadLen ; iq++) { 948 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq] ) { 949 snprintf(str,MXSTR,"ERROR %4d %4d %x %x",i,iq,rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]); 950 factOut(kFatal, 1, str ) ; 951 } 1027 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0], 1028 rd[i].fadLen); 1029 if (xchk != 0) { 1030 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d", 1031 xchk, rd[i].fadLen, i); 1032 factOut (kFatal, 1, str); 1033 1034 uint iq; 1035 for (iq = 0; iq < rd[i].fadLen; iq++) { 1036 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) { 1037 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq, 1038 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]); 1039 factOut (kFatal, 1, str); 1040 } 1041 } 1042 } 1043 #endif 1044 int qncpy = 0; 1045 boardId = b; 1046 int fadBoard = ntohs (rd[i].rBuf->S[12]); 1047 int fadCrate = fadBoard / 256; 1048 if (boardId != (fadCrate * 10 + fadBoard % 256)) { 1049 snprintf (str, MXSTR, "wrong Board ID %d %d %d", 1050 fadCrate, fadBoard % 256, boardId); 1051 factOut (kWarn, 301, str); 1052 } 1053 if (mBuffer[evID].board[boardId] != -1) { 1054 snprintf (str, MXSTR, 1055 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ", 1056 evID, boardId, i, rd[i].fadLen, 1057 rd[i].rBuf->B[0], rd[i].rBuf->B[1], 1058 rd[i].rBuf->B[rd[i].bufPos - 2], 1059 rd[i].rBuf->B[rd[i].bufPos - 1]); 1060 factOut (kWarn, 501, str); 1061 goto EndBuf; //--> skip Board 1062 } 1063 1064 int iDx = evtIdx[evID]; //index into evtCtrl 1065 1066 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag, 1067 &rd[i].rBuf->S[0], head_len); 1068 qncpy += head_len; 1069 1070 src = head_len / 2; 1071 for (px = 0; px < 9; px++) { //different sort in FAD board..... 1072 for (drs = 0; drs < 4; drs++) { 1073 pixH = ntohs (rd[i].rBuf->S[src++]); // ID 1074 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell 1075 pixR = ntohs (rd[i].rBuf->S[src++]); // roi 1076 //here we should check if pixH is correct .... 1077 1078 pixS = boardId * 36 + drs * 9 + px; 1079 src++; 1080 1081 1082 mBuffer[evID].fEvent->StartPix[pixS] = pixC; 1083 dest = pixS * roi[0]; 1084 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest], 1085 &rd[i].rBuf->S[src], roi[0] * 2); 1086 qncpy += roi[0] * 2; 1087 src += pixR; 1088 1089 if (px == 8) { 1090 tmS = boardId * 4 + drs; 1091 if (pixR > roi[0]) { //and we have additional TM info 1092 dest = tmS * roi[0] + NPIX * roi[0]; 1093 int srcT = src - roi[0]; 1094 mBuffer[evID].fEvent->StartTM[tmS] = 1095 (pixC + pixR - roi[0]) % 1024; 1096 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest], 1097 &rd[i].rBuf->S[srcT], roi[0] * 2); 1098 qncpy += roi[0] * 2; 1099 } else { 1100 mBuffer[evID].fEvent->StartTM[tmS] = -1; 1101 } 1102 } 1103 } 1104 } // now we have stored a new board contents into Event structure 1105 1106 mBuffer[evID].fEvent->NumBoards++; 1107 mBuffer[evID].board[boardId] = boardId; 1108 evtCtrl.evtStat[iDx]++; 1109 evtCtrl.pcTime[iDx] = g_actTime; 1110 1111 if (++mBuffer[evID].nBoard >= actBoards) { 1112 int qnrun = 0; 1113 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ??? 1114 actrun = mBuffer[evID].runNum; 1115 int ir; 1116 for (ir = 0; ir < MAX_RUN; ir++) { 1117 qnrun++; 1118 if (runCtrl[ir].runId == actrun) { 1119 if (++runCtrl[ir].lastEvt == 0) { 1120 gotNewRun (actrun, mBuffer[evID].FADhead); 1121 snprintf (str, MXSTR, "gotNewRun %d (ev %d)", 1122 mBuffer[evID].runNum, 1123 mBuffer[evID].evNum); 1124 factOut (kInfo, 1, str); 1125 break; 1126 } 1127 } 1128 } 1129 } 1130 snprintf (str, MXSTR, 1131 "%5d complete event roi %4d roiTM %d cpy %8d %5d", 1132 mBuffer[evID].evNum, roi[0], roi[8] - roi[0], 1133 qncpy, qnrun); 1134 factOut (kDebug, -1, str); 1135 1136 //complete event read ---> flag for next processing 1137 evtCtrl.evtStat[iDx] = 99; 1138 gi.evtTot++; 1139 } 1140 1141 EndBuf: 1142 rd[i].bufTyp = 0; //ready to read next header 1143 rd[i].bufLen = frst_len; 1144 rd[i].bufPos = 0; 1145 EndBuf1: 1146 ; 952 1147 } 953 } 1148 1149 } else { //we are reading event header 1150 rd[i].bufPos += jrd; 1151 rd[i].bufLen -= jrd; 1152 if (rd[i].bufPos >= minLen) { //sufficient data to take action 1153 //check if startflag correct; else shift block .... 1154 for (k = 0; k < rd[i].bufPos - 1; k++) { 1155 if (rd[i].rBuf->B[k] == start.B[1] 1156 && rd[i].rBuf->B[k + 1] == start.B[0]) 1157 break; 1158 } 1159 rd[i].skip += k; 1160 1161 if (k >= rd[i].bufPos - 1) { //no start of header found 1162 rd[i].bufPos = 0; 1163 rd[i].bufLen = head_len; 1164 } else if (k > 0) { 1165 rd[i].bufPos -= k; 1166 rd[i].bufLen += k; 1167 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], 1168 rd[i].bufPos); 1169 #ifdef EVTDEBUG 1170 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k], 1171 rd[i].bufPos); 954 1172 #endif 955 int qncpy = 0 ; 956 boardId = b ; 957 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ; 958 int fadCrate = fadBoard/256 ; 959 if (boardId != (fadCrate*10 + fadBoard%256) ) { 960 snprintf(str,MXSTR,"wrong Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ; 961 factOut(kWarn,301, str ) ; 962 } 963 if ( mBuffer[evID].board[ boardId ] != -1) { 964 snprintf(str,MXSTR,"double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ", 965 evID,boardId,i,rd[i].fadLen, rd[i].rBuf->B[0], 966 rd[i].rBuf->B[1], rd[i].rBuf->B[ rd[i].bufPos-2], rd[i].rBuf->B[ rd[i].bufPos-1]); 967 factOut(kWarn,501, str ) ; 968 goto EndBuf ; //--> skip Board 969 } 970 971 int iDx = evtIdx[evID] ; //index into evtCtrl 972 973 memcpy( &mBuffer[evID].FADhead[boardId].start_package_flag, 974 &rd[i].rBuf->S[0], head_len) ; 975 qncpy+=head_len ; 976 977 src = head_len/2 ; 978 for ( px=0; px<9; px++ ) { //different sort in FAD board..... 979 for ( drs=0; drs<4; drs++ ) { 980 pixH= ntohs(rd[i].rBuf->S[src++]) ; // ID 981 pixC= ntohs(rd[i].rBuf->S[src++]) ; // start-cell 982 pixR= ntohs(rd[i].rBuf->S[src++]) ; // roi 983 //here we should check if pixH is correct .... 984 985 pixS = boardId*36 + drs*9 + px ; 986 src++ ; 987 988 989 mBuffer[evID].fEvent->StartPix[pixS] =pixC; 990 dest= pixS * roi[0] ; 991 memcpy( 992 &mBuffer[evID].fEvent->Adc_Data[dest], 993 &rd[i].rBuf->S[src], roi[0] * 2) ; 994 qncpy+=roi[0]*2 ; 995 src+= pixR ; 996 997 if ( px==8 ) { 998 tmS =boardId*4 + drs ; 999 if ( pixR > roi[0]) { //and we have additional TM info 1000 dest= tmS * roi[0] + NPIX* roi[0] ; 1001 int srcT= src - roi[0] ; 1002 mBuffer[evID].fEvent->StartTM[tmS] = (pixC+pixR-roi[0])%1024 ; 1003 memcpy( 1004 &mBuffer[evID].fEvent->Adc_Data[dest], 1005 &rd[i].rBuf->S[srcT], roi[0] * 2) ; 1006 qncpy+=roi[0]*2 ; 1007 } else { 1008 mBuffer[evID].fEvent->StartTM[tmS] = -1 ; 1009 } 1010 } 1011 } 1012 }// now we have stored a new board contents into Event structure 1013 1014 mBuffer[evID].fEvent->NumBoards++ ; 1015 mBuffer[evID].board[ boardId ] = boardId ; 1016 evtCtrl.evtStat[ iDx ]++ ; 1017 evtCtrl.pcTime[ iDx ] = g_actTime ; 1018 1019 if (++mBuffer[evID].nBoard >= actBoards ) { 1020 int qnrun =0 ; 1021 if (mBuffer[evID].runNum != actrun ) { // have we already reported first event of this run ??? 1022 actrun = mBuffer[evID].runNum ; 1023 int ir ; 1024 for ( ir=0; ir<MAX_RUN; ir++) { 1025 qnrun++ ; 1026 if ( runCtrl[ir].runId == actrun) { 1027 if ( ++runCtrl[ir].lastEvt ==0 ) { 1028 gotNewRun( actrun, mBuffer[evID].FADhead ); 1029 snprintf(str,MXSTR,"gotNewRun %d (ev %d)",mBuffer[evID].runNum,mBuffer[evID].evNum); 1030 factOut(kInfo,1, str ) ; 1031 break ; 1032 } 1033 } 1034 } 1035 } 1036 snprintf(str,MXSTR,"%5d complete event roi %4d roiTM %d cpy %8d %5d", 1037 mBuffer[evID].evNum,roi[0],roi[8]-roi[0],qncpy,qnrun); 1038 factOut(kDebug,-1, str ) ; 1039 1040 //complete event read ---> flag for next processing 1041 evtCtrl.evtStat[ iDx ] = 99; 1042 gi.evtTot++ ; 1043 } 1044 1045 EndBuf: 1046 rd[i].bufTyp = 0 ; //ready to read next header 1047 rd[i].bufLen = frst_len ; 1048 rd[i].bufPos = 0 ; 1049 EndBuf1: 1050 ; 1051 } 1052 1053 } else { //we are reading event header 1054 rd[i].bufPos += jrd ; 1055 rd[i].bufLen -= jrd ; 1056 if ( rd[i].bufPos >= minLen ){ //sufficient data to take action 1057 //check if startflag correct; else shift block .... 1058 for (k=0; k<rd[i].bufPos -1 ; k++) { 1059 if (rd[i].rBuf->B[k ] == start.B[1] 1060 && rd[i].rBuf->B[k+1] == start.B[0] ) break ; 1061 } 1062 rd[i].skip += k ; 1063 1064 if (k >= rd[i].bufPos-1 ) { //no start of header found 1065 rd[i].bufPos = 0 ; 1066 rd[i].bufLen = head_len ; 1067 } else if ( k>0 ) { 1068 rd[i].bufPos -= k ; 1069 rd[i].bufLen += k ; 1070 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ; 1173 } 1174 if (rd[i].bufPos >= minLen) { 1175 if (rd[i].skip > 0) { 1176 snprintf (str, MXSTR, "skipped %d bytes on port%d", 1177 rd[i].skip, i); 1178 factOut (kInfo, 666, str); 1179 rd[i].skip = 0; 1180 } 1181 goodhed++; 1182 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2; 1183 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]); 1184 rd[i].ftmTyp = ntohl (rd[i].rBuf->S[5]); 1185 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt) 1186 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt) 1187 rd[i].runID = ntohl (rd[i].rBuf->I[11]); 1188 rd[i].bufTyp = 1; //ready to read full record 1189 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos; 1190 1191 int fadboard = ntohs (rd[i].rBuf->S[12]); 1192 int fadcrate = fadboard / 256; 1193 fadboard = (fadcrate * 10 + fadboard % 256); 1071 1194 #ifdef EVTDEBUG 1072 memcpy(&rd[i].xBuf->B[0], &rd[i].xBuf->B[k], rd[i].bufPos ) ; 1195 snprintf (str, MXSTR, 1196 "sk %3d head: %5d %5d %5d %10d %4d %6d", i, 1197 rd[i].fadLen, rd[i].evtID, rd[i].ftmID, 1198 rd[i].runID, fadboard, jrd); 1199 factOut (kDebug, 1, str); 1073 1200 #endif 1074 } 1075 if ( rd[i].bufPos >= minLen ) { 1076 if ( rd[i].skip >0 ) { 1077 snprintf(str,MXSTR,"skipped %d bytes on port%d", rd[i].skip, i ) ; 1078 factOut(kInfo,666, str ) ; 1079 rd[i].skip = 0 ; 1080 } 1081 goodhed++; 1082 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ; 1083 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ; 1084 rd[i].ftmTyp = ntohl(rd[i].rBuf->S[5]) ; 1085 rd[i].ftmID = ntohl(rd[i].rBuf->I[3]) ; //(FTMevt) 1086 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt) 1087 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ; 1088 rd[i].bufTyp = 1 ; //ready to read full record 1089 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ; 1090 1091 int fadboard = ntohs(rd[i].rBuf->S[12] ) ; 1092 int fadcrate = fadboard/256 ; 1093 fadboard = (fadcrate*10 + fadboard%256) ; 1201 1202 if (rd[i].runID == 0) 1203 rd[i].runID = gi_myRun; 1204 1205 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) { 1206 snprintf (str, MXSTR, 1207 "illegal event-length on port %d", i); 1208 factOut (kFatal, 881, str); 1209 rd[i].bufLen = 100000; //? 1210 } 1211 int fadBoard = ntohs (rd[i].rBuf->S[12]); 1212 debugHead (i, fadBoard, rd[i].rBuf); 1213 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event 1214 } else { 1215 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 1216 } 1217 } else { 1218 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 1219 } 1220 1221 } //end interpreting last read 1222 } //end of successful read anything 1223 } //finished trying to read all sockets 1224 1094 1225 #ifdef EVTDEBUG 1095 snprintf(str,MXSTR,"sk %3d head: %5d %5d %5d %10d %4d %6d",i,rd[i].fadLen,rd[i].evtID,rd[i].ftmID,rd[i].runID,fadboard,jrd);1096 factOut(kDebug,1, str );1226 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx); 1227 factOut (kDebug, -1, str); 1097 1228 #endif 1098 1229 1099 if (rd[i].runID ==0 ) rd[i].runID = gi_myRun ; 1100 1101 if (rd[i].bufLen <=head_len || rd[i].bufLen > MAX_LEN ) { 1102 snprintf(str,MXSTR,"illegal event-length on port %d",i) ; 1103 factOut(kFatal,881, str ) ; 1104 rd[i].bufLen = 100000 ; //? 1105 } 1106 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ; 1107 debugHead(i,fadBoard,rd[i].rBuf); 1108 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event 1109 } else { 1110 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 1111 } 1112 } else { 1113 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 1114 } 1115 1116 } //end interpreting last read 1117 } //end of successful read anything 1118 } //finished trying to read all sockets 1119 1120 #ifdef EVTDEBUG 1121 snprintf(str,MXSTR,"Loop ---- %3d --- %8d",numokx,jrdx); 1122 factOut(kDebug,-1, str ) ; 1123 #endif 1124 1125 gi.numRead[ numok ] ++ ; 1126 1127 g_actTime = time(NULL) ; 1128 if ( g_actTime > gi_SecTime ) { 1129 gi_SecTime = g_actTime ; 1130 1131 1132 //loop over all active events and flag those older than read-timeout 1133 //delete those that are written to disk .... 1134 1135 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 1136 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 1137 1138 gj.bufNew = gj.bufEvt = 0 ; 1139 int k1=evtCtrl.frstPtr; 1140 for ( k=k1; k<(k1+kd); k++ ) { 1141 int k0 = k % (MAX_EVT*MAX_RUN) ; 1230 gi.numRead[numok]++; 1231 1232 g_actTime = time (NULL); 1233 if (g_actTime > gi_SecTime) { 1234 gi_SecTime = g_actTime; 1235 1236 1237 //loop over all active events and flag those older than read-timeout 1238 //delete those that are written to disk .... 1239 1240 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1241 if (kd < 0) 1242 kd += (MAX_EVT * MAX_RUN); 1243 1244 gj.bufNew = gj.bufEvt = 0; 1245 int k1 = evtCtrl.frstPtr; 1246 for (k = k1; k < (k1 + kd); k++) { 1247 int k0 = k % (MAX_EVT * MAX_RUN); 1142 1248 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if) 1143 1249 1144 if (evtCtrl.evtStat[k0] > 0 1145 && evtCtrl.evtStat[k0] < 92 ) { 1146 gj.bufNew++ ; //incomplete event in Buffer 1147 if ( evtCtrl.evtStat[k0] < 90 1148 && evtCtrl.pcTime[k0] < g_actTime-10 ) { 1149 int id =evtCtrl.evtBuf[k0] ; 1150 snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]); 1151 factOut(kWarn,601, str ) ; 1152 evtCtrl.evtStat[k0] = 91 ; //timeout for incomplete events 1153 gi.evtSkp++ ; 1154 gi.evtTot++ ; 1155 gj.evtSkip++; 1156 } 1157 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete' 1158 || evtCtrl.evtStat[k0] == 0 ) { //'useless' 1159 1160 int id =evtCtrl.evtBuf[k0] ; 1161 snprintf(str,MXSTR,"%5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ; 1162 factOut(kDebug,-1, str ) ; 1163 mBufFree(id) ; //event written--> free memory 1164 evtCtrl.evtStat[k0] = -1; 1165 gj.evtWrite++ ; 1166 gj.rateWrite++ ; 1167 } else if (evtCtrl.evtStat[k0] >= 95 ) { 1168 gj.bufEvt++ ; //complete event in Buffer 1169 } 1170 1171 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) { 1172 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ; 1173 } 1174 } 1175 1176 1177 gj.deltaT = 1000 ; //temporary, must be improved 1178 1179 int b; 1180 for ( b=0; b<NBOARDS; b++) gj.totBytes[b] +=gj.rateBytes[b] ; 1181 gj.totMem = g_maxMem ; 1182 if (gj.maxMem > gj.xxxMem) gj.xxxMem = gj.maxMem ; 1183 if (gj.maxEvt > gj.xxxEvt) gj.xxxEvt = gj.maxEvt ; 1184 1185 factStat(gj); 1186 factStatNew(gi) ; 1187 gj.rateNew = gj.rateWrite = 0 ; 1188 gj.maxMem = gj.usdMem ; 1189 gj.maxEvt = gj.bufTot ; 1190 for ( b=0; b<NBOARDS; b++) gj.rateBytes[b] =0 ; 1191 } 1192 1193 if (numok > 0 ) numok2=0; 1194 else if (numok2++ > 3) { 1195 if (g_runStat == 1) { 1196 xwait.tv_sec = 1; 1197 xwait.tv_nsec= 0 ; // hibernate for 1 sec 1198 } else { 1199 xwait.tv_sec = 0; 1200 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 1201 } 1202 nanosleep( &xwait , NULL ) ; 1203 } 1204 1205 } //and do next loop over all sockets ... 1206 1207 1208 snprintf(str,MXSTR,"stop reading ... RESET=%d",g_reset); 1209 factOut(kInfo,-1, str ) ; 1210 1211 if (g_reset >0 ) { 1212 gi_reset = g_reset ; 1213 gi_resetR = gi_reset%10 ; //shall we stop reading ? 1214 gi_resetS = (gi_reset/10)%10 ; //shall we close sockets ? 1215 gi_resetW = (gi_reset/100)%10 ; //shall we close files ? 1216 gi_resetX = gi_reset/1000 ; //shall we simply wait resetX seconds ? 1217 g_reset= 0; 1218 } else { 1219 gi_reset = 0; 1220 if ( g_runStat== -1 ) gi_resetR = 1 ; 1221 else gi_resetR = 7 ; 1222 gi_resetS = 7 ; //close all sockets 1223 gi_resetW = 7 ; //close all files 1224 gi_resetX = 0 ; 1225 1226 //inform others we have to quit .... 1227 gi_runStat = -11 ; //inform all that no update to happen any more 1228 gj.readStat= -11 ; //inform all that no update to happen any more 1229 } 1230 1231 if (gi_resetS > 0) { 1232 //must close all open sockets ... 1233 snprintf(str,MXSTR,"close all sockets ..."); 1234 factOut(kInfo,-1, str ) ; 1235 for (i=0; i<MAX_SOCK; i++) { 1236 if (rd[i].sockStat ==0 ) { 1237 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy open socket 1238 if ( i%7 == 0 ) { 1239 gi_NumConnect[ i/7 ]-- ; 1240 gi.numConn[ i/7 ]-- ; 1241 gj.numConn[ i/7 ]-- ; 1242 sockDef[i/7] = 0 ; //flag ro recreate the sockets ... 1243 rd[i/7].sockStat = -1; //and try to open asap 1244 } 1245 } 1246 } 1247 } 1248 1249 1250 if (gi_resetR > 0) { 1251 //flag all events as 'read finished' 1252 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 1253 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 1254 1255 int k1=evtCtrl.frstPtr; 1256 for ( k=k1; k<(k1+kd); k++ ) { 1257 int k0 = k % (MAX_EVT*MAX_RUN) ; 1258 if (evtCtrl.evtStat[k0] > 0 1259 && evtCtrl.evtStat[k0] < 90 ) { 1260 evtCtrl.evtStat[k0] = 91 ; 1261 gi.evtSkp++ ; 1262 gi.evtTot++ ; 1263 } 1264 } 1265 1266 xwait.tv_sec = 0; 1267 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 1268 nanosleep( &xwait , NULL ) ; 1269 1270 //and clear all buffers (might have to wait until all others are done) 1271 int minclear ; 1272 if (gi_resetR == 1) { 1273 minclear = 900 ; 1274 snprintf(str,MXSTR,"drain all buffers ..."); 1275 } else { 1276 minclear = 0 ; 1277 snprintf(str,MXSTR,"flush all buffers ..."); 1278 } 1279 factOut(kInfo,-1, str ) ; 1280 1281 int numclear=1 ; 1282 while (numclear > 0 ) { 1283 numclear = 0 ; 1284 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 1285 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 1286 1287 int k1=evtCtrl.frstPtr; 1288 for ( k=k1; k<(k1+kd); k++ ) { 1289 int k0 = k % (MAX_EVT*MAX_RUN) ; 1290 if (evtCtrl.evtStat[k0] > minclear ) { 1291 int id =evtCtrl.evtBuf[k0] ; 1292 snprintf(str,MXSTR,"ev %5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ; 1293 factOut(kDebug,-1, str ) ; 1294 mBufFree(id) ; //event written--> free memory 1295 evtCtrl.evtStat[k0] = -1; 1296 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing... 1297 1298 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) 1299 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ; 1300 } 1301 1302 xwait.tv_sec = 0; 1303 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 1304 nanosleep( &xwait , NULL ) ; 1305 } 1306 } 1307 1308 if (gi_reset > 0) { 1309 if (gi_resetW > 0) { 1310 CloseRunFile(0,0,0) ; //ask all Runs to be closed 1311 } 1312 if (gi_resetX > 0) { 1313 xwait.tv_sec = gi_resetX; 1314 xwait.tv_nsec= 0 ; 1315 nanosleep( &xwait , NULL ) ; 1316 } 1317 1318 snprintf(str,MXSTR,"Continue read Process ..."); 1319 factOut(kInfo,-1, str ) ; 1320 gi_reset = 0 ; 1321 goto START ; 1322 } 1323 1324 1325 1326 snprintf(str,MXSTR,"Exit read Process ..."); 1327 factOut(kInfo,-1, str ) ; 1328 gi_runStat = -99 ; 1329 gj.readStat= -99 ; 1330 factStat(gj); 1331 factStatNew(gi) ; 1332 return 0; 1250 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) { 1251 gj.bufNew++; //incomplete event in Buffer 1252 if (evtCtrl.evtStat[k0] < 90 1253 && evtCtrl.pcTime[k0] < g_actTime - 10) { 1254 int id = evtCtrl.evtBuf[k0]; 1255 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d", 1256 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0, 1257 evtCtrl.evtStat[k0]); 1258 factOut (kWarn, 601, str); 1259 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events 1260 gi.evtSkp++; 1261 gi.evtTot++; 1262 gj.evtSkip++; 1263 } 1264 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete' 1265 || evtCtrl.evtStat[k0] == 0) { //'useless' 1266 1267 int id = evtCtrl.evtBuf[k0]; 1268 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d", 1269 mBuffer[id].evNum, mBuffer[id].nBoard); 1270 factOut (kDebug, -1, str); 1271 mBufFree (id); //event written--> free memory 1272 evtCtrl.evtStat[k0] = -1; 1273 gj.evtWrite++; 1274 gj.rateWrite++; 1275 } else if (evtCtrl.evtStat[k0] >= 95) { 1276 gj.bufEvt++; //complete event in Buffer 1277 } 1278 1279 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) { 1280 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN); 1281 } 1282 } 1283 1284 1285 gj.deltaT = 1000; //temporary, must be improved 1286 1287 int b; 1288 for (b = 0; b < NBOARDS; b++) 1289 gj.totBytes[b] += gj.rateBytes[b]; 1290 gj.totMem = g_maxMem; 1291 if (gj.maxMem > gj.xxxMem) 1292 gj.xxxMem = gj.maxMem; 1293 if (gj.maxEvt > gj.xxxEvt) 1294 gj.xxxEvt = gj.maxEvt; 1295 1296 factStat (gj); 1297 factStatNew (gi); 1298 gj.rateNew = gj.rateWrite = 0; 1299 gj.maxMem = gj.usdMem; 1300 gj.maxEvt = gj.bufTot; 1301 for (b = 0; b < NBOARDS; b++) 1302 gj.rateBytes[b] = 0; 1303 } 1304 1305 if (numok > 0) 1306 numok2 = 0; 1307 else if (numok2++ > 3) { 1308 if (g_runStat == 1) { 1309 xwait.tv_sec = 1; 1310 xwait.tv_nsec = 0; // hibernate for 1 sec 1311 } else { 1312 xwait.tv_sec = 0; 1313 xwait.tv_nsec = 2000000; // sleep for ~2 msec 1314 } 1315 nanosleep (&xwait, NULL); 1316 } 1317 1318 } //and do next loop over all sockets ... 1319 1320 1321 snprintf (str, MXSTR, "stop reading ... RESET=%d", g_reset); 1322 factOut (kInfo, -1, str); 1323 1324 if (g_reset > 0) { 1325 gi_reset = g_reset; 1326 gi_resetR = gi_reset % 10; //shall we stop reading ? 1327 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ? 1328 gi_resetW = (gi_reset / 100) % 10; //shall we close files ? 1329 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ? 1330 g_reset = 0; 1331 } else { 1332 gi_reset = 0; 1333 if (g_runStat == -1) 1334 gi_resetR = 1; 1335 else 1336 gi_resetR = 7; 1337 gi_resetS = 7; //close all sockets 1338 gi_resetW = 7; //close all files 1339 gi_resetX = 0; 1340 1341 //inform others we have to quit .... 1342 gi_runStat = -11; //inform all that no update to happen any more 1343 gj.readStat = -11; //inform all that no update to happen any more 1344 } 1345 1346 if (gi_resetS > 0) { 1347 //must close all open sockets ... 1348 snprintf (str, MXSTR, "close all sockets ..."); 1349 factOut (kInfo, -1, str); 1350 for (i = 0; i < MAX_SOCK; i++) { 1351 if (rd[i].sockStat == 0) { 1352 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket 1353 if (i % 7 == 0) { 1354 gi_NumConnect[i / 7]--; 1355 gi.numConn[i / 7]--; 1356 gj.numConn[i / 7]--; 1357 sockDef[i / 7] = 0; //flag ro recreate the sockets ... 1358 rd[i / 7].sockStat = -1; //and try to open asap 1359 } 1360 } 1361 } 1362 } 1363 1364 1365 if (gi_resetR > 0) { 1366 //flag all events as 'read finished' 1367 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1368 if (kd < 0) 1369 kd += (MAX_EVT * MAX_RUN); 1370 1371 int k1 = evtCtrl.frstPtr; 1372 for (k = k1; k < (k1 + kd); k++) { 1373 int k0 = k % (MAX_EVT * MAX_RUN); 1374 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) { 1375 evtCtrl.evtStat[k0] = 91; 1376 gi.evtSkp++; 1377 gi.evtTot++; 1378 } 1379 } 1380 1381 xwait.tv_sec = 0; 1382 xwait.tv_nsec = 2000000; // sleep for ~2 msec 1383 nanosleep (&xwait, NULL); 1384 1385 //and clear all buffers (might have to wait until all others are done) 1386 int minclear; 1387 if (gi_resetR == 1) { 1388 minclear = 900; 1389 snprintf (str, MXSTR, "drain all buffers ..."); 1390 } else { 1391 minclear = 0; 1392 snprintf (str, MXSTR, "flush all buffers ..."); 1393 } 1394 factOut (kInfo, -1, str); 1395 1396 int numclear = 1; 1397 while (numclear > 0) { 1398 numclear = 0; 1399 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1400 if (kd < 0) 1401 kd += (MAX_EVT * MAX_RUN); 1402 1403 int k1 = evtCtrl.frstPtr; 1404 for (k = k1; k < (k1 + kd); k++) { 1405 int k0 = k % (MAX_EVT * MAX_RUN); 1406 if (evtCtrl.evtStat[k0] > minclear) { 1407 int id = evtCtrl.evtBuf[k0]; 1408 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d", 1409 mBuffer[id].evNum, mBuffer[id].nBoard); 1410 factOut (kDebug, -1, str); 1411 mBufFree (id); //event written--> free memory 1412 evtCtrl.evtStat[k0] = -1; 1413 } else if (evtCtrl.evtStat[k0] > 0) 1414 numclear++; //writing is still ongoing... 1415 1416 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) 1417 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN); 1418 } 1419 1420 xwait.tv_sec = 0; 1421 xwait.tv_nsec = 2000000; // sleep for ~2 msec 1422 nanosleep (&xwait, NULL); 1423 } 1424 } 1425 1426 if (gi_reset > 0) { 1427 if (gi_resetW > 0) { 1428 CloseRunFile (0, 0, 0); //ask all Runs to be closed 1429 } 1430 if (gi_resetX > 0) { 1431 xwait.tv_sec = gi_resetX; 1432 xwait.tv_nsec = 0; 1433 nanosleep (&xwait, NULL); 1434 } 1435 1436 snprintf (str, MXSTR, "Continue read Process ..."); 1437 factOut (kInfo, -1, str); 1438 gi_reset = 0; 1439 goto START; 1440 } 1441 1442 1443 1444 snprintf (str, MXSTR, "Exit read Process ..."); 1445 factOut (kInfo, -1, str); 1446 gi_runStat = -99; 1447 gj.readStat = -99; 1448 factStat (gj); 1449 factStatNew (gi); 1450 return 0; 1333 1451 1334 1452 } /*-----------------------------------------------------------------*/ 1335 1453 1336 1454 1337 void *subProc( void *thrid ) { 1338 int threadID,status,numWait,numProc,kd,k1,k0,k,jret; 1339 struct timespec xwait ; 1340 1341 threadID= (int) thrid; 1342 1343 snprintf(str,MXSTR,"Starting sub-process-thread %d",threadID); 1344 factOut(kInfo,-1, str ) ; 1345 1346 while (g_runStat > -2) { //in case of 'exit' we still must process pending events 1347 numWait = numProc = 0 ; 1348 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 1349 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 1350 1351 int k1=evtCtrl.frstPtr; 1352 for ( k=k1; k<(k1+kd); k++ ) { 1353 int k0 = k % (MAX_EVT*MAX_RUN) ; 1354 1355 if (evtCtrl.evtStat[k0] ==1000+threadID) { 1356 if ( gi_resetR > 1 ) { //we are asked to flush buffers asap 1357 jret= 9100 ; //flag to be deleted 1358 } else { 1359 int id = evtCtrl.evtBuf[k0] ; 1360 jret=subProcEvt(threadID, mBuffer[id].FADhead, mBuffer[id].fEvent, mBuffer[id].buffer) ; 1361 if (jret <= threadID) { 1362 snprintf(str,MXSTR,"process %d wants to send to process %d",threadID,jret) ; 1363 factOut(kError,-1, str ) ; 1364 jret = 5300; 1365 } else if ( jret <=0 ) jret = 9200+threadID ; //flag as 'to be deleted' 1366 else if ( jret >=gi_maxProc ) jret = 5200+threadID ; //flag as 'to be written' 1367 else jret = 1000+jret ; //flag for next proces 1368 } 1369 evtCtrl.evtStat[k0] = jret ; 1370 numProc++ ; 1371 } else if (evtCtrl.evtStat[k0] <1000+threadID) numWait++ ; 1372 } 1373 1374 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do 1375 snprintf(str,MXSTR,"Exit subProcessing Process %d",threadID); 1376 factOut(kInfo,-1, str ) ; 1377 return 0 ; 1378 } 1379 if (numProc == 0) { 1380 //seems we have nothing to do, so sleep a little 1381 xwait.tv_sec = 0; 1382 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 1383 nanosleep( &xwait , NULL ) ; 1384 } 1385 } 1386 1387 snprintf(str,MXSTR,"Ending sub-process-thread %d",threadID); 1388 factOut(kInfo,-1, str ) ; 1389 return ; 1455 void * 1456 subProc (void *thrid) 1457 { 1458 int threadID, status, numWait, numProc, kd, k1, k0, k, jret; 1459 struct timespec xwait; 1460 1461 threadID = (int) thrid; 1462 1463 snprintf (str, MXSTR, "Starting sub-process-thread %d", threadID); 1464 factOut (kInfo, -1, str); 1465 1466 while (g_runStat > -2) { //in case of 'exit' we still must process pending events 1467 numWait = numProc = 0; 1468 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1469 if (kd < 0) 1470 kd += (MAX_EVT * MAX_RUN); 1471 1472 int k1 = evtCtrl.frstPtr; 1473 for (k = k1; k < (k1 + kd); k++) { 1474 int k0 = k % (MAX_EVT * MAX_RUN); 1475 1476 if (evtCtrl.evtStat[k0] == 1000 + threadID) { 1477 if (gi_resetR > 1) { //we are asked to flush buffers asap 1478 jret = 9100; //flag to be deleted 1479 } else { 1480 int id = evtCtrl.evtBuf[k0]; 1481 jret = 1482 subProcEvt (threadID, mBuffer[id].FADhead, 1483 mBuffer[id].fEvent, mBuffer[id].buffer); 1484 if (jret <= threadID) { 1485 snprintf (str, MXSTR, 1486 "process %d wants to send to process %d", 1487 threadID, jret); 1488 factOut (kError, -1, str); 1489 jret = 5300; 1490 } else if (jret <= 0) 1491 jret = 9200 + threadID; //flag as 'to be deleted' 1492 else if (jret >= gi_maxProc) 1493 jret = 5200 + threadID; //flag as 'to be written' 1494 else 1495 jret = 1000 + jret; //flag for next proces 1496 } 1497 evtCtrl.evtStat[k0] = jret; 1498 numProc++; 1499 } else if (evtCtrl.evtStat[k0] < 1000 + threadID) 1500 numWait++; 1501 } 1502 1503 if (gj.readStat < -10 && numWait == 0) { //nothing left to do 1504 snprintf (str, MXSTR, "Exit subProcessing Process %d", threadID); 1505 factOut (kInfo, -1, str); 1506 return 0; 1507 } 1508 if (numProc == 0) { 1509 //seems we have nothing to do, so sleep a little 1510 xwait.tv_sec = 0; 1511 xwait.tv_nsec = 2000000; // sleep for ~2 msec 1512 nanosleep (&xwait, NULL); 1513 } 1514 } 1515 1516 snprintf (str, MXSTR, "Ending sub-process-thread %d", threadID); 1517 factOut (kInfo, -1, str); 1518 return; 1390 1519 } /*-----------------------------------------------------------------*/ 1391 1520 1392 1521 1393 void *procEvt( void *ptr ) { 1522 void * 1523 procEvt (void *ptr) 1524 { 1394 1525 /* *** main loop processing file, including SW-trigger */ 1395 int numProc, numWait ; 1396 int k, status ; 1397 struct timespec xwait ; 1398 char str[MXSTR] ; 1399 1400 1401 1402 1403 cpu_set_t mask; 1404 int cpu = 1 ; //process thread (will be several in final version) 1405 1406 snprintf(str,MXSTR,"Starting process-thread with %d subprocess",gi_maxProc); 1407 factOut(kInfo,-1, str ) ; 1526 int numProc, numWait; 1527 int k, status, j; 1528 struct timespec xwait; 1529 char str[MXSTR]; 1530 1531 1532 1533 int lastRun = 0; //usually run from last event still valid 1534 1535 cpu_set_t mask; 1536 int cpu = 1; //process thread (will be several in final version) 1537 1538 snprintf (str, MXSTR, "Starting process-thread with %d subprocess", 1539 gi_maxProc); 1540 factOut (kInfo, -1, str); 1408 1541 1409 1542 /* CPU_ZERO initializes all the bits in the mask to zero. */ 1410 CPU_ZERO ( &mask);1543 CPU_ZERO (&mask); 1411 1544 /* CPU_SET sets only the bit corresponding to cpu. */ 1412 1545 // CPU_SET( 0 , &mask ); leave for system 1413 1546 // CPU_SET( 1 , &mask ); used by write process 1414 CPU_SET ( 2 , &mask);1415 CPU_SET ( 3 , &mask);1416 CPU_SET ( 4 , &mask);1417 CPU_SET ( 5 , &mask);1418 CPU_SET ( 6 , &mask);1547 CPU_SET (2, &mask); 1548 CPU_SET (3, &mask); 1549 CPU_SET (4, &mask); 1550 CPU_SET (5, &mask); 1551 CPU_SET (6, &mask); 1419 1552 // CPU_SET( 7 , &mask ); used by read process 1420 1553 /* sched_setaffinity returns 0 in success */ 1421 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) { 1422 snprintf(str,MXSTR,"P ---> can not create affinity to %d",cpu); 1423 factOut(kWarn,-1, str ) ; 1424 } 1425 1426 1427 pthread_t thread[100] ; 1428 int th_ret[100]; 1429 1430 for (k=0; k < gi_maxProc; k++) { 1431 th_ret[k] = pthread_create( &thread[k], NULL, subProc, (void *)k ); 1432 } 1433 1434 while (g_runStat > -2) { //in case of 'exit' we still must process pending events 1435 1436 numWait = numProc = 0 ; 1437 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 1438 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 1439 1440 int k1=evtCtrl.frstPtr; 1441 for ( k=k1; k<(k1+kd); k++ ) { 1442 int k0 = k % (MAX_EVT*MAX_RUN) ; 1554 if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { 1555 snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu); 1556 factOut (kWarn, -1, str); 1557 } 1558 1559 1560 pthread_t thread[100]; 1561 int th_ret[100]; 1562 1563 for (k = 0; k < gi_maxProc; k++) { 1564 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k); 1565 } 1566 1567 while (g_runStat > -2) { //in case of 'exit' we still must process pending events 1568 1569 numWait = numProc = 0; 1570 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1571 if (kd < 0) 1572 kd += (MAX_EVT * MAX_RUN); 1573 1574 int k1 = evtCtrl.frstPtr; 1575 for (k = k1; k < (k1 + kd); k++) { 1576 int k0 = k % (MAX_EVT * MAX_RUN); 1443 1577 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if) 1444 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <1000) { 1445 1446 if ( gi_resetR > 1 ) { //we are asked to flush buffers asap 1447 evtCtrl.evtStat[k0] = 9991 ; 1448 } else { 1449 1450 int id = evtCtrl.evtBuf[k0] ; 1451 int ievt = mBuffer[id].evNum ; 1452 int itevt= mBuffer[id].trgNum ; 1453 int itrg = mBuffer[id].trgTyp ; 1454 int roi = mBuffer[id].nRoi ; 1455 int roiTM= mBuffer[id].nRoiTM ; 1456 // uint32_t irun = mBuffer[id].runNum ; 1457 //snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ; 1458 //factOut(kDebug,-1, str ) ; 1578 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) { 1579 1580 if (gi_resetR > 1) { //we are asked to flush buffers asap 1581 evtCtrl.evtStat[k0] = 9991; 1582 } else { 1583 1584 //-------- it is better to open the run already here, so call can be used to initialize 1585 //-------- buffers etc. needed to interprete run (e.g. DRS calibration) 1586 int id = evtCtrl.evtBuf[k0]; 1587 uint32_t irun = mBuffer[id].runNum; 1588 int32_t ievt = mBuffer[id].evNum; 1589 if (runCtrl[lastRun].runId == irun) { 1590 j = lastRun; 1591 } else { 1592 //check which fileID to use (or open if needed) 1593 for (j = 0; j < MAX_RUN; j++) { 1594 if (runCtrl[j].runId == irun) 1595 break; 1596 } 1597 if (j >= MAX_RUN) { 1598 snprintf (str, MXSTR, 1599 "P error: can not find run %d for event %d in %d", 1600 irun, ievt, id); 1601 factOut (kFatal, 901, str); 1602 } 1603 lastRun = j; 1604 } 1605 1606 if (runCtrl[j].fileId < 0) { 1607 //---- we need to open a new run ==> make sure all older runs are 1608 //---- finished and marked to be closed .... 1609 int j1; 1610 for (j1 = 0; j1 < MAX_RUN; j1++) { 1611 if (runCtrl[j1].fileId == 0) { 1612 runCtrl[j1].procId = 2; //--> do no longer accept events for processing 1613 //---- problem: processing still going on ==> must wait for closing .... 1614 snprintf (str, MXSTR, 1615 "P finish run since new one opened %d", 1616 runCtrl[j1].runId); 1617 runFinish1 (runCtrl[j1].runId); 1618 } 1619 1620 } 1621 1622 actRun.Version = 1; 1623 actRun.RunType = -1; //to be adapted 1624 1625 actRun.Nroi = runCtrl[j].roi0; 1626 actRun.NroiTM = runCtrl[j].roi8; 1627 if (actRun.Nroi == actRun.NroiTM) 1628 actRun.NroiTM = 0; 1629 actRun.RunTime = runCtrl[j].firstTime; 1630 actRun.RunUsec = runCtrl[j].firstTime; 1631 actRun.NBoard = NBOARDS; 1632 actRun.NPix = NPIX; 1633 actRun.NTm = NTMARK; 1634 actRun.Nroi = mBuffer[id].nRoi; 1635 memcpy (actRun.FADhead, mBuffer[id].FADhead, 1636 NBOARDS * sizeof (PEVNT_HEADER)); 1637 1638 runCtrl[j].fileHd = 1639 runOpen (irun, &actRun, sizeof (actRun)); 1640 if (runCtrl[j].fileHd == NULL) { 1641 snprintf (str, MXSTR, 1642 "P could not open a file for run %d", irun); 1643 factOut (kError, 502, str); 1644 runCtrl[j].fileId = 91; 1645 runCtrl[j].procId = 91; 1646 } else { 1647 snprintf (str, MXSTR, "P opened new run_file %d evt %d", 1648 irun, ievt); 1649 factOut (kInfo, -1, str); 1650 runCtrl[j].fileId = 0; 1651 runCtrl[j].procId = 0; 1652 } 1653 1654 } 1655 //-------- also check if run shall be closed (==> skip event, but do not close the file !!! ) 1656 if (runCtrl[j].procId == 0) { 1657 if (runCtrl[j].closeTime < g_actTime 1658 || runCtrl[j].lastTime < g_actTime - 300 1659 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) { 1660 snprintf (str, MXSTR, 1661 "P reached end of run condition for run %d", 1662 irun); 1663 factOut (kInfo, 502, str); 1664 runFinish1 (runCtrl[j].runId); 1665 runCtrl[j].procId = 1; 1666 } 1667 } 1668 if (runCtrl[j].procId != 0) { 1669 snprintf (str, MXSTR, 1670 "P skip event %d because no active run %d", ievt, 1671 irun); 1672 factOut (kInfo, 502, str); 1673 evtCtrl.evtStat[k0] = 9091; 1674 } else { 1675 //-------- 1676 //-------- 1677 int id = evtCtrl.evtBuf[k0]; 1678 int itevt = mBuffer[id].trgNum; 1679 int itrg = mBuffer[id].trgTyp; 1680 int roi = mBuffer[id].nRoi; 1681 int roiTM = mBuffer[id].nRoiTM; 1459 1682 1460 1683 //make sure unused pixels/tmarks are cleared to zero 1461 if (roiTM == roi) roiTM=0 ; 1462 int ip,it,dest,ib; 1463 for (ip=0; ip<NPIX; ip++) { 1464 if (mBuffer[id].fEvent->StartPix[ip] == -1 ) { 1465 dest= ip*roi ; 1466 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ; 1467 } 1468 } 1469 for (it=0; it<NTMARK; it++) { 1470 if (mBuffer[id].fEvent->StartTM[it] == -1 ) { 1471 dest= it*roi + NPIX*roi ; 1472 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ; 1473 } 1474 } 1684 if (roiTM == roi) 1685 roiTM = 0; 1686 int ip, it, dest, ib; 1687 for (ip = 0; ip < NPIX; ip++) { 1688 if (mBuffer[id].fEvent->StartPix[ip] == -1) { 1689 dest = ip * roi; 1690 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2); 1691 } 1692 } 1693 for (it = 0; it < NTMARK; it++) { 1694 if (mBuffer[id].fEvent->StartTM[it] == -1) { 1695 dest = it * roi + NPIX * roi; 1696 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2); 1697 } 1698 } 1475 1699 1476 1700 1477 1701 //and set correct event header ; also check for consistency in event (not yet) 1478 mBuffer[id].fEvent->Roi = roi ; 1479 mBuffer[id].fEvent->RoiTM = roiTM ; 1480 mBuffer[id].fEvent->EventNum = ievt ; 1481 mBuffer[id].fEvent->TriggerNum = itevt ; 1482 mBuffer[id].fEvent->TriggerType = itrg ; 1483 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0] ; 1484 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1] ; 1485 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2] ; 1486 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3] ; 1487 mBuffer[id].fEvent->SoftTrig = 0 ; 1488 1489 1490 for (ib=0; ib<NBOARDS; ib++) { 1491 if (mBuffer[id].board[ib] == -1 ) { //board is not read 1492 mBuffer[id].FADhead[ib].start_package_flag = 0 ; 1493 mBuffer[id].fEvent->BoardTime[ib] = 0 ; 1494 } else { 1495 mBuffer[id].fEvent->BoardTime[ib] = 1496 ntohl(mBuffer[id].FADhead[ib].time) ; 1497 } 1498 } 1499 1500 int i=eventCheck(mBuffer[id].runNum, mBuffer[id].FADhead, 1501 mBuffer[id].fEvent) ; 1502 // gj.procEvt++ ; 1503 gi.procTot++ ; 1504 numProc++ ; 1505 1506 if (i<0) { 1507 evtCtrl.evtStat[k0] = 9999 ; //flag event to be skipped 1508 gi.procErr++ ; 1509 } else { 1510 evtCtrl.evtStat[k0] = 1000 ; 1511 } 1702 mBuffer[id].fEvent->Roi = roi; 1703 mBuffer[id].fEvent->RoiTM = roiTM; 1704 mBuffer[id].fEvent->EventNum = ievt; 1705 mBuffer[id].fEvent->TriggerNum = itevt; 1706 mBuffer[id].fEvent->TriggerType = itrg; 1707 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0]; 1708 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1]; 1709 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2]; 1710 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3]; 1711 mBuffer[id].fEvent->SoftTrig = 0; 1712 1713 1714 for (ib = 0; ib < NBOARDS; ib++) { 1715 if (mBuffer[id].board[ib] == -1) { //board is not read 1716 mBuffer[id].FADhead[ib].start_package_flag = 0; 1717 mBuffer[id].fEvent->BoardTime[ib] = 0; 1718 } else { 1719 mBuffer[id].fEvent->BoardTime[ib] = 1720 ntohl (mBuffer[id].FADhead[ib].time); 1721 } 1722 } 1723 1724 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead, 1725 mBuffer[id].fEvent); 1726 gi.procTot++; 1727 numProc++; 1728 1729 if (i < 0) { 1730 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped 1731 gi.procErr++; 1732 } else { 1733 evtCtrl.evtStat[k0] = 1000; 1734 runCtrl[j].procEvt++; 1735 } 1736 } 1737 } 1738 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) { 1739 numWait++; 1512 1740 } 1513 } else if ( evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 90 ) { 1514 numWait++ ; 1515 } 1516 } 1517 1518 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do 1519 snprintf(str,MXSTR,"Exit Processing Process ..."); 1520 factOut(kInfo,-1, str ) ; 1521 gp_runStat = -22 ; //==> we should exit 1522 gj.procStat= -22 ; //==> we should exit 1523 return 0 ; 1524 } 1525 1526 if (numProc == 0) { 1527 //seems we have nothing to do, so sleep a little 1528 xwait.tv_sec = 0; 1529 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 1530 nanosleep( &xwait , NULL ) ; 1531 } 1532 gp_runStat = gi_runStat ; 1533 gj.procStat= gj.readStat ; 1534 1535 } 1536 1537 //we are asked to abort asap ==> must flag all remaining events 1538 // when gi_runStat claims that all events are in the buffer... 1539 1540 snprintf(str,MXSTR,"Abort Processing Process ..."); 1541 factOut(kInfo,-1, str ) ; 1542 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 1543 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 1544 1545 for (k=0; k<gi_maxProc; k++) { 1546 pthread_join( thread[k], (void **)&status) ; 1547 } 1548 1549 int k1=evtCtrl.frstPtr; 1550 for ( k=k1; k<(k1+kd); k++ ) { 1551 int k0 = k % (MAX_EVT*MAX_RUN) ; 1552 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <1000) { 1553 evtCtrl.evtStat[k0] = 9800 ; //flag event as 'processed' 1554 } 1555 } 1556 1557 gp_runStat = -99 ; 1558 gj.procStat= -99 ; 1559 1560 return 0; 1561 1741 } 1742 1743 if (gj.readStat < -10 && numWait == 0) { //nothing left to do 1744 snprintf (str, MXSTR, "Exit Processing Process ..."); 1745 factOut (kInfo, -1, str); 1746 gp_runStat = -22; //==> we should exit 1747 gj.procStat = -22; //==> we should exit 1748 return 0; 1749 } 1750 1751 if (numProc == 0) { 1752 //seems we have nothing to do, so sleep a little 1753 xwait.tv_sec = 0; 1754 xwait.tv_nsec = 2000000; // sleep for ~2 msec 1755 nanosleep (&xwait, NULL); 1756 } 1757 gp_runStat = gi_runStat; 1758 gj.procStat = gj.readStat; 1759 1760 } 1761 1762 //we are asked to abort asap ==> must flag all remaining events 1763 // when gi_runStat claims that all events are in the buffer... 1764 1765 snprintf (str, MXSTR, "Abort Processing Process ..."); 1766 factOut (kInfo, -1, str); 1767 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1768 if (kd < 0) 1769 kd += (MAX_EVT * MAX_RUN); 1770 1771 for (k = 0; k < gi_maxProc; k++) { 1772 pthread_join (thread[k], (void **) &status); 1773 } 1774 1775 int k1 = evtCtrl.frstPtr; 1776 for (k = k1; k < (k1 + kd); k++) { 1777 int k0 = k % (MAX_EVT * MAX_RUN); 1778 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) { 1779 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed' 1780 } 1781 } 1782 1783 gp_runStat = -99; 1784 gj.procStat = -99; 1785 1786 return 0; 1787 1562 1788 } /*-----------------------------------------------------------------*/ 1563 1789 1564 int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt) { 1790 int 1791 CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt) 1792 { 1565 1793 /* close run runId (all all runs if runId=0) */ 1566 1794 /* return: 0=close scheduled / >0 already closed / <0 does not exist */ 1567 int j;1568 1569 1570 if ( runId == 0) {1571 for ( j=0; j<MAX_RUN; j++) {1572 if ( runCtrl[j].fileId == 0) { //run is open1573 runCtrl[j].closeTime = closeTime;1574 runCtrl[j].maxEvt = maxEvt;1575 }1576 }1577 return 0;1578 }1579 1580 for ( j=0; j<MAX_RUN; j++) {1581 if ( runCtrl[j].runId == runId) {1582 if ( runCtrl[j].fileId == 0) { //run is open1583 runCtrl[j].closeTime = closeTime;1584 runCtrl[j].maxEvt = maxEvt;1585 return 0;1586 } else if ( runCtrl[j].fileId <0 ) {//run not yet opened1587 runCtrl[j].closeTime = closeTime;1588 runCtrl[j].maxEvt = maxEvt;1589 return +1;1590 } else {// run already closed1591 return +2;1592 }1593 }1594 }//we only reach here if the run was never created1595 return -1;1795 int j; 1796 1797 1798 if (runId == 0) { 1799 for (j = 0; j < MAX_RUN; j++) { 1800 if (runCtrl[j].fileId == 0) { //run is open 1801 runCtrl[j].closeTime = closeTime; 1802 runCtrl[j].maxEvt = maxEvt; 1803 } 1804 } 1805 return 0; 1806 } 1807 1808 for (j = 0; j < MAX_RUN; j++) { 1809 if (runCtrl[j].runId == runId) { 1810 if (runCtrl[j].fileId == 0) { //run is open 1811 runCtrl[j].closeTime = closeTime; 1812 runCtrl[j].maxEvt = maxEvt; 1813 return 0; 1814 } else if (runCtrl[j].fileId < 0) { //run not yet opened 1815 runCtrl[j].closeTime = closeTime; 1816 runCtrl[j].maxEvt = maxEvt; 1817 return +1; 1818 } else { // run already closed 1819 return +2; 1820 } 1821 } 1822 } //we only reach here if the run was never created 1823 return -1; 1596 1824 1597 1825 } /*-----------------------------------------------------------------*/ 1598 1826 1599 1827 1600 void *writeEvt( void *ptr ) { 1828 void * 1829 writeEvt (void *ptr) 1830 { 1601 1831 /* *** main loop writing event (including opening and closing run-files */ 1602 1832 1603 int numWrite, numWait;1604 int k,j,i;1605 struct timespec xwait;1606 char str[MXSTR];1607 1608 cpu_set_t mask;1609 int cpu = 1 ;//write thread1610 1611 snprintf(str,MXSTR,"Starting write-thread");1612 factOut(kInfo,-1, str );1833 int numWrite, numWait; 1834 int k, j, i; 1835 struct timespec xwait; 1836 char str[MXSTR]; 1837 1838 cpu_set_t mask; 1839 int cpu = 1; //write thread 1840 1841 snprintf (str, MXSTR, "Starting write-thread"); 1842 factOut (kInfo, -1, str); 1613 1843 1614 1844 /* CPU_ZERO initializes all the bits in the mask to zero. */ 1615 CPU_ZERO ( &mask);1845 CPU_ZERO (&mask); 1616 1846 /* CPU_SET sets only the bit corresponding to cpu. */ 1617 CPU_SET ( cpu, &mask);1847 CPU_SET (cpu, &mask); 1618 1848 /* sched_setaffinity returns 0 in success */ 1619 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) { 1620 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu); 1621 } 1622 1623 int lastRun = 0 ; //usually run from last event still valid 1624 1625 while (g_runStat >-2) { 1626 1627 numWait = numWrite = 0 ; 1628 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 1629 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 1630 1631 int k1=evtCtrl.frstPtr; 1632 for ( k=k1; k<(k1+kd); k++ ) { 1633 int k0 = k % (MAX_EVT*MAX_RUN) ; 1849 if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { 1850 snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu); 1851 } 1852 1853 int lastRun = 0; //usually run from last event still valid 1854 1855 while (g_runStat > -2) { 1856 1857 numWait = numWrite = 0; 1858 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1859 if (kd < 0) 1860 kd += (MAX_EVT * MAX_RUN); 1861 1862 int k1 = evtCtrl.frstPtr; 1863 for (k = k1; k < (k1 + kd); k++) { 1864 int k0 = k % (MAX_EVT * MAX_RUN); 1634 1865 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if) 1635 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9900) { 1636 1637 if ( gi_resetR > 1 ) { //we must drain the buffer asap 1638 evtCtrl.evtStat[k0] = 9904 ; 1866 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9900) { 1867 1868 if (gi_resetR > 1) { //we must drain the buffer asap 1869 evtCtrl.evtStat[k0] = 9904; 1870 } else { 1871 1872 1873 int id = evtCtrl.evtBuf[k0]; 1874 uint32_t irun = mBuffer[id].runNum; 1875 int32_t ievt = mBuffer[id].evNum; 1876 1877 gi.wrtTot++; 1878 if (runCtrl[lastRun].runId == irun) { 1879 j = lastRun; 1880 } else { 1881 //check which fileID to use (or open if needed) 1882 for (j = 0; j < MAX_RUN; j++) { 1883 if (runCtrl[j].runId == irun) 1884 break; 1885 } 1886 if (j >= MAX_RUN) { 1887 snprintf (str, MXSTR, 1888 "W error: can not find run %d for event %d in %d", 1889 irun, ievt, id); 1890 factOut (kFatal, 901, str); 1891 gi.wrtErr++; 1892 } 1893 lastRun = j; 1894 } 1895 1896 if (runCtrl[j].fileId < 0) { 1897 actRun.Version = 1; 1898 actRun.RunType = -1; //to be adapted 1899 1900 actRun.Nroi = runCtrl[j].roi0; 1901 actRun.NroiTM = runCtrl[j].roi8; 1902 if (actRun.Nroi == actRun.NroiTM) 1903 actRun.NroiTM = 0; 1904 actRun.RunTime = runCtrl[j].firstTime; 1905 actRun.RunUsec = runCtrl[j].firstTime; 1906 actRun.NBoard = NBOARDS; 1907 actRun.NPix = NPIX; 1908 actRun.NTm = NTMARK; 1909 actRun.Nroi = mBuffer[id].nRoi; 1910 memcpy (actRun.FADhead, mBuffer[id].FADhead, 1911 NBOARDS * sizeof (PEVNT_HEADER)); 1912 1913 runCtrl[j].fileHd = 1914 runOpen (irun, &actRun, sizeof (actRun)); 1915 if (runCtrl[j].fileHd == NULL) { 1916 snprintf (str, MXSTR, 1917 "W could not open a file for run %d", irun); 1918 factOut (kError, 502, str); 1919 runCtrl[j].fileId = 91; 1920 } else { 1921 snprintf (str, MXSTR, "W opened new run_file %d evt %d", 1922 irun, ievt); 1923 factOut (kInfo, -1, str); 1924 runCtrl[j].fileId = 0; 1925 } 1926 1927 } 1928 1929 if (runCtrl[j].fileId != 0) { 1930 if (runCtrl[j].fileId < 0) { 1931 snprintf (str, MXSTR, 1932 "W never opened file for this run %d", irun); 1933 factOut (kError, 123, str); 1934 } else if (runCtrl[j].fileId < 100) { 1935 snprintf (str, MXSTR, "W.file for this run is closed %d", 1936 irun); 1937 factOut (kWarn, 123, str); 1938 runCtrl[j].fileId += 100; 1939 } else { 1940 snprintf (str, MXSTR, "W:file for this run is closed %d", 1941 irun); 1942 factOut (kDebug, 123, str); 1943 } 1944 evtCtrl.evtStat[k0] = 9903; 1945 gi.wrtErr++; 1946 } else { 1947 i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent, 1948 sizeof (mBuffer[id])); 1949 if (i >= 0) { 1950 runCtrl[j].lastTime = g_actTime; 1951 runCtrl[j].actEvt++; 1952 evtCtrl.evtStat[k0] = 9901; 1953 snprintf (str, MXSTR, 1954 "%5d successfully wrote for run %d id %5d", 1955 ievt, irun, k0); 1956 factOut (kDebug, 504, str); 1957 // gj.writEvt++ ; 1958 } else { 1959 snprintf (str, MXSTR, "W error writing event for run %d", 1960 irun); 1961 factOut (kError, 503, str); 1962 evtCtrl.evtStat[k0] = 9902; 1963 gi.wrtErr++; 1964 } 1965 1966 if (i < 0 1967 || runCtrl[j].lastTime < g_actTime - 300 1968 || runCtrl[j].closeTime < g_actTime 1969 || runCtrl[j].maxEvt < runCtrl[j].actEvt) { 1970 int ii = 0; 1971 if (i < 0) 1972 ii = 1; 1973 else if (runCtrl[j].closeTime < g_actTime) 1974 ii = 2; 1975 else if (runCtrl[j].lastTime < g_actTime - 300) 1976 ii = 3; 1977 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt) 1978 ii = 4; 1979 1980 1981 1982 //close run for whatever reason 1983 if (runCtrl[j].runId == gi_myRun) 1984 gi_myRun = g_actTime; 1985 1986 if (runCtrl[j].procId == 0) { 1987 runFinish1 (runCtrl[j].runId); 1988 runCtrl[j].procId = 92; 1989 } 1990 1991 runCtrl[j].closeTime = g_actTime - 1; 1992 i = runClose (runCtrl[j].fileHd, &runTail[j], 1993 sizeof (runTail[j])); 1994 if (i < 0) { 1995 snprintf (str, MXSTR, "error closing run %d %d AAA", 1996 runCtrl[j].runId, i); 1997 factOut (kError, 503, str); 1998 runCtrl[j].fileId = 92; 1999 } else { 2000 snprintf (str, MXSTR, "W closed run %d AAA %d", irun, 2001 ii); 2002 factOut (kInfo, 503, str); 2003 runCtrl[j].fileId = 93; 2004 } 2005 } 2006 } 2007 } 2008 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000) 2009 numWait++; 2010 } 2011 2012 //check if we should close a run (mainly when no event pending) 2013 for (j = 0; j < MAX_RUN; j++) { 2014 if (runCtrl[j].fileId == 0 2015 && (runCtrl[j].closeTime < g_actTime 2016 || runCtrl[j].lastTime < g_actTime - 300 2017 || runCtrl[j].maxEvt <= runCtrl[j].actEvt)) { 2018 if (runCtrl[j].runId == gi_myRun) 2019 gi_myRun = g_actTime; 2020 int ii = 0; 2021 if (runCtrl[j].closeTime < g_actTime) 2022 ii = 2; 2023 else if (runCtrl[j].lastTime < g_actTime - 300) 2024 ii = 3; 2025 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt) 2026 ii = 4; 2027 2028 if (runCtrl[j].procId == 0) { 2029 runFinish1 (runCtrl[j].runId); 2030 runCtrl[j].procId = 92; 2031 } 2032 2033 runCtrl[j].closeTime = g_actTime - 1; 2034 i = runClose (runCtrl[j].fileHd, &runTail[j], 2035 sizeof (runTail[j])); 2036 if (i < 0) { 2037 snprintf (str, MXSTR, "error closing run %d %d BBB", 2038 runCtrl[j].runId, i); 2039 factOut (kError, 506, str); 2040 runCtrl[j].fileId = 94; 2041 } else { 2042 snprintf (str, MXSTR, "W closed run %d BBB %d", 2043 runCtrl[j].runId, ii); 2044 factOut (kInfo, 507, str); 2045 runCtrl[j].fileId = 95; 2046 } 2047 } 2048 } 2049 2050 if (numWrite == 0) { 2051 //seems we have nothing to do, so sleep a little 2052 xwait.tv_sec = 0; 2053 xwait.tv_nsec = 2000000; // sleep for ~2 msec 2054 nanosleep (&xwait, NULL); 2055 } 2056 2057 if (gj.readStat < -10 && numWait == 0) { //nothing left to do 2058 snprintf (str, MXSTR, "Finish Write Process ..."); 2059 factOut (kInfo, -1, str); 2060 gw_runStat = -22; //==> we should exit 2061 gj.writStat = -22; //==> we should exit 2062 goto closerun; 2063 } 2064 gw_runStat = gi_runStat; 2065 gj.writStat = gj.readStat; 2066 2067 } 2068 2069 //must close all open files .... 2070 snprintf (str, MXSTR, "Abort Writing Process ..."); 2071 factOut (kInfo, -1, str); 2072 2073 closerun: 2074 snprintf (str, MXSTR, "Close all open files ..."); 2075 factOut (kInfo, -1, str); 2076 for (j = 0; j < MAX_RUN; j++) 2077 if (runCtrl[j].fileId == 0) { 2078 if (runCtrl[j].runId == gi_myRun) 2079 gi_myRun = g_actTime; 2080 2081 if (runCtrl[j].procId == 0) { 2082 runFinish (runCtrl[j].runId); 2083 runCtrl[j].procId = 92; 2084 } 2085 2086 runCtrl[j].closeTime = g_actTime - 1; 2087 i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j])); 2088 int ii = 0; 2089 if (runCtrl[j].closeTime < g_actTime) 2090 ii = 2; 2091 else if (runCtrl[j].lastTime < g_actTime - 300) 2092 ii = 3; 2093 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt) 2094 ii = 4; 2095 if (i < 0) { 2096 snprintf (str, MXSTR, "error closing run %d %d CCC", 2097 runCtrl[j].runId, i); 2098 factOut (kError, 506, str); 2099 runCtrl[j].fileId = 96; 1639 2100 } else { 1640 1641 1642 int id = evtCtrl.evtBuf[k0] ; 1643 uint32_t irun = mBuffer[id].runNum ; 1644 int ievt = mBuffer[id].evNum ; 1645 1646 gi.wrtTot++ ; 1647 if (runCtrl[lastRun].runId == irun ) { 1648 j = lastRun ; 1649 } else { 1650 //check which fileID to use (or open if needed) 1651 for ( j=0; j<MAX_RUN; j++) { 1652 if ( runCtrl[j].runId == irun ) break ; 1653 } 1654 if ( j >= MAX_RUN ) { 1655 snprintf(str,MXSTR,"W error: can not find run %d for event %d in %d", irun,ievt,id); 1656 factOut(kFatal,901, str ) ; 1657 gi.wrtErr++ ; 1658 } 1659 lastRun = j ; 1660 } 1661 1662 if (runCtrl[j].fileId < 0 ) { 1663 actRun.Version = 1 ; 1664 actRun.RunType = -1 ; //to be adapted 1665 1666 actRun.Nroi = runCtrl[j].roi0 ; 1667 actRun.NroiTM = runCtrl[j].roi8 ; 1668 if ( actRun.Nroi == actRun.NroiTM ) actRun.NroiTM = 0 ; 1669 actRun.RunTime = runCtrl[j].firstTime ; 1670 actRun.RunUsec = runCtrl[j].firstTime ; 1671 actRun.NBoard = NBOARDS ; 1672 actRun.NPix = NPIX ; 1673 actRun.NTm = NTMARK ; 1674 actRun.Nroi = mBuffer[id].nRoi ; 1675 memcpy(actRun.FADhead, mBuffer[id].FADhead, NBOARDS* sizeof(PEVNT_HEADER) ) ; 1676 1677 runCtrl[j].fileHd = runOpen(irun, &actRun, sizeof(actRun) ) ; 1678 if (runCtrl[j].fileHd == NULL ) { 1679 snprintf(str,MXSTR,"W could not open a file for run %d",irun); 1680 factOut(kError,502, str ) ; 1681 runCtrl[j].fileId = 91 ; 1682 } else { 1683 snprintf(str,MXSTR,"W opened new run_file %d evt %d",irun,ievt) ; 1684 factOut(kInfo,-1, str ) ; 1685 runCtrl[j].fileId = 0 ; 1686 } 1687 1688 } 1689 1690 if (runCtrl[j].fileId != 0 ) { 1691 if (runCtrl[j].fileId < 0 ) { 1692 snprintf(str,MXSTR,"W never opened file for this run %d",irun) ; 1693 factOut(kError,123,str) ; 1694 } else if (runCtrl[j].fileId < 100 ) { 1695 snprintf(str,MXSTR,"W.file for this run is closed %d",irun) ; 1696 factOut(kWarn,123,str) ; 1697 runCtrl[j].fileId += 100 ; 1698 } else { 1699 snprintf(str,MXSTR,"W:file for this run is closed %d",irun) ; 1700 factOut(kDebug,123,str) ; 1701 } 1702 evtCtrl.evtStat[k0] = 9903 ; 1703 gi.wrtErr++ ; 1704 } else { 1705 i=runWrite(runCtrl[j].fileHd, mBuffer[id].fEvent, sizeof(mBuffer[id]) ); 1706 if ( i>=0 ) { 1707 runCtrl[j].lastTime = g_actTime; 1708 runCtrl[j].actEvt++ ; 1709 evtCtrl.evtStat[k0] = 9901 ; 1710 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0); 1711 factOut(kDebug,504, str ) ; 1712 // gj.writEvt++ ; 1713 } else { 1714 snprintf(str,MXSTR,"W error writing event for run %d",irun) ; 1715 factOut(kError,503, str ) ; 1716 evtCtrl.evtStat[k0] = 9902 ; 1717 gi.wrtErr++ ; 1718 } 1719 1720 if ( i < 0 1721 || runCtrl[j].lastTime < g_actTime-300 1722 || runCtrl[j].closeTime < g_actTime 1723 || runCtrl[j].maxEvt < runCtrl[j].actEvt ) { 1724 int ii =0 ; 1725 if ( i < 0 ) ii=1 ; 1726 else if (runCtrl[j].closeTime < g_actTime ) ii=2 ; 1727 else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ; 1728 else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ; 1729 1730 1731 1732 //close run for whatever reason 1733 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ; 1734 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) ); 1735 if (i<0) { 1736 snprintf(str,MXSTR,"error closing run %d %d AAA",runCtrl[j].runId,i) ; 1737 factOut(kError,503, str ) ; 1738 runCtrl[j].fileId = 92 ; 1739 } else { 1740 snprintf(str,MXSTR,"W closed run %d AAA %d",irun,ii) ; 1741 factOut(kInfo,503, str ) ; 1742 runCtrl[j].fileId = 93 ; 1743 } 1744 } 1745 } 2101 snprintf (str, MXSTR, "W closed run %d CCC %d", runCtrl[j].runId, 2102 ii); 2103 factOut (kInfo, 507, str); 2104 runCtrl[j].fileId = 97; 1746 2105 } 1747 } else if (evtCtrl.evtStat[k0] > 0 1748 && evtCtrl.evtStat[k0] < 9000 ) numWait++ ; 1749 } 1750 1751 //check if we should close a run (mainly when no event pending) 1752 for ( j=0; j<MAX_RUN; j++) { 1753 if ( runCtrl[j].fileId==0 1754 && ( runCtrl[j].closeTime < g_actTime 1755 ||runCtrl[j].lastTime < g_actTime-300 1756 ||runCtrl[j].maxEvt < runCtrl[j].actEvt ) ) { 1757 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ; 1758 int ii =0 ; 1759 if (runCtrl[j].closeTime < g_actTime ) ii=2 ; 1760 else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ; 1761 else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ; 1762 1763 1764 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) ); 1765 if (i<0) { 1766 snprintf(str,MXSTR,"error closing run %d %d BBB",runCtrl[j].runId,i) ; 1767 factOut(kError,506, str ) ; 1768 runCtrl[j].fileId = 94 ; 1769 } else { 1770 snprintf(str,MXSTR,"W closed run %d BBB %d",runCtrl[j].runId,ii) ; 1771 factOut(kInfo,507, str ) ; 1772 runCtrl[j].fileId = 95 ; 1773 } 1774 } 1775 } 1776 1777 if (numWrite == 0) { 1778 //seems we have nothing to do, so sleep a little 1779 xwait.tv_sec = 0; 1780 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 1781 nanosleep( &xwait , NULL ) ; 1782 } 1783 1784 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do 1785 snprintf(str,MXSTR,"Finish Write Process ..."); 1786 factOut(kInfo,-1, str ) ; 1787 gw_runStat = -22 ; //==> we should exit 1788 gj.writStat= -22 ; //==> we should exit 1789 goto closerun ; 1790 } 1791 gw_runStat = gi_runStat ; 1792 gj.writStat= gj.readStat ; 1793 1794 } 1795 1796 //must close all open files .... 1797 snprintf(str,MXSTR,"Abort Writing Process ..."); 1798 factOut(kInfo,-1, str ) ; 1799 1800 closerun: 1801 snprintf(str,MXSTR,"Close all open files ..."); 1802 factOut(kInfo,-1, str ) ; 1803 for ( j=0; j<MAX_RUN; j++) 1804 if ( runCtrl[j].fileId ==0 ) { 1805 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ; 1806 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) ); 1807 int ii =0 ; 1808 if (runCtrl[j].closeTime < g_actTime ) ii=2 ; 1809 else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ; 1810 else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ; 1811 if (i<0) { 1812 snprintf(str,MXSTR,"error closing run %d %d CCC",runCtrl[j].runId,i) ; 1813 factOut(kError,506, str ) ; 1814 runCtrl[j].fileId = 96 ; 1815 } else { 1816 snprintf(str,MXSTR,"W closed run %d CCC %d",runCtrl[j].runId,ii) ; 1817 factOut(kInfo,507, str ) ; 1818 runCtrl[j].fileId = 97 ; 1819 } 1820 } 1821 1822 gw_runStat = -99; 1823 gj.writStat= -99; 1824 snprintf(str,MXSTR,"Exit Writing Process ..."); 1825 factOut(kInfo,-1, str ) ; 1826 return 0; 1827 1828 1829 2106 } 2107 2108 gw_runStat = -99; 2109 gj.writStat = -99; 2110 snprintf (str, MXSTR, "Exit Writing Process ..."); 2111 factOut (kInfo, -1, str); 2112 return 0; 2113 2114 2115 1830 2116 1831 2117 } /*-----------------------------------------------------------------*/ … … 1834 2120 1835 2121 1836 void StartEvtBuild() { 1837 1838 int i,j,imax,status,th_ret[50] ; 1839 pthread_t thread[50] ; 1840 struct timespec xwait ; 1841 1842 gi_runStat = gp_runStat = gw_runStat = 0 ; 1843 gj.readStat= gj.procStat= gj.writStat= 0 ; 1844 1845 snprintf(str,MXSTR,"Starting EventBuilder V15.07 A"); 1846 factOut(kInfo,-1, str ) ; 2122 void 2123 StartEvtBuild () 2124 { 2125 2126 int i, j, imax, status, th_ret[50]; 2127 pthread_t thread[50]; 2128 struct timespec xwait; 2129 2130 gi_runStat = gp_runStat = gw_runStat = 0; 2131 gj.readStat = gj.procStat = gj.writStat = 0; 2132 2133 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A"); 2134 factOut (kInfo, -1, str); 1847 2135 1848 2136 //initialize run control logics 1849 for (i=0; i<MAX_RUN; i++) {1850 runCtrl[i].runId = 0;1851 runCtrl[i].fileId = -2;1852 }2137 for (i = 0; i < MAX_RUN; i++) { 2138 runCtrl[i].runId = 0; 2139 runCtrl[i].fileId = -2; 2140 } 1853 2141 1854 2142 //prepare for subProcesses 1855 gi_maxSize = g_maxSize;1856 if (gi_maxSize <=0 ) gi_maxSize = 1 ;1857 1858 gi_maxProc = g_maxProc ; 1859 if (gi_maxProc <=0 || gi_maxProc>90) {1860 snprintf(str,MXSTR,"illegal number of processes %d",gi_maxProc ) ;1861 factOut(kFatal,301, str );1862 gi_maxProc=1;1863 }1864 2143 gi_maxSize = g_maxSize; 2144 if (gi_maxSize <= 0) 2145 gi_maxSize = 1; 2146 2147 gi_maxProc = g_maxProc; 2148 if (gi_maxProc <= 0 || gi_maxProc > 90) { 2149 snprintf (str, MXSTR, "illegal number of processes %d", gi_maxProc); 2150 factOut (kFatal, 301, str); 2151 gi_maxProc = 1; 2152 } 1865 2153 //partially initialize event control logics 1866 evtCtrl.frstPtr = 0;1867 evtCtrl.lastPtr = 0;2154 evtCtrl.frstPtr = 0; 2155 evtCtrl.lastPtr = 0; 1868 2156 1869 2157 //start all threads (more to come) when we are allowed to .... 1870 while (g_runStat == 0) {1871 xwait.tv_sec = 0;1872 xwait.tv_nsec= 10000000 ;// sleep for ~10 msec1873 nanosleep( &xwait , NULL );1874 }1875 1876 i=0;1877 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, NULL);1878 i++;1879 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, NULL);1880 i++;1881 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, NULL);1882 i++;1883 imax=i;2158 while (g_runStat == 0) { 2159 xwait.tv_sec = 0; 2160 xwait.tv_nsec = 10000000; // sleep for ~10 msec 2161 nanosleep (&xwait, NULL); 2162 } 2163 2164 i = 0; 2165 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL); 2166 i++; 2167 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL); 2168 i++; 2169 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL); 2170 i++; 2171 imax = i; 1884 2172 1885 2173 1886 2174 #ifdef BILAND 1887 1888 xwait.tv_nsec= 0 ;// sleep for ~20sec1889 nanosleep( &xwait , NULL );1890 1891 printf("close all runs in 2 seconds\n");1892 1893 CloseRunFile( 0, time(NULL)+2, 0);1894 1895 1896 xwait.tv_nsec= 0 ;// sleep for ~20sec1897 nanosleep( &xwait , NULL );1898 1899 printf("setting g_runstat to -1\n");1900 1901 g_runStat = -1;2175 xwait.tv_sec = 30;; 2176 xwait.tv_nsec = 0; // sleep for ~20sec 2177 nanosleep (&xwait, NULL); 2178 2179 printf ("close all runs in 2 seconds\n"); 2180 2181 CloseRunFile (0, time (NULL) + 2, 0); 2182 2183 xwait.tv_sec = 1;; 2184 xwait.tv_nsec = 0; // sleep for ~20sec 2185 nanosleep (&xwait, NULL); 2186 2187 printf ("setting g_runstat to -1\n"); 2188 2189 g_runStat = -1; 1902 2190 #endif 1903 2191 1904 2192 1905 2193 //wait for all threads to finish 1906 for (i=0; i<imax; i++) {1907 j = pthread_join ( thread[i], (void **)&status);1908 }2194 for (i = 0; i < imax; i++) { 2195 j = pthread_join (thread[i], (void **) &status); 2196 } 1909 2197 1910 2198 } /*-----------------------------------------------------------------*/ … … 1932 2220 #ifdef BILAND 1933 2221 1934 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer) 1935 { 1936 printf("called subproc %d\n",threadID) ; 1937 return threadID+1 ; 2222 int 2223 subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, 2224 int8_t * buffer) 2225 { 2226 printf ("called subproc %d\n", threadID); 2227 return threadID + 1; 1938 2228 } 1939 2229 … … 1947 2237 /*-----------------------------------------------------------------*/ 1948 2238 1949 1950 1951 1952 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) 1953 { return 1; } ; 1954 1955 int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) 1956 { return 1; usleep(10000); return 1; } 2239 2240 2241 2242 FileHandle_t 2243 runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len) 2244 { 2245 return 1; 2246 }; 2247 2248 int 2249 runWrite (FileHandle_t fileHd, EVENT * event, size_t len) 2250 { 2251 return 1; 2252 usleep (10000); 2253 return 1; 2254 } 1957 2255 1958 2256 1959 2257 //{ return 1; } ; 1960 2258 1961 int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) 1962 { return 1; } ; 1963 1964 1965 1966 1967 int eventCheck( uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event) 1968 { 1969 int i=0; 2259 int 2260 runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len) 2261 { 2262 return 1; 2263 }; 2264 2265 2266 2267 2268 int 2269 eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event) 2270 { 2271 int i = 0; 1970 2272 1971 2273 // printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) ); … … 1977 2279 1978 2280 1979 void factStatNew(EVT_STAT gi) { 1980 int i ; 2281 void 2282 factStatNew (EVT_STAT gi) 2283 { 2284 int i; 1981 2285 1982 2286 //for (i=0;i<MAX_SOCK;i++) { … … 1986 2290 } 1987 2291 1988 void gotNewRun( int runnr, PEVNT_HEADER *headers ) 1989 { printf("got new run %d\n",runnr); return; } 1990 1991 void factStat(GUI_STAT gj) { 2292 void 2293 gotNewRun (int runnr, PEVNT_HEADER * headers) 2294 { 2295 printf ("got new run %d\n", runnr); 2296 return; 2297 } 2298 2299 void 2300 factStat (GUI_STAT gj) 2301 { 1992 2302 // printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n", 1993 2303 // array[0],array[1],array[2],array[3],array[4],array[5],array[6]); … … 1995 2305 1996 2306 1997 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) { 2307 void 2308 debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, 2309 int state, uint32_t tsec, uint32_t tusec) 2310 { 1998 2311 // printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ; 1999 2312 } … … 2001 2314 2002 2315 2003 void debugStream(int isock, void *buf, int len) { 2316 void 2317 debugStream (int isock, void *buf, int len) 2318 { 2004 2319 } 2005 2320 2006 void debugHead(int i, int j, void *buf) { 2321 void 2322 debugHead (int i, int j, void *buf) 2323 { 2007 2324 } 2008 2325 2009 2326 2010 void factOut(int severity, int err, char* message ) { 2011 static FILE * fd ; 2012 static int file=0 ; 2013 2014 if (file==0) { 2015 printf("open file\n"); 2016 fd=fopen("x.out","w+") ; 2017 file=999; 2018 } 2019 2020 fprintf(fd,"%3d %3d | %s \n",severity,err,message) ; 2021 2022 if (severity != kDebug) 2023 printf("%3d %3d | %s\n",severity,err,message) ; 2327 void 2328 factOut (int severity, int err, char *message) 2329 { 2330 static FILE *fd; 2331 static int file = 0; 2332 2333 if (file == 0) { 2334 printf ("open file\n"); 2335 fd = fopen ("x.out", "w+"); 2336 file = 999; 2337 } 2338 2339 fprintf (fd, "%3d %3d | %s \n", severity, err, message); 2340 2341 if (severity != kDebug) 2342 printf ("%3d %3d | %s\n", severity, err, message); 2024 2343 } 2025 2344 2026 2345 2027 2346 2028 int main() { 2029 int i,b,c,p ; 2030 char ipStr[100] ; 2031 struct in_addr IPaddr ; 2032 2033 g_maxMem = 1024*1024 ; //MBytes 2347 int 2348 main () 2349 { 2350 int i, b, c, p; 2351 char ipStr[100]; 2352 struct in_addr IPaddr; 2353 2354 g_maxMem = 1024 * 1024; //MBytes 2034 2355 //g_maxMem = g_maxMem * 1024 *10 ; //10GBytes 2035 g_maxMem = g_maxMem * 200;//100MBytes2036 2037 g_maxProc = 20;2038 g_maxSize = 30000;2039 2040 g_runStat = 40;2041 2042 i=0;2356 g_maxMem = g_maxMem * 200; //100MBytes 2357 2358 g_maxProc = 20; 2359 g_maxSize = 30000; 2360 2361 g_runStat = 40; 2362 2363 i = 0; 2043 2364 2044 2365 // version for standard crates … … 2058 2379 // 2059 2380 //version for PC-test * 2060 for (c=0; c<4; c++) { 2061 for (b=0; b<10; b++) { 2062 sprintf(ipStr,"10.0.%d.11",128+c) ; 2063 if (c<2) sprintf(ipStr,"10.0.%d.11",128) ; 2064 else sprintf(ipStr,"10.0.%d.11",131) ; 2381 for (c = 0; c < 4; c++) { 2382 for (b = 0; b < 10; b++) { 2383 sprintf (ipStr, "10.0.%d.11", 128 + c); 2384 if (c < 2) 2385 sprintf (ipStr, "10.0.%d.11", 128); 2386 else 2387 sprintf (ipStr, "10.0.%d.11", 131); 2065 2388 // if (c==0) sprintf(ipStr,"10.0.100.11") ; 2066 2389 2067 inet_pton(PF_INET, ipStr, &IPaddr);2068 p = 31919+100*c+10*b;2069 2070 2071 g_port[i].sockAddr.sin_family = PF_INET;2072 g_port[i].sockAddr.sin_port = htons(p);2073 g_port[i].sockAddr.sin_addr = IPaddr;2074 g_port[i].sockDef = 1;2075 2076 i++;2077 }2078 }2390 inet_pton (PF_INET, ipStr, &IPaddr); 2391 p = 31919 + 100 * c + 10 * b; 2392 2393 2394 g_port[i].sockAddr.sin_family = PF_INET; 2395 g_port[i].sockAddr.sin_port = htons (p); 2396 g_port[i].sockAddr.sin_addr = IPaddr; 2397 g_port[i].sockDef = 1; 2398 2399 i++; 2400 } 2401 } 2079 2402 2080 2403 … … 2082 2405 //g_actBoards-- ; 2083 2406 2084 StartEvtBuild();2085 2086 return 0;2407 StartEvtBuild (); 2408 2409 return 0; 2087 2410 2088 2411 } -
trunk/FACT++/src/FAD.h
r11750 r11766 192 192 uint32_t maxEvt ; //maximum number of events to write 193 193 uint32_t actEvt ; //actual number of events written so far 194 uint32_t procEvt ; //actual number of events processed so far 194 195 int32_t lastEvt ; //last event of this run read so far 195 196 uint32_t nextEvt ; //next event number to be written 196 197 uint32_t waitEvt ; //event that would be ready to be written 197 int32_t fileId ; //<0 never opened, 0=open, >0 closed 198 int16_t fileId ; //<0 never opened, 0=open, >0 closed 199 int16_t procId ; //processing <0 never opened, 0=open, >0 closed 198 200 int16_t roi0 ; //roi for normal pixels 199 201 int16_t roi8 ; //roi for pixels8
Note:
See TracChangeset
for help on using the changeset viewer.