2 * DMmain.C: main loop of the Data Manager thread.
5 * Revision 1.36 1994/07/05 03:27:17 hollings
6 * added observed cost model.
8 * Revision 1.35 1994/07/02 01:43:11 markc
9 * Removed all uses of type aggregation from enableDataCollection.
10 * The metricInfo structure now contains the aggregation operator.
12 * Revision 1.34 1994/06/29 02:55:59 hollings
13 * fixed code to remove instrumentation when done with it.
15 * Revision 1.33 1994/06/27 21:23:25 rbi
16 * Abstraction-specific resources and mapping info
18 * Revision 1.32 1994/06/27 18:54:48 hollings
19 * changed stdio printf for paradynd.
21 * Revision 1.31 1994/06/17 22:07:59 hollings
22 * Added code to provide upcall for resource batch mode when a large number
23 * of resources is about to be added.
25 * Revision 1.30 1994/06/14 15:22:46 markc
26 * Added arg to enableDataCollection call to support aggregation.
28 * Revision 1.29 1994/06/02 23:25:19 markc
29 * Added virtual function 'handle_error' to paradynDaemon class which uses the
30 * error handling features that igen provides.
32 * Revision 1.28 1994/05/31 18:26:15 markc
33 * strdup'd a string passed into createResource, since igen will free the memory
34 * for the string on return from the function.
36 * Revision 1.27 1994/05/18 02:51:04 hollings
37 * fixed cast on return of malloc.
39 * Revision 1.26 1994/05/18 00:43:28 hollings
40 * added routine to print output of stdout.
42 * Revision 1.25 1994/05/17 00:16:38 hollings
43 * Changed process id separator from [] to {} to get around braindead tcl.
45 * Revision 1.24 1994/05/16 22:31:38 hollings
46 * added way to request unique resource name.
48 * Revision 1.23 1994/05/12 23:34:00 hollings
49 * made path to paradyn.h relative.
51 * Revision 1.22 1994/05/10 03:57:37 hollings
52 * Changed data upcall to return array of buckets.
54 * Revision 1.21 1994/05/09 20:56:20 hollings
55 * added changeState callback.
57 * Revision 1.20 1994/05/02 20:37:45 hollings
58 * Fixed compiler warning.
60 * Revision 1.19 1994/04/21 23:24:26 hollings
61 * removed process name from calls to RPC_make_arg_list.
63 * Revision 1.18 1994/04/20 15:30:10 hollings
64 * Added error numbers.
65 * Added data manager function to get histogram buckets.
67 * Revision 1.17 1994/04/18 22:28:31 hollings
68 * Changes to create a canonical form of a resource list.
70 * Revision 1.16 1994/04/12 22:33:34 hollings
71 * Fixed casts back to time64 which were dropping off the fraction of seconds
72 * in the timestamps of samples.
74 * Revision 1.15 1994/04/12 15:32:00 hollings
75 * added tunable constant samplingRate to control the frequency of sampling.
77 * Revision 1.14 1994/04/11 23:18:49 hollings
78 * added checks to make sure time moves forward.
80 * Revision 1.13 1994/04/04 21:36:12 newhall
81 * added synchronization code to DM thread startup
83 * Revision 1.12 1994/04/01 20:17:22 hollings
84 * Added init of well known socket fd global.
86 * Revision 1.11 1994/03/25 22:59:33 hollings
87 * Made the data manager tolerate paradynd's dying.
89 * Revision 1.10 1994/03/24 16:41:20 hollings
90 * Added support for multiple paradynd's at once.
92 * Revision 1.9 1994/03/21 20:32:48 hollings
93 * Changed the mid to mi mapping to be per paradyn daemon. This is required
94 * because mids are assigned by the paradynd's, and are not globally unique.
96 * Revision 1.8 1994/03/20 01:49:48 markc
97 * Gave process structure a buffer to allow multiple writers. Added support
98 * to register name of paradyn daemon. Changed addProcess to return type int.
100 * Revision 1.7 1994/03/08 17:39:33 hollings
101 * Added foldCallback and getResourceListName.
103 * Revision 1.6 1994/02/25 20:58:11 markc
104 * Added support for storing paradynd's pids.
106 * Revision 1.5 1994/02/24 04:36:31 markc
107 * Added an upcall to dyninstRPC.I to allow paradynd's to report information at
108 * startup. Added a data member to the class that igen generates.
109 * Make depend differences due to new header files that igen produces.
110 * Added support to allow asynchronous starts of paradynd's. The dataManager has
111 * an advertised port that new paradynd's can connect to.
113 * Revision 1.4 1994/02/08 21:05:55 hollings
114 * Found a few pointer problems.
116 * Revision 1.3 1994/02/03 23:26:58 hollings
117 * Changes to work with g++ version 2.5.2.
119 * Revision 1.2 1994/02/02 00:42:33 hollings
120 * Changes to the Data manager to reflect the file naming convention and
121 * to support the integration of the Performance Consultant.
123 * Revision 1.1 1994/01/28 01:34:17 hollings
124 * The initial version of the Data Management thread.
// Forward declaration of quiet_nan; its definition and any callers lie
// outside the lines visible in this excerpt.
131 double quiet_nan(int unused);
133 #include "thread/h/thread.h"
136 #include "util/h/tunableConst.h"
137 #include "dataManager.SRVR.h"
138 #include "dyninstRPC.CLNT.h"
139 #include "DMinternals.h"
140 #include "../pdMain/paradyn.h"
// File-scope handle to the dataManager object. It is assigned in DMmain and
// used by the upcall helpers in this file to reach the data manager.
142 static dataManager *dm;
// Definitions of the static members of metric and paradynDaemon.
143 stringPool metric::names;
144 HTable<metric *> metric::allMetrics;
145 List<paradynDaemon*> paradynDaemon::allDaemons;
// Declared ahead of its definition so the tunable constant below can name it
// as its callback.
147 void newSampleRate(float rate);
// Tunable constant "samplingRate", constructed with 0.5, 0.0, 1000.0 and the
// newSampleRate callback (presumably initial value, minimum and maximum --
// confirm against util/h/tunableConst.h).
149 tunableConstant samplingRate(0.5, 0.0, 1000.0, newSampleRate, "samplingRate",
150 "how often to sample intermediate performance data (in seconds)");
// Enable collection of a metric over a resource list for this stream.
// Returns NULL when either the metric (m) or the resource list (rl) is NULL.
// Otherwise the list's canonical name is looked up in m->enabledCombos and
// appl->enableDataCollection(rl, m) is called.
// NOTE(review): the condition guarding the appl call is not visible in this
// excerpt; presumably it runs only when the lookup misses -- confirm.
152 metricInstance *performanceStream::enableDataCollection(resourceList *rl,
158 if (!m || !rl) return(NULL);
160 name = rl->getCanonicalName();
161 mi = m->enabledCombos.find(name);
166 mi = appl->enableDataCollection(rl, m);
// Detach this stream from a metric instance: removes this stream from
// mi->users and forwards to appl->disableDataCollection(mi).
// NOTE(review): lines between the two visible statements are missing here;
// the forward may be conditional (e.g. on no remaining users) -- confirm.
175 // Turn off data collection for this perf stream. Other streams may still
178 void performanceStream::disableDataCollection(metricInstance *mi)
181 mi->users.remove(this);
183 appl->disableDataCollection(mi);
// Subscribe this stream to resource-creation notifications for r.
// NOTE(review): the body is not visible in this excerpt; presumably it is
// the counterpart of disableResourceCreationNotification and adds this
// stream to r->notify -- confirm.
188 void performanceStream::enableResourceCreationNotification(resource *r)
// Unsubscribe this stream from notifications for r by removing it from
// r->notify.
193 void performanceStream::disableResourceCreationNotification(resource *r)
195 r->notify.remove(this);
// Deliver sample data to this stream's client. Nothing happens unless a
// sample callback (dataFunc.sample) is registered; when it is, the stream's
// thread is selected with dm->setTid(threadId) and dm->newPerfData is called
// with the callback, this stream, the metric instance, and the buckets,
// count and first values.
198 void performanceStream::callSampleFunc(metricInstance *mi,
199 sampleValue *buckets,
203 if (dataFunc.sample) {
204 dm->setTid(threadId);
205 dm->newPerfData(dataFunc.sample, this, mi, buckets, count, first);
// Tell this stream's client about a newly defined resource. When a resource
// callback (controlFunc.rFunc) is registered, the stream's thread is selected
// with dm->setTid(threadId) and dm->newResourceDefined is called with p, c
// and name passed through unchanged. createResource calls this with the
// parent as p and the new resource as the second argument.
209 void performanceStream::callResourceFunc(resource *p,
213 if (controlFunc.rFunc) {
214 dm->setTid(threadId);
215 dm->newResourceDefined(controlFunc.rFunc, this, p, c, name);
// Forward a resource batch-mode change to this stream's client. Only acts
// when controlFunc.bFunc is registered: selects the stream's thread with
// dm->setTid(threadId), then calls dm->changeResourceBatchMode with the mode.
219 void performanceStream::callResourceBatchFunc(batchMode mode)
221 if (controlFunc.bFunc) {
222 dm->setTid(threadId);
223 dm->changeResourceBatchMode(controlFunc.bFunc, this, mode);
// Forward a histogram fold to this stream's client. Only acts when
// controlFunc.fFunc is registered: selects the stream's thread with
// dm->setTid(threadId), then calls dm->histFold with the given width.
227 void performanceStream::callFoldFunc(timeStamp width)
229 if (controlFunc.fFunc) {
230 dm->setTid(threadId);
231 dm->histFold(controlFunc.fFunc, this, width);
// Forward an application state change to this stream's client. Only acts
// when controlFunc.sFunc is registered: selects the stream's thread with
// dm->setTid(threadId), then calls dm->changeState with the new state.
236 void performanceStream::callStateFunc(appState state)
238 if (controlFunc.sFunc) {
239 dm->setTid(threadId);
240 dm->changeState(controlFunc.sFunc, this, state);
245 // IO from application processes.
//
// Upcall carrying output text (data) from a process identified by pid. The
// visible code splits the text at '\n' with strchr, prints each piece with
// either a "pid <n>:" or a "paradynd:" prefix, and allocates `extra` sized to
// hold what is left in `rest`.
// NOTE(review): the branch conditions, the loop structure and the use of
// `len` are on lines not visible in this excerpt -- confirm before relying on
// this summary.
247 void dynRPCUser::applicationIO(int pid, int len, String data)
251 // extra should really be per process.
255 ptr = strchr(rest, '\n');
259 printf("pid %d:", pid);
// The format has no conversion specifier, so no argument is passed.
261 printf("paradynd:");
268 printf("%s\n", rest);
270 ptr = strchr(rest, '\n');
272 extra = (char *) malloc(strlen(rest)+1);
// Translate an abstraction name into an abstractionType value.
// A NULL name yields BASE; otherwise the name must match "TCL", "CMF" or
// "BASE" exactly (strcmp). Any other name prints a diagnostic.
// NOTE(review): what is returned after the diagnostic is on a line not
// visible in this excerpt.
276 abstractionType parseAbstractionName(String abstraction)
278 if (!abstraction) return BASE;
279 if (strcmp(abstraction,"TCL") == 0) return TCL;
280 if (strcmp(abstraction,"CMF") == 0) return CMF;
281 if (strcmp(abstraction,"BASE") == 0) return BASE;
282 printf("DATAMANGER: bad abstraction '%s'\n",abstraction);
// Upcall that defines a resource reported by a remote process.
// A non-empty parentString is interned in resource::names and looked up in
// resource::allResources; an unknown parent aborts the program. The other
// visible assignment sets the parent to resource::rootResource (presumably
// the else branch for an empty parentString -- confirm). The abstraction
// name is parsed with parseAbstractionName and the resource is created via
// createResource(parent, name, at).
// NOTE(review): parentString is dereferenced without a NULL check in the
// visible lines.
287 // upcalls from remote process.
289 void dynRPCUser::resourceInfoCallback(int program,
298 // create the resource.
299 if (*parentString != '\0') {
301 parentString = resource::names.findAndAdd(parentString);
302 parent = resource::allResources.find(parentString);
303 if (!parent) abort();
305 parent = resource::rootResource;
309 at = parseAbstractionName(abstraction);
312 createResource(parent, name, at);
// Upcall carrying abstraction mapping information. Every visible statement
// in the body is commented out (a debug printf and a loop that would call
// stream->callMappingFunc on each performance stream), so as far as this
// excerpt shows the upcall currently does nothing.
315 void dynRPCUser::mappingInfoCallback(int program,
322 // List<performanceStream *> curr;
323 // performanceStream *stream;
325 /* printf("DATAMANAGER: '%s' '%s' map '%s -> %s'\n", abstraction,
326 type, key, value); */
327 /* for (curr = applicationContext::streams; stream = *curr; curr++) {
328 Make sure stream is of right abstraction
329 stream->callMappingFunc(abstraction, type, key, value);
// Constructor of the uniqueName helper record (class header not visible in
// this excerpt): stores the base pointer as given, without copying the
// string, and starts the id counter at 0.
337 uniqueName(char *base) { name = base; nextId = 0; }
// Upcall that produces a unique resource name under a parent.
// Builds "<parentString>/<newResource>", interns it in resource::names and
// uses the interned pointer as the key into a function-local static list of
// uniqueName records. A record is created and added for the key, and the
// result is formatted as "<newResource>{<id>}" with the record's nextId,
// which is post-incremented, then interned again.
// NOTE(review): the condition guarding record creation is not visible;
// presumably it runs only when the find misses -- confirm.
// NOTE(review): both sprintf calls write into newName without a length
// bound; its declaration and size are not visible here -- confirm the buffer
// is large enough for the longest parent/resource pair.
343 String dynRPCUser::getUniqueResource(int program,
350 static List<uniqueName*> allUniqueNames;
352 sprintf(newName, "%s/%s", parentString, newResource);
353 ptr = resource::names.findAndAdd(newName);
355 ret = allUniqueNames.find(ptr);
358 ret = new uniqueName(ptr);
359 allUniqueNames.add(ret, ptr);
361 // changed from [] to {} due to TCL braindeadness.
362 sprintf(newName, "%s{%d}", newResource, ret->nextId++);
363 ptr = resource::names.findAndAdd(newName);
369 // used when a new program gets forked.
//
// Upcall reporting a newly started process. Scans paradynDaemon::allDaemons
// for the daemon whose machine name equals machine_name (strcmp) and prints
// "paradyn error #1" diagnostics when none is found. The argument strings in
// argvString are then duplicated into a freshly allocated argv array and the
// process is registered with dm->appContext->addRunningProgram.
// NOTE(review): the statements that follow each diagnostic printf, and the
// source of argc, are on lines not visible in this excerpt.
371 void dynRPCUser::newProgramCallbackFunc(int pid,
373 String_Array argvString,
377 paradynDaemon *daemon;
378 List<paradynDaemon*> curr;
381 // there better be a paradynd running on this machine!
382 for (curr=paradynDaemon::allDaemons, daemon = NULL; *curr; curr++) {
383 if (!strcmp((*curr)->machine, machine_name))
386 // for now, abort if there is no paradynd, this should not happen
388 printf("process started on %s, can't find paradynd there\n",
390 printf("paradyn error #1 encountered\n");
// malloc takes a size in bytes: reserve room for argvString.count pointers,
// since the loop below stores one char* per element.
393 argv = (char **) malloc (argvString.count * sizeof(char *));
395 printf(" cannot malloc memory in newProgramCallbackFunc\n");
398 for (i=0; i<argvString.count; ++i) {
399 argv[i] = strdup(argvString.data[i]);
401 printf(" cannot malloc memory in newProgramCallbackFunc\n");
406 assert (dm->appContext);
// Register the process outside assert() so the call still happens when
// NDEBUG compiles assertions away; a non-zero result is still fatal in
// builds with assertions enabled.
407 if (dm->appContext->addRunningProgram(pid, argc, argv, daemon)) assert (0 && "addRunningProgram failed");
// Upcall announcing a metric described by info.
// NOTE(review): the body is not visible in this excerpt; presumably it
// forwards to addMetric(info) -- confirm.
410 void dynRPCUser::newMetricCallback(metricInfo info)
// Base-class version of the sample upcall. It must never run: the visible
// body trips assert(0) with the message "Invalid virtual function".
// paradynDaemon defines its own sampleDataCallbackFunc in this file.
415 void dynRPCUser::sampleDataCallbackFunc(int program,
417 double startTimeStamp,
421 assert(0 && "Invalid virtual function");
// Sample upcall from one paradynd: folds a new value for metric id `mid`
// into the matching metric instance.
// The instance is looked up in this daemon's activeMids table, keyed by mid
// cast to void*; "paradyn Error #2" is printed for an unknown mid. When the
// instance does not have exactly one component, this daemon's component is
// looked up in mi->components ("paradyn Error #3" if missing) and the sample
// is fed to part->sample. The visible lines also feed mi->sample with
// mi->parts, and the returned interval is asserted to be non-negative and
// ordered, added to mi->enabledTime, and recorded in mi->data.
// NOTE(review): the branch structure around the two newValue calls and the
// statements after each error printf are not visible in this excerpt.
424 void paradynDaemon::sampleDataCallbackFunc(int program,
426 double startTimeStamp,
432 struct sampleInterval ret;
434 // printf("mid %d %f from %f to %f\n", mid, value, startTimeStamp, endTimeStamp);
435 mi = activeMids.find((void*) mid);
437 printf("ERROR: data for unknown mid: %d\n", mid);
438 printf("paradyn Error #2\n");
442 if (mi->components.count() != 1) {
443 // find the right component.
444 part = mi->components.find(this);
447 printf("Unable to find component!!!\n");
448 printf("paradyn Error #3\n");
451 ret = part->sample.newValue(endTimeStamp, value);
453 ret = mi->sample.newValue(mi->parts, endTimeStamp, value);
456 assert(ret.end >= 0.0);
457 assert(ret.start >= 0.0);
458 assert(ret.end >= ret.start);
459 mi->enabledTime += ret.end - ret.start;
460 mi->data->addInterval(ret.start, ret.end, ret.value, FALSE);
// Destructor: unregisters this daemon from allDaemons and, for every metric
// instance in activeMids, removes this daemon from the instance's parts and
// components.
// NOTE(review): the condition under which "Inconsistant state" is printed is
// not visible in this excerpt.
465 // paradyn daemon should never go away. This represents an error state
466 // due to a paradynd being killed for some reason.
468 paradynDaemon::~paradynDaemon() {
472 HTable<metricInstance*> curr;
474 allDaemons.remove(this);
476 // remove the metric ID as required.
477 for (curr = activeMids; mi = *curr; curr++) {
478 mi->parts.remove(this);
479 mi->components.remove(this);
482 printf("Inconsistant state\n");
// NOTE(review): only the signature is visible in this excerpt; the return
// type and body are on missing lines. paradynDaemon defines a reportSelf
// with the same parameters later in this file.
487 // When a paradynd is started remotely, ie not by paradyn, this upcall
488 // reports the information for that paradynd to paradyn
491 dynRPCUser::reportSelf (String m, String p, int pd)
// Error hook for igen calls on this daemon: instead of exiting, asks the
// application context to remove this daemon
// (dm->appContext->removeDaemon(this, TRUE)).
498 // When an error is determined on an igen call, this function is
499 // called, since the default error handler will exit, and we don't
500 // want paradyn to exit.
502 void paradynDaemon::handle_error()
504 dm->appContext->removeDaemon(this, TRUE);
// NOTE(review): only the signature is visible in this excerpt; the return
// type and body are on missing lines, so how m, p and pd are used cannot be
// stated from here.
508 // When a paradynd is started remotely, ie not by paradyn, this upcall
509 // reports the information for that paradynd to paradyn
512 paradynDaemon::reportSelf (String m, String p, int pd)
521 // establish socket that will be advertised to paradynd's
522 // this socket will allow paradynd's to connect to paradyn for pvm
//
// Calls RPC_setup_socket(sockfd, AF_INET, SOCK_STREAM) and stores its result
// in *known_sock; a negative result trips the assertion. Both *known_sock
// and *sockfd are then copied into the data manager (dm->socket and
// dm->sock_fd), and *sockfd is bound for this thread with msg_bind.
// NOTE(review): the return type is on a line not visible in this excerpt.
525 DMsetupSocket (int *sockfd, int *known_sock)
527 // setup "well known" socket for pvm paradynd's to connect to
// The call is made outside assert() so the socket is still created when
// NDEBUG compiles assertions away.
528 *known_sock = RPC_setup_socket (sockfd, AF_INET, SOCK_STREAM);
529 assert (*known_sock >= 0);
531 // this info is needed to create argument list for other paradynds
532 dm->socket = *known_sock;
533 dm->sock_fd = *sockfd;
535 // bind fd for this thread
536 msg_bind (*sockfd, TRUE);
// Accept a connection from a paradynd on the well-known socket and register
// it with the application context. RPC_getConnect(sockfd) yields the new
// descriptor; "paradyn Error #4" is printed when the connection cannot be
// made. The descriptor is then handed to dm->appContext->addDaemon.
// NOTE(review): the return type, the failure test on new_fd and the
// statements after the error printf are on lines not visible here.
540 DMnewParadynd (int sockfd, dataManager *dm)
544 // accept the connection
545 new_fd = RPC_getConnect(sockfd);
547 printf ("unable to connect to new paradynd\n");
548 printf("paradyn Error #4\n");
552 assert (dm->appContext);
// Register the daemon outside assert() so the call still happens when NDEBUG
// compiles assertions away; a non-zero result is still fatal in builds with
// assertions enabled.
553 if (dm->appContext->addDaemon(new_fd)) assert (0 && "addDaemon failed");
// Entry point of the Data Manager thread.
// Startup, in visible order: names the thread, creates the dataManager and
// stores it in the file-scope dm, opens the well-known socket with
// DMsetupSocket, records its descriptor in dynRPCUser::__wellKnownPortFd__,
// builds paradynDaemon::args from known_sock, sends MSG_TAG_DM_READY to
// MAINtid, and then receives with the tag set to MSG_TAG_ALL_CHILDREN_READY.
// Polling: each msg_poll result is asserted not to be THR_ERR. For a
// MSG_TAG_FILE message, the daemon whose fd equals the poll result has
// awaitResponce(-1) called on it; DMnewParadynd(sockfd, dm) sets up a new
// daemon.
// NOTE(review): the loop header, the condition selecting DMnewParadynd
// (presumably activity on sockfd), the handling of other tags, and the
// declarations of tag, ret and DMbuff are on lines not visible here.
557 // Main loop for the dataManager thread.
559 void *DMmain(int arg)
563 List<paradynDaemon*> curr;
564 int known_sock, sockfd;
566 unsigned int msgSize = 64;
568 thr_name("Data Manager");
570 dm = new dataManager(arg);
571 // this will be set on addExecutable
574 // supports argv passed to paradynDaemon
575 // new paradynd's may try to connect to well known port
576 DMsetupSocket (&sockfd, &known_sock);
577 dynRPCUser::__wellKnownPortFd__ = sockfd;
579 paradynDaemon::args =
580 RPC_make_arg_list(AF_INET, SOCK_STREAM, known_sock, 1);
582 msg_send (MAINtid, MSG_TAG_DM_READY, (char *) NULL, 0);
583 tag = MSG_TAG_ALL_CHILDREN_READY;
584 msg_recv (&tag, DMbuff, &msgSize);
588 ret = msg_poll(&tag, TRUE);
589 assert(ret != THR_ERR);
591 if (tag == MSG_TAG_FILE) {
592 // must be an upcall on something speaking the dynRPC protocol.
593 for (curr = paradynDaemon::allDaemons; *curr; curr++) {
594 if ((*curr)->fd == ret) {
595 (*curr)->awaitResponce(-1);
599 DMnewParadynd(sockfd, dm); // set up a new paradynDaemon
// Register a metric and announce it to the performance streams.
// The metric name is interned in metric::names and looked up in
// metric::allMetrics; a new metric object is created from info and added
// under the interned name. Streams in applicationContext::streams that have
// a metric callback (controlFunc.mFunc) are then notified through
// dm->newMetricDefined, after dm->setTid selects the stream's thread.
// NOTE(review): the condition separating the "already known" path from the
// creation path, and the assignment of `stream` inside the loop, are on
// lines not visible in this excerpt.
607 void addMetric(metricInfo info)
611 performanceStream *stream;
612 List<performanceStream *> curr;
614 iName = metric::names.findAndAdd(info.name);
616 met = metric::allMetrics.find(iName);
618 // check that it is compatible ????
625 met = new metric(info);
626 metric::allMetrics.add(met, iName);
629 // now tell all perfStreams
631 for (curr = applicationContext::streams; *curr; curr++) {
633 if (stream->controlFunc.mFunc) {
634 // set the correct destination thread.
635 dm->setTid(stream->threadId);
636 dm->newMetricDefined(stream->controlFunc.mFunc, stream, met);
// Create (or find) the child resource named newResource under parent p.
// If p already has a child with that name, it is returned unchanged.
// Otherwise a new resource is constructed from a strdup'd copy of the name
// and the abstraction type, and every performance stream whose abstraction
// equals `at` is notified via callResourceFunc(p, ret, fullName).
// NOTE(review): p is dereferenced without a NULL check in the visible lines,
// and the final return statement is on a line not visible in this excerpt.
642 resource *createResource(resource *p, char *newResource, abstractionType at)
647 performanceStream *stream;
648 List<performanceStream *> curr;
650 /* first check to see if the resource has already been defined */
651 temp = p->children.find(newResource);
652 if (temp) return(temp);
655 ret = new resource(p, strdup(newResource), at);
656 fullName = ret->getFullName();
658 /* inform others about it if they need to know */
659 for (curr = applicationContext::streams; stream = *curr; curr++) {
660 if (stream->getAbstraction() == at) {
661 stream->callResourceFunc(p, ret, fullName);
// Callback registered with the samplingRate tunable constant: pushes the new
// rate to every daemon in paradynDaemon::allDaemons via setSampleRate.
668 void newSampleRate(float rate)
670 List<paradynDaemon*> curr;
672 for (curr = paradynDaemon::allDaemons; *curr; curr++) {
673 (*curr)->setSampleRate(rate);