2 * Copyright (c) 1996 Barton P. Miller
4 * We provide the Paradyn Parallel Performance Tools (below
5 * described as Paradyn") on an AS IS basis, and do not warrant its
6 * validity or performance. We reserve the right to update, modify,
7 * or discontinue this software at any time. We shall have no
8 * obligation to supply such updates or modifications or any other
9 * form of support to you.
11 * This license is for research uses. For such uses, there is no
12 * charge. We define "research use" to mean you may freely use it
13 * inside your organization for whatever purposes you see fit. But you
14 * may not re-distribute Paradyn or parts of Paradyn, in any form
15 * source or binary (including derivatives), electronic or otherwise,
16 * to any other organization or entity without our permission.
18 * (for other uses, please contact us at paradyn@cs.wisc.edu)
20 * All warranties, including without limitation, any warranty of
21 * merchantability or fitness for a particular purpose, are hereby
24 * By your use of Paradyn, you understand and agree that we (or any
25 * other person or entity with proprietary rights in Paradyn) are
26 * under no obligation to provide either maintenance services,
27 * update services, notices of latent defects, or correction of
28 * defects for Paradyn.
30 * Even if advised of the possibility of such damages, under no
31 * circumstances shall we (or any other person or entity with
32 * proprietary rights in the software licensed hereunder) be liable
33 * to you or any third party for direct, indirect, or consequential
34 * damages of any character regardless of type of action, including,
35 * without limitation, loss of profits, loss of use, loss of good
36 * will, or computer failure or malfunction. You agree to indemnify
37 * us (and any other person or entity with proprietary rights in the
38 * software licensed hereunder) for any and all liability it may
39 * incur to third parties resulting from your use of Paradyn.
43 * method functions for paradynDaemon and daemonEntry classes
49 #include <rpc/types.h>
54 #include <sys/types.h>
55 #include <sys/socket.h>
56 #include "thread/h/thread.h"
57 #include "paradyn/src/pdMain/paradyn.h"
58 #include "dataManager.thread.h"
59 #include "dyninstRPC.xdr.CLNT.h"
61 #include "paradyn/src/TCthread/tunableConst.h"
62 #include "paradyn/src/UIthread/Status.h"
64 #include "paradyn/src/met/metricExt.h"
65 #include "util/h/Time.h"
67 // TEMP this should be part of a class def.
68 status_line *DMstatus=NULL;
69 status_line *PROCstatus=NULL;
71 // change a char* that points to "" to point to NULL
72 // NULL is used to signify "NO ARGUMENT"
73 // NULL is easier to detect than "" (which needs a strlen to detect)
75 static void fixArg(char *&argToFix)
77 if (argToFix && !strlen(argToFix))
83 static appState applicationState = appPaused; // status of the application.
86 metricInstance *DM_enableType::findMI(metricInstanceHandle mh){
87 for(u_int i=0; i < request->size(); i++){
88 if(mh == ((*request)[i])->getHandle()){
89 return ((*request)[i]);
94 void DM_enableType::setDone(metricInstanceHandle mh){
95 for(u_int i=0; i < request->size(); i++){
96 if(mh == ((*request)[i])->getHandle()){
103 // find any matching completed mids and update the done values
104 // if a matching value is successfully enabled done=true, else done= false
105 void DM_enableType::updateAny(vector<metricInstance *> &completed_mis,
106 vector<bool> successful){
108 for(u_int i=0; i < done->size(); i++){
109 if(!(*done)[i]){ // try to update element
110 assert(not_all_done);
111 metricInstanceHandle mh = ((*request)[i])->getHandle();
112 for(u_int j=0; j < completed_mis.size(); j++){
113 if(completed_mis[j]){
114 if(completed_mis[j]->getHandle() == mh){
115 if(successful[j]) (*done)[i] = true;
124 // Called whenever a program is ready to run (both programs started by paradyn
125 // and programs forked by other programs). QUESTION: What about when a program does
128 // The new program inherits all enabled metrics, and if the application is
129 // running, the new program must run too.
131 // Called by the igen routine newProgramCallbackFunc()
133 bool paradynDaemon::addRunningProgram (int pid,
134 const vector<string> ¶dynd_argv,
135 paradynDaemon *daemon,
137 bool attached_runMe) {
138 executable *exec = NULL;
140 if (calledFromExec) {
141 for (unsigned i=0; i < programs.size(); i++) {
142 if ((int) programs[i]->pid == pid && programs[i]->controlPath == daemon) {
149 // exec() doesn't change the pid, but paradynd_argv changes (big deal).
150 exec->argv = paradynd_argv;
152 // fall through (we probably want to execute the daemon->continueProcess())
155 // the non-exec (the normal) case follows:
156 exec = new executable (pid, paradynd_argv, daemon);
160 // the following propagates mi's to the new process IF it's the only
161 // process on the daemon. Otherwise, the daemon can and does propagate
162 // on its own. We don't call it in the exec case (above) since we know it
163 // wouldn't do anything.
164 daemon->propagateMetrics();
167 if (applicationState == appRunning || attached_runMe) {
168 //cerr << "paradyn: calling daemon->continueProcess off of addRunningProgram (machine " << daemon->getDaemonMachineName() << "; pid=" << pid << ") !" << endl;
169 daemon->continueProcess(pid);
172 if (procRunning == 1 && attached_runMe) {
173 // currently, Paradyn believes that the application is paused.
174 // Let's change that to 'running'.
176 cerr << "warning: continueAll failed" << endl;
177 uiMgr->enablePauseOrRun();
185 // add a new paradyn daemon
186 // called when a new paradynd contacts the advertised socket
188 bool paradynDaemon::addDaemon (int new_fd)
190 // constructor adds new daemon to dictionary allDaemons
191 paradynDaemon *new_daemon = new paradynDaemon (new_fd);
193 if (new_daemon->errorConditionFound) {
194 //TODO: "something" has to be done for a proper clean up - naim
195 uiMgr->showError(6,"");
200 // KLUDGE: set socket buffer size to 64k to avoid write-write deadlock
201 // between paradyn and paradynd
203 #if defined(sparc_sun_sunos4_1_3) || defined(hppa1_1_hp_hpux)
205 int size = sizeof(num_bytes);
207 if(setsockopt(new_daemon->get_fd(),SOL_SOCKET,SO_SNDBUF,(char *)&num_bytes ,size) < 0){
208 perror("setsocketopt SND_BUF error");
212 msg_bind_buffered (new_daemon->get_fd(), true, (int(*)(void*)) xdrrec_eof,
213 (void*)new_daemon->net_obj());
215 // The pid is reported later in an upcall
219 // TODO : fix this so that it really does clean up state
220 // Dispose of daemon state.
221 // This is called because someone wants to kill a daemon, or the daemon
222 // died and we need to cleanup after it.
224 void paradynDaemon::removeDaemon(paradynDaemon *d, bool informUser)
229 msg = string("paradynd has died on host <") + d->getDaemonMachineName()
231 uiMgr->showError(5, P_strdup(msg.string_of()));
238 List<executable*> progs;
242 // Delete executables running on the dead paradyn daemon.
244 for (progs = programs; e = *progs; progs++) {
245 if (e->controlPath == d) {
253 // tell the thread package to ignore the fd to the daemon.
254 msg_unbind(d->get_fd());
257 timeStamp getCurrentTime(void) {
258 static double previousTime=0.0;
262 aflag=(gettimeofday(&tv, NULL) == 0); // 0 --> success; -1 --> error
265 double seconds_dbl = tv.tv_sec * 1.0;
266 assert(tv.tv_usec < 1000000);
267 double useconds_dbl = tv.tv_usec * 1.0;
269 seconds_dbl += useconds_dbl / 1000000.0;
271 if (seconds_dbl < previousTime) goto retry;
272 previousTime = seconds_dbl;
277 // get the current time of a daemon, to adjust for clock differences.
278 void getDaemonTime(paradynDaemon *pd) {
279 timeStamp t1 = getCurrentTime();
280 timeStamp dt = pd->getTime(); // daemon time
281 timeStamp t2 = getCurrentTime();
282 timeStamp delay = (t2 - t1) / 2.0;
283 pd->setTimeFactor(t1 - dt + delay);
288 // check to see if a daemon that matches the function args exists
289 // if it does exist, return a pointer to it
290 // otherwise, create a new daemon
292 paradynDaemon *paradynDaemon::getDaemonHelper(const string &machine,
294 const string &name) {
296 // find out if we have a paradynd on this machine+login+paradynd
298 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
299 pd = paradynDaemon::allDaemons[i];
300 if ((!machine.string_of() || (pd->machine == machine)) &&
301 (!login.string_of() || (pd->login == login)) &&
302 (name.string_of() && (pd->name == name))) {
307 // find a matching entry in the dictionary, and start it
308 daemonEntry *def = findEntry(machine, name);
311 string msg = string("Paradyn daemon \"") + name + string("\" not defined.");
312 uiMgr->showError(90,P_strdup(msg.string_of()));
315 uiMgr->showError(91,"");
317 return ((paradynDaemon*) 0);
321 // fill in machine name if empty
322 if (!m.string_of()) {
326 char statusLine[256];
327 sprintf(statusLine, "Starting daemon on %s",m.string_of());
328 uiMgr->updateStatus(DMstatus,P_strdup(statusLine));
330 string flav_arg(string("-z") + def->getFlavor());
331 unsigned asize = paradynDaemon::args.size();
332 paradynDaemon::args += flav_arg;
334 pd = new paradynDaemon(m, login, def->getCommandString(),
335 def->getNameString(), def->getFlavorString());
337 if (pd->errorConditionFound) {
338 //TODO: "something" has to be done for a proper clean up - naim
340 msg=string("Cannot create daemon process on host \"") + m + string("\"");
341 uiMgr->showError(84,P_strdup(msg.string_of()));
342 return((paradynDaemon*) 0);
345 #if defined(sparc_sun_sunos4_1_3) || defined(hppa1_1_hp_hpux)
347 int nb_size = sizeof(num_bytes);
349 if(setsockopt(pd->get_fd(),SOL_SOCKET,SO_SNDBUF,(char *)&num_bytes ,nb_size) < 0){
350 perror("setsocketopt SND_BUF error");
354 paradynDaemon::args.resize(asize);
355 uiMgr->updateStatus(DMstatus,P_strdup("ready"));
357 if (pd->get_fd() < 0) {
358 uiMgr->showError (6, "");
359 return((paradynDaemon*) 0);
362 // Send the initial metrics, constraints, and other neato things
364 // Send the initial metrics, constraints, and other neato things
365 vector<T_dyninstRPC::metricInfo> info = pd->getAvailableMetrics();
366 unsigned size = info.size();
367 for (unsigned u=0; u<size; u++)
372 msg_bind_buffered(pd->get_fd(), true, (int(*)(void*))xdrrec_eof,
373 (void*) pd->net_obj());
378 // add a new daemon, unless a daemon is already running on that machine
379 // with the same machine, login, and program
381 bool paradynDaemon::getDaemon (const string &machine,
385 if (!getDaemonHelper(machine, login, name)){
392 // define a new entry for the daemon dictionary, or change an existing entry
394 bool paradynDaemon::defineDaemon (const char *c, const char *d,
395 const char *l, const char *n,
396 const char *m, const char *f) {
408 for(unsigned i=0; i < allEntries.size(); i++){
409 newE = allEntries[i];
410 if(newE->getNameString() == name){
411 if (newE->setAll(machine, command, name, login, dir, flavor))
418 newE = new daemonEntry(machine, command, name, login, dir, flavor);
426 daemonEntry *paradynDaemon::findEntry(const string &,
429 // if (!n) return ((daemonEntry*) 0);
430 for(unsigned i=0; i < allEntries.size(); i++){
431 daemonEntry *newE = allEntries[i];
432 if(newE->getNameString() == n){
436 return ((daemonEntry*) 0);
440 void paradynDaemon::tellDaemonsOfResource(u_int parent, u_int my_id,
441 const char *name, unsigned type) {
443 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
444 pd = paradynDaemon::allDaemons[i];
445 pd->addResource(parent,my_id,name, type);
450 void paradynDaemon::printEntries()
453 for(unsigned i=0; i < allEntries.size(); i++){
454 entry = allEntries[i];
459 void paradynDaemon::print()
462 cout << " name: " << name << endl;
463 cout << " command: " << command << endl;
464 cout << " login: " << login << endl;
465 cout << " machine: " << machine << endl;
466 cout << " flavor: " << flavor << endl;
469 void paradynDaemon::printDaemons()
472 cout << "ACTIVE DAEMONS\n";
473 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
474 pd = paradynDaemon::allDaemons[i];
479 void paradynDaemon::printPrograms()
482 for(unsigned i=0; i < programs.size(); i++){
484 cout << "PROGRAM ENTRY\n";
485 cout << "pid : " << entry->pid << endl;
486 entry->controlPath->print();
491 // Return list of names of defined daemons.
493 vector<string> *paradynDaemon::getAvailableDaemons()
495 vector<string> *names = new vector<string>;
498 for(unsigned i=0; i < allEntries.size(); i++){
499 entry = allEntries[i];
500 *names += entry->getName();
506 // For a given machine name, find the appropriate paradynd structure.
507 // Returns NULL if an appropriate matching entry can't be found.
508 paradynDaemon *paradynDaemon::machineName2Daemon(const string &theMachineName) {
509 for (unsigned i=0; i < allDaemons.size(); i++) {
510 paradynDaemon *theDaemon = allDaemons[i];
511 if (theDaemon->getDaemonMachineName() == theMachineName)
514 return 0; // failure; this machine name isn't known!
518 static int startBlzApp(const string &machine,
522 const vector<string> &argv)
524 //int firstPVM = 1 ; // "-v" 1
525 //int flag = 2; // "-l" 2
526 // flavor == "-z" cow
527 string localhost ; // "-m" gethostname
528 int port = dataManager::dm->socket ; // "-p" dataManager::sock_fd
531 gethostname(temp, 32);
532 if((p = strchr(temp, '.'))) *p = '\0' ;
535 int from = 1, to = 4 ;
536 int time= 20, nodes = 4;
539 bool use_range = false ;
541 //parse the application commandline
542 for(unsigned i=0; i<argv.size(); i++)
544 if(strncmp(argv[i].string_of(), "-nodes", 6)==0)
545 sscanf(argv[i].string_of(), "-nodes%d", &nodes) ;
546 else if(strncmp(argv[i].string_of(), "-range", 6)==0){
547 sscanf(argv[i].string_of(), "-range%d:%d", &from, &to) ;
550 else if(strncmp(argv[i].string_of(), "-time", 5)==0)
551 sscanf(argv[i].string_of(), "-time%d", &time) ;
553 if(application.length() > 0)
555 application += argv[i] ;
561 strcpy(temp, application.string_of()) ;
562 printf("application = [%s]\n", application.string_of()) ;
563 if((p = strchr(temp, ' ')))
565 else if((p = strchr(temp, '<')))
567 else if((p = strchr(temp, '>')))
569 p = strrchr(temp, '/') ; *p = '\0' ;
571 printf("directory=[%s]\n", directory.string_of()) ;
574 string cpScript, pnScript ;
575 cpScript = directory + "/PD_ctrl_script" ;
576 pnScript = directory + "/PD_node_script" ;
578 //make the serverscript
579 FILE *fp = fopen(cpScript.string_of(), "w") ;
580 fprintf(fp, "#!/bin/csh\n") ;
581 fprintf(fp, "crsh all %s\n", pnScript.string_of()) ;
583 system("chmod +x PD_ctrl_script") ;
586 fp = fopen(pnScript.string_of(), "w") ;
587 fprintf (fp, "#!/bin/csh\n") ;
588 fprintf (fp, "setenv LAM_CONFIG /p/wwt/myrinet/configs/lam.conf.master\n") ;
589 fprintf (fp, "cd %s\n", directory.string_of()) ;
590 //paradynd -p36622 -mgoofy -l1 -v1 -zcow -runme ....
591 fprintf (fp, "paradynd -p%d -m%s -l0 -v1 -zcow -runme %s\n",
593 localhost.string_of(),
594 application.string_of()) ;
596 system("chmod +x PD_node_script") ;
601 sprintf(temp, "%d", time) ;
602 string timeFrame ; timeFrame += temp ;
606 char sfrom[64], sto[64] ;
607 sprintf(sfrom, "%d", from) ;
608 sprintf(sto, "%d", to) ;
609 nodeRange += "-range ";
615 sprintf(temp, "%d", nodes) ;
616 nodeRange += "-nodes " ;
621 sprintf(temp, "Starting job on nodes %d-%d, for %d minutes\n", from, to, time)
624 sprintf(temp, "Starting job on %d nodes, for %d minutes\n", nodes, time) ;
625 uiMgr->updateStatus(DMstatus,P_strdup(temp));
631 ret = execlp("rsh", "rsh", machine.string_of(), "-l",
632 login.string_of(), "-n",
633 "/p/cow/bin/crun", cpScript.string_of() ,
636 "-time", timeFrame.string_of(),
637 nodeRange.string_of(),
642 ret = execlp("rsh", "rsh", machine.string_of(), "-n",
643 "/p/cow/bin/crun", cpScript.string_of() ,
646 "-time", timeFrame.string_of(),
647 nodeRange.string_of(),
650 fprintf(stderr,"rshCommand: execlp failed (ret = %d)\n",ret);
652 } else if( shellPid > 0)
654 else return false; // error situation
663 // add a new executable (binary) to a program.
665 bool paradynDaemon::newExecutable(const string &machine,
669 const vector<string> &argv){
672 DMstatus = new status_line("Data Manager");
675 PROCstatus = new status_line("Processes");
678 //Added to start blizzard application, Tempest
680 daemonEntry *def = findEntry(machine, name) ;
683 string msg = string("Paradyn daemon \"") + name + string("\" not defined.");
684 uiMgr->showError(90,P_strdup(msg.string_of()));
687 uiMgr->showError(91,"");
691 if(def->getFlavorString() == "cow")
692 return startBlzApp(machine, login, name, dir, argv) ;
694 paradynDaemon *daemon;
695 if ((daemon=getDaemonHelper(machine, login, name)) == (paradynDaemon*) NULL)
698 performanceStream::ResourceBatchMode(batchStart);
699 int pid = daemon->addExecutable(argv, dir);
700 performanceStream::ResourceBatchMode(batchEnd);
702 // did the application get started ok?
703 if (pid > 0 && !daemon->did_error_occur()) {
706 sprintf (tmp_buf, "PID=%d", pid);
707 uiMgr->updateStatus(PROCstatus, P_strdup(tmp_buf));
709 executable *exec = new executable(pid, argv, daemon);
710 paradynDaemon::programs += exec;
719 bool paradynDaemon::attachStub(const string &machine,
720 const string &userName,
721 const string &cmd, // program name (full path)
723 const string &daemonName,
724 int afterAttach // 0 --> as is, 1 --> pause, 2 --> run
726 // Note: by this time, both the RUN and PAUSE buttons have been disabled in the
730 DMstatus = new status_line("Data Manager");
733 PROCstatus = new status_line("Processes");
735 paradynDaemon *daemon = getDaemonHelper(machine, userName, daemonName);
740 sprintf (tmp_buf, "attaching to PID=%d...", the_pid);
741 uiMgr->updateStatus(PROCstatus, P_strdup(tmp_buf));
743 performanceStream::ResourceBatchMode(batchStart);
744 bool success = daemon->attach(cmd, the_pid, afterAttach);
745 performanceStream::ResourceBatchMode(batchEnd);
747 if (daemon->did_error_occur())
753 sprintf (tmp_buf, "PID=%d", the_pid);
754 uiMgr->updateStatus(PROCstatus, P_strdup(tmp_buf));
755 return true; // success
759 // start the programs running.
761 bool paradynDaemon::startApplication()
764 for(unsigned i=0; i < programs.size(); i++){
766 prog->controlPath->startProgram(prog->pid);
772 // pause all processes.
774 bool paradynDaemon::pauseAll()
777 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
778 pd = paradynDaemon::allDaemons[i];
779 pd->pauseApplication();
781 // tell perf streams about change.
782 performanceStream::notifyAllChange(appPaused);
783 applicationState = appPaused;
788 // pause one process.
790 bool paradynDaemon::pauseProcess(unsigned pid)
792 executable *exec = 0;
793 for(unsigned i=0; i < programs.size(); i++){
800 exec->controlPath->pauseProgram(exec->pid);
807 // continue all processes.
809 bool paradynDaemon::continueAll()
813 if (programs.size() == 0)
814 return false; // no program to pause
816 if (procRunning == 0)
819 for(int i = paradynDaemon::allDaemons.size()-1; i >= 0; i--)
821 pd = paradynDaemon::allDaemons[i];
822 pd->continueApplication();
824 // tell perf streams about change.
825 performanceStream::notifyAllChange(appRunning);
826 applicationState = appRunning;
831 // continue one process.
833 bool paradynDaemon::continueProcess(unsigned pid)
835 executable *exec = 0;
836 for(unsigned i=0; i < programs.size(); i++){
843 exec->controlPath->continueProgram(exec->pid);
850 // detach the paradyn tool from a running program. This should clean all
851 // of the dynamic instrumentation that has been inserted.
853 bool paradynDaemon::detachApplication(bool pause)
855 executable *exec = 0;
856 for(unsigned i=0; i < programs.size(); i++){
858 exec->controlPath->detachProgram(exec->pid,pause);
864 // print the status of each process. This is used mostly for debugging.
866 void paradynDaemon::printStatus()
868 executable *exec = 0;
869 for(unsigned i=0; i < programs.size(); i++){
871 string status = exec->controlPath->getStatus(exec->pid);
872 if (!exec->controlPath->did_error_occur()) {
873 cout << status << endl;
879 // Cause the passed process id to dump a core file. This is also used for
881 // If pid = -1, all processes will dump core files.
883 void paradynDaemon::dumpCore(int pid)
885 executable *exec = 0;
886 for(unsigned i=0; i < programs.size(); i++){
888 if ((exec->pid == (unsigned)pid) || (pid == -1)) {
889 exec->controlPath->coreProcess(exec->pid);
890 printf("found process and coreing it\n");
896 bool paradynDaemon::setInstSuppress(resource *res, bool newValue)
900 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
901 pd = paradynDaemon::allDaemons[i];
902 ret |= pd->setTracking(res->getHandle(), newValue);
908 // signal from daemon that it is about to start or end a set
909 // of new resource definitions
911 void paradynDaemon::resourceBatchMode(bool onNow){
921 for(u_int i=0; i < allDaemons.size(); i++){
922 (allDaemons[i])->reportResources();
924 performanceStream::ResourceBatchMode(batchEnd);
926 performanceStream::ResourceBatchMode(batchStart);
931 // reportResources: send new resource ids to daemon
933 void paradynDaemon::reportResources(){
934 assert(newResourceTempIds.size() == newResourceHandles.size());
935 resourceInfoResponse(newResourceTempIds, newResourceHandles);
936 newResourceTempIds.resize(0);
937 newResourceHandles.resize(0);
941 // upcall from paradynd reporting new resource
943 void paradynDaemon::resourceInfoCallback(u_int temporaryId,
944 vector<string> resource_name,
945 string abstr, u_int type) {
947 resourceHandle r = createResource(temporaryId, resource_name, abstr, type);
949 if (r != temporaryId) {
950 vector<u_int>tempIds; vector<u_int>rIds;
951 tempIds += temporaryId; rIds += r;
952 resourceInfoResponse(tempIds, rIds);
956 if (r != temporaryId) {
957 newResourceTempIds += temporaryId;
958 newResourceHandles += r;
959 assert(newResourceTempIds.size() == newResourceHandles.size());
964 void paradynDaemon::severalResourceInfoCallback(vector<T_dyninstRPC::resourceInfoCallbackStruct> items) {
965 for (unsigned lcv=0; lcv < items.size(); lcv++)
966 resourceInfoCallback(items[lcv].temporaryId,
967 items[lcv].resource_name,
968 items[lcv].abstraction,
974 // Get the expected delay (as a fraction of the running program) for the passed
975 // resource list (focus) and metric.
977 void paradynDaemon::getPredictedDataCostCall(perfStreamHandle ps_handle,
978 metricHandle m_handle,
979 resourceListHandle rl_handle,
987 aflag=rl->convertToIDList(focus);
989 const char *metName = m->getName();
992 if(performanceStream::addPredCostRequest(ps_handle,requestId,m_handle,
993 rl_handle, paradynDaemon::allDaemons.size())){
995 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
996 pd = paradynDaemon::allDaemons[i];
997 pd->getPredictedDataCost(ps_handle,requestId,focus,
1003 // TODO: change this to do the right thing
1004 // this should make the response upcall to the correct calling thread
1005 // perfConsult->getPredictedDataCostCallbackPC(0,0.0);
1010 // make data enable request to paradynds, and add request entry to
1011 // list of outstanding enable requests
1013 void paradynDaemon::enableData(vector<metricInstance *> *miVec,
1015 vector<bool> *enabled,
1016 DM_enableType *new_entry,
1017 bool need_to_enable){
1019 // make enable request, pass only pairs that need to be enabled to daemons
1021 bool whole_prog_focus = false;
1022 vector<paradynDaemon*> daemon_subset; // which daemons to send request
1023 vector<T_dyninstRPC::focusStruct> foci;
1024 vector<string> metrics;
1025 vector<u_int> mi_ids;
1027 for(u_int i=0; i < miVec->size(); i++){
1028 if(!(*enabled)[i] && !(*done)[i]){
1029 // create foci, metrics, and mi_ids entries for this mi
1030 T_dyninstRPC::focusStruct focus;
1033 aflag=((*miVec)[i]->convertToIDList(focus.focus));
1035 met_name = (*miVec)[i]->getMetricName();
1037 metrics += met_name;
1038 mi_ids += (*miVec)[i]->getHandle();
1039 // set currently enabling flag on mi
1040 (*miVec)[i]->setCurrentlyEnabling();
1042 // check to see if this focus is refined on the machine
1043 // or process hierarchy, if so then add the approp. daemon
1044 // to the daemon_subset, else set whole_prog_focus to true
1045 if(!whole_prog_focus){
1046 string machine_name;
1047 resourceList *rl = (*miVec)[i]->getresourceList();
1049 // focus is refined on machine or process hierarchy
1050 if(rl->getMachineNameReferredTo(machine_name)){
1051 // get the daemon corr. to this focus and add it
1052 // to the list of daemons
1054 paradynDaemon::machineName2Daemon(machine_name);
1057 for(u_int k=0; k< daemon_subset.size(); k++){
1058 if(pd->id == daemon_subset[k]->id){
1061 if(!found){ // add new daemon to subset list
1062 daemon_subset += pd;
1066 else { // foucs is not refined on process or machine
1067 whole_prog_focus = true;
1071 assert(foci.size() == metrics.size());
1072 assert(metrics.size() == mi_ids.size());
1073 assert(daemon_subset.size() <= paradynDaemon::allDaemons.size());
1074 // if there is a whole_prog_focus then make the request to all
1075 // the daemons, else make the request to the daemon subset
1076 // make enable requests to all daemons
1077 if(whole_prog_focus) {
1078 for(u_int j=0; j < paradynDaemon::allDaemons.size(); j++){
1079 paradynDaemon *pd = paradynDaemon::allDaemons[j];
1080 pd->enableDataCollection(foci,metrics,mi_ids,j,
1081 new_entry->request_id);
1085 // change the enable number in the entry
1086 new_entry->how_many = daemon_subset.size();
1087 for(u_int j=0; j < daemon_subset.size(); j++){
1088 daemon_subset[j]->enableDataCollection(foci,metrics,mi_ids,
1089 daemon_subset[j]->id,new_entry->request_id);
1093 // add entry to outstanding_enables list
1094 paradynDaemon::outstanding_enables += new_entry;
1102 // propagateMetrics:
1103 // called when a new process is started, to propagate all enabled metrics to
1104 // the new process. (QUESTION: should this include when a process makes
1105 // an exec syscall, thus 'starting' another process?)
1106 // Metrics are propagated only if the new process is the only process running
1107 // on a daemon (this is why we don't need the pid here). If there are already
1108 // other processes running on a daemon, then it is up to the daemon to do the
1109 // propagation (we can't do it here because the daemon has to do the aggregation).
1110 // Calling this function has no effect if there are no metrics enabled.
1111 void paradynDaemon::propagateMetrics() {
1113 vector<metricInstanceHandle> allMIHs = metricInstance::allMetricInstances.keys();
1115 for (unsigned i = 0; i < allMIHs.size(); i++) {
1117 metricInstance *mi = metricInstance::getMI(allMIHs[i]);
1119 if (!mi->isEnabled())
1122 // first we must find if the daemon already has this metric enabled for
1123 // some process. In this case, we don't need to do anything, the
1124 // daemon will do the propagation by itself.
1126 for (unsigned j = 0; j < mi->components.size(); j++) {
1127 if (mi->components[j]->getDaemon() == this) {
1134 continue; // we don't enable this mi; let paradynd do it
1136 resourceListHandle r_handle = mi->getFocusHandle();
1137 metricHandle m_handle = mi->getMetricHandle();
1138 resourceList *rl = resourceList::getFocus(r_handle);
1139 metric *m = metric::getMetric(m_handle);
1142 bool aflag = rl->convertToIDList(vs);
1145 int id = enableDataCollection2(vs, (const char *) m->getName(), mi->id);
1147 if (id > 0 && !did_error_occur()) {
1148 component *comp = new component(this, id, mi);
1149 if (!mi->addComponent(comp)) {
1150 cout << "internal error in paradynDaemon::addRunningProgram" << endl;
1158 bool paradynDaemon::setDefaultArgs(char *&name)
1161 name = strdup("defd");
1169 bool daemonEntry::setAll (const string &m, const string &c, const string &n,
1170 const string &l, const string &d, const string &f)
1172 if(!n.string_of() || !c.string_of())
1175 if (m.string_of()) machine = m;
1176 if (c.string_of()) command = c;
1177 if (n.string_of()) name = n;
1178 if (l.string_of()) login = l;
1179 if (d.string_of()) dir = d;
1180 if (d.string_of()) flavor = f;
1184 void daemonEntry::print()
1186 cout << "DAEMON ENTRY\n";
1187 cout << " name: " << name << endl;
1188 cout << " command: " << command << endl;
1189 cout << " dir: " << dir << endl;
1190 cout << " login: " << login << endl;
1191 cout << " machine: " << machine << endl;
1192 cout << " flavor: " << flavor << endl;
1195 int paradynDaemon::read(const void* handle, char *buf, const int len) {
1199 assert((int)handle<200);
1200 assert((int)handle >= 0);
1201 static vector<unsigned> fd_vect(200);
1203 // must handle the msg_bind_buffered call here because xdr_read will be
1204 // called in the constructor for paradynDaemon, before the previous call
1205 // to msg_bind_buffered had been called
1207 if (!fd_vect[(unsigned)handle]) {
1209 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
1210 pd = paradynDaemon::allDaemons[i];
1211 if(pd->get_fd() == (int)handle)
1217 msg_bind_buffered((int)handle, true, (int(*)(void*))xdrrec_eof,
1218 (void*)(pd)->net_obj());
1219 fd_vect[(unsigned)handle] = 1;
1223 unsigned tag = MSG_TAG_FILE;
1226 ready_fd = msg_poll(&tag, true);
1227 while ((ready_fd != (int) handle) && (ready_fd != THR_ERR));
1229 if (ready_fd == (int) handle) {
1231 ret = P_read((int)handle, buf, len);
1234 } while (ret < 0 && errno == EINTR);
1242 void paradynDaemon::firstSampleCallback(int, double firstTime) {
1243 static bool done = false;
1245 // cerr << "paradyn: welcome to firstSampleCallback; firstTime=" << firstTime << "; adjusted time=" << getAdjustedTime(firstTime) << endl;
1247 setEarliestFirstTime(getAdjustedTime(firstTime));
1253 paradynDaemon::paradynDaemon(const string &m, const string &u, const string &c,
1254 const string &n, const string &f)
1255 : dynRPCUser(m, u, c, NULL, NULL, args, 1, dataManager::sock_fd),
1256 machine(m), login(u), command(c), name(n), flavor(f), activeMids(uiHash)
1258 if (!this->errorConditionFound) {
1259 // No problems found in order to create this new daemon process - naim
1265 // if c includes a pathname, lose the pathname
1266 const char *loc = P_strrchr(c.string_of(), '/');
1272 status = new status_line(machine.string_of());
1273 paradynDaemon *pd = this;
1274 paradynDaemon::allDaemons+=pd;
1275 id = paradynDaemon::allDaemons.size()-1;
1276 assert(paradynDaemon::allDaemons.size() > id);
1278 // else...we leave "errorConditionFound" for the caller to check...
1279 // don't forget to check!
1282 // machine, name, command, flavor and login are set via a callback
1283 paradynDaemon::paradynDaemon(int f)
1284 : dynRPCUser(f, NULL, NULL, 1), flavor(0), activeMids(uiHash){
1285 if (!this->errorConditionFound) {
1286 // No problems found in order to create this new daemon process - naim
1287 paradynDaemon *pd = this;
1288 paradynDaemon::allDaemons += pd;
1289 id = paradynDaemon::allDaemons.size()-1;
1291 // else...we leave "errorConditionFound" for the caller to check...
1292 // don't forget to check!
// When set, batchSampleDataCallbackFunc() prints a line to cout for each
// sample it receives.  Off by default.
bool our_print_sample_arrival = false;

// Callback that stores the new on/off value of the sample-arrival printing
// flag.
void printSampleArrivalCallback(bool newVal)
{
    our_print_sample_arrival = newVal;
}
// Handle one batch of metric samples delivered by this daemon.
//
// Parameters:
//   (unnamed int)   - not used by this function.
//   theBatchBuffer  - the batch of sample entries; taken by value.
//
// For every entry the function
//   1. re-bases the start/end time stamps onto the common time base,
//   2. optionally echoes the sample to cout (our_print_sample_arrival),
//   3. looks up the metricInstance registered in activeMids for entry.mid,
//   4. bumps this daemon's time factor when the sample starts before the
//      aggregate sample's current time,
//   5. feeds the value into the component that belongs to this daemon, and
//   6. asks the aggregate sample for a new interval and, when it is valid,
//      adds that interval to the metric instance.
1300 // batched version of sampleCallbackFunc
1301 void paradynDaemon::batchSampleDataCallbackFunc(int ,
1302 vector<T_dyninstRPC::batch_buffer_entry> theBatchBuffer)
1304 // get the earliest first time that had been reported by any paradyn
1305 // daemon to use as the base (0) time
// NOTE(review): the same call is used as a time value further down, so aflag
// presumably only records that a first time exists -- confirm how it is
// checked.
1307 aflag=getEarliestFirstTime();
1310 // Just for debugging:
1311 //fprintf(stderr, "in DMdaemon.C, burst size = %d\n", theBatchBuffer.size()) ;
1313 // Go through every item in the batch buffer we've just received and
1315 for (unsigned index =0; index < theBatchBuffer.size(); index++) {
1316 T_dyninstRPC::batch_buffer_entry &entry = theBatchBuffer[index] ;
// Unpack the entry into locals; the time stamps are modified below.
1318 unsigned mid = entry.mid ;
1319 double startTimeStamp = entry.startTimeStamp ;
1320 double endTimeStamp = entry.endTimeStamp ;
1321 double value = entry.value ;
1322 u_int weight = entry.weight;
1323 //bool internal_metric = entry.internal_met;
// Re-base both time stamps: apply this daemon's time adjustment, then
// subtract the earliest first time so that time counts from the base (0)
// time (presumably stored back into startTimeStamp / endTimeStamp).
1325 this->getAdjustedTime(startTimeStamp) - getEarliestFirstTime();
1327 this->getAdjustedTime(endTimeStamp) - getEarliestFirstTime();
// Optional debug trace of the re-based sample.
1329 if (our_print_sample_arrival) {
1330 cout << "mid " << mid << " " << value << " from "
1331 << startTimeStamp << " to " << endTimeStamp
1332 << " weight " << weight
1333 << " machine " << machine.string_of() << "\n";
1336 // Okay, the sample is not an error; let's process it.
// Look up the metric instance for this mid; find() reports success through
// its return value and hands the instance back in mi.
1338 bool found = activeMids.find(mid, mi);
1340 // this can occur due to asynchrony of enable or disable requests
1341 // so just ignore the data
1346 // Any sample sent by a daemon should not have the start time
1347 // less than lastSampleEnd for the aggregate sample. When a new
1348 // component is added to a metric, the first sample could have
1349 // the startTime less than lastSampleEnd. If this happens,
1350 // the daemon clock must be late (or the time adjustment
1351 // factor is not good enough), and so we must update
1352 // the time adjustment factor for this daemon.
1353 if (startTimeStamp < mi->aggSample.currentTime()) {
// Shift this sample forward by the shortfall and add the same amount to the
// daemon's time factor.
1354 timeStamp diff = mi->aggSample.currentTime() - startTimeStamp;
1355 startTimeStamp += diff;
1356 endTimeStamp += diff;
1357 this->setTimeFactor(this->getTimeFactor() + diff);
1358 //printf("*** Adjusting time for %s: diff = %f\n", this->machine.string_of(), diff);
1361 struct sampleInterval ret;
1362 if (mi->components.size()){
1363 // find the right component.
1364 component *part = 0;
1365 for(unsigned i=0; i < mi->components.size(); i++) {
// NOTE(review): both pointers are cast to unsigned before being compared.
// Where pointers are wider than unsigned this compares truncated values
// only; comparing the pointers directly would avoid that.
1366 if((unsigned)mi->components[i]->daemon == (unsigned)this){
1367 part = mi->components[i];
1368 // update the weight associated with this component
1369 // this does not necessarily need to be updated with
1370 // each new value as long as we can distinguish between
1371 // internal and non-internal metric values in some way
1372 // (internal metrics weight is 1 and regular metrics
1373 // weight is the number of processes for this daemon),
1374 // and the weight is changed when the number of processes
1375 // changes (we are not currently doing this part)
1376 //if(!internal_metric){
1377 // mi->num_procs_per_part[i] = weight;
// Report error 3 through the UI manager (presumably reached only when no
// component of mi belongs to this daemon -- confirm the guarding test).
1382 uiMgr->showError(3, "");
1387 // update the sampleInfo value associated with
1388 // the daemon that sent the value
// The first value a component receives also fixes that component's start
// time.
1390 if (!part->sample->firstValueReceived())
1391 part->sample->startTime(startTimeStamp);
1392 part->sample->newValue(endTimeStamp, value, weight);
1395 // don't aggregate if this metric is still being enabled (we may
1396 // not have received replies for the enable requests from all the daemons)
1397 if (mi->isCurrentlyEnabling())
1401 // update the metric instance sample value if there is a new
1402 // interval with data for all parts, otherwise this routine
1403 // returns false for ret.valid and the data cannot be bucketed
1404 // by the histograms yet (not all components have sent data for
1406 // newValue will aggregate the parts according to mi's aggOp
1408 ret = mi->aggSample.aggregateValues();
1410 if (ret.valid) { // there is new data from all components
// Sanity checks: the interval is non-negative and correctly ordered.
1411 assert(ret.end >= 0.0);
1412 assert(ret.start >= 0.0);
1413 assert(ret.end >= ret.start);
// Accumulate the covered time and hand the interval to the metric instance.
1414 mi->enabledTime += ret.end - ret.start;
1415 mi->addInterval(ret.start, ret.end, ret.value, false);
1417 } // the main for loop
// Handle one batch of trace records delivered by this daemon.
//
// Parameters:
//   (unnamed int)        - not used by this function.
//   theTraceBatchBuffer  - the batch of trace entries; taken by value.
//
// For every entry the metricInstance registered in activeMids for entry.mid
// is looked up and the entry's trace bytes are passed to its sendTraceData().
1420 // trace data streams
1421 void paradynDaemon::batchTraceDataCallbackFunc(int ,
1422 vector<T_dyninstRPC::trace_batch_buffer_entry> theTraceBatchBuffer)
1424 // get the earliest first time that had been reported by any paradyn
1425 // daemon to use as the base (0) time
// The check described above is commented out; no time base is used here.
1426 // assert(getEarliestFirstTime());
1428 // Just for debugging:
1429 //fprintf(stderr, "in DMdaemon.C, burst size = %d\n", theTraceBatchBuffer.size());
1431 // Go through every item in the batch buffer we've just received and
1433 for (unsigned index =0; index < theTraceBatchBuffer.size(); index++) {
1434 T_dyninstRPC::trace_batch_buffer_entry &entry = theTraceBatchBuffer[index] ;
1436 unsigned mid = entry.mid ;
1437 unsigned length = entry.length;
// Optional debug trace, controlled by the same flag as the sample callback.
1439 if (our_print_sample_arrival) {
1440 cout << "mid " << mid << " : length = " << length << "\n";
1443 // Okay, the sample is not an error; let's process it.
// Look up the metric instance for this mid; find() reports success through
// its return value and hands the instance back in mi.
1445 bool found = activeMids.find(mid, mi);
1447 // this can occur due to asynchrony of enable or disable requests
1448 // so just ignore the data
// Build a temporary byteArray from the entry's trace record, pass its bytes
// to the metric instance, then free the temporary.
// NOTE(review): the new/delete pair could be replaced by an automatic
// object, which would also release the buffer on any early exit.
1452 byteArray *localTraceData = new byteArray(entry.traceRecord.getArray(),
1454 mi->sendTraceData(localTraceData->getArray(),length);
1456 delete localTraceData;
1458 } // the main for loop
// Destructor. Destroying a paradynDaemon is treated as an error state (see
// the comment below); the visible statements unregister the daemon and
// print a diagnostic.
// NOTE(review): confirm which of the cleanup statements are actually
// compiled in and what follows the printf.
1462 // paradyn daemon should never go away. This represents an error state
1463 // due to a paradynd being killed for some reason.
1465 // TODO -- handle this better
1466 paradynDaemon::~paradynDaemon() {
1470 HTable<metricInstance*> curr;
// Drop this daemon from the list of all daemons.
1472 allDaemons.remove(this);
1474 // remove the metric ID as required.
// Walk activeMids. The loop condition is an assignment: iteration stops
// when *curr yields a null metricInstance pointer (presumably at the end of
// the table -- confirm HTable's iteration contract).
1475 for (curr = activeMids; mi = *curr; curr++) {
1476 mi->parts.remove(this);
1477 mi->components.remove(this);
1480 printf("Inconsistant state\n");
// Error hook for igen calls: rather than exiting, remove this daemon by
// calling removeDaemon(this, true).
// NOTE(review): the meaning of the `true` argument is defined by
// removeDaemon -- confirm there.
1485 // When an error is determined on an igen call, this function is
1486 // called, since the default error handler will exit, and we don't
1487 // want paradyn to exit.
1489 void paradynDaemon::handle_error()
1491 removeDaemon(this, true);
// Upcall with which a daemon describes itself.
//
// Parameters:
//   m     - stored in the `machine` field; must be non-empty.
//   p     - stored in the `command` field; must be non-empty.
//   pid   - unnamed and unused.
//   flav  - daemon flavor as reported by the daemon.
//
// An empty m or p makes the daemon remove itself via removeDaemon(this,
// true) and print a diagnostic. Otherwise the fields are filled in, a
// status_line named after the machine is created, the daemon's available
// metrics are fetched and getDaemonTime(this) is called.
1495 // When a paradynd is started remotely, ie not by paradyn, this upcall
1496 // reports the information for that paradynd to paradyn
1498 // This must set command, name, machine and flavor fields
1499 // (pid no longer used --ari)
1502 paradynDaemon::reportSelf (string m, string p, int /*pid*/, string flav)
// Reject a report that lacks a machine name or a command.
1505 if (!m.length() || !p.length()) {
1506 removeDaemon(this, true);
1507 printf("paradyn daemon reported bad info, removed\n");
1510 machine = m.string_of();
1511 command = p.string_of();
1512 status = new status_line(machine.string_of());
// Dispatch on the `flavor` field; the recognised values are "pvm", "unix",
// "mpi" and "winnt".
// NOTE(review): the test reads the member `flavor`, not the parameter
// `flav` -- presumably flavor is assigned from flav before this chain;
// confirm.
1514 if(flavor == "pvm") {
1516 } else if(flavor == "unix") {
1518 } else if(flavor == "mpi") {
1520 } else if (flavor == "winnt") {
1527 // Send the initial metrics, constraints, and other neato things
// Ask the daemon for its metric list and visit every entry.
1529 vector<T_dyninstRPC::metricInfo> info = this->getAvailableMetrics();
1530 unsigned size = info.size();
1531 for (unsigned u=0; u<size; u++)
1534 getDaemonTime(this);
// Upcall carrying a status message from the daemon.
//
// Parameters:
//   line - the status text; a P_strdup copy of it is passed, together with
//          this daemon's `status` line object, to uiMgr->updateStatus().
//
// NOTE(review): the duplicated string is not freed here -- presumably
// updateStatus takes ownership; confirm, otherwise this leaks per call.
1540 // When a paradynd reports status, send the status to the user
1543 paradynDaemon::reportStatus (string line)
1546 uiMgr->updateStatus(status, P_strdup(line.string_of()));
1550 This call is used by a daemon to report a change in the status of a process
1551 such as when the process exits.
1552 When one process exits, we just decrement procRunning, a counter of the number
1553 of processes running. If procRunning is zero, there are no more processes running,
1554 and the status of the application is set to appExited.
// Upcall reporting a status change of a process run by this daemon.
//
// Parameters:
//   pid  - id of the process whose status changed.
//   stat - new status; the code shown here reacts to procExited.
//
// On procExited, every entry of `programs` that has this pid and whose
// controlPath is this daemon is marked as exited and procRunning is
// decremented; when the counter reaches zero all performance streams are
// notified with appExited.
1557 paradynDaemon::processStatus(int pid, u_int stat) {
1558 if (stat == procExited) { // process exited
1559 for(unsigned i=0; i < programs.size(); i++) {
// Match on both pid and owning daemon (controlPath must be this object).
1560 if ((programs[i]->pid == (unsigned)pid) && programs[i]->controlPath == this) {
1561 programs[i]->exited = true;
// The last running process has just exited: announce application exit.
1562 if (--procRunning == 0)
1563 performanceStream::notifyAllChange(appExited);
1571 // Called by a daemon when there is no more data to be sent for a metric
1572 // instance (because the processes have exited).
1574 paradynDaemon::endOfDataCollection(int mid) {
1576 if(activeMids.defines(mid)){
1577 metricInstance *mi = activeMids[mid];
1580 aflag=(mi->removeComponent(this));
1583 else{ // check if this mid is for a disabled metric
1585 for (unsigned ve=0; ve<disabledMids.size(); ve++) {
1586 if ((int) disabledMids[ve] == mid) {
1592 cout << "Ending data collection for unknown metric" << endl;
1593 uiMgr->showError (2, "Ending data collection for unknown metric");