1 | #include "MTcpIpIO.h"
|
---|
2 |
|
---|
3 | #include <unistd.h> // usleep
|
---|
4 |
|
---|
5 | #include <TSocket.h>
|
---|
6 | #include <TServerSocket.h>
|
---|
7 |
|
---|
8 | #include "MLog.h"
|
---|
9 | #include "MLogManip.h"
|
---|
10 |
|
---|
11 | #include "MString.h"
|
---|
12 |
|
---|
13 | #undef DEBUG
|
---|
14 |
|
---|
15 | using namespace std;
|
---|
16 |
|
---|
17 | /*
|
---|
18 | enum ESockOptions {
|
---|
19 | kSendBuffer, // size of send buffer
|
---|
20 | kRecvBuffer, // size of receive buffer
|
---|
21 | kOobInline, // OOB message inline
|
---|
22 | kKeepAlive, // keep socket alive
|
---|
23 | kReuseAddr, // allow reuse of local portion of address 5-tuple
|
---|
24 | kNoDelay, // send without delay
|
---|
25 | kNoBlock, // non-blocking I/O
|
---|
26 | kProcessGroup, // socket process group (used for SIGURG and SIGIO)
|
---|
27 | kAtMark, // are we at out-of-band mark (read only)
|
---|
28 | kBytesToRead // get number of bytes to read, FIONREAD (read only)
|
---|
29 | };
|
---|
30 |
|
---|
31 | enum ESendRecvOptions {
|
---|
32 | kDefault, // default option (= 0)
|
---|
33 | kOob, // send or receive out-of-band data
|
---|
34 | kPeek, // peek at incoming message (receive only)
|
---|
35 | kDontBlock // send/recv as much data as possible without blocking
|
---|
36 | };
|
---|
37 | */
|
---|
38 |
|
---|
39 | MTcpIpO::MTcpIpO(const char *addr, Int_t tx)
|
---|
40 | {
|
---|
41 | fTxSocket = new TSocket(addr, tx);
|
---|
42 | }
|
---|
43 |
|
---|
44 | MTcpIpO::~MTcpIpO()
|
---|
45 | {
|
---|
46 | // Now delete all TCP/IP objects
|
---|
47 | delete fTxSocket;
|
---|
48 | }
|
---|
49 |
|
---|
50 | MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx) : MTcpIpI(rx), MTcpIpO(addr, tx)
|
---|
51 | {
|
---|
52 | RunThread();
|
---|
53 | }
|
---|
54 |
|
---|
55 | MTcpIpIO::~MTcpIpIO()
|
---|
56 | {
|
---|
57 | }
|
---|
58 |
|
---|
59 | TString MTcpIpO::GetSocketAddress(const TSocket &s)
|
---|
60 | {
|
---|
61 | if (!s.IsValid())
|
---|
62 | return "n/a";
|
---|
63 |
|
---|
64 | const TInetAddress &a = s.GetInetAddress();
|
---|
65 | if (!a.IsValid())
|
---|
66 | return "undefined";
|
---|
67 |
|
---|
68 | return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
|
---|
69 | }
|
---|
70 |
|
---|
71 | TString MTcpIpO::GetSocketAddress() const
|
---|
72 | {
|
---|
73 | return GetSocketAddress(*fTxSocket);
|
---|
74 | }
|
---|
75 |
|
---|
76 | bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
|
---|
77 | {
|
---|
78 | if (!tx.IsValid())
|
---|
79 | {
|
---|
80 | //gLog << warn << "WARNING - No transmission to " << GetSocketAddress(tx) << " possible." << endl;
|
---|
81 | return false;
|
---|
82 | }
|
---|
83 |
|
---|
84 | const Int_t l = tx.SendRaw(msg, len);
|
---|
85 | if (l<0)
|
---|
86 | {
|
---|
87 | gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
|
---|
88 | return false;
|
---|
89 | }
|
---|
90 |
|
---|
91 | if (l!=len)
|
---|
92 | {
|
---|
93 | gLog << err << "ERROR - Sent wrong number (" << l << ") of bytes to " << GetSocketAddress(tx) << endl;
|
---|
94 | return false;
|
---|
95 | }
|
---|
96 |
|
---|
97 | #ifdef DEBUG
|
---|
98 | cout << "Tx: " << msg << flush;
|
---|
99 | #endif
|
---|
100 |
|
---|
101 | return true;
|
---|
102 | }
|
---|
103 |
|
---|
104 | bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
|
---|
105 | {
|
---|
106 | // R__LOCKGUARD2(myMutex);
|
---|
107 | #ifdef DEBUG
|
---|
108 | cout << "Connecting to " << addr << ":" << port << endl;
|
---|
109 | #endif
|
---|
110 |
|
---|
111 | // FIXME: Set tx-socket to nonblocking?
|
---|
112 | TSocket tx(addr, port);
|
---|
113 | return SendFrame(tx, msg, len);
|
---|
114 | /*
|
---|
115 | if (!tx.IsValid())
|
---|
116 | {
|
---|
117 | gLog << warn << "WARNING - No transmission to " << addr << ":" << port << " possible." << endl;
|
---|
118 | return false;
|
---|
119 | }
|
---|
120 |
|
---|
121 | gLog << dbg << "Sending to " << addr << ":" << port << endl;
|
---|
122 |
|
---|
123 | const Int_t l = tx.SendRaw(msg, len, kDontBlock);
|
---|
124 | if (l<0)
|
---|
125 | {
|
---|
126 | gLog << err << "ERROR - Sending TCP/IP frame to " << addr << ":" << port << endl;
|
---|
127 | return false;
|
---|
128 | }
|
---|
129 |
|
---|
130 | if (l!=len)
|
---|
131 | {
|
---|
132 | gLog << err << "ERROR - Sent wrong number (" << l << ") of bytes to " << addr << ":" << port << endl;
|
---|
133 | return false;
|
---|
134 | }
|
---|
135 |
|
---|
136 | #ifdef DEBUG
|
---|
137 | cout << "Tx: " << msg << flush;
|
---|
138 | #endif
|
---|
139 |
|
---|
140 | return true;
|
---|
141 | */
|
---|
142 | }
|
---|
143 |
|
---|
144 | bool MTcpIpO::Send(const char *msg, Int_t len)
|
---|
145 | {
|
---|
146 | return SendFrame(*fTxSocket, msg, len);
|
---|
147 | }
|
---|
148 |
|
---|
149 | bool MTcpIpIO::InterpreteStr(TString str)
|
---|
150 | {
|
---|
151 | cout << "Rx: " << str << flush;
|
---|
152 | return true;
|
---|
153 | }
|
---|
154 |
|
---|
155 | void MTcpIpIO::ReadSocket(TSocket &rx)
|
---|
156 | {
|
---|
157 | // Clear buffer!
|
---|
158 | char c;
|
---|
159 | // while (fRxSocket->RecvRaw(&c, 1)>0 && !HasStopFlag())
|
---|
160 | while (rx.RecvRaw(&c, 1)>0 && !IsThreadCanceled())
|
---|
161 | usleep(1);
|
---|
162 |
|
---|
163 | TString str;
|
---|
164 | // while (!HasStopFlag())
|
---|
165 | while (!IsThreadCanceled())
|
---|
166 | {
|
---|
167 | char c;
|
---|
168 | const Int_t len = rx.RecvRaw(&c, 1);
|
---|
169 |
|
---|
170 | // No data received (non-blocking mode)
|
---|
171 | if (len<0)
|
---|
172 | {
|
---|
173 | usleep(1);
|
---|
174 | continue;
|
---|
175 | }
|
---|
176 |
|
---|
177 | // Data received with zero length!
|
---|
178 | if (len==0)
|
---|
179 | {
|
---|
180 | // THIS MEANS CONNECTIION LOST!!!!
|
---|
181 | cout << "============> len==0 (CONNECTION LOST?)" << endl;
|
---|
182 | break; // This break is for TEST PURPOSE FIXME!!!
|
---|
183 | continue;
|
---|
184 | }
|
---|
185 |
|
---|
186 | // Data received
|
---|
187 | if (len>1)
|
---|
188 | {
|
---|
189 | cout << "Data too long!!!" << endl;
|
---|
190 | break;
|
---|
191 | }
|
---|
192 |
|
---|
193 | // Data received (len==1)
|
---|
194 | if (c!='\n')
|
---|
195 | {
|
---|
196 | str += c;
|
---|
197 | continue;
|
---|
198 | }
|
---|
199 |
|
---|
200 | // String completed
|
---|
201 | InterpreteStr(str);
|
---|
202 | str = "";
|
---|
203 | }
|
---|
204 | }
|
---|
205 |
|
---|
206 | //void *MTcpIpIO::Thread()
|
---|
207 | Int_t MTcpIpI::Thread()
|
---|
208 | {
|
---|
209 | gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
|
---|
210 |
|
---|
211 | // if (fPortRx==7404)
|
---|
212 | // {
|
---|
213 | // gLog << err << "CeCo communication skipped." << endl;
|
---|
214 | // return 0;
|
---|
215 | // }
|
---|
216 |
|
---|
217 | TServerSocket *fServSock=NULL;
|
---|
218 | TSocket *fRxSocket=NULL;
|
---|
219 |
|
---|
220 | // while (!HasStopFlag())
|
---|
221 | while (!IsThreadCanceled())
|
---|
222 | {
|
---|
223 | fServSock=new TServerSocket(fPortRx, kTRUE);
|
---|
224 | if (!fServSock->IsValid())
|
---|
225 | {
|
---|
226 | gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
|
---|
227 | switch (fServSock->GetErrorCode())
|
---|
228 | {
|
---|
229 | case 0: gLog << "No error." << endl; break;
|
---|
230 | case -1: gLog << "low level socket() call failed." << endl; break;
|
---|
231 | case -2: gLog << "low level bind() call failed." << endl; break;
|
---|
232 | case -3: gLog << "low level listen() call failed." << endl; break;
|
---|
233 | default: gLog << "Unknown." << endl; break;
|
---|
234 | }
|
---|
235 | delete fServSock;
|
---|
236 | fServSock=NULL;
|
---|
237 | MThread::Sleep(5000000);
|
---|
238 | continue;
|
---|
239 | }
|
---|
240 |
|
---|
241 | fServSock->SetOption(kNoBlock, 1);
|
---|
242 |
|
---|
243 | gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
|
---|
244 | // while (!HasStopFlag() && (Long_t)fRxSocket<=0)
|
---|
245 | while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
|
---|
246 | {
|
---|
247 | //TThread::CancelPoint();
|
---|
248 |
|
---|
249 | fRxSocket = fServSock->Accept();
|
---|
250 | if (fRxSocket==0)
|
---|
251 | cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
|
---|
252 | usleep(10);
|
---|
253 | }
|
---|
254 |
|
---|
255 | // Can happen in case of HasStopFlag()
|
---|
256 | if (fRxSocket==(void*)-1)
|
---|
257 | fRxSocket=NULL;
|
---|
258 |
|
---|
259 | if (fRxSocket==NULL)
|
---|
260 | {
|
---|
261 | delete fServSock;
|
---|
262 | fServSock=NULL;
|
---|
263 | continue;
|
---|
264 | }
|
---|
265 |
|
---|
266 | if (!fRxSocket->IsValid())
|
---|
267 | {
|
---|
268 | cout << "TSocket invalid on port " << fPortRx << "." << endl;
|
---|
269 | delete fServSock;
|
---|
270 | delete fRxSocket;
|
---|
271 | fServSock = NULL;
|
---|
272 | fRxSocket = NULL;
|
---|
273 | continue;
|
---|
274 | }
|
---|
275 |
|
---|
276 | gLog << all << "Connection established on port " << fPortRx << "." << endl;
|
---|
277 |
|
---|
278 | fRxSocket->SetOption(kNoBlock, 1);
|
---|
279 |
|
---|
280 | // ------ IDENTICAL UP TO HERE ------
|
---|
281 |
|
---|
282 | ReadSocket(*fRxSocket);
|
---|
283 |
|
---|
284 | delete fServSock;
|
---|
285 | delete fRxSocket;
|
---|
286 | fServSock = NULL;
|
---|
287 | fRxSocket = NULL;
|
---|
288 | }
|
---|
289 |
|
---|
290 | gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
|
---|
291 |
|
---|
292 | return 0;
|
---|
293 | // return NULL;
|
---|
294 | }
|
---|