Added dynRPCUser::memoryInfoCallback, resourceHandle createResource_ncb
[dyninst.git] / paradyn / src / DMthread / DMmain.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 #include <assert.h>
43 extern "C" {
44 double   quiet_nan();
45 #include <malloc.h>
46 #include <stdio.h>
47 }
48
49 #include "thread/h/thread.h"
50 #include "paradyn/src/TCthread/tunableConst.h"
51 #include "dataManager.thread.SRVR.h"
52 #include "dyninstRPC.xdr.CLNT.h"
53 #include "DMdaemon.h"
54 #include "DMmetric.h"
55 #include "DMperfstream.h"
56 #include "DMabstractions.h"
57 #include "paradyn/src/pdMain/paradyn.h"
58 #include "paradyn/src/UIthread/Status.h"
59
60 #include "util/h/Vector.h"
61 #include "util/h/Dictionary.h"
62 #include "util/h/String.h"
63 #include "DMphase.h"
64
65 typedef vector<string> blahType;
66
67 // bool parse_metrics(string metric_file);
68 bool metMain(string &userFile);
69
70 // this has to be declared before baseAbstr, cmfAbstr, and rootResource 
71 int dataManager::sock_fd;  
72 int dataManager::socket;  
73 dataManager *dataManager::dm = NULL;  
74
75 dictionary_hash<string,abstraction*> abstraction::allAbstractions(string::hash);
76 abstraction *baseAbstr = new abstraction("BASE");
77 abstraction *cmfAbstr = new abstraction("CMF");
78
79 dictionary_hash<string,metric*> metric::allMetrics(string::hash);
80 dictionary_hash<metricInstanceHandle,metricInstance *> 
81                 metricInstance::allMetricInstances(metricInstance::mhash);
82 dictionary_hash<perfStreamHandle,performanceStream*>  
83                 performanceStream::allStreams(performanceStream::pshash);
84 dictionary_hash<string, resource*> resource::allResources(string::hash);
85 dictionary_hash<string,resourceList *> resourceList::allFoci(string::hash);
86
87 vector<resource*> resource::resources;
88 vector<string> resource::lib_constraints;
89 vector<metric*> metric::metrics;
90 vector<paradynDaemon*> paradynDaemon::allDaemons;
91 vector<daemonEntry*> paradynDaemon::allEntries;
92 vector<executable*> paradynDaemon::programs;
93 unsigned paradynDaemon::procRunning;
94 vector<resourceList *> resourceList::foci;
95 vector<phaseInfo *> phaseInfo::dm_phases;
96 u_int metricInstance::next_id = 1;
97 u_int performanceStream::next_id = 0;
98 vector<DM_enableType*> paradynDaemon::outstanding_enables;  
99 u_int paradynDaemon::next_enable_id = 0;  
100 u_int paradynDaemon::count = 0;
101
102 resource *resource::rootResource = new resource();
103 timeStamp metricInstance::curr_bucket_width;
104 timeStamp metricInstance::global_bucket_width;
105 phaseHandle metricInstance::curr_phase_id;
106 u_int metricInstance::num_curr_hists = 0;
107 u_int metricInstance::num_global_hists = 0;
108
109 double paradynDaemon::earliestFirstTime = 0;
110 void newSampleRate(float rate);
111
112 extern void histDataCallBack(sampleValue*, timeStamp, int, int, void*, bool);
113 extern void histFoldCallBack(timeStamp, void*, bool);
114
115 //
116 // IO from application processes.
117 //
118 void dynRPCUser::applicationIO(int,int,string data)
119 {
120
121     // NOTE: this fixes a purify error with the commented out code (a memory
122     // segment error occurs occasionally with the line "cout << rest << endl") 
123     // this is problably not the best fix,  but I can't figure out why 
124     // the error is occuring (rest is always '\0' terminated when this
125     // error occurs)---tn 
126     fprintf(stdout,data.string_of());
127     fflush(stdout);
128
129 #ifdef n_def
130     char *ptr;
131     char *rest;
132     // extra should really be per process.
133     static string extra;
134
135     rest = P_strdup(data.string_of());
136
137     char *tp = rest;
138     ptr = P_strchr(rest, '\n');
139     while (ptr) {
140         *ptr = '\0';
141         if (pid) {
142             printf("pid %d:", pid);
143         } else {
144             printf("paradynd: ");
145         }
146         if (extra.length()) {
147             cout << extra;
148             extra = (char*) NULL;
149         }
150         cout << rest << endl;
151         rest = ptr+1;
152         if(rest)
153             ptr = P_strchr(rest, '\n');
154         else
155             ptr = 0;
156     }
157     extra += rest;
158     delete tp;
159     rest = 0;
160 #endif
161
162 }
163
164 extern status_line *DMstatus;
165
166 void dynRPCUser::resourceBatchMode(bool onNow)
167 {
168    printf("error calling virtual func: dynRPCUser::resourceBatchMode\n");
169 }
170
171 //
172 // upcalls from remote process.
173 //
174 void dynRPCUser::resourceInfoCallback(int,
175                                       vector<string> resource_name,
176                                       string abstr, u_int type) {
177
178 printf("error calling virtual func: dynRPCUser::resourceInfoCallback\n");
179
180 }
181
182 void dynRPCUser::memoryInfoCallback(int,
183                                     string vname,
184                                     int va,
185                                     u_int mem_size,
186                                     u_int blk_size)
187 {//TO DO
188    string       abstr = "BASE";
189    u_int        type = MDL_T_INT;
190    int          end =  va + mem_size ;
191    int          start = va ;
192    vector<resourceHandle> handles, whereHandles ;
193    resourceHandle         p_handle, r_handle ;
194    int                    num_blks = 0 ;
195
196    printf("Paradyn received: (var:%s, va:%d, mem_size:%d, blk_size:%d)\n",
197               vname.string_of(), va, mem_size, blk_size) ;
198
199    vector<string> res_name;
200    res_name += "Memory" ; res_name += vname ;
201    bool exist = false ;
202    r_handle = createResource_ncb(res_name, abstr, MDL_T_VARIABLE, p_handle, exist);
203    handles += r_handle ;
204
205    /* inform others about it if they need to know */
206    if(!exist)
207    {
208     char temp[255] ;
209     sprintf(temp, "Memory/%s", vname.string_of()) ;
210     const char *name = strdup(temp) ;
211     const char *abs  = strdup(abstr.string_of()) ;
212     dictionary_hash_iter<perfStreamHandle,performanceStream*>
213                         allS(performanceStream::allStreams);
214     perfStreamHandle h;
215     performanceStream *ps;
216     while(allS.next(h,ps)){
217         ps->callResourceFunc(p_handle,r_handle,name,abs);
218     }
219    }
220
221    while (va < end)
222    {
223         char temp[255] ;
224         vector<string> res_name;
225         res_name += "Memory" ; res_name += vname ;
226
227         sprintf(temp, "%d", (int) va) ;
228         res_name += temp ;
229         exist = true; /* we do not want to search to duplication */
230         r_handle = createResource_ncb(res_name, abstr, type, p_handle, exist);
231         handles += r_handle ;
232         whereHandles += r_handle ;
233
234         va += blk_size ;
235         num_blks ++ ;
236    }
237    /* inform the daemon of the things it needs to know */
238    /* should send this to all daemons, not just one  */
239    {
240     for(u_int j=0; j < paradynDaemon::allDaemons.size(); j++){
241                 paradynDaemon *pd = paradynDaemon::allDaemons[j];
242                 pd->memoryInfoResponse(vname, start, mem_size, blk_size, handles
243 ) ;
244     }
245    }
246
247    /* inform others about it if they need to know */
248    {
249     dictionary_hash_iter<perfStreamHandle,performanceStream*>
250                         allS(performanceStream::allStreams);
251     perfStreamHandle h;
252     performanceStream *ps;
253     while(allS.next(h,ps)){
254         ps->callMemoryFunc(vname, start, mem_size, blk_size, p_handle, whereHandles);
255     }
256    }
257 }
258
259
260 void dynRPCUser::mappingInfoCallback(int,
261                                      string abstraction, 
262                                      string type, 
263                                      string key,
264                                      string value)
265 {
266   AMnewMapping(abstraction.string_of(),type.string_of(),key.string_of(),
267                value.string_of());    
268 }
269
270 class uniqueName {
271   public:
272     uniqueName(stringHandle base) { name = base; nextId = 0; }
273     int nextId;
274     stringHandle name;
275 };
276
277 //
278 // handles a completed enable response: updates metricInstance state
279 // and send the calling thread the response 
280 //
281 void DMenableResponse(DM_enableType &enable,vector<bool> &successful){
282     
283
284     vector<metricInstance *> &mis = (*enable.request);
285     assert(successful.size() == mis.size());
286     vector<metricInstInfo> *response = new vector<metricInstInfo>(mis.size()); 
287
288     // update MI state and response vector
289     for(u_int i=0; i < mis.size(); i++){
290         if(mis[i] && successful[i]){  // this MI could be enabled
291           mis[i]->setEnabled();
292           metric *metricptr = metric::getMetric(mis[i]->getMetricHandle());
293           if(metricptr){
294
295             if(enable.ph_type == CurrentPhase){
296                 u_int old_current = mis[i]->currUsersCount();
297                 bool  current_data = mis[i]->isCurrHistogram();
298                 mis[i]->newCurrDataCollection(metricptr->getStyle(),
299                                               histDataCallBack,
300                                               histFoldCallBack);
301                 mis[i]->newGlobalDataCollection(metricptr->getStyle(),
302                                                 histDataCallBack,
303                                                 histFoldCallBack);
304                 mis[i]->addCurrentUser(enable.ps_handle);
305                 // set sample rate to match current phase hist. bucket width
306                 if(!metricInstance::numCurrHists()){
307                     float rate = phaseInfo::GetLastBucketWidth();
308                     newSampleRate(rate);
309                 }
310                 // new active curr. histogram added if there are no previous
311                 // curr. subscribers and either persistent_collection is clear
312                 // or there was no curr. histogram prior to this
313                 if((!old_current)
314                     && (mis[i]->currUsersCount() == 1) && 
315                     (!(mis[i]->isCollectionPersistent()) || (!current_data))){
316                     metricInstance::incrNumCurrHists();
317                 }
318                 // new global histogram if this metricInstance was just enabled
319                 if(!((*enable.enabled)[i])){
320                     metricInstance::incrNumGlobalHists();
321                 }
322             }
323             else {  // this is a global phase enable
324                 mis[i]->newGlobalDataCollection(metricptr->getStyle(),
325                                                 histDataCallBack,
326                                                 histFoldCallBack);
327                 mis[i]->addGlobalUser(enable.ps_handle);
328
329                 // if this is first global histogram enabled and there are no
330                 // curr hists, then set sample rate to global bucket width
331                 if(!metricInstance::numCurrHists()){
332                     if(!metricInstance::numGlobalHists()){
333                         float rate = Histogram::getGlobalBucketWidth();
334                         newSampleRate(rate);
335                 }}
336                 // new global hist added: update count
337                 if(!((*enable.enabled)[i])){
338                     metricInstance::incrNumGlobalHists();
339                 }
340             }
341             // update response vector
342             (*response)[i].successfully_enabled = true;
343             (*response)[i].mi_id = mis[i]->getHandle(); 
344             (*response)[i].m_id = mis[i]->getMetricHandle();
345             (*response)[i].r_id = mis[i]->getFocusHandle();
346             (*response)[i].metric_name = mis[i]->getMetricName();
347             (*response)[i].focus_name = mis[i]->getFocusName();
348             (*response)[i].metric_units = metricptr->getUnits();
349             (*response)[i].units_type = metricptr->getUnitsType();
350
351             // update the persistence flags: the OR of new & previous values
352             if(enable.persistent_data){
353                 mis[i]->setPersistentData();
354             }
355             if(enable.persistent_collection){
356                 mis[i]->setPersistentCollection();
357             }
358             if(enable.phase_persistent_data){
359                 mis[i]->setPhasePersistentData();
360             }
361           }
362           else {
363               cout << "mis enabled but no metric handle: " 
364                    << mis[i]->getMetricHandle() << endl;
365               assert(0);
366           }
367         }
368         else {  // was not successfully enabled
369             (*response)[i].successfully_enabled = false;
370             (*response)[i].mi_id = mis[i]->getHandle(); 
371             (*response)[i].m_id = mis[i]->getMetricHandle();
372             (*response)[i].r_id = mis[i]->getFocusHandle();
373             (*response)[i].metric_name = mis[i]->getMetricName();
374             (*response)[i].focus_name = mis[i]->getFocusName();
375         }
376
377 //        if(mis[i]) {
378 //          (*response)[i].mi_id = mis[i]->getHandle(); 
379 //          (*response)[i].m_id = mis[i]->getMetricHandle();
380 //          (*response)[i].r_id = mis[i]->getFocusHandle();
381 //          (*response)[i].metric_name = mis[i]->getMetricName();
382 //          (*response)[i].focus_name = mis[i]->getFocusName();
383 //        }
384     }
385
386     // make response call
387     dictionary_hash_iter<perfStreamHandle,performanceStream*>
388             allS(performanceStream::allStreams);
389     perfStreamHandle h; performanceStream *ps;
390     while(allS.next(h,ps)){
391         if(h == (perfStreamHandle)(enable.ps_handle)){
392             ps->callDataEnableFunc(response,enable.client_id);
393             return;
394     } }
395     response = 0;
396 }
397
398 //
399 // handle an enable response from a daemon. If all daemons have responded
400 // then make response callback to calling thread, and check the outstanding
401 // enables list to see if this enable response satisfies any waiting requests.
402 // and enable for an MI is successful if its done entry is true and if its
403 // MI* is not 0
404 //
405 void dynRPCUser::enableDataCallback(u_int daemon_id, 
406                                     vector<int> return_id,
407                                     vector<u_int> mi_ids,
408                                     u_int request_id)
409 {
410     // find element in outstanding_enables corr. to request_id
411     u_int which =0;
412     DM_enableType *request_entry = 0;
413     for(u_int i=0; i < paradynDaemon::outstanding_enables.size(); i++){
414         if((paradynDaemon::outstanding_enables[i])->request_id == request_id){
415             which = i;
416             request_entry = paradynDaemon::outstanding_enables[i];
417             break;
418     } }
419
420     if(!request_entry){
421         // a request entry can be removed if a new phase event occurs
422         // between the enable request and response, so ignore the response
423         return;
424     }
425     assert(daemon_id < paradynDaemon::allDaemons.size());
426     paradynDaemon *pd = paradynDaemon::allDaemons[daemon_id];
427
428     // for each mi in request update mi's components with new daemon if
429     // it was successfully enabled
430     assert(mi_ids.size() == return_id.size());
431     for(u_int j=0; j< return_id.size(); j++){
432         if(return_id[j] != -1){
433             metricInstanceHandle mh =  mi_ids[j];     
434             metricInstance *mi = request_entry->findMI(mh);
435             assert(mi);
436             component *comp = new component(pd,return_id[j], mi);  
437             bool aflag;
438             aflag=(mi->addComponent(comp));
439             assert(aflag);
440             // if at least one daemon could enable, update done and enabled
441             request_entry->setDone(mh);
442     } }
443
444     // update count of outstanding daemon responses
445     assert(request_entry->how_many);
446     request_entry->how_many--;
447
448     // all daemons have responded to enable request, send result to caller 
449     if(!request_entry->how_many) { 
450         vector<bool> successful( request_entry->request->size());
451         for(u_int k=0; k < request_entry->request->size(); k++){
452             // if MI is 0 or if done is false
453             if(!((*(request_entry->done))[k]) 
454                 || !((*(request_entry->request))[k])){
455                 successful[k] = false;
456             }
457             else {
458                 successful[k] = true;
459             }
460         }
461         // if all daemons have responded update state for request and send
462         // result to caller
463         // a successful enable has both the enabled flag set and an mi*
464
465         // clear currentlyEnabling flag and decrement the count of 
466         // waiting enables for all MI's
467         for(u_int i1=0; i1 < request_entry->done->size(); i1++){
468             if((*request_entry->request)[i1]){
469                 ((*request_entry->request)[i1])->clearCurrentlyEnabling();
470                 if(request_entry->ph_type == CurrentPhase){
471                     ((*request_entry->request)[i1])->decrCurrWaiting();
472                 }
473                 else{
474                     ((*request_entry->request)[i1])->decrGlobalWaiting();
475                 }
476         } }
477
478         // update MI state for this entry and send response to caller
479         DMenableResponse(*request_entry,successful);
480         
481
482         // remove this entry from the outstanding enables list 
483         u_int size = paradynDaemon::outstanding_enables.size();
484         paradynDaemon::outstanding_enables[which] =
485                          paradynDaemon::outstanding_enables[size-1];
486                 paradynDaemon::outstanding_enables.resize(size-1);
487
488         // for each element on outstanding_enables, check to see if there are
489         // any outstatnding_enables that can be satisfied by this request
490         // if so, update state, and for any outstanding_enables that are 
491         // complete, send the result to the client thread  
492         // update not_all_done 
493         for(u_int i2=0; i2 < paradynDaemon::outstanding_enables.size(); i2++){
494             DM_enableType *next_entry = paradynDaemon::outstanding_enables[i2];
495             next_entry->updateAny(*(request_entry->request),successful);
496         }
497         delete request_entry;
498         request_entry = 0;
499
500         if(paradynDaemon::outstanding_enables.size()){
501           bool done = false;
502           u_int i3 = 0;
503           while(!done){
504               if((paradynDaemon::outstanding_enables[i3])->not_all_done){
505                   i3++;
506               }
507               else {  // this entry's request is complete
508                    // update MI state for this entry and send response to caller
509                    DM_enableType *temp = paradynDaemon::outstanding_enables[i3];
510                    successful.resize(temp->request->size());
511                    for(u_int k2=0; k2 < successful.size(); k2++){
512                        if(!((*(temp->done))[k2])) successful[k2] = false;
513                        else successful[k2] = true;
514                    }
515                    // decrement the number of waiting for enables for 
516                    // each MI in this response
517                    for(u_int k3=0; k3 < temp->request->size(); k3++){
518                        if((*temp->request)[k3]){
519                            if(temp->ph_type == CurrentPhase){
520                                ((*temp->request)[k3])->decrCurrWaiting();
521                            }
522                            else{
523                               ((*temp->request)[k3])->decrGlobalWaiting();
524                            }
525                    } }
526
527                    DMenableResponse(*temp,successful);
528
529                    // remove entry from outstanding_enables list
530                    u_int newsize=paradynDaemon::outstanding_enables.size()-1;
531                    paradynDaemon::outstanding_enables[i3] =
532                          paradynDaemon::outstanding_enables[newsize];
533                    paradynDaemon::outstanding_enables.resize(newsize);
534                    delete temp;
535               }
536               if(i3 >= paradynDaemon::outstanding_enables.size()) done = true;
537           }
538         }
539     }
540 }
541
542 //
543 // Upcall from daemon in response to getPredictedDataCost call
544 // id - perfStreamHandle assoc. with the call
545 // req_id - an identifier assoc. with the request 
546 // val - the cost of enabling the metric/focus pair
547 //
548 void dynRPCUser::getPredictedDataCostCallback(u_int id,
549                                               u_int req_id,
550                                               float val,
551                                               u_int clientID)
552 {
553     // find the assoc. perfStream and update it's pred data cost value
554     dictionary_hash_iter<perfStreamHandle,performanceStream*> 
555                 allS(performanceStream::allStreams);
556     perfStreamHandle h; performanceStream *ps;
557     while(allS.next(h,ps)){
558         if(h == (perfStreamHandle)id){
559             ps->predictedDataCostCallback(req_id,val,clientID);
560             return;
561     } }
562     // TODO: call correct routine
563     assert(0);
564 }
565
566 //
567 // Display errors using showError function from the UIM class
568 // This function allows to display error messages from paradynd
569 // using the "upcall" or "call back" mechanism.
570 // Parameters:  errCode = Error code
571 //              errString = Error message
572 //              hostName = Host name where the error occur
573 // Call: there is a macro defined in "showerror.h". This macro must be
574 //       used when calling this function. A typical call is:
575 //       showErrorCallback(99, "Erro message test"). This macro will
576 //       automatically insert the additional host info required.
577 //
578 void dynRPCUser::showErrorCallback(int errCode, 
579                                    string errString,
580                                    string hostName)
581 {
582     string msg;
583
584     if (errString.length() > 0) {
585         if (hostName.length() > 0) {
586             msg = string("<Msg from daemon on host ") + hostName + 
587                   string("> ") + errString;
588         }
589         else { 
590             msg = string("<Msg from daemon on host ?> ") + errString; 
591         }
592         uiMgr->showError(errCode, P_strdup(msg.string_of()));
593     }
594     else {
595         uiMgr->showError(errCode, ""); 
596     }
597
598     //
599     // hostName.length() should always be > 0, otherwise
600     // hostName is not defined (i.e. "?" will be used instead).
601     // if errString.length()==0, (i.e. errString.string_of()==""),
602     // then we will use the default error message in errorList.tcl
603     // This message, however, will not include any info about the current
604     // host name.
605     //
606 }
607
608 //
609 // Paradynd calls this igen fn when it starts a new process (more
610 // specifically, after it starts the new process and the new process
611 // has completed running DYNINSTinit).
612 //
613 void dynRPCUser::newProgramCallbackFunc(int pid,
614                                         vector<string> argvString,
615                                         string machine_name,
616                                         bool calledFromExec,
617                                         bool runMe)
618 {
619     // there better be a paradynd running on this machine!
620
621     for (unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++) {
622         paradynDaemon *pd = paradynDaemon::allDaemons[i];
623         if (pd->machine.length() && (pd->machine == machine_name)){
624             if (!paradynDaemon::addRunningProgram(pid, argvString, pd, calledFromExec,
625                                                   runMe))
626                assert(false);
627
628             uiMgr->enablePauseOrRun();
629
630             return;
631         }
632     }
633
634     // for now, abort if there is no paradynd, this should not happen
635     printf("process started on %s, can't find paradynd there\n",
636            machine_name.string_of());
637     printf("paradyn error #1 encountered\n");
638     exit(-1);
639 }
640
641 void dynRPCUser::newMetricCallback(T_dyninstRPC::metricInfo info)
642 {
643     addMetric(info);
644 }
645
646 void dynRPCUser::firstSampleCallback (int,double) {
647
648   assert(0 && "Invalid virtual function");
649 }
650
651 void dynRPCUser::cpDataCallbackFunc(int,double,int,double,double)
652 {
653     assert(0 && "Invalid virtual function");
654 }
655
656 // batch the sample delivery
657 void dynRPCUser::batchSampleDataCallbackFunc(int,
658                     vector<T_dyninstRPC::batch_buffer_entry>)
659 {
660     assert(0 && "Invalid virtual function");
661 }
662
663 //
664 // When a paradynd is started remotely, ie not by paradyn, this upcall
665 // reports the information for that paradynd to paradyn
666 //
667 void 
668 dynRPCUser::reportSelf (string , string , int , string)
669 {
670   assert(0);
671   return;
672 }
673
674 void 
675 dynRPCUser::reportStatus (string)
676 {
677     assert(0 && "Invalid virtual function");
678 }
679
680 void
681 dynRPCUser::processStatus(int, u_int)
682 {
683     assert(0 && "Invalid virtual function");
684 }
685
686 void
687 dynRPCUser::endOfDataCollection(int)
688 {
689   assert(0 && "Invalid virtual function");
690 }
691
692
693 // 
694 // establish socket that will be advertised to paradynd's
695 // this socket will allow paradynd's to connect to paradyn for pvm
696 //
697 static void
698 DMsetupSocket (int &sockfd)
699 {
700   // setup "well known" socket for pvm paradynd's to connect to
701   bool aflag;
702   aflag = ((dataManager::dm->socket =
703            RPC_setup_socket (sockfd, AF_INET, SOCK_STREAM)) >= 0);
704   assert(aflag);
705
706   // bind fd for this thread
707   msg_bind (sockfd, true);
708 }
709
710 static void
711 DMnewParadynd ()
712 {
713   // accept the connection
714   int new_fd = RPC_getConnect(dataManager::dm->sock_fd);
715   if (new_fd < 0)
716     uiMgr->showError(4, "");
717
718   // add new daemon to dictionary of all deamons
719   paradynDaemon::addDaemon(new_fd); 
720 }
721
722 bool dataManager::DM_sequential_init(const char* met_file){
723    string mfile = met_file;
724    return(metMain(mfile)); 
725 }
726
727 int dataManager::DM_post_thread_create_init(int tid) {
728
729
730     thr_name("Data Manager");
731     dataManager::dm = new dataManager(tid);
732
733     // supports argv passed to paradynDaemon
734     // new paradynd's may try to connect to well known port
735     DMsetupSocket (dataManager::dm->sock_fd);
736
737     bool aflag;
738     aflag=(RPC_make_arg_list(paradynDaemon::args,
739                              dataManager::dm->socket, 1, 1, "", false));
740     assert(aflag);
741
742     // start initial phase
743     string dm_phase0 = "phase_0";
744     phaseInfo::startPhase(0.0,dm_phase0,false,false);
745
746     char DMbuff[64];
747     unsigned int msgSize = 64;
748     msg_send (MAINtid, MSG_TAG_DM_READY, (char *) NULL, 0);
749     unsigned int tag = MSG_TAG_ALL_CHILDREN_READY;
750     msg_recv (&tag, DMbuff, &msgSize);
751     return 1;
752 }
753
754 //
755 // Main loop for the dataManager thread.
756 //
757 void *DMmain(void* varg)
758 {
759     unsigned fd_first = 0;
760     // We declare the "printChangeCollection" tunable constant here; it will
761     // last for the lifetime of this function, which is pretty much forever.
762     // (used to be declared as global in DMappContext.C.  Globally declared
763     //  tunables are now a no-no).  Note that the variable name (printCC) is
764     // unimportant.   -AT
765     tunableBooleanConstantDeclarator printCC("printChangeCollection", 
766               "Print the name of metric/focus when enabled or disabled",
767               false, // initial value
768               NULL, // callback
769               developerConstant);
770
771     // Now the same for "printSampleArrival"
772     extern bool our_print_sample_arrival;
773     our_print_sample_arrival = false;
774     extern void printSampleArrivalCallback(bool);
775     tunableBooleanConstantDeclarator printSA("printSampleArrival", 
776               "Print out status lines to show the arrival of samples",
777               our_print_sample_arrival, // init val
778               printSampleArrivalCallback,
779               developerConstant);
780
781     int tid; memcpy((void*)&tid,varg, sizeof(int));
782     dataManager::DM_post_thread_create_init(tid);
783
784     int ret;
785     unsigned int tag;
786     paradynDaemon *pd = NULL;
787     while (1) {
788         for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
789             pd = paradynDaemon::allDaemons[i]; 
790             // handle up to max async requests that may have been buffered
791             // while blocking on a sync request
792             while (pd->buffered_requests()){
793                 if(pd->process_buffered() == T_dyninstRPC::error) {
794                     cout << "error on paradyn daemon\n";
795                     paradynDaemon::removeDaemon(pd, true);
796         } } }
797
798         tag = MSG_TAG_ANY;
799         // ret = msg_poll(&tag, true);
800         ret = msg_poll_preference(&tag, true,fd_first);
801         fd_first = !fd_first;
802         assert(ret != THR_ERR);
803
804         if (tag == MSG_TAG_FILE) {
805             // must be an upcall on something speaking the dynRPC protocol.
806             if (ret == dataManager::dm->sock_fd){
807                 DMnewParadynd(); // set up a new daemon
808             }
809             else {
810                 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
811                     pd = paradynDaemon::allDaemons[i]; 
812                     if(pd->get_fd() == ret){
813                         if(pd->waitLoop() == T_dyninstRPC::error) {
814                             cout << "error on paradyn daemon\n";
815                             paradynDaemon::removeDaemon(pd, true);
816                     }}
817
818                     // handle async requests that may have been buffered
819                     // while blocking on a sync request
820                     while(pd->buffered_requests()){
821                         if(pd->process_buffered() == T_dyninstRPC::error) {
822                             cout << "error on paradyn daemon\n";
823                             paradynDaemon::removeDaemon(pd, true);
824                     }}
825                 }
826             }
827         } else if (dataManager::dm->isValidTag
828                   ((T_dataManager::message_tags)tag)) {
829             if (dataManager::dm->waitLoop(true, 
830                (T_dataManager::message_tags)tag) == T_dataManager::error) {
831                 // handle error
832                 assert(0);
833             }
834         } else {
835             cerr << "Unrecognized message in DMmain.C\n";
836             assert(0);
837         }
838    }
839 }
840
841
842 void addMetric(T_dyninstRPC::metricInfo &info)
843 {
844     // if metric already exists return
845     if(metric::allMetrics.defines(info.name)){
846         return;
847     }
848     metric *met = new metric(info);
849
850     // now tell all perfStreams
851     dictionary_hash_iter<perfStreamHandle,performanceStream*> 
852                 allS(performanceStream::allStreams);
853     perfStreamHandle h;
854     performanceStream *ps;
855     while(allS.next(h,ps)){
856         if(ps->controlFunc.mFunc){
857             // set the correct destination thread.
858             dataManager::dm->setTid(ps->threadId);
859             dataManager::dm->newMetricDefined(ps->controlFunc.mFunc, 
860                                               ps->Handle(),
861                                               met->getName(),
862                                               met->getStyle(),
863                                               met->getAggregate(),
864                                               met->getUnits(),
865                                               met->getHandle(),
866                                               met->getUnitsType());
867         }
868     }
869 }
870
871
872 // I don't want to parse for '/' more than once, thus the use of a string vector
873 resourceHandle createResource(vector<string>& resource_name, string& abstr, unsigned type) {
874   resource *parent = NULL;
875   unsigned r_size = resource_name.size();
876   string p_name;
877
878
879   switch (r_size) {
880     case 0:
881         // Should this case ever occur ?
882         assert(0); break;
883     case 1:
884         parent = resource::rootResource; break;
885     default:
886         for (unsigned ri=0; ri<(r_size-1); ri++) 
887             p_name += string("/") + resource_name[ri];
888         parent = resource::string_to_resource(p_name);
889         assert(parent);
890         break;
891     }
892     if (!parent) assert(0);
893
894
895     /* first check to see if the resource has already been defined */
896     resource *p = resource::resources[parent->getHandle()];
897     string myName = p_name;
898     myName += "/";
899     myName += resource_name[r_size - 1];
900     resourceHandle *child = p->findChild(myName.string_of());
901     if (child){
902         return(*child); 
903         delete child;
904     }
905
906     // if abstr is not defined then use default abstraction 
907     if(!abstr.string_of()){
908         abstr = string("BASE");
909     }
910
911     /* then create it */
912     resource *ret =  new resource(parent->getHandle(),resource_name,
913                                   myName,abstr, type);
914
915     // check to see if the suppressMagnify option should be set...if
916     // this resource is specifed in the mdl exclude_lib option
917     vector<string> shared_lib_constraints;
918     if(resource::get_lib_constraints(shared_lib_constraints) &&
919        (string(parent->getFullName()) == "/Code")){
920             for(u_int i=0; i < shared_lib_constraints.size(); i++){
921                 if(shared_lib_constraints[i] == ret->getName()){
922                     ret->setSuppressMagnify();
923                 }
924             }
925     }
926
927     /* inform others about it if they need to know */
928     dictionary_hash_iter<perfStreamHandle,performanceStream*> 
929                         allS(performanceStream::allStreams);
930     perfStreamHandle h;
931     performanceStream *ps;
932     resourceHandle r_handle = ret->getHandle();
933     string name = ret->getFullName(); 
934     while(allS.next(h,ps)){
935         ps->callResourceFunc(parent->getHandle(),r_handle,ret->getFullName(),
936         ret->getAbstractionName());
937     }
938     return(r_handle);
939 }
940
941 resourceHandle createResource_ncb(vector<string>& resource_name, string& abstr, unsigned type,
942                                   resourceHandle &p_handle, bool &exist
943                                  ) 
944 {
945   resource *parent = NULL;
946   unsigned r_size = resource_name.size();
947   string p_name;
948
949
950   switch (r_size) {
951     case 0:
952         // Should this case ever occur ?
953         assert(0); break;
954     case 1:
955         parent = resource::rootResource; break;
956     default:
957         for (unsigned ri=0; ri<(r_size-1); ri++) 
958             p_name += string("/") + resource_name[ri];
959         parent = resource::string_to_resource(p_name);
960         assert(parent);
961         break;
962     }
963     if (!parent) assert(0);
964
965
966     /* first check to see if the resource has already been defined */
967     p_handle = parent->getHandle() ;
968     resource *p = resource::resources[parent->getHandle()];
969     string myName = p_name;
970     myName += "/";
971     myName += resource_name[r_size - 1];
972     if(!exist) {
973         resourceHandle *child = p->findChild(myName.string_of());
974         if (child){
975                 return(*child); 
976                 delete child;
977         }
978     } else {
979         exist = false ;
980     }
981
982     // if abstr is not defined then use default abstraction 
983     if(!abstr.string_of()){
984         abstr = string("BASE");
985     }
986
987     /* then create it */
988     resource *ret =  new resource(parent->getHandle(),resource_name,
989                                   myName,abstr, type);
990
991     resourceHandle r_handle = ret->getHandle() ;
992     return(r_handle);
993 }
994
995 void newSampleRate(float rate)
996 {
997     paradynDaemon *pd = NULL;
998     for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
999         pd = paradynDaemon::allDaemons[i]; 
1000         pd->setSampleRate(rate);
1001     }
1002 }
1003
1004 #ifdef ndef
1005 // Note - the metric parser has been moved into the dataManager
1006 bool parse_metrics(string metric_file) {
1007      bool parseResult = metMain(metric_file);
1008     return parseResult;
1009 }
1010 #endif
1011
1012