source: trunk/FACT++/src/ftm.cc@ 15462

Last change on this file since 15462 was 14976, checked in by tbretz, 12 years ago
Replaced deprecated io_service() by get_io_service()
File size: 22.4 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "Dim.h"
15#include "HeadersFTM.h"
16
17using namespace std;
18using namespace FTM;
19
20namespace ba = boost::asio;
21namespace bs = boost::system;
22namespace dummy = ba::placeholders;
23
24using boost::lexical_cast;
25using ba::ip::tcp;
26
27int Port = 0;
28
29// ------------------------------------------------------------------------
30
31
32// ------------------------------------------------------------------------
33
34class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
35{
36private:
37
38 double fStartTime;
39
40 void AsyncRead(ba::mutable_buffers_1 buffers)
41 {
42 ba::async_read(*this, buffers,
43 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
44 dummy::error, dummy::bytes_transferred));
45 }
46
47 void AsyncWrite(const ba::const_buffers_1 &buffers)
48 {
49 ba::async_write(*this, buffers,
50 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
51 dummy::error, dummy::bytes_transferred));
52 }
53 void AsyncWait(ba::deadline_timer &timer, int seconds,
54 void (tcp_connection::*handler)(const bs::error_code&))// const
55 {
56 timer.expires_from_now(boost::posix_time::milliseconds(seconds));
57 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
58 }
59
60 ba::deadline_timer fTriggerDynData;
61
62 // The constructor is prvate to force the obtained pointer to be shared
63 tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
64 fTriggerDynData(ioservice), fTriggerSendData(ioservice)
65 {
66 //deadline_.expires_at(boost::posix_time::pos_infin);
67
68 fHeader.fDelimiter=kDelimiterStart;
69 fHeader.fState=FTM::kFtmIdle|FTM::kFtmLocked;
70 fHeader.fBoardId=0xaffe;
71 fHeader.fFirmwareId=0x42;
72
73 fDelimiter = htons(kDelimiterEnd);
74
75 fStaticData.clear();
76
77 fStaticData.fMultiplicityPhysics = 1;
78 fStaticData.fMultiplicityCalib = 40;
79 fStaticData.fWindowCalib = 1;
80 fStaticData.fWindowPhysics = 0;
81 fStaticData.fDelayTrigger = 21;
82 fStaticData.fDelayTimeMarker = 42;
83 fStaticData.fDeadTime = 84;
84
85 fStaticData.fClockConditioner[0] = 100;
86 fStaticData.fClockConditioner[1] = 1;
87 fStaticData.fClockConditioner[2] = 8;
88 fStaticData.fClockConditioner[3] = 9;
89 fStaticData.fClockConditioner[4] = 11;
90 fStaticData.fClockConditioner[5] = 13;
91 fStaticData.fClockConditioner[6] = 14;
92 fStaticData.fClockConditioner[7] = 15;
93
94 fStaticData.fTriggerSequence = 1 | (2<<5) | (3<<10);
95
96 fStaticData.fGeneralSettings =
97 FTM::StaticData::kTrigger |
98 FTM::StaticData::kLPext |
99 FTM::StaticData::kPedestal;
100
101 fStaticData.fActiveFTU[0] = 0x3ff;
102 fStaticData.fActiveFTU[3] = 0x3ff;
103
104 for (int i=0; i<40; i++)
105 {
106 for (int p=0; p<4; p++)
107 fStaticData[i].fEnable[p] = 0x1ff;
108
109 for (int p=0; p<5; p++)
110 fStaticData[i].fDAC[p] = (p+1)*10;
111
112 fStaticData[i].fPrescaling = 42;
113 }
114
115 for (unsigned long long i=0; i<40; i++)
116 {
117 fFtuList[i].fDNA = (i<<48)|(i<<32)|(i<<16)|i;
118 fFtuList[i].fPingAddr = (1<<8) | i;
119 }
120
121 fFtuList[1].fPingAddr = (1<<9) | 1;
122 fFtuList[0].fPingAddr = 0;
123
124 fFtuList.fNumBoards = 19;
125 fFtuList.fNumBoardsCrate[0] = 9;
126 fFtuList.fNumBoardsCrate[1] = 0;
127 fFtuList.fNumBoardsCrate[2] = 0;
128 fFtuList.fNumBoardsCrate[3] = 10;
129 }
130
131 // Callback when writing was successfull or failed
132 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
133 {
134 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
135 }
136
137 vector<uint16_t> fBufCommand;
138 vector<uint16_t> fBufHeader;
139 vector<uint16_t> fBufFtuList;
140 vector<uint16_t> fBufStaticData;
141 vector<uint16_t> fBufDynamicData;
142
143 vector<uint16_t> fCommand;
144 FTM::Header fHeader;
145 FTM::FtuList fFtuList;
146 FTM::StaticData fStaticData;
147 FTM::DynamicData fDynamicData;
148
149 //vector<uint16_t> fStaticData;
150
151 uint16_t fDelimiter;
152 uint16_t fBufRegister;
153
154 uint16_t fCounter;
155 uint16_t fTimeStamp;
156
157 bool fReportsDisabled;
158
159 ba::deadline_timer fTriggerSendData;
160
161 void SendDynamicData()
162 {
163 if (fReportsDisabled)
164 return;
165
166 //if (fHeader.fState == FTM::kFtmRunning)
167 // fDynamicData.fOnTimeCounter = lrint(Time().UnixTime()-fStartTime);
168
169 fDynamicData.fTempSensor[0] = (23. + (6.*rand()/RAND_MAX-3))*10;
170 fDynamicData.fTempSensor[1] = (55. + (6.*rand()/RAND_MAX-3))*10;
171 fDynamicData.fTempSensor[2] = (39. + (6.*rand()/RAND_MAX-3))*10;
172 fDynamicData.fTempSensor[3] = (42. + (6.*rand()/RAND_MAX-3))*10;
173
174 for (int i=0; i<40; i++)
175 for (int p=0; p<4; p++)
176 fDynamicData[i].fRatePatch[p] = (1000 + (float(rand())/RAND_MAX-0.5)*25*p);
177
178 fHeader.fType=kDynamicData; // FtuList
179 fHeader.fDataSize=sizeof(FTM::DynamicData)/2+1;
180 fHeader.fTriggerCounter = fCounter;
181 fHeader.fTimeStamp = fTimeStamp++*1000000;//lrint(Time().UnixTime());
182
183 fBufHeader = fHeader.HtoN();
184 fBufDynamicData = fDynamicData.HtoN();
185
186 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
187 AsyncWrite(ba::buffer(ba::const_buffer(&fBufDynamicData[0], sizeof(FTM::DynamicData))));
188 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
189 }
190
191 void SendStaticData()
192 {
193 fHeader.fType=kStaticData; // FtuList
194 fHeader.fDataSize=sizeof(FTM::StaticData)/2+1;
195 fHeader.fTriggerCounter = fCounter;
196 fHeader.fTimeStamp = fTimeStamp*1000000;//lrint(Time().UnixTime());
197
198 for (int i=0; i<4; i++)
199 fFtuList.fActiveFTU[i] = fStaticData.fActiveFTU[i];
200
201 fBufHeader = fHeader.HtoN();
202 fBufStaticData = fStaticData.HtoN();
203
204 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
205 AsyncWrite(ba::buffer(ba::const_buffer(&fBufStaticData[0], fBufStaticData.size()*2)));
206 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
207 }
208
209 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
210 {
211 // Do not schedule a new read if the connection failed.
212 if (bytes_received==0)
213 {
214 // Close the connection
215 close();
216 return;
217 }
218
219 // No command received yet
220 if (fCommand.size()==0)
221 {
222 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
223 fBufCommand.begin(), ntohs);
224
225 if (fBufCommand[0]!='@')
226 {
227 cout << "Inavlid command: 0x" << hex << fBufCommand[0] << dec << endl;
228 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
229 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
230 return;
231 }
232
233 switch (fBufCommand[1])
234 {
235 case kCmdToggleLed:
236 cout << "-> TOGGLE_LED" << endl;
237
238 fBufCommand.resize(5);
239 AsyncRead(ba::buffer(fBufCommand));
240 return;
241
242 case kCmdPing:
243 cout << "-> PING" << endl;
244
245 fHeader.fType=kFtuList; // FtuList
246 fHeader.fDataSize=sizeof(FTM::FtuList)/2+1;
247 fHeader.fTriggerCounter = fCounter;
248 fHeader.fTimeStamp = fTimeStamp*1000000;//lrint(Time().UnixTime());
249
250 fFtuList[1].fPingAddr = ((rand()&1)<<9) | 1;
251 fFtuList[0].fPingAddr = ((rand()&1)<<8);
252
253 fBufHeader = fHeader.HtoN();
254 fBufFtuList = fFtuList.HtoN();
255
256 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
257 AsyncWrite(ba::buffer(ba::const_buffer(&fBufFtuList[0], fBufFtuList.size()*2)));
258 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
259
260 fBufCommand.resize(5);
261 AsyncRead(ba::buffer(fBufCommand));
262 return;
263
264 case kCmdRead: // kCmdRead
265 cout << "-> READ" << endl;
266 switch (fBufCommand[2])
267 {
268 case kCmdStaticData:
269 cout << "-> STATIC" << endl;
270
271 SendStaticData();
272
273 fBufCommand.resize(5);
274 AsyncRead(ba::buffer(fBufCommand));
275
276 return;
277
278 case kCmdDynamicData:
279 cout << "-> DYNAMIC" << endl;
280
281 SendDynamicData();
282
283 fBufCommand.resize(5);
284 AsyncRead(ba::buffer(fBufCommand));
285
286 return;
287
288 case kCmdRegister:
289 fCommand = fBufCommand;
290 cout << "-> REGISTER" << endl;
291
292 fBufCommand.resize(1);
293 AsyncRead(ba::buffer(fBufCommand));
294 return;
295 }
296 break;
297
298
299 case kCmdWrite:
300 switch (fBufCommand[2])
301 {
302 case kCmdRegister:
303 fCommand = fBufCommand;
304 cout << "-> REGISTER" << endl;
305
306 fBufCommand.resize(2);
307 AsyncRead(ba::buffer(fBufCommand));
308 return;
309
310 case kCmdStaticData:
311 fCommand = fBufCommand;
312 cout << "-> STATIC DATA" << endl;
313
314 fBufCommand.resize(sizeof(StaticData)/2);
315 AsyncRead(ba::buffer(fBufCommand));
316 return;
317 }
318 break;
319
320 case kCmdDisableReports:
321 cout << "-> DISABLE REPORTS " << !fBufCommand[2] << endl;
322 fReportsDisabled = !fBufCommand[2];
323
324 fBufCommand.resize(5);
325 AsyncRead(ba::buffer(fBufCommand));
326 return;
327
328 case kCmdConfigFTU:
329 cout << "-> Configure FTU " << (fBufCommand[2]&0xff) << " " << (fBufCommand[2]>>8) << endl;
330
331 fBufCommand.resize(5);
332 AsyncRead(ba::buffer(fBufCommand));
333 return;
334
335 case kCmdStartRun:
336 fHeader.fState = FTM::kFtmRunning|FTM::kFtmLocked;
337
338 fStartTime = Time().UnixTime();
339
340 fCounter = 0;
341 fTimeStamp = 0;
342 fHeader.fTriggerCounter = fCounter;
343
344 fBufCommand.resize(5);
345 AsyncRead(ba::buffer(fBufCommand));
346
347 AsyncWait(fTriggerSendData, 0, &tcp_connection::TriggerSendData);
348 return;
349
350 case kCmdStopRun:
351 fHeader.fState = FTM::kFtmIdle|FTM::kFtmLocked;
352
353 fTriggerSendData.cancel();
354
355 fCounter = 0;
356 fTimeStamp = 0;
357
358 fBufCommand.resize(5);
359 AsyncRead(ba::buffer(fBufCommand));
360 return;
361 }
362
363 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
364 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
365 return;
366 }
367
368 // Command data received
369
370 // Prepare reception of next command
371 switch (fCommand[1])
372 {
373 case kCmdRead: // kCmdRead
374 {
375 const uint16_t addr = ntohs(fBufCommand[0]);
376 const uint16_t val = reinterpret_cast<uint16_t*>(&fStaticData)[addr];
377
378 cout << "-> GET REGISTER[" << addr << "]=" << val << endl;
379
380 fHeader.fType=kRegister; // FtuList
381 fHeader.fDataSize=2;
382 fHeader.fTimeStamp = fTimeStamp*1000000;//lrint(Time().UnixTime());
383
384 fBufHeader = fHeader.HtoN();
385 fBufStaticData[addr] = htons(val);
386
387 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
388 AsyncWrite(ba::buffer(ba::const_buffer(&fBufStaticData[addr], 2)));
389 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
390 break;
391 }
392
393 case kCmdWrite:
394 switch (fCommand[2])
395 {
396 case kCmdRegister:
397 {
398 const uint16_t addr = ntohs(fBufCommand[0]);
399 const uint16_t val = ntohs(fBufCommand[1]);
400
401 cout << "-> SET REGISTER[" << addr << "]=" << val << endl;
402
403 reinterpret_cast<uint16_t*>(&fStaticData)[addr] = val;
404 }
405 break;
406
407 case kCmdStaticData:
408 {
409 cout << "-> SET STATIC DATA" << endl;
410 fStaticData = fBufCommand;
411 }
412 break;
413 }
414 break;
415 }
416
417 fCommand.resize(0);
418
419 fBufCommand.resize(5);
420 AsyncRead(ba::buffer(fBufCommand));
421 }
422
423 void SendDynData(const boost::system::error_code &ec)
424 {
425 if (!is_open())
426 {
427 // For example: Here we could schedule a new accept if we
428 // would not want to allow two connections at the same time.
429 return;
430 }
431
432 if (ec==ba::error::basic_errors::operation_aborted)
433 return;
434
435 // Check whether the deadline has passed. We compare the deadline
436 // against the current time since a new asynchronous operation
437 // may have moved the deadline before this actor had a chance
438 // to run.
439
440 if (fTriggerDynData.expires_at() > ba::deadline_timer::traits_type::now())
441 return;
442
443 // The deadline has passed.
444 SendDynamicData();
445
446 AsyncWait(fTriggerDynData, 1000, &tcp_connection::SendDynData);
447 }
448
449 void TriggerSendData(const boost::system::error_code &ec)
450 {
451 if (!is_open())
452 {
453 // For example: Here we could schedule a new accept if we
454 // would not want to allow two connections at the same time.
455 return;
456 }
457
458 if (ec==ba::error::basic_errors::operation_aborted)
459 return;
460
461 // Check whether the deadline has passed. We compare the deadline
462 // against the current time since a new asynchronous operation
463 // may have moved the deadline before this actor had a chance
464 // to run.
465 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
466 return;
467
468
469 if (fStaticData.IsEnabled(StaticData::kTrigger))
470 Dim::SendCommand("FAD/TRIGGER", fCounter++);
471
472 const uint16_t time = 100*float(rand())/RAND_MAX+50;
473
474 AsyncWait(fTriggerSendData, time, &tcp_connection::TriggerSendData);
475 }
476
477public:
478 typedef boost::shared_ptr<tcp_connection> shared_ptr;
479
480 static shared_ptr create(ba::io_service& io_service)
481 {
482 return shared_ptr(new tcp_connection(io_service));
483 }
484
485 void start()
486 {
487 // Ownership of buffer must be valid until Handler is called.
488
489 // Emit something to be written to the socket
490 fBufCommand.resize(5);
491 AsyncRead(ba::buffer(fBufCommand));
492
493 AsyncWait(fTriggerDynData, 1000, &tcp_connection::SendDynData);
494
495// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
496// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
497
498 }
499};
500
501
502class tcp_server : public tcp::acceptor
503{
504public:
505 tcp_server(ba::io_service& ioservice, int port) :
506 tcp::acceptor(ioservice, tcp::endpoint(tcp::v4(), port))
507
508 {
509 // We could start listening for more than one connection
510 // here, but since there is only one handler executed each time
511 // it would not make sense. Before one handle_accept is not
512 // finished no new handle_accept will be called.
513 // Workround: Start a new thread in handle_accept
514 start_accept();
515 }
516
517private:
518 void start_accept()
519 {
520 cout << "Start accept..." << flush;
521 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/get_io_service());
522
523 // This will accept a connection without blocking
524 async_accept(*new_connection,
525 boost::bind(&tcp_server::handle_accept,
526 this,
527 new_connection,
528 ba::placeholders::error));
529
530 cout << "start-done." << endl;
531 }
532
533 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
534 {
535 // The connection has been accepted and is now ready to use
536
537 // not installing a new handler will stop run()
538 cout << "Handle accept..." << flush;
539 if (!error)
540 {
541 new_connection->start();
542
543 // The is now an open connection/server (tcp_connection)
544 // we immediatly schedule another connection
545 // This allowed two client-connection at the same time
546 start_accept();
547 }
548 cout << "handle-done." << endl;
549 }
550};
551
552#include "Configuration.h"
553
554void SetupConfiguration(::Configuration &conf)
555{
556 const string n = conf.GetName()+".log";
557
558 po::options_description config("Program options");
559 config.add_options()
560 ("dns", var<string>("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
561 ("port,p", var<uint16_t>(5000), "")
562 ;
563
564 po::positional_options_description p;
565 p.add("port", 1); // The first positional options
566 p.add("num", 1); // The second positional options
567
568 conf.AddEnv("dns", "DIM_DNS_NODE");
569
570 conf.AddOptions(config);
571 conf.SetArgumentPositions(p);
572}
573
574int main(int argc, const char **argv)
575{
576 ::Configuration conf(argv[0]);
577
578 SetupConfiguration(conf);
579
580 po::variables_map vm;
581 try
582 {
583 vm = conf.Parse(argc, argv);
584 }
585#if BOOST_VERSION > 104000
586 catch (po::multiple_occurrences &e)
587 {
588 cerr << "Program options invalid due to: " << e.what() << " of '" << e.get_option_name() << "'." << endl;
589 return -1;
590 }
591#endif
592 catch (exception& e)
593 {
594 cerr << "Program options invalid due to: " << e.what() << endl;
595 return -1;
596 }
597
598 if (conf.HasVersion() || conf.HasPrint() || conf.HasHelp())
599 return -1;
600
601 Dim::Setup(conf.Get<string>("dns"));
602
603 //try
604 {
605 ba::io_service io_service;
606
607 Port = conf.Get<uint16_t>("port");
608
609 tcp_server server(io_service, Port);
610 // ba::add_service(io_service, &server);
611 // server.add_service(...);
612 //cout << "Run..." << flush;
613
614 // Calling run() from a single thread ensures no concurrent access
615 // of the handler which are called!!!
616 io_service.run();
617
618 //cout << "end." << endl;
619 }
620 /*catch (std::exception& e)
621 {
622 std::cerr << e.what() << std::endl;
623 }*/
624
625 return 0;
626}
627/* ====================== Buffers ===========================
628
629char d1[128]; ba::buffer(d1));
630std::vector<char> d2(128); ba::buffer(d2);
631boost::array<char, 128> d3; by::buffer(d3);
632
633// --------------------------------
634char d1[128];
635std::vector<char> d2(128);
636boost::array<char, 128> d3;
637
638boost::array<mutable_buffer, 3> bufs1 = {
639 ba::buffer(d1),
640 ba::buffer(d2),
641 ba::buffer(d3) };
642sock.read(bufs1);
643
644std::vector<const_buffer> bufs2;
645bufs2.push_back(boost::asio::buffer(d1));
646bufs2.push_back(boost::asio::buffer(d2));
647bufs2.push_back(boost::asio::buffer(d3));
648sock.write(bufs2);
649
650
651// ======================= Read functions =========================
652
653ba::async_read_until --> delimiter
654
655streambuf buf; // Ensure validity until handler!
656by::async_read(s, buf, ....);
657
658ba::async_read(s, ba:buffer(data, size), handler);
659 // Single buffer
660 boost::asio::async_read(s,
661 ba::buffer(data, size),
662 compl-func --> ba::transfer_at_least(32),
663 handler);
664
665 // Multiple buffers
666boost::asio::async_read(s, buffers,
667 compl-func --> boost::asio::transfer_all(),
668 handler);
669 */
670
671// ================= Others ===============================
672
673 /*
674 strand Provides serialised handler execution.
675 work Class to inform the io_service when it has work to do.
676
677
678io_service::
679dispatch Request the io_service to invoke the given handler.
680poll Run the io_service's event processing loop to execute ready
681 handlers.
682poll_one Run the io_service's event processing loop to execute one ready
683 handler.
684post Request the io_service to invoke the given handler and return
685 immediately.
686reset Reset the io_service in preparation for a subsequent run()
687 invocation.
688run Run the io_service's event processing loop.
689run_one Run the io_service's event processing loop to execute at most
690 one handler.
691stop Stop the io_service's event processing loop.
692wrap Create a new handler that automatically dispatches the wrapped
693 handler on the io_service.
694
695strand:: The io_service::strand class provides the ability to
696 post and dispatch handlers with the guarantee that none
697 of those handlers will execute concurrently.
698
699dispatch Request the strand to invoke the given handler.
700get_io_service Get the io_service associated with the strand.
701post Request the strand to invoke the given handler and return
702 immediately.
703wrap Create a new handler that automatically dispatches the
704 wrapped handler on the strand.
705
706work:: The work class is used to inform the io_service when
707 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
708get_io_service Get the io_service associated with the work.
709work Constructor notifies the io_service that work is starting.
710
711*/
712
713
Note: See TracBrowser for help on using the repository browser.