source: trunk/FACT++/src/ftm.cc @ 10823

Last change on this file since 10823 was 10823, checked in by tbretz, 9 years ago
Handle operation_aborted in timeouts; multiply timestamp by 10000; removed obsolete deadline_ data member
File size: 19.8 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFTM.h"
15
16using namespace std;
17using namespace FTM;
18
19namespace ba    = boost::asio;
20namespace bs    = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26int Port = 0;
27
28// ------------------------------------------------------------------------
29
30
31// ------------------------------------------------------------------------
32
33class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
34{
35private:
36
37    double fStartTime;
38
39    void AsyncRead(ba::mutable_buffers_1 buffers)
40    {
41        ba::async_read(*this, buffers,
42                       boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
43                                   dummy::error, dummy::bytes_transferred));
44    }
45
46    void AsyncWrite(const ba::const_buffers_1 &buffers)
47    {
48        ba::async_write(*this, buffers,
49                        boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
50                                    dummy::error, dummy::bytes_transferred));
51    }
52    void AsyncWait(ba::deadline_timer &timer, int seconds,
53                               void (tcp_connection::*handler)(const bs::error_code&))// const
54    {
55        timer.expires_from_now(boost::posix_time::seconds(seconds));
56        timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
57    }
58
59    ba::deadline_timer fTriggerDynData;
60
61    // The constructor is prvate to force the obtained pointer to be shared
62    tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
63        fTriggerDynData(ioservice)
64    {
65        //deadline_.expires_at(boost::posix_time::pos_infin);
66
67        fHeader.fDelimiter=kDelimiterStart;
68        fHeader.fState=FTM::kFtmIdle;
69        fHeader.fBoardId=0xaffe;
70        fHeader.fFirmwareId=0x42;
71
72        fDelimiter = htons(kDelimiterEnd);
73
74        fStaticData.clear();
75
76        fStaticData.fMultiplicityPhysics = 1;
77        fStaticData.fMultiplicityCalib   = 40;
78        fStaticData.fWindowCalib         = 1;
79        fStaticData.fWindowPhysics       = 0;
80        fStaticData.fDelayTrigger        = 21;
81        fStaticData.fDelayTimeMarker     = 42;
82        fStaticData.fDeadTime            = 84;
83
84        fStaticData.fClockConditioner[0] = 100;
85        fStaticData.fClockConditioner[1] = 1;
86        fStaticData.fClockConditioner[2] = 8;
87        fStaticData.fClockConditioner[3] = 9;
88        fStaticData.fClockConditioner[4] = 11;
89        fStaticData.fClockConditioner[5] = 13;
90        fStaticData.fClockConditioner[6] = 14;
91        fStaticData.fClockConditioner[7] = 15;
92
93        fStaticData.fTriggerSequence = 1 | (2<<5) | (3<<10);
94
95        fStaticData.fGeneralSettings =
96            FTM::StaticData::kTrigger |
97            FTM::StaticData::kLPext   |
98            FTM::StaticData::kPedestal;
99
100        fStaticData.fActiveFTU[0] = 0x3ff;
101        fStaticData.fActiveFTU[3] = 0x3ff;
102
103        for (int i=0; i<40; i++)
104        {
105            for (int p=0; p<4; p++)
106                fStaticData[i].fEnable[p] = 0x1ff;
107
108            for (int p=0; p<5; p++)
109                fStaticData[i].fDAC[p]    = (p+1)*10;
110
111            fStaticData[i].fPrescaling    = 42;
112        }
113
114        for (unsigned long long i=0; i<40; i++)
115        {
116            fFtuList[i].fDNA      = (i<<48)|(i<<32)|(i<<16)|i;
117            fFtuList[i].fPingAddr = (1<<8) | i;
118        }
119
120        fFtuList[1].fPingAddr = (1<<9) | 1;
121        fFtuList[0].fPingAddr = 0;
122
123        fFtuList.fNumBoards = 19;
124        fFtuList.fNumBoardsCrate[0] = 9;
125        fFtuList.fNumBoardsCrate[1] = 0;
126        fFtuList.fNumBoardsCrate[2] = 0;
127        fFtuList.fNumBoardsCrate[3] = 10;
128    }
129
130    // Callback when writing was successfull or failed
131    void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
132    {
133        cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
134    }
135
136    vector<uint16_t> fBufCommand;
137    vector<uint16_t> fBufHeader;
138    vector<uint16_t> fBufFtuList;
139    vector<uint16_t> fBufStaticData;
140    vector<uint16_t> fBufDynamicData;
141
142    vector<uint16_t> fCommand;
143    FTM::Header      fHeader;
144    FTM::FtuList     fFtuList;
145    FTM::StaticData  fStaticData;
146    FTM::DynamicData fDynamicData;
147
148    //vector<uint16_t> fStaticData;
149
150    uint16_t fDelimiter;
151    uint16_t fBufRegister;
152
153    uint16_t fCounter;
154    uint16_t fTimeStamp;
155
156    bool fReportsDisabled;
157
158    void SendDynamicData()
159    {
160        if (fReportsDisabled)
161            return;
162
163        //if (fHeader.fState == FTM::kFtmRunning)
164        //    fDynamicData.fOnTimeCounter = lrint(Time().UnixTime()-fStartTime);
165
166        fDynamicData.fTempSensor[0] = (23. + (6.*rand()/RAND_MAX-3))*10;
167        fDynamicData.fTempSensor[1] = (55. + (6.*rand()/RAND_MAX-3))*10;
168        fDynamicData.fTempSensor[2] = (39. + (6.*rand()/RAND_MAX-3))*10;
169        fDynamicData.fTempSensor[3] = (42. + (6.*rand()/RAND_MAX-3))*10;
170
171        for (int i=0; i<40; i++)
172            for (int p=0; p<4; p++)
173                fDynamicData[i].fRatePatch[p] = (1000 + (float(rand())/RAND_MAX-0.5)*25*p);
174
175        fHeader.fType=kDynamicData;     // FtuList
176        fHeader.fDataSize=sizeof(FTM::DynamicData)/2+1;
177        fHeader.fTriggerCounter = fCounter++;
178        fHeader.fTimeStamp = fTimeStamp++*1000000;//lrint(Time().UnixTime());
179
180        fBufHeader      = fHeader.HtoN();
181        fBufDynamicData = fDynamicData.HtoN();
182
183        AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0],      fBufHeader.size()*2)));
184        AsyncWrite(ba::buffer(ba::const_buffer(&fBufDynamicData[0], sizeof(FTM::DynamicData))));
185        AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
186    }
187
188    void SendStaticData()
189    {
190        fHeader.fType=kStaticData;     // FtuList
191        fHeader.fDataSize=sizeof(FTM::StaticData)/2+1;
192        fHeader.fTriggerCounter = fCounter++;
193        fHeader.fTimeStamp = fTimeStamp*1000000;//lrint(Time().UnixTime());
194
195        for (int i=0; i<4; i++)
196            fFtuList.fActiveFTU[i] = fStaticData.fActiveFTU[i];
197
198        fBufHeader     = fHeader.HtoN();
199        fBufStaticData = fStaticData.HtoN();
200
201        AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0],     fBufHeader.size()*2)));
202        AsyncWrite(ba::buffer(ba::const_buffer(&fBufStaticData[0], fBufStaticData.size()*2)));
203        AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
204    }
205
206    void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
207    {
208        // Do not schedule a new read if the connection failed.
209        if (bytes_received==0)
210        {
211            // Close the connection
212            close();
213            return;
214        }
215
216        // No command received yet
217        if (fCommand.size()==0)
218        {
219            transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
220                      fBufCommand.begin(), ntohs);
221
222            if (fBufCommand[0]!='@')
223            {
224                cout << "Inavlid command: 0x" << hex << fBufCommand[0] << dec << endl;
225                cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
226                cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
227                return;
228            }
229
230            switch (fBufCommand[1])
231            {
232            case kCmdToggleLed:
233                cout << "-> TOGGLE_LED" << endl;
234
235                fBufCommand.resize(5);
236                AsyncRead(ba::buffer(fBufCommand));
237                return;
238
239            case kCmdPing:
240                cout << "-> PING" << endl;
241
242                fHeader.fType=kFtuList;     // FtuList
243                fHeader.fDataSize=sizeof(FTM::FtuList)/2+1;
244                fHeader.fTriggerCounter = fCounter++;
245                fHeader.fTimeStamp = fTimeStamp*1000000;//lrint(Time().UnixTime());
246
247                fFtuList[1].fPingAddr = ((rand()&1)<<9) | 1;
248                fFtuList[0].fPingAddr = ((rand()&1)<<8);
249
250                fBufHeader  = fHeader.HtoN();
251                fBufFtuList = fFtuList.HtoN();
252
253                AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0],  fBufHeader.size()*2)));
254                AsyncWrite(ba::buffer(ba::const_buffer(&fBufFtuList[0], fBufFtuList.size()*2)));
255                AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
256
257                fBufCommand.resize(5);
258                AsyncRead(ba::buffer(fBufCommand));
259                return;
260
261            case kCmdRead: // kCmdRead
262                cout << "-> READ" << endl;
263                switch (fBufCommand[2])
264                {
265                case kCmdStaticData:
266                    cout << "-> STATIC" << endl;
267
268                    SendStaticData();
269
270                    fBufCommand.resize(5);
271                    AsyncRead(ba::buffer(fBufCommand));
272
273                    return;
274
275                case kCmdDynamicData:
276                    cout << "-> DYNAMIC" << endl;
277
278                    SendDynamicData();
279
280                    fBufCommand.resize(5);
281                    AsyncRead(ba::buffer(fBufCommand));
282
283                    return;
284
285                case kCmdRegister:
286                    fCommand = fBufCommand;
287                    cout << "-> REGISTER" << endl;
288
289                    fBufCommand.resize(1);
290                    AsyncRead(ba::buffer(fBufCommand));
291                    return;
292                }
293                break;
294
295
296            case kCmdWrite:
297                switch (fBufCommand[2])
298                {
299                case kCmdRegister:
300                    fCommand = fBufCommand;
301                    cout << "-> REGISTER" << endl;
302
303                    fBufCommand.resize(2);
304                    AsyncRead(ba::buffer(fBufCommand));
305                    return;
306
307                case kCmdStaticData:
308                    fCommand = fBufCommand;
309                    cout << "-> STATIC DATA" << endl;
310
311                    fBufCommand.resize(sizeof(StaticData)/2);
312                    AsyncRead(ba::buffer(fBufCommand));
313                    return;
314                }
315                break;
316
317            case kCmdDisableReports:
318                cout << "-> DISABLE REPORTS " << !fBufCommand[2] << endl;
319                fReportsDisabled = !fBufCommand[2];
320
321                fBufCommand.resize(5);
322                AsyncRead(ba::buffer(fBufCommand));
323                return;
324
325            case kCmdStartRun:
326                fHeader.fState = FTM::kFtmRunning;
327
328                fStartTime = Time().UnixTime();
329
330                fCounter = 0;
331                fTimeStamp = 0;
332
333                fBufCommand.resize(5);
334                AsyncRead(ba::buffer(fBufCommand));
335                return;
336
337            case kCmdStopRun:
338                fHeader.fState = FTM::kFtmIdle;
339
340                fCounter = 0;
341                fTimeStamp = 0;
342
343                fBufCommand.resize(5);
344                AsyncRead(ba::buffer(fBufCommand));
345                return;
346            }
347
348            cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
349            cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
350            return;
351        }
352
353        // Command data received
354
355        // Prepare reception of next command
356        switch (fCommand[1])
357        {
358        case kCmdRead: // kCmdRead
359            {
360                const uint16_t addr = ntohs(fBufCommand[0]);
361                const uint16_t val  = reinterpret_cast<uint16_t*>(&fStaticData)[addr];
362
363                cout << "-> GET REGISTER[" << addr << "]=" << val << endl;
364
365                fHeader.fType=kRegister;     // FtuList
366                fHeader.fDataSize=2;
367                fHeader.fTriggerCounter = fCounter++;
368                fHeader.fTimeStamp = fTimeStamp*1000000;//lrint(Time().UnixTime());
369
370                fBufHeader = fHeader.HtoN();
371                fBufStaticData[addr] = htons(val);
372
373                AsyncWrite(ba::buffer(ba::const_buffer(&fBufHeader[0], fBufHeader.size()*2)));
374                AsyncWrite(ba::buffer(ba::const_buffer(&fBufStaticData[addr], 2)));
375                AsyncWrite(ba::buffer(ba::const_buffer(&fDelimiter, 2)));
376                break;
377            }
378
379        case kCmdWrite:
380            switch (fCommand[2])
381            {
382            case kCmdRegister:
383                {
384                    const uint16_t addr = ntohs(fBufCommand[0]);
385                    const uint16_t val  = ntohs(fBufCommand[1]);
386
387                    cout << "-> SET REGISTER[" << addr << "]=" << val << endl;
388
389                    reinterpret_cast<uint16_t*>(&fStaticData)[addr] = val;
390                }
391                break;
392
393            case kCmdStaticData:
394                {
395                    cout << "-> SET STATIC DATA" << endl;
396                    fStaticData = fBufCommand;
397                }
398                break;
399            }
400            break;
401        }
402
403        fCommand.resize(0);
404
405        fBufCommand.resize(5);
406        AsyncRead(ba::buffer(fBufCommand));
407    }
408
409    void SendDynData(const boost::system::error_code &)
410    {
411        if (!is_open())
412        {
413            // For example: Here we could schedule a new accept if we
414            // would not want to allow two connections at the same time.
415            return;
416        }
417
418        if (ec==ba::error::basic_errors::operation_aborted)
419            return;
420
421        // Check whether the deadline has passed. We compare the deadline
422        // against the current time since a new asynchronous operation
423        // may have moved the deadline before this actor had a chance
424        // to run.
425
426        if (fTriggerDynData.expires_at() > ba::deadline_timer::traits_type::now())
427            return;
428
429        // The deadline has passed.
430        SendDynamicData();
431
432        AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
433    }
434
435public:
436    typedef boost::shared_ptr<tcp_connection> shared_ptr;
437
438    static shared_ptr create(ba::io_service& io_service)
439    {
440        return shared_ptr(new tcp_connection(io_service));
441    }
442
443    void start()
444    {
445        // Ownership of buffer must be valid until Handler is called.
446
447        // Emit something to be written to the socket
448        fBufCommand.resize(5);
449        AsyncRead(ba::buffer(fBufCommand));
450
451        AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
452
453//        AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
454//        AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
455
456    }
457};
458
459
460class tcp_server : public tcp::acceptor
461{
462public:
463    tcp_server(ba::io_service& ioservice, int port) :
464        tcp::acceptor(ioservice, tcp::endpoint(tcp::v4(), port))
465
466    {
467        // We could start listening for more than one connection
468        // here, but since there is only one handler executed each time
469        // it would not make sense. Before one handle_accept is not
470        // finished no new handle_accept will be called.
471        // Workround: Start a new thread in handle_accept
472        start_accept();
473    }
474
475private:
476    void start_accept()
477    {
478        cout << "Start accept..." << flush;
479        tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/io_service());
480
481        // This will accept a connection without blocking
482        async_accept(*new_connection,
483                     boost::bind(&tcp_server::handle_accept,
484                                 this,
485                                 new_connection,
486                                 ba::placeholders::error));
487
488        cout << "start-done." << endl;
489    }
490
491    void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
492    {
493        // The connection has been accepted and is now ready to use
494
495        // not installing a new handler will stop run()
496        cout << "Handle accept..." << flush;
497        if (!error)
498        {
499            new_connection->start();
500
501            // The is now an open connection/server (tcp_connection)
502            // we immediatly schedule another connection
503            // This allowed two client-connection at the same time
504            start_accept();
505        }
506        cout << "handle-done." << endl;
507    }
508};
509
510int main(int argc, const char **argv)
511{
512    try
513    {
514        ba::io_service io_service;
515
516        Port = argc==2 ? lexical_cast<int>(argv[1]) : 5000;
517
518        tcp_server server(io_service, Port);
519        //  ba::add_service(io_service, &server);
520        //  server.add_service(...);
521        cout << "Run..." << flush;
522
523        // Calling run() from a single thread ensures no concurrent access
524        // of the handler which are called!!!
525        io_service.run();
526
527        cout << "end." << endl;
528    }
529    catch (std::exception& e)
530    {
531        std::cerr << e.what() << std::endl;
532    }
533
534    return 0;
535}
536/*  ====================== Buffers ===========================
537
538char d1[128]; ba::buffer(d1));
539std::vector<char> d2(128); ba::buffer(d2);
540boost::array<char, 128> d3; by::buffer(d3);
541
542// --------------------------------
543char d1[128];
544std::vector<char> d2(128);
545boost::array<char, 128> d3;
546
547boost::array<mutable_buffer, 3> bufs1 = {
548   ba::buffer(d1),
549   ba::buffer(d2),
550   ba::buffer(d3) };
551sock.read(bufs1);
552
553std::vector<const_buffer> bufs2;
554bufs2.push_back(boost::asio::buffer(d1));
555bufs2.push_back(boost::asio::buffer(d2));
556bufs2.push_back(boost::asio::buffer(d3));
557sock.write(bufs2);
558
559
560// ======================= Read functions =========================
561
562ba::async_read_until --> delimiter
563
564streambuf buf; // Ensure validity until handler!
565by::async_read(s, buf, ....);
566
567ba::async_read(s, ba:buffer(data, size), handler);
568 // Single buffer
569 boost::asio::async_read(s,
570                         ba::buffer(data, size),
571 compl-func -->          ba::transfer_at_least(32),
572                         handler);
573
574 // Multiple buffers
575boost::asio::async_read(s, buffers,
576 compl-func -->         boost::asio::transfer_all(),
577                        handler);
578                        */
579
580// ================= Others ===============================
581
582        /*
583        strand   Provides serialised handler execution.
584        work     Class to inform the io_service when it has work to do.
585
586
587io_service::
588dispatch   Request the io_service to invoke the given handler.
589poll       Run the io_service's event processing loop to execute ready
590           handlers.
591poll_one   Run the io_service's event processing loop to execute one ready
592           handler.
593post       Request the io_service to invoke the given handler and return
594           immediately.
595reset      Reset the io_service in preparation for a subsequent run()
596           invocation.
597run        Run the io_service's event processing loop.
598run_one    Run the io_service's event processing loop to execute at most
599           one handler.
600stop       Stop the io_service's event processing loop.
601wrap       Create a new handler that automatically dispatches the wrapped
602           handler on the io_service.
603
604strand::         The io_service::strand class provides the ability to
605                 post and dispatch handlers with the guarantee that none
606                 of those handlers will execute concurrently.
607
608dispatch         Request the strand to invoke the given handler.
609get_io_service   Get the io_service associated with the strand.
610post             Request the strand to invoke the given handler and return
611                 immediately.
612wrap             Create a new handler that automatically dispatches the
613                 wrapped handler on the strand.
614
615work::           The work class is used to inform the io_service when
616                 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
617get_io_service   Get the io_service associated with the work.
618work             Constructor notifies the io_service that work is starting.
619
620*/
621
622
Note: See TracBrowser for help on using the repository browser.