source: trunk/FACT++/src/fad.cc@ 10899

Last change on this file since 10899 was 10876, checked in by tbretz, 14 years ago
Implemented command to set run number.
File size: 17.9 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba = boost::asio;
20namespace bs = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30private:
31 const int fBoardId;
32
33 double fStartTime;
34 uint32_t fRunNumber;
35
36 void AsyncRead(ba::mutable_buffers_1 buffers)
37 {
38 ba::async_read(*this, buffers,
39 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
40 dummy::error, dummy::bytes_transferred));
41 }
42
43 void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
44 {
45 ba::async_write(*socket, buffers,
46 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
47 dummy::error, dummy::bytes_transferred));
48 }
49 void AsyncWait(ba::deadline_timer &timer, int seconds,
50 void (tcp_connection::*handler)(const bs::error_code&))// const
51 {
52 timer.expires_from_now(boost::posix_time::seconds(seconds));
53 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
54 }
55
56 // The constructor is prvate to force the obtained pointer to be shared
57 tcp_connection(ba::io_service& ioservice, int boardid) : ba::ip::tcp::socket(ioservice),
58 fBoardId(boardid), fTriggerSendData(ioservice)
59 {
60 }
61
62 // Callback when writing was successfull or failed
63 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
64 {
65 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
66 }
67
68 vector<uint16_t> fBufCommand;
69 vector<uint16_t> fBuffer;
70
71 vector<uint16_t> fCommand;
72
73 FAD::EventHeader fHeader;
74 FAD::ChannelHeader fChHeader[kNumChannels];
75
76 ba::deadline_timer fTriggerSendData;
77
78 bool fTriggerEnabled;
79 bool fCommandSocket;
80
81 int fSocket;
82
83 void SendData()
84 {
85 if (!fTriggerEnabled)
86 return;
87
88 fHeader.fPackageLength = sizeof(EventHeader)/2+1;
89 fHeader.fEventCounter++;
90 fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
91 fHeader.fRunNumber = fRunNumber;
92
93
94 fBuffer.resize(0);
95
96 for (int i=0; i<kNumChannels; i++)
97 {
98 fChHeader[i].fStartCell = i*10;
99
100 const vector<uint16_t> buf = fChHeader[i].HtoN();
101
102 fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
103 fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42);
104
105 fHeader.fPackageLength += sizeof(ChannelHeader)/2;
106 fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
107 }
108
109 fBuffer.push_back(htons(FAD::kDelimiterEnd));
110
111 const vector<uint16_t> h = fHeader.HtoN();
112
113 fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
114
115 if (fCommandSocket)
116 AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
117 else
118 {
119 if (fSockets.size()==0)
120 return;
121
122 fSocket++;
123 fSocket %= fSockets.size();
124
125 AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
126 }
127 }
128
129 void TriggerSendData(const boost::system::error_code &ec)
130 {
131 if (!is_open())
132 {
133 // For example: Here we could schedule a new accept if we
134 // would not want to allow two connections at the same time.
135 return;
136 }
137
138 if (ec==ba::error::basic_errors::operation_aborted)
139 return;
140
141 // Check whether the deadline has passed. We compare the deadline
142 // against the current time since a new asynchronous operation
143 // may have moved the deadline before this actor had a chance
144 // to run.
145 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
146 return;
147
148 // The deadline has passed.
149 SendData();
150
151 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
152 }
153
154 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
155 {
156 // Do not schedule a new read if the connection failed.
157 if (bytes_received==0)
158 {
159 // Close the connection
160 close();
161 return;
162 }
163
164 // No command received yet
165 if (fCommand.size()==0)
166 {
167 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
168 fBufCommand.begin(), ntohs);
169
170 switch (fBufCommand[0])
171 {
172 case kCmdDrsEnable:
173 case kCmdDrsEnable+0x100:
174 fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
175 cout << "-> DrsEnable" << endl;
176 break;
177
178 case kCmdDwrite:
179 case kCmdDwrite+0x100:
180 fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
181 cout << "-> Dwrite" << endl;
182 break;
183
184 case kCmdTriggerLine:
185 case kCmdTriggerLine+0x100:
186 cout << "-> Trigger line" << endl;
187 fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
188 break;
189
190 case kCmdSclk:
191 case kCmdSclk+0x100:
192 cout << "-> Sclk" << endl;
193 fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
194 break;
195
196 case kCmdSrclk:
197 case kCmdSrclk+0x100:
198 cout << "-> Drclk" << endl;
199 break;
200
201 case kCmdRun:
202 case kCmdRun+0x100:
203 fStartTime = Time(Time::utc).UnixTime();
204 cout << "-> Run" << endl;
205 break;
206
207 case kCmdSocket:
208 case kCmdSocket+0x100:
209 cout << "-> Socket" << endl;
210 fCommandSocket = fBufCommand[0]==kCmdSocket;
211 break;
212
213 case kCmdContTriggerOn:
214 case kCmdContTriggerOff:
215 if (fBufCommand[0]==kCmdContTriggerOn)
216 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
217 else
218 fTriggerSendData.cancel();
219 cout << "-> ContTrig" << endl;
220 break;
221
222 case kCmdResetTriggerId:
223 cout << "-> Reset" << endl;
224 fHeader.fEventCounter = 0;
225 break;
226
227 case kCmdSingleTrigger:
228 cout << "-> Trigger" << endl;
229 SendData();
230 break;
231
232 case kCmdWriteRunNumberMSW:
233 fRunNumber &= 0xffff;
234 fRunNumber |= fBufCommand[0]<<16;
235 cout << "-> RunNumber MSW" << endl;
236 break;
237
238 case kCmdWriteRunNumberLSW:
239 fRunNumber &= 0xffff0000;
240 fRunNumber |= fBufCommand[0];
241 cout << "-> RunNumber LSW" << endl;
242 break;
243
244 default:
245 if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
246 {
247 fCommand.resize(2);
248 fCommand[0] = kCmdWriteRoi;
249 fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
250 break;
251 }
252 if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
253 {
254 fCommand.resize(2);
255 fCommand[0] = kCmdWriteDac;
256 fCommand[1] = fBufCommand[0]-kCmdWriteDac;
257 break;
258 }
259
260 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
261 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
262 return;
263 }
264
265 fBufCommand.resize(1);
266 AsyncRead(ba::buffer(fBufCommand));
267 return;
268 }
269
270 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
271 fBufCommand.begin(), ntohs);
272
273 switch (fCommand[0])
274 {
275 case kCmdWriteRoi:
276 cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
277 fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
278 break;
279
280 case kCmdWriteDac:
281 cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
282 fHeader.fDac[fCommand[1]] = fBufCommand[0];
283 break;
284 }
285
286 fCommand.resize(0);
287
288 fBufCommand.resize(1);
289 AsyncRead(ba::buffer(fBufCommand));
290 }
291
292public:
293 typedef boost::shared_ptr<tcp_connection> shared_ptr;
294
295 static shared_ptr create(ba::io_service& io_service, int boardid)
296 {
297 return shared_ptr(new tcp_connection(io_service, boardid));
298 }
299
300 void start()
301 {
302 // Ownership of buffer must be valid until Handler is called.
303
304 fTriggerEnabled=false;
305 fCommandSocket=true;
306
307 fHeader.fStartDelimiter = FAD::kDelimiterStart;
308 fHeader.fVersion = 0x104;
309 fHeader.fBoardId = (fBoardId%10) | ((fBoardId/10)<<8);
310 fHeader.fStatus = 0xf<<12 |
311 FAD::EventHeader::kDenable |
312 FAD::EventHeader::kDwrite |
313 FAD::EventHeader::kDcmLocked |
314 FAD::EventHeader::kDcmReady |
315 FAD::EventHeader::kSpiSclk;
316
317 fStartTime = Time(Time::utc).UnixTime();
318
319 for (int i=0; i<kNumChannels; i++)
320 {
321 fChHeader[i].fId = (i%9) | ((i/9)<<4);
322 fChHeader[i].fRegionOfInterest = 0;
323 }
324
325 // Emit something to be written to the socket
326 fBufCommand.resize(1);
327 AsyncRead(ba::buffer(fBufCommand));
328
329// AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
330
331// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
332// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
333
334 }
335
336 vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
337
338 ~tcp_connection()
339 {
340 fSockets.clear();
341 }
342
343 void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
344 {
345 cout << "Added one socket " << socket->remote_endpoint().address().to_v4().to_string();
346 cout << ":"<< port << endl;
347 fSockets.push_back(socket);
348 }
349};
350
351
352class tcp_server
353{
354 tcp::acceptor acc0;
355 tcp::acceptor acc1;
356 tcp::acceptor acc2;
357 tcp::acceptor acc3;
358 tcp::acceptor acc4;
359 tcp::acceptor acc5;
360 tcp::acceptor acc6;
361 tcp::acceptor acc7;
362
363 int fBoardId;
364
365public:
366 tcp_server(ba::io_service& ioservice, int port, int board) :
367 acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
368 acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
369 acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
370 acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
371 acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
372 acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
373 acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
374 acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
375 fBoardId(board)
376 {
377 // We could start listening for more than one connection
378 // here, but since there is only one handler executed each time
379 // it would not make sense. Before one handle_accept is not
380 // finished no new handle_accept will be called.
381 // Workround: Start a new thread in handle_accept
382 start_accept();
383 }
384
385private:
386 void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
387 {
388 boost::shared_ptr<ba::ip::tcp::socket> connection =
389 boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
390 acc.async_accept(*connection,
391 boost::bind(&tcp_connection::handle_accept,
392 dest, connection,
393 acc.local_endpoint().port(),
394 ba::placeholders::error));
395 }
396
397 void start_accept()
398 {
399 cout << "Start accept " << acc0.local_endpoint().port() << "..." << flush;
400 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service(), fBoardId);
401
402 // This will accept a connection without blocking
403 acc0.async_accept(*new_connection,
404 boost::bind(&tcp_server::handle_accept,
405 this,
406 new_connection,
407 ba::placeholders::error));
408
409 start_accept(new_connection, acc1);
410 start_accept(new_connection, acc2);
411 start_accept(new_connection, acc3);
412 start_accept(new_connection, acc4);
413 start_accept(new_connection, acc5);
414 start_accept(new_connection, acc6);
415 start_accept(new_connection, acc7);
416
417 cout << "start-done." << endl;
418 }
419
420 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
421 {
422 // The connection has been accepted and is now ready to use
423
424 // not installing a new handler will stop run()
425 cout << "Handle accept..." << flush;
426 if (!error)
427 {
428 new_connection->start();
429
430 // The is now an open connection/server (tcp_connection)
431 // we immediatly schedule another connection
432 // This allowed two client-connection at the same time
433 start_accept();
434 }
435 cout << "handle-done." << endl;
436 }
437};
438
439int main(int argc, const char **argv)
440{
441 //try
442 {
443 ba::io_service io_service;
444
445 int port = argc>=2 ? lexical_cast<int>(argv[1]) : 5000;
446 int n = argc==3 ? lexical_cast<int>(argv[2]) : 1;
447
448 vector<shared_ptr<tcp_server>> servers;
449
450 for (int i=0; i<n; i++)
451 {
452 shared_ptr<tcp_server> server(new tcp_server(io_service, port, i));
453 servers.push_back(server);
454
455 port += 8;
456 }
457
458 // ba::add_service(io_service, &server);
459 // server.add_service(...);
460 //cout << "Run..." << flush;
461
462 // Calling run() from a single thread ensures no concurrent access
463 // of the handler which are called!!!
464 io_service.run();
465
466 //cout << "end." << endl;
467 }
468 /*catch (std::exception& e)
469 {
470 std::cerr << e.what() << std::endl;
471 }*/
472
473 return 0;
474}
475/* ====================== Buffers ===========================
476
477char d1[128]; ba::buffer(d1));
478std::vector<char> d2(128); ba::buffer(d2);
479boost::array<char, 128> d3; by::buffer(d3);
480
481// --------------------------------
482char d1[128];
483std::vector<char> d2(128);
484boost::array<char, 128> d3;
485
486boost::array<mutable_buffer, 3> bufs1 = {
487 ba::buffer(d1),
488 ba::buffer(d2),
489 ba::buffer(d3) };
490sock.read(bufs1);
491
492std::vector<const_buffer> bufs2;
493bufs2.push_back(boost::asio::buffer(d1));
494bufs2.push_back(boost::asio::buffer(d2));
495bufs2.push_back(boost::asio::buffer(d3));
496sock.write(bufs2);
497
498
499// ======================= Read functions =========================
500
501ba::async_read_until --> delimiter
502
503streambuf buf; // Ensure validity until handler!
504by::async_read(s, buf, ....);
505
506ba::async_read(s, ba:buffer(data, size), handler);
507 // Single buffer
508 boost::asio::async_read(s,
509 ba::buffer(data, size),
510 compl-func --> ba::transfer_at_least(32),
511 handler);
512
513 // Multiple buffers
514boost::asio::async_read(s, buffers,
515 compl-func --> boost::asio::transfer_all(),
516 handler);
517 */
518
519// ================= Others ===============================
520
521 /*
522 strand Provides serialised handler execution.
523 work Class to inform the io_service when it has work to do.
524
525
526io_service::
527dispatch Request the io_service to invoke the given handler.
528poll Run the io_service's event processing loop to execute ready
529 handlers.
530poll_one Run the io_service's event processing loop to execute one ready
531 handler.
532post Request the io_service to invoke the given handler and return
533 immediately.
534reset Reset the io_service in preparation for a subsequent run()
535 invocation.
536run Run the io_service's event processing loop.
537run_one Run the io_service's event processing loop to execute at most
538 one handler.
539stop Stop the io_service's event processing loop.
540wrap Create a new handler that automatically dispatches the wrapped
541 handler on the io_service.
542
543strand:: The io_service::strand class provides the ability to
544 post and dispatch handlers with the guarantee that none
545 of those handlers will execute concurrently.
546
547dispatch Request the strand to invoke the given handler.
548get_io_service Get the io_service associated with the strand.
549post Request the strand to invoke the given handler and return
550 immediately.
551wrap Create a new handler that automatically dispatches the
552 wrapped handler on the strand.
553
554work:: The work class is used to inform the io_service when
555 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
556get_io_service Get the io_service associated with the work.
557work Constructor notifies the io_service that work is starting.
558
559*/
560
561
Note: See TracBrowser for help on using the repository browser.