bug fix to flushBuffer
[dyninst.git] / paradyn / src / DMthread / DMperfstream.C
1 /*
2  * Copyright (c) 1993, 1994 Barton P. Miller, Jeff Hollingsworth,
3  *     Bruce Irvin, Jon Cargille, Krishna Kunchithapadam, Karen
4  *     Karavanic, Tia Newhall, Mark Callaghan.  All rights reserved.
5  *
6  * This software is furnished under the condition that it may not be
7  * provided or otherwise made available to, or used by, any other
8  * person, except as provided for by the terms of applicable license
9  * agreements.  No title to or ownership of the software is hereby
10  * transferred.  The name of the principals may not be used in any
11  * advertising or publicity related to this software without specific,
12  * written prior authorization.  Any use of this software must include
13  * the above copyright notice.
14  *
15  */
16 #include <assert.h>
17 extern "C" {
18 double   quiet_nan();
19 #include <malloc.h>
20 #include <stdio.h>
21 }
22 #include "DMperfstream.h"
23 #include "DMmetric.h"
24 #include "DMresource.h"
25 #include "paradyn/src/DMthread/BufferPool.h"
26 #include "paradyn/src/DMthread/DVbufferpool.h"
27
28
29 performanceStream::performanceStream(dataType t, 
30                                      dataCallback dc, 
31                                      controlCallback cc, 
32                                      int tid) {
33     type = t;
34     dataFunc = dc;
35     controlFunc = cc;
36     threadId = tid;
37     num_global_mis = 0;
38     num_curr_mis = 0;
39     my_buffer_size = 0;
40     next_buffer_loc = 0;
41     my_buffer = 0;
42
43     //  check for reuse of existing handle
44     for(unsigned i=0; i < nextId.size(); i++){
45         if((!nextId[i])){
46             handle = i;
47             nextId[i] = true;
48             allStreams[handle] = this;
49             return;
50     }}
51     // if not found, add a new element to the id array
52     handle = nextId.size();
53     nextId += true;
54     assert(handle < nextId.size());
55     allStreams[handle] = this;
56 }
57
58 performanceStream::~performanceStream(){
59     // indicate that handle is free for reuse
60     nextId[handle] = false;
61     allStreams.undef(handle);
62     datavalues_bufferpool.dealloc(my_buffer);
63     my_buffer = 0;
64 }
65
66
67 bool performanceStream::reallocBuffer(){
68     // reallocate a buffer of size my_buffer_size
69     assert(!my_buffer);
70     if(my_buffer_size){
71         my_buffer = datavalues_bufferpool.alloc(my_buffer_size); 
72         if(!my_buffer) return false;
73         assert(my_buffer);
74         assert(my_buffer->size() >= my_buffer_size);
75     }
76     return true;
77 }
78
79 //
80 // send buffer of data values to client
81 //
82 void performanceStream::flushBuffer(){
83
84     // send data to client
85     if(next_buffer_loc){
86         assert(my_buffer);
87         dataManager::dm->setTid(threadId);
88         dataManager::dm->newPerfData(dataFunc.sample, 
89                                      my_buffer, next_buffer_loc);
90     }
91     next_buffer_loc = 0;
92     my_buffer = 0;
93 }
94
95 // 
96 // fills up buffer with new data values and flushs the buffer if 
97 // it is full 
98 //
99 void performanceStream::callSampleFunc(metricInstanceHandle mi,
100                                        sampleValue *buckets,
101                                        int count,
102                                        int first,
103                                        phaseType type)
104 {
105     if (dataFunc.sample) {
106         for(unsigned i = first; i < (first+count); i++) {
107             if(!my_buffer) {
108                  if (!this->reallocBuffer()) assert(0); 
109                  assert(my_buffer);
110             }
111             (*my_buffer)[next_buffer_loc].mi = mi; 
112             (*my_buffer)[next_buffer_loc].bucketNum = i; 
113             (*my_buffer)[next_buffer_loc].value = buckets[i-first]; 
114             (*my_buffer)[next_buffer_loc].type = type; 
115             next_buffer_loc++;
116             if(next_buffer_loc >= my_buffer_size) {
117                 this->flushBuffer();
118                 assert(!next_buffer_loc);
119                 assert(!my_buffer);
120             }
121         }
122     }
123 }
124
125 void performanceStream::callResourceFunc(resourceHandle parent,
126                                          resourceHandle  child,
127                                          const char *name,
128                                          const char *abstr)
129 {
130     if (controlFunc.rFunc) {
131         dataManager::dm->setTid(threadId);
132         dataManager::dm->newResourceDefined(controlFunc.rFunc,handle,
133                          parent,child,name,abstr);
134     }
135 }
136
137 void performanceStream::callResourceBatchFunc(batchMode mode)
138 {
139     if (controlFunc.bFunc) {
140         dataManager::dm->setTid(threadId);
141         dataManager::dm->changeResourceBatchMode(controlFunc.bFunc, 
142                                                  handle, mode);
143     }
144 }
145
146 void performanceStream::callFoldFunc(timeStamp width,phaseType phase_type)
147 {
148     if (controlFunc.fFunc) {
149         dataManager::dm->setTid(threadId);
150         dataManager::dm->histFold(controlFunc.fFunc, handle, width, phase_type);
151     }
152 }
153
154
155 void performanceStream::callStateFunc(appState state)
156 {
157     if (controlFunc.sFunc) {
158         dataManager::dm->setTid(threadId);
159         dataManager::dm->changeState(controlFunc.sFunc, handle, state);
160     }
161 }
162
163 void performanceStream::notifyAllChange(appState state){
164
165    dictionary_hash_iter<perfStreamHandle,performanceStream*> allS(allStreams);
166    perfStreamHandle h;
167    performanceStream *ps;
168    while(allS.next(h,ps)){
169        ps->callStateFunc(state);
170    }
171 }
172
173 void performanceStream::ResourceBatchMode(batchMode mode){
174    dictionary_hash_iter<perfStreamHandle,performanceStream*> allS(allStreams);
175    perfStreamHandle h;
176    performanceStream *ps;
177    while(allS.next(h,ps)){
178        ps->callResourceBatchFunc(mode);
179    }
180
181 }
182
183 void performanceStream::foldAll(timeStamp width,phaseType phase_type){
184
185    dictionary_hash_iter<perfStreamHandle,performanceStream*> allS(allStreams);
186    perfStreamHandle h;
187    performanceStream *ps;
188    while(allS.next(h,ps)){
189        ps->flushBuffer();  // first flush any data values in buffer
190        ps->callFoldFunc(width,phase_type);
191    }
192 }
193
194 void performanceStream::callPhaseFunc(phaseInfo& phase)
195 {
196     if (controlFunc.pFunc) {
197         dataManager::dm->setTid(threadId);
198         dataManager::dm->newPhaseInfo(controlFunc.pFunc,
199                                       phase.GetPhaseHandle(),
200                                       phase.PhaseName(),
201                                       phase.GetPhaseHandle(),
202                                       phase.GetStartTime(),
203                                       phase.GetEndTime(),
204                                       phase.GetBucketWidth());
205     }
206 }
207
208 performanceStream *performanceStream::find(perfStreamHandle psh){
209     if(allStreams.defines(psh)){
210         return(allStreams[psh]);
211     }
212     return((performanceStream *)NULL);
213 }
214
215 //  if my_buffer has data values, flush it
216 //  then increase num_curr_mis count and my_buffer_size 
217 void performanceStream::addCurrentUser(perfStreamHandle p){
218
219     performanceStream *ps = performanceStream::find(p); 
220     if(!ps) return;
221     if(ps->next_buffer_loc && ps->my_buffer){
222         ps->flushBuffer();
223         assert(!(ps->my_buffer));
224         assert(!(ps->next_buffer_loc));
225     }
226     ps->num_curr_mis++;
227     ps->my_buffer_size++;
228 }
229
230 //  if my_buffer has data values, flush it
231 //  then increase num_global_mis count and my_buffer_size 
232 void performanceStream::addGlobalUser(perfStreamHandle p){
233
234     performanceStream *ps = performanceStream::find(p); 
235     if(!ps) return;
236     if(ps->next_buffer_loc && ps->my_buffer){
237         ps->flushBuffer();
238         assert(!(ps->my_buffer));
239         assert(!(ps->next_buffer_loc));
240     }
241     ps->num_global_mis++;
242     ps->my_buffer_size++;
243 }
244
245 //
246 // if my_buffer has data values, flush it, then decrease its size
247 //
248 void performanceStream::removeGlobalUser(perfStreamHandle p){
249
250     performanceStream *ps = performanceStream::find(p); 
251     if(!ps) return;
252     assert(ps->my_buffer_size == (ps->num_global_mis + ps->num_curr_mis));
253     if(ps->next_buffer_loc && ps->my_buffer){
254         ps->flushBuffer();
255         assert(!(ps->my_buffer));
256         assert(!(ps->next_buffer_loc));
257     }
258     if(ps->num_global_mis && ps->my_buffer_size){
259         ps->num_global_mis--;
260         ps->my_buffer_size--;
261     }
262     assert(ps->my_buffer_size == (ps->num_global_mis + ps->num_curr_mis));
263 }
264
265 //
266 // if my_buffer has data values, flush it, then decrease its size
267 //
268 void performanceStream::removeCurrentUser(perfStreamHandle p){
269
270     performanceStream *ps = performanceStream::find(p); 
271     if(!ps) return;
272     assert(ps->my_buffer_size == (ps->num_global_mis + ps->num_curr_mis));
273     if(ps->next_buffer_loc && ps->my_buffer){
274         ps->flushBuffer();
275         assert(!(ps->my_buffer));
276         assert(!(ps->next_buffer_loc));
277     }
278     if(ps->num_curr_mis && ps->my_buffer_size){
279         ps->num_curr_mis--;
280         ps->my_buffer_size--;
281     }
282     assert(ps->my_buffer_size == (ps->num_global_mis + ps->num_curr_mis));
283 }
284
285 //
286 // For all performanceStreams delete buffer size by the num_curr_mis,
287 // flush buffer if neccessary, set num_curr_mis to 0 
288 //
289 void performanceStream::removeAllCurrUsers(){
290
291    dictionary_hash_iter<perfStreamHandle,performanceStream*> allS(allStreams);
292    perfStreamHandle h;
293    performanceStream *ps;
294    while(allS.next(h,ps)){
295        assert(ps->my_buffer_size == (ps->num_global_mis + ps->num_curr_mis));
296        if(ps->next_buffer_loc && ps->my_buffer){
297            ps->flushBuffer();
298            assert(!(ps->my_buffer));
299            assert(!(ps->next_buffer_loc));
300        }
301        if(ps->num_curr_mis && ps->my_buffer_size){
302            assert(ps->my_buffer_size >= ps->num_curr_mis);
303            ps->my_buffer_size -= ps->num_curr_mis;
304            ps->num_curr_mis = 0;
305        }
306        assert(ps->my_buffer_size == ps->num_global_mis);
307    }
308 }