source: trunk/FACT++/src/fad.cc@ 11237

Last change on this file since 11237 was 11228, checked in by tbretz, 13 years ago
Layout update FAD tab
File size: 19.5 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba = boost::asio;
20namespace bs = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30public:
31 const int fBoardId;
32
33 double fStartTime;
34
35 void AsyncRead(ba::mutable_buffers_1 buffers)
36 {
37 ba::async_read(*this, buffers,
38 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
39 dummy::error, dummy::bytes_transferred));
40 }
41
42 void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
43 {
44 ba::async_write(*socket, buffers,
45 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
46 dummy::error, dummy::bytes_transferred));
47 }
48 void AsyncWait(ba::deadline_timer &timer, int seconds,
49 void (tcp_connection::*handler)(const bs::error_code&))// const
50 {
51 timer.expires_from_now(boost::posix_time::seconds(seconds));
52 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
53 }
54
55 // The constructor is prvate to force the obtained pointer to be shared
56 tcp_connection(ba::io_service& ioservice, int boardid) : ba::ip::tcp::socket(ioservice),
57 fBoardId(boardid), fTriggerSendData(ioservice)
58 {
59 }
60
61 // Callback when writing was successfull or failed
62 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
63 {
64 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
65 }
66
67 vector<uint16_t> fBufCommand;
68 vector<uint16_t> fBuffer;
69
70 vector<uint16_t> fCommand;
71
72 FAD::EventHeader fHeader;
73 FAD::EventHeader fRam;
74 FAD::ChannelHeader fChHeader[kNumChannels];
75
76 uint16_t fRamRoi[kNumChannels];
77
78 ba::deadline_timer fTriggerSendData;
79
80 bool fTriggerEnabled;
81 bool fCommandSocket;
82
83 int fSocket;
84
85 void SendData()
86 {
87 fHeader.fPackageLength = sizeof(EventHeader)/2+1;
88 fHeader.fEventCounter++;
89 fHeader.fTriggerId = fHeader.fEventCounter;
90 fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
91 fHeader.fFreqRefClock = 997+rand()/(RAND_MAX/7);
92
93 for (int i=0; i<FAD::kNumTemp; i++)
94 fHeader.fTempDrs[i] = (42.+fBoardId/40.+float(rand())/RAND_MAX*5)*16;
95
96 fBuffer.resize(0);
97
98 for (int i=0; i<kNumChannels; i++)
99 {
100 fChHeader[i].fStartCell = i*10;
101
102 const vector<uint16_t> buf = fChHeader[i].HtoN();
103
104 fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
105 fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42+fHeader.fEventCounter*100);
106
107 fHeader.fPackageLength += sizeof(ChannelHeader)/2;
108 fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
109 }
110
111 fBuffer.push_back(htons(FAD::kDelimiterEnd));
112
113 const vector<uint16_t> h = fHeader.HtoN();
114
115 fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
116
117 if (fCommandSocket)
118 AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
119 else
120 {
121 if (fSockets.size()==0)
122 return;
123
124 fSocket++;
125 fSocket %= fSockets.size();
126
127 AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
128 }
129 }
130
131 void TriggerSendData(const boost::system::error_code &ec)
132 {
133 if (!is_open())
134 {
135 // For example: Here we could schedule a new accept if we
136 // would not want to allow two connections at the same time.
137 return;
138 }
139
140 if (ec==ba::error::basic_errors::operation_aborted)
141 return;
142
143 // Check whether the deadline has passed. We compare the deadline
144 // against the current time since a new asynchronous operation
145 // may have moved the deadline before this actor had a chance
146 // to run.
147 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
148 return;
149
150 // The deadline has passed.
151 if (fTriggerEnabled)
152 SendData();
153
154 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
155 }
156
157 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
158 {
159 // Do not schedule a new read if the connection failed.
160 if (bytes_received==0)
161 {
162 // Close the connection
163 close();
164 return;
165 }
166
167 // No command received yet
168 if (fCommand.size()==0)
169 {
170 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
171 fBufCommand.begin(), ntohs);
172
173 switch (fBufCommand[0])
174 {
175 case kCmdDrsEnable:
176 case kCmdDrsEnable+0x100:
177 fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
178 cout << "-> DrsEnable" << endl;
179 break;
180
181 case kCmdDwrite:
182 case kCmdDwrite+0x100:
183 fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
184 cout << "-> Dwrite" << endl;
185 break;
186
187 case kCmdTriggerLine:
188 case kCmdTriggerLine+0x100:
189 cout << "-> Trigger line" << endl;
190 fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
191 fHeader.Enable(FAD::EventHeader::kTriggerLine, fTriggerEnabled);
192 break;
193
194 case kCmdSclk:
195 case kCmdSclk+0x100:
196 cout << "-> Sclk" << endl;
197 fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
198 break;
199
200 case kCmdSrclk:
201 case kCmdSrclk+0x100:
202 cout << "-> Drclk" << endl;
203 break;
204
205 case kCmdRun:
206 case kCmdRun+0x100:
207 fStartTime = Time(Time::utc).UnixTime();
208 cout << "-> Run" << endl;
209 break;
210
211 case kCmdBusy:
212 case kCmdBusy+0x100:
213 cout << "-> Busy" << endl;
214 fHeader.Enable(FAD::EventHeader::kBusy, fBufCommand[0]==kCmdBusy);
215 break;
216
217 case kCmdSocket:
218 case kCmdSocket+0x100:
219 cout << "-> Socket" << endl;
220 fCommandSocket = fBufCommand[0]==kCmdSocket;
221 fHeader.Enable(FAD::EventHeader::kSock17, !fCommandSocket);
222 break;
223
224 case kCmdContTrigger:
225 case kCmdContTrigger+0x100:
226 if (fBufCommand[0]==kCmdContTrigger)
227 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
228 else
229 fTriggerSendData.cancel();
230 fHeader.Enable(FAD::EventHeader::kContTrigger, fBufCommand[0]==kCmdContTrigger);
231 cout << "-> ContTrig" << endl;
232 break;
233
234 case kCmdResetTriggerId:
235 cout << "-> ResetId" << endl;
236 fHeader.fEventCounter = 0;
237 break;
238
239 case kCmdSingleTrigger:
240 cout << "-> Trigger " << fBoardId << endl;
241 SendData();
242 break;
243
244 case kCmdWriteExecute:
245 cout << "-> Execute" << endl;
246 memcpy(fHeader.fDac, fRam.fDac, sizeof(fHeader.fDac));
247 for (int i=0; i<kNumChannels; i++)
248 fChHeader[i].fRegionOfInterest = fRamRoi[i];
249 fHeader.fRunNumber = fRam.fRunNumber;
250 break;
251
252 case kCmdWriteRunNumberMSW:
253 fCommand = fBufCommand;
254 break;
255
256 case kCmdWriteRunNumberLSW:
257 fCommand = fBufCommand;
258 break;
259
260 default:
261 if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
262 {
263 fCommand.resize(2);
264 fCommand[0] = kCmdWriteRoi;
265 fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
266 break;
267 }
268 if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
269 {
270 fCommand.resize(2);
271 fCommand[0] = kCmdWriteDac;
272 fCommand[1] = fBufCommand[0]-kCmdWriteDac;
273 break;
274 }
275
276 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
277 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
278 return;
279 }
280
281 fBufCommand.resize(1);
282 AsyncRead(ba::buffer(fBufCommand));
283 return;
284 }
285
286 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
287 fBufCommand.begin(), ntohs);
288
289 switch (fCommand[0])
290 {
291 case kCmdWriteRunNumberMSW:
292 fRam.fRunNumber &= 0xffff;
293 fRam.fRunNumber |= fBufCommand[0]<<16;
294 cout << "-> Set RunNumber MSW" << endl;
295 break;
296 case kCmdWriteRunNumberLSW:
297 fRam.fRunNumber &= 0xffff0000;
298 fRam.fRunNumber |= fBufCommand[0];
299 cout << "-> Set RunNumber LSW" << endl;
300 break;
301 case kCmdWriteRoi:
302 cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
303 //fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
304 fRamRoi[fCommand[1]] = fBufCommand[0];
305 break;
306
307 case kCmdWriteDac:
308 cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
309 fRam.fDac[fCommand[1]] = fBufCommand[0];
310 break;
311 }
312
313 fCommand.resize(0);
314
315 fBufCommand.resize(1);
316 AsyncRead(ba::buffer(fBufCommand));
317 }
318
319public:
320 typedef boost::shared_ptr<tcp_connection> shared_ptr;
321
322 static shared_ptr create(ba::io_service& io_service, int boardid)
323 {
324 return shared_ptr(new tcp_connection(io_service, boardid));
325 }
326
327 void start()
328 {
329 // Ownership of buffer must be valid until Handler is called.
330
331 fTriggerEnabled=false;
332 fCommandSocket=true;
333
334 fHeader.fStartDelimiter = FAD::kDelimiterStart;
335 fHeader.fVersion = 0x104;
336 fHeader.fBoardId = (fBoardId%10) | ((fBoardId/10)<<8);
337 fHeader.fRunNumber = 1;
338 fHeader.fDNA = reinterpret_cast<uint64_t>(this);
339 fHeader.fStatus = 0xf<<12 |
340 FAD::EventHeader::kDenable |
341 FAD::EventHeader::kDwrite |
342 FAD::EventHeader::kDcmLocked |
343 FAD::EventHeader::kDcmReady |
344 FAD::EventHeader::kSpiSclk;
345
346
347 fStartTime = Time(Time::utc).UnixTime();
348
349 for (int i=0; i<kNumChannels; i++)
350 {
351 fChHeader[i].fId = (i%9) | ((i/9)<<4);
352 fChHeader[i].fRegionOfInterest = 0;
353 }
354
355 // Emit something to be written to the socket
356 fBufCommand.resize(1);
357 AsyncRead(ba::buffer(fBufCommand));
358
359// AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
360
361// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
362// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
363
364 }
365
366 vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
367
368 ~tcp_connection()
369 {
370 fSockets.clear();
371 }
372
373 void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
374 {
375 cout << this << " Added one socket[" << fBoardId << "] " << socket->remote_endpoint().address().to_v4().to_string();
376 cout << ":"<< port << endl;
377 fSockets.push_back(socket);
378 }
379};
380
381
382class tcp_server
383{
384 tcp::acceptor acc0;
385 tcp::acceptor acc1;
386 tcp::acceptor acc2;
387 tcp::acceptor acc3;
388 tcp::acceptor acc4;
389 tcp::acceptor acc5;
390 tcp::acceptor acc6;
391 tcp::acceptor acc7;
392
393 int fBoardId;
394
395public:
396 tcp_server(ba::io_service& ioservice, int port, int board) :
397 acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
398 acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
399 acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
400 acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
401 acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
402 acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
403 acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
404 acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
405 fBoardId(board)
406 {
407 // We could start listening for more than one connection
408 // here, but since there is only one handler executed each time
409 // it would not make sense. Before one handle_accept is not
410 // finished no new handle_accept will be called.
411 // Workround: Start a new thread in handle_accept
412 start_accept();
413 }
414
415private:
416 void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
417 {
418 boost::shared_ptr<ba::ip::tcp::socket> connection =
419 boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
420
421 acc.async_accept(*connection,
422 boost::bind(&tcp_connection::handle_accept,
423 dest, connection,
424 acc.local_endpoint().port(),
425 ba::placeholders::error));
426 }
427
428 void start_accept()
429 {
430 cout << "Start accept[" << fBoardId << "] " << acc0.local_endpoint().port() << "..." << flush;
431 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service(), fBoardId);
432
433 cout << new_connection.get() << " ";
434
435 // This will accept a connection without blocking
436 acc0.async_accept(*new_connection,
437 boost::bind(&tcp_server::handle_accept,
438 this,
439 new_connection,
440 ba::placeholders::error));
441
442 start_accept(new_connection, acc1);
443 start_accept(new_connection, acc2);
444 start_accept(new_connection, acc3);
445 start_accept(new_connection, acc4);
446 start_accept(new_connection, acc5);
447 start_accept(new_connection, acc6);
448 start_accept(new_connection, acc7);
449
450 cout << "start-done." << endl;
451 }
452
453 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
454 {
455 // The connection has been accepted and is now ready to use
456
457 // not installing a new handler will stop run()
458 cout << new_connection.get() << " Handle accept[" << fBoardId << "]["<<new_connection->fBoardId<<"]..." << flush;
459 if (!error)
460 {
461 new_connection->start();
462
463 // The is now an open connection/server (tcp_connection)
464 // we immediatly schedule another connection
465 // This allowed two client-connection at the same time
466 start_accept();
467 }
468 cout << "handle-done." << endl;
469 }
470};
471
472int main(int argc, const char **argv)
473{
474 //try
475 {
476 ba::io_service io_service;
477
478 int port = argc>=2 ? lexical_cast<int>(argv[1]) : 5000;
479 int n = argc==3 ? lexical_cast<int>(argv[2]) : 1;
480
481 vector<shared_ptr<tcp_server>> servers;
482
483 for (int i=0; i<n; i++)
484 {
485 shared_ptr<tcp_server> server(new tcp_server(io_service, port, i));
486 servers.push_back(server);
487
488 port += 8;
489 }
490
491 // ba::add_service(io_service, &server);
492 // server.add_service(...);
493 //cout << "Run..." << flush;
494
495 // Calling run() from a single thread ensures no concurrent access
496 // of the handler which are called!!!
497 io_service.run();
498
499 //cout << "end." << endl;
500 }
501 /*catch (std::exception& e)
502 {
503 std::cerr << e.what() << std::endl;
504 }*/
505
506 return 0;
507}
/* ====================== Buffers ===========================

char d1[128];               ba::buffer(d1);
std::vector<char> d2(128);  ba::buffer(d2);
boost::array<char, 128> d3; ba::buffer(d3);

// --------------------------------
char d1[128];
std::vector<char> d2(128);
boost::array<char, 128> d3;

boost::array<mutable_buffer, 3> bufs1 = {
   ba::buffer(d1),
   ba::buffer(d2),
   ba::buffer(d3) };
sock.read(bufs1);

std::vector<const_buffer> bufs2;
bufs2.push_back(boost::asio::buffer(d1));
bufs2.push_back(boost::asio::buffer(d2));
bufs2.push_back(boost::asio::buffer(d3));
sock.write(bufs2);


// ======================= Read functions =========================

ba::async_read_until --> delimiter

streambuf buf; // Ensure validity until handler!
ba::async_read(s, buf, ....);

ba::async_read(s, ba::buffer(data, size), handler);
 // Single buffer
 boost::asio::async_read(s,
                         ba::buffer(data, size),
 compl-func -->          ba::transfer_at_least(32),
                         handler);

 // Multiple buffers
boost::asio::async_read(s, buffers,
 compl-func -->         boost::asio::transfer_all(),
                        handler);
 */
