source: trunk/Cosy/tcpip/MTcpIpIO.cc@ 17345

Last change on this file since 17345 was 14321, checked in by tbretz, 12 years ago
Added the kNoDelay option to ensure that packets are not split.
File size: 15.9 KB
Line 
1#include "MTcpIpIO.h"
2
3#include <unistd.h> // usleep
4
5#include <errno.h>
6
7#include <TSocket.h>
8#include <TServerSocket.h>
9
10#include "MLog.h"
11#include "MLogManip.h"
12
13#include "MString.h"
14#include "MTimeout.h"
15
16#undef DEBUG
17
18using namespace std;
19
20 /*
21 enum ESockOptions {
22 kSendBuffer, // size of send buffer
23 kRecvBuffer, // size of receive buffer
24 kOobInline, // OOB message inline
25 kKeepAlive, // keep socket alive
26 kReuseAddr, // allow reuse of local portion of address 5-tuple
27 kNoDelay, // send without delay
28 kNoBlock, // non-blocking I/O
29 kProcessGroup, // socket process group (used for SIGURG and SIGIO)
30 kAtMark, // are we at out-of-band mark (read only)
31 kBytesToRead // get number of bytes to read, FIONREAD (read only)
32 };
33
34 enum ESendRecvOptions {
35 kDefault, // default option (= 0)
36 kOob, // send or receive out-of-band data
37 kPeek, // peek at incoming message (receive only)
38 kDontBlock // send/recv as much data as possible without blocking
39 };
40 */
41
42MTcpIpO::MTcpIpO(const char *addr, Int_t tx) : fPortTx(tx)
43{
44 // was "inf2" but nobody knows what the prg is doing if it times out
45 gLog << all << "- Open send socket to " << addr << ":" << tx << endl;
46
47 const Int_t save = gDebug;
48 gDebug=1;
49 fTxSocket = new TSocket(addr, tx);
50 gDebug=save;
51 fTxSocket->SetOption(kNoBlock, 1);
52 fTxSocket->SetOption(kNodelay, 1);
53
54 MTcpIpO::RunThread();
55}
56
57MTcpIpO::~MTcpIpO()
58{
59 CancelThread();
60
61 // Now delete all TCP/IP objects
62 delete fTxSocket;
63}
64
65MTcpIpCC::MTcpIpCC(Int_t rx, UInt_t timeout) : MTcpIpI(rx, timeout)
66{
67 MTcpIpI::RunThread();
68}
69
70MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx, UInt_t timeout) : MTcpIpCC(rx, timeout), MTcpIpO(addr, tx)
71{
72}
73
74TString MTcpIpO::GetSocketAddress(const TSocket &s)
75{
76 if (!s.IsValid())
77 return "n/a";
78
79 const TInetAddress &a = s.GetInetAddress();
80 if (!a.IsValid())
81 return "undefined";
82
83 return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
84}
85
86/*
87TString MTcpIpO::GetSocketAddress() const
88{
89 TLockGuard guard(const_cast<TMutex*>(&fMutex));
90 return GetSocketAddress(*fTxSocket);
91}
92*/
93bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
94{
95 if (!tx.IsValid())
96 return false;
97
98#ifdef DEBUG
99 cout << "Tx: " << msg << flush;
100#endif
101
102 const Int_t l = tx.SendRaw(msg, len, kDontBlock);
103
104 // Frame not sent, EWOULDBLOCK
105 if (l==-4)
106 {
107 gLog << err << "ERROR - Sending to " << GetSocketAddress(tx) << " would block." << endl;
108/*--->*/ tx.Close(); // ?????
109 return false;
110 }
111
112 if (l<0)
113 {
114 gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
115 return false;
116 }
117
118 if (l!=len)
119 {
120 gLog << err << "ERROR - Could only sent " << l << " out of " << len << " bytes to " << GetSocketAddress(tx) << endl;
121 return false;
122 }
123
124 return true;
125}
126
127bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
128{
129#ifdef DEBUG
130 cout << "Connecting to " << addr << ":" << port << endl;
131#endif
132
133 // FIXME: Set tx-socket to nonblocking?
134 TSocket tx(addr, port);
135 return SendFrame(tx, msg, len);
136}
137
138bool MTcpIpO::Send(const char *msg, Int_t len)
139{
140 const Int_t mtx = fMutex.TryLock();
141 if (mtx==13)
142 gLog << warn << "MTcpIpO::Send - mutex is already locked by this thread." << endl;
143
144 // If Mutex cannot be locked, i.e. we are currently reopening
145 // the send socket we cannot wait, because we would block the
146 // executing threrad.
147 if (mtx)
148 return false;
149
150 const bool rc = SendFrame(*fTxSocket, msg, len);
151
152 fMutex.UnLock();
153
154 return rc;
155}
156
157Int_t MTcpIpO::Thread()
158{
159 const TInetAddress &a = fTxSocket->GetInetAddress();
160 if (!a.IsValid())
161 {
162 gLog << err << "ERROR: Send socket address invalid." << endl;
163 return 0;
164 }
165
166 Int_t wait = 1000; /// Wait a ms
167
168 while (!IsThreadCanceled())
169 {
170 if (fTxSocket->IsValid())
171 {
172 MThread::Sleep(1000); // Wait a ms
173 wait = 1000;
174 continue;
175 }
176
177
178#ifdef DEBUG
179 cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
180#endif
181
182 fMutex.Lock();
183
184 delete fTxSocket;
185
186 fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
187 fTxSocket->SetOption(kNoBlock, 1);
188 fTxSocket->SetOption(kNodelay, 1);
189
190 fMutex.UnLock();
191
192 if (fTxSocket->IsValid())
193 continue;
194
195 MThread::Sleep(wait); // Wait a ms
196 if (wait<3000000) // 3s
197 wait *= 2;
198 }
199
200 return 1;
201}
202
203Int_t MTcpIpOI::Thread()
204{
205 fConnectionEstablished = kFALSE;
206
207 const TInetAddress &a = fTxSocket->GetInetAddress();
208 if (!a.IsValid())
209 {
210 gLog << err << "- MTcpIpOI::Thread - ERROR: Send socket address invalid." << endl;
211 return 0;
212 }
213
214 gLog << inf << "- MTcpIpOI::Thread created connecting to " << a.GetHostAddress() << ":" << fPortTx << endl;
215
216 Int_t wait = 1000; /// Wait a ms
217
218 MTimeout timeout(fTimeout);
219
220 while (!IsThreadCanceled())
221 {
222 if (fTxSocket->IsValid())
223 {
224 fConnectionEstablished = kTRUE;
225 fTimeout = 1000;
226
227 // Get connection on port fPortRx and redirected
228 // Check for pending data (every ms)
229 switch (fTxSocket->Select(TSocket::kRead, 1))
230 {
231 case kTRUE: // Data pending... go on reading
232 if (!ReadSocket(*fTxSocket))
233 {
234 gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(*fTxSocket) << endl;
235 fTxSocket->Close();
236 continue;
237 }
238 timeout.Start(fTimeout);
239 continue;
240
241 case kFALSE: // Time out, no data yet, go on waiting
242 if (timeout.HasTimedOut())
243 {
244 gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(*fTxSocket) << " timed out after " << fTimeout << "ms." << endl;
245 fTxSocket->Close();
246 continue;
247 }
248 continue;
249
250 default: // Error occurance
251 gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
252 fTxSocket->Close();
253 continue;
254 }
255 }
256
257 fConnectionEstablished = kFALSE;
258
259#ifdef DEBUG
260 cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
261#endif
262
263 fMutex.Lock();
264
265 delete fTxSocket;
266
267 fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
268 fTxSocket->SetOption(kNoBlock, 1);
269 fTxSocket->SetOption(kNoDelay, 1);
270
271 fMutex.UnLock();
272
273 timeout.Start(fTimeout);
274
275 if (fTxSocket->IsValid())
276 continue;
277
278 MThread::Sleep(wait); // Wait a ms
279 if (wait<3000000) // 3s
280 wait *= 2;
281 }
282
283 return 1;
284}
285
286bool MTcpIpCC::InterpreteStr(TString str)
287{
288 cout << "Rx: " << str << flush;
289 return true;
290}
291
292Bool_t MTcpIpCC::ReadSocket(TSocket &rx)
293{
294 TString str;
295
296 while (!MTcpIpI::IsThreadCanceled())
297 {
298 char c;
299 const Int_t len = rx.RecvRaw(&c, 1);
300
301 // For details see TSocket::RecvRaw
302 // -1: // ERROR
303 // EINVAL, EWOULDBLOCK
304 // -5: // EPIPE || ECONNRESET = Pipe broken or connection reset by peer
305 // 0: Data received with zero length! (Connection lost/call interrupted)
306 if (len<=0)
307 return kFALSE;
308
309 // Data received
310 if (len>1)
311 {
312 cout << "Data too long!!!" << endl;
313 break;
314 }
315
316 // Data received (len==1)
317 if (c!='\n' && c!=0)
318 {
319 str += c;
320 continue;
321 }
322
323 // String completed
324 InterpreteStr(str);
325 str = "";
326 }
327
328 return kTRUE;
329}
330
331Bool_t MTcpIpI::WaitForData(TSocket &sock)
332{
333 // No connection established?
334 if (!sock.IsValid())
335 {
336 gLog << warn << "TSocket invalid on port " << fPortRx << "." << endl;
337 return kFALSE;
338 }
339
340 fRxMutex.Lock();
341
342 fConnectionEstablished = kTRUE;
343 fRxSocket = &sock;
344
345 fRxMutex.UnLock();
346
347 MTimeout timeout(fTimeout);
348
349 // Get connection on port fPortRx and redirected
350 while (!IsThreadCanceled())
351 {
352 // Check for pending data (every ms)
353 switch (sock.Select(TSocket::kRead, 1))
354 {
355 case kTRUE: // Data pending... go on reading
356 if (!ReadSocket(sock))
357 {
358 gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(sock) << endl;
359 return kFALSE;
360 }
361 timeout.Start(fTimeout);
362
363 // Go on waiting for new data
364 continue;
365
366 case kFALSE: // Time out, no data yet, go on waiting
367 if (timeout.HasTimedOut())
368 {
369 gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(sock) << " timed out after " << fTimeout << "ms." << endl;
370 return kFALSE;
371 }
372
373 // Go on waiting for new data
374 continue;
375
376 default: // Error occurance
377 gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
378 return kFALSE;
379 }
380 }
381 return kTRUE; //???
382}
383
384void MTcpIpI::WaitForConnection(TServerSocket &server)
385{
386 while (!IsThreadCanceled())
387 {
388 gLog << all << MTime(-1) << " Listening for new connection on port " << fPortRx << "..." << endl;
389
390 // Check for a connection request (reminder: we are in non-blocking mode)
391 TSocket *socket = 0;
392
393 while (!IsThreadCanceled())
394 {
395 //cout << (int) IsThreadCanceled() << endl;
396 // Wait for a new connection on RX port
397 socket = server.Accept();
398
399 // Accept returned an error
400 if (socket==0)
401 {
402 gLog << err << "Error: TServerSock::Accept on port " << fPortRx << ": " << strerror(errno) << endl;
403 // Since we don't know the type of the error we better shut down the socket
404 return;
405 }
406
407 // No connection request pending
408 if ((Long_t)socket<0)
409 {
410 MThread::Sleep(1000); // Wait a ms
411 continue;
412 }
413
414 // Connection established
415 break;
416 }
417
418 if ((Long_t)socket<=0)
419 return;
420
421 gLog << all << MTime(-1) << " Connection established to " << MTcpIpO::GetSocketAddress(*socket) << "..." << endl;
422
423 WaitForData(*socket);
424
425 fRxMutex.Lock();
426
427 fRxSocket = 0;
428 fConnectionEstablished = kFALSE;
429
430 fRxMutex.UnLock();
431
432#ifdef DEBUG
433 cout << "===> DEL SOCKET" << endl;
434#endif
435 delete socket;
436 }
437}
438
439Int_t MTcpIpI::Thread()
440{
441 gLog << inf << "- Starting server listening on port " << fPortRx << "..." << endl;
442
443 while (!IsThreadCanceled())
444 {
445 TServerSocket *server=new TServerSocket(fPortRx, kTRUE, 0);
446 server->SetOption(kNoBlock, 1);
447 server->SetOption(kNoDelay, 1);
448
449 while (!IsThreadCanceled() && server->IsValid())
450 WaitForConnection(*server);
451
452 if (!server->IsValid())
453 {
454 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
455 switch (server->GetErrorCode())
456 {
457 case 0: gLog << "No error." << endl; break;
458 case -1: gLog << "low level socket() call failed." << endl; break;
459 case -2: gLog << "low level bind() call failed." << endl; break;
460 case -3: gLog << "low level listen() call failed." << endl; break;
461 default: gLog << "Unknown." << endl; break;
462 }
463 }
464
465#ifdef DEBUG
466 cout << "===> DEL SERVER" << endl;
467#endif
468 delete server;
469
470 MThread::Sleep(5000000);
471 }
472
473 gLog << inf << "- Listening server stopped on port " << fPortRx << "." << endl;
474
475 return 0;
476}
477
478bool MTcpIpFact::Send(const char *msg, int len)
479{
480 const Int_t mtx = fRxMutex.TryLock();
481 if (mtx==13)
482 gLog << warn << "MTcpIpO::Send - mutex is already locked by this thread." << endl;
483
484 // If Mutex cannot be locked, i.e. we are currently reopening
485 // the send socket we cannot wait, because we would block the
486 // executing threrad.
487 if (mtx)
488 return false;
489
490 const bool rc = fRxSocket ? MTcpIpO::SendFrame(*fRxSocket, msg, len) : false;
491
492 fRxMutex.UnLock();
493
494 return rc;
495}
496
497/*
498Int_t MTcpIpI::Thread()
499{
500 gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
501
502// if (fPortRx==7404)
503// {
504// gLog << err << "CeCo communication skipped." << endl;
505// return 0;
506// }
507
508 TServerSocket *fServSock=NULL;
509 TSocket *fRxSocket=NULL;
510
511// while (!HasStopFlag())
512 while (!IsThreadCanceled())
513 {
514 fServSock=new TServerSocket(fPortRx, kTRUE);
515 if (!fServSock->IsValid())
516 {
517 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
518 switch (fServSock->GetErrorCode())
519 {
520 case 0: gLog << "No error." << endl; break;
521 case -1: gLog << "low level socket() call failed." << endl; break;
522 case -2: gLog << "low level bind() call failed." << endl; break;
523 case -3: gLog << "low level listen() call failed." << endl; break;
524 default: gLog << "Unknown." << endl; break;
525 }
526 delete fServSock;
527 fServSock=NULL;
528 MThread::Sleep(5000000);
529 continue;
530 }
531
532 fServSock->SetOption(kNoBlock, 1);
533
534 gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
535// while (!HasStopFlag() && (Long_t)fRxSocket<=0)
536 while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
537 {
538 //TThread::CancelPoint();
539
540 fRxSocket = fServSock->Accept();
541 if (fRxSocket==0)
542 cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
543 usleep(10);
544 }
545
546 // Can happen in case of HasStopFlag()
547 if (fRxSocket==(void*)-1)
548 fRxSocket=NULL;
549
550 if (fRxSocket==NULL)
551 {
552 delete fServSock;
553 fServSock=NULL;
554 continue;
555 }
556
557 if (!fRxSocket->IsValid())
558 {
559 cout << "TSocket invalid on port " << fPortRx << "." << endl;
560 delete fServSock;
561 delete fRxSocket;
562 fServSock = NULL;
563 fRxSocket = NULL;
564 continue;
565 }
566
567 gLog << all << "Connection established on port " << fPortRx << "." << endl;
568
569
570 //fRxSocket->SetOption(kNoBlock, 1);
571
572 // Waqit for data
573 while (!IsThreadCanceled())
574 {
575 switch (fRxSocket->Select(kRead, 1))
576 {
577 case kTRUE: // Data waiting to be read
578 break;
579 case kFALSE: // time out
580 usleep(10);
581 continue;
582 }
583
584 // ERROR
585 cout << "Error: TRxSocket::Select on port " << fPortRx << "." << endl;
586
587 delete fServSock;
588 delete fRxSocket;
589 fServSock = NULL;
590 fRxSocket = NULL;
591 break;
592
593
594 if (!fServSock)
595 continue;
596
597 if (IsThreadCanceled())
598 {
599 delete fServSock;
600 delete fRxSocket;
601 fServSock = NULL;
602 fRxSocket = NULL;
603 continue;
604 }
605
606 // ------ IDENTICAL UP TO HERE ------
607
608 // Read and evaluate data
609 ReadSocket(*fRxSocket);
610
611 delete fServSock;
612 delete fRxSocket;
613 fServSock = NULL;
614 fRxSocket = NULL;
615 }
616
617 gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
618
619 return 0;
620// return NULL;
621}*/
Note: See TracBrowser for help on using the repository browser.