severalResourceInfoCallback
[dyninst.git] / paradyn / src / DMthread / DMmain.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 #include <assert.h>
43 extern "C" {
44 double   quiet_nan();
45 #include <malloc.h>
46 #include <stdio.h>
47 }
48
49 #include "thread/h/thread.h"
50 #include "paradyn/src/TCthread/tunableConst.h"
51 #include "dataManager.thread.SRVR.h"
52 #include "dyninstRPC.xdr.CLNT.h"
53 #include "DMdaemon.h"
54 #include "DMmetric.h"
55 #include "DMperfstream.h"
56 #include "DMabstractions.h"
57 #include "paradyn/src/pdMain/paradyn.h"
58 #include "paradyn/src/UIthread/Status.h"
59
60 #include "util/h/Vector.h"
61 #include "util/h/Dictionary.h"
62 #include "util/h/String.h"
63 // trace data streams
64 #include "util/h/ByteArray.h"
65 #include "DMphase.h"
66
67 typedef vector<string> blahType;
68
69 // bool parse_metrics(string metric_file);
70 bool metMain(string &userFile);
71
72 // this has to be declared before baseAbstr, cmfAbstr, and rootResource 
73 int dataManager::sock_fd;  
74 int dataManager::socket;  
75 dataManager *dataManager::dm = NULL;  
76
77 dictionary_hash<string,abstraction*> abstraction::allAbstractions(string::hash);
78 abstraction *baseAbstr = new abstraction("BASE");
79 abstraction *cmfAbstr = new abstraction("CMF");
80
81 dictionary_hash<string,metric*> metric::allMetrics(string::hash);
82 dictionary_hash<metricInstanceHandle,metricInstance *> 
83                 metricInstance::allMetricInstances(metricInstance::mhash);
84 dictionary_hash<perfStreamHandle,performanceStream*>  
85                 performanceStream::allStreams(performanceStream::pshash);
86 dictionary_hash<string, resource*> resource::allResources(string::hash, 8192);
87 dictionary_hash<string,resourceList *> resourceList::allFoci(string::hash);
88
89 dictionary_hash<unsigned, resource*>resource::resources(uiHash);
90 vector<string> resource::lib_constraints;
91 vector< vector<string> > resource::func_constraints;
92 bool resource::func_constraints_built = false;
93 bool resource::lib_constraints_built = false;
94
95 vector<metric*> metric::metrics;
96 vector<paradynDaemon*> paradynDaemon::allDaemons;
97 vector<daemonEntry*> paradynDaemon::allEntries;
98 vector<executable*> paradynDaemon::programs;
99 unsigned paradynDaemon::procRunning;
100 vector<resourceList *> resourceList::foci;
101 vector<phaseInfo *> phaseInfo::dm_phases;
102 u_int metricInstance::next_id = 1;
103 // u_int performanceStream::next_id = 0;
104 vector<DM_enableType*> paradynDaemon::outstanding_enables;  
105 u_int paradynDaemon::next_enable_id = 0;  
106 u_int paradynDaemon::count = 0;
107
108 // to distinguish the enableDataRequest calls only for samples 
109 // from those for both samples and traces 
110 // 0 is reserved for non-trace use
111 u_int performanceStream::next_id = 1;
112
113 resource *resource::rootResource = new resource();
114 timeStamp metricInstance::curr_bucket_width;
115 timeStamp metricInstance::global_bucket_width;
116 phaseHandle metricInstance::curr_phase_id;
117 u_int metricInstance::num_curr_hists = 0;
118 u_int metricInstance::num_global_hists = 0;
119
120 double paradynDaemon::earliestFirstTime = 0;
121 void newSampleRate(float rate);
122
123 extern void histDataCallBack(sampleValue*, timeStamp, int, int, void*, bool);
124 extern void histFoldCallBack(timeStamp, void*, bool);
125
126 //
127 // IO from application processes.
128 //
129 void dynRPCUser::applicationIO(int,int,string data)
130 {
131
132     // NOTE: this fixes a purify error with the commented out code (a memory
133     // segment error occurs occasionally with the line "cout << rest << endl") 
134     // this is problably not the best fix,  but I can't figure out why 
135     // the error is occuring (rest is always '\0' terminated when this
136     // error occurs)---tn 
137     fprintf(stdout,data.string_of());
138     fflush(stdout);
139
140 #ifdef n_def
141     char *ptr;
142     char *rest;
143     // extra should really be per process.
144     static string extra;
145
146     rest = P_strdup(data.string_of());
147
148     char *tp = rest;
149     ptr = P_strchr(rest, '\n');
150     while (ptr) {
151         *ptr = '\0';
152         if (pid) {
153             printf("pid %d:", pid);
154         } else {
155             printf("paradynd: ");
156         }
157         if (extra.length()) {
158             cout << extra;
159             extra = (char*) NULL;
160         }
161         cout << rest << endl;
162         rest = ptr+1;
163         if(rest)
164             ptr = P_strchr(rest, '\n');
165         else
166             ptr = 0;
167     }
168     extra += rest;
169     delete tp;
170     rest = 0;
171 #endif
172
173 }
174
175 extern status_line *DMstatus;
176
177 void dynRPCUser::resourceBatchMode(bool onNow)
178 {
179    printf("error calling virtual func: dynRPCUser::resourceBatchMode\n");
180 }
181
182 //
183 // upcalls from remote process.
184 //
185 void dynRPCUser::resourceInfoCallback(u_int , vector<string> ,
186                                       string , u_int) {
187
188 printf("error calling virtual func: dynRPCUser::resourceInfoCallback\n");
189
190 }
191
192 void dynRPCUser::severalResourceInfoCallback(vector<T_dyninstRPC::resourceInfoCallbackStruct>) {
193 printf("error calling virtual func: dynRPCUser::severalResourceInfoCallback\n");
194 }
195
196 void dynRPCUser::memoryInfoCallback(int,
197                                     string vname,
198                                     int va,
199                                     u_int mem_size,
200                                     u_int blk_size)
201 {//TO DO
202    string       abstr = "BASE";
203    u_int        type = MDL_T_INT;
204    int          end =  va + mem_size ;
205    int          start = va ;
206    vector<resourceHandle> handles, whereHandles ;
207    resourceHandle         p_handle, r_handle ;
208    int                    num_blks = 0 ;
209
210    printf("Paradyn received: (var:%s, va:%d, mem_size:%d, blk_size:%d)\n",
211               vname.string_of(), va, mem_size, blk_size) ;
212
213    vector<string> res_name;
214    res_name += "Memory" ; res_name += vname ;
215    bool exist = false ;
216    r_handle = createResource_ncb(res_name, abstr, MDL_T_VARIABLE, p_handle, exist);
217    handles += r_handle ;
218
219    /* inform others about it if they need to know */
220    if(!exist)
221    {
222     char temp[255] ;
223     sprintf(temp, "Memory/%s", vname.string_of()) ;
224     const char *name = strdup(temp) ;
225     const char *abs  = strdup(abstr.string_of()) ;
226     dictionary_hash_iter<perfStreamHandle,performanceStream*>
227                         allS(performanceStream::allStreams);
228     perfStreamHandle h;
229     performanceStream *ps;
230     while(allS.next(h,ps)){
231         ps->callResourceFunc(p_handle,r_handle,name,abs);
232     }
233    }
234
235    while (va < end)
236    {
237         char temp[255] ;
238         vector<string> res_name;
239         res_name += "Memory" ; res_name += vname ;
240
241         sprintf(temp, "%d", (int) va) ;
242         res_name += temp ;
243         exist = true; /* we do not want to search to duplication */
244         r_handle = createResource_ncb(res_name, abstr, type, p_handle, exist);
245         handles += r_handle ;
246         whereHandles += r_handle ;
247
248         va += blk_size ;
249         num_blks ++ ;
250    }
251    /* inform the daemon of the things it needs to know */
252    /* should send this to all daemons, not just one  */
253    {
254     for(u_int j=0; j < paradynDaemon::allDaemons.size(); j++){
255                 paradynDaemon *pd = paradynDaemon::allDaemons[j];
256                 pd->memoryInfoResponse(vname, start, mem_size, blk_size, handles
257 ) ;
258     }
259    }
260
261    /* inform others about it if they need to know */
262    {
263     dictionary_hash_iter<perfStreamHandle,performanceStream*>
264                         allS(performanceStream::allStreams);
265     perfStreamHandle h;
266     performanceStream *ps;
267     while(allS.next(h,ps)){
268         ps->callMemoryFunc(vname, start, mem_size, blk_size, p_handle, whereHandles);
269     }
270    }
271 }
272
273
274 void dynRPCUser::mappingInfoCallback(int,
275                                      string abstraction, 
276                                      string type, 
277                                      string key,
278                                      string value)
279 {
280   AMnewMapping(abstraction.string_of(),type.string_of(),key.string_of(),
281                value.string_of());    
282 }
283
284 class uniqueName {
285   public:
286     uniqueName(stringHandle base) { name = base; nextId = 0; }
287     int nextId;
288     stringHandle name;
289 };
290
291 //
292 // handles a completed enable response: updates metricInstance state
293 // and send the calling thread the response 
294 //
295 void DMenableResponse(DM_enableType &enable,vector<bool> &successful){
296     
297
298     vector<metricInstance *> &mis = (*enable.request);
299     assert(successful.size() == mis.size());
300     vector<metricInstInfo> *response = new vector<metricInstInfo>(mis.size()); 
301
302     // update MI state and response vector
303     for(u_int i=0; i < mis.size(); i++){
304         if(mis[i] && successful[i]){  // this MI could be enabled
305           mis[i]->setEnabled();
306           metric *metricptr = metric::getMetric(mis[i]->getMetricHandle());
307           if(metricptr){
308
309             if(enable.ph_type == CurrentPhase){
310                 u_int old_current = mis[i]->currUsersCount();
311                 bool  current_data = mis[i]->isCurrHistogram();
312                 mis[i]->newCurrDataCollection(metricptr->getStyle(),
313                                               histDataCallBack,
314                                               histFoldCallBack);
315                 mis[i]->newGlobalDataCollection(metricptr->getStyle(),
316                                                 histDataCallBack,
317                                                 histFoldCallBack);
318                 mis[i]->addCurrentUser(enable.ps_handle);
319
320                 // trace data streams
321                 mis[i]->newTraceDataCollection(traceDataCallBack);
322                 mis[i]->addTraceUser(enable.pt_handle);
323
324                 // set sample rate to match current phase hist. bucket width
325                 if(!metricInstance::numCurrHists()){
326                     float rate = phaseInfo::GetLastBucketWidth();
327                     newSampleRate(rate);
328                 }
329                 // new active curr. histogram added if there are no previous
330                 // curr. subscribers and either persistent_collection is clear
331                 // or there was no curr. histogram prior to this
332                 if((!old_current)
333                     && (mis[i]->currUsersCount() == 1) && 
334                     (!(mis[i]->isCollectionPersistent()) || (!current_data))){
335                     metricInstance::incrNumCurrHists();
336                 }
337                 // new global histogram if this metricInstance was just enabled
338                 if(!((*enable.enabled)[i])){
339                     metricInstance::incrNumGlobalHists();
340                 }
341             }
342             else {  // this is a global phase enable
343                 mis[i]->newGlobalDataCollection(metricptr->getStyle(),
344                                                 histDataCallBack,
345                                                 histFoldCallBack);
346                 mis[i]->addGlobalUser(enable.ps_handle);
347
348                 // trace data streams
349                 mis[i]->newTraceDataCollection(traceDataCallBack);
350                 mis[i]->addTraceUser(enable.pt_handle);
351
352                 // if this is first global histogram enabled and there are no
353                 // curr hists, then set sample rate to global bucket width
354                 if(!metricInstance::numCurrHists()){
355                     if(!metricInstance::numGlobalHists()){
356                         float rate = Histogram::getGlobalBucketWidth();
357                         newSampleRate(rate);
358                 }}
359                 // new global hist added: update count
360                 if(!((*enable.enabled)[i])){
361                     metricInstance::incrNumGlobalHists();
362                 }
363             }
364             // update response vector
365             (*response)[i].successfully_enabled = true;
366             (*response)[i].mi_id = mis[i]->getHandle(); 
367             (*response)[i].m_id = mis[i]->getMetricHandle();
368             (*response)[i].r_id = mis[i]->getFocusHandle();
369             (*response)[i].metric_name = mis[i]->getMetricName();
370             (*response)[i].focus_name = mis[i]->getFocusName();
371             (*response)[i].metric_units = metricptr->getUnits();
372             (*response)[i].units_type = metricptr->getUnitsType();
373
374             // update the persistence flags: the OR of new & previous values
375             if(enable.persistent_data){
376                 mis[i]->setPersistentData();
377             }
378             if(enable.persistent_collection){
379                 mis[i]->setPersistentCollection();
380             }
381             if(enable.phase_persistent_data){
382                 mis[i]->setPhasePersistentData();
383             }
384           }
385           else {
386               cout << "mis enabled but no metric handle: " 
387                    << mis[i]->getMetricHandle() << endl;
388               assert(0);
389           }
390         }
391         else {  // was not successfully enabled
392             (*response)[i].successfully_enabled = false;
393             (*response)[i].mi_id = mis[i]->getHandle(); 
394             (*response)[i].m_id = mis[i]->getMetricHandle();
395             (*response)[i].r_id = mis[i]->getFocusHandle();
396             (*response)[i].metric_name = mis[i]->getMetricName();
397             (*response)[i].focus_name = mis[i]->getFocusName();
398         }
399
400 //        if(mis[i]) {
401 //          (*response)[i].mi_id = mis[i]->getHandle(); 
402 //          (*response)[i].m_id = mis[i]->getMetricHandle();
403 //          (*response)[i].r_id = mis[i]->getFocusHandle();
404 //          (*response)[i].metric_name = mis[i]->getMetricName();
405 //          (*response)[i].focus_name = mis[i]->getFocusName();
406 //        }
407     }
408
409     // make response call
410     dictionary_hash_iter<perfStreamHandle,performanceStream*>
411             allS(performanceStream::allStreams);
412     perfStreamHandle h; performanceStream *ps;
413     while(allS.next(h,ps)){
414         if(h == (perfStreamHandle)(enable.ps_handle)){
415             ps->callDataEnableFunc(response,enable.client_id);
416             return;
417     } }
418     // trace data streams
419     allS.reset();
420     while(allS.next(h,ps)){
421         if(h == (perfStreamHandle)(enable.pt_handle)){
422             ps->callDataEnableFunc(response,enable.client_id);
423             return;
424     } }
425     response = 0;
426 }
427
428 //
429 // handle an enable response from a daemon. If all daemons have responded
430 // then make response callback to calling thread, and check the outstanding
431 // enables list to see if this enable response satisfies any waiting requests.
432 // and enable for an MI is successful if its done entry is true and if its
433 // MI* is not 0
434 //
435 void dynRPCUser::enableDataCallback(u_int daemon_id, 
436                                     vector<int> return_id,
437                                     vector<u_int> mi_ids,
438                                     u_int request_id)
439 {
440     // find element in outstanding_enables corr. to request_id
441     u_int which =0;
442     DM_enableType *request_entry = 0;
443     for(u_int i=0; i < paradynDaemon::outstanding_enables.size(); i++){
444         if((paradynDaemon::outstanding_enables[i])->request_id == request_id){
445             which = i;
446             request_entry = paradynDaemon::outstanding_enables[i];
447             break;
448     } }
449
450     if(!request_entry){
451         // a request entry can be removed if a new phase event occurs
452         // between the enable request and response, so ignore the response
453         return;
454     }
455     assert(daemon_id < paradynDaemon::allDaemons.size());
456     paradynDaemon *pd = paradynDaemon::allDaemons[daemon_id];
457
458     // for each mi in request update mi's components with new daemon if
459     // it was successfully enabled
460     assert(mi_ids.size() == return_id.size());
461     for(u_int j=0; j< return_id.size(); j++){
462         if(return_id[j] != -1){
463             metricInstanceHandle mh =  mi_ids[j];     
464             metricInstance *mi = request_entry->findMI(mh);
465             assert(mi);
466             component *comp = new component(pd,return_id[j], mi);  
467             bool aflag;
468             aflag=(mi->addComponent(comp));
469             assert(aflag);
470             // if at least one daemon could enable, update done and enabled
471             request_entry->setDone(mh);
472     } }
473
474     // update count of outstanding daemon responses
475     assert(request_entry->how_many);
476     request_entry->how_many--;
477
478     // all daemons have responded to enable request, send result to caller 
479     if(!request_entry->how_many) { 
480         vector<bool> successful( request_entry->request->size());
481         for(u_int k=0; k < request_entry->request->size(); k++){
482             // if MI is 0 or if done is false
483             if(!((*(request_entry->done))[k]) 
484                 || !((*(request_entry->request))[k])){
485                 successful[k] = false;
486             }
487             else {
488                 successful[k] = true;
489             }
490         }
491         // if all daemons have responded update state for request and send
492         // result to caller
493         // a successful enable has both the enabled flag set and an mi*
494
495         // clear currentlyEnabling flag and decrement the count of 
496         // waiting enables for all MI's
497         for(u_int i1=0; i1 < request_entry->done->size(); i1++){
498             if((*request_entry->request)[i1]){
499                 ((*request_entry->request)[i1])->clearCurrentlyEnabling();
500                 if(request_entry->ph_type == CurrentPhase){
501                     ((*request_entry->request)[i1])->decrCurrWaiting();
502                 }
503                 else{
504                     ((*request_entry->request)[i1])->decrGlobalWaiting();
505                 }
506         } }
507
508         // update MI state for this entry and send response to caller
509         DMenableResponse(*request_entry,successful);
510         
511
512         // remove this entry from the outstanding enables list 
513         u_int size = paradynDaemon::outstanding_enables.size();
514         paradynDaemon::outstanding_enables[which] =
515                          paradynDaemon::outstanding_enables[size-1];
516                 paradynDaemon::outstanding_enables.resize(size-1);
517
518         // for each element on outstanding_enables, check to see if there are
519         // any outstatnding_enables that can be satisfied by this request
520         // if so, update state, and for any outstanding_enables that are 
521         // complete, send the result to the client thread  
522         // update not_all_done 
523         for(u_int i2=0; i2 < paradynDaemon::outstanding_enables.size(); i2++){
524             DM_enableType *next_entry = paradynDaemon::outstanding_enables[i2];
525             next_entry->updateAny(*(request_entry->request),successful);
526         }
527         delete request_entry;
528         request_entry = 0;
529
530         if(paradynDaemon::outstanding_enables.size()){
531           bool done = false;
532           u_int i3 = 0;
533           while(!done){
534               if((paradynDaemon::outstanding_enables[i3])->not_all_done){
535                   i3++;
536               }
537               else {  // this entry's request is complete
538                    // update MI state for this entry and send response to caller
539                    DM_enableType *temp = paradynDaemon::outstanding_enables[i3];
540                    successful.resize(temp->request->size());
541                    for(u_int k2=0; k2 < successful.size(); k2++){
542                        if(!((*(temp->done))[k2])) successful[k2] = false;
543                        else successful[k2] = true;
544                    }
545                    // decrement the number of waiting for enables for 
546                    // each MI in this response
547                    for(u_int k3=0; k3 < temp->request->size(); k3++){
548                        if((*temp->request)[k3]){
549                            if(temp->ph_type == CurrentPhase){
550                                ((*temp->request)[k3])->decrCurrWaiting();
551                            }
552                            else{
553                               ((*temp->request)[k3])->decrGlobalWaiting();
554                            }
555                    } }
556
557                    DMenableResponse(*temp,successful);
558
559                    // remove entry from outstanding_enables list
560                    u_int newsize=paradynDaemon::outstanding_enables.size()-1;
561                    paradynDaemon::outstanding_enables[i3] =
562                          paradynDaemon::outstanding_enables[newsize];
563                    paradynDaemon::outstanding_enables.resize(newsize);
564                    delete temp;
565               }
566               if(i3 >= paradynDaemon::outstanding_enables.size()) done = true;
567           }
568         }
569     }
570 }
571
572 //
573 // Upcall from daemon in response to getPredictedDataCost call
574 // id - perfStreamHandle assoc. with the call
575 // req_id - an identifier assoc. with the request 
576 // val - the cost of enabling the metric/focus pair
577 //
578 void dynRPCUser::getPredictedDataCostCallback(u_int id,
579                                               u_int req_id,
580                                               float val,
581                                               u_int clientID)
582 {
583     // find the assoc. perfStream and update it's pred data cost value
584     dictionary_hash_iter<perfStreamHandle,performanceStream*> 
585                 allS(performanceStream::allStreams);
586     perfStreamHandle h; performanceStream *ps;
587     while(allS.next(h,ps)){
588         if(h == (perfStreamHandle)id){
589             ps->predictedDataCostCallback(req_id,val,clientID);
590             return;
591     } }
592     // TODO: call correct routine
593     assert(0);
594 }
595
596 //
597 // Display errors using showError function from the UIM class
598 // This function allows to display error messages from paradynd
599 // using the "upcall" or "call back" mechanism.
600 // Parameters:  errCode = Error code
601 //              errString = Error message
602 //              hostName = Host name where the error occur
603 // Call: there is a macro defined in "showerror.h". This macro must be
604 //       used when calling this function. A typical call is:
605 //       showErrorCallback(99, "Erro message test"). This macro will
606 //       automatically insert the additional host info required.
607 //
608 void dynRPCUser::showErrorCallback(int errCode, 
609                                    string errString,
610                                    string hostName)
611 {
612     string msg;
613
614     if (errString.length() > 0) {
615         if (hostName.length() > 0) {
616             msg = string("<Msg from daemon on host ") + hostName + 
617                   string("> ") + errString;
618         }
619         else { 
620             msg = string("<Msg from daemon on host ?> ") + errString; 
621         }
622         uiMgr->showError(errCode, P_strdup(msg.string_of()));
623     }
624     else {
625         uiMgr->showError(errCode, ""); 
626     }
627
628     //
629     // hostName.length() should always be > 0, otherwise
630     // hostName is not defined (i.e. "?" will be used instead).
631     // if errString.length()==0, (i.e. errString.string_of()==""),
632     // then we will use the default error message in errorList.tcl
633     // This message, however, will not include any info about the current
634     // host name.
635     //
636 }
637
638 //
639 // Paradynd calls this igen fn when it starts a new process (more
640 // specifically, after it starts the new process and the new process
641 // has completed running DYNINSTinit).
642 //
643 void dynRPCUser::newProgramCallbackFunc(int pid,
644                                         vector<string> argvString,
645                                         string machine_name,
646                                         bool calledFromExec,
647                                         bool runMe)
648 {
649     // there better be a paradynd running on this machine!
650
651     for (unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++) {
652         paradynDaemon *pd = paradynDaemon::allDaemons[i];
653         if (pd->machine.length() && (pd->machine == machine_name)){
654             if (!paradynDaemon::addRunningProgram(pid, argvString, pd, calledFromExec,
655                                                   runMe))
656                assert(false);
657
658             uiMgr->enablePauseOrRun();
659
660             return;
661         }
662     }
663
664     // for now, abort if there is no paradynd, this should not happen
665     printf("process started on %s, can't find paradynd there\n",
666            machine_name.string_of());
667     printf("paradyn error #1 encountered\n");
668     exit(-1);
669 }
670
671 void dynRPCUser::newMetricCallback(T_dyninstRPC::metricInfo info)
672 {
673     addMetric(info);
674 }
675
676 void dynRPCUser::firstSampleCallback (int,double) {
677
678   assert(0 && "Invalid virtual function");
679 }
680
681 void dynRPCUser::cpDataCallbackFunc(int,double,int,double,double)
682 {
683     assert(0 && "Invalid virtual function");
684 }
685
686 // batch the sample delivery
687 void dynRPCUser::batchSampleDataCallbackFunc(int,
688                     vector<T_dyninstRPC::batch_buffer_entry>)
689 {
690     assert(0 && "Invalid virtual function");
691 }
692
693 // batch the trace delivery
694 void dynRPCUser::batchTraceDataCallbackFunc(int,
695                     vector<T_dyninstRPC::trace_batch_buffer_entry>)
696 {
697     assert(0 && "Invalid virtual function");
698 }
699
700 //
701 // When a paradynd is started remotely, ie not by paradyn, this upcall
702 // reports the information for that paradynd to paradyn
703 //
704 void 
705 dynRPCUser::reportSelf (string , string , int , string)
706 {
707   assert(0);
708   return;
709 }
710
711 void 
712 dynRPCUser::reportStatus (string)
713 {
714     assert(0 && "Invalid virtual function");
715 }
716
717 void
718 dynRPCUser::processStatus(int, u_int)
719 {
720     assert(0 && "Invalid virtual function");
721 }
722
723 void
724 dynRPCUser::endOfDataCollection(int)
725 {
726   assert(0 && "Invalid virtual function");
727 }
728
729
730 // 
731 // establish socket that will be advertised to paradynd's
732 // this socket will allow paradynd's to connect to paradyn for pvm
733 //
734 static void
735 DMsetupSocket (int &sockfd)
736 {
737   // setup "well known" socket for pvm paradynd's to connect to
738   bool aflag;
739   aflag = ((dataManager::dm->socket =
740            RPC_setup_socket (sockfd, AF_INET, SOCK_STREAM)) >= 0);
741   assert(aflag);
742
743   // bind fd for this thread
744   msg_bind (sockfd, true);
745 }
746
747
748 void dataManager::printDaemonStartInfo() {
749   string msg = string("To start a paradyn daemon on a remote machine, logon to that machine and run paradynd with the following arguments:\n\n")
750     + string(" paradynd -p") + string(dataManager::dm->socket) + string(" -m")
751     + getHostName()
752     + string(" -l2 -v1 -z<flavor>")
753     + string("\n\nwhere flavor is the one of: unix, pvm, winnt, mpi\n")
754     + string("\nNote: paradyn daemons are usually started automatically.\nManual start-up is needed only when an rshd or rexecd is not available on the remote machine.\n");
755
756   static char buf[1000];
757   sprintf(buf, "%s", msg.string_of());
758   uiMgr->showError(99, buf);
759   //fprintf(stderr, msg.string_of());
760 }
761
762 static void
763 DMnewParadynd ()
764 {
765   // accept the connection
766   int new_fd = RPC_getConnect(dataManager::dm->sock_fd);
767   if (new_fd < 0)
768     uiMgr->showError(4, "");
769
770   // add new daemon to dictionary of all deamons
771   paradynDaemon::addDaemon(new_fd); 
772 }
773
774 bool dataManager::DM_sequential_init(const char* met_file){
775    string mfile = met_file;
776    return(metMain(mfile)); 
777 }
778
779 int dataManager::DM_post_thread_create_init(int tid) {
780
781
782     thr_name("Data Manager");
783     dataManager::dm = new dataManager(tid);
784
785     // supports argv passed to paradynDaemon
786     // new paradynd's may try to connect to well known port
787     DMsetupSocket (dataManager::dm->sock_fd);
788
789     bool aflag;
790     aflag=(RPC_make_arg_list(paradynDaemon::args,
791                              dataManager::dm->socket, 1, 1, "", false));
792     assert(aflag);
793
794     // start initial phase
795     string dm_phase0 = "phase_0";
796     phaseInfo::startPhase(0.0,dm_phase0,false,false);
797
798     char DMbuff[64];
799     unsigned int msgSize = 64;
800     msg_send (MAINtid, MSG_TAG_DM_READY, (char *) NULL, 0);
801     unsigned int tag = MSG_TAG_ALL_CHILDREN_READY;
802     msg_recv (&tag, DMbuff, &msgSize);
803     return 1;
804 }
805
806 //
807 // Main loop for the dataManager thread.
808 //
809 void *DMmain(void* varg)
810 {
811     unsigned fd_first = 0;
812     // We declare the "printChangeCollection" tunable constant here; it will
813     // last for the lifetime of this function, which is pretty much forever.
814     // (used to be declared as global in DMappContext.C.  Globally declared
815     //  tunables are now a no-no).  Note that the variable name (printCC) is
816     // unimportant.   -AT
817     tunableBooleanConstantDeclarator printCC("printChangeCollection", 
818               "Print the name of metric/focus when enabled or disabled",
819               false, // initial value
820               NULL, // callback
821               developerConstant);
822
823     // Now the same for "printSampleArrival"
824     extern bool our_print_sample_arrival;
825     our_print_sample_arrival = false;
826     extern void printSampleArrivalCallback(bool);
827     tunableBooleanConstantDeclarator printSA("printSampleArrival", 
828               "Print out status lines to show the arrival of samples",
829               our_print_sample_arrival, // init val
830               printSampleArrivalCallback,
831               developerConstant);
832
833     int tid; memcpy((void*)&tid,varg, sizeof(int));
834     dataManager::DM_post_thread_create_init(tid);
835
836     int ret;
837     unsigned int tag;
838     paradynDaemon *pd = NULL;
839     while (1) {
840         for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
841             pd = paradynDaemon::allDaemons[i]; 
842             // handle up to max async requests that may have been buffered
843             // while blocking on a sync request
844             while (pd->buffered_requests()){
845                 if(pd->process_buffered() == T_dyninstRPC::error) {
846                     cout << "error on paradyn daemon\n";
847                     paradynDaemon::removeDaemon(pd, true);
848         } } }
849
850         tag = MSG_TAG_ANY;
851         // ret = msg_poll(&tag, true);
852         ret = msg_poll_preference(&tag, true,fd_first);
853         fd_first = !fd_first;
854         assert(ret != THR_ERR);
855
856         if (tag == MSG_TAG_FILE) {
857             // must be an upcall on something speaking the dynRPC protocol.
858             if (ret == dataManager::dm->sock_fd){
859                 DMnewParadynd(); // set up a new daemon
860             }
861             else {
862                 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
863                     pd = paradynDaemon::allDaemons[i]; 
864                     if(pd->get_fd() == ret){
865                         if(pd->waitLoop() == T_dyninstRPC::error) {
866                             cout << "error on paradyn daemon\n";
867                             paradynDaemon::removeDaemon(pd, true);
868                     }}
869
870                     // handle async requests that may have been buffered
871                     // while blocking on a sync request
872                     while(pd->buffered_requests()){
873                         if(pd->process_buffered() == T_dyninstRPC::error) {
874                             cout << "error on paradyn daemon\n";
875                             paradynDaemon::removeDaemon(pd, true);
876                     }}
877                 }
878             }
879         } else if (dataManager::dm->isValidTag
880                   ((T_dataManager::message_tags)tag)) {
881             if (dataManager::dm->waitLoop(true, 
882                (T_dataManager::message_tags)tag) == T_dataManager::error) {
883                 // handle error
884                 assert(0);
885             }
886         } else {
887             cerr << "Unrecognized message in DMmain.C\n";
888             assert(0);
889         }
890    }
891 }
892
893
894 void addMetric(T_dyninstRPC::metricInfo &info)
895 {
896     // if metric already exists return
897     if(metric::allMetrics.defines(info.name)){
898         return;
899     }
900     metric *met = new metric(info);
901
902     // now tell all perfStreams
903     dictionary_hash_iter<perfStreamHandle,performanceStream*> 
904                 allS(performanceStream::allStreams);
905     perfStreamHandle h;
906     performanceStream *ps;
907     while(allS.next(h,ps)){
908         if(ps->controlFunc.mFunc){
909             // set the correct destination thread.
910             dataManager::dm->setTid(ps->threadId);
911             dataManager::dm->newMetricDefined(ps->controlFunc.mFunc, 
912                                               ps->Handle(),
913                                               met->getName(),
914                                               met->getStyle(),
915                                               met->getAggregate(),
916                                               met->getUnits(),
917                                               met->getHandle(),
918                                               met->getUnitsType());
919         }
920     }
921 }
922
923
924 // I don't want to parse for '/' more than once, thus the use of a string vector
925 resourceHandle createResource(unsigned res_id, vector<string>& resource_name, string& abstr, unsigned type) {
926
927   static const string slashStr = "/";
928   static const string baseStr = "BASE";
929
930   resource *parent = NULL;
931   unsigned r_size = resource_name.size();
932   string p_name;
933
934
935   switch (r_size) {
936     case 0:
937         // Should this case ever occur ?
938         assert(0); break;
939     case 1:
940         parent = resource::rootResource; break;
941     default:
942         for (unsigned ri=0; ri<(r_size-1); ri++) 
943             p_name += slashStr + resource_name[ri];
944         parent = resource::string_to_resource(p_name);
945         assert(parent);
946         break;
947     }
948     if (!parent) assert(0);
949
950
951     /* first check to see if the resource has already been defined */
952     resource *p = resource::resources[parent->getHandle()];
953     string myName = p_name;
954     myName += slashStr;
955     myName += resource_name[r_size - 1];
956
957     resource *child;
958     if (resource::allResources.find(myName.string_of(), child)) {
959       return child->getHandle();
960     }
961
962     // if abstr is not defined then use default abstraction 
963     if(!abstr.string_of()){
964         abstr = baseStr;
965     }
966
967     /* then create it */
968     resource *ret =  new resource(parent->getHandle(),res_id, resource_name,
969                                   myName,abstr, type);
970
971     // check to see if the suppressMagnify option should be set...if
972     // this resource is specifed in the mdl exclude_lib option
973     vector<string> shared_lib_constraints;
974     if(resource::get_lib_constraints(shared_lib_constraints) &&
975        (string(parent->getFullName()) == "/Code")) {
976             for(u_int i=0; i < shared_lib_constraints.size(); i++){
977                 if(shared_lib_constraints[i] == ret->getName()){
978                     ret->setSuppressMagnify();
979                 }
980             }
981     }
982
983     // check to see if the suppressMagnify option should be set if
984     // the resource is a function that is specified in the mdl exclude_func
985     // options
986     if(!ret->isMagnifySuppressed()){
987       if(parent != resource::rootResource) {
988           // get parent of parent, if it is "/Code" then check the 
989           // list of exculded_funcs
990           resourceHandle pph = parent->getParent();
991           resource *ppr = resource::handle_to_resource(pph);
992           if( ppr && (string(ppr->getFullName()) == "/Code")) {
993               vector< vector<string> > libs;
994               if(resource::get_func_constraints(libs)) {
995                 for(u_int i=0; i < libs.size(); i++){
996                     if(((libs[i])[0] == parent->getName()) && 
997                        ((libs[i])[1] == ret->getName())) { 
998                        ret->setSuppressMagnify(); 
999                     }
1000                 } 
1001               } 
1002           }
1003       }
1004     }
1005
1006     /* inform others about it if they need to know */
1007     dictionary_hash_iter<perfStreamHandle,performanceStream*> 
1008                         allS(performanceStream::allStreams);
1009     perfStreamHandle h;
1010     performanceStream *ps;
1011     resourceHandle r_handle = ret->getHandle();
1012     string name = ret->getFullName(); 
1013     while(allS.next(h,ps)){
1014         ps->callResourceFunc(parent->getHandle(),r_handle,ret->getFullName(),
1015         ret->getAbstractionName());
1016     }
1017     return(r_handle);
1018 }
1019
1020 resourceHandle createResource_ncb(vector<string>& resource_name, string& abstr, unsigned type,
1021                                   resourceHandle &p_handle, bool &exist
1022                                  ) 
1023 {
1024   resource *parent = NULL;
1025   unsigned r_size = resource_name.size();
1026   string p_name;
1027
1028
1029   switch (r_size) {
1030     case 0:
1031         // Should this case ever occur ?
1032         assert(0); break;
1033     case 1:
1034         parent = resource::rootResource; break;
1035     default:
1036         for (unsigned ri=0; ri<(r_size-1); ri++) 
1037             p_name += string("/") + resource_name[ri];
1038         parent = resource::string_to_resource(p_name);
1039         assert(parent);
1040         break;
1041     }
1042     if (!parent) assert(0);
1043
1044
1045     /* first check to see if the resource has already been defined */
1046     p_handle = parent->getHandle() ;
1047     resource *p = resource::resources[parent->getHandle()];
1048     string myName = p_name;
1049     myName += "/";
1050     myName += resource_name[r_size - 1];
1051     if(!exist) {
1052         resourceHandle *child = p->findChild(myName.string_of());
1053         if (child){
1054                 return(*child); 
1055                 delete child;
1056         }
1057     } else {
1058         exist = false ;
1059     }
1060
1061     // if abstr is not defined then use default abstraction 
1062     if(!abstr.string_of()){
1063         abstr = string("BASE");
1064     }
1065
1066     /* then create it */
1067     resource *ret =  new resource(parent->getHandle(),resource_name,
1068                                   myName,abstr, type);
1069
1070     resourceHandle r_handle = ret->getHandle() ;
1071     return(r_handle);
1072 }
1073
1074 void newSampleRate(float rate)
1075 {
1076     paradynDaemon *pd = NULL;
1077     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
1078         pd = paradynDaemon::allDaemons[i]; 
1079         pd->setSampleRate(rate);
1080     }
1081 }
1082
1083 #ifdef ndef
1084 // Note - the metric parser has been moved into the dataManager
1085 bool parse_metrics(string metric_file) {
1086      bool parseResult = metMain(metric_file);
1087     return parseResult;
1088 }
1089 #endif
1090
1091