source: trunk/FACT++/src/fad.cc@ 10837

Last change on this file since 10837 was 10826, checked in by tbretz, 14 years ago
File size: 14.6 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12#include "Converter.h"
13
14#include "HeadersFAD.h"
15
16using namespace std;
17using namespace FAD;
18
19namespace ba = boost::asio;
20namespace bs = boost::system;
21namespace dummy = ba::placeholders;
22
23using boost::lexical_cast;
24using ba::ip::tcp;
25
26int Port = 0;
27
28// ------------------------------------------------------------------------
29
30
31// ------------------------------------------------------------------------
32
33class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
34{
35private:
36
37 double fStartTime;
38 uint32_t fRunNumber;
39
40 void AsyncRead(ba::mutable_buffers_1 buffers)
41 {
42 ba::async_read(*this, buffers,
43 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
44 dummy::error, dummy::bytes_transferred));
45 }
46
47 void AsyncWrite(const ba::const_buffers_1 &buffers)
48 {
49 ba::async_write(*this, buffers,
50 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
51 dummy::error, dummy::bytes_transferred));
52 }
53 void AsyncWait(ba::deadline_timer &timer, int seconds,
54 void (tcp_connection::*handler)(const bs::error_code&))// const
55 {
56 timer.expires_from_now(boost::posix_time::seconds(seconds));
57 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
58 }
59
60 // The constructor is prvate to force the obtained pointer to be shared
61 tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
62 fTriggerSendData(ioservice)
63 {
64 }
65
66 // Callback when writing was successfull or failed
67 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
68 {
69 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
70 }
71
72 vector<uint16_t> fBufCommand;
73 vector<uint16_t> fBuffer;
74
75 vector<uint16_t> fCommand;
76
77 FAD::EventHeader fHeader;
78 FAD::ChannelHeader fChHeader[kNumChannels];
79
80 ba::deadline_timer fTriggerSendData;
81
82 bool fTriggerEnabled;
83
84 void SendData()
85 {
86 if (!fTriggerEnabled)
87 return;
88
89 fHeader.fPackageLength = sizeof(EventHeader)/2+1;
90 fHeader.fEventCounter++;
91 fHeader.fTimeStamp = uint32_t((Time(Time::utc).UnixTime()-fStartTime)*10000);
92
93
94 fBuffer.resize(0);
95
96 for (int i=0; i<kNumChannels; i++)
97 {
98 fChHeader[i].fStartCell = i*10;
99
100 const vector<uint16_t> buf = fChHeader[i].HtoN();
101
102 fBuffer.insert(fBuffer.end(), buf.begin(), buf.end());
103 fBuffer.insert(fBuffer.end(), fChHeader[i].fRegionOfInterest, 0x42);
104
105 fHeader.fPackageLength += sizeof(ChannelHeader)/2;
106 fHeader.fPackageLength += fChHeader[i].fRegionOfInterest;
107 }
108
109 fBuffer.push_back(htons(FAD::kDelimiterEnd));
110
111 const vector<uint16_t> h = fHeader.HtoN();
112
113 fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
114
115 AsyncWrite(ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
116 }
117
118 void TriggerSendData(const boost::system::error_code &ec)
119 {
120 if (!is_open())
121 {
122 // For example: Here we could schedule a new accept if we
123 // would not want to allow two connections at the same time.
124 return;
125 }
126
127 if (ec==ba::error::basic_errors::operation_aborted)
128 return;
129
130 // Check whether the deadline has passed. We compare the deadline
131 // against the current time since a new asynchronous operation
132 // may have moved the deadline before this actor had a chance
133 // to run.
134 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
135 return;
136
137 // The deadline has passed.
138 SendData();
139
140 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
141 }
142
143 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
144 {
145 // Do not schedule a new read if the connection failed.
146 if (bytes_received==0)
147 {
148 // Close the connection
149 close();
150 return;
151 }
152
153 // No command received yet
154 if (fCommand.size()==0)
155 {
156 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
157 fBufCommand.begin(), ntohs);
158
159 switch (fBufCommand[0])
160 {
161 case kCmdDrsEnable:
162 case kCmdDrsEnable+0x100:
163 fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==kCmdDrsEnable);
164 cout << "-> DrsEnable" << endl;
165 break;
166
167 case kCmdDwrite:
168 case kCmdDwrite+0x100:
169 fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==kCmdDwrite);
170 cout << "-> Dwrite" << endl;
171 break;
172
173 case kCmdTriggerLine:
174 case kCmdTriggerLine+0x100:
175 cout << "-> Trigger line" << endl;
176 fTriggerEnabled = fBufCommand[0]==kCmdTriggerLine;
177 break;
178
179 case kCmdSclk:
180 case kCmdSclk+0x100:
181 cout << "-> Sclk" << endl;
182 fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==kCmdSclk);
183 break;
184
185 case kCmdSrclk:
186 case kCmdSrclk+0x100:
187 cout << "-> Drclk" << endl;
188 break;
189
190 case kCmdRun:
191 case kCmdRun+0x100:
192 cout << "-> Run" << endl;
193 break;
194
195 case kCmdContTriggerOn:
196 case kCmdContTriggerOff:
197 if (fBufCommand[0]==kCmdContTriggerOn)
198 AsyncWait(fTriggerSendData, 1, &tcp_connection::TriggerSendData);
199 else
200 fTriggerSendData.cancel();
201 cout << "-> ContTrig" << endl;
202 break;
203
204 case kCmdResetTriggerId:
205 cout << "-> Reset" << endl;
206 fHeader.fEventCounter = 0;
207 break;
208
209 case kCmdSingleTrigger:
210 cout << "-> Trigger" << endl;
211 SendData();
212 break;
213
214 default:
215 if (fBufCommand[0]>=kCmdWriteRoi && fBufCommand[0]<kCmdWriteRoi+kNumChannels)
216 {
217 fCommand.resize(2);
218 fCommand[0] = kCmdWriteRoi;
219 fCommand[1] = fBufCommand[0]-kCmdWriteRoi;
220 break;
221 }
222 if (fBufCommand[0]>= kCmdWriteDac && fBufCommand[0]<kCmdWriteDac+kNumDac)
223 {
224 fCommand.resize(2);
225 fCommand[0] = kCmdWriteDac;
226 fCommand[1] = fBufCommand[0]-kCmdWriteDac;
227 break;
228 }
229
230 cout << "Received b=" << bytes_received << ": " << error.message() << " (" << error << ")" << endl;
231 cout << "Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
232 return;
233 }
234
235 fBufCommand.resize(1);
236 AsyncRead(ba::buffer(fBufCommand));
237 return;
238 }
239
240 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
241 fBufCommand.begin(), ntohs);
242
243 switch (fCommand[0])
244 {
245 case kCmdWriteRoi:
246 cout << "-> Set Roi[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
247 fChHeader[fCommand[1]].fRegionOfInterest = fBufCommand[0];
248 break;
249
250 case kCmdWriteDac:
251 cout << "-> Set Dac[" << fCommand[1] << "]=" << fBufCommand[0] << endl;
252 fHeader.fDac[fCommand[1]] = fBufCommand[0];
253 break;
254 }
255
256 fCommand.resize(0);
257
258 fBufCommand.resize(1);
259 AsyncRead(ba::buffer(fBufCommand));
260 }
261
262public:
263 typedef boost::shared_ptr<tcp_connection> shared_ptr;
264
265 static shared_ptr create(ba::io_service& io_service)
266 {
267 return shared_ptr(new tcp_connection(io_service));
268 }
269
270 void start()
271 {
272 // Ownership of buffer must be valid until Handler is called.
273
274 fHeader.fStartDelimiter = FAD::kDelimiterStart;
275 fHeader.fVersion = 0x104;
276 fHeader.fStatus = 0xf<<12 |
277 FAD::EventHeader::kDenable |
278 FAD::EventHeader::kDwrite |
279 FAD::EventHeader::kDcmLocked |
280 FAD::EventHeader::kDcmReady |
281 FAD::EventHeader::kSpiSclk;
282
283 fStartTime = Time(Time::utc).UnixTime();
284
285 for (int i=0; i<kNumChannels; i++)
286 {
287 fChHeader[i].fId = (i%9) | ((i/9)<<4);
288 fChHeader[i].fRegionOfInterest = 0;
289 }
290
291 // Emit something to be written to the socket
292 fBufCommand.resize(1);
293 AsyncRead(ba::buffer(fBufCommand));
294
295// AsyncWait(fTriggerDynData, 1, &tcp_connection::SendDynData);
296
297// AsyncWrite(ba::buffer(ba::const_buffer(&fHeader, sizeof(FTM::Header))));
298// AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
299
300 }
301};
302
303
304class tcp_server : public tcp::acceptor
305{
306public:
307 tcp_server(ba::io_service& ioservice, int port) :
308 tcp::acceptor(ioservice, tcp::endpoint(tcp::v4(), port))
309
310 {
311 // We could start listening for more than one connection
312 // here, but since there is only one handler executed each time
313 // it would not make sense. Before one handle_accept is not
314 // finished no new handle_accept will be called.
315 // Workround: Start a new thread in handle_accept
316 start_accept();
317 }
318
319private:
320 void start_accept()
321 {
322 cout << "Start accept..." << flush;
323 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/io_service());
324
325 // This will accept a connection without blocking
326 async_accept(*new_connection,
327 boost::bind(&tcp_server::handle_accept,
328 this,
329 new_connection,
330 ba::placeholders::error));
331
332 cout << "start-done." << endl;
333 }
334
335 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
336 {
337 // The connection has been accepted and is now ready to use
338
339 // not installing a new handler will stop run()
340 cout << "Handle accept..." << flush;
341 if (!error)
342 {
343 new_connection->start();
344
345 // The is now an open connection/server (tcp_connection)
346 // we immediatly schedule another connection
347 // This allowed two client-connection at the same time
348 start_accept();
349 }
350 cout << "handle-done." << endl;
351 }
352};
353
354int main(int argc, const char **argv)
355{
356 try
357 {
358 ba::io_service io_service;
359
360 Port = argc==2 ? lexical_cast<int>(argv[1]) : 5000;
361
362 tcp_server server(io_service, Port);
363 // ba::add_service(io_service, &server);
364 // server.add_service(...);
365 cout << "Run..." << flush;
366
367 // Calling run() from a single thread ensures no concurrent access
368 // of the handler which are called!!!
369 io_service.run();
370
371 cout << "end." << endl;
372 }
373 catch (std::exception& e)
374 {
375 std::cerr << e.what() << std::endl;
376 }
377
378 return 0;
379}
380/* ====================== Buffers ===========================
381
382char d1[128]; ba::buffer(d1);
383std::vector<char> d2(128); ba::buffer(d2);
384boost::array<char, 128> d3; ba::buffer(d3);
385
386// --------------------------------
387char d1[128];
388std::vector<char> d2(128);
389boost::array<char, 128> d3;
390
391boost::array<mutable_buffer, 3> bufs1 = {
392 ba::buffer(d1),
393 ba::buffer(d2),
394 ba::buffer(d3) };
395sock.read(bufs1);
396
397std::vector<const_buffer> bufs2;
398bufs2.push_back(boost::asio::buffer(d1));
399bufs2.push_back(boost::asio::buffer(d2));
400bufs2.push_back(boost::asio::buffer(d3));
401sock.write(bufs2);
402
403
404// ======================= Read functions =========================
405
406ba::async_read_until --> delimiter
407
408streambuf buf; // Ensure validity until handler!
409ba::async_read(s, buf, ....);
410
411ba::async_read(s, ba::buffer(data, size), handler);
412 // Single buffer
413 boost::asio::async_read(s,
414 ba::buffer(data, size),
415 compl-func --> ba::transfer_at_least(32),
416 handler);
417
418 // Multiple buffers
419boost::asio::async_read(s, buffers,
420 compl-func --> boost::asio::transfer_all(),
421 handler);
422 */
423
424// ================= Others ===============================
425
426 /*
427 strand Provides serialised handler execution.
428 work Class to inform the io_service when it has work to do.
429
430
431io_service::
432dispatch Request the io_service to invoke the given handler.
433poll Run the io_service's event processing loop to execute ready
434 handlers.
435poll_one Run the io_service's event processing loop to execute one ready
436 handler.
437post Request the io_service to invoke the given handler and return
438 immediately.
439reset Reset the io_service in preparation for a subsequent run()
440 invocation.
441run Run the io_service's event processing loop.
442run_one Run the io_service's event processing loop to execute at most
443 one handler.
444stop Stop the io_service's event processing loop.
445wrap Create a new handler that automatically dispatches the wrapped
446 handler on the io_service.
447
448strand:: The io_service::strand class provides the ability to
449 post and dispatch handlers with the guarantee that none
450 of those handlers will execute concurrently.
451
452dispatch Request the strand to invoke the given handler.
453get_io_service Get the io_service associated with the strand.
454post Request the strand to invoke the given handler and return
455 immediately.
456wrap Create a new handler that automatically dispatches the
457 wrapped handler on the strand.
458
459work:: The work class is used to inform the io_service when
460 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
461get_io_service Get the io_service associated with the work.
462work Constructor notifies the io_service that work is starting.
463
464*/
465
466
Note: See TracBrowser for help on using the repository browser.