source: trunk/Cosy/tcpip/MTcpIpIO.cc@ 12582

Last change on this file since 12582 was 10031, checked in by tbretz, 14 years ago
Changed connection from SPS to a client-only connection.
File size: 15.0 KB
Line 
1#include "MTcpIpIO.h"
2
3#include <unistd.h> // usleep
4
5#include <errno.h>
6
7#include <TSocket.h>
8#include <TServerSocket.h>
9
10#include "MLog.h"
11#include "MLogManip.h"
12
13#include "MString.h"
14#include "MTimeout.h"
15
16#undef DEBUG
17
18using namespace std;
19
20 /*
21 enum ESockOptions {
22 kSendBuffer, // size of send buffer
23 kRecvBuffer, // size of receive buffer
24 kOobInline, // OOB message inline
25 kKeepAlive, // keep socket alive
26 kReuseAddr, // allow reuse of local portion of address 5-tuple
27 kNoDelay, // send without delay
28 kNoBlock, // non-blocking I/O
29 kProcessGroup, // socket process group (used for SIGURG and SIGIO)
30 kAtMark, // are we at out-of-band mark (read only)
31 kBytesToRead // get number of bytes to read, FIONREAD (read only)
32 };
33
34 enum ESendRecvOptions {
35 kDefault, // default option (= 0)
36 kOob, // send or receive out-of-band data
37 kPeek, // peek at incoming message (receive only)
38 kDontBlock // send/recv as much data as possible without blocking
39 };
40 */
41
42MTcpIpO::MTcpIpO(const char *addr, Int_t tx) : fPortTx(tx)
43{
44 // was "inf2" but nobody knows what the prg is doing if it times out
45 gLog << all << "- Open send socket to " << addr << ":" << tx << endl;
46
47 const Int_t save = gDebug;
48 gDebug=1;
49 fTxSocket = new TSocket(addr, tx);
50 gDebug=save;
51 fTxSocket->SetOption(kNoBlock, 1);
52
53 MTcpIpO::RunThread();
54}
55
56MTcpIpO::~MTcpIpO()
57{
58 CancelThread();
59
60 // Now delete all TCP/IP objects
61 delete fTxSocket;
62}
63
64MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx, UInt_t timeout) : MTcpIpI(rx, timeout), MTcpIpO(addr, tx)
65{
66 MTcpIpI::RunThread();
67}
68
69TString MTcpIpO::GetSocketAddress(const TSocket &s)
70{
71 if (!s.IsValid())
72 return "n/a";
73
74 const TInetAddress &a = s.GetInetAddress();
75 if (!a.IsValid())
76 return "undefined";
77
78 return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
79}
80
81/*
82TString MTcpIpO::GetSocketAddress() const
83{
84 TLockGuard guard(const_cast<TMutex*>(&fMutex));
85 return GetSocketAddress(*fTxSocket);
86}
87*/
88bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
89{
90 if (!tx.IsValid())
91 return false;
92
93#ifdef DEBUG
94 cout << "Tx: " << msg << flush;
95#endif
96
97 const Int_t l = tx.SendRaw(msg, len, kDontBlock);
98
99 // Frame not sent, EWOULDBLOCK
100 if (l==-4)
101 {
102 gLog << err << "ERROR - Sending to " << GetSocketAddress(tx) << " would block." << endl;
103/*--->*/ tx.Close(); // ?????
104 return false;
105 }
106
107 if (l<0)
108 {
109 gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
110 return false;
111 }
112
113 if (l!=len)
114 {
115 gLog << err << "ERROR - Could only sent " << l << " out of " << len << " bytes to " << GetSocketAddress(tx) << endl;
116 return false;
117 }
118
119 return true;
120}
121
122bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
123{
124#ifdef DEBUG
125 cout << "Connecting to " << addr << ":" << port << endl;
126#endif
127
128 // FIXME: Set tx-socket to nonblocking?
129 TSocket tx(addr, port);
130 return SendFrame(tx, msg, len);
131}
132
133bool MTcpIpO::Send(const char *msg, Int_t len)
134{
135 const Int_t mtx = fMutex.TryLock();
136 if (mtx==13)
137 gLog << warn << "MTcpIpO::Send - mutex is already locked by this thread." << endl;
138
139 // If Mutex cannot be locked, i.e. we are currently reopening
140 // the send socket we cannot wait, because we would block the
141 // executing threrad.
142 if (mtx)
143 return false;
144
145 const bool rc = SendFrame(*fTxSocket, msg, len);
146
147 fMutex.UnLock();
148
149 return rc;
150}
151
152Int_t MTcpIpO::Thread()
153{
154 const TInetAddress &a = fTxSocket->GetInetAddress();
155 if (!a.IsValid())
156 {
157 gLog << err << "ERROR: Send socket address invalid." << endl;
158 return 0;
159 }
160
161 Int_t wait = 1000; /// Wait a ms
162
163 while (!IsThreadCanceled())
164 {
165 if (fTxSocket->IsValid())
166 {
167 MThread::Sleep(1000); // Wait a ms
168 wait = 1000;
169 continue;
170 }
171
172
173#ifdef DEBUG
174 cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
175#endif
176
177 fMutex.Lock();
178
179 delete fTxSocket;
180
181 fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
182 fTxSocket->SetOption(kNoBlock, 1);
183
184 fMutex.UnLock();
185
186 if (fTxSocket->IsValid())
187 continue;
188
189 MThread::Sleep(wait); // Wait a ms
190 if (wait<3000000) // 3s
191 wait *= 2;
192 }
193
194 return 1;
195}
196
197Int_t MTcpIpOI::Thread()
198{
199 fConnectionEstablished = kFALSE;
200
201 const TInetAddress &a = fTxSocket->GetInetAddress();
202 if (!a.IsValid())
203 {
204 gLog << err << "- MTcpIpOI::Thread - ERROR: Send socket address invalid." << endl;
205 return 0;
206 }
207
208 gLog << inf << "- MTcpIpOI::Thread created connecting to " << a.GetHostAddress() << ":" << fPortTx << endl;
209
210 Int_t wait = 1000; /// Wait a ms
211
212 MTimeout timeout(fTimeout);
213
214 while (!IsThreadCanceled())
215 {
216 if (fTxSocket->IsValid())
217 {
218 fConnectionEstablished = kTRUE;
219 fTimeout = 1000;
220
221 // Get connection on port fPortRx and redirected
222 // Check for pending data (every ms)
223 switch (fTxSocket->Select(TSocket::kRead, 1))
224 {
225 case kTRUE: // Data pending... go on reading
226 if (!ReadSocket(*fTxSocket))
227 {
228 gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(*fTxSocket) << endl;
229 fTxSocket->Close();
230 continue;
231 }
232 timeout.Start(fTimeout);
233 continue;
234
235 case kFALSE: // Time out, no data yet, go on waiting
236 if (timeout.HasTimedOut())
237 {
238 gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(*fTxSocket) << " timed out after " << fTimeout << "ms." << endl;
239 fTxSocket->Close();
240 continue;
241 }
242 continue;
243
244 default: // Error occurance
245 gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
246 fTxSocket->Close();
247 continue;
248 }
249 }
250
251 fConnectionEstablished = kFALSE;
252
253#ifdef DEBUG
254 cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
255#endif
256
257 fMutex.Lock();
258
259 delete fTxSocket;
260
261 fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
262 fTxSocket->SetOption(kNoBlock, 1);
263
264 fMutex.UnLock();
265
266 timeout.Start(fTimeout);
267
268 if (fTxSocket->IsValid())
269 continue;
270
271 MThread::Sleep(wait); // Wait a ms
272 if (wait<3000000) // 3s
273 wait *= 2;
274 }
275
276 return 1;
277}
278
279bool MTcpIpIO::InterpreteStr(TString str)
280{
281 cout << "Rx: " << str << flush;
282 return true;
283}
284
285Bool_t MTcpIpIO::ReadSocket(TSocket &rx)
286{
287 TString str;
288
289 while (!MTcpIpI::IsThreadCanceled())
290 {
291 char c;
292 const Int_t len = rx.RecvRaw(&c, 1);
293
294 // For details see TSocket::RecvRaw
295 // -1: // ERROR
296 // EINVAL, EWOULDBLOCK
297 // -5: // EPIPE || ECONNRESET = Pipe broken or connection reset by peer
298 // 0: Data received with zero length! (Connection lost/call interrupted)
299 if (len<=0)
300 return kFALSE;
301
302 // Data received
303 if (len>1)
304 {
305 cout << "Data too long!!!" << endl;
306 break;
307 }
308
309 // Data received (len==1)
310 if (c!='\n')
311 {
312 str += c;
313 continue;
314 }
315
316 // String completed
317 InterpreteStr(str);
318 str = "";
319 }
320
321 return kTRUE;
322}
323
324Bool_t MTcpIpI::WaitForData(TSocket &sock)
325{
326 // No connection established?
327 if (!sock.IsValid())
328 {
329 gLog << warn << "TSocket invalid on port " << fPortRx << "." << endl;
330 return kFALSE;
331 }
332
333 fConnectionEstablished = kTRUE;
334
335 MTimeout timeout(fTimeout);
336
337 // Get connection on port fPortRx and redirected
338 while (!IsThreadCanceled())
339 {
340 // Check for pending data (every ms)
341 switch (sock.Select(TSocket::kRead, 1))
342 {
343 case kTRUE: // Data pending... go on reading
344 if (!ReadSocket(sock))
345 {
346 gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(sock) << endl;
347 return kFALSE;
348 }
349 timeout.Start(fTimeout);
350
351 // Go on waiting for new data
352 continue;
353
354 case kFALSE: // Time out, no data yet, go on waiting
355 if (timeout.HasTimedOut())
356 {
357 gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(sock) << " timed out after " << fTimeout << "ms." << endl;
358 return kFALSE;
359 }
360
361 // Go on waiting for new data
362 continue;
363
364 default: // Error occurance
365 gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
366 return kFALSE;
367 }
368 }
369 return kTRUE; //???
370}
371
372void MTcpIpI::WaitForConnection(TServerSocket &server)
373{
374 while (!IsThreadCanceled())
375 {
376 gLog << all << MTime(-1) << " Listening for new connection on port " << fPortRx << "..." << endl;
377
378 // Check for a connection request (reminder: we are in non-blocking mode)
379 TSocket *socket = 0;
380
381 while (!IsThreadCanceled())
382 {
383 //cout << (int) IsThreadCanceled() << endl;
384 // Wait for a new connection on RX port
385 socket = server.Accept();
386
387 // Accept returned an error
388 if (socket==0)
389 {
390 gLog << err << "Error: TServerSock::Accept on port " << fPortRx << ": " << strerror(errno) << endl;
391 // Since we don't know the type of the error we better shut down the socket
392 return;
393 }
394
395 // No connection request pending
396 if ((Long_t)socket<0)
397 {
398 MThread::Sleep(1000); // Wait a ms
399 continue;
400 }
401
402 // Connection established
403 break;
404 }
405
406 if ((Long_t)socket<=0)
407 return;
408
409 gLog << all << MTime(-1) << " Connection established to " << MTcpIpO::GetSocketAddress(*socket) << "..." << endl;
410
411 if (!WaitForData(*socket))
412 fConnectionEstablished = kFALSE;
413
414#ifdef DEBUG
415 cout << "===> DEL SOCKET" << endl;
416#endif
417 delete socket;
418 }
419}
420
421Int_t MTcpIpI::Thread()
422{
423 gLog << inf << "- Starting server listening on port " << fPortRx << "..." << endl;
424
425 while (!IsThreadCanceled())
426 {
427 TServerSocket *server=new TServerSocket(fPortRx, kTRUE, 0);
428 server->SetOption(kNoBlock, 1);
429
430 while (!IsThreadCanceled() && server->IsValid())
431 WaitForConnection(*server);
432
433 if (!server->IsValid())
434 {
435 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
436 switch (server->GetErrorCode())
437 {
438 case 0: gLog << "No error." << endl; break;
439 case -1: gLog << "low level socket() call failed." << endl; break;
440 case -2: gLog << "low level bind() call failed." << endl; break;
441 case -3: gLog << "low level listen() call failed." << endl; break;
442 default: gLog << "Unknown." << endl; break;
443 }
444 }
445
446#ifdef DEBUG
447 cout << "===> DEL SERVER" << endl;
448#endif
449 delete server;
450
451 MThread::Sleep(5000000);
452 }
453
454 gLog << inf << "- Listening server stopped on port " << fPortRx << "." << endl;
455
456 return 0;
457}
458
459/*
460Int_t MTcpIpI::Thread()
461{
462 gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
463
464// if (fPortRx==7404)
465// {
466// gLog << err << "CeCo communication skipped." << endl;
467// return 0;
468// }
469
470 TServerSocket *fServSock=NULL;
471 TSocket *fRxSocket=NULL;
472
473// while (!HasStopFlag())
474 while (!IsThreadCanceled())
475 {
476 fServSock=new TServerSocket(fPortRx, kTRUE);
477 if (!fServSock->IsValid())
478 {
479 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
480 switch (fServSock->GetErrorCode())
481 {
482 case 0: gLog << "No error." << endl; break;
483 case -1: gLog << "low level socket() call failed." << endl; break;
484 case -2: gLog << "low level bind() call failed." << endl; break;
485 case -3: gLog << "low level listen() call failed." << endl; break;
486 default: gLog << "Unknown." << endl; break;
487 }
488 delete fServSock;
489 fServSock=NULL;
490 MThread::Sleep(5000000);
491 continue;
492 }
493
494 fServSock->SetOption(kNoBlock, 1);
495
496 gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
497// while (!HasStopFlag() && (Long_t)fRxSocket<=0)
498 while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
499 {
500 //TThread::CancelPoint();
501
502 fRxSocket = fServSock->Accept();
503 if (fRxSocket==0)
504 cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
505 usleep(10);
506 }
507
508 // Can happen in case of HasStopFlag()
509 if (fRxSocket==(void*)-1)
510 fRxSocket=NULL;
511
512 if (fRxSocket==NULL)
513 {
514 delete fServSock;
515 fServSock=NULL;
516 continue;
517 }
518
519 if (!fRxSocket->IsValid())
520 {
521 cout << "TSocket invalid on port " << fPortRx << "." << endl;
522 delete fServSock;
523 delete fRxSocket;
524 fServSock = NULL;
525 fRxSocket = NULL;
526 continue;
527 }
528
529 gLog << all << "Connection established on port " << fPortRx << "." << endl;
530
531
532 //fRxSocket->SetOption(kNoBlock, 1);
533
534 // Waqit for data
535 while (!IsThreadCanceled())
536 {
537 switch (fRxSocket->Select(kRead, 1))
538 {
539 case kTRUE: // Data waiting to be read
540 break;
541 case kFALSE: // time out
542 usleep(10);
543 continue;
544 }
545
546 // ERROR
547 cout << "Error: TRxSocket::Select on port " << fPortRx << "." << endl;
548
549 delete fServSock;
550 delete fRxSocket;
551 fServSock = NULL;
552 fRxSocket = NULL;
553 break;
554
555
556 if (!fServSock)
557 continue;
558
559 if (IsThreadCanceled())
560 {
561 delete fServSock;
562 delete fRxSocket;
563 fServSock = NULL;
564 fRxSocket = NULL;
565 continue;
566 }
567
568 // ------ IDENTICAL UP TO HERE ------
569
570 // Read and evaluate data
571 ReadSocket(*fRxSocket);
572
573 delete fServSock;
574 delete fRxSocket;
575 fServSock = NULL;
576 fRxSocket = NULL;
577 }
578
579 gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
580
581 return 0;
582// return NULL;
583}*/
Note: See TracBrowser for help on using the repository browser.