551
552// ================= Others ===============================
553
554 /*
555 strand Provides serialised handler execution.
556 work Class to inform the io_service when it has work to do.
557
558
559io_service::
560dispatch Request the io_service to invoke the given handler.
561poll Run the io_service's event processing loop to execute ready
562 handlers.
563poll_one Run the io_service's event processing loop to execute one ready
564 handler.
565post Request the io_service to invoke the given handler and return
566 immediately.
567reset Reset the io_service in preparation for a subsequent run()
568 invocation.
569run Run the io_service's event processing loop.
570run_one Run the io_service's event processing loop to execute at most
571 one handler.
572stop Stop the io_service's event processing loop.
573wrap Create a new handler that automatically dispatches the wrapped
574 handler on the io_service.
575
576strand:: The io_service::strand class provides the ability to
577 post and dispatch handlers with the guarantee that none
578 of those handlers will execute concurrently.
579
580dispatch Request the strand to invoke the given handler.
581get_io_service Get the io_service associated with the strand.
582post Request the strand to invoke the given handler and return
583 immediately.
584wrap Create a new handler that automatically dispatches the
585 wrapped handler on the strand.
586
587work:: The work class is used to inform the io_service when
588 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
589get_io_service Get the io_service associated with the work.
590work Constructor notifies the io_service that work is starting.
591
592*/
593
594
Note: See TracBrowser for help on using the repository browser.