source: trunk/MagicSoft/Cosy/tcpip/MTcpIpIO.cc@ 8816

Last change on this file since 8816 was 8816, checked in by tbretz, 17 years ago
*** empty log message ***
File size: 6.3 KB
Line 
1#include "MTcpIpIO.h"
2
3#include <unistd.h> // usleep
4
5#include <TSocket.h>
6#include <TServerSocket.h>
7
8#include "MLog.h"
9#include "MLogManip.h"
10
11#undef DEBUG
12
13using namespace std;
14
15 /*
16 enum ESockOptions {
17 kSendBuffer, // size of send buffer
18 kRecvBuffer, // size of receive buffer
19 kOobInline, // OOB message inline
20 kKeepAlive, // keep socket alive
21 kReuseAddr, // allow reuse of local portion of address 5-tuple
22 kNoDelay, // send without delay
23 kNoBlock, // non-blocking I/O
24 kProcessGroup, // socket process group (used for SIGURG and SIGIO)
25 kAtMark, // are we at out-of-band mark (read only)
26 kBytesToRead // get number of bytes to read, FIONREAD (read only)
27 };
28
29 enum ESendRecvOptions {
30 kDefault, // default option (= 0)
31 kOob, // send or receive out-of-band data
32 kPeek, // peek at incoming message (receive only)
33 kDontBlock // send/recv as much data as possible without blocking
34 };
35 */
36
37MTcpIpO::MTcpIpO(Int_t tx)
38{
39 fTxSocket = new TSocket("ceco", tx);
40}
41
42MTcpIpO::~MTcpIpO()
43{
44 // Now delete all TCP/IP objects
45 delete fTxSocket;
46}
47
48MTcpIpIO::MTcpIpIO(Int_t tx, Int_t rx) : MTcpIpI(rx), MTcpIpO(tx)
49{
50 RunThread();
51}
52
53MTcpIpIO::~MTcpIpIO()
54{
55}
56
57bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
58{
59 if (!tx.IsValid())
60 {
61 //cout << "*!* Transmit socket invalid!" << endl;
62 return false;
63 }
64
65 const Int_t l = tx.SendRaw(msg, len);
66 if (l<0)
67 {
68 cout << "ERROR - Sending Message" << endl;
69 return false;
70 }
71
72 if (l!=len)
73 {
74 cout << "Send wrong number (" << l << ") of Bytes." << endl;
75 return false;
76 }
77
78#ifdef DEBUG
79 cout << "Tx: " << msg << flush;
80#endif
81
82 return true;
83}
84
85bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
86{
87 // R__LOCKGUARD2(myMutex);
88
89 cout << "Connecting to " << addr << ":" << port << endl;
90
91 // FIXME: Set tx-socket to nonblocking?
92 TSocket tx(addr, port);
93 // return SendFrame(tx, msg, len);
94
95 if (!tx.IsValid())
96 {
97 //cout << "*!* Transmit socket invalid!" << endl;
98 return false;
99 }
100
101 cout << "Sending to " << addr << ":" << port << endl;
102
103 const Int_t l = tx.SendRaw(msg, len, kDontBlock);
104 if (l<0)
105 {
106 cout << "ERROR - Sending Message" << endl;
107 return false;
108 }
109
110 if (l!=len)
111 {
112 cout << "Send wrong number (" << l << ") of Bytes." << endl;
113 return false;
114 }
115
116#ifdef DEBUG
117 cout << "Tx: " << msg << flush;
118#endif
119
120 return true;
121}
122
123bool MTcpIpO::Send(const char *msg, Int_t len)
124{
125 return SendFrame(*fTxSocket, msg, len);
126}
127
128bool MTcpIpIO::InterpreteStr(TString str)
129{
130 cout << "Rx: " << str << flush;
131 return true;
132}
133
134void MTcpIpIO::ReadSocket(TSocket &rx)
135{
136 // Clear buffer!
137 char c;
138// while (fRxSocket->RecvRaw(&c, 1)>0 && !HasStopFlag())
139 while (rx.RecvRaw(&c, 1)>0 && !IsThreadCanceled())
140 usleep(1);
141
142 TString str;
143 // while (!HasStopFlag())
144 while (!IsThreadCanceled())
145 {
146 char c;
147 const Int_t len = rx.RecvRaw(&c, 1);
148
149 // No data received (non-blocking mode)
150 if (len<0)
151 {
152 usleep(1);
153 continue;
154 }
155
156 // Data received with zero length!
157 if (len==0)
158 {
159 // THIS MEANS CONNECTIION LOST!!!!
160 cout << "============> len==0 (CONNECTION LOST?)" << endl;
161 break; // This break is for TEST PURPOSE FIXME!!!
162 continue;
163 }
164
165 // Data received
166 if (len>1)
167 {
168 cout << "Data too long!!!" << endl;
169 break;
170 }
171
172 // Data received (len==1)
173 if (c!='\n')
174 {
175 str += c;
176 continue;
177 }
178
179 // String completed
180 InterpreteStr(str);
181 str = "";
182 }
183}
184
185//void *MTcpIpIO::Thread()
186Int_t MTcpIpI::Thread()
187{
188 gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
189
190 TServerSocket *fServSock=NULL;
191 TSocket *fRxSocket=NULL;
192
193// while (!HasStopFlag())
194 while (!IsThreadCanceled())
195 {
196 fServSock=new TServerSocket(fPortRx, kTRUE);
197 if (!fServSock->IsValid())
198 {
199 cout << "ServerSocket on port " << fPortRx << " invalid: ";
200 switch (fServSock->GetErrorCode())
201 {
202 case 0: cout << "No error." << endl; break;
203 case -1: cout << "low level socket() call failed." << endl; break;
204 case -2: cout << "low level bind() call failed." << endl; break;
205 case -3: cout << "low level listen() call failed." << endl; break;
206 default: cout << "Unknown." << endl; break;
207 }
208 delete fServSock;
209 fServSock=NULL;
210 MyThreadX::Sleep(5000000);
211 continue;
212 }
213
214 fServSock->SetOption(kNoBlock, 1);
215
216 cout << "Waiting for connection on port " << fPortRx << "..." << endl;
217// while (!HasStopFlag() && (Long_t)fRxSocket<=0)
218 while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
219 {
220 //TThread::CancelPoint();
221
222 fRxSocket = fServSock->Accept();
223 if (fRxSocket==0)
224 cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
225 usleep(10);
226 }
227
228 // Can happen in case of HasStopFlag()
229 if (fRxSocket==(void*)-1)
230 fRxSocket=NULL;
231
232 if (fRxSocket==NULL)
233 {
234 delete fServSock;
235 fServSock=NULL;
236 continue;
237 }
238
239 if (!fRxSocket->IsValid())
240 {
241 cout << "TSocket invalid on port " << fPortRx << "." << endl;
242 delete fServSock;
243 delete fRxSocket;
244 fServSock = NULL;
245 fRxSocket = NULL;
246 continue;
247 }
248
249 cout << "Connection established on port " << fPortRx << "." << endl;
250
251 fRxSocket->SetOption(kNoBlock, 1);
252
253 // ------ IDENTICAL UP TO HERE ------
254
255 ReadSocket(*fRxSocket);
256
257 delete fServSock;
258 delete fRxSocket;
259 fServSock = NULL;
260 fRxSocket = NULL;
261 }
262
263 gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
264
265 return 0;
266// return NULL;
267}
Note: See TracBrowser for help on using the repository browser.