Changes made for nonblocking write
[dyninst.git] / pdutil / src / rpcUtil.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 /*
43  * $Log: rpcUtil.C,v $
44  * Revision 1.50  1997/05/17 20:01:30  lzheng
45  * Changes made for nonblocking write
46  *
47  * Revision 1.49  1997/04/21 16:57:56  hseom
48  * added support for trace data (in a byte array)
49  *
50  * Revision 1.48  1997/02/26 23:49:54  mjrg
51  * First part of WindowsNT commit: changes for compiling with VisualC++;
52  * moved includes to platform header files
53  *
54  * Revision 1.47  1997/01/21 20:09:59  mjrg
55  * Added support for unix domain sockets.
56  * Added getHostName function
57  *
58  * Revision 1.46  1997/01/16 20:52:21  tamches
59  * removed RPC_undo_arg_list (to paradynd)
60  *
61  * Revision 1.45  1996/11/26 16:09:37  naim
62  * Fixing asserts - naim
63  *
64  * Revision 1.44  1996/11/12 17:50:16  mjrg
65  * Removed warnings, changes for compiling with Visual C++ and xlc
66  *
67  * Revision 1.43  1996/08/16 21:32:02  tamches
68  * updated copyright for release 1.1
69  *
70  * Revision 1.42  1996/05/31 23:43:10  tamches
71  * removed pid from XDRrpc.  Modified handleRemoteConnect appropriately.
72  * Now paradynd doesn't try to send pid to paradyn (it wasn't being used
73  * and was one cause of paradyn UI freezing on UI startup).
74  *
75  * Revision 1.41  1995/11/22 00:06:20  mjrg
76  * Updates for paradyndPVM on solaris
77  * Fixed problem with wrong daemon getting connection to paradyn
78  * Removed -f and -t arguments to paradyn
79  * Added cleanUpAndExit to clean up and exit from pvm before we exit paradynd
80  * Fixed bug in my previous commit
81  *
82  * Revision 1.40  1995/11/12 00:44:28  newhall
83  * fix to execCmd: forked process closes it's copy of parent's file descriptors
84  *
85  * Revision 1.39  1995/08/24  15:14:29  hollings
86  * AIX/SP-2 port (including option for split instruction/data heaps)
87  * Tracing of rexec (correctly spawns a paradynd if needed)
88  * Added rtinst function to read getrusage stats (can now be used in metrics)
89  * Critical Path
90  * Improved Error reporting in MDL sematic checks
91  * Fixed MDL Function call statement
92  * Fixed bugs in TK usage (strings passed where UID expected)
93  *
94  * Revision 1.38  1995/05/18  11:12:15  markc
95  * Added flavor arg to RPC_undo_g_list
96  *
97 */
98
99 //
100 // This file defines a set of utility routines for RPC services.
101 //
102 //
103
104 // overcome malloc redefinition due to /usr/include/rpc/types.h declaring 
105 // malloc 
106 // This is ugly, and I hope to get rid of it -- mdc 2/2/95
107 #if defined(notdef)
108 /* prevents malloc from being redefined */
109 #ifdef MIPS
110 #define MALLOC_DEFINED_AS_VOID
111 #endif
112 #endif
113
114 #include <limits.h>
115 #include "util/h/rpcUtil.h"
116
117 const char *RSH_COMMAND="rsh";
118
119 int RPCdefaultXDRRead(const void* handle, char *buf, const u_int len)
120 {
121     int fd = (int) handle;
122     int ret;
123
124     do {
125         ret = P_read(fd, buf, len);
126     } while (ret < 0 && errno == EINTR);
127
128     if (ret <= 0) { return(-1); }
129     return (ret);
130 }
131
132 // one counter per file descriptor to record the sequnce no the message
133 // to be received
134 vector<int> counter_2;
135
136 // One partial message record for each file descriptor
137 vector<rpcBuffer *> partialMsgs;
138
139 int RPCasyncXDRRead(const void* handle, char *buf, const u_int len)
140 {
141     /* called when paradyn/xdr detects that it needs to read a message
142        from paradynd. */
143     int fd = (int) handle;
144     int header;
145     int ret;
146     int needCopy = 0;
147     char *buffer = buf;
148     u_int internal_len = 0;
149     bool newfd = true;
150     unsigned i;
151
152     for (i = 0; i< partialMsgs.size(); i++) {
153         if (partialMsgs[i] -> fd == fd) {
154             newfd = false;
155             break;
156         }
157     }
158     
159     // If new connection, allocate a partial message record
160     if (newfd) {
161         rpcBuffer *partial = new rpcBuffer;
162         partial -> fd  = fd;
163         partial -> len = 0;
164         partial -> buf = NULL;
165         partialMsgs += partial;
166         i = partialMsgs.size() - 1;
167         
168         counter_2 += 0;
169     }
170
171     // There're two situations dealt with here: with partial message
172     // left by previous read, without. The 'len' field is zero if it is
173     // the first case, nonzero if it is the second case    
174     if (partialMsgs[i] -> len) needCopy = 1;
175
176     // Allocate buffer which is four more bytes than RPC buffer for the
177     // convenience of processing message headers.
178     buffer = new char[len + sizeof(int)];
179
180     if (needCopy) {
181         P_memcpy(buffer, partialMsgs[i]->buf , partialMsgs[i]->len); 
182         delete [] partialMsgs[i]->buf;
183         partialMsgs[i]->buf = NULL;
184     }
185
186     do {
187         ret = P_read(fd, buffer+partialMsgs[i]->len, 
188                      len + sizeof(int) -partialMsgs[i]->len);
189     } while (ret < 0 && errno == EINTR);
190
191     if (ret <= 0) { return(-1); }
192
193     ret += partialMsgs[i]->len;
194
195     char *pstart = buffer;
196     
197     // Processing the messages received: remove those message headers;
198     // check on deliminator and sequence no to ensure the correctness;
199     // save the partial message if any
200     char *tail = buffer;
201     int is_left = ret - sizeof(int);
202     while (is_left >= 0) {
203         P_memcpy((char *)&header, buffer, sizeof(int));
204         assert(0xf == ((header >> 12)&0xf));
205
206         short seq_no;
207         P_memcpy((char *)&(seq_no), buffer, sizeof(short));
208
209         header = (0x0fff&header);
210         
211         if (header <= is_left) {
212             P_memcpy(tail, buffer+sizeof(int), header);
213
214             counter_2[i] = ((counter_2[i]+1)%SHRT_MAX);
215             assert(seq_no == counter_2[i]);
216
217             internal_len += header;
218             if (internal_len > len) {
219                 abort();
220                 ret = ret - sizeof(int) - is_left;
221                 break;
222             }
223             tail += header;
224             buffer += header + sizeof(int);
225
226             is_left = is_left - header - sizeof(int);
227             ret -= sizeof(int);
228         } else {
229             ret = ret - sizeof(int) - is_left;
230             break;
231         }
232     }
233
234
235
236     if (is_left >= 0) {
237         
238         partialMsgs[i]->len = is_left + sizeof(int);
239         partialMsgs[i]->buf = new char[partialMsgs[i]->len+1];
240         P_memcpy(partialMsgs[i]->buf, buffer, partialMsgs[i]->len); 
241     } else {
242         partialMsgs[i]->len = 0;
243         assert(is_left == (0-(int)sizeof(int)));
244     }
245
246     // Copy back to internal RPC buffer
247     P_memcpy(buf, pstart, ret);
248
249     if (needCopy) {
250         buffer = pstart;
251         assert(buffer != buf); // make sure we aren't deleting the 2d param
252         delete [] buffer;
253     }
254     
255     return (ret);
256 }
257
258 int RPCdefaultXDRWrite(const void* handle, const char *buf, const u_int len)
259 {
260     int fd = (int) handle;
261     int ret;
262
263     do {
264
265         ret = P_write(fd, buf, len);
266     } while (ret < 0 && errno == EINTR);
267
268     errno = 0;
269     if (ret != (int)len)
270       return(-1);
271     else 
272       return (ret);
273 }
274
275 vector<rpcBuffer *> rpcBuffers;
276 static short counter = 0;
277
278 int RPCasyncXDRWrite(const void* handle, const char *buf, const u_int len)
279 {
280     int ret;
281     int index = 0;
282     int header = len;
283
284     rpcBuffer *rb = new rpcBuffer;
285     rb -> fd = (int) handle;
286     rb -> len = len+sizeof(int);
287     rb -> buf = new char[rb -> len];
288
289     counter = ((counter+1)%SHRT_MAX);
290
291     assert(len <= 4040);
292
293     // Adding a header to the messages sent   
294     header = ((counter << 16) | header | 0xf000); 
295     P_memcpy(rb -> buf, (char *)&header, sizeof(int));
296
297     P_memcpy(rb -> buf+sizeof(int), buf, len);
298
299     rpcBuffers += rb;
300
301     // Write the items to the other end if possible with asynchrous write
302     for (int i = 0; (i < (int)rpcBuffers.size()); i++) {
303
304         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FNONBLOCK) == -1)
305             perror("fcntl");
306
307         ret = (int)P_write(rpcBuffers[i]->fd, rpcBuffers[i]->buf,
308                            rpcBuffers[i]->len);
309
310         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FSYNC) == -1)
311             perror("fcntl");
312
313         if (rpcBuffers[i]->len != ret) {
314
315             if (ret != -1) {
316                 assert(ret < rpcBuffers[i]->len);
317                 printf("Warning: a partial message sent!\n");
318                 P_memcpy(rpcBuffers[i]->buf, rpcBuffers[i]->buf+ret,
319                          rpcBuffers[i]->len-ret);
320                 rpcBuffers[i]->len -= ret;
321             }
322             break;
323         }
324         index++;
325         
326     }
327     
328     // Delete the items sent out
329     unsigned j;
330     for (j = 0; j < (unsigned)index; j++) {
331         delete [] rpcBuffers[j]->buf;
332     }    
333     
334     for (j = 0; j < rpcBuffers.size() - index; j++)
335         rpcBuffers[j] = rpcBuffers[j+index];
336     rpcBuffers.resize(rpcBuffers.size() - index);
337
338     return (ret = (int)len);
339 }
340
341 void
342 doDeferedRPCasyncXDRWrite() {
343
344     if (rpcBuffers.size() == 0)
345         return;
346
347     int ret, index = 0;
348
349     // Write the items to the other end if possible 
350     for (int i = 0; (i < (int)rpcBuffers.size()); i++) {
351
352         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FNONBLOCK) == -1)
353             perror("fcntl");
354
355         ret = (int)P_write(rpcBuffers[i]->fd, rpcBuffers[i]->buf,
356                            rpcBuffers[i]->len);
357
358         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FSYNC) == -1)
359             perror("fcntl");
360         
361         if (rpcBuffers[i]->len != ret) {
362
363             if (ret != -1) {
364                 assert(ret < rpcBuffers[i]->len);
365                 printf("Warning: a partial message sent!\n");
366                 P_memcpy(rpcBuffers[i]->buf, rpcBuffers[i]->buf+ret,
367                          rpcBuffers[i]->len-ret);
368                 rpcBuffers[i]->len -= ret;
369             }
370             break;
371         }
372         index++;
373
374     }
375     
376     // Delete the items sent out
377     unsigned j;
378     for (j = 0; j < (unsigned)index; j++) {
379         delete [] rpcBuffers[j]->buf;
380     }    
381
382     for (j = 0; j < rpcBuffers.size() - index; j++)
383         rpcBuffers[j] = rpcBuffers[j+index];
384     rpcBuffers.resize(rpcBuffers.size() - index);
385 }
386
387 XDRrpc::~XDRrpc()
388 {
389   if (fd >= 0)
390     {
391       P_fcntl (fd, F_SETFL, FNDELAY);
392       P_close(fd);
393       fd = -1;
394     }
395   if (xdrs) 
396     {
397       P_xdr_destroy (xdrs);
398       delete (xdrs);
399       xdrs = NULL;
400     }
401 }
402
403 //
404 // prepare for RPC's to be done/received on the passed fd.
405 //
406 XDRrpc::XDRrpc(const int f, xdr_rd_func readRoutine, xdr_wr_func writeRoutine, const int nblock)
407 : xdrs(NULL), fd(f)
408 {
409     assert(fd >= 0);
410     xdrs = new XDR;
411     assert(xdrs);
412     if (!readRoutine) {
413         if (nblock == 1) {
414             readRoutine = (xdr_rd_func) RPCasyncXDRRead;
415         } else { 
416             readRoutine = (xdr_rd_func) RPCdefaultXDRRead;
417         }
418     }   
419     if (!writeRoutine) {
420         if (nblock == 2) {
421             writeRoutine = (xdr_wr_func)RPCasyncXDRWrite;
422         } else {    
423             writeRoutine = (xdr_wr_func) RPCdefaultXDRWrite;
424         }
425     }   
426     P_xdrrec_create(xdrs, 0, 0, (char *) fd, readRoutine, writeRoutine);
427 }
428
429 //
430 // prepare for RPC's to be done/received on the passed fd.
431 //
432 XDRrpc::XDRrpc(const string &machine,
433                const string &user,
434                const string &program,
435                xdr_rd_func readRoutine, 
436                xdr_wr_func writeRoutine,
437                const vector<string> &arg_list,
438                const int nblock,
439                const int wellKnownPortFd)
440 : xdrs(NULL), fd(-1)
441 {
442     fd = RPCprocessCreate(machine, user, program, arg_list, wellKnownPortFd);
443     if (fd >= 0) {
444         xdrs = new XDR;
445         if (!readRoutine) {
446             if (nblock == 1) {
447                 readRoutine = (xdr_rd_func) RPCasyncXDRRead;
448             } else { 
449                 readRoutine = (xdr_rd_func) RPCdefaultXDRRead;
450             }
451         }
452         if (!writeRoutine) {
453             if (nblock == 2) {
454                 writeRoutine = (xdr_wr_func)RPCasyncXDRWrite;
455             } else { 
456                 writeRoutine = (xdr_wr_func) RPCdefaultXDRWrite;
457             }
458         }
459         P_xdrrec_create(xdrs, 0, 0, (char *) fd, readRoutine, writeRoutine);
460     }
461 }
462
463 bool
464 RPC_readReady (int fd, int timeout)
465 {
466   fd_set readfds;
467   struct timeval tvptr, *the_tv;
468
469   tvptr.tv_sec = timeout; tvptr.tv_usec = 0;
470   if (fd < 0) return false;
471   FD_ZERO(&readfds);
472   FD_SET (fd, &readfds);
473
474   // -1 timeout = blocking select
475   if (timeout == -1)
476      the_tv = 0;
477   else
478      the_tv = &tvptr;
479  
480   if (P_select (fd+1, &readfds, NULL, NULL, the_tv) == -1)
481     {
482       // if (errno == EBADF)
483         return false;
484     }
485   return (FD_ISSET (fd, &readfds));
486 }
487
488 /*
489  * Build an argument list starting at position 0.
490  * Note, this arg list will be used in an exec system call
491  * AND, the command name will have to be inserted at the head of the list
492  * But, a NULL space will NOT be left at the head of the list
493  */
494 bool RPC_make_arg_list(vector<string> &list,
495                        const int well_known_socket, const int flag, const int firstPVM,
496                        const string machine_name, const bool use_machine)
497 {
498   char arg_str[100];
499
500   list.resize(0);
501
502   sprintf(arg_str, "%s%d", "-p", well_known_socket);  
503   list += arg_str;
504   // arg_list[arg_count++] = strdup (arg_str);  // 0
505
506   if (!use_machine) {
507     list += string("-m") + getHostName();
508   } else {
509     list += string("-m") + machine_name;
510   }
511   // arg_list[arg_count++] = strdup (arg_str); // 3
512
513   sprintf(arg_str, "%s%d", "-l", flag);
514   list += arg_str;
515   //arg_list[arg_count++] = strdup (arg_str);  // 4
516
517   sprintf(arg_str, "%s%d", "-v", firstPVM);
518   list += arg_str;
519   // arg_list[arg_count++] = strdup (arg_str);  // 5 
520
521   return true;
522 }
523
524 // returns fd of socket that is listened on, or -1
525 // (actually, looks like it returns the port number listened on, or -1)
526 int
527 RPC_setup_socket (int &sfd,   // return file descriptor
528                   const int family, // AF_INET ...
529                   const int type)   // SOCK_STREAM ...
530 {
531   if ((sfd = P_socket(family, type, 0)) < 0)
532     return -1;
533
534   struct sockaddr_in serv_addr;
535   P_memset ((void*) &serv_addr, 0, sizeof(serv_addr));
536   serv_addr.sin_family = (short) family;
537   serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
538   serv_addr.sin_port = htons(0);
539   
540   size_t length = sizeof(serv_addr);
541
542   if (P_bind(sfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
543     return -1;
544
545   if (P_getsockname (sfd, (struct sockaddr *) &serv_addr, &length) < 0)
546     return -1;
547
548   if (P_listen(sfd, 5) < 0)
549     return -1;
550
551   return (ntohs (serv_addr.sin_port));
552 }
553
554 // setup a AF_UNIX domain socket of type SOCK_STREAM
555 bool
556 RPC_setup_socket_un (int &sfd,   // return file descriptor
557                      const char *path) // file path socket is bound to
558 {
559   struct sockaddr_un saddr;
560
561   if ((sfd = P_socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
562     return false;
563
564   saddr.sun_family = AF_UNIX;
565   P_strcpy(saddr.sun_path, path);
566   
567   if (P_bind(sfd, (struct sockaddr *) &saddr, sizeof(saddr)) < 0)
568     return false;
569
570   if (P_listen(sfd, 5) < 0)
571     return false;
572
573   return true;
574 }
575
576 //
577 // connect to well known socket
578 //
579 XDRrpc::XDRrpc(int family,            
580                int req_port,             
581                int type,
582                const string machine, 
583                xdr_rd_func readRoutine,
584                xdr_wr_func writeRoutine,
585                const int nblock) : xdrs(NULL), fd(-1)
586      // socket, connect using machine
587 {
588   struct sockaddr_in serv_addr;
589   struct hostent *hostptr = 0;
590   struct in_addr *inadr = 0;
591   if (!(hostptr = P_gethostbyname(machine.string_of())))
592     return;
593
594   inadr = (struct in_addr *) ((void*) hostptr->h_addr_list[0]);
595   P_memset ((void*) &serv_addr, 0, sizeof(serv_addr));
596   serv_addr.sin_family = family;
597   serv_addr.sin_addr = *inadr;
598   serv_addr.sin_port = htons(req_port);
599
600   if ( (fd = P_socket(family, type, 0)) < 0)
601     { fd = -1; return; }
602
603   if (P_connect(fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
604     { fd = -1; return; }
605
606     xdrs = new XDR;
607     assert(xdrs);
608     if (!readRoutine) {
609         if (nblock == 1) {
610             readRoutine = (xdr_rd_func) RPCasyncXDRRead;
611         } else {  
612             readRoutine = (xdr_rd_func) RPCdefaultXDRRead;
613         }
614         }
615     if (!writeRoutine) {
616         if (nblock == 2) {
617             writeRoutine = (xdr_wr_func)RPCasyncXDRWrite;
618         } else { 
619             writeRoutine = (xdr_wr_func) RPCdefaultXDRWrite;
620         }
621     }
622     P_xdrrec_create(xdrs, 0, 0, (char *) fd, readRoutine,writeRoutine);
623
624
625 //
626 // These xdr functions are not to be called with XDR_FREE
627 //
628 bool_t xdr_Boolean(XDR *xdrs, bool *bool_val) {
629    u_char i;
630    bool_t res;
631    switch (xdrs->x_op) {
632    case XDR_ENCODE:
633       i = (u_char) *bool_val;
634       return (P_xdr_u_char(xdrs, &i));
635    case XDR_DECODE:
636       res = P_xdr_u_char(xdrs, &i);
637       *bool_val = (bool) i;
638       return res;
639    case XDR_FREE:
640    default:
641       assert(0);
642       return FALSE;
643    }
644 }
645
646 // XDR_FREE not handled, free it yourself!
647 // our version of string encoding that does malloc as needed.
648 //
649 bool_t xdr_string_pd(XDR *xdrs, string *str)
650 {
651   unsigned int length = 0;
652   assert(str);
653
654   // if XDR_FREE, str's memory is freed
655   switch (xdrs->x_op) {
656   case XDR_ENCODE:
657     if ((length = str->length())) {
658       if (!P_xdr_u_int(xdrs, &length))
659         return FALSE;
660       else {
661         char *buffer = (char*) str->string_of(); 
662         bool_t res = P_xdr_string (xdrs, &buffer, str->length() + 1);
663         return res;
664       }
665     } else {
666       return (P_xdr_u_int(xdrs, &length));
667     }
668   case XDR_DECODE:
669     if (!P_xdr_u_int(xdrs, &length))
670       return FALSE;
671      else if (!length) {
672        *str = (char*) NULL;
673        return TRUE;
674      } else {
675        char *temp; 
676        bool newd = false;
677        unsigned max_len = 511;
678        char stat_buf[512];
679        if (length < 512) 
680           temp = (char*) stat_buf;
681        else {
682           temp = new char[length+1];
683           max_len = length;
684           newd = true;
685        }      
686        if (!P_xdr_string (xdrs, &temp, max_len)) {
687           if (newd) 
688             delete temp;
689           return FALSE;
690        } else {
691           *str = temp;
692           if (newd)
693             delete temp; 
694           return TRUE;
695        }
696      }
697   case XDR_FREE:
698   default:
699     // this should never occur  
700     assert(0);
701     return (FALSE);
702   }
703 }
704
705 // trace data streams
706 // XDR_FREE not handled, free it yourself!
707 // our version of string encoding that does malloc as needed.
708 //
709
710 bool_t xdr_byteArray_pd(XDR *xdrs, byteArray *bArray)
711 {
712   unsigned int length = 0;
713   assert(bArray);
714
715   // if XDR_FREE, str's memory is freed
716   switch (xdrs->x_op) {
717   case XDR_ENCODE:
718     if (length = bArray->length()) {
719       if (!P_xdr_u_int(xdrs, &length))
720         return FALSE;
721       else {
722         char *buffer = (char*) bArray->getArray();
723         bool_t res = P_xdr_byteArray (xdrs, &buffer, &length, (bArray->length()));
724         return res;
725       }
726     } else {
727       return (P_xdr_u_int(xdrs, &length));
728     }
729   case XDR_DECODE:
730     if (!P_xdr_u_int(xdrs, &length))
731       return FALSE;
732      else if (!length) {
733        *bArray = byteArray( (char *)NULL , 0);
734        return TRUE;
735      } else {
736        char *temp;
737        unsigned int act_len;
738        unsigned int max_len = length;
739        temp = new char[length];
740        if (!P_xdr_byteArray (xdrs, &temp, &act_len, max_len)) {
741           delete temp;
742           return FALSE;
743        } else if (act_len != length) {
744           delete temp;
745           return FALSE;
746        } else {
747           *bArray = byteArray(temp,act_len);
748           delete temp;
749           return TRUE;
750        }
751      }
752   case XDR_FREE:
753   default:
754     // this should never occur
755     assert(0);
756     return (FALSE);
757   }
758 }
759
760 //
761 // directly exec the command (local).
762 //
763
764 int execCmd(const string command, const vector<string> &arg_list, int /*portFd*/)
765 {
766   int ret;
767   int sv[2];
768   int execlERROR;
769
770   errno = 0;
771   ret = P_socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
772   if (ret==-1) return(ret);
773   execlERROR = 0;
774   int al_len = arg_list.size();
775   char **new_al = new char*[al_len+2];
776   // TODO argv[0] should not include path
777   new_al[0] = P_strdup(command.string_of());
778   new_al[al_len+1] = NULL;
779   for (int i=0; i<al_len; ++i)
780     new_al[i+1] = P_strdup(arg_list[i].string_of());
781   ret = -1;
782
783   int pid;
784   pid = vfork();
785   // close sv[0] after exec 
786   P_fcntl(sv[0],F_SETFD,1);  
787
788   if (pid == 0) {
789     if (P_close(sv[0])) { execlERROR = errno; _exit(-1);}
790     if (P_dup2(sv[1], 0)) { execlERROR = errno; _exit(-1);}
791     P_execvp(new_al[0], new_al);
792     execlERROR = errno;
793     _exit(-1);
794   } else if (pid > 0 && !execlERROR) {
795     P_close(sv[1]);
796     ret = sv[0];
797   }
798
799   al_len=0;
800   while (new_al[al_len]) {
801     delete (new_al[al_len]);
802     al_len++;
803   }
804   delete(new_al);
805   return ret;
806 }
807
808 int handleRemoteConnect(int fd, int portFd) {
809     // NOTE: This routine is not generic; it is very specific to paradyn
810     // (due to the sscanf(line, "PARADYND %d")...)
811     // Hence it makes absolutely no sense to put it in a so-called library.
812     // It should be put into the paradyn/src tree --ari
813
814     FILE *pfp = P_fdopen(fd, "r");
815     if (pfp == NULL) {
816        cerr << "handleRemoteConnect: fdopen of fd " << fd << " failed." << endl;
817        return -1;
818     }
819
820     int retFd = RPC_getConnect(portFd); // calls accept(), which blocks (sigh...)
821     return retFd;
822 }
823
824 //
825 // use rsh to get a remote process started.
826 //
827 // We do an rsh to start a process and them wait for the process 
828 // to do a connect. There is no guarantee that the process we are waiting for is
829 // the one that gets the connection. This can happen with paradyndPVM: the
830 // daemon that is started by a rshCommand will start other daemons, and one of 
831 // these daemons may get the connection that should be for the first daemon.
832 // Daemons should always get a connection before attempting to start other daemons.
833 //
834
835 int rshCommand(const string hostName, const string userName, 
836                const string command, const vector<string> &arg_list, int portFd)
837 {
838     int fd[2];
839     int shellPid;
840     int ret;
841
842     int total = command.length() + 2;
843     for (unsigned i=0; i<arg_list.size(); i++) {
844       total += arg_list[i].length() + 2;
845     }
846
847     string paradyndCommand = command + " ";
848
849     for (unsigned j=0; j < arg_list.size(); j++)
850         paradyndCommand += arg_list[j] + " ";
851
852     // need to rsh to machine and setup io path.
853
854     if (pipe(fd)) {
855         perror("pipe");
856         return (-1);
857     }
858
859     shellPid = vfork();
860     if (shellPid == 0) {
861         /* child */
862         bool aflag;
863         aflag=(-1 != dup2(fd[1], 1)); /* copy it onto stdout */
864         assert(aflag);
865         aflag=(-1 != close(fd[0]));
866         assert(aflag);
867         aflag=(-1 != close(fd[1]));
868         assert(aflag);
869         if (userName.length()) {
870             ret = execlp(RSH_COMMAND, RSH_COMMAND, hostName.string_of(), "-l", 
871                          userName.string_of(), "-n", paradyndCommand.string_of(),
872                          "-l0", NULL);
873             fprintf(stderr,"rshCommand: execlp failed (ret = %d)\n",ret);
874         } else {
875             ret = execlp(RSH_COMMAND, RSH_COMMAND, hostName.string_of(), "-n", 
876                          paradyndCommand.string_of(), "-l0", NULL);
877             fprintf(stderr,"rshCommand: execlp failed (ret = %d)\n",ret);
878         }
879         _exit(-1);
880     } else if (shellPid > 0) {
881         close(fd[1]);
882     } else {
883         // error situation
884     }
885
886     return(handleRemoteConnect(fd[0], portFd));
887 }
888
889 int rexecCommand(const string hostName, const string userName, 
890                  const string command, const vector<string> &arg_list, int portFd)
891 {
892     struct servent *inport;
893
894     int total = command.length() + 2;
895     for (unsigned i=0; i<arg_list.size(); i++) {
896       total += arg_list[i].length() + 2;
897     }
898
899     char *paradyndCommand = new char[total+2];
900     assert(paradyndCommand);
901
902     sprintf(paradyndCommand, "%s ", command.string_of());
903     for (unsigned j=0; j<arg_list.size(); j++) {
904         P_strcat(paradyndCommand, arg_list[j].string_of());
905         P_strcat(paradyndCommand, " ");
906     }
907
908     inport = P_getservbyname("exec",  "tcp");
909
910     char *hname = P_strdup(hostName.string_of());
911     char *uname = P_strdup(userName.string_of());
912     int fd = P_rexec(&hname, inport->s_port, uname, NULL,
913                    paradyndCommand, NULL);
914     delete hname; delete uname;
915
916     if (fd < 0) {
917         perror("rexec");
918         printf("rexec failed\n");
919     }
920     if (paradyndCommand)
921       delete paradyndCommand;
922
923     return(handleRemoteConnect(fd, portFd));
924 }
925
926 /*
927  * 
928  * "command" will be appended to the front of the arg list
929  *
930  * if arg_list == NULL, command is the only argument used
931  */
932
933 int RPCprocessCreate(const string hostName, const string userName,
934                      const string command, const vector<string> &arg_list,
935                      int portFd, const bool useRexec)
936 {
937     int ret;
938     struct utsname unm;
939
940     if (P_uname(&unm) == -1)
941       assert(0);
942
943     if ((hostName == "") || 
944         (hostName == "localhost") ||
945         (hostName == unm.nodename))
946       ret = execCmd(command, arg_list, portFd);
947     else if (useRexec)
948       ret = rexecCommand(hostName, userName, command, arg_list, portFd);
949     else
950       ret = rshCommand(hostName, userName, command, arg_list, portFd);
951
952     return(ret);
953 }
954
955 int RPC_getConnect(const int fd) {
956   if (fd == -1)
957     return -1;
958
959   struct in_addr cli_addr;
960   size_t clilen = sizeof(cli_addr);
961   int new_fd = P_accept (fd, (struct sockaddr *) &cli_addr, &clilen);
962
963   if (new_fd < 0)
964     return -1;
965   else
966     return new_fd;
967 }
968
969 // TODO -- use vectors and strings ?
970 /*
971  *  RPCgetArg - break a string into blank separated words
972  *  Used to parse a command line.
973  *  
974  *  input --> a null terminated string, the command line
975  *  argc --> returns the number of args found
976  *  returns --> words (separated by blanks on command line)
977  *  Note --> this allocates memory 
978  */
979 bool RPCgetArg(vector<string> &arg, const char *input)
980 {
981 #define BLANK ' '
982
983   int word_len;
984   if (!input) 
985     return false;
986
987   int length = strlen(input);
988   int index = 0;
989
990   /* advance past blanks in input */
991   while ((index < length) && (input[index] == BLANK))
992     index++;
993
994   /* input is all blanks, or NULL */
995   if (index >= length) 
996     return true;
997
998   do {
999     /* the start of each string */
1000     word_len = 0; const char *word_start = input + index;
1001
1002     /* find the next BLANK and the WORD size */
1003     while ((index < length) && (input[index] != BLANK)) {
1004       index++; word_len++;
1005     }
1006
1007     /* copy the word */
1008     char *temp = new char[word_len+1];
1009     strncpy(temp, word_start, word_len);
1010     temp[word_len] = (char) 0;
1011     arg += temp;
1012     delete temp;
1013
1014     /* skip past consecutive blanks */
1015     while ((index < length) && (input[index] == BLANK))
1016       index++;
1017   } while (index < length);
1018   return true;
1019 }
1020
1021
1022 string getHostName() {
1023     struct utsname un;
1024     P_uname(&un);
1025     return string(un.nodename);
1026 }