Changeset 16618
- Timestamp:
- 06/03/13 11:10:01 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.cc
r16584 r16618 7 7 #include <cstdarg> 8 8 #include <list> 9 #include <queue> 9 10 10 11 #include <boost/algorithm/string/join.hpp> … … 25 26 //#define USE_SELECT 26 27 //#define COMPLETE_EPOLL 28 //#define PRIORITY_QUEUE 27 29 28 30 // Reading only 1024: 13: 77Hz, 87% … … 207 209 208 210 timeval time; 209 uint64_t rateBytes; 210 uint32_t skip; // number of bytes skipped before start of event 211 uint64_t totBytes; // total received bytes 212 uint64_t relBytes; // total released bytes 213 uint32_t skip; // number of bytes skipped before start of event 211 214 212 215 uint32_t len() const { return uint32_t(H.package_length)*2; } … … 217 220 // -------------------------------- 218 221 219 READ_STRUCT() : socket(-1), connected(false), rateBytes(0)222 READ_STRUCT() : socket(-1), connected(false), totBytes(0), relBytes(0) 220 223 { 221 224 if (fd_epoll<0) … … 231 234 bool check(int, sockaddr_in addr); 232 235 bool read(); 236 233 237 }; 238 239 #ifdef PRIORITY_QUEUE 240 struct READ_STRUCTcomp 241 { 242 bool operator()(const READ_STRUCT *r1, const READ_STRUCT *r2) 243 { 244 const int64_t rel1 = r1->totBytes - r1->relBytes; 245 const int64_t rel2 = r2->totBytes - r2->relBytes; 246 return rel1 > rel2; 247 } 248 }; 249 #endif 234 250 235 251 int READ_STRUCT::wait() … … 382 398 } 383 399 384 bufPos = B; // no byte read so far 385 skip = 0; // start empty 400 bufPos = B; // no byte read so far 401 skip = 0; // start empty 402 totBytes = 0; 403 relBytes = 0; 386 404 387 405 factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket); … … 427 445 } 428 446 429 rateBytes += jrd;447 totBytes += jrd; 430 448 431 449 // are we skipping this board ... … … 1076 1094 #endif 1077 1095 1078 #if defined(USE_POLL) 1096 #ifdef PRIORITY_QUEUE 1097 priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio; 1098 1099 for (int i=0; i<NBOARDS; i++) 1100 if (rd[i].connected) 1101 prio.push(rd+i); 1102 1103 if (!prio.empty()) do 1104 #endif 1105 1106 1107 #ifdef USE_POLL 1079 1108 for (int jj=0; jj<nn; jj++) 1080 1109 #endif 1081 #if defined(USE_EPOLL)1110 #ifdef USE_EPOLL 1082 1111 for (int jj=0; jj<rc_epoll; jj++) 1083 1112 #endif 1084 #if !defined(USE_EPOLL) && !defined(USE_POLL) 1113 #if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE) 1085 1114 for (int jj=0; jj<NBOARDS; jj++) 1086 1115 #endif 1087 1116 { 1117 #ifdef PRIORITY_QUEUE 1118 READ_STRUCT *rs = prio.top(); 1119 #endif 1088 1120 #ifdef USE_SELECT 1089 1121 if (!FD_ISSET(rs->socket, &readfs)) … … 1106 1138 #endif 1107 1139 1108 #if !defined(USE_POLL) && !defined(USE_EPOLL) 1140 #if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE) 1109 1141 const int i = (jj%4)*10 + (jj/4); 1110 1142 READ_STRUCT *rs = &rd[i]; … … 1265 1297 primaryQueue.post(evt); 1266 1298 1299 // package_len is 0 if nothing was received. 1300 for (int ib=0; ib<40; ib++) 1301 rd[ib].relBytes += (*it)->FADhead[ib].package_length; 1302 1267 1303 it++; 1268 1304 evtCtrl.pop_front(); … … 1299 1335 #endif 1300 1336 } // end for loop over all sockets 1337 #ifdef PRIORITY_QUEUE 1338 while (0); // convert continue into break ;) 1339 #endif 1301 1340 1302 1341 // ================================================================== … … 1334 1373 const uint64_t rep = reportIncomplete(evt, "timeout"); 1335 1374 factReportIncomplete(rep); 1375 1376 // package_len is 0 when nothing was received from this board 1377 for (int ib=0; ib<40; ib++) 1378 rd[ib].relBytes += (*it)->FADhead[ib].package_length; 1336 1379 1337 1380 it++; … … 1354 1397 bool changed = false; 1355 1398 1399 static vector<uint64_t> store(NBOARDS); 1400 1356 1401 for (int ib=0; ib<NBOARDS; ib++) 1357 1402 { 1358 gj.rateBytes[ib] = rd[ib].rateBytes; 1359 gj.totBytes[ib] += rd[ib].rateBytes; 1403 gj.rateBytes[ib] = store[ib]>rd[ib].totBytes ? rd[ib].totBytes : rd[ib].totBytes-store[ib]; 1404 gj.relBytes[ib] = rd[ib].totBytes-rd[ib].relBytes; 1405 1406 store[ib] = rd[ib].totBytes; 1360 1407 1361 1408 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr)) … … 1369 1416 1370 1417 Memory::max_inuse = 0; 1371 1372 for (int ib=0; ib<NBOARDS; ib++)1373 rd[ib].rateBytes = 0;1374 1418 1375 1419 // =================================================================
Note:
See TracChangeset
for help on using the changeset viewer.