Fix for perfStream delete assert failure
[dyninst.git] / paradyn / src / DMthread / DMperfstream.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as "Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 #include <assert.h>
43 extern "C" {
44 double   quiet_nan();
45 #include <malloc.h>
46 #include <stdio.h>
47 }
48 #include "DMperfstream.h"
49 #include "DMmetric.h"
50 #include "DMresource.h"
51 #include "paradyn/src/DMthread/BufferPool.h"
52 #include "paradyn/src/DMthread/DVbufferpool.h"
53
54
55 performanceStream::performanceStream(dataType t, 
56                                      dataCallback dc, 
57                                      controlCallback cc, 
58                                      int tid) {
59     type = t;
60     dataFunc = dc;
61     controlFunc = cc;
62     threadId = tid;
63     num_global_mis = 0;
64     num_curr_mis = 0;
65     my_buffer_size = 0;
66     next_buffer_loc = 0;
67     my_buffer = 0;
68     handle = next_id++;
69     allStreams[handle] = this;
70     nextpredCostReqestId = 0;
71     pred_Cost_buff.resize(0);
72     // trace data streams
73     num_trace_mis =0;
74     my_traceBuffer_size = 0;
75     next_traceBuffer_loc = 0;
76     my_traceBuffer = 0;
77 }
78
79 performanceStream::~performanceStream(){
80     // indicate that handle is free for reuse
81     allStreams.undef(handle);
82     datavalues_bufferpool.dealloc(my_buffer);
83     my_buffer = 0;
84     // trace data streams
85     tracedatavalues_bufferpool.dealloc(my_traceBuffer);
86     my_traceBuffer = 0;
87 }
88
89
90 bool performanceStream::reallocBuffer(){
91     // reallocate a buffer of size my_buffer_size
92     assert(!my_buffer);
93     if(my_buffer_size){
94         my_buffer = datavalues_bufferpool.alloc(my_buffer_size); 
95         if(!my_buffer) return false;
96         assert(my_buffer);
97         assert(my_buffer->size() >= my_buffer_size);
98     }
99     return true;
100 }
101
102 bool performanceStream::reallocTraceBuffer(){
103     // reallocate a buffer of size my_traceBuffer_size
104     assert(!my_traceBuffer);
105     if(my_traceBuffer_size){
106         my_traceBuffer = tracedatavalues_bufferpool.alloc(my_traceBuffer_size);
107         if(!my_traceBuffer) return false;
108         assert(my_traceBuffer);
109         assert(my_traceBuffer->size() >= my_traceBuffer_size);
110     }
111     return true;
112 }
113
114 //
115 // send buffer of data values to client
116 //
117 void performanceStream::flushBuffer(){
118
119     // send data to client
120     if(next_buffer_loc){
121         assert(my_buffer);
122         dataManager::dm->setTid(threadId);
123         dataManager::dm->newPerfData(dataFunc.sample, 
124                                      my_buffer, next_buffer_loc);
125     }
126     next_buffer_loc = 0;
127     my_buffer = 0;
128 }
129
130 //
131 // send buffer of trace data values to client
132 //
133 void performanceStream::flushTraceBuffer(){
134
135     // send data to client
136     if(next_traceBuffer_loc){
137         assert(my_traceBuffer);
138         dataManager::dm->setTid(threadId);
139         dataManager::dm->newTracePerfData(dataFunc.trace,
140                                      my_traceBuffer,next_traceBuffer_loc);
141     }
142     next_traceBuffer_loc = 0;
143     my_traceBuffer = 0;
144 }
145
146 // 
147 // fills up buffer with new data values and flushs the buffer if 
148 // it is full 
149 //
150 void performanceStream::callSampleFunc(metricInstanceHandle mi,
151                                        sampleValue *buckets,
152                                        int count,
153                                        int first,
154                                        phaseType type)
155 {
156     if (dataFunc.sample) {
157         for(int i = first; i < (first+count); i++) {
158             if(!my_buffer) {
159                  if (!this->reallocBuffer()) assert(0); 
160                  assert(my_buffer);
161             }
162             (*my_buffer)[next_buffer_loc].mi = mi; 
163             (*my_buffer)[next_buffer_loc].bucketNum = i; 
164             (*my_buffer)[next_buffer_loc].value = buckets[i-first]; 
165             (*my_buffer)[next_buffer_loc].type = type; 
166             next_buffer_loc++;
167             if(next_buffer_loc >= my_buffer_size) {
168                 this->flushBuffer();
169                 assert(!next_buffer_loc);
170                 assert(!my_buffer);
171             }
172         }
173     }
174 }
175
176 //
177 // fills up trace buffer with new trace data values and flushs the buffer if
178 // it is full
179 //
180 void performanceStream::callTraceFunc(metricInstanceHandle mi,
181                                       void *data,
182                                       int length)
183 {
184     if (dataFunc.trace) {
185         if(!my_traceBuffer) {
186             if (!this->reallocTraceBuffer()) assert(0);
187                 assert(my_traceBuffer);
188         }
189         (*my_traceBuffer)[next_traceBuffer_loc].mi = mi;
190         (*my_traceBuffer)[next_traceBuffer_loc].traceRecord = byteArray(data,length);
191         next_traceBuffer_loc++;
192 //      if(next_traceBuffer_loc >= my_traceBuffer_size) {
193           if(next_traceBuffer_loc >= my_traceBuffer_size || length == 0) {
194             this->flushTraceBuffer();
195             assert(!next_traceBuffer_loc);
196             assert(!my_traceBuffer);
197         }
198     }
199 }
200
201 void performanceStream::callResourceFunc(resourceHandle parent,
202                                          resourceHandle  child,
203                                          const char *name,
204                                          const char *abstr)
205 {
206     if (controlFunc.rFunc) {
207         dataManager::dm->setTid(threadId);
208         dataManager::dm->newResourceDefined(controlFunc.rFunc,handle,
209                          parent,child,name,abstr);
210     }
211 }
212 void performanceStream::callMemoryFunc(string vname,
213                     int start, unsigned mem_size,
214                     unsigned blk_size, resourceHandle p_handle,
215                     vector<resourceHandle> handles)
216 {
217     const char *vn = strdup(vname.string_of()) ;
218     unsigned Psize = handles.size() ;
219     vector<resourceHandle> *children = new vector<resourceHandle> (Psize);
220     //printf("in callMemoryFunc, size = %d (%d)\n", Psize, children->size()) ;
221     //printf("(*children)[%d]=%d\n", k, (*children)[k]) ;
222     // The loop can be saved, if i alloc the vector in DMmain.C
223     for(unsigned k=0; k<Psize; k++) {
224          (*children)[k] = handles[k] ;
225     }
226
227     if (controlFunc.xFunc) {
228         dataManager::dm->setTid(threadId);
229         dataManager::dm->newMemoryDefined(controlFunc.xFunc,
230                                           handle,       //perfStream handle
231                                           vn,
232                                           start,
233                                           mem_size,
234                                           blk_size,
235                                           p_handle,
236                                           children) ;
237     }
238     //if(children) delete children ;
239 }
240
241 void performanceStream::callResourceBatchFunc(batchMode mode)
242 {
243     if (controlFunc.bFunc) {
244         dataManager::dm->setTid(threadId);
245         dataManager::dm->changeResourceBatchMode(controlFunc.bFunc, 
246                                                  handle, mode);
247     }
248 }
249
250 void performanceStream::callFoldFunc(timeStamp width,phaseType phase_type)
251 {
252     if (controlFunc.fFunc) {
253         dataManager::dm->setTid(threadId);
254         dataManager::dm->histFold(controlFunc.fFunc, handle, width, phase_type);
255     }
256 }
257
258
259 void performanceStream::callStateFunc(appState state)
260 {
261     if (controlFunc.sFunc) {
262         dataManager::dm->setTid(threadId);
263         dataManager::dm->changeState(controlFunc.sFunc, handle, state);
264     }
265 }
266
267 void performanceStream::callPredictedCostFuc(metricHandle m_handle,
268                                              resourceListHandle rl_handle,
269                                              float cost,
270                                              u_int clientID)
271 {
272     if (controlFunc.cFunc) {
273         dataManager::dm->setTid(threadId);
274         dataManager::dm->predictedDataCost(controlFunc.cFunc, m_handle,
275                                            rl_handle, cost,clientID);
276     }
277 }
278
279 void performanceStream::callDataEnableFunc(vector<metricInstInfo> *response,
280                                            u_int request_id)
281 {
282     if (controlFunc.eFunc) {
283         dataManager::dm->setTid(threadId);
284         dataManager::dm->enableDataCallback(controlFunc.eFunc, response,
285                                             request_id);
286     }
287 }
288
289 void performanceStream::notifyAllChange(appState state){
290
291    dictionary_hash_iter<perfStreamHandle,performanceStream*> allS(allStreams);
292    perfStreamHandle h;
293    performanceStream *ps;
294    while(allS.next(h,ps)){
295        ps->callStateFunc(state);
296    }
297 }
298
299 void performanceStream::ResourceBatchMode(batchMode mode){
300    dictionary_hash_iter<perfStreamHandle,performanceStream*> allS(allStreams);
301
302    perfStreamHandle h;
303    performanceStream *ps;
304    while(allS.next(h,ps)){
305        ps->callResourceBatchFunc(mode);
306    }
307
308 }
309
310 void performanceStream::foldAll(timeStamp width,phaseType phase_type){
311
312    dictionary_hash_iter<perfStreamHandle,performanceStream*> allS(allStreams);
313    perfStreamHandle h;
314    performanceStream *ps;
315    while(allS.next(h,ps)){
316        ps->flushBuffer();  // first flush any data values in buffer
317        ps->callFoldFunc(width,phase_type);
318    }
319 }
320
321 void performanceStream::callPhaseFunc(phaseInfo& phase,
322                                       bool with_new_pc,
323                                       bool with_visis)
324 {
325     if (controlFunc.pFunc) {
326         dataManager::dm->setTid(threadId);
327         dataManager::dm->newPhaseInfo(controlFunc.pFunc,
328                                       phase.GetPhaseHandle(),
329                                       phase.PhaseName(),
330                                       phase.GetPhaseHandle(),
331                                       phase.GetStartTime(),
332                                       phase.GetEndTime(),
333                                       phase.GetBucketWidth(),
334                                       with_new_pc,
335                                       with_visis);
336     }
337 }
338
339 performanceStream *performanceStream::find(perfStreamHandle psh){
340     performanceStream *pshan;
341     bool found = allStreams.find(psh, pshan);
342     if (found){
343       return(pshan);
344     }
345     return((performanceStream *)NULL);
346 }
347
348 //  if my_buffer has data values, flush it
349 //  then increase num_curr_mis count and my_buffer_size 
350 void performanceStream::addCurrentUser(perfStreamHandle p){
351
352     performanceStream *ps = performanceStream::find(p); 
353     if(!ps) return;
354     if(ps->next_buffer_loc && ps->my_buffer){
355         ps->flushBuffer();
356         assert(!(ps->my_buffer));
357         assert(!(ps->next_buffer_loc));
358     }
359     ps->num_curr_mis++;
360     if(ps->my_buffer_size < DM_DATABUF_LIMIT) {
361         ps->my_buffer_size++;
362     }
363     if(ps->num_curr_mis + ps->num_global_mis){
364         assert(ps->my_buffer_size);
365     }
366 }
367
368 //  if my_buffer has data values, flush it
369 //  then increase num_global_mis count and my_buffer_size 
370 void performanceStream::addGlobalUser(perfStreamHandle p){
371
372     performanceStream *ps = performanceStream::find(p); 
373     if(!ps) return;
374     if(ps->next_buffer_loc && ps->my_buffer){
375         ps->flushBuffer();
376         assert(!(ps->my_buffer));
377         assert(!(ps->next_buffer_loc));
378     }
379     ps->num_global_mis++;
380     if(ps->my_buffer_size < DM_DATABUF_LIMIT) {
381         ps->my_buffer_size++;
382     }
383     if(ps->num_curr_mis + ps->num_global_mis){
384         assert(ps->my_buffer_size);
385     }
386 }
387
388 //  if my_traceBuffer has data values, flush it
389 //  then increase num_trace_mis count
390 //  however, my_traceBuffer_size remains same (10)
391 void performanceStream::addTraceUser(perfStreamHandle p){
392
393     performanceStream *ps = performanceStream::find(p);
394     if(!ps) return;
395     if(ps->next_traceBuffer_loc && ps->my_traceBuffer){
396         ps->flushTraceBuffer();
397         assert(!(ps->my_traceBuffer));
398         assert(!(ps->next_traceBuffer_loc));
399     }
400     ps->num_trace_mis++;
401     if(ps->my_traceBuffer_size < DM_DATABUF_LIMIT) {
402         ps->my_traceBuffer_size++;
403     }
404     //if(ps->num_trace_mis){ 
405     //    ps->my_traceBuffer_size = 10;
406     //    assert(ps->my_traceBuffer_size);
407     //}
408
409 }
410
411 //
412 // if my_buffer has data values, flush it, then decrease its size
413 //
414 void performanceStream::removeGlobalUser(perfStreamHandle p){
415
416     performanceStream *ps = performanceStream::find(p); 
417     if(!ps) return;
418     if(ps->next_buffer_loc && ps->my_buffer){
419         ps->flushBuffer();
420         assert(!(ps->my_buffer));
421         assert(!(ps->next_buffer_loc));
422     }
423     if(ps->num_global_mis && ps->my_buffer_size){
424         ps->num_global_mis--;
425         if((ps->num_global_mis + ps->num_curr_mis) < DM_DATABUF_LIMIT){
426             if(ps->my_buffer_size) ps->my_buffer_size--;
427         }
428     }
429     if(ps->num_curr_mis + ps->num_global_mis){
430         assert(ps->my_buffer_size);
431     }
432 }
433
434 //
435 // if my_buffer has data values, flush it, then decrease its size
436 //
437 void performanceStream::removeCurrentUser(perfStreamHandle p){
438
439     performanceStream *ps = performanceStream::find(p); 
440     if(!ps) return;
441     if(ps->next_buffer_loc && ps->my_buffer){
442         ps->flushBuffer();
443         assert(!(ps->my_buffer));
444         assert(!(ps->next_buffer_loc));
445     }
446     if(ps->num_curr_mis && ps->my_buffer_size){
447         ps->num_curr_mis--;
448         if((ps->num_global_mis + ps->num_curr_mis) < DM_DATABUF_LIMIT){
449             if(ps->my_buffer_size) ps->my_buffer_size--;
450         }
451     }
452     if(ps->num_curr_mis + ps->num_global_mis){
453         assert(ps->my_buffer_size);
454     }
455 }
456
457 //
458 // For all performanceStreams delete buffer size by the num_curr_mis,
459 // flush buffer if neccessary, set num_curr_mis to 0 
460 //
461 void performanceStream::removeAllCurrUsers(){
462
463    dictionary_hash_iter<perfStreamHandle,performanceStream*> allS(allStreams);
464    perfStreamHandle h;
465    performanceStream *ps;
466    while(allS.next(h,ps)){
467        if(ps->next_buffer_loc && ps->my_buffer){
468            ps->flushBuffer();
469            assert(!(ps->my_buffer));
470            assert(!(ps->next_buffer_loc));
471        }
472        if(ps->num_curr_mis && ps->my_buffer_size){
473            ps->num_curr_mis = 0;
474            if((ps->num_global_mis) < DM_DATABUF_LIMIT){
475              ps->my_buffer_size = ps->num_global_mis;
476            }
477        }
478        if(ps->num_curr_mis + ps->num_global_mis){
479            assert(ps->my_buffer_size);
480        }
481    }
482 }
483
484 //
485 // if my_traceBuffer has data values, flush it
486 // if num_trace_mis becomes 0, set my_traceBuffer_size to 0 
487 //
488 void performanceStream::removeTraceUser(perfStreamHandle p){
489
490     performanceStream *ps = performanceStream::find(p);
491     if(!ps) return;
492     if(ps->next_traceBuffer_loc && ps->my_traceBuffer){
493         ps->flushTraceBuffer();
494         assert(!(ps->my_traceBuffer));
495         assert(!(ps->next_traceBuffer_loc));
496     }
497     if(ps->num_trace_mis && ps->my_traceBuffer_size){
498         ps->num_trace_mis--;
499         if(ps->num_trace_mis < DM_DATABUF_LIMIT){
500             if(ps->my_traceBuffer_size) ps->my_traceBuffer_size--;
501         }
502     }
503     if(ps->num_trace_mis){
504       assert(ps->my_traceBuffer_size);
505     }
506     else
507         ps->my_traceBuffer_size = 0;
508
509
510 }
511
512 //
513 // creates a new predCostRecord 
514 //
515 bool performanceStream::addPredCostRequest(perfStreamHandle ps_handle,
516                                            u_int &requestId,
517                                            metricHandle m_handle,
518                                            resourceListHandle rl_handle,
519                                            u_int howmany){
520
521     performanceStream *ps = performanceStream::find(ps_handle); 
522     if(!ps) return false;
523
524     predCostType *new_value = new predCostType; 
525     if(!new_value) return false;
526     new_value->m_handle =  m_handle;
527     new_value->rl_handle =  rl_handle;
528     new_value->max = 0.0;
529     new_value->howmany =  howmany;
530     requestId = ps->nextpredCostReqestId++; 
531     new_value->requestId = requestId;
532     ps->pred_Cost_buff += new_value;
533     new_value = 0;
534     return true;
535 }
536
537 //
538 // update the predicted cost value, and if this is the last outstanding
539 // response then send result to calling thread
540 //
541 void performanceStream::predictedDataCostCallback(u_int requestId,
542                                                   float cost,
543                                                   u_int clientID){
544
545     bool found = false; 
546     u_int which;
547     for(u_int i=0; i < pred_Cost_buff.size(); i++){
548         if((pred_Cost_buff[i])->requestId == requestId){
549             found = true;
550             which = i;
551             break;
552     } }
553     if(found){
554         predCostType *value = pred_Cost_buff[which]; 
555         assert(value->howmany);
556         if(cost > value->max) value->max = cost;
557         value->howmany--;
558         // if all values have been collected, send result to calling thread
559         if(!value->howmany && controlFunc.cFunc ){
560             this->callPredictedCostFuc(value->m_handle, value->rl_handle,
561                                        value->max,clientID);
562             u_int size = pred_Cost_buff.size();
563             if(size){
564                 pred_Cost_buff[which] = pred_Cost_buff[size-1];
565                 pred_Cost_buff.resize(size-1);
566             }
567             delete value;
568         }
569     }
570     else{
571         cout << "error in perfStream::predictedDataCostCallback" << endl;
572         assert(0);
573     }
574 }
575