source: trunk/FACT++/src/fad.cc@ 10853

Last change on this file since 10853 was 10853, checked in by tbretz, 14 years ago
Fixed a floating point exception if no connection is open.
File size: 17.0 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba = boost::asio;
20namespace bs = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30private:
31
32 double fStartTime;
33 uint32_t fRunNumber;
34
35 void AsyncRead(ba::mutable_buffers_1 buffers)
36 {
37 ba::async_read(*this, buffers,
38 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
39 dummy::error, dummy::bytes_transferred));
40 }
41
42 void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
43 {
44 ba::async_write(*socket, buffers,
45 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
46 dummy::error, dummy::bytes_transferred));
47 }
48 void AsyncWait(ba::deadline_timer &timer, int seconds,
49 void (tcp_connection::*handler)(const bs::error_code&))// const
50 {
51 timer.expires_from_now(boost::posix_time::seconds(seconds));
52 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
53 }
54
55 // The constructor is prvate to force the obtained pointer to be shared
56 tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
57 fTriggerSendData(ioservice)
58 {
59 }
60
61 // Callback when writing was successfull or failed
62 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
63 {
64 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
65 }
66
67 vector<uint16_t> fBufCommand;
68 vector<uint16_t> fBuffer;
69
70 vector<uint16_t> fCommand;
71
72 FAD::EventHeader fHeader;
73 FAD::ChannelHeader fChHeader[kNumChannels];
74
75 ba::deadline_timer fTriggerSendData;
76
77 bool fTriggerEnabled;
78 bool fCommandSocket;
79
80 int fSocket;
81
82 void SendData()
83 {
84 if (!fTriggerEnabled)
85 return;
86
87 fHeader.fPackageLength = sizeof(EventHeader)/2+1;
88 fHeader.fEventCounter++;
89 fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
90
91
92 fBuffer.resize(0);
93
94 for (int i=0; i<kNumChannels; i++)
95 {
96 fChHeader[i].fStartCell = i*10;
97
98 const vector<uint16_t> buf = fChHeader[i].HtoN();
99
100 fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
101 fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42);
102
103 fHeader.fPackageLength += sizeof(ChannelHeader)/2;
104 fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
105 }
106
107 fBuffer.push_back(htons(FAD::kDelimiterEnd));
108
109 const vector<uint16_t> h = fHeader.HtoN();
110
111 fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
112
113 if (fCommandSocket)
114 AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
115 else
116 {
117 if (fSockets.size()==0)
118 return;
119
120 fSocket++;
121 fSocket %= fSockets.size();
122
123 AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
124 }
125 }
126
127 void TriggerSendData(const boost::system::error_code &ec)
128 {
129 if (!is_open())
130 {
131 // For example: Here we could schedule a new accept if we
132 // would not want to allow two connections at the same time.
133 return;
134 }
135
136 if (ec==ba::error::basic_errors::operation_aborted)
137 return;
138
139 // Check whether the deadline has passed. We compare the deadline
140 // against the current time since a new asynchronous operation
141 // may have moved the deadline before this actor had a chance
142 // to run.
143 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
144 return;
145
146 // The deadline has passed.
147 SendData();
148
149 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
150 }
151
152 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
153 {
154 // Do not schedule a new read if the connection failed.
155 if (bytes_received==0)
156 {
157 // Close the connection
158 close();
159 return;
160 }
161
162 // No command received yet
163 if (fCommand.size()==0)
164 {
165 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
166 fBufCommand.begin(), ntohs);
167
168 switch (fBufCommand[0])
169 {
170 case kCmdDrsEnable:
171 case kCmdDrsEnable+0x100:
172 fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
173 cout << "-> DrsEnable" << endl;
174 break;
175
176 case kCmdDwrite:
177 case kCmdDwrite+0x100:
178 fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
179 cout << "-> Dwrite" << endl;
180 break;
181
182 case kCmdTriggerLine:
183 case kCmdTriggerLine+0x100:
184 cout << "-> Trigger line" << endl;
185 fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
186 break;
187
188 case kCmdSclk:
189 case kCmdSclk+0x100:
190 cout << "-> Sclk" << endl;
191 fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
192 break;
193
194 case kCmdSrclk:
195 case kCmdSrclk+0x100:
196 cout << "-> Drclk" << endl;
197 break;
198
199 case kCmdRun:
200 case kCmdRun+0x100:
201 cout << "-> Run" << endl;
202 break;
203
204 case kCmdSocket:
205 case kCmdSocket+0x100:
206 cout << "-> Socket" << endl;
207 fCommandSocket = fBufCommand[0]==kCmdSocket;
208 break;
209
210 case kCmdContTriggerOn:
211 case kCmdContTriggerOff:
212 if (fBufCommand[0]==kCmdContTriggerOn)
213 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
214 else
215 fTriggerSendData.cancel();
216 cout << "-> ContTrig" << endl;
217 break;
218
219 case kCmdResetTriggerId:
220 cout << "-> Reset" << endl;
221 fHeader.fEventCounter = 0;
222 break;
223
224 case kCmdSingleTrigger:
225 cout << "-> Trigger" << endl;
226 SendData();
227 break;
228
229 default:
230 if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
231 {
232 fCommand.resize(2);
233 fCommand[0] = kCmdWriteRoi;
234 fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
235 break;
236 }
237 if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
238 {
239 fCommand.resize(2);
240 fCommand[0] = kCmdWriteDac;
241 fCommand[1] = fBufCommand[0]-kCmdWriteDac;
242 break;
243 }
244
245 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
246 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
247 return;
248 }
249
250 fBufCommand.resize(1);
251 AsyncRead(ba::buffer(fBufCommand));
252 return;
253 }
254
255 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
256 fBufCommand.begin(), ntohs);
257
258 switch (fCommand[0])
259 {
260 case kCmdWriteRoi:
261 cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
262 fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
263 break;
264
265 case kCmdWriteDac:
266 cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
267 fHeader.fDac[fCommand[1]] = fBufCommand[0];
268 break;
269 }
270
271 fCommand.resize(0);
272
273 fBufCommand.resize(1);
274 AsyncRead(ba::buffer(fBufCommand));
275 }
276
277public:
278 typedef boost::shared_ptr<tcp_connection> shared_ptr;
279
280 static shared_ptr create(ba::io_service& io_service)
281 {
282 return shared_ptr(new tcp_connection(io_service));
283 }
284
285 void start()
286 {
287 // Ownership of buffer must be valid until Handler is called.
288
289 fTriggerEnabled=false;
290 fCommandSocket=true;
291
292 fHeader.fStartDelimiter = FAD::kDelimiterStart;
293 fHeader.fVersion = 0x104;
294 fHeader.fStatus = 0xf<<12 |
295 FAD::EventHeader::kDenable |
296 FAD::EventHeader::kDwrite |
297 FAD::EventHeader::kDcmLocked |
298 FAD::EventHeader::kDcmReady |
299 FAD::EventHeader::kSpiSclk;
300
301 fStartTime = Time(Time::utc).UnixTime();
302
303 for (int i=0; i<kNumChannels; i++)
304 {
305 fChHeader[i].fId = (i%9) | ((i/9)<<4);
306 fChHeader[i].fRegionOfInterest = 0;
307 }
308
309 // Emit something to be written to the socket
310 fBufCommand.resize(1);
311 AsyncRead(ba::buffer(fBufCommand));
312
313// AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
314
315// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
316// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
317
318 }
319
320 vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
321
322 ~tcp_connection()
323 {
324 fSockets.clear();
325 }
326
327 void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
328 {
329 cout << "Added one socket " << socket->remote_endpoint().address().to_v4().to_string();
330 cout << ":"<< port << endl;
331 fSockets.push_back(socket);
332 }
333};
334
335
336class tcp_server
337{
338 tcp::acceptor acc0;
339 tcp::acceptor acc1;
340 tcp::acceptor acc2;
341 tcp::acceptor acc3;
342 tcp::acceptor acc4;
343 tcp::acceptor acc5;
344 tcp::acceptor acc6;
345 tcp::acceptor acc7;
346
347 int fPort;
348
349public:
350 tcp_server(ba::io_service& ioservice, int port) :
351 acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
352 acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
353 acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
354 acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
355 acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
356 acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
357 acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
358 acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
359 fPort(port)
360 {
361 // We could start listening for more than one connection
362 // here, but since there is only one handler executed each time
363 // it would not make sense. Before one handle_accept is not
364 // finished no new handle_accept will be called.
365 // Workround: Start a new thread in handle_accept
366 start_accept();
367 }
368
369private:
370 void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
371 {
372 boost::shared_ptr<ba::ip::tcp::socket> connection =
373 boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
374 acc.async_accept(*connection,
375 boost::bind(&tcp_connection::handle_accept,
376 dest, connection,
377 acc.local_endpoint().port(),
378 ba::placeholders::error));
379 }
380
381 void start_accept()
382 {
383 cout << "Start accept..." << flush;
384 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service());
385
386 // This will accept a connection without blocking
387 acc0.async_accept(*new_connection,
388 boost::bind(&tcp_server::handle_accept,
389 this,
390 new_connection,
391 ba::placeholders::error));
392
393 start_accept(new_connection, acc1);
394 start_accept(new_connection, acc2);
395 start_accept(new_connection, acc3);
396 start_accept(new_connection, acc4);
397 start_accept(new_connection, acc5);
398 start_accept(new_connection, acc6);
399 start_accept(new_connection, acc7);
400
401 cout << "start-done." << endl;
402 }
403
404 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
405 {
406 // The connection has been accepted and is now ready to use
407
408 // not installing a new handler will stop run()
409 cout << "Handle accept..." << flush;
410 if (!error)
411 {
412 new_connection->start();
413
414 // The is now an open connection/server (tcp_connection)
415 // we immediatly schedule another connection
416 // This allowed two client-connection at the same time
417 start_accept();
418 }
419 cout << "handle-done." << endl;
420 }
421};
422
423int main(int argc, const char **argv)
424{
425 try
426 {
427 ba::io_service io_service;
428
429 int port = argc==2 ? lexical_cast<int>(argv[1]) : 5000;
430
431 tcp_server server(io_service, port);
432
433 // ba::add_service(io_service, &server);
434 // server.add_service(...);
435 //cout << "Run..." << flush;
436
437 // Calling run() from a single thread ensures no concurrent access
438 // of the handler which are called!!!
439 io_service.run();
440
441 //cout << "end." << endl;
442 }
443 catch (std::exception& e)
444 {
445 std::cerr << e.what() << std::endl;
446 }
447
448 return 0;
449}
450/* ====================== Buffers ===========================
451
452char d1[128]; ba::buffer(d1));
453std::vector<char> d2(128); ba::buffer(d2);
454boost::array<char, 128> d3; by::buffer(d3);
455
456// --------------------------------
457char d1[128];
458std::vector<char> d2(128);
459boost::array<char, 128> d3;
460
461boost::array<mutable_buffer, 3> bufs1 = {
462 ba::buffer(d1),
463 ba::buffer(d2),
464 ba::buffer(d3) };
465sock.read(bufs1);
466
467std::vector<const_buffer> bufs2;
468bufs2.push_back(boost::asio::buffer(d1));
469bufs2.push_back(boost::asio::buffer(d2));
470bufs2.push_back(boost::asio::buffer(d3));
471sock.write(bufs2);
472
473
474// ======================= Read functions =========================
475
476ba::async_read_until --> delimiter
477
478streambuf buf; // Ensure validity until handler!
479by::async_read(s, buf, ....);
480
481ba::async_read(s, ba:buffer(data, size), handler);
482 // Single buffer
483 boost::asio::async_read(s,
484 ba::buffer(data, size),
485 compl-func --> ba::transfer_at_least(32),
486 handler);
487
488 // Multiple buffers
489boost::asio::async_read(s, buffers,
490 compl-func --> boost::asio::transfer_all(),
491 handler);
492 */
493
494// ================= Others ===============================
495
496 /*
497 strand Provides serialised handler execution.
498 work Class to inform the io_service when it has work to do.
499
500
501io_service::
502dispatch Request the io_service to invoke the given handler.
503poll Run the io_service's event processing loop to execute ready
504 handlers.
505poll_one Run the io_service's event processing loop to execute one ready
506 handler.
507post Request the io_service to invoke the given handler and return
508 immediately.
509reset Reset the io_service in preparation for a subsequent run()
510 invocation.
511run Run the io_service's event processing loop.
512run_one Run the io_service's event processing loop to execute at most
513 one handler.
514stop Stop the io_service's event processing loop.
515wrap Create a new handler that automatically dispatches the wrapped
516 handler on the io_service.
517
518strand:: The io_service::strand class provides the ability to
519 post and dispatch handlers with the guarantee that none
520 of those handlers will execute concurrently.
521
522dispatch Request the strand to invoke the given handler.
523get_io_service Get the io_service associated with the strand.
524post Request the strand to invoke the given handler and return
525 immediately.
526wrap Create a new handler that automatically dispatches the
527 wrapped handler on the strand.
528
529work:: The work class is used to inform the io_service when
530 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
531get_io_service Get the io_service associated with the work.
532work Constructor notifies the io_service that work is starting.
533
534*/
535
536
Note: See TracBrowser for help on using the repository browser.