source: trunk/FACT++/src/fad.cc @ 10851

Last change on this file since 10851 was 10851, checked in by tbretz, 8 years ago
Added support for switching between command socket and data sockets.
File size: 16.9 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba    = boost::asio;
20namespace bs    = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30private:
31
32    double   fStartTime;
33    uint32_t fRunNumber;
34
35    void AsyncRead(ba::mutable_buffers_1 buffers)
36    {
37        ba::async_read(*this, buffers,
38                       boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
39                                   dummy::error, dummy::bytes_transferred));
40    }
41
42    void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
43    {
44        ba::async_write(*socket, buffers,
45                        boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
46                                    dummy::error, dummy::bytes_transferred));
47    }
48    void AsyncWait(ba::deadline_timer &timer, int seconds,
49                               void (tcp_connection::*handler)(const bs::error_code&))// const
50    {
51        timer.expires_from_now(boost::posix_time::seconds(seconds));
52        timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
53    }
54
55    // The constructor is prvate to force the obtained pointer to be shared
56    tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
57        fTriggerSendData(ioservice)
58    {
59    }
60
61    // Callback when writing was successfull or failed
62    void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
63    {
64        cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
65    }
66
67    vector<uint16_t> fBufCommand;
68    vector<uint16_t> fBuffer;
69
70    vector<uint16_t> fCommand;
71
72    FAD::EventHeader   fHeader;
73    FAD::ChannelHeader fChHeader[kNumChannels];
74
75    ba::deadline_timer fTriggerSendData;
76
77    bool fTriggerEnabled;
78    bool fCommandSocket;
79
80    int fSocket;
81
82    void SendData()
83    {
84        if (!fTriggerEnabled)
85            return;
86
87        fHeader.fPackageLength = sizeof(EventHeader)/2+1;
88        fHeader.fEventCounter++;
89        fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
90
91
92        fBuffer.resize(0);
93
94        for (int i=0; i<kNumChannels; i++)
95        {
96            fChHeader[i].fStartCell = i*10;
97
98            const vector<uint16_t> buf = fChHeader[i].HtoN();
99
100            fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
101            fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42);
102
103            fHeader.fPackageLength += sizeof(ChannelHeader)/2;
104            fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
105        }
106
107        fBuffer.push_back(htons(FAD::kDelimiterEnd));
108
109        const vector<uint16_t> h = fHeader.HtoN();
110
111        fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
112
113        if (fCommandSocket)
114            AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
115        else
116        {
117            fSocket++;
118            fSocket %= fSockets.size();
119
120            AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
121        }
122    }
123
124    void TriggerSendData(const boost::system::error_code &ec)
125    {
126        if (!is_open())
127        {
128            // For example: Here we could schedule a new accept if we
129            // would not want to allow two connections at the same time.
130            return;
131        }
132
133        if (ec==ba::error::basic_errors::operation_aborted)
134            return;
135
136        // Check whether the deadline has passed. We compare the deadline
137        // against the current time since a new asynchronous operation
138        // may have moved the deadline before this actor had a chance
139        // to run.
140        if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
141            return;
142
143        // The deadline has passed.
144        SendData();
145
146        AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
147    }
148
149    void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
150    {
151        // Do not schedule a new read if the connection failed.
152        if (bytes_received==0)
153        {
154            // Close the connection
155            close();
156            return;
157        }
158
159        // No command received yet
160        if (fCommand.size()==0)
161        {
162            transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
163                      fBufCommand.begin(), ntohs);
164
165            switch (fBufCommand[0])
166            {
167            case kCmdDrsEnable:
168            case kCmdDrsEnable+0x100:
169                fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
170                cout << "-> DrsEnable" << endl;
171                break;
172
173            case kCmdDwrite:
174            case kCmdDwrite+0x100:
175                fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
176                cout << "-> Dwrite" << endl;
177                break;
178
179            case kCmdTriggerLine:
180            case kCmdTriggerLine+0x100:
181                cout << "-> Trigger line" << endl;
182                fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
183                break;
184
185            case kCmdSclk:
186            case kCmdSclk+0x100:
187                cout << "-> Sclk" << endl;
188                fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
189                break;
190
191            case kCmdSrclk:
192            case kCmdSrclk+0x100:
193                cout << "-> Drclk" << endl;
194                break;
195
196            case kCmdRun:
197            case kCmdRun+0x100:
198                cout << "-> Run" << endl;
199                break;
200
201            case kCmdSocket:
202            case kCmdSocket+0x100:
203                cout << "-> Socket" << endl;
204                fCommandSocket = fBufCommand[0]==kCmdSocket;
205                break;
206
207            case kCmdContTriggerOn:
208            case kCmdContTriggerOff:
209                if (fBufCommand[0]==kCmdContTriggerOn)
210                    AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
211                else
212                    fTriggerSendData.cancel();
213                cout << "-> ContTrig" << endl;
214                break;
215
216            case kCmdResetTriggerId:
217                cout << "-> Reset" << endl;
218                fHeader.fEventCounter = 0;
219                break;
220
221            case kCmdSingleTrigger:
222                cout << "-> Trigger" << endl;
223                SendData();
224                break;
225
226            default:
227                if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
228                {
229                    fCommand.resize(2);
230                    fCommand[0] = kCmdWriteRoi;
231                    fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
232                    break;
233                }
234                if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
235                {
236                    fCommand.resize(2);
237                    fCommand[0] = kCmdWriteDac;
238                    fCommand[1] = fBufCommand[0]-kCmdWriteDac;
239                    break;
240                }
241
242                cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
243                cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
244                return;
245            }
246
247            fBufCommand.resize(1);
248            AsyncRead(ba::buffer(fBufCommand));
249            return;
250        }
251
252        transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
253                  fBufCommand.begin(), ntohs);
254
255        switch (fCommand[0])
256        {
257        case kCmdWriteRoi:
258            cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
259            fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
260            break;
261
262        case kCmdWriteDac:
263            cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
264            fHeader.fDac[fCommand[1]] = fBufCommand[0];
265            break;
266        }
267
268        fCommand.resize(0);
269
270        fBufCommand.resize(1);
271        AsyncRead(ba::buffer(fBufCommand));
272    }
273
274public:
275    typedef boost::shared_ptr<tcp_connection> shared_ptr;
276
277    static shared_ptr create(ba::io_service& io_service)
278    {
279        return shared_ptr(new tcp_connection(io_service));
280    }
281
282    void start()
283    {
284        // Ownership of buffer must be valid until Handler is called.
285
286        fTriggerEnabled=false;
287        fCommandSocket=true;
288
289        fHeader.fStartDelimiter = FAD::kDelimiterStart;
290        fHeader.fVersion = 0x104;
291        fHeader.fStatus = 0xf<<12 |
292            FAD::EventHeader::kDenable    |
293            FAD::EventHeader::kDwrite     |
294            FAD::EventHeader::kDcmLocked  |
295            FAD::EventHeader::kDcmReady   |
296            FAD::EventHeader::kSpiSclk;
297
298        fStartTime = Time(Time::utc).UnixTime();
299
300        for (int i=0; i<kNumChannels; i++)
301        {
302            fChHeader[i].fId = (i%9) | ((i/9)<<4);
303            fChHeader[i].fRegionOfInterest = 0;
304        }
305
306        // Emit something to be written to the socket
307        fBufCommand.resize(1);
308        AsyncRead(ba::buffer(fBufCommand));
309
310//        AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
311
312//        AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
313//        AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
314
315    }
316
317    vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
318
319    ~tcp_connection()
320    {
321        fSockets.clear();
322    }
323
324    void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
325    {
326        cout << "Added one socket " << socket->remote_endpoint().address().to_v4().to_string();
327        cout << ":"<< port << endl;
328        fSockets.push_back(socket);
329    }
330};
331
332
333class tcp_server
334{
335    tcp::acceptor acc0;
336    tcp::acceptor acc1;
337    tcp::acceptor acc2;
338    tcp::acceptor acc3;
339    tcp::acceptor acc4;
340    tcp::acceptor acc5;
341    tcp::acceptor acc6;
342    tcp::acceptor acc7;
343
344    int fPort;
345
346public:
347    tcp_server(ba::io_service& ioservice, int port) :
348        acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
349        acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
350        acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
351        acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
352        acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
353        acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
354        acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
355        acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
356        fPort(port)
357    {
358        // We could start listening for more than one connection
359        // here, but since there is only one handler executed each time
360        // it would not make sense. Before one handle_accept is not
361        // finished no new handle_accept will be called.
362        // Workround: Start a new thread in handle_accept
363        start_accept();
364    }
365
366private:
367    void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
368    {
369        boost::shared_ptr<ba::ip::tcp::socket> connection =
370            boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
371        acc.async_accept(*connection,
372                          boost::bind(&tcp_connection::handle_accept,
373                                      dest, connection,
374                                      acc.local_endpoint().port(),
375                                      ba::placeholders::error));
376    }
377
378    void start_accept()
379    {
380        cout << "Start accept..." << flush;
381        tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service());
382
383        // This will accept a connection without blocking
384        acc0.async_accept(*new_connection,
385                          boost::bind(&tcp_server::handle_accept,
386                                      this,
387                                      new_connection,
388                                      ba::placeholders::error));
389
390        start_accept(new_connection, acc1);
391        start_accept(new_connection, acc2);
392        start_accept(new_connection, acc3);
393        start_accept(new_connection, acc4);
394        start_accept(new_connection, acc5);
395        start_accept(new_connection, acc6);
396        start_accept(new_connection, acc7);
397
398        cout << "start-done." << endl;
399    }
400
401    void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
402    {
403        // The connection has been accepted and is now ready to use
404
405        // not installing a new handler will stop run()
406        cout << "Handle accept..." << flush;
407        if (!error)
408        {
409            new_connection->start();
410
411            // The is now an open connection/server (tcp_connection)
412            // we immediatly schedule another connection
413            // This allowed two client-connection at the same time
414            start_accept();
415        }
416        cout << "handle-done." << endl;
417    }
418};
419
420int main(int argc, const char **argv)
421{
422    try
423    {
424        ba::io_service io_service;
425
426        int port = argc==2 ? lexical_cast<int>(argv[1]) : 5000;
427
428        tcp_server server(io_service, port);
429
430        //  ba::add_service(io_service, &server);
431        //  server.add_service(...);
432        //cout << "Run..." << flush;
433
434        // Calling run() from a single thread ensures no concurrent access
435        // of the handler which are called!!!
436        io_service.run();
437
438        //cout << "end." << endl;
439    }
440    catch (std::exception& e)
441    {
442        std::cerr << e.what() << std::endl;
443    }
444
445    return 0;
446}
447/*  ====================== Buffers ===========================
448
449char d1[128]; ba::buffer(d1);
450std::vector<char> d2(128); ba::buffer(d2);
451boost::array<char, 128> d3; ba::buffer(d3);
452
453// --------------------------------
454char d1[128];
455std::vector<char> d2(128);
456boost::array<char, 128> d3;
457
458boost::array<mutable_buffer, 3> bufs1 = {
459   ba::buffer(d1),
460   ba::buffer(d2),
461   ba::buffer(d3) };
462sock.read(bufs1);
463
464std::vector<const_buffer> bufs2;
465bufs2.push_back(boost::asio::buffer(d1));
466bufs2.push_back(boost::asio::buffer(d2));
467bufs2.push_back(boost::asio::buffer(d3));
468sock.write(bufs2);
469
470
471// ======================= Read functions =========================
472
473ba::async_read_until --> delimiter
474
475streambuf buf; // Ensure validity until handler!
476ba::async_read(s, buf, ....);
477
478ba::async_read(s, ba::buffer(data, size), handler);
479 // Single buffer
480 boost::asio::async_read(s,
481                         ba::buffer(data, size),
482 compl-func -->          ba::transfer_at_least(32),
483                         handler);
484
485 // Multiple buffers
486boost::asio::async_read(s, buffers,
487 compl-func -->         boost::asio::transfer_all(),
488                        handler);
489                        */
490
491// ================= Others ===============================
492
493        /*
494        strand   Provides serialised handler execution.
495        work     Class to inform the io_service when it has work to do.
496
497
498io_service::
499dispatch   Request the io_service to invoke the given handler.
500poll       Run the io_service's event processing loop to execute ready
501           handlers.
502poll_one   Run the io_service's event processing loop to execute one ready
503           handler.
504post       Request the io_service to invoke the given handler and return
505           immediately.
506reset      Reset the io_service in preparation for a subsequent run()
507           invocation.
508run        Run the io_service's event processing loop.
509run_one    Run the io_service's event processing loop to execute at most
510           one handler.
511stop       Stop the io_service's event processing loop.
512wrap       Create a new handler that automatically dispatches the wrapped
513           handler on the io_service.
514
515strand::         The io_service::strand class provides the ability to
516                 post and dispatch handlers with the guarantee that none
517                 of those handlers will execute concurrently.
518
519dispatch         Request the strand to invoke the given handler.
520get_io_service   Get the io_service associated with the strand.
521post             Request the strand to invoke the given handler and return
522                 immediately.
523wrap             Create a new handler that automatically dispatches the
524                 wrapped handler on the strand.
525
526work::           The work class is used to inform the io_service when
527                 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
528get_io_service   Get the io_service associated with the work.
529work             Constructor notifies the io_service that work is starting.
530
531*/
532
533
Note: See TracBrowser for help on using the repository browser.