source: trunk/FACT++/src/fad.cc@ 11218

Last change on this file since 11218 was 11209, checked in by tbretz, 13 years ago
Added trigger id and fake temperature.
File size: 19.4 KB
Line 
#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <iostream>
#include <string>
#include <vector>

#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/lexical_cast.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba = boost::asio;
20namespace bs = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30public:
31 const int fBoardId;
32
33 double fStartTime;
34
35 void AsyncRead(ba::mutable_buffers_1 buffers)
36 {
37 ba::async_read(*this, buffers,
38 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
39 dummy::error, dummy::bytes_transferred));
40 }
41
42 void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
43 {
44 ba::async_write(*socket, buffers,
45 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
46 dummy::error, dummy::bytes_transferred));
47 }
48 void AsyncWait(ba::deadline_timer &timer, int seconds,
49 void (tcp_connection::*handler)(const bs::error_code&))// const
50 {
51 timer.expires_from_now(boost::posix_time::seconds(seconds));
52 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
53 }
54
55 // The constructor is prvate to force the obtained pointer to be shared
56 tcp_connection(ba::io_service& ioservice, int boardid) : ba::ip::tcp::socket(ioservice),
57 fBoardId(boardid), fTriggerSendData(ioservice)
58 {
59 }
60
61 // Callback when writing was successfull or failed
62 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
63 {
64 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
65 }
66
67 vector<uint16_t> fBufCommand;
68 vector<uint16_t> fBuffer;
69
70 vector<uint16_t> fCommand;
71
72 FAD::EventHeader fHeader;
73 FAD::EventHeader fRam;
74 FAD::ChannelHeader fChHeader[kNumChannels];
75
76 uint16_t fRamRoi[kNumChannels];
77
78 ba::deadline_timer fTriggerSendData;
79
80 bool fTriggerEnabled;
81 bool fCommandSocket;
82
83 int fSocket;
84
85 void SendData()
86 {
87 fHeader.fPackageLength = sizeof(EventHeader)/2+1;
88 fHeader.fEventCounter++;
89 fHeader.fTriggerId = fHeader.fEventCounter;
90 fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
91
92 for (int i=0; i<FAD::kNumTemp; i++)
93 fHeader.fTempDrs[i] = (42.+fBoardId/40.+float(rand())/RAND_MAX*5)*16;
94
95 fBuffer.resize(0);
96
97 for (int i=0; i<kNumChannels; i++)
98 {
99 fChHeader[i].fStartCell = i*10;
100
101 const vector<uint16_t> buf = fChHeader[i].HtoN();
102
103 fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
104 fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42+fHeader.fEventCounter*100);
105
106 fHeader.fPackageLength += sizeof(ChannelHeader)/2;
107 fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
108 }
109
110 fBuffer.push_back(htons(FAD::kDelimiterEnd));
111
112 const vector<uint16_t> h = fHeader.HtoN();
113
114 fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
115
116 if (fCommandSocket)
117 AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
118 else
119 {
120 if (fSockets.size()==0)
121 return;
122
123 fSocket++;
124 fSocket %= fSockets.size();
125
126 AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
127 }
128 }
129
130 void TriggerSendData(const boost::system::error_code &ec)
131 {
132 if (!is_open())
133 {
134 // For example: Here we could schedule a new accept if we
135 // would not want to allow two connections at the same time.
136 return;
137 }
138
139 if (ec==ba::error::basic_errors::operation_aborted)
140 return;
141
142 // Check whether the deadline has passed. We compare the deadline
143 // against the current time since a new asynchronous operation
144 // may have moved the deadline before this actor had a chance
145 // to run.
146 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
147 return;
148
149 // The deadline has passed.
150 if (fTriggerEnabled)
151 SendData();
152
153 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
154 }
155
156 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
157 {
158 // Do not schedule a new read if the connection failed.
159 if (bytes_received==0)
160 {
161 // Close the connection
162 close();
163 return;
164 }
165
166 // No command received yet
167 if (fCommand.size()==0)
168 {
169 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
170 fBufCommand.begin(), ntohs);
171
172 switch (fBufCommand[0])
173 {
174 case kCmdDrsEnable:
175 case kCmdDrsEnable+0x100:
176 fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
177 cout << "-> DrsEnable" << endl;
178 break;
179
180 case kCmdDwrite:
181 case kCmdDwrite+0x100:
182 fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
183 cout << "-> Dwrite" << endl;
184 break;
185
186 case kCmdTriggerLine:
187 case kCmdTriggerLine+0x100:
188 cout << "-> Trigger line" << endl;
189 fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
190 fHeader.Enable(FAD::EventHeader::kTriggerLine, fTriggerEnabled);
191 break;
192
193 case kCmdSclk:
194 case kCmdSclk+0x100:
195 cout << "-> Sclk" << endl;
196 fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
197 break;
198
199 case kCmdSrclk:
200 case kCmdSrclk+0x100:
201 cout << "-> Drclk" << endl;
202 break;
203
204 case kCmdRun:
205 case kCmdRun+0x100:
206 fStartTime = Time(Time::utc).UnixTime();
207 cout << "-> Run" << endl;
208 break;
209
210 case kCmdBusy:
211 case kCmdBusy+0x100:
212 cout << "-> Busy" << endl;
213 fHeader.Enable(FAD::EventHeader::kBusy, fBufCommand[0]==kCmdBusy);
214 break;
215
216 case kCmdSocket:
217 case kCmdSocket+0x100:
218 cout << "-> Socket" << endl;
219 fCommandSocket = fBufCommand[0]==kCmdSocket;
220 fHeader.Enable(FAD::EventHeader::kSock17, !fCommandSocket);
221 break;
222
223 case kCmdContTrigger:
224 case kCmdContTrigger+0x100:
225 if (fBufCommand[0]==kCmdContTrigger)
226 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
227 else
228 fTriggerSendData.cancel();
229 fHeader.Enable(FAD::EventHeader::kContTrigger, fBufCommand[0]==kCmdContTrigger);
230 cout << "-> ContTrig" << endl;
231 break;
232
233 case kCmdResetTriggerId:
234 cout << "-> ResetId" << endl;
235 fHeader.fEventCounter = 0;
236 break;
237
238 case kCmdSingleTrigger:
239 cout << "-> Trigger " << fBoardId << endl;
240 SendData();
241 break;
242
243 case kCmdWriteExecute:
244 cout << "-> Execute" << endl;
245 memcpy(fHeader.fDac, fRam.fDac, sizeof(fHeader.fDac));
246 for (int i=0; i<kNumChannels; i++)
247 fChHeader[i].fRegionOfInterest = fRamRoi[i];
248 fHeader.fRunNumber = fRam.fRunNumber;
249 break;
250
251 case kCmdWriteRunNumberMSW:
252 fCommand = fBufCommand;
253 break;
254
255 case kCmdWriteRunNumberLSW:
256 fCommand = fBufCommand;
257 break;
258
259 default:
260 if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
261 {
262 fCommand.resize(2);
263 fCommand[0] = kCmdWriteRoi;
264 fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
265 break;
266 }
267 if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
268 {
269 fCommand.resize(2);
270 fCommand[0] = kCmdWriteDac;
271 fCommand[1] = fBufCommand[0]-kCmdWriteDac;
272 break;
273 }
274
275 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
276 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
277 return;
278 }
279
280 fBufCommand.resize(1);
281 AsyncRead(ba::buffer(fBufCommand));
282 return;
283 }
284
285 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
286 fBufCommand.begin(), ntohs);
287
288 switch (fCommand[0])
289 {
290 case kCmdWriteRunNumberMSW:
291 fRam.fRunNumber &= 0xffff;
292 fRam.fRunNumber |= fBufCommand[0]<<16;
293 cout << "-> Set RunNumber MSW" << endl;
294 break;
295 case kCmdWriteRunNumberLSW:
296 fRam.fRunNumber &= 0xffff0000;
297 fRam.fRunNumber |= fBufCommand[0];
298 cout << "-> Set RunNumber LSW" << endl;
299 break;
300 case kCmdWriteRoi:
301 cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
302 //fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
303 fRamRoi[fCommand[1]] = fBufCommand[0];
304 break;
305
306 case kCmdWriteDac:
307 cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
308 fRam.fDac[fCommand[1]] = fBufCommand[0];
309 break;
310 }
311
312 fCommand.resize(0);
313
314 fBufCommand.resize(1);
315 AsyncRead(ba::buffer(fBufCommand));
316 }
317
318public:
319 typedef boost::shared_ptr<tcp_connection> shared_ptr;
320
321 static shared_ptr create(ba::io_service& io_service, int boardid)
322 {
323 return shared_ptr(new tcp_connection(io_service, boardid));
324 }
325
326 void start()
327 {
328 // Ownership of buffer must be valid until Handler is called.
329
330 fTriggerEnabled=false;
331 fCommandSocket=true;
332
333 fHeader.fStartDelimiter = FAD::kDelimiterStart;
334 fHeader.fVersion = 0x104;
335 fHeader.fBoardId = (fBoardId%10) | ((fBoardId/10)<<8);
336 fHeader.fRunNumber = 1;
337 fHeader.fDNA = reinterpret_cast<uint64_t>(this);
338 fHeader.fStatus = 0xf<<12 |
339 FAD::EventHeader::kDenable |
340 FAD::EventHeader::kDwrite |
341 FAD::EventHeader::kDcmLocked |
342 FAD::EventHeader::kDcmReady |
343 FAD::EventHeader::kSpiSclk;
344
345
346 fStartTime = Time(Time::utc).UnixTime();
347
348 for (int i=0; i<kNumChannels; i++)
349 {
350 fChHeader[i].fId = (i%9) | ((i/9)<<4);
351 fChHeader[i].fRegionOfInterest = 0;
352 }
353
354 // Emit something to be written to the socket
355 fBufCommand.resize(1);
356 AsyncRead(ba::buffer(fBufCommand));
357
358// AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
359
360// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
361// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
362
363 }
364
365 vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
366
367 ~tcp_connection()
368 {
369 fSockets.clear();
370 }
371
372 void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
373 {
374 cout << this << " Added one socket[" << fBoardId << "] " << socket->remote_endpoint().address().to_v4().to_string();
375 cout << ":"<< port << endl;
376 fSockets.push_back(socket);
377 }
378};
379
380
381class tcp_server
382{
383 tcp::acceptor acc0;
384 tcp::acceptor acc1;
385 tcp::acceptor acc2;
386 tcp::acceptor acc3;
387 tcp::acceptor acc4;
388 tcp::acceptor acc5;
389 tcp::acceptor acc6;
390 tcp::acceptor acc7;
391
392 int fBoardId;
393
394public:
395 tcp_server(ba::io_service& ioservice, int port, int board) :
396 acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
397 acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
398 acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
399 acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
400 acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
401 acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
402 acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
403 acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
404 fBoardId(board)
405 {
406 // We could start listening for more than one connection
407 // here, but since there is only one handler executed each time
408 // it would not make sense. Before one handle_accept is not
409 // finished no new handle_accept will be called.
410 // Workround: Start a new thread in handle_accept
411 start_accept();
412 }
413
414private:
415 void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
416 {
417 boost::shared_ptr<ba::ip::tcp::socket> connection =
418 boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
419
420 acc.async_accept(*connection,
421 boost::bind(&tcp_connection::handle_accept,
422 dest, connection,
423 acc.local_endpoint().port(),
424 ba::placeholders::error));
425 }
426
427 void start_accept()
428 {
429 cout << "Start accept[" << fBoardId << "] " << acc0.local_endpoint().port() << "..." << flush;
430 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service(), fBoardId);
431
432 cout << new_connection.get() << " ";
433
434 // This will accept a connection without blocking
435 acc0.async_accept(*new_connection,
436 boost::bind(&tcp_server::handle_accept,
437 this,
438 new_connection,
439 ba::placeholders::error));
440
441 start_accept(new_connection, acc1);
442 start_accept(new_connection, acc2);
443 start_accept(new_connection, acc3);
444 start_accept(new_connection, acc4);
445 start_accept(new_connection, acc5);
446 start_accept(new_connection, acc6);
447 start_accept(new_connection, acc7);
448
449 cout << "start-done." << endl;
450 }
451
452 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
453 {
454 // The connection has been accepted and is now ready to use
455
456 // not installing a new handler will stop run()
457 cout << new_connection.get() << " Handle accept[" << fBoardId << "]["<<new_connection->fBoardId<<"]..." << flush;
458 if (!error)
459 {
460 new_connection->start();
461
462 // The is now an open connection/server (tcp_connection)
463 // we immediatly schedule another connection
464 // This allowed two client-connection at the same time
465 start_accept();
466 }
467 cout << "handle-done." << endl;
468 }
469};
470
471int main(int argc, const char **argv)
472{
473 //try
474 {
475 ba::io_service io_service;
476
477 int port = argc>=2 ? lexical_cast<int>(argv[1]) : 5000;
478 int n = argc==3 ? lexical_cast<int>(argv[2]) : 1;
479
480 vector<shared_ptr<tcp_server>> servers;
481
482 for (int i=0; i<n; i++)
483 {
484 shared_ptr<tcp_server> server(new tcp_server(io_service, port, i));
485 servers.push_back(server);
486
487 port += 8;
488 }
489
490 // ba::add_service(io_service, &server);
491 // server.add_service(...);
492 //cout << "Run..." << flush;
493
494 // Calling run() from a single thread ensures no concurrent access
495 // of the handler which are called!!!
496 io_service.run();
497
498 //cout << "end." << endl;
499 }
500 /*catch (std::exception& e)
501 {
502 std::cerr << e.what() << std::endl;
503 }*/
504
505 return 0;
506}
507/* ====================== Buffers ===========================
508
509char d1[128]; ba::buffer(d1);
510std::vector<char> d2(128); ba::buffer(d2);
511boost::array<char, 128> d3; ba::buffer(d3);
512
513// --------------------------------
514char d1[128];
515std::vector<char> d2(128);
516boost::array<char, 128> d3;
517
518boost::array<mutable_buffer, 3> bufs1 = {
519 ba::buffer(d1),
520 ba::buffer(d2),
521 ba::buffer(d3) };
522sock.read(bufs1);
523
524std::vector<const_buffer> bufs2;
525bufs2.push_back(boost::asio::buffer(d1));
526bufs2.push_back(boost::asio::buffer(d2));
527bufs2.push_back(boost::asio::buffer(d3));
528sock.write(bufs2);
529
530
531// ======================= Read functions =========================
532
533ba::async_read_until --> delimiter
534
535streambuf buf; // Ensure validity until handler!
536ba::async_read(s, buf, ....);
537
538ba::async_read(s, ba::buffer(data, size), handler);
539 // Single buffer
540 boost::asio::async_read(s,
541 ba::buffer(data, size),
542 compl-func --> ba::transfer_at_least(32),
543 handler);
544
545 // Multiple buffers
546boost::asio::async_read(s, buffers,
547 compl-func --> boost::asio::transfer_all(),
548 handler);
549 */
550
551// ================= Others ===============================
552
553 /*
554 strand Provides serialised handler execution.
555 work Class to inform the io_service when it has work to do.
556
557
558io_service::
559dispatch Request the io_service to invoke the given handler.
560poll Run the io_service's event processing loop to execute ready
561 handlers.
562poll_one Run the io_service's event processing loop to execute one ready
563 handler.
564post Request the io_service to invoke the given handler and return
565 immediately.
566reset Reset the io_service in preparation for a subsequent run()
567 invocation.
568run Run the io_service's event processing loop.
569run_one Run the io_service's event processing loop to execute at most
570 one handler.
571stop Stop the io_service's event processing loop.
572wrap Create a new handler that automatically dispatches the wrapped
573 handler on the io_service.
574
575strand:: The io_service::strand class provides the ability to
576 post and dispatch handlers with the guarantee that none
577 of those handlers will execute concurrently.
578
579dispatch Request the strand to invoke the given handler.
580get_io_service Get the io_service associated with the strand.
581post Request the strand to invoke the given handler and return
582 immediately.
583wrap Create a new handler that automatically dispatches the
584 wrapped handler on the strand.
585
586work:: The work class is used to inform the io_service when
587 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
588get_io_service Get the io_service associated with the work.
589work Constructor notifies the io_service that work is starting.
590
591*/
592
593
Note: See TracBrowser for help on using the repository browser.