source: trunk/FACT++/src/fad.cc@ 11014

Last change on this file since 11014 was 10973, checked in by tbretz, 13 years ago
Implemented the new execute command for the write command.
File size: 18.5 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba = boost::asio;
20namespace bs = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30private:
31 const int fBoardId;
32
33 double fStartTime;
34
35 void AsyncRead(ba::mutable_buffers_1 buffers)
36 {
37 ba::async_read(*this, buffers,
38 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
39 dummy::error, dummy::bytes_transferred));
40 }
41
42 void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
43 {
44 ba::async_write(*socket, buffers,
45 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
46 dummy::error, dummy::bytes_transferred));
47 }
48 void AsyncWait(ba::deadline_timer &timer, int seconds,
49 void (tcp_connection::*handler)(const bs::error_code&))// const
50 {
51 timer.expires_from_now(boost::posix_time::seconds(seconds));
52 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
53 }
54
55 // The constructor is prvate to force the obtained pointer to be shared
56 tcp_connection(ba::io_service& ioservice, int boardid) : ba::ip::tcp::socket(ioservice),
57 fBoardId(boardid), fTriggerSendData(ioservice)
58 {
59 }
60
61 // Callback when writing was successfull or failed
62 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
63 {
64 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
65 }
66
67 vector<uint16_t> fBufCommand;
68 vector<uint16_t> fBuffer;
69
70 vector<uint16_t> fCommand;
71
72 FAD::EventHeader fHeader;
73 FAD::EventHeader fRam;
74 FAD::ChannelHeader fChHeader[kNumChannels];
75
76 uint16_t fRamRoi[kNumChannels];
77
78 ba::deadline_timer fTriggerSendData;
79
80 bool fTriggerEnabled;
81 bool fCommandSocket;
82
83 int fSocket;
84
85 void SendData()
86 {
87 if (!fTriggerEnabled)
88 return;
89
90 fHeader.fPackageLength = sizeof(EventHeader)/2+1;
91 fHeader.fEventCounter++;
92 fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
93
94 fBuffer.resize(0);
95
96 for (int i=0; i<kNumChannels; i++)
97 {
98 fChHeader[i].fStartCell = i*10;
99
100 const vector<uint16_t> buf = fChHeader[i].HtoN();
101
102 fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
103 fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42);
104
105 fHeader.fPackageLength += sizeof(ChannelHeader)/2;
106 fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
107 }
108
109 fBuffer.push_back(htons(FAD::kDelimiterEnd));
110
111 const vector<uint16_t> h = fHeader.HtoN();
112
113 fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
114
115 if (fCommandSocket)
116 AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
117 else
118 {
119 if (fSockets.size()==0)
120 return;
121
122 fSocket++;
123 fSocket %= fSockets.size();
124
125 AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
126 }
127 }
128
129 void TriggerSendData(const boost::system::error_code &ec)
130 {
131 if (!is_open())
132 {
133 // For example: Here we could schedule a new accept if we
134 // would not want to allow two connections at the same time.
135 return;
136 }
137
138 if (ec==ba::error::basic_errors::operation_aborted)
139 return;
140
141 // Check whether the deadline has passed. We compare the deadline
142 // against the current time since a new asynchronous operation
143 // may have moved the deadline before this actor had a chance
144 // to run.
145 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
146 return;
147
148 // The deadline has passed.
149 SendData();
150
151 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
152 }
153
154 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
155 {
156 // Do not schedule a new read if the connection failed.
157 if (bytes_received==0)
158 {
159 // Close the connection
160 close();
161 return;
162 }
163
164 // No command received yet
165 if (fCommand.size()==0)
166 {
167 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
168 fBufCommand.begin(), ntohs);
169
170 switch (fBufCommand[0])
171 {
172 case kCmdDrsEnable:
173 case kCmdDrsEnable+0x100:
174 fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
175 cout << "-> DrsEnable" << endl;
176 break;
177
178 case kCmdDwrite:
179 case kCmdDwrite+0x100:
180 fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
181 cout << "-> Dwrite" << endl;
182 break;
183
184 case kCmdTriggerLine:
185 case kCmdTriggerLine+0x100:
186 cout << "-> Trigger line" << endl;
187 fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
188 break;
189
190 case kCmdSclk:
191 case kCmdSclk+0x100:
192 cout << "-> Sclk" << endl;
193 fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
194 break;
195
196 case kCmdSrclk:
197 case kCmdSrclk+0x100:
198 cout << "-> Drclk" << endl;
199 break;
200
201 case kCmdRun:
202 case kCmdRun+0x100:
203 fStartTime = Time(Time::utc).UnixTime();
204 cout << "-> Run" << endl;
205 break;
206
207 case kCmdSocket:
208 case kCmdSocket+0x100:
209 cout << "-> Socket" << endl;
210 fCommandSocket = fBufCommand[0]==kCmdSocket;
211 break;
212
213 case kCmdContTriggerOn:
214 case kCmdContTriggerOff:
215 if (fBufCommand[0]==kCmdContTriggerOn)
216 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
217 else
218 fTriggerSendData.cancel();
219 cout << "-> ContTrig" << endl;
220 break;
221
222 case kCmdResetTriggerId:
223 cout << "-> Reset" << endl;
224 fHeader.fEventCounter = 0;
225 break;
226
227 case kCmdSingleTrigger:
228 cout << "-> Trigger" << endl;
229 SendData();
230 break;
231
232 case kCmdWriteExecute:
233 cout << "-> Execute" << endl;
234 memcpy(fHeader.fDac, fRam.fDac, sizeof(fHeader.fDac));
235 for (int i=0; i<kNumChannels; i++)
236 fChHeader[i].fRegionOfInterest = fRamRoi[i];
237 fHeader.fRunNumber = fRam.fRunNumber;
238 break;
239
240 case kCmdWriteRunNumberMSW:
241 fCommand = fBufCommand;
242 break;
243
244 case kCmdWriteRunNumberLSW:
245 fCommand = fBufCommand;
246 break;
247
248 default:
249 if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
250 {
251 fCommand.resize(2);
252 fCommand[0] = kCmdWriteRoi;
253 fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
254 break;
255 }
256 if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
257 {
258 fCommand.resize(2);
259 fCommand[0] = kCmdWriteDac;
260 fCommand[1] = fBufCommand[0]-kCmdWriteDac;
261 break;
262 }
263
264 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
265 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
266 return;
267 }
268
269 fBufCommand.resize(1);
270 AsyncRead(ba::buffer(fBufCommand));
271 return;
272 }
273
274 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
275 fBufCommand.begin(), ntohs);
276
277 switch (fCommand[0])
278 {
279 case kCmdWriteRunNumberMSW:
280 fRam.fRunNumber &= 0xffff;
281 fRam.fRunNumber |= fBufCommand[0]<<16;
282 cout << "-> Set RunNumber MSW" << endl;
283 break;
284 case kCmdWriteRunNumberLSW:
285 fRam.fRunNumber &= 0xffff0000;
286 fRam.fRunNumber |= fBufCommand[0];
287 cout << "-> Set RunNumber LSW" << endl;
288 break;
289 case kCmdWriteRoi:
290 cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
291 //fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
292 fRamRoi[fCommand[1]] = fBufCommand[0];
293 break;
294
295 case kCmdWriteDac:
296 cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
297 fRam.fDac[fCommand[1]] = fBufCommand[0];
298 break;
299 }
300
301 fCommand.resize(0);
302
303 fBufCommand.resize(1);
304 AsyncRead(ba::buffer(fBufCommand));
305 }
306
307public:
308 typedef boost::shared_ptr<tcp_connection> shared_ptr;
309
310 static shared_ptr create(ba::io_service& io_service, int boardid)
311 {
312 return shared_ptr(new tcp_connection(io_service, boardid));
313 }
314
315 void start()
316 {
317 // Ownership of buffer must be valid until Handler is called.
318
319 fTriggerEnabled=false;
320 fCommandSocket=true;
321
322 fHeader.fStartDelimiter = FAD::kDelimiterStart;
323 fHeader.fVersion = 0x104;
324 fHeader.fBoardId = (fBoardId%10) | ((fBoardId/10)<<8);
325 fHeader.fRunNumber = 1;
326 fHeader.fStatus = 0xf<<12 |
327 FAD::EventHeader::kDenable |
328 FAD::EventHeader::kDwrite |
329 FAD::EventHeader::kDcmLocked |
330 FAD::EventHeader::kDcmReady |
331 FAD::EventHeader::kSpiSclk;
332
333 fStartTime = Time(Time::utc).UnixTime();
334
335 for (int i=0; i<kNumChannels; i++)
336 {
337 fChHeader[i].fId = (i%9) | ((i/9)<<4);
338 fChHeader[i].fRegionOfInterest = 0;
339 }
340
341 // Emit something to be written to the socket
342 fBufCommand.resize(1);
343 AsyncRead(ba::buffer(fBufCommand));
344
345// AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
346
347// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
348// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
349
350 }
351
352 vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
353
354 ~tcp_connection()
355 {
356 fSockets.clear();
357 }
358
359 void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
360 {
361 cout << "Added one socket " << socket->remote_endpoint().address().to_v4().to_string();
362 cout << ":"<< port << endl;
363 fSockets.push_back(socket);
364 }
365};
366
367
368class tcp_server
369{
370 tcp::acceptor acc0;
371 tcp::acceptor acc1;
372 tcp::acceptor acc2;
373 tcp::acceptor acc3;
374 tcp::acceptor acc4;
375 tcp::acceptor acc5;
376 tcp::acceptor acc6;
377 tcp::acceptor acc7;
378
379 int fBoardId;
380
381public:
382 tcp_server(ba::io_service& ioservice, int port, int board) :
383 acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
384 acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
385 acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
386 acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
387 acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
388 acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
389 acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
390 acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
391 fBoardId(board)
392 {
393 // We could start listening for more than one connection
394 // here, but since there is only one handler executed each time
395 // it would not make sense. Before one handle_accept is not
396 // finished no new handle_accept will be called.
397 // Workround: Start a new thread in handle_accept
398 start_accept();
399 }
400
401private:
402 void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
403 {
404 boost::shared_ptr<ba::ip::tcp::socket> connection =
405 boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
406 acc.async_accept(*connection,
407 boost::bind(&tcp_connection::handle_accept,
408 dest, connection,
409 acc.local_endpoint().port(),
410 ba::placeholders::error));
411 }
412
413 void start_accept()
414 {
415 cout << "Start accept " << acc0.local_endpoint().port() << "..." << flush;
416 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service(), fBoardId);
417
418 // This will accept a connection without blocking
419 acc0.async_accept(*new_connection,
420 boost::bind(&tcp_server::handle_accept,
421 this,
422 new_connection,
423 ba::placeholders::error));
424
425 start_accept(new_connection, acc1);
426 start_accept(new_connection, acc2);
427 start_accept(new_connection, acc3);
428 start_accept(new_connection, acc4);
429 start_accept(new_connection, acc5);
430 start_accept(new_connection, acc6);
431 start_accept(new_connection, acc7);
432
433 cout << "start-done." << endl;
434 }
435
436 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
437 {
438 // The connection has been accepted and is now ready to use
439
440 // not installing a new handler will stop run()
441 cout << "Handle accept..." << flush;
442 if (!error)
443 {
444 new_connection->start();
445
446 // The is now an open connection/server (tcp_connection)
447 // we immediatly schedule another connection
448 // This allowed two client-connection at the same time
449 start_accept();
450 }
451 cout << "handle-done." << endl;
452 }
453};
454
455int main(int argc, const char **argv)
456{
457 //try
458 {
459 ba::io_service io_service;
460
461 int port = argc>=2 ? lexical_cast<int>(argv[1]) : 5000;
462 int n = argc==3 ? lexical_cast<int>(argv[2]) : 1;
463
464 vector<shared_ptr<tcp_server>> servers;
465
466 for (int i=0; i<n; i++)
467 {
468 shared_ptr<tcp_server> server(new tcp_server(io_service, port, i));
469 servers.push_back(server);
470
471 port += 8;
472 }
473
474 // ba::add_service(io_service, &server);
475 // server.add_service(...);
476 //cout << "Run..." << flush;
477
478 // Calling run() from a single thread ensures no concurrent access
479 // of the handler which are called!!!
480 io_service.run();
481
482 //cout << "end." << endl;
483 }
484 /*catch (std::exception& e)
485 {
486 std::cerr << e.what() << std::endl;
487 }*/
488
489 return 0;
490}
491/* ====================== Buffers ===========================
492
493char d1[128]; ba::buffer(d1);
494std::vector<char> d2(128); ba::buffer(d2);
495boost::array<char, 128> d3; ba::buffer(d3);
496
497// --------------------------------
498char d1[128];
499std::vector<char> d2(128);
500boost::array<char, 128> d3;
501
502boost::array<mutable_buffer, 3> bufs1 = {
503 ba::buffer(d1),
504 ba::buffer(d2),
505 ba::buffer(d3) };
506sock.read(bufs1);
507
508std::vector<const_buffer> bufs2;
509bufs2.push_back(boost::asio::buffer(d1));
510bufs2.push_back(boost::asio::buffer(d2));
511bufs2.push_back(boost::asio::buffer(d3));
512sock.write(bufs2);
513
514
515// ======================= Read functions =========================
516
517ba::async_read_until --> delimiter
518
519streambuf buf; // Ensure validity until handler!
520ba::async_read(s, buf, ....);
521
522ba::async_read(s, ba::buffer(data, size), handler);
523 // Single buffer
524 boost::asio::async_read(s,
525 ba::buffer(data, size),
526 compl-func --> ba::transfer_at_least(32),
527 handler);
528
529 // Multiple buffers
530boost::asio::async_read(s, buffers,
531 compl-func --> boost::asio::transfer_all(),
532 handler);
533 */
534
535// ================= Others ===============================
536
537 /*
538 strand Provides serialised handler execution.
539 work Class to inform the io_service when it has work to do.
540
541
542io_service::
543dispatch Request the io_service to invoke the given handler.
544poll Run the io_service's event processing loop to execute ready
545 handlers.
546poll_one Run the io_service's event processing loop to execute one ready
547 handler.
548post Request the io_service to invoke the given handler and return
549 immediately.
550reset Reset the io_service in preparation for a subsequent run()
551 invocation.
552run Run the io_service's event processing loop.
553run_one Run the io_service's event processing loop to execute at most
554 one handler.
555stop Stop the io_service's event processing loop.
556wrap Create a new handler that automatically dispatches the wrapped
557 handler on the io_service.
558
559strand:: The io_service::strand class provides the ability to
560 post and dispatch handlers with the guarantee that none
561 of those handlers will execute concurrently.
562
563dispatch Request the strand to invoke the given handler.
564get_io_service Get the io_service associated with the strand.
565post Request the strand to invoke the given handler and return
566 immediately.
567wrap Create a new handler that automatically dispatches the
568 wrapped handler on the strand.
569
570work:: The work class is used to inform the io_service when
571 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
572get_io_service Get the io_service associated with the work.
573work Constructor notifies the io_service that work is starting.
574
575*/
576
577
Note: See TracBrowser for help on using the repository browser.