source: trunk/FACT++/src/dserver2.cc @ 10365

Last change on this file since 10365 was 10183, checked in by tbretz, 10 years ago
New import.
File size: 10.2 KB
Line 
1#include <iostream>
2#include <string>
3#include <boost/asio.hpp>
4#include <boost/bind.hpp>
5#include <boost/lexical_cast.hpp>
6#include <boost/asio/deadline_timer.hpp>
7#include <boost/enable_shared_from_this.hpp>
8
9using boost::lexical_cast;
10
11#include "Time.h"
12
13using namespace std;
14
15namespace ba    = boost::asio;
16namespace bs    = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::ip::tcp;
21
// TCP port number of this server. main() sets it from the first command
// line argument (5000 if none is given); it is used to open the acceptor
// and is embedded as "s-<Port>" in the messages sent to the clients.
22int Port = 0;
23
24class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
25{
26
27private:
28    void AsyncRead(ba::mutable_buffers_1 buffers)
29    {
30        ba::async_read(*this, buffers,
31                       boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
32                                   dummy::error, dummy::bytes_transferred));
33    }
34
35    void AsyncWrite(ba::const_buffers_1 buffers)
36    {
37        ba::async_write(*this, buffers,
38                        boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
39                                    dummy::error, dummy::bytes_transferred));
40    }
41    void AsyncWait(ba::deadline_timer &timer, int seconds,
42                               void (tcp_connection::*handler)(const bs::error_code&))// const
43    {
44        timer.expires_from_now(boost::posix_time::seconds(seconds));
45        timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
46    }
47
48    static int inst;
49    int instance;
50    ba::deadline_timer deadline_;
51
52    std::string message_;
53    std::string message2;
54    std::string msg_in;
55
56    char mybuffer[10000];
57
58    int state;
59
60    // The constructor is prvate to force the obtained pointer to be shared
61    tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
62        instance(inst++), deadline_(ioservice)
63    {
64        deadline_.expires_at(boost::posix_time::pos_infin);
65        state=0;
66    }
67
68    // Callback when writing was successfull or failed
69    void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
70    {
71        cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
72    }
73
74    void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
75    {
76        string str = string(mybuffer, bytes_received);
77        cout << "Received b=" << bytes_received << ": '" << str << "' " << error.message() << " (" << error << ")" << endl;
78
79        // Do not schedule a new read if the connection failed.
80        if (bytes_received==0)
81        {
82            // Close the connection
83            close();
84            deadline_.cancel();
85            return;
86        }
87
88        if (strcmp(mybuffer, "START")==0 && state==0)
89            state = 1;
90
91        if (strcmp(mybuffer, "STOP")==0 && state==1)
92            state = 0;
93
94        if (strcmp(mybuffer, "TIME")==0)
95        {
96            stringstream msg;
97            msg << "s-" << Port << ": " << Time() << " 9";
98            message2 = msg.str();
99
100            AsyncWrite(ba::buffer(message2.c_str(), 37));
101
102            cout << msg.str() << endl;
103        }
104
105        AsyncRead(ba::buffer(mybuffer, 10));
106    }
107
108    void check_deadline(const boost::system::error_code &)
109    {
110        if (!is_open())
111        {
112            // For example: Here we could schedule a new accept if we
113            // would not want to allow two connections at the same time.
114            return;
115        }
116
117        // Check whether the deadline has passed. We compare the deadline
118        // against the current time since a new asynchronous operation
119        // may have moved the deadline before this actor had a chance
120        // to run.
121        if (deadline_.expires_at() <= ba::deadline_timer::traits_type::now())
122        {
123            stringstream str;
124            str << "s-" << Port << ": " << Time() << " " << state;
125
126            message_ = str.str();
127
128            // The deadline has passed. Stop the session. The other
129            // actors will terminate as soon as possible.
130            AsyncWrite(ba::buffer(message_.c_str(), 37));
131            AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
132
133            cout << str.str() << endl;
134
135            return;
136        }
137
138        AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
139    }
140
141public:
142    typedef boost::shared_ptr<tcp_connection> shared_ptr;
143
144    static shared_ptr create(ba::io_service& io_service)
145    {
146        return shared_ptr(new tcp_connection(io_service));
147    }
148
149    void start()
150    {
151        message_ = "This is the first msg. ";
152
153        // Ownership of buffer must be valid until Handler is called.
154
155        // Emit something to be written to the socket
156        AsyncRead(ba::buffer(mybuffer, 10));
157
158        // async_read_until
159        AsyncWrite(ba::buffer(message_));
160
161        // Whenever the time expires we will schedule a new message to be sent
162        AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
163    }
164};
165
166
167int tcp_connection::inst = 0;
168
169
170class tcp_server : public tcp::acceptor
171{
172public:
173    tcp_server(ba::io_service& ioservice, int port) :
174        tcp::acceptor(ioservice, tcp::endpoint(tcp::v4(), port))
175
176    {
177        // We could start listening for more than one connection
178        // here, but since there is only one handler executed each time
179        // it would not make sense. Before one handle_accept is not
180        // finished no new handle_accept will be called.
181        // Workround: Start a new thread in handle_accept
182        start_accept();
183    }
184
185private:
186    void start_accept()
187    {
188        cout << "Start accept..." << flush;
189        tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/io_service());
190
191        // This will accept a connection without blocking
192        async_accept(*new_connection,
193                     boost::bind(&tcp_server::handle_accept,
194                                 this,
195                                 new_connection,
196                                 ba::placeholders::error));
197
198        cout << "start-done." << endl;
199    }
200
201    void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
202    {
203        // The connection has been accepted and is now ready to use
204
205        // not installing a new handler will stop run()
206        cout << "Handle accept..." << flush;
207        if (!error)
208        {
209
210            new_connection->start();
211
212            // The is now an open connection/server (tcp_connection)
213            // we immediatly schedule another connection
214            // This allowed two client-connection at the same time
215            start_accept();
216        }
217        cout << "handle-done." << endl;
218    }
219};
220
221int main(int argc, const char **argv)
222{
223    try
224    {
225        ba::io_service io_service;
226
227        Port = argc==2 ? lexical_cast<int>(argv[1]) : 5000;
228
229        tcp_server server(io_service, Port);
230        //  ba::add_service(io_service, &server);
231        //  server.add_service(...);
232        cout << "Run..." << flush;
233
234        // Calling run() from a single thread ensures no concurrent access
235        // of the handler which are called!!!
236        io_service.run();
237
238        cout << "end." << endl;
239    }
240    catch (std::exception& e)
241    {
242        std::cerr << e.what() << std::endl;
243    }
244
245    return 0;
246}
247/*  ====================== Buffers ===========================
248
249char d1[128]; ba::buffer(d1);
250std::vector<char> d2(128); ba::buffer(d2);
251boost::array<char, 128> d3; ba::buffer(d3);
252
253// --------------------------------
254char d1[128];
255std::vector<char> d2(128);
256boost::array<char, 128> d3;
257
258boost::array<mutable_buffer, 3> bufs1 = {
259   ba::buffer(d1),
260   ba::buffer(d2),
261   ba::buffer(d3) };
262sock.read(bufs1);
263
264std::vector<const_buffer> bufs2;
265bufs2.push_back(boost::asio::buffer(d1));
266bufs2.push_back(boost::asio::buffer(d2));
267bufs2.push_back(boost::asio::buffer(d3));
268sock.write(bufs2);
269
270
271// ======================= Read functions =========================
272
273ba::async_read_until --> delimiter
274
275streambuf buf; // Ensure validity until handler!
276ba::async_read(s, buf, ....);
277
278ba::async_read(s, ba::buffer(data, size), handler);
279 // Single buffer
280 boost::asio::async_read(s,
281                         ba::buffer(data, size),
282 compl-func -->          ba::transfer_at_least(32),
283                         handler);
284
285 // Multiple buffers
286boost::asio::async_read(s, buffers,
287 compl-func -->         boost::asio::transfer_all(),
288                        handler);
289                        */
290
291// ================= Others ===============================
292
293        /*
294        strand   Provides serialised handler execution.
295        work     Class to inform the io_service when it has work to do.
296
297
298io_service::
299dispatch   Request the io_service to invoke the given handler.
300poll       Run the io_service's event processing loop to execute ready
301           handlers.
302poll_one   Run the io_service's event processing loop to execute one ready
303           handler.
304post       Request the io_service to invoke the given handler and return
305           immediately.
306reset      Reset the io_service in preparation for a subsequent run()
307           invocation.
308run        Run the io_service's event processing loop.
309run_one    Run the io_service's event processing loop to execute at most
310           one handler.
311stop       Stop the io_service's event processing loop.
312wrap       Create a new handler that automatically dispatches the wrapped
313           handler on the io_service.
314
315strand::         The io_service::strand class provides the ability to
316                 post and dispatch handlers with the guarantee that none
317                 of those handlers will execute concurrently.
318
319dispatch         Request the strand to invoke the given handler.
320get_io_service   Get the io_service associated with the strand.
321post             Request the strand to invoke the given handler and return
322                 immediately.
323wrap             Create a new handler that automatically dispatches the
324                 wrapped handler on the strand.
325
326work::           The work class is used to inform the io_service when
327                 work starts and finishes. This ensures that the io_service's run() function will not exit while work is underway, and that it does exit when there is no unfinished work remaining.
328get_io_service   Get the io_service associated with the work.
329work             Constructor notifies the io_service that work is starting.
330
331*/
332
333
Note: See TracBrowser for help on using the repository browser.