source: trunk/Cosy/tcpip/MTcpIpIO.cc@ 13449

Last change on this file since 13449 was 12598, checked in by tbretz, 13 years ago
Implemented a new class MTcpIpFact using a duplex communication for FACT
File size: 15.7 KB
Line 
1#include "MTcpIpIO.h"
2
3#include <unistd.h> // usleep
4
5#include <errno.h>
6
7#include <TSocket.h>
8#include <TServerSocket.h>
9
10#include "MLog.h"
11#include "MLogManip.h"
12
13#include "MString.h"
14#include "MTimeout.h"
15
16#undef DEBUG
17
18using namespace std;
19
20 /*
21 enum ESockOptions {
22 kSendBuffer, // size of send buffer
23 kRecvBuffer, // size of receive buffer
24 kOobInline, // OOB message inline
25 kKeepAlive, // keep socket alive
26 kReuseAddr, // allow reuse of local portion of address 5-tuple
27 kNoDelay, // send without delay
28 kNoBlock, // non-blocking I/O
29 kProcessGroup, // socket process group (used for SIGURG and SIGIO)
30 kAtMark, // are we at out-of-band mark (read only)
31 kBytesToRead // get number of bytes to read, FIONREAD (read only)
32 };
33
34 enum ESendRecvOptions {
35 kDefault, // default option (= 0)
36 kOob, // send or receive out-of-band data
37 kPeek, // peek at incoming message (receive only)
38 kDontBlock // send/recv as much data as possible without blocking
39 };
40 */
41
42MTcpIpO::MTcpIpO(const char *addr, Int_t tx) : fPortTx(tx)
43{
44 // was "inf2" but nobody knows what the prg is doing if it times out
45 gLog << all << "- Open send socket to " << addr << ":" << tx << endl;
46
47 const Int_t save = gDebug;
48 gDebug=1;
49 fTxSocket = new TSocket(addr, tx);
50 gDebug=save;
51 fTxSocket->SetOption(kNoBlock, 1);
52
53 MTcpIpO::RunThread();
54}
55
56MTcpIpO::~MTcpIpO()
57{
58 CancelThread();
59
60 // Now delete all TCP/IP objects
61 delete fTxSocket;
62}
63
64MTcpIpCC::MTcpIpCC(Int_t rx, UInt_t timeout) : MTcpIpI(rx, timeout)
65{
66 MTcpIpI::RunThread();
67}
68
69MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx, UInt_t timeout) : MTcpIpCC(rx, timeout), MTcpIpO(addr, tx)
70{
71}
72
73TString MTcpIpO::GetSocketAddress(const TSocket &s)
74{
75 if (!s.IsValid())
76 return "n/a";
77
78 const TInetAddress &a = s.GetInetAddress();
79 if (!a.IsValid())
80 return "undefined";
81
82 return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
83}
84
85/*
86TString MTcpIpO::GetSocketAddress() const
87{
88 TLockGuard guard(const_cast<TMutex*>(&fMutex));
89 return GetSocketAddress(*fTxSocket);
90}
91*/
92bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
93{
94 if (!tx.IsValid())
95 return false;
96
97#ifdef DEBUG
98 cout << "Tx: " << msg << flush;
99#endif
100
101 const Int_t l = tx.SendRaw(msg, len, kDontBlock);
102
103 // Frame not sent, EWOULDBLOCK
104 if (l==-4)
105 {
106 gLog << err << "ERROR - Sending to " << GetSocketAddress(tx) << " would block." << endl;
107/*--->*/ tx.Close(); // ?????
108 return false;
109 }
110
111 if (l<0)
112 {
113 gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
114 return false;
115 }
116
117 if (l!=len)
118 {
119 gLog << err << "ERROR - Could only sent " << l << " out of " << len << " bytes to " << GetSocketAddress(tx) << endl;
120 return false;
121 }
122
123 return true;
124}
125
126bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
127{
128#ifdef DEBUG
129 cout << "Connecting to " << addr << ":" << port << endl;
130#endif
131
132 // FIXME: Set tx-socket to nonblocking?
133 TSocket tx(addr, port);
134 return SendFrame(tx, msg, len);
135}
136
137bool MTcpIpO::Send(const char *msg, Int_t len)
138{
139 const Int_t mtx = fMutex.TryLock();
140 if (mtx==13)
141 gLog << warn << "MTcpIpO::Send - mutex is already locked by this thread." << endl;
142
143 // If Mutex cannot be locked, i.e. we are currently reopening
144 // the send socket we cannot wait, because we would block the
145 // executing threrad.
146 if (mtx)
147 return false;
148
149 const bool rc = SendFrame(*fTxSocket, msg, len);
150
151 fMutex.UnLock();
152
153 return rc;
154}
155
156Int_t MTcpIpO::Thread()
157{
158 const TInetAddress &a = fTxSocket->GetInetAddress();
159 if (!a.IsValid())
160 {
161 gLog << err << "ERROR: Send socket address invalid." << endl;
162 return 0;
163 }
164
165 Int_t wait = 1000; /// Wait a ms
166
167 while (!IsThreadCanceled())
168 {
169 if (fTxSocket->IsValid())
170 {
171 MThread::Sleep(1000); // Wait a ms
172 wait = 1000;
173 continue;
174 }
175
176
177#ifdef DEBUG
178 cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
179#endif
180
181 fMutex.Lock();
182
183 delete fTxSocket;
184
185 fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
186 fTxSocket->SetOption(kNoBlock, 1);
187
188 fMutex.UnLock();
189
190 if (fTxSocket->IsValid())
191 continue;
192
193 MThread::Sleep(wait); // Wait a ms
194 if (wait<3000000) // 3s
195 wait *= 2;
196 }
197
198 return 1;
199}
200
201Int_t MTcpIpOI::Thread()
202{
203 fConnectionEstablished = kFALSE;
204
205 const TInetAddress &a = fTxSocket->GetInetAddress();
206 if (!a.IsValid())
207 {
208 gLog << err << "- MTcpIpOI::Thread - ERROR: Send socket address invalid." << endl;
209 return 0;
210 }
211
212 gLog << inf << "- MTcpIpOI::Thread created connecting to " << a.GetHostAddress() << ":" << fPortTx << endl;
213
214 Int_t wait = 1000; /// Wait a ms
215
216 MTimeout timeout(fTimeout);
217
218 while (!IsThreadCanceled())
219 {
220 if (fTxSocket->IsValid())
221 {
222 fConnectionEstablished = kTRUE;
223 fTimeout = 1000;
224
225 // Get connection on port fPortRx and redirected
226 // Check for pending data (every ms)
227 switch (fTxSocket->Select(TSocket::kRead, 1))
228 {
229 case kTRUE: // Data pending... go on reading
230 if (!ReadSocket(*fTxSocket))
231 {
232 gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(*fTxSocket) << endl;
233 fTxSocket->Close();
234 continue;
235 }
236 timeout.Start(fTimeout);
237 continue;
238
239 case kFALSE: // Time out, no data yet, go on waiting
240 if (timeout.HasTimedOut())
241 {
242 gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(*fTxSocket) << " timed out after " << fTimeout << "ms." << endl;
243 fTxSocket->Close();
244 continue;
245 }
246 continue;
247
248 default: // Error occurance
249 gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
250 fTxSocket->Close();
251 continue;
252 }
253 }
254
255 fConnectionEstablished = kFALSE;
256
257#ifdef DEBUG
258 cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
259#endif
260
261 fMutex.Lock();
262
263 delete fTxSocket;
264
265 fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
266 fTxSocket->SetOption(kNoBlock, 1);
267
268 fMutex.UnLock();
269
270 timeout.Start(fTimeout);
271
272 if (fTxSocket->IsValid())
273 continue;
274
275 MThread::Sleep(wait); // Wait a ms
276 if (wait<3000000) // 3s
277 wait *= 2;
278 }
279
280 return 1;
281}
282
283bool MTcpIpCC::InterpreteStr(TString str)
284{
285 cout << "Rx: " << str << flush;
286 return true;
287}
288
289Bool_t MTcpIpCC::ReadSocket(TSocket &rx)
290{
291 TString str;
292
293 while (!MTcpIpI::IsThreadCanceled())
294 {
295 char c;
296 const Int_t len = rx.RecvRaw(&c, 1);
297
298 // For details see TSocket::RecvRaw
299 // -1: // ERROR
300 // EINVAL, EWOULDBLOCK
301 // -5: // EPIPE || ECONNRESET = Pipe broken or connection reset by peer
302 // 0: Data received with zero length! (Connection lost/call interrupted)
303 if (len<=0)
304 return kFALSE;
305
306 // Data received
307 if (len>1)
308 {
309 cout << "Data too long!!!" << endl;
310 break;
311 }
312
313 // Data received (len==1)
314 if (c!='\n' && c!=0)
315 {
316 str += c;
317 continue;
318 }
319
320 // String completed
321 InterpreteStr(str);
322 str = "";
323 }
324
325 return kTRUE;
326}
327
328Bool_t MTcpIpI::WaitForData(TSocket &sock)
329{
330 // No connection established?
331 if (!sock.IsValid())
332 {
333 gLog << warn << "TSocket invalid on port " << fPortRx << "." << endl;
334 return kFALSE;
335 }
336
337 fRxMutex.Lock();
338
339 fConnectionEstablished = kTRUE;
340 fRxSocket = &sock;
341
342 fRxMutex.UnLock();
343
344 MTimeout timeout(fTimeout);
345
346 // Get connection on port fPortRx and redirected
347 while (!IsThreadCanceled())
348 {
349 // Check for pending data (every ms)
350 switch (sock.Select(TSocket::kRead, 1))
351 {
352 case kTRUE: // Data pending... go on reading
353 if (!ReadSocket(sock))
354 {
355 gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(sock) << endl;
356 return kFALSE;
357 }
358 timeout.Start(fTimeout);
359
360 // Go on waiting for new data
361 continue;
362
363 case kFALSE: // Time out, no data yet, go on waiting
364 if (timeout.HasTimedOut())
365 {
366 gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(sock) << " timed out after " << fTimeout << "ms." << endl;
367 return kFALSE;
368 }
369
370 // Go on waiting for new data
371 continue;
372
373 default: // Error occurance
374 gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
375 return kFALSE;
376 }
377 }
378 return kTRUE; //???
379}
380
381void MTcpIpI::WaitForConnection(TServerSocket &server)
382{
383 while (!IsThreadCanceled())
384 {
385 gLog << all << MTime(-1) << " Listening for new connection on port " << fPortRx << "..." << endl;
386
387 // Check for a connection request (reminder: we are in non-blocking mode)
388 TSocket *socket = 0;
389
390 while (!IsThreadCanceled())
391 {
392 //cout << (int) IsThreadCanceled() << endl;
393 // Wait for a new connection on RX port
394 socket = server.Accept();
395
396 // Accept returned an error
397 if (socket==0)
398 {
399 gLog << err << "Error: TServerSock::Accept on port " << fPortRx << ": " << strerror(errno) << endl;
400 // Since we don't know the type of the error we better shut down the socket
401 return;
402 }
403
404 // No connection request pending
405 if ((Long_t)socket<0)
406 {
407 MThread::Sleep(1000); // Wait a ms
408 continue;
409 }
410
411 // Connection established
412 break;
413 }
414
415 if ((Long_t)socket<=0)
416 return;
417
418 gLog << all << MTime(-1) << " Connection established to " << MTcpIpO::GetSocketAddress(*socket) << "..." << endl;
419
420 WaitForData(*socket);
421
422 fRxMutex.Lock();
423
424 fRxSocket = 0;
425 fConnectionEstablished = kFALSE;
426
427 fRxMutex.UnLock();
428
429#ifdef DEBUG
430 cout << "===> DEL SOCKET" << endl;
431#endif
432 delete socket;
433 }
434}
435
436Int_t MTcpIpI::Thread()
437{
438 gLog << inf << "- Starting server listening on port " << fPortRx << "..." << endl;
439
440 while (!IsThreadCanceled())
441 {
442 TServerSocket *server=new TServerSocket(fPortRx, kTRUE, 0);
443 server->SetOption(kNoBlock, 1);
444
445 while (!IsThreadCanceled() && server->IsValid())
446 WaitForConnection(*server);
447
448 if (!server->IsValid())
449 {
450 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
451 switch (server->GetErrorCode())
452 {
453 case 0: gLog << "No error." << endl; break;
454 case -1: gLog << "low level socket() call failed." << endl; break;
455 case -2: gLog << "low level bind() call failed." << endl; break;
456 case -3: gLog << "low level listen() call failed." << endl; break;
457 default: gLog << "Unknown." << endl; break;
458 }
459 }
460
461#ifdef DEBUG
462 cout << "===> DEL SERVER" << endl;
463#endif
464 delete server;
465
466 MThread::Sleep(5000000);
467 }
468
469 gLog << inf << "- Listening server stopped on port " << fPortRx << "." << endl;
470
471 return 0;
472}
473
474bool MTcpIpFact::Send(const char *msg, int len)
475{
476 const Int_t mtx = fRxMutex.TryLock();
477 if (mtx==13)
478 gLog << warn << "MTcpIpO::Send - mutex is already locked by this thread." << endl;
479
480 // If Mutex cannot be locked, i.e. we are currently reopening
481 // the send socket we cannot wait, because we would block the
482 // executing threrad.
483 if (mtx)
484 return false;
485
486 const bool rc = fRxSocket ? MTcpIpO::SendFrame(*fRxSocket, msg, len) : false;
487
488 fRxMutex.UnLock();
489
490 return rc;
491}
492
493/*
494Int_t MTcpIpI::Thread()
495{
496 gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
497
498// if (fPortRx==7404)
499// {
500// gLog << err << "CeCo communication skipped." << endl;
501// return 0;
502// }
503
504 TServerSocket *fServSock=NULL;
505 TSocket *fRxSocket=NULL;
506
507// while (!HasStopFlag())
508 while (!IsThreadCanceled())
509 {
510 fServSock=new TServerSocket(fPortRx, kTRUE);
511 if (!fServSock->IsValid())
512 {
513 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
514 switch (fServSock->GetErrorCode())
515 {
516 case 0: gLog << "No error." << endl; break;
517 case -1: gLog << "low level socket() call failed." << endl; break;
518 case -2: gLog << "low level bind() call failed." << endl; break;
519 case -3: gLog << "low level listen() call failed." << endl; break;
520 default: gLog << "Unknown." << endl; break;
521 }
522 delete fServSock;
523 fServSock=NULL;
524 MThread::Sleep(5000000);
525 continue;
526 }
527
528 fServSock->SetOption(kNoBlock, 1);
529
530 gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
531// while (!HasStopFlag() && (Long_t)fRxSocket<=0)
532 while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
533 {
534 //TThread::CancelPoint();
535
536 fRxSocket = fServSock->Accept();
537 if (fRxSocket==0)
538 cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
539 usleep(10);
540 }
541
542 // Can happen in case of HasStopFlag()
543 if (fRxSocket==(void*)-1)
544 fRxSocket=NULL;
545
546 if (fRxSocket==NULL)
547 {
548 delete fServSock;
549 fServSock=NULL;
550 continue;
551 }
552
553 if (!fRxSocket->IsValid())
554 {
555 cout << "TSocket invalid on port " << fPortRx << "." << endl;
556 delete fServSock;
557 delete fRxSocket;
558 fServSock = NULL;
559 fRxSocket = NULL;
560 continue;
561 }
562
563 gLog << all << "Connection established on port " << fPortRx << "." << endl;
564
565
566 //fRxSocket->SetOption(kNoBlock, 1);
567
568 // Wait for data
569 while (!IsThreadCanceled())
570 {
571 switch (fRxSocket->Select(kRead, 1))
572 {
573 case kTRUE: // Data waiting to be read
574 break;
575 case kFALSE: // time out
576 usleep(10);
577 continue;
578 }
579
580 // ERROR
581 cout << "Error: TRxSocket::Select on port " << fPortRx << "." << endl;
582
583 delete fServSock;
584 delete fRxSocket;
585 fServSock = NULL;
586 fRxSocket = NULL;
587 break;
588
589
590 if (!fServSock)
591 continue;
592
593 if (IsThreadCanceled())
594 {
595 delete fServSock;
596 delete fRxSocket;
597 fServSock = NULL;
598 fRxSocket = NULL;
599 continue;
600 }
601
602 // ------ IDENTICAL UP TO HERE ------
603
604 // Read and evaluate data
605 ReadSocket(*fRxSocket);
606
607 delete fServSock;
608 delete fRxSocket;
609 fServSock = NULL;
610 fRxSocket = NULL;
611 }
612
613 gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
614
615 return 0;
616// return NULL;
617}*/
Note: See TracBrowser for help on using the repository browser.