added way to request unique resource name.
[dyninst.git] / paradyn / src / DMthread / DMmain.C
1 /*
2  * DMmain.C: main loop of the Data Manager thread.
3  *
4  * $Log: DMmain.C,v $
5  * Revision 1.24  1994/05/16 22:31:38  hollings
6  * added way to request unique resource name.
7  *
8  * Revision 1.23  1994/05/12  23:34:00  hollings
9  * made path to paradyn.h relative.
10  *
11  * Revision 1.22  1994/05/10  03:57:37  hollings
12  * Changed data upcall to return array of buckets.
13  *
14  * Revision 1.21  1994/05/09  20:56:20  hollings
15  * added changeState callback.
16  *
17  * Revision 1.20  1994/05/02  20:37:45  hollings
18  * Fixed compiler warning.
19  *
20  * Revision 1.19  1994/04/21  23:24:26  hollings
21  * removed process name from calls to RPC_make_arg_list.
22  *
23  * Revision 1.18  1994/04/20  15:30:10  hollings
24  * Added error numbers.
25  * Added data manager function to get histogram buckets.
26  *
27  * Revision 1.17  1994/04/18  22:28:31  hollings
28  * Changes to create a canonical form of a resource list.
29  *
30  * Revision 1.16  1994/04/12  22:33:34  hollings
31  * Fixed casts back to time64 which were dropping off the fraction of seconds
32  * in the timestamps of samples.
33  *
34  * Revision 1.15  1994/04/12  15:32:00  hollings
35  * added tunable constant samplingRate to control the frequency of sampling.
36  *
37  * Revision 1.14  1994/04/11  23:18:49  hollings
38  * added checks to make sure time moves forward.
39  *
40  * Revision 1.13  1994/04/04  21:36:12  newhall
41  * added synchronization code to DM thread startup
42  *
43  * Revision 1.12  1994/04/01  20:17:22  hollings
44  * Added init of well known socket fd global.
45  *
46  * Revision 1.11  1994/03/25  22:59:33  hollings
47  * Made the data manager tolerate paraynd's dying.
48  *
49  * Revision 1.10  1994/03/24  16:41:20  hollings
50  * Added support for multiple paradynd's at once.
51  *
52  * Revision 1.9  1994/03/21  20:32:48  hollings
53  * Changed the mid to mi mapping to be per paradyn daemon.  This is required
54  * because mids are asigned by the paradynd's, and are not globally unique.
55  *
56  * Revision 1.8  1994/03/20  01:49:48  markc
57  * Gave process structure a buffer to allow multiple writers.  Added support
58  * to register name of paradyn daemon.  Changed addProcess to return type int.
59  *
60  * Revision 1.7  1994/03/08  17:39:33  hollings
61  * Added foldCallback and getResourceListName.
62  *
63  * Revision 1.6  1994/02/25  20:58:11  markc
64  * Added support for storing paradynd's pids.
65  *
66  * Revision 1.5  1994/02/24  04:36:31  markc
67  * Added an upcall to dyninstRPC.I to allow paradynd's to report information at
68  * startup.  Added a data member to the class that igen generates.
69  * Make depend differences due to new header files that igen produces.
70  * Added support to allow asynchronous starts of paradynd's.  The dataManager has
71  * an advertised port that new paradynd's can connect to.
72  *
73  * Revision 1.4  1994/02/08  21:05:55  hollings
74  * Found a few pointer problems.
75  *
76  * Revision 1.3  1994/02/03  23:26:58  hollings
77  * Changes to work with g++ version 2.5.2.
78  *
79  * Revision 1.2  1994/02/02  00:42:33  hollings
80  * Changes to the Data manager to reflect the file naming convention and
81  * to support the integration of the Performance Consultant.
82  *
83  * Revision 1.1  1994/01/28  01:34:17  hollings
84  * The initial version of the Data Management thread.
85  *
86  *
87  */
88 #include <assert.h>
89 extern "C" {
90 #include <math.h>
91 double   quiet_nan(int unused);
92 #include <malloc.h>
93 #include "thread/h/thread.h"
94 }
95
96 #include "util/h/tunableConst.h"
97 #include "dataManager.SRVR.h"
98 #include "dyninstRPC.CLNT.h"
99 #include "DMinternals.h"
100 #include "../pdMain/paradyn.h"
101
102 static dataManager *dm;
103 stringPool metric::names;
104 HTable<metric *> metric::allMetrics;
105 List<paradynDaemon*> paradynDaemon::allDaemons;
106
107 void newSampleRate(float rate);
108
109 tunableConstant samplingRate(0.5, 0.0, 1000.0, newSampleRate, "samplingRate",
110    "how often to sample intermediate performance data (in seconds)");
111
112 metricInstance *performanceStream::enableDataCollection(resourceList *rl, 
113                                                         metric *m)
114 {
115     char *name;
116     metricInstance *mi;
117
118     if (!m || !rl) return(NULL);
119
120     name = rl->getCanonicalName();
121     mi = m->enabledCombos.find(name);
122     if (mi) {
123         mi->count++;
124         mi->users.add(this);
125     } else {
126         mi = appl->enableDataCollection(rl, m);
127         if (mi) mi->users.add(this);
128     }
129     return(mi);
130 }
131
132 //
133 // Turn off data collection for this perf stream.  Other streams may still
134 //    get the data.
135 //
136 void performanceStream::disableDataCollection(metricInstance *mi)
137 {
138     mi->count--;
139     mi->users.remove(this);
140     if (!mi->count) {
141         appl->disableDataCollection(mi);
142     }
143 }
144
145
146 void performanceStream::enableResourceCreationNotification(resource *r)
147 {
148     r->notify.add(this);
149 }
150
151 void performanceStream::disableResourceCreationNotification(resource *r)
152 {
153     r->notify.remove(this);
154 }
155
156 void performanceStream::callSampleFunc(metricInstance *mi,
157                                        sampleValue *buckets,
158                                        int count,
159                                        int first)
160 {
161     if (dataFunc.sample) {
162         dm->setTid(threadId);
163         dm->newPerfData(dataFunc.sample, this, mi, buckets, count, first);
164     }
165 }
166
167 void performanceStream::callResourceFunc(resource *p,
168                                          resource *c,
169                                          char *name)
170 {
171     if (controlFunc.rFunc) {
172         dm->setTid(threadId);
173         dm->newResourceDefined(controlFunc.rFunc, this, p, c, name);
174     }
175 }
176
177
178 void performanceStream::callFoldFunc(timeStamp width)
179 {
180     if (controlFunc.fFunc) {
181         dm->setTid(threadId);
182         dm->histFold(controlFunc.fFunc, this, width);
183     }
184 }
185
186
187 void performanceStream::callStateFunc(appState state)
188 {
189     if (controlFunc.sFunc) {
190         dm->setTid(threadId);
191         dm->changeState(controlFunc.sFunc, this, state);
192     }
193 }
194
195 //
196 // upcalls from remote process.
197 //
198 void dynRPCUser::resourceInfoCallback(int program,
199                                       String parentString,
200                                       String newResource,
201                                       String name)
202 {
203     resource *parent;
204
205     // create the resource.
206     if (*parentString != '\0') {
207         // non-null string.
208         parentString = resource::names.findAndAdd(parentString);
209         parent = resource::allResources.find(parentString);
210         if (!parent) abort();
211     } else {
212         parent = resource::rootResource;
213     }
214
215     createResource(parent, name);
216 }
217
218 class uniqueName {
219   public:
220     uniqueName(char *base) { name = base; nextId = 0; }
221     int nextId;
222     char *name;
223 };
224
225
226 String dynRPCUser::getUniqueResource(int program, 
227                                      String parentString, 
228                                      String newResource)
229 {
230     char *ptr;
231     uniqueName *ret;
232     char newName[80];
233     static List<uniqueName*> allUniqueNames;
234
235     sprintf(newName, "%s/%s", parentString, newResource);
236     ptr = resource::names.findAndAdd(newName);
237
238     ret = allUniqueNames.find(ptr);
239
240     if (!ret) {
241         ret = new uniqueName(ptr);
242         allUniqueNames.add(ret, ptr);
243     }
244     sprintf(newName, "%s[%d]", newResource, ret->nextId++);
245     ptr = resource::names.findAndAdd(newName);
246
247     return(ptr);
248 }
249
250 //
251 // used when a new program gets forked.
252 //
253 void dynRPCUser::newProgramCallbackFunc(int pid,
254                                         int argc, 
255                                         String_Array argvString,
256                                         String machine_name)
257 {
258      char **argv;
259      paradynDaemon *daemon;
260      List<paradynDaemon*> curr;
261      int i;
262
263     // there better be a paradynd running on this machine!
264     for (curr=paradynDaemon::allDaemons, daemon = NULL; *curr; curr++) {
265         if (!strcmp((*curr)->machine, machine_name))
266             daemon = *curr;
267     }
268     // for now, abort if there is no paradynd, this should not happen
269     if (!daemon) {
270         printf("process started on %s, can't find paradynd there\n",
271                 machine_name);
272         printf("paradyn error #1 encountered\n");
273         exit(-1);
274     }
275    argv = (char **) malloc (argvString.count);
276    if (!argv) {
277         printf(" cannot malloc memory in newProgramCallbackFunc\n");
278         exit(-1);
279    }
280    for (i=0; i<argvString.count; ++i) {
281         argv[i] = strdup(argvString.data[i]);
282         if (!argv[i]) {
283                 printf(" cannot malloc memory in newProgramCallbackFunc\n");
284                 exit(-1);
285         }
286    }
287       
288    assert (dm->appContext);
289    assert (!dm->appContext->addRunningProgram(pid, argc, argv, daemon));
290 }
291
292 void dynRPCUser::newMetricCallback(metricInfo info)
293 {
294     addMetric(info);
295 }
296
297 void dynRPCUser::sampleDataCallbackFunc(int program,
298                                            int mid,
299                                            double startTimeStamp,
300                                            double endTimeStamp,
301                                            double value)
302 {
303     assert(0 && "Invalid virtual function");
304 }
305
306 void paradynDaemon::sampleDataCallbackFunc(int program,
307                                            int mid,
308                                            double startTimeStamp,
309                                            double endTimeStamp,
310                                            double value)
311 {
312     component *part;
313     metricInstance *mi;
314     struct sampleInterval ret;
315
316     // printf("mid %d %f from %f to %f\n", mid, value, startTimeStamp, endTimeStamp);
317     mi = activeMids.find((void*) mid);
318     if (!mi) {
319         printf("ERROR: data for unknown mid: %d\n", mid);
320         printf("paradyn Error #2\n");
321         exit(-1);
322     }
323
324     if (mi->components.count() != 1) {
325         // find the right component.
326         part = mi->components.find(this);
327
328         if (!part) {
329             printf("Unable to find component!!!\n");
330             printf("paradyn Error #3\n");
331             exit(-1);
332         }
333         ret = part->sample.newValue(endTimeStamp, value);
334     }
335     ret = mi->sample.newValue(mi->parts, endTimeStamp, value);
336
337     if (ret.valid) {
338         assert(ret.end >= 0.0);
339         assert(ret.start >= 0.0);
340         assert(ret.end >= ret.start);
341         mi->enabledTime += ret.end - ret.start;
342         mi->data->addInterval(ret.start, ret.end, ret.value, FALSE);
343     }
344 }
345
346 paradynDaemon::~paradynDaemon() {
347     metricInstance *mi;
348     HTable<metricInstance*> curr;
349
350     allDaemons.remove(this);
351
352     // remove the metric ID as required.
353     for (curr = activeMids; mi = *curr; curr++) {
354         mi->parts.remove(this);
355         mi->components.remove(this);
356     }
357 }
358
359 //
360 // When a paradynd is started remotely, ie not by paradyn, this upcall
361 // reports the information for that paradynd to paradyn
362 //
363 void 
364 dynRPCUser::reportSelf (String m, String p, int pd)
365 {
366   assert(0);
367   return;
368 }
369
370 //
371 // When a paradynd is started remotely, ie not by paradyn, this upcall
372 // reports the information for that paradynd to paradyn
373 //
374 void 
375 paradynDaemon::reportSelf (String m, String p, int pd)
376 {
377   machine = strdup(m);
378   program = strdup(p);
379   my_pid = pd;
380   return;
381 }
382
383 // 
384 // establish socket that will be advertised to paradynd's
385 // this socket will allow paradynd's to connect to paradyn for pvm
386 //
387 static void
388 DMsetupSocket (int *sockfd, int *known_sock)
389 {
390   // setup "well known" socket for pvm paradynd's to connect to
391   assert ((*known_sock =
392            RPC_setup_socket (sockfd, AF_INET, SOCK_STREAM)) >= 0);
393
394   // this info is needed to create argument list for other paradynds
395   dm->socket = *known_sock;
396   dm->sock_fd = *sockfd;
397
398   // bind fd for this thread
399   msg_bind (*sockfd, TRUE);
400 }
401
402 static void
403 DMnewParadynd (int sockfd, dataManager *dm)
404 {
405   int new_fd;
406
407   // accept the connection
408   new_fd = RPC_getConnect(sockfd);
409   if (new_fd < 0) {
410     printf ("unable to connect to new paradynd\n");
411     printf("paradyn Error #4\n");
412     exit(-1);
413   }
414
415   assert (dm->appContext);
416   assert (!dm->appContext->addDaemon(new_fd));
417 }
418
419 //
420 // Main loop for the dataManager thread.
421 //
422 void *DMmain(int arg)
423 {
424     int ret;
425     unsigned int tag;
426     List<paradynDaemon*> curr;
427     int known_sock, sockfd;
428     char DMbuff[64];
429     unsigned int msgSize = 64;
430
431     thr_name("Data Manager");
432
433     dm = new dataManager(arg);
434     // this will be set on addExecutable
435     dm->appContext = 0;
436
437     // supports argv passed to paradynDaemon
438     // new paradynd's may try to connect to well known port
439     DMsetupSocket (&sockfd, &known_sock);
440     dynRPCUser::__wellKnownPortFd__ = sockfd;
441
442     paradynDaemon::args =
443               RPC_make_arg_list(AF_INET, SOCK_STREAM, known_sock, 1);
444
445     msg_send (MAINtid, MSG_TAG_DM_READY, (char *) NULL, 0);
446     tag = MSG_TAG_ALL_CHILDREN_READY;
447     msg_recv (&tag, DMbuff, &msgSize);
448
449     while (1) {
450         tag = MSG_TAG_ANY;
451         ret = msg_poll(&tag, TRUE);
452         assert(ret != THR_ERR);
453
454         if (tag == MSG_TAG_FILE) {
455           // must be an upcall on something speaking the dynRPC protocol.
456           for (curr = paradynDaemon::allDaemons; *curr; curr++) {
457             if ((*curr)->fd == ret) {
458               (*curr)->awaitResponce(-1);
459             }
460           }
461           if (ret == sockfd)
462             DMnewParadynd(sockfd, dm);        // set up a new paradynDaemon
463         } else {
464             dm->mainLoop(); 
465         }
466     }
467 }
468
469
470 void addMetric(metricInfo info)
471 {
472     char *iName;
473     metric *met;
474     performanceStream *stream;
475     List<performanceStream *> curr;
476
477     iName = metric::names.findAndAdd(info.name);
478     assert(iName);
479     met = metric::allMetrics.find(iName);
480     if (met) {
481         // check that it is compatible ????
482         return;
483     }
484
485     //
486     // It's really new 
487     //
488     met = new metric(info);
489     metric::allMetrics.add(met, iName);
490
491     //
492     // now tell all perfStreams
493     //
494     for (curr = applicationContext::streams; *curr; curr++) {
495         stream = *curr;
496         if (stream->controlFunc.mFunc) {
497             // set the correct destination thread.
498             dm->setTid(stream->threadId);
499             dm->newMetricDefined(stream->controlFunc.mFunc, stream, met);
500         }
501     }
502 }
503
504
505 resource *createResource(resource *p, char *newResource)
506 {
507     resource *ret;
508     char *fullName;
509     resource *temp;
510     performanceStream *stream;
511     List<performanceStream *> curr;
512
513     /* first check to see if the resource has already been defined */
514     temp = p->children.find(newResource);
515     if (temp) return(temp);
516
517     /* then create it */
518     ret = new resource(p, newResource);
519     fullName = ret->getFullName();
520
521     /* inform others about it */
522     for (curr = applicationContext::streams; stream = *curr; curr++) {
523         stream->callResourceFunc(p, ret, fullName);
524     }
525
526     return(ret);
527 }
528
529 void newSampleRate(float rate)
530 {
531     List<paradynDaemon*> curr;
532
533     for (curr = paradynDaemon::allDaemons; *curr; curr++) {
534         (*curr)->setSampleRate(rate);
535     }
536 }