1 | #include "MTcpIpIO.h"
|
---|
2 |
|
---|
3 | #include <unistd.h> // usleep
|
---|
4 |
|
---|
5 | #include <TSocket.h>
|
---|
6 | #include <TServerSocket.h>
|
---|
7 |
|
---|
8 | #include "MLog.h"
|
---|
9 | #include "MLogManip.h"
|
---|
10 |
|
---|
11 | #undef DEBUG
|
---|
12 |
|
---|
13 | using namespace std;
|
---|
14 |
|
---|
15 | /*
|
---|
16 | enum ESockOptions {
|
---|
17 | kSendBuffer, // size of send buffer
|
---|
18 | kRecvBuffer, // size of receive buffer
|
---|
19 | kOobInline, // OOB message inline
|
---|
20 | kKeepAlive, // keep socket alive
|
---|
21 | kReuseAddr, // allow reuse of local portion of address 5-tuple
|
---|
22 | kNoDelay, // send without delay
|
---|
23 | kNoBlock, // non-blocking I/O
|
---|
24 | kProcessGroup, // socket process group (used for SIGURG and SIGIO)
|
---|
25 | kAtMark, // are we at out-of-band mark (read only)
|
---|
26 | kBytesToRead // get number of bytes to read, FIONREAD (read only)
|
---|
27 | };
|
---|
28 |
|
---|
29 | enum ESendRecvOptions {
|
---|
30 | kDefault, // default option (= 0)
|
---|
31 | kOob, // send or receive out-of-band data
|
---|
32 | kPeek, // peek at incoming message (receive only)
|
---|
33 | kDontBlock // send/recv as much data as possible without blocking
|
---|
34 | };
|
---|
35 | */
|
---|
36 |
|
---|
37 | MTcpIpO::MTcpIpO(Int_t tx)
|
---|
38 | {
|
---|
39 | fTxSocket = new TSocket("ceco", tx);
|
---|
40 | }
|
---|
41 |
|
---|
42 | MTcpIpO::~MTcpIpO()
|
---|
43 | {
|
---|
44 | // Now delete all TCP/IP objects
|
---|
45 | delete fTxSocket;
|
---|
46 | }
|
---|
47 |
|
---|
48 | MTcpIpIO::MTcpIpIO(Int_t tx, Int_t rx) : MTcpIpI(rx), MTcpIpO(tx)
|
---|
49 | {
|
---|
50 | RunThread();
|
---|
51 | }
|
---|
52 |
|
---|
53 | MTcpIpIO::~MTcpIpIO()
|
---|
54 | {
|
---|
55 | }
|
---|
56 |
|
---|
57 | bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
|
---|
58 | {
|
---|
59 | if (!tx.IsValid())
|
---|
60 | {
|
---|
61 | //cout << "*!* Transmit socket invalid!" << endl;
|
---|
62 | return false;
|
---|
63 | }
|
---|
64 |
|
---|
65 | const Int_t l = tx.SendRaw(msg, len);
|
---|
66 | if (l<0)
|
---|
67 | {
|
---|
68 | cout << "ERROR - Sending Message" << endl;
|
---|
69 | return false;
|
---|
70 | }
|
---|
71 |
|
---|
72 | if (l!=len)
|
---|
73 | {
|
---|
74 | cout << "Send wrong number (" << l << ") of Bytes." << endl;
|
---|
75 | return false;
|
---|
76 | }
|
---|
77 |
|
---|
78 | #ifdef DEBUG
|
---|
79 | cout << "Tx: " << msg << flush;
|
---|
80 | #endif
|
---|
81 |
|
---|
82 | return true;
|
---|
83 | }
|
---|
84 |
|
---|
85 | bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
|
---|
86 | {
|
---|
87 | // R__LOCKGUARD2(myMutex);
|
---|
88 |
|
---|
89 | cout << "Connecting to " << addr << ":" << port << endl;
|
---|
90 |
|
---|
91 | // FIXME: Set tx-socket to nonblocking?
|
---|
92 | TSocket tx(addr, port);
|
---|
93 | // return SendFrame(tx, msg, len);
|
---|
94 |
|
---|
95 | if (!tx.IsValid())
|
---|
96 | {
|
---|
97 | //cout << "*!* Transmit socket invalid!" << endl;
|
---|
98 | return false;
|
---|
99 | }
|
---|
100 |
|
---|
101 | cout << "Sending to " << addr << ":" << port << endl;
|
---|
102 |
|
---|
103 | const Int_t l = tx.SendRaw(msg, len, kDontBlock);
|
---|
104 | if (l<0)
|
---|
105 | {
|
---|
106 | cout << "ERROR - Sending Message" << endl;
|
---|
107 | return false;
|
---|
108 | }
|
---|
109 |
|
---|
110 | if (l!=len)
|
---|
111 | {
|
---|
112 | cout << "Send wrong number (" << l << ") of Bytes." << endl;
|
---|
113 | return false;
|
---|
114 | }
|
---|
115 |
|
---|
116 | #ifdef DEBUG
|
---|
117 | cout << "Tx: " << msg << flush;
|
---|
118 | #endif
|
---|
119 |
|
---|
120 | return true;
|
---|
121 | }
|
---|
122 |
|
---|
123 | bool MTcpIpO::Send(const char *msg, Int_t len)
|
---|
124 | {
|
---|
125 | return SendFrame(*fTxSocket, msg, len);
|
---|
126 | }
|
---|
127 |
|
---|
128 | bool MTcpIpIO::InterpreteStr(TString str)
|
---|
129 | {
|
---|
130 | cout << "Rx: " << str << flush;
|
---|
131 | return true;
|
---|
132 | }
|
---|
133 |
|
---|
134 | void MTcpIpIO::ReadSocket(TSocket &rx)
|
---|
135 | {
|
---|
136 | // Clear buffer!
|
---|
137 | char c;
|
---|
138 | // while (fRxSocket->RecvRaw(&c, 1)>0 && !HasStopFlag())
|
---|
139 | while (rx.RecvRaw(&c, 1)>0 && !IsThreadCanceled())
|
---|
140 | usleep(1);
|
---|
141 |
|
---|
142 | TString str;
|
---|
143 | // while (!HasStopFlag())
|
---|
144 | while (!IsThreadCanceled())
|
---|
145 | {
|
---|
146 | char c;
|
---|
147 | const Int_t len = rx.RecvRaw(&c, 1);
|
---|
148 |
|
---|
149 | // No data received (non-blocking mode)
|
---|
150 | if (len<0)
|
---|
151 | {
|
---|
152 | usleep(1);
|
---|
153 | continue;
|
---|
154 | }
|
---|
155 |
|
---|
156 | // Data received with zero length!
|
---|
157 | if (len==0)
|
---|
158 | {
|
---|
159 | // THIS MEANS CONNECTIION LOST!!!!
|
---|
160 | cout << "============> len==0 (CONNECTION LOST?)" << endl;
|
---|
161 | break; // This break is for TEST PURPOSE FIXME!!!
|
---|
162 | continue;
|
---|
163 | }
|
---|
164 |
|
---|
165 | // Data received
|
---|
166 | if (len>1)
|
---|
167 | {
|
---|
168 | cout << "Data too long!!!" << endl;
|
---|
169 | break;
|
---|
170 | }
|
---|
171 |
|
---|
172 | // Data received (len==1)
|
---|
173 | if (c!='\n')
|
---|
174 | {
|
---|
175 | str += c;
|
---|
176 | continue;
|
---|
177 | }
|
---|
178 |
|
---|
179 | // String completed
|
---|
180 | InterpreteStr(str);
|
---|
181 | str = "";
|
---|
182 | }
|
---|
183 | }
|
---|
184 |
|
---|
185 | //void *MTcpIpIO::Thread()
|
---|
186 | Int_t MTcpIpI::Thread()
|
---|
187 | {
|
---|
188 | gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
|
---|
189 |
|
---|
190 | TServerSocket *fServSock=NULL;
|
---|
191 | TSocket *fRxSocket=NULL;
|
---|
192 |
|
---|
193 | // while (!HasStopFlag())
|
---|
194 | while (!IsThreadCanceled())
|
---|
195 | {
|
---|
196 | fServSock=new TServerSocket(fPortRx, kTRUE);
|
---|
197 | if (!fServSock->IsValid())
|
---|
198 | {
|
---|
199 | cout << "ServerSocket on port " << fPortRx << " invalid: ";
|
---|
200 | switch (fServSock->GetErrorCode())
|
---|
201 | {
|
---|
202 | case 0: cout << "No error." << endl; break;
|
---|
203 | case -1: cout << "low level socket() call failed." << endl; break;
|
---|
204 | case -2: cout << "low level bind() call failed." << endl; break;
|
---|
205 | case -3: cout << "low level listen() call failed." << endl; break;
|
---|
206 | default: cout << "Unknown." << endl; break;
|
---|
207 | }
|
---|
208 | delete fServSock;
|
---|
209 | fServSock=NULL;
|
---|
210 | MThread::Sleep(5000000);
|
---|
211 | continue;
|
---|
212 | }
|
---|
213 |
|
---|
214 | fServSock->SetOption(kNoBlock, 1);
|
---|
215 |
|
---|
216 | cout << "Waiting for connection on port " << fPortRx << "..." << endl;
|
---|
217 | // while (!HasStopFlag() && (Long_t)fRxSocket<=0)
|
---|
218 | while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
|
---|
219 | {
|
---|
220 | //TThread::CancelPoint();
|
---|
221 |
|
---|
222 | fRxSocket = fServSock->Accept();
|
---|
223 | if (fRxSocket==0)
|
---|
224 | cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
|
---|
225 | usleep(10);
|
---|
226 | }
|
---|
227 |
|
---|
228 | // Can happen in case of HasStopFlag()
|
---|
229 | if (fRxSocket==(void*)-1)
|
---|
230 | fRxSocket=NULL;
|
---|
231 |
|
---|
232 | if (fRxSocket==NULL)
|
---|
233 | {
|
---|
234 | delete fServSock;
|
---|
235 | fServSock=NULL;
|
---|
236 | continue;
|
---|
237 | }
|
---|
238 |
|
---|
239 | if (!fRxSocket->IsValid())
|
---|
240 | {
|
---|
241 | cout << "TSocket invalid on port " << fPortRx << "." << endl;
|
---|
242 | delete fServSock;
|
---|
243 | delete fRxSocket;
|
---|
244 | fServSock = NULL;
|
---|
245 | fRxSocket = NULL;
|
---|
246 | continue;
|
---|
247 | }
|
---|
248 |
|
---|
249 | cout << "Connection established on port " << fPortRx << "." << endl;
|
---|
250 |
|
---|
251 | fRxSocket->SetOption(kNoBlock, 1);
|
---|
252 |
|
---|
253 | // ------ IDENTICAL UP TO HERE ------
|
---|
254 |
|
---|
255 | ReadSocket(*fRxSocket);
|
---|
256 |
|
---|
257 | delete fServSock;
|
---|
258 | delete fRxSocket;
|
---|
259 | fServSock = NULL;
|
---|
260 | fRxSocket = NULL;
|
---|
261 | }
|
---|
262 |
|
---|
263 | gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
|
---|
264 |
|
---|
265 | return 0;
|
---|
266 | // return NULL;
|
---|
267 | }
|
---|