severalResourceInfoCallback
[dyninst.git] / paradyn / src / DMthread / DMdaemon.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 /*
43  *  method functions for paradynDaemon and daemonEntry classes
44  */
45 #include <assert.h>
46 extern "C" {
47 double   quiet_nan();
48 #include <malloc.h>
49 #include <rpc/types.h>
50 #include <rpc/xdr.h>
51 #include <stdio.h>
52 }
53
54 #include <sys/types.h>
55 #include <sys/socket.h>
56 #include "thread/h/thread.h"
57 #include "paradyn/src/pdMain/paradyn.h"
58 #include "dataManager.thread.h"
59 #include "dyninstRPC.xdr.CLNT.h"
60 #include "DMdaemon.h"
61 #include "paradyn/src/TCthread/tunableConst.h"
62 #include "paradyn/src/UIthread/Status.h"
63 #include "DMmetric.h"
64 #include "paradyn/src/met/metricExt.h"
65 #include "util/h/Time.h"
66
67 // TEMP this should be part of a class def.
68 status_line *DMstatus=NULL;
69 status_line *PROCstatus=NULL;
70
71 // change a char* that points to "" to point to NULL
72 // NULL is used to signify "NO ARGUMENT"
73 // NULL is easier to detect than "" (which needs a strlen to detect)
74 /*
75 static void fixArg(char *&argToFix)
76 {
77   if (argToFix && !strlen(argToFix))
78     argToFix = (char*) 0;
79 }
80 */
81
82
83 static appState applicationState = appPaused; // status of the application.
84
85
86 metricInstance *DM_enableType::findMI(metricInstanceHandle mh){
87     for(u_int i=0; i < request->size(); i++){
88          if(mh == ((*request)[i])->getHandle()){
89              return ((*request)[i]);
90     } }
91     return 0;
92 }
93
94 void DM_enableType::setDone(metricInstanceHandle mh){
95     for(u_int i=0; i < request->size(); i++){
96         if(mh == ((*request)[i])->getHandle()){
97             (*done)[i] = true;
98             return;
99         } 
100     }
101 }
102
103 // find any matching completed mids and update the done values 
104 // if a matching value is successfully enabled done=true, else done= false
105 void DM_enableType::updateAny(vector<metricInstance *> &completed_mis,
106                               vector<bool> successful){
107
108    for(u_int i=0; i < done->size(); i++){
109        if(!(*done)[i]){  // try to update element
110            assert(not_all_done);
111            metricInstanceHandle mh = ((*request)[i])->getHandle();
112            for(u_int j=0; j < completed_mis.size(); j++){
113                if(completed_mis[j]){
114                    if(completed_mis[j]->getHandle() == mh){
115                        if(successful[j]) (*done)[i] = true;
116                        not_all_done--;
117                } } 
118            }
119    } }
120 }
121
122
123
124 // Called whenever a program is ready to run (both programs started by paradyn
125 // and programs forked by other programs). QUESTION: What about when a program does
126 // an exec syscall?
127 //
128 // The new program inherits all enabled metrics, and if the application is 
129 // running, the new program must run too.
130 // 
131 // Called by the igen routine newProgramCallbackFunc()
132
133 bool paradynDaemon::addRunningProgram (int pid,
134                                        const vector<string> &paradynd_argv,
135                                        paradynDaemon *daemon,
136                                        bool calledFromExec,
137                                        bool attached_runMe) {
138     executable *exec = NULL;
139
140     if (calledFromExec) {
141        for (unsigned i=0; i < programs.size(); i++) {
142           if ((int) programs[i]->pid == pid && programs[i]->controlPath == daemon) {
143              exec = programs[i];
144              break;
145           }
146        }
147        assert(exec);
148
149        // exec() doesn't change the pid, but paradynd_argv changes (big deal).
150        exec->argv = paradynd_argv;
151
152        // fall through (we probably want to execute the daemon->continueProcess())
153     }
154     else {
155        // the non-exec (the normal) case follows:
156        exec = new executable (pid, paradynd_argv, daemon);
157        programs += exec;
158        ++procRunning;
159
160        // the following propagates mi's to the new process IF it's the only
161        // process on the daemon.  Otherwise, the daemon can and does propagate
162        // on its own.  We don't call it in the exec case (above) since we know it
163        // wouldn't do anything.
164        daemon->propagateMetrics();
165     }
166
167     if (applicationState == appRunning || attached_runMe) {
168       //cerr << "paradyn: calling daemon->continueProcess off of addRunningProgram (machine " << daemon->getDaemonMachineName() << "; pid=" << pid << ") !" << endl;
169       daemon->continueProcess(pid);
170     }
171
172     if (procRunning == 1 && attached_runMe) {
173        // currently, Paradyn believes that the application is paused.
174        // Let's change that to 'running'.
175        if (!continueAll())
176           cerr << "warning: continueAll failed" << endl;
177        uiMgr->enablePauseOrRun();
178     }
179
180     return true;
181 }
182
183
184 //
185 // add a new paradyn daemon
186 // called when a new paradynd contacts the advertised socket
187 //
188 bool paradynDaemon::addDaemon (int new_fd)
189 {
190   // constructor adds new daemon to dictionary allDaemons
191   paradynDaemon *new_daemon = new paradynDaemon (new_fd);
192
193   if (new_daemon->errorConditionFound) {
194     //TODO: "something" has to be done for a proper clean up - naim
195     uiMgr->showError(6,"");
196     return(false);
197   }
198
199 //
200 // KLUDGE:  set socket buffer size to 64k to avoid write-write deadlock
201 //              between paradyn and paradynd
202 //
203 #if defined(sparc_sun_sunos4_1_3) || defined(hppa1_1_hp_hpux)
204    int num_bytes =0;
205    int size = sizeof(num_bytes);
206    num_bytes = 32768;
207    if(setsockopt(new_daemon->get_fd(),SOL_SOCKET,SO_SNDBUF,(char *)&num_bytes ,size) < 0){
208       perror("setsocketopt SND_BUF error");
209    }
210 #endif
211
212   msg_bind_buffered (new_daemon->get_fd(), true, (int(*)(void*)) xdrrec_eof,
213                      (void*)new_daemon->net_obj());
214   assert(new_daemon);
215   // The pid is reported later in an upcall
216   return (true);
217 }
218
219 //  TODO : fix this so that it really does clean up state
220 // Dispose of daemon state.
221 //    This is called because someone wants to kill a daemon, or the daemon
222 //       died and we need to cleanup after it.
223 //
224 void paradynDaemon::removeDaemon(paradynDaemon *d, bool informUser)
225 {
226
227     if (informUser) {
228       string msg;
229       msg = string("paradynd has died on host <") + d->getDaemonMachineName()
230             + string(">");
231       uiMgr->showError(5, P_strdup(msg.string_of()));
232     }
233
234     d->dead = true;
235
236 #ifdef notdef
237     executable *e;
238     List<executable*> progs;
239     daemons.remove(d);
240
241     //
242     // Delete executables running on the dead paradyn daemon.
243     //
244     for (progs = programs; e = *progs; progs++) {
245        if (e->controlPath == d) {
246            programs.remove(e);
247            delete(e);
248        }
249     }
250
251 #endif
252
253     // tell the thread package to ignore the fd to the daemon.
254     msg_unbind(d->get_fd());
255 }
256
257 timeStamp getCurrentTime(void) {
258     static double previousTime=0.0;
259     struct timeval tv;
260   retry:
261     bool aflag;
262     aflag=(gettimeofday(&tv, NULL) == 0); // 0 --> success; -1 --> error
263     assert(aflag);
264
265     double seconds_dbl = tv.tv_sec * 1.0;
266     assert(tv.tv_usec < 1000000);
267     double useconds_dbl = tv.tv_usec * 1.0;
268
269     seconds_dbl += useconds_dbl / 1000000.0;
270
271     if (seconds_dbl < previousTime) goto retry;
272     previousTime = seconds_dbl;
273
274     return seconds_dbl;
275 }
276
277 // get the current time of a daemon, to adjust for clock differences.
278 void getDaemonTime(paradynDaemon *pd) {
279   timeStamp t1 = getCurrentTime();
280   timeStamp dt = pd->getTime(); // daemon time
281   timeStamp t2 = getCurrentTime();
282   timeStamp delay = (t2 - t1) / 2.0;
283   pd->setTimeFactor(t1 - dt + delay);
284 }
285
286 //
287 // add a new daemon
288 // check to see if a daemon that matches the function args exists
289 // if it does exist, return a pointer to it
290 // otherwise, create a new daemon
291 //
292 paradynDaemon *paradynDaemon::getDaemonHelper(const string &machine, 
293                                               const string &login, 
294                                               const string &name) {
295
296     // find out if we have a paradynd on this machine+login+paradynd
297     paradynDaemon *pd;
298     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
299         pd = paradynDaemon::allDaemons[i];
300         if ((!machine.string_of() || (pd->machine == machine)) && 
301            (!login.string_of() || (pd->login == login))  &&
302            (name.string_of() && (pd->name == name))) {
303             return (pd);     
304         }
305     }
306   
307     // find a matching entry in the dicitionary, and start it
308     daemonEntry *def = findEntry(machine, name);
309     if (!def) {
310         if (name.length()) {
311           string msg = string("Paradyn daemon \"") + name + string("\" not defined.");
312           uiMgr->showError(90,P_strdup(msg.string_of()));
313         }
314         else {
315           uiMgr->showError(91,"");
316         }
317         return ((paradynDaemon*) 0);
318     }
319
320     string m = machine; 
321     // fill in machine name if emtpy
322     if (!m.string_of()) {
323         m = default_host;
324     }
325
326     char statusLine[256];
327     sprintf(statusLine, "Starting daemon on %s",m.string_of());
328     uiMgr->updateStatus(DMstatus,P_strdup(statusLine));
329
330     string flav_arg(string("-z") + def->getFlavor());
331     unsigned asize = paradynDaemon::args.size();
332     paradynDaemon::args += flav_arg;
333
334     pd = new paradynDaemon(m, login, def->getCommandString(), 
335                                def->getNameString(), def->getFlavorString());
336
337     if (pd->errorConditionFound) {
338       //TODO: "something" has to be done for a proper clean up - naim
339       string msg;
340       msg=string("Cannot create daemon process on host \"") + m + string("\"");
341       uiMgr->showError(84,P_strdup(msg.string_of())); 
342       return((paradynDaemon*) 0);
343     }
344
345 #if defined(sparc_sun_sunos4_1_3) || defined(hppa1_1_hp_hpux)
346    int num_bytes =0;
347    int nb_size = sizeof(num_bytes);
348    num_bytes = 32768;
349    if(setsockopt(pd->get_fd(),SOL_SOCKET,SO_SNDBUF,(char *)&num_bytes ,nb_size) < 0){
350       perror("setsocketopt SND_BUF error");
351    }
352 #endif
353
354     paradynDaemon::args.resize(asize);
355     uiMgr->updateStatus(DMstatus,P_strdup("ready"));
356
357     if (pd->get_fd() < 0) {
358         uiMgr->showError (6, "");
359         return((paradynDaemon*) 0);
360     }
361
362    // Send the initial metrics, constraints, and other neato things
363    mdl_send(pd);
364    // Send the initial metrics, constraints, and other neato things
365    vector<T_dyninstRPC::metricInfo> info = pd->getAvailableMetrics();
366    unsigned size = info.size();
367    for (unsigned u=0; u<size; u++)
368         addMetric(info[u]);
369
370     getDaemonTime(pd);
371
372     msg_bind_buffered(pd->get_fd(), true, (int(*)(void*))xdrrec_eof,
373                      (void*) pd->net_obj());
374     return (pd);
375 }
376
377 //  
378 // add a new daemon, unless a daemon is already running on that machine
379 // with the same machine, login, and program
380 //
381 bool paradynDaemon::getDaemon (const string &machine, 
382                                const string &login,
383                                const string &name){
384
385     if (!getDaemonHelper(machine, login, name)){
386         return false;
387     }
388     return true;
389 }
390
391 //
392 // define a new entry for the daemon dictionary, or change an existing entry
393 //
394 bool paradynDaemon::defineDaemon (const char *c, const char *d,
395                                   const char *l, const char *n,
396                                   const char *m, const char *f) {
397
398   if(!n || !c)
399       return false;
400   string command = c;
401   string dir = d;
402   string login = l;
403   string name = n;
404   string machine = m;
405   string flavor = f;
406
407   daemonEntry *newE;
408   for(unsigned i=0; i < allEntries.size(); i++){
409       newE = allEntries[i];
410       if(newE->getNameString() == name){
411           if (newE->setAll(machine, command, name, login, dir, flavor))
412               return true;
413           else 
414               return false;
415       }
416   }
417
418   newE = new daemonEntry(machine, command, name, login, dir, flavor);
419   if (!newE)
420       return false;
421   allEntries += newE;
422   return true;
423 }
424
425
426 daemonEntry *paradynDaemon::findEntry(const string &, 
427                                       const string &n) {
428
429     // if (!n) return ((daemonEntry*) 0);
430     for(unsigned i=0; i < allEntries.size(); i++){
431         daemonEntry *newE = allEntries[i];
432         if(newE->getNameString() == n){
433            return(newE);
434         }
435     }
436     return ((daemonEntry*) 0);
437
438 }
439
440 void paradynDaemon::tellDaemonsOfResource(u_int parent, u_int my_id, 
441                                           const char *name, unsigned type) {
442     paradynDaemon *pd;
443     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
444         pd = paradynDaemon::allDaemons[i];
445         pd->addResource(parent,my_id,name, type);
446     }
447
448 }
449
450 void paradynDaemon::printEntries()
451 {
452     daemonEntry *entry;
453     for(unsigned i=0; i < allEntries.size(); i++){
454         entry = allEntries[i];
455         entry->print();
456     }
457 }
458
459 void paradynDaemon::print() 
460 {
461   cout << "DAEMON\n";
462   cout << "  name: " << name << endl;
463   cout << "  command: " << command << endl;
464   cout << "  login: " << login << endl;
465   cout << "  machine: " << machine << endl;
466   cout << "  flavor: " << flavor << endl;
467 }
468
469 void paradynDaemon::printDaemons()
470 {
471     paradynDaemon *pd;
472   cout << "ACTIVE DAEMONS\n";
473     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
474         pd = paradynDaemon::allDaemons[i];
475         pd->print();
476     }
477 }
478
479 void paradynDaemon::printPrograms()
480 {
481     executable *entry;
482     for(unsigned i=0; i < programs.size(); i++){
483         entry = programs[i];
484         cout << "PROGRAM ENTRY\n";
485         cout << "pid : " << entry->pid << endl;
486         entry->controlPath->print();
487     }
488 }
489
490 //
491 // Return list of names of defined daemons.  
492 //
493 vector<string> *paradynDaemon::getAvailableDaemons()
494 {
495     vector<string> *names = new vector<string>;
496
497     daemonEntry *entry;
498     for(unsigned i=0; i < allEntries.size(); i++){
499         entry = allEntries[i];
500         *names += entry->getName();
501     }
502     return(names);
503     names = 0;
504 }
505
506 // For a given machine name, find the appropriate paradynd structure.
507 // Returns NULL if an appropriate matching entry can't be found.
508 paradynDaemon *paradynDaemon::machineName2Daemon(const string &theMachineName) {
509    for (unsigned i=0; i < allDaemons.size(); i++) {
510       paradynDaemon *theDaemon = allDaemons[i];
511       if (theDaemon->getDaemonMachineName() == theMachineName)
512          return theDaemon;
513    }
514    return 0; // failure; this machine name isn't known!
515 }
516
517 //Tempest
518 static int startBlzApp(const string &machine,
519                         const string &login,
520                         const string &name,
521                         const string &dir,
522                         const vector<string> &argv)
523 {
524         //int firstPVM = 1 ;                   // "-v"   1
525         //int  flag = 2;                       // "-l"   2
526         // flavor == "-z"   cow
527         string localhost ;                     // "-m"   gethostname
528         int  port = dataManager::dm->socket ;  // "-p"   dataManager::sock_fd
529
530         char temp[256], *p ;
531         gethostname(temp, 32);
532         if((p = strchr(temp, '.'))) *p = '\0' ;
533         localhost += temp ;
534
535         int from = 1, to = 4 ;
536         int time= 20, nodes = 4;
537         string   application ;
538         string   directory ;
539         bool     use_range = false ;
540
541         //parse the application commandline
542         for(unsigned i=0; i<argv.size(); i++)
543         {
544                 if(strncmp(argv[i].string_of(), "-nodes", 6)==0)
545                         sscanf(argv[i].string_of(), "-nodes%d", &nodes) ;
546                 else if(strncmp(argv[i].string_of(), "-range", 6)==0){
547                         sscanf(argv[i].string_of(), "-range%d:%d", &from, &to) ;
548                         use_range = true ;
549                 }
550                 else if(strncmp(argv[i].string_of(), "-time", 5)==0)
551                         sscanf(argv[i].string_of(), "-time%d", &time) ;
552                 else {
553                         if(application.length() > 0)
554                                 application += " " ;
555                         application += argv[i] ;
556                 }
557
558         }
559
560         // get the directory
561         strcpy(temp, application.string_of()) ;
562         printf("application = [%s]\n", application.string_of()) ;
563         if((p = strchr(temp, ' ')))
564                 *p = '\0' ;
565         else if((p = strchr(temp, '<')))
566                 *p = '\0' ;
567         else if((p = strchr(temp, '>')))
568                 *p = '\0' ;
569         p = strrchr(temp, '/') ; *p = '\0' ;
570         directory += temp ;
571         printf("directory=[%s]\n", directory.string_of()) ;
572
573         //make the scripts
574         string cpScript, pnScript ;
575         cpScript = directory + "/PD_ctrl_script" ;
576         pnScript = directory + "/PD_node_script" ;
577
578         //make  the serverscript
579         FILE *fp = fopen(cpScript.string_of(), "w") ;
580         fprintf(fp, "#!/bin/csh\n") ;
581         fprintf(fp, "crsh all %s\n", pnScript.string_of()) ;
582         fclose(fp) ;
583         system("chmod +x PD_ctrl_script") ;
584
585         //make the pn_script
586         fp = fopen(pnScript.string_of(), "w") ;
587         fprintf (fp, "#!/bin/csh\n") ;
588         fprintf (fp, "setenv LAM_CONFIG /p/wwt/myrinet/configs/lam.conf.master\n") ;
589         fprintf (fp, "cd %s\n", directory.string_of()) ;
590         //paradynd -p36622 -mgoofy -l1 -v1 -zcow -runme ....
591         fprintf (fp, "paradynd -p%d -m%s -l0 -v1 -zcow -runme %s\n",
592                      port,
593                      localhost.string_of(),
594                      application.string_of()) ;
595         fclose  (fp) ;
596         system("chmod +x PD_node_script") ;
597
598         //fork it
599         int shellPid ;
600         int ret ;
601         sprintf(temp, "%d", time) ;
602         string timeFrame  ; timeFrame += temp ;
603
604         string nodeRange ;
605         if(use_range) {
606                 char sfrom[64], sto[64] ;
607                 sprintf(sfrom, "%d", from) ;
608                 sprintf(sto, "%d", to) ;
609                 nodeRange += "-range ";
610                 nodeRange += sfrom ;
611                 nodeRange +=  ":" ;
612                 nodeRange += sto ;
613         } else
614         {
615                 sprintf(temp, "%d", nodes) ;
616                 nodeRange += "-nodes " ;
617                 nodeRange += temp ;
618         }
619
620         if(use_range)
621            sprintf(temp, "Starting job on nodes %d-%d, for %d minutes\n", from, to, time)
622 ;
623         else
624            sprintf(temp, "Starting job on %d nodes, for %d minutes\n", nodes, time) ;
625         uiMgr->updateStatus(DMstatus,P_strdup(temp));
626         shellPid = vfork() ;
627         if(shellPid == 0)
628         {
629                 if(login.length())
630                 {
631                  ret = execlp("rsh", "rsh", machine.string_of(), "-l",
632                         login.string_of(), "-n",
633                         "/p/cow/bin/crun", cpScript.string_of() ,
634                         "-capture",
635                         "-contig",
636                         "-time",   timeFrame.string_of(),
637                         nodeRange.string_of(),
638                         NULL);
639
640                 } else
641                 {
642                  ret = execlp("rsh", "rsh", machine.string_of(), "-n",
643                         "/p/cow/bin/crun", cpScript.string_of() ,
644                         "-capture",
645                         "-contig",
646                         "-time",   timeFrame.string_of(),
647                         nodeRange.string_of(),
648                         NULL);
649                 }
650                 fprintf(stderr,"rshCommand: execlp failed (ret = %d)\n",ret);
651                 _exit(-1) ;
652         } else if( shellPid > 0)
653                 ;
654         else    return false; // error situation
655
656         return true ;
657 }
658
659
660
661 // TODO: fix this
662 //
663 // add a new executable (binary) to a program.
664 //
665 bool paradynDaemon::newExecutable(const string &machine,
666                                   const string &login,
667                                   const string &name, 
668                                   const string &dir, 
669                                   const vector<string> &argv){
670
671   if (! DMstatus)
672       DMstatus = new status_line("Data Manager");
673
674   if (! PROCstatus)
675       PROCstatus = new status_line("Processes");
676
677   //------------------
678   //Added to start blizzard application, Tempest
679
680   daemonEntry *def = findEntry(machine, name) ;
681   if (!def) {
682      if (name.length()) {
683         string msg = string("Paradyn daemon \"") + name + string("\" not defined.");
684         uiMgr->showError(90,P_strdup(msg.string_of()));
685      }
686      else {
687         uiMgr->showError(91,"");
688      }
689      return false;
690   }
691   if(def->getFlavorString() == "cow")
692         return startBlzApp(machine, login, name, dir, argv) ;
693   //------------
694   paradynDaemon *daemon;
695   if ((daemon=getDaemonHelper(machine, login, name)) == (paradynDaemon*) NULL)
696       return false;
697
698   performanceStream::ResourceBatchMode(batchStart);
699   int pid = daemon->addExecutable(argv, dir);
700   performanceStream::ResourceBatchMode(batchEnd);
701
702   // did the application get started ok?
703   if (pid > 0 && !daemon->did_error_occur()) {
704       // TODO
705       char tmp_buf[80];
706       sprintf (tmp_buf, "PID=%d", pid);
707       uiMgr->updateStatus(PROCstatus, P_strdup(tmp_buf));
708 #ifdef notdef
709       executable *exec = new executable(pid, argv, daemon);
710       paradynDaemon::programs += exec;
711       ++procRunning;
712 #endif
713       return (true);
714   } else {
715       return(false);
716   }
717 }
718
719 bool paradynDaemon::attachStub(const string &machine,
720                                const string &userName,
721                                const string &cmd, // program name (full path)
722                                int the_pid,
723                                const string &daemonName,
724                                int afterAttach // 0 --> as is, 1 --> pause, 2 --> run
725                                ) {
726   // Note: by this time, both the RUN and PAUSE buttons have been disabled in the
727   // user interface...
728
729   if (! DMstatus)
730       DMstatus = new status_line("Data Manager");
731
732   if (! PROCstatus)
733       PROCstatus = new status_line("Processes");
734
735   paradynDaemon *daemon = getDaemonHelper(machine, userName, daemonName);
736   if (daemon == NULL)
737       return false;
738
739   char tmp_buf[128];
740   sprintf (tmp_buf, "attaching to PID=%d...", the_pid);
741   uiMgr->updateStatus(PROCstatus, P_strdup(tmp_buf));
742
743   performanceStream::ResourceBatchMode(batchStart);
744   bool success = daemon->attach(cmd, the_pid, afterAttach);
745   performanceStream::ResourceBatchMode(batchEnd);
746
747   if (daemon->did_error_occur())
748      return false;
749
750   if (!success)
751      return false;
752
753   sprintf (tmp_buf, "PID=%d", the_pid);
754   uiMgr->updateStatus(PROCstatus, P_strdup(tmp_buf));
755   return true; // success
756 }
757
758 //
759 // start the programs running.
760 //
761 bool paradynDaemon::startApplication()
762 {
763     executable *prog;
764     for(unsigned i=0; i < programs.size(); i++){
765         prog = programs[i];
766         prog->controlPath->startProgram(prog->pid);   
767     }
768     return(true);
769 }
770
771 //
772 // pause all processes.
773 //
774 bool paradynDaemon::pauseAll()
775 {
776     paradynDaemon *pd;
777     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
778         pd = paradynDaemon::allDaemons[i];
779         pd->pauseApplication();
780     }
781     // tell perf streams about change.
782     performanceStream::notifyAllChange(appPaused);
783     applicationState = appPaused;
784     return(true);
785 }
786
787 //
788 // pause one processes.
789 //
790 bool paradynDaemon::pauseProcess(unsigned pid)
791 {
792     executable *exec = 0;
793     for(unsigned i=0; i < programs.size(); i++){
794         exec = programs[i];
795         if(exec->pid == pid)
796             break;
797         exec = 0;
798     }
799     if (exec) {
800         exec->controlPath->pauseProgram(exec->pid);
801         return(true); 
802     } else
803         return (false);
804 }
805
806 //
807 // continue all processes.
808 //
809 bool paradynDaemon::continueAll()
810 {
811     paradynDaemon *pd;
812
813     if (programs.size() == 0)
814        return false; // no program to pause
815
816     if (procRunning == 0)
817        return false;
818
819     for(int i = paradynDaemon::allDaemons.size()-1; i >= 0; i--) 
820     {
821         pd = paradynDaemon::allDaemons[i];
822         pd->continueApplication();
823     }
824     // tell perf streams about change.
825     performanceStream::notifyAllChange(appRunning);
826     applicationState = appRunning;
827     return(true);
828 }
829
830 //
831 // continue one processes.
832 //
833 bool paradynDaemon::continueProcess(unsigned pid)
834 {
835     executable *exec = 0;
836     for(unsigned i=0; i < programs.size(); i++){
837         exec = programs[i];
838         if(exec->pid == pid)
839             break;
840         exec = 0;
841     }
842     if (exec) {
843         exec->controlPath->continueProgram(exec->pid);
844         return(true); 
845     } else
846         return (false);
847 }
848
849 //
850 // detach the paradyn tool from a running program.  This should clean all
851 //   of the dynamic instrumentation that has been inserted.
852 //
853 bool paradynDaemon::detachApplication(bool pause)
854 {
855     executable *exec = 0;
856     for(unsigned i=0; i < programs.size(); i++){
857         exec = programs[i];
858         exec->controlPath->detachProgram(exec->pid,pause);
859     }
860     return(true);
861 }
862
863 //
864 // print the status of each process.  This is used mostly for debugging.
865 //
866 void paradynDaemon::printStatus()
867 {
868     executable *exec = 0;
869     for(unsigned i=0; i < programs.size(); i++){
870         exec = programs[i];
871         string status = exec->controlPath->getStatus(exec->pid);
872             if (!exec->controlPath->did_error_occur()) {
873                 cout << status << endl;
874             }
875     }
876 }
877
878 //
879 // Cause the passed process id to dump a core file.  This is also used for
880 //    debugging.
881 // If pid = -1, all processes will dump core files.
882 //
883 void paradynDaemon::dumpCore(int pid)
884 {
885     executable *exec = 0;
886     for(unsigned i=0; i < programs.size(); i++){
887         exec = programs[i];
888         if ((exec->pid == (unsigned)pid) || (pid == -1)) {
889             exec->controlPath->coreProcess(exec->pid);
890             printf("found process and coreing it\n");
891         }
892     }
893 }
894
895
896 bool paradynDaemon::setInstSuppress(resource *res, bool newValue)
897 {
898     bool ret = false;
899     paradynDaemon *pd;
900     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
901         pd = paradynDaemon::allDaemons[i];
902         ret |= pd->setTracking(res->getHandle(), newValue);
903     }
904     return(ret);
905 }
906
907 //
908 // signal from daemon that is is about to start or end a set 
909 // of new resource definitions
910 //
911 void paradynDaemon::resourceBatchMode(bool onNow){
912     int prev = count;
913     if (onNow) {
914         count++;
915     } else {
916         assert(count > 0);
917         count--;
918     }
919
920     if (count == 0) {
921         for(u_int i=0; i < allDaemons.size(); i++){
922             (allDaemons[i])->reportResources();
923         }
924         performanceStream::ResourceBatchMode(batchEnd);
925     } else if (!prev) {
926         performanceStream::ResourceBatchMode(batchStart);
927     }
928 }
929
930 //
931 //  reportResources:  send new resource ids to daemon
932 //
933 void  paradynDaemon::reportResources(){
934     assert(newResourceTempIds.size() == newResourceHandles.size());
935     resourceInfoResponse(newResourceTempIds, newResourceHandles);
936     newResourceTempIds.resize(0);
937     newResourceHandles.resize(0);
938 }
939
940 //
941 // upcall from paradynd reporting new resource
942 //
943 void paradynDaemon::resourceInfoCallback(u_int temporaryId,
944                               vector<string> resource_name,
945                               string abstr, u_int type) {
946
947     resourceHandle r = createResource(temporaryId, resource_name, abstr, type);
948     if(!count){
949       if (r != temporaryId) {
950         vector<u_int>tempIds; vector<u_int>rIds;
951         tempIds += temporaryId; rIds += r;
952         resourceInfoResponse(tempIds, rIds);
953       }
954     }
955     else {
956         if (r != temporaryId) {
957           newResourceTempIds += temporaryId;
958           newResourceHandles += r;
959           assert(newResourceTempIds.size() == newResourceHandles.size());
960         }
961     }
962 }
963
964 void paradynDaemon::severalResourceInfoCallback(vector<T_dyninstRPC::resourceInfoCallbackStruct> items) {
965    for (unsigned lcv=0; lcv < items.size(); lcv++)
966       resourceInfoCallback(items[lcv].temporaryId,
967                            items[lcv].resource_name,
968                            items[lcv].abstraction,
969                            items[lcv].type);
970 }
971
972
973 //
974 // Get the expected delay (as a fraction of the running program) for the passed
975 //   resource list (focus) and metric.
976 //
977 void paradynDaemon::getPredictedDataCostCall(perfStreamHandle ps_handle,
978                                       metricHandle m_handle,
979                                       resourceListHandle rl_handle,
980                                       resourceList *rl, 
981                                       metric *m,
982                                       u_int clientID)
983 {
984     if(rl && m){
985         vector<u_int> focus;
986         bool aflag;
987         aflag=rl->convertToIDList(focus);
988         assert(aflag);
989         const char *metName = m->getName();
990         assert(metName);
991         u_int requestId;
992         if(performanceStream::addPredCostRequest(ps_handle,requestId,m_handle,
993                                 rl_handle, paradynDaemon::allDaemons.size())){
994             paradynDaemon *pd;
995             for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
996                 pd = paradynDaemon::allDaemons[i];
997                 pd->getPredictedDataCost(ps_handle,requestId,focus, 
998                                          metName,clientID);
999             }
1000             return;
1001         }
1002     }
1003     // TODO: change this to do the right thing
1004     // this should make the response upcall to the correct calling thread
1005     // perfConsult->getPredictedDataCostCallbackPC(0,0.0);
1006     assert(0);
1007 }
1008
1009 //
1010 // make data enable request to paradynds, and add request entry to
1011 // list of outstanding enable requests
1012 //
1013 void paradynDaemon::enableData(vector<metricInstance *> *miVec,
1014                                vector<bool> *done,
1015                                vector<bool> *enabled,
1016                                DM_enableType *new_entry,
1017                                bool need_to_enable){
1018
1019     // make enable request, pass only pairs that need to be enabled to daemons
1020     if(need_to_enable){  
1021         bool whole_prog_focus = false;
1022         vector<paradynDaemon*> daemon_subset; // which daemons to send request
1023         vector<T_dyninstRPC::focusStruct> foci; 
1024         vector<string> metrics; 
1025         vector<u_int> mi_ids;  
1026
1027         for(u_int i=0; i < miVec->size(); i++){
1028             if(!(*enabled)[i] && !(*done)[i]){
1029                 // create foci, metrics, and mi_ids entries for this mi
1030                 T_dyninstRPC::focusStruct focus;
1031                 string met_name;
1032                 bool aflag;
1033                 aflag=((*miVec)[i]->convertToIDList(focus.focus));
1034                 assert(aflag);
1035                 met_name = (*miVec)[i]->getMetricName();
1036                 foci += focus;
1037                 metrics += met_name;
1038                 mi_ids += (*miVec)[i]->getHandle();
1039                 // set curretly enabling flag on mi 
1040                 (*miVec)[i]->setCurrentlyEnabling();
1041
1042                 // check to see if this focus is refined on the machine
1043                 // or process heirarcy, if so then add the approp. daemon
1044                 // to the daemon_subset, else set whole_prog_focus to true
1045                 if(!whole_prog_focus){
1046                     string machine_name;
1047                     resourceList *rl = (*miVec)[i]->getresourceList(); 
1048                     assert(rl);
1049                     // focus is refined on machine or process heirarchy 
1050                     if(rl->getMachineNameReferredTo(machine_name)){
1051                         // get the daemon corr. to this focus and add it
1052                         // to the list of daemons
1053                         paradynDaemon *pd = 
1054                                 paradynDaemon::machineName2Daemon(machine_name);
1055                         assert(pd);
1056                         bool found = false;
1057                         for(u_int k=0; k< daemon_subset.size(); k++){
1058                           if(pd->id == daemon_subset[k]->id){
1059                               found = true;    
1060                         } }
1061                         if(!found){ // add new daemon to subset list
1062                             daemon_subset += pd;    
1063                         }
1064                         pd = 0;
1065                     }
1066                     else {  // foucs is not refined on process or machine 
1067                         whole_prog_focus = true;
1068                     }
1069                 }
1070         } }
1071         assert(foci.size() == metrics.size());
1072         assert(metrics.size() == mi_ids.size());
1073         assert(daemon_subset.size() <= paradynDaemon::allDaemons.size());
1074         // if there is a whole_prog_focus then make the request to all 
1075         // the daemons, else make the request to the daemon subset
1076         // make enable requests to all daemons
1077         if(whole_prog_focus) {
1078             for(u_int j=0; j < paradynDaemon::allDaemons.size(); j++){
1079                paradynDaemon *pd = paradynDaemon::allDaemons[j]; 
1080                pd->enableDataCollection(foci,metrics,mi_ids,j,
1081                                      new_entry->request_id);
1082             }
1083         }
1084         else {  
1085             // change the enable number in the entry 
1086             new_entry->how_many = daemon_subset.size();
1087             for(u_int j=0; j < daemon_subset.size(); j++){
1088                daemon_subset[j]->enableDataCollection(foci,metrics,mi_ids,
1089                                  daemon_subset[j]->id,new_entry->request_id);
1090             }
1091         }
1092     }
1093     // add entry to outstanding_enables list
1094     paradynDaemon::outstanding_enables += new_entry;
1095     new_entry = 0; 
1096     miVec = 0;
1097     done = 0; 
1098     enabled = 0;
1099 }
1100
1101
1102 // propagateMetrics:
1103 // called when a new process is started, to propagate all enabled metrics to
1104 // the new process.  (QUESTION: should this include when a process makes
1105 // an exec syscall, thus 'starting' another process?)
1106 // Metrics are propagated only if the new process is the only process running 
1107 // on a daemon (this is why we don't need the pid here). If there are already
1108 // other processes running on a daemon, than it is up to the daemon to do the
1109 // propagation (we can't do it here because the daemon has to do the aggregation).
1110 // Calling this function has no effect if there are no metrics enabled.
1111 void paradynDaemon::propagateMetrics() {
1112
1113     vector<metricInstanceHandle> allMIHs = metricInstance::allMetricInstances.keys();
1114
1115     for (unsigned i = 0; i < allMIHs.size(); i++) {
1116
1117       metricInstance *mi = metricInstance::getMI(allMIHs[i]);
1118
1119       if (!mi->isEnabled())
1120          continue;
1121
1122       // first we must find if the daemon already has this metric enabled for
1123       // some process. In this case, we don't need to do anything, the
1124       // daemon will do the propagation by itself.
1125       bool found = false;
1126       for (unsigned j = 0; j < mi->components.size(); j++) {
1127          if (mi->components[j]->getDaemon() == this) {
1128             found = true;
1129             break;
1130          }
1131       }
1132
1133       if (found)
1134          continue; // we don't enable this mi; let paradynd do it
1135
1136       resourceListHandle r_handle = mi->getFocusHandle();
1137       metricHandle m_handle = mi->getMetricHandle();
1138       resourceList *rl = resourceList::getFocus(r_handle);
1139       metric *m = metric::getMetric(m_handle);
1140
1141       vector<u_int> vs;
1142       bool aflag = rl->convertToIDList(vs);
1143       assert(aflag);
1144
1145       int id = enableDataCollection2(vs, (const char *) m->getName(), mi->id);
1146
1147       if (id > 0 && !did_error_occur()) {
1148          component *comp = new component(this, id, mi);
1149          if (!mi->addComponent(comp)) {
1150             cout << "internal error in paradynDaemon::addRunningProgram" << endl;
1151             abort();
1152          }
1153       }
1154     }
1155 }
1156
1157
1158 bool paradynDaemon::setDefaultArgs(char *&name)
1159 {
1160   if (!name)
1161     name = strdup("defd");
1162   if (name)
1163     return true;
1164   else 
1165     return false;
1166 }
1167
1168
1169 bool daemonEntry::setAll (const string &m, const string &c, const string &n,
1170                           const string &l, const string &d, const string &f)
1171 {
1172   if(!n.string_of() || !c.string_of())
1173       return false;
1174
1175   if (m.string_of()) machine = m;
1176   if (c.string_of()) command = c;
1177   if (n.string_of()) name = n;
1178   if (l.string_of()) login = l;
1179   if (d.string_of()) dir = d;
1180   if (d.string_of()) flavor = f;
1181
1182   return true;
1183 }
1184 void daemonEntry::print() 
1185 {
1186   cout << "DAEMON ENTRY\n";
1187   cout << "  name: " << name << endl;
1188   cout << "  command: " << command << endl;
1189   cout << "  dir: " << dir << endl;
1190   cout << "  login: " << login << endl;
1191   cout << "  machine: " << machine << endl;
1192   cout << "  flavor: " << flavor << endl;
1193 }
1194
1195 int paradynDaemon::read(const void* handle, char *buf, const int len) {
1196   assert(0);
1197   int ret, ready_fd;
1198   assert(len > 0);
1199   assert((int)handle<200);
1200   assert((int)handle >= 0);
1201   static vector<unsigned> fd_vect(200);
1202
1203   // must handle the msg_bind_buffered call here because xdr_read will be
1204   // called in the constructor for paradynDaemon, before the previous call
1205   // to msg_bind_buffered had been called
1206
1207   if (!fd_vect[(unsigned)handle]) {
1208     paradynDaemon *pd;
1209     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
1210         pd = paradynDaemon::allDaemons[i];
1211         if(pd->get_fd() == (int)handle)
1212             break;
1213     }
1214     if (!(pd))
1215       return -1;
1216
1217     msg_bind_buffered((int)handle, true, (int(*)(void*))xdrrec_eof,
1218                       (void*)(pd)->net_obj());
1219     fd_vect[(unsigned)handle] = 1;
1220   }
1221
1222   do {
1223     unsigned tag = MSG_TAG_FILE;
1224
1225     do 
1226       ready_fd = msg_poll(&tag, true);
1227     while ((ready_fd != (int) handle) && (ready_fd != THR_ERR));
1228     
1229     if (ready_fd == (int) handle) {
1230       errno = 0;
1231       ret = P_read((int)handle, buf, len);
1232     } else 
1233       return -1;
1234   } while (ret < 0 && errno == EINTR);
1235
1236   if (ret <= 0)
1237     return (-1);
1238   else
1239     return ret;
1240 }
1241
1242 void paradynDaemon::firstSampleCallback(int, double firstTime) {
1243   static bool done = false;
1244   if (!done) {
1245 //  cerr << "paradyn: welcome to firstSampleCallback; firstTime=" << firstTime << "; adjusted time=" << getAdjustedTime(firstTime) << endl;
1246
1247     setEarliestFirstTime(getAdjustedTime(firstTime));
1248   }
1249   done = true;
1250 }
1251
1252
1253 paradynDaemon::paradynDaemon(const string &m, const string &u, const string &c,
1254                              const string &n, const string &f)
1255 : dynRPCUser(m, u, c, NULL, NULL, args, 1, dataManager::sock_fd),
1256   machine(m), login(u), command(c), name(n), flavor(f), activeMids(uiHash)
1257 {
1258   if (!this->errorConditionFound) {
1259     // No problems found in order to create this new daemon process - naim
1260     assert(m.length());
1261     assert(c.length());
1262     assert(n.length());
1263     assert(f.length());
1264
1265     // if c includes a pathname, lose the pathname
1266     const char *loc = P_strrchr(c.string_of(), '/');
1267     if (loc) {
1268       loc = loc + 1;
1269       command = loc;
1270     }
1271   
1272     status = new status_line(machine.string_of());
1273     paradynDaemon *pd = this;
1274     paradynDaemon::allDaemons+=pd;
1275     id = paradynDaemon::allDaemons.size()-1;
1276     assert(paradynDaemon::allDaemons.size() > id); 
1277   }
1278   // else...we leave "errorConditionFound" for the caller to check...
1279   //        don't forget to check!
1280 }
1281
1282 // machine, name, command, flavor and login are set via a callback
1283 paradynDaemon::paradynDaemon(int f)
1284 : dynRPCUser(f, NULL, NULL, 1), flavor(0), activeMids(uiHash){
1285   if (!this->errorConditionFound) {
1286     // No problems found in order to create this new daemon process - naim 
1287     paradynDaemon *pd = this;
1288     paradynDaemon::allDaemons += pd;
1289     id = paradynDaemon::allDaemons.size()-1;
1290   }
1291   // else...we leave "errorConditionFound" for the caller to check...
1292   //        don't forget to check!
1293 }
1294
1295 bool our_print_sample_arrival = false;
1296 void printSampleArrivalCallback(bool newVal) {
1297    our_print_sample_arrival = newVal;
1298 }
1299
1300 // batched version of sampleCallbackFunc
1301 void paradynDaemon::batchSampleDataCallbackFunc(int ,
1302                 vector<T_dyninstRPC::batch_buffer_entry> theBatchBuffer)
1303 {
1304     // get the earliest first time that had been reported by any paradyn
1305     // daemon to use as the base (0) time
1306     bool aflag;
1307     aflag=getEarliestFirstTime();
1308     assert(aflag);
1309
1310   // Just for debugging:
1311   //fprintf(stderr, "in DMdaemon.C, burst size = %d\n", theBatchBuffer.size()) ;
1312
1313     // Go through every item in the batch buffer we've just received and
1314     // process it.
1315     for (unsigned index =0; index < theBatchBuffer.size(); index++) {
1316         T_dyninstRPC::batch_buffer_entry &entry = theBatchBuffer[index] ; 
1317
1318         unsigned mid          = entry.mid ;
1319         double startTimeStamp = entry.startTimeStamp ;
1320         double endTimeStamp   = entry.endTimeStamp ;
1321         double value          = entry.value ;
1322         u_int  weight         = entry.weight;
1323         //bool   internal_metric = entry.internal_met;
1324         startTimeStamp = 
1325             this->getAdjustedTime(startTimeStamp) - getEarliestFirstTime();
1326         endTimeStamp = 
1327             this->getAdjustedTime(endTimeStamp) - getEarliestFirstTime();
1328
1329         if (our_print_sample_arrival) {
1330             cout << "mid " << mid << " " << value << " from "
1331                  << startTimeStamp << " to " << endTimeStamp 
1332                  << " weight " << weight 
1333                  << " machine " << machine.string_of() << "\n";
1334         }
1335
1336         // Okay, the sample is not an error; let's process it.
1337         metricInstance *mi;
1338         bool found = activeMids.find(mid, mi);
1339         if (!found) {
1340            // this can occur due to asynchrony of enable or disable requests
1341            // so just ignore the data
1342           continue;
1343         }
1344         assert(mi);
1345
1346         // Any sample sent by a daemon should not have the start time
1347         // less than lastSampleEnd for the aggregate sample. When a new
1348         // component is added to a metric, the first sample could have
1349         // the startTime less than lastSampleEnd. If this happens,
1350         // the daemon clock must be late (or the time adjustment
1351         // factor is not good enough), and so we must update
1352         // the time adjustment factor for this daemon.
1353         if (startTimeStamp < mi->aggSample.currentTime()) {
1354           timeStamp diff = mi->aggSample.currentTime() - startTimeStamp;
1355           startTimeStamp += diff;
1356           endTimeStamp += diff;
1357           this->setTimeFactor(this->getTimeFactor() + diff);
1358           //printf("*** Adjusting time for %s: diff = %f\n", this->machine.string_of(), diff);
1359         }
1360
1361         struct sampleInterval ret;
1362         if (mi->components.size()){
1363            // find the right component.
1364            component *part = 0;
1365            for(unsigned i=0; i < mi->components.size(); i++) {
1366               if((unsigned)mi->components[i]->daemon == (unsigned)this){
1367                  part = mi->components[i];
1368                  // update the weight associated with this component
1369                  // this does not necessarily need to be updated with
1370                  // each new value as long as we can distinguish between
1371                  // internal and non-internal metric values in some way
1372                  // (internal metrics weight is 1 and regular metrics 
1373                  // weight is the number of processes for this daemon),
1374                  // and the weight is changed when the number of processes 
1375                  // changes (we are not currently doing this part)
1376                  //if(!internal_metric){
1377                  //    mi->num_procs_per_part[i] = weight;
1378                  //}
1379               }
1380            }
1381            if (!part) {
1382               uiMgr->showError(3, "");
1383               return;
1384               //exit(-1);
1385            }
1386
1387            // update the sampleInfo value associated with 
1388            // the daemon that sent the value 
1389            //
1390            if (!part->sample->firstValueReceived())
1391              part->sample->startTime(startTimeStamp);
1392            part->sample->newValue(endTimeStamp, value, weight);
1393         }
1394
1395         // don't aggregate if this metric is still being enabled (we may 
1396         // not have received replies for the enable requests from all the daemons)
1397         if (mi->isCurrentlyEnabling())
1398           continue;
1399
1400         //
1401         // update the metric instance sample value if there is a new
1402         // interval with data for all parts, otherwise this routine
1403         // returns false for ret.valid and the data cannot be bucketed
1404         // by the histograms yet (not all components have sent data for
1405         // this interval)
1406         // newValue will aggregate the parts according to mi's aggOp
1407         //
1408         ret = mi->aggSample.aggregateValues();
1409         
1410         if (ret.valid) {  // there is new data from all components 
1411            assert(ret.end >= 0.0);
1412            assert(ret.start >= 0.0);
1413            assert(ret.end >= ret.start);
1414            mi->enabledTime += ret.end - ret.start;
1415            mi->addInterval(ret.start, ret.end, ret.value, false);
1416         }
1417     } // the main for loop
1418 }
1419
1420 // trace data streams
1421 void paradynDaemon::batchTraceDataCallbackFunc(int ,
1422                 vector<T_dyninstRPC::trace_batch_buffer_entry> theTraceBatchBuffer)
1423 {
1424     // get the earliest first time that had been reported by any paradyn
1425     // daemon to use as the base (0) time
1426     // assert(getEarliestFirstTime());
1427
1428   // Just for debugging:
1429   //fprintf(stderr, "in DMdaemon.C, burst size = %d\n", theTraceBatchBuffer.size());
1430
1431     // Go through every item in the batch buffer we've just received and
1432     // process it.
1433     for (unsigned index =0; index < theTraceBatchBuffer.size(); index++) {
1434         T_dyninstRPC::trace_batch_buffer_entry &entry = theTraceBatchBuffer[index] ;
1435
1436         unsigned mid          = entry.mid ;
1437         unsigned length       = entry.length;
1438
1439         if (our_print_sample_arrival) {
1440             cout << "mid " << mid << " : length = " << length << "\n";
1441         }
1442
1443         // Okay, the sample is not an error; let's process it.
1444         metricInstance *mi;
1445         bool found = activeMids.find(mid, mi);
1446         if (!found) {
1447            // this can occur due to asynchrony of enable or disable requests
1448            // so just ignore the data
1449           continue;
1450         }
1451         assert(mi);
1452         byteArray *localTraceData = new byteArray(entry.traceRecord.getArray(),
1453         length);
1454         mi->sendTraceData(localTraceData->getArray(),length);
1455
1456         delete localTraceData;
1457
1458     } // the main for loop
1459 }
1460
1461 //
1462 // paradyn daemon should never go away.  This represents an error state
1463 //    due to a paradynd being killed for some reason.
1464 //
1465 // TODO -- handle this better
1466 paradynDaemon::~paradynDaemon() {
1467
1468 #ifdef notdef
1469     metricInstance *mi;
1470     HTable<metricInstance*> curr;
1471
1472     allDaemons.remove(this);
1473
1474     // remove the metric ID as required.
1475     for (curr = activeMids; mi = *curr; curr++) {
1476         mi->parts.remove(this);
1477         mi->components.remove(this);
1478     }
1479 #endif
1480     printf("Inconsistant state\n");
1481     abort();
1482 }
1483
1484 //
1485 // When an error is determined on an igen call, this function is
1486 // called, since the default error handler will exit, and we don't
1487 // want paradyn to exit.
1488 //
1489 void paradynDaemon::handle_error()
1490 {
1491    removeDaemon(this, true);
1492 }
1493
1494 //
1495 // When a paradynd is started remotely, ie not by paradyn, this upcall
1496 // reports the information for that paradynd to paradyn
1497 //
1498 // This must set command, name, machine and flavor fields
1499 // (pid no longer used --ari)
1500 //
1501 void 
1502 paradynDaemon::reportSelf (string m, string p, int /*pid*/, string flav)
1503 {
1504   flavor = flav;
1505   if (!m.length() || !p.length()) {
1506     removeDaemon(this, true);
1507     printf("paradyn daemon reported bad info, removed\n");
1508     // error
1509   } else {
1510     machine = m.string_of();
1511     command = p.string_of();
1512     status = new status_line(machine.string_of());
1513
1514     if(flavor == "pvm") {
1515       name = "pvmd";
1516     } else if(flavor == "unix") {
1517       name = "defd";
1518     } else if(flavor == "mpi") {
1519       name = "mpid";
1520     } else if (flavor == "winnt") {
1521       name = "winntd";
1522     } else {
1523       name = flavor;
1524     }
1525   }
1526
1527   // Send the initial metrics, constraints, and other neato things
1528   mdl_send(this);
1529   vector<T_dyninstRPC::metricInfo> info = this->getAvailableMetrics();
1530   unsigned size = info.size();
1531   for (unsigned u=0; u<size; u++)
1532       addMetric(info[u]);
1533
1534   getDaemonTime(this);
1535
1536   return;
1537 }
1538
1539 //
1540 // When a paradynd reports status, send the status to the user
1541 //
1542 void 
1543 paradynDaemon::reportStatus (string line)
1544 {
1545   if (status)
1546     uiMgr->updateStatus(status, P_strdup(line.string_of()));
1547 }
1548
1549 /***
1550  This call is used by a daemon to report a change in the status of a process
1551  such as when the process exits.
1552  When one process exits, we just decrement procRunning, a counter of the number
1553  of processes running. If procRunning is zero, there are no more processes running,
1554  and the status of the application is set to appExited.
1555 ***/
1556 void
1557 paradynDaemon::processStatus(int pid, u_int stat) {
1558   if (stat == procExited) { // process exited
1559     for(unsigned i=0; i < programs.size(); i++) {
1560         if ((programs[i]->pid == (unsigned)pid) && programs[i]->controlPath == this) {
1561         programs[i]->exited = true;
1562         if (--procRunning == 0)
1563           performanceStream::notifyAllChange(appExited);
1564         break;
1565         }
1566     }
1567   }
1568 }
1569  
1570
1571 // Called by a daemon when there is no more data to be sent for a metric
1572 // instance (because the processes have exited).
1573 void
1574 paradynDaemon::endOfDataCollection(int mid) {
1575
1576     if(activeMids.defines(mid)){
1577         metricInstance *mi = activeMids[mid];
1578         assert(mi);
1579         bool aflag;
1580         aflag=(mi->removeComponent(this));
1581         assert(aflag);
1582     }
1583     else{  // check if this mid is for a disabled metric 
1584         bool found = false;
1585         for (unsigned ve=0; ve<disabledMids.size(); ve++) {
1586             if ((int) disabledMids[ve] == mid) {
1587                 found = true;
1588                 break;
1589             }
1590         }
1591         if (!found) {
1592             cout << "Ending data collection for unknown metric" << endl;
1593             uiMgr->showError (2, "Ending data collection for unknown metric");
1594         }
1595     }
1596 }