2 * Copyright (c) 1996 Barton P. Miller
4 * We provide the Paradyn Parallel Performance Tools (below
5 * described as Paradyn") on an AS IS basis, and do not warrant its
6 * validity or performance. We reserve the right to update, modify,
7 * or discontinue this software at any time. We shall have no
8 * obligation to supply such updates or modifications or any other
9 * form of support to you.
11 * This license is for research uses. For such uses, there is no
12 * charge. We define "research use" to mean you may freely use it
13 * inside your organization for whatever purposes you see fit. But you
14 * may not re-distribute Paradyn or parts of Paradyn, in any form
15 * source or binary (including derivatives), electronic or otherwise,
16 * to any other organization or entity without our permission.
18 * (for other uses, please contact us at paradyn@cs.wisc.edu)
20 * All warranties, including without limitation, any warranty of
21 * merchantability or fitness for a particular purpose, are hereby
24 * By your use of Paradyn, you understand and agree that we (or any
25 * other person or entity with proprietary rights in Paradyn) are
26 * under no obligation to provide either maintenance services,
27 * update services, notices of latent defects, or correction of
28 * defects for Paradyn.
30 * Even if advised of the possibility of such damages, under no
31 * circumstances shall we (or any other person or entity with
32 * proprietary rights in the software licensed hereunder) be liable
33 * to you or any third party for direct, indirect, or consequential
34 * damages of any character regardless of type of action, including,
35 * without limitation, loss of profits, loss of use, loss of good
36 * will, or computer failure or malfunction. You agree to indemnify
37 * us (and any other person or entity with proprietary rights in the
38 * software licensed hereunder) for any and all liability it may
39 * incur to third parties resulting from your use of Paradyn.
49 #include "thread/h/thread.h"
50 #include "paradyn/src/TCthread/tunableConst.h"
51 #include "dataManager.thread.SRVR.h"
52 #include "dyninstRPC.xdr.CLNT.h"
55 #include "DMperfstream.h"
56 #include "DMabstractions.h"
57 #include "paradyn/src/pdMain/paradyn.h"
58 #include "paradyn/src/UIthread/Status.h"
60 #include "util/h/Vector.h"
61 #include "util/h/Dictionary.h"
62 #include "util/h/String.h"
64 #include "util/h/ByteArray.h"
// File-scope declarations plus the definitions of static data members for
// the data manager classes used throughout this file.
67 typedef vector<string> blahType;
69 // bool parse_metrics(string metric_file);
// Forward declaration; invoked by DM_sequential_init and parse_metrics in
// this file.
70 bool metMain(string &userFile);
72 // this has to be declared before baseAbstr, cmfAbstr, and rootResource
73 int dataManager::sock_fd;
74 int dataManager::socket;
// Pointer to the data manager object; assigned in DM_post_thread_create_init.
75 dataManager *dataManager::dm = NULL;
// String-keyed table of abstractions, followed by two abstractions that are
// heap-allocated during static initialization.
77 dictionary_hash<string,abstraction*> abstraction::allAbstractions(string::hash);
78 abstraction *baseAbstr = new abstraction("BASE");
79 abstraction *cmfAbstr = new abstraction("CMF");
// Lookup tables for metrics, metric instances, performance streams,
// resources and foci; each is constructed with a hash function for its key.
81 dictionary_hash<string,metric*> metric::allMetrics(string::hash);
82 dictionary_hash<metricInstanceHandle,metricInstance *>
83 metricInstance::allMetricInstances(metricInstance::mhash);
84 dictionary_hash<perfStreamHandle,performanceStream*>
85 performanceStream::allStreams(performanceStream::pshash);
// NOTE(review): the second constructor argument (8192) is presumably an
// initial table size -- TODO confirm against dictionary_hash.
86 dictionary_hash<string, resource*> resource::allResources(string::hash, 8192);
87 dictionary_hash<string,resourceList *> resourceList::allFoci(string::hash);
89 dictionary_hash<unsigned, resource*>resource::resources(uiHash);
// Constraint lists consulted by createResource through get_lib_constraints
// and get_func_constraints; the *_built flags start out false.
90 vector<string> resource::lib_constraints;
91 vector< vector<string> > resource::func_constraints;
92 bool resource::func_constraints_built = false;
93 bool resource::lib_constraints_built = false;
// Global collections of metrics, daemons, daemon entries, programs, foci,
// phases and pending enable requests.
95 vector<metric*> metric::metrics;
96 vector<paradynDaemon*> paradynDaemon::allDaemons;
97 vector<daemonEntry*> paradynDaemon::allEntries;
98 vector<executable*> paradynDaemon::programs;
99 unsigned paradynDaemon::procRunning;
100 vector<resourceList *> resourceList::foci;
101 vector<phaseInfo *> phaseInfo::dm_phases;
102 u_int metricInstance::next_id = 1;
103 // u_int performanceStream::next_id = 0;
104 vector<DM_enableType*> paradynDaemon::outstanding_enables;
105 u_int paradynDaemon::next_enable_id = 0;
106 u_int paradynDaemon::count = 0;
108 // to distinguish the enableDataRequest calls only for samples
109 // from those for both samples and traces
110 // 0 is reserved for non-trace use
111 u_int performanceStream::next_id = 1;
// Root of the resource hierarchy; used as the parent of top-level resources
// in createResource and createResource_ncb.
113 resource *resource::rootResource = new resource();
114 timeStamp metricInstance::curr_bucket_width;
115 timeStamp metricInstance::global_bucket_width;
116 phaseHandle metricInstance::curr_phase_id;
117 u_int metricInstance::num_curr_hists = 0;
118 u_int metricInstance::num_global_hists = 0;
120 double paradynDaemon::earliestFirstTime = 0;
// Forward declaration; the definition appears later in this file.
121 void newSampleRate(float rate);
// Histogram callbacks defined in another translation unit.
123 extern void histDataCallBack(sampleValue*, timeStamp, int, int, void*, bool);
124 extern void histFoldCallBack(timeStamp, void*, bool);
127 // IO from application processes.
// Writes the received data string to stdout.
// NOTE(review): several lines of this body are not visible in this listing.
// The statements after the fprintf look like the older line-splitting code
// that the note below describes as commented out -- TODO confirm that they
// are compiled out.
129 void dynRPCUser::applicationIO(int,int,string data)
132 // NOTE: this fixes a purify error with the commented out code (a memory
133 // segment error occurs occasionally with the line "cout << rest << endl")
134 // this is probably not the best fix, but I can't figure out why
135 // the error is occurring (rest is always '\0' terminated when this
136 // error occurs)---tn
// NOTE(review): data is used as the format string here, so any percent
// sequence in the application output would be interpreted by fprintf.
// Consider passing it as an argument to a fixed %s format, or using fputs.
137 fprintf(stdout,data.string_of());
143 // extra should really be per process.
146 rest = P_strdup(data.string_of());
149 ptr = P_strchr(rest, '\n');
153 printf("pid %d:", pid);
155 printf("paradynd: ");
157 if (extra.length()) {
159 extra = (char*) NULL;
161 cout << rest << endl;
164 ptr = P_strchr(rest, '\n');
175 extern status_line *DMstatus;
// The visible body only prints a message saying that this virtual function
// was called in error.
177 void dynRPCUser::resourceBatchMode(bool onNow)
179 printf("error calling virtual func: dynRPCUser::resourceBatchMode\n");
183 // upcalls from remote process.
// The visible body only prints a message saying that this virtual function
// was called in error.
// NOTE(review): the rest of the parameter list is not visible in this listing.
185 void dynRPCUser::resourceInfoCallback(u_int , vector<string> ,
188 printf("error calling virtual func: dynRPCUser::resourceInfoCallback\n");
// The visible body only prints a message saying that this virtual function
// was called in error; the argument is ignored.
192 void dynRPCUser::severalResourceInfoCallback(vector<T_dyninstRPC::resourceInfoCallbackStruct>) {
193 printf("error calling virtual func: dynRPCUser::severalResourceInfoCallback\n");
// Handles a memory description sent by a daemon: creates resources under
// the Memory hierarchy for the variable vname, announces the new resource to
// every performance stream, sends memoryInfoResponse to every daemon and
// finally calls callMemoryFunc on every performance stream.
// NOTE(review): most of the parameter list, the declarations of temp, exist,
// start and h, and the loop structure are not visible in this listing.
196 void dynRPCUser::memoryInfoCallback(int,
202 string abstr = "BASE";
203 u_int type = MDL_T_INT;
204 int end = va + mem_size ;
206 vector<resourceHandle> handles, whereHandles ;
207 resourceHandle p_handle, r_handle ;
210 printf("Paradyn received: (var:%s, va:%d, mem_size:%d, blk_size:%d)\n",
211 vname.string_of(), va, mem_size, blk_size) ;
// Resource path is Memory followed by the variable name.
213 vector<string> res_name;
214 res_name += "Memory" ; res_name += vname ;
216 r_handle = createResource_ncb(res_name, abstr, MDL_T_VARIABLE, p_handle, exist);
217 handles += r_handle ;
219 /* inform others about it if they need to know */
// NOTE(review): sprintf is unbounded; confirm that temp is large enough for
// any vname.
223 sprintf(temp, "Memory/%s", vname.string_of()) ;
// NOTE(review): no free of these strdup copies is visible; confirm who owns
// name and abs after callResourceFunc.
224 const char *name = strdup(temp) ;
225 const char *abs = strdup(abstr.string_of()) ;
226 dictionary_hash_iter<perfStreamHandle,performanceStream*>
227 allS(performanceStream::allStreams);
229 performanceStream *ps;
230 while(allS.next(h,ps)){
231 ps->callResourceFunc(p_handle,r_handle,name,abs);
238 vector<string> res_name;
239 res_name += "Memory" ; res_name += vname ;
// The address va is formatted as decimal text; presumably it is appended to
// res_name in a line that is not visible -- TODO confirm.
241 sprintf(temp, "%d", (int) va) ;
243 exist = true; /* we do not want to search to duplication */
244 r_handle = createResource_ncb(res_name, abstr, type, p_handle, exist);
245 handles += r_handle ;
246 whereHandles += r_handle ;
251 /* inform the daemon of the things it needs to know */
252 /* should send this to all daemons, not just one */
254 for(u_int j=0; j < paradynDaemon::allDaemons.size(); j++){
255 paradynDaemon *pd = paradynDaemon::allDaemons[j];
256 pd->memoryInfoResponse(vname, start, mem_size, blk_size, handles
261 /* inform others about it if they need to know */
263 dictionary_hash_iter<perfStreamHandle,performanceStream*>
264 allS(performanceStream::allStreams);
266 performanceStream *ps;
267 while(allS.next(h,ps)){
268 ps->callMemoryFunc(vname, start, mem_size, blk_size, p_handle, whereHandles);
// Forwards the received mapping (abstraction, type, key and further
// arguments) to AMnewMapping as C strings.
// NOTE(review): the parameter list and the remaining AMnewMapping arguments
// are not visible in this listing.
274 void dynRPCUser::mappingInfoCallback(int,
280 AMnewMapping(abstraction.string_of(),type.string_of(),key.string_of(),
// Constructor: stores base as the name and starts nextId at 0.
// NOTE(review): the enclosing class declaration is not visible in this listing.
286 uniqueName(stringHandle base) { name = base; nextId = 0; }
292 // handles a completed enable response: updates metricInstance state
293 // and send the calling thread the response
// enable     - the request being answered; its request vector holds the
//              metric instances and ps_handle / pt_handle name the streams
//              that receive the response.
// successful - one flag per metric instance; must be the same size as the
//              request vector (asserted below).
// NOTE(review): several lines of this body (closing braces, else keywords,
// trailing call arguments) are not visible in this listing.
295 void DMenableResponse(DM_enableType &enable,vector<bool> &successful){
298 vector<metricInstance *> &mis = (*enable.request);
299 assert(successful.size() == mis.size());
// NOTE(review): no delete of response is visible in this function;
// presumably the receiver of callDataEnableFunc takes ownership -- TODO
// confirm.
300 vector<metricInstInfo> *response = new vector<metricInstInfo>(mis.size());
302 // update MI state and response vector
303 for(u_int i=0; i < mis.size(); i++){
304 if(mis[i] && successful[i]){ // this MI could be enabled
305 mis[i]->setEnabled();
306 metric *metricptr = metric::getMetric(mis[i]->getMetricHandle());
309 if(enable.ph_type == CurrentPhase){
// Capture the state before this enable so the histogram counters below can
// tell whether a new current-phase histogram became active.
310 u_int old_current = mis[i]->currUsersCount();
311 bool current_data = mis[i]->isCurrHistogram();
312 mis[i]->newCurrDataCollection(metricptr->getStyle(),
315 mis[i]->newGlobalDataCollection(metricptr->getStyle(),
318 mis[i]->addCurrentUser(enable.ps_handle);
320 // trace data streams
321 mis[i]->newTraceDataCollection(traceDataCallBack);
322 mis[i]->addTraceUser(enable.pt_handle);
324 // set sample rate to match current phase hist. bucket width
325 if(!metricInstance::numCurrHists()){
326 float rate = phaseInfo::GetLastBucketWidth();
329 // new active curr. histogram added if there are no previous
330 // curr. subscribers and either persistent_collection is clear
331 // or there was no curr. histogram prior to this
333 && (mis[i]->currUsersCount() == 1) &&
334 (!(mis[i]->isCollectionPersistent()) || (!current_data))){
335 metricInstance::incrNumCurrHists();
337 // new global histogram if this metricInstance was just enabled
338 if(!((*enable.enabled)[i])){
339 metricInstance::incrNumGlobalHists();
342 else { // this is a global phase enable
343 mis[i]->newGlobalDataCollection(metricptr->getStyle(),
346 mis[i]->addGlobalUser(enable.ps_handle);
348 // trace data streams
349 mis[i]->newTraceDataCollection(traceDataCallBack);
350 mis[i]->addTraceUser(enable.pt_handle);
352 // if this is first global histogram enabled and there are no
353 // curr hists, then set sample rate to global bucket width
354 if(!metricInstance::numCurrHists()){
355 if(!metricInstance::numGlobalHists()){
356 float rate = Histogram::getGlobalBucketWidth();
359 // new global hist added: update count
360 if(!((*enable.enabled)[i])){
361 metricInstance::incrNumGlobalHists();
364 // update response vector
365 (*response)[i].successfully_enabled = true;
366 (*response)[i].mi_id = mis[i]->getHandle();
367 (*response)[i].m_id = mis[i]->getMetricHandle();
368 (*response)[i].r_id = mis[i]->getFocusHandle();
369 (*response)[i].metric_name = mis[i]->getMetricName();
370 (*response)[i].focus_name = mis[i]->getFocusName();
371 (*response)[i].metric_units = metricptr->getUnits();
372 (*response)[i].units_type = metricptr->getUnitsType();
374 // update the persistence flags: the OR of new & previous values
375 if(enable.persistent_data){
376 mis[i]->setPersistentData();
378 if(enable.persistent_collection){
379 mis[i]->setPersistentCollection();
381 if(enable.phase_persistent_data){
382 mis[i]->setPhasePersistentData();
386 cout << "mis enabled but no metric handle: "
387 << mis[i]->getMetricHandle() << endl;
// NOTE(review): the success condition above also tests mis[i]; if this
// branch can be reached with a null mis[i], the calls below dereference a
// null pointer -- confirm against the full source.
391 else { // was not successfully enabled
392 (*response)[i].successfully_enabled = false;
393 (*response)[i].mi_id = mis[i]->getHandle();
394 (*response)[i].m_id = mis[i]->getMetricHandle();
395 (*response)[i].r_id = mis[i]->getFocusHandle();
396 (*response)[i].metric_name = mis[i]->getMetricName();
397 (*response)[i].focus_name = mis[i]->getFocusName();
401 // (*response)[i].mi_id = mis[i]->getHandle();
402 // (*response)[i].m_id = mis[i]->getMetricHandle();
403 // (*response)[i].r_id = mis[i]->getFocusHandle();
404 // (*response)[i].metric_name = mis[i]->getMetricName();
405 // (*response)[i].focus_name = mis[i]->getFocusName();
409 // make response call
// Deliver the response to the stream whose handle equals enable.ps_handle.
410 dictionary_hash_iter<perfStreamHandle,performanceStream*>
411 allS(performanceStream::allStreams);
412 perfStreamHandle h; performanceStream *ps;
413 while(allS.next(h,ps)){
414 if(h == (perfStreamHandle)(enable.ps_handle)){
415 ps->callDataEnableFunc(response,enable.client_id);
418 // trace data streams
// Second pass matches enable.pt_handle instead.
// NOTE(review): presumably the iterator is reset in a line that is not
// visible here -- TODO confirm.
420 while(allS.next(h,ps)){
421 if(h == (perfStreamHandle)(enable.pt_handle)){
422 ps->callDataEnableFunc(response,enable.client_id);
// NOTE(review): the end of the header comment below, the last parameter(s)
// and a number of short body lines (declarations of which, aflag, done and
// i3, braces, else keywords) are not visible in this listing.
429 // handle an enable response from a daemon. If all daemons have responded
430 // then make response callback to calling thread, and check the outstanding
431 // enables list to see if this enable response satisfies any waiting requests.
432 // an enable for an MI is successful if its done entry is true and if its
435 void dynRPCUser::enableDataCallback(u_int daemon_id,
436 vector<int> return_id,
437 vector<u_int> mi_ids,
440 // find element in outstanding_enables corr. to request_id
442 DM_enableType *request_entry = 0;
443 for(u_int i=0; i < paradynDaemon::outstanding_enables.size(); i++){
444 if((paradynDaemon::outstanding_enables[i])->request_id == request_id){
446 request_entry = paradynDaemon::outstanding_enables[i];
451 // a request entry can be removed if a new phase event occurs
452 // between the enable request and response, so ignore the response
455 assert(daemon_id < paradynDaemon::allDaemons.size());
456 paradynDaemon *pd = paradynDaemon::allDaemons[daemon_id];
458 // for each mi in request update mi's components with new daemon if
459 // it was successfully enabled
460 assert(mi_ids.size() == return_id.size());
461 for(u_int j=0; j< return_id.size(); j++){
// A return_id of -1 means this daemon reported no component for the MI.
462 if(return_id[j] != -1){
463 metricInstanceHandle mh = mi_ids[j];
464 metricInstance *mi = request_entry->findMI(mh);
466 component *comp = new component(pd,return_id[j], mi);
468 aflag=(mi->addComponent(comp));
470 // if at least one daemon could enable, update done and enabled
471 request_entry->setDone(mh);
474 // update count of outstanding daemon responses
475 assert(request_entry->how_many);
476 request_entry->how_many--;
478 // all daemons have responded to enable request, send result to caller
479 if(!request_entry->how_many) {
480 vector<bool> successful( request_entry->request->size());
481 for(u_int k=0; k < request_entry->request->size(); k++){
482 // if MI is 0 or if done is false
483 if(!((*(request_entry->done))[k])
484 || !((*(request_entry->request))[k])){
485 successful[k] = false;
488 successful[k] = true;
491 // if all daemons have responded update state for request and send
493 // a successful enable has both the enabled flag set and an mi*
495 // clear currentlyEnabling flag and decrement the count of
496 // waiting enables for all MI's
497 for(u_int i1=0; i1 < request_entry->done->size(); i1++){
498 if((*request_entry->request)[i1]){
499 ((*request_entry->request)[i1])->clearCurrentlyEnabling();
500 if(request_entry->ph_type == CurrentPhase){
501 ((*request_entry->request)[i1])->decrCurrWaiting();
504 ((*request_entry->request)[i1])->decrGlobalWaiting();
508 // update MI state for this entry and send response to caller
509 DMenableResponse(*request_entry,successful);
512 // remove this entry from the outstanding enables list
// Removal overwrites slot which with the last element and shrinks the
// vector by one, so the order of the list is not preserved.
513 u_int size = paradynDaemon::outstanding_enables.size();
514 paradynDaemon::outstanding_enables[which] =
515 paradynDaemon::outstanding_enables[size-1];
516 paradynDaemon::outstanding_enables.resize(size-1);
518 // for each element on outstanding_enables, check to see if there are
519 // any outstanding_enables that can be satisfied by this request
520 // if so, update state, and for any outstanding_enables that are
521 // complete, send the result to the client thread
522 // update not_all_done
523 for(u_int i2=0; i2 < paradynDaemon::outstanding_enables.size(); i2++){
524 DM_enableType *next_entry = paradynDaemon::outstanding_enables[i2];
525 next_entry->updateAny(*(request_entry->request),successful);
// The entry was already removed from the list above, so it is freed here.
527 delete request_entry;
530 if(paradynDaemon::outstanding_enables.size()){
534 if((paradynDaemon::outstanding_enables[i3])->not_all_done){
537 else { // this entry's request is complete
538 // update MI state for this entry and send response to caller
539 DM_enableType *temp = paradynDaemon::outstanding_enables[i3];
540 successful.resize(temp->request->size());
541 for(u_int k2=0; k2 < successful.size(); k2++){
542 if(!((*(temp->done))[k2])) successful[k2] = false;
543 else successful[k2] = true;
545 // decrement the number of waiting for enables for
546 // each MI in this response
547 for(u_int k3=0; k3 < temp->request->size(); k3++){
548 if((*temp->request)[k3]){
549 if(temp->ph_type == CurrentPhase){
550 ((*temp->request)[k3])->decrCurrWaiting();
553 ((*temp->request)[k3])->decrGlobalWaiting();
557 DMenableResponse(*temp,successful);
559 // remove entry from outstanding_enables list
// Same swap-with-last removal as above; slot i3 then holds a different
// entry.
// NOTE(review): no delete of temp is visible here, unlike request_entry
// above -- confirm whether it is freed in a line that is not shown.
560 u_int newsize=paradynDaemon::outstanding_enables.size()-1;
561 paradynDaemon::outstanding_enables[i3] =
562 paradynDaemon::outstanding_enables[newsize];
563 paradynDaemon::outstanding_enables.resize(newsize);
566 if(i3 >= paradynDaemon::outstanding_enables.size()) done = true;
573 // Upcall from daemon in response to getPredictedDataCost call
574 // id - perfStreamHandle assoc. with the call
575 // req_id - an identifier assoc. with the request
576 // val - the cost of enabling the metric/focus pair
// NOTE(review): the remaining parameters (req_id, val, clientID) are
// declared on lines that are not visible in this listing.
578 void dynRPCUser::getPredictedDataCostCallback(u_int id,
583 // find the assoc. perfStream and update its pred data cost value
584 dictionary_hash_iter<perfStreamHandle,performanceStream*>
585 allS(performanceStream::allStreams);
586 perfStreamHandle h; performanceStream *ps;
// Linear scan over all streams for the one whose handle equals id.
587 while(allS.next(h,ps)){
588 if(h == (perfStreamHandle)id){
589 ps->predictedDataCostCallback(req_id,val,clientID);
592 // TODO: call correct routine
597 // Display errors using showError function from the UIM class
598 // This function allows to display error messages from paradynd
599 // using the "upcall" or "call back" mechanism.
600 // Parameters: errCode = Error code
601 // errString = Error message
602 // hostName = Host name where the error occurred
603 // Call: there is a macro defined in "showerror.h". This macro must be
604 // used when calling this function. A typical call is:
605 // showErrorCallback(99, "Error message test"). This macro will
606 // automatically insert the additional host info required.
// NOTE(review): the errString and hostName parameter declarations and the
// declaration of msg are on lines that are not visible in this listing.
608 void dynRPCUser::showErrorCallback(int errCode,
614 if (errString.length() > 0) {
615 if (hostName.length() > 0) {
616 msg = string("<Msg from daemon on host ") + hostName +
617 string("> ") + errString;
620 msg = string("<Msg from daemon on host ?> ") + errString;
// NOTE(review): the duplicated string is handed to showError; whether
// showError takes ownership is not visible here, so confirm that this is
// not a leak.
622 uiMgr->showError(errCode, P_strdup(msg.string_of()));
// Empty errString: pass an empty message (see the remarks below).
625 uiMgr->showError(errCode, "");
629 // hostName.length() should always be > 0, otherwise
630 // hostName is not defined (i.e. "?" will be used instead).
631 // if errString.length()==0, (i.e. errString.string_of()==""),
632 // then we will use the default error message in errorList.tcl
633 // This message, however, will not include any info about the current
639 // Paradynd calls this igen fn when it starts a new process (more
640 // specifically, after it starts the new process and the new process
641 // has completed running DYNINSTinit).
// Searches allDaemons for the daemon whose machine name equals machine_name
// and registers the new program with it through addRunningProgram.
// NOTE(review): the remaining parameters (including machine_name and
// calledFromExec) and the control flow after a match are on lines that are
// not visible in this listing.
643 void dynRPCUser::newProgramCallbackFunc(int pid,
644 vector<string> argvString,
649 // there better be a paradynd running on this machine!
651 for (unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++) {
652 paradynDaemon *pd = paradynDaemon::allDaemons[i];
// Daemons with an empty machine name never match.
653 if (pd->machine.length() && (pd->machine == machine_name)){
654 if (!paradynDaemon::addRunningProgram(pid, argvString, pd, calledFromExec,
658 uiMgr->enablePauseOrRun();
664 // for now, abort if there is no paradynd, this should not happen
665 printf("process started on %s, can't find paradynd there\n",
666 machine_name.string_of());
667 printf("paradyn error #1 encountered\n");
// Upcall carrying a metric description (info).
// NOTE(review): the body is not visible in this listing; presumably it
// forwards info to addMetric -- TODO confirm.
671 void dynRPCUser::newMetricCallback(T_dyninstRPC::metricInfo info)
// Not expected to be called on this class: the body asserts unconditionally.
676 void dynRPCUser::firstSampleCallback (int,double) {
678 assert(0 && "Invalid virtual function");
// Not expected to be called on this class: the body asserts unconditionally.
681 void dynRPCUser::cpDataCallbackFunc(int,double,int,double,double)
683 assert(0 && "Invalid virtual function");
686 // batch the sample delivery
// Not expected to be called on this class: the body asserts unconditionally.
687 void dynRPCUser::batchSampleDataCallbackFunc(int,
688 vector<T_dyninstRPC::batch_buffer_entry>)
690 assert(0 && "Invalid virtual function");
693 // batch the trace delivery
// Not expected to be called on this class: the body asserts unconditionally.
694 void dynRPCUser::batchTraceDataCallbackFunc(int,
695 vector<T_dyninstRPC::trace_batch_buffer_entry>)
697 assert(0 && "Invalid virtual function");
701 // When a paradynd is started remotely, ie not by paradyn, this upcall
702 // reports the information for that paradynd to paradyn
// NOTE(review): the return type and body are not visible in this listing;
// all four arguments are unnamed and therefore unused.
705 dynRPCUser::reportSelf (string , string , int , string)
// Not expected to be called on this class: the body asserts unconditionally.
// NOTE(review): the return type is on a line that is not visible here.
712 dynRPCUser::reportStatus (string)
714 assert(0 && "Invalid virtual function");
// Not expected to be called on this class: the body asserts unconditionally.
// NOTE(review): the return type is on a line that is not visible here.
718 dynRPCUser::processStatus(int, u_int)
720 assert(0 && "Invalid virtual function");
// Not expected to be called on this class: the body asserts unconditionally.
// NOTE(review): the return type is on a line that is not visible here.
724 dynRPCUser::endOfDataCollection(int)
726 assert(0 && "Invalid virtual function");
731 // establish socket that will be advertised to paradynd's
732 // this socket will allow paradynd's to connect to paradyn for pvm
// sockfd - passed by reference to RPC_setup_socket and then bound to this
//          thread with msg_bind.
// NOTE(review): the return type, the declaration of aflag and any check of
// it are on lines that are not visible in this listing.
735 DMsetupSocket (int &sockfd)
737 // setup "well known" socket for pvm paradynd's to connect to
// The result of RPC_setup_socket is stored in dataManager::dm->socket;
// aflag records whether that result is non-negative.
739 aflag = ((dataManager::dm->socket =
740 RPC_setup_socket (sockfd, AF_INET, SOCK_STREAM)) >= 0);
743 // bind fd for this thread
744 msg_bind (sockfd, true);
// Builds the instructions for starting a paradyn daemon by hand, including
// the socket value from dataManager::dm->socket, and shows them through
// uiMgr->showError with code 99.
// NOTE(review): one term of the concatenation (after the -m flag) is on a
// line that is not visible in this listing.
748 void dataManager::printDaemonStartInfo() {
749 string msg = string("To start a paradyn daemon on a remote machine, logon to that machine and run paradynd with the following arguments:\n\n")
750 + string(" paradynd -p") + string(dataManager::dm->socket) + string(" -m")
752 + string(" -l2 -v1 -z<flavor>")
753 + string("\n\nwhere flavor is the one of: unix, pvm, winnt, mpi\n")
754 + string("\nNote: paradyn daemons are usually started automatically.\nManual start-up is needed only when an rshd or rexecd is not available on the remote machine.\n");
// NOTE(review): sprintf is unbounded and buf holds 1000 bytes; the message
// length depends on the pieces appended above. Consider a bounded copy such
// as snprintf.
756 static char buf[1000];
757 sprintf(buf, "%s", msg.string_of());
758 uiMgr->showError(99, buf);
759 //fprintf(stderr, msg.string_of());
// NOTE(review): the function header for these statements is not visible in
// this listing; presumably this is the body of DMnewParadynd, which DMmain
// calls when the listening descriptor becomes ready -- TODO confirm.
765 // accept the connection
766 int new_fd = RPC_getConnect(dataManager::dm->sock_fd);
// NOTE(review): presumably guarded by a check of new_fd on a line that is
// not shown -- TODO confirm.
768 uiMgr->showError(4, "");
770 // add new daemon to dictionary of all daemons
771 paradynDaemon::addDaemon(new_fd);
// Copies met_file into a string and returns the result of metMain on it.
774 bool dataManager::DM_sequential_init(const char* met_file){
775 string mfile = met_file;
776 return(metMain(mfile));
// Initialization performed on the data manager thread: names the thread,
// creates the dataManager object, sets up the listening socket, builds the
// daemon argument list, starts phase_0, then notifies the main thread and
// waits for MSG_TAG_ALL_CHILDREN_READY.
// NOTE(review): the declarations of aflag and DMbuff, any checks of aflag
// and the return statement are on lines that are not visible in this listing.
779 int dataManager::DM_post_thread_create_init(int tid) {
782 thr_name("Data Manager");
783 dataManager::dm = new dataManager(tid);
785 // supports argv passed to paradynDaemon
786 // new paradynd's may try to connect to well known port
787 DMsetupSocket (dataManager::dm->sock_fd);
790 aflag=(RPC_make_arg_list(paradynDaemon::args,
791 dataManager::dm->socket, 1, 1, "", false));
794 // start initial phase
795 string dm_phase0 = "phase_0";
796 phaseInfo::startPhase(0.0,dm_phase0,false,false);
// Handshake with the main thread: announce readiness, then wait for the
// all-children-ready message.
799 unsigned int msgSize = 64;
800 msg_send (MAINtid, MSG_TAG_DM_READY, (char *) NULL, 0);
801 unsigned int tag = MSG_TAG_ALL_CHILDREN_READY;
802 msg_recv (&tag, DMbuff, &msgSize);
807 // Main loop for the dataManager thread.
// varg - pointer to an int thread id, copied out with memcpy below.
// After initialization the loop drains buffered daemon requests, polls for
// the next message and dispatches it: file-descriptor activity goes to the
// matching daemon (or to DMnewParadynd for the listening socket), valid
// dataManager tags go to dataManager::dm->waitLoop, and anything else is
// reported as unrecognized.
// NOTE(review): the loop header, the declarations of ret and tag, the
// trailing tunable-constant arguments and the control flow after
// removeDaemon are on lines that are not visible in this listing.
809 void *DMmain(void* varg)
811 unsigned fd_first = 0;
812 // We declare the "printChangeCollection" tunable constant here; it will
813 // last for the lifetime of this function, which is pretty much forever.
814 // (used to be declared as global in DMappContext.C. Globally declared
815 // tunables are now a no-no). Note that the variable name (printCC) is
817 tunableBooleanConstantDeclarator printCC("printChangeCollection",
818 "Print the name of metric/focus when enabled or disabled",
819 false, // initial value
823 // Now the same for "printSampleArrival"
824 extern bool our_print_sample_arrival;
825 our_print_sample_arrival = false;
826 extern void printSampleArrivalCallback(bool);
827 tunableBooleanConstantDeclarator printSA("printSampleArrival",
828 "Print out status lines to show the arrival of samples",
829 our_print_sample_arrival, // init val
830 printSampleArrivalCallback,
833 int tid; memcpy((void*)&tid,varg, sizeof(int));
834 dataManager::DM_post_thread_create_init(tid);
838 paradynDaemon *pd = NULL;
840 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
841 pd = paradynDaemon::allDaemons[i];
842 // handle up to max async requests that may have been buffered
843 // while blocking on a sync request
844 while (pd->buffered_requests()){
845 if(pd->process_buffered() == T_dyninstRPC::error) {
846 cout << "error on paradyn daemon\n";
847 paradynDaemon::removeDaemon(pd, true);
851 // ret = msg_poll(&tag, true);
// fd_first is toggled after every poll, so the preference argument
// alternates between iterations.
852 ret = msg_poll_preference(&tag, true,fd_first);
853 fd_first = !fd_first;
854 assert(ret != THR_ERR);
856 if (tag == MSG_TAG_FILE) {
857 // must be an upcall on something speaking the dynRPC protocol.
858 if (ret == dataManager::dm->sock_fd){
859 DMnewParadynd(); // set up a new daemon
// Otherwise find the daemon whose descriptor matches ret and service it.
862 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
863 pd = paradynDaemon::allDaemons[i];
864 if(pd->get_fd() == ret){
865 if(pd->waitLoop() == T_dyninstRPC::error) {
866 cout << "error on paradyn daemon\n";
867 paradynDaemon::removeDaemon(pd, true);
870 // handle async requests that may have been buffered
871 // while blocking on a sync request
872 while(pd->buffered_requests()){
873 if(pd->process_buffered() == T_dyninstRPC::error) {
874 cout << "error on paradyn daemon\n";
875 paradynDaemon::removeDaemon(pd, true);
879 } else if (dataManager::dm->isValidTag
880 ((T_dataManager::message_tags)tag)) {
881 if (dataManager::dm->waitLoop(true,
882 (T_dataManager::message_tags)tag) == T_dataManager::error) {
887 cerr << "Unrecognized message in DMmain.C\n";
// Registers a metric described by info unless one with the same name is
// already in metric::allMetrics, then reports the new metric to every
// performance stream that has a metric callback (controlFunc.mFunc) set.
// NOTE(review): the early return, the declaration of h and most of the
// newMetricDefined arguments are on lines that are not visible in this
// listing.
894 void addMetric(T_dyninstRPC::metricInfo &info)
896 // if metric already exists return
897 if(metric::allMetrics.defines(info.name)){
900 metric *met = new metric(info);
902 // now tell all perfStreams
903 dictionary_hash_iter<perfStreamHandle,performanceStream*>
904 allS(performanceStream::allStreams);
906 performanceStream *ps;
907 while(allS.next(h,ps)){
908 if(ps->controlFunc.mFunc){
909 // set the correct destination thread.
910 dataManager::dm->setTid(ps->threadId);
911 dataManager::dm->newMetricDefined(ps->controlFunc.mFunc,
918 met->getUnitsType());
// Creates the resource named by the path components in resource_name, or
// returns the handle of the existing one, and announces a newly created
// resource to every performance stream.
// res_id        - identifier passed through to the resource constructor.
// resource_name - path components; the last one is the new resource and the
//                 preceding ones name its parent.
// abstr         - abstraction name; a default is used when it has no string.
// type          - NOTE(review): its use is on a line that is not visible.
// NOTE(review): the switch on r_size, the declarations of p_name, child and
// h, several braces and the final return are not visible in this listing.
924 // I don't want to parse for '/' more than once, thus the use of a string vector
925 resourceHandle createResource(unsigned res_id, vector<string>& resource_name, string& abstr, unsigned type) {
927 static const string slashStr = "/";
928 static const string baseStr = "BASE";
930 resource *parent = NULL;
931 unsigned r_size = resource_name.size();
937 // Should this case ever occur ?
940 parent = resource::rootResource; break;
// Build the parent path from all components except the last.
942 for (unsigned ri=0; ri<(r_size-1); ri++)
943 p_name += slashStr + resource_name[ri];
944 parent = resource::string_to_resource(p_name);
948 if (!parent) assert(0);
951 /* first check to see if the resource has already been defined */
952 resource *p = resource::resources[parent->getHandle()];
953 string myName = p_name;
955 myName += resource_name[r_size - 1];
958 if (resource::allResources.find(myName.string_of(), child)) {
959 return child->getHandle();
962 // if abstr is not defined then use default abstraction
963 if(!abstr.string_of()){
968 resource *ret = new resource(parent->getHandle(),res_id, resource_name,
971 // check to see if the suppressMagnify option should be set...if
972 // this resource is specified in the mdl exclude_lib option
973 vector<string> shared_lib_constraints;
974 if(resource::get_lib_constraints(shared_lib_constraints) &&
975 (string(parent->getFullName()) == "/Code")) {
976 for(u_int i=0; i < shared_lib_constraints.size(); i++){
977 if(shared_lib_constraints[i] == ret->getName()){
978 ret->setSuppressMagnify();
983 // check to see if the suppressMagnify option should be set if
984 // the resource is a function that is specified in the mdl exclude_func
986 if(!ret->isMagnifySuppressed()){
987 if(parent != resource::rootResource) {
988 // get parent of parent, if it is "/Code" then check the
989 // list of excluded_funcs
990 resourceHandle pph = parent->getParent();
991 resource *ppr = resource::handle_to_resource(pph);
992 if( ppr && (string(ppr->getFullName()) == "/Code")) {
993 vector< vector<string> > libs;
994 if(resource::get_func_constraints(libs)) {
// Each constraint entry is compared as a pair: element 0 against the
// parent name and element 1 against the new resource name.
995 for(u_int i=0; i < libs.size(); i++){
996 if(((libs[i])[0] == parent->getName()) &&
997 ((libs[i])[1] == ret->getName())) {
998 ret->setSuppressMagnify();
1006 /* inform others about it if they need to know */
1007 dictionary_hash_iter<perfStreamHandle,performanceStream*>
1008 allS(performanceStream::allStreams);
1010 performanceStream *ps;
1011 resourceHandle r_handle = ret->getHandle();
// NOTE(review): name is not referenced in the visible lines; the call below
// uses ret->getFullName() directly.
1012 string name = ret->getFullName();
1013 while(allS.next(h,ps)){
1014 ps->callResourceFunc(parent->getHandle(),r_handle,ret->getFullName(),
1015 ret->getAbstractionName());
// Variant of createResource that also reports the parent handle through
// p_handle; no performance-stream notification appears in the visible lines
// (presumably ncb stands for no callback -- TODO confirm).
// resource_name - path components; the last one is the new resource.
// abstr         - abstraction name; set to BASE when it has no string.
// type          - passed to the resource constructor.
// p_handle      - output: handle of the parent resource.
// exist         - NOTE(review): its use is on lines that are not visible.
// NOTE(review): the end of the parameter list, the switch on r_size, the
// declaration of p_name, the handling of child and the return statement
// are not visible in this listing.
1020 resourceHandle createResource_ncb(vector<string>& resource_name, string& abstr, unsigned type,
1021 resourceHandle &p_handle, bool &exist
1024 resource *parent = NULL;
1025 unsigned r_size = resource_name.size();
1031 // Should this case ever occur ?
1034 parent = resource::rootResource; break;
// Build the parent path from all components except the last.
1036 for (unsigned ri=0; ri<(r_size-1); ri++)
1037 p_name += string("/") + resource_name[ri];
1038 parent = resource::string_to_resource(p_name);
1042 if (!parent) assert(0);
1045 /* first check to see if the resource has already been defined */
1046 p_handle = parent->getHandle() ;
1047 resource *p = resource::resources[parent->getHandle()];
1048 string myName = p_name;
1050 myName += resource_name[r_size - 1];
1052 resourceHandle *child = p->findChild(myName.string_of());
1061 // if abstr is not defined then use default abstraction
1062 if(!abstr.string_of()){
1063 abstr = string("BASE");
1066 /* then create it */
1067 resource *ret = new resource(parent->getHandle(),resource_name,
1068 myName,abstr, type);
1070 resourceHandle r_handle = ret->getHandle() ;
// Calls setSampleRate(rate) on every daemon in paradynDaemon::allDaemons.
1074 void newSampleRate(float rate)
1076 paradynDaemon *pd = NULL;
1077 for(unsigned i = 0; i < paradynDaemon::allDaemons.size(); i++){
1078 pd = paradynDaemon::allDaemons[i];
1079 pd->setSampleRate(rate);
1084 // Note - the metric parser has been moved into the dataManager
// Runs metMain on metric_file and stores its result in parseResult.
// NOTE(review): the rest of the body, including the return, is not visible
// in this listing.
1085 bool parse_metrics(string metric_file) {
1086 bool parseResult = metMain(metric_file);