source: trunk/MagicSoft/Cosy/tcpip/MTcpIpIO.cc@ 8862

Last change on this file since 8862 was 8862, checked in by tbretz, 17 years ago
*** empty log message ***
File size: 7.1 KB
Line 
1#include "MTcpIpIO.h"
2
3#include <unistd.h> // usleep
4
5#include <TSocket.h>
6#include <TServerSocket.h>
7
8#include "MLog.h"
9#include "MLogManip.h"
10
11#include "MString.h"
12
13#undef DEBUG
14
15using namespace std;
16
17 /*
18 enum ESockOptions {
19 kSendBuffer, // size of send buffer
20 kRecvBuffer, // size of receive buffer
21 kOobInline, // OOB message inline
22 kKeepAlive, // keep socket alive
23 kReuseAddr, // allow reuse of local portion of address 5-tuple
24 kNoDelay, // send without delay
25 kNoBlock, // non-blocking I/O
26 kProcessGroup, // socket process group (used for SIGURG and SIGIO)
27 kAtMark, // are we at out-of-band mark (read only)
28 kBytesToRead // get number of bytes to read, FIONREAD (read only)
29 };
30
31 enum ESendRecvOptions {
32 kDefault, // default option (= 0)
33 kOob, // send or receive out-of-band data
34 kPeek, // peek at incoming message (receive only)
35 kDontBlock // send/recv as much data as possible without blocking
36 };
37 */
38
39MTcpIpO::MTcpIpO(const char *addr, Int_t tx)
40{
41 fTxSocket = new TSocket(addr, tx);
42}
43
44MTcpIpO::~MTcpIpO()
45{
46 // Now delete all TCP/IP objects
47 delete fTxSocket;
48}
49
50MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx) : MTcpIpI(rx), MTcpIpO(addr, tx)
51{
52 RunThread();
53}
54
55MTcpIpIO::~MTcpIpIO()
56{
57}
58
59TString MTcpIpO::GetSocketAddress(const TSocket &s)
60{
61 if (!s.IsValid())
62 return "n/a";
63
64 const TInetAddress &a = s.GetInetAddress();
65 if (!a.IsValid())
66 return "undefined";
67
68 return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
69}
70
71TString MTcpIpO::GetSocketAddress() const
72{
73 return GetSocketAddress(*fTxSocket);
74}
75
76bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
77{
78 if (!tx.IsValid())
79 {
80 //gLog << warn << "WARNING - No transmission to " << GetSocketAddress(tx) << " possible." << endl;
81 return false;
82 }
83
84 const Int_t l = tx.SendRaw(msg, len);
85 if (l<0)
86 {
87 gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
88 return false;
89 }
90
91 if (l!=len)
92 {
93 gLog << err << "ERROR - Sent wrong number (" << l << ") of bytes to " << GetSocketAddress(tx) << endl;
94 return false;
95 }
96
97#ifdef DEBUG
98 cout << "Tx: " << msg << flush;
99#endif
100
101 return true;
102}
103
104bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
105{
106 // R__LOCKGUARD2(myMutex);
107#ifdef DEBUG
108 cout << "Connecting to " << addr << ":" << port << endl;
109#endif
110
111 // FIXME: Set tx-socket to nonblocking?
112 TSocket tx(addr, port);
113 return SendFrame(tx, msg, len);
114/*
115 if (!tx.IsValid())
116 {
117 gLog << warn << "WARNING - No transmission to " << addr << ":" << port << " possible." << endl;
118 return false;
119 }
120
121 gLog << dbg << "Sending to " << addr << ":" << port << endl;
122
123 const Int_t l = tx.SendRaw(msg, len, kDontBlock);
124 if (l<0)
125 {
126 gLog << err << "ERROR - Sending TCP/IP frame to " << addr << ":" << port << endl;
127 return false;
128 }
129
130 if (l!=len)
131 {
132 gLog << err << "ERROR - Sent wrong number (" << l << ") of bytes to " << addr << ":" << port << endl;
133 return false;
134 }
135
136#ifdef DEBUG
137 cout << "Tx: " << msg << flush;
138#endif
139
140 return true;
141 */
142}
143
144bool MTcpIpO::Send(const char *msg, Int_t len)
145{
146 return SendFrame(*fTxSocket, msg, len);
147}
148
149bool MTcpIpIO::InterpreteStr(TString str)
150{
151 cout << "Rx: " << str << flush;
152 return true;
153}
154
155void MTcpIpIO::ReadSocket(TSocket &rx)
156{
157 // Clear buffer!
158 char c;
159// while (fRxSocket->RecvRaw(&c, 1)>0 && !HasStopFlag())
160 while (rx.RecvRaw(&c, 1)>0 && !IsThreadCanceled())
161 usleep(1);
162
163 TString str;
164 // while (!HasStopFlag())
165 while (!IsThreadCanceled())
166 {
167 char c;
168 const Int_t len = rx.RecvRaw(&c, 1);
169
170 // No data received (non-blocking mode)
171 if (len<0)
172 {
173 usleep(1);
174 continue;
175 }
176
177 // Data received with zero length!
178 if (len==0)
179 {
180 // THIS MEANS CONNECTIION LOST!!!!
181 cout << "============> len==0 (CONNECTION LOST?)" << endl;
182 break; // This break is for TEST PURPOSE FIXME!!!
183 continue;
184 }
185
186 // Data received
187 if (len>1)
188 {
189 cout << "Data too long!!!" << endl;
190 break;
191 }
192
193 // Data received (len==1)
194 if (c!='\n')
195 {
196 str += c;
197 continue;
198 }
199
200 // String completed
201 InterpreteStr(str);
202 str = "";
203 }
204}
205
206//void *MTcpIpIO::Thread()
207Int_t MTcpIpI::Thread()
208{
209 gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
210
211// if (fPortRx==7404)
212// {
213// gLog << err << "CeCo communication skipped." << endl;
214// return 0;
215// }
216
217 TServerSocket *fServSock=NULL;
218 TSocket *fRxSocket=NULL;
219
220// while (!HasStopFlag())
221 while (!IsThreadCanceled())
222 {
223 fServSock=new TServerSocket(fPortRx, kTRUE);
224 if (!fServSock->IsValid())
225 {
226 gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
227 switch (fServSock->GetErrorCode())
228 {
229 case 0: gLog << "No error." << endl; break;
230 case -1: gLog << "low level socket() call failed." << endl; break;
231 case -2: gLog << "low level bind() call failed." << endl; break;
232 case -3: gLog << "low level listen() call failed." << endl; break;
233 default: gLog << "Unknown." << endl; break;
234 }
235 delete fServSock;
236 fServSock=NULL;
237 MThread::Sleep(5000000);
238 continue;
239 }
240
241 fServSock->SetOption(kNoBlock, 1);
242
243 gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
244// while (!HasStopFlag() && (Long_t)fRxSocket<=0)
245 while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
246 {
247 //TThread::CancelPoint();
248
249 fRxSocket = fServSock->Accept();
250 if (fRxSocket==0)
251 cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
252 usleep(10);
253 }
254
255 // Can happen in case of HasStopFlag()
256 if (fRxSocket==(void*)-1)
257 fRxSocket=NULL;
258
259 if (fRxSocket==NULL)
260 {
261 delete fServSock;
262 fServSock=NULL;
263 continue;
264 }
265
266 if (!fRxSocket->IsValid())
267 {
268 cout << "TSocket invalid on port " << fPortRx << "." << endl;
269 delete fServSock;
270 delete fRxSocket;
271 fServSock = NULL;
272 fRxSocket = NULL;
273 continue;
274 }
275
276 gLog << all << "Connection established on port " << fPortRx << "." << endl;
277
278 fRxSocket->SetOption(kNoBlock, 1);
279
280 // ------ IDENTICAL UP TO HERE ------
281
282 ReadSocket(*fRxSocket);
283
284 delete fServSock;
285 delete fRxSocket;
286 fServSock = NULL;
287 fRxSocket = NULL;
288 }
289
290 gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
291
292 return 0;
293// return NULL;
294}
Note: See TracBrowser for help on using the repository browser.