Added TR_SYNC, which is sent by DYNINSTbreakPoint right after call
[dyninst.git] / paradynd / src / perfStream.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 #ifdef PARADYND_PVM
43 extern "C" {
44 #include <pvm3.h>
45 }
46 #include "pvm_support.h"
47 #endif
48
49 #include "util/h/headers.h"
50 #include "rtinst/h/rtinst.h"
51 #include "rtinst/h/trace.h"
52 #include "dyninstAPI/src/symtab.h"
53 #include "dyninstAPI/src/process.h"
54 #include "dyninstAPI/src/inst.h"
55 #include "dyninstAPI/src/instP.h"
56 #include "dyninstAPI/src/dyninstP.h"
57 #include "paradynd/src/metric.h"
58 #include "dyninstAPI/src/util.h"
59 #include "paradynd/src/comm.h"
60 #include "dyninstAPI/src/stats.h"
61 #include "paradynd/src/debugger.h"
62 #include "paradynd/src/main.h"
63 #include "paradynd/src/association.h"
64 #include "paradynd/src/init.h"
65 #include "paradynd/src/context.h"
66 #include "paradynd/src/perfStream.h"
67 #include "dyninstAPI/src/os.h"
68 #include "paradynd/src/mdld.h"
69 #include "paradynd/src/showerror.h"
70 #include "paradynd/src/main.h"
71 #include "util/h/debugOstream.h"
72
73 // trace data streams
74 #include "util/h/Dictionary.h"
75
76 // The following were all defined in process.C (for no particular reason)
77 extern debug_ostream attach_cerr;
78 extern debug_ostream inferiorrpc_cerr;
79 extern debug_ostream shmsample_cerr;
80 extern debug_ostream forkexec_cerr;
81 extern debug_ostream metric_cerr;
82 extern debug_ostream signal_cerr;
83 extern debug_ostream sharedobj_cerr;
84
85 string traceSocketPath; /* file path for trace socket */
86 int traceConnectInfo;
87 int traceSocketPort;
88
89 static void createResource(int pid, traceHeader *header, struct _newresource *r);
90 static void reportMemory(int pid, traceHeader *header, struct _traceMemory *r) ;
91
92 bool firstSampleReceived = false;
93
94 double cyclesPerSecond = 0;
95 extern time64 firstRecordTime; // util.C
96
97 // Read output data from process curr. 
98 void processAppIO(process *curr)
99 {
100     int ret;
101     char lineBuf[256];
102
103     ret = read(curr->ioLink, lineBuf, sizeof(lineBuf)-1);
104     if (ret < 0) {
105         //statusLine("read error");
106         //showErrorCallback(23, "Read error");
107         //cleanUpAndExit(-2);
108         string msg = string("Read error on IO stream from PID=") +
109                      string(curr->getPid()) + string(": ") +
110                      string(sys_errlist[errno]) + 
111                      string("\nNo more data will be received from this process.");
112         showErrorCallback(23, msg);
113         P_close(curr->ioLink);
114         curr->ioLink = -1;
115         return;
116     } else if (ret == 0) {
117         /* end of file -- process exited */
118         P_close(curr->ioLink);
119         curr->ioLink = -1;
120         string msg = string("Process ") + string(curr->getPid()) + string(" exited");
121         statusLine(msg.string_of());
122         handleProcessExit(curr,0);
123         return;
124     }
125
126     // null terminate it
127     lineBuf[ret] = '\0';
128
129     // forward the data to the paradyn process.
130     tp->applicationIO(curr->getPid(), ret, lineBuf);
131        // note: this is an async igen call, so the results may not appear right away.
132 }
133
134
135 char errorLine[1024];
136
137 void logLine(const char *line)
138 {
139     static char fullLine[1024];
140
141     strcat(fullLine, line);
142        // Ack!  Possible overflow!  Possible bug!
143        // If you put a '\n' at the end of every string passed to a call
144        // to logLine (and the string is < 1000 chars) then you'll be okay.
145        // Otherwise, watch out!
146
147     if (fullLine[strlen(fullLine)-1] == '\n') {
148         tp->applicationIO(0, strlen(fullLine), fullLine);
149         fullLine[0] = '\0';
150     }
151 }
152
153 void statusLine(const char *line)
154 {
155   tp->reportStatus(line);
156 }
157
158 // New with paradynd-->paradyn buffering.  When true, it tells the
159 // buffering routine to flush (to paradyn); otherwise, it would flush
160 // only when the buffer was full, hurting response time.
161 extern bool BURST_HAS_COMPLETED;
162
163 extern vector<process*> processVec;
164 extern process* findProcess(int); // should become a static method of class process
165
166 // trace data streams
167 extern bool TRACE_BURST_HAS_COMPLETED;
168 unsigned mid_hash(const unsigned &mid) {return mid;}
169 dictionary_hash<unsigned, unsigned> traceOn(mid_hash);
170
171 // Read trace data from process curr.
172 void processTraceStream(process *curr)
173 {
174     int ret;
175     traceStream sid;
176     char *recordData;
177     traceHeader header;
178     struct _association *a;
179
180     ret = read(curr->traceLink, &(curr->buffer[curr->bufEnd]), 
181                sizeof(curr->buffer)-curr->bufEnd);
182
183     if (ret < 0) {
184         //statusLine("read error, exiting");
185         //showErrorCallback(23, "Read error");
186         //curr->traceLink = -1;
187         //cleanUpAndExit(-2);
188         string msg = string("Read error on trace stream from PID=") +
189                      string(curr->getPid()) + string(": ") +
190                      string(sys_errlist[errno]) + 
191                      string("\nNo more data will be received from this process");
192         showErrorCallback(23, msg);
193         P_close(curr->traceLink);
194         curr->traceLink = -1;
195         return;
196     } else if (ret == 0) {
197         /* end of file */
198         // process exited unexpectedly
199         //string buffer = string("Process ") + string(curr->pid);
200         //buffer += string(" has exited unexpectedly");
201         //statusLine(P_strdup(buffer.string_of()));
202         //showErrorCallback(11, P_strdup(buffer.string_of()));
203         string msg = string("Process ") + string(curr->getPid()) + string(" exited");
204         statusLine(msg.string_of());
205         P_close(curr->traceLink);
206         curr->traceLink = -1;
207         handleProcessExit(curr,0);
208         return;
209     }
210
211     curr->bufEnd += ret;
212     curr->bufStart = 0;
213
214     while (curr->bufStart < curr->bufEnd) {
215         if (curr->bufEnd - curr->bufStart < (sizeof(traceStream) + sizeof(header))) {
216             break;
217         }
218
219         if (curr->bufStart % WORDSIZE != 0)     /* Word alignment check */
220             break;                      /* this will re-align by shifting */
221
222         memcpy(&sid, &(curr->buffer[curr->bufStart]), sizeof(traceStream));
223         curr->bufStart += sizeof(traceStream);
224
225         memcpy(&header, &(curr->buffer[curr->bufStart]), sizeof(header));
226         curr->bufStart += sizeof(header);
227
228         curr->bufStart = ALIGN_TO_WORDSIZE(curr->bufStart);
229         if (header.length % WORDSIZE != 0) {
230             sprintf(errorLine, "Warning: non-aligned length (%d) received on traceStream.  Type=%d\n", header.length, header.type);
231             logLine(errorLine);
232             showErrorCallback(36,(const char *) errorLine);
233         }
234             
235         if (curr->bufEnd - curr->bufStart < (unsigned)header.length) {
236             /* the whole record isn't here yet */
237             curr->bufStart -= sizeof(traceStream) + sizeof(header);
238             break;
239         }
240
241         recordData = &(curr->buffer[curr->bufStart]);
242         curr->bufStart +=  header.length;
243
244         if (!firstRecordTime)
245             firstRecordTime = header.wall;
246             // firstRecordTime is used by getCurrentTime() in util.C when arg passed
247             // is 'true', but noone seems to do that...so firstRecordTime is not a
248             // terribly important vrble (but, for now at least, it's used in metric.C)
249
250         // callback to paradyn (okay if this callback is done more than once; paradyn
251         // detects this.  This is important since right now, we're also gonna do this
252         // callback when starting a new process; needed since SHM_SAMPLING might well
253         // start sending samples to paradyn before any trace records were received
254         // here.)
255         static bool done_yet = false;
256         if (!done_yet) {
257            tp->firstSampleCallback(curr->getPid(), (double) (header.wall/1000000.0));
258            done_yet = true;
259         }
260         switch (header.type) {
261 #if defined(SHM_SAMPLING) && defined(MT_THREAD)
262             case TR_THREAD:
263                 createThread((traceThread *) ((void*)recordData));
264                 break;
265             case TR_THRSELF:
266                 updateThreadId((traceThrSelf *) ((void*)recordData));
267                 break;
268 #endif
269             case TR_NEW_RESOURCE:
270 //              cerr << "paradynd: received a new resource from pid " << curr->getPid() << "; processing now" << endl;
271                 createResource(curr->getPid(), &header, (struct _newresource *) ((void*)recordData));
272                    // createResource() is in this file, below
273                 break;
274
275             case TR_NEW_MEMORY:
276                 reportMemory(curr->getPid(), &header, (struct _traceMemory *) ((void*)recordData));
277                 break;
278
279             case TR_NEW_ASSOCIATION:
280                 a = (struct _association *) ((void*)recordData);
281                 newAssoc(curr, a->abstraction, a->type, a->key, a->value);
282                 break;
283
284 #ifndef SHM_SAMPLING
285             case TR_SAMPLE:
286                 // metric_cerr << "got something from pid " << curr->getPid() << endl;
287
288                  // sprintf(errorLine, "Got data from process %d\n", curr->getPid());
289                  // logLine(errorLine);
290 //              assert(curr->getFirstRecordTime());
291                 processSample(curr->getPid(), &header, (traceSample *) ((void*)recordData));
292                    // in metric.C
293                 firstSampleReceived = true;
294                 break;
295 #endif
296
297             case TR_EXIT:
298                 sprintf(errorLine, "process %d exited\n", curr->getPid());
299                 logLine(errorLine);
300                 printAppStats((struct endStatsRec *) ((void*)recordData),
301                               cyclesPerSecond);
302                 printDyninstStats();
303                 P_close(curr->traceLink);
304                 curr->traceLink = -1;
305                 handleProcessExit(curr, 0);
306                 break;
307
308 #ifndef SHM_SAMPLING
309             case TR_COST_UPDATE:
310                 processCost(curr, &header, (costUpdate *) ((void*)recordData));
311                    // in metric.C
312                 break;
313 #endif
314
315             case TR_CP_SAMPLE:
316                 // critical path sample
317                 extern void processCP(process *, traceHeader *, cpSample *);
318                 processCP(curr, &header, (cpSample *) recordData);
319                 break;
320
321             case TR_EXEC_FAILED:
322                 { int pid = *(int *)recordData;
323                   process *p = findProcess(pid);
324                   p->inExec = false;
325                   p->execFilePath = string("");
326                 }
327                 break;
328
329             case TR_DATA:
330                 extern void batchTraceData(int, int, int, char *);
331                 batchTraceData(0, sid, header.length, recordData);
332                 traceOn[sid] = 1;
333                 break;
334
335             case TR_SYNC:
336                 // to eliminate a race condition, --Zhichen
337                 break ;
338             default:
339                 sprintf(errorLine, "Got unknown record type %d on sid %d\n", 
340                     header.type, sid);
341                 logLine(errorLine);
342                 sprintf(errorLine, "Received bad trace data from process %d.", curr->getPid());
343                 showErrorCallback(37,(const char *) errorLine);
344         }
345     }
346     BURST_HAS_COMPLETED = true; // will force a batch-flush very soon
347
348     // trace data streams
349     for (dictionary_hash_iter<unsigned,unsigned> iter=traceOn; iter; iter++) {
350        const unsigned key = iter.currkey();
351        unsigned val = iter.currval();
352        
353        if (val) {
354              extern void batchTraceData(int, int, int, char *);
355              TRACE_BURST_HAS_COMPLETED = true;
356              // will force a trace-batch-flush very soon
357              batchTraceData(0, key, 0, (char *)NULL);
358              traceOn[key] = 0;
359              //sprintf(errorLine, "$$$Tag burst with mid %d\n", k);
360              //logLine(errorLine);
361         }
362     }
363
364     /* copy those bits we have to the base */
365     memcpy(curr->buffer, &(curr->buffer[curr->bufStart]), 
366         curr->bufEnd - curr->bufStart);
367     curr->bufEnd = curr->bufEnd - curr->bufStart;
368 }
369
370 void doDeferredRPCs() {
371    // Any RPCs waiting to be performed?  If so, and if it's safe to
372    // perform one, then launch one.
373    for (unsigned lcv=0; lcv < processVec.size(); lcv++) {
374       process *proc = processVec[lcv];
375       if (proc == NULL) continue; // proc must've died and has itself cleaned up
376       if (proc->status() == exited) continue;
377       if (proc->status() == neonatal) continue; // not sure if this is appropriate
378       
379       bool wasLaunched = proc->launchRPCifAppropriate(proc->status() == running,
380                                                       false);
381       // do we need to do anything with 'wasLaunched'?
382       if (wasLaunched)
383          inferiorrpc_cerr << "fyi: launched an inferior RPC" << endl;
384    }
385 }
386
387
388 void ioFunc()
389 {
390      printf("in SIG child func\n");
391      fflush(stdout);
392 }
393
394 #ifdef SHM_SAMPLING
395 static void checkAndDoShmSampling(time64 &pollTimeUSecs) {
396    // We assume that nextShmSampleTime (synched to getCurrWallTime())
397    // has already been set.  If the curr time is >= to this, then
398    // we should sample immediately, and update nextShmSampleTime to
399    // be nextShmSampleTime + sampleInterval, unless it is <= currTime,
400    // in which case we set it to currTime + sampleInterval.
401
402    // QUESTION: should we sample a given process while it's paused?  While it's
403    //           in the middle of an inferiorRPC?  For now, the answer is no
404    //           to both.
405
406    static time64 nextMajorSampleTime = 0;
407    static time64 nextMinorSampleTime = 0;
408
409    const time64 currWallTime = getCurrWallTime();
410       // checks for rollback
411
412    bool doMajorSample = false; // so far...
413    bool doMinorSample = false; // so far...
414
415    bool forNextTimeDoMinorSample = false; // so far...
416
417    if (currWallTime >= nextMajorSampleTime)
418       doMajorSample = true;
419    else if (currWallTime >= nextMinorSampleTime)
420       doMinorSample = true;
421    else
422       // it's not time to do anything.
423       return;
424
425    // Do shared memory sampling (for all processes) now!
426
427    // Loop thru all processes.  For each, process inferiorIntCounters,
428    // inferiorWallTimers, and inferiorProcessTimers.  But don't
429    // sample while an inferiorRPC is pending for that process, or for
430    // a non-running process.
431
432    for (unsigned lcv=0; lcv < processVec.size(); lcv++) {
433       process *theProc = processVec[lcv];
434       if (theProc == NULL)
435          continue; // proc died & had its structures cleaned up
436
437       // Don't sample paused/exited/neonatal processes, or even running processes
438       // that haven't been bootstrapped yet (i.e. haven't called DYNINSTinit yet),
439       // or processes that are in the middle of an inferiorRPC (we like for
440       // inferiorRPCs to finish up quickly).
441       if (theProc->status_ != running) {
442          //shmsample_cerr << "(-" << theProc->getStatusAsString() << "-)";
443          continue;
444       }
445       else if (!theProc->isBootstrappedYet()) {
446          //shmsample_cerr << "(-*-)" << endl;
447          continue;
448       }
449       else if (theProc->existsRPCinProgress()) {
450          //shmsample_cerr << "(-~-)" << endl;
451          continue;
452       }
453
454       if (doMajorSample) {
455          //shmsample_cerr << "(-Y-)" << endl;
456
457          if (!theProc->doMajorShmSample(currWallTime)) {
458             // The major sample didn't complete all of its work, so we
459             // schedule a minor sample for sometime in the near future
460             // (before the next major sample)
461
462             shmsample_cerr << "a minor sample will be needed" << endl;
463
464             forNextTimeDoMinorSample = true;
465          }
466       }
467       else if (doMinorSample) {
468          shmsample_cerr << "trying needed minor sample..."; cerr.flush();
469
470          if (!theProc->doMinorShmSample()) {
471             // The minor sample didn't complete all of its work, so
472             // schedule another one.
473             forNextTimeDoMinorSample = true;
474
475             shmsample_cerr << "it failed" << endl; cerr.flush();
476          }
477          else {
478             shmsample_cerr << "it succeeded" << endl; cerr.flush();
479          }
480       }
481    } // loop thru the processes
482
483    // And now, do the internal metrics
484    if (doMajorSample)
485       reportInternalMetrics(true);
486
487    // Here, we should probably flush the batch buffer (whether for a major
488    // sample or a minor one)
489    extern void flush_batch_buffer(); // metric.C (should be in this file)
490    flush_batch_buffer();
491
492    // Take currSamplingRate (which has values such as 0.2, 0.4, 0.8, 1.6, etc.)
493    // and multiply by a million to get the # of usecs per sample.
494    extern float currSamplingRate; // dynrpc.C
495    assert(currSamplingRate > 0);
496    const time64 shmSamplingInterval =
497               (time64)((double)currSamplingRate * 1000000.0);
498
499    if (doMajorSample) {
500       // If we just did a major sample, then we schedule the next major sample,
501       // and reset the next minor sample time.
502       nextMajorSampleTime += shmSamplingInterval;
503       if (nextMajorSampleTime <= currWallTime)
504          nextMajorSampleTime = currWallTime + shmSamplingInterval;
505    }
506
507    if (forNextTimeDoMinorSample) {
508       // If a minor sample is needed, then we schedule it.  For now, let's
509       // assume that a minor sample is always scheduled for now plus
510       // one-fourth of the original sampling rate...i.e. for now + (0.2 sec/4) =
511       // now + (0.05 sec), i.e. now + 50 milliseconds.
512 // temp: one-tenth of original sample rate...i.e. for now + 0.02 sec (+20 millisec)
513
514 //      nextMinorSampleTime = currWallTime + 50000; // 50ms = 50000us
515       nextMinorSampleTime = currWallTime + 20000; // 20ms = 20000us
516       if (nextMinorSampleTime > nextMajorSampleTime)
517          // oh, never mind, we'll just do the major sample which is going to
518          // happen first anyway.
519          nextMinorSampleTime = nextMajorSampleTime;
520    }
521    else {
522       // we don't need to do a minor sample next time, so reset nextMinorSampleTime
523       nextMinorSampleTime = nextMajorSampleTime;
524    }
525
526    time64 nextAnyKindOfSampleTime = nextMajorSampleTime;
527    if (nextMinorSampleTime < nextAnyKindOfSampleTime)
528       nextAnyKindOfSampleTime = nextMinorSampleTime;
529
530    assert(nextAnyKindOfSampleTime >= currWallTime);
531    const time64 shmSamplingTimeout = nextAnyKindOfSampleTime - currWallTime;
532
533    if (shmSamplingTimeout < pollTimeUSecs)
534       pollTimeUSecs = shmSamplingTimeout;
535 }
536 #endif
537
538
539 /*
540  * Wait for a data from one of the inferiors or a request to come in.
541  *
542  */
543
544 void controllerMainLoop(bool check_buffer_first)
545 {
546     int ct;
547     int width;
548     fd_set readSet;
549     fd_set errorSet;
550     struct timeval pollTime;
551     int traceSocket_fd;
552
553     // TODO - i am the guilty party - this will go soon - mdc
554 #ifdef PARADYND_PVM
555 #ifdef notdef
556     int fd_num, *fd_ptr;
557     if (pvm_mytid() < 0)
558       {
559         printf("pvm not working\n");
560         _exit(-1);
561       }
562     fd_num = pvm_getfds(&fd_ptr);
563     assert(fd_num == 1);
564 #endif
565 #endif
566
567 //    cerr << "welcome to controllerMainLoop...pid=" << getpid() << endl;
568 //    kill(getpid(), SIGSTOP);
569 //    cerr << "doing controllerMainLoop..." << endl;
570
571
572     /***
573        set up a socket to be used to create a trace link
574        by inferior processes that are not forked 
575        directly by this daemon.
576        This is a unix domain socket, which is bound to the file
577           <P_tmpdir>/paradynd.<pid>
578        where <P_tmpdir> is a constant defined in stdio.h (usually "/tmp" or
579        "/usr/tmp"), and <pid> is the pid of the paradynd process.
580
581        This socket is currently being used in two cases: when a
582        process forks and when we attach to a running process.  In the
583        fork case, the socket path can be passed in the environment (so
584        any name for the file would be ok), but in the attach case the
585        name is passed as an argument to DYNINSTinit. Since we
586        currently can only pass integer values as arguments, we use the
587        file name paradynd.<pid>, so that we need only to pass the pid
588        as the argument to DYNINSTinit, which can then determine the
589        full file name.
590
591        traceSocket_fd is the file descriptor of a socket, ready to receive
592        connections.
593        It represents a socket created with socket(); listen()
594        In other words, one which we intend to call accept() on.
595        (See perfStream.C -- the call to RPC_getConnect(traceSocket_fd))
596     ***/
597
598 #if !defined(i386_unknown_nt4_0)
599     traceSocketPath = string(P_tmpdir) + string("paradynd.") + string(getpid());
600     // unlink it, in case the file was left around from a previous run
601     unlink(traceSocketPath.string_of());
602
603     if (!RPC_setup_socket_un(traceSocket_fd, traceSocketPath.string_of())) {
604       perror("paradynd -- can't setup socket");
605       cleanUpAndExit(-1);
606     }
607     traceConnectInfo = getpid();
608 #else
609     traceSocketPort = RPC_setup_socket(traceSocket_fd, PF_INET, SOCK_STREAM);
610     if (traceSocketPort < 0) {
611       perror("paradynd -- can't setup socket");
612       cleanUpAndExit(-1);
613     }
614     traceConnectInfo = traceSocketPort;
615 #endif
616
617
618     while (1) {
619         // we have moved this code at the beginning of the loop, so we will
620         // process signals before igen requets. this is to avoid problems when
621         // an inferiorRPC is waiting for a system call to complete and an igen
622         // requests arrives at that moment - naim
623         extern void checkProcStatus(); // check status of inferior processes
624         checkProcStatus();
625
626         FD_ZERO(&readSet);
627         FD_ZERO(&errorSet);
628         width = 0;
629         unsigned p_size = processVec.size();
630         for (unsigned u=0; u<p_size; u++) {
631             if (processVec[u] == NULL)
632                continue;
633
634             if (processVec[u]->traceLink >= 0)
635               FD_SET(processVec[u]->traceLink, &readSet);
636             if (processVec[u]->traceLink > width)
637               width = processVec[u]->traceLink;
638
639             if (processVec[u]->ioLink >= 0)
640               FD_SET(processVec[u]->ioLink, &readSet);
641             if (processVec[u]->ioLink > width)
642               width = processVec[u]->ioLink;
643         }
644
645         // add traceSocket_fd, which accept()'s new connections (from processes
646         // not launched via createProcess() [process.C], such as when a process
647         // forks, or when we attach to an already-running process).
648         if (traceSocket_fd > 0) FD_SET(traceSocket_fd, &readSet);
649         if (traceSocket_fd > width) width = traceSocket_fd;
650
651         // add our igen connection with the paradyn process.
652         FD_SET(tp->get_fd(), &readSet);
653         FD_SET(tp->get_fd(), &errorSet);
654         if (tp->get_fd() > width) width = tp->get_fd();
655
656 #ifdef PARADYND_PVM
657         // add connection to pvm daemon.
658         /***
659           There is a problem here since pvm_getfds is not implemented on 
660           libpvmshmem which we use on solaris (a call to pvm_getfds returns
661           PvmNotImpl).
662           If we cannot use pvm_getfds here, the only alternative is to use polling.
663           To keep the code simple, I am using polling in all cases.
664         ***/
665 #ifdef notdef // not in use because pvm_getfds is not implemented on all platforms
666         fd_num = pvm_getfds(&fd_ptr);
667         assert(fd_num == 1);
668         FD_SET(fd_ptr[0], &readSet);
669         if (fd_ptr[0] > width)
670           width = fd_ptr[0];
671 #endif
672 #endif
673
674 #ifdef SHM_SAMPLING
675 // When _not_ shm sampling, rtinst defines a global vrble called
676 // DYNINSTin_sample, which is set to true while the application samples
677 // itself due to an alarm-expire.  When this variable is set, a call to
678 // DYNINSTstartProcessTimer() et al. will return immediately, taking
679 // no action.  This is of course a bad thing to happen.
680 // So: when not shm sampling, we mustn't do an inferiorRPC here.
681 // So we only do inferiorRPC here when SHM_SAMPLING.
682 // (What do we do when non-shm-sampling?  We wait until we're sure
683 // that we're not in the middle of processing a timer.  One way to do
684 // that is to manually reset DYNINSTin_sample when doing an RPC, and
685 // then restoring its initial value when done.  Instead, we wait for an
686 // ALARM signal to be delivered, and do pending RPCs just before we forward
687 // the signal.  Assuming ALARM signals aren't recursive, this should do the
688 // trick.  Ick...yet another reason to kill the ALARM signal and go with shm
689 // sampling.
690
691         doDeferredRPCs();
692 #endif
693         extern void doDeferedRPCasyncXDRWrite();
694         doDeferedRPCasyncXDRWrite();
695
696 #if !defined(i386_unknown_nt4_0)
697         time64 pollTimeUSecs = 50000;
698            // this is the time (rather arbitrarily) chosen fixed time length
699            // in which to check for signals, etc.
700 #else
701         // Windows NT wait happens in WaitForDebugEvent (in pdwinnt.C)
702         time64 pollTimeUSecs = 0;
703 #endif
704
705 #ifdef SHM_SAMPLING
706         checkAndDoShmSampling(pollTimeUSecs);
707            // does shm sampling of each process, as appropriate.
708            // may update pollTimeUSecs.
709 #endif 
710
711         pollTime.tv_sec  = pollTimeUSecs / 1000000;
712         pollTime.tv_usec = pollTimeUSecs % 1000000;
713
714         // This fd may have been read from prior to entering this loop
715         // There may be some bytes lying around
716         if (check_buffer_first) {
717           bool no_stuff_there = P_xdrrec_eof(tp->net_obj());
718           while (!no_stuff_there) {
719             T_dyninstRPC::message_tags ret = tp->waitLoop();
720             if (ret == T_dyninstRPC::error) {
721               // assume the client has exited, and leave.
722               cleanUpAndExit(-1);
723             }
724             no_stuff_there = P_xdrrec_eof(tp->net_obj());
725           }
726         }
727
728         // TODO - move this into an os dependent area
729         ct = P_select(width+1, &readSet, NULL, &errorSet, &pollTime);
730
731         if (ct > 0) {
732
733             if (traceSocket_fd >= 0 && FD_ISSET(traceSocket_fd, &readSet)) {
734               // Either (1) a process we're measuring has forked, and the child
735               // process is asking for a new connection, or (2) a process we've
736               // attached to is asking for a new connection.
737
738               processNewTSConnection(traceSocket_fd); // context.C
739             }
740
741             unsigned p_size = processVec.size();
742             for (unsigned u=0; u<p_size; u++) {
743                 if (processVec[u] == NULL)
744                    continue; // process structure has been deallocated
745
746                 if (processVec[u] && processVec[u]->traceLink >= 0 && 
747                        FD_ISSET(processVec[u]->traceLink, &readSet)) {
748                     processTraceStream(processVec[u]);
749
750                     /* in the meantime, the process may have died, setting
751                        processVec[u] to NULL */
752
753                     /* clear it in case another process is sharing it */
754                     if (processVec[u] &&
755                         processVec[u]->traceLink >= 0)
756                            // may have been set to -1
757                        FD_CLR(processVec[u]->traceLink, &readSet);
758                 }
759
760                 if (processVec[u] && processVec[u]->ioLink >= 0 && 
761                        FD_ISSET(processVec[u]->ioLink, &readSet)) {
762                     processAppIO(processVec[u]);
763
764                     // app can (conceivably) die in processAppIO(), resulting
765                     // in a processVec[u] to NULL.
766
767                     /* clear it in case another process is sharing it */
768                     if (processVec[u] && processVec[u]->ioLink >= 0)
769                        // may have been set to -1
770                        FD_CLR(processVec[u]->ioLink, &readSet);
771                 }
772             }
773
774             if (FD_ISSET(tp->get_fd(), &errorSet)) {
775                 // paradyn is gone so we go too.
776                 cleanUpAndExit(-1);
777             }
778
779             bool delayIGENrequests=false;
780             for (unsigned u1=0; u1<p_size; u1++) {
781               if (processVec[u1] == NULL)
782                 continue; // process structure has been deallocated
783  
784               if (processVec[u1]->isRPCwaitingForSysCallToComplete()) {
785                 delayIGENrequests=true;
786                 break;
787               }
788             }
789
790             // if we are waiting for a system call to complete in order to
791             // launch an inferiorRPC, we will avoid processing any igen
792             // request - naim
793             if (!delayIGENrequests) {
794               // Check if something has arrived from Paradyn on our igen link.
795               if (FD_ISSET(tp->get_fd(), &readSet)) {
796                 bool no_stuff_there = false;
797                 while(!no_stuff_there) {
798                   T_dyninstRPC::message_tags ret = tp->waitLoop();
799                   if (ret == T_dyninstRPC::error) {
800                     // assume the client has exited, and leave.
801                     cleanUpAndExit(-1);
802                   }
803                   no_stuff_there = P_xdrrec_eof(tp->net_obj());
804                 }
805               }
806               while (tp->buffered_requests()) {
807                 T_dyninstRPC::message_tags ret = tp->process_buffered();
808                 if (ret == T_dyninstRPC::error)
809                   cleanUpAndExit(-1);
810               }
811             }
812
813 #ifdef PARADYND_PVM
814 #ifdef notdef // not in use because of the problems with pvm_getfds. See comment above.
815             // message on pvmd channel
816             int res;
817             fd_num = pvm_getfds(&fd_ptr);
818             assert(fd_num == 1);
819             if (FD_ISSET(fd_ptr[0], &readSet)) {
820                 // res == -1 --> error
821                 res = PDYN_handle_pvmd_message();
822                 // handle pvm message
823             }
824 #endif
825 #endif
826         }
827
828 #ifdef PARADYND_PVM
829         // poll for messages from the pvm daemon, and handle the message if 
830         // there is one.
831         // See comments above on the problems with pvm_getfds.
832         if (pvm_running) {
833           PDYN_handle_pvmd_message();
834         }
835 #endif
836
837 #ifndef SHM_SAMPLING
838         // the ifdef is here because when shm sampling, reportInternalMetrics is
839         // already done.
840         reportInternalMetrics(false);
841 #endif
842     }
843 }
844
845
846 static void createResource(int pid, traceHeader *header, struct _newresource *r)
847 {
848     char *tmp;
849     char *name;
850     // resource *res;
851     vector<string> parent_name;
852     resource *parent = NULL;
853     unsigned type;
854     
855     switch (r->type) {
856     case RES_TYPE_STRING: type = MDL_T_STRING; break;
857     case RES_TYPE_INT:    type = MDL_T_INT; break;
858     default: 
859       string msg = string("Invalid resource type reported on trace stream from PID=")
860                    + string(pid);
861       showErrorCallback(36,msg);
862       return;
863     }
864
865     name = r->name;
866     do {
867         tmp = strchr(name, '/');
868         if (tmp) {
869             *tmp = '\0';
870             tmp++;
871             parent_name += name;
872             name = tmp;
873         }
874     } while (tmp);
875
876     if ((parent = resource::findResource(parent_name)) && name != r->name) {
877       resource::newResource(parent, NULL, r->abstraction, name,
878                             header->wall, "", type,
879                             true);
880     }
881     else {
882       string msg = string("Unknown resource '") + string(r->name) +
883                    string("' reported on trace stream from PID=") +
884                    string(pid);
885       showErrorCallback(36,msg);
886     }
887
888 }
889
890 //
891 // Blizzard
892 // report a piece of shared-memory
893 // After the paradyn received the piece of memory, it will
894 // generate ids for them and broadcast it to all the daemons
895 // via memoryInfoResponse in paradynd/dynrpc.C
896 // memoryInfoCallback is defined in paradyn/DMthread/DMmain.C
897 // it calls createResource_ncb which actually generate those ids
898 //
899 //     Paradynd                              Paradyn
900 // ------------------------------------------------------------
901 //   reportMemory                     
902 //        memoryInfoCallback  ........... memoryInforCallback
903 //                                         createResource_ncb
904 //   memoryInfoResponse ................. memoryInfoResponse  
905 //       newResource_ncb
906 //
907 //
908 static void reportMemory(int pid, traceHeader *, struct _traceMemory *r)
909 {
910     char        *name   = r->name;
911     int         va = r->va ;
912     unsigned    memSize = r->memSize ;
913     unsigned    blkSize = r->blkSize ;
914
915     tp->resourceBatchMode(true);
916     tp->memoryInfoCallback(0, name, va, memSize, blkSize) ;
917     tp->resourceBatchMode(false);
918 }