source: trunk/FACT++/src/ftm.cc@ 10699

Last change on this file since 10699 was 10691, checked in by tbretz, 14 years ago
Added some docu; renamed Coincidence to multiplicity.
File size: 20.5 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFTM.h"
15
16using namespace std;
17using namespace FTM;
18
19namespace ba = boost::asio;
20namespace bs = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26int Port = 0;
27
28// ------------------------------------------------------------------------
29
30
31// ------------------------------------------------------------------------
32
33class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
34{
35private:
36
37 double fStartTime;
38
39 void AsyncRead(ba::mutable_buffers_1 buffers)
40 {
41 ba::async_read(*this, buffers,
42 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
43 dummy::error, dummy::bytes_transferred));
44 }
45
46 void AsyncWrite(const ba::const_buffers_1 &buffers)
47 {
48 ba::async_write(*this, buffers,
49 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
50 dummy::error, dummy::bytes_transferred));
51 }
52 void AsyncWait(ba::deadline_timer &timer, int seconds,
53 void (tcp_connection::*handler)(const bs::error_code&))// const
54 {
55 timer.expires_from_now(boost::posix_time::seconds(seconds));
56 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
57 }
58
59 ba::deadline_timer deadline_;
60
61 ba::deadline_timer fTriggerDynData;
62
63 // The constructor is prvate to force the obtained pointer to be shared
64 tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
65 deadline_(ioservice), fTriggerDynData(ioservice)
66 {
67 deadline_.expires_at(boost::posix_time::pos_infin);
68
69 fHeader.fDelimiter=kDelimiterStart;
70 fHeader.fState=FTM::kFtmIdle;
71 fHeader.fBoardId=0xaffe;
72 fHeader.fFirmwareId=0x42;
73
74 fDelimiter = htons(kDelimiterEnd);
75
76 fStaticData.clear();
77
78 fStaticData.fMultiplicityPhysics = 1;
79 fStaticData.fMultiplicityCalib = 40;
80 fStaticData.fWindowCalib = 1;
81 fStaticData.fWindowPhysics = 0;
82 fStaticData.fDelayTrigger = 21;
83 fStaticData.fDelayTimeMarker = 42;
84 fStaticData.fDeadTime = 84;
85
86 fStaticData.fClockConditioner[0] = 100;
87 fStaticData.fClockConditioner[1] = 1;
88 fStaticData.fClockConditioner[2] = 8;
89 fStaticData.fClockConditioner[3] = 9;
90 fStaticData.fClockConditioner[4] = 11;
91 fStaticData.fClockConditioner[5] = 13;
92 fStaticData.fClockConditioner[6] = 14;
93 fStaticData.fClockConditioner[7] = 15;
94
95 fStaticData.fTriggerSequence = 1 | (2<<5) | (3<<10);
96
97 fStaticData.fGeneralSettings =
98 FTM::StaticData::kTrigger |
99 FTM::StaticData::kLPext |
100 FTM::StaticData::kPedestal;
101
102 fStaticData.fActiveFTU[0] = 0x3ff;
103 fStaticData.fActiveFTU[3] = 0x3ff;
104
105 for (int i=0; i<40; i++)
106 {
107 for (int p=0; p<4; p++)
108 fStaticData[i].fEnable[p] = 0x5555;
109
110 for (int p=0; p<5; p++)
111 fStaticData[i].fDAC[p] = (p+1)*10;
112
113 fStaticData[i].fPrescaling = 42;
114 }
115
116 for (unsigned long long i=0; i<40; i++)
117 {
118 fFtuList[i].fDNA = (i<<48)|(i<<32)|(i<<16)|i;
119 fFtuList[i].fPingAddr = (1<<8) | i;
120 }
121
122 fFtuList[1].fPingAddr = (1<<9) | 1;
123 fFtuList[0].fPingAddr = 0;
124
125 fFtuList.fNumBoards = 19;
126 fFtuList.fNumBoardsCrate[0] = 9;
127 fFtuList.fNumBoardsCrate[1] = 0;
128 fFtuList.fNumBoardsCrate[2] = 0;
129 fFtuList.fNumBoardsCrate[3] = 10;
130 }
131
132 // Callback when writing was successfull or failed
133 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
134 {
135 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
136 }
137
138 vector<uint16_t> fBufCommand;
139 vector<uint16_t> fBufHeader;
140 vector<uint16_t> fBufFtuList;
141 vector<uint16_t> fBufStaticData;
142 vector<uint16_t> fBufDynamicData;
143
144 vector<uint16_t> fCommand;
145 FTM::Header fHeader;
146 FTM::FtuList fFtuList;
147 FTM::StaticData fStaticData;
148 FTM::DynamicData fDynamicData;
149
150 //vector<uint16_t> fStaticData;
151
152 uint16_t fDelimiter;
153 uint16_t fBufRegister;
154
155 uint16_t fCounter;
156
157 bool fReportsDisabled;
158
159 void SendDynamicData()
160 {
161 if (fReportsDisabled)
162 return;
163
164 if (fHeader.fState == FTM::kFtmRunning)
165 fDynamicData.fOnTimeCounter = lrint(Time().UnixTime()-fStartTime);
166
167 fDynamicData.fTempSensor[0] = (23. + (6.*rand()/RAND_MAX-3))*10;
168 fDynamicData.fTempSensor[1] = (55. + (6.*rand()/RAND_MAX-3))*10;
169 fDynamicData.fTempSensor[2] = (39. + (6.*rand()/RAND_MAX-3))*10;
170 fDynamicData.fTempSensor[3] = (42. + (6.*rand()/RAND_MAX-3))*10;
171
172 for (int i=0; i<40; i++)
173 for (int p=0; p<4; p++)
174 fDynamicData[i].fRatePatch[p] = (1000 + (float(rand())/RAND_MAX-0.5)*25*p);
175
176 fHeader.fType=kDynamicData; // FtuList
177 fHeader.fDataSize=sizeof(FTM::DynamicData)/2+1;
178 fHeader.fTriggerCounter = fCounter++;
179 fHeader.fTimeStamp = lrint(Time().UnixTime());
180
181 fBufHeader = fHeader.HtoN();
182 fBufDynamicData = fDynamicData.HtoN();
183
184 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
185 AsyncWrite(ba::buffer(ba::const_buffer(&fBufDynamicData[0], sizeof(FTM::DynamicData))));
186 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
187 }
188
189 void SendStaticData()
190 {
191 fHeader.fType=kStaticData; // FtuList
192 fHeader.fDataSize=sizeof(FTM::StaticData)/2+1;
193 fHeader.fTriggerCounter = fCounter++;
194 fHeader.fTimeStamp = lrint(Time().UnixTime());
195
196 for (int i=0; i<4; i++)
197 fFtuList.fActiveFTU[i] = fStaticData.fActiveFTU[i];
198
199 fBufHeader = fHeader.HtoN();
200 fBufStaticData = fStaticData.HtoN();
201
202 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
203 AsyncWrite(ba::buffer(ba::const_buffer(&fBufStaticData[0], fBufStaticData.size()*2)));
204 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
205 }
206
207 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
208 {
209 // Do not schedule a new read if the connection failed.
210 if (bytes_received==0)
211 {
212 // Close the connection
213 close();
214 deadline_.cancel();
215 return;
216 }
217
218 // No command received yet
219 if (fCommand.size()==0)
220 {
221 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
222 fBufCommand.begin(), ntohs);
223
224 if (fBufCommand[0]!='@')
225 {
226 cout << "Inavlid command: 0x" << hex << fBufCommand[0] << dec << endl;
227 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
228 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
229 return;
230 }
231
232 switch (fBufCommand[1])
233 {
234 case kCmdToggleLed:
235 cout << "-> TOGGLE_LED" << endl;
236
237 fBufCommand.resize(5);
238 AsyncRead(ba::buffer(fBufCommand));
239 return;
240
241 case kCmdPing:
242 cout << "-> PING" << endl;
243
244 fHeader.fType=kFtuList; // FtuList
245 fHeader.fDataSize=sizeof(FTM::FtuList)/2+1;
246 fHeader.fTriggerCounter = fCounter++;
247 fHeader.fTimeStamp = lrint(Time().UnixTime());
248
249 fFtuList[1].fPingAddr = ((rand()&1)<<9) | 1;
250 fFtuList[0].fPingAddr = ((rand()&1)<<8);
251
252 fBufHeader = fHeader.HtoN();
253 fBufFtuList = fFtuList.HtoN();
254
255 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
256 AsyncWrite(ba::buffer(ba::const_buffer(&fBufFtuList[0], fBufFtuList.size()*2)));
257 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
258
259 fBufCommand.resize(5);
260 AsyncRead(ba::buffer(fBufCommand));
261 return;
262
263 case kCmdRead: // kCmdRead
264 cout << "-> READ" << endl;
265 switch (fBufCommand[2])
266 {
267 case kCmdStaticData:
268 cout << "-> STATIC" << endl;
269
270 SendStaticData();
271
272 fBufCommand.resize(5);
273 AsyncRead(ba::buffer(fBufCommand));
274
275 return;
276
277 case kCmdDynamicData:
278 cout << "-> DYNAMIC" << endl;
279
280 SendDynamicData();
281
282 fBufCommand.resize(5);
283 AsyncRead(ba::buffer(fBufCommand));
284
285 return;
286
287 case kCmdRegister:
288 fCommand = fBufCommand;
289 cout << "-> REGISTER" << endl;
290
291 fBufCommand.resize(1);
292 AsyncRead(ba::buffer(fBufCommand));
293 return;
294 }
295 break;
296
297
298 case kCmdWrite:
299 switch (fBufCommand[2])
300 {
301 case kCmdRegister:
302 fCommand = fBufCommand;
303 cout << "-> REGISTER" << endl;
304
305 fBufCommand.resize(2);
306 AsyncRead(ba::buffer(fBufCommand));
307 return;
308
309 case kCmdStaticData:
310 fCommand = fBufCommand;
311 cout << "-> STATIC DATA" << endl;
312
313 fBufCommand.resize(sizeof(StaticData)/2);
314 AsyncRead(ba::buffer(fBufCommand));
315 return;
316 }
317 break;
318
319 case kCmdDisableReports:
320 cout << "-> DISABLE REPORTS " << !fBufCommand[2] << endl;
321 fReportsDisabled = !fBufCommand[2];
322
323 fBufCommand.resize(5);
324 AsyncRead(ba::buffer(fBufCommand));
325 return;
326
327 case kCmdStartRun:
328 fHeader.fState = FTM::kFtmRunning;
329
330 fStartTime = Time().UnixTime();
331
332 fBufCommand.resize(5);
333 AsyncRead(ba::buffer(fBufCommand));
334 return;
335
336 case kCmdStopRun:
337 fHeader.fState = FTM::kFtmIdle;
338
339 fBufCommand.resize(5);
340 AsyncRead(ba::buffer(fBufCommand));
341 return;
342 }
343
344 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
345 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
346 return;
347 }
348
349 // Command data received
350
351 // Prepare reception of next command
352 switch (fCommand[1])
353 {
354 case kCmdRead: // kCmdRead
355 {
356 const uint16_t addr = ntohs(fBufCommand[0]);
357 const uint16_t val = reinterpret_cast<uint16_t*>(&fStaticData)[addr];
358
359 cout << "-> GET REGISTER[" << addr << "]=" << val << endl;
360
361 fHeader.fType=kRegister; // FtuList
362 fHeader.fDataSize=2;
363 fHeader.fTriggerCounter = fCounter++;
364 fHeader.fTimeStamp = lrint(Time().UnixTime());
365
366 fBufHeader = fHeader.HtoN();
367 fBufStaticData[addr] = htons(val);
368
369 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
370 AsyncWrite(ba::buffer(ba::const_buffer(&fBufStaticData[addr], 2)));
371 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
372 break;
373 }
374
375 case kCmdWrite:
376 switch (fCommand[2])
377 {
378 case kCmdRegister:
379 {
380 const uint16_t addr = ntohs(fBufCommand[0]);
381 const uint16_t val = ntohs(fBufCommand[1]);
382
383 cout << "-> SET REGISTER[" << addr << "]=" << val << endl;
384
385 reinterpret_cast<uint16_t*>(&fStaticData)[addr] = val;
386 }
387 break;
388
389 case kCmdStaticData:
390 {
391 cout << "-> SET STATIC DATA" << endl;
392 fStaticData = fBufCommand;
393 }
394 break;
395 }
396 break;
397 }
398
399 fCommand.resize(0);
400
401 fBufCommand.resize(5);
402 AsyncRead(ba::buffer(fBufCommand));
403 }
404
405 void check_deadline(const boost::system::error_code &)
406 {
407 if (!is_open())
408 {
409 // For example: Here we could schedule a new accept if we
410 // would not want to allow two connections at the same time.
411 return;
412 }
413
414 // Check whether the deadline has passed. We compare the deadline
415 // against the current time since a new asynchronous operation
416 // may have moved the deadline before this actor had a chance
417 // to run.
418 if (deadline_.expires_at() <= ba::deadline_timer::traits_type::now())
419 {
420 // The deadline has passed. Stop the session. The other
421 // actors will terminate as soon as possible.
422// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
423// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
424
425 return;
426 }
427
428 AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
429 }
430
431 void SendDynData(const boost::system::error_code &)
432 {
433 if (!is_open())
434 {
435 // For example: Here we could schedule a new accept if we
436 // would not want to allow two connections at the same time.
437 return;
438 }
439
440 // Check whether the deadline has passed. We compare the deadline
441 // against the current time since a new asynchronous operation
442 // may have moved the deadline before this actor had a chance
443 // to run.
444 if (deadline_.expires_at() <= ba::deadline_timer::traits_type::now())
445 return;
446
447 // The deadline has passed.
448 SendDynamicData();
449
450 AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
451 return;
452 }
453
454public:
455 typedef boost::shared_ptr<tcp_connection> shared_ptr;
456
457 static shared_ptr create(ba::io_service& io_service)
458 {
459 return shared_ptr(new tcp_connection(io_service));
460 }
461
462 void start()
463 {
464 // Ownership of buffer must be valid until Handler is called.
465
466 // Emit something to be written to the socket
467 fBufCommand.resize(5);
468 AsyncRead(ba::buffer(fBufCommand));
469
470 AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
471
472// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
473// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
474
475 }
476};
477
478
479class tcp_server : public tcp::acceptor
480{
481public:
482 tcp_server(ba::io_service& ioservice, int port) :
483 tcp::acceptor(ioservice, tcp::endpoint(tcp::v4(), port))
484
485 {
486 // We could start listening for more than one connection
487 // here, but since there is only one handler executed each time
488 // it would not make sense. Before one handle_accept is not
489 // finished no new handle_accept will be called.
490 // Workround: Start a new thread in handle_accept
491 start_accept();
492 }
493
494private:
495 void start_accept()
496 {
497 cout << "Start accept..." << flush;
498 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/io_service());
499
500 // This will accept a connection without blocking
501 async_accept(*new_connection,
502 boost::bind(&tcp_server::handle_accept,
503 this,
504 new_connection,
505 ba::placeholders::error));
506
507 cout << "start-done." << endl;
508 }
509
510 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
511 {
512 // The connection has been accepted and is now ready to use
513
514 // not installing a new handler will stop run()
515 cout << "Handle accept..." << flush;
516 if (!error)
517 {
518 new_connection->start();
519
520 // The is now an open connection/server (tcp_connection)
521 // we immediatly schedule another connection
522 // This allowed two client-connection at the same time
523 start_accept();
524 }
525 cout << "handle-done." << endl;
526 }
527};
528
529int main(int argc, const char **argv)
530{
531 try
532 {
533 ba::io_service io_service;
534
535 Port = argc==2 ? lexical_cast<int>(argv[1]) : 5000;
536
537 tcp_server server(io_service, Port);
538 // ba::add_service(io_service, &server);
539 // server.add_service(...);
540 cout << "Run..." << flush;
541
542 // Calling run() from a single thread ensures no concurrent access
543 // of the handler which are called!!!
544 io_service.run();
545
546 cout << "end." << endl;
547 }
548 catch (std::exception& e)
549 {
550 std::cerr << e.what() << std::endl;
551 }
552
553 return 0;
554}
555/* ====================== Buffers ===========================
556
557char d1[128]; ba::buffer(d1);
558std::vector<char> d2(128); ba::buffer(d2);
559boost::array<char, 128> d3; ba::buffer(d3);
560
561// --------------------------------
562char d1[128];
563std::vector<char> d2(128);
564boost::array<char, 128> d3;
565
566boost::array<mutable_buffer, 3> bufs1 = {
567 ba::buffer(d1),
568 ba::buffer(d2),
569 ba::buffer(d3) };
570sock.read(bufs1);
571
572std::vector<const_buffer> bufs2;
573bufs2.push_back(boost::asio::buffer(d1));
574bufs2.push_back(boost::asio::buffer(d2));
575bufs2.push_back(boost::asio::buffer(d3));
576sock.write(bufs2);
577
578
579// ======================= Read functions =========================
580
581ba::async_read_until --> delimiter
582
583streambuf buf; // Ensure validity until handler!
584ba::async_read(s, buf, ....);
585
586ba::async_read(s, ba::buffer(data, size), handler);
587 // Single buffer
588 boost::asio::async_read(s,
589 ba::buffer(data, size),
590 compl-func --> ba::transfer_at_least(32),
591 handler);
592
593 // Multiple buffers
594boost::asio::async_read(s, buffers,
595 compl-func --> boost::asio::transfer_all(),
596 handler);
597 */
598
599// ================= Others ===============================
600
601 /*
602 strand Provides serialised handler execution.
603 work Class to inform the io_service when it has work to do.
604
605
606io_service::
607dispatch Request the io_service to invoke the given handler.
608poll Run the io_service's event processing loop to execute ready
609 handlers.
610poll_one Run the io_service's event processing loop to execute one ready
611 handler.
612post Request the io_service to invoke the given handler and return
613 immediately.
614reset Reset the io_service in preparation for a subsequent run()
615 invocation.
616run Run the io_service's event processing loop.
617run_one Run the io_service's event processing loop to execute at most
618 one handler.
619stop Stop the io_service's event processing loop.
620wrap Create a new handler that automatically dispatches the wrapped
621 handler on the io_service.
622
623strand:: The io_service::strand class provides the ability to
624 post and dispatch handlers with the guarantee that none
625 of those handlers will execute concurrently.
626
627dispatch Request the strand to invoke the given handler.
628get_io_service Get the io_service associated with the strand.
629post Request the strand to invoke the given handler and return
630 immediately.
631wrap Create a new handler that automatically dispatches the
632 wrapped handler on the strand.
633
634work:: The work class is used to inform the io_service when
635 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
636get_io_service Get the io_service associated with the work.
637work Constructor notifies the io_service that work is starting.
638
639*/
640
641
Note: See TracBrowser for help on using the repository browser.