source: trunk/MagicSoft/Cosy/tcpip/MTcpIpIO.cc@ 9435

Last change on this file since 9435 was 9432, checked in by tbretz, 16 years ago
*** empty log message ***
File size: 12.1 KB
Line 
1#include "MTcpIpIO.h"
2
3#include <unistd.h> // usleep
4
5#include <errno.h>
6
7#include <TSocket.h>
8#include <TServerSocket.h>
9
10#include "MLog.h"
11#include "MLogManip.h"
12
13#include "MString.h"
14#include "MTimeout.h"
15
16#undef DEBUG
17//#define DEBUG
18
19using namespace std;
20
21 /*
22 enum ESockOptions {
23 kSendBuffer, // size of send buffer
24 kRecvBuffer, // size of receive buffer
25 kOobInline, // OOB message inline
26 kKeepAlive, // keep socket alive
27 kReuseAddr, // allow reuse of local portion of address 5-tuple
28 kNoDelay, // send without delay
29 kNoBlock, // non-blocking I/O
30 kProcessGroup, // socket process group (used for SIGURG and SIGIO)
31 kAtMark, // are we at out-of-band mark (read only)
32 kBytesToRead // get number of bytes to read, FIONREAD (read only)
33 };
34
35 enum ESendRecvOptions {
36 kDefault, // default option (= 0)
37 kOob, // send or receive out-of-band data
38 kPeek, // peek at incoming message (receive only)
39 kDontBlock // send/recv as much data as possible without blocking
40 };
41 */
42
43MTcpIpO::MTcpIpO(const char *addr, Int_t tx) : fPortTx(tx)
44{
45 gLog << inf2 << "- Open send socket to " << addr << ":" << tx << endl;
46 fTxSocket = new TSocket(addr, tx);
47}
48
49MTcpIpO::~MTcpIpO()
50{
51 // Now delete all TCP/IP objects
52 delete fTxSocket;
53}
54
55MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx) : MTcpIpI(rx), MTcpIpO(addr, tx)
56{
57 RunThread();
58}
59
60MTcpIpIO::~MTcpIpIO()
61{
62}
63
64TString MTcpIpO::GetSocketAddress(const TSocket &s)
65{
66 if (!s.IsValid())
67 return "n/a";
68
69 const TInetAddress &a = s.GetInetAddress();
70 if (!a.IsValid())
71 return "undefined";
72
73 return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
74}
75
76TString MTcpIpO::GetSocketAddress() const
77{
78 return GetSocketAddress(*fTxSocket);
79}
80
81bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
82{
83 if (!tx.IsValid())
84 {
85 //gLog << warn << "WARNING - No transmission to " << GetSocketAddress(tx) << " possible." << endl;
86 return false;
87 }
88
89#ifdef DEBUG
90 cout << "Tx: " << msg << flush;
91#endif
92
93 const Int_t l = tx.SendRaw(msg, len);
94 if (l<0)
95 {
96 gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
97 return false;
98 }
99
100 if (l!=len)
101 {
102 gLog << err << "ERROR - Sent wrong number (" << l << ") of bytes to " << GetSocketAddress(tx) << endl;
103 return false;
104 }
105
106 return true;
107}
108
109bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
110{
111 // R__LOCKGUARD2(myMutex);
112#ifdef DEBUG
113 cout << "Connecting to " << addr << ":" << port << endl;
114#endif
115
116 // FIXME: Set tx-socket to nonblocking?
117 TSocket tx(addr, port);
118 return SendFrame(tx, msg, len);
119/*
120 if (!tx.IsValid())
121 {
122 gLog << warn << "WARNING - No transmission to " << addr << ":" << port << " possible." << endl;
123 return false;
124 }
125
126 gLog << dbg << "Sending to " << addr << ":" << port << endl;
127
128 const Int_t l = tx.SendRaw(msg, len, kDontBlock);
129 if (l<0)
130 {
131 gLog << err << "ERROR - Sending TCP/IP frame to " << addr << ":" << port << endl;
132 return false;
133 }
134
135 if (l!=len)
136 {
137 gLog << err << "ERROR - Sent wrong number (" << l << ") of bytes to " << addr << ":" << port << endl;
138 return false;
139 }
140
141#ifdef DEBUG
142 cout << "Tx: " << msg << flush;
143#endif
144
145 return true;
146 */
147}
148
149bool MTcpIpO::Send(const char *msg, Int_t len)
150{
151 if (!fTxSocket->IsValid())
152 {
153 const TInetAddress &a = fTxSocket->GetInetAddress();
154 if (!a.IsValid())
155 return false;
156#ifdef DEBUG
157 cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
158#endif
159 delete fTxSocket;
160 fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
161 }
162
163 return SendFrame(*fTxSocket, msg, len);
164}
165
166bool MTcpIpIO::InterpreteStr(TString str)
167{
168 cout << "Rx: " << str << flush;
169 return true;
170}
171
172Bool_t MTcpIpIO::ReadSocket(TSocket &rx)
173{
174 TString str;
175
176 while (!IsThreadCanceled())
177 {
178 char c;
179 const Int_t len = rx.RecvRaw(&c, 1);
180
181 // For details see TSocket::RecvRaw
182 // -1: // ERROR
183 // EINVAL, EWOULDBLOCK
184 // -5: // EPIPE || ECONNRESET = Pipe broken or connection reset by peer
185 // 0: Data received with zero length! (Connection lost/call interrupted)
186 if (len<=0)
187 return kFALSE;
188
189 // Data received
190 if (len>1)
191 {
192 cout << "Data too long!!!" << endl;
193 break;
194 }
195
196 // Data received (len==1)
197 if (c!='\n')
198 {
199 str += c;
200 continue;
201 }
202
203 // String completed
204 InterpreteStr(str);
205 str = "";
206 }
207
208 return kTRUE;
209}
210
211Bool_t MTcpIpI::WaitForData(TSocket &sock)
212{
213 // No connection established?
214 if (!sock.IsValid())
215 {
216 gLog << warn << "TSocket invalid on port " << fPortRx << "." << endl;
217 return kFALSE;
218 }
219
220 fConnectionEstablished = kTRUE;
221
222 MTimeout timeout(fTimeout);
223
224 // Get connection on port fPortRx and redirected
225 while (!IsThreadCanceled())
226 {
227 // Check for pending data (every ms)
228 switch (sock.Select(TSocket::kRead, 1))
229 {
230 case kTRUE: // Data pending... go on reading
231 if (!ReadSocket(sock))
232 {
233 gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(sock) << endl;
234 return kFALSE;
235 }
236 timeout.Start(fTimeout);
237
238 // Go on waiting for new data
239 continue;
240
241 case kFALSE: // Time out, no data yet, go on waiting
242 if (timeout.HasTimedOut())
243 {
244 gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(sock) << " timed out after " << fTimeout << "ms." << endl;
245 return kFALSE;
246 }
247
248 // Go on waiting for new data
249 continue;
250
251 default: // Error occurance
252 gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
253 return kFALSE;
254 }
255 }
256 return kTRUE; //???
257}
258
259void MTcpIpI::WaitForConnection(TServerSocket &server)
260{
261 while (!IsThreadCanceled())
262 {
263 gLog << all << MTime(-1) << " Listening for new connection on port " << fPortRx << "..." << endl;
264
265 // Check for a connection request (reminder: we are in non-blocking mode)
266 TSocket *socket = 0;
267
268 while (!IsThreadCanceled())
269 {
270 //cout << (int) IsThreadCanceled() << endl;
271 // Wait for a new connection on RX port
272 socket = server.Accept();
273
274 // Accept returned an error
275 if (socket==0)
276 {
277 gLog << err << "Error: TServerSock::Accept on port " << fPortRx << ": " << strerror(errno) << endl;
278 // Since we don't know the type of the error we better shut down the socket
279 return;
280 }
281
282 // No connection request pending
283 if ((Long_t)socket<0)
284 {
285 MThread::Sleep(1000); // Wait a ms
286 continue;
287 }
288
289 // Connection established
290 break;
291 }
292
293 if ((Long_t)socket<=0)
294 return;
295
296 gLog << all << MTime(-1) << " Connection established to " << MTcpIpO::GetSocketAddress(*socket) << "..." << endl;
297
298 if (!WaitForData(*socket))
299 fConnectionEstablished = kFALSE;
300
301#ifdef DEBUG
302 cout << "===> DEL SOCKET" << endl;
303#endif
304 delete socket;
305 }
306}
307
308Int_t MTcpIpI::Thread()
309{
310 gLog << inf << "- Starting server listening on port " << fPortRx << "..." << endl;
311
312 while (!IsThreadCanceled())
313 {
314 TServerSocket *server=new TServerSocket(fPortRx, kTRUE, 0);
315 server->SetOption(kNoBlock, 1);
316
317 while (!IsThreadCanceled() && server->IsValid())
318 WaitForConnection(*server);
319
320 if (!server->IsValid())
321 {
322 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
323 switch (server->GetErrorCode())
324 {
325 case 0: gLog << "No error." << endl; break;
326 case -1: gLog << "low level socket() call failed." << endl; break;
327 case -2: gLog << "low level bind() call failed." << endl; break;
328 case -3: gLog << "low level listen() call failed." << endl; break;
329 default: gLog << "Unknown." << endl; break;
330 }
331 }
332
333#ifdef DEBUG
334 cout << "===> DEL SERVER" << endl;
335#endif
336 delete server;
337
338 MThread::Sleep(5000000);
339 }
340
341 gLog << inf << "- Listening server stopped on port " << fPortRx << "." << endl;
342
343 return 0;
344}
345
346/*
347Int_t MTcpIpI::Thread()
348{
349 gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
350
351// if (fPortRx==7404)
352// {
353// gLog << err << "CeCo communication skipped." << endl;
354// return 0;
355// }
356
357 TServerSocket *fServSock=NULL;
358 TSocket *fRxSocket=NULL;
359
360// while (!HasStopFlag())
361 while (!IsThreadCanceled())
362 {
363 fServSock=new TServerSocket(fPortRx, kTRUE);
364 if (!fServSock->IsValid())
365 {
366 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
367 switch (fServSock->GetErrorCode())
368 {
369 case 0: gLog << "No error." << endl; break;
370 case -1: gLog << "low level socket() call failed." << endl; break;
371 case -2: gLog << "low level bind() call failed." << endl; break;
372 case -3: gLog << "low level listen() call failed." << endl; break;
373 default: gLog << "Unknown." << endl; break;
374 }
375 delete fServSock;
376 fServSock=NULL;
377 MThread::Sleep(5000000);
378 continue;
379 }
380
381 fServSock->SetOption(kNoBlock, 1);
382
383 gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
384// while (!HasStopFlag() && (Long_t)fRxSocket<=0)
385 while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
386 {
387 //TThread::CancelPoint();
388
389 fRxSocket = fServSock->Accept();
390 if (fRxSocket==0)
391 cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
392 usleep(10);
393 }
394
395 // Can happen in case of HasStopFlag()
396 if (fRxSocket==(void*)-1)
397 fRxSocket=NULL;
398
399 if (fRxSocket==NULL)
400 {
401 delete fServSock;
402 fServSock=NULL;
403 continue;
404 }
405
406 if (!fRxSocket->IsValid())
407 {
408 cout << "TSocket invalid on port " << fPortRx << "." << endl;
409 delete fServSock;
410 delete fRxSocket;
411 fServSock = NULL;
412 fRxSocket = NULL;
413 continue;
414 }
415
416 gLog << all << "Connection established on port " << fPortRx << "." << endl;
417
418
419 //fRxSocket->SetOption(kNoBlock, 1);
420
421 // Wait for data
422 while (!IsThreadCanceled())
423 {
424 switch (fRxSocket->Select(kRead, 1))
425 {
426 case kTRUE: // Data waiting to be read
427 break;
428 case kFALSE: // time out
429 usleep(10);
430 continue;
431 }
432
433 // ERROR
434 cout << "Error: TRxSocket::Select on port " << fPortRx << "." << endl;
435
436 delete fServSock;
437 delete fRxSocket;
438 fServSock = NULL;
439 fRxSocket = NULL;
440 break;
441
442
443 if (!fServSock)
444 continue;
445
446 if (IsThreadCanceled())
447 {
448 delete fServSock;
449 delete fRxSocket;
450 fServSock = NULL;
451 fRxSocket = NULL;
452 continue;
453 }
454
455 // ------ IDENTICAL UP TO HERE ------
456
457 // Read and evaluate data
458 ReadSocket(*fRxSocket);
459
460 delete fServSock;
461 delete fRxSocket;
462 fServSock = NULL;
463 fRxSocket = NULL;
464 }
465
466 gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
467
468 return 0;
469// return NULL;
470}*/
Note: See TracBrowser for help on using the repository browser.