From 043147e77cdbe8f756f221f5a0a3c859371889b9 Mon Sep 17 00:00:00 2001 From: Joel Speed Date: Tue, 8 May 2018 16:53:58 +0100 Subject: [PATCH] Use channel for series aggregation --- pkg/custom-provider/provider.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index 83625385..2e62561c 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -19,7 +19,6 @@ package provider import ( "context" "fmt" - "sync" "time" "github.com/golang/glog" @@ -284,26 +283,30 @@ func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) { }, l.updateInterval, stopChan) } +type selectorSeries struct { + selector prom.Selector + series []prom.Series +} + func (l *cachingMetricsLister) updateMetrics() error { startTime := pmodel.Now().Add(-1 * l.updateInterval) // don't do duplicate queries when it's just the matchers that change seriesCacheByQuery := make(map[prom.Selector][]prom.Series) - seriesCacheByQueryLock := sync.RWMutex{} // these can take a while on large clusters, so launch in parallel // and don't duplicate selectors := make(map[prom.Selector]struct{}) + selectorSeriesChan := make(chan selectorSeries, len(l.namers)) errs := make(chan error, len(l.namers)) for _, namer := range l.namers { sel := namer.Selector() if _, ok := selectors[sel]; ok { errs <- nil + selectorSeriesChan <- selectorSeries{} continue } - seriesCacheByQueryLock.Lock() selectors[sel] = struct{}{} - seriesCacheByQueryLock.Unlock() go func() { series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel) if err != nil { @@ -311,9 +314,10 @@ func (l *cachingMetricsLister) updateMetrics() error { return } errs <- nil - seriesCacheByQueryLock.Lock() - seriesCacheByQuery[sel] = series - seriesCacheByQueryLock.Unlock() + selectorSeriesChan <- selectorSeries{ + selector: sel, + series: series, + } }() } @@ -322,6 +326,9 @@ func (l *cachingMetricsLister) updateMetrics() error { if err := <-errs; err != nil { return fmt.Errorf("unable to update list of all metrics: %v", err) } + if ss := <-selectorSeriesChan; ss.series != nil { + seriesCacheByQuery[ss.selector] = ss.series + } } close(errs)