Fixing race condition between inferiorRPC waiting for a system call to
[dyninst.git] / paradynd / src / perfStream.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 #ifdef PARADYND_PVM
43 extern "C" {
44 #include <pvm3.h>
45 }
46 #include "pvm_support.h"
47 #endif
48
49 #include "util/h/headers.h"
50 #include "rtinst/h/rtinst.h"
51 #include "rtinst/h/trace.h"
52 #include "dyninstAPI/src/symtab.h"
53 #include "dyninstAPI/src/process.h"
54 #include "dyninstAPI/src/inst.h"
55 #include "dyninstAPI/src/instP.h"
56 #include "dyninstAPI/src/dyninstP.h"
57 #include "paradynd/src/metric.h"
58 #include "dyninstAPI/src/util.h"
59 #include "paradynd/src/comm.h"
60 #include "dyninstAPI/src/stats.h"
61 #include "paradynd/src/debugger.h"
62 #include "paradynd/src/main.h"
63 #include "paradynd/src/association.h"
64 #include "paradynd/src/init.h"
65 #include "paradynd/src/context.h"
66 #include "paradynd/src/perfStream.h"
67 #include "dyninstAPI/src/os.h"
68 #include "paradynd/src/mdld.h"
69 #include "paradynd/src/showerror.h"
70 #include "paradynd/src/main.h"
71 #include "util/h/debugOstream.h"
72
73 // trace data streams
74 #include "util/h/Dictionary.h"
75
76 // The following were all defined in process.C (for no particular reason)
77 extern debug_ostream attach_cerr;
78 extern debug_ostream inferiorrpc_cerr;
79 extern debug_ostream shmsample_cerr;
80 extern debug_ostream forkexec_cerr;
81 extern debug_ostream metric_cerr;
82 extern debug_ostream signal_cerr;
83 extern debug_ostream sharedobj_cerr;
84
85 string traceSocketPath; /* file path for trace socket */
86 int traceConnectInfo;
87 int traceSocketPort;
88
89 static void createResource(int pid, traceHeader *header, struct _newresource *r);
90 static void reportMemory(int pid, traceHeader *header, struct _traceMemory *r) ;
91
92 bool firstSampleReceived = false;
93
94 double cyclesPerSecond = 0;
95 extern time64 firstRecordTime; // util.C
96
97 // Read output data from process curr. 
98 void processAppIO(process *curr)
99 {
100     int ret;
101     char lineBuf[256];
102
103     ret = read(curr->ioLink, lineBuf, sizeof(lineBuf)-1);
104     if (ret < 0) {
105         //statusLine("read error");
106         //showErrorCallback(23, "Read error");
107         //cleanUpAndExit(-2);
108         string msg = string("Read error on IO stream from PID=") +
109                      string(curr->getPid()) + string(": ") +
110                      string(sys_errlist[errno]) + 
111                      string("\nNo more data will be received from this process.");
112         showErrorCallback(23, msg);
113         P_close(curr->ioLink);
114         curr->ioLink = -1;
115         return;
116     } else if (ret == 0) {
117         /* end of file -- process exited */
118         P_close(curr->ioLink);
119         curr->ioLink = -1;
120         string msg = string("Process ") + string(curr->getPid()) + string(" exited");
121         statusLine(msg.string_of());
122         handleProcessExit(curr,0);
123         return;
124     }
125
126     // null terminate it
127     lineBuf[ret] = '\0';
128
129     // forward the data to the paradyn process.
130     tp->applicationIO(curr->getPid(), ret, lineBuf);
131        // note: this is an async igen call, so the results may not appear right away.
132 }
133
134
135 char errorLine[1024];
136
137 void logLine(const char *line)
138 {
139     static char fullLine[1024];
140
141     strcat(fullLine, line);
142        // Ack!  Possible overflow!  Possible bug!
143        // If you put a '\n' at the end of every string passed to a call
144        // to logLine (and the string is < 1000 chars) then you'll be okay.
145        // Otherwise, watch out!
146
147     if (fullLine[strlen(fullLine)-1] == '\n') {
148         tp->applicationIO(0, strlen(fullLine), fullLine);
149         fullLine[0] = '\0';
150     }
151 }
152
153 void statusLine(const char *line)
154 {
155   tp->reportStatus(line);
156 }
157
158 // New with paradynd-->paradyn buffering.  When true, it tells the
159 // buffering routine to flush (to paradyn); otherwise, it would flush
160 // only when the buffer was full, hurting response time.
161 extern bool BURST_HAS_COMPLETED;
162
163 extern vector<process*> processVec;
164 extern process* findProcess(int); // should become a static method of class process
165
166 // trace data streams
167 extern bool TRACE_BURST_HAS_COMPLETED;
168 unsigned mid_hash(const unsigned &mid) {return mid;}
169 dictionary_hash<unsigned, unsigned> traceOn(mid_hash);
170
171 // Read trace data from process curr.
172 void processTraceStream(process *curr)
173 {
174     int ret;
175     traceStream sid;
176     char *recordData;
177     traceHeader header;
178     struct _association *a;
179
180     ret = read(curr->traceLink, &(curr->buffer[curr->bufEnd]), 
181                sizeof(curr->buffer)-curr->bufEnd);
182
183     if (ret < 0) {
184         //statusLine("read error, exiting");
185         //showErrorCallback(23, "Read error");
186         //curr->traceLink = -1;
187         //cleanUpAndExit(-2);
188         string msg = string("Read error on trace stream from PID=") +
189                      string(curr->getPid()) + string(": ") +
190                      string(sys_errlist[errno]) + 
191                      string("\nNo more data will be received from this process");
192         showErrorCallback(23, msg);
193         P_close(curr->traceLink);
194         curr->traceLink = -1;
195         return;
196     } else if (ret == 0) {
197         /* end of file */
198         // process exited unexpectedly
199         //string buffer = string("Process ") + string(curr->pid);
200         //buffer += string(" has exited unexpectedly");
201         //statusLine(P_strdup(buffer.string_of()));
202         //showErrorCallback(11, P_strdup(buffer.string_of()));
203         string msg = string("Process ") + string(curr->getPid()) + string(" exited");
204         statusLine(msg.string_of());
205         P_close(curr->traceLink);
206         curr->traceLink = -1;
207         handleProcessExit(curr,0);
208         return;
209     }
210
211     curr->bufEnd += ret;
212     curr->bufStart = 0;
213
214     while (curr->bufStart < curr->bufEnd) {
215         if (curr->bufEnd - curr->bufStart < (sizeof(traceStream) + sizeof(header))) {
216             break;
217         }
218
219         if (curr->bufStart % WORDSIZE != 0)     /* Word alignment check */
220             break;                      /* this will re-align by shifting */
221
222         memcpy(&sid, &(curr->buffer[curr->bufStart]), sizeof(traceStream));
223         curr->bufStart += sizeof(traceStream);
224
225         memcpy(&header, &(curr->buffer[curr->bufStart]), sizeof(header));
226         curr->bufStart += sizeof(header);
227
228         curr->bufStart = ALIGN_TO_WORDSIZE(curr->bufStart);
229         if (header.length % WORDSIZE != 0) {
230             sprintf(errorLine, "Warning: non-aligned length (%d) received on traceStream.  Type=%d\n", header.length, header.type);
231             logLine(errorLine);
232             showErrorCallback(36,(const char *) errorLine);
233         }
234             
235         if (curr->bufEnd - curr->bufStart < (unsigned)header.length) {
236             /* the whole record isn't here yet */
237             curr->bufStart -= sizeof(traceStream) + sizeof(header);
238             break;
239         }
240
241         recordData = &(curr->buffer[curr->bufStart]);
242         curr->bufStart +=  header.length;
243
244         if (!firstRecordTime)
245             firstRecordTime = header.wall;
246             // firstRecordTime is used by getCurrentTime() in util.C when arg passed
247             // is 'true', but noone seems to do that...so firstRecordTime is not a
248             // terribly important vrble (but, for now at least, it's used in metric.C)
249
250         // callback to paradyn (okay if this callback is done more than once; paradyn
251         // detects this.  This is important since right now, we're also gonna do this
252         // callback when starting a new process; needed since SHM_SAMPLING might well
253         // start sending samples to paradyn before any trace records were received
254         // here.)
255         static bool done_yet = false;
256         if (!done_yet) {
257            tp->firstSampleCallback(curr->getPid(), (double) (header.wall/1000000.0));
258            done_yet = true;
259         }
260         switch (header.type) {
261 #if defined(SHM_SAMPLING) && defined(MT_THREAD)
262             case TR_THREAD:
263                 createThread((traceThread *) ((void*)recordData));
264                 break;
265             case TR_THRSELF:
266                 updateThreadId((traceThrSelf *) ((void*)recordData));
267                 break;
268 #endif
269             case TR_NEW_RESOURCE:
270 //              cerr << "paradynd: received a new resource from pid " << curr->getPid() << "; processing now" << endl;
271                 createResource(curr->getPid(), &header, (struct _newresource *) ((void*)recordData));
272                    // createResource() is in this file, below
273                 break;
274
275             case TR_NEW_MEMORY:
276                 reportMemory(curr->getPid(), &header, (struct _traceMemory *) ((void*)recordData));
277                 break;
278
279             case TR_NEW_ASSOCIATION:
280                 a = (struct _association *) ((void*)recordData);
281                 newAssoc(curr, a->abstraction, a->type, a->key, a->value);
282                 break;
283
284 #ifndef SHM_SAMPLING
285             case TR_SAMPLE:
286                 // metric_cerr << "got something from pid " << curr->getPid() << endl;
287
288                  // sprintf(errorLine, "Got data from process %d\n", curr->getPid());
289                  // logLine(errorLine);
290 //              assert(curr->getFirstRecordTime());
291                 processSample(curr->getPid(), &header, (traceSample *) ((void*)recordData));
292                    // in metric.C
293                 firstSampleReceived = true;
294                 break;
295 #endif
296
297             case TR_EXIT:
298                 sprintf(errorLine, "process %d exited\n", curr->getPid());
299                 logLine(errorLine);
300                 printAppStats((struct endStatsRec *) ((void*)recordData),
301                               cyclesPerSecond);
302                 printDyninstStats();
303                 P_close(curr->traceLink);
304                 curr->traceLink = -1;
305                 handleProcessExit(curr, 0);
306                 break;
307
308 #ifndef SHM_SAMPLING
309             case TR_COST_UPDATE:
310                 processCost(curr, &header, (costUpdate *) ((void*)recordData));
311                    // in metric.C
312                 break;
313 #endif
314
315             case TR_CP_SAMPLE:
316                 // critical path sample
317                 extern void processCP(process *, traceHeader *, cpSample *);
318                 processCP(curr, &header, (cpSample *) recordData);
319                 break;
320
321             case TR_EXEC_FAILED:
322                 { int pid = *(int *)recordData;
323                   process *p = findProcess(pid);
324                   p->inExec = false;
325                   p->execFilePath = string("");
326                 }
327                 break;
328
329             case TR_DATA:
330                 extern void batchTraceData(int, int, int, char *);
331                 batchTraceData(0, sid, header.length, recordData);
332                 traceOn[sid] = 1;
333                 break;
334
335             default:
336                 sprintf(errorLine, "Got unknown record type %d on sid %d\n", 
337                     header.type, sid);
338                 logLine(errorLine);
339                 sprintf(errorLine, "Received bad trace data from process %d.", curr->getPid());
340                 showErrorCallback(37,(const char *) errorLine);
341         }
342     }
343     BURST_HAS_COMPLETED = true; // will force a batch-flush very soon
344
345     // trace data streams
346     for (unsigned w = 0; w<traceOn.keys().size(); w++) {
347         if (traceOn.values()[w]) {
348              extern void batchTraceData(int, int, int, char *);
349              int k;
350              TRACE_BURST_HAS_COMPLETED = true;
351              // will force a trace-batch-flush very soon
352              batchTraceData(0, (k = traceOn.keys()[w]), 0, (char *)NULL);
353              traceOn[k] = 0;
354              //sprintf(errorLine, "$$$Tag burst with mid %d\n", k);
355              //logLine(errorLine);
356         }
357     }
358
359     /* copy those bits we have to the base */
360     memcpy(curr->buffer, &(curr->buffer[curr->bufStart]), 
361         curr->bufEnd - curr->bufStart);
362     curr->bufEnd = curr->bufEnd - curr->bufStart;
363 }
364
365 void doDeferredRPCs() {
366    // Any RPCs waiting to be performed?  If so, and if it's safe to
367    // perform one, then launch one.
368    for (unsigned lcv=0; lcv < processVec.size(); lcv++) {
369       process *proc = processVec[lcv];
370       if (proc == NULL) continue; // proc must've died and has itself cleaned up
371       if (proc->status() == exited) continue;
372       if (proc->status() == neonatal) continue; // not sure if this is appropriate
373       
374       bool wasLaunched = proc->launchRPCifAppropriate(proc->status() == running,
375                                                       false);
376       // do we need to do anything with 'wasLaunched'?
377       if (wasLaunched)
378          inferiorrpc_cerr << "fyi: launched an inferior RPC" << endl;
379    }
380 }
381
382
383 void ioFunc()
384 {
385      printf("in SIG child func\n");
386      fflush(stdout);
387 }
388
389 #ifdef SHM_SAMPLING
390 static void checkAndDoShmSampling(time64 &pollTimeUSecs) {
391    // We assume that nextShmSampleTime (synched to getCurrWallTime())
392    // has already been set.  If the curr time is >= to this, then
393    // we should sample immediately, and update nextShmSampleTime to
394    // be nextShmSampleTime + sampleInterval, unless it is <= currTime,
395    // in which case we set it to currTime + sampleInterval.
396
397    // QUESTION: should we sample a given process while it's paused?  While it's
398    //           in the middle of an inferiorRPC?  For now, the answer is no
399    //           to both.
400
401    static time64 nextMajorSampleTime = 0;
402    static time64 nextMinorSampleTime = 0;
403
404    const time64 currWallTime = getCurrWallTime();
405       // checks for rollback
406
407    bool doMajorSample = false; // so far...
408    bool doMinorSample = false; // so far...
409
410    bool forNextTimeDoMinorSample = false; // so far...
411
412    if (currWallTime >= nextMajorSampleTime)
413       doMajorSample = true;
414    else if (currWallTime >= nextMinorSampleTime)
415       doMinorSample = true;
416    else
417       // it's not time to do anything.
418       return;
419
420    // Do shared memory sampling (for all processes) now!
421
422    // Loop thru all processes.  For each, process inferiorIntCounters,
423    // inferiorWallTimers, and inferiorProcessTimers.  But don't
424    // sample while an inferiorRPC is pending for that process, or for
425    // a non-running process.
426
427    for (unsigned lcv=0; lcv < processVec.size(); lcv++) {
428       process *theProc = processVec[lcv];
429       if (theProc == NULL)
430          continue; // proc died & had its structures cleaned up
431
432       // Don't sample paused/exited/neonatal processes, or even running processes
433       // that haven't been bootstrapped yet (i.e. haven't called DYNINSTinit yet),
434       // or processes that are in the middle of an inferiorRPC (we like for
435       // inferiorRPCs to finish up quickly).
436       if (theProc->status_ != running) {
437          //shmsample_cerr << "(-" << theProc->getStatusAsString() << "-)";
438          continue;
439       }
440       else if (!theProc->isBootstrappedYet()) {
441          //shmsample_cerr << "(-*-)" << endl;
442          continue;
443       }
444       else if (theProc->existsRPCinProgress()) {
445          //shmsample_cerr << "(-~-)" << endl;
446          continue;
447       }
448
449       if (doMajorSample) {
450          //shmsample_cerr << "(-Y-)" << endl;
451
452          if (!theProc->doMajorShmSample(currWallTime)) {
453             // The major sample didn't complete all of its work, so we
454             // schedule a minor sample for sometime in the near future
455             // (before the next major sample)
456
457             shmsample_cerr << "a minor sample will be needed" << endl;
458
459             forNextTimeDoMinorSample = true;
460          }
461       }
462       else if (doMinorSample) {
463          shmsample_cerr << "trying needed minor sample..."; cerr.flush();
464
465          if (!theProc->doMinorShmSample()) {
466             // The minor sample didn't complete all of its work, so
467             // schedule another one.
468             forNextTimeDoMinorSample = true;
469
470             shmsample_cerr << "it failed" << endl; cerr.flush();
471          }
472          else {
473             shmsample_cerr << "it succeeded" << endl; cerr.flush();
474          }
475       }
476    } // loop thru the processes
477
478    // And now, do the internal metrics
479    if (doMajorSample)
480       reportInternalMetrics(true);
481
482    // Here, we should probably flush the batch buffer (whether for a major
483    // sample or a minor one)
484    extern void flush_batch_buffer(); // metric.C (should be in this file)
485    flush_batch_buffer();
486
487    // Take currSamplingRate (which has values such as 0.2, 0.4, 0.8, 1.6, etc.)
488    // and multiply by a million to get the # of usecs per sample.
489    extern float currSamplingRate; // dynrpc.C
490    assert(currSamplingRate > 0);
491    const time64 shmSamplingInterval =
492               (time64)((double)currSamplingRate * 1000000.0);
493
494    if (doMajorSample) {
495       // If we just did a major sample, then we schedule the next major sample,
496       // and reset the next minor sample time.
497       nextMajorSampleTime += shmSamplingInterval;
498       if (nextMajorSampleTime <= currWallTime)
499          nextMajorSampleTime = currWallTime + shmSamplingInterval;
500    }
501
502    if (forNextTimeDoMinorSample) {
503       // If a minor sample is needed, then we schedule it.  For now, let's
504       // assume that a minor sample is always scheduled for now plus
505       // one-fourth of the original sampling rate...i.e. for now + (0.2 sec/4) =
506       // now + (0.05 sec), i.e. now + 50 milliseconds.
507 // temp: one-tenth of original sample rate...i.e. for now + 0.02 sec (+20 millisec)
508
509 //      nextMinorSampleTime = currWallTime + 50000; // 50ms = 50000us
510       nextMinorSampleTime = currWallTime + 20000; // 20ms = 20000us
511       if (nextMinorSampleTime > nextMajorSampleTime)
512          // oh, never mind, we'll just do the major sample which is going to
513          // happen first anyway.
514          nextMinorSampleTime = nextMajorSampleTime;
515    }
516    else {
517       // we don't need to do a minor sample next time, so reset nextMinorSampleTime
518       nextMinorSampleTime = nextMajorSampleTime;
519    }
520
521    time64 nextAnyKindOfSampleTime = nextMajorSampleTime;
522    if (nextMinorSampleTime < nextAnyKindOfSampleTime)
523       nextAnyKindOfSampleTime = nextMinorSampleTime;
524
525    assert(nextAnyKindOfSampleTime >= currWallTime);
526    const time64 shmSamplingTimeout = nextAnyKindOfSampleTime - currWallTime;
527
528    if (shmSamplingTimeout < pollTimeUSecs)
529       pollTimeUSecs = shmSamplingTimeout;
530 }
531 #endif
532
533
534 /*
535  * Wait for a data from one of the inferiors or a request to come in.
536  *
537  */
538
539 void controllerMainLoop(bool check_buffer_first)
540 {
541     int ct;
542     int width;
543     fd_set readSet;
544     fd_set errorSet;
545     struct timeval pollTime;
546     int traceSocket_fd;
547
548     // TODO - i am the guilty party - this will go soon - mdc
549 #ifdef PARADYND_PVM
550 #ifdef notdef
551     int fd_num, *fd_ptr;
552     if (pvm_mytid() < 0)
553       {
554         printf("pvm not working\n");
555         _exit(-1);
556       }
557     fd_num = pvm_getfds(&fd_ptr);
558     assert(fd_num == 1);
559 #endif
560 #endif
561
562 //    cerr << "welcome to controllerMainLoop...pid=" << getpid() << endl;
563 //    kill(getpid(), SIGSTOP);
564 //    cerr << "doing controllerMainLoop..." << endl;
565
566
567     /***
568        set up a socket to be used to create a trace link
569        by inferior processes that are not forked 
570        directly by this daemon.
571        This is a unix domain socket, which is bound to the file
572           <P_tmpdir>/paradynd.<pid>
573        where <P_tmpdir> is a constant defined in stdio.h (usually "/tmp" or
574        "/usr/tmp"), and <pid> is the pid of the paradynd process.
575
576        This socket is currently being used in two cases: when a
577        process forks and when we attach to a running process.  In the
578        fork case, the socket path can be passed in the environment (so
579        any name for the file would be ok), but in the attach case the
580        name is passed as an argument to DYNINSTinit. Since we
581        currently can only pass integer values as arguments, we use the
582        file name paradynd.<pid>, so that we need only to pass the pid
583        as the argument to DYNINSTinit, which can then determine the
584        full file name.
585
586        traceSocket_fd is the file descriptor of a socket, ready to receive
587        connections.
588        It represents a socket created with socket(); listen()
589        In other words, one which we intend to call accept() on.
590        (See perfStream.C -- the call to RPC_getConnect(traceSocket_fd))
591     ***/
592
593 #if !defined(i386_unknown_nt4_0)
594     traceSocketPath = string(P_tmpdir) + string("paradynd.") + string(getpid());
595     // unlink it, in case the file was left around from a previous run
596     unlink(traceSocketPath.string_of());
597
598     if (!RPC_setup_socket_un(traceSocket_fd, traceSocketPath.string_of())) {
599       perror("paradynd -- can't setup socket");
600       cleanUpAndExit(-1);
601     }
602     traceConnectInfo = getpid();
603 #else
604     traceSocketPort = RPC_setup_socket(traceSocket_fd, PF_INET, SOCK_STREAM);
605     if (traceSocketPort < 0) {
606       perror("paradynd -- can't setup socket");
607       cleanUpAndExit(-1);
608     }
609     traceConnectInfo = traceSocketPort;
610 #endif
611
612
613     while (1) {
614         // we have moved this code at the beginning of the loop, so we will
615         // process signals before igen requets. this is to avoid problems when
616         // an inferiorRPC is waiting for a system call to complete and an igen
617         // requests arrives at that moment - naim
618         extern void checkProcStatus(); // check status of inferior processes
619         checkProcStatus();
620
621         FD_ZERO(&readSet);
622         FD_ZERO(&errorSet);
623         width = 0;
624         unsigned p_size = processVec.size();
625         for (unsigned u=0; u<p_size; u++) {
626             if (processVec[u] == NULL)
627                continue;
628
629             if (processVec[u]->traceLink >= 0)
630               FD_SET(processVec[u]->traceLink, &readSet);
631             if (processVec[u]->traceLink > width)
632               width = processVec[u]->traceLink;
633
634             if (processVec[u]->ioLink >= 0)
635               FD_SET(processVec[u]->ioLink, &readSet);
636             if (processVec[u]->ioLink > width)
637               width = processVec[u]->ioLink;
638         }
639
640         // add traceSocket_fd, which accept()'s new connections (from processes
641         // not launched via createProcess() [process.C], such as when a process
642         // forks, or when we attach to an already-running process).
643         if (traceSocket_fd > 0) FD_SET(traceSocket_fd, &readSet);
644         if (traceSocket_fd > width) width = traceSocket_fd;
645
646         // add our igen connection with the paradyn process.
647         FD_SET(tp->get_fd(), &readSet);
648         FD_SET(tp->get_fd(), &errorSet);
649         if (tp->get_fd() > width) width = tp->get_fd();
650
651 #ifdef PARADYND_PVM
652         // add connection to pvm daemon.
653         /***
654           There is a problem here since pvm_getfds is not implemented on 
655           libpvmshmem which we use on solaris (a call to pvm_getfds returns
656           PvmNotImpl).
657           If we cannot use pvm_getfds here, the only alternative is to use polling.
658           To keep the code simple, I am using polling in all cases.
659         ***/
660 #ifdef notdef // not in use because pvm_getfds is not implemented on all platforms
661         fd_num = pvm_getfds(&fd_ptr);
662         assert(fd_num == 1);
663         FD_SET(fd_ptr[0], &readSet);
664         if (fd_ptr[0] > width)
665           width = fd_ptr[0];
666 #endif
667 #endif
668
669 #ifdef SHM_SAMPLING
670 // When _not_ shm sampling, rtinst defines a global vrble called
671 // DYNINSTin_sample, which is set to true while the application samples
672 // itself due to an alarm-expire.  When this variable is set, a call to
673 // DYNINSTstartProcessTimer() et al. will return immediately, taking
674 // no action.  This is of course a bad thing to happen.
675 // So: when not shm sampling, we mustn't do an inferiorRPC here.
676 // So we only do inferiorRPC here when SHM_SAMPLING.
677 // (What do we do when non-shm-sampling?  We wait until we're sure
678 // that we're not in the middle of processing a timer.  One way to do
679 // that is to manually reset DYNINSTin_sample when doing an RPC, and
680 // then restoring its initial value when done.  Instead, we wait for an
681 // ALARM signal to be delivered, and do pending RPCs just before we forward
682 // the signal.  Assuming ALARM signals aren't recursive, this should do the
683 // trick.  Ick...yet another reason to kill the ALARM signal and go with shm
684 // sampling.
685
686         doDeferredRPCs();
687 #endif
688         extern void doDeferedRPCasyncXDRWrite();
689         doDeferedRPCasyncXDRWrite();
690
691 #if !defined(i386_unknown_nt4_0)
692         time64 pollTimeUSecs = 50000;
693            // this is the time (rather arbitrarily) chosen fixed time length
694            // in which to check for signals, etc.
695 #else
696         // Windows NT wait happens in WaitForDebugEvent (in pdwinnt.C)
697         time64 pollTimeUSecs = 0;
698 #endif
699
700 #ifdef SHM_SAMPLING
701         checkAndDoShmSampling(pollTimeUSecs);
702            // does shm sampling of each process, as appropriate.
703            // may update pollTimeUSecs.
704 #endif 
705
706         pollTime.tv_sec  = pollTimeUSecs / 1000000;
707         pollTime.tv_usec = pollTimeUSecs % 1000000;
708
709         // This fd may have been read from prior to entering this loop
710         // There may be some bytes lying around
711         if (check_buffer_first) {
712           bool no_stuff_there = P_xdrrec_eof(tp->net_obj());
713           while (!no_stuff_there) {
714             T_dyninstRPC::message_tags ret = tp->waitLoop();
715             if (ret == T_dyninstRPC::error) {
716               // assume the client has exited, and leave.
717               cleanUpAndExit(-1);
718             }
719             no_stuff_there = P_xdrrec_eof(tp->net_obj());
720           }
721         }
722
723         // TODO - move this into an os dependent area
724         ct = P_select(width+1, &readSet, NULL, &errorSet, &pollTime);
725
726         if (ct > 0) {
727
728             if (traceSocket_fd >= 0 && FD_ISSET(traceSocket_fd, &readSet)) {
729               // Either (1) a process we're measuring has forked, and the child
730               // process is asking for a new connection, or (2) a process we've
731               // attached to is asking for a new connection.
732
733               processNewTSConnection(traceSocket_fd); // context.C
734             }
735
736             unsigned p_size = processVec.size();
737             for (unsigned u=0; u<p_size; u++) {
738                 if (processVec[u] == NULL)
739                    continue; // process structure has been deallocated
740
741                 if (processVec[u] && processVec[u]->traceLink >= 0 && 
742                        FD_ISSET(processVec[u]->traceLink, &readSet)) {
743                     processTraceStream(processVec[u]);
744
745                     /* in the meantime, the process may have died, setting
746                        processVec[u] to NULL */
747
748                     /* clear it in case another process is sharing it */
749                     if (processVec[u] &&
750                         processVec[u]->traceLink >= 0)
751                            // may have been set to -1
752                        FD_CLR(processVec[u]->traceLink, &readSet);
753                 }
754
755                 if (processVec[u] && processVec[u]->ioLink >= 0 && 
756                        FD_ISSET(processVec[u]->ioLink, &readSet)) {
757                     processAppIO(processVec[u]);
758
759                     // app can (conceivably) die in processAppIO(), resulting
760                     // in a processVec[u] to NULL.
761
762                     /* clear it in case another process is sharing it */
763                     if (processVec[u] && processVec[u]->ioLink >= 0)
764                        // may have been set to -1
765                        FD_CLR(processVec[u]->ioLink, &readSet);
766                 }
767             }
768
769             if (FD_ISSET(tp->get_fd(), &errorSet)) {
770                 // paradyn is gone so we go too.
771                 cleanUpAndExit(-1);
772             }
773
774             bool delayIGENrequests=false;
775             for (unsigned u=0; u<p_size; u++) {
776               if (processVec[u] == NULL)
777                 continue; // process structure has been deallocated
778  
779               if (processVec[u]->isRPCwaitingForSysCallToComplete()) {
780                 delayIGENrequests=true;
781                 break;
782               }
783             }
784
785             // if we are waiting for a system call to complete in order to
786             // launch an inferiorRPC, we will avoid processing any igen
787             // request - naim
788             if (!delayIGENrequests) {
789               // Check if something has arrived from Paradyn on our igen link.
790               if (FD_ISSET(tp->get_fd(), &readSet)) {
791                 bool no_stuff_there = false;
792                 while(!no_stuff_there) {
793                   T_dyninstRPC::message_tags ret = tp->waitLoop();
794                   if (ret == T_dyninstRPC::error) {
795                     // assume the client has exited, and leave.
796                     cleanUpAndExit(-1);
797                   }
798                   no_stuff_there = P_xdrrec_eof(tp->net_obj());
799                 }
800               }
801               while (tp->buffered_requests()) {
802                 T_dyninstRPC::message_tags ret = tp->process_buffered();
803                 if (ret == T_dyninstRPC::error)
804                   cleanUpAndExit(-1);
805               }
806             }
807
808 #ifdef PARADYND_PVM
809 #ifdef notdef // not in use because of the problems with pvm_getfds. See comment above.
810             // message on pvmd channel
811             int res;
812             fd_num = pvm_getfds(&fd_ptr);
813             assert(fd_num == 1);
814             if (FD_ISSET(fd_ptr[0], &readSet)) {
815                 // res == -1 --> error
816                 res = PDYN_handle_pvmd_message();
817                 // handle pvm message
818             }
819 #endif
820 #endif
821         }
822
823 #ifdef PARADYND_PVM
824         // poll for messages from the pvm daemon, and handle the message if 
825         // there is one.
826         // See comments above on the problems with pvm_getfds.
827         if (pvm_running) {
828           PDYN_handle_pvmd_message();
829         }
830 #endif
831
832 #ifndef SHM_SAMPLING
833         // the ifdef is here because when shm sampling, reportInternalMetrics is
834         // already done.
835         reportInternalMetrics(false);
836 #endif
837     }
838 }
839
840
841 static void createResource(int pid, traceHeader *header, struct _newresource *r)
842 {
843     char *tmp;
844     char *name;
845     // resource *res;
846     vector<string> parent_name;
847     resource *parent = NULL;
848     unsigned type;
849     
850     switch (r->type) {
851     case RES_TYPE_STRING: type = MDL_T_STRING; break;
852     case RES_TYPE_INT:    type = MDL_T_INT; break;
853     default: 
854       string msg = string("Invalid resource type reported on trace stream from PID=")
855                    + string(pid);
856       showErrorCallback(36,msg);
857       return;
858     }
859
860     name = r->name;
861     do {
862         tmp = strchr(name, '/');
863         if (tmp) {
864             *tmp = '\0';
865             tmp++;
866             parent_name += name;
867             name = tmp;
868         }
869     } while (tmp);
870
871     if ((parent = resource::findResource(parent_name)) && name != r->name) {
872       resource::newResource(parent, NULL, r->abstraction, name,
873                             header->wall, "", type,
874                             true);
875     }
876     else {
877       string msg = string("Unknown resource '") + string(r->name) +
878                    string("' reported on trace stream from PID=") +
879                    string(pid);
880       showErrorCallback(36,msg);
881     }
882
883 }
884
885 // report a piece of shared-memory
886 static void reportMemory(int pid, traceHeader *header, struct _traceMemory *r)
887 {
888     char        *name   = r->name;
889     int         va = r->va ;
890     unsigned    memSize = r->memSize ;
891     unsigned    blkSize = r->blkSize ;
892
893     printf("reportMemory(%d, %s, %d, %u, %u)\n", pid, name, va, memSize, blkSize) ;
894     tp->resourceBatchMode(true);
895     tp->memoryInfoCallback(0, name, va, memSize, blkSize) ;
896     tp->resourceBatchMode(false);
897 }