source: trunk/FACT++/src/fad.cc @ 10870

Last change on this file since 10870 was 10864, checked in by tbretz, 9 years ago
Don't catch exceptions in main; propagare continous number as board id.
File size: 17.5 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba    = boost::asio;
20namespace bs    = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30private:
31    const int fBoardId;
32
33    double   fStartTime;
34    uint32_t fRunNumber;
35
36    void AsyncRead(ba::mutable_buffers_1 buffers)
37    {
38        ba::async_read(*this, buffers,
39                       boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
40                                   dummy::error, dummy::bytes_transferred));
41    }
42
43    void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
44    {
45        ba::async_write(*socket, buffers,
46                        boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
47                                    dummy::error, dummy::bytes_transferred));
48    }
49    void AsyncWait(ba::deadline_timer &timer, int seconds,
50                               void (tcp_connection::*handler)(const bs::error_code&))// const
51    {
52        timer.expires_from_now(boost::posix_time::seconds(seconds));
53        timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
54    }
55
56    // The constructor is prvate to force the obtained pointer to be shared
57    tcp_connection(ba::io_service& ioservice, int boardid) : ba::ip::tcp::socket(ioservice),
58        fBoardId(boardid), fTriggerSendData(ioservice)
59    {
60    }
61
62    // Callback when writing was successfull or failed
63    void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
64    {
65        cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
66    }
67
68    vector<uint16_t> fBufCommand;
69    vector<uint16_t> fBuffer;
70
71    vector<uint16_t> fCommand;
72
73    FAD::EventHeader   fHeader;
74    FAD::ChannelHeader fChHeader[kNumChannels];
75
76    ba::deadline_timer fTriggerSendData;
77
78    bool fTriggerEnabled;
79    bool fCommandSocket;
80
81    int fSocket;
82
83    void SendData()
84    {
85        if (!fTriggerEnabled)
86            return;
87
88        fHeader.fPackageLength = sizeof(EventHeader)/2+1;
89        fHeader.fEventCounter++;
90        fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
91
92
93        fBuffer.resize(0);
94
95        for (int i=0; i<kNumChannels; i++)
96        {
97            fChHeader[i].fStartCell = i*10;
98
99            const vector<uint16_t> buf = fChHeader[i].HtoN();
100
101            fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
102            fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42);
103
104            fHeader.fPackageLength += sizeof(ChannelHeader)/2;
105            fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
106        }
107
108        fBuffer.push_back(htons(FAD::kDelimiterEnd));
109
110        const vector<uint16_t> h = fHeader.HtoN();
111
112        fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
113
114        if (fCommandSocket)
115            AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
116        else
117        {
118            if (fSockets.size()==0)
119                return;
120
121            fSocket++;
122            fSocket %= fSockets.size();
123
124            AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
125        }
126    }
127
128    void TriggerSendData(const boost::system::error_code &ec)
129    {
130        if (!is_open())
131        {
132            // For example: Here we could schedule a new accept if we
133            // would not want to allow two connections at the same time.
134            return;
135        }
136
137        if (ec==ba::error::basic_errors::operation_aborted)
138            return;
139
140        // Check whether the deadline has passed. We compare the deadline
141        // against the current time since a new asynchronous operation
142        // may have moved the deadline before this actor had a chance
143        // to run.
144        if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
145            return;
146
147        // The deadline has passed.
148        SendData();
149
150        AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
151    }
152
153    void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
154    {
155        // Do not schedule a new read if the connection failed.
156        if (bytes_received==0)
157        {
158            // Close the connection
159            close();
160            return;
161        }
162
163        // No command received yet
164        if (fCommand.size()==0)
165        {
166            transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
167                      fBufCommand.begin(), ntohs);
168
169            switch (fBufCommand[0])
170            {
171            case kCmdDrsEnable:
172            case kCmdDrsEnable+0x100:
173                fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
174                cout << "-> DrsEnable" << endl;
175                break;
176
177            case kCmdDwrite:
178            case kCmdDwrite+0x100:
179                fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
180                cout << "-> Dwrite" << endl;
181                break;
182
183            case kCmdTriggerLine:
184            case kCmdTriggerLine+0x100:
185                cout << "-> Trigger line" << endl;
186                fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
187                break;
188
189            case kCmdSclk:
190            case kCmdSclk+0x100:
191                cout << "-> Sclk" << endl;
192                fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
193                break;
194
195            case kCmdSrclk:
196            case kCmdSrclk+0x100:
197                cout << "-> Drclk" << endl;
198                break;
199
200            case kCmdRun:
201            case kCmdRun+0x100:
202                fStartTime = Time(Time::utc).UnixTime();
203                cout << "-> Run" << endl;
204                break;
205
206            case kCmdSocket:
207            case kCmdSocket+0x100:
208                cout << "-> Socket" << endl;
209                fCommandSocket = fBufCommand[0]==kCmdSocket;
210                break;
211
212            case kCmdContTriggerOn:
213            case kCmdContTriggerOff:
214                if (fBufCommand[0]==kCmdContTriggerOn)
215                    AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
216                else
217                    fTriggerSendData.cancel();
218                cout << "-> ContTrig" << endl;
219                break;
220
221            case kCmdResetTriggerId:
222                cout << "-> Reset" << endl;
223                fHeader.fEventCounter = 0;
224                break;
225
226            case kCmdSingleTrigger:
227                cout << "-> Trigger" << endl;
228                SendData();
229                break;
230
231            default:
232                if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
233                {
234                    fCommand.resize(2);
235                    fCommand[0] = kCmdWriteRoi;
236                    fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
237                    break;
238                }
239                if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
240                {
241                    fCommand.resize(2);
242                    fCommand[0] = kCmdWriteDac;
243                    fCommand[1] = fBufCommand[0]-kCmdWriteDac;
244                    break;
245                }
246
247                cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
248                cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
249                return;
250            }
251
252            fBufCommand.resize(1);
253            AsyncRead(ba::buffer(fBufCommand));
254            return;
255        }
256
257        transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
258                  fBufCommand.begin(), ntohs);
259
260        switch (fCommand[0])
261        {
262        case kCmdWriteRoi:
263            cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
264            fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
265            break;
266
267        case kCmdWriteDac:
268            cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
269            fHeader.fDac[fCommand[1]] = fBufCommand[0];
270            break;
271        }
272
273        fCommand.resize(0);
274
275        fBufCommand.resize(1);
276        AsyncRead(ba::buffer(fBufCommand));
277    }
278
279public:
280    typedef boost::shared_ptr<tcp_connection> shared_ptr;
281
282    static shared_ptr create(ba::io_service& io_service, int boardid)
283    {
284        return shared_ptr(new tcp_connection(io_service, boardid));
285    }
286
287    void start()
288    {
289        // Ownership of buffer must be valid until Handler is called.
290
291        fTriggerEnabled=false;
292        fCommandSocket=true;
293
294        fHeader.fStartDelimiter = FAD::kDelimiterStart;
295        fHeader.fVersion = 0x104;
296        fHeader.fBoardId = (fBoardId%10) | ((fBoardId/10)<<8);
297        fHeader.fStatus = 0xf<<12 |
298            FAD::EventHeader::kDenable    |
299            FAD::EventHeader::kDwrite     |
300            FAD::EventHeader::kDcmLocked  |
301            FAD::EventHeader::kDcmReady   |
302            FAD::EventHeader::kSpiSclk;
303
304        fStartTime = Time(Time::utc).UnixTime();
305
306        for (int i=0; i<kNumChannels; i++)
307        {
308            fChHeader[i].fId = (i%9) | ((i/9)<<4);
309            fChHeader[i].fRegionOfInterest = 0;
310        }
311
312        // Emit something to be written to the socket
313        fBufCommand.resize(1);
314        AsyncRead(ba::buffer(fBufCommand));
315
316//        AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
317
318//        AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
319//        AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
320
321    }
322
323    vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
324
325    ~tcp_connection()
326    {
327        fSockets.clear();
328    }
329
330    void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
331    {
332        cout << "Added one socket " << socket->remote_endpoint().address().to_v4().to_string();
333        cout << ":"<< port << endl;
334        fSockets.push_back(socket);
335    }
336};
337
338
339class tcp_server
340{
341    tcp::acceptor acc0;
342    tcp::acceptor acc1;
343    tcp::acceptor acc2;
344    tcp::acceptor acc3;
345    tcp::acceptor acc4;
346    tcp::acceptor acc5;
347    tcp::acceptor acc6;
348    tcp::acceptor acc7;
349
350    int fBoardId;
351
352public:
353    tcp_server(ba::io_service& ioservice, int port, int board) :
354        acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
355        acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
356        acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
357        acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
358        acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
359        acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
360        acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
361        acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
362        fBoardId(board)
363    {
364        // We could start listening for more than one connection
365        // here, but since there is only one handler executed each time
366        // it would not make sense. Before one handle_accept is not
367        // finished no new handle_accept will be called.
368        // Workround: Start a new thread in handle_accept
369        start_accept();
370    }
371
372private:
373    void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
374    {
375        boost::shared_ptr<ba::ip::tcp::socket> connection =
376            boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
377        acc.async_accept(*connection,
378                          boost::bind(&tcp_connection::handle_accept,
379                                      dest, connection,
380                                      acc.local_endpoint().port(),
381                                      ba::placeholders::error));
382    }
383
384    void start_accept()
385    {
386        cout << "Start accept " << acc0.local_endpoint().port() << "..." << flush;
387        tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service(), fBoardId);
388
389        // This will accept a connection without blocking
390        acc0.async_accept(*new_connection,
391                          boost::bind(&tcp_server::handle_accept,
392                                      this,
393                                      new_connection,
394                                      ba::placeholders::error));
395
396        start_accept(new_connection, acc1);
397        start_accept(new_connection, acc2);
398        start_accept(new_connection, acc3);
399        start_accept(new_connection, acc4);
400        start_accept(new_connection, acc5);
401        start_accept(new_connection, acc6);
402        start_accept(new_connection, acc7);
403
404        cout << "start-done." << endl;
405    }
406
407    void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
408    {
409        // The connection has been accepted and is now ready to use
410
411        // not installing a new handler will stop run()
412        cout << "Handle accept..." << flush;
413        if (!error)
414        {
415            new_connection->start();
416
417            // The is now an open connection/server (tcp_connection)
418            // we immediatly schedule another connection
419            // This allowed two client-connection at the same time
420            start_accept();
421        }
422        cout << "handle-done." << endl;
423    }
424};
425
426int main(int argc, const char **argv)
427{
428    //try
429    {
430        ba::io_service io_service;
431
432        int port = argc>=2 ? lexical_cast<int>(argv[1]) : 5000;
433        int n    = argc==3 ? lexical_cast<int>(argv[2]) :    1;
434
435        vector<shared_ptr<tcp_server>> servers;
436
437        for (int i=0; i<n; i++)
438        {
439            shared_ptr<tcp_server> server(new tcp_server(io_service, port, i));
440            servers.push_back(server);
441
442            port += 8;
443        }
444
445        //  ba::add_service(io_service, &server);
446        //  server.add_service(...);
447        //cout << "Run..." << flush;
448
449        // Calling run() from a single thread ensures no concurrent access
450        // of the handler which are called!!!
451        io_service.run();
452
453        //cout << "end." << endl;
454    }
455    /*catch (std::exception& e)
456    {
457        std::cerr << e.what() << std::endl;
458    }*/
459
460    return 0;
461}
462/*  ====================== Buffers ===========================
463
464char d1[128]; ba::buffer(d1));
465std::vector<char> d2(128); ba::buffer(d2);
466boost::array<char, 128> d3; by::buffer(d3);
467
468// --------------------------------
469char d1[128];
470std::vector<char> d2(128);
471boost::array<char, 128> d3;
472
473boost::array<mutable_buffer, 3> bufs1 = {
474   ba::buffer(d1),
475   ba::buffer(d2),
476   ba::buffer(d3) };
477sock.read(bufs1);
478
479std::vector<const_buffer> bufs2;
480bufs2.push_back(boost::asio::buffer(d1));
481bufs2.push_back(boost::asio::buffer(d2));
482bufs2.push_back(boost::asio::buffer(d3));
483sock.write(bufs2);
484
485
486// ======================= Read functions =========================
487
488ba::async_read_until --> delimiter
489
490streambuf buf; // Ensure validity until handler!
491by::async_read(s, buf, ....);
492
493ba::async_read(s, ba:buffer(data, size), handler);
494 // Single buffer
495 boost::asio::async_read(s,
496                         ba::buffer(data, size),
497 compl-func -->          ba::transfer_at_least(32),
498                         handler);
499
500 // Multiple buffers
501boost::asio::async_read(s, buffers,
502 compl-func -->         boost::asio::transfer_all(),
503                        handler);
504                        */
505
506// ================= Others ===============================
507
508        /*
509        strand   Provides serialised handler execution.
510        work     Class to inform the io_service when it has work to do.
511
512
513io_service::
514dispatch   Request the io_service to invoke the given handler.
515poll       Run the io_service's event processing loop to execute ready
516           handlers.
517poll_one   Run the io_service's event processing loop to execute one ready
518           handler.
519post       Request the io_service to invoke the given handler and return
520           immediately.
521reset      Reset the io_service in preparation for a subsequent run()
522           invocation.
523run        Run the io_service's event processing loop.
524run_one    Run the io_service's event processing loop to execute at most
525           one handler.
526stop       Stop the io_service's event processing loop.
527wrap       Create a new handler that automatically dispatches the wrapped
528           handler on the io_service.
529
530strand::         The io_service::strand class provides the ability to
531                 post and dispatch handlers with the guarantee that none
532                 of those handlers will execute concurrently.
533
534dispatch         Request the strand to invoke the given handler.
535get_io_service   Get the io_service associated with the strand.
536post             Request the strand to invoke the given handler and return
537                 immediately.
538wrap             Create a new handler that automatically dispatches the
539                 wrapped handler on the strand.
540
541work::           The work class is used to inform the io_service when
542                 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
543get_io_service   Get the io_service associated with the work.
544work             Constructor notifies the io_service that work is starting.
545
546*/
547
548
Note: See TracBrowser for help on using the repository browser.