fix bug so don't try to set initial actual value for aggComponents
[dyninst.git] / pdutil / src / sampleAggregator.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 // $Id: sampleAggregator.C,v 1.2 2001/08/28 02:50:46 schendel Exp $
43
44 #include <assert.h>
45 #include <math.h>
46 #include <iostream.h>
47 #include "pdutil/h/sampleAggregator.h"
48 #include "pdutil/h/pdDebugOstream.h"
49
50
51 #ifdef AGGREGATE_DEBUG
52 pdDebug_ostream aggu_cerr(cerr, true);
53 #else
54 pdDebug_ostream aggu_cerr(cerr, false);
55 #endif
56
57 const timeStamp  sampleAggregator::DONTWAITTOAGGREGATE(timeStamp::ts1970());
58 timeLength sampleAggregator::aggDelayTime;
59
60 // The initial start time needs to be set before values can be aggregated.
61 // Samples added through addSamplePt before initial start time set, will be
62 // queued.
63 void aggComponent::setInitialStartTime(timeStamp initialStTime) {
64   assert(! isInitialStartTimeSet());
65   // the addSamplePt function won't work if the lastProcessedSampleTime
66   // isn't greater than the start of the interval
67   if(startIntvl().isInitialized()) {
68     if(! (initialStTime > startIntvl())) {
69       cerr << "trying to start a component later in time (" << initialStTime
70            << ") than\nthe current interval start time (" << startIntvl()
71            << ").  Error.\n";
72     }
73     assert(initialStTime > startIntvl());
74   }
75   lastProcessedSampleTime = initialStTime;
76 }
77
78 // used when need to adjust start of component to a more convenient time
79 void aggComponent::resetInitialStartTime(timeStamp initialStTime) {
80   // the addSamplePt function won't work if the lastProcessedSampleTime
81   // isn't greater than the start of the interval
82   if(startIntvl().isInitialized()) {
83     if(! (initialStTime > startIntvl())) {
84       cerr << "trying to start a component later in time (" << initialStTime
85            << ") than\nthe current interval start time (" << startIntvl()
86            << ").  Error.\n";
87     }
88     assert(initialStTime > startIntvl());
89   }
90   lastProcessedSampleTime = initialStTime;
91 }
92
93 void aggComponent::addSamplePt(timeStamp timeOfSample, pdSample value) {
94   aggu_cerr << "addSamplePt- " << this << ", timeOfSample: " << timeOfSample
95             << ", lastProcessedSampleTime: " << lastProcessedSampleTime 
96             << ", value: " << value << "\n";
97   assert(timeOfSample > lastProcessedSampleTime);
98   futureSamples.add(timeOfSample, value);
99 }
100
101 void aggComponent::processSamplePt(timeStamp timeOfSample, pdSample value) {
102   aggu_cerr << "processSamplePt\n";
103   aggu_cerr << "timeOfSample: " << timeOfSample << "\n";
104   aggu_cerr << "lastProcessedSampleTime: " << lastProcessedSampleTime << "\n";
105   aggu_cerr << "startIntvl: " << startIntvl();
106   aggu_cerr << "endIntvl: " << endIntvl();
107   aggu_cerr << "value: " << value << "\n";
108
109   assert(timeOfSample > startIntvl());
110   assert(readyToProcessSamples());
111   assert(! curIntvlFilled());
112   assert(timeOfSample > lastProcessedSampleTime);
113
114   timeStamp rightTimeMark = earlier(timeOfSample, endIntvl());
115   aggu_cerr << "rightTimeMark: " << rightTimeMark;
116   timeStamp leftTimeMark  = later(lastProcessedSampleTime, startIntvl());
117   aggu_cerr << "leftTimeMark: " << leftTimeMark << "\n";
118   timeLength timeInCurIntvl = rightTimeMark - leftTimeMark;
119   timeLength timeSinceLastSample = timeOfSample - lastProcessedSampleTime;
120   aggu_cerr << "timeInCurIntvl: " << timeInCurIntvl << "\n";
121   aggu_cerr << "timeSinceLastSampl: " << timeSinceLastSample << "\n";
122   double pcntInCurIntvl = timeInCurIntvl / timeSinceLastSample;
123   aggu_cerr << "pcntInCurIntvl: " << pcntInCurIntvl;
124   pdSample addToCur = pcntInCurIntvl * value;
125   aggu_cerr << "addToCur: " << addToCur << "\n";
126   curIntvlVal += addToCur;
127   pdSample addToLeftOver = value - addToCur;
128   aggu_cerr << "addToLeftOver: " << addToLeftOver << "\n";
129   if(addToLeftOver > pdSample::Zero()) {
130     // if there's value leftOver (for the next interval) than the timeOfSample
131     // better be later in time than the endIntvl() time
132     assert(timeOfSample > endIntvl() && rightTimeMark==endIntvl());
133     addSamplePt(timeOfSample, addToLeftOver);
134   }
135   lastProcessedSampleTime = rightTimeMark;
136 }
137
138 void aggComponent::updateCurIntvlWithQueuedSamples() {
139   aggu_cerr <<"------------   updateCurIntvlWithQueuedSamples  ---------------\n";
140   aggu_cerr << "this: " << this << ", curIntvlVal: " << curIntvlVal << "\n";
141   aggu_cerr << "curIntvlFilled: " << curIntvlFilled() << ", ";
142   aggu_cerr << "futureSamples.size: " << futureSamples.size() << "\n";
143   aggu_cerr << "endIntvl: " << endIntvl() << ", ";
144   aggu_cerr << "lastProcessedSampleTime: " << lastProcessedSampleTime << "\n";
145   while(!curIntvlFilled() && futureSamples.size()>0) {
146     timeStamp frontPtTime = futureSamples.peek_first_key();
147     pdSample frontPtVal = futureSamples.peek_first_data();
148     futureSamples.delete_first();
149     processSamplePt(frontPtTime, frontPtVal);
150   }
151   aggu_cerr << "curIntvlVal: " << curIntvlVal << "\n";
152   aggu_cerr <<"^^^^^^^^^^^^   updateCurIntvlWithQueuedSamples  ^^^^^^^^^^^^^^^\n";
153 }
154
155 // We want to consider that the interval is filled if it has been marked for
156 // removal and it has received atleast one sample in this interval.  We need
157 // to do this so that all the components can be considered to have filled
158 // intervals (even those requested for removal) and thus aggregation can
159 // occur even with components that are requested for removal.  We consider
160 // the case that the lastProcessedSampleTime = startIntvl as a filled
161 // interval since the times of its samples could be in line with the parent
162 // sampleAggregator's interval boundary times, and it could have been 
163 // aggregated in the last interval
164 bool aggComponent::curIntvlFilled()  const {
165   if(isInitialStartTimeSet() && filledUpto(endIntvl()))
166     return true;
167   else
168     return false;
169 }
170
171 ostream& operator<<(ostream&s, const aggComponent &info) {
172   s << "[aggComp- " << ", lastP.SampleTime: " << info.lastProcessedSampleTime
173     << ", curIntvlVal: " << info.curIntvlVal << ", " 
174     << info.futureSamples.size() << " future samples" << ", parentAggregator: "
175     << static_cast<const void*>(&info.parentAggregator) << ", requestRemove: "
176     << info._requestRemove << "]";
177   return s;
178 }
179
180 aggComponent *sampleAggregator::newComponent() {
181   aggComponent *comp = new aggComponent(*this);
182   componentBuf.push_back(comp);
183   bCachedAllStartTimesReceived = false;
184   return comp;
185 }
186
187 // I consider a component complete for an interval if it has either
188 // filled the interval with samples or else it's last sample was at the
189 // beginning of the interval and it's marked for removal
190 bool sampleAggregator::allCompsCompleteForInterval() const {
191   for(unsigned i=0; i<componentBuf.size(); i++) {
192     aggComponent *curComp = componentBuf[i];
193     bool compDone = false;  // the component is done for this interval 
194     if(curComp->isRemoveRequested() && curComp->filledUpto(curIntvlStart))
195       compDone = true;
196     bool compIntvlComplete = false;
197     if(curComp->curIntvlFilled() || compDone)
198       compIntvlComplete = true;
199     aggu_cerr << "  (" << i << ") for comp: " << curComp << ", intvlFilled: " 
200           << curComp->curIntvlFilled() << ", done: " << compDone << "\n";
201     if(! compIntvlComplete)  return false;
202   }
203   return true;
204 }
205
206 void sampleAggregator::updateCompValues() {
207   for(unsigned i=0; i<componentBuf.size(); i++) {
208     componentBuf[i]->updateActualValWithIntvlVal();
209   }
210 }
211
212 void sampleAggregator::updateQueuedSamples() {
213   for(unsigned i=0; i<componentBuf.size(); i++) {
214     componentBuf[i]->updateCurIntvlWithQueuedSamples();
215   }
216 }
217
218 bool sampleAggregator::allCompsReadyToReceiveSamples() const {
219   aggu_cerr << "allCompInitialStartTimesReceived()\n";
220   if(bCachedAllStartTimesReceived == true) return true;
221
222   bool allCompReady = true;
223   for(unsigned i=0; i<componentBuf.size(); i++) {
224     aggu_cerr << "  comp[" << i << "]: " 
225               << componentBuf[i]->isReadyToReceiveSamples() << "\n";
226     if(! componentBuf[i]->isReadyToReceiveSamples()) {
227       allCompReady = false;
228       break;
229     }
230   }
231   bCachedAllStartTimesReceived = allCompReady;
232   aggu_cerr << "  returning " << allCompReady << "\n";
233   return allCompReady;
234 }
235
236 // returns the component start time that is the earliest out of all
237 // of the components
238 // 
239 timeStamp sampleAggregator::earliestCompInitialStartTime() const {
240   timeStamp earliestTime = timeStamp::tsFarOffTime();
241   for(unsigned i=0; i<componentBuf.size(); i++) {
242     timeStamp stTime = componentBuf[i]->getInitialStartTime();
243     if(! stTime.isInitialized()) 
244       continue;
245     if(stTime < earliestTime)
246       earliestTime = stTime;
247   }
248   return earliestTime;
249 }
250
251 // returns true if start and end time of intervals was successfully set up
252 void sampleAggregator::tryToSetupIntvls() {
253   assert(! curIntvlStart.isInitialized());
254   aggu_cerr << "tryToSetupIntvls()\n";
255
256   timeStamp earliestStartTime = earliestCompInitialStartTime();
257   setCurIntvlStart(earliestStartTime);
258   aggu_cerr << "Setting earliest time to: " << earliestStartTime << "\n";
259 }
260
261 bool sampleAggregator::readyToAggregate(timeStamp /* curTime */) {
262   // the order that the following occurs is very specific
263   // don't change this sequence without thinking through how it'll work
264
265   if(numComponents() == 0) {
266     aggu_cerr << "numComponents == 0, returning\n";
267     return false;
268   }
269
270   // one example when this can occur is when a metricInstance component has
271   // been "added" by the front-end but the front-end hasn't yet received a
272   // message from the daemon that the daemon has created the metricInstance.
273   if(! allCompsReadyToReceiveSamples()) {
274     aggu_cerr << "! allCompsReadyToReceiveSamples, returning\n";
275     return false;
276   }
277
278   // make sure all the start & end time of the current interval is set up
279
280   if(! curIntvlStart.isInitialized())
281     tryToSetupIntvls();
282
283   // we want to wait a certain amount of time until we do aggregation (even
284   // if it seems like we're all ready to go) because a new component
285   // (ie. daemon/metric-focus pair) could start with a start time in the
286   // current interval.  If we start aggregating right away, then we've moved
287   // the current interval down to far in time, past the "new" components
288   // time.  I think this race condition is possible because the time
289   // adjustment factor between the daemon and the front-end can only be so
290   // accurate.  I haven't seen any of these particular race conditions
291   // recently so we'll investigate this further if we start seeing further
292   // problems.  If the bug occurs, it should be triggered in the assert in
293   // setInitialStartTime.
294   /*
295   if(curTime != DONTWAITTOAGGREGATE) {
296     timeLength timeWaitedSoFar = curTime - curIntvlEnd;
297     if(timeWaitedSoFar < getAggDelayTime()) {
298       aggu_cerr << "timeWaitedSoFar: " << timeWaitedSoFar << " < "
299                 << "aggDelayTime: " << getAggDelayTime() << ", returning\n";
300       //      return false;
301     }
302   }
303   */
304
305   // now start updating the components' current values with any queued
306   // samples
307   updateQueuedSamples();
308
309   // this can return false when either a component hasn't received samples
310   // yet that fill up the interval or else a component is marked for removal
311   // but it hasn't filled up the interval completely
312   if(! allCompsCompleteForInterval()) {
313     aggu_cerr << "! all components complete for interval, returning\n";
314     return false;
315   }
316
317   return true;
318 }
319
320 // the aggregation is passed a set of graphs that represent the rate of
321 // change (ie. derivative) of the sample value and and an initial actual
322 // value of the graph.  The aggregation code calculates a (derivative) graph
323 // that represents the change in the (sum, max, or min) of the individual
324 // graphs and an initial actual value for this combined graph.
325
326 bool sampleAggregator::aggregate(struct sampleInterval *ret, 
327                                  timeStamp curTime) {
328   aggu_cerr << "aggregate- start: " << curIntvlStart << ", end: " 
329             << curIntvlEnd << "\n";
330
331   if(! readyToAggregate(curTime))
332     return false;
333
334   if(getInitialActualValue().isNaN()) {
335     // sets initiActualVal and lastActualVal
336     calcInitialActualVal();
337   }
338   actuallyAggregate(ret);
339   return true;
340 }
341
342 void sampleAggregator::calcInitialActualVal() {
343   pdSample aggInitActualVal;
344   for(unsigned i=0; i<componentBuf.size(); i++) {
345     aggComponent &curComp = *componentBuf[i];
346     pdSample compInitActualVal = curComp.getInitialActualValue();
347     if(i==0) {
348       aggInitActualVal = compInitActualVal;
349     } else if(aggOp == aggSum) {
350       aggInitActualVal += compInitActualVal;
351     } else if(aggOp == aggMax) {
352       if(compInitActualVal > aggInitActualVal)
353         aggInitActualVal = compInitActualVal;
354     } else if(aggOp == aggMin) {
355       if(compInitActualVal < aggInitActualVal)
356         aggInitActualVal = compInitActualVal;
357     }
358   }
359   initActualVal = aggInitActualVal;
360   lastActualVal = aggInitActualVal;
361   aggu_cerr << "  setting initActualVal to " << initActualVal << "\n";
362 }
363
364 void sampleAggregator::actuallyAggregate(struct sampleInterval *ret) {
365   pdSample aggActualVal;
366   for(unsigned i=0; i<componentBuf.size(); i++) {
367     aggComponent &curComp = *componentBuf[i];
368     
369     pdSample compActualVal = curComp.getCurActualVal();
370     if(i==0) {
371       aggActualVal = compActualVal;
372     } else if(aggOp == aggSum) {
373       aggActualVal += compActualVal;
374     } else if(aggOp == aggMax) {
375       // For aggMax and aggMin, we need to look at the actual value since the
376       // max/min of the interval (ie. the delta) isn't the max/min of the
377       // actual value, which is what we want.
378       // Remember the point in time is actually at the end of the intervals
379       // since we've waited until the intervals are filled.
380       if(compActualVal > aggActualVal)  aggActualVal = compActualVal;
381     } else if(aggOp == aggMax) {
382       if(compActualVal < aggActualVal)  aggActualVal = compActualVal;   
383     }
384   }
385   aggu_cerr << "  calculated aggActualVal: " << aggActualVal 
386             << "lastActualVal: " << lastActualVal << ", initActVal: "
387             << initActualVal << "\n";
388
389   pdSample aggIntvlVal = aggActualVal - lastActualVal;
390
391   removeComponentsRequestedToRemove();
392   assert(curIntvlEnd > curIntvlStart);
393   (*ret).start = curIntvlStart;
394   (*ret).end = curIntvlEnd;
395   (*ret).value = aggIntvlVal;
396
397   aggu_cerr << "aggreg- st: " << curIntvlStart << ", end: " << curIntvlEnd << "\n";
398   aggu_cerr << "             value(interval): " << aggIntvlVal 
399             << ",  (aggActualVal: " << aggActualVal << ")\n";
400   setCurIntvlStart(curIntvlEnd);  // curIntvlEnd = curIntvlStart + intvlWidth
401   updateCompValues();
402   lastActualVal = aggActualVal;
403 }
404
405 void sampleAggregator::removeComponentsRequestedToRemove() {
406   for(unsigned i=0; i<componentBuf.size(); i++) {
407     aggComponent *curComp = componentBuf[i];
408     if(curComp->isRemoveRequested() && curComp->numQueuedSamples()==0) {
409       aggu_cerr << "removing aggComp: " << curComp << "\n";
410       componentBuf.erase(i);
411       delete curComp;
412     }
413   }  
414 }
415
416 ostream& operator<<(ostream&s, const sampleAggregator &ag) {
417   const char *aggStr[] = { "aggSum", "aggMin", "aggMax", "aggAvg" };
418
419   s << "------  sampleAggregator  --------------------------------------\n";
420   s << "[aggOp: " << aggStr[ag.aggOp] << ", numComponents: " 
421     << ag.numComponents() << ", curIntvlStart: " << ag.curIntvlStart 
422     << ", curIntvlEnd: " << ag.curIntvlEnd << ", aggIntervalWidth: " 
423     << ag.aggIntervalWidth << ", newAggIntervalWidth: " 
424     << ag.newAggIntervalWidth << "]";
425   return s;
426 }
427
428
429
430