source: trunk/FACT++/src/fad.cc@ 10851

Last change on this file since 10851 was 10851, checked in by tbretz, 10 years ago
Added support for switching between command socket and data sockets.
File size: 16.9 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba = boost::asio;
20namespace bs = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30private:
31
32 double fStartTime;
33 uint32_t fRunNumber;
34
35 void AsyncRead(ba::mutable_buffers_1 buffers)
36 {
37 ba::async_read(*this, buffers,
38 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
39 dummy::error, dummy::bytes_transferred));
40 }
41
42 void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
43 {
44 ba::async_write(*socket, buffers,
45 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
46 dummy::error, dummy::bytes_transferred));
47 }
48 void AsyncWait(ba::deadline_timer &timer, int seconds,
49 void (tcp_connection::*handler)(const bs::error_code&))// const
50 {
51 timer.expires_from_now(boost::posix_time::seconds(seconds));
52 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
53 }
54
55 // The constructor is prvate to force the obtained pointer to be shared
56 tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
57 fTriggerSendData(ioservice)
58 {
59 }
60
61 // Callback when writing was successfull or failed
62 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
63 {
64 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
65 }
66
67 vector<uint16_t> fBufCommand;
68 vector<uint16_t> fBuffer;
69
70 vector<uint16_t> fCommand;
71
72 FAD::EventHeader fHeader;
73 FAD::ChannelHeader fChHeader[kNumChannels];
74
75 ba::deadline_timer fTriggerSendData;
76
77 bool fTriggerEnabled;
78 bool fCommandSocket;
79
80 int fSocket;
81
82 void SendData()
83 {
84 if (!fTriggerEnabled)
85 return;
86
87 fHeader.fPackageLength = sizeof(EventHeader)/2+1;
88 fHeader.fEventCounter++;
89 fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
90
91
92 fBuffer.resize(0);
93
94 for (int i=0; i<kNumChannels; i++)
95 {
96 fChHeader[i].fStartCell = i*10;
97
98 const vector<uint16_t> buf = fChHeader[i].HtoN();
99
100 fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
101 fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42);
102
103 fHeader.fPackageLength += sizeof(ChannelHeader)/2;
104 fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
105 }
106
107 fBuffer.push_back(htons(FAD::kDelimiterEnd));
108
109 const vector<uint16_t> h = fHeader.HtoN();
110
111 fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
112
113 if (fCommandSocket)
114 AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
115 else
116 {
117 fSocket++;
118 fSocket %= fSockets.size();
119
120 AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
121 }
122 }
123
124 void TriggerSendData(const boost::system::error_code &ec)
125 {
126 if (!is_open())
127 {
128 // For example: Here we could schedule a new accept if we
129 // would not want to allow two connections at the same time.
130 return;
131 }
132
133 if (ec==ba::error::basic_errors::operation_aborted)
134 return;
135
136 // Check whether the deadline has passed. We compare the deadline
137 // against the current time since a new asynchronous operation
138 // may have moved the deadline before this actor had a chance
139 // to run.
140 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
141 return;
142
143 // The deadline has passed.
144 SendData();
145
146 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
147 }
148
149 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
150 {
151 // Do not schedule a new read if the connection failed.
152 if (bytes_received==0)
153 {
154 // Close the connection
155 close();
156 return;
157 }
158
159 // No command received yet
160 if (fCommand.size()==0)
161 {
162 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
163 fBufCommand.begin(), ntohs);
164
165 switch (fBufCommand[0])
166 {
167 case kCmdDrsEnable:
168 case kCmdDrsEnable+0x100:
169 fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
170 cout << "-> DrsEnable" << endl;
171 break;
172
173 case kCmdDwrite:
174 case kCmdDwrite+0x100:
175 fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
176 cout << "-> Dwrite" << endl;
177 break;
178
179 case kCmdTriggerLine:
180 case kCmdTriggerLine+0x100:
181 cout << "-> Trigger line" << endl;
182 fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
183 break;
184
185 case kCmdSclk:
186 case kCmdSclk+0x100:
187 cout << "-> Sclk" << endl;
188 fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
189 break;
190
191 case kCmdSrclk:
192 case kCmdSrclk+0x100:
193 cout << "-> Drclk" << endl;
194 break;
195
196 case kCmdRun:
197 case kCmdRun+0x100:
198 cout << "-> Run" << endl;
199 break;
200
201 case kCmdSocket:
202 case kCmdSocket+0x100:
203 cout << "-> Socket" << endl;
204 fCommandSocket = fBufCommand[0]==kCmdSocket;
205 break;
206
207 case kCmdContTriggerOn:
208 case kCmdContTriggerOff:
209 if (fBufCommand[0]==kCmdContTriggerOn)
210 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
211 else
212 fTriggerSendData.cancel();
213 cout << "-> ContTrig" << endl;
214 break;
215
216 case kCmdResetTriggerId:
217 cout << "-> Reset" << endl;
218 fHeader.fEventCounter = 0;
219 break;
220
221 case kCmdSingleTrigger:
222 cout << "-> Trigger" << endl;
223 SendData();
224 break;
225
226 default:
227 if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
228 {
229 fCommand.resize(2);
230 fCommand[0] = kCmdWriteRoi;
231 fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
232 break;
233 }
234 if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
235 {
236 fCommand.resize(2);
237 fCommand[0] = kCmdWriteDac;
238 fCommand[1] = fBufCommand[0]-kCmdWriteDac;
239 break;
240 }
241
242 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
243 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
244 return;
245 }
246
247 fBufCommand.resize(1);
248 AsyncRead(ba::buffer(fBufCommand));
249 return;
250 }
251
252 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
253 fBufCommand.begin(), ntohs);
254
255 switch (fCommand[0])
256 {
257 case kCmdWriteRoi:
258 cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
259 fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
260 break;
261
262 case kCmdWriteDac:
263 cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
264 fHeader.fDac[fCommand[1]] = fBufCommand[0];
265 break;
266 }
267
268 fCommand.resize(0);
269
270 fBufCommand.resize(1);
271 AsyncRead(ba::buffer(fBufCommand));
272 }
273
274public:
275 typedef boost::shared_ptr<tcp_connection> shared_ptr;
276
277 static shared_ptr create(ba::io_service& io_service)
278 {
279 return shared_ptr(new tcp_connection(io_service));
280 }
281
282 void start()
283 {
284 // Ownership of buffer must be valid until Handler is called.
285
286 fTriggerEnabled=false;
287 fCommandSocket=true;
288
289 fHeader.fStartDelimiter = FAD::kDelimiterStart;
290 fHeader.fVersion = 0x104;
291 fHeader.fStatus = 0xf<<12 |
292 FAD::EventHeader::kDenable |
293 FAD::EventHeader::kDwrite |
294 FAD::EventHeader::kDcmLocked |
295 FAD::EventHeader::kDcmReady |
296 FAD::EventHeader::kSpiSclk;
297
298 fStartTime = Time(Time::utc).UnixTime();
299
300 for (int i=0; i<kNumChannels; i++)
301 {
302 fChHeader[i].fId = (i%9) | ((i/9)<<4);
303 fChHeader[i].fRegionOfInterest = 0;
304 }
305
306 // Emit something to be written to the socket
307 fBufCommand.resize(1);
308 AsyncRead(ba::buffer(fBufCommand));
309
310// AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
311
312// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
313// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
314
315 }
316
317 vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
318
319 ~tcp_connection()
320 {
321 fSockets.clear();
322 }
323
324 void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
325 {
326 cout << "Added one socket " << socket->remote_endpoint().address().to_v4().to_string();
327 cout << ":"<< port << endl;
328 fSockets.push_back(socket);
329 }
330};
331
332
333class tcp_server
334{
335 tcp::acceptor acc0;
336 tcp::acceptor acc1;
337 tcp::acceptor acc2;
338 tcp::acceptor acc3;
339 tcp::acceptor acc4;
340 tcp::acceptor acc5;
341 tcp::acceptor acc6;
342 tcp::acceptor acc7;
343
344 int fPort;
345
346public:
347 tcp_server(ba::io_service& ioservice, int port) :
348 acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
349 acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
350 acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
351 acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
352 acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
353 acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
354 acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
355 acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
356 fPort(port)
357 {
358 // We could start listening for more than one connection
359 // here, but since there is only one handler executed each time
360 // it would not make sense. Before one handle_accept is not
361 // finished no new handle_accept will be called.
362 // Workround: Start a new thread in handle_accept
363 start_accept();
364 }
365
366private:
367 void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
368 {
369 boost::shared_ptr<ba::ip::tcp::socket> connection =
370 boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
371 acc.async_accept(*connection,
372 boost::bind(&tcp_connection::handle_accept,
373 dest, connection,
374 acc.local_endpoint().port(),
375 ba::placeholders::error));
376 }
377
378 void start_accept()
379 {
380 cout << "Start accept..." << flush;
381 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service());
382
383 // This will accept a connection without blocking
384 acc0.async_accept(*new_connection,
385 boost::bind(&tcp_server::handle_accept,
386 this,
387 new_connection,
388 ba::placeholders::error));
389
390 start_accept(new_connection, acc1);
391 start_accept(new_connection, acc2);
392 start_accept(new_connection, acc3);
393 start_accept(new_connection, acc4);
394 start_accept(new_connection, acc5);
395 start_accept(new_connection, acc6);
396 start_accept(new_connection, acc7);
397
398 cout << "start-done." << endl;
399 }
400
401 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
402 {
403 // The connection has been accepted and is now ready to use
404
405 // not installing a new handler will stop run()
406 cout << "Handle accept..." << flush;
407 if (!error)
408 {
409 new_connection->start();
410
411 // The is now an open connection/server (tcp_connection)
412 // we immediatly schedule another connection
413 // This allowed two client-connection at the same time
414 start_accept();
415 }
416 cout << "handle-done." << endl;
417 }
418};
419
420int main(int argc, const char **argv)
421{
422 try
423 {
424 ba::io_service io_service;
425
426 int port = argc==2 ? lexical_cast<int>(argv[1]) : 5000;
427
428 tcp_server server(io_service, port);
429
430 // ba::add_service(io_service, &server);
431 // server.add_service(...);
432 //cout << "Run..." << flush;
433
434 // Calling run() from a single thread ensures no concurrent access
435 // of the handler which are called!!!
436 io_service.run();
437
438 //cout << "end." << endl;
439 }
440 catch (std::exception& e)
441 {
442 std::cerr << e.what() << std::endl;
443 }
444
445 return 0;
446}
/* ====================== Buffers ===========================

char d1[128]; ba::buffer(d1);
std::vector<char> d2(128); ba::buffer(d2);
boost::array<char, 128> d3; ba::buffer(d3);

// --------------------------------
char d1[128];
std::vector<char> d2(128);
boost::array<char, 128> d3;

boost::array<mutable_buffer, 3> bufs1 = {
    ba::buffer(d1),
    ba::buffer(d2),
    ba::buffer(d3) };
sock.read(bufs1);

std::vector<const_buffer> bufs2;
bufs2.push_back(boost::asio::buffer(d1));
bufs2.push_back(boost::asio::buffer(d2));
bufs2.push_back(boost::asio::buffer(d3));
sock.write(bufs2);


// ======================= Read functions =========================

ba::async_read_until --> delimiter

streambuf buf; // Ensure validity until handler!
ba::async_read(s, buf, ....);

ba::async_read(s, ba::buffer(data, size), handler);
 // Single buffer
 boost::asio::async_read(s,
     ba::buffer(data, size),
     compl-func --> ba::transfer_at_least(32),
     handler);

 // Multiple buffers
boost::asio::async_read(s, buffers,
     compl-func --> boost::asio::transfer_all(),
     handler);
 */
490
491// ================= Others ===============================
492
493 /*
494 strand Provides serialised handler execution.
495 work Class to inform the io_service when it has work to do.
496
497
498io_service::
499dispatch Request the io_service to invoke the given handler.
500poll Run the io_service's event processing loop to execute ready
501 handlers.
502poll_one Run the io_service's event processing loop to execute one ready
503 handler.
504post Request the io_service to invoke the given handler and return
505 immediately.
506reset Reset the io_service in preparation for a subsequent run()
507 invocation.
508run Run the io_service's event processing loop.
509run_one Run the io_service's event processing loop to execute at most
510 one handler.
511stop Stop the io_service's event processing loop.
512wrap Create a new handler that automatically dispatches the wrapped
513 handler on the io_service.
514
515strand:: The io_service::strand class provides the ability to
516 post and dispatch handlers with the guarantee that none
517 of those handlers will execute concurrently.
518
519dispatch Request the strand to invoke the given handler.
520get_io_service Get the io_service associated with the strand.
521post Request the strand to invoke the given handler and return
522 immediately.
523wrap Create a new handler that automatically dispatches the
524 wrapped handler on the strand.
525
526work:: The work class is used to inform the io_service when
527 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
528get_io_service Get the io_service associated with the work.
529work Constructor notifies the io_service that work is starting.
530
531*/
532
533
Note: See TracBrowser for help on using the repository browser.