source: trunk/FACT++/src/fad.cc@ 11189

Last change on this file since 11189 was 11179, checked in by tbretz, 13 years ago
Added FAD DNA.
File size: 18.6 KB
Line 
#include <exception>
#include <iostream>
#include <string>

#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/lexical_cast.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba = boost::asio;
20namespace bs = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30private:
31 const int fBoardId;
32
33 double fStartTime;
34
35 void AsyncRead(ba::mutable_buffers_1 buffers)
36 {
37 ba::async_read(*this, buffers,
38 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
39 dummy::error, dummy::bytes_transferred));
40 }
41
42 void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
43 {
44 ba::async_write(*socket, buffers,
45 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
46 dummy::error, dummy::bytes_transferred));
47 }
48 void AsyncWait(ba::deadline_timer &timer, int seconds,
49 void (tcp_connection::*handler)(const bs::error_code&))// const
50 {
51 timer.expires_from_now(boost::posix_time::seconds(seconds));
52 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
53 }
54
55 // The constructor is prvate to force the obtained pointer to be shared
56 tcp_connection(ba::io_service& ioservice, int boardid) : ba::ip::tcp::socket(ioservice),
57 fBoardId(boardid), fTriggerSendData(ioservice)
58 {
59 }
60
61 // Callback when writing was successfull or failed
62 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
63 {
64 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
65 }
66
67 vector<uint16_t> fBufCommand;
68 vector<uint16_t> fBuffer;
69
70 vector<uint16_t> fCommand;
71
72 FAD::EventHeader fHeader;
73 FAD::EventHeader fRam;
74 FAD::ChannelHeader fChHeader[kNumChannels];
75
76 uint16_t fRamRoi[kNumChannels];
77
78 ba::deadline_timer fTriggerSendData;
79
80 bool fTriggerEnabled;
81 bool fCommandSocket;
82
83 int fSocket;
84
85 void SendData()
86 {
87 if (!fTriggerEnabled)
88 return;
89
90 fHeader.fPackageLength = sizeof(EventHeader)/2+1;
91 fHeader.fEventCounter++;
92 fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
93
94 fBuffer.resize(0);
95
96 for (int i=0; i<kNumChannels; i++)
97 {
98 fChHeader[i].fStartCell = i*10;
99
100 const vector<uint16_t> buf = fChHeader[i].HtoN();
101
102 fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
103 fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42+fHeader.fEventCounter*100);
104
105 fHeader.fPackageLength += sizeof(ChannelHeader)/2;
106 fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
107 }
108
109 fBuffer.push_back(htons(FAD::kDelimiterEnd));
110
111 const vector<uint16_t> h = fHeader.HtoN();
112
113 fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
114
115 if (fCommandSocket)
116 AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
117 else
118 {
119 if (fSockets.size()==0)
120 return;
121
122 fSocket++;
123 fSocket %= fSockets.size();
124
125 AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
126 }
127 }
128
129 void TriggerSendData(const boost::system::error_code &ec)
130 {
131 if (!is_open())
132 {
133 // For example: Here we could schedule a new accept if we
134 // would not want to allow two connections at the same time.
135 return;
136 }
137
138 if (ec==ba::error::basic_errors::operation_aborted)
139 return;
140
141 // Check whether the deadline has passed. We compare the deadline
142 // against the current time since a new asynchronous operation
143 // may have moved the deadline before this actor had a chance
144 // to run.
145 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
146 return;
147
148 // The deadline has passed.
149 SendData();
150
151 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
152 }
153
154 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
155 {
156 // Do not schedule a new read if the connection failed.
157 if (bytes_received==0)
158 {
159 // Close the connection
160 close();
161 return;
162 }
163
164 // No command received yet
165 if (fCommand.size()==0)
166 {
167 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
168 fBufCommand.begin(), ntohs);
169
170 switch (fBufCommand[0])
171 {
172 case kCmdDrsEnable:
173 case kCmdDrsEnable+0x100:
174 fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
175 cout << "-> DrsEnable" << endl;
176 break;
177
178 case kCmdDwrite:
179 case kCmdDwrite+0x100:
180 fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
181 cout << "-> Dwrite" << endl;
182 break;
183
184 case kCmdTriggerLine:
185 case kCmdTriggerLine+0x100:
186 cout << "-> Trigger line" << endl;
187 fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
188 break;
189
190 case kCmdSclk:
191 case kCmdSclk+0x100:
192 cout << "-> Sclk" << endl;
193 fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
194 break;
195
196 case kCmdSrclk:
197 case kCmdSrclk+0x100:
198 cout << "-> Drclk" << endl;
199 break;
200
201 case kCmdRun:
202 case kCmdRun+0x100:
203 fStartTime = Time(Time::utc).UnixTime();
204 cout << "-> Run" << endl;
205 break;
206
207 case kCmdSocket:
208 case kCmdSocket+0x100:
209 cout << "-> Socket" << endl;
210 fCommandSocket = fBufCommand[0]==kCmdSocket;
211 break;
212
213 case kCmdContTrigger:
214 case kCmdContTrigger+0x100:
215 if (fBufCommand[0]==kCmdContTrigger+0x100)
216 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
217 else
218 fTriggerSendData.cancel();
219 cout << "-> ContTrig" << endl;
220 break;
221
222 case kCmdResetTriggerId:
223 cout << "-> Reset" << endl;
224 fHeader.fEventCounter = 0;
225 break;
226
227 case kCmdSingleTrigger:
228 cout << "-> Trigger" << endl;
229 SendData();
230 break;
231
232 case kCmdWriteExecute:
233 cout << "-> Execute" << endl;
234 memcpy(fHeader.fDac, fRam.fDac, sizeof(fHeader.fDac));
235 for (int i=0; i<kNumChannels; i++)
236 fChHeader[i].fRegionOfInterest = fRamRoi[i];
237 fHeader.fRunNumber = fRam.fRunNumber;
238 break;
239
240 case kCmdWriteRunNumberMSW:
241 fCommand = fBufCommand;
242 break;
243
244 case kCmdWriteRunNumberLSW:
245 fCommand = fBufCommand;
246 break;
247
248 default:
249 if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
250 {
251 fCommand.resize(2);
252 fCommand[0] = kCmdWriteRoi;
253 fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
254 break;
255 }
256 if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
257 {
258 fCommand.resize(2);
259 fCommand[0] = kCmdWriteDac;
260 fCommand[1] = fBufCommand[0]-kCmdWriteDac;
261 break;
262 }
263
264 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
265 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
266 return;
267 }
268
269 fBufCommand.resize(1);
270 AsyncRead(ba::buffer(fBufCommand));
271 return;
272 }
273
274 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
275 fBufCommand.begin(), ntohs);
276
277 switch (fCommand[0])
278 {
279 case kCmdWriteRunNumberMSW:
280 fRam.fRunNumber &= 0xffff;
281 fRam.fRunNumber |= fBufCommand[0]<<16;
282 cout << "-> Set RunNumber MSW" << endl;
283 break;
284 case kCmdWriteRunNumberLSW:
285 fRam.fRunNumber &= 0xffff0000;
286 fRam.fRunNumber |= fBufCommand[0];
287 cout << "-> Set RunNumber LSW" << endl;
288 break;
289 case kCmdWriteRoi:
290 cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
291 //fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
292 fRamRoi[fCommand[1]] = fBufCommand[0];
293 break;
294
295 case kCmdWriteDac:
296 cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
297 fRam.fDac[fCommand[1]] = fBufCommand[0];
298 break;
299 }
300
301 fCommand.resize(0);
302
303 fBufCommand.resize(1);
304 AsyncRead(ba::buffer(fBufCommand));
305 }
306
307public:
308 typedef boost::shared_ptr<tcp_connection> shared_ptr;
309
310 static shared_ptr create(ba::io_service& io_service, int boardid)
311 {
312 return shared_ptr(new tcp_connection(io_service, boardid));
313 }
314
315 void start()
316 {
317 // Ownership of buffer must be valid until Handler is called.
318
319 fTriggerEnabled=false;
320 fCommandSocket=true;
321
322 fHeader.fStartDelimiter = FAD::kDelimiterStart;
323 fHeader.fVersion = 0x104;
324 fHeader.fBoardId = (fBoardId%10) | ((fBoardId/10)<<8);
325 fHeader.fRunNumber = 1;
326 fHeader.fDNA = reinterpret_cast<uint64_t>(this);
327 fHeader.fStatus = 0xf<<12 |
328 FAD::EventHeader::kDenable |
329 FAD::EventHeader::kDwrite |
330 FAD::EventHeader::kDcmLocked |
331 FAD::EventHeader::kDcmReady |
332 FAD::EventHeader::kSpiSclk;
333
334
335 fStartTime = Time(Time::utc).UnixTime();
336
337 for (int i=0; i<kNumChannels; i++)
338 {
339 fChHeader[i].fId = (i%9) | ((i/9)<<4);
340 fChHeader[i].fRegionOfInterest = 0;
341 }
342
343 // Emit something to be written to the socket
344 fBufCommand.resize(1);
345 AsyncRead(ba::buffer(fBufCommand));
346
347// AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
348
349// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
350// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
351
352 }
353
354 vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
355
356 ~tcp_connection()
357 {
358 fSockets.clear();
359 }
360
361 void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
362 {
363 cout << "Added one socket " << socket->remote_endpoint().address().to_v4().to_string();
364 cout << ":"<< port << endl;
365 fSockets.push_back(socket);
366 }
367};
368
369
370class tcp_server
371{
372 tcp::acceptor acc0;
373 tcp::acceptor acc1;
374 tcp::acceptor acc2;
375 tcp::acceptor acc3;
376 tcp::acceptor acc4;
377 tcp::acceptor acc5;
378 tcp::acceptor acc6;
379 tcp::acceptor acc7;
380
381 int fBoardId;
382
383public:
384 tcp_server(ba::io_service& ioservice, int port, int board) :
385 acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
386 acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
387 acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
388 acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
389 acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
390 acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
391 acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
392 acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
393 fBoardId(board)
394 {
395 // We could start listening for more than one connection
396 // here, but since there is only one handler executed each time
397 // it would not make sense. Before one handle_accept is not
398 // finished no new handle_accept will be called.
399 // Workround: Start a new thread in handle_accept
400 start_accept();
401 }
402
403private:
404 void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
405 {
406 boost::shared_ptr<ba::ip::tcp::socket> connection =
407 boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
408 acc.async_accept(*connection,
409 boost::bind(&tcp_connection::handle_accept,
410 dest, connection,
411 acc.local_endpoint().port(),
412 ba::placeholders::error));
413 }
414
415 void start_accept()
416 {
417 cout << "Start accept " << acc0.local_endpoint().port() << "..." << flush;
418 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service(), fBoardId);
419
420 // This will accept a connection without blocking
421 acc0.async_accept(*new_connection,
422 boost::bind(&tcp_server::handle_accept,
423 this,
424 new_connection,
425 ba::placeholders::error));
426
427 start_accept(new_connection, acc1);
428 start_accept(new_connection, acc2);
429 start_accept(new_connection, acc3);
430 start_accept(new_connection, acc4);
431 start_accept(new_connection, acc5);
432 start_accept(new_connection, acc6);
433 start_accept(new_connection, acc7);
434
435 cout << "start-done." << endl;
436 }
437
438 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
439 {
440 // The connection has been accepted and is now ready to use
441
442 // not installing a new handler will stop run()
443 cout << "Handle accept..." << flush;
444 if (!error)
445 {
446 new_connection->start();
447
448 // The is now an open connection/server (tcp_connection)
449 // we immediatly schedule another connection
450 // This allowed two client-connection at the same time
451 start_accept();
452 }
453 cout << "handle-done." << endl;
454 }
455};
456
457int main(int argc, const char **argv)
458{
459 //try
460 {
461 ba::io_service io_service;
462
463 int port = argc>=2 ? lexical_cast<int>(argv[1]) : 5000;
464 int n = argc==3 ? lexical_cast<int>(argv[2]) : 1;
465
466 vector<shared_ptr<tcp_server>> servers;
467
468 for (int i=0; i<n; i++)
469 {
470 shared_ptr<tcp_server> server(new tcp_server(io_service, port, i));
471 servers.push_back(server);
472
473 port += 8;
474 }
475
476 // ba::add_service(io_service, &server);
477 // server.add_service(...);
478 //cout << "Run..." << flush;
479
480 // Calling run() from a single thread ensures no concurrent access
481 // of the handler which are called!!!
482 io_service.run();
483
484 //cout << "end." << endl;
485 }
486 /*catch (std::exception& e)
487 {
488 std::cerr << e.what() << std::endl;
489 }*/
490
491 return 0;
492}
493/* ====================== Buffers ===========================
494
495char d1[128]; ba::buffer(d1);
496std::vector<char> d2(128); ba::buffer(d2);
497boost::array<char, 128> d3; ba::buffer(d3);
498
499// --------------------------------
500char d1[128];
501std::vector<char> d2(128);
502boost::array<char, 128> d3;
503
504boost::array<mutable_buffer, 3> bufs1 = {
505 ba::buffer(d1),
506 ba::buffer(d2),
507 ba::buffer(d3) };
508sock.read(bufs1);
509
510std::vector<const_buffer> bufs2;
511bufs2.push_back(boost::asio::buffer(d1));
512bufs2.push_back(boost::asio::buffer(d2));
513bufs2.push_back(boost::asio::buffer(d3));
514sock.write(bufs2);
515
516
517// ======================= Read functions =========================
518
519ba::async_read_until --> delimiter
520
521streambuf buf; // Ensure validity until handler!
522ba::async_read(s, buf, ....);
523
524ba::async_read(s, ba::buffer(data, size), handler);
525 // Single buffer
526 boost::asio::async_read(s,
527 ba::buffer(data, size),
528 compl-func --> ba::transfer_at_least(32),
529 handler);
530
531 // Multiple buffers
532boost::asio::async_read(s, buffers,
533 compl-func --> boost::asio::transfer_all(),
534 handler);
535 */
536
537// ================= Others ===============================
538
539 /*
540 strand Provides serialised handler execution.
541 work Class to inform the io_service when it has work to do.
542
543
544io_service::
545dispatch Request the io_service to invoke the given handler.
546poll Run the io_service's event processing loop to execute ready
547 handlers.
548poll_one Run the io_service's event processing loop to execute one ready
549 handler.
550post Request the io_service to invoke the given handler and return
551 immediately.
552reset Reset the io_service in preparation for a subsequent run()
553 invocation.
554run Run the io_service's event processing loop.
555run_one Run the io_service's event processing loop to execute at most
556 one handler.
557stop Stop the io_service's event processing loop.
558wrap Create a new handler that automatically dispatches the wrapped
559 handler on the io_service.
560
561strand:: The io_service::strand class provides the ability to
562 post and dispatch handlers with the guarantee that none
563 of those handlers will execute concurrently.
564
565dispatch Request the strand to invoke the given handler.
566get_io_service Get the io_service associated with the strand.
567post Request the strand to invoke the given handler and return
568 immediately.
569wrap Create a new handler that automatically dispatches the
570 wrapped handler on the strand.
571
572work:: The work class is used to inform the io_service when
573 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
574get_io_service Get the io_service associated with the work.
575work Constructor notifies the io_service that work is starting.
576
577*/
578
579
Note: See TracBrowser for help on using the repository browser.