source: trunk/FACT++/src/fad.cc@ 10939

Last change on this file since 10939 was 10926, checked in by tbretz, 13 years ago
Fixed setting run number.
File size: 18.1 KB
Line 
#include <exception>
#include <iostream>
#include <memory>
#include <string>

#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/lexical_cast.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba = boost::asio;
20namespace bs = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30private:
31 const int fBoardId;
32
33 double fStartTime;
34 uint32_t fRunNumber;
35
36 void AsyncRead(ba::mutable_buffers_1 buffers)
37 {
38 ba::async_read(*this, buffers,
39 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
40 dummy::error, dummy::bytes_transferred));
41 }
42
43 void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
44 {
45 ba::async_write(*socket, buffers,
46 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
47 dummy::error, dummy::bytes_transferred));
48 }
49 void AsyncWait(ba::deadline_timer &timer, int seconds,
50 void (tcp_connection::*handler)(const bs::error_code&))// const
51 {
52 timer.expires_from_now(boost::posix_time::seconds(seconds));
53 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
54 }
55
56 // The constructor is prvate to force the obtained pointer to be shared
57 tcp_connection(ba::io_service& ioservice, int boardid) : ba::ip::tcp::socket(ioservice),
58 fBoardId(boardid), fTriggerSendData(ioservice)
59 {
60 }
61
62 // Callback when writing was successfull or failed
63 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
64 {
65 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
66 }
67
68 vector<uint16_t> fBufCommand;
69 vector<uint16_t> fBuffer;
70
71 vector<uint16_t> fCommand;
72
73 FAD::EventHeader fHeader;
74 FAD::ChannelHeader fChHeader[kNumChannels];
75
76 ba::deadline_timer fTriggerSendData;
77
78 bool fTriggerEnabled;
79 bool fCommandSocket;
80
81 int fSocket;
82
83 void SendData()
84 {
85 if (!fTriggerEnabled)
86 return;
87
88 fHeader.fPackageLength = sizeof(EventHeader)/2+1;
89 fHeader.fEventCounter++;
90 fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
91 fHeader.fRunNumber = fRunNumber;
92
93
94 fBuffer.resize(0);
95
96 for (int i=0; i<kNumChannels; i++)
97 {
98 fChHeader[i].fStartCell = i*10;
99
100 const vector<uint16_t> buf = fChHeader[i].HtoN();
101
102 fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
103 fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42);
104
105 fHeader.fPackageLength += sizeof(ChannelHeader)/2;
106 fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
107 }
108
109 fBuffer.push_back(htons(FAD::kDelimiterEnd));
110
111 const vector<uint16_t> h = fHeader.HtoN();
112
113 fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
114
115 if (fCommandSocket)
116 AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
117 else
118 {
119 if (fSockets.size()==0)
120 return;
121
122 fSocket++;
123 fSocket %= fSockets.size();
124
125 AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
126 }
127 }
128
129 void TriggerSendData(const boost::system::error_code &ec)
130 {
131 if (!is_open())
132 {
133 // For example: Here we could schedule a new accept if we
134 // would not want to allow two connections at the same time.
135 return;
136 }
137
138 if (ec==ba::error::basic_errors::operation_aborted)
139 return;
140
141 // Check whether the deadline has passed. We compare the deadline
142 // against the current time since a new asynchronous operation
143 // may have moved the deadline before this actor had a chance
144 // to run.
145 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
146 return;
147
148 // The deadline has passed.
149 SendData();
150
151 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
152 }
153
154 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
155 {
156 // Do not schedule a new read if the connection failed.
157 if (bytes_received==0)
158 {
159 // Close the connection
160 close();
161 return;
162 }
163
164 // No command received yet
165 if (fCommand.size()==0)
166 {
167 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
168 fBufCommand.begin(), ntohs);
169
170 switch (fBufCommand[0])
171 {
172 case kCmdDrsEnable:
173 case kCmdDrsEnable+0x100:
174 fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
175 cout << "-> DrsEnable" << endl;
176 break;
177
178 case kCmdDwrite:
179 case kCmdDwrite+0x100:
180 fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
181 cout << "-> Dwrite" << endl;
182 break;
183
184 case kCmdTriggerLine:
185 case kCmdTriggerLine+0x100:
186 cout << "-> Trigger line" << endl;
187 fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
188 break;
189
190 case kCmdSclk:
191 case kCmdSclk+0x100:
192 cout << "-> Sclk" << endl;
193 fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
194 break;
195
196 case kCmdSrclk:
197 case kCmdSrclk+0x100:
198 cout << "-> Drclk" << endl;
199 break;
200
201 case kCmdRun:
202 case kCmdRun+0x100:
203 fStartTime = Time(Time::utc).UnixTime();
204 cout << "-> Run" << endl;
205 break;
206
207 case kCmdSocket:
208 case kCmdSocket+0x100:
209 cout << "-> Socket" << endl;
210 fCommandSocket = fBufCommand[0]==kCmdSocket;
211 break;
212
213 case kCmdContTriggerOn:
214 case kCmdContTriggerOff:
215 if (fBufCommand[0]==kCmdContTriggerOn)
216 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
217 else
218 fTriggerSendData.cancel();
219 cout << "-> ContTrig" << endl;
220 break;
221
222 case kCmdResetTriggerId:
223 cout << "-> Reset" << endl;
224 fHeader.fEventCounter = 0;
225 break;
226
227 case kCmdSingleTrigger:
228 cout << "-> Trigger" << endl;
229 SendData();
230 break;
231
232 case kCmdWriteRunNumberMSW:
233 fCommand = fBufCommand;
234 break;
235
236 case kCmdWriteRunNumberLSW:
237 fCommand = fBufCommand;
238 break;
239
240 default:
241 if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
242 {
243 fCommand.resize(2);
244 fCommand[0] = kCmdWriteRoi;
245 fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
246 break;
247 }
248 if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
249 {
250 fCommand.resize(2);
251 fCommand[0] = kCmdWriteDac;
252 fCommand[1] = fBufCommand[0]-kCmdWriteDac;
253 break;
254 }
255
256 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
257 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
258 return;
259 }
260
261 fBufCommand.resize(1);
262 AsyncRead(ba::buffer(fBufCommand));
263 return;
264 }
265
266 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
267 fBufCommand.begin(), ntohs);
268
269 switch (fCommand[0])
270 {
271 case kCmdWriteRunNumberMSW:
272 fRunNumber &= 0xffff;
273 fRunNumber |= fBufCommand[0]<<16;
274 cout << "-> Set RunNumber MSW" << endl;
275 break;
276 case kCmdWriteRunNumberLSW:
277 fRunNumber &= 0xffff0000;
278 fRunNumber |= fBufCommand[0];
279 cout << "-> Set RunNumber LSW" << endl;
280 break;
281 case kCmdWriteRoi:
282 cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
283 fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
284 break;
285
286 case kCmdWriteDac:
287 cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
288 fHeader.fDac[fCommand[1]] = fBufCommand[0];
289 break;
290 }
291
292 fCommand.resize(0);
293
294 fBufCommand.resize(1);
295 AsyncRead(ba::buffer(fBufCommand));
296 }
297
298public:
299 typedef boost::shared_ptr<tcp_connection> shared_ptr;
300
301 static shared_ptr create(ba::io_service& io_service, int boardid)
302 {
303 return shared_ptr(new tcp_connection(io_service, boardid));
304 }
305
306 void start()
307 {
308 // Ownership of buffer must be valid until Handler is called.
309
310 fTriggerEnabled=false;
311 fCommandSocket=true;
312 fRunNumber = 1;
313
314 fHeader.fStartDelimiter = FAD::kDelimiterStart;
315 fHeader.fVersion = 0x104;
316 fHeader.fBoardId = (fBoardId%10) | ((fBoardId/10)<<8);
317 fHeader.fStatus = 0xf<<12 |
318 FAD::EventHeader::kDenable |
319 FAD::EventHeader::kDwrite |
320 FAD::EventHeader::kDcmLocked |
321 FAD::EventHeader::kDcmReady |
322 FAD::EventHeader::kSpiSclk;
323
324 fStartTime = Time(Time::utc).UnixTime();
325
326 for (int i=0; i<kNumChannels; i++)
327 {
328 fChHeader[i].fId = (i%9) | ((i/9)<<4);
329 fChHeader[i].fRegionOfInterest = 0;
330 }
331
332 // Emit something to be written to the socket
333 fBufCommand.resize(1);
334 AsyncRead(ba::buffer(fBufCommand));
335
336// AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
337
338// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
339// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
340
341 }
342
343 vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
344
345 ~tcp_connection()
346 {
347 fSockets.clear();
348 }
349
350 void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
351 {
352 cout << "Added one socket " << socket->remote_endpoint().address().to_v4().to_string();
353 cout << ":"<< port << endl;
354 fSockets.push_back(socket);
355 }
356};
357
358
359class tcp_server
360{
361 tcp::acceptor acc0;
362 tcp::acceptor acc1;
363 tcp::acceptor acc2;
364 tcp::acceptor acc3;
365 tcp::acceptor acc4;
366 tcp::acceptor acc5;
367 tcp::acceptor acc6;
368 tcp::acceptor acc7;
369
370 int fBoardId;
371
372public:
373 tcp_server(ba::io_service& ioservice, int port, int board) :
374 acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
375 acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
376 acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
377 acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
378 acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
379 acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
380 acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
381 acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
382 fBoardId(board)
383 {
384 // We could start listening for more than one connection
385 // here, but since there is only one handler executed each time
386 // it would not make sense. Before one handle_accept is not
387 // finished no new handle_accept will be called.
388 // Workround: Start a new thread in handle_accept
389 start_accept();
390 }
391
392private:
393 void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
394 {
395 boost::shared_ptr<ba::ip::tcp::socket> connection =
396 boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
397 acc.async_accept(*connection,
398 boost::bind(&tcp_connection::handle_accept,
399 dest, connection,
400 acc.local_endpoint().port(),
401 ba::placeholders::error));
402 }
403
404 void start_accept()
405 {
406 cout << "Start accept " << acc0.local_endpoint().port() << "..." << flush;
407 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service(), fBoardId);
408
409 // This will accept a connection without blocking
410 acc0.async_accept(*new_connection,
411 boost::bind(&tcp_server::handle_accept,
412 this,
413 new_connection,
414 ba::placeholders::error));
415
416 start_accept(new_connection, acc1);
417 start_accept(new_connection, acc2);
418 start_accept(new_connection, acc3);
419 start_accept(new_connection, acc4);
420 start_accept(new_connection, acc5);
421 start_accept(new_connection, acc6);
422 start_accept(new_connection, acc7);
423
424 cout << "start-done." << endl;
425 }
426
427 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
428 {
429 // The connection has been accepted and is now ready to use
430
431 // not installing a new handler will stop run()
432 cout << "Handle accept..." << flush;
433 if (!error)
434 {
435 new_connection->start();
436
437 // The is now an open connection/server (tcp_connection)
438 // we immediatly schedule another connection
439 // This allowed two client-connection at the same time
440 start_accept();
441 }
442 cout << "handle-done." << endl;
443 }
444};
445
446int main(int argc, const char **argv)
447{
448 //try
449 {
450 ba::io_service io_service;
451
452 int port = argc>=2 ? lexical_cast<int>(argv[1]) : 5000;
453 int n = argc==3 ? lexical_cast<int>(argv[2]) : 1;
454
455 vector<shared_ptr<tcp_server>> servers;
456
457 for (int i=0; i<n; i++)
458 {
459 shared_ptr<tcp_server> server(new tcp_server(io_service, port, i));
460 servers.push_back(server);
461
462 port += 8;
463 }
464
465 // ba::add_service(io_service, &server);
466 // server.add_service(...);
467 //cout << "Run..." << flush;
468
469 // Calling run() from a single thread ensures no concurrent access
470 // of the handler which are called!!!
471 io_service.run();
472
473 //cout << "end." << endl;
474 }
475 /*catch (std::exception& e)
476 {
477 std::cerr << e.what() << std::endl;
478 }*/
479
480 return 0;
481}
482/* ====================== Buffers ===========================
483
484char d1[128]; ba::buffer(d1);
485std::vector<char> d2(128); ba::buffer(d2);
486boost::array<char, 128> d3; ba::buffer(d3);
487
488// --------------------------------
489char d1[128];
490std::vector<char> d2(128);
491boost::array<char, 128> d3;
492
493boost::array<mutable_buffer, 3> bufs1 = {
494 ba::buffer(d1),
495 ba::buffer(d2),
496 ba::buffer(d3) };
497sock.read(bufs1);
498
499std::vector<const_buffer> bufs2;
500bufs2.push_back(boost::asio::buffer(d1));
501bufs2.push_back(boost::asio::buffer(d2));
502bufs2.push_back(boost::asio::buffer(d3));
503sock.write(bufs2);
504
505
506// ======================= Read functions =========================
507
508ba::async_read_until --> delimiter
509
510streambuf buf; // Ensure validity until handler!
511ba::async_read(s, buf, ....);
512
513ba::async_read(s, ba::buffer(data, size), handler);
514 // Single buffer
515 boost::asio::async_read(s,
516 ba::buffer(data, size),
517 compl-func --> ba::transfer_at_least(32),
518 handler);
519
520 // Multiple buffers
521boost::asio::async_read(s, buffers,
522 compl-func --> boost::asio::transfer_all(),
523 handler);
524 */
525
526// ================= Others ===============================
527
528 /*
529 strand Provides serialised handler execution.
530 work Class to inform the io_service when it has work to do.
531
532
533io_service::
534dispatch Request the io_service to invoke the given handler.
535poll Run the io_service's event processing loop to execute ready
536 handlers.
537poll_one Run the io_service's event processing loop to execute one ready
538 handler.
539post Request the io_service to invoke the given handler and return
540 immediately.
541reset Reset the io_service in preparation for a subsequent run()
542 invocation.
543run Run the io_service's event processing loop.
544run_one Run the io_service's event processing loop to execute at most
545 one handler.
546stop Stop the io_service's event processing loop.
547wrap Create a new handler that automatically dispatches the wrapped
548 handler on the io_service.
549
550strand:: The io_service::strand class provides the ability to
551 post and dispatch handlers with the guarantee that none
552 of those handlers will execute concurrently.
553
554dispatch Request the strand to invoke the given handler.
555get_io_service Get the io_service associated with the strand.
556post Request the strand to invoke the given handler and return
557 immediately.
558wrap Create a new handler that automatically dispatches the
559 wrapped handler on the strand.
560
561work:: The work class is used to inform the io_service when
562 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
563get_io_service Get the io_service associated with the work.
564work Constructor notifies the io_service that work is starting.
565
566*/
567
568
Note: See TracBrowser for help on using the repository browser.