source: trunk/FACT++/src/fad.cc @ 10846

Last change on this file since 10846 was 10846, checked in by tbretz, 8 years ago
Start to listen on eight ports like the FAD boards.
File size: 15.0 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba    = boost::asio;
20namespace bs    = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26// ------------------------------------------------------------------------
27
28class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
29{
30private:
31
32    double   fStartTime;
33    uint32_t fRunNumber;
34
35    void AsyncRead(ba::mutable_buffers_1 buffers)
36    {
37        ba::async_read(*this, buffers,
38                       boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
39                                   dummy::error, dummy::bytes_transferred));
40    }
41
42    void AsyncWrite(const ba::const_buffers_1 &buffers)
43    {
44        ba::async_write(*this, buffers,
45                        boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
46                                    dummy::error, dummy::bytes_transferred));
47    }
48    void AsyncWait(ba::deadline_timer &timer, int seconds,
49                               void (tcp_connection::*handler)(const bs::error_code&))// const
50    {
51        timer.expires_from_now(boost::posix_time::seconds(seconds));
52        timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
53    }
54
55    // The constructor is prvate to force the obtained pointer to be shared
56    tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
57        fTriggerSendData(ioservice)
58    {
59    }
60
61    // Callback when writing was successfull or failed
62    void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
63    {
64        cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
65    }
66
67    vector<uint16_t> fBufCommand;
68    vector<uint16_t> fBuffer;
69
70    vector<uint16_t> fCommand;
71
72    FAD::EventHeader   fHeader;
73    FAD::ChannelHeader fChHeader[kNumChannels];
74
75    ba::deadline_timer fTriggerSendData;
76
77    bool fTriggerEnabled;
78
79    void SendData()
80    {
81        if (!fTriggerEnabled)
82            return;
83
84        fHeader.fPackageLength = sizeof(EventHeader)/2+1;
85        fHeader.fEventCounter++;
86        fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
87
88
89        fBuffer.resize(0);
90
91        for (int i=0; i<kNumChannels; i++)
92        {
93            fChHeader[i].fStartCell = i*10;
94
95            const vector<uint16_t> buf = fChHeader[i].HtoN();
96
97            fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
98            fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42);
99
100            fHeader.fPackageLength += sizeof(ChannelHeader)/2;
101            fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
102        }
103
104        fBuffer.push_back(htons(FAD::kDelimiterEnd));
105
106        const vector<uint16_t> h = fHeader.HtoN();
107
108        fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
109
110        AsyncWrite(ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
111    }
112
113    void TriggerSendData(const boost::system::error_code &ec)
114    {
115        if (!is_open())
116        {
117            // For example: Here we could schedule a new accept if we
118            // would not want to allow two connections at the same time.
119            return;
120        }
121
122        if (ec==ba::error::basic_errors::operation_aborted)
123            return;
124
125        // Check whether the deadline has passed. We compare the deadline
126        // against the current time since a new asynchronous operation
127        // may have moved the deadline before this actor had a chance
128        // to run.
129        if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
130            return;
131
132        // The deadline has passed.
133        SendData();
134
135        AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
136    }
137
138    void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
139    {
140        // Do not schedule a new read if the connection failed.
141        if (bytes_received==0)
142        {
143            // Close the connection
144            close();
145            return;
146        }
147
148        // No command received yet
149        if (fCommand.size()==0)
150        {
151            transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
152                      fBufCommand.begin(), ntohs);
153
154            switch (fBufCommand[0])
155            {
156            case kCmdDrsEnable:
157            case kCmdDrsEnable+0x100:
158                fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
159                cout << "-> DrsEnable" << endl;
160                break;
161
162            case kCmdDwrite:
163            case kCmdDwrite+0x100:
164                fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
165                cout << "-> Dwrite" << endl;
166                break;
167
168            case kCmdTriggerLine:
169            case kCmdTriggerLine+0x100:
170                cout << "-> Trigger line" << endl;
171                fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
172                break;
173
174            case kCmdSclk:
175            case kCmdSclk+0x100:
176                cout << "-> Sclk" << endl;
177                fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
178                break;
179
180            case kCmdSrclk:
181            case kCmdSrclk+0x100:
182                cout << "-> Drclk" << endl;
183                break;
184
185            case kCmdRun:
186            case kCmdRun+0x100:
187                cout << "-> Run" << endl;
188                break;
189
190            case kCmdContTriggerOn:
191            case kCmdContTriggerOff:
192                if (fBufCommand[0]==kCmdContTriggerOn)
193                    AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
194                else
195                    fTriggerSendData.cancel();
196                cout << "-> ContTrig" << endl;
197                break;
198
199            case kCmdResetTriggerId:
200                cout << "-> Reset" << endl;
201                fHeader.fEventCounter = 0;
202                break;
203
204            case kCmdSingleTrigger:
205                cout << "-> Trigger" << endl;
206                SendData();
207                break;
208
209            default:
210                if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
211                {
212                    fCommand.resize(2);
213                    fCommand[0] = kCmdWriteRoi;
214                    fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
215                    break;
216                }
217                if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
218                {
219                    fCommand.resize(2);
220                    fCommand[0] = kCmdWriteDac;
221                    fCommand[1] = fBufCommand[0]-kCmdWriteDac;
222                    break;
223                }
224
225                cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
226                cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
227                return;
228            }
229
230            fBufCommand.resize(1);
231            AsyncRead(ba::buffer(fBufCommand));
232            return;
233        }
234
235        transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
236                  fBufCommand.begin(), ntohs);
237
238        switch (fCommand[0])
239        {
240        case kCmdWriteRoi:
241            cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
242            fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
243            break;
244
245        case kCmdWriteDac:
246            cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
247            fHeader.fDac[fCommand[1]] = fBufCommand[0];
248            break;
249        }
250
251        fCommand.resize(0);
252
253        fBufCommand.resize(1);
254        AsyncRead(ba::buffer(fBufCommand));
255    }
256
257public:
258    typedef boost::shared_ptr<tcp_connection> shared_ptr;
259
260    static shared_ptr create(ba::io_service& io_service)
261    {
262        return shared_ptr(new tcp_connection(io_service));
263    }
264
265    void start()
266    {
267        // Ownership of buffer must be valid until Handler is called.
268
269        fHeader.fStartDelimiter = FAD::kDelimiterStart;
270        fHeader.fVersion = 0x104;
271        fHeader.fStatus = 0xf<<12 |
272            FAD::EventHeader::kDenable    |
273            FAD::EventHeader::kDwrite     |
274            FAD::EventHeader::kDcmLocked  |
275            FAD::EventHeader::kDcmReady   |
276            FAD::EventHeader::kSpiSclk;
277
278        fStartTime = Time(Time::utc).UnixTime();
279
280        for (int i=0; i<kNumChannels; i++)
281        {
282            fChHeader[i].fId = (i%9) | ((i/9)<<4);
283            fChHeader[i].fRegionOfInterest = 0;
284        }
285
286        // Emit something to be written to the socket
287        fBufCommand.resize(1);
288        AsyncRead(ba::buffer(fBufCommand));
289
290//        AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
291
292//        AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
293//        AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
294
295    }
296};
297
298
299class tcp_server : public tcp::acceptor
300{
301public:
302    tcp_server(ba::io_service& ioservice, int port) :
303        tcp::acceptor(ioservice, tcp::endpoint(tcp::v4(), port))
304
305    {
306        // We could start listening for more than one connection
307        // here, but since there is only one handler executed each time
308        // it would not make sense. Before one handle_accept is not
309        // finished no new handle_accept will be called.
310        // Workround: Start a new thread in handle_accept
311        start_accept();
312    }
313
314private:
315    void start_accept()
316    {
317        cout << "Start accept..." << flush;
318        tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/io_service());
319
320        // This will accept a connection without blocking
321        async_accept(*new_connection,
322                     boost::bind(&tcp_server::handle_accept,
323                                 this,
324                                 new_connection,
325                                 ba::placeholders::error));
326
327
328        cout << "start-done." << endl;
329    }
330
331    void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
332    {
333        // The connection has been accepted and is now ready to use
334
335        // not installing a new handler will stop run()
336        cout << "Handle accept..." << flush;
337        if (!error)
338        {
339            new_connection->start();
340
341            // The is now an open connection/server (tcp_connection)
342            // we immediatly schedule another connection
343            // This allowed two client-connection at the same time
344            start_accept();
345        }
346        cout << "handle-done." << endl;
347    }
348};
349
350int main(int argc, const char **argv)
351{
352    try
353    {
354        ba::io_service io_service;
355
356        int port = argc==2 ? lexical_cast<int>(argv[1]) : 5000;
357
358        tcp_server server(io_service, port);
359
360        tcp::acceptor acc1(io_service, tcp::endpoint(tcp::v4(), port+1));
361        tcp::acceptor acc2(io_service, tcp::endpoint(tcp::v4(), port+2));
362        tcp::acceptor acc3(io_service, tcp::endpoint(tcp::v4(), port+3));
363        tcp::acceptor acc4(io_service, tcp::endpoint(tcp::v4(), port+4));
364        tcp::acceptor acc5(io_service, tcp::endpoint(tcp::v4(), port+5));
365        tcp::acceptor acc6(io_service, tcp::endpoint(tcp::v4(), port+6));
366        tcp::acceptor acc7(io_service, tcp::endpoint(tcp::v4(), port+7));
367
368        //  ba::add_service(io_service, &server);
369        //  server.add_service(...);
370        cout << "Run..." << flush;
371
372        // Calling run() from a single thread ensures no concurrent access
373        // of the handler which are called!!!
374        io_service.run();
375
376        cout << "end." << endl;
377    }
378    catch (std::exception& e)
379    {
380        std::cerr << e.what() << std::endl;
381    }
382
383    return 0;
384}
385/*  ====================== Buffers ===========================
386
char d1[128]; ba::buffer(d1);
std::vector<char> d2(128); ba::buffer(d2);
boost::array<char, 128> d3; ba::buffer(d3);
390
391// --------------------------------
392char d1[128];
393std::vector<char> d2(128);
394boost::array<char, 128> d3;
395
396boost::array<mutable_buffer, 3> bufs1 = {
397   ba::buffer(d1),
398   ba::buffer(d2),
399   ba::buffer(d3) };
400sock.read(bufs1);
401
402std::vector<const_buffer> bufs2;
403bufs2.push_back(boost::asio::buffer(d1));
404bufs2.push_back(boost::asio::buffer(d2));
405bufs2.push_back(boost::asio::buffer(d3));
406sock.write(bufs2);
407
408
409// ======================= Read functions =========================
410
411ba::async_read_until --> delimiter
412
streambuf buf; // Ensure validity until handler!
ba::async_read(s, buf, ....);

ba::async_read(s, ba::buffer(data, size), handler);
417 // Single buffer
418 boost::asio::async_read(s,
419                         ba::buffer(data, size),
420 compl-func -->          ba::transfer_at_least(32),
421                         handler);
422
423 // Multiple buffers
424boost::asio::async_read(s, buffers,
425 compl-func -->         boost::asio::transfer_all(),
426                        handler);
427                        */
428
429// ================= Others ===============================
430
431        /*
432        strand   Provides serialised handler execution.
433        work     Class to inform the io_service when it has work to do.
434
435
436io_service::
437dispatch   Request the io_service to invoke the given handler.
438poll       Run the io_service's event processing loop to execute ready
439           handlers.
440poll_one   Run the io_service's event processing loop to execute one ready
441           handler.
442post       Request the io_service to invoke the given handler and return
443           immediately.
444reset      Reset the io_service in preparation for a subsequent run()
445           invocation.
446run        Run the io_service's event processing loop.
447run_one    Run the io_service's event processing loop to execute at most
448           one handler.
449stop       Stop the io_service's event processing loop.
450wrap       Create a new handler that automatically dispatches the wrapped
451           handler on the io_service.
452
453strand::         The io_service::strand class provides the ability to
454                 post and dispatch handlers with the guarantee that none
455                 of those handlers will execute concurrently.
456
457dispatch         Request the strand to invoke the given handler.
458get_io_service   Get the io_service associated with the strand.
459post             Request the strand to invoke the given handler and return
460                 immediately.
461wrap             Create a new handler that automatically dispatches the
462                 wrapped handler on the strand.
463
464work::           The work class is used to inform the io_service when
465                 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
466get_io_service   Get the io_service associated with the work.
467work             Constructor notifies the io_service that work is starting.
468
469*/
470
471
Note: See TracBrowser for help on using the repository browser.