source: trunk/FACT++/src/fad.cc @ 10853

Last change on this file since 10853 was 10853, checked in by tbretz, 8 years ago
Fixed a floating point exception if no connection is open.
File size: 17.0 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba    = boost::asio;
20namespace bs    = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30private:
31
32    double   fStartTime;
33    uint32_t fRunNumber;
34
35    void AsyncRead(ba::mutable_buffers_1 buffers)
36    {
37        ba::async_read(*this, buffers,
38                       boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
39                                   dummy::error, dummy::bytes_transferred));
40    }
41
42    void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
43    {
44        ba::async_write(*socket, buffers,
45                        boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
46                                    dummy::error, dummy::bytes_transferred));
47    }
48    void AsyncWait(ba::deadline_timer &timer, int seconds,
49                               void (tcp_connection::*handler)(const bs::error_code&))// const
50    {
51        timer.expires_from_now(boost::posix_time::seconds(seconds));
52        timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
53    }
54
55    // The constructor is prvate to force the obtained pointer to be shared
56    tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
57        fTriggerSendData(ioservice)
58    {
59    }
60
61    // Callback when writing was successfull or failed
62    void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
63    {
64        cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
65    }
66
67    vector<uint16_t> fBufCommand;
68    vector<uint16_t> fBuffer;
69
70    vector<uint16_t> fCommand;
71
72    FAD::EventHeader   fHeader;
73    FAD::ChannelHeader fChHeader[kNumChannels];
74
75    ba::deadline_timer fTriggerSendData;
76
77    bool fTriggerEnabled;
78    bool fCommandSocket;
79
80    int fSocket;
81
82    void SendData()
83    {
84        if (!fTriggerEnabled)
85            return;
86
87        fHeader.fPackageLength = sizeof(EventHeader)/2+1;
88        fHeader.fEventCounter++;
89        fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
90
91
92        fBuffer.resize(0);
93
94        for (int i=0; i<kNumChannels; i++)
95        {
96            fChHeader[i].fStartCell = i*10;
97
98            const vector<uint16_t> buf = fChHeader[i].HtoN();
99
100            fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
101            fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42);
102
103            fHeader.fPackageLength += sizeof(ChannelHeader)/2;
104            fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
105        }
106
107        fBuffer.push_back(htons(FAD::kDelimiterEnd));
108
109        const vector<uint16_t> h = fHeader.HtoN();
110
111        fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
112
113        if (fCommandSocket)
114            AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
115        else
116        {
117            if (fSockets.size()==0)
118                return;
119
120            fSocket++;
121            fSocket %= fSockets.size();
122
123            AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
124        }
125    }
126
127    void TriggerSendData(const boost::system::error_code &ec)
128    {
129        if (!is_open())
130        {
131            // For example: Here we could schedule a new accept if we
132            // would not want to allow two connections at the same time.
133            return;
134        }
135
136        if (ec==ba::error::basic_errors::operation_aborted)
137            return;
138
139        // Check whether the deadline has passed. We compare the deadline
140        // against the current time since a new asynchronous operation
141        // may have moved the deadline before this actor had a chance
142        // to run.
143        if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
144            return;
145
146        // The deadline has passed.
147        SendData();
148
149        AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
150    }
151
152    void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
153    {
154        // Do not schedule a new read if the connection failed.
155        if (bytes_received==0)
156        {
157            // Close the connection
158            close();
159            return;
160        }
161
162        // No command received yet
163        if (fCommand.size()==0)
164        {
165            transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
166                      fBufCommand.begin(), ntohs);
167
168            switch (fBufCommand[0])
169            {
170            case kCmdDrsEnable:
171            case kCmdDrsEnable+0x100:
172                fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
173                cout << "-> DrsEnable" << endl;
174                break;
175
176            case kCmdDwrite:
177            case kCmdDwrite+0x100:
178                fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
179                cout << "-> Dwrite" << endl;
180                break;
181
182            case kCmdTriggerLine:
183            case kCmdTriggerLine+0x100:
184                cout << "-> Trigger line" << endl;
185                fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
186                break;
187
188            case kCmdSclk:
189            case kCmdSclk+0x100:
190                cout << "-> Sclk" << endl;
191                fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
192                break;
193
194            case kCmdSrclk:
195            case kCmdSrclk+0x100:
196                cout << "-> Drclk" << endl;
197                break;
198
199            case kCmdRun:
200            case kCmdRun+0x100:
201                cout << "-> Run" << endl;
202                break;
203
204            case kCmdSocket:
205            case kCmdSocket+0x100:
206                cout << "-> Socket" << endl;
207                fCommandSocket = fBufCommand[0]==kCmdSocket;
208                break;
209
210            case kCmdContTriggerOn:
211            case kCmdContTriggerOff:
212                if (fBufCommand[0]==kCmdContTriggerOn)
213                    AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
214                else
215                    fTriggerSendData.cancel();
216                cout << "-> ContTrig" << endl;
217                break;
218
219            case kCmdResetTriggerId:
220                cout << "-> Reset" << endl;
221                fHeader.fEventCounter = 0;
222                break;
223
224            case kCmdSingleTrigger:
225                cout << "-> Trigger" << endl;
226                SendData();
227                break;
228
229            default:
230                if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
231                {
232                    fCommand.resize(2);
233                    fCommand[0] = kCmdWriteRoi;
234                    fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
235                    break;
236                }
237                if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
238                {
239                    fCommand.resize(2);
240                    fCommand[0] = kCmdWriteDac;
241                    fCommand[1] = fBufCommand[0]-kCmdWriteDac;
242                    break;
243                }
244
245                cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
246                cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
247                return;
248            }
249
250            fBufCommand.resize(1);
251            AsyncRead(ba::buffer(fBufCommand));
252            return;
253        }
254
255        transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
256                  fBufCommand.begin(), ntohs);
257
258        switch (fCommand[0])
259        {
260        case kCmdWriteRoi:
261            cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
262            fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
263            break;
264
265        case kCmdWriteDac:
266            cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
267            fHeader.fDac[fCommand[1]] = fBufCommand[0];
268            break;
269        }
270
271        fCommand.resize(0);
272
273        fBufCommand.resize(1);
274        AsyncRead(ba::buffer(fBufCommand));
275    }
276
277public:
278    typedef boost::shared_ptr<tcp_connection> shared_ptr;
279
280    static shared_ptr create(ba::io_service& io_service)
281    {
282        return shared_ptr(new tcp_connection(io_service));
283    }
284
285    void start()
286    {
287        // Ownership of buffer must be valid until Handler is called.
288
289        fTriggerEnabled=false;
290        fCommandSocket=true;
291
292        fHeader.fStartDelimiter = FAD::kDelimiterStart;
293        fHeader.fVersion = 0x104;
294        fHeader.fStatus = 0xf<<12 |
295            FAD::EventHeader::kDenable    |
296            FAD::EventHeader::kDwrite     |
297            FAD::EventHeader::kDcmLocked  |
298            FAD::EventHeader::kDcmReady   |
299            FAD::EventHeader::kSpiSclk;
300
301        fStartTime = Time(Time::utc).UnixTime();
302
303        for (int i=0; i<kNumChannels; i++)
304        {
305            fChHeader[i].fId = (i%9) | ((i/9)<<4);
306            fChHeader[i].fRegionOfInterest = 0;
307        }
308
309        // Emit something to be written to the socket
310        fBufCommand.resize(1);
311        AsyncRead(ba::buffer(fBufCommand));
312
313//        AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
314
315//        AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
316//        AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
317
318    }
319
320    vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
321
322    ~tcp_connection()
323    {
324        fSockets.clear();
325    }
326
327    void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
328    {
329        cout << "Added one socket " << socket->remote_endpoint().address().to_v4().to_string();
330        cout << ":"<< port << endl;
331        fSockets.push_back(socket);
332    }
333};
334
335
336class tcp_server
337{
338    tcp::acceptor acc0;
339    tcp::acceptor acc1;
340    tcp::acceptor acc2;
341    tcp::acceptor acc3;
342    tcp::acceptor acc4;
343    tcp::acceptor acc5;
344    tcp::acceptor acc6;
345    tcp::acceptor acc7;
346
347    int fPort;
348
349public:
350    tcp_server(ba::io_service& ioservice, int port) :
351        acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
352        acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
353        acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
354        acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
355        acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
356        acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
357        acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
358        acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
359        fPort(port)
360    {
361        // We could start listening for more than one connection
362        // here, but since there is only one handler executed each time
363        // it would not make sense. Before one handle_accept is not
364        // finished no new handle_accept will be called.
365        // Workround: Start a new thread in handle_accept
366        start_accept();
367    }
368
369private:
370    void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
371    {
372        boost::shared_ptr<ba::ip::tcp::socket> connection =
373            boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
374        acc.async_accept(*connection,
375                          boost::bind(&tcp_connection::handle_accept,
376                                      dest, connection,
377                                      acc.local_endpoint().port(),
378                                      ba::placeholders::error));
379    }
380
381    void start_accept()
382    {
383        cout << "Start accept..." << flush;
384        tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service());
385
386        // This will accept a connection without blocking
387        acc0.async_accept(*new_connection,
388                          boost::bind(&tcp_server::handle_accept,
389                                      this,
390                                      new_connection,
391                                      ba::placeholders::error));
392
393        start_accept(new_connection, acc1);
394        start_accept(new_connection, acc2);
395        start_accept(new_connection, acc3);
396        start_accept(new_connection, acc4);
397        start_accept(new_connection, acc5);
398        start_accept(new_connection, acc6);
399        start_accept(new_connection, acc7);
400
401        cout << "start-done." << endl;
402    }
403
404    void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
405    {
406        // The connection has been accepted and is now ready to use
407
408        // not installing a new handler will stop run()
409        cout << "Handle accept..." << flush;
410        if (!error)
411        {
412            new_connection->start();
413
414            // The is now an open connection/server (tcp_connection)
415            // we immediatly schedule another connection
416            // This allowed two client-connection at the same time
417            start_accept();
418        }
419        cout << "handle-done." << endl;
420    }
421};
422
423int main(int argc, const char **argv)
424{
425    try
426    {
427        ba::io_service io_service;
428
429        int port = argc==2 ? lexical_cast<int>(argv[1]) : 5000;
430
431        tcp_server server(io_service, port);
432
433        //  ba::add_service(io_service, &server);
434        //  server.add_service(...);
435        //cout << "Run..." << flush;
436
437        // Calling run() from a single thread ensures no concurrent access
438        // of the handler which are called!!!
439        io_service.run();
440
441        //cout << "end." << endl;
442    }
443    catch (std::exception& e)
444    {
445        std::cerr << e.what() << std::endl;
446    }
447
448    return 0;
449}
450/*  ====================== Buffers ===========================
451
452char d1[128]; ba::buffer(d1));
453std::vector<char> d2(128); ba::buffer(d2);
454boost::array<char, 128> d3; by::buffer(d3);
455
456// --------------------------------
457char d1[128];
458std::vector<char> d2(128);
459boost::array<char, 128> d3;
460
461boost::array<mutable_buffer, 3> bufs1 = {
462   ba::buffer(d1),
463   ba::buffer(d2),
464   ba::buffer(d3) };
465sock.read(bufs1);
466
467std::vector<const_buffer> bufs2;
468bufs2.push_back(boost::asio::buffer(d1));
469bufs2.push_back(boost::asio::buffer(d2));
470bufs2.push_back(boost::asio::buffer(d3));
471sock.write(bufs2);
472
473
474// ======================= Read functions =========================
475
476ba::async_read_until --> delimiter
477
478streambuf buf; // Ensure validity until handler!
479by::async_read(s, buf, ....);
480
481ba::async_read(s, ba:buffer(data, size), handler);
482 // Single buffer
483 boost::asio::async_read(s,
484                         ba::buffer(data, size),
485 compl-func -->          ba::transfer_at_least(32),
486                         handler);
487
488 // Multiple buffers
489boost::asio::async_read(s, buffers,
490 compl-func -->         boost::asio::transfer_all(),
491                        handler);
492                        */
493
494// ================= Others ===============================
495
496        /*
497        strand   Provides serialised handler execution.
498        work     Class to inform the io_service when it has work to do.
499
500
501io_service::
502dispatch   Request the io_service to invoke the given handler.
503poll       Run the io_service's event processing loop to execute ready
504           handlers.
505poll_one   Run the io_service's event processing loop to execute one ready
506           handler.
507post       Request the io_service to invoke the given handler and return
508           immediately.
509reset      Reset the io_service in preparation for a subsequent run()
510           invocation.
511run        Run the io_service's event processing loop.
512run_one    Run the io_service's event processing loop to execute at most
513           one handler.
514stop       Stop the io_service's event processing loop.
515wrap       Create a new handler that automatically dispatches the wrapped
516           handler on the io_service.
517
518strand::         The io_service::strand class provides the ability to
519                 post and dispatch handlers with the guarantee that none
520                 of those handlers will execute concurrently.
521
522dispatch         Request the strand to invoke the given handler.
523get_io_service   Get the io_service associated with the strand.
524post             Request the strand to invoke the given handler and return
525                 immediately.
526wrap             Create a new handler that automatically dispatches the
527                 wrapped handler on the strand.
528
529work::           The work class is used to inform the io_service when
530                 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
531get_io_service   Get the io_service associated with the work.
532work             Constructor notifies the io_service that work is starting.
533
534*/
535
536
Note: See TracBrowser for help on using the repository browser.