source: trunk/FACT++/src/fad.cc@ 11302

Last change on this file since 11302 was 11284, checked in by tbretz, 13 years ago
Added the possibility to set the update rate for self triggered events.
File size: 19.9 KB
Line 
#include <exception>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/enable_shared_from_this.hpp>

using boost::lexical_cast;

#include "Time.h"
#include "Converter.h"

#include "HeadersFAD.h"

using namespace std;
using namespace FAD;

namespace ba    = boost::asio;
namespace bs    = boost::system;
namespace dummy = ba::placeholders;

using boost::lexical_cast;
using ba::ip::tcp;
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30public:
31 const int fBoardId;
32
33 double fStartTime;
34
35 void AsyncRead(ba::mutable_buffers_1 buffers)
36 {
37 ba::async_read(*this, buffers,
38 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
39 dummy::error, dummy::bytes_transferred));
40 }
41
42 void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
43 {
44 ba::async_write(*socket, buffers,
45 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
46 dummy::error, dummy::bytes_transferred));
47 }
48 void AsyncWait(ba::deadline_timer &timer, int seconds,
49 void (tcp_connection::*handler)(const bs::error_code&))// const
50 {
51 timer.expires_from_now(boost::posix_time::milliseconds(seconds));
52 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
53 }
54
55 // The constructor is prvate to force the obtained pointer to be shared
56 tcp_connection(ba::io_service& ioservice, int boardid) : ba::ip::tcp::socket(ioservice),
57 fBoardId(boardid), fTriggerSendData(ioservice)
58 {
59 }
60
61 // Callback when writing was successfull or failed
62 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
63 {
64 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
65 }
66
67 vector<uint16_t> fBufCommand;
68 vector<uint16_t> fBuffer;
69
70 vector<uint16_t> fCommand;
71
72 FAD::EventHeader fHeader;
73 FAD::EventHeader fRam;
74 FAD::ChannelHeader fChHeader[kNumChannels];
75
76 uint16_t fRamRoi[kNumChannels];
77
78 ba::deadline_timer fTriggerSendData;
79
80 bool fTriggerEnabled;
81 bool fCommandSocket;
82
83 int fSocket;
84
85 void SendData()
86 {
87 fHeader.fPackageLength = sizeof(EventHeader)/2+1;
88 fHeader.fEventCounter++;
89 fHeader.fTriggerId = fHeader.fEventCounter;
90 fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
91 fHeader.fFreqRefClock = 997+rand()/(RAND_MAX/7);
92
93 for (int i=0; i<FAD::kNumTemp; i++)
94 fHeader.fTempDrs[i] = (42.+fBoardId/40.+float(rand())/RAND_MAX*5)*16;
95
96 fBuffer.resize(0);
97
98 for (int i=0; i<kNumChannels; i++)
99 {
100 fChHeader[i].fStartCell = i*10;
101
102 const vector<uint16_t> buf = fChHeader[i].HtoN();
103
104 fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
105 fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42+fHeader.fEventCounter*100);
106
107 fHeader.fPackageLength += sizeof(ChannelHeader)/2;
108 fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
109 }
110
111 fBuffer.push_back(htons(FAD::kDelimiterEnd));
112
113 const vector<uint16_t> h = fHeader.HtoN();
114
115 fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
116
117 if (fCommandSocket)
118 AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
119 else
120 {
121 if (fSockets.size()==0)
122 return;
123
124 fSocket++;
125 fSocket %= fSockets.size();
126
127 AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
128 }
129 }
130
131 void TriggerSendData(const boost::system::error_code &ec)
132 {
133 if (!is_open())
134 {
135 // For example: Here we could schedule a new accept if we
136 // would not want to allow two connections at the same time.
137 return;
138 }
139
140 if (ec==ba::error::basic_errors::operation_aborted)
141 return;
142
143 // Check whether the deadline has passed. We compare the deadline
144 // against the current time since a new asynchronous operation
145 // may have moved the deadline before this actor had a chance
146 // to run.
147 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
148 return;
149
150 // The deadline has passed.
151 if (fTriggerEnabled)
152 SendData();
153
154 AsyncWait(fTriggerSendData, fHeader.fTriggerGeneratorPrescaler, &tcp_connection::TriggerSendData);
155 }
156
157 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
158 {
159 // Do not schedule a new read if the connection failed.
160 if (bytes_received==0)
161 {
162 // Close the connection
163 close();
164 return;
165 }
166
167 // No command received yet
168 if (fCommand.size()==0)
169 {
170 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
171 fBufCommand.begin(), ntohs);
172
173 switch (fBufCommand[0])
174 {
175 case kCmdDrsEnable:
176 case kCmdDrsEnable+0x100:
177 fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
178 cout << "-> DrsEnable" << endl;
179 break;
180
181 case kCmdDwrite:
182 case kCmdDwrite+0x100:
183 fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
184 cout << "-> Dwrite" << endl;
185 break;
186
187 case kCmdTriggerLine:
188 case kCmdTriggerLine+0x100:
189 cout << "-> Trigger line" << endl;
190 fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
191 fHeader.Enable(FAD::EventHeader::kTriggerLine, fTriggerEnabled);
192 break;
193
194 case kCmdSclk:
195 case kCmdSclk+0x100:
196 cout << "-> Sclk" << endl;
197 fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
198 break;
199
200 case kCmdSrclk:
201 case kCmdSrclk+0x100:
202 cout << "-> Drclk" << endl;
203 break;
204
205 case kCmdRun:
206 case kCmdRun+0x100:
207 fStartTime = Time(Time::utc).UnixTime();
208 cout << "-> Run" << endl;
209 break;
210
211 case kCmdBusy:
212 case kCmdBusy+0x100:
213 cout << "-> Busy" << endl;
214 fHeader.Enable(FAD::EventHeader::kBusy, fBufCommand[0]==kCmdBusy);
215 break;
216
217 case kCmdSocket:
218 case kCmdSocket+0x100:
219 cout << "-> Socket" << endl;
220 fCommandSocket = fBufCommand[0]==kCmdSocket;
221 fHeader.Enable(FAD::EventHeader::kSock17, !fCommandSocket);
222 break;
223
224 case kCmdContTrigger:
225 case kCmdContTrigger+0x100:
226 if (fBufCommand[0]==kCmdContTrigger)
227 AsyncWait(fTriggerSendData, 0, &tcp_connection::TriggerSendData);
228 else
229 fTriggerSendData.cancel();
230 fHeader.Enable(FAD::EventHeader::kContTrigger, fBufCommand[0]==kCmdContTrigger);
231 cout << "-> ContTrig" << endl;
232 break;
233
234 case kCmdResetTriggerId:
235 cout << "-> ResetId" << endl;
236 fHeader.fEventCounter = 0;
237 break;
238
239 case kCmdSingleTrigger:
240 cout << "-> Trigger " << fBoardId << endl;
241 SendData();
242 break;
243
244 case kCmdWriteExecute:
245 cout << "-> Execute" << endl;
246 memcpy(fHeader.fDac, fRam.fDac, sizeof(fHeader.fDac));
247 for (int i=0; i<kNumChannels; i++)
248 fChHeader[i].fRegionOfInterest = fRamRoi[i];
249 fHeader.fRunNumber = fRam.fRunNumber;
250 break;
251
252 case kCmdWriteRunNumberMSW:
253 fCommand = fBufCommand;
254 break;
255
256 case kCmdWriteRunNumberLSW:
257 fCommand = fBufCommand;
258 break;
259
260 default:
261 if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
262 {
263 fCommand.resize(2);
264 fCommand[0] = kCmdWriteRoi;
265 fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
266 break;
267 }
268 if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
269 {
270 fCommand.resize(2);
271 fCommand[0] = kCmdWriteDac;
272 fCommand[1] = fBufCommand[0]-kCmdWriteDac;
273 break;
274 }
275 if (fBufCommand[0]==kCmdWriteRate)
276 {
277 fCommand.resize(1);
278 fCommand[0] = kCmdWriteRate;
279 break;
280 }
281
282 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
283 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
284 return;
285 }
286
287 fBufCommand.resize(1);
288 AsyncRead(ba::buffer(fBufCommand));
289 return;
290 }
291
292 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
293 fBufCommand.begin(), ntohs);
294
295 switch (fCommand[0])
296 {
297 case kCmdWriteRunNumberMSW:
298 fRam.fRunNumber &= 0xffff;
299 fRam.fRunNumber |= fBufCommand[0]<<16;
300 cout << "-> Set RunNumber MSW" << endl;
301 break;
302 case kCmdWriteRunNumberLSW:
303 fRam.fRunNumber &= 0xffff0000;
304 fRam.fRunNumber |= fBufCommand[0];
305 cout << "-> Set RunNumber LSW" << endl;
306 break;
307 case kCmdWriteRoi:
308 cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
309 //fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
310 fRamRoi[fCommand[1]] = fBufCommand[0];
311 break;
312
313 case kCmdWriteDac:
314 cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
315 fRam.fDac[fCommand[1]] = fBufCommand[0];
316 break;
317
318 case kCmdWriteRate:
319 cout << "-> Set Rate =" << fBufCommand[0] << endl;
320 fHeader.fTriggerGeneratorPrescaler = fBufCommand[0];
321 break;
322 }
323
324 fCommand.resize(0);
325
326 fBufCommand.resize(1);
327 AsyncRead(ba::buffer(fBufCommand));
328 }
329
330public:
331 typedef boost::shared_ptr<tcp_connection> shared_ptr;
332
333 static shared_ptr create(ba::io_service& io_service, int boardid)
334 {
335 return shared_ptr(new tcp_connection(io_service, boardid));
336 }
337
338 void start()
339 {
340 // Ownership of buffer must be valid until Handler is called.
341
342 fTriggerEnabled=false;
343 fCommandSocket=true;
344
345 fHeader.fStartDelimiter = FAD::kDelimiterStart;
346 fHeader.fVersion = 0x104;
347 fHeader.fBoardId = (fBoardId%10) | ((fBoardId/10)<<8);
348 fHeader.fRunNumber = 1;
349 fHeader.fDNA = reinterpret_cast<uint64_t>(this);
350 fHeader.fTriggerGeneratorPrescaler = 100;
351 fHeader.fStatus = 0xf<<12 |
352 FAD::EventHeader::kDenable |
353 FAD::EventHeader::kDwrite |
354 FAD::EventHeader::kDcmLocked |
355 FAD::EventHeader::kDcmReady |
356 FAD::EventHeader::kSpiSclk;
357
358
359 fStartTime = Time(Time::utc).UnixTime();
360
361 for (int i=0; i<kNumChannels; i++)
362 {
363 fChHeader[i].fId = (i%9) | ((i/9)<<4);
364 fChHeader[i].fRegionOfInterest = 0;
365 }
366
367 // Emit something to be written to the socket
368 fBufCommand.resize(1);
369 AsyncRead(ba::buffer(fBufCommand));
370
371// AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
372
373// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
374// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
375
376 }
377
378 vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
379
380 ~tcp_connection()
381 {
382 fSockets.clear();
383 }
384
385 void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
386 {
387 cout << this << " Added one socket[" << fBoardId << "] " << socket->remote_endpoint().address().to_v4().to_string();
388 cout << ":"<< port << endl;
389 fSockets.push_back(socket);
390 }
391};
392
393
394class tcp_server
395{
396 tcp::acceptor acc0;
397 tcp::acceptor acc1;
398 tcp::acceptor acc2;
399 tcp::acceptor acc3;
400 tcp::acceptor acc4;
401 tcp::acceptor acc5;
402 tcp::acceptor acc6;
403 tcp::acceptor acc7;
404
405 int fBoardId;
406
407public:
408 tcp_server(ba::io_service& ioservice, int port, int board) :
409 acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
410 acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
411 acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
412 acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
413 acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
414 acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
415 acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
416 acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
417 fBoardId(board)
418 {
419 // We could start listening for more than one connection
420 // here, but since there is only one handler executed each time
421 // it would not make sense. Before one handle_accept is not
422 // finished no new handle_accept will be called.
423 // Workround: Start a new thread in handle_accept
424 start_accept();
425 }
426
427private:
428 void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
429 {
430 boost::shared_ptr<ba::ip::tcp::socket> connection =
431 boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
432
433 acc.async_accept(*connection,
434 boost::bind(&tcp_connection::handle_accept,
435 dest, connection,
436 acc.local_endpoint().port(),
437 ba::placeholders::error));
438 }
439
440 void start_accept()
441 {
442 cout << "Start accept[" << fBoardId << "] " << acc0.local_endpoint().port() << "..." << flush;
443 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service(), fBoardId);
444
445 cout << new_connection.get() << " ";
446
447 // This will accept a connection without blocking
448 acc0.async_accept(*new_connection,
449 boost::bind(&tcp_server::handle_accept,
450 this,
451 new_connection,
452 ba::placeholders::error));
453
454 start_accept(new_connection, acc1);
455 start_accept(new_connection, acc2);
456 start_accept(new_connection, acc3);
457 start_accept(new_connection, acc4);
458 start_accept(new_connection, acc5);
459 start_accept(new_connection, acc6);
460 start_accept(new_connection, acc7);
461
462 cout << "start-done." << endl;
463 }
464
465 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
466 {
467 // The connection has been accepted and is now ready to use
468
469 // not installing a new handler will stop run()
470 cout << new_connection.get() << " Handle accept[" << fBoardId << "]["<<new_connection->fBoardId<<"]..." << flush;
471 if (!error)
472 {
473 new_connection->start();
474
475 // The is now an open connection/server (tcp_connection)
476 // we immediatly schedule another connection
477 // This allowed two client-connection at the same time
478 start_accept();
479 }
480 cout << "handle-done." << endl;
481 }
482};
483
484int main(int argc, const char **argv)
485{
486 //try
487 {
488 ba::io_service io_service;
489
490 int port = argc>=2 ? lexical_cast<int>(argv[1]) : 5000;
491 int n = argc==3 ? lexical_cast<int>(argv[2]) : 1;
492
493 vector<shared_ptr<tcp_server>> servers;
494
495 for (int i=0; i<n; i++)
496 {
497 shared_ptr<tcp_server> server(new tcp_server(io_service, port, i));
498 servers.push_back(server);
499
500 port += 8;
501 }
502
503 // ba::add_service(io_service, &server);
504 // server.add_service(...);
505 //cout << "Run..." << flush;
506
507 // Calling run() from a single thread ensures no concurrent access
508 // of the handler which are called!!!
509 io_service.run();
510
511 //cout << "end." << endl;
512 }
513 /*catch (std::exception& e)
514 {
515 std::cerr << e.what() << std::endl;
516 }*/
517
518 return 0;
519}
520/* ====================== Buffers ===========================
521
522char d1[128]; ba::buffer(d1);
523std::vector<char> d2(128); ba::buffer(d2);
524boost::array<char, 128> d3; ba::buffer(d3);
525
526// --------------------------------
527char d1[128];
528std::vector<char> d2(128);
529boost::array<char, 128> d3;
530
531boost::array<mutable_buffer, 3> bufs1 = {
532 ba::buffer(d1),
533 ba::buffer(d2),
534 ba::buffer(d3) };
535sock.read(bufs1);
536
537std::vector<const_buffer> bufs2;
538bufs2.push_back(boost::asio::buffer(d1));
539bufs2.push_back(boost::asio::buffer(d2));
540bufs2.push_back(boost::asio::buffer(d3));
541sock.write(bufs2);
542
543
544// ======================= Read functions =========================
545
546ba::async_read_until --> delimiter
547
548streambuf buf; // Ensure validity until handler!
549ba::async_read(s, buf, ....);
550
551ba::async_read(s, ba::buffer(data, size), handler);
552 // Single buffer
553 boost::asio::async_read(s,
554 ba::buffer(data, size),
555 compl-func --> ba::transfer_at_least(32),
556 handler);
557
558 // Multiple buffers
559boost::asio::async_read(s, buffers,
560 compl-func --> boost::asio::transfer_all(),
561 handler);
562 */
563
564// ================= Others ===============================
565
566 /*
567 strand Provides serialised handler execution.
568 work Class to inform the io_service when it has work to do.
569
570
571io_service::
572dispatch Request the io_service to invoke the given handler.
573poll Run the io_service's event processing loop to execute ready
574 handlers.
575poll_one Run the io_service's event processing loop to execute one ready
576 handler.
577post Request the io_service to invoke the given handler and return
578 immediately.
579reset Reset the io_service in preparation for a subsequent run()
580 invocation.
581run Run the io_service's event processing loop.
582run_one Run the io_service's event processing loop to execute at most
583 one handler.
584stop Stop the io_service's event processing loop.
585wrap Create a new handler that automatically dispatches the wrapped
586 handler on the io_service.
587
588strand:: The io_service::strand class provides the ability to
589 post and dispatch handlers with the guarantee that none
590 of those handlers will execute concurrently.
591
592dispatch Request the strand to invoke the given handler.
593get_io_service Get the io_service associated with the strand.
594post Request the strand to invoke the given handler and return
595 immediately.
596wrap Create a new handler that automatically dispatches the
597 wrapped handler on the strand.
598
599work:: The work class is used to inform the io_service when
600 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
601get_io_service Get the io_service associated with the work.
602work Constructor notifies the io_service that work is starting.
603
604*/
605
606
Note: See TracBrowser for help on using the repository browser.