source: trunk/MagicSoft/Cosy/tcpip/MTcpIpIO.cc@ 9395

Last change on this file since 9395 was 8869, checked in by tbretz, 17 years ago
*** empty log message ***
File size: 11.5 KB
Line 
1#include "MTcpIpIO.h"
2
3#include <unistd.h> // usleep
4
5#include <errno.h>
6
7#include <TSocket.h>
8#include <TServerSocket.h>
9
10#include "MLog.h"
11#include "MLogManip.h"
12
13#include "MString.h"
14#include "MTimeout.h"
15
16#undef DEBUG
17
18using namespace std;
19
20 /*
21 enum ESockOptions {
22 kSendBuffer, // size of send buffer
23 kRecvBuffer, // size of receive buffer
24 kOobInline, // OOB message inline
25 kKeepAlive, // keep socket alive
26 kReuseAddr, // allow reuse of local portion of address 5-tuple
27 kNoDelay, // send without delay
28 kNoBlock, // non-blocking I/O
29 kProcessGroup, // socket process group (used for SIGURG and SIGIO)
30 kAtMark, // are we at out-of-band mark (read only)
31 kBytesToRead // get number of bytes to read, FIONREAD (read only)
32 };
33
34 enum ESendRecvOptions {
35 kDefault, // default option (= 0)
36 kOob, // send or receive out-of-band data
37 kPeek, // peek at incoming message (receive only)
38 kDontBlock // send/recv as much data as possible without blocking
39 };
40 */
41
42MTcpIpO::MTcpIpO(const char *addr, Int_t tx)
43{
44 fTxSocket = new TSocket(addr, tx);
45}
46
47MTcpIpO::~MTcpIpO()
48{
49 // Now delete all TCP/IP objects
50 delete fTxSocket;
51}
52
53MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx) : MTcpIpI(rx), MTcpIpO(addr, tx)
54{
55 RunThread();
56}
57
58MTcpIpIO::~MTcpIpIO()
59{
60}
61
62TString MTcpIpO::GetSocketAddress(const TSocket &s)
63{
64 if (!s.IsValid())
65 return "n/a";
66
67 const TInetAddress &a = s.GetInetAddress();
68 if (!a.IsValid())
69 return "undefined";
70
71 return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
72}
73
74TString MTcpIpO::GetSocketAddress() const
75{
76 return GetSocketAddress(*fTxSocket);
77}
78
79bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
80{
81 if (!tx.IsValid())
82 {
83 //gLog << warn << "WARNING - No transmission to " << GetSocketAddress(tx) << " possible." << endl;
84 return false;
85 }
86
87#ifdef DEBUG
88 cout << "Tx: " << msg << flush;
89#endif
90
91 const Int_t l = tx.SendRaw(msg, len);
92 if (l<0)
93 {
94 gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
95 return false;
96 }
97
98 if (l!=len)
99 {
100 gLog << err << "ERROR - Sent wrong number (" << l << ") of bytes to " << GetSocketAddress(tx) << endl;
101 return false;
102 }
103
104 return true;
105}
106
107bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
108{
109 // R__LOCKGUARD2(myMutex);
110#ifdef DEBUG
111 cout << "Connecting to " << addr << ":" << port << endl;
112#endif
113
114 // FIXME: Set tx-socket to nonblocking?
115 TSocket tx(addr, port);
116 return SendFrame(tx, msg, len);
117/*
118 if (!tx.IsValid())
119 {
120 gLog << warn << "WARNING - No transmission to " << addr << ":" << port << " possible." << endl;
121 return false;
122 }
123
124 gLog << dbg << "Sending to " << addr << ":" << port << endl;
125
126 const Int_t l = tx.SendRaw(msg, len, kDontBlock);
127 if (l<0)
128 {
129 gLog << err << "ERROR - Sending TCP/IP frame to " << addr << ":" << port << endl;
130 return false;
131 }
132
133 if (l!=len)
134 {
135 gLog << err << "ERROR - Sent wrong number (" << l << ") of bytes to " << addr << ":" << port << endl;
136 return false;
137 }
138
139#ifdef DEBUG
140 cout << "Tx: " << msg << flush;
141#endif
142
143 return true;
144 */
145}
146
147bool MTcpIpO::Send(const char *msg, Int_t len)
148{
149 return SendFrame(*fTxSocket, msg, len);
150}
151
152bool MTcpIpIO::InterpreteStr(TString str)
153{
154 cout << "Rx: " << str << flush;
155 return true;
156}
157
158Bool_t MTcpIpIO::ReadSocket(TSocket &rx)
159{
160 TString str;
161
162 while (!IsThreadCanceled())
163 {
164 char c;
165 const Int_t len = rx.RecvRaw(&c, 1);
166
167 // For details see TSocket::RecvRaw
168 // -1: // ERROR
169 // EINVAL, EWOULDBLOCK
170 // -5: // EPIPE || ECONNRESET = Pipe broken or connection reset by peer
171 // 0: Data received with zero length! (Connection lost/call interrupted)
172 if (len<=0)
173 return kFALSE;
174
175 // Data received
176 if (len>1)
177 {
178 cout << "Data too long!!!" << endl;
179 break;
180 }
181
182 // Data received (len==1)
183 if (c!='\n')
184 {
185 str += c;
186 continue;
187 }
188
189 // String completed
190 InterpreteStr(str);
191 str = "";
192 }
193
194 return kTRUE;
195}
196
197Bool_t MTcpIpI::WaitForData(TSocket &sock)
198{
199 // No connection established?
200 if (!sock.IsValid())
201 {
202 gLog << warn << "TSocket invalid on port " << fPortRx << "." << endl;
203 return kFALSE;
204 }
205
206 fConnectionEstablished = kTRUE;
207
208 MTimeout timeout;
209
210 // Get connection on port fPortRx and redirected
211 while (!IsThreadCanceled())
212 {
213 // Check for pending data (every ms)
214 switch (sock.Select(TSocket::kRead, 1))
215 {
216 case kTRUE: // Data pending... go on reading
217 if (!ReadSocket(sock))
218 {
219 gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(sock) << endl;
220 return kFALSE;
221 }
222 timeout.Start(fTimeout);
223
224 // Go on waiting for new data
225 continue;
226
227 case kFALSE: // Time out, no data yet, go on waiting
228 if (timeout.HasTimedOut())
229 {
230 gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(sock) << " timed out." << endl;
231 return kFALSE;
232 }
233
234 // Go on waiting for new data
235 continue;
236
237 default: // Error occurance
238 gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
239 return kFALSE;
240 }
241 }
242 return kTRUE; //???
243}
244
245void MTcpIpI::WaitForConnection(TServerSocket &server)
246{
247 while (!IsThreadCanceled())
248 {
249 gLog << all << MTime(-1) << " Listening for new connection on port " << fPortRx << "..." << endl;
250
251 // Check for a connection request (reminder: we are in non-blocking mode)
252 TSocket *socket = 0;
253
254 while (!IsThreadCanceled())
255 {
256 //cout << (int) IsThreadCanceled() << endl;
257 // Wait for a new connection on RX port
258 socket = server.Accept();
259
260 // Accept returned an error
261 if (socket==0)
262 {
263 gLog << err << "Error: TServerSock::Accept on port " << fPortRx << ": " << strerror(errno) << endl;
264 // Since we don't know the type of the error we better shut down the socket
265 return;
266 }
267
268 // No connection request pending
269 if ((Long_t)socket<0)
270 {
271 MThread::Sleep(1000); // Wait a ms
272 continue;
273 }
274
275 // Connection established
276 break;
277 }
278
279 if ((Long_t)socket<=0)
280 return;
281
282 gLog << all << MTime(-1) << " Connection established to " << MTcpIpO::GetSocketAddress(*socket) << "..." << endl;
283
284 if (!WaitForData(*socket))
285 fConnectionEstablished = kFALSE;
286
287 delete socket;
288 }
289}
290
291Int_t MTcpIpI::Thread()
292{
293 gLog << inf << "- Starting server listening on port " << fPortRx << "..." << endl;
294
295 while (!IsThreadCanceled())
296 {
297 TServerSocket *server=new TServerSocket(fPortRx, kTRUE, 0);
298 server->SetOption(kNoBlock, 1);
299
300 while (!IsThreadCanceled() && server->IsValid())
301 WaitForConnection(*server);
302
303 if (!server->IsValid())
304 {
305 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
306 switch (server->GetErrorCode())
307 {
308 case 0: gLog << "No error." << endl; break;
309 case -1: gLog << "low level socket() call failed." << endl; break;
310 case -2: gLog << "low level bind() call failed." << endl; break;
311 case -3: gLog << "low level listen() call failed." << endl; break;
312 default: gLog << "Unknown." << endl; break;
313 }
314 }
315
316 delete server;
317
318 MThread::Sleep(5000000);
319 }
320
321 gLog << inf << "- Listening server stopped on port " << fPortRx << "." << endl;
322
323 return 0;
324}
325
326/*
327Int_t MTcpIpI::Thread()
328{
329 gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
330
331// if (fPortRx==7404)
332// {
333// gLog << err << "CeCo communication skipped." << endl;
334// return 0;
335// }
336
337 TServerSocket *fServSock=NULL;
338 TSocket *fRxSocket=NULL;
339
340// while (!HasStopFlag())
341 while (!IsThreadCanceled())
342 {
343 fServSock=new TServerSocket(fPortRx, kTRUE);
344 if (!fServSock->IsValid())
345 {
346 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
347 switch (fServSock->GetErrorCode())
348 {
349 case 0: gLog << "No error." << endl; break;
350 case -1: gLog << "low level socket() call failed." << endl; break;
351 case -2: gLog << "low level bind() call failed." << endl; break;
352 case -3: gLog << "low level listen() call failed." << endl; break;
353 default: gLog << "Unknown." << endl; break;
354 }
355 delete fServSock;
356 fServSock=NULL;
357 MThread::Sleep(5000000);
358 continue;
359 }
360
361 fServSock->SetOption(kNoBlock, 1);
362
363 gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
364// while (!HasStopFlag() && (Long_t)fRxSocket<=0)
365 while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
366 {
367 //TThread::CancelPoint();
368
369 fRxSocket = fServSock->Accept();
370 if (fRxSocket==0)
371 cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
372 usleep(10);
373 }
374
375 // Can happen in case of HasStopFlag()
376 if (fRxSocket==(void*)-1)
377 fRxSocket=NULL;
378
379 if (fRxSocket==NULL)
380 {
381 delete fServSock;
382 fServSock=NULL;
383 continue;
384 }
385
386 if (!fRxSocket->IsValid())
387 {
388 cout << "TSocket invalid on port " << fPortRx << "." << endl;
389 delete fServSock;
390 delete fRxSocket;
391 fServSock = NULL;
392 fRxSocket = NULL;
393 continue;
394 }
395
396 gLog << all << "Connection established on port " << fPortRx << "." << endl;
397
398
399 //fRxSocket->SetOption(kNoBlock, 1);
400
401 // Wait for data
402 while (!IsThreadCanceled())
403 {
404 switch (fRxSocket->Select(kRead, 1))
405 {
406 case kTRUE: // Data waiting to be read
407 break;
408 case kFALSE: // time out
409 usleep(10);
410 continue;
411 }
412
413 // ERROR
414 cout << "Error: TRxSocket::Select on port " << fPortRx << "." << endl;
415
416 delete fServSock;
417 delete fRxSocket;
418 fServSock = NULL;
419 fRxSocket = NULL;
420 break;
421
422
423 if (!fServSock)
424 continue;
425
426 if (IsThreadCanceled())
427 {
428 delete fServSock;
429 delete fRxSocket;
430 fServSock = NULL;
431 fRxSocket = NULL;
432 continue;
433 }
434
435 // ------ IDENTICAL UP TO HERE ------
436
437 // Read and evaluate data
438 ReadSocket(*fRxSocket);
439
440 delete fServSock;
441 delete fRxSocket;
442 fServSock = NULL;
443 fRxSocket = NULL;
444 }
445
446 gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
447
448 return 0;
449// return NULL;
450}*/
Note: See TracBrowser for help on using the repository browser.