source: trunk/MagicSoft/Cosy/tcpip/MTcpIpIO.cc@ 9443

Last change on this file since 9443 was 9443, checked in by tbretz, 15 years ago
*** empty log message ***
File size: 12.7 KB
Line 
1#include "MTcpIpIO.h"
2
3#include <unistd.h> // usleep
4
5#include <errno.h>
6
7#include <TSocket.h>
8#include <TServerSocket.h>
9
10#include "MLog.h"
11#include "MLogManip.h"
12
13#include "MString.h"
14#include "MTimeout.h"
15
16#undef DEBUG
17
18using namespace std;
19
20 /*
21 enum ESockOptions {
22 kSendBuffer, // size of send buffer
23 kRecvBuffer, // size of receive buffer
24 kOobInline, // OOB message inline
25 kKeepAlive, // keep socket alive
26 kReuseAddr, // allow reuse of local portion of address 5-tuple
27 kNoDelay, // send without delay
28 kNoBlock, // non-blocking I/O
29 kProcessGroup, // socket process group (used for SIGURG and SIGIO)
30 kAtMark, // are we at out-of-band mark (read only)
31 kBytesToRead // get number of bytes to read, FIONREAD (read only)
32 };
33
34 enum ESendRecvOptions {
35 kDefault, // default option (= 0)
36 kOob, // send or receive out-of-band data
37 kPeek, // peek at incoming message (receive only)
38 kDontBlock // send/recv as much data as possible without blocking
39 };
40 */
41
42MTcpIpO::MTcpIpO(const char *addr, Int_t tx) : fPortTx(tx)
43{
44 // was "inf2" but nobody knows what the prg is doing if it times out
45 gLog << all << "- Open send socket to " << addr << ":" << tx << endl;
46
47 const Int_t save = gDebug;
48 gDebug=1;
49 fTxSocket = new TSocket(addr, tx);
50 gDebug=save;
51 fTxSocket->SetOption(kNoBlock, 1);
52
53 MTcpIpO::RunThread();
54}
55
56MTcpIpO::~MTcpIpO()
57{
58 CancelThread();
59
60 // Now delete all TCP/IP objects
61 delete fTxSocket;
62}
63
64MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx) : MTcpIpI(rx), MTcpIpO(addr, tx)
65{
66 MTcpIpI::RunThread();
67}
68
69MTcpIpIO::~MTcpIpIO()
70{
71}
72
73TString MTcpIpO::GetSocketAddress(const TSocket &s)
74{
75 if (!s.IsValid())
76 return "n/a";
77
78 const TInetAddress &a = s.GetInetAddress();
79 if (!a.IsValid())
80 return "undefined";
81
82 return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
83}
84
85/*
86TString MTcpIpO::GetSocketAddress() const
87{
88 TLockGuard guard(const_cast<TMutex*>(&fMutex));
89 return GetSocketAddress(*fTxSocket);
90}
91*/
92bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
93{
94 if (!tx.IsValid())
95 return false;
96
97#ifdef DEBUG
98 cout << "Tx: " << msg << flush;
99#endif
100
101 const Int_t l = tx.SendRaw(msg, len, kDontBlock);
102
103 // Frame not sent, EWOULDBLOCK
104 if (l==-4)
105 {
106 gLog << err << "ERROR - Sending to " << GetSocketAddress(tx) << " would block." << endl;
107/*--->*/ tx.Close(); // ?????
108 return false;
109 }
110
111 if (l<0)
112 {
113 gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
114 return false;
115 }
116
117 if (l!=len)
118 {
119 gLog << err << "ERROR - Could only sent " << l << " out of " << len << " bytes to " << GetSocketAddress(tx) << endl;
120 return false;
121 }
122
123 return true;
124}
125
126bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
127{
128#ifdef DEBUG
129 cout << "Connecting to " << addr << ":" << port << endl;
130#endif
131
132 // FIXME: Set tx-socket to nonblocking?
133 TSocket tx(addr, port);
134 return SendFrame(tx, msg, len);
135}
136
137bool MTcpIpO::Send(const char *msg, Int_t len)
138{
139 const Int_t mtx = fMutex.TryLock();
140 if (mtx==13)
141 gLog << warn << "MTcpIpO::Send - mutex is already locked by this thread." << endl;
142
143 // If Mutex cannot be locked, i.e. we are currently reopening
144 // the send socket we cannot wait, because we would block the
145 // executing threrad.
146 if (mtx)
147 return false;
148
149 const bool rc = SendFrame(*fTxSocket, msg, len);
150
151 fMutex.UnLock();
152
153 return rc;
154}
155
156Int_t MTcpIpO::Thread()
157{
158 const TInetAddress &a = fTxSocket->GetInetAddress();
159 if (!a.IsValid())
160 {
161 gLog << err << "ERROR: Send socket address invalid." << endl;
162 return 0;
163 }
164
165 Int_t wait = 1000; /// Wait a ms
166
167 while (!IsThreadCanceled())
168 {
169 if (fTxSocket->IsValid())
170 {
171 MThread::Sleep(1000); // Wait a ms
172 wait = 1000;
173 continue;
174 }
175
176
177#ifdef DEBUG
178 cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
179#endif
180
181 fMutex.Lock();
182
183 delete fTxSocket;
184
185 fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
186 fTxSocket->SetOption(kNoBlock, 1);
187
188 fMutex.UnLock();
189
190 if (fTxSocket->IsValid())
191 continue;
192
193 MThread::Sleep(wait); // Wait a ms
194 if (wait<30000000) // 30s
195 wait *= 2;
196 }
197
198 return 1;
199}
200
201bool MTcpIpIO::InterpreteStr(TString str)
202{
203 cout << "Rx: " << str << flush;
204 return true;
205}
206
207Bool_t MTcpIpIO::ReadSocket(TSocket &rx)
208{
209 TString str;
210
211 while (!MTcpIpI::IsThreadCanceled())
212 {
213 char c;
214 const Int_t len = rx.RecvRaw(&c, 1);
215
216 // For details see TSocket::RecvRaw
217 // -1: // ERROR
218 // EINVAL, EWOULDBLOCK
219 // -5: // EPIPE || ECONNRESET = Pipe broken or connection reset by peer
220 // 0: Data received with zero length! (Connection lost/call interrupted)
221 if (len<=0)
222 return kFALSE;
223
224 // Data received
225 if (len>1)
226 {
227 cout << "Data too long!!!" << endl;
228 break;
229 }
230
231 // Data received (len==1)
232 if (c!='\n')
233 {
234 str += c;
235 continue;
236 }
237
238 // String completed
239 InterpreteStr(str);
240 str = "";
241 }
242
243 return kTRUE;
244}
245
246Bool_t MTcpIpI::WaitForData(TSocket &sock)
247{
248 // No connection established?
249 if (!sock.IsValid())
250 {
251 gLog << warn << "TSocket invalid on port " << fPortRx << "." << endl;
252 return kFALSE;
253 }
254
255 fConnectionEstablished = kTRUE;
256
257 MTimeout timeout(fTimeout);
258
259 // Get connection on port fPortRx and redirected
260 while (!IsThreadCanceled())
261 {
262 // Check for pending data (every ms)
263 switch (sock.Select(TSocket::kRead, 1))
264 {
265 case kTRUE: // Data pending... go on reading
266 if (!ReadSocket(sock))
267 {
268 gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(sock) << endl;
269 return kFALSE;
270 }
271 timeout.Start(fTimeout);
272
273 // Go on waiting for new data
274 continue;
275
276 case kFALSE: // Time out, no data yet, go on waiting
277 if (timeout.HasTimedOut())
278 {
279 gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(sock) << " timed out after " << fTimeout << "ms." << endl;
280 return kFALSE;
281 }
282
283 // Go on waiting for new data
284 continue;
285
286 default: // Error occurance
287 gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
288 return kFALSE;
289 }
290 }
291 return kTRUE; //???
292}
293
294void MTcpIpI::WaitForConnection(TServerSocket &server)
295{
296 while (!IsThreadCanceled())
297 {
298 gLog << all << MTime(-1) << " Listening for new connection on port " << fPortRx << "..." << endl;
299
300 // Check for a connection request (reminder: we are in non-blocking mode)
301 TSocket *socket = 0;
302
303 while (!IsThreadCanceled())
304 {
305 //cout << (int) IsThreadCanceled() << endl;
306 // Wait for a new connection on RX port
307 socket = server.Accept();
308
309 // Accept returned an error
310 if (socket==0)
311 {
312 gLog << err << "Error: TServerSock::Accept on port " << fPortRx << ": " << strerror(errno) << endl;
313 // Since we don't know the type of the error we better shut down the socket
314 return;
315 }
316
317 // No connection request pending
318 if ((Long_t)socket<0)
319 {
320 MThread::Sleep(1000); // Wait a ms
321 continue;
322 }
323
324 // Connection established
325 break;
326 }
327
328 if ((Long_t)socket<=0)
329 return;
330
331 gLog << all << MTime(-1) << " Connection established to " << MTcpIpO::GetSocketAddress(*socket) << "..." << endl;
332
333 if (!WaitForData(*socket))
334 fConnectionEstablished = kFALSE;
335
336#ifdef DEBUG
337 cout << "===> DEL SOCKET" << endl;
338#endif
339 delete socket;
340 }
341}
342
343Int_t MTcpIpI::Thread()
344{
345 gLog << inf << "- Starting server listening on port " << fPortRx << "..." << endl;
346
347 while (!IsThreadCanceled())
348 {
349 TServerSocket *server=new TServerSocket(fPortRx, kTRUE, 0);
350 server->SetOption(kNoBlock, 1);
351
352 while (!IsThreadCanceled() && server->IsValid())
353 WaitForConnection(*server);
354
355 if (!server->IsValid())
356 {
357 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
358 switch (server->GetErrorCode())
359 {
360 case 0: gLog << "No error." << endl; break;
361 case -1: gLog << "low level socket() call failed." << endl; break;
362 case -2: gLog << "low level bind() call failed." << endl; break;
363 case -3: gLog << "low level listen() call failed." << endl; break;
364 default: gLog << "Unknown." << endl; break;
365 }
366 }
367
368#ifdef DEBUG
369 cout << "===> DEL SERVER" << endl;
370#endif
371 delete server;
372
373 MThread::Sleep(5000000);
374 }
375
376 gLog << inf << "- Listening server stopped on port " << fPortRx << "." << endl;
377
378 return 0;
379}
380
381/*
382Int_t MTcpIpI::Thread()
383{
384 gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
385
386// if (fPortRx==7404)
387// {
388// gLog << err << "CeCo communication skipped." << endl;
389// return 0;
390// }
391
392 TServerSocket *fServSock=NULL;
393 TSocket *fRxSocket=NULL;
394
395// while (!HasStopFlag())
396 while (!IsThreadCanceled())
397 {
398 fServSock=new TServerSocket(fPortRx, kTRUE);
399 if (!fServSock->IsValid())
400 {
401 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
402 switch (fServSock->GetErrorCode())
403 {
404 case 0: gLog << "No error." << endl; break;
405 case -1: gLog << "low level socket() call failed." << endl; break;
406 case -2: gLog << "low level bind() call failed." << endl; break;
407 case -3: gLog << "low level listen() call failed." << endl; break;
408 default: gLog << "Unknown." << endl; break;
409 }
410 delete fServSock;
411 fServSock=NULL;
412 MThread::Sleep(5000000);
413 continue;
414 }
415
416 fServSock->SetOption(kNoBlock, 1);
417
418 gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
419// while (!HasStopFlag() && (Long_t)fRxSocket<=0)
420 while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
421 {
422 //TThread::CancelPoint();
423
424 fRxSocket = fServSock->Accept();
425 if (fRxSocket==0)
426 cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
427 usleep(10);
428 }
429
430 // Can happen in case of HasStopFlag()
431 if (fRxSocket==(void*)-1)
432 fRxSocket=NULL;
433
434 if (fRxSocket==NULL)
435 {
436 delete fServSock;
437 fServSock=NULL;
438 continue;
439 }
440
441 if (!fRxSocket->IsValid())
442 {
443 cout << "TSocket invalid on port " << fPortRx << "." << endl;
444 delete fServSock;
445 delete fRxSocket;
446 fServSock = NULL;
447 fRxSocket = NULL;
448 continue;
449 }
450
451 gLog << all << "Connection established on port " << fPortRx << "." << endl;
452
453
454 //fRxSocket->SetOption(kNoBlock, 1);
455
456 // Waqit for data
457 while (!IsThreadCanceled())
458 {
459 switch (fRxSocket->Select(kRead, 1))
460 {
461 case kTRUE: // Data waiting to be read
462 break;
463 case kFALSE: // time out
464 usleep(10);
465 continue;
466 }
467
468 // ERROR
469 cout << "Error: TRxSocket::Select on port " << fPortRx << "." << endl;
470
471 delete fServSock;
472 delete fRxSocket;
473 fServSock = NULL;
474 fRxSocket = NULL;
475 break;
476
477
478 if (!fServSock)
479 continue;
480
481 if (IsThreadCanceled())
482 {
483 delete fServSock;
484 delete fRxSocket;
485 fServSock = NULL;
486 fRxSocket = NULL;
487 continue;
488 }
489
490 // ------ IDENTICAL UP TO HERE ------
491
492 // Read and evaluate data
493 ReadSocket(*fRxSocket);
494
495 delete fServSock;
496 delete fRxSocket;
497 fServSock = NULL;
498 fRxSocket = NULL;
499 }
500
501 gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
502
503 return 0;
504// return NULL;
505}*/
Note: See TracBrowser for help on using the repository browser.