source: trunk/FACT++/src/fad.cc@ 10868

Last change on this file since 10868 was 10864, checked in by tbretz, 14 years ago
Don't catch exceptions in main; propagate continuous number as board id.
File size: 17.5 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba = boost::asio;
20namespace bs = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30private:
31 const int fBoardId;
32
33 double fStartTime;
34 uint32_t fRunNumber;
35
36 void AsyncRead(ba::mutable_buffers_1 buffers)
37 {
38 ba::async_read(*this, buffers,
39 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
40 dummy::error, dummy::bytes_transferred));
41 }
42
43 void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
44 {
45 ba::async_write(*socket, buffers,
46 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
47 dummy::error, dummy::bytes_transferred));
48 }
49 void AsyncWait(ba::deadline_timer &timer, int seconds,
50 void (tcp_connection::*handler)(const bs::error_code&))// const
51 {
52 timer.expires_from_now(boost::posix_time::seconds(seconds));
53 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
54 }
55
56 // The constructor is prvate to force the obtained pointer to be shared
57 tcp_connection(ba::io_service& ioservice, int boardid) : ba::ip::tcp::socket(ioservice),
58 fBoardId(boardid), fTriggerSendData(ioservice)
59 {
60 }
61
62 // Callback when writing was successfull or failed
63 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
64 {
65 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
66 }
67
68 vector<uint16_t> fBufCommand;
69 vector<uint16_t> fBuffer;
70
71 vector<uint16_t> fCommand;
72
73 FAD::EventHeader fHeader;
74 FAD::ChannelHeader fChHeader[kNumChannels];
75
76 ba::deadline_timer fTriggerSendData;
77
78 bool fTriggerEnabled;
79 bool fCommandSocket;
80
81 int fSocket;
82
83 void SendData()
84 {
85 if (!fTriggerEnabled)
86 return;
87
88 fHeader.fPackageLength = sizeof(EventHeader)/2+1;
89 fHeader.fEventCounter++;
90 fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
91
92
93 fBuffer.resize(0);
94
95 for (int i=0; i<kNumChannels; i++)
96 {
97 fChHeader[i].fStartCell = i*10;
98
99 const vector<uint16_t> buf = fChHeader[i].HtoN();
100
101 fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
102 fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42);
103
104 fHeader.fPackageLength += sizeof(ChannelHeader)/2;
105 fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
106 }
107
108 fBuffer.push_back(htons(FAD::kDelimiterEnd));
109
110 const vector<uint16_t> h = fHeader.HtoN();
111
112 fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
113
114 if (fCommandSocket)
115 AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
116 else
117 {
118 if (fSockets.size()==0)
119 return;
120
121 fSocket++;
122 fSocket %= fSockets.size();
123
124 AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
125 }
126 }
127
128 void TriggerSendData(const boost::system::error_code &ec)
129 {
130 if (!is_open())
131 {
132 // For example: Here we could schedule a new accept if we
133 // would not want to allow two connections at the same time.
134 return;
135 }
136
137 if (ec==ba::error::basic_errors::operation_aborted)
138 return;
139
140 // Check whether the deadline has passed. We compare the deadline
141 // against the current time since a new asynchronous operation
142 // may have moved the deadline before this actor had a chance
143 // to run.
144 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
145 return;
146
147 // The deadline has passed.
148 SendData();
149
150 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
151 }
152
153 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
154 {
155 // Do not schedule a new read if the connection failed.
156 if (bytes_received==0)
157 {
158 // Close the connection
159 close();
160 return;
161 }
162
163 // No command received yet
164 if (fCommand.size()==0)
165 {
166 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
167 fBufCommand.begin(), ntohs);
168
169 switch (fBufCommand[0])
170 {
171 case kCmdDrsEnable:
172 case kCmdDrsEnable+0x100:
173 fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
174 cout << "-> DrsEnable" << endl;
175 break;
176
177 case kCmdDwrite:
178 case kCmdDwrite+0x100:
179 fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
180 cout << "-> Dwrite" << endl;
181 break;
182
183 case kCmdTriggerLine:
184 case kCmdTriggerLine+0x100:
185 cout << "-> Trigger line" << endl;
186 fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
187 break;
188
189 case kCmdSclk:
190 case kCmdSclk+0x100:
191 cout << "-> Sclk" << endl;
192 fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
193 break;
194
195 case kCmdSrclk:
196 case kCmdSrclk+0x100:
197 cout << "-> Drclk" << endl;
198 break;
199
200 case kCmdRun:
201 case kCmdRun+0x100:
202 fStartTime = Time(Time::utc).UnixTime();
203 cout << "-> Run" << endl;
204 break;
205
206 case kCmdSocket:
207 case kCmdSocket+0x100:
208 cout << "-> Socket" << endl;
209 fCommandSocket = fBufCommand[0]==kCmdSocket;
210 break;
211
212 case kCmdContTriggerOn:
213 case kCmdContTriggerOff:
214 if (fBufCommand[0]==kCmdContTriggerOn)
215 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
216 else
217 fTriggerSendData.cancel();
218 cout << "-> ContTrig" << endl;
219 break;
220
221 case kCmdResetTriggerId:
222 cout << "-> Reset" << endl;
223 fHeader.fEventCounter = 0;
224 break;
225
226 case kCmdSingleTrigger:
227 cout << "-> Trigger" << endl;
228 SendData();
229 break;
230
231 default:
232 if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
233 {
234 fCommand.resize(2);
235 fCommand[0] = kCmdWriteRoi;
236 fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
237 break;
238 }
239 if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
240 {
241 fCommand.resize(2);
242 fCommand[0] = kCmdWriteDac;
243 fCommand[1] = fBufCommand[0]-kCmdWriteDac;
244 break;
245 }
246
247 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
248 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
249 return;
250 }
251
252 fBufCommand.resize(1);
253 AsyncRead(ba::buffer(fBufCommand));
254 return;
255 }
256
257 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
258 fBufCommand.begin(), ntohs);
259
260 switch (fCommand[0])
261 {
262 case kCmdWriteRoi:
263 cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
264 fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
265 break;
266
267 case kCmdWriteDac:
268 cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
269 fHeader.fDac[fCommand[1]] = fBufCommand[0];
270 break;
271 }
272
273 fCommand.resize(0);
274
275 fBufCommand.resize(1);
276 AsyncRead(ba::buffer(fBufCommand));
277 }
278
279public:
280 typedef boost::shared_ptr<tcp_connection> shared_ptr;
281
282 static shared_ptr create(ba::io_service& io_service, int boardid)
283 {
284 return shared_ptr(new tcp_connection(io_service, boardid));
285 }
286
287 void start()
288 {
289 // Ownership of buffer must be valid until Handler is called.
290
291 fTriggerEnabled=false;
292 fCommandSocket=true;
293
294 fHeader.fStartDelimiter = FAD::kDelimiterStart;
295 fHeader.fVersion = 0x104;
296 fHeader.fBoardId = (fBoardId%10) | ((fBoardId/10)<<8);
297 fHeader.fStatus = 0xf<<12 |
298 FAD::EventHeader::kDenable |
299 FAD::EventHeader::kDwrite |
300 FAD::EventHeader::kDcmLocked |
301 FAD::EventHeader::kDcmReady |
302 FAD::EventHeader::kSpiSclk;
303
304 fStartTime = Time(Time::utc).UnixTime();
305
306 for (int i=0; i<kNumChannels; i++)
307 {
308 fChHeader[i].fId = (i%9) | ((i/9)<<4);
309 fChHeader[i].fRegionOfInterest = 0;
310 }
311
312 // Emit something to be written to the socket
313 fBufCommand.resize(1);
314 AsyncRead(ba::buffer(fBufCommand));
315
316// AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
317
318// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
319// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
320
321 }
322
323 vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
324
325 ~tcp_connection()
326 {
327 fSockets.clear();
328 }
329
330 void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
331 {
332 cout << "Added one socket " << socket->remote_endpoint().address().to_v4().to_string();
333 cout << ":"<< port << endl;
334 fSockets.push_back(socket);
335 }
336};
337
338
339class tcp_server
340{
341 tcp::acceptor acc0;
342 tcp::acceptor acc1;
343 tcp::acceptor acc2;
344 tcp::acceptor acc3;
345 tcp::acceptor acc4;
346 tcp::acceptor acc5;
347 tcp::acceptor acc6;
348 tcp::acceptor acc7;
349
350 int fBoardId;
351
352public:
353 tcp_server(ba::io_service& ioservice, int port, int board) :
354 acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
355 acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
356 acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
357 acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
358 acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
359 acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
360 acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
361 acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
362 fBoardId(board)
363 {
364 // We could start listening for more than one connection
365 // here, but since there is only one handler executed each time
366 // it would not make sense. Before one handle_accept is not
367 // finished no new handle_accept will be called.
368 // Workround: Start a new thread in handle_accept
369 start_accept();
370 }
371
372private:
373 void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
374 {
375 boost::shared_ptr<ba::ip::tcp::socket> connection =
376 boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
377 acc.async_accept(*connection,
378 boost::bind(&tcp_connection::handle_accept,
379 dest, connection,
380 acc.local_endpoint().port(),
381 ba::placeholders::error));
382 }
383
384 void start_accept()
385 {
386 cout << "Start accept " << acc0.local_endpoint().port() << "..." << flush;
387 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service(), fBoardId);
388
389 // This will accept a connection without blocking
390 acc0.async_accept(*new_connection,
391 boost::bind(&tcp_server::handle_accept,
392 this,
393 new_connection,
394 ba::placeholders::error));
395
396 start_accept(new_connection, acc1);
397 start_accept(new_connection, acc2);
398 start_accept(new_connection, acc3);
399 start_accept(new_connection, acc4);
400 start_accept(new_connection, acc5);
401 start_accept(new_connection, acc6);
402 start_accept(new_connection, acc7);
403
404 cout << "start-done." << endl;
405 }
406
407 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
408 {
409 // The connection has been accepted and is now ready to use
410
411 // not installing a new handler will stop run()
412 cout << "Handle accept..." << flush;
413 if (!error)
414 {
415 new_connection->start();
416
417 // The is now an open connection/server (tcp_connection)
418 // we immediatly schedule another connection
419 // This allowed two client-connection at the same time
420 start_accept();
421 }
422 cout << "handle-done." << endl;
423 }
424};
425
426int main(int argc, const char **argv)
427{
428 //try
429 {
430 ba::io_service io_service;
431
432 int port = argc>=2 ? lexical_cast<int>(argv[1]) : 5000;
433 int n = argc==3 ? lexical_cast<int>(argv[2]) : 1;
434
435 vector<shared_ptr<tcp_server>> servers;
436
437 for (int i=0; i<n; i++)
438 {
439 shared_ptr<tcp_server> server(new tcp_server(io_service, port, i));
440 servers.push_back(server);
441
442 port += 8;
443 }
444
445 // ba::add_service(io_service, &server);
446 // server.add_service(...);
447 //cout << "Run..." << flush;
448
449 // Calling run() from a single thread ensures no concurrent access
450 // of the handler which are called!!!
451 io_service.run();
452
453 //cout << "end." << endl;
454 }
455 /*catch (std::exception& e)
456 {
457 std::cerr << e.what() << std::endl;
458 }*/
459
460 return 0;
461}
462/* ====================== Buffers ===========================
463
464char d1[128]; ba::buffer(d1);
465std::vector<char> d2(128); ba::buffer(d2);
466boost::array<char, 128> d3; ba::buffer(d3);
467
468// --------------------------------
469char d1[128];
470std::vector<char> d2(128);
471boost::array<char, 128> d3;
472
473boost::array<mutable_buffer, 3> bufs1 = {
474 ba::buffer(d1),
475 ba::buffer(d2),
476 ba::buffer(d3) };
477sock.read(bufs1);
478
479std::vector<const_buffer> bufs2;
480bufs2.push_back(boost::asio::buffer(d1));
481bufs2.push_back(boost::asio::buffer(d2));
482bufs2.push_back(boost::asio::buffer(d3));
483sock.write(bufs2);
484
485
486// ======================= Read functions =========================
487
488ba::async_read_until --> delimiter
489
490streambuf buf; // Ensure validity until handler!
491ba::async_read(s, buf, ....);
492
493ba::async_read(s, ba::buffer(data, size), handler);
494 // Single buffer
495 boost::asio::async_read(s,
496 ba::buffer(data, size),
497 compl-func --> ba::transfer_at_least(32),
498 handler);
499
500 // Multiple buffers
501boost::asio::async_read(s, buffers,
502 compl-func --> boost::asio::transfer_all(),
503 handler);
504 */
505
506// ================= Others ===============================
507
508 /*
509 strand Provides serialised handler execution.
510 work Class to inform the io_service when it has work to do.
511
512
513io_service::
514dispatch Request the io_service to invoke the given handler.
515poll Run the io_service's event processing loop to execute ready
516 handlers.
517poll_one Run the io_service's event processing loop to execute one ready
518 handler.
519post Request the io_service to invoke the given handler and return
520 immediately.
521reset Reset the io_service in preparation for a subsequent run()
522 invocation.
523run Run the io_service's event processing loop.
524run_one Run the io_service's event processing loop to execute at most
525 one handler.
526stop Stop the io_service's event processing loop.
527wrap Create a new handler that automatically dispatches the wrapped
528 handler on the io_service.
529
530strand:: The io_service::strand class provides the ability to
531 post and dispatch handlers with the guarantee that none
532 of those handlers will execute concurrently.
533
534dispatch Request the strand to invoke the given handler.
535get_io_service Get the io_service associated with the strand.
536post Request the strand to invoke the given handler and return
537 immediately.
538wrap Create a new handler that automatically dispatches the
539 wrapped handler on the strand.
540
541work:: The work class is used to inform the io_service when
542 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
543get_io_service Get the io_service associated with the work.
544work Constructor notifies the io_service that work is starting.
545
546*/
547
548
Note: See TracBrowser for help on using the repository browser.