StRoot  1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
msgNQLib.cxx
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <sys/stat.h>
5 #include <fcntl.h>
6 #include <errno.h>
7 #include <string.h>
8 #include <poll.h>
9 #include <sys/socket.h>
10 #include <netinet/in.h>
11 #include <arpa/inet.h>
12 #include <netdb.h>
13 
14 #if defined(__linux__) || defined(__APPLE__)
15 
16 #endif
17 
18 #include <rtsLog.h>
19 
20 
21 #include "msgNQLib.h"
22 
23 
24 
25 // index is the desciptor - NOT the task
26 //static struct msgNQStruct desc[MSG_NQ_MAXQUEUES] ;
27 
28 
29 /*
30  Opens the pipe associated with a task and returns a standard file
31  desciptor if successfull or -1 if not.
32 
33  If the pipe didn't exist - it makes it first via mknod.
34 
35  If the second argument ("lock") is true it additionally places
36  an advisory lock on the pipe itself. This should only be done if the
37  caller will be the listener of the pipe!
38 
39  If another task has the same pipe open msgQCreate will return error.
40 
41 */
42 int msgNQCreate(char *host, int port, int msglen)
43 {
44  struct sockaddr_in me ;
45  int size, dsc ;
46  struct hostent *hostent ;
47  int optval ;
48  int ret ;
49 
50  size = sizeof(struct sockaddr_in) ;
51  memset((char *)&me,0,size) ;
52 
53 
54  me.sin_family = AF_INET ;
55  me.sin_port = htons(port) ;
56 
57 
58  hostent = gethostbyname(host) ;
59  if(hostent == NULL) {
60  LOG(CRIT,"Unknown host %s (%s)",host,strerror(errno),0,0,0) ;
61  return -1 ;
62  }
63 
64 
65  memcpy(&me.sin_addr.s_addr,*(hostent->h_addr_list),sizeof(me.sin_addr.s_addr)) ;
66 
67  errno = 0 ;
68  dsc = socket(AF_INET, SOCK_STREAM, 0) ;
69  if(dsc < 0) {
70  LOG(CRIT,"socket() failed [%s]",strerror(errno),0,0,0,0) ;
71  return -1 ;
72  }
73 
74  optval = 1 ;
75  setsockopt(dsc,SOL_SOCKET, SO_KEEPALIVE, (char *)&optval, sizeof(optval)) ;
76  setsockopt(dsc,SOL_SOCKET, SO_REUSEADDR, (char *)&optval, sizeof(optval)) ;
77 
78  errno = 0 ;
79  if(connect(dsc,(struct sockaddr *)&me,size) < 0) {
80  LOG(CRIT,"connect() to %s, port %d failed [%s]",host,port,strerror(errno),0,0) ;
81  close(dsc) ;
82  return -1 ;
83  }
84 
85 
86 #if defined(__linux__) || defined(__APPLE__)
87  LOG(DBG,"Before fcntl") ;
88  errno = 0 ;
89  ret = fcntl(dsc,F_SETFL, O_NONBLOCK) ;
90  if(ret < 0) {
91  LOG(CRIT,"fcntl() failed [%s]",strerror(errno),0,0,0,0) ;
92  close(dsc) ;
93  return -1 ;
94  }
95 
96 
97 #else
98  LOG(DBG,"Before fcntl...",0,0,0,0,0) ;
99  int modes ;
100  errno = 0 ;
101  ret = fcntl(dsc,F_GETFL,&modes) ;
102  if(ret < 0) {
103  LOG(CRIT,"fcntl() failed [%s]",strerror(errno),0,0,0,0) ;
104  close(dsc) ;
105  return -1 ;
106  }
107 
108  LOG(DBG,"Before fcntl 0x%X",modes,0,0,0,0) ;
109  errno = 0 ;
110  ret = fcntl(dsc,F_SETFL, modes|O_NONBLOCK) ;
111  if(ret < 0) {
112  LOG(CRIT,"fcntl() failed [%s]",strerror(errno),0,0,0,0) ;
113  close(dsc) ;
114  return -1 ;
115  }
116 #endif
117 
118  return dsc ;
119 }
120 
121 /*
122  Send a message over the desc with a timeout and a locking
123  check.
124 
125  Returns: positive integer (number of bytes written)
126  negative integer
127  -1 error
128  -2 timeout (MSG_Q_TIMEOUT)
129 
130 */
131 
132 int msgNQSend(int dsc, char *what, int size, int timeout, int prio)
133 {
134  int ret ;
135  struct pollfd pollstruct ;
136 
137 
138  if(timeout == WAIT_FOREVER) timeout = 100000000 ;
139 
140  pollstruct.fd = dsc ;
141  pollstruct.events = POLLOUT ;
142 
143  // override size
144  size = 120 ;
145 
146 
147  for(;;) {
148  if(msgNQCheck(dsc)==0) {
149  LOG(ERR,"Task %d not there...",dsc,0,0,0,0) ;
150  return MSG_Q_NOTASK ;
151  }
152 
153  errno = 0 ;
154  ret = write(dsc,what,size) ;
155  if(ret < 0) {
156  if(errno == EAGAIN) {
157  if(timeout) {
158 
159  errno = 0 ;
160  ret = poll(&pollstruct,1,1000) ; // 1 sec
161 
162  if((timeout % 10) == 0) {
163  LOG(DBG,"Unable to send to task %d in 10 seconds...",dsc,0,0,0,0) ;
164  }
165 
166  timeout-- ;
167 
168  if(ret >= 0) { // timeout or OK - retry
169  continue ;
170  }
171 
172  if(errno == EINTR) { // a signal was caught
173  continue ; // retry ;
174  }
175  LOG(ERR,"poll() returned (%s)",strerror(errno),0,0,0,0) ;
176  return MSG_Q_ERROR ;
177 
178  }
179  else break ;
180  }
181  else {
182  LOG(ERR,"Can't write to task (%s)",strerror(errno),0,0,0,0) ;
183  return MSG_Q_ERROR ;
184  }
185  }
186  else break ; // written something - bytes in "ret"
187  } ;
188 
189  if(ret < 0) return MSG_Q_TIMEOUT ;
190 
191  if(ret != size) {
192  LOG(ERR,"Bad size (%d != ret %d) in task %d (%s)",120,ret,dsc,strerror(errno),prio) ;
193  return MSG_Q_ERROR ; // oops
194  }
195 
196  return size ;
197 }
198 
199 /*
200  Usual receive.
201 */
202 int msgNQReceive(int dsc, char *where, int size, int timeout)
203 {
204  int ret ;
205  struct pollfd pollstruct ;
206 // int i ;
207 
208 
209 
210  if(timeout < 0) timeout = 100000000 ;
211 
212  pollstruct.fd = dsc ;
213 #if defined(__linux__) || defined(__APPLE__)
214  pollstruct.events = POLLIN | POLLPRI ;
215 #else
216  pollstruct.events = POLLIN | POLLRDNORM | POLLRDBAND | POLLPRI ;
217 #endif
218  // force size!
219  size = 120 ;
220 
221  for(;;) {
222  if(!msgNQCheck(dsc)) { // not locked?
223  LOG(ERR,"Task %d not there ...",dsc,0,0,0,0) ;
224  return MSG_Q_NOTASK ;
225  }
226 
227  LOG(DBG,"Before read %d",size,0,0,0,0) ;
228  errno = 0 ;
229  ret = read(dsc,where,size) ;
230  LOG(DBG,"After read %d, %d",ret,errno,0,0,0);
231 
232  if(ret < 0) {
233  if(errno == EAGAIN) { // non-blocking read
234  if(timeout) {
235 
236  errno = 0 ;
237  ret = poll(&pollstruct,1,1000) ;
238 
239  if((timeout % 10) == 0) {
240  LOG(DBG,"Unable to rcv. from task %d in 10 seconds...",dsc,0,0,0,0) ;
241  }
242 
243  timeout-- ;
244  if(ret >= 0) { // timeout or OK
245  continue ;
246  }
247 
248  if(errno == EINTR) { // a signal was caught
249  LOG(DBG,"Signal caught while in rcv. poll() from task %d...",dsc,0,0,0,0) ;
250  continue ; // retry ;
251  }
252  LOG(ERR,"poll() returned (%s)",strerror(errno),0,0,0,0) ;
253  return MSG_Q_ERROR ;
254 
255  }
256  else break ;
257 
258  }
259  else {
260  LOG(ERR,"Can't read from task %d (%s)",dsc,strerror(errno),0,0,0) ;
261  return MSG_Q_ERROR ;
262  }
263  }
264  break ;
265 
266  } ;
267 
268  if(ret < 0) return MSG_Q_TIMEOUT ; // timeout
269 
270  if(ret != size) {
271  LOG(ERR,"Read returned %d instead of %d - task %d",ret,size,dsc,0,0) ;
272  return MSG_Q_ERROR ;
273  }
274 
275 
276  return 120 ;
277 }
278 
279 
/*
 Releases a descriptor by calling close() on it.

 The result of close() is discarded; the function always returns 0.
*/
int msgNQDelete(int desc)
{
	(void) close(desc) ;

	return 0 ;
}
290 
291 
292 /*
293  Check for the existance of the task on a previously open
294  desciptor.
295  Returns TRUE if the other end is still alive or 0 if it isn't
296 */
297 int msgNQCheck(int dsc)
298 {
299  int optval ;
300  int ret ;
301 #if defined(__linux__) || defined(__APPLE__)
302  socklen_t size;
303 #else
304  int size ;
305 #endif
306 
307  if(dsc < 0) {
308  LOG(WARN,"No such NQueue %d",dsc,0,0,0,0) ;
309  return 0 ;
310  }
311 
312  size = sizeof(optval) ;
313 
314  // do something to the socket, doesn't matter what...
315  ret = getsockopt(dsc, SOL_SOCKET, SO_KEEPALIVE, (char *)&optval, &size) ;
316  if(ret < 0) {
317  LOG(ERR,"getsockopt() returned error for dsc %d [%s]",dsc,strerror(errno),0,0,0) ;
318  return 0 ; // dead
319  }
320 
321 
322  return 1 ; // OK
323 
324 }
325 
326 
327 
328