Changes made to handle the comminication between different types of
[dyninst.git] / common / src / rpcUtil.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 /*
43  * $Log: rpcUtil.C,v $
44  * Revision 1.51  1997/05/20 15:18:57  lzheng
45  * Changes made to handle the comminication between different types of
46  * machines
47  *
48  * Revision 1.50  1997/05/17 20:01:30  lzheng
49  * Changes made for nonblocking write
50  *
51  * Revision 1.49  1997/04/21 16:57:56  hseom
52  * added support for trace data (in a byte array)
53  *
54  * Revision 1.48  1997/02/26 23:49:54  mjrg
55  * First part of WindowsNT commit: changes for compiling with VisualC++;
56  * moved includes to platform header files
57  *
58  * Revision 1.47  1997/01/21 20:09:59  mjrg
59  * Added support for unix domain sockets.
60  * Added getHostName function
61  *
62  * Revision 1.46  1997/01/16 20:52:21  tamches
63  * removed RPC_undo_arg_list (to paradynd)
64  *
65  * Revision 1.45  1996/11/26 16:09:37  naim
66  * Fixing asserts - naim
67  *
68  * Revision 1.44  1996/11/12 17:50:16  mjrg
69  * Removed warnings, changes for compiling with Visual C++ and xlc
70  *
71  * Revision 1.43  1996/08/16 21:32:02  tamches
72  * updated copyright for release 1.1
73  *
74  * Revision 1.42  1996/05/31 23:43:10  tamches
75  * removed pid from XDRrpc.  Modified handleRemoteConnect appropriately.
76  * Now paradynd doesn't try to send pid to paradyn (it wasn't being used
77  * and was one cause of paradyn UI freezing on UI startup).
78  *
79  * Revision 1.41  1995/11/22 00:06:20  mjrg
80  * Updates for paradyndPVM on solaris
81  * Fixed problem with wrong daemon getting connection to paradyn
82  * Removed -f and -t arguments to paradyn
83  * Added cleanUpAndExit to clean up and exit from pvm before we exit paradynd
84  * Fixed bug in my previous commit
85  *
86  * Revision 1.40  1995/11/12 00:44:28  newhall
87  * fix to execCmd: forked process closes it's copy of parent's file descriptors
88  *
89  * Revision 1.39  1995/08/24  15:14:29  hollings
90  * AIX/SP-2 port (including option for split instruction/data heaps)
91  * Tracing of rexec (correctly spawns a paradynd if needed)
92  * Added rtinst function to read getrusage stats (can now be used in metrics)
93  * Critical Path
94  * Improved Error reporting in MDL sematic checks
95  * Fixed MDL Function call statement
96  * Fixed bugs in TK usage (strings passed where UID expected)
97  *
98  * Revision 1.38  1995/05/18  11:12:15  markc
99  * Added flavor arg to RPC_undo_g_list
100  *
101 */
102
103 //
104 // This file defines a set of utility routines for RPC services.
105 //
106 //
107
108 // overcome malloc redefinition due to /usr/include/rpc/types.h declaring 
109 // malloc 
110 // This is ugly, and I hope to get rid of it -- mdc 2/2/95
111 #if defined(notdef)
112 /* prevents malloc from being redefined */
113 #ifdef MIPS
114 #define MALLOC_DEFINED_AS_VOID
115 #endif
116 #endif
117
118 #include <limits.h>
119 #include <sys/types.h>
120 #include <netinet/in.h>
121 #include "util/h/rpcUtil.h"
122
123 const char *RSH_COMMAND="rsh";
124
125 int RPCdefaultXDRRead(const void* handle, char *buf, const u_int len)
126 {
127     int fd = (int) handle;
128     int ret;
129
130     do {
131         ret = P_read(fd, buf, len);
132     } while (ret < 0 && errno == EINTR);
133
134     if (ret <= 0) { return(-1); }
135     return (ret);
136 }
137
138 // one counter per file descriptor to record the sequnce no the message
139 // to be received
140 vector<int> counter_2;
141
142 // One partial message record for each file descriptor
143 vector<rpcBuffer *> partialMsgs;
144
145 int RPCasyncXDRRead(const void* handle, char *buf, const u_int len)
146 {
147     /* called when paradyn/xdr detects that it needs to read a message
148        from paradynd. */
149     int fd = (int) handle;
150     unsigned header;
151     int ret;
152     int needCopy = 0;
153     char *buffer = buf;
154     u_int internal_len = 0;
155     bool newfd = true;
156     unsigned i;
157
158     for (i = 0; i< partialMsgs.size(); i++) {
159         if (partialMsgs[i] -> fd == fd) {
160             newfd = false;
161             break;
162         }
163     }
164     
165     // If new connection, allocate a partial message record
166     if (newfd) {
167         rpcBuffer *partial = new rpcBuffer;
168         partial -> fd  = fd;
169         partial -> len = 0;
170         partial -> buf = NULL;
171         partialMsgs += partial;
172         i = partialMsgs.size() - 1;
173         
174         counter_2 += 0;
175     }
176
177     // There're two situations dealt with here: with partial message
178     // left by previous read, without. The 'len' field is zero if it is
179     // the first case, nonzero if it is the second case    
180     if (partialMsgs[i] -> len) needCopy = 1;
181
182     // Allocate buffer which is four more bytes than RPC buffer for the
183     // convenience of processing message headers.
184     buffer = new char[len + sizeof(int)];
185
186     if (needCopy) {
187         P_memcpy(buffer, partialMsgs[i]->buf , partialMsgs[i]->len); 
188         delete [] partialMsgs[i]->buf;
189         partialMsgs[i]->buf = NULL;
190     }
191
192     do {
193         ret = P_read(fd, buffer+partialMsgs[i]->len, 
194                      len + sizeof(int) -partialMsgs[i]->len);
195     } while (ret < 0 && errno == EINTR);
196
197     if (ret <= 0) { return(-1); }
198
199     ret += partialMsgs[i]->len;
200
201     char *pstart = buffer;
202     
203     // Processing the messages received: remove those message headers;
204     // check on deliminator and sequence no to ensure the correctness;
205     // save the partial message if any
206     char *tail = buffer;
207     int is_left = ret - sizeof(int);
208     while (is_left >= 0) {
209         P_memcpy((char *)&header, buffer, sizeof(int));
210         header = ntohl(header);
211         assert(0xf == ((header >> 12)&0xf));
212
213         short seq_no;
214         P_memcpy((char *)&(seq_no), buffer, sizeof(short));
215         seq_no = ntohs(seq_no);
216         header = (0x0fff&header);
217         
218         if (header <= is_left) {
219             P_memcpy(tail, buffer+sizeof(int), header);
220
221             counter_2[i] = ((counter_2[i]+1)%SHRT_MAX);
222             assert(seq_no == counter_2[i]);
223
224             internal_len += header;
225             if (internal_len > len) {
226                 abort();
227                 ret = ret - sizeof(int) - is_left;
228                 break;
229             }
230             tail += header;
231             buffer += header + sizeof(int);
232
233             is_left = is_left - header - sizeof(int);
234             ret -= sizeof(int);
235         } else {
236             ret = ret - sizeof(int) - is_left;
237             break;
238         }
239     }
240
241
242
243     if (is_left >= 0) {
244         
245         partialMsgs[i]->len = is_left + sizeof(int);
246         partialMsgs[i]->buf = new char[partialMsgs[i]->len+1];
247         P_memcpy(partialMsgs[i]->buf, buffer, partialMsgs[i]->len); 
248     } else {
249         partialMsgs[i]->len = 0;
250         assert(is_left == (0-(int)sizeof(int)));
251     }
252
253     // Copy back to internal RPC buffer
254     P_memcpy(buf, pstart, ret);
255
256     if (needCopy) {
257         buffer = pstart;
258         assert(buffer != buf); // make sure we aren't deleting the 2d param
259         delete [] buffer;
260     }
261     
262     return (ret);
263 }
264
265 int RPCdefaultXDRWrite(const void* handle, const char *buf, const u_int len)
266 {
267     int fd = (int) handle;
268     int ret;
269
270     do {
271
272         ret = P_write(fd, buf, len);
273     } while (ret < 0 && errno == EINTR);
274
275     errno = 0;
276     if (ret != (int)len)
277       return(-1);
278     else 
279       return (ret);
280 }
281
282 vector<rpcBuffer *> rpcBuffers;
283 static short counter = 0;
284
285 int RPCasyncXDRWrite(const void* handle, const char *buf, const u_int len)
286 {
287     int ret;
288     int index = 0;
289     unsigned header = len;
290
291     rpcBuffer *rb = new rpcBuffer;
292     rb -> fd = (int) handle;
293     rb -> len = len+sizeof(int);
294     rb -> buf = new char[rb -> len];
295
296     counter = ((counter+1)%SHRT_MAX);
297
298     assert(len <= 4040);
299
300     // Adding a header to the messages sent   
301     header = ((counter << 16) | header | 0xf000); 
302
303     // Converting to network order
304     header = htonl(header);
305     
306     P_memcpy(rb -> buf, (char *)&header, sizeof(int));
307     P_memcpy(rb -> buf+sizeof(int), buf, len);
308
309     rpcBuffers += rb;
310
311     // Write the items to the other end if possible with asynchrous write
312     for (int i = 0; (i < (int)rpcBuffers.size()); i++) {
313
314         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FNONBLOCK) == -1)
315             perror("fcntl");
316
317         ret = (int)P_write(rpcBuffers[i]->fd, rpcBuffers[i]->buf,
318                            rpcBuffers[i]->len);
319
320         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FSYNC) == -1)
321             perror("fcntl");
322
323         if (rpcBuffers[i]->len != ret) {
324
325             if (ret != -1) {
326                 assert(ret < rpcBuffers[i]->len);
327                 printf("Warning: a partial message sent!\n");
328                 P_memcpy(rpcBuffers[i]->buf, rpcBuffers[i]->buf+ret,
329                          rpcBuffers[i]->len-ret);
330                 rpcBuffers[i]->len -= ret;
331             }
332             break;
333         }
334         index++;
335         
336     }
337     
338     // Delete the items sent out
339     unsigned j;
340     for (j = 0; j < (unsigned)index; j++) {
341         delete [] rpcBuffers[j]->buf;
342     }    
343     
344     for (j = 0; j < rpcBuffers.size() - index; j++)
345         rpcBuffers[j] = rpcBuffers[j+index];
346     rpcBuffers.resize(rpcBuffers.size() - index);
347
348     return (ret = (int)len);
349 }
350
351 void
352 doDeferedRPCasyncXDRWrite() {
353
354     if (rpcBuffers.size() == 0)
355         return;
356
357     int ret, index = 0;
358
359     // Write the items to the other end if possible 
360     for (int i = 0; (i < (int)rpcBuffers.size()); i++) {
361
362         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FNONBLOCK) == -1)
363             perror("fcntl");
364
365         ret = (int)P_write(rpcBuffers[i]->fd, rpcBuffers[i]->buf,
366                            rpcBuffers[i]->len);
367
368         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FSYNC) == -1)
369             perror("fcntl");
370         
371         if (rpcBuffers[i]->len != ret) {
372
373             if (ret != -1) {
374                 assert(ret < rpcBuffers[i]->len);
375                 printf("Warning: a partial message sent!\n");
376                 P_memcpy(rpcBuffers[i]->buf, rpcBuffers[i]->buf+ret,
377                          rpcBuffers[i]->len-ret);
378                 rpcBuffers[i]->len -= ret;
379             }
380             break;
381         }
382         index++;
383
384     }
385     
386     // Delete the items sent out
387     unsigned j;
388     for (j = 0; j < (unsigned)index; j++) {
389         delete [] rpcBuffers[j]->buf;
390     }    
391
392     for (j = 0; j < rpcBuffers.size() - index; j++)
393         rpcBuffers[j] = rpcBuffers[j+index];
394     rpcBuffers.resize(rpcBuffers.size() - index);
395 }
396
397 XDRrpc::~XDRrpc()
398 {
399   if (fd >= 0)
400     {
401       P_fcntl (fd, F_SETFL, FNDELAY);
402       P_close(fd);
403       fd = -1;
404     }
405   if (xdrs) 
406     {
407       P_xdr_destroy (xdrs);
408       delete (xdrs);
409       xdrs = NULL;
410     }
411 }
412
413 //
414 // prepare for RPC's to be done/received on the passed fd.
415 //
416 XDRrpc::XDRrpc(const int f, xdr_rd_func readRoutine, xdr_wr_func writeRoutine, const int nblock)
417 : xdrs(NULL), fd(f)
418 {
419     assert(fd >= 0);
420     xdrs = new XDR;
421     assert(xdrs);
422     if (!readRoutine) {
423         if (nblock == 1) {
424             readRoutine = (xdr_rd_func) RPCasyncXDRRead;
425         } else { 
426             readRoutine = (xdr_rd_func) RPCdefaultXDRRead;
427         }
428     }   
429     if (!writeRoutine) {
430         if (nblock == 2) {
431             writeRoutine = (xdr_wr_func)RPCasyncXDRWrite;
432         } else {    
433             writeRoutine = (xdr_wr_func) RPCdefaultXDRWrite;
434         }
435     }   
436     P_xdrrec_create(xdrs, 0, 0, (char *) fd, readRoutine, writeRoutine);
437 }
438
439 //
440 // prepare for RPC's to be done/received on the passed fd.
441 //
442 XDRrpc::XDRrpc(const string &machine,
443                const string &user,
444                const string &program,
445                xdr_rd_func readRoutine, 
446                xdr_wr_func writeRoutine,
447                const vector<string> &arg_list,
448                const int nblock,
449                const int wellKnownPortFd)
450 : xdrs(NULL), fd(-1)
451 {
452     fd = RPCprocessCreate(machine, user, program, arg_list, wellKnownPortFd);
453     if (fd >= 0) {
454         xdrs = new XDR;
455         if (!readRoutine) {
456             if (nblock == 1) {
457                 readRoutine = (xdr_rd_func) RPCasyncXDRRead;
458             } else { 
459                 readRoutine = (xdr_rd_func) RPCdefaultXDRRead;
460             }
461         }
462         if (!writeRoutine) {
463             if (nblock == 2) {
464                 writeRoutine = (xdr_wr_func)RPCasyncXDRWrite;
465             } else { 
466                 writeRoutine = (xdr_wr_func) RPCdefaultXDRWrite;
467             }
468         }
469         P_xdrrec_create(xdrs, 0, 0, (char *) fd, readRoutine, writeRoutine);
470     }
471 }
472
473 bool
474 RPC_readReady (int fd, int timeout)
475 {
476   fd_set readfds;
477   struct timeval tvptr, *the_tv;
478
479   tvptr.tv_sec = timeout; tvptr.tv_usec = 0;
480   if (fd < 0) return false;
481   FD_ZERO(&readfds);
482   FD_SET (fd, &readfds);
483
484   // -1 timeout = blocking select
485   if (timeout == -1)
486      the_tv = 0;
487   else
488      the_tv = &tvptr;
489  
490   if (P_select (fd+1, &readfds, NULL, NULL, the_tv) == -1)
491     {
492       // if (errno == EBADF)
493         return false;
494     }
495   return (FD_ISSET (fd, &readfds));
496 }
497
498 /*
499  * Build an argument list starting at position 0.
500  * Note, this arg list will be used in an exec system call
501  * AND, the command name will have to be inserted at the head of the list
502  * But, a NULL space will NOT be left at the head of the list
503  */
504 bool RPC_make_arg_list(vector<string> &list,
505                        const int well_known_socket, const int flag, const int firstPVM,
506                        const string machine_name, const bool use_machine)
507 {
508   char arg_str[100];
509
510   list.resize(0);
511
512   sprintf(arg_str, "%s%d", "-p", well_known_socket);  
513   list += arg_str;
514   // arg_list[arg_count++] = strdup (arg_str);  // 0
515
516   if (!use_machine) {
517     list += string("-m") + getHostName();
518   } else {
519     list += string("-m") + machine_name;
520   }
521   // arg_list[arg_count++] = strdup (arg_str); // 3
522
523   sprintf(arg_str, "%s%d", "-l", flag);
524   list += arg_str;
525   //arg_list[arg_count++] = strdup (arg_str);  // 4
526
527   sprintf(arg_str, "%s%d", "-v", firstPVM);
528   list += arg_str;
529   // arg_list[arg_count++] = strdup (arg_str);  // 5 
530
531   return true;
532 }
533
534 // returns fd of socket that is listened on, or -1
535 // (actually, looks like it returns the port number listened on, or -1)
536 int
537 RPC_setup_socket (int &sfd,   // return file descriptor
538                   const int family, // AF_INET ...
539                   const int type)   // SOCK_STREAM ...
540 {
541   if ((sfd = P_socket(family, type, 0)) < 0)
542     return -1;
543
544   struct sockaddr_in serv_addr;
545   P_memset ((void*) &serv_addr, 0, sizeof(serv_addr));
546   serv_addr.sin_family = (short) family;
547   serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
548   serv_addr.sin_port = htons(0);
549   
550   size_t length = sizeof(serv_addr);
551
552   if (P_bind(sfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
553     return -1;
554
555   if (P_getsockname (sfd, (struct sockaddr *) &serv_addr, &length) < 0)
556     return -1;
557
558   if (P_listen(sfd, 5) < 0)
559     return -1;
560
561   return (ntohs (serv_addr.sin_port));
562 }
563
564 // setup a AF_UNIX domain socket of type SOCK_STREAM
565 bool
566 RPC_setup_socket_un (int &sfd,   // return file descriptor
567                      const char *path) // file path socket is bound to
568 {
569   struct sockaddr_un saddr;
570
571   if ((sfd = P_socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
572     return false;
573
574   saddr.sun_family = AF_UNIX;
575   P_strcpy(saddr.sun_path, path);
576   
577   if (P_bind(sfd, (struct sockaddr *) &saddr, sizeof(saddr)) < 0)
578     return false;
579
580   if (P_listen(sfd, 5) < 0)
581     return false;
582
583   return true;
584 }
585
586 //
587 // connect to well known socket
588 //
589 XDRrpc::XDRrpc(int family,            
590                int req_port,             
591                int type,
592                const string machine, 
593                xdr_rd_func readRoutine,
594                xdr_wr_func writeRoutine,
595                const int nblock) : xdrs(NULL), fd(-1)
596      // socket, connect using machine
597 {
598   struct sockaddr_in serv_addr;
599   struct hostent *hostptr = 0;
600   struct in_addr *inadr = 0;
601   if (!(hostptr = P_gethostbyname(machine.string_of())))
602     return;
603
604   inadr = (struct in_addr *) ((void*) hostptr->h_addr_list[0]);
605   P_memset ((void*) &serv_addr, 0, sizeof(serv_addr));
606   serv_addr.sin_family = family;
607   serv_addr.sin_addr = *inadr;
608   serv_addr.sin_port = htons(req_port);
609
610   if ( (fd = P_socket(family, type, 0)) < 0)
611     { fd = -1; return; }
612
613   if (P_connect(fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
614     { fd = -1; return; }
615
616     xdrs = new XDR;
617     assert(xdrs);
618     if (!readRoutine) {
619         if (nblock == 1) {
620             readRoutine = (xdr_rd_func) RPCasyncXDRRead;
621         } else {  
622             readRoutine = (xdr_rd_func) RPCdefaultXDRRead;
623         }
624         }
625     if (!writeRoutine) {
626         if (nblock == 2) {
627             writeRoutine = (xdr_wr_func)RPCasyncXDRWrite;
628         } else { 
629             writeRoutine = (xdr_wr_func) RPCdefaultXDRWrite;
630         }
631     }
632     P_xdrrec_create(xdrs, 0, 0, (char *) fd, readRoutine,writeRoutine);
633
634
635 //
636 // These xdr functions are not to be called with XDR_FREE
637 //
638 bool_t xdr_Boolean(XDR *xdrs, bool *bool_val) {
639    u_char i;
640    bool_t res;
641    switch (xdrs->x_op) {
642    case XDR_ENCODE:
643       i = (u_char) *bool_val;
644       return (P_xdr_u_char(xdrs, &i));
645    case XDR_DECODE:
646       res = P_xdr_u_char(xdrs, &i);
647       *bool_val = (bool) i;
648       return res;
649    case XDR_FREE:
650    default:
651       assert(0);
652       return FALSE;
653    }
654 }
655
656 // XDR_FREE not handled, free it yourself!
657 // our version of string encoding that does malloc as needed.
658 //
659 bool_t xdr_string_pd(XDR *xdrs, string *str)
660 {
661   unsigned int length = 0;
662   assert(str);
663
664   // if XDR_FREE, str's memory is freed
665   switch (xdrs->x_op) {
666   case XDR_ENCODE:
667     if ((length = str->length())) {
668       if (!P_xdr_u_int(xdrs, &length))
669         return FALSE;
670       else {
671         char *buffer = (char*) str->string_of(); 
672         bool_t res = P_xdr_string (xdrs, &buffer, str->length() + 1);
673         return res;
674       }
675     } else {
676       return (P_xdr_u_int(xdrs, &length));
677     }
678   case XDR_DECODE:
679     if (!P_xdr_u_int(xdrs, &length))
680       return FALSE;
681      else if (!length) {
682        *str = (char*) NULL;
683        return TRUE;
684      } else {
685        char *temp; 
686        bool newd = false;
687        unsigned max_len = 511;
688        char stat_buf[512];
689        if (length < 512) 
690           temp = (char*) stat_buf;
691        else {
692           temp = new char[length+1];
693           max_len = length;
694           newd = true;
695        }      
696        if (!P_xdr_string (xdrs, &temp, max_len)) {
697           if (newd) 
698             delete temp;
699           return FALSE;
700        } else {
701           *str = temp;
702           if (newd)
703             delete temp; 
704           return TRUE;
705        }
706      }
707   case XDR_FREE:
708   default:
709     // this should never occur  
710     assert(0);
711     return (FALSE);
712   }
713 }
714
715 // trace data streams
716 // XDR_FREE not handled, free it yourself!
717 // our version of string encoding that does malloc as needed.
718 //
719
720 bool_t xdr_byteArray_pd(XDR *xdrs, byteArray *bArray)
721 {
722   unsigned int length = 0;
723   assert(bArray);
724
725   // if XDR_FREE, str's memory is freed
726   switch (xdrs->x_op) {
727   case XDR_ENCODE:
728     if (length = bArray->length()) {
729       if (!P_xdr_u_int(xdrs, &length))
730         return FALSE;
731       else {
732         char *buffer = (char*) bArray->getArray();
733         bool_t res = P_xdr_byteArray (xdrs, &buffer, &length, (bArray->length()));
734         return res;
735       }
736     } else {
737       return (P_xdr_u_int(xdrs, &length));
738     }
739   case XDR_DECODE:
740     if (!P_xdr_u_int(xdrs, &length))
741       return FALSE;
742      else if (!length) {
743        *bArray = byteArray( (char *)NULL , 0);
744        return TRUE;
745      } else {
746        char *temp;
747        unsigned int act_len;
748        unsigned int max_len = length;
749        temp = new char[length];
750        if (!P_xdr_byteArray (xdrs, &temp, &act_len, max_len)) {
751           delete temp;
752           return FALSE;
753        } else if (act_len != length) {
754           delete temp;
755           return FALSE;
756        } else {
757           *bArray = byteArray(temp,act_len);
758           delete temp;
759           return TRUE;
760        }
761      }
762   case XDR_FREE:
763   default:
764     // this should never occur
765     assert(0);
766     return (FALSE);
767   }
768 }
769
770 //
771 // directly exec the command (local).
772 //
773
774 int execCmd(const string command, const vector<string> &arg_list, int /*portFd*/)
775 {
776   int ret;
777   int sv[2];
778   int execlERROR;
779
780   errno = 0;
781   ret = P_socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
782   if (ret==-1) return(ret);
783   execlERROR = 0;
784   int al_len = arg_list.size();
785   char **new_al = new char*[al_len+2];
786   // TODO argv[0] should not include path
787   new_al[0] = P_strdup(command.string_of());
788   new_al[al_len+1] = NULL;
789   for (int i=0; i<al_len; ++i)
790     new_al[i+1] = P_strdup(arg_list[i].string_of());
791   ret = -1;
792
793   int pid;
794   pid = vfork();
795   // close sv[0] after exec 
796   P_fcntl(sv[0],F_SETFD,1);  
797
798   if (pid == 0) {
799     if (P_close(sv[0])) { execlERROR = errno; _exit(-1);}
800     if (P_dup2(sv[1], 0)) { execlERROR = errno; _exit(-1);}
801     P_execvp(new_al[0], new_al);
802     execlERROR = errno;
803     _exit(-1);
804   } else if (pid > 0 && !execlERROR) {
805     P_close(sv[1]);
806     ret = sv[0];
807   }
808
809   al_len=0;
810   while (new_al[al_len]) {
811     delete (new_al[al_len]);
812     al_len++;
813   }
814   delete(new_al);
815   return ret;
816 }
817
818 int handleRemoteConnect(int fd, int portFd) {
819     // NOTE: This routine is not generic; it is very specific to paradyn
820     // (due to the sscanf(line, "PARADYND %d")...)
821     // Hence it makes absolutely no sense to put it in a so-called library.
822     // It should be put into the paradyn/src tree --ari
823
824     FILE *pfp = P_fdopen(fd, "r");
825     if (pfp == NULL) {
826        cerr << "handleRemoteConnect: fdopen of fd " << fd << " failed." << endl;
827        return -1;
828     }
829
830     int retFd = RPC_getConnect(portFd); // calls accept(), which blocks (sigh...)
831     return retFd;
832 }
833
834 //
835 // use rsh to get a remote process started.
836 //
837 // We do an rsh to start a process and them wait for the process 
838 // to do a connect. There is no guarantee that the process we are waiting for is
839 // the one that gets the connection. This can happen with paradyndPVM: the
840 // daemon that is started by a rshCommand will start other daemons, and one of 
841 // these daemons may get the connection that should be for the first daemon.
842 // Daemons should always get a connection before attempting to start other daemons.
843 //
844
845 int rshCommand(const string hostName, const string userName, 
846                const string command, const vector<string> &arg_list, int portFd)
847 {
848     int fd[2];
849     int shellPid;
850     int ret;
851
852     int total = command.length() + 2;
853     for (unsigned i=0; i<arg_list.size(); i++) {
854       total += arg_list[i].length() + 2;
855     }
856
857     string paradyndCommand = command + " ";
858
859     for (unsigned j=0; j < arg_list.size(); j++)
860         paradyndCommand += arg_list[j] + " ";
861
862     // need to rsh to machine and setup io path.
863
864     if (pipe(fd)) {
865         perror("pipe");
866         return (-1);
867     }
868
869     shellPid = vfork();
870     if (shellPid == 0) {
871         /* child */
872         bool aflag;
873         aflag=(-1 != dup2(fd[1], 1)); /* copy it onto stdout */
874         assert(aflag);
875         aflag=(-1 != close(fd[0]));
876         assert(aflag);
877         aflag=(-1 != close(fd[1]));
878         assert(aflag);
879         if (userName.length()) {
880             ret = execlp(RSH_COMMAND, RSH_COMMAND, hostName.string_of(), "-l", 
881                          userName.string_of(), "-n", paradyndCommand.string_of(),
882                          "-l0", NULL);
883             fprintf(stderr,"rshCommand: execlp failed (ret = %d)\n",ret);
884         } else {
885             ret = execlp(RSH_COMMAND, RSH_COMMAND, hostName.string_of(), "-n", 
886                          paradyndCommand.string_of(), "-l0", NULL);
887             fprintf(stderr,"rshCommand: execlp failed (ret = %d)\n",ret);
888         }
889         _exit(-1);
890     } else if (shellPid > 0) {
891         close(fd[1]);
892     } else {
893         // error situation
894     }
895
896     return(handleRemoteConnect(fd[0], portFd));
897 }
898
899 int rexecCommand(const string hostName, const string userName, 
900                  const string command, const vector<string> &arg_list, int portFd)
901 {
902     struct servent *inport;
903
904     int total = command.length() + 2;
905     for (unsigned i=0; i<arg_list.size(); i++) {
906       total += arg_list[i].length() + 2;
907     }
908
909     char *paradyndCommand = new char[total+2];
910     assert(paradyndCommand);
911
912     sprintf(paradyndCommand, "%s ", command.string_of());
913     for (unsigned j=0; j<arg_list.size(); j++) {
914         P_strcat(paradyndCommand, arg_list[j].string_of());
915         P_strcat(paradyndCommand, " ");
916     }
917
918     inport = P_getservbyname("exec",  "tcp");
919
920     char *hname = P_strdup(hostName.string_of());
921     char *uname = P_strdup(userName.string_of());
922     int fd = P_rexec(&hname, inport->s_port, uname, NULL,
923                    paradyndCommand, NULL);
924     delete hname; delete uname;
925
926     if (fd < 0) {
927         perror("rexec");
928         printf("rexec failed\n");
929     }
930     if (paradyndCommand)
931       delete paradyndCommand;
932
933     return(handleRemoteConnect(fd, portFd));
934 }
935
936 /*
937  * 
938  * "command" will be appended to the front of the arg list
939  *
940  * if arg_list == NULL, command is the only argument used
941  */
942
943 int RPCprocessCreate(const string hostName, const string userName,
944                      const string command, const vector<string> &arg_list,
945                      int portFd, const bool useRexec)
946 {
947     int ret;
948     struct utsname unm;
949
950     if (P_uname(&unm) == -1)
951       assert(0);
952
953     if ((hostName == "") || 
954         (hostName == "localhost") ||
955         (hostName == unm.nodename))
956       ret = execCmd(command, arg_list, portFd);
957     else if (useRexec)
958       ret = rexecCommand(hostName, userName, command, arg_list, portFd);
959     else
960       ret = rshCommand(hostName, userName, command, arg_list, portFd);
961
962     return(ret);
963 }
964
965 int RPC_getConnect(const int fd) {
966   if (fd == -1)
967     return -1;
968
969   struct in_addr cli_addr;
970   size_t clilen = sizeof(cli_addr);
971   int new_fd = P_accept (fd, (struct sockaddr *) &cli_addr, &clilen);
972
973   if (new_fd < 0)
974     return -1;
975   else
976     return new_fd;
977 }
978
979 // TODO -- use vectors and strings ?
980 /*
981  *  RPCgetArg - break a string into blank separated words
982  *  Used to parse a command line.
983  *  
984  *  input --> a null terminated string, the command line
985  *  argc --> returns the number of args found
986  *  returns --> words (separated by blanks on command line)
987  *  Note --> this allocates memory 
988  */
989 bool RPCgetArg(vector<string> &arg, const char *input)
990 {
991 #define BLANK ' '
992
993   int word_len;
994   if (!input) 
995     return false;
996
997   int length = strlen(input);
998   int index = 0;
999
1000   /* advance past blanks in input */
1001   while ((index < length) && (input[index] == BLANK))
1002     index++;
1003
1004   /* input is all blanks, or NULL */
1005   if (index >= length) 
1006     return true;
1007
1008   do {
1009     /* the start of each string */
1010     word_len = 0; const char *word_start = input + index;
1011
1012     /* find the next BLANK and the WORD size */
1013     while ((index < length) && (input[index] != BLANK)) {
1014       index++; word_len++;
1015     }
1016
1017     /* copy the word */
1018     char *temp = new char[word_len+1];
1019     strncpy(temp, word_start, word_len);
1020     temp[word_len] = (char) 0;
1021     arg += temp;
1022     delete temp;
1023
1024     /* skip past consecutive blanks */
1025     while ((index < length) && (input[index] == BLANK))
1026       index++;
1027   } while (index < length);
1028   return true;
1029 }
1030
1031
1032 string getHostName() {
1033     struct utsname un;
1034     P_uname(&un);
1035     return string(un.nodename);
1036 }