- added support for (mips-sgi-irix6.4) native compiler build
[dyninst.git] / paradyn / src / DMthread / DMdaemon.C
1 /*
2  * Copyright (c) 1996-1998 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 /*
43  * $Id: DMdaemon.C,v 1.82 1999/08/09 05:47:42 csserra Exp $
44  * method functions for paradynDaemon and daemonEntry classes
45  */
46
47 #include <assert.h>
48 extern "C" {
49 double   quiet_nan();
50 #include <malloc.h>
51 #include <rpc/types.h>
52 #include <rpc/xdr.h>
53 #include <stdio.h>
54 }
55
56 #include <sys/types.h>
57 #include "thread/h/thread.h"
58 #include "paradyn/src/pdMain/paradyn.h"
59 #include "dataManager.thread.h"
60 #include "dyninstRPC.xdr.CLNT.h"
61 #include "DMdaemon.h"
62 #include "paradyn/src/TCthread/tunableConst.h"
63 #include "paradyn/src/UIthread/Status.h"
64 #include "DMmetric.h"
65 #include "paradyn/src/met/metricExt.h"
66 #include "DMtime.h"
67
68 // TEMP this should be part of a class def.
69 status_line *DMstatus=NULL;
70 status_line *PROCstatus=NULL;
71
72 // This is from met/metMain.C
73 void metCheckDaemonProcess( const string & );
74
75 // change a char* that points to "" to point to NULL
76 // NULL is used to signify "NO ARGUMENT"
77 // NULL is easier to detect than "" (which needs a strlen to detect)
78 /*
79 static void fixArg(char *&argToFix)
80 {
81   if (argToFix && !strlen(argToFix))
82     argToFix = (char*) 0;
83 }
84 */
85
86
87 static appState applicationState = appPaused; // status of the application.
88
89
90 metricInstance *DM_enableType::findMI(metricInstanceHandle mh){
91     for(u_int i=0; i < request->size(); i++){
92          if(mh == ((*request)[i])->getHandle()){
93              return ((*request)[i]);
94     } }
95     return 0;
96 }
97
98 void DM_enableType::setDone(metricInstanceHandle mh){
99     for(u_int i=0; i < request->size(); i++){
100         if(mh == ((*request)[i])->getHandle()){
101             (*done)[i] = true;
102             return;
103         } 
104     }
105 }
106
107 // find any matching completed mids and update the done values 
108 // if a matching value is successfully enabled done=true, else done= false
109 void DM_enableType::updateAny(vector<metricInstance *> &completed_mis,
110                               vector<bool> successful){
111
112    for(u_int i=0; i < done->size(); i++){
113        if(!(*done)[i]){  // try to update element
114            assert(not_all_done);
115            metricInstanceHandle mh = ((*request)[i])->getHandle();
116            for(u_int j=0; j < completed_mis.size(); j++){
117                if(completed_mis[j]){
118                    if(completed_mis[j]->getHandle() == mh){
119                        if(successful[j]) (*done)[i] = true;
120                        not_all_done--;
121                } } 
122            }
123    } }
124 }
125
126
127
128 // Called whenever a program is ready to run (both programs started by paradyn
129 // and programs forked by other programs). QUESTION: What about when a program does
130 // an exec syscall?
131 //
132 // The new program inherits all enabled metrics, and if the application is 
133 // running, the new program must run too.
134 // 
135 // Called by the igen routine newProgramCallbackFunc()
136
137 bool paradynDaemon::addRunningProgram (int pid,
138                                        const vector<string> &paradynd_argv,
139                                        paradynDaemon *daemon,
140                                        bool calledFromExec,
141                                        bool attached_runMe) {
142     executable *exec = NULL;
143
144     if (calledFromExec) {
145        for (unsigned i=0; i < programs.size(); i++) {
146           if ((int) programs[i]->pid == pid && programs[i]->controlPath == daemon) {
147              exec = programs[i];
148              break;
149           }
150        }
151        assert(exec);
152
153        // exec() doesn't change the pid, but paradynd_argv changes (big deal).
154        exec->argv = paradynd_argv;
155
156        // fall through (we probably want to execute the daemon->continueProcess())
157     }
158     else {
159        // the non-exec (the normal) case follows:
160        exec = new executable (pid, paradynd_argv, daemon);
161        programs += exec;
162        ++procRunning;
163
164        // the following propagates mi's to the new process IF it's the only
165        // process on the daemon.  Otherwise, the daemon can and does propagate
166        // on its own.  We don't call it in the exec case (above) since we know it
167        // wouldn't do anything.
168        daemon->propagateMetrics();
169     }
170
171     if (applicationState == appRunning || attached_runMe) {
172       //cerr << "paradyn: calling daemon->continueProcess off of addRunningProgram (machine " << daemon->getDaemonMachineName() << "; pid=" << pid << ") !" << endl;
173       daemon->continueProcess(pid);
174     }
175
176     if (procRunning == 1 && attached_runMe) {
177        // currently, Paradyn believes that the application is paused.
178        // Let's change that to 'running'.
179        if (!continueAll())
180           cerr << "warning: continueAll failed" << endl;
181        uiMgr->enablePauseOrRun();
182     }
183
184     return true;
185 }
186
187 //
188 // add a new paradyn daemon
189 // called when a new paradynd contacts the advertised socket
190 //
191 bool paradynDaemon::addDaemon (PDSOCKET sock)
192 {
193   // constructor adds new daemon to dictionary allDaemons
194   paradynDaemon *new_daemon = new paradynDaemon (sock);
195
196   if (new_daemon->errorConditionFound) {
197     //TODO: "something" has to be done for a proper clean up - naim
198     uiMgr->showError(6,"");
199     return(false);
200   }
201
202 //
203 // KLUDGE:  set socket buffer size to 64k to avoid write-write deadlock
204 //              between paradyn and paradynd
205 //
206 #if defined(sparc_sun_sunos4_1_3) || defined(hppa1_1_hp_hpux)
207    int num_bytes =0;
208    int size = sizeof(num_bytes);
209    num_bytes = 32768;
210    if(setsockopt(new_daemon->get_sock(),SOL_SOCKET,SO_SNDBUF,(char *)&num_bytes ,size) < 0){
211       perror("setsocketopt SND_BUF error");
212    }
213 #endif
214
215   msg_bind_socket (new_daemon->get_sock(), true, (int(*)(void*)) xdrrec_eof,
216                      (void*)new_daemon->net_obj(), &(new_daemon->stid));
217   assert(new_daemon);
218
219   // The pid is reported later in an upcall
220   return (true);
221 }
222
223 //  TODO : fix this so that it really does clean up state
224 // Dispose of daemon state.
225 //    This is called because someone wants to kill a daemon, or the daemon
226 //       died and we need to cleanup after it.
227 //
228 void paradynDaemon::removeDaemon(paradynDaemon *d, bool informUser)
229 {
230
231     if (informUser) {
232       string msg;
233       msg = string("paradynd has died on host <") + d->getDaemonMachineName()
234             + string(">");
235       uiMgr->showError(5, P_strdup(msg.string_of()));
236     }
237
238     d->dead = true;
239
240 #ifdef notdef
241     executable *e;
242     List<executable*> progs;
243     daemons.remove(d);
244
245     //
246     // Delete executables running on the dead paradyn daemon.
247     //
248     for (progs = programs; e = *progs; progs++) {
249        if (e->controlPath == d) {
250            programs.remove(e);
251            delete(e);
252        }
253     }
254
255 #endif
256
257     // tell the thread package to ignore the fd to the daemon.
258     msg_unbind(d->getSocketTid());
259 }
260
261
262 // get the current time of a daemon, to adjust for clock differences.
263 void getDaemonTime(paradynDaemon *pd) {
264   timeStamp t1 = getCurrentTime();
265   timeStamp dt = pd->getTime(); // daemon time
266   timeStamp t2 = getCurrentTime();
267   timeStamp delay = (t2 - t1) / 2.0;
268   pd->setTimeFactor(t1 - dt + delay);
269 }
270
271 //
272 // add a new daemon
273 // check to see if a daemon that matches the function args exists
274 // if it does exist, return a pointer to it
275 // otherwise, create a new daemon
276 //
277 paradynDaemon *paradynDaemon::getDaemonHelper(const string &machine, 
278                                               const string &login, 
279                                               const string &name) {
280
281     // find out if we have a paradynd on this machine+login+paradynd
282     paradynDaemon *pd;
283     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
284         pd = paradynDaemon::allDaemons[i];
285         if ((!machine.string_of() || (pd->machine == machine)) && 
286            (!login.string_of() || (pd->login == login))  &&
287            (name.string_of() && (pd->name == name))) {
288             return (pd);     
289         }
290     }
291   
292     // find a matching entry in the dictionary, and start it
293     daemonEntry *def = findEntry(machine, name);
294     if (!def) {
295         if (name.length()) {
296           string msg = string("Paradyn daemon \"") + name + string("\" not defined.");
297           uiMgr->showError(90,P_strdup(msg.string_of()));
298         }
299         else {
300           uiMgr->showError(91,"");
301         }
302         return ((paradynDaemon*) 0);
303     }
304
305     string m = machine; 
306     // fill in machine name if empty
307     if (!m.string_of()) {
308         m = default_host;
309     }
310
311     char statusLine[256];
312     sprintf(statusLine, "Starting daemon on %s",m.string_of());
313     uiMgr->updateStatus(DMstatus,P_strdup(statusLine));
314
315     string flav_arg(string("-z") + def->getFlavor());
316     unsigned asize = paradynDaemon::args.size();
317     paradynDaemon::args += flav_arg;
318
319     pd = new paradynDaemon(m, login, def->getCommandString(), 
320                                def->getRemoteShellString(), def->getNameString(),
321                                    def->getFlavorString());
322
323     if (pd->errorConditionFound) {
324       //TODO: "something" has to be done for a proper clean up - naim
325       string msg;
326       msg=string("Cannot create daemon process on host \"") + m + string("\"");
327       uiMgr->showError(84,P_strdup(msg.string_of())); 
328       return((paradynDaemon*) 0);
329     }
330
331 #if defined(sparc_sun_sunos4_1_3) || defined(hppa1_1_hp_hpux)
332    int num_bytes =0;
333    int nb_size = sizeof(num_bytes);
334    num_bytes = 32768;
335    if(setsockopt(pd->get_desc(),SOL_SOCKET,SO_SNDBUF,(char *)&num_bytes ,nb_size) < 0){
336       perror("setsocketopt SND_BUF error");
337    }
338 #endif
339
340     paradynDaemon::args.resize(asize);
341     uiMgr->updateStatus(DMstatus,P_strdup("ready"));
342
343     if (pd->get_sock() == PDSOCKET_ERROR) {
344         uiMgr->showError (6, "");
345         return((paradynDaemon*) 0);
346     }
347
348    // Send the initial metrics, constraints, and other neato things
349    mdl_send(pd);
350    // Send the initial metrics, constraints, and other neato things
351    vector<T_dyninstRPC::metricInfo> info = pd->getAvailableMetrics();
352    unsigned size = info.size();
353    for (unsigned u=0; u<size; u++)
354         addMetric(info[u]);
355
356     getDaemonTime(pd);
357
358     msg_bind_socket(pd->get_sock(), true, (int(*)(void*))xdrrec_eof,
359                      (void*) pd->net_obj(), &(pd->stid));
360     return (pd);
361 }
362
363 //  
364 // add a new daemon, unless a daemon is already running on that machine
365 // with the same machine, login, and program
366 //
367 bool paradynDaemon::getDaemon (const string &machine, 
368                                const string &login,
369                                const string &name){
370
371     if (!getDaemonHelper(machine, login, name)){
372         return false;
373     }
374     return true;
375 }
376
377 //
378 // define a new entry for the daemon dictionary, or change an existing entry
379 //
380 bool paradynDaemon::defineDaemon (const char *c, const char *d,
381                                   const char *l, const char *n,
382                                   const char *m, const char *r, const char *f) {
383
384   if(!n || !c)
385       return false;
386   string command = c;
387   string dir = d;
388   string login = l;
389   string name = n;
390   string machine = m;
391   string remote_shell = r;
392   string flavor = f;
393
394   daemonEntry *newE;
395   for(unsigned i=0; i < allEntries.size(); i++){
396       newE = allEntries[i];
397       if(newE->getNameString() == name){
398           if (newE->setAll(machine, command, name, login, dir, remote_shell, flavor))
399               return true;
400           else 
401               return false;
402       }
403   }
404
405   newE = new daemonEntry(machine, command, name, login, dir, remote_shell, flavor);
406   if (!newE)
407       return false;
408   allEntries += newE;
409   return true;
410 }
411
412
413 daemonEntry *paradynDaemon::findEntry(const string &, 
414                                       const string &n) {
415
416     // if (!n) return ((daemonEntry*) 0);
417     for(unsigned i=0; i < allEntries.size(); i++){
418         daemonEntry *newE = allEntries[i];
419         if(newE->getNameString() == n){
420            return(newE);
421         }
422     }
423     return ((daemonEntry*) 0);
424
425 }
426
427 void paradynDaemon::tellDaemonsOfResource(u_int parent, u_int my_id, 
428                                           const char *name, unsigned type) {
429     paradynDaemon *pd;
430     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
431         pd = paradynDaemon::allDaemons[i];
432         pd->addResource(parent,my_id,name, type);
433     }
434
435 }
436
437 void paradynDaemon::printEntries()
438 {
439     daemonEntry *entry;
440     for(unsigned i=0; i < allEntries.size(); i++){
441         entry = allEntries[i];
442         entry->print();
443     }
444 }
445
446 void paradynDaemon::print() 
447 {
448   cout << "DAEMON\n";
449   cout << "  name: " << name << endl;
450   cout << "  command: " << command << endl;
451   cout << "  login: " << login << endl;
452   cout << "  machine: " << machine << endl;
453   cout << "  flavor: " << flavor << endl;
454 }
455
456 void paradynDaemon::printDaemons()
457 {
458     paradynDaemon *pd;
459   cout << "ACTIVE DAEMONS\n";
460     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
461         pd = paradynDaemon::allDaemons[i];
462         pd->print();
463     }
464 }
465
466 void paradynDaemon::printPrograms()
467 {
468     executable *entry;
469     for(unsigned i=0; i < programs.size(); i++){
470         entry = programs[i];
471         cout << "PROGRAM ENTRY\n";
472         cout << "pid : " << entry->pid << endl;
473         entry->controlPath->print();
474     }
475 }
476
477 //
478 // Return list of names of defined daemons.  
479 //
480 vector<string> *paradynDaemon::getAvailableDaemons()
481 {
482     vector<string> *names = new vector<string>;
483
484     daemonEntry *entry;
485     for(unsigned i=0; i < allEntries.size(); i++){
486         entry = allEntries[i];
487         *names += entry->getName();
488     }
489     return(names);
490 }
491
492 // For a given machine name, find the appropriate paradynd structure(s).
493 vector<paradynDaemon*> paradynDaemon::machineName2Daemon(const string &mach) {
494    vector<paradynDaemon*> v;
495    for (unsigned i=0; i < allDaemons.size(); i++) {
496       paradynDaemon *theDaemon = allDaemons[i];
497       if (theDaemon->getDaemonMachineName() == mach)
498         v += theDaemon;
499    }
500    return v;
501 }
502
503 //Tempest
504 static int startBlzApp(const string &machine,
505                        const string &login,
506                        const string &, // name
507                        const string &, // dir
508                        const vector<string> &argv)
509 {
510 #if !defined(i386_unknown_nt4_0)
511         //int firstPVM = 1 ;                   // "-v"   1
512         //int  flag = 2;                       // "-l"   2
513         // flavor == "-z"   cow
514         string localhost ;                     // "-m"   gethostname
515         int  port = dataManager::dm->sock_port ; // "-p"   dataManager::sock_fd
516
517         char temp[256], *p;
518         string hostname = getHostName();
519         strcpy (temp, hostname.string_of());
520         if((p = strchr(temp, '.'))) *p = '\0' ;
521         localhost += temp ;
522
523         int from = 1, to = 4 ;
524         int time= 20, nodes = 4;
525         string   application ;
526         string   directory ;
527         bool     use_range = false ;
528
529         //parse the application commandline
530         for(unsigned i=0; i<argv.size(); i++)
531         {
532                 if(strncmp(argv[i].string_of(), "-nodes", 6)==0)
533                         sscanf(argv[i].string_of(), "-nodes%d", &nodes) ;
534                 else if(strncmp(argv[i].string_of(), "-range", 6)==0){
535                         sscanf(argv[i].string_of(), "-range%d:%d", &from, &to) ;
536                         use_range = true ;
537                 }
538                 else if(strncmp(argv[i].string_of(), "-time", 5)==0)
539                         sscanf(argv[i].string_of(), "-time%d", &time) ;
540                 else {
541                         if(application.length() > 0)
542                                 application += " " ;
543                         application += argv[i] ;
544                 }
545
546         }
547
548         // get the directory
549         strcpy(temp, application.string_of()) ;
550         printf("application = [%s]\n", application.string_of()) ;
551         if((p = strchr(temp, ' ')))
552                 *p = '\0' ;
553         else if((p = strchr(temp, '<')))
554                 *p = '\0' ;
555         else if((p = strchr(temp, '>')))
556                 *p = '\0' ;
557         p = strrchr(temp, '/') ; *p = '\0' ;
558         directory += temp ;
559         printf("directory=[%s]\n", directory.string_of()) ;
560
561         //make the scripts
562         string cpScript, pnScript ;
563         cpScript = directory + "/PD_ctrl_script" ;
564         pnScript = directory + "/PD_node_script" ;
565
566         //make  the serverscript
567         FILE *fp = fopen(cpScript.string_of(), "w") ;
568         fprintf(fp, "#!/bin/csh\n") ;
569         fprintf(fp, "crsh all %s\n", pnScript.string_of()) ;
570         fclose(fp) ;
571         system("chmod +x PD_ctrl_script") ;
572
573         //make the pn_script
574         fp = fopen(pnScript.string_of(), "w") ;
575         fprintf (fp, "#!/bin/csh\n") ;
576         fprintf (fp, "setenv LAM_CONFIG /p/wwt/myrinet/configs/lam.conf.master\n") ;
577         fprintf (fp, "cd %s\n", directory.string_of()) ;
578         //paradynd -p36622 -mgoofy -l1 -v1 -zcow -runme ....
579         fprintf (fp, "paradynd -p%d -m%s -l0 -v1 -zcow -runme %s\n",
580                      port,
581                      localhost.string_of(),
582                      application.string_of()) ;
583         fclose  (fp) ;
584         system("chmod +x PD_node_script") ;
585
586         //fork it
587         int shellPid ;
588         int ret ;
589         sprintf(temp, "%d", time) ;
590         string timeFrame  ; timeFrame += temp ;
591
592         string nodeRange ;
593         if(use_range) {
594                 char sfrom[64], sto[64] ;
595                 sprintf(sfrom, "%d", from) ;
596                 sprintf(sto, "%d", to) ;
597                 nodeRange += "-range ";
598                 nodeRange += sfrom ;
599                 nodeRange +=  ":" ;
600                 nodeRange += sto ;
601         } else
602         {
603                 sprintf(temp, "%d", nodes) ;
604                 nodeRange += "-nodes " ;
605                 nodeRange += temp ;
606         }
607
608         if(use_range)
609            sprintf(temp, "Starting job on nodes %d-%d, for %d minutes\n", from, to, time)
610 ;
611         else
612            sprintf(temp, "Starting job on %d nodes, for %d minutes\n", nodes, time) ;
613         uiMgr->updateStatus(DMstatus,P_strdup(temp));
614         shellPid = vfork() ;
615         if(shellPid == 0)
616         {
617                 if(login.length())
618                 {
619                  ret = execlp("rsh", "rsh", machine.string_of(), "-l",
620                         login.string_of(), "-n",
621                         "/p/cow/bin/crun", cpScript.string_of() ,
622                         "-capture",
623                         "-contig",
624                         "-time",   timeFrame.string_of(),
625                         nodeRange.string_of(),
626                         NULL);
627
628                 } else
629                 {
630                  ret = execlp("rsh", "rsh", machine.string_of(), "-n",
631                         "/p/cow/bin/crun", cpScript.string_of() ,
632                         "-capture",
633                         "-contig",
634                         "-time",   timeFrame.string_of(),
635                         nodeRange.string_of(),
636                         NULL);
637                 }
638                 fprintf(stderr,"rshCommand: execlp failed (ret = %d)\n",ret);
639                 _exit(-1) ;
640         } else if( shellPid > 0)
641                 ;
642         else    return false; // error situation
643
644         return true ;
645 #else // defined(i386_unknown_nt4_0)
646                 // TODO - implement this?
647                 return false;
648 #endif // defined(i386_unknown_nt4_0)
649 }
650
651
652 static bool execPOE(const string /* &machine*/, const string /* &login */,
653                     const string /* &name   */, const string         &dir,
654                     const vector<string> &argv, const vector<string>  args,
655                     daemonEntry          *de)
656 {
657         unsigned i;
658
659
660   char **s = (char**) malloc(sizeof(char*) * (args.size() + argv.size() + 5));
661   char   t[1024];
662   assert(s != NULL);
663
664   s[0] = strdup("poe");
665   s[1] = strdup(de->getCommand());
666
667   for (i = 0; i < args.size(); i++)
668     s[i+2] = (strcmp(args[i].string_of(), "-l1")) 
669              ? strdup(args[i].string_of()) : strdup("-l0");
670
671   sprintf(t, "-z%s", de->getFlavor());
672   s[args.size()+2] = strdup(t);
673   s[args.size()+3] = strdup("-runme");
674       
675   for (i = 0; i < argv.size(); i++) 
676     s[args.size()+4+i] = (i || strcmp(argv[i].string_of(), "poe"))
677                          ? strdup(argv[i].string_of()) : strdup("");
678
679   s[args.size()+argv.size()+4] = NULL;
680
681                       //IBM POE sets remote directory same as current directory
682   if (dir.length() && (P_chdir(dir.string_of()) < 0)) 
683     cerr << "cannot chdir to " << dir.string_of() << ": " 
684          << sys_errlist[errno] << endl;
685
686   return(execvp(s[0], s) != -1);
687 }
688
689
690
691 static bool rshPOE(const string         &machine, const string         &login,
692                    const string      /* &name*/,  const string         &dir,
693                    const vector<string> &argv,    const vector<string>  args,
694                    daemonEntry          *de)
695 {
696         unsigned i;
697   char *s[6];
698   char  t[1024];
699
700   s[0] = strdup("rsh");
701   s[1] = strdup(machine.string_of());
702   s[2] = (login.length()) ? strdup("-l"):              strdup("");
703   s[3] = (login.length()) ? strdup(login.string_of()): strdup("");
704
705   sprintf(t, "(");
706
707   if (dir.length()) strcat(strcat(strcat(t, "cd "), dir.string_of()), "; ");
708
709   strcat(strcat(strcat(t, "poe "), de->getCommand()), " ");
710
711   for (i = 0; i < args.size(); i++)
712     strcat(strcat(t, (strcmp(args[i].string_of(), "-l1")) 
713                      ? args[i].string_of() : "-l0"), " ");
714
715   strcat(strcat(strcat(t, "-z"), de->getFlavor()), " -runme ");
716       
717   for (i = 0; i < argv.size(); i++) 
718     strcat(strcat(t, (i || strcmp(argv[i].string_of(), "poe"))
719                      ? argv[i].string_of() : ""), " ");
720
721   strcat(t, ")");
722
723   s[4] = strdup(t);
724   s[5] = NULL;
725
726   return(execvp(s[0], s) != -1);
727 }
728
729
730
731 static bool startPOE(const string         &machine, const string         &login,
732                      const string         &name,    const string         &dir,
733                      const vector<string> &argv,    const vector<string>  args,
734                      daemonEntry          *de)
735 {
736 #if !defined(i386_unknown_nt4_0)
737   if (DMstatus)   uiMgr->updateStatus(DMstatus,   "ready");
738   if (PROCstatus) uiMgr->updateStatus(PROCstatus, "IBM POE");
739
740   if (fork()) return(true);
741
742   if ((machine.length() == 0) || (machine == "localhost") || 
743       (machine == getHostName()))
744     return(execPOE(machine, login, name, dir, argv, args, de));
745   else
746     return( rshPOE(machine, login, name, dir, argv, args, de));
747 #else // !defined(i386_unknown_nt4_0)
748         // TODO - implement this?
749         return false;
750 #endif // !defined(i386_unknown_nt4_0)
751 }
752
753
754
755 // TODO: fix this
756 //
757 // add a new executable (binary) to a program.
758 //
759 bool paradynDaemon::newExecutable(const string &machine,
760                                   const string &login,
761                                   const string &name, 
762                                   const string &dir, 
763                                   const vector<string> &argv){
764
765   if (! DMstatus)
766       DMstatus = new status_line("Data Manager");
767
768   if (! PROCstatus)
769       PROCstatus = new status_line("Processes");
770
771   //------------------
772   //Added to start blizzard application, Tempest
773
774   daemonEntry *def = findEntry(machine, name) ;
775   if (!def) {
776      if (name.length()) {
777         string msg = string("Paradyn daemon \"") + name + string("\" not defined.");
778         uiMgr->showError(90,P_strdup(msg.string_of()));
779      }
780      else {
781         uiMgr->showError(91,"");
782      }
783      return false;
784   }
785   if(def->getFlavorString() == "cow")
786         return startBlzApp(machine, login, name, dir, argv) ;
787   //------------
788
789   if (def->getFlavorString() == "mpi")
790     return(startPOE(machine, login, name, dir, argv, args, def));
791
792   paradynDaemon *daemon;
793   if ((daemon=getDaemonHelper(machine, login, name)) == (paradynDaemon*) NULL)
794       return false;
795
796   performanceStream::ResourceBatchMode(batchStart);
797   int pid = daemon->addExecutable(argv, dir);
798   performanceStream::ResourceBatchMode(batchEnd);
799
800   // did the application get started ok?
801   if (pid > 0 && !daemon->did_error_occur()) {
802       // TODO
803       char tmp_buf[80];
804       sprintf (tmp_buf, "PID=%d", pid);
805       uiMgr->updateStatus(PROCstatus, P_strdup(tmp_buf));
806 #ifdef notdef
807       executable *exec = new executable(pid, argv, daemon);
808       paradynDaemon::programs += exec;
809       ++procRunning;
810 #endif
811       return (true);
812   } else {
813       return(false);
814   }
815 }
816
817 bool paradynDaemon::attachStub(const string &machine,
818                                const string &userName,
819                                const string &cmd, // program name (full path)
820                                int the_pid,
821                                const string &daemonName,
822                                int afterAttach // 0 --> as is, 1 --> pause, 2 --> run
823                                ) {
824   // Note: by this time, both the RUN and PAUSE buttons have been disabled in the
825   // user interface...
826
827   if (! DMstatus)
828       DMstatus = new status_line("Data Manager");
829
830   if (! PROCstatus)
831       PROCstatus = new status_line("Processes");
832
833   paradynDaemon *daemon = getDaemonHelper(machine, userName, daemonName);
834   if (daemon == NULL)
835       return false;
836
837   char tmp_buf[128];
838   sprintf (tmp_buf, "attaching to PID=%d...", the_pid);
839   uiMgr->updateStatus(PROCstatus, P_strdup(tmp_buf));
840
841   performanceStream::ResourceBatchMode(batchStart);
842   bool success = daemon->attach(cmd, the_pid, afterAttach);
843   performanceStream::ResourceBatchMode(batchEnd);
844
845   if (daemon->did_error_occur())
846      return false;
847
848   if (!success)
849      return false;
850
851   sprintf (tmp_buf, "PID=%d", the_pid);
852   uiMgr->updateStatus(PROCstatus, P_strdup(tmp_buf));
853   return true; // success
854 }
855
856 //
857 // start the programs running.
858 //
859 bool paradynDaemon::startApplication()
860 {
861     executable *prog;
862     for(unsigned i=0; i < programs.size(); i++){
863         prog = programs[i];
864         prog->controlPath->startProgram(prog->pid);   
865     }
866     return(true);
867 }
868
869 //
870 // pause all processes.
871 //
872 bool paradynDaemon::pauseAll()
873 {
874     paradynDaemon *pd;
875     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
876         pd = paradynDaemon::allDaemons[i];
877         pd->pauseApplication();
878     }
879     // tell perf streams about change.
880     performanceStream::notifyAllChange(appPaused);
881     applicationState = appPaused;
882     return(true);
883 }
884
885 //
886 // pause one processes.
887 //
888 bool paradynDaemon::pauseProcess(unsigned pid)
889 {
890     executable *exec = 0;
891     for(unsigned i=0; i < programs.size(); i++){
892         exec = programs[i];
893         if(exec->pid == pid)
894             break;
895         exec = 0;
896     }
897     if (exec) {
898         exec->controlPath->pauseProgram(exec->pid);
899         return(true); 
900     } else
901         return (false);
902 }
903
904 //
905 // continue all processes.
906 //
907 bool paradynDaemon::continueAll()
908 {
909     paradynDaemon *pd;
910
911     if (programs.size() == 0)
912        return false; // no program to pause
913
914     if (procRunning == 0)
915        return false;
916
917     for(int i = paradynDaemon::allDaemons.size()-1; i >= 0; i--) 
918     {
919         pd = paradynDaemon::allDaemons[i];
920         pd->continueApplication();
921     }
922     // tell perf streams about change.
923     performanceStream::notifyAllChange(appRunning);
924     applicationState = appRunning;
925     return(true);
926 }
927
928 //
929 // continue one processes.
930 //
931 bool paradynDaemon::continueProcess(unsigned pid)
932 {
933     executable *exec = 0;
934     for(unsigned i=0; i < programs.size(); i++){
935         exec = programs[i];
936         if(exec->pid == pid)
937             break;
938         exec = 0;
939     }
940     if (exec) {
941         exec->controlPath->continueProgram(exec->pid);
942         return(true); 
943     } else
944         return (false);
945 }
946
947 //
948 // detach the paradyn tool from a running program.  This should clean all
949 //   of the dynamic instrumentation that has been inserted.
950 //
951 bool paradynDaemon::detachApplication(bool pause)
952 {
953     executable *exec = 0;
954     for(unsigned i=0; i < programs.size(); i++){
955         exec = programs[i];
956         exec->controlPath->detachProgram(exec->pid,pause);
957     }
958     return(true);
959 }
960
961 //
962 // print the status of each process.  This is used mostly for debugging.
963 //
964 void paradynDaemon::printStatus()
965 {
966     executable *exec = 0;
967     for(unsigned i=0; i < programs.size(); i++){
968         exec = programs[i];
969         string status = exec->controlPath->getStatus(exec->pid);
970             if (!exec->controlPath->did_error_occur()) {
971                 cout << status << endl;
972             }
973     }
974 }
975
976 //
977 // Cause the passed process id to dump a core file.  This is also used for
978 //    debugging.
979 // If pid = -1, all processes will dump core files.
980 //
981 void paradynDaemon::dumpCore(int pid)
982 {
983     executable *exec = 0;
984     for(unsigned i=0; i < programs.size(); i++){
985         exec = programs[i];
986         if ((exec->pid == (unsigned)pid) || (pid == -1)) {
987             exec->controlPath->coreProcess(exec->pid);
988             printf("found process and coreing it\n");
989         }
990     }
991 }
992
993
994 bool paradynDaemon::setInstSuppress(resource *res, bool newValue)
995 {
996     bool ret = false;
997     paradynDaemon *pd;
998     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
999         pd = paradynDaemon::allDaemons[i];
1000         ret |= pd->setTracking(res->getHandle(), newValue);
1001     }
1002     return(ret);
1003 }
1004
1005 //
1006 // signal from daemon that is is about to start or end a set 
1007 // of new resource definitions
1008 //
1009 void paradynDaemon::resourceBatchMode(bool onNow){
1010     int prev = count;
1011     if (onNow) {
1012         count++;
1013     } else {
1014         assert(count > 0);
1015         count--;
1016     }
1017
1018     if (count == 0) {
1019         for(u_int i=0; i < allDaemons.size(); i++){
1020             (allDaemons[i])->reportResources();
1021         }
1022         performanceStream::ResourceBatchMode(batchEnd);
1023     } else if (!prev) {
1024         performanceStream::ResourceBatchMode(batchStart);
1025     }
1026 }
1027
1028 //
1029 //  reportResources:  send new resource ids to daemon
1030 //
1031 void  paradynDaemon::reportResources(){
1032     assert(newResourceTempIds.size() == newResourceHandles.size());
1033     resourceInfoResponse(newResourceTempIds, newResourceHandles);
1034     newResourceTempIds.resize(0);
1035     newResourceHandles.resize(0);
1036 }
1037
1038 //
1039 // upcall from paradynd reporting new resource
1040 //
1041 void paradynDaemon::resourceInfoCallback(u_int temporaryId,
1042                               vector<string> resource_name,
1043                               string abstr, u_int type) {
1044
1045     resourceHandle r = createResource(temporaryId, resource_name, abstr, type);
1046     if(!count){
1047       if (r != temporaryId) {
1048         vector<u_int>tempIds; vector<u_int>rIds;
1049         tempIds += temporaryId; rIds += r;
1050         resourceInfoResponse(tempIds, rIds);
1051       }
1052     }
1053     else {
1054         if (r != temporaryId) {
1055           newResourceTempIds += temporaryId;
1056           newResourceHandles += r;
1057           assert(newResourceTempIds.size() == newResourceHandles.size());
1058         }
1059     }
1060 }
1061
1062 void paradynDaemon::severalResourceInfoCallback(vector<T_dyninstRPC::resourceInfoCallbackStruct> items) {
1063    for (unsigned lcv=0; lcv < items.size(); lcv++)
1064       resourceInfoCallback(items[lcv].temporaryId,
1065                            items[lcv].resource_name,
1066                            items[lcv].abstraction,
1067                            items[lcv].type);
1068 }
1069
1070
1071 //
1072 // Get the expected delay (as a fraction of the running program) for the passed
1073 //   resource list (focus) and metric.
1074 //
1075 void paradynDaemon::getPredictedDataCostCall(perfStreamHandle ps_handle,
1076                                       metricHandle m_handle,
1077                                       resourceListHandle rl_handle,
1078                                       resourceList *rl, 
1079                                       metric *m,
1080                                       u_int clientID)
1081 {
1082     if(rl && m){
1083         vector<u_int> focus;
1084         bool aflag;
1085         aflag=rl->convertToIDList(focus);
1086         assert(aflag);
1087         const char *metName = m->getName();
1088         assert(metName);
1089         u_int requestId;
1090         if(performanceStream::addPredCostRequest(ps_handle,requestId,m_handle,
1091                                 rl_handle, paradynDaemon::allDaemons.size())){
1092             paradynDaemon *pd;
1093             for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
1094                 pd = paradynDaemon::allDaemons[i];
1095                 pd->getPredictedDataCost(ps_handle,requestId,focus, 
1096                                          metName,clientID);
1097             }
1098             return;
1099         }
1100     }
1101     // TODO: change this to do the right thing
1102     // this should make the response upcall to the correct calling thread
1103     // perfConsult->getPredictedDataCostCallbackPC(0,0.0);
1104     assert(0);
1105 }
1106
1107 //
1108 // make data enable request to paradynds, and add request entry to
1109 // list of outstanding enable requests
1110 //
1111 void paradynDaemon::enableData(vector<metricInstance *> *miVec,
1112                                vector<bool> *done,
1113                                vector<bool> *enabled,
1114                                DM_enableType *new_entry,
1115                                bool need_to_enable){
1116
1117     // make enable request, pass only pairs that need to be enabled to daemons
1118     if(need_to_enable){  
1119         bool whole_prog_focus = false;
1120         vector<paradynDaemon*> daemon_subset; // which daemons to send request
1121         vector<T_dyninstRPC::focusStruct> foci; 
1122         vector<string> metrics; 
1123         vector<u_int> mi_ids;  
1124
1125         for(u_int i=0; i < miVec->size(); i++){
1126             if(!(*enabled)[i] && !(*done)[i]){
1127                 // create foci, metrics, and mi_ids entries for this mi
1128                 T_dyninstRPC::focusStruct focus;
1129                 string met_name;
1130                 bool aflag;
1131                 aflag=((*miVec)[i]->convertToIDList(focus.focus));
1132                 assert(aflag);
1133                 met_name = (*miVec)[i]->getMetricName();
1134                 foci += focus;
1135                 metrics += met_name;
1136                 mi_ids += (*miVec)[i]->getHandle();
1137                 // set curretly enabling flag on mi 
1138                 (*miVec)[i]->setCurrentlyEnabling();
1139
1140                 // check to see if this focus is refined on the machine
1141                 // or process heirarcy, if so then add the approp. daemon
1142                 // to the daemon_subset, else set whole_prog_focus to true
1143                 if(!whole_prog_focus){
1144                     string machine_name;
1145                     resourceList *rl = (*miVec)[i]->getresourceList(); 
1146                     assert(rl);
1147                     // focus is refined on machine or process heirarchy 
1148                     if(rl->getMachineNameReferredTo(machine_name)){
1149                         // get the daemon corr. to this focus and add it
1150                         // to the list of daemons
1151                         vector<paradynDaemon*> vpd = 
1152                                 paradynDaemon::machineName2Daemon(machine_name);
1153                         assert(vpd.size());
1154                         for(u_int j=0; j < vpd.size(); j++){
1155                           bool found = false;
1156
1157                           for(u_int k=0; k<daemon_subset.size() && !found; k++)
1158                             if(vpd[j]->id == daemon_subset[k]->id) 
1159                               found = true;    
1160
1161                           if(!found)  // add new daemon to subset list
1162                             daemon_subset += vpd[j];
1163                         }  
1164                     }
1165                     else {  // foucs is not refined on process or machine 
1166                         whole_prog_focus = true;
1167                     }
1168                 }
1169         } }
1170         assert(foci.size() == metrics.size());
1171         assert(metrics.size() == mi_ids.size());
1172         assert(daemon_subset.size() <= paradynDaemon::allDaemons.size());
1173         // if there is a whole_prog_focus then make the request to all 
1174         // the daemons, else make the request to the daemon subset
1175         // make enable requests to all daemons
1176         if(whole_prog_focus) {
1177             for(u_int j=0; j < paradynDaemon::allDaemons.size(); j++){
1178                paradynDaemon *pd = paradynDaemon::allDaemons[j]; 
1179                pd->enableDataCollection(foci,metrics,mi_ids,j,
1180                                      new_entry->request_id);
1181             }
1182         }
1183         else {  
1184             // change the enable number in the entry 
1185             new_entry->how_many = daemon_subset.size();
1186             for(u_int j=0; j < daemon_subset.size(); j++){
1187                daemon_subset[j]->enableDataCollection(foci,metrics,mi_ids,
1188                                  daemon_subset[j]->id,new_entry->request_id);
1189             }
1190         }
1191     }
1192     // add entry to outstanding_enables list
1193     paradynDaemon::outstanding_enables += new_entry;
1194     new_entry = 0; 
1195     miVec = 0;
1196     done = 0; 
1197     enabled = 0;
1198 }
1199
1200
1201 // propagateMetrics:
1202 // called when a new process is started, to propagate all enabled metrics to
1203 // the new process.  (QUESTION: should this include when a process makes
1204 // an exec syscall, thus 'starting' another process?)
1205 // Metrics are propagated only if the new process is the only process running 
1206 // on a daemon (this is why we don't need the pid here). If there are already
1207 // other processes running on a daemon, than it is up to the daemon to do the
1208 // propagation (we can't do it here because the daemon has to do the aggregation).
1209 // Calling this function has no effect if there are no metrics enabled.
1210 void paradynDaemon::propagateMetrics() {
1211
1212     vector<metricInstanceHandle> allMIHs = metricInstance::allMetricInstances.keys();
1213
1214     for (unsigned i = 0; i < allMIHs.size(); i++) {
1215
1216       metricInstance *mi = metricInstance::getMI(allMIHs[i]);
1217
1218       if (!mi->isEnabled())
1219          continue;
1220
1221       // first we must find if the daemon already has this metric enabled for
1222       // some process. In this case, we don't need to do anything, the
1223       // daemon will do the propagation by itself.
1224       bool found = false;
1225       for (unsigned j = 0; j < mi->components.size(); j++) {
1226          if (mi->components[j]->getDaemon() == this) {
1227             found = true;
1228             break;
1229          }
1230       }
1231
1232       if (found)
1233          continue; // we don't enable this mi; let paradynd do it
1234
1235       resourceListHandle r_handle = mi->getFocusHandle();
1236       metricHandle m_handle = mi->getMetricHandle();
1237       resourceList *rl = resourceList::getFocus(r_handle);
1238       metric *m = metric::getMetric(m_handle);
1239
1240       vector<u_int> vs;
1241       bool aflag = rl->convertToIDList(vs);
1242       assert(aflag);
1243
1244       int id = enableDataCollection2(vs, (const char *) m->getName(), mi->id);
1245
1246       if (id > 0 && !did_error_occur()) {
1247          component *comp = new component(this, id, mi);
1248          if (!mi->addComponent(comp)) {
1249             cout << "internal error in paradynDaemon::addRunningProgram" << endl;
1250             abort();
1251          }
1252       }
1253     }
1254 }
1255
1256
1257 bool paradynDaemon::setDefaultArgs(char *&name)
1258 {
1259   if (!name)
1260     name = strdup("defd");
1261   if (name)
1262     return true;
1263   else 
1264     return false;
1265 }
1266
1267
1268 bool daemonEntry::setAll (const string &m, const string &c, const string &n,
1269                           const string &l, const string &d, const string &r, const string &f)
1270 {
1271   if(!n.string_of() || !c.string_of())
1272       return false;
1273
1274   if (m.string_of()) machine = m;
1275   if (c.string_of()) command = c;
1276   if (n.string_of()) name = n;
1277   if (l.string_of()) login = l;
1278   if (d.string_of()) dir = d;
1279   if (r.string_of()) remote_shell = r;
1280   if (f.string_of()) flavor = f;
1281
1282   return true;
1283 }
1284 void daemonEntry::print() 
1285 {
1286   cout << "DAEMON ENTRY\n";
1287   cout << "  name: " << name << endl;
1288   cout << "  command: " << command << endl;
1289   cout << "  dir: " << dir << endl;
1290   cout << "  login: " << login << endl;
1291   cout << "  machine: " << machine << endl;
1292   cout << "  remote_shell: " << remote_shell << endl;
1293   cout << "  flavor: " << flavor << endl;
1294 }
1295
1296 #ifdef notdef
1297 int paradynDaemon::read(const void* handle, char *buf, const int len) {
1298   assert(0);
1299   int ret, err;
1300
1301   assert(len > 0);
1302   assert((int)handle<200);
1303   assert((int)handle >= 0);
1304   static vector<unsigned> fd_vect(200);
1305
1306   // must handle the msg_bind_buffered call here because xdr_read will be
1307   // called in the constructor for paradynDaemon, before the previous call
1308   // to msg_bind had been called
1309
1310   if (!fd_vect[(unsigned)handle]) {
1311     paradynDaemon *pd;
1312     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
1313         pd = paradynDaemon::allDaemons[i];
1314         if(pd->get_sock() == (int)handle)
1315             break;
1316     }
1317     if (!(pd))
1318       return -1;
1319
1320     msg_bind((int)handle, true, (int(*)(void*))xdrrec_eof,
1321                       (void*)(pd)->net_obj(), &???);
1322     fd_vect[(unsigned)handle] = 1;
1323   }
1324
1325   do {
1326     tag_t tag = MSG_TAG_FILE;
1327         thread_t ready_tid;
1328
1329     do
1330                 ready_tid = THR_TID_UNSPEC;
1331                 err = msg_poll(&ready_tid, &tag, true);
1332         while ((err != THR_ERR) && (ready_tid != (thread_t)handle))
1333     
1334     if (ready_tid == (thread_t) handle) {
1335       errno = 0;
1336       ret = P_read((int)handle, buf, len);
1337     } else 
1338       return -1;
1339   } while (ret < 0 && errno == EINTR);
1340
1341   if (ret <= 0)
1342     return (-1);
1343   else
1344     return ret;
1345 }
1346 #endif // notdef
1347
1348 void paradynDaemon::firstSampleCallback(int, double firstTime) {
1349   static bool done = false;
1350   if (!done) {
1351 //  cerr << "paradyn: welcome to firstSampleCallback; firstTime=" << firstTime << "; adjusted time=" << getAdjustedTime(firstTime) << endl;
1352
1353     setEarliestFirstTime(getAdjustedTime(firstTime));
1354   }
1355   done = true;
1356 }
1357
1358
1359 paradynDaemon::paradynDaemon(const string &m, const string &u, const string &c,
1360                            const string &r, const string &n, const string &f)
1361 : dynRPCUser(m, u, c, r, NULL, NULL, args, 1, dataManager::sock_desc),
1362   machine(m), login(u), command(c), name(n), flavor(f), activeMids(uiHash)
1363 {
1364   if (!this->errorConditionFound) {
1365     // No problems found in order to create this new daemon process - naim
1366     assert(m.length());
1367     assert(c.length());
1368     assert(n.length());
1369     assert(f.length());
1370
1371     // if c includes a pathname, lose the pathname
1372     const char *loc = P_strrchr(c.string_of(), '/');
1373     if (loc) {
1374       loc = loc + 1;
1375       command = loc;
1376     }
1377   
1378     status = new status_line(machine.string_of(), status_line::PROCESS);
1379     paradynDaemon *pd = this;
1380     paradynDaemon::allDaemons+=pd;
1381     id = paradynDaemon::allDaemons.size()-1;
1382     assert(paradynDaemon::allDaemons.size() > id); 
1383
1384         metCheckDaemonProcess( machine );
1385   }
1386   // else...we leave "errorConditionFound" for the caller to check...
1387   //        don't forget to check!
1388 }
1389
1390 // machine, name, command, flavor and login are set via a callback
1391 paradynDaemon::paradynDaemon(PDSOCKET use_sock)
1392 : dynRPCUser(use_sock, NULL, NULL, 1), flavor(0), activeMids(uiHash){
1393   if (!this->errorConditionFound) {
1394     // No problems found in order to create this new daemon process - naim 
1395     paradynDaemon *pd = this;
1396     paradynDaemon::allDaemons += pd;
1397     id = paradynDaemon::allDaemons.size()-1;
1398   }
1399   // else...we leave "errorConditionFound" for the caller to check...
1400   //        don't forget to check!
1401 }
1402
1403 bool our_print_sample_arrival = false;
1404 void printSampleArrivalCallback(bool newVal) {
1405    our_print_sample_arrival = newVal;
1406 }
1407
1408 // batched version of sampleCallbackFunc
1409 void paradynDaemon::batchSampleDataCallbackFunc(int ,
1410                 vector<T_dyninstRPC::batch_buffer_entry> theBatchBuffer)
1411 {
1412     // get the earliest first time that had been reported by any paradyn
1413     // daemon to use as the base (0) time
1414     bool aflag;
1415     aflag=getEarliestFirstTime();
1416     assert(aflag);
1417
1418   // Just for debugging:
1419   //fprintf(stderr, "in DMdaemon.C, burst size = %d\n", theBatchBuffer.size()) ;
1420
1421     // Go through every item in the batch buffer we've just received and
1422     // process it.
1423     for (unsigned index =0; index < theBatchBuffer.size(); index++) {
1424         const T_dyninstRPC::batch_buffer_entry &entry = theBatchBuffer[index] ; 
1425
1426         unsigned mid          = entry.mid ;
1427         double startTimeStamp = entry.startTimeStamp ;
1428         double endTimeStamp   = entry.endTimeStamp ;
1429         double value          = entry.value ;
1430         u_int  weight         = entry.weight;
1431         //bool   internal_metric = entry.internal_met;
1432         startTimeStamp = 
1433             this->getAdjustedTime(startTimeStamp) - getEarliestFirstTime();
1434         endTimeStamp = 
1435             this->getAdjustedTime(endTimeStamp) - getEarliestFirstTime();
1436
1437         if (our_print_sample_arrival) {
1438             cout << "mid " << mid << " " << value << " from "
1439                  << startTimeStamp << " to " << endTimeStamp 
1440                  << " weight " << weight 
1441                  << " machine " << machine.string_of() << "\n";
1442         }
1443
1444         // Okay, the sample is not an error; let's process it.
1445         metricInstance *mi;
1446         bool found = activeMids.find(mid, mi);
1447         if (!found) {
1448            // this can occur due to asynchrony of enable or disable requests
1449            // so just ignore the data
1450            if (our_print_sample_arrival)
1451               cout << "ignoring that sample since it's no longer active" << endl;
1452
1453            continue;
1454         }
1455         assert(mi);
1456
1457         // Any sample sent by a daemon should not have the start time
1458         // less than lastSampleEnd for the aggregate sample. When a new
1459         // component is added to a metric, the first sample could have
1460         // the startTime less than lastSampleEnd. If this happens,
1461         // the daemon clock must be late (or the time adjustment
1462         // factor is not good enough), and so we must update
1463         // the time adjustment factor for this daemon.
1464         if (startTimeStamp < mi->aggSample.currentTime()) {
1465           timeStamp diff = mi->aggSample.currentTime() - startTimeStamp;
1466           startTimeStamp += diff;
1467           endTimeStamp += diff;
1468           this->setTimeFactor(this->getTimeFactor() + diff);
1469           //printf("*** Adjusting time for %s: diff = %f\n", this->machine.string_of(), diff);
1470         }
1471
1472         struct sampleInterval ret;
1473         if (mi->components.size()){
1474            // find the right component.
1475            component *part = 0;
1476            for(unsigned i=0; i < mi->components.size(); i++) {
1477               if(mi->components[i]->daemon == this){
1478                  part = mi->components[i];
1479                  // update the weight associated with this component
1480                  // this does not necessarily need to be updated with
1481                  // each new value as long as we can distinguish between
1482                  // internal and non-internal metric values in some way
1483                  // (internal metrics weight is 1 and regular metrics 
1484                  // weight is the number of processes for this daemon),
1485                  // and the weight is changed when the number of processes 
1486                  // changes (we are not currently doing this part)
1487                  //if(!internal_metric){
1488                  //    mi->num_procs_per_part[i] = weight;
1489                  //}
1490               }
1491            }
1492            if (!part) {
1493               uiMgr->showError(3, "");
1494               return;
1495               //exit(-1);
1496            }
1497
1498            // update the sampleInfo value associated with 
1499            // the daemon that sent the value 
1500            //
1501        
1502            assert(part->sample);
1503            if (!part->sample->firstValueReceived()) {
1504               //part->sample->startTime(startTimeStamp);
1505               part->sample->firstTimeAndValue(startTimeStamp, (float)0.0);
1506            }
1507
1508            part->sample->newValue(endTimeStamp, (float)value, weight);
1509         }
1510
1511         // don't aggregate if this metric is still being enabled (we may 
1512         // not have received replies for the enable requests from all the daemons)
1513         if (mi->isCurrentlyEnabling())
1514           continue;
1515
1516         //
1517         // update the metric instance sample value if there is a new
1518         // interval with data for all parts, otherwise this routine
1519         // returns false for ret.valid and the data cannot be bucketed
1520         // by the histograms yet (not all components have sent data for
1521         // this interval)
1522         // newValue will aggregate the parts according to mi's aggOp
1523         //
1524         ret = mi->aggSample.aggregateValues();
1525         
1526         if (ret.valid) {  // there is new data from all components 
1527            assert(ret.end >= 0.0);
1528            assert(ret.start >= 0.0);
1529            assert(ret.end >= ret.start);
1530            mi->enabledTime += ret.end - ret.start;
1531            mi->addInterval(ret.start, ret.end, ret.value, false);
1532         }
1533     } // the main for loop
1534 }
1535
1536 // trace data streams
1537 void paradynDaemon::batchTraceDataCallbackFunc(int ,
1538                 vector<T_dyninstRPC::trace_batch_buffer_entry> theTraceBatchBuffer)
1539 {
1540     // get the earliest first time that had been reported by any paradyn
1541     // daemon to use as the base (0) time
1542     // assert(getEarliestFirstTime());
1543
1544   // Just for debugging:
1545   //fprintf(stderr, "in DMdaemon.C, burst size = %d\n", theTraceBatchBuffer.size());
1546
1547     // Go through every item in the batch buffer we've just received and
1548     // process it.
1549     for (unsigned index =0; index < theTraceBatchBuffer.size(); index++) {
1550         T_dyninstRPC::trace_batch_buffer_entry &entry = theTraceBatchBuffer[index] ;
1551
1552         unsigned mid          = entry.mid ;
1553         unsigned length       = entry.length;
1554
1555         if (our_print_sample_arrival) {
1556             cout << "mid " << mid << " : length = " << length << "\n";
1557         }
1558
1559         // Okay, the sample is not an error; let's process it.
1560         metricInstance *mi;
1561         bool found = activeMids.find(mid, mi);
1562         if (!found) {
1563            // this can occur due to asynchrony of enable or disable requests
1564            // so just ignore the data
1565           continue;
1566         }
1567         assert(mi);
1568         byteArray *localTraceData = new byteArray(entry.traceRecord.getArray(),
1569         length);
1570         mi->sendTraceData(localTraceData->getArray(),length);
1571
1572         delete localTraceData;
1573
1574     } // the main for loop
1575 }
1576
1577 //
1578 // paradyn daemon should never go away.  This represents an error state
1579 //    due to a paradynd being killed for some reason.
1580 //
1581 // TODO -- handle this better
1582 paradynDaemon::~paradynDaemon() {
1583
1584 #ifdef notdef
1585     metricInstance *mi;
1586     HTable<metricInstance*> curr;
1587
1588     allDaemons.remove(this);
1589
1590     // remove the metric ID as required.
1591     for (curr = activeMids; mi = *curr; curr++) {
1592         mi->parts.remove(this);
1593         mi->components.remove(this);
1594     }
1595 #endif
1596     printf("Inconsistant state\n");
1597     abort();
1598 }
1599
1600 //
1601 // When an error is determined on an igen call, this function is
1602 // called, since the default error handler will exit, and we don't
1603 // want paradyn to exit.
1604 //
1605 void paradynDaemon::handle_error()
1606 {
1607    removeDaemon(this, true);
1608 }
1609
1610 //
1611 // When a paradynd is started remotely, ie not by paradyn, this upcall
1612 // reports the information for that paradynd to paradyn
1613 //
1614 // This must set command, name, machine and flavor fields
1615 // (pid no longer used --ari)
1616 //
1617 void 
1618 paradynDaemon::reportSelf (string m, string p, int /*pid*/, string flav)
1619 {
1620   flavor = flav;
1621   if (!m.length() || !p.length()) {
1622     removeDaemon(this, true);
1623     printf("paradyn daemon reported bad info, removed\n");
1624     // error
1625   } else {
1626     machine = m.string_of();
1627     command = p.string_of();
1628     status = new status_line(machine.string_of(), status_line::PROCESS);
1629
1630     if(flavor == "pvm") {
1631       name = "pvmd";
1632     } else if(flavor == "unix") {
1633       name = "defd";
1634     } else if(flavor == "mpi") {
1635       name = "mpid";
1636     } else if (flavor == "winnt") {
1637       name = "winntd";
1638     } else {
1639       name = flavor;
1640     }
1641
1642         metCheckDaemonProcess( machine );
1643   }
1644
1645   // Send the initial metrics, constraints, and other neato things
1646   mdl_send(this);
1647   vector<T_dyninstRPC::metricInfo> info = this->getAvailableMetrics();
1648   unsigned size = info.size();
1649   for (unsigned u=0; u<size; u++)
1650       addMetric(info[u]);
1651
1652   getDaemonTime(this);
1653
1654   return;
1655 }
1656
1657 //
1658 // When a paradynd reports status, send the status to the user
1659 //
1660 void 
1661 paradynDaemon::reportStatus (string line)
1662 {
1663   if (status)
1664     uiMgr->updateStatus(status, P_strdup(line.string_of()));
1665 }
1666
1667 /***
1668  This call is used by a daemon to report a change in the status of a process
1669  such as when the process exits.
1670  When one process exits, we just decrement procRunning, a counter of the number
1671  of processes running. If procRunning is zero, there are no more processes running,
1672  and the status of the application is set to appExited.
1673 ***/
1674 void
1675 paradynDaemon::processStatus(int pid, u_int stat) {
1676   if (stat == procExited) { // process exited
1677     for(unsigned i=0; i < programs.size(); i++) {
1678         if ((programs[i]->pid == (unsigned)pid) && programs[i]->controlPath == this) {
1679         programs[i]->exited = true;
1680         if (--procRunning == 0)
1681           performanceStream::notifyAllChange(appExited);
1682         break;
1683         }
1684     }
1685   }
1686 }
1687  
1688
1689 // Called by a daemon when there is no more data to be sent for a metric
1690 // instance (because the processes have exited).
1691 void
1692 paradynDaemon::endOfDataCollection(int mid) {
1693
1694     if(activeMids.defines(mid)){
1695         metricInstance *mi = activeMids[mid];
1696         assert(mi);
1697         bool aflag;
1698         aflag=(mi->removeComponent(this));
1699         assert(aflag);
1700     }
1701     else{  // check if this mid is for a disabled metric 
1702         bool found = false;
1703         for (unsigned ve=0; ve<disabledMids.size(); ve++) {
1704             if ((int) disabledMids[ve] == mid) {
1705                 found = true;
1706                 break;
1707             }
1708         }
1709         if (!found) {
1710             cout << "Ending data collection for unknown metric" << endl;
1711             uiMgr->showError (2, "Ending data collection for unknown metric");
1712         }
1713     }
1714 }