Updated with new support for Windows NT in front end and thread lib
[dyninst.git] / pdutil / src / rpcUtil.C
1 /*
2  * Copyright (c) 1996-1999 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 //
43 // This file defines a set of utility routines for RPC services.
44 // $Id: rpcUtil.C,v 1.63 1999/03/03 17:44:45 pcroth Exp $
45 //
46
47 // overcome malloc redefinition due to /usr/include/rpc/types.h declaring 
48 // malloc 
49 // This is ugly, and I hope to get rid of it -- mdc 2/2/95
50 #if defined(notdef)
51 /* prevents malloc from being redefined */
52 #ifdef MIPS
53 #define MALLOC_DEFINED_AS_VOID
54 #endif
55 #endif
56
57 #include <limits.h>
58 #include "util/h/headers.h"
59 #include "util/h/pdsocket.h"
60 #include "util/h/rpcUtil.h"
61
62 const char *RSH_COMMAND="rsh";
63
64 //---------------------------------------------------------------------------
65 // prototypes of utility functions used in this file
66 //---------------------------------------------------------------------------
67 #if defined(i386_unknown_nt4_0)
68 bool CreateSocketPair( PDSOCKET socks[2] );
69 #endif // defined(i386_unknown_nt4_0)
70
71
72 int RPCdefaultXDRRead(const void* handle, char *buf, const u_int len)
73 {
74     PDSOCKET sock = (PDSOCKET) handle;
75     int ret;
76
77 #if defined(i386_unknown_nt4_0)
78         ret = recv( sock, buf, len, 0 );
79 #else
80     do {
81         ret = P_read(sock, buf, len);
82     } while (ret < 0 && errno == EINTR);
83 #endif
84
85         if( (ret == PDSOCKET_ERROR) || (ret == 0))
86         {
87                 return (-1);
88         }
89     return (ret);
90 }
91
92 // one counter per file descriptor to record the sequnce no the message
93 // to be received
94 vector<int> counter_2;
95
96 // One partial message record for each file descriptor
97 vector<rpcBuffer *> partialMsgs;
98
99 int RPCasyncXDRRead(const void* handle, char *buf, const u_int len)
100 {
101     /* called when paradyn/xdr detects that it needs to read a message
102        from paradynd. */
103     PDSOCKET fd = (PDSOCKET) handle;
104     unsigned header;
105     int ret;
106     int needCopy = 0;
107     char *buffer = buf;
108     u_int internal_len = 0;
109     bool newfd = true;
110     unsigned i;
111
112     for (i = 0; i< partialMsgs.size(); i++) {
113                 assert(partialMsgs[i] != NULL);
114         if (partialMsgs[i] -> fd == fd) {
115             newfd = false;
116             break;
117         }
118     }
119     
120     // If new connection, allocate a partial message record
121     if (newfd) {
122         rpcBuffer *partial = new rpcBuffer;
123         partial -> fd  = fd;
124         partial -> len = 0;
125         partial -> buf = NULL;
126         partialMsgs += partial;
127         i = partialMsgs.size() - 1;
128         
129         counter_2 += 0;
130     }
131
132     // There're two situations dealt with here: with partial message
133     // left by previous read, without. The 'len' field is zero if it is
134     // the first case, nonzero if it is the second case    
135     if (partialMsgs[i] -> len) needCopy = 1;
136
137     // Allocate buffer which is four more bytes than RPC buffer for the
138     // convenience of processing message headers.
139     buffer = new char[len + sizeof(int)];
140
141     if (needCopy) {
142         P_memcpy(buffer, partialMsgs[i]->buf , partialMsgs[i]->len); 
143         delete [] partialMsgs[i]->buf;
144         partialMsgs[i]->buf = NULL;
145     }
146
147 #if defined(i386_unknown_nt4_0)
148         ret = recv(fd, buffer+partialMsgs[i]->len,
149                    len + sizeof(int) - partialMsgs[i]->len, 0);
150 #else
151     do {
152         ret = P_read(fd, buffer+partialMsgs[i]->len, 
153                      len + sizeof(int) -partialMsgs[i]->len);
154     } while (ret < 0 && errno == EINTR);
155 #endif
156
157     if((ret == PDSOCKET_ERROR) || (ret == 0))
158         {
159                 return(-1);
160         }
161
162     ret += partialMsgs[i]->len;
163
164     char *pstart = buffer;
165     
166     // Processing the messages received: remove those message headers;
167     // check on deliminator and sequence no to ensure the correctness;
168     // save the partial message if any
169     char *tail = buffer;
170     int is_left = ret - sizeof(int);
171     while (is_left >= 0) {
172         P_memcpy((char *)&header, buffer, sizeof(int));
173 //printf(">> header=%x, header1=%x, len=%x\n", header, ntohl(header));
174         header = ntohl(header);
175         assert(0xf == ((header >> 12)&0xf));
176
177         short seq_no;
178         P_memcpy((char *)&(seq_no), buffer, sizeof(short));
179         seq_no = ntohs(seq_no);
180         header = (0x0fff&header);
181         
182         if (header <= (unsigned)is_left) {
183             P_memcpy(tail, buffer+sizeof(int), header);
184
185             counter_2[i] = ((counter_2[i]+1)%SHRT_MAX);
186             assert(seq_no == counter_2[i]);
187
188             internal_len += header;
189             if (internal_len > len) {
190                 abort();
191                 ret = ret - sizeof(int) - is_left;
192                 break;
193             }
194             tail += header;
195             buffer += header + sizeof(int);
196
197             is_left = is_left - header - sizeof(int);
198             ret -= sizeof(int);
199         } else {
200             ret = ret - sizeof(int) - is_left;
201             break;
202         }
203     }
204
205
206
207     if (is_left >= 0) {
208         
209         partialMsgs[i]->len = is_left + sizeof(int);
210         partialMsgs[i]->buf = new char[partialMsgs[i]->len+1];
211         P_memcpy(partialMsgs[i]->buf, buffer, partialMsgs[i]->len); 
212     } else {
213         partialMsgs[i]->len = 0;
214         assert(is_left == (0-(int)sizeof(int)));
215     }
216
217     // Copy back to internal RPC buffer
218     P_memcpy(buf, pstart, ret);
219
220 //  if (needCopy) {
221 //    buffer = pstart;
222 //    assert(buffer != buf); // make sure we aren't deleting the 2d param
223 //    delete [] buffer;
224 //  }
225
226     if (pstart != buf) delete [] pstart;
227     
228     return (ret);
229 }
230
231 int RPCdefaultXDRWrite(const void* handle, const char *buf, const u_int len)
232 {
233     PDSOCKET sock = (PDSOCKET) handle;
234     int ret;
235
236 #if defined(i386_unknown_nt4_0)
237         ret = send(sock, buf, len, 0);
238 #else
239     do {
240
241         ret = P_write(sock, buf, len);
242     } while (ret < 0 && errno == EINTR);
243 #endif
244
245     errno = 0;
246     if (ret != (int)len)
247       return(-1);
248     else 
249       return (ret);
250 }
251
252 vector<rpcBuffer *> rpcBuffers;
253 static short counter = 0;
254
255 int RPCasyncXDRWrite(const void* handle, const char *buf, const u_int len)
256 {
257     int ret;
258     unsigned index = 0;
259     unsigned header = len;
260
261     //printf("RPCasyncXDRWrite(handle=%p, buf=%p, len=%d)\n", handle, buf, len);
262
263     rpcBuffer *rb = new rpcBuffer;
264     rb -> fd = (int) handle;
265     rb -> len = len+sizeof(int);
266     rb -> buf = new char[rb -> len];
267
268     counter = ((counter+1)%SHRT_MAX);
269
270     assert(len <= 4040);
271
272     // Adding a header to the messages sent   
273     header = ((counter << 16) | header | 0xf000); 
274
275     // Converting to network order
276     header = htonl(header);
277     //printf(">> header=%x, counter=%x, len=%x\n", header, counter, len);
278     
279     P_memcpy(rb -> buf, (char *)&header, sizeof(int));
280     P_memcpy(rb -> buf+sizeof(int), buf, len);
281
282     rpcBuffers += rb;
283
284     // Write the items to the other end if possible with asynchrous write
285     for (int i = 0; (i < (int)rpcBuffers.size()); i++) {
286
287 #if defined(i386_unknown_nt4_0)
288         u_long ioctlsock_arg = 1; // set non-blocking
289         if (ioctlsocket(rpcBuffers[i]->fd, FIONBIO, &ioctlsock_arg) == -1)
290             perror("ioctlsocket");
291
292         ret = (int) send(rpcBuffers[i]->fd, rpcBuffers[i]->buf, 
293                  rpcBuffers[i]->len, 0);
294
295         ioctlsock_arg = 0; // set blocking
296         if (ioctlsocket(rpcBuffers[i]->fd, FIONBIO, &ioctlsock_arg) == -1)
297             perror("ioctlsocket");
298
299 #else
300         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FNONBLOCK) == -1)
301             perror("fcntl");
302
303         ret = (int)P_write(rpcBuffers[i]->fd, rpcBuffers[i]->buf,
304                            rpcBuffers[i]->len);
305
306         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FSYNC) == -1)
307             perror("fcntl");
308 #endif
309
310         if (rpcBuffers[i]->len != ret) {
311
312             if (ret != -1) {
313                 assert(ret < rpcBuffers[i]->len);
314                 printf("Warning: a partial message sent!\n");
315                 P_memcpy(rpcBuffers[i]->buf, rpcBuffers[i]->buf+ret,
316                          rpcBuffers[i]->len-ret);
317                 rpcBuffers[i]->len -= ret;
318             }
319             break;
320         }
321         index++;
322         
323     }
324     
325     // Delete the items sent out
326     unsigned j;
327     for (j = 0; j < index; j++) {
328         delete [] rpcBuffers[j]->buf;
329         delete rpcBuffers[j];
330         rpcBuffers[j] = NULL; // probably unnecessary
331     }    
332     
333     for (j = 0; j < rpcBuffers.size() - index; j++)
334         rpcBuffers[j] = rpcBuffers[j+index];
335     rpcBuffers.resize(rpcBuffers.size() - index);
336
337     return (ret = (int)len);
338 }
339
340 void
341 doDeferedRPCasyncXDRWrite() {
342
343     if (rpcBuffers.size() == 0)
344         return;
345
346     int ret, index = 0;
347
348     // Write the items to the other end if possible 
349     for (int i = 0; (i < (int)rpcBuffers.size()); i++) {
350
351 #if defined (i386_unknown_nt4_0)
352
353         u_long ioctlsock_arg = 1; // set non-blocking
354         if (ioctlsocket(rpcBuffers[i]->fd, FIONBIO, &ioctlsock_arg) == -1)
355             perror("ioctlsocket");
356
357         ret = (int) send(rpcBuffers[i]->fd, rpcBuffers[i]->buf,
358                          rpcBuffers[i]->len, 0);
359
360         ioctlsock_arg = 0; // set blocking
361         if (ioctlsocket(rpcBuffers[i]->fd, FIONBIO, &ioctlsock_arg) == -1)
362             perror("ioctlsocket");
363
364 #else
365         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FNONBLOCK) == -1)
366             perror("fcntl");
367
368         ret = (int)P_write(rpcBuffers[i]->fd, rpcBuffers[i]->buf,
369                            rpcBuffers[i]->len);
370
371         if (P_fcntl (rpcBuffers[i]->fd, F_SETFL, FSYNC) == -1)
372             perror("fcntl");
373 #endif
374         
375         if (rpcBuffers[i]->len != ret) {
376
377             if (ret != -1) {
378                 assert(ret < rpcBuffers[i]->len);
379                 printf("Warning: a partial message sent!\n");
380                 P_memcpy(rpcBuffers[i]->buf, rpcBuffers[i]->buf+ret,
381                          rpcBuffers[i]->len-ret);
382                 rpcBuffers[i]->len -= ret;
383             }
384             break;
385         }
386         index++;
387
388     }
389     
390     // Delete the items sent out
391     unsigned j;
392     for (j = 0; j < (unsigned)index; j++) {
393         delete [] rpcBuffers[j]->buf;
394     }    
395
396     for (j = 0; j < rpcBuffers.size() - index; j++)
397         rpcBuffers[j] = rpcBuffers[j+index];
398     rpcBuffers.resize(rpcBuffers.size() - index);
399 }
400
401 XDRrpc::~XDRrpc()
402 {
403   if (sock != PDSOCKET_ERROR)
404     {
405 #if !defined(i386_unknown_nt4_0)
406       P_fcntl (sock, F_SETFL, FNDELAY);
407 #endif
408                 CLOSEPDSOCKET( sock );
409                 sock = PDSOCKET_ERROR;
410     }
411   if (xdrs) 
412     {
413       P_xdr_destroy (xdrs);
414       delete (xdrs);
415       xdrs = NULL;
416     }
417 }
418
419 //
420 // prepare for RPC's to be done/received on the passed fd.
421 //
422 XDRrpc::XDRrpc(PDSOCKET use_sock, xdr_rd_func readRoutine, xdr_wr_func writeRoutine, const int nblock)
423 : xdrs(NULL), sock(use_sock)
424 {
425     assert(use_sock != PDSOCKET_ERROR);
426     xdrs = new XDR;
427 #ifdef PURE_BUILD
428     memset(xdrs,'\0',sizeof(XDR));
429 #endif
430     assert(xdrs);
431     if (!readRoutine) {
432         if (nblock == 1) {
433             readRoutine = (xdr_rd_func) RPCasyncXDRRead;
434         } else { 
435             readRoutine = (xdr_rd_func) RPCdefaultXDRRead;
436         }
437     }   
438     if (!writeRoutine) {
439         if (nblock == 2) {
440             writeRoutine = (xdr_wr_func) RPCasyncXDRWrite;
441         } else {    
442             writeRoutine = (xdr_wr_func) RPCdefaultXDRWrite;
443         }
444     }   
445     P_xdrrec_create(xdrs, 0, 0, (char *) sock, readRoutine, writeRoutine);
446 }
447
448 //
449 // prepare for RPC's to be done/received on the passed fd.
450 //
451 XDRrpc::XDRrpc(const string &machine,
452                const string &user,
453                const string &program,
454                xdr_rd_func readRoutine, 
455                xdr_wr_func writeRoutine,
456                const vector<string> &arg_list,
457                const int nblock,
458                PDSOCKET wellKnownSocket)
459 : xdrs(NULL), sock(INVALID_PDSOCKET)
460 {
461     sock = RPCprocessCreate(machine, user, program, arg_list, wellKnownSocket);
462     if (sock != INVALID_PDSOCKET) {
463         xdrs = new XDR;
464 #ifdef PURE_BUILD
465         memset(xdrs,'\0',sizeof(XDR));
466 #endif
467         if (!readRoutine) {
468             if (nblock == 1) {
469                 readRoutine = (xdr_rd_func) RPCasyncXDRRead;
470             } else { 
471                 readRoutine = (xdr_rd_func) RPCdefaultXDRRead;
472             }
473         }
474         if (!writeRoutine) {
475             if (nblock == 2) {
476                 writeRoutine = (xdr_wr_func)RPCasyncXDRWrite;
477             } else { 
478                 writeRoutine = (xdr_wr_func) RPCdefaultXDRWrite;
479             }
480         }
481         P_xdrrec_create(xdrs, 0, 0, (char *) sock, readRoutine, writeRoutine);
482     }
483 }
484
485 bool
486 RPC_readReady (PDSOCKET sock, int timeout)
487 {
488   fd_set readfds;
489   struct timeval tvptr, *the_tv;
490
491   tvptr.tv_sec = timeout; tvptr.tv_usec = 0;
492   if (sock == INVALID_PDSOCKET) return false;
493   FD_ZERO(&readfds);
494   FD_SET (sock, &readfds);
495
496   // -1 timeout = blocking select
497   if (timeout == -1)
498      the_tv = 0;
499   else
500      the_tv = &tvptr;
501  
502   if (P_select (sock+1, &readfds, NULL, NULL, the_tv) == PDSOCKET_ERROR)
503     {
504       // if (errno == EBADF)
505         return false;
506     }
507   return (FD_ISSET (sock, &readfds));
508 }
509
510 /*
511  * Build an argument list starting at position 0.
512  * Note, this arg list will be used in an exec system call
513  * AND, the command name will have to be inserted at the head of the list
514  * But, a NULL space will NOT be left at the head of the list
515  */
516 bool RPC_make_arg_list(vector<string> &list,
517                        const int well_known_socket, const int flag,
518                        const int /* firstPVM */,
519                        const string machine_name, const bool use_machine)
520 {
521   char arg_str[100];
522
523   list.resize(0);
524
525   sprintf(arg_str, "%s%d", "-p", well_known_socket);  
526   list += arg_str;
527   // arg_list[arg_count++] = strdup (arg_str);  // 0
528
529   if (!use_machine) {
530     list += string("-m") + getHostName();
531   } else {
532     list += string("-m") + machine_name;
533   }
534   // arg_list[arg_count++] = strdup (arg_str); // 3
535
536   sprintf(arg_str, "%s%d", "-l", flag);
537   list += arg_str;
538   //arg_list[arg_count++] = strdup (arg_str);  // 4
539
540   //-v option to paradynd is obsolete
541   //sprintf(arg_str, "%s%d", "-v", firstPVM);
542   //list += arg_str;
543   // arg_list[arg_count++] = strdup (arg_str);  // 5 
544
545   return true;
546 }
547
548 // returns port number of socket that is listened on, or -1
549 // returns socket descriptor through sock parameter
550 int
551 RPC_setup_socket (PDSOCKET &sock,   // return socket descriptor
552                   const int family, // AF_INET ...
553                   const int type)   // SOCK_STREAM ...
554 {
555   if ((sock = P_socket(family, type, 0)) == INVALID_PDSOCKET)
556     return -1;
557
558   struct sockaddr_in serv_addr;
559   P_memset ((void*) &serv_addr, 0, sizeof(serv_addr));
560   serv_addr.sin_family = (short) family;
561   serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
562   serv_addr.sin_port = htons(0);
563   
564   size_t length = sizeof(serv_addr);
565
566   if (P_bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == PDSOCKET_ERROR)
567     return -1;
568
569   if (P_getsockname (sock, (struct sockaddr *) &serv_addr, &length) == PDSOCKET_ERROR)
570     return -1;
571
572   if (P_listen(sock, 128) == PDSOCKET_ERROR)  //Be prepared for lots of simultaneous connects
573     return -1;
574
575   return (ntohs (serv_addr.sin_port));
576 }
577
578 // setup a AF_UNIX domain socket of type SOCK_STREAM
579 // NT does not support AF_UNIX domain sockets
580 #if !defined(i386_unknown_nt4_0)
581 bool
582 RPC_setup_socket_un (int &sfd,   // return file descriptor
583                      const char *path) // file path socket is bound to
584 {
585   struct sockaddr_un saddr;
586
587   if ((sfd = P_socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
588     return false;
589
590   saddr.sun_family = AF_UNIX;
591   P_strcpy(saddr.sun_path, path);
592   
593   if (P_bind(sfd, (struct sockaddr *) &saddr, sizeof(saddr)) < 0)
594     return false;
595
596   if (P_listen(sfd, 128) < 0)  //Be prepared for lots of simultaneous connects
597     return false;
598
599   return true;
600 }
601 #endif // !defined(i386_unknown_nt4_0)
602
603 //
604 // connect to well known socket
605 //
606 XDRrpc::XDRrpc(int family,            
607                int req_port,             
608                int type,
609                const string machine, 
610                xdr_rd_func readRoutine,
611                xdr_wr_func writeRoutine,
612                const int nblock) : xdrs(NULL), sock(INVALID_PDSOCKET)
613      // socket, connect using machine
614 {
615   struct sockaddr_in serv_addr;
616   struct hostent *hostptr = 0;
617   struct in_addr *inadr = 0;
618   if (!(hostptr = P_gethostbyname(machine.string_of())))
619     return;
620
621   inadr = (struct in_addr *) ((void*) hostptr->h_addr_list[0]);
622   P_memset ((void*) &serv_addr, 0, sizeof(serv_addr));
623   serv_addr.sin_family = family;
624   serv_addr.sin_addr = *inadr;
625   serv_addr.sin_port = htons(req_port);
626
627   if ( (sock = P_socket(family, type, 0)) == PDSOCKET_ERROR)
628     { sock = INVALID_PDSOCKET; return; }
629
630   //connect() may timeout if lots of Paradynd's are trying to connect to
631   //  Paradyn at the same time, so we keep retrying the connect().
632   errno = 0;
633   while (P_connect(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == PDSOCKET_ERROR) {
634 #if defined(i386_unknown_nt4_0)
635     if (PDSOCKET_ERRNO != WSAETIMEDOUT)
636 #else
637     if (errno != ETIMEDOUT)
638 #endif
639         { sock = INVALID_PDSOCKET; return; } 
640     CLOSEPDSOCKET(sock);
641     if ((sock = P_socket(family, type, 0)) == PDSOCKET_ERROR)
642                 { sock = INVALID_PDSOCKET; return; }
643     errno = 0;
644   }
645
646     xdrs = new XDR;
647     assert(xdrs);
648     if (!readRoutine) {
649         if (nblock == 1) {
650             readRoutine = (xdr_rd_func) RPCasyncXDRRead;
651         } else {  
652             readRoutine = (xdr_rd_func) RPCdefaultXDRRead;
653         }
654         }
655     if (!writeRoutine) {
656         if (nblock == 2) {
657             writeRoutine = (xdr_wr_func)RPCasyncXDRWrite;
658         } else { 
659             writeRoutine = (xdr_wr_func) RPCdefaultXDRWrite;
660         }
661     }
662     P_xdrrec_create(xdrs, 0, 0, (char *) sock, readRoutine,writeRoutine);
663
664
665 //
666 // These xdr functions are not to be called with XDR_FREE
667 //
668 bool_t xdr_Boolean(XDR *xdrs, bool *bool_val) {
669    u_char i='\0';
670    bool_t res;
671    switch (xdrs->x_op) {
672    case XDR_ENCODE:
673       i = (u_char) *bool_val;
674       return (P_xdr_u_char(xdrs, &i));
675    case XDR_DECODE:
676       res = P_xdr_u_char(xdrs, &i);
677       *bool_val = (bool) i;
678       return res;
679    case XDR_FREE:
680    default:
681       assert(0);
682       return FALSE;
683    }
684 }
685
686 // XDR_FREE not handled, free it yourself!
687 // our version of string encoding that does malloc as needed.
688 //
689 bool_t xdr_string_pd(XDR *xdrs, string *str)
690 {
691   unsigned int length = 0;
692   assert(str);
693
694   // if XDR_FREE, str's memory is freed
695   switch (xdrs->x_op) {
696   case XDR_ENCODE:
697     if ((length = str->length())) {
698       if (!P_xdr_u_int(xdrs, &length))
699         return FALSE;
700       else {
701         char *buffer = const_cast<char*> (str->string_of()); 
702         bool_t res = P_xdr_string (xdrs, &buffer, str->length() + 1);
703         return res;
704       }
705     } else {
706       return (P_xdr_u_int(xdrs, &length));
707     }
708   case XDR_DECODE:
709     if (!P_xdr_u_int(xdrs, &length))
710       return FALSE;
711      else if (!length) {
712        *str = NULL;
713        return TRUE;
714      } else {
715        char *temp; 
716        bool newd = false;
717        unsigned max_len = 511;
718        char stat_buf[512];
719        if (length < 512) 
720           temp = (char*) stat_buf;
721        else {
722           temp = new char[length+1];
723           max_len = length;
724           newd = true;
725        }      
726        if (!P_xdr_string (xdrs, &temp, max_len)) {
727           if (newd) 
728             delete temp;
729           return FALSE;
730        } else {
731           *str = temp;
732           if (newd)
733             delete temp; 
734           return TRUE;
735        }
736      }
737   case XDR_FREE:
738   default:
739     // this should never occur  
740     assert(0);
741     return (FALSE);
742   }
743 }
744
745 // trace data streams
746 // XDR_FREE not handled, free it yourself!
747 // our version of string encoding that does malloc as needed.
748 //
749
750 bool_t xdr_byteArray_pd(XDR *xdrs, byteArray *bArray)
751 {
752   unsigned int length = 0;
753   assert(bArray);
754
755   // if XDR_FREE, str's memory is freed
756   switch (xdrs->x_op) {
757   case XDR_ENCODE:
758     length = bArray->length();
759     if (length > 0) {
760       if (!P_xdr_u_int(xdrs, &length))
761         return FALSE;
762       else {
763         char *buffer = static_cast<char*> (const_cast<void*> (bArray->getArray()));
764         bool_t res = P_xdr_byteArray (xdrs, &buffer, &length, (bArray->length()));
765         return res;
766       }
767     } else {
768       return (P_xdr_u_int(xdrs, &length));
769     }
770   case XDR_DECODE:
771     if (!P_xdr_u_int(xdrs, &length))
772       return FALSE;
773      else if (!length) {
774        *bArray = byteArray(NULL, 0);
775        return TRUE;
776      } else {
777        char *temp;
778        unsigned int act_len;
779        unsigned int max_len = length;
780        temp = new char[length];
781        if (!P_xdr_byteArray (xdrs, &temp, &act_len, max_len)) {
782           delete [] temp;
783           return FALSE;
784        } else if (act_len != length) {
785           delete [] temp;
786           return FALSE;
787        } else {
788           *bArray = byteArray(temp,act_len);
789           delete [] temp;
790           return TRUE;
791        }
792      }
793   case XDR_FREE:
794   default:
795     // this should never occur
796     assert(0);
797     return (FALSE);
798   }
799 }
800
801 //
802 // directly exec the command (local).
803 //
804
805 PDSOCKET
806 execCmd(const string command, const vector<string> &arg_list, int /*portFd*/)
807 {
808   PDSOCKET ret;
809   PDSOCKET sv[2];
810
811 #if !defined(i386_unknown_nt4_0)
812   int execlERROR;
813
814   errno = 0;
815   ret = P_socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
816   if (ret==-1) return(ret);
817   execlERROR = 0;
818   int al_len = arg_list.size();
819   char **new_al = new char*[al_len+2];
820   // TODO argv[0] should not include path
821   new_al[0] = P_strdup(command.string_of());
822   new_al[al_len+1] = NULL;
823   for (int i=0; i<al_len; ++i)
824     new_al[i+1] = P_strdup(arg_list[i].string_of());
825   ret = -1;
826
827   int pid;
828   pid = vfork();
829   // close sv[0] after exec 
830   P_fcntl(sv[0],F_SETFD,1);  
831
832   if (pid == 0) {
833     if (P_close(sv[0])) { execlERROR = errno; _exit(-1);}
834     if (P_dup2(sv[1], 0)) { execlERROR = errno; _exit(-1);}
835     P_execvp(new_al[0], new_al);
836     execlERROR = errno;
837     _exit(-1);
838   } else if (pid > 0 && !execlERROR) {
839     P_close(sv[1]);
840     ret = sv[0];
841   }
842
843   al_len=0;
844   while (new_al[al_len]) {
845     free(new_al[al_len]);
846     al_len++;
847   }
848   delete [] new_al;
849
850 #else // !defined(i386_unknown_nt4_0)
851
852         // create a pair of socket endpoints and connect them
853         // 
854         // note that we create TCP/IP sockets here rather than
855         // using a named pipe because the socket returned from
856         // this function is used to communicate with a Paradyn
857         // daemon, and the communication between the WinNT
858         // Paradyn front end and a Paradyn daemon is implemented
859         // using a socket to support communication with daemons
860         // on remote machines
861         if( !CreateSocketPair( sv ) )
862         {
863                 return INVALID_PDSOCKET;
864         }
865
866         // build a command line for the new process
867         string cmdLine( command );
868         unsigned i;
869         for( i = 0; i < arg_list.size(); i++ )
870         {
871                 cmdLine += " ";
872                 cmdLine += arg_list[i];
873         }
874
875         // set up the standard input of the new process to 
876         // be connected to our shared socket
877         STARTUPINFO startInfo;
878         GetStartupInfo( &startInfo );
879         startInfo.hStdInput = (HANDLE)sv[1];
880         startInfo.dwFlags |= STARTF_USESTDHANDLES;
881
882         // actually create the process
883         PROCESS_INFORMATION procInfo;
884         BOOL bCreated = CreateProcess( NULL,
885                                                                         (char*)cmdLine.string_of(),
886                                                                         NULL,
887                                                                         NULL,
888                                                                         TRUE,
889                                                                         NORMAL_PRIORITY_CLASS,
890                                                                         NULL,
891                                                                         NULL,
892                                                                         &startInfo,
893                                                                         &procInfo );
894
895         if( bCreated )
896         {
897                 // delay till child has started
898                 // TODO - how can we tell this?
899
900                 // release our hold on the child socket endpoint
901                 // and keep hold of our own endpoint
902                 CLOSEPDSOCKET( sv[1] );
903                 ret = sv[0];
904         }
905         else
906         {
907                 // cleanup
908                 CLOSEPDSOCKET( sv[0] );
909                 CLOSEPDSOCKET( sv[1] );
910
911                 // indicate the error
912                 ret = INVALID_PDSOCKET;
913         }
914
915 #endif // !defined(i386_unknown_nt4_0)
916
917         return ret;
918 }
919
920 int handleRemoteConnect(int fd, int portFd) {
921     // NOTE: This routine is not generic; it is very specific to paradyn
922     // (due to the sscanf(line, "PARADYND %d")...)
923     // Hence it makes absolutely no sense to put it in a so-called library.
924     // It should be put into the paradyn/src tree --ari
925
926     FILE *pfp = P_fdopen(fd, "r");
927     if (pfp == NULL) {
928        cerr << "handleRemoteConnect: fdopen of fd " << fd << " failed." << endl;
929        return -1;
930     }
931
932     int retFd = RPC_getConnect(portFd); // calls accept(), which blocks (sigh...)
933     return retFd;
934 }
935
936 //
937 // use rsh to get a remote process started.
938 //
939 // We do an rsh to start a process and them wait for the process 
940 // to do a connect. There is no guarantee that the process we are waiting for is
941 // the one that gets the connection. This can happen with paradyndPVM: the
942 // daemon that is started by a rshCommand will start other daemons, and one of 
943 // these daemons may get the connection that should be for the first daemon.
944 // Daemons should always get a connection before attempting to start other daemons.
945 //
946
947 int rshCommand(const string hostName, const string userName, 
948                const string command, const vector<string> &arg_list, int portFd)
949 {
950 #if defined(i386_unknown_nt4_0)
951     // TODO
952     assert(0);
953     return 0;
954 #else
955     int fd[2];
956     int shellPid;
957     int ret;
958
959     int total = command.length() + 2;
960     for (unsigned i=0; i<arg_list.size(); i++) {
961       total += arg_list[i].length() + 2;
962     }
963
964     string paradyndCommand = command + " ";
965
966     for (unsigned j=0; j < arg_list.size(); j++)
967         paradyndCommand += arg_list[j] + " ";
968
969     // need to rsh to machine and setup io path.
970
971     if (pipe(fd)) {
972         perror("pipe");
973         return (-1);
974     }
975
976     shellPid = vfork();
977     if (shellPid == 0) {
978         /* child */
979         bool aflag;
980         aflag=(-1 != dup2(fd[1], 1)); /* copy it onto stdout */
981         assert(aflag);
982         aflag=(-1 != close(fd[0]));
983         assert(aflag);
984         aflag=(-1 != close(fd[1]));
985         assert(aflag);
986         if (userName.length()) {
987             ret = execlp(RSH_COMMAND, RSH_COMMAND, hostName.string_of(), "-l", 
988                          userName.string_of(), "-n", paradyndCommand.string_of(),
989                          "-l0", NULL);
990             fprintf(stderr,"rshCommand: execlp failed (ret = %d)\n",ret);
991         } else {
992             ret = execlp(RSH_COMMAND, RSH_COMMAND, hostName.string_of(), "-n", 
993                          paradyndCommand.string_of(), "-l0", NULL);
994             fprintf(stderr,"rshCommand: execlp failed (ret = %d)\n",ret);
995         }
996         _exit(-1);
997     } else if (shellPid > 0) {
998         close(fd[1]);
999     } else {
1000         // error situation
1001     }
1002
1003     return(handleRemoteConnect(fd[0], portFd));
1004 #endif
1005 }
1006
1007 int rexecCommand(const string hostName, const string userName, 
1008                  const string command, const vector<string> &arg_list, int portFd)
1009 {
1010 #if defined(i386_unknown_nt4_0)
1011     // TODO
1012     assert(0);
1013     return 0;
1014 #else
1015     struct servent *inport;
1016
1017     int total = command.length() + 2;
1018     for (unsigned i=0; i<arg_list.size(); i++) {
1019       total += arg_list[i].length() + 2;
1020     }
1021
1022     char *paradyndCommand = new char[total+2];
1023     assert(paradyndCommand);
1024
1025     sprintf(paradyndCommand, "%s ", command.string_of());
1026     for (unsigned j=0; j<arg_list.size(); j++) {
1027         P_strcat(paradyndCommand, arg_list[j].string_of());
1028         P_strcat(paradyndCommand, " ");
1029     }
1030
1031     inport = P_getservbyname("exec",  "tcp");
1032
1033     char *hname = P_strdup(hostName.string_of());
1034     char *uname = P_strdup(userName.string_of());
1035     int fd = P_rexec(&hname, inport->s_port, uname, NULL,
1036                    paradyndCommand, NULL);
1037     delete hname; delete uname;
1038
1039     if (fd < 0) {
1040         perror("rexec");
1041         printf("rexec failed\n");
1042     }
1043     if (paradyndCommand)
1044       delete paradyndCommand;
1045
1046     return(handleRemoteConnect(fd, portFd));
1047 #endif
1048 }
1049
1050 /*
1051  * 
1052  * "command" will be appended to the front of the arg list
1053  *
1054  * if arg_list == NULL, command is the only argument used
1055  */
1056
1057 PDSOCKET RPCprocessCreate(const string hostName, const string userName,
1058                      const string command, const vector<string> &arg_list,
1059                      int portFd, const bool useRexec)
1060 {
1061     PDSOCKET ret;
1062
1063     if ((hostName == "") || 
1064         (hostName == "localhost") ||
1065         (hostName == getHostName()))
1066       ret = execCmd(command, arg_list, portFd);
1067     else if (useRexec)
1068       ret = rexecCommand(hostName, userName, command, arg_list, portFd);
1069     else
1070       ret = rshCommand(hostName, userName, command, arg_list, portFd);
1071
1072     return(ret);
1073 }
1074
1075 PDSOCKET RPC_getConnect(PDSOCKET sock) {
1076   if (sock == INVALID_PDSOCKET)
1077     return INVALID_PDSOCKET;
1078
1079   struct sockaddr cli_addr;
1080   size_t clilen = sizeof(cli_addr);
1081   errno = 0;
1082   PDSOCKET new_sock = P_accept (sock, (struct sockaddr *) &cli_addr, &clilen);
1083
1084   if (new_sock == PDSOCKET_ERROR) {
1085     if (PDSOCKET_ERRNO == EMFILE) {
1086       cerr << "Cannot accept more connections:  Too many open files" << endl;
1087       cerr << "Please see your documentation for `ulimit'" << endl << flush;
1088     }
1089     return INVALID_PDSOCKET;
1090   }
1091   else
1092     return new_sock;
1093 }
1094
1095 // TODO -- use vectors and strings ?
1096 /*
1097  *  RPCgetArg - break a string into blank separated words
1098  *  Used to parse a command line.
1099  *  
1100  *  input --> a null terminated string, the command line
1101  *  argc --> returns the number of args found
1102  *  returns --> words (separated by blanks on command line)
1103  *  Note --> this allocates memory 
1104  */
1105 bool RPCgetArg(vector<string> &arg, const char *input)
1106 {
1107 #define BLANK ' '
1108
1109   int word_len;
1110   if (!input) 
1111     return false;
1112
1113   int length = strlen(input);
1114   int index = 0;
1115
1116   /* advance past blanks in input */
1117   while ((index < length) && (input[index] == BLANK))
1118     index++;
1119
1120   /* input is all blanks, or NULL */
1121   if (index >= length) 
1122     return true;
1123
1124   do {
1125     /* the start of each string */
1126     word_len = 0; const char *word_start = input + index;
1127
1128     /* find the next BLANK and the WORD size */
1129     while ((index < length) && (input[index] != BLANK)) {
1130       index++; word_len++;
1131     }
1132
1133     /* copy the word */
1134     char *temp = new char[word_len+1];
1135     strncpy(temp, word_start, word_len);
1136     temp[word_len] = (char) 0;
1137     arg += temp;
1138     delete temp;
1139
1140     /* skip past consecutive blanks */
1141     while ((index < length) && (input[index] == BLANK))
1142       index++;
1143   } while (index < length);
1144   return true;
1145 }
1146
1147
1148 string getHostName() {
1149 #if defined(i386_unknown_nt4_0)
1150     char nameBuf[1000];
1151     gethostname(nameBuf,999);
1152     return string(nameBuf);
1153 #else
1154     struct utsname un;
1155     P_uname(&un);
1156     return string(un.nodename);
1157 #endif
1158 }
1159
1160
1161 #if defined(i386_unknown_nt4_0)
1162
1163 bool
1164 CreateSocketPair( PDSOCKET socks[2] )
1165 {
1166         PDSOCKET sockListen = INVALID_PDSOCKET;
1167         bool bCreated = false;
1168         
1169
1170         // create sockets
1171         socks[0] = socket( AF_INET, SOCK_STREAM, PF_UNSPEC );
1172         socks[1] = INVALID_PDSOCKET;
1173         sockListen = socket( AF_INET, SOCK_STREAM, PF_UNSPEC );
1174         if( (sockListen != INVALID_PDSOCKET) && (socks[0] != INVALID_PDSOCKET) )
1175         {
1176                 // connect sockets...
1177
1178                 // ...bind one of the sockets...
1179                 struct hostent* pHostData = gethostbyname( "localhost" );
1180                 assert( pHostData != NULL );
1181
1182                 SOCKADDR_IN sin;
1183                 sin.sin_family = AF_INET;
1184                 sin.sin_port = 0;                       // dynamically assigned
1185                 sin.sin_addr.S_un.S_addr = *(unsigned long*)pHostData->h_addr;
1186                 if( bind( sockListen, (SOCKADDR*)&sin, sizeof(sin) ) != PDSOCKET_ERROR )
1187                 {
1188                         // ...connect the other to the bound socket
1189                         sockaddr_in sain;
1190                         int cbSain = sizeof( sain );
1191                         if( getsockname( sockListen, (SOCKADDR*)&sain, &cbSain ) != PDSOCKET_ERROR )
1192                         {
1193                                 if( listen( sockListen, 1 ) != PDSOCKET_ERROR )
1194                                 {
1195                                         // issue a "deferred" connect for socket 0
1196                                         sin.sin_port = sain.sin_port;
1197                                         if( connect( socks[0], (SOCKADDR*)&sin, sizeof(sin) ) != PDSOCKET_ERROR )
1198                                         {
1199                                                 // accept the connection
1200                                                 socks[1] = accept( sockListen, NULL, NULL );
1201                                                 if( socks[1] != INVALID_PDSOCKET )
1202                                                 {
1203                                                         // verify the sockets are connected -
1204                                                         // socket 0 should now be writable
1205                                                         fd_set wrset;
1206                                                         struct timeval tv;
1207
1208                                                         tv.tv_sec = 0;
1209                                                         tv.tv_usec = 0;
1210                                                         FD_ZERO( &wrset );
1211                                                         FD_SET( socks[0], &wrset );
1212                                                         if( select( 0, NULL, &wrset, NULL, &tv ) == 1 )
1213                                                         {
1214                                                                 bCreated = true;
1215                                                         }
1216                                                 }
1217                                         }
1218                                 }
1219                         }
1220                 }
1221         }
1222
1223         if( !bCreated )
1224         {
1225                 CLOSEPDSOCKET( socks[0] );
1226                 socks[0] = INVALID_PDSOCKET;
1227                 CLOSEPDSOCKET( socks[1] );
1228                 socks[1] = INVALID_PDSOCKET;
1229         }
1230
1231         CLOSEPDSOCKET( sockListen );
1232
1233         return bCreated;
1234 }
1235
1236
1237 #endif // defined(i386_unknown_nt4_0)
1238