Changed way paradyn and paradynd generate id's for resources
[dyninst.git] / paradyn / src / DMthread / DMmain.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 #include <assert.h>
43 extern "C" {
44 double   quiet_nan();
45 #include <malloc.h>
46 #include <stdio.h>
47 }
48
49 #include "thread/h/thread.h"
50 #include "paradyn/src/TCthread/tunableConst.h"
51 #include "dataManager.thread.SRVR.h"
52 #include "dyninstRPC.xdr.CLNT.h"
53 #include "DMdaemon.h"
54 #include "DMmetric.h"
55 #include "DMperfstream.h"
56 #include "DMabstractions.h"
57 #include "paradyn/src/pdMain/paradyn.h"
58 #include "paradyn/src/UIthread/Status.h"
59
60 #include "util/h/Vector.h"
61 #include "util/h/Dictionary.h"
62 #include "util/h/String.h"
63 // trace data streams
64 #include "util/h/ByteArray.h"
65 #include "DMphase.h"
66
67 typedef vector<string> blahType;
68
69 // bool parse_metrics(string metric_file);
70 bool metMain(string &userFile);
71
72 // this has to be declared before baseAbstr, cmfAbstr, and rootResource 
73 int dataManager::sock_fd;  
74 int dataManager::socket;  
75 dataManager *dataManager::dm = NULL;  
76
77 dictionary_hash<string,abstraction*> abstraction::allAbstractions(string::hash);
78 abstraction *baseAbstr = new abstraction("BASE");
79 abstraction *cmfAbstr = new abstraction("CMF");
80
81 dictionary_hash<string,metric*> metric::allMetrics(string::hash);
82 dictionary_hash<metricInstanceHandle,metricInstance *> 
83                 metricInstance::allMetricInstances(metricInstance::mhash);
84 dictionary_hash<perfStreamHandle,performanceStream*>  
85                 performanceStream::allStreams(performanceStream::pshash);
86 dictionary_hash<string, resource*> resource::allResources(string::hash, 8192);
87 dictionary_hash<string,resourceList *> resourceList::allFoci(string::hash);
88
89 dictionary_hash<unsigned, resource*>resource::resources(uiHash);
90 vector<string> resource::lib_constraints;
91 vector<metric*> metric::metrics;
92 vector<paradynDaemon*> paradynDaemon::allDaemons;
93 vector<daemonEntry*> paradynDaemon::allEntries;
94 vector<executable*> paradynDaemon::programs;
95 unsigned paradynDaemon::procRunning;
96 vector<resourceList *> resourceList::foci;
97 vector<phaseInfo *> phaseInfo::dm_phases;
98 u_int metricInstance::next_id = 1;
99 // u_int performanceStream::next_id = 0;
100 vector<DM_enableType*> paradynDaemon::outstanding_enables;  
101 u_int paradynDaemon::next_enable_id = 0;  
102 u_int paradynDaemon::count = 0;
103
104 // to distinguish the enableDataRequest calls only for samples 
105 // from those for both samples and traces 
106 // 0 is reserved for non-trace use
107 u_int performanceStream::next_id = 1;
108
109 resource *resource::rootResource = new resource();
110 timeStamp metricInstance::curr_bucket_width;
111 timeStamp metricInstance::global_bucket_width;
112 phaseHandle metricInstance::curr_phase_id;
113 u_int metricInstance::num_curr_hists = 0;
114 u_int metricInstance::num_global_hists = 0;
115
116 double paradynDaemon::earliestFirstTime = 0;
117 void newSampleRate(float rate);
118
119 extern void histDataCallBack(sampleValue*, timeStamp, int, int, void*, bool);
120 extern void histFoldCallBack(timeStamp, void*, bool);
121
122 //
123 // IO from application processes.
124 //
125 void dynRPCUser::applicationIO(int,int,string data)
126 {
127
128     // NOTE: this fixes a purify error with the commented out code (a memory
129     // segment error occurs occasionally with the line "cout << rest << endl") 
130     // this is problably not the best fix,  but I can't figure out why 
131     // the error is occuring (rest is always '\0' terminated when this
132     // error occurs)---tn 
133     fprintf(stdout,data.string_of());
134     fflush(stdout);
135
136 #ifdef n_def
137     char *ptr;
138     char *rest;
139     // extra should really be per process.
140     static string extra;
141
142     rest = P_strdup(data.string_of());
143
144     char *tp = rest;
145     ptr = P_strchr(rest, '\n');
146     while (ptr) {
147         *ptr = '\0';
148         if (pid) {
149             printf("pid %d:", pid);
150         } else {
151             printf("paradynd: ");
152         }
153         if (extra.length()) {
154             cout << extra;
155             extra = (char*) NULL;
156         }
157         cout << rest << endl;
158         rest = ptr+1;
159         if(rest)
160             ptr = P_strchr(rest, '\n');
161         else
162             ptr = 0;
163     }
164     extra += rest;
165     delete tp;
166     rest = 0;
167 #endif
168
169 }
170
171 extern status_line *DMstatus;
172
173 void dynRPCUser::resourceBatchMode(bool onNow)
174 {
175    printf("error calling virtual func: dynRPCUser::resourceBatchMode\n");
176 }
177
178 //
179 // upcalls from remote process.
180 //
181 void dynRPCUser::resourceInfoCallback(u_int , vector<string> ,
182                                       string , u_int) {
183
184 printf("error calling virtual func: dynRPCUser::resourceInfoCallback\n");
185
186 }
187
188 void dynRPCUser::memoryInfoCallback(int,
189                                     string vname,
190                                     int va,
191                                     u_int mem_size,
192                                     u_int blk_size)
193 {//TO DO
194    string       abstr = "BASE";
195    u_int        type = MDL_T_INT;
196    int          end =  va + mem_size ;
197    int          start = va ;
198    vector<resourceHandle> handles, whereHandles ;
199    resourceHandle         p_handle, r_handle ;
200    int                    num_blks = 0 ;
201
202    printf("Paradyn received: (var:%s, va:%d, mem_size:%d, blk_size:%d)\n",
203               vname.string_of(), va, mem_size, blk_size) ;
204
205    vector<string> res_name;
206    res_name += "Memory" ; res_name += vname ;
207    bool exist = false ;
208    r_handle = createResource_ncb(res_name, abstr, MDL_T_VARIABLE, p_handle, exist);
209    handles += r_handle ;
210
211    /* inform others about it if they need to know */
212    if(!exist)
213    {
214     char temp[255] ;
215     sprintf(temp, "Memory/%s", vname.string_of()) ;
216     const char *name = strdup(temp) ;
217     const char *abs  = strdup(abstr.string_of()) ;
218     dictionary_hash_iter<perfStreamHandle,performanceStream*>
219                         allS(performanceStream::allStreams);
220     perfStreamHandle h;
221     performanceStream *ps;
222     while(allS.next(h,ps)){
223         ps->callResourceFunc(p_handle,r_handle,name,abs);
224     }
225    }
226
227    while (va < end)
228    {
229         char temp[255] ;
230         vector<string> res_name;
231         res_name += "Memory" ; res_name += vname ;
232
233         sprintf(temp, "%d", (int) va) ;
234         res_name += temp ;
235         exist = true; /* we do not want to search to duplication */
236         r_handle = createResource_ncb(res_name, abstr, type, p_handle, exist);
237         handles += r_handle ;
238         whereHandles += r_handle ;
239
240         va += blk_size ;
241         num_blks ++ ;
242    }
243    /* inform the daemon of the things it needs to know */
244    /* should send this to all daemons, not just one  */
245    {
246     for(u_int j=0; j < paradynDaemon::allDaemons.size(); j++){
247                 paradynDaemon *pd = paradynDaemon::allDaemons[j];
248                 pd->memoryInfoResponse(vname, start, mem_size, blk_size, handles
249 ) ;
250     }
251    }
252
253    /* inform others about it if they need to know */
254    {
255     dictionary_hash_iter<perfStreamHandle,performanceStream*>
256                         allS(performanceStream::allStreams);
257     perfStreamHandle h;
258     performanceStream *ps;
259     while(allS.next(h,ps)){
260         ps->callMemoryFunc(vname, start, mem_size, blk_size, p_handle, whereHandles);
261     }
262    }
263 }
264
265
266 void dynRPCUser::mappingInfoCallback(int,
267                                      string abstraction, 
268                                      string type, 
269                                      string key,
270                                      string value)
271 {
272   AMnewMapping(abstraction.string_of(),type.string_of(),key.string_of(),
273                value.string_of());    
274 }
275
276 class uniqueName {
277   public:
278     uniqueName(stringHandle base) { name = base; nextId = 0; }
279     int nextId;
280     stringHandle name;
281 };
282
283 //
284 // handles a completed enable response: updates metricInstance state
285 // and send the calling thread the response 
286 //
287 void DMenableResponse(DM_enableType &enable,vector<bool> &successful){
288     
289
290     vector<metricInstance *> &mis = (*enable.request);
291     assert(successful.size() == mis.size());
292     vector<metricInstInfo> *response = new vector<metricInstInfo>(mis.size()); 
293
294     // update MI state and response vector
295     for(u_int i=0; i < mis.size(); i++){
296         if(mis[i] && successful[i]){  // this MI could be enabled
297           mis[i]->setEnabled();
298           metric *metricptr = metric::getMetric(mis[i]->getMetricHandle());
299           if(metricptr){
300
301             if(enable.ph_type == CurrentPhase){
302                 u_int old_current = mis[i]->currUsersCount();
303                 bool  current_data = mis[i]->isCurrHistogram();
304                 mis[i]->newCurrDataCollection(metricptr->getStyle(),
305                                               histDataCallBack,
306                                               histFoldCallBack);
307                 mis[i]->newGlobalDataCollection(metricptr->getStyle(),
308                                                 histDataCallBack,
309                                                 histFoldCallBack);
310                 mis[i]->addCurrentUser(enable.ps_handle);
311
312                 // trace data streams
313                 mis[i]->newTraceDataCollection(traceDataCallBack);
314                 mis[i]->addTraceUser(enable.pt_handle);
315
316                 // set sample rate to match current phase hist. bucket width
317                 if(!metricInstance::numCurrHists()){
318                     float rate = phaseInfo::GetLastBucketWidth();
319                     newSampleRate(rate);
320                 }
321                 // new active curr. histogram added if there are no previous
322                 // curr. subscribers and either persistent_collection is clear
323                 // or there was no curr. histogram prior to this
324                 if((!old_current)
325                     && (mis[i]->currUsersCount() == 1) && 
326                     (!(mis[i]->isCollectionPersistent()) || (!current_data))){
327                     metricInstance::incrNumCurrHists();
328                 }
329                 // new global histogram if this metricInstance was just enabled
330                 if(!((*enable.enabled)[i])){
331                     metricInstance::incrNumGlobalHists();
332                 }
333             }
334             else {  // this is a global phase enable
335                 mis[i]->newGlobalDataCollection(metricptr->getStyle(),
336                                                 histDataCallBack,
337                                                 histFoldCallBack);
338                 mis[i]->addGlobalUser(enable.ps_handle);
339
340                 // trace data streams
341                 mis[i]->newTraceDataCollection(traceDataCallBack);
342                 mis[i]->addTraceUser(enable.pt_handle);
343
344                 // if this is first global histogram enabled and there are no
345                 // curr hists, then set sample rate to global bucket width
346                 if(!metricInstance::numCurrHists()){
347                     if(!metricInstance::numGlobalHists()){
348                         float rate = Histogram::getGlobalBucketWidth();
349                         newSampleRate(rate);
350                 }}
351                 // new global hist added: update count
352                 if(!((*enable.enabled)[i])){
353                     metricInstance::incrNumGlobalHists();
354                 }
355             }
356             // update response vector
357             (*response)[i].successfully_enabled = true;
358             (*response)[i].mi_id = mis[i]->getHandle(); 
359             (*response)[i].m_id = mis[i]->getMetricHandle();
360             (*response)[i].r_id = mis[i]->getFocusHandle();
361             (*response)[i].metric_name = mis[i]->getMetricName();
362             (*response)[i].focus_name = mis[i]->getFocusName();
363             (*response)[i].metric_units = metricptr->getUnits();
364             (*response)[i].units_type = metricptr->getUnitsType();
365
366             // update the persistence flags: the OR of new & previous values
367             if(enable.persistent_data){
368                 mis[i]->setPersistentData();
369             }
370             if(enable.persistent_collection){
371                 mis[i]->setPersistentCollection();
372             }
373             if(enable.phase_persistent_data){
374                 mis[i]->setPhasePersistentData();
375             }
376           }
377           else {
378               cout << "mis enabled but no metric handle: " 
379                    << mis[i]->getMetricHandle() << endl;
380               assert(0);
381           }
382         }
383         else {  // was not successfully enabled
384             (*response)[i].successfully_enabled = false;
385             (*response)[i].mi_id = mis[i]->getHandle(); 
386             (*response)[i].m_id = mis[i]->getMetricHandle();
387             (*response)[i].r_id = mis[i]->getFocusHandle();
388             (*response)[i].metric_name = mis[i]->getMetricName();
389             (*response)[i].focus_name = mis[i]->getFocusName();
390         }
391
392 //        if(mis[i]) {
393 //          (*response)[i].mi_id = mis[i]->getHandle(); 
394 //          (*response)[i].m_id = mis[i]->getMetricHandle();
395 //          (*response)[i].r_id = mis[i]->getFocusHandle();
396 //          (*response)[i].metric_name = mis[i]->getMetricName();
397 //          (*response)[i].focus_name = mis[i]->getFocusName();
398 //        }
399     }
400
401     // make response call
402     dictionary_hash_iter<perfStreamHandle,performanceStream*>
403             allS(performanceStream::allStreams);
404     perfStreamHandle h; performanceStream *ps;
405     while(allS.next(h,ps)){
406         if(h == (perfStreamHandle)(enable.ps_handle)){
407             ps->callDataEnableFunc(response,enable.client_id);
408             return;
409     } }
410     // trace data streams
411     allS.reset();
412     while(allS.next(h,ps)){
413         if(h == (perfStreamHandle)(enable.pt_handle)){
414             ps->callDataEnableFunc(response,enable.client_id);
415             return;
416     } }
417     response = 0;
418 }
419
420 //
421 // handle an enable response from a daemon. If all daemons have responded
422 // then make response callback to calling thread, and check the outstanding
423 // enables list to see if this enable response satisfies any waiting requests.
424 // and enable for an MI is successful if its done entry is true and if its
425 // MI* is not 0
426 //
427 void dynRPCUser::enableDataCallback(u_int daemon_id, 
428                                     vector<int> return_id,
429                                     vector<u_int> mi_ids,
430                                     u_int request_id)
431 {
432     // find element in outstanding_enables corr. to request_id
433     u_int which =0;
434     DM_enableType *request_entry = 0;
435     for(u_int i=0; i < paradynDaemon::outstanding_enables.size(); i++){
436         if((paradynDaemon::outstanding_enables[i])->request_id == request_id){
437             which = i;
438             request_entry = paradynDaemon::outstanding_enables[i];
439             break;
440     } }
441
442     if(!request_entry){
443         // a request entry can be removed if a new phase event occurs
444         // between the enable request and response, so ignore the response
445         return;
446     }
447     assert(daemon_id < paradynDaemon::allDaemons.size());
448     paradynDaemon *pd = paradynDaemon::allDaemons[daemon_id];
449
450     // for each mi in request update mi's components with new daemon if
451     // it was successfully enabled
452     assert(mi_ids.size() == return_id.size());
453     for(u_int j=0; j< return_id.size(); j++){
454         if(return_id[j] != -1){
455             metricInstanceHandle mh =  mi_ids[j];     
456             metricInstance *mi = request_entry->findMI(mh);
457             assert(mi);
458             component *comp = new component(pd,return_id[j], mi);  
459             bool aflag;
460             aflag=(mi->addComponent(comp));
461             assert(aflag);
462             // if at least one daemon could enable, update done and enabled
463             request_entry->setDone(mh);
464     } }
465
466     // update count of outstanding daemon responses
467     assert(request_entry->how_many);
468     request_entry->how_many--;
469
470     // all daemons have responded to enable request, send result to caller 
471     if(!request_entry->how_many) { 
472         vector<bool> successful( request_entry->request->size());
473         for(u_int k=0; k < request_entry->request->size(); k++){
474             // if MI is 0 or if done is false
475             if(!((*(request_entry->done))[k]) 
476                 || !((*(request_entry->request))[k])){
477                 successful[k] = false;
478             }
479             else {
480                 successful[k] = true;
481             }
482         }
483         // if all daemons have responded update state for request and send
484         // result to caller
485         // a successful enable has both the enabled flag set and an mi*
486
487         // clear currentlyEnabling flag and decrement the count of 
488         // waiting enables for all MI's
489         for(u_int i1=0; i1 < request_entry->done->size(); i1++){
490             if((*request_entry->request)[i1]){
491                 ((*request_entry->request)[i1])->clearCurrentlyEnabling();
492                 if(request_entry->ph_type == CurrentPhase){
493                     ((*request_entry->request)[i1])->decrCurrWaiting();
494                 }
495                 else{
496                     ((*request_entry->request)[i1])->decrGlobalWaiting();
497                 }
498         } }
499
500         // update MI state for this entry and send response to caller
501         DMenableResponse(*request_entry,successful);
502         
503
504         // remove this entry from the outstanding enables list 
505         u_int size = paradynDaemon::outstanding_enables.size();
506         paradynDaemon::outstanding_enables[which] =
507                          paradynDaemon::outstanding_enables[size-1];
508                 paradynDaemon::outstanding_enables.resize(size-1);
509
510         // for each element on outstanding_enables, check to see if there are
511         // any outstatnding_enables that can be satisfied by this request
512         // if so, update state, and for any outstanding_enables that are 
513         // complete, send the result to the client thread  
514         // update not_all_done 
515         for(u_int i2=0; i2 < paradynDaemon::outstanding_enables.size(); i2++){
516             DM_enableType *next_entry = paradynDaemon::outstanding_enables[i2];
517             next_entry->updateAny(*(request_entry->request),successful);
518         }
519         delete request_entry;
520         request_entry = 0;
521
522         if(paradynDaemon::outstanding_enables.size()){
523           bool done = false;
524           u_int i3 = 0;
525           while(!done){
526               if((paradynDaemon::outstanding_enables[i3])->not_all_done){
527                   i3++;
528               }
529               else {  // this entry's request is complete
530                    // update MI state for this entry and send response to caller
531                    DM_enableType *temp = paradynDaemon::outstanding_enables[i3];
532                    successful.resize(temp->request->size());
533                    for(u_int k2=0; k2 < successful.size(); k2++){
534                        if(!((*(temp->done))[k2])) successful[k2] = false;
535                        else successful[k2] = true;
536                    }
537                    // decrement the number of waiting for enables for 
538                    // each MI in this response
539                    for(u_int k3=0; k3 < temp->request->size(); k3++){
540                        if((*temp->request)[k3]){
541                            if(temp->ph_type == CurrentPhase){
542                                ((*temp->request)[k3])->decrCurrWaiting();
543                            }
544                            else{
545                               ((*temp->request)[k3])->decrGlobalWaiting();
546                            }
547                    } }
548
549                    DMenableResponse(*temp,successful);
550
551                    // remove entry from outstanding_enables list
552                    u_int newsize=paradynDaemon::outstanding_enables.size()-1;
553                    paradynDaemon::outstanding_enables[i3] =
554                          paradynDaemon::outstanding_enables[newsize];
555                    paradynDaemon::outstanding_enables.resize(newsize);
556                    delete temp;
557               }
558               if(i3 >= paradynDaemon::outstanding_enables.size()) done = true;
559           }
560         }
561     }
562 }
563
564 //
565 // Upcall from daemon in response to getPredictedDataCost call
566 // id - perfStreamHandle assoc. with the call
567 // req_id - an identifier assoc. with the request 
568 // val - the cost of enabling the metric/focus pair
569 //
570 void dynRPCUser::getPredictedDataCostCallback(u_int id,
571                                               u_int req_id,
572                                               float val,
573                                               u_int clientID)
574 {
575     // find the assoc. perfStream and update it's pred data cost value
576     dictionary_hash_iter<perfStreamHandle,performanceStream*> 
577                 allS(performanceStream::allStreams);
578     perfStreamHandle h; performanceStream *ps;
579     while(allS.next(h,ps)){
580         if(h == (perfStreamHandle)id){
581             ps->predictedDataCostCallback(req_id,val,clientID);
582             return;
583     } }
584     // TODO: call correct routine
585     assert(0);
586 }
587
588 //
589 // Display errors using showError function from the UIM class
590 // This function allows to display error messages from paradynd
591 // using the "upcall" or "call back" mechanism.
592 // Parameters:  errCode = Error code
593 //              errString = Error message
594 //              hostName = Host name where the error occur
595 // Call: there is a macro defined in "showerror.h". This macro must be
596 //       used when calling this function. A typical call is:
597 //       showErrorCallback(99, "Erro message test"). This macro will
598 //       automatically insert the additional host info required.
599 //
600 void dynRPCUser::showErrorCallback(int errCode, 
601                                    string errString,
602                                    string hostName)
603 {
604     string msg;
605
606     if (errString.length() > 0) {
607         if (hostName.length() > 0) {
608             msg = string("<Msg from daemon on host ") + hostName + 
609                   string("> ") + errString;
610         }
611         else { 
612             msg = string("<Msg from daemon on host ?> ") + errString; 
613         }
614         uiMgr->showError(errCode, P_strdup(msg.string_of()));
615     }
616     else {
617         uiMgr->showError(errCode, ""); 
618     }
619
620     //
621     // hostName.length() should always be > 0, otherwise
622     // hostName is not defined (i.e. "?" will be used instead).
623     // if errString.length()==0, (i.e. errString.string_of()==""),
624     // then we will use the default error message in errorList.tcl
625     // This message, however, will not include any info about the current
626     // host name.
627     //
628 }
629
630 //
631 // Paradynd calls this igen fn when it starts a new process (more
632 // specifically, after it starts the new process and the new process
633 // has completed running DYNINSTinit).
634 //
635 void dynRPCUser::newProgramCallbackFunc(int pid,
636                                         vector<string> argvString,
637                                         string machine_name,
638                                         bool calledFromExec,
639                                         bool runMe)
640 {
641     // there better be a paradynd running on this machine!
642
643     for (unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++) {
644         paradynDaemon *pd = paradynDaemon::allDaemons[i];
645         if (pd->machine.length() && (pd->machine == machine_name)){
646             if (!paradynDaemon::addRunningProgram(pid, argvString, pd, calledFromExec,
647                                                   runMe))
648                assert(false);
649
650             uiMgr->enablePauseOrRun();
651
652             return;
653         }
654     }
655
656     // for now, abort if there is no paradynd, this should not happen
657     printf("process started on %s, can't find paradynd there\n",
658            machine_name.string_of());
659     printf("paradyn error #1 encountered\n");
660     exit(-1);
661 }
662
663 void dynRPCUser::newMetricCallback(T_dyninstRPC::metricInfo info)
664 {
665     addMetric(info);
666 }
667
668 void dynRPCUser::firstSampleCallback (int,double) {
669
670   assert(0 && "Invalid virtual function");
671 }
672
673 void dynRPCUser::cpDataCallbackFunc(int,double,int,double,double)
674 {
675     assert(0 && "Invalid virtual function");
676 }
677
678 // batch the sample delivery
679 void dynRPCUser::batchSampleDataCallbackFunc(int,
680                     vector<T_dyninstRPC::batch_buffer_entry>)
681 {
682     assert(0 && "Invalid virtual function");
683 }
684
685 // batch the trace delivery
686 void dynRPCUser::batchTraceDataCallbackFunc(int,
687                     vector<T_dyninstRPC::trace_batch_buffer_entry>)
688 {
689     assert(0 && "Invalid virtual function");
690 }
691
692 //
693 // When a paradynd is started remotely, ie not by paradyn, this upcall
694 // reports the information for that paradynd to paradyn
695 //
696 void 
697 dynRPCUser::reportSelf (string , string , int , string)
698 {
699   assert(0);
700   return;
701 }
702
703 void 
704 dynRPCUser::reportStatus (string)
705 {
706     assert(0 && "Invalid virtual function");
707 }
708
709 void
710 dynRPCUser::processStatus(int, u_int)
711 {
712     assert(0 && "Invalid virtual function");
713 }
714
715 void
716 dynRPCUser::endOfDataCollection(int)
717 {
718   assert(0 && "Invalid virtual function");
719 }
720
721
722 // 
723 // establish socket that will be advertised to paradynd's
724 // this socket will allow paradynd's to connect to paradyn for pvm
725 //
726 static void
727 DMsetupSocket (int &sockfd)
728 {
729   // setup "well known" socket for pvm paradynd's to connect to
730   bool aflag;
731   aflag = ((dataManager::dm->socket =
732            RPC_setup_socket (sockfd, AF_INET, SOCK_STREAM)) >= 0);
733   assert(aflag);
734
735   // bind fd for this thread
736   msg_bind (sockfd, true);
737 }
738
739 static void
740 DMnewParadynd ()
741 {
742   // accept the connection
743   int new_fd = RPC_getConnect(dataManager::dm->sock_fd);
744   if (new_fd < 0)
745     uiMgr->showError(4, "");
746
747   // add new daemon to dictionary of all deamons
748   paradynDaemon::addDaemon(new_fd); 
749 }
750
751 bool dataManager::DM_sequential_init(const char* met_file){
752    string mfile = met_file;
753    return(metMain(mfile)); 
754 }
755
756 int dataManager::DM_post_thread_create_init(int tid) {
757
758
759     thr_name("Data Manager");
760     dataManager::dm = new dataManager(tid);
761
762     // supports argv passed to paradynDaemon
763     // new paradynd's may try to connect to well known port
764     DMsetupSocket (dataManager::dm->sock_fd);
765
766     bool aflag;
767     aflag=(RPC_make_arg_list(paradynDaemon::args,
768                              dataManager::dm->socket, 1, 1, "", false));
769     assert(aflag);
770
771     // start initial phase
772     string dm_phase0 = "phase_0";
773     phaseInfo::startPhase(0.0,dm_phase0,false,false);
774
775     char DMbuff[64];
776     unsigned int msgSize = 64;
777     msg_send (MAINtid, MSG_TAG_DM_READY, (char *) NULL, 0);
778     unsigned int tag = MSG_TAG_ALL_CHILDREN_READY;
779     msg_recv (&tag, DMbuff, &msgSize);
780     return 1;
781 }
782
783 //
784 // Main loop for the dataManager thread.
785 //
786 void *DMmain(void* varg)
787 {
788     unsigned fd_first = 0;
789     // We declare the "printChangeCollection" tunable constant here; it will
790     // last for the lifetime of this function, which is pretty much forever.
791     // (used to be declared as global in DMappContext.C.  Globally declared
792     //  tunables are now a no-no).  Note that the variable name (printCC) is
793     // unimportant.   -AT
794     tunableBooleanConstantDeclarator printCC("printChangeCollection", 
795               "Print the name of metric/focus when enabled or disabled",
796               false, // initial value
797               NULL, // callback
798               developerConstant);
799
800     // Now the same for "printSampleArrival"
801     extern bool our_print_sample_arrival;
802     our_print_sample_arrival = false;
803     extern void printSampleArrivalCallback(bool);
804     tunableBooleanConstantDeclarator printSA("printSampleArrival", 
805               "Print out status lines to show the arrival of samples",
806               our_print_sample_arrival, // init val
807               printSampleArrivalCallback,
808               developerConstant);
809
810     int tid; memcpy((void*)&tid,varg, sizeof(int));
811     dataManager::DM_post_thread_create_init(tid);
812
813     int ret;
814     unsigned int tag;
815     paradynDaemon *pd = NULL;
816     while (1) {
817         for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
818             pd = paradynDaemon::allDaemons[i]; 
819             // handle up to max async requests that may have been buffered
820             // while blocking on a sync request
821             while (pd->buffered_requests()){
822                 if(pd->process_buffered() == T_dyninstRPC::error) {
823                     cout << "error on paradyn daemon\n";
824                     paradynDaemon::removeDaemon(pd, true);
825         } } }
826
827         tag = MSG_TAG_ANY;
828         // ret = msg_poll(&tag, true);
829         ret = msg_poll_preference(&tag, true,fd_first);
830         fd_first = !fd_first;
831         assert(ret != THR_ERR);
832
833         if (tag == MSG_TAG_FILE) {
834             // must be an upcall on something speaking the dynRPC protocol.
835             if (ret == dataManager::dm->sock_fd){
836                 DMnewParadynd(); // set up a new daemon
837             }
838             else {
839                 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
840                     pd = paradynDaemon::allDaemons[i]; 
841                     if(pd->get_fd() == ret){
842                         if(pd->waitLoop() == T_dyninstRPC::error) {
843                             cout << "error on paradyn daemon\n";
844                             paradynDaemon::removeDaemon(pd, true);
845                     }}
846
847                     // handle async requests that may have been buffered
848                     // while blocking on a sync request
849                     while(pd->buffered_requests()){
850                         if(pd->process_buffered() == T_dyninstRPC::error) {
851                             cout << "error on paradyn daemon\n";
852                             paradynDaemon::removeDaemon(pd, true);
853                     }}
854                 }
855             }
856         } else if (dataManager::dm->isValidTag
857                   ((T_dataManager::message_tags)tag)) {
858             if (dataManager::dm->waitLoop(true, 
859                (T_dataManager::message_tags)tag) == T_dataManager::error) {
860                 // handle error
861                 assert(0);
862             }
863         } else {
864             cerr << "Unrecognized message in DMmain.C\n";
865             assert(0);
866         }
867    }
868 }
869
870
871 void addMetric(T_dyninstRPC::metricInfo &info)
872 {
873     // if metric already exists return
874     if(metric::allMetrics.defines(info.name)){
875         return;
876     }
877     metric *met = new metric(info);
878
879     // now tell all perfStreams
880     dictionary_hash_iter<perfStreamHandle,performanceStream*> 
881                 allS(performanceStream::allStreams);
882     perfStreamHandle h;
883     performanceStream *ps;
884     while(allS.next(h,ps)){
885         if(ps->controlFunc.mFunc){
886             // set the correct destination thread.
887             dataManager::dm->setTid(ps->threadId);
888             dataManager::dm->newMetricDefined(ps->controlFunc.mFunc, 
889                                               ps->Handle(),
890                                               met->getName(),
891                                               met->getStyle(),
892                                               met->getAggregate(),
893                                               met->getUnits(),
894                                               met->getHandle(),
895                                               met->getUnitsType());
896         }
897     }
898 }
899
900
901 // I don't want to parse for '/' more than once, thus the use of a string vector
902 resourceHandle createResource(unsigned res_id, vector<string>& resource_name, string& abstr, unsigned type) {
903
904   static const string slashStr = "/";
905   static const string baseStr = "BASE";
906
907   resource *parent = NULL;
908   unsigned r_size = resource_name.size();
909   string p_name;
910
911
912   switch (r_size) {
913     case 0:
914         // Should this case ever occur ?
915         assert(0); break;
916     case 1:
917         parent = resource::rootResource; break;
918     default:
919         for (unsigned ri=0; ri<(r_size-1); ri++) 
920             p_name += slashStr + resource_name[ri];
921         parent = resource::string_to_resource(p_name);
922         assert(parent);
923         break;
924     }
925     if (!parent) assert(0);
926
927
928     /* first check to see if the resource has already been defined */
929     resource *p = resource::resources[parent->getHandle()];
930     string myName = p_name;
931     myName += slashStr;
932     myName += resource_name[r_size - 1];
933
934     resource *child;
935     if (resource::allResources.find(myName.string_of(), child)) {
936       return child->getHandle();
937     }
938
939     // if abstr is not defined then use default abstraction 
940     if(!abstr.string_of()){
941         abstr = baseStr;
942     }
943
944     /* then create it */
945     resource *ret =  new resource(parent->getHandle(),res_id, resource_name,
946                                   myName,abstr, type);
947
948     // check to see if the suppressMagnify option should be set...if
949     // this resource is specifed in the mdl exclude_lib option
950     vector<string> shared_lib_constraints;
951     if(resource::get_lib_constraints(shared_lib_constraints) &&
952        (string(parent->getFullName()) == "/Code")){
953             for(u_int i=0; i < shared_lib_constraints.size(); i++){
954                 if(shared_lib_constraints[i] == ret->getName()){
955                     ret->setSuppressMagnify();
956                 }
957             }
958     }
959
960     /* inform others about it if they need to know */
961     dictionary_hash_iter<perfStreamHandle,performanceStream*> 
962                         allS(performanceStream::allStreams);
963     perfStreamHandle h;
964     performanceStream *ps;
965     resourceHandle r_handle = ret->getHandle();
966     string name = ret->getFullName(); 
967     while(allS.next(h,ps)){
968         ps->callResourceFunc(parent->getHandle(),r_handle,ret->getFullName(),
969         ret->getAbstractionName());
970     }
971     return(r_handle);
972 }
973
974 resourceHandle createResource_ncb(vector<string>& resource_name, string& abstr, unsigned type,
975                                   resourceHandle &p_handle, bool &exist
976                                  ) 
977 {
978   resource *parent = NULL;
979   unsigned r_size = resource_name.size();
980   string p_name;
981
982
983   switch (r_size) {
984     case 0:
985         // Should this case ever occur ?
986         assert(0); break;
987     case 1:
988         parent = resource::rootResource; break;
989     default:
990         for (unsigned ri=0; ri<(r_size-1); ri++) 
991             p_name += string("/") + resource_name[ri];
992         parent = resource::string_to_resource(p_name);
993         assert(parent);
994         break;
995     }
996     if (!parent) assert(0);
997
998
999     /* first check to see if the resource has already been defined */
1000     p_handle = parent->getHandle() ;
1001     resource *p = resource::resources[parent->getHandle()];
1002     string myName = p_name;
1003     myName += "/";
1004     myName += resource_name[r_size - 1];
1005     if(!exist) {
1006         resourceHandle *child = p->findChild(myName.string_of());
1007         if (child){
1008                 return(*child); 
1009                 delete child;
1010         }
1011     } else {
1012         exist = false ;
1013     }
1014
1015     // if abstr is not defined then use default abstraction 
1016     if(!abstr.string_of()){
1017         abstr = string("BASE");
1018     }
1019
1020     /* then create it */
1021     resource *ret =  new resource(parent->getHandle(),resource_name,
1022                                   myName,abstr, type);
1023
1024     resourceHandle r_handle = ret->getHandle() ;
1025     return(r_handle);
1026 }
1027
1028 void newSampleRate(float rate)
1029 {
1030     paradynDaemon *pd = NULL;
1031     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
1032         pd = paradynDaemon::allDaemons[i]; 
1033         pd->setSampleRate(rate);
1034     }
1035 }
1036
1037 #ifdef ndef
1038 // Note - the metric parser has been moved into the dataManager
1039 bool parse_metrics(string metric_file) {
1040      bool parseResult = metMain(metric_file);
1041     return parseResult;
1042 }
1043 #endif
1044
1045