source: trunk/MagicSoft/Cosy/tcpip/MTcpIpIO.cc@ 9439

Last change on this file since 9439 was 9439, checked in by tbretz, 16 years ago
*** empty log message ***
File size: 12.7 KB
Line 
1#include "MTcpIpIO.h"
2
3#include <unistd.h> // usleep
4
5#include <errno.h>
6
7#include <TSocket.h>
8#include <TServerSocket.h>
9
10#include "MLog.h"
11#include "MLogManip.h"
12
13#include "MString.h"
14#include "MTimeout.h"
15
16#undef DEBUG
17//#define DEBUG
18
19using namespace std;
20
21 /*
22 enum ESockOptions {
23 kSendBuffer, // size of send buffer
24 kRecvBuffer, // size of receive buffer
25 kOobInline, // OOB message inline
26 kKeepAlive, // keep socket alive
27 kReuseAddr, // allow reuse of local portion of address 5-tuple
28 kNoDelay, // send without delay
29 kNoBlock, // non-blocking I/O
30 kProcessGroup, // socket process group (used for SIGURG and SIGIO)
31 kAtMark, // are we at out-of-band mark (read only)
32 kBytesToRead // get number of bytes to read, FIONREAD (read only)
33 };
34
35 enum ESendRecvOptions {
36 kDefault, // default option (= 0)
37 kOob, // send or receive out-of-band data
38 kPeek, // peek at incoming message (receive only)
39 kDontBlock // send/recv as much data as possible without blocking
40 };
41 */
42
43MTcpIpO::MTcpIpO(const char *addr, Int_t tx) : fPortTx(tx)
44{
45 // was "inf2" but nobody knows what the prg is doing if it times out
46 gLog << all << "- Open send socket to " << addr << ":" << tx << endl;
47
48 fTxSocket = new TSocket(addr, tx);
49 fTxSocket->SetOption(kNoBlock, 1);
50
51 MTcpIpO::RunThread();
52}
53
54MTcpIpO::~MTcpIpO()
55{
56 CancelThread();
57
58 // Now delete all TCP/IP objects
59 delete fTxSocket;
60}
61
62MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx) : MTcpIpI(rx), MTcpIpO(addr, tx)
63{
64 MTcpIpI::RunThread();
65}
66
67MTcpIpIO::~MTcpIpIO()
68{
69}
70
71TString MTcpIpO::GetSocketAddress(const TSocket &s)
72{
73 if (!s.IsValid())
74 return "n/a";
75
76 const TInetAddress &a = s.GetInetAddress();
77 if (!a.IsValid())
78 return "undefined";
79
80 return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
81}
82
83/*
84TString MTcpIpO::GetSocketAddress() const
85{
86 TLockGuard guard(const_cast<TMutex*>(&fMutex));
87 return GetSocketAddress(*fTxSocket);
88}
89*/
90bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
91{
92 if (!tx.IsValid())
93 return false;
94
95#ifdef DEBUG
96 cout << "Tx: " << msg << flush;
97#endif
98
99 const Int_t l = tx.SendRaw(msg, len, kDontBlock);
100
101 // Frame not sent, EWOULDBLOCK
102 if (l==-4)
103 {
104 gLog << err << "ERROR - Sending to " << GetSocketAddress(tx) << " would block." << endl;
105/*--->*/ tx.Close(); // ?????
106 return false;
107 }
108
109 if (l<0)
110 {
111 gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
112 return false;
113 }
114
115 if (l!=len)
116 {
117 gLog << err << "ERROR - Could only sent " << l << " out of " << len << " bytes to " << GetSocketAddress(tx) << endl;
118 return false;
119 }
120
121 return true;
122}
123
124bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
125{
126#ifdef DEBUG
127 cout << "Connecting to " << addr << ":" << port << endl;
128#endif
129
130 // FIXME: Set tx-socket to nonblocking?
131 TSocket tx(addr, port);
132 return SendFrame(tx, msg, len);
133}
134
135bool MTcpIpO::Send(const char *msg, Int_t len)
136{
137 const Int_t mtx = fMutex.TryLock();
138 if (mtx==13)
139 gLog << warn << "MTcpIpO::Send - mutex is already locked by this thread." << endl;
140
141 // If Mutex cannot be locked, i.e. we are currently reopening
142 // the send socket we cannot wait, because we would block the
143 // executing threrad.
144 if (mtx)
145 return false;
146
147 const bool rc = SendFrame(*fTxSocket, msg, len);
148
149 fMutex.UnLock();
150
151 return rc;
152}
153
154Int_t MTcpIpO::Thread()
155{
156 const TInetAddress &a = fTxSocket->GetInetAddress();
157 if (!a.IsValid())
158 {
159 gLog << err << "ERROR: Send socket address invalid." << endl;
160 return 0;
161 }
162
163 Int_t wait = 1000; /// Wait a ms
164
165 while (!IsThreadCanceled())
166 {
167 if (fTxSocket->IsValid())
168 {
169 MThread::Sleep(1000); // Wait a ms
170 wait = 1000;
171 continue;
172 }
173
174
175#ifdef DEBUG
176 cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
177#endif
178
179 fMutex.Lock();
180
181 delete fTxSocket;
182
183 fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
184 fTxSocket->SetOption(kNoBlock, 1);
185
186 fMutex.UnLock();
187
188 if (fTxSocket->IsValid())
189 continue;
190
191 MThread::Sleep(wait); // Wait a ms
192 if (wait<30000000) // 30s
193 wait *= 2;
194 }
195
196 return 1;
197}
198
199bool MTcpIpIO::InterpreteStr(TString str)
200{
201 cout << "Rx: " << str << flush;
202 return true;
203}
204
205Bool_t MTcpIpIO::ReadSocket(TSocket &rx)
206{
207 TString str;
208
209 while (!MTcpIpI::IsThreadCanceled())
210 {
211 char c;
212 const Int_t len = rx.RecvRaw(&c, 1);
213
214 // For details see TSocket::RecvRaw
215 // -1: // ERROR
216 // EINVAL, EWOULDBLOCK
217 // -5: // EPIPE || ECONNRESET = Pipe broken or connection reset by peer
218 // 0: Data received with zero length! (Connection lost/call interrupted)
219 if (len<=0)
220 return kFALSE;
221
222 // Data received
223 if (len>1)
224 {
225 cout << "Data too long!!!" << endl;
226 break;
227 }
228
229 // Data received (len==1)
230 if (c!='\n')
231 {
232 str += c;
233 continue;
234 }
235
236 // String completed
237 InterpreteStr(str);
238 str = "";
239 }
240
241 return kTRUE;
242}
243
244Bool_t MTcpIpI::WaitForData(TSocket &sock)
245{
246 // No connection established?
247 if (!sock.IsValid())
248 {
249 gLog << warn << "TSocket invalid on port " << fPortRx << "." << endl;
250 return kFALSE;
251 }
252
253 fConnectionEstablished = kTRUE;
254
255 MTimeout timeout(fTimeout);
256
257 // Get connection on port fPortRx and redirected
258 while (!IsThreadCanceled())
259 {
260 // Check for pending data (every ms)
261 switch (sock.Select(TSocket::kRead, 1))
262 {
263 case kTRUE: // Data pending... go on reading
264 if (!ReadSocket(sock))
265 {
266 gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(sock) << endl;
267 return kFALSE;
268 }
269 timeout.Start(fTimeout);
270
271 // Go on waiting for new data
272 continue;
273
274 case kFALSE: // Time out, no data yet, go on waiting
275 if (timeout.HasTimedOut())
276 {
277 gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(sock) << " timed out after " << fTimeout << "ms." << endl;
278 return kFALSE;
279 }
280
281 // Go on waiting for new data
282 continue;
283
284 default: // Error occurance
285 gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
286 return kFALSE;
287 }
288 }
289 return kTRUE; //???
290}
291
292void MTcpIpI::WaitForConnection(TServerSocket &server)
293{
294 while (!IsThreadCanceled())
295 {
296 gLog << all << MTime(-1) << " Listening for new connection on port " << fPortRx << "..." << endl;
297
298 // Check for a connection request (reminder: we are in non-blocking mode)
299 TSocket *socket = 0;
300
301 while (!IsThreadCanceled())
302 {
303 //cout << (int) IsThreadCanceled() << endl;
304 // Wait for a new connection on RX port
305 socket = server.Accept();
306
307 // Accept returned an error
308 if (socket==0)
309 {
310 gLog << err << "Error: TServerSock::Accept on port " << fPortRx << ": " << strerror(errno) << endl;
311 // Since we don't know the type of the error we better shut down the socket
312 return;
313 }
314
315 // No connection request pending
316 if ((Long_t)socket<0)
317 {
318 MThread::Sleep(1000); // Wait a ms
319 continue;
320 }
321
322 // Connection established
323 break;
324 }
325
326 if ((Long_t)socket<=0)
327 return;
328
329 gLog << all << MTime(-1) << " Connection established to " << MTcpIpO::GetSocketAddress(*socket) << "..." << endl;
330
331 if (!WaitForData(*socket))
332 fConnectionEstablished = kFALSE;
333
334#ifdef DEBUG
335 cout << "===> DEL SOCKET" << endl;
336#endif
337 delete socket;
338 }
339}
340
341Int_t MTcpIpI::Thread()
342{
343 gLog << inf << "- Starting server listening on port " << fPortRx << "..." << endl;
344
345 while (!IsThreadCanceled())
346 {
347 TServerSocket *server=new TServerSocket(fPortRx, kTRUE, 0);
348 server->SetOption(kNoBlock, 1);
349
350 while (!IsThreadCanceled() && server->IsValid())
351 WaitForConnection(*server);
352
353 if (!server->IsValid())
354 {
355 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
356 switch (server->GetErrorCode())
357 {
358 case 0: gLog << "No error." << endl; break;
359 case -1: gLog << "low level socket() call failed." << endl; break;
360 case -2: gLog << "low level bind() call failed." << endl; break;
361 case -3: gLog << "low level listen() call failed." << endl; break;
362 default: gLog << "Unknown." << endl; break;
363 }
364 }
365
366#ifdef DEBUG
367 cout << "===> DEL SERVER" << endl;
368#endif
369 delete server;
370
371 MThread::Sleep(5000000);
372 }
373
374 gLog << inf << "- Listening server stopped on port " << fPortRx << "." << endl;
375
376 return 0;
377}
378
379/*
380Int_t MTcpIpI::Thread()
381{
382 gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
383
384// if (fPortRx==7404)
385// {
386// gLog << err << "CeCo communication skipped." << endl;
387// return 0;
388// }
389
390 TServerSocket *fServSock=NULL;
391 TSocket *fRxSocket=NULL;
392
393// while (!HasStopFlag())
394 while (!IsThreadCanceled())
395 {
396 fServSock=new TServerSocket(fPortRx, kTRUE);
397 if (!fServSock->IsValid())
398 {
399 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
400 switch (fServSock->GetErrorCode())
401 {
402 case 0: gLog << "No error." << endl; break;
403 case -1: gLog << "low level socket() call failed." << endl; break;
404 case -2: gLog << "low level bind() call failed." << endl; break;
405 case -3: gLog << "low level listen() call failed." << endl; break;
406 default: gLog << "Unknown." << endl; break;
407 }
408 delete fServSock;
409 fServSock=NULL;
410 MThread::Sleep(5000000);
411 continue;
412 }
413
414 fServSock->SetOption(kNoBlock, 1);
415
416 gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
417// while (!HasStopFlag() && (Long_t)fRxSocket<=0)
418 while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
419 {
420 //TThread::CancelPoint();
421
422 fRxSocket = fServSock->Accept();
423 if (fRxSocket==0)
424 cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
425 usleep(10);
426 }
427
428 // Can happen in case of HasStopFlag()
429 if (fRxSocket==(void*)-1)
430 fRxSocket=NULL;
431
432 if (fRxSocket==NULL)
433 {
434 delete fServSock;
435 fServSock=NULL;
436 continue;
437 }
438
439 if (!fRxSocket->IsValid())
440 {
441 cout << "TSocket invalid on port " << fPortRx << "." << endl;
442 delete fServSock;
443 delete fRxSocket;
444 fServSock = NULL;
445 fRxSocket = NULL;
446 continue;
447 }
448
449 gLog << all << "Connection established on port " << fPortRx << "." << endl;
450
451
452 //fRxSocket->SetOption(kNoBlock, 1);
453
454 // Wait for data
455 while (!IsThreadCanceled())
456 {
457 switch (fRxSocket->Select(kRead, 1))
458 {
459 case kTRUE: // Data waiting to be read
460 break;
461 case kFALSE: // time out
462 usleep(10);
463 continue;
464 }
465
466 // ERROR
467 cout << "Error: TRxSocket::Select on port " << fPortRx << "." << endl;
468
469 delete fServSock;
470 delete fRxSocket;
471 fServSock = NULL;
472 fRxSocket = NULL;
473 break;
474
475
476 if (!fServSock)
477 continue;
478
479 if (IsThreadCanceled())
480 {
481 delete fServSock;
482 delete fRxSocket;
483 fServSock = NULL;
484 fRxSocket = NULL;
485 continue;
486 }
487
488 // ------ IDENTICAL UP TO HERE ------
489
490 // Read and evaluate data
491 ReadSocket(*fRxSocket);
492
493 delete fServSock;
494 delete fRxSocket;
495 fServSock = NULL;
496 fRxSocket = NULL;
497 }
498
499 gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
500
501 return 0;
502// return NULL;
503}*/
Note: See TracBrowser for help on using the repository browser.