Stopped using obsolete -v option to paradynd
[dyninst.git] / pdutil / src / rpcUtil.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 /*
43  * $Log: rpcUtil.C,v $
44  * Revision 1.59  1998/03/01 03:24:43  ssuen
45  * Stopped using obsolete -v option to paradynd
46  *
47  * Revision 1.58  1998/02/02 23:12:07  wylie
48  * Conditional use of NT-specific WSAETIMEDOUT instead of ETIMEDOUT timeout error.
49  *
50  * Revision 1.57  1998/01/30 18:29:56  ssuen
51  * Fixed front-end memory leak.  Paradynds retry if connection with front-end
52  * fails.
53  *
54  * Revision 1.56  1997/10/23 16:02:08  nash
55  * Removed previous changes for Linux.  A better way to include the '-F' command
56  * line argument for 'rsh' is to put a script in the PATH.
57  *
58  * Revision 1.55  1997/10/17 00:26:59  nash
59  * Added "-F" option to execlp(rsh...) from rshCommand (temporarily?) for the Linux development machines to work.
60  *
61  * Revision 1.54  1997/06/16 18:45:31  tamches
62  * delete rpcBuffers[j];   to remove a memory leak
63  *
64  * Revision 1.53  1997/05/30 16:29:26  hseom
65  * fixed some memory-related bug
66  *
67  * Revision 1.52  1997/05/23 22:59:18  mjrg
68  * Windows NT port
69  *
70  * Revision 1.51  1997/05/20 15:18:57  lzheng
71  * Changes made to handle the comminication between different types of
72  * machines
73  *
74  * Revision 1.50  1997/05/17 20:01:30  lzheng
75  * Changes made for nonblocking write
76  *
77  * Revision 1.49  1997/04/21 16:57:56  hseom
78  * added support for trace data (in a byte array)
79  *
80  * Revision 1.48  1997/02/26 23:49:54  mjrg
81  * First part of WindowsNT commit: changes for compiling with VisualC++;
82  * moved includes to platform header files
83  *
84  * Revision 1.47  1997/01/21 20:09:59  mjrg
85  * Added support for unix domain sockets.
86  * Added getHostName function
87  *
88  * Revision 1.46  1997/01/16 20:52:21  tamches
89  * removed RPC_undo_arg_list (to paradynd)
90  *
91  * Revision 1.45  1996/11/26 16:09:37  naim
92  * Fixing asserts - naim
93  *
94  * Revision 1.44  1996/11/12 17:50:16  mjrg
95  * Removed warnings, changes for compiling with Visual C++ and xlc
96  *
97  * Revision 1.43  1996/08/16 21:32:02  tamches
98  * updated copyright for release 1.1
99  *
100  * Revision 1.42  1996/05/31 23:43:10  tamches
101  * removed pid from XDRrpc.  Modified handleRemoteConnect appropriately.
102  * Now paradynd doesn't try to send pid to paradyn (it wasn't being used
103  * and was one cause of paradyn UI freezing on UI startup).
104  *
105  * Revision 1.41  1995/11/22 00:06:20  mjrg
106  * Updates for paradyndPVM on solaris
107  * Fixed problem with wrong daemon getting connection to paradyn
108  * Removed -f and -t arguments to paradyn
109  * Added cleanUpAndExit to clean up and exit from pvm before we exit paradynd
110  * Fixed bug in my previous commit
111  *
112  * Revision 1.40  1995/11/12 00:44:28  newhall
113  * fix to execCmd: forked process closes it's copy of parent's file descriptors
114  *
115  * Revision 1.39  1995/08/24  15:14:29  hollings
116  * AIX/SP-2 port (including option for split instruction/data heaps)
117  * Tracing of rexec (correctly spawns a paradynd if needed)
118  * Added rtinst function to read getrusage stats (can now be used in metrics)
119  * Critical Path
120  * Improved Error reporting in MDL sematic checks
121  * Fixed MDL Function call statement
122  * Fixed bugs in TK usage (strings passed where UID expected)
123  *
124  * Revision 1.38  1995/05/18  11:12:15  markc
125  * Added flavor arg to RPC_undo_g_list
126  *
127 */
128
129 //
130 // This file defines a set of utility routines for RPC services.
131 //
132 //
133
134 // overcome malloc redefinition due to /usr/include/rpc/types.h declaring 
135 // malloc 
136 // This is ugly, and I hope to get rid of it -- mdc 2/2/95
137 #if defined(notdef)
138 /* prevents malloc from being redefined */
139 #ifdef MIPS
140 #define MALLOC_DEFINED_AS_VOID
141 #endif
142 #endif
143
144 #include <limits.h>
145 #include "util/h/rpcUtil.h"
146
147 const char *RSH_COMMAND="rsh";
148
149 int RPCdefaultXDRRead(const void* handle, char *buf, const u_int len)
150 {
151     int fd = (int) handle;
152     int ret;
153
154 #if defined(i386_unknown_nt4_0)
155     ret = recv(fd, buf, len, 0);
156 #else
157     do {
158         ret = P_read(fd, buf, len);
159     } while (ret < 0 && errno == EINTR);
160 #endif
161
162     if (ret <= 0) { return(-1); }
163     return (ret);
164 }
165
166 // one counter per file descriptor to record the sequnce no the message
167 // to be received
168 vector<int> counter_2;
169
170 // One partial message record for each file descriptor
171 vector<rpcBuffer *> partialMsgs;
172
173 int RPCasyncXDRRead(const void* handle, char *buf, const u_int len)
174 {
175     /* called when paradyn/xdr detects that it needs to read a message
176        from paradynd. */
177     int fd = (int) handle;
178     unsigned header;
179     int ret;
180     int needCopy = 0;
181     char *buffer = buf;
182     u_int internal_len = 0;
183     bool newfd = true;
184     unsigned i;
185
186     for (i = 0; i< partialMsgs.size(); i++) {
187         if (partialMsgs[i] -> fd == fd) {
188             newfd = false;
189             break;
190         }
191     }
192     
193     // If new connection, allocate a partial message record
194     if (newfd) {
195         rpcBuffer *partial = new rpcBuffer;
196         partial -> fd  = fd;
197         partial -> len = 0;
198         partial -> buf = NULL;
199         partialMsgs += partial;
200         i = partialMsgs.size() - 1;
201         
202         counter_2 += 0;
203     }
204
205     // There're two situations dealt with here: with partial message
206     // left by previous read, without. The 'len' field is zero if it is
207     // the first case, nonzero if it is the second case    
208     if (partialMsgs[i] -> len) needCopy = 1;
209
210     // Allocate buffer which is four more bytes than RPC buffer for the
211     // convenience of processing message headers.
212     buffer = new char[len + sizeof(int)];
213
214     if (needCopy) {
215         P_memcpy(buffer, partialMsgs[i]->buf , partialMsgs[i]->len); 
216         delete [] partialMsgs[i]->buf;
217         partialMsgs[i]->buf = NULL;
218     }
219
220 #if defined(i386_unknown_nt4_0)
221     ret = recv(fd, buffer+partialMsgs[i]->len,
222                len + sizeof(int) - partialMsgs[i]->len, 0);
223 #else
224     do {
225         ret = P_read(fd, buffer+partialMsgs[i]->len, 
226                      len + sizeof(int) -partialMsgs[i]->len);
227     } while (ret < 0 && errno == EINTR);
228 #endif
229
230     if (ret <= 0) { return(-1); }
231
232     ret += partialMsgs[i]->len;
233
234     char *pstart = buffer;
235     
236     // Processing the messages received: remove those message headers;
237     // check on deliminator and sequence no to ensure the correctness;
238     // save the partial message if any
239     char *tail = buffer;
240     int is_left = ret - sizeof(int);
241     while (is_left >= 0) {
242         P_memcpy((char *)&header, buffer, sizeof(int));
243 //printf(">> header=%x, header1=%x, len=%x\n", header, ntohl(header));
244         header = ntohl(header);
245         assert(0xf == ((header >> 12)&0xf));
246
247         short seq_no;
248         P_memcpy((char *)&(seq_no), buffer, sizeof(short));
249         seq_no = ntohs(seq_no);
250         header = (0x0fff&header);
251         
252         if (header <= is_left) {
253             P_memcpy(tail, buffer+sizeof(int), header);
254
255             counter_2[i] = ((counter_2[i]+1)%SHRT_MAX);
256             assert(seq_no == counter_2[i]);
257
258             internal_len += header;
259             if (internal_len > len) {
260                 abort();
261                 ret = ret - sizeof(int) - is_left;
262                 break;
263             }
264             tail += header;
265             buffer += header + sizeof(int);
266
267             is_left = is_left - header - sizeof(int);
268             ret -= sizeof(int);
269         } else {
270             ret = ret - sizeof(int) - is_left;
271             break;
272         }
273     }
274
275
276
277     if (is_left >= 0) {
278         
279         partialMsgs[i]->len = is_left + sizeof(int);
280         partialMsgs[i]->buf = new char[partialMsgs[i]->len+1];
281         P_memcpy(partialMsgs[i]->buf, buffer, partialMsgs[i]->len); 
282     } else {
283         partialMsgs[i]->len = 0;
284         assert(is_left == (0-(int)sizeof(int)));
285     }
286
287     // Copy back to internal RPC buffer
288     P_memcpy(buf, pstart, ret);
289
290 //  if (needCopy) {
291 //    buffer = pstart;
292 //    assert(buffer != buf); // make sure we aren't deleting the 2d param
293 //    delete [] buffer;
294 //  }
295
296     if (pstart != buf) delete [] pstart;
297     
298     return (ret);
299 }
300
301 int RPCdefaultXDRWrite(const void* handle, const char *buf, const u_int len)
302 {
303     int fd = (int) handle;
304     int ret;
305
306 #if defined(i386_unknown_nt4_0)
307     ret = send(fd, buf, len, 0);
308 #else
309     do {
310
311         ret = P_write(fd, buf, len);
312     } while (ret < 0 && errno == EINTR);
313 #endif
314
315     errno = 0;
316     if (ret != (int)len)
317       return(-1);
318     else 
319       return (ret);
320 }
321
322 vector<rpcBuffer *> rpcBuffers;
323 static short counter = 0;
324
325 int RPCasyncXDRWrite(const void* handle, const char *buf, const u_int len)
326 {
327     int ret;
328     int index = 0;
329     unsigned header = len;
330
331     rpcBuffer *rb = new rpcBuffer;
332     rb -> fd = (int) handle;
333     rb -> len = len+sizeof(int);
334     rb -> buf = new char[rb -> len];
335
336     counter = ((counter+1)%SHRT_MAX);
337
338     assert(len <= 4040);
339
340     // Adding a header to the messages sent   
341     header = ((counter << 16) | header | 0xf000); 
342
343     // Converting to network order
344     header = htonl(header);
345 //printf(">> header=%x, counter=%x, len=%x\n", header, counter, len);
346     
347     P_memcpy(rb -> buf, (char *)&header, sizeof(int));
348     P_memcpy(rb -> buf+sizeof(int), buf, len);
349
350     rpcBuffers += rb;
351
352     // Write the items to the other end if possible with asynchrous write
353     for (int i = 0; (i < (int)rpcBuffers.size()); i++) {
354
355 #if defined(i386_unknown_nt4_0)
356         u_long ioctlsock_arg = 1; // set non-blocking
357         if (ioctlsocket(rpcBuffers[i]->fd, FIONBIO, &ioctlsock_arg) == -1)
358             perror("ioctlsocket");
359
360         ret = (int) send(rpcBuffers[i]->fd, rpcBuffers[i]->buf, 
361                          rpcBuffers[i]->len, 0);
362
363         ioctlsock_arg = 0; // set blocking
364         if (ioctlsocket(rpcBuffers[i]->fd, FIONBIO, &ioctlsock_arg) == -1)
365             perror("ioctlsocket");
366 #else
367         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FNONBLOCK) == -1)
368             perror("fcntl");
369
370         ret = (int)P_write(rpcBuffers[i]->fd, rpcBuffers[i]->buf,
371                            rpcBuffers[i]->len);
372
373         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FSYNC) == -1)
374             perror("fcntl");
375 #endif
376
377         if (rpcBuffers[i]->len != ret) {
378
379             if (ret != -1) {
380                 assert(ret < rpcBuffers[i]->len);
381                 printf("Warning: a partial message sent!\n");
382                 P_memcpy(rpcBuffers[i]->buf, rpcBuffers[i]->buf+ret,
383                          rpcBuffers[i]->len-ret);
384                 rpcBuffers[i]->len -= ret;
385             }
386             break;
387         }
388         index++;
389         
390     }
391     
392     // Delete the items sent out
393     unsigned j;
394     for (j = 0; j < (unsigned)index; j++) {
395         delete [] rpcBuffers[j]->buf;
396         delete rpcBuffers[j];
397         rpcBuffers[j] = NULL; // probably unnecessary
398     }    
399     
400     for (j = 0; j < rpcBuffers.size() - index; j++)
401         rpcBuffers[j] = rpcBuffers[j+index];
402     rpcBuffers.resize(rpcBuffers.size() - index);
403
404     return (ret = (int)len);
405 }
406
407 void
408 doDeferedRPCasyncXDRWrite() {
409
410     if (rpcBuffers.size() == 0)
411         return;
412
413     int ret, index = 0;
414
415     // Write the items to the other end if possible 
416     for (int i = 0; (i < (int)rpcBuffers.size()); i++) {
417
418 #if defined (i386_unknown_nt4_0)
419         u_long ioctlsock_arg = 1; // set non-blocking
420         if (ioctlsocket(rpcBuffers[i]->fd, FIONBIO, &ioctlsock_arg) == -1)
421             perror("ioctlsocket");
422
423         ret = (int) send(rpcBuffers[i]->fd, rpcBuffers[i]->buf,
424                          rpcBuffers[i]->len, 0);
425
426         ioctlsock_arg = 0; // set blocking
427         if (ioctlsocket(rpcBuffers[i]->fd, FIONBIO, &ioctlsock_arg) == -1)
428             perror("ioctlsocket");
429 #else
430         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FNONBLOCK) == -1)
431             perror("fcntl");
432
433         ret = (int)P_write(rpcBuffers[i]->fd, rpcBuffers[i]->buf,
434                            rpcBuffers[i]->len);
435
436         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FSYNC) == -1)
437             perror("fcntl");
438 #endif
439         
440         if (rpcBuffers[i]->len != ret) {
441
442             if (ret != -1) {
443                 assert(ret < rpcBuffers[i]->len);
444                 printf("Warning: a partial message sent!\n");
445                 P_memcpy(rpcBuffers[i]->buf, rpcBuffers[i]->buf+ret,
446                          rpcBuffers[i]->len-ret);
447                 rpcBuffers[i]->len -= ret;
448             }
449             break;
450         }
451         index++;
452
453     }
454     
455     // Delete the items sent out
456     unsigned j;
457     for (j = 0; j < (unsigned)index; j++) {
458         delete [] rpcBuffers[j]->buf;
459     }    
460
461     for (j = 0; j < rpcBuffers.size() - index; j++)
462         rpcBuffers[j] = rpcBuffers[j+index];
463     rpcBuffers.resize(rpcBuffers.size() - index);
464 }
465
466 XDRrpc::~XDRrpc()
467 {
468   if (fd >= 0)
469     {
470 #if !defined(i386_unknown_nt4_0)
471       P_fcntl (fd, F_SETFL, FNDELAY);
472 #endif
473       P_close(fd);
474       fd = -1;
475     }
476   if (xdrs) 
477     {
478       P_xdr_destroy (xdrs);
479       delete (xdrs);
480       xdrs = NULL;
481     }
482 }
483
484 //
485 // prepare for RPC's to be done/received on the passed fd.
486 //
487 XDRrpc::XDRrpc(const int f, xdr_rd_func readRoutine, xdr_wr_func writeRoutine, const int nblock)
488 : xdrs(NULL), fd(f)
489 {
490     assert(fd >= 0);
491     xdrs = new XDR;
492     assert(xdrs);
493     if (!readRoutine) {
494         if (nblock == 1) {
495             readRoutine = (xdr_rd_func) RPCasyncXDRRead;
496         } else { 
497             readRoutine = (xdr_rd_func) RPCdefaultXDRRead;
498         }
499     }   
500     if (!writeRoutine) {
501         if (nblock == 2) {
502             writeRoutine = (xdr_wr_func)RPCasyncXDRWrite;
503         } else {    
504             writeRoutine = (xdr_wr_func) RPCdefaultXDRWrite;
505         }
506     }   
507     P_xdrrec_create(xdrs, 0, 0, (char *) fd, readRoutine, writeRoutine);
508 }
509
510 //
511 // prepare for RPC's to be done/received on the passed fd.
512 //
513 XDRrpc::XDRrpc(const string &machine,
514                const string &user,
515                const string &program,
516                xdr_rd_func readRoutine, 
517                xdr_wr_func writeRoutine,
518                const vector<string> &arg_list,
519                const int nblock,
520                const int wellKnownPortFd)
521 : xdrs(NULL), fd(-1)
522 {
523     fd = RPCprocessCreate(machine, user, program, arg_list, wellKnownPortFd);
524     if (fd >= 0) {
525         xdrs = new XDR;
526         if (!readRoutine) {
527             if (nblock == 1) {
528                 readRoutine = (xdr_rd_func) RPCasyncXDRRead;
529             } else { 
530                 readRoutine = (xdr_rd_func) RPCdefaultXDRRead;
531             }
532         }
533         if (!writeRoutine) {
534             if (nblock == 2) {
535                 writeRoutine = (xdr_wr_func)RPCasyncXDRWrite;
536             } else { 
537                 writeRoutine = (xdr_wr_func) RPCdefaultXDRWrite;
538             }
539         }
540         P_xdrrec_create(xdrs, 0, 0, (char *) fd, readRoutine, writeRoutine);
541     }
542 }
543
544 bool
545 RPC_readReady (int fd, int timeout)
546 {
547   fd_set readfds;
548   struct timeval tvptr, *the_tv;
549
550   tvptr.tv_sec = timeout; tvptr.tv_usec = 0;
551   if (fd < 0) return false;
552   FD_ZERO(&readfds);
553   FD_SET (fd, &readfds);
554
555   // -1 timeout = blocking select
556   if (timeout == -1)
557      the_tv = 0;
558   else
559      the_tv = &tvptr;
560  
561   if (P_select (fd+1, &readfds, NULL, NULL, the_tv) == -1)
562     {
563       // if (errno == EBADF)
564         return false;
565     }
566   return (FD_ISSET (fd, &readfds));
567 }
568
569 /*
570  * Build an argument list starting at position 0.
571  * Note, this arg list will be used in an exec system call
572  * AND, the command name will have to be inserted at the head of the list
573  * But, a NULL space will NOT be left at the head of the list
574  */
575 bool RPC_make_arg_list(vector<string> &list,
576                        const int well_known_socket, const int flag,
577                        const int /* firstPVM */,
578                        const string machine_name, const bool use_machine)
579 {
580   char arg_str[100];
581
582   list.resize(0);
583
584   sprintf(arg_str, "%s%d", "-p", well_known_socket);  
585   list += arg_str;
586   // arg_list[arg_count++] = strdup (arg_str);  // 0
587
588   if (!use_machine) {
589     list += string("-m") + getHostName();
590   } else {
591     list += string("-m") + machine_name;
592   }
593   // arg_list[arg_count++] = strdup (arg_str); // 3
594
595   sprintf(arg_str, "%s%d", "-l", flag);
596   list += arg_str;
597   //arg_list[arg_count++] = strdup (arg_str);  // 4
598
599   //-v option to paradynd is obsolete
600   //sprintf(arg_str, "%s%d", "-v", firstPVM);
601   //list += arg_str;
602   // arg_list[arg_count++] = strdup (arg_str);  // 5 
603
604   return true;
605 }
606
607 // returns fd of socket that is listened on, or -1
608 // (actually, looks like it returns the port number listened on, or -1)
609 int
610 RPC_setup_socket (int &sfd,   // return file descriptor
611                   const int family, // AF_INET ...
612                   const int type)   // SOCK_STREAM ...
613 {
614   if ((sfd = P_socket(family, type, 0)) < 0)
615     return -1;
616
617   struct sockaddr_in serv_addr;
618   P_memset ((void*) &serv_addr, 0, sizeof(serv_addr));
619   serv_addr.sin_family = (short) family;
620   serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
621   serv_addr.sin_port = htons(0);
622   
623   size_t length = sizeof(serv_addr);
624
625   if (P_bind(sfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
626     return -1;
627
628   if (P_getsockname (sfd, (struct sockaddr *) &serv_addr, &length) < 0)
629     return -1;
630
631   if (P_listen(sfd, 128) < 0)  //Be prepared for lots of simultaneous connects
632     return -1;
633
634   return (ntohs (serv_addr.sin_port));
635 }
636
637 // setup a AF_UNIX domain socket of type SOCK_STREAM
638 bool
639 RPC_setup_socket_un (int &sfd,   // return file descriptor
640                      const char *path) // file path socket is bound to
641 {
642 #if defined(i386_unknown_nt4_0)
643   assert(0); // no unix domain sockets on Windows NT
644 #else
645   struct sockaddr_un saddr;
646
647   if ((sfd = P_socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
648     return false;
649
650   saddr.sun_family = AF_UNIX;
651   P_strcpy(saddr.sun_path, path);
652   
653   if (P_bind(sfd, (struct sockaddr *) &saddr, sizeof(saddr)) < 0)
654     return false;
655
656   if (P_listen(sfd, 128) < 0)  //Be prepared for lots of simultaneous connects
657     return false;
658 #endif
659
660   return true;
661 }
662
663 //
664 // connect to well known socket
665 //
666 XDRrpc::XDRrpc(int family,            
667                int req_port,             
668                int type,
669                const string machine, 
670                xdr_rd_func readRoutine,
671                xdr_wr_func writeRoutine,
672                const int nblock) : xdrs(NULL), fd(-1)
673      // socket, connect using machine
674 {
675   struct sockaddr_in serv_addr;
676   struct hostent *hostptr = 0;
677   struct in_addr *inadr = 0;
678   if (!(hostptr = P_gethostbyname(machine.string_of())))
679     return;
680
681   inadr = (struct in_addr *) ((void*) hostptr->h_addr_list[0]);
682   P_memset ((void*) &serv_addr, 0, sizeof(serv_addr));
683   serv_addr.sin_family = family;
684   serv_addr.sin_addr = *inadr;
685   serv_addr.sin_port = htons(req_port);
686
687   if ( (fd = P_socket(family, type, 0)) < 0)
688     { fd = -1; return; }
689
690   //connect() may timeout if lots of Paradynd's are trying to connect to
691   //  Paradyn at the same time, so we keep retrying the connect().
692   errno = 0;
693   while (P_connect(fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
694 #if defined(i386_unknown_nt4_0)
695     if (errno != WSAETIMEDOUT)
696 #else
697     if (errno != ETIMEDOUT)
698 #endif
699         { fd = -1; return; } 
700     close(fd);
701     if ((fd = P_socket(family, type, 0)) < 0) { fd = -1; return; }
702     errno = 0;
703   }
704
705     xdrs = new XDR;
706     assert(xdrs);
707     if (!readRoutine) {
708         if (nblock == 1) {
709             readRoutine = (xdr_rd_func) RPCasyncXDRRead;
710         } else {  
711             readRoutine = (xdr_rd_func) RPCdefaultXDRRead;
712         }
713         }
714     if (!writeRoutine) {
715         if (nblock == 2) {
716             writeRoutine = (xdr_wr_func)RPCasyncXDRWrite;
717         } else { 
718             writeRoutine = (xdr_wr_func) RPCdefaultXDRWrite;
719         }
720     }
721     P_xdrrec_create(xdrs, 0, 0, (char *) fd, readRoutine,writeRoutine);
722
723
724 //
725 // These xdr functions are not to be called with XDR_FREE
726 //
727 bool_t xdr_Boolean(XDR *xdrs, bool *bool_val) {
728    u_char i;
729    bool_t res;
730    switch (xdrs->x_op) {
731    case XDR_ENCODE:
732       i = (u_char) *bool_val;
733       return (P_xdr_u_char(xdrs, &i));
734    case XDR_DECODE:
735       res = P_xdr_u_char(xdrs, &i);
736       *bool_val = (bool) i;
737       return res;
738    case XDR_FREE:
739    default:
740       assert(0);
741       return FALSE;
742    }
743 }
744
745 // XDR_FREE not handled, free it yourself!
746 // our version of string encoding that does malloc as needed.
747 //
748 bool_t xdr_string_pd(XDR *xdrs, string *str)
749 {
750   unsigned int length = 0;
751   assert(str);
752
753   // if XDR_FREE, str's memory is freed
754   switch (xdrs->x_op) {
755   case XDR_ENCODE:
756     if ((length = str->length())) {
757       if (!P_xdr_u_int(xdrs, &length))
758         return FALSE;
759       else {
760         char *buffer = (char*) str->string_of(); 
761         bool_t res = P_xdr_string (xdrs, &buffer, str->length() + 1);
762         return res;
763       }
764     } else {
765       return (P_xdr_u_int(xdrs, &length));
766     }
767   case XDR_DECODE:
768     if (!P_xdr_u_int(xdrs, &length))
769       return FALSE;
770      else if (!length) {
771        *str = (char*) NULL;
772        return TRUE;
773      } else {
774        char *temp; 
775        bool newd = false;
776        unsigned max_len = 511;
777        char stat_buf[512];
778        if (length < 512) 
779           temp = (char*) stat_buf;
780        else {
781           temp = new char[length+1];
782           max_len = length;
783           newd = true;
784        }      
785        if (!P_xdr_string (xdrs, &temp, max_len)) {
786           if (newd) 
787             delete temp;
788           return FALSE;
789        } else {
790           *str = temp;
791           if (newd)
792             delete temp; 
793           return TRUE;
794        }
795      }
796   case XDR_FREE:
797   default:
798     // this should never occur  
799     assert(0);
800     return (FALSE);
801   }
802 }
803
804 // trace data streams
805 // XDR_FREE not handled, free it yourself!
806 // our version of string encoding that does malloc as needed.
807 //
808
809 bool_t xdr_byteArray_pd(XDR *xdrs, byteArray *bArray)
810 {
811   unsigned int length = 0;
812   assert(bArray);
813
814   // if XDR_FREE, str's memory is freed
815   switch (xdrs->x_op) {
816   case XDR_ENCODE:
817     if (length = bArray->length()) {
818       if (!P_xdr_u_int(xdrs, &length))
819         return FALSE;
820       else {
821         char *buffer = (char*) bArray->getArray();
822         bool_t res = P_xdr_byteArray (xdrs, &buffer, &length, (bArray->length()));
823         return res;
824       }
825     } else {
826       return (P_xdr_u_int(xdrs, &length));
827     }
828   case XDR_DECODE:
829     if (!P_xdr_u_int(xdrs, &length))
830       return FALSE;
831      else if (!length) {
832        *bArray = byteArray( (char *)NULL , 0);
833        return TRUE;
834      } else {
835        char *temp;
836        unsigned int act_len;
837        unsigned int max_len = length;
838        temp = new char[length];
839        if (!P_xdr_byteArray (xdrs, &temp, &act_len, max_len)) {
840           delete [] temp;
841           return FALSE;
842        } else if (act_len != length) {
843           delete [] temp;
844           return FALSE;
845        } else {
846           *bArray = byteArray(temp,act_len);
847           delete [] temp;
848           return TRUE;
849        }
850      }
851   case XDR_FREE:
852   default:
853     // this should never occur
854     assert(0);
855     return (FALSE);
856   }
857 }
858
859 //
860 // directly exec the command (local).
861 //
862
863 int execCmd(const string command, const vector<string> &arg_list, int /*portFd*/)
864 {
865 #if defined(i386_unknown_nt4_0)
866   // TODO
867   assert(0);
868   return 0;
869 #else
870   int ret;
871   int sv[2];
872   int execlERROR;
873
874   errno = 0;
875   ret = P_socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
876   if (ret==-1) return(ret);
877   execlERROR = 0;
878   int al_len = arg_list.size();
879   char **new_al = new char*[al_len+2];
880   // TODO argv[0] should not include path
881   new_al[0] = P_strdup(command.string_of());
882   new_al[al_len+1] = NULL;
883   for (int i=0; i<al_len; ++i)
884     new_al[i+1] = P_strdup(arg_list[i].string_of());
885   ret = -1;
886
887   int pid;
888   pid = vfork();
889   // close sv[0] after exec 
890   P_fcntl(sv[0],F_SETFD,1);  
891
892   if (pid == 0) {
893     if (P_close(sv[0])) { execlERROR = errno; _exit(-1);}
894     if (P_dup2(sv[1], 0)) { execlERROR = errno; _exit(-1);}
895     P_execvp(new_al[0], new_al);
896     execlERROR = errno;
897     _exit(-1);
898   } else if (pid > 0 && !execlERROR) {
899     P_close(sv[1]);
900     ret = sv[0];
901   }
902
903   al_len=0;
904   while (new_al[al_len]) {
905     delete (new_al[al_len]);
906     al_len++;
907   }
908   delete(new_al);
909   return ret;
910 #endif
911 }
912
913 int handleRemoteConnect(int fd, int portFd) {
914     // NOTE: This routine is not generic; it is very specific to paradyn
915     // (due to the sscanf(line, "PARADYND %d")...)
916     // Hence it makes absolutely no sense to put it in a so-called library.
917     // It should be put into the paradyn/src tree --ari
918
919     FILE *pfp = P_fdopen(fd, "r");
920     if (pfp == NULL) {
921        cerr << "handleRemoteConnect: fdopen of fd " << fd << " failed." << endl;
922        return -1;
923     }
924
925     int retFd = RPC_getConnect(portFd); // calls accept(), which blocks (sigh...)
926     return retFd;
927 }
928
929 //
930 // use rsh to get a remote process started.
931 //
932 // We do an rsh to start a process and them wait for the process 
933 // to do a connect. There is no guarantee that the process we are waiting for is
934 // the one that gets the connection. This can happen with paradyndPVM: the
935 // daemon that is started by a rshCommand will start other daemons, and one of 
936 // these daemons may get the connection that should be for the first daemon.
937 // Daemons should always get a connection before attempting to start other daemons.
938 //
939
940 int rshCommand(const string hostName, const string userName, 
941                const string command, const vector<string> &arg_list, int portFd)
942 {
943 #if defined(i386_unknown_nt4_0)
944     // TODO
945     assert(0);
946     return 0;
947 #else
948     int fd[2];
949     int shellPid;
950     int ret;
951
952     int total = command.length() + 2;
953     for (unsigned i=0; i<arg_list.size(); i++) {
954       total += arg_list[i].length() + 2;
955     }
956
957     string paradyndCommand = command + " ";
958
959     for (unsigned j=0; j < arg_list.size(); j++)
960         paradyndCommand += arg_list[j] + " ";
961
962     // need to rsh to machine and setup io path.
963
964     if (pipe(fd)) {
965         perror("pipe");
966         return (-1);
967     }
968
969     shellPid = vfork();
970     if (shellPid == 0) {
971         /* child */
972         bool aflag;
973         aflag=(-1 != dup2(fd[1], 1)); /* copy it onto stdout */
974         assert(aflag);
975         aflag=(-1 != close(fd[0]));
976         assert(aflag);
977         aflag=(-1 != close(fd[1]));
978         assert(aflag);
979         if (userName.length()) {
980             ret = execlp(RSH_COMMAND, RSH_COMMAND, hostName.string_of(), "-l", 
981                          userName.string_of(), "-n", paradyndCommand.string_of(),
982                          "-l0", NULL);
983             fprintf(stderr,"rshCommand: execlp failed (ret = %d)\n",ret);
984         } else {
985             ret = execlp(RSH_COMMAND, RSH_COMMAND, hostName.string_of(), "-n", 
986                          paradyndCommand.string_of(), "-l0", NULL);
987             fprintf(stderr,"rshCommand: execlp failed (ret = %d)\n",ret);
988         }
989         _exit(-1);
990     } else if (shellPid > 0) {
991         close(fd[1]);
992     } else {
993         // error situation
994     }
995
996     return(handleRemoteConnect(fd[0], portFd));
997 #endif
998 }
999
1000 int rexecCommand(const string hostName, const string userName, 
1001                  const string command, const vector<string> &arg_list, int portFd)
1002 {
1003 #if defined(i386_unknown_nt4_0)
1004     // TODO
1005     assert(0);
1006     return 0;
1007 #else
1008     struct servent *inport;
1009
1010     int total = command.length() + 2;
1011     for (unsigned i=0; i<arg_list.size(); i++) {
1012       total += arg_list[i].length() + 2;
1013     }
1014
1015     char *paradyndCommand = new char[total+2];
1016     assert(paradyndCommand);
1017
1018     sprintf(paradyndCommand, "%s ", command.string_of());
1019     for (unsigned j=0; j<arg_list.size(); j++) {
1020         P_strcat(paradyndCommand, arg_list[j].string_of());
1021         P_strcat(paradyndCommand, " ");
1022     }
1023
1024     inport = P_getservbyname("exec",  "tcp");
1025
1026     char *hname = P_strdup(hostName.string_of());
1027     char *uname = P_strdup(userName.string_of());
1028     int fd = P_rexec(&hname, inport->s_port, uname, NULL,
1029                    paradyndCommand, NULL);
1030     delete hname; delete uname;
1031
1032     if (fd < 0) {
1033         perror("rexec");
1034         printf("rexec failed\n");
1035     }
1036     if (paradyndCommand)
1037       delete paradyndCommand;
1038
1039     return(handleRemoteConnect(fd, portFd));
1040 #endif
1041 }
1042
1043 /*
1044  * 
1045  * "command" will be appended to the front of the arg list
1046  *
1047  * if arg_list == NULL, command is the only argument used
1048  */
1049
1050 int RPCprocessCreate(const string hostName, const string userName,
1051                      const string command, const vector<string> &arg_list,
1052                      int portFd, const bool useRexec)
1053 {
1054     int ret;
1055
1056     if ((hostName == "") || 
1057         (hostName == "localhost") ||
1058         (hostName == getHostName()))
1059       ret = execCmd(command, arg_list, portFd);
1060     else if (useRexec)
1061       ret = rexecCommand(hostName, userName, command, arg_list, portFd);
1062     else
1063       ret = rshCommand(hostName, userName, command, arg_list, portFd);
1064
1065     return(ret);
1066 }
1067
1068 int RPC_getConnect(const int fd) {
1069   if (fd == -1)
1070     return -1;
1071
1072   struct sockaddr cli_addr;
1073   size_t clilen = sizeof(cli_addr);
1074   errno = 0;
1075   int new_fd = P_accept (fd, (struct sockaddr *) &cli_addr, &clilen);
1076
1077   if (new_fd < 0) {
1078     if (errno == EMFILE) {
1079       cerr << "Cannot accept more connections:  Too many open files" << endl;
1080       cerr << "Please see your documentation for `ulimit'" << endl << flush;
1081     }
1082     return -1;
1083   }
1084   else
1085     return new_fd;
1086 }
1087
1088 // TODO -- use vectors and strings ?
1089 /*
1090  *  RPCgetArg - break a string into blank separated words
1091  *  Used to parse a command line.
1092  *  
1093  *  input --> a null terminated string, the command line
1094  *  argc --> returns the number of args found
1095  *  returns --> words (separated by blanks on command line)
1096  *  Note --> this allocates memory 
1097  */
1098 bool RPCgetArg(vector<string> &arg, const char *input)
1099 {
1100 #define BLANK ' '
1101
1102   int word_len;
1103   if (!input) 
1104     return false;
1105
1106   int length = strlen(input);
1107   int index = 0;
1108
1109   /* advance past blanks in input */
1110   while ((index < length) && (input[index] == BLANK))
1111     index++;
1112
1113   /* input is all blanks, or NULL */
1114   if (index >= length) 
1115     return true;
1116
1117   do {
1118     /* the start of each string */
1119     word_len = 0; const char *word_start = input + index;
1120
1121     /* find the next BLANK and the WORD size */
1122     while ((index < length) && (input[index] != BLANK)) {
1123       index++; word_len++;
1124     }
1125
1126     /* copy the word */
1127     char *temp = new char[word_len+1];
1128     strncpy(temp, word_start, word_len);
1129     temp[word_len] = (char) 0;
1130     arg += temp;
1131     delete temp;
1132
1133     /* skip past consecutive blanks */
1134     while ((index < length) && (input[index] == BLANK))
1135       index++;
1136   } while (index < length);
1137   return true;
1138 }
1139
1140
1141 string getHostName() {
1142 #if defined(i386_unknown_nt4_0)
1143     char nameBuf[1000];
1144     gethostname(nameBuf,999);
1145     return string(nameBuf);
1146 #else
1147     struct utsname un;
1148     P_uname(&un);
1149     return string(un.nodename);
1150 #endif
1151 }