source: trunk/MagicSoft/Cosy/tcpip/MTcpIpIO.cc@ 8864

Last change on this file since 8864 was 8864, checked in by tbretz, 17 years ago
*** empty log message ***
File size: 10.9 KB
Line 
1#include "MTcpIpIO.h"
2
3#include <unistd.h> // usleep
4
5#include <errno.h>
6
7#include <TSocket.h>
8#include <TServerSocket.h>
9
10#include "MLog.h"
11#include "MLogManip.h"
12
13#include "MString.h"
14
15#undef DEBUG
16
17using namespace std;
18
19 /*
20 enum ESockOptions {
21 kSendBuffer, // size of send buffer
22 kRecvBuffer, // size of receive buffer
23 kOobInline, // OOB message inline
24 kKeepAlive, // keep socket alive
25 kReuseAddr, // allow reuse of local portion of address 5-tuple
26 kNoDelay, // send without delay
27 kNoBlock, // non-blocking I/O
28 kProcessGroup, // socket process group (used for SIGURG and SIGIO)
29 kAtMark, // are we at out-of-band mark (read only)
30 kBytesToRead // get number of bytes to read, FIONREAD (read only)
31 };
32
33 enum ESendRecvOptions {
34 kDefault, // default option (= 0)
35 kOob, // send or receive out-of-band data
36 kPeek, // peek at incoming message (receive only)
37 kDontBlock // send/recv as much data as possible without blocking
38 };
39 */
40
41MTcpIpO::MTcpIpO(const char *addr, Int_t tx)
42{
43 fTxSocket = new TSocket(addr, tx);
44}
45
46MTcpIpO::~MTcpIpO()
47{
48 // Now delete all TCP/IP objects
49 delete fTxSocket;
50}
51
52MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx) : MTcpIpI(rx), MTcpIpO(addr, tx)
53{
54 RunThread();
55}
56
57MTcpIpIO::~MTcpIpIO()
58{
59}
60
61TString MTcpIpO::GetSocketAddress(const TSocket &s)
62{
63 if (!s.IsValid())
64 return "n/a";
65
66 const TInetAddress &a = s.GetInetAddress();
67 if (!a.IsValid())
68 return "undefined";
69
70 return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
71}
72
73TString MTcpIpO::GetSocketAddress() const
74{
75 return GetSocketAddress(*fTxSocket);
76}
77
78bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
79{
80 if (!tx.IsValid())
81 {
82 //gLog << warn << "WARNING - No transmission to " << GetSocketAddress(tx) << " possible." << endl;
83 return false;
84 }
85
86 const Int_t l = tx.SendRaw(msg, len);
87 if (l<0)
88 {
89 gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
90 return false;
91 }
92
93 if (l!=len)
94 {
95 gLog << err << "ERROR - Sent wrong number (" << l << ") of bytes to " << GetSocketAddress(tx) << endl;
96 return false;
97 }
98
99#ifdef DEBUG
100 cout << "Tx: " << msg << flush;
101#endif
102
103 return true;
104}
105
106bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
107{
108 // R__LOCKGUARD2(myMutex);
109#ifdef DEBUG
110 cout << "Connecting to " << addr << ":" << port << endl;
111#endif
112
113 // FIXME: Set tx-socket to nonblocking?
114 TSocket tx(addr, port);
115 return SendFrame(tx, msg, len);
116/*
117 if (!tx.IsValid())
118 {
119 gLog << warn << "WARNING - No transmission to " << addr << ":" << port << " possible." << endl;
120 return false;
121 }
122
123 gLog << dbg << "Sending to " << addr << ":" << port << endl;
124
125 const Int_t l = tx.SendRaw(msg, len, kDontBlock);
126 if (l<0)
127 {
128 gLog << err << "ERROR - Sending TCP/IP frame to " << addr << ":" << port << endl;
129 return false;
130 }
131
132 if (l!=len)
133 {
134 gLog << err << "ERROR - Sent wrong number (" << l << ") of bytes to " << addr << ":" << port << endl;
135 return false;
136 }
137
138#ifdef DEBUG
139 cout << "Tx: " << msg << flush;
140#endif
141
142 return true;
143 */
144}
145
146bool MTcpIpO::Send(const char *msg, Int_t len)
147{
148 return SendFrame(*fTxSocket, msg, len);
149}
150
151bool MTcpIpIO::InterpreteStr(TString str)
152{
153 cout << "Rx: " << str << flush;
154 return true;
155}
156
157Bool_t MTcpIpIO::ReadSocket(TSocket &rx)
158{
159 TString str;
160
161 while (!IsThreadCanceled())
162 {
163 char c;
164 const Int_t len = rx.RecvRaw(&c, 1);
165
166 // No data received (non-blocking mode)
167 if (len<0)
168 {
169 usleep(1);
170 continue;
171 }
172
173 // Data received with zero length! (Connection lost)
174 if (len==0)
175 return kFALSE; // This break is for TEST PURPOSE FIXME!!!
176
177 // Data received
178 if (len>1)
179 {
180 cout << "Data too long!!!" << endl;
181 break;
182 }
183
184 // Data received (len==1)
185 if (c!='\n')
186 {
187 str += c;
188 continue;
189 }
190
191 // String completed
192 InterpreteStr(str);
193 str = "";
194 }
195
196 return kTRUE;
197}
198
199Bool_t MTcpIpI::WaitForData(TSocket &sock)
200{
201 // No connection established?
202 if (!sock.IsValid())
203 {
204 gLog << warn << "TSocket invalid on port " << fPortRx << "." << endl;
205 return kFALSE;
206 }
207
208 fConnectionEstablished = kTRUE;
209
210 // Get connection on port fPortRx and redirected
211 while (!IsThreadCanceled())
212 {
213 // Check for pending data (every ms)
214 switch (sock.Select(TSocket::kRead, 1))
215 {
216 case kTRUE: // Data pending... go on reading
217 if (!ReadSocket(sock))
218 {
219 gLog << warn << "WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(sock) << endl;
220 return kFALSE;
221 }
222
223 // *FALLTHROUGH*
224 case kFALSE: // Time out, no data yet, go on waiting
225
226 // Go on waiting for new data
227 continue;
228
229 default: // Error occurance
230 gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
231 return kFALSE;
232 }
233 }
234 return kTRUE; //???
235}
236
237void MTcpIpI::WaitForConnection(TServerSocket &server)
238{
239 gLog << all << " Wait for connection" << endl;
240
241 while (!IsThreadCanceled())
242 {
243 gLog << all << "Listening for new connection on port " << fPortRx << "..." << endl;
244
245 // Check for a connection request (reminder: we are in non-blocking mode)
246 TSocket *socket = 0;
247
248 while (!IsThreadCanceled())
249 {
250 socket = server.Accept();
251
252 // Accept returned an error
253 if (socket==0)
254 {
255 gLog << err << "Error: TServerSock::Accept on port " << fPortRx << ": " << strerror(errno) << endl;
256 // Since we don't know the type of the error we better shut down the socket
257 return;
258 }
259
260 // No connection request pending
261 if ((Long_t)socket<0)
262 {
263 MThread::Sleep(1000); // Wait a ms
264 continue;
265 }
266
267 // Connection established
268 break;
269 }
270
271 if ((Long_t)socket<=0)
272 return;
273
274 if (!WaitForData(*socket))
275 fConnectionEstablished = kFALSE;
276
277 delete socket;
278 }
279}
280
281Int_t MTcpIpI::Thread()
282{
283 gLog << inf << "- Starting server listening on port " << fPortRx << "..." << endl;
284
285 while (!IsThreadCanceled())
286 {
287 TServerSocket *server=new TServerSocket(fPortRx, kTRUE);
288 server->SetOption(kNoBlock, 1);
289
290 while (!IsThreadCanceled() && server->IsValid())
291 WaitForConnection(*server);
292
293 if (!server->IsValid())
294 {
295 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
296 switch (server->GetErrorCode())
297 {
298 case 0: gLog << "No error." << endl; break;
299 case -1: gLog << "low level socket() call failed." << endl; break;
300 case -2: gLog << "low level bind() call failed." << endl; break;
301 case -3: gLog << "low level listen() call failed." << endl; break;
302 default: gLog << "Unknown." << endl; break;
303 }
304 }
305
306 delete server;
307
308 MThread::Sleep(5000000);
309 }
310
311 gLog << inf << "- Listening server stopped on port " << fPortRx << "." << endl;
312
313 return 0;
314}
315
316/*
317Int_t MTcpIpI::Thread()
318{
319 gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
320
321// if (fPortRx==7404)
322// {
323// gLog << err << "CeCo communication skipped." << endl;
324// return 0;
325// }
326
327 TServerSocket *fServSock=NULL;
328 TSocket *fRxSocket=NULL;
329
330// while (!HasStopFlag())
331 while (!IsThreadCanceled())
332 {
333 fServSock=new TServerSocket(fPortRx, kTRUE);
334 if (!fServSock->IsValid())
335 {
336 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
337 switch (fServSock->GetErrorCode())
338 {
339 case 0: gLog << "No error." << endl; break;
340 case -1: gLog << "low level socket() call failed." << endl; break;
341 case -2: gLog << "low level bind() call failed." << endl; break;
342 case -3: gLog << "low level listen() call failed." << endl; break;
343 default: gLog << "Unknown." << endl; break;
344 }
345 delete fServSock;
346 fServSock=NULL;
347 MThread::Sleep(5000000);
348 continue;
349 }
350
351 fServSock->SetOption(kNoBlock, 1);
352
353 gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
354// while (!HasStopFlag() && (Long_t)fRxSocket<=0)
355 while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
356 {
357 //TThread::CancelPoint();
358
359 fRxSocket = fServSock->Accept();
360 if (fRxSocket==0)
361 cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
362 usleep(10);
363 }
364
365 // Can happen in case of HasStopFlag()
366 if (fRxSocket==(void*)-1)
367 fRxSocket=NULL;
368
369 if (fRxSocket==NULL)
370 {
371 delete fServSock;
372 fServSock=NULL;
373 continue;
374 }
375
376 if (!fRxSocket->IsValid())
377 {
378 cout << "TSocket invalid on port " << fPortRx << "." << endl;
379 delete fServSock;
380 delete fRxSocket;
381 fServSock = NULL;
382 fRxSocket = NULL;
383 continue;
384 }
385
386 gLog << all << "Connection established on port " << fPortRx << "." << endl;
387
388
389 //fRxSocket->SetOption(kNoBlock, 1);
390
391 // Waqit for data
392 while (!IsThreadCanceled())
393 {
394 switch (fRxSocket->Select(kRead, 1))
395 {
396 case kTRUE: // Data waiting to be read
397 break;
398 case kFALSE: // time out
399 usleep(10);
400 continue;
401 }
402
403 // ERROR
404 cout << "Error: TRxSocket::Select on port " << fPortRx << "." << endl;
405
406 delete fServSock;
407 delete fRxSocket;
408 fServSock = NULL;
409 fRxSocket = NULL;
410 break;
411
412
413 if (!fServSock)
414 continue;
415
416 if (IsThreadCanceled())
417 {
418 delete fServSock;
419 delete fRxSocket;
420 fServSock = NULL;
421 fRxSocket = NULL;
422 continue;
423 }
424
425 // ------ IDENTICAL UP TO HERE ------
426
427 // Read and evaluate data
428 ReadSocket(*fRxSocket);
429
430 delete fServSock;
431 delete fRxSocket;
432 fServSock = NULL;
433 fRxSocket = NULL;
434 }
435
436 gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
437
438 return 0;
439// return NULL;
440}*/
Note: See TracBrowser for help on using the repository browser.