added observed cost model.
[dyninst.git] / paradyn / src / DMthread / DMmain.C
1 /*
2  * DMmain.C: main loop of the Data Manager thread.
3  *
4  * $Log: DMmain.C,v $
5  * Revision 1.36  1994/07/05 03:27:17  hollings
6  * added observed cost model.
7  *
8  * Revision 1.35  1994/07/02  01:43:11  markc
9  * Removed all uses of type aggregation from enableDataCollection.
10  * The metricInfo structure now contains the aggregation operator.
11  *
12  * Revision 1.34  1994/06/29  02:55:59  hollings
13  * fixed code to remove instrumenation when done with it.
14  *
15  * Revision 1.33  1994/06/27  21:23:25  rbi
16  * Abstraction-specific resources and mapping info
17  *
18  * Revision 1.32  1994/06/27  18:54:48  hollings
19  * changed stdio printf for paradynd.
20  *
21  * Revision 1.31  1994/06/17  22:07:59  hollings
22  * Added code to provide upcall for resource batch mode when a large number
23  * of resources is about to be added.
24  *
25  * Revision 1.30  1994/06/14  15:22:46  markc
26  * Added arg to enableDataCollection call to support aggregation.
27  *
28  * Revision 1.29  1994/06/02  23:25:19  markc
29  * Added virtual function 'handle_error' to pardynDaemon class which uses the
30  * error handling features that igen provides.
31  *
32  * Revision 1.28  1994/05/31  18:26:15  markc
33  * strdup'd a string passed into createResource, since igen will free the memory
34  * for the string on return from the function.
35  *
36  * Revision 1.27  1994/05/18  02:51:04  hollings
37  * fixed cast one return of malloc.
38  *
39  * Revision 1.26  1994/05/18  00:43:28  hollings
40  * added routine to print output of stdout.
41  *
42  * Revision 1.25  1994/05/17  00:16:38  hollings
43  * Changed process id speperator from [] to {} to get around braindead tcl.
44  *
45  * Revision 1.24  1994/05/16  22:31:38  hollings
46  * added way to request unique resource name.
47  *
48  * Revision 1.23  1994/05/12  23:34:00  hollings
49  * made path to paradyn.h relative.
50  *
51  * Revision 1.22  1994/05/10  03:57:37  hollings
52  * Changed data upcall to return array of buckets.
53  *
54  * Revision 1.21  1994/05/09  20:56:20  hollings
55  * added changeState callback.
56  *
57  * Revision 1.20  1994/05/02  20:37:45  hollings
58  * Fixed compiler warning.
59  *
60  * Revision 1.19  1994/04/21  23:24:26  hollings
61  * removed process name from calls to RPC_make_arg_list.
62  *
63  * Revision 1.18  1994/04/20  15:30:10  hollings
64  * Added error numbers.
65  * Added data manager function to get histogram buckets.
66  *
67  * Revision 1.17  1994/04/18  22:28:31  hollings
68  * Changes to create a canonical form of a resource list.
69  *
70  * Revision 1.16  1994/04/12  22:33:34  hollings
71  * Fixed casts back to time64 which were dropping off the fraction of seconds
72  * in the timestamps of samples.
73  *
74  * Revision 1.15  1994/04/12  15:32:00  hollings
75  * added tunable constant samplingRate to control the frequency of sampling.
76  *
77  * Revision 1.14  1994/04/11  23:18:49  hollings
78  * added checks to make sure time moves forward.
79  *
80  * Revision 1.13  1994/04/04  21:36:12  newhall
81  * added synchronization code to DM thread startup
82  *
83  * Revision 1.12  1994/04/01  20:17:22  hollings
84  * Added init of well known socket fd global.
85  *
86  * Revision 1.11  1994/03/25  22:59:33  hollings
87  * Made the data manager tolerate paraynd's dying.
88  *
89  * Revision 1.10  1994/03/24  16:41:20  hollings
90  * Added support for multiple paradynd's at once.
91  *
92  * Revision 1.9  1994/03/21  20:32:48  hollings
93  * Changed the mid to mi mapping to be per paradyn daemon.  This is required
94  * because mids are asigned by the paradynd's, and are not globally unique.
95  *
96  * Revision 1.8  1994/03/20  01:49:48  markc
97  * Gave process structure a buffer to allow multiple writers.  Added support
98  * to register name of paradyn daemon.  Changed addProcess to return type int.
99  *
100  * Revision 1.7  1994/03/08  17:39:33  hollings
101  * Added foldCallback and getResourceListName.
102  *
103  * Revision 1.6  1994/02/25  20:58:11  markc
104  * Added support for storing paradynd's pids.
105  *
106  * Revision 1.5  1994/02/24  04:36:31  markc
107  * Added an upcall to dyninstRPC.I to allow paradynd's to report information at
108  * startup.  Added a data member to the class that igen generates.
109  * Make depend differences due to new header files that igen produces.
110  * Added support to allow asynchronous starts of paradynd's.  The dataManager has
111  * an advertised port that new paradynd's can connect to.
112  *
113  * Revision 1.4  1994/02/08  21:05:55  hollings
114  * Found a few pointer problems.
115  *
116  * Revision 1.3  1994/02/03  23:26:58  hollings
117  * Changes to work with g++ version 2.5.2.
118  *
119  * Revision 1.2  1994/02/02  00:42:33  hollings
120  * Changes to the Data manager to reflect the file naming convention and
121  * to support the integration of the Performance Consultant.
122  *
123  * Revision 1.1  1994/01/28  01:34:17  hollings
124  * The initial version of the Data Management thread.
125  *
126  *
127  */
128 #include <assert.h>
129 extern "C" {
130 #include <math.h>
131 double   quiet_nan(int unused);
132 #include <malloc.h>
133 #include "thread/h/thread.h"
134 }
135
136 #include "util/h/tunableConst.h"
137 #include "dataManager.SRVR.h"
138 #include "dyninstRPC.CLNT.h"
139 #include "DMinternals.h"
140 #include "../pdMain/paradyn.h"
141
142 static dataManager *dm;
143 stringPool metric::names;
144 HTable<metric *> metric::allMetrics;
145 List<paradynDaemon*> paradynDaemon::allDaemons;
146
147 void newSampleRate(float rate);
148
149 tunableConstant samplingRate(0.5, 0.0, 1000.0, newSampleRate, "samplingRate",
150    "how often to sample intermediate performance data (in seconds)");
151
152 metricInstance *performanceStream::enableDataCollection(resourceList *rl, 
153                                                         metric *m)
154 {
155     char *name;
156     metricInstance *mi;
157
158     if (!m || !rl) return(NULL);
159
160     name = rl->getCanonicalName();
161     mi = m->enabledCombos.find(name);
162     if (mi) {
163         mi->count++;
164         mi->users.add(this);
165     } else {
166         mi = appl->enableDataCollection(rl, m);
167         if (mi) {
168             mi->users.add(this);
169         }
170     }
171     return(mi);
172 }
173
174 //
175 // Turn off data collection for this perf stream.  Other streams may still
176 //    get the data.
177 //
178 void performanceStream::disableDataCollection(metricInstance *mi)
179 {
180     mi->count--;
181     mi->users.remove(this);
182     if (!mi->count) {
183         appl->disableDataCollection(mi);
184     }
185 }
186
187
188 void performanceStream::enableResourceCreationNotification(resource *r)
189 {
190     r->notify.add(this);
191 }
192
193 void performanceStream::disableResourceCreationNotification(resource *r)
194 {
195     r->notify.remove(this);
196 }
197
198 void performanceStream::callSampleFunc(metricInstance *mi,
199                                        sampleValue *buckets,
200                                        int count,
201                                        int first)
202 {
203     if (dataFunc.sample) {
204         dm->setTid(threadId);
205         dm->newPerfData(dataFunc.sample, this, mi, buckets, count, first);
206     }
207 }
208
209 void performanceStream::callResourceFunc(resource *p,
210                                          resource *c,
211                                          char *name)
212 {
213     if (controlFunc.rFunc) {
214         dm->setTid(threadId);
215         dm->newResourceDefined(controlFunc.rFunc, this, p, c, name);
216     }
217 }
218
219 void performanceStream::callResourceBatchFunc(batchMode mode)
220 {
221     if (controlFunc.bFunc) {
222         dm->setTid(threadId);
223         dm->changeResourceBatchMode(controlFunc.bFunc, this, mode);
224     }
225 }
226
227 void performanceStream::callFoldFunc(timeStamp width)
228 {
229     if (controlFunc.fFunc) {
230         dm->setTid(threadId);
231         dm->histFold(controlFunc.fFunc, this, width);
232     }
233 }
234
235
236 void performanceStream::callStateFunc(appState state)
237 {
238     if (controlFunc.sFunc) {
239         dm->setTid(threadId);
240         dm->changeState(controlFunc.sFunc, this, state);
241     }
242 }
243
244 //
245 // IO from application processes.
246 //
247 void dynRPCUser::applicationIO(int pid, int len, String data)
248 {
249     char *ptr;
250     char *rest;
251     // extra should really be per process.
252     static char *extra;
253
254     rest = data;
255     ptr = strchr(rest, '\n');
256     while (ptr) {
257         *ptr = '\0';
258         if (pid) {
259             printf("pid %d:", pid);
260         } else {
261             printf("paradynd:", pid);
262         }
263         if (extra) {
264             printf(extra);
265             free(extra);
266             extra = NULL;
267         }
268         printf("%s\n", rest);
269         rest = ptr+1;
270         ptr = strchr(rest, '\n');
271     }
272     extra = (char *) malloc(strlen(rest)+1);
273     strcpy(extra, rest);
274 }
275
276 abstractionType parseAbstractionName(String abstraction)
277 {
278   if (!abstraction) return BASE;
279   if (strcmp(abstraction,"TCL") == 0) return TCL;
280   if (strcmp(abstraction,"CMF") == 0) return CMF;
281   if (strcmp(abstraction,"BASE") == 0) return BASE;
282   printf("DATAMANGER: bad abstraction '%s'\n",abstraction);
283   return BASE;
284 }
285
286 //
287 // upcalls from remote process.
288 //
289 void dynRPCUser::resourceInfoCallback(int program,
290                                       String parentString,
291                                       String newResource,
292                                       String name,
293                                       String abstraction)
294 {
295     resource *parent;
296     abstractionType at;
297
298     // create the resource.
299     if (*parentString != '\0') {
300         // non-null string.
301         parentString = resource::names.findAndAdd(parentString);
302         parent = resource::allResources.find(parentString);
303         if (!parent) abort();
304     } else {
305         parent = resource::rootResource;
306     }
307
308 // rbi
309     at = parseAbstractionName(abstraction);
310 // rbi
311
312     createResource(parent, name, at);
313 }
314
315 void dynRPCUser::mappingInfoCallback(int program,
316                                      String abstraction, 
317                                      String type, 
318                                      String key,
319                                      String value)
320
321 {
322 //  List<performanceStream *> curr;
323 //  performanceStream *stream;
324
325 /*  printf("DATAMANAGER: '%s' '%s' map '%s -> %s'\n", abstraction, 
326          type, key, value); */
327 /*  for (curr = applicationContext::streams; stream = *curr; curr++) {
328      Make sure stream is of right abstraction 
329     stream->callMappingFunc(abstraction, type, key, value);
330   }
331 */
332   
333 }
334
335 class uniqueName {
336   public:
337     uniqueName(char *base) { name = base; nextId = 0; }
338     int nextId;
339     char *name;
340 };
341
342
343 String dynRPCUser::getUniqueResource(int program, 
344                                      String parentString, 
345                                      String newResource)
346 {
347     char *ptr;
348     uniqueName *ret;
349     char newName[80];
350     static List<uniqueName*> allUniqueNames;
351
352     sprintf(newName, "%s/%s", parentString, newResource);
353     ptr = resource::names.findAndAdd(newName);
354
355     ret = allUniqueNames.find(ptr);
356
357     if (!ret) {
358         ret = new uniqueName(ptr);
359         allUniqueNames.add(ret, ptr);
360     }
361     // changed from [] to {} due to TCL braindeadness.
362     sprintf(newName, "%s{%d}", newResource, ret->nextId++);
363     ptr = resource::names.findAndAdd(newName);
364
365     return(ptr);
366 }
367
368 //
369 // used when a new program gets forked.
370 //
371 void dynRPCUser::newProgramCallbackFunc(int pid,
372                                         int argc, 
373                                         String_Array argvString,
374                                         String machine_name)
375 {
376      char **argv;
377      paradynDaemon *daemon;
378      List<paradynDaemon*> curr;
379      int i;
380
381     // there better be a paradynd running on this machine!
382     for (curr=paradynDaemon::allDaemons, daemon = NULL; *curr; curr++) {
383         if (!strcmp((*curr)->machine, machine_name))
384             daemon = *curr;
385     }
386     // for now, abort if there is no paradynd, this should not happen
387     if (!daemon) {
388         printf("process started on %s, can't find paradynd there\n",
389                 machine_name);
390         printf("paradyn error #1 encountered\n");
391         exit(-1);
392     }
393    argv = (char **) malloc (argvString.count);
394    if (!argv) {
395         printf(" cannot malloc memory in newProgramCallbackFunc\n");
396         exit(-1);
397    }
398    for (i=0; i<argvString.count; ++i) {
399         argv[i] = strdup(argvString.data[i]);
400         if (!argv[i]) {
401                 printf(" cannot malloc memory in newProgramCallbackFunc\n");
402                 exit(-1);
403         }
404    }
405       
406    assert (dm->appContext);
407    assert (!dm->appContext->addRunningProgram(pid, argc, argv, daemon));
408 }
409
410 void dynRPCUser::newMetricCallback(metricInfo info)
411 {
412     addMetric(info);
413 }
414
415 void dynRPCUser::sampleDataCallbackFunc(int program,
416                                            int mid,
417                                            double startTimeStamp,
418                                            double endTimeStamp,
419                                            double value)
420 {
421     assert(0 && "Invalid virtual function");
422 }
423
424 void paradynDaemon::sampleDataCallbackFunc(int program,
425                                            int mid,
426                                            double startTimeStamp,
427                                            double endTimeStamp,
428                                            double value)
429 {
430     component *part;
431     metricInstance *mi;
432     struct sampleInterval ret;
433
434     // printf("mid %d %f from %f to %f\n", mid, value, startTimeStamp, endTimeStamp);
435     mi = activeMids.find((void*) mid);
436     if (!mi) {
437         printf("ERROR: data for unknown mid: %d\n", mid);
438         printf("paradyn Error #2\n");
439         exit(-1);
440     }
441
442     if (mi->components.count() != 1) {
443         // find the right component.
444         part = mi->components.find(this);
445
446         if (!part) {
447             printf("Unable to find component!!!\n");
448             printf("paradyn Error #3\n");
449             exit(-1);
450         }
451         ret = part->sample.newValue(endTimeStamp, value);
452     }
453     ret = mi->sample.newValue(mi->parts, endTimeStamp, value);
454
455     if (ret.valid) {
456         assert(ret.end >= 0.0);
457         assert(ret.start >= 0.0);
458         assert(ret.end >= ret.start);
459         mi->enabledTime += ret.end - ret.start;
460         mi->data->addInterval(ret.start, ret.end, ret.value, FALSE);
461     }
462 }
463
464 //
465 // paradyn daemon should never go away.  This represents an error state
466 //    due to a paradynd being killed for some reason.
467 //
468 paradynDaemon::~paradynDaemon() {
469
470 #ifdef notdef
471     metricInstance *mi;
472     HTable<metricInstance*> curr;
473
474     allDaemons.remove(this);
475
476     // remove the metric ID as required.
477     for (curr = activeMids; mi = *curr; curr++) {
478         mi->parts.remove(this);
479         mi->components.remove(this);
480     }
481 #endif
482     printf("Inconsistant state\n");
483     abort();
484 }
485
486 //
487 // When a paradynd is started remotely, ie not by paradyn, this upcall
488 // reports the information for that paradynd to paradyn
489 //
490 void 
491 dynRPCUser::reportSelf (String m, String p, int pd)
492 {
493   assert(0);
494   return;
495 }
496
497 //
498 // When an error is determined on an igen call, this function is
499 // called, since the default error handler will exit, and we don't
500 // want paradyn to exit.
501 //
502 void paradynDaemon::handle_error()
503 {
504    dm->appContext->removeDaemon(this, TRUE);
505 }
506
507 //
508 // When a paradynd is started remotely, ie not by paradyn, this upcall
509 // reports the information for that paradynd to paradyn
510 //
511 void 
512 paradynDaemon::reportSelf (String m, String p, int pd)
513 {
514   machine = strdup(m);
515   program = strdup(p);
516   my_pid = pd;
517   return;
518 }
519
520 // 
521 // establish socket that will be advertised to paradynd's
522 // this socket will allow paradynd's to connect to paradyn for pvm
523 //
524 static void
525 DMsetupSocket (int *sockfd, int *known_sock)
526 {
527   // setup "well known" socket for pvm paradynd's to connect to
528   assert ((*known_sock =
529            RPC_setup_socket (sockfd, AF_INET, SOCK_STREAM)) >= 0);
530
531   // this info is needed to create argument list for other paradynds
532   dm->socket = *known_sock;
533   dm->sock_fd = *sockfd;
534
535   // bind fd for this thread
536   msg_bind (*sockfd, TRUE);
537 }
538
539 static void
540 DMnewParadynd (int sockfd, dataManager *dm)
541 {
542   int new_fd;
543
544   // accept the connection
545   new_fd = RPC_getConnect(sockfd);
546   if (new_fd < 0) {
547     printf ("unable to connect to new paradynd\n");
548     printf("paradyn Error #4\n");
549     exit(-1);
550   }
551
552   assert (dm->appContext);
553   assert (!dm->appContext->addDaemon(new_fd));
554 }
555
556 //
557 // Main loop for the dataManager thread.
558 //
559 void *DMmain(int arg)
560 {
561     int ret;
562     unsigned int tag;
563     List<paradynDaemon*> curr;
564     int known_sock, sockfd;
565     char DMbuff[64];
566     unsigned int msgSize = 64;
567
568     thr_name("Data Manager");
569
570     dm = new dataManager(arg);
571     // this will be set on addExecutable
572     dm->appContext = 0;
573
574     // supports argv passed to paradynDaemon
575     // new paradynd's may try to connect to well known port
576     DMsetupSocket (&sockfd, &known_sock);
577     dynRPCUser::__wellKnownPortFd__ = sockfd;
578
579     paradynDaemon::args =
580               RPC_make_arg_list(AF_INET, SOCK_STREAM, known_sock, 1);
581
582     msg_send (MAINtid, MSG_TAG_DM_READY, (char *) NULL, 0);
583     tag = MSG_TAG_ALL_CHILDREN_READY;
584     msg_recv (&tag, DMbuff, &msgSize);
585
586     while (1) {
587         tag = MSG_TAG_ANY;
588         ret = msg_poll(&tag, TRUE);
589         assert(ret != THR_ERR);
590
591         if (tag == MSG_TAG_FILE) {
592           // must be an upcall on something speaking the dynRPC protocol.
593           for (curr = paradynDaemon::allDaemons; *curr; curr++) {
594             if ((*curr)->fd == ret) {
595               (*curr)->awaitResponce(-1);
596             }
597           }
598           if (ret == sockfd)
599             DMnewParadynd(sockfd, dm);        // set up a new paradynDaemon
600         } else {
601             dm->mainLoop(); 
602         }
603     }
604 }
605
606
607 void addMetric(metricInfo info)
608 {
609     char *iName;
610     metric *met;
611     performanceStream *stream;
612     List<performanceStream *> curr;
613
614     iName = metric::names.findAndAdd(info.name);
615     assert(iName);
616     met = metric::allMetrics.find(iName);
617     if (met) {
618         // check that it is compatible ????
619         return;
620     }
621
622     //
623     // It's really new 
624     //
625     met = new metric(info);
626     metric::allMetrics.add(met, iName);
627
628     //
629     // now tell all perfStreams
630     //
631     for (curr = applicationContext::streams; *curr; curr++) {
632         stream = *curr;
633         if (stream->controlFunc.mFunc) {
634             // set the correct destination thread.
635             dm->setTid(stream->threadId);
636             dm->newMetricDefined(stream->controlFunc.mFunc, stream, met);
637         }
638     }
639 }
640
641
642 resource *createResource(resource *p, char *newResource, abstractionType at)
643 {
644     resource *ret;
645     char *fullName;
646     resource *temp;
647     performanceStream *stream;
648     List<performanceStream *> curr;
649
650     /* first check to see if the resource has already been defined */
651     temp = p->children.find(newResource);
652     if (temp) return(temp);
653
654     /* then create it */
655     ret = new resource(p, strdup(newResource), at);
656     fullName = ret->getFullName();
657
658     /* inform others about it if they need to know */
659     for (curr = applicationContext::streams; stream = *curr; curr++) {
660       if (stream->getAbstraction() == at) {
661         stream->callResourceFunc(p, ret, fullName);
662       }
663     }
664
665     return(ret);
666 }
667
668 void newSampleRate(float rate)
669 {
670     List<paradynDaemon*> curr;
671
672     for (curr = paradynDaemon::allDaemons; *curr; curr++) {
673         (*curr)->setSampleRate(rate);
674     }
675 }