source: trunk/FACT++/src/dserver2.cc@ 16734

Last change on this file since 16734 was 14976, checked in by tbretz, 12 years ago
Replaced deprecated io_service() by get_io_service()
File size: 10.2 KB
Line 
#include <iostream>
#include <sstream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/enable_shared_from_this.hpp>

using boost::lexical_cast;

#include "Time.h"

using namespace std;

namespace ba    = boost::asio;
namespace bs    = boost::system;
namespace dummy = ba::placeholders;

using boost::lexical_cast;
using ba::ip::tcp;
21
// TCP port the server listens on. Set once in main() (from the command line,
// default 5000) and embedded in every message sent by tcp_connection.
int Port = 0;
23
24class tcp_connection : public ba::ip::tcp::socket, public boost::enable_shared_from_this<tcp_connection>
25{
26
27private:
28 void AsyncRead(ba::mutable_buffers_1 buffers)
29 {
30 ba::async_read(*this, buffers,
31 boost::bind(&tcp_connection::HandleReceivedData, shared_from_this(),
32 dummy::error, dummy::bytes_transferred));
33 }
34
35 void AsyncWrite(ba::const_buffers_1 buffers)
36 {
37 ba::async_write(*this, buffers,
38 boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
39 dummy::error, dummy::bytes_transferred));
40 }
41 void AsyncWait(ba::deadline_timer &timer, int seconds,
42 void (tcp_connection::*handler)(const bs::error_code&))// const
43 {
44 timer.expires_from_now(boost::posix_time::seconds(seconds));
45 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
46 }
47
48 static int inst;
49 int instance;
50 ba::deadline_timer deadline_;
51
52 std::string message_;
53 std::string message2;
54 std::string msg_in;
55
56 char mybuffer[10000];
57
58 int state;
59
60 // The constructor is prvate to force the obtained pointer to be shared
61 tcp_connection(ba::io_service& ioservice) : ba::ip::tcp::socket(ioservice),
62 instance(inst++), deadline_(ioservice)
63 {
64 deadline_.expires_at(boost::posix_time::pos_infin);
65 state=0;
66 }
67
68 // Callback when writing was successfull or failed
69 void HandleSentData(const boost::system::error_code& error, size_t bytes_transferred)
70 {
71 cout << "Data sent: (transmitted=" << bytes_transferred << ") rc=" << error.message() << " (" << error << ")" << endl;
72 }
73
74 void HandleReceivedData(const boost::system::error_code& error, size_t bytes_received)
75 {
76 string str = string(mybuffer, bytes_received);
77 cout << "Received b=" << bytes_received << ": '" << str << "' " << error.message() << " (" << error << ")" << endl;
78
79 // Do not schedule a new read if the connection failed.
80 if (bytes_received==0)
81 {
82 // Close the connection
83 close();
84 deadline_.cancel();
85 return;
86 }
87
88 if (strcmp(mybuffer, "START")==0 && state==0)
89 state = 1;
90
91 if (strcmp(mybuffer, "STOP")==0 && state==1)
92 state = 0;
93
94 if (strcmp(mybuffer, "TIME")==0)
95 {
96 stringstream msg;
97 msg << "s-" << Port << ": " << Time() << " 9";
98 message2 = msg.str();
99
100 AsyncWrite(ba::buffer(message2.c_str(), 37));
101
102 cout << msg.str() << endl;
103 }
104
105 AsyncRead(ba::buffer(mybuffer, 10));
106 }
107
108 void check_deadline(const boost::system::error_code &)
109 {
110 if (!is_open())
111 {
112 // For example: Here we could schedule a new accept if we
113 // would not want to allow two connections at the same time.
114 return;
115 }
116
117 // Check whether the deadline has passed. We compare the deadline
118 // against the current time since a new asynchronous operation
119 // may have moved the deadline before this actor had a chance
120 // to run.
121 if (deadline_.expires_at() <= ba::deadline_timer::traits_type::now())
122 {
123 stringstream str;
124 str << "s-" << Port << ": " << Time() << " " << state;
125
126 message_ = str.str();
127
128 // The deadline has passed. Stop the session. The other
129 // actors will terminate as soon as possible.
130 AsyncWrite(ba::buffer(message_.c_str(), 37));
131 AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
132
133 cout << str.str() << endl;
134
135 return;
136 }
137
138 AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
139 }
140
141public:
142 typedef boost::shared_ptr<tcp_connection> shared_ptr;
143
144 static shared_ptr create(ba::io_service& io_service)
145 {
146 return shared_ptr(new tcp_connection(io_service));
147 }
148
149 void start()
150 {
151 message_ = "This is the first msg. ";
152
153 // Ownership of buffer must be valid until Handler is called.
154
155 // Emit something to be written to the socket
156 AsyncRead(ba::buffer(mybuffer, 10));
157
158 // async_read_until
159 AsyncWrite(ba::buffer(message_));
160
161 // Whenever the time expires we will schedule a new message to be sent
162 AsyncWait(deadline_, 3, &tcp_connection::check_deadline);
163 }
164};
165
166
167int tcp_connection::inst = 0;
168
169
170class tcp_server : public tcp::acceptor
171{
172public:
173 tcp_server(ba::io_service& ioservice, int port) :
174 tcp::acceptor(ioservice, tcp::endpoint(tcp::v4(), port))
175
176 {
177 // We could start listening for more than one connection
178 // here, but since there is only one handler executed each time
179 // it would not make sense. Before one handle_accept is not
180 // finished no new handle_accept will be called.
181 // Workround: Start a new thread in handle_accept
182 start_accept();
183 }
184
185private:
186 void start_accept()
187 {
188 cout << "Start accept..." << flush;
189 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/get_io_service());
190
191 // This will accept a connection without blocking
192 async_accept(*new_connection,
193 boost::bind(&tcp_server::handle_accept,
194 this,
195 new_connection,
196 ba::placeholders::error));
197
198 cout << "start-done." << endl;
199 }
200
201 void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code& error)
202 {
203 // The connection has been accepted and is now ready to use
204
205 // not installing a new handler will stop run()
206 cout << "Handle accept..." << flush;
207 if (!error)
208 {
209
210 new_connection->start();
211
212 // The is now an open connection/server (tcp_connection)
213 // we immediatly schedule another connection
214 // This allowed two client-connection at the same time
215 start_accept();
216 }
217 cout << "handle-done." << endl;
218 }
219};
220
221int main(int argc, const char **argv)
222{
223 try
224 {
225 ba::io_service io_service;
226
227 Port = argc==2 ? lexical_cast<int>(argv[1]) : 5000;
228
229 tcp_server server(io_service, Port);
230 // ba::add_service(io_service, &server);
231 // server.add_service(...);
232 cout << "Run..." << flush;
233
234 // Calling run() from a single thread ensures no concurrent access
235 // of the handler which are called!!!
236 io_service.run();
237
238 cout << "end." << endl;
239 }
240 catch (std::exception& e)
241 {
242 std::cerr << e.what() << std::endl;
243 }
244
245 return 0;
246}
/* ====================== Buffers ===========================

char d1[128];                ba::buffer(d1);
std::vector<char> d2(128);   ba::buffer(d2);
boost::array<char, 128> d3;  ba::buffer(d3);

// --------------------------------
char d1[128];
std::vector<char> d2(128);
boost::array<char, 128> d3;

boost::array<mutable_buffer, 3> bufs1 = {
   ba::buffer(d1),
   ba::buffer(d2),
   ba::buffer(d3) };
sock.read(bufs1);

std::vector<const_buffer> bufs2;
bufs2.push_back(boost::asio::buffer(d1));
bufs2.push_back(boost::asio::buffer(d2));
bufs2.push_back(boost::asio::buffer(d3));
sock.write(bufs2);


// ======================= Read functions =========================

ba::async_read_until --> delimiter

streambuf buf; // Ensure validity until handler!
ba::async_read(s, buf, ....);

ba::async_read(s, ba::buffer(data, size), handler);
 // Single buffer
 boost::asio::async_read(s,
                         ba::buffer(data, size),
 compl-func -->          ba::transfer_at_least(32),
                         handler);

 // Multiple buffers
boost::asio::async_read(s, buffers,
 compl-func -->         boost::asio::transfer_all(),
                        handler);
 */
290
// ================= Others ===============================

 /*
 strand    Provides serialised handler execution.
 work      Class to inform the io_service when it has work to do.


io_service::
dispatch   Request the io_service to invoke the given handler.
poll       Run the io_service's event processing loop to execute ready
           handlers.
poll_one   Run the io_service's event processing loop to execute one ready
           handler.
post       Request the io_service to invoke the given handler and return
           immediately.
reset      Reset the io_service in preparation for a subsequent run()
           invocation.
run        Run the io_service's event processing loop.
run_one    Run the io_service's event processing loop to execute at most
           one handler.
stop       Stop the io_service's event processing loop.
wrap       Create a new handler that automatically dispatches the wrapped
           handler on the io_service.

strand::         The io_service::strand class provides the ability to
                 post and dispatch handlers with the guarantee that none
                 of those handlers will execute concurrently.

dispatch         Request the strand to invoke the given handler.
get_io_service   Get the io_service associated with the strand.
post             Request the strand to invoke the given handler and return
                 immediately.
wrap             Create a new handler that automatically dispatches the
                 wrapped handler on the strand.

work::           The work class is used to inform the io_service when
                 work starts and finishes. This ensures that the
                 io_service's run() function will not exit while work is
                 underway, and that it does exit when there is no
                 unfinished work remaining.
get_io_service   Get the io_service associated with the work.
work             Constructor notifies the io_service that work is starting.

*/
332
333
Note: See TracBrowser for help on using the repository browser.