source: trunk/FACT++/src/ftm.cc@ 11520

Last change on this file since 11520 was 11492, checked in by tbretz, 13 years ago
Send the trigger id with the trigger.
File size: 22.1 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "Dim.h"
15#include "HeadersFTM.h"
16
17using namespace std;
18using namespace FTM;
19
20namespace ba = boost::asio;
21namespace bs = boost::system;
22namespace dummy = ba::placeholders;
23
24using boost::lexical_cast;
25using ba::ip::tcp;
26
27int Port = 0;
28
29// ------------------------------------------------------------------------
30
31
32// ------------------------------------------------------------------------
33
34class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
35{
36private:
37
38 double fStartTime;
39
40 void AsyncRead(ba::mutable_buffers_1 buffers)
41 {
42 ba::async_read(*this, buffers,
43 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
44 dummy::error, dummy::bytes_transferred));
45 }
46
47 void AsyncWrite(const ba::const_buffers_1 &buffers)
48 {
49 ba::async_write(*this, buffers,
50 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
51 dummy::error, dummy::bytes_transferred));
52 }
53 void AsyncWait(ba::deadline_timer &timer, int seconds,
54 void (tcp_connection::*handler)(const bs::error_code&))// const
55 {
56 timer.expires_from_now(boost::posix_time::milliseconds(seconds));
57 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
58 }
59
60 ba::deadline_timer fTriggerDynData;
61
62 // The constructor is prvate to force the obtained pointer to be shared
63 tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
64 fTriggerDynData(ioservice), fTriggerSendData(ioservice)
65 {
66 //deadline_.expires_at(boost::posix_time::pos_infin);
67
68 fHeader.fDelimiter=kDelimiterStart;
69 fHeader.fState=FTM::kFtmIdle;
70 fHeader.fBoardId=0xaffe;
71 fHeader.fFirmwareId=0x42;
72
73 fDelimiter = htons(kDelimiterEnd);
74
75 fStaticData.clear();
76
77 fStaticData.fMultiplicityPhysics = 1;
78 fStaticData.fMultiplicityCalib = 40;
79 fStaticData.fWindowCalib = 1;
80 fStaticData.fWindowPhysics = 0;
81 fStaticData.fDelayTrigger = 21;
82 fStaticData.fDelayTimeMarker = 42;
83 fStaticData.fDeadTime = 84;
84
85 fStaticData.fClockConditioner[0] = 100;
86 fStaticData.fClockConditioner[1] = 1;
87 fStaticData.fClockConditioner[2] = 8;
88 fStaticData.fClockConditioner[3] = 9;
89 fStaticData.fClockConditioner[4] = 11;
90 fStaticData.fClockConditioner[5] = 13;
91 fStaticData.fClockConditioner[6] = 14;
92 fStaticData.fClockConditioner[7] = 15;
93
94 fStaticData.fTriggerSequence = 1 | (2<<5) | (3<<10);
95
96 fStaticData.fGeneralSettings =
97 FTM::StaticData::kTrigger |
98 FTM::StaticData::kLPext |
99 FTM::StaticData::kPedestal;
100
101 fStaticData.fActiveFTU[0] = 0x3ff;
102 fStaticData.fActiveFTU[3] = 0x3ff;
103
104 for (int i=0; i<40; i++)
105 {
106 for (int p=0; p<4; p++)
107 fStaticData[i].fEnable[p] = 0x1ff;
108
109 for (int p=0; p<5; p++)
110 fStaticData[i].fDAC[p] = (p+1)*10;
111
112 fStaticData[i].fPrescaling = 42;
113 }
114
115 for (unsigned long long i=0; i<40; i++)
116 {
117 fFtuList[i].fDNA = (i<<48)|(i<<32)|(i<<16)|i;
118 fFtuList[i].fPingAddr = (1<<8) | i;
119 }
120
121 fFtuList[1].fPingAddr = (1<<9) | 1;
122 fFtuList[0].fPingAddr = 0;
123
124 fFtuList.fNumBoards = 19;
125 fFtuList.fNumBoardsCrate[0] = 9;
126 fFtuList.fNumBoardsCrate[1] = 0;
127 fFtuList.fNumBoardsCrate[2] = 0;
128 fFtuList.fNumBoardsCrate[3] = 10;
129 }
130
131 // Callback when writing was successfull or failed
132 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
133 {
134 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
135 }
136
137 vector<uint16_t> fBufCommand;
138 vector<uint16_t> fBufHeader;
139 vector<uint16_t> fBufFtuList;
140 vector<uint16_t> fBufStaticData;
141 vector<uint16_t> fBufDynamicData;
142
143 vector<uint16_t> fCommand;
144 FTM::Header fHeader;
145 FTM::FtuList fFtuList;
146 FTM::StaticData fStaticData;
147 FTM::DynamicData fDynamicData;
148
149 //vector<uint16_t> fStaticData;
150
151 uint16_t fDelimiter;
152 uint16_t fBufRegister;
153
154 uint16_t fCounter;
155 uint16_t fTimeStamp;
156
157 bool fReportsDisabled;
158
159 ba::deadline_timer fTriggerSendData;
160
161 void SendDynamicData()
162 {
163 if (fReportsDisabled)
164 return;
165
166 //if (fHeader.fState == FTM::kFtmRunning)
167 // fDynamicData.fOnTimeCounter = lrint(Time().UnixTime()-fStartTime);
168
169 fDynamicData.fTempSensor[0] = (23. + (6.*rand()/RAND_MAX-3))*10;
170 fDynamicData.fTempSensor[1] = (55. + (6.*rand()/RAND_MAX-3))*10;
171 fDynamicData.fTempSensor[2] = (39. + (6.*rand()/RAND_MAX-3))*10;
172 fDynamicData.fTempSensor[3] = (42. + (6.*rand()/RAND_MAX-3))*10;
173
174 for (int i=0; i<40; i++)
175 for (int p=0; p<4; p++)
176 fDynamicData[i].fRatePatch[p] = (1000 + (float(rand())/RAND_MAX-0.5)*25*p);
177
178 fHeader.fType=kDynamicData; // FtuList
179 fHeader.fDataSize=sizeof(FTM::DynamicData)/2+1;
180 fHeader.fTriggerCounter = fCounter;
181 fHeader.fTimeStamp = fTimeStamp++*1000000;//lrint(Time().UnixTime());
182
183 fBufHeader = fHeader.HtoN();
184 fBufDynamicData = fDynamicData.HtoN();
185
186 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
187 AsyncWrite(ba::buffer(ba::const_buffer(&fBufDynamicData[0], sizeof(FTM::DynamicData))));
188 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
189 }
190
191 void SendStaticData()
192 {
193 fHeader.fType=kStaticData; // FtuList
194 fHeader.fDataSize=sizeof(FTM::StaticData)/2+1;
195 fHeader.fTriggerCounter = fCounter;
196 fHeader.fTimeStamp = fTimeStamp*1000000;//lrint(Time().UnixTime());
197
198 for (int i=0; i<4; i++)
199 fFtuList.fActiveFTU[i] = fStaticData.fActiveFTU[i];
200
201 fBufHeader = fHeader.HtoN();
202 fBufStaticData = fStaticData.HtoN();
203
204 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
205 AsyncWrite(ba::buffer(ba::const_buffer(&fBufStaticData[0], fBufStaticData.size()*2)));
206 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
207 }
208
209 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
210 {
211 // Do not schedule a new read if the connection failed.
212 if (bytes_received==0)
213 {
214 // Close the connection
215 close();
216 return;
217 }
218
219 // No command received yet
220 if (fCommand.size()==0)
221 {
222 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
223 fBufCommand.begin(), ntohs);
224
225 if (fBufCommand[0]!='@')
226 {
227 cout << "Inavlid command: 0x" << hex << fBufCommand[0] << dec << endl;
228 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
229 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
230 return;
231 }
232
233 switch (fBufCommand[1])
234 {
235 case kCmdToggleLed:
236 cout << "-> TOGGLE_LED" << endl;
237
238 fBufCommand.resize(5);
239 AsyncRead(ba::buffer(fBufCommand));
240 return;
241
242 case kCmdPing:
243 cout << "-> PING" << endl;
244
245 fHeader.fType=kFtuList; // FtuList
246 fHeader.fDataSize=sizeof(FTM::FtuList)/2+1;
247 fHeader.fTriggerCounter = fCounter;
248 fHeader.fTimeStamp = fTimeStamp*1000000;//lrint(Time().UnixTime());
249
250 fFtuList[1].fPingAddr = ((rand()&1)<<9) | 1;
251 fFtuList[0].fPingAddr = ((rand()&1)<<8);
252
253 fBufHeader = fHeader.HtoN();
254 fBufFtuList = fFtuList.HtoN();
255
256 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
257 AsyncWrite(ba::buffer(ba::const_buffer(&fBufFtuList[0], fBufFtuList.size()*2)));
258 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
259
260 fBufCommand.resize(5);
261 AsyncRead(ba::buffer(fBufCommand));
262 return;
263
264 case kCmdRead: // kCmdRead
265 cout << "-> READ" << endl;
266 switch (fBufCommand[2])
267 {
268 case kCmdStaticData:
269 cout << "-> STATIC" << endl;
270
271 SendStaticData();
272
273 fBufCommand.resize(5);
274 AsyncRead(ba::buffer(fBufCommand));
275
276 return;
277
278 case kCmdDynamicData:
279 cout << "-> DYNAMIC" << endl;
280
281 SendDynamicData();
282
283 fBufCommand.resize(5);
284 AsyncRead(ba::buffer(fBufCommand));
285
286 return;
287
288 case kCmdRegister:
289 fCommand = fBufCommand;
290 cout << "-> REGISTER" << endl;
291
292 fBufCommand.resize(1);
293 AsyncRead(ba::buffer(fBufCommand));
294 return;
295 }
296 break;
297
298
299 case kCmdWrite:
300 switch (fBufCommand[2])
301 {
302 case kCmdRegister:
303 fCommand = fBufCommand;
304 cout << "-> REGISTER" << endl;
305
306 fBufCommand.resize(2);
307 AsyncRead(ba::buffer(fBufCommand));
308 return;
309
310 case kCmdStaticData:
311 fCommand = fBufCommand;
312 cout << "-> STATIC DATA" << endl;
313
314 fBufCommand.resize(sizeof(StaticData)/2);
315 AsyncRead(ba::buffer(fBufCommand));
316 return;
317 }
318 break;
319
320 case kCmdDisableReports:
321 cout << "-> DISABLE REPORTS " << !fBufCommand[2] << endl;
322 fReportsDisabled = !fBufCommand[2];
323
324 fBufCommand.resize(5);
325 AsyncRead(ba::buffer(fBufCommand));
326 return;
327
328 case kCmdStartRun:
329 fHeader.fState = FTM::kFtmRunning;
330
331 fStartTime = Time().UnixTime();
332
333 fCounter = 0;
334 fTimeStamp = 0;
335 fHeader.fTriggerCounter = fCounter;
336
337 fBufCommand.resize(5);
338 AsyncRead(ba::buffer(fBufCommand));
339
340 AsyncWait(fTriggerSendData, 0, &tcp_connection::TriggerSendData);
341 return;
342
343 case kCmdStopRun:
344 fHeader.fState = FTM::kFtmIdle;
345
346 fTriggerSendData.cancel();
347
348 fCounter = 0;
349 fTimeStamp = 0;
350
351 fBufCommand.resize(5);
352 AsyncRead(ba::buffer(fBufCommand));
353 return;
354 }
355
356 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
357 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
358 return;
359 }
360
361 // Command data received
362
363 // Prepare reception of next command
364 switch (fCommand[1])
365 {
366 case kCmdRead: // kCmdRead
367 {
368 const uint16_t addr = ntohs(fBufCommand[0]);
369 const uint16_t val = reinterpret_cast<uint16_t*>(&fStaticData)[addr];
370
371 cout << "-> GET REGISTER[" << addr << "]=" << val << endl;
372
373 fHeader.fType=kRegister; // FtuList
374 fHeader.fDataSize=2;
375 fHeader.fTimeStamp = fTimeStamp*1000000;//lrint(Time().UnixTime());
376
377 fBufHeader = fHeader.HtoN();
378 fBufStaticData[addr] = htons(val);
379
380 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
381 AsyncWrite(ba::buffer(ba::const_buffer(&fBufStaticData[addr], 2)));
382 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
383 break;
384 }
385
386 case kCmdWrite:
387 switch (fCommand[2])
388 {
389 case kCmdRegister:
390 {
391 const uint16_t addr = ntohs(fBufCommand[0]);
392 const uint16_t val = ntohs(fBufCommand[1]);
393
394 cout << "-> SET REGISTER[" << addr << "]=" << val << endl;
395
396 reinterpret_cast<uint16_t*>(&fStaticData)[addr] = val;
397 }
398 break;
399
400 case kCmdStaticData:
401 {
402 cout << "-> SET STATIC DATA" << endl;
403 fStaticData = fBufCommand;
404 }
405 break;
406 }
407 break;
408 }
409
410 fCommand.resize(0);
411
412 fBufCommand.resize(5);
413 AsyncRead(ba::buffer(fBufCommand));
414 }
415
416 void SendDynData(const boost::system::error_code &ec)
417 {
418 if (!is_open())
419 {
420 // For example: Here we could schedule a new accept if we
421 // would not want to allow two connections at the same time.
422 return;
423 }
424
425 if (ec==ba::error::basic_errors::operation_aborted)
426 return;
427
428 // Check whether the deadline has passed. We compare the deadline
429 // against the current time since a new asynchronous operation
430 // may have moved the deadline before this actor had a chance
431 // to run.
432
433 if (fTriggerDynData.expires_at() > ba::deadline_timer::traits_type::now())
434 return;
435
436 // The deadline has passed.
437 SendDynamicData();
438
439 AsyncWait(fTriggerDynData, 1000, &tcp_connection::SendDynData);
440 }
441
442 void TriggerSendData(const boost::system::error_code &ec)
443 {
444 if (!is_open())
445 {
446 // For example: Here we could schedule a new accept if we
447 // would not want to allow two connections at the same time.
448 return;
449 }
450
451 if (ec==ba::error::basic_errors::operation_aborted)
452 return;
453
454 // Check whether the deadline has passed. We compare the deadline
455 // against the current time since a new asynchronous operation
456 // may have moved the deadline before this actor had a chance
457 // to run.
458 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
459 return;
460
461
462 if (fStaticData.IsEnabled(StaticData::kTrigger))
463 Dim::SendCommand("FAD/TRIGGER", fCounter++);
464
465 const uint16_t time = 100*float(rand())/RAND_MAX+50;
466
467 AsyncWait(fTriggerSendData, time, &tcp_connection::TriggerSendData);
468 }
469
470public:
471 typedef boost::shared_ptr<tcp_connection> shared_ptr;
472
473 static shared_ptr create(ba::io_service& io_service)
474 {
475 return shared_ptr(new tcp_connection(io_service));
476 }
477
478 void start()
479 {
480 // Ownership of buffer must be valid until Handler is called.
481
482 // Emit something to be written to the socket
483 fBufCommand.resize(5);
484 AsyncRead(ba::buffer(fBufCommand));
485
486 AsyncWait(fTriggerDynData, 1000, &tcp_connection::SendDynData);
487
488// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
489// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
490
491 }
492};
493
494
495class tcp_server : public tcp::acceptor
496{
497public:
498 tcp_server(ba::io_service& ioservice, int port) :
499 tcp::acceptor(ioservice, tcp::endpoint(tcp::v4(), port))
500
501 {
502 // We could start listening for more than one connection
503 // here, but since there is only one handler executed each time
504 // it would not make sense. Before one handle_accept is not
505 // finished no new handle_accept will be called.
506 // Workround: Start a new thread in handle_accept
507 start_accept();
508 }
509
510private:
511 void start_accept()
512 {
513 cout << "Start accept..." << flush;
514 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/io_service());
515
516 // This will accept a connection without blocking
517 async_accept(*new_connection,
518 boost::bind(&tcp_server::handle_accept,
519 this,
520 new_connection,
521 ba::placeholders::error));
522
523 cout << "start-done." << endl;
524 }
525
526 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
527 {
528 // The connection has been accepted and is now ready to use
529
530 // not installing a new handler will stop run()
531 cout << "Handle accept..." << flush;
532 if (!error)
533 {
534 new_connection->start();
535
536 // The is now an open connection/server (tcp_connection)
537 // we immediatly schedule another connection
538 // This allowed two client-connection at the same time
539 start_accept();
540 }
541 cout << "handle-done." << endl;
542 }
543};
544
545#include "Configuration.h"
546
547void SetupConfiguration(::Configuration &conf)
548{
549 const string n = conf.GetName()+".log";
550
551 po::options_description config("Program options");
552 config.add_options()
553 ("dns", var<string>("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
554 ("port,p", var<uint16_t>(5000), "")
555 ;
556
557 po::positional_options_description p;
558 p.add("port", 1); // The first positional options
559 p.add("num", 1); // The second positional options
560
561 conf.AddEnv("dns", "DIM_DNS_NODE");
562
563 conf.AddOptions(config);
564 conf.SetArgumentPositions(p);
565}
566
567int main(int argc, const char **argv)
568{
569 ::Configuration conf(argv[0]);
570
571 SetupConfiguration(conf);
572
573 po::variables_map vm;
574 try
575 {
576 vm = conf.Parse(argc, argv);
577 }
578#if BOOST_VERSION > 104000
579 catch (po::multiple_occurrences &e)
580 {
581 cerr << "Program options invalid due to: " << e.what() << " of '" << e.get_option_name() << "'." << endl;
582 return -1;
583 }
584#endif
585 catch (exception& e)
586 {
587 cerr << "Program options invalid due to: " << e.what() << endl;
588 return -1;
589 }
590
591 if (conf.HasVersion() || conf.HasPrint() || conf.HasHelp())
592 return -1;
593
594 Dim::Setup(conf.Get<string>("dns"));
595
596 //try
597 {
598 ba::io_service io_service;
599
600 Port = conf.Get<uint16_t>("port");
601
602 tcp_server server(io_service, Port);
603 // ba::add_service(io_service, &server);
604 // server.add_service(...);
605 //cout << "Run..." << flush;
606
607 // Calling run() from a single thread ensures no concurrent access
608 // of the handler which are called!!!
609 io_service.run();
610
611 //cout << "end." << endl;
612 }
613 /*catch (std::exception& e)
614 {
615 std::cerr << e.what() << std::endl;
616 }*/
617
618 return 0;
619}
620/* ====================== Buffers ===========================
621
622char d1[128]; ba::buffer(d1));
623std::vector<char> d2(128); ba::buffer(d2);
624boost::array<char, 128> d3; by::buffer(d3);
625
626// --------------------------------
627char d1[128];
628std::vector<char> d2(128);
629boost::array<char, 128> d3;
630
631boost::array<mutable_buffer, 3> bufs1 = {
632 ba::buffer(d1),
633 ba::buffer(d2),
634 ba::buffer(d3) };
635sock.read(bufs1);
636
637std::vector<const_buffer> bufs2;
638bufs2.push_back(boost::asio::buffer(d1));
639bufs2.push_back(boost::asio::buffer(d2));
640bufs2.push_back(boost::asio::buffer(d3));
641sock.write(bufs2);
642
643
644// ======================= Read functions =========================
645
646ba::async_read_until --> delimiter
647
648streambuf buf; // Ensure validity until handler!
649by::async_read(s, buf, ....);
650
651ba::async_read(s, ba:buffer(data, size), handler);
652 // Single buffer
653 boost::asio::async_read(s,
654 ba::buffer(data, size),
655 compl-func --> ba::transfer_at_least(32),
656 handler);
657
658 // Multiple buffers
659boost::asio::async_read(s, buffers,
660 compl-func --> boost::asio::transfer_all(),
661 handler);
662 */
663
664// ================= Others ===============================
665
666 /*
667 strand Provides serialised handler execution.
668 work Class to inform the io_service when it has work to do.
669
670
671io_service::
672dispatch Request the io_service to invoke the given handler.
673poll Run the io_service's event processing loop to execute ready
674 handlers.
675poll_one Run the io_service's event processing loop to execute one ready
676 handler.
677post Request the io_service to invoke the given handler and return
678 immediately.
679reset Reset the io_service in preparation for a subsequent run()
680 invocation.
681run Run the io_service's event processing loop.
682run_one Run the io_service's event processing loop to execute at most
683 one handler.
684stop Stop the io_service's event processing loop.
685wrap Create a new handler that automatically dispatches the wrapped
686 handler on the io_service.
687
688strand:: The io_service::strand class provides the ability to
689 post and dispatch handlers with the guarantee that none
690 of those handlers will execute concurrently.
691
692dispatch Request the strand to invoke the given handler.
693get_io_service Get the io_service associated with the strand.
694post Request the strand to invoke the given handler and return
695 immediately.
696wrap Create a new handler that automatically dispatches the
697 wrapped handler on the strand.
698
699work:: The work class is used to inform the io_service when
700 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
701get_io_service Get the io_service associated with the work.
702work Constructor notifies the io_service that work is starting.
703
704*/
705
706
Note: See TracBrowser for help on using the repository browser.