1 | #include "MTcpIpIO.h"
2 |
3 | #include <unistd.h> // usleep
4 |
5 | #include <errno.h>
6 |
7 | #include <TSocket.h>
8 | #include <TServerSocket.h>
9 |
10 | #include "MLog.h"
11 | #include "MLogManip.h"
12 |
13 | #include "MString.h"
14 | #include "MTimeout.h"
15 |
16 | #undef DEBUG
17 |
18 | using namespace std;
19 |
20 | /*
21 | enum ESockOptions {
22 | kSendBuffer, // size of send buffer
23 | kRecvBuffer, // size of receive buffer
24 | kOobInline, // OOB message inline
25 | kKeepAlive, // keep socket alive
26 | kReuseAddr, // allow reuse of local portion of address 5-tuple
27 | kNoDelay, // send without delay
28 | kNoBlock, // non-blocking I/O
29 | kProcessGroup, // socket process group (used for SIGURG and SIGIO)
30 | kAtMark, // are we at out-of-band mark (read only)
31 | kBytesToRead // get number of bytes to read, FIONREAD (read only)
32 | };
33 |
34 | enum ESendRecvOptions {
35 | kDefault, // default option (= 0)
36 | kOob, // send or receive out-of-band data
37 | kPeek, // peek at incoming message (receive only)
38 | kDontBlock // send/recv as much data as possible without blocking
39 | };
40 | */
41 |
42 | MTcpIpO::MTcpIpO(const char *addr, Int_t tx) : fPortTx(tx)
43 | {
44 | // was "inf2" but nobody knows what the prg is doing if it times out
45 | gLog << all << "- Open send socket to " << addr << ":" << tx << endl;
46 |
47 | const Int_t save = gDebug;
48 | gDebug=1;
49 | fTxSocket = new TSocket(addr, tx);
50 | gDebug=save;
51 | fTxSocket->SetOption(kNoBlock, 1);
52 |
53 | MTcpIpO::RunThread();
54 | }
55 |
56 | MTcpIpO::~MTcpIpO()
57 | {
58 | CancelThread();
59 |
60 | // Now delete all TCP/IP objects
61 | delete fTxSocket;
62 | }
63 |
64 | MTcpIpCC::MTcpIpCC(Int_t rx, UInt_t timeout) : MTcpIpI(rx, timeout)
65 | {
66 | MTcpIpI::RunThread();
67 | }
68 |
69 | MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx, UInt_t timeout) : MTcpIpCC(rx, timeout), MTcpIpO(addr, tx)
70 | {
71 | }
72 |
73 | TString MTcpIpO::GetSocketAddress(const TSocket &s)
74 | {
75 | if (!s.IsValid())
76 | return "n/a";
77 |
78 | const TInetAddress &a = s.GetInetAddress();
79 | if (!a.IsValid())
80 | return "undefined";
81 |
82 | return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
83 | }
84 |
85 | /*
86 | TString MTcpIpO::GetSocketAddress() const
87 | {
88 | TLockGuard guard(const_cast<TMutex*>(&fMutex));
89 | return GetSocketAddress(*fTxSocket);
90 | }
91 | */
92 | bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
93 | {
94 | if (!tx.IsValid())
95 | return false;
96 |
97 | #ifdef DEBUG
98 | cout << "Tx: " << msg << flush;
99 | #endif
100 |
101 | const Int_t l = tx.SendRaw(msg, len, kDontBlock);
102 |
103 | // Frame not sent, EWOULDBLOCK
104 | if (l==-4)
105 | {
106 | gLog << err << "ERROR - Sending to " << GetSocketAddress(tx) << " would block." << endl;
107 | /*--->*/ tx.Close(); // ?????
108 | return false;
109 | }
110 |
111 | if (l<0)
112 | {
113 | gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
114 | return false;
115 | }
116 |
117 | if (l!=len)
118 | {
119 | gLog << err << "ERROR - Could only sent " << l << " out of " << len << " bytes to " << GetSocketAddress(tx) << endl;
120 | return false;
121 | }
122 |
123 | return true;
124 | }
125 |
126 | bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
127 | {
128 | #ifdef DEBUG
129 | cout << "Connecting to " << addr << ":" << port << endl;
130 | #endif
131 |
132 | // FIXME: Set tx-socket to nonblocking?
133 | TSocket tx(addr, port);
134 | return SendFrame(tx, msg, len);
135 | }
136 |
137 | bool MTcpIpO::Send(const char *msg, Int_t len)
138 | {
139 | const Int_t mtx = fMutex.TryLock();
140 | if (mtx==13)
141 | gLog << warn << "MTcpIpO::Send - mutex is already locked by this thread." << endl;
142 |
143 | // If Mutex cannot be locked, i.e. we are currently reopening
144 | // the send socket we cannot wait, because we would block the
145 | // executing threrad.
146 | if (mtx)
147 | return false;
148 |
149 | const bool rc = SendFrame(*fTxSocket, msg, len);
150 |
151 | fMutex.UnLock();
152 |
153 | return rc;
154 | }
155 |
156 | Int_t MTcpIpO::Thread()
157 | {
158 | const TInetAddress &a = fTxSocket->GetInetAddress();
159 | if (!a.IsValid())
160 | {
161 | gLog << err << "ERROR: Send socket address invalid." << endl;
162 | return 0;
163 | }
164 |
165 | Int_t wait = 1000; /// Wait a ms
166 |
167 | while (!IsThreadCanceled())
168 | {
169 | if (fTxSocket->IsValid())
170 | {
171 | MThread::Sleep(1000); // Wait a ms
172 | wait = 1000;
173 | continue;
174 | }
175 |
176 |
177 | #ifdef DEBUG
178 | cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
179 | #endif
180 |
181 | fMutex.Lock();
182 |
183 | delete fTxSocket;
184 |
185 | fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
186 | fTxSocket->SetOption(kNoBlock, 1);
187 |
188 | fMutex.UnLock();
189 |
190 | if (fTxSocket->IsValid())
191 | continue;
192 |
193 | MThread::Sleep(wait); // Wait a ms
194 | if (wait<3000000) // 3s
195 | wait *= 2;
196 | }
197 |
198 | return 1;
199 | }
200 |
201 | Int_t MTcpIpOI::Thread()
202 | {
203 | fConnectionEstablished = kFALSE;
204 |
205 | const TInetAddress &a = fTxSocket->GetInetAddress();
206 | if (!a.IsValid())
207 | {
208 | gLog << err << "- MTcpIpOI::Thread - ERROR: Send socket address invalid." << endl;
209 | return 0;
210 | }
211 |
212 | gLog << inf << "- MTcpIpOI::Thread created connecting to " << a.GetHostAddress() << ":" << fPortTx << endl;
213 |
214 | Int_t wait = 1000; /// Wait a ms
215 |
216 | MTimeout timeout(fTimeout);
217 |
218 | while (!IsThreadCanceled())
219 | {
220 | if (fTxSocket->IsValid())
221 | {
222 | fConnectionEstablished = kTRUE;
223 | fTimeout = 1000;
224 |
225 | // Get connection on port fPortRx and redirected
226 | // Check for pending data (every ms)
227 | switch (fTxSocket->Select(TSocket::kRead, 1))
228 | {
229 | case kTRUE: // Data pending... go on reading
230 | if (!ReadSocket(*fTxSocket))
231 | {
232 | gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(*fTxSocket) << endl;
233 | fTxSocket->Close();
234 | continue;
235 | }
236 | timeout.Start(fTimeout);
237 | continue;
238 |
239 | case kFALSE: // Time out, no data yet, go on waiting
240 | if (timeout.HasTimedOut())
241 | {
242 | gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(*fTxSocket) << " timed out after " << fTimeout << "ms." << endl;
243 | fTxSocket->Close();
244 | continue;
245 | }
246 | continue;
247 |
248 | default: // Error occurance
249 | gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
250 | fTxSocket->Close();
251 | continue;
252 | }
253 | }
254 |
255 | fConnectionEstablished = kFALSE;
256 |
257 | #ifdef DEBUG
258 | cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
259 | #endif
260 |
261 | fMutex.Lock();
262 |
263 | delete fTxSocket;
264 |
265 | fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
266 | fTxSocket->SetOption(kNoBlock, 1);
267 |
268 | fMutex.UnLock();
269 |
270 | timeout.Start(fTimeout);
271 |
272 | if (fTxSocket->IsValid())
273 | continue;
274 |
275 | MThread::Sleep(wait); // Wait a ms
276 | if (wait<3000000) // 3s
277 | wait *= 2;
278 | }
279 |
280 | return 1;
281 | }
282 |
283 | bool MTcpIpCC::InterpreteStr(TString str)
284 | {
285 | cout << "Rx: " << str << flush;
286 | return true;
287 | }
288 |
289 | Bool_t MTcpIpCC::ReadSocket(TSocket &rx)
290 | {
291 | TString str;
292 |
293 | while (!MTcpIpI::IsThreadCanceled())
294 | {
295 | char c;
296 | const Int_t len = rx.RecvRaw(&c, 1);
297 |
298 | // For details see TSocket::RecvRaw
299 | // -1: // ERROR
301 | // -5: // EPIPE || ECONNRESET = Pipe broken or connection reset by peer
302 | // 0: Data received with zero length! (Connection lost/call interrupted)
303 | if (len<=0)
304 | return kFALSE;
305 |
306 | // Data received
307 | if (len>1)
308 | {
309 | cout << "Data too long!!!" << endl;
310 | break;
311 | }
312 |
313 | // Data received (len==1)
314 | if (c!='\n' && c!=0)
315 | {
316 | str += c;
317 | continue;
318 | }
319 |
320 | // String completed
321 | InterpreteStr(str);
322 | str = "";
323 | }
324 |
325 | return kTRUE;
326 | }
327 |
328 | Bool_t MTcpIpI::WaitForData(TSocket &sock)
329 | {
330 | // No connection established?
331 | if (!sock.IsValid())
332 | {
333 | gLog << warn << "TSocket invalid on port " << fPortRx << "." << endl;
334 | return kFALSE;
335 | }
336 |
337 | fRxMutex.Lock();
338 |
339 | fConnectionEstablished = kTRUE;
340 | fRxSocket = &sock;
341 |
342 | fRxMutex.UnLock();
343 |
344 | MTimeout timeout(fTimeout);
345 |
346 | // Get connection on port fPortRx and redirected
347 | while (!IsThreadCanceled())
348 | {
349 | // Check for pending data (every ms)
350 | switch (sock.Select(TSocket::kRead, 1))
351 | {
352 | case kTRUE: // Data pending... go on reading
353 | if (!ReadSocket(sock))
354 | {
355 | gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(sock) << endl;
356 | return kFALSE;
357 | }
358 | timeout.Start(fTimeout);
359 |
360 | // Go on waiting for new data
361 | continue;
362 |
363 | case kFALSE: // Time out, no data yet, go on waiting
364 | if (timeout.HasTimedOut())
365 | {
366 | gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(sock) << " timed out after " << fTimeout << "ms." << endl;
367 | return kFALSE;
368 | }
369 |
370 | // Go on waiting for new data
371 | continue;
372 |
373 | default: // Error occurance
374 | gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
375 | return kFALSE;
376 | }
377 | }
378 | return kTRUE; //???
379 | }
380 |
381 | void MTcpIpI::WaitForConnection(TServerSocket &server)
382 | {
383 | while (!IsThreadCanceled())
384 | {
385 | gLog << all << MTime(-1) << " Listening for new connection on port " << fPortRx << "..." << endl;
386 |
387 | // Check for a connection request (reminder: we are in non-blocking mode)
388 | TSocket *socket = 0;
389 |
390 | while (!IsThreadCanceled())
391 | {
392 | //cout << (int) IsThreadCanceled() << endl;
393 | // Wait for a new connection on RX port
394 | socket = server.Accept();
395 |
396 | // Accept returned an error
397 | if (socket==0)
398 | {
399 | gLog << err << "Error: TServerSock::Accept on port " << fPortRx << ": " << strerror(errno) << endl;
400 | // Since we don't know the type of the error we better shut down the socket
401 | return;
402 | }
403 |
404 | // No connection request pending
405 | if ((Long_t)socket<0)
406 | {
407 | MThread::Sleep(1000); // Wait a ms
408 | continue;
409 | }
410 |
411 | // Connection established
412 | break;
413 | }
414 |
415 | if ((Long_t)socket<=0)
416 | return;
417 |
418 | gLog << all << MTime(-1) << " Connection established to " << MTcpIpO::GetSocketAddress(*socket) << "..." << endl;
419 |
420 | WaitForData(*socket);
421 |
422 | fRxMutex.Lock();
423 |
424 | fRxSocket = 0;
425 | fConnectionEstablished = kFALSE;
426 |
427 | fRxMutex.UnLock();
428 |
429 | #ifdef DEBUG
430 | cout << "===> DEL SOCKET" << endl;
431 | #endif
432 | delete socket;
433 | }
434 | }
435 |
436 | Int_t MTcpIpI::Thread()
437 | {
438 | gLog << inf << "- Starting server listening on port " << fPortRx << "..." << endl;
439 |
440 | while (!IsThreadCanceled())
441 | {
442 | TServerSocket *server=new TServerSocket(fPortRx, kTRUE, 0);
443 | server->SetOption(kNoBlock, 1);
444 |
445 | while (!IsThreadCanceled() && server->IsValid())
446 | WaitForConnection(*server);
447 |
448 | if (!server->IsValid())
449 | {
450 | gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
451 | switch (server->GetErrorCode())
452 | {
453 | case 0: gLog << "No error." << endl; break;
454 | case -1: gLog << "low level socket() call failed." << endl; break;
455 | case -2: gLog << "low level bind() call failed." << endl; break;
456 | case -3: gLog << "low level listen() call failed." << endl; break;
457 | default: gLog << "Unknown." << endl; break;
458 | }
459 | }
460 |
461 | #ifdef DEBUG
462 | cout << "===> DEL SERVER" << endl;
463 | #endif
464 | delete server;
465 |
466 | MThread::Sleep(5000000);
467 | }
468 |
469 | gLog << inf << "- Listening server stopped on port " << fPortRx << "." << endl;
470 |
471 | return 0;
472 | }
473 |
474 | bool MTcpIpFact::Send(const char *msg, int len)
475 | {
476 | const Int_t mtx = fRxMutex.TryLock();
477 | if (mtx==13)
478 | gLog << warn << "MTcpIpO::Send - mutex is already locked by this thread." << endl;
479 |
480 | // If Mutex cannot be locked, i.e. we are currently reopening
481 | // the send socket we cannot wait, because we would block the
482 | // executing threrad.
483 | if (mtx)
484 | return false;
485 |
486 | const bool rc = fRxSocket ? MTcpIpO::SendFrame(*fRxSocket, msg, len) : false;
487 |
488 | fRxMutex.UnLock();
489 |
490 | return rc;
491 | }
492 |
493 | /*
494 | Int_t MTcpIpI::Thread()
495 | {
496 | gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
497 |
498 | // if (fPortRx==7404)
499 | // {
500 | // gLog << err << "CeCo communication skipped." << endl;
501 | // return 0;
502 | // }
503 |
504 | TServerSocket *fServSock=NULL;
505 | TSocket *fRxSocket=NULL;
506 |
507 | // while (!HasStopFlag())
508 | while (!IsThreadCanceled())
509 | {
510 | fServSock=new TServerSocket(fPortRx, kTRUE);
511 | if (!fServSock->IsValid())
512 | {
513 | gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
514 | switch (fServSock->GetErrorCode())
515 | {
516 | case 0: gLog << "No error." << endl; break;
517 | case -1: gLog << "low level socket() call failed." << endl; break;
518 | case -2: gLog << "low level bind() call failed." << endl; break;
519 | case -3: gLog << "low level listen() call failed." << endl; break;
520 | default: gLog << "Unknown." << endl; break;
521 | }
522 | delete fServSock;
523 | fServSock=NULL;
524 | MThread::Sleep(5000000);
525 | continue;
526 | }
527 |
528 | fServSock->SetOption(kNoBlock, 1);
529 |
530 | gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
531 | // while (!HasStopFlag() && (Long_t)fRxSocket<=0)
532 | while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
533 | {
534 | //TThread::CancelPoint();
535 |
536 | fRxSocket = fServSock->Accept();
537 | if (fRxSocket==0)
538 | cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
539 | usleep(10);
540 | }
541 |
542 | // Can happen in case of HasStopFlag()
543 | if (fRxSocket==(void*)-1)
544 | fRxSocket=NULL;
545 |
546 | if (fRxSocket==NULL)
547 | {
548 | delete fServSock;
549 | fServSock=NULL;
550 | continue;
551 | }
552 |
553 | if (!fRxSocket->IsValid())
554 | {
555 | cout << "TSocket invalid on port " << fPortRx << "." << endl;
556 | delete fServSock;
557 | delete fRxSocket;
558 | fServSock = NULL;
559 | fRxSocket = NULL;
560 | continue;
561 | }
562 |
563 | gLog << all << "Connection established on port " << fPortRx << "." << endl;
564 |
565 |
566 | //fRxSocket->SetOption(kNoBlock, 1);
567 |
568 | // Waqit for data
569 | while (!IsThreadCanceled())
570 | {
571 | switch (fRxSocket->Select(kRead, 1))
572 | {
573 | case kTRUE: // Data waiting to be read
574 | break;
575 | case kFALSE: // time out
576 | usleep(10);
577 | continue;
578 | }
579 |
580 | // ERROR
581 | cout << "Error: TRxSocket::Select on port " << fPortRx << "." << endl;
582 |
583 | delete fServSock;
584 | delete fRxSocket;
585 | fServSock = NULL;
586 | fRxSocket = NULL;
587 | break;
588 | }ยจ
589 |
590 | if (!fServSock)
591 | continue;
592 |
593 | if (IsThreadCanceled())
594 | {
595 | delete fServSock;
596 | delete fRxSocket;
597 | fServSock = NULL;
598 | fRxSocket = NULL;
599 | continue;
600 | }
601 |
602 | // ------ IDENTICAL UP TO HERE ------
603 |
604 | // Read and evaluate data
605 | ReadSocket(*fRxSocket);
606 |
607 | delete fServSock;
608 | delete fRxSocket;
609 | fServSock = NULL;
610 | fRxSocket = NULL;
611 | }
612 |
613 | gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
614 |
615 | return 0;
616 | // return NULL;
617 | }*/