source: trunk/FACT++/src/ftm.cc@ 11439

Last change on this file since 11439 was 11380, checked in by tbretz, 13 years ago
Implemented sending triggers from the ftm to the fad via Dim.
File size: 22.1 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "Dim.h"
15#include "HeadersFTM.h"
16
17using namespace std;
18using namespace FTM;
19
20namespace ba = boost::asio;
21namespace bs = boost::system;
22namespace dummy = ba::placeholders;
23
24using boost::lexical_cast;
25using ba::ip::tcp;
26
27int Port = 0;
28
29// ------------------------------------------------------------------------
30
31
32// ------------------------------------------------------------------------
33
34class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
35{
36private:
37
38 double fStartTime;
39
40 void AsyncRead(ba::mutable_buffers_1 buffers)
41 {
42 ba::async_read(*this, buffers,
43 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
44 dummy::error, dummy::bytes_transferred));
45 }
46
47 void AsyncWrite(const ba::const_buffers_1 &buffers)
48 {
49 ba::async_write(*this, buffers,
50 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
51 dummy::error, dummy::bytes_transferred));
52 }
53 void AsyncWait(ba::deadline_timer &timer, int seconds,
54 void (tcp_connection::*handler)(const bs::error_code&))// const
55 {
56 timer.expires_from_now(boost::posix_time::milliseconds(seconds));
57 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
58 }
59
60 ba::deadline_timer fTriggerDynData;
61
62 // The constructor is prvate to force the obtained pointer to be shared
63 tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
64 fTriggerDynData(ioservice), fTriggerSendData(ioservice)
65 {
66 //deadline_.expires_at(boost::posix_time::pos_infin);
67
68 fHeader.fDelimiter=kDelimiterStart;
69 fHeader.fState=FTM::kFtmIdle;
70 fHeader.fBoardId=0xaffe;
71 fHeader.fFirmwareId=0x42;
72
73 fDelimiter = htons(kDelimiterEnd);
74
75 fStaticData.clear();
76
77 fStaticData.fMultiplicityPhysics = 1;
78 fStaticData.fMultiplicityCalib = 40;
79 fStaticData.fWindowCalib = 1;
80 fStaticData.fWindowPhysics = 0;
81 fStaticData.fDelayTrigger = 21;
82 fStaticData.fDelayTimeMarker = 42;
83 fStaticData.fDeadTime = 84;
84
85 fStaticData.fClockConditioner[0] = 100;
86 fStaticData.fClockConditioner[1] = 1;
87 fStaticData.fClockConditioner[2] = 8;
88 fStaticData.fClockConditioner[3] = 9;
89 fStaticData.fClockConditioner[4] = 11;
90 fStaticData.fClockConditioner[5] = 13;
91 fStaticData.fClockConditioner[6] = 14;
92 fStaticData.fClockConditioner[7] = 15;
93
94 fStaticData.fTriggerSequence = 1 | (2<<5) | (3<<10);
95
96 fStaticData.fGeneralSettings =
97 FTM::StaticData::kTrigger |
98 FTM::StaticData::kLPext |
99 FTM::StaticData::kPedestal;
100
101 fStaticData.fActiveFTU[0] = 0x3ff;
102 fStaticData.fActiveFTU[3] = 0x3ff;
103
104 for (int i=0; i<40; i++)
105 {
106 for (int p=0; p<4; p++)
107 fStaticData[i].fEnable[p] = 0x1ff;
108
109 for (int p=0; p<5; p++)
110 fStaticData[i].fDAC[p] = (p+1)*10;
111
112 fStaticData[i].fPrescaling = 42;
113 }
114
115 for (unsigned long long i=0; i<40; i++)
116 {
117 fFtuList[i].fDNA = (i<<48)|(i<<32)|(i<<16)|i;
118 fFtuList[i].fPingAddr = (1<<8) | i;
119 }
120
121 fFtuList[1].fPingAddr = (1<<9) | 1;
122 fFtuList[0].fPingAddr = 0;
123
124 fFtuList.fNumBoards = 19;
125 fFtuList.fNumBoardsCrate[0] = 9;
126 fFtuList.fNumBoardsCrate[1] = 0;
127 fFtuList.fNumBoardsCrate[2] = 0;
128 fFtuList.fNumBoardsCrate[3] = 10;
129 }
130
131 // Callback when writing was successfull or failed
132 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
133 {
134 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
135 }
136
137 vector<uint16_t> fBufCommand;
138 vector<uint16_t> fBufHeader;
139 vector<uint16_t> fBufFtuList;
140 vector<uint16_t> fBufStaticData;
141 vector<uint16_t> fBufDynamicData;
142
143 vector<uint16_t> fCommand;
144 FTM::Header fHeader;
145 FTM::FtuList fFtuList;
146 FTM::StaticData fStaticData;
147 FTM::DynamicData fDynamicData;
148
149 //vector<uint16_t> fStaticData;
150
151 uint16_t fDelimiter;
152 uint16_t fBufRegister;
153
154 uint16_t fCounter;
155 uint16_t fTimeStamp;
156
157 bool fReportsDisabled;
158
159 ba::deadline_timer fTriggerSendData;
160
161 void SendDynamicData()
162 {
163 if (fReportsDisabled)
164 return;
165
166 //if (fHeader.fState == FTM::kFtmRunning)
167 // fDynamicData.fOnTimeCounter = lrint(Time().UnixTime()-fStartTime);
168
169 fDynamicData.fTempSensor[0] = (23. + (6.*rand()/RAND_MAX-3))*10;
170 fDynamicData.fTempSensor[1] = (55. + (6.*rand()/RAND_MAX-3))*10;
171 fDynamicData.fTempSensor[2] = (39. + (6.*rand()/RAND_MAX-3))*10;
172 fDynamicData.fTempSensor[3] = (42. + (6.*rand()/RAND_MAX-3))*10;
173
174 for (int i=0; i<40; i++)
175 for (int p=0; p<4; p++)
176 fDynamicData[i].fRatePatch[p] = (1000 + (float(rand())/RAND_MAX-0.5)*25*p);
177
178 fHeader.fType=kDynamicData; // FtuList
179 fHeader.fDataSize=sizeof(FTM::DynamicData)/2+1;
180 fHeader.fTriggerCounter = fCounter;
181 fHeader.fTimeStamp = fTimeStamp++*1000000;//lrint(Time().UnixTime());
182
183 fBufHeader = fHeader.HtoN();
184 fBufDynamicData = fDynamicData.HtoN();
185
186 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
187 AsyncWrite(ba::buffer(ba::const_buffer(&fBufDynamicData[0], sizeof(FTM::DynamicData))));
188 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
189 }
190
191 void SendStaticData()
192 {
193 fHeader.fType=kStaticData; // FtuList
194 fHeader.fDataSize=sizeof(FTM::StaticData)/2+1;
195 fHeader.fTriggerCounter = fCounter;
196 fHeader.fTimeStamp = fTimeStamp*1000000;//lrint(Time().UnixTime());
197
198 for (int i=0; i<4; i++)
199 fFtuList.fActiveFTU[i] = fStaticData.fActiveFTU[i];
200
201 fBufHeader = fHeader.HtoN();
202 fBufStaticData = fStaticData.HtoN();
203
204 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
205 AsyncWrite(ba::buffer(ba::const_buffer(&fBufStaticData[0], fBufStaticData.size()*2)));
206 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
207 }
208
209 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
210 {
211 // Do not schedule a new read if the connection failed.
212 if (bytes_received==0)
213 {
214 // Close the connection
215 close();
216 return;
217 }
218
219 // No command received yet
220 if (fCommand.size()==0)
221 {
222 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
223 fBufCommand.begin(), ntohs);
224
225 if (fBufCommand[0]!='@')
226 {
227 cout << "Inavlid command: 0x" << hex << fBufCommand[0] << dec << endl;
228 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
229 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
230 return;
231 }
232
233 switch (fBufCommand[1])
234 {
235 case kCmdToggleLed:
236 cout << "-> TOGGLE_LED" << endl;
237
238 fBufCommand.resize(5);
239 AsyncRead(ba::buffer(fBufCommand));
240 return;
241
242 case kCmdPing:
243 cout << "-> PING" << endl;
244
245 fHeader.fType=kFtuList; // FtuList
246 fHeader.fDataSize=sizeof(FTM::FtuList)/2+1;
247 fHeader.fTriggerCounter = fCounter;
248 fHeader.fTimeStamp = fTimeStamp*1000000;//lrint(Time().UnixTime());
249
250 fFtuList[1].fPingAddr = ((rand()&1)<<9) | 1;
251 fFtuList[0].fPingAddr = ((rand()&1)<<8);
252
253 fBufHeader = fHeader.HtoN();
254 fBufFtuList = fFtuList.HtoN();
255
256 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
257 AsyncWrite(ba::buffer(ba::const_buffer(&fBufFtuList[0], fBufFtuList.size()*2)));
258 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
259
260 fBufCommand.resize(5);
261 AsyncRead(ba::buffer(fBufCommand));
262 return;
263
264 case kCmdRead: // kCmdRead
265 cout << "-> READ" << endl;
266 switch (fBufCommand[2])
267 {
268 case kCmdStaticData:
269 cout << "-> STATIC" << endl;
270
271 SendStaticData();
272
273 fBufCommand.resize(5);
274 AsyncRead(ba::buffer(fBufCommand));
275
276 return;
277
278 case kCmdDynamicData:
279 cout << "-> DYNAMIC" << endl;
280
281 SendDynamicData();
282
283 fBufCommand.resize(5);
284 AsyncRead(ba::buffer(fBufCommand));
285
286 return;
287
288 case kCmdRegister:
289 fCommand = fBufCommand;
290 cout << "-> REGISTER" << endl;
291
292 fBufCommand.resize(1);
293 AsyncRead(ba::buffer(fBufCommand));
294 return;
295 }
296 break;
297
298
299 case kCmdWrite:
300 switch (fBufCommand[2])
301 {
302 case kCmdRegister:
303 fCommand = fBufCommand;
304 cout << "-> REGISTER" << endl;
305
306 fBufCommand.resize(2);
307 AsyncRead(ba::buffer(fBufCommand));
308 return;
309
310 case kCmdStaticData:
311 fCommand = fBufCommand;
312 cout << "-> STATIC DATA" << endl;
313
314 fBufCommand.resize(sizeof(StaticData)/2);
315 AsyncRead(ba::buffer(fBufCommand));
316 return;
317 }
318 break;
319
320 case kCmdDisableReports:
321 cout << "-> DISABLE REPORTS " << !fBufCommand[2] << endl;
322 fReportsDisabled = !fBufCommand[2];
323
324 fBufCommand.resize(5);
325 AsyncRead(ba::buffer(fBufCommand));
326 return;
327
328 case kCmdStartRun:
329 fHeader.fState = FTM::kFtmRunning;
330
331 fStartTime = Time().UnixTime();
332
333 fCounter = 0;
334 fTimeStamp = 0;
335 fHeader.fTriggerCounter = fCounter;
336
337 fBufCommand.resize(5);
338 AsyncRead(ba::buffer(fBufCommand));
339
340 AsyncWait(fTriggerSendData, 0, &tcp_connection::TriggerSendData);
341 return;
342
343 case kCmdStopRun:
344 fHeader.fState = FTM::kFtmIdle;
345
346 fTriggerSendData.cancel();
347
348 fCounter = 0;
349 fTimeStamp = 0;
350
351 fBufCommand.resize(5);
352 AsyncRead(ba::buffer(fBufCommand));
353 return;
354 }
355
356 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
357 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
358 return;
359 }
360
361 // Command data received
362
363 // Prepare reception of next command
364 switch (fCommand[1])
365 {
366 case kCmdRead: // kCmdRead
367 {
368 const uint16_t addr = ntohs(fBufCommand[0]);
369 const uint16_t val = reinterpret_cast<uint16_t*>(&fStaticData)[addr];
370
371 cout << "-> GET REGISTER[" << addr << "]=" << val << endl;
372
373 fHeader.fType=kRegister; // FtuList
374 fHeader.fDataSize=2;
375 fHeader.fTimeStamp = fTimeStamp*1000000;//lrint(Time().UnixTime());
376
377 fBufHeader = fHeader.HtoN();
378 fBufStaticData[addr] = htons(val);
379
380 AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
381 AsyncWrite(ba::buffer(ba::const_buffer(&fBufStaticData[addr], 2)));
382 AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
383 break;
384 }
385
386 case kCmdWrite:
387 switch (fCommand[2])
388 {
389 case kCmdRegister:
390 {
391 const uint16_t addr = ntohs(fBufCommand[0]);
392 const uint16_t val = ntohs(fBufCommand[1]);
393
394 cout << "-> SET REGISTER[" << addr << "]=" << val << endl;
395
396 reinterpret_cast<uint16_t*>(&fStaticData)[addr] = val;
397 }
398 break;
399
400 case kCmdStaticData:
401 {
402 cout << "-> SET STATIC DATA" << endl;
403 fStaticData = fBufCommand;
404 }
405 break;
406 }
407 break;
408 }
409
410 fCommand.resize(0);
411
412 fBufCommand.resize(5);
413 AsyncRead(ba::buffer(fBufCommand));
414 }
415
416 void SendDynData(const boost::system::error_code &ec)
417 {
418 if (!is_open())
419 {
420 // For example: Here we could schedule a new accept if we
421 // would not want to allow two connections at the same time.
422 return;
423 }
424
425 if (ec==ba::error::basic_errors::operation_aborted)
426 return;
427
428 // Check whether the deadline has passed. We compare the deadline
429 // against the current time since a new asynchronous operation
430 // may have moved the deadline before this actor had a chance
431 // to run.
432
433 if (fTriggerDynData.expires_at() > ba::deadline_timer::traits_type::now())
434 return;
435
436 // The deadline has passed.
437 SendDynamicData();
438
439 AsyncWait(fTriggerDynData, 1000, &tcp_connection::SendDynData);
440 }
441
442 void TriggerSendData(const boost::system::error_code &ec)
443 {
444 if (!is_open())
445 {
446 // For example: Here we could schedule a new accept if we
447 // would not want to allow two connections at the same time.
448 return;
449 }
450
451 if (ec==ba::error::basic_errors::operation_aborted)
452 return;
453
454 // Check whether the deadline has passed. We compare the deadline
455 // against the current time since a new asynchronous operation
456 // may have moved the deadline before this actor had a chance
457 // to run.
458 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
459 return;
460
461 Dim::SendCommand("FAD/TRIGGER");
462 fCounter++;
463
464 const uint16_t time = 100*float(rand())/RAND_MAX+50;
465
466 AsyncWait(fTriggerSendData, time, &tcp_connection::TriggerSendData);
467 }
468
469public:
470 typedef boost::shared_ptr<tcp_connection> shared_ptr;
471
472 static shared_ptr create(ba::io_service& io_service)
473 {
474 return shared_ptr(new tcp_connection(io_service));
475 }
476
477 void start()
478 {
479 // Ownership of buffer must be valid until Handler is called.
480
481 // Emit something to be written to the socket
482 fBufCommand.resize(5);
483 AsyncRead(ba::buffer(fBufCommand));
484
485 AsyncWait(fTriggerDynData, 1000, &tcp_connection::SendDynData);
486
487// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
488// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
489
490 }
491};
492
493
494class tcp_server : public tcp::acceptor
495{
496public:
497 tcp_server(ba::io_service& ioservice, int port) :
498 tcp::acceptor(ioservice, tcp::endpoint(tcp::v4(), port))
499
500 {
501 // We could start listening for more than one connection
502 // here, but since there is only one handler executed each time
503 // it would not make sense. Before one handle_accept is not
504 // finished no new handle_accept will be called.
505 // Workround: Start a new thread in handle_accept
506 start_accept();
507 }
508
509private:
510 void start_accept()
511 {
512 cout << "Start accept..." << flush;
513 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/io_service());
514
515 // This will accept a connection without blocking
516 async_accept(*new_connection,
517 boost::bind(&tcp_server::handle_accept,
518 this,
519 new_connection,
520 ba::placeholders::error));
521
522 cout << "start-done." << endl;
523 }
524
525 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
526 {
527 // The connection has been accepted and is now ready to use
528
529 // not installing a new handler will stop run()
530 cout << "Handle accept..." << flush;
531 if (!error)
532 {
533 new_connection->start();
534
535 // The is now an open connection/server (tcp_connection)
536 // we immediatly schedule another connection
537 // This allowed two client-connection at the same time
538 start_accept();
539 }
540 cout << "handle-done." << endl;
541 }
542};
543
544#include "Configuration.h"
545
546void SetupConfiguration(::Configuration &conf)
547{
548 const string n = conf.GetName()+".log";
549
550 po::options_description config("Program options");
551 config.add_options()
552 ("dns", var<string>("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
553 ("port,p", var<uint16_t>(5000), "")
554 ;
555
556 po::positional_options_description p;
557 p.add("port", 1); // The first positional options
558 p.add("num", 1); // The second positional options
559
560 conf.AddEnv("dns", "DIM_DNS_NODE");
561
562 conf.AddOptions(config);
563 conf.SetArgumentPositions(p);
564}
565
566int main(int argc, const char **argv)
567{
568 ::Configuration conf(argv[0]);
569
570 SetupConfiguration(conf);
571
572 po::variables_map vm;
573 try
574 {
575 vm = conf.Parse(argc, argv);
576 }
577#if BOOST_VERSION > 104000
578 catch (po::multiple_occurrences &e)
579 {
580 cerr << "Program options invalid due to: " << e.what() << " of '" << e.get_option_name() << "'." << endl;
581 return -1;
582 }
583#endif
584 catch (exception& e)
585 {
586 cerr << "Program options invalid due to: " << e.what() << endl;
587 return -1;
588 }
589
590 if (conf.HasVersion() || conf.HasPrint() || conf.HasHelp())
591 return -1;
592
593 Dim::Setup(conf.Get<string>("dns"));
594
595 //try
596 {
597 ba::io_service io_service;
598
599 Port = conf.Get<uint16_t>("port");
600
601 tcp_server server(io_service, Port);
602 // ba::add_service(io_service, &server);
603 // server.add_service(...);
604 //cout << "Run..." << flush;
605
606 // Calling run() from a single thread ensures no concurrent access
607 // of the handler which are called!!!
608 io_service.run();
609
610 //cout << "end." << endl;
611 }
612 /*catch (std::exception& e)
613 {
614 std::cerr << e.what() << std::endl;
615 }*/
616
617 return 0;
618}
619/* ====================== Buffers ===========================
620
621char d1[128]; ba::buffer(d1));
622std::vector<char> d2(128); ba::buffer(d2);
623boost::array<char, 128> d3; by::buffer(d3);
624
625// --------------------------------
626char d1[128];
627std::vector<char> d2(128);
628boost::array<char, 128> d3;
629
630boost::array<mutable_buffer, 3> bufs1 = {
631 ba::buffer(d1),
632 ba::buffer(d2),
633 ba::buffer(d3) };
634sock.read(bufs1);
635
636std::vector<const_buffer> bufs2;
637bufs2.push_back(boost::asio::buffer(d1));
638bufs2.push_back(boost::asio::buffer(d2));
639bufs2.push_back(boost::asio::buffer(d3));
640sock.write(bufs2);
641
642
643// ======================= Read functions =========================
644
645ba::async_read_until --> delimiter
646
647streambuf buf; // Ensure validity until handler!
648by::async_read(s, buf, ....);
649
650ba::async_read(s, ba:buffer(data, size), handler);
651 // Single buffer
652 boost::asio::async_read(s,
653 ba::buffer(data, size),
654 compl-func --> ba::transfer_at_least(32),
655 handler);
656
657 // Multiple buffers
658boost::asio::async_read(s, buffers,
659 compl-func --> boost::asio::transfer_all(),
660 handler);
661 */
662
663// ================= Others ===============================
664
665 /*
666 strand Provides serialised handler execution.
667 work Class to inform the io_service when it has work to do.
668
669
670io_service::
671dispatch Request the io_service to invoke the given handler.
672poll Run the io_service's event processing loop to execute ready
673 handlers.
674poll_one Run the io_service's event processing loop to execute one ready
675 handler.
676post Request the io_service to invoke the given handler and return
677 immediately.
678reset Reset the io_service in preparation for a subsequent run()
679 invocation.
680run Run the io_service's event processing loop.
681run_one Run the io_service's event processing loop to execute at most
682 one handler.
683stop Stop the io_service's event processing loop.
684wrap Create a new handler that automatically dispatches the wrapped
685 handler on the io_service.
686
687strand:: The io_service::strand class provides the ability to
688 post and dispatch handlers with the guarantee that none
689 of those handlers will execute concurrently.
690
691dispatch Request the strand to invoke the given handler.
692get_io_service Get the io_service associated with the strand.
693post Request the strand to invoke the given handler and return
694 immediately.
695wrap Create a new handler that automatically dispatches the
696 wrapped handler on the strand.
697
698work:: The work class is used to inform the io_service when
699 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
700get_io_service Get the io_service associated with the work.
701work Constructor notifies the io_service that work is starting.
702
703*/
704
705
Note: See TracBrowser for help on using the repository browser.