dictionary_lite --> dictionary_hash to take advantage of the new
[dyninst.git] / paradyn / src / PCthread / PCfilter.C
1 /*
2  * Copyright (c) 1996 Barton P. Miller
3  * 
4  * We provide the Paradyn Parallel Performance Tools (below
5  * described as Paradyn") on an AS IS basis, and do not warrant its
6  * validity or performance.  We reserve the right to update, modify,
7  * or discontinue this software at any time.  We shall have no
8  * obligation to supply such updates or modifications or any other
9  * form of support to you.
10  * 
11  * This license is for research uses.  For such uses, there is no
12  * charge. We define "research use" to mean you may freely use it
13  * inside your organization for whatever purposes you see fit. But you
14  * may not re-distribute Paradyn or parts of Paradyn, in any form
15  * source or binary (including derivatives), electronic or otherwise,
16  * to any other organization or entity without our permission.
17  * 
18  * (for other uses, please contact us at paradyn@cs.wisc.edu)
19  * 
20  * All warranties, including without limitation, any warranty of
21  * merchantability or fitness for a particular purpose, are hereby
22  * excluded.
23  * 
24  * By your use of Paradyn, you understand and agree that we (or any
25  * other person or entity with proprietary rights in Paradyn) are
26  * under no obligation to provide either maintenance services,
27  * update services, notices of latent defects, or correction of
28  * defects for Paradyn.
29  * 
30  * Even if advised of the possibility of such damages, under no
31  * circumstances shall we (or any other person or entity with
32  * proprietary rights in the software licensed hereunder) be liable
33  * to you or any third party for direct, indirect, or consequential
34  * damages of any character regardless of type of action, including,
35  * without limitation, loss of profits, loss of use, loss of good
36  * will, or computer failure or malfunction.  You agree to indemnify
37  * us (and any other person or entity with proprietary rights in the
38  * software licensed hereunder) for any and all liability it may
39  * incur to third parties resulting from your use of Paradyn.
40  */
41
42 /*
43  * PCfilter.C
44  *
45  * Data filter class performs initial processing of raw DM data arriving 
46  * in the Performance Consultant.  
47  *
48  * $Log: PCfilter.C,v $
49  * Revision 1.32  1997/10/28 20:34:30  tamches
50  * dictionary_lite --> dictionary_hash to take advantage of the new
51  * and improved dictionary_hash class
52  *
53  * Revision 1.31  1997/03/29 02:03:48  sec
54  * Adding some debugging stuff.
55  *
56  * Revision 1.30  1996/08/16 21:03:25  tamches
57  * updated copyright for release 1.1
58  *
59  * Revision 1.29  1996/07/26 18:01:46  karavan
60  * removed debug prints.
61  *
62  * Revision 1.28  1996/07/26 07:28:11  karavan
63  * bug fix: eliminated race condition from data subscription code.  Changed
64  * data structures used as indices in class filteredDataServer.  Obsoleted
65  * class fmf.
66  *
67  * Revision 1.27  1996/07/23 20:28:01  karavan
68  * second part of two-part commit.
69  *
70  * implements new search strategy which retests false nodes under certain
71  * circumstances.
72  *
73  * change in handling of high-cost nodes blocking the ready queue.
74  *
75  * code cleanup.
76  *
77  * Revision 1.26  1996/07/22 18:55:40  karavan
78  * part one of two-part commit for new PC functionality of restarting searches.
79  *
80  * Revision 1.25  1996/05/15 04:42:02  karavan
81  * oops! removed debugging print!
82  *
83  * Revision 1.24  1996/05/15 04:35:11  karavan
84  * bug fixes: changed pendingCost pendingSearches and numexperiments to
85  * break down by phase type, so starting a new current phase updates these
86  * totals correctly; fixed error in estimated cost propagation.
87  *
88  */
89
90 #include "PCfilter.h"
91 #include "PCintern.h"
92 #include "PCmetricInst.h"
93 #include "dataManager.thread.h"
94
95 #ifdef MYPCDEBUG
96 #include <sys/time.h>
97 double TESTgetTime()
98 {
99   double now;
100   struct timeval tv;
101   gettimeofday(&tv,NULL);
102   now = (double) tv.tv_sec + ((double)tv.tv_usec/(double)1000000.0);
103   return(now);
104 }
105 static double enableTotTime=0.0,enableWorstTime=0.0;
106 static int enableCounter=0;
107 #endif
108
109 extern performanceConsultant *pc;
110
111 ostream& operator <<(ostream &os, filter& f)
112 {
113   const char *focname = dataMgr->getFocusNameFromMI(f.mi);
114   const char *metname = dataMgr->getMetricNameFromMI(f.mi);
115   os << "FILTER (mi=)" << f.mi << ":" << "metric: " << f.metric 
116     << " " << metname << endl;
117   os << " focus: " << f.foc << " " << focname << endl
118     << "  workingValue: " << f.workingValue << endl
119     << "  workingInterval: " << f.workingInterval << endl
120       << "  cum average: " << (f.workingValue / f.workingInterval) << endl;
121   
122   os  << "  nextSendTime: " << f.nextSendTime << endl
123     << "  lastDataSeen: " << f.lastDataSeen << endl
124       << "  curr int start: " << f.partialIntervalStartTime <<endl
125       << "  intervalLength: " << f.intervalLength << endl;
126   return os;
127 }
128
129 filter::filter(filteredDataServer *keeper, 
130                metricHandle met, focus focs, 
131                bool cf) 
132 : intervalLength(0), nextSendTime(0.0),  
133   lastDataSeen(0), partialIntervalStartTime(0.0), workingValue(0), 
134   workingInterval(0), mi(0), metric(met), foc (focs), 
135   status(Inactive), costFlag(cf), server(keeper)
136 {
137   ;
138 }
139  
140 void 
141 filter::updateNextSendTime(timeStamp startTime) 
142 {
143   // each filter's interval size may get out of sync with the server 
144   // for one send, so here we check if we need to resync.
145   timeStamp newint = server->intervalSize;
146   if (newint > intervalLength) {
147     // intervalSize has changed since we last checked.  This means
148     // this is the first piece of data since the fold occurred.
149     intervalLength = newint;
150     nextSendTime = startTime + newint;
151     // ensure that the server always contains a valid send time 
152     // for the current interval size.  
153     if (nextSendTime > server->nextSendTime)
154       server->nextSendTime = nextSendTime;
155   } else {
156     // need the loop here in case there's been an interruption in data 
157     // values.
158     while (nextSendTime < (startTime + newint))
159       nextSendTime += newint;
160   }
161 }
162
163 void filter::getInitialSendTime(timeStamp startTime)
164 {
165   // set first value for intervalLength for this filter
166   intervalLength = server->intervalSize;
167   // the server gives us a valid send time for this interval size, although
168   // it may be old.  we can catch up to startTime by simply adding 
169   // intervalLength sized increments till first send will be a complete 
170   // interval.
171   nextSendTime = server->nextSendTime;
172   while (nextSendTime < (startTime + intervalLength))
173     nextSendTime += intervalLength;
174 }
175
176 void filter::wakeUp()
177 {
178   server->makeEnableDataRequest (metric, foc);
179   status = ActivationRequestPending;
180 }
181   
182 void
183 avgFilter::newData(sampleValue newVal, timeStamp start, timeStamp end)
184 {
185 #ifdef MYPCDEBUG
186     double t1,t2,t3,t4;
187     t1=TESTgetTime();
188 #endif
189   // lack of forward progress signals a serious internal data handling error
190   // in paradyn DM and will absolutely break the PC
191   assert (end > lastDataSeen);
192
193   // first, make adjustments to nextSendTime, partialIntervalStartTime,
194   // and/or start to handle some special cases
195   if (nextSendTime == 0) {
196     // CASE 1: this is first data received since this filter was created.
197     // We compute nextSendTime, the first possible send, and initialize 
198     // partial interval to begin at time "start".
199     getInitialSendTime(start);  
200     partialIntervalStartTime = nextSendTime - intervalLength;
201   } else if (start > lastDataSeen)  {
202     // CASE 2: this is first data value after a gap in data.
203     // We re-compute nextSendTime and reset partial interval (if any)
204     // to begin at time "start".  if there was partial data before this 
205     // new data came along, it is already included in the running 
206     // average, so we're not discarding it, we're just not sending it
207     // separately, since we never send partial intervals.
208     updateNextSendTime(start);
209     partialIntervalStartTime = nextSendTime - intervalLength;
210   } else if (start < lastDataSeen) {
211     // CASE 3: partially duplicate data after a fold
212     // we've already seen data for a piece of this interval, so we reset 
213     // the start time to use the new portion and ignore the rest
214     start = lastDataSeen;
215   }
216   if (end > nextSendTime) {
217   // CASE 4: first data value after a fold
218   // call to updateNextSendTime will correctly adjust intervalSize
219   // we roll back nextSendTime to last send, then recompute nextSendTime.
220     nextSendTime = partialIntervalStartTime;
221     updateNextSendTime (partialIntervalStartTime);
222   }
223
224   // now we update our running average, and send a data value to subscribers
225   // if this datum fills an interval.
226   // Note: because intervalLength is always set to 
227   //       MAX(requested value, bucket size)
228   // we never overlap more than one interval with a single datum
229
230   timeStamp currInterval = end - start;
231
232  if (end >= nextSendTime) {
233     // PART A: if we have a full interval of data, split off piece of 
234     // data which is within interval (if needed) and send.
235     timeStamp pieceOfInterval = nextSendTime - start;
236     workingValue += newVal * (pieceOfInterval);
237     workingInterval += pieceOfInterval;
238 #ifdef MYPCDEBUG
239     t3=TESTgetTime();
240 #endif
241     sendValue(mi, workingValue/workingInterval, partialIntervalStartTime, 
242               nextSendTime, 0);
243 #ifdef MYPCDEBUG
244     t4=TESTgetTime();
245 #endif
246 #ifdef PCDEBUG
247     // debug printing
248     if (performanceConsultant::printDataTrace) {
249       cout << "FILTER SEND mi=" << mi << " mh=" << metric << " foc=" << foc 
250         << " value=" << workingValue/workingInterval 
251           << "interval=" << start << " to " << end << endl;
252     }
253 #endif
254
255     // its important to update intervalLength only after we send a sample;
256     // part of what this filtering accomplishes is keeping intervals correctly
257     // aligned in spite of the unaligned data the data manager sends 
258     // around a fold.  If you're not clear on what happens around a fold,
259     // go find out before you try to change this code!!
260     partialIntervalStartTime = nextSendTime;
261     updateNextSendTime (nextSendTime);
262     // adjust currInterval to contain only portion of time not just sent,   
263     // since we already added the other portion to workingInterval above
264     currInterval = currInterval - pieceOfInterval;     
265   }
266   // PART B: either the new value received did not complete an interval, 
267   // or it did and there may be some remainder data.  If no remainder data,
268   // currInterval will be 0 so the next two instructions have no effect.
269   workingValue += newVal * currInterval;
270   workingInterval += currInterval;
271   lastDataSeen = end;
272   
273 #ifdef PCDEBUG
274   // debug printing
275   if (performanceConsultant::printDataTrace) {
276     cout << *this;
277   }
278 #endif
279
280 #ifdef MYPCDEBUG
281     t2=TESTgetTime();
282     if ((t2-t1) > 1.0) {
283       printf("********** filter::newData took %5.2f seconds, sendValue took %5.2f seconds\n",t2-t1,t4-t3);
284     }
285 #endif
286 }
287
288 void
289 valFilter::newData(sampleValue newVal, timeStamp start, timeStamp end)
290 {
291   // lack of forward progress signals a serious internal data handling error
292   // in paradyn DM and will absolutely break the PC
293   assert (end > lastDataSeen);
294   // first, make adjustments to nextSendTime, partialIntervalStartTime,
295   // and/or start to handle some special cases
296   if (nextSendTime == 0) {
297     // CASE 1: this is first data received since this filter was created.
298     // We compute nextSendTime, the first possible send, and initialize 
299     // partial interval to begin at time "start".
300     getInitialSendTime(start);  
301     partialIntervalStartTime = nextSendTime - intervalLength;
302   } else if (start > lastDataSeen)  {
303     // CASE 2: this is first data value after a gap in data.
304     // We re-compute nextSendTime and reset partial interval (if any)
305     // to begin at time "start".  if there was partial data before this 
306     // new data came along, it is discarded since it may no longer 
307     // be current. 
308     updateNextSendTime(start);
309     partialIntervalStartTime = nextSendTime - intervalLength;
310   } else if (start < lastDataSeen) {
311     // CASE 3: partially duplicate data after a fold
312     // we've already seen data for a piece of this interval, so we reset 
313     // the start time to use the new portion and ignore the rest
314     start = lastDataSeen;
315   }
316   if (end > nextSendTime) {
317   // CASE 4: first data value after a fold
318   // call to updateNextSendTime will correctly adjust intervalSize
319   // we roll back nextSendTime to last send, then recompute nextSendTime.
320     nextSendTime = partialIntervalStartTime;
321     updateNextSendTime (partialIntervalStartTime);
322   }
323
324   // now we update our running average, and send a data value to subscribers
325   // if this datum fills an interval.
326   // Note: because intervalLength is always set to 
327   //       MAX(requested value, bucket size)
328   // we never overlap more than one interval with a single datum
329
330   timeStamp currInterval = end - start;
331
332  if (end >= nextSendTime) {
333     // PART A: if we have a full interval of data, split off piece of 
334     // data which is within interval (if needed) and send.
335     timeStamp pieceOfInterval = nextSendTime - start;
336     workingValue += newVal * (pieceOfInterval);
337     workingInterval += pieceOfInterval;
338     sendValue(mi, workingValue/workingInterval, partialIntervalStartTime, 
339               nextSendTime, 0);
340 #ifdef PCDEBUG
341     // debug printing
342     if (performanceConsultant::printDataTrace) {
343       cout << "FILTER SEND mi=" << mi << " mh=" << metric << " foc=" << foc 
344         << " value=" << workingValue/workingInterval 
345           << "interval=" << start << " to " << end << endl;
346     }
347 #endif
348
349     // its important to update intervalLength only after we send a sample;
350     // part of what this filtering accomplishes is keeping intervals correctly
351     // aligned in spite of the unaligned data the data manager sends 
352     // around a fold.  If you're not clear on what happens around a fold,
353     // go find out before you try to change this code!!
354     partialIntervalStartTime = nextSendTime;
355     updateNextSendTime (nextSendTime);
356     // adjust currInterval to contain only portion of time not just sent,   
357     // since we already added the other portion to workingInterval above
358     currInterval = currInterval - pieceOfInterval;     
359   }
360
361   // PART B: either the new value received did not complete an interval, 
362   // or it did and there may be some remainder data.  If no remainder data,
363   // currInterval will be 0 so workingValue and workingInterval will both
364   // be set to 0.
365   workingValue = newVal * currInterval;
366   workingInterval = currInterval;
367   lastDataSeen = end;
368   
369 #ifdef PCDEBUG
370   // debug printing
371   if (performanceConsultant::printDataTrace) {
372     cout << *this;
373   }
374 #endif
375 }
376
377 filteredDataServer::filteredDataServer(unsigned phID)
378 : nextSendTime(0.0), 
379   DataFilters(filteredDataServer::fdid_hash)
380 {
381   dmPhaseID = phID - 1;
382   if (phID == GlobalPhaseID) {
383     phType = GlobalPhase;
384     currentBinSize = dataMgr->getGlobalBucketWidth();
385     performanceConsultant::globalRawDataServer = this;
386   } else {
387     phType = CurrentPhase;
388     currentBinSize = dataMgr->getCurrentBucketWidth();
389     performanceConsultant::currentRawDataServer = this;
390   }
391   timeStamp minGranularity = performanceConsultant::minObservationTime/4.0;
392   // ensure that starting interval size is an even multiple of dm's 
393   // bucket width, for efficiency:  once bucket width passes minimum,
394   // we will send on each piece of data as it comes rather than splitting
395   // it across intervals each time.
396   timeStamp binFactor = currentBinSize;
397   while (binFactor < minGranularity) {
398     binFactor *= 2;
399   }
400   // adjusted minGranularity, or binsize, whichever's bigger
401   intervalSize = binFactor;
402   miIndex = new (dictionary_hash<focus, filter*>*) 
403     [performanceConsultant::numMetrics];
404   for (unsigned j = 0; j < performanceConsultant::numMetrics; j++)
405     miIndex[j] = new dictionary_hash<focus, filter*> 
406       (filteredDataServer::fdid_hash);
407 }
408
409 //
410 // dm has doubled the bin size for the data histogram.  server needs the 
411 // actual size to convert data values to intervals.  Also, we adjust
412 // intervalSize if needed to ensure intervalSize = MAX(minInterval, binSize)
413 //
414 void
415 filteredDataServer::newBinSize (timeStamp bs)
416 {
417   currentBinSize = bs;
418   if (bs > intervalSize)
419     intervalSize = bs;
420 }
421
422 //
423 // restart PC after PC-pause (not application-level pause)
424 //
425 void
426 filteredDataServer::resubscribeAllData() 
427 {
428   for (unsigned i = 0; i < AllDataFilters.size(); i++) {
429     if (AllDataFilters[i]->pausable()) {
430       makeEnableDataRequest (AllDataFilters[i]->getMetric(),
431                              AllDataFilters[i]->getFocus());
432     }
433   }
434 }
435
436 //
437 // stop all data flow to PC for a PC-pause
438 //
439 void
440 filteredDataServer::unsubscribeAllData() 
441 {
442   for (unsigned i = 0; i < AllDataFilters.size(); i++) {
443     if (AllDataFilters[i]->pausable()) {
444       dataMgr->disableDataCollection(performanceConsultant::pstream, 
445                                      AllDataFilters[i]->getMI(), phType);
446     }
447   }
448 }
449
450 filteredDataServer::~filteredDataServer ()
451 {
452   unsubscribeAllData();
453   for (unsigned i = 0; i < AllDataFilters.size(); i++) {
454     delete AllDataFilters[i];
455   }
456 }
457
458 void
459 filteredDataServer::newDataEnabled(vector<metricInstInfo> *newlyEnabled)
460 {
461   //** cache focus name here??
462   filter *curr = NULL;
463   metricInstInfo *miicurr;
464   unsigned nesz = newlyEnabled->size();
465   for (unsigned i = 0; i < nesz; i++) {
466     miicurr = &((*newlyEnabled)[i]);
467
468 #ifdef PCDEBUG
469     cout << "enable REPLY for m=" << miicurr->m_id << " f=" << miicurr->r_id 
470       << " mi=" << miicurr->mi_id  
471       << "  enabled=" << miicurr->successfully_enabled << endl;
472 #endif
473
474     bool beenEnabled = DataFilters.find(miicurr->mi_id, curr);
475
476     if (beenEnabled && curr) {
477       // this request was part of a PC pause; there is no pending
478       // record and metricInstanceHandle is unchanged
479       curr->sendEnableReply(miicurr->m_id, miicurr->r_id, 
480                             miicurr->mi_id, miicurr->successfully_enabled);
481       return;
482     } 
483     // mihandle not in use; get filter for this mh/f pair
484     filter *curr = findFilter(miicurr->m_id, miicurr->r_id);
485     if (!curr) {
486       //**
487       //cout << "UH-OH, FILTER NOT FOUND! mh=" << miicurr->m_id << " f=" 
488         //<< miicurr->r_id << endl;
489       return;
490     }
491     if (curr->status != filter::ActivationRequestPending) {
492       // this enable request was cancelled while it was being handled by the dm
493       // so we need to send back an explicit cancel request
494       if (phType == GlobalPhase)
495         dataMgr->disableDataAndClearPersistentData
496           (performanceConsultant::pstream, miicurr->mi_id, phType, true, false);
497       else
498         dataMgr->disableDataAndClearPersistentData
499           (performanceConsultant::pstream, miicurr->mi_id, phType, false, true);
500
501 #ifdef PCDEBUG
502       cout << "PCdisableData: m=" << miicurr->m_id << " f=" << miicurr->r_id 
503         << " mi=" << miicurr->mi_id << endl;
504 #endif
505     } else {
506       if (miicurr->successfully_enabled) {
507         curr->mi = miicurr->mi_id;
508         DataFilters[(fdsDataID) curr->mi] = curr;
509         curr->status = filter::Active;
510         curr->sendEnableReply(miicurr->m_id, miicurr->r_id, miicurr->mi_id, true);
511       } else { 
512         // enable failed 
513         //**
514         curr->sendEnableReply(miicurr->m_id, miicurr->r_id, 0, false);
515         curr->status = filter::Inactive;
516       }
517     }
518   }
519
520 #ifdef PCDEBUG
521   if (performanceConsultant::printDataCollection) 
522     printPendings();
523     ;
524 #endif
525 }
526         
527 void 
528 filteredDataServer::makeEnableDataRequest (metricHandle met,
529                                            focus foc)
530 {
531
532 #ifdef PCDEBUG
533   if (performanceConsultant::printDataCollection)
534     printPendings();
535 #endif
536
537   vector<metricRLType> *request = new vector<metricRLType>;
538   metricRLType request_entry(met, foc);
539   *request += request_entry;
540   assert(request->size() == 1);
541   
542   // make async request to enable data
543   unsigned myPhaseID = getPCphaseID();
544   if (phType == GlobalPhase)
545     dataMgr->enableDataRequest2(performanceConsultant::pstream, request, 
546                                 myPhaseID, phType, dmPhaseID, 1, 0, 0);
547   else
548     dataMgr->enableDataRequest2(performanceConsultant::pstream, request, 
549                                 myPhaseID, phType, dmPhaseID, 0, 0, 1);
550 #ifdef PCDEBUG
551   // ---------------------  debug printing ----------------------------
552   if (performanceConsultant::printDataCollection) {
553     cout << "FDS: subscribed to "  
554       << " methandle=" << met 
555         << " foc= " << "fochandle=" << foc << endl;
556   }
557 #endif
558 }
559
560 void
561 filteredDataServer::printPendings() 
562 {
563   cout << "Pending Enables:" << endl;
564   cout << "=============== " << endl;
565   for (unsigned k = 0; k < AllDataFilters.size(); k++) {
566     filter *curr = AllDataFilters[k];
567     if (curr && curr->isPending()) {
568       cout << " mh:" << curr->metric << " f:" << curr->foc;
569     }
570   }
571 }
572
573
574 filter *
575 filteredDataServer::findFilter(metricHandle mh, focus f)
576 {
577   filter *fil;
578   dictionary_hash<focus, filter*> *curr = miIndex[mh];
579   bool fndflag = curr->find(f, fil);
580   if (!fndflag) return (filter *)NULL;
581   return fil;
582 }
583
584 void
585 filteredDataServer::addSubscription(fdsSubscriber sub,
586                                     metricHandle mh,
587                                     focus f,
588                                     filterType ft,
589                                     bool flag)
590 {
591   // is there already a filter for this met/focus pair?
592   filter *subfilter = findFilter(mh, f);
593
594   if (subfilter) {
595     if (flag) subfilter->setcostFlag();
596     // add subscriber to filter, which already exists
597
598     if (subfilter->status == filter::Active) {
599       subfilter->addConsumer (sub);
600       subfilter->sendEnableReply(mh, f, subfilter->getMI(), true);
601     } else if (subfilter->status == filter::Inactive) {
602       subfilter->addConsumer (sub);
603       subfilter->wakeUp();
604     } else if (subfilter->status == filter::ActivationRequestPending) {
605       subfilter->addConsumer(sub);
606     }
607     return;
608   }
609
610   // this is first request received for this met focus pair; construct new filter
611   if (ft == nonfiltering)
612     subfilter = new valFilter (this, mh, f, flag);
613   else
614     subfilter = new avgFilter (this, mh, f, flag);
615   AllDataFilters += subfilter;
616   (*(miIndex[mh])) [f] = subfilter;  
617   subfilter->addConsumer(sub);
618   subfilter->wakeUp();
619 }
620
621 void 
622 filteredDataServer::cancelSubRequest (fdsSubscriber sub, metricHandle met, focus foc)
623 {
624   // find the pending enable request for this met/focus pair
625   filter *subfilter = findFilter(met, foc);
626   assert (subfilter);
627
628   if (subfilter->numConsumers == 1) {
629     // this was the only pending request; change status
630     subfilter->status = filter::Inactive;
631   }
632   subfilter->rmConsumer(sub);
633 }
634  
635 void
636 filteredDataServer::inActivateFilter (filter *fil)
637 {
638   // ask dm to disable metric/focus pair 
639   if (phType == GlobalPhase)
640     dataMgr->disableDataAndClearPersistentData
641       (performanceConsultant::pstream, fil->getMI(), phType, true, false);
642   else
643     dataMgr->disableDataAndClearPersistentData
644       (performanceConsultant::pstream, fil->getMI(), phType, false, true);
645
646   // update server indices
647   fil->inactivate();
648   DataFilters[fil->getMI()] = NULL;
649   
650 #ifdef PCDEBUG
651   cout << "PCdisableData: m=" << fil->getMetric() << " f=" << fil->getFocus() 
652     << " mi=" << fil->getMI() << endl;
653 #endif
654 }
655   
656 // 
657 // all filters live until end of Search, although no subscribers may remain
658 // if subscriber count goes to 0, disable raw data
659 //
660 void 
661 filteredDataServer::endSubscription(fdsSubscriber sub, metricHandle met, focus foc)
662 {
663   filter *curr = findFilter (met, foc);
664   if (!curr) 
665     // invalid request 
666     return;
667   int subsLeft = curr->rmConsumer(sub);
668   if (subsLeft == 0) {
669     // we just removed the only subscriber
670     inActivateFilter(curr);
671   }
672 #ifdef PCDEBUG
673   if (performanceConsultant::printDataCollection) {
674     cout << "FDS: subscription ended: " << curr->getMI() << "numLeft=" << subsLeft 
675          << endl; 
676   }
677 #endif
678 }
679   
680 void 
681 filteredDataServer::endSubscription(fdsSubscriber sub, 
682                                     fdsDataID subID)
683 {
684   // find filter by subID
685   int subsLeft;
686   filter *curr;
687   bool fndflag = DataFilters.find((unsigned)subID, curr);
688   if (!fndflag) return;
689   subsLeft = curr->rmConsumer(sub);
690   if (subsLeft == 0) {
691     // we just removed the only subscriber
692     inActivateFilter(curr);
693   }
694 #ifdef PCDEBUG
695   // debug printing
696   if (performanceConsultant::printDataCollection) {
697     cout << "FDS: subscription ended: " << subID << "numLeft=" << subsLeft 
698          << endl; 
699   }
700 #endif
701 }
702
703 void
704 filteredDataServer::newData (metricInstanceHandle mih, 
705                              sampleValue value, 
706                              int bucketNumber)
707 {
708   filter *curr;
709   bool fndflag = DataFilters.find(mih, curr);
710   if (fndflag && curr) {
711     // convert data to start and end based on bin
712     timeStamp start = currentBinSize * bucketNumber;
713     timeStamp end = currentBinSize * (bucketNumber + 1);
714     curr->newData(value, start, end);
715   } 
716
717 #ifdef PCDEBUG
718   else {
719     cout << "FDS unexpected data, mi handle = " << mih << endl;
720   }
721 #endif
722   //DataFilters.printStats();
723
724 }
725
726
727