Bug fix to assert failure that sometimes occurred with the trace data stuff
[dyninst.git] / paradyn / src / DMthread / DMperfstream.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 #include <assert.h>
43 extern "C" {
44 double   quiet_nan();
45 #include <malloc.h>
46 #include <stdio.h>
47 }
48 #include "DMperfstream.h"
49 #include "DMmetric.h"
50 #include "DMresource.h"
51 #include "paradyn/src/DMthread/BufferPool.h"
52 #include "paradyn/src/DMthread/DVbufferpool.h"
53
54
55 performanceStream::performanceStream(dataType t, 
56                                      dataCallback dc, 
57                                      controlCallback cc, 
58                                      int tid) {
59     type = t;
60     dataFunc = dc;
61     controlFunc = cc;
62     threadId = tid;
63     num_global_mis = 0;
64     num_curr_mis = 0;
65     my_buffer_size = 0;
66     next_buffer_loc = 0;
67     my_buffer = 0;
68     handle = next_id++;
69     allStreams[handle] = this;
70     nextpredCostReqestId = 0;
71     pred_Cost_buff.resize(0);
72     // trace data streams
73     my_traceBuffer_size = 0;
74     next_traceBuffer_loc = 0;
75     my_traceBuffer = 0;
76 }
77
78 performanceStream::~performanceStream(){
79     // indicate that handle is free for reuse
80     allStreams.undef(handle);
81     datavalues_bufferpool.dealloc(my_buffer);
82     my_buffer = 0;
83     // trace data streams
84     tracedatavalues_bufferpool.dealloc(my_traceBuffer);
85     my_traceBuffer = 0;
86 }
87
88
89 bool performanceStream::reallocBuffer(){
90     // reallocate a buffer of size my_buffer_size
91     assert(!my_buffer);
92     if(my_buffer_size){
93         my_buffer = datavalues_bufferpool.alloc(my_buffer_size); 
94         if(!my_buffer) return false;
95         assert(my_buffer);
96         assert(my_buffer->size() >= my_buffer_size);
97     }
98     return true;
99 }
100
101 bool performanceStream::reallocTraceBuffer(){
102     // reallocate a buffer of size my_traceBuffer_size
103     assert(!my_traceBuffer);
104     if(my_traceBuffer_size){
105         my_traceBuffer = tracedatavalues_bufferpool.alloc(my_traceBuffer_size);
106         if(!my_traceBuffer) return false;
107         assert(my_traceBuffer);
108         assert(my_traceBuffer->size() >= my_traceBuffer_size);
109     }
110     return true;
111 }
112
113 //
114 // send buffer of data values to client
115 //
116 void performanceStream::flushBuffer(){
117
118     // send data to client
119     if(next_buffer_loc){
120         assert(my_buffer);
121         dataManager::dm->setTid(threadId);
122         dataManager::dm->newPerfData(dataFunc.sample, 
123                                      my_buffer, next_buffer_loc);
124     }
125     next_buffer_loc = 0;
126     my_buffer = 0;
127 }
128
129 //
130 // send buffer of trace data values to client
131 //
132 void performanceStream::flushTraceBuffer(){
133
134     // send data to client
135     if(next_traceBuffer_loc){
136         assert(my_traceBuffer);
137         dataManager::dm->setTid(threadId);
138         dataManager::dm->newTracePerfData(dataFunc.trace,
139                                      my_traceBuffer,next_traceBuffer_loc);
140     }
141     next_traceBuffer_loc = 0;
142     my_traceBuffer = 0;
143 }
144
145 // 
146 // fills up buffer with new data values and flushs the buffer if 
147 // it is full 
148 //
149 void performanceStream::callSampleFunc(metricInstanceHandle mi,
150                                        sampleValue *buckets,
151                                        int count,
152                                        int first,
153                                        phaseType type)
154 {
155     if (dataFunc.sample) {
156         for(int i = first; i < (first+count); i++) {
157             if(!my_buffer) {
158                  if (!this->reallocBuffer()) assert(0); 
159                  assert(my_buffer);
160             }
161             (*my_buffer)[next_buffer_loc].mi = mi; 
162             (*my_buffer)[next_buffer_loc].bucketNum = i; 
163             (*my_buffer)[next_buffer_loc].value = buckets[i-first]; 
164             (*my_buffer)[next_buffer_loc].type = type; 
165             next_buffer_loc++;
166             if(next_buffer_loc >= my_buffer_size) {
167                 this->flushBuffer();
168                 assert(!next_buffer_loc);
169                 assert(!my_buffer);
170             }
171         }
172     }
173 }
174
175 //
176 // fills up trace buffer with new trace data values and flushs the buffer if
177 // it is full
178 //
179 void performanceStream::callTraceFunc(metricInstanceHandle mi,
180                                       void *data,
181                                       int length)
182 {
183     if (dataFunc.trace) {
184         if(!my_traceBuffer) {
185             if (!this->reallocTraceBuffer()) assert(0);
186                 assert(my_traceBuffer);
187         }
188         (*my_traceBuffer)[next_traceBuffer_loc].mi = mi;
189         (*my_traceBuffer)[next_traceBuffer_loc].traceRecord = byteArray(data,length);
190         next_traceBuffer_loc++;
191 //      if(next_traceBuffer_loc >= my_traceBuffer_size) {
192           if(next_traceBuffer_loc >= my_traceBuffer_size || length == 0) {
193             this->flushTraceBuffer();
194             assert(!next_traceBuffer_loc);
195             assert(!my_traceBuffer);
196         }
197     }
198 }
199
200 void performanceStream::callResourceFunc(resourceHandle parent,
201                                          resourceHandle  child,
202                                          const char *name,
203                                          const char *abstr)
204 {
205     if (controlFunc.rFunc) {
206         dataManager::dm->setTid(threadId);
207         dataManager::dm->newResourceDefined(controlFunc.rFunc,handle,
208                          parent,child,name,abstr);
209     }
210 }
211 void performanceStream::callMemoryFunc(string vname,
212                     int start, unsigned mem_size,
213                     unsigned blk_size, resourceHandle p_handle,
214                     vector<resourceHandle> handles)
215 {
216     const char *vn = strdup(vname.string_of()) ;
217     unsigned Psize = handles.size() ;
218     vector<resourceHandle> *children = new vector<resourceHandle> (Psize);
219     //printf("in callMemoryFunc, size = %d (%d)\n", Psize, children->size()) ;
220     //printf("(*children)[%d]=%d\n", k, (*children)[k]) ;
221     // The loop can be saved, if i alloc the vector in DMmain.C
222     for(unsigned k=0; k<Psize; k++) {
223          (*children)[k] = handles[k] ;
224     }
225
226     if (controlFunc.xFunc) {
227         dataManager::dm->setTid(threadId);
228         dataManager::dm->newMemoryDefined(controlFunc.xFunc,
229                                           handle,       //perfStream handle
230                                           vn,
231                                           start,
232                                           mem_size,
233                                           blk_size,
234                                           p_handle,
235                                           children) ;
236     }
237     //if(children) delete children ;
238 }
239
240 void performanceStream::callResourceBatchFunc(batchMode mode)
241 {
242     if (controlFunc.bFunc) {
243         dataManager::dm->setTid(threadId);
244         dataManager::dm->changeResourceBatchMode(controlFunc.bFunc, 
245                                                  handle, mode);
246     }
247 }
248
249 void performanceStream::callFoldFunc(timeStamp width,phaseType phase_type)
250 {
251     if (controlFunc.fFunc) {
252         dataManager::dm->setTid(threadId);
253         dataManager::dm->histFold(controlFunc.fFunc, handle, width, phase_type);
254     }
255 }
256
257
258 void performanceStream::callStateFunc(appState state)
259 {
260     if (controlFunc.sFunc) {
261         dataManager::dm->setTid(threadId);
262         dataManager::dm->changeState(controlFunc.sFunc, handle, state);
263     }
264 }
265
266 void performanceStream::callPredictedCostFuc(metricHandle m_handle,
267                                              resourceListHandle rl_handle,
268                                              float cost,
269                                              u_int clientID)
270 {
271     if (controlFunc.cFunc) {
272         dataManager::dm->setTid(threadId);
273         dataManager::dm->predictedDataCost(controlFunc.cFunc, m_handle,
274                                            rl_handle, cost,clientID);
275     }
276 }
277
278 void performanceStream::callDataEnableFunc(vector<metricInstInfo> *response,
279                                            u_int request_id)
280 {
281     if (controlFunc.eFunc) {
282         dataManager::dm->setTid(threadId);
283         dataManager::dm->enableDataCallback(controlFunc.eFunc, response,
284                                             request_id);
285     }
286 }
287
288 void performanceStream::notifyAllChange(appState state){
289
290    dictionary_hash_iter<perfStreamHandle,performanceStream*> allS(allStreams);
291    perfStreamHandle h;
292    performanceStream *ps;
293    while(allS.next(h,ps)){
294        ps->callStateFunc(state);
295    }
296 }
297
298 void performanceStream::ResourceBatchMode(batchMode mode){
299    dictionary_hash_iter<perfStreamHandle,performanceStream*> allS(allStreams);
300
301    perfStreamHandle h;
302    performanceStream *ps;
303    while(allS.next(h,ps)){
304        ps->callResourceBatchFunc(mode);
305    }
306
307 }
308
309 void performanceStream::foldAll(timeStamp width,phaseType phase_type){
310
311    dictionary_hash_iter<perfStreamHandle,performanceStream*> allS(allStreams);
312    perfStreamHandle h;
313    performanceStream *ps;
314    while(allS.next(h,ps)){
315        ps->flushBuffer();  // first flush any data values in buffer
316        ps->callFoldFunc(width,phase_type);
317    }
318 }
319
320 void performanceStream::callPhaseFunc(phaseInfo& phase,
321                                       bool with_new_pc,
322                                       bool with_visis)
323 {
324     if (controlFunc.pFunc) {
325         dataManager::dm->setTid(threadId);
326         dataManager::dm->newPhaseInfo(controlFunc.pFunc,
327                                       phase.GetPhaseHandle(),
328                                       phase.PhaseName(),
329                                       phase.GetPhaseHandle(),
330                                       phase.GetStartTime(),
331                                       phase.GetEndTime(),
332                                       phase.GetBucketWidth(),
333                                       with_new_pc,
334                                       with_visis);
335     }
336 }
337
338 performanceStream *performanceStream::find(perfStreamHandle psh){
339     performanceStream *pshan;
340     bool found = allStreams.find(psh, pshan);
341     if (found){
342       return(pshan);
343     }
344     return((performanceStream *)NULL);
345 }
346
347 //  if my_buffer has data values, flush it
348 //  then increase num_curr_mis count and my_buffer_size 
349 void performanceStream::addCurrentUser(perfStreamHandle p){
350
351     performanceStream *ps = performanceStream::find(p); 
352     if(!ps) return;
353     if(ps->next_buffer_loc && ps->my_buffer){
354         ps->flushBuffer();
355         assert(!(ps->my_buffer));
356         assert(!(ps->next_buffer_loc));
357     }
358     ps->num_curr_mis++;
359     if(ps->my_buffer_size < DM_DATABUF_LIMIT) {
360         ps->my_buffer_size++;
361     }
362     if(ps->num_curr_mis + ps->num_global_mis){
363         assert(ps->my_buffer_size);
364     }
365 }
366
367 //  if my_buffer has data values, flush it
368 //  then increase num_global_mis count and my_buffer_size 
369 void performanceStream::addGlobalUser(perfStreamHandle p){
370
371     performanceStream *ps = performanceStream::find(p); 
372     if(!ps) return;
373     if(ps->next_buffer_loc && ps->my_buffer){
374         ps->flushBuffer();
375         assert(!(ps->my_buffer));
376         assert(!(ps->next_buffer_loc));
377     }
378     ps->num_global_mis++;
379     if(ps->my_buffer_size < DM_DATABUF_LIMIT) {
380         ps->my_buffer_size++;
381     }
382     if(ps->num_curr_mis + ps->num_global_mis){
383         assert(ps->my_buffer_size);
384     }
385 }
386
387 //  if my_traceBuffer has data values, flush it
388 //  then increase num_trace_mis count
389 //  however, my_traceBuffer_size remains same (10)
390 void performanceStream::addTraceUser(perfStreamHandle p){
391
392     performanceStream *ps = performanceStream::find(p);
393     if(!ps) return;
394     if(ps->next_traceBuffer_loc && ps->my_traceBuffer){
395         ps->flushTraceBuffer();
396         assert(!(ps->my_traceBuffer));
397         assert(!(ps->next_traceBuffer_loc));
398     }
399     ps->num_trace_mis++;
400 //    if(ps->my_traceBuffer_size < DM_DATABUF_LIMIT) {
401 //        ps->my_traceBuffer_size++;
402 //    }
403     if(ps->num_trace_mis){
404         ps->my_traceBuffer_size = 10;
405         assert(ps->my_traceBuffer_size);
406     }
407
408 }
409
410 //
411 // if my_buffer has data values, flush it, then decrease its size
412 //
413 void performanceStream::removeGlobalUser(perfStreamHandle p){
414
415     performanceStream *ps = performanceStream::find(p); 
416     if(!ps) return;
417     if(ps->next_buffer_loc && ps->my_buffer){
418         ps->flushBuffer();
419         assert(!(ps->my_buffer));
420         assert(!(ps->next_buffer_loc));
421     }
422     if(ps->num_global_mis && ps->my_buffer_size){
423         ps->num_global_mis--;
424         if((ps->num_global_mis + ps->num_curr_mis) < DM_DATABUF_LIMIT){
425             if(ps->my_buffer_size) ps->my_buffer_size--;
426         }
427     }
428     if(ps->num_curr_mis + ps->num_global_mis){
429         assert(ps->my_buffer_size);
430     }
431 }
432
433 //
434 // if my_buffer has data values, flush it, then decrease its size
435 //
436 void performanceStream::removeCurrentUser(perfStreamHandle p){
437
438     performanceStream *ps = performanceStream::find(p); 
439     if(!ps) return;
440     if(ps->next_buffer_loc && ps->my_buffer){
441         ps->flushBuffer();
442         assert(!(ps->my_buffer));
443         assert(!(ps->next_buffer_loc));
444     }
445     if(ps->num_curr_mis && ps->my_buffer_size){
446         ps->num_curr_mis--;
447         if((ps->num_global_mis + ps->num_curr_mis) < DM_DATABUF_LIMIT){
448             if(ps->my_buffer_size) ps->my_buffer_size--;
449         }
450     }
451     if(ps->num_curr_mis + ps->num_global_mis){
452         assert(ps->my_buffer_size);
453     }
454 }
455
456 //
457 // For all performanceStreams delete buffer size by the num_curr_mis,
458 // flush buffer if neccessary, set num_curr_mis to 0 
459 //
460 void performanceStream::removeAllCurrUsers(){
461
462    dictionary_hash_iter<perfStreamHandle,performanceStream*> allS(allStreams);
463    perfStreamHandle h;
464    performanceStream *ps;
465    while(allS.next(h,ps)){
466        if(ps->next_buffer_loc && ps->my_buffer){
467            ps->flushBuffer();
468            assert(!(ps->my_buffer));
469            assert(!(ps->next_buffer_loc));
470        }
471        if(ps->num_curr_mis && ps->my_buffer_size){
472            ps->num_curr_mis = 0;
473            if((ps->num_global_mis) < DM_DATABUF_LIMIT){
474              ps->my_buffer_size = ps->num_global_mis;
475            }
476        }
477        if(ps->num_curr_mis + ps->num_global_mis){
478            assert(ps->my_buffer_size);
479        }
480    }
481 }
482
483 //
484 // if my_traceBuffer has data values, flush it
485 // if num_trace_mis becomes 0, set my_traceBuffer_size to 0 
486 //
487 void performanceStream::removeTraceUser(perfStreamHandle p){
488
489     performanceStream *ps = performanceStream::find(p);
490     if(!ps) return;
491     if(ps->next_traceBuffer_loc && ps->my_traceBuffer){
492         ps->flushTraceBuffer();
493         assert(!(ps->my_traceBuffer));
494         assert(!(ps->next_traceBuffer_loc));
495     }
496     if(ps->num_trace_mis && ps->my_traceBuffer_size){
497         ps->num_trace_mis--;
498         if(ps->num_trace_mis < DM_DATABUF_LIMIT){
499             if(ps->my_traceBuffer_size) ps->my_traceBuffer_size--;
500         }
501     }
502     if(ps->num_trace_mis){
503       assert(ps->my_traceBuffer_size);
504     }
505     else
506         ps->my_traceBuffer_size = 0;
507
508
509 }
510
511 //
512 // creates a new predCostRecord 
513 //
514 bool performanceStream::addPredCostRequest(perfStreamHandle ps_handle,
515                                            u_int &requestId,
516                                            metricHandle m_handle,
517                                            resourceListHandle rl_handle,
518                                            u_int howmany){
519
520     performanceStream *ps = performanceStream::find(ps_handle); 
521     if(!ps) return false;
522
523     predCostType *new_value = new predCostType; 
524     if(!new_value) return false;
525     new_value->m_handle =  m_handle;
526     new_value->rl_handle =  rl_handle;
527     new_value->max = 0.0;
528     new_value->howmany =  howmany;
529     requestId = ps->nextpredCostReqestId++; 
530     new_value->requestId = requestId;
531     ps->pred_Cost_buff += new_value;
532     new_value = 0;
533     return true;
534 }
535
536 //
537 // update the predicted cost value, and if this is the last outstanding
538 // response then send result to calling thread
539 //
540 void performanceStream::predictedDataCostCallback(u_int requestId,
541                                                   float cost,
542                                                   u_int clientID){
543
544     bool found = false; 
545     u_int which;
546     for(u_int i=0; i < pred_Cost_buff.size(); i++){
547         if((pred_Cost_buff[i])->requestId == requestId){
548             found = true;
549             which = i;
550             break;
551     } }
552     if(found){
553         predCostType *value = pred_Cost_buff[which]; 
554         assert(value->howmany);
555         if(cost > value->max) value->max = cost;
556         value->howmany--;
557         // if all values have been collected, send result to calling thread
558         if(!value->howmany && controlFunc.cFunc ){
559             this->callPredictedCostFuc(value->m_handle, value->rl_handle,
560                                        value->max,clientID);
561             u_int size = pred_Cost_buff.size();
562             if(size){
563                 pred_Cost_buff[which] = pred_Cost_buff[size-1];
564                 pred_Cost_buff.resize(size-1);
565             }
566             delete value;
567         }
568     }
569     else{
570         cout << "error in perfStream::predictedDataCostCallback" << endl;
571         assert(0);
572     }
573 }
574