Use channel for series aggregation

This commit is contained in:
Joel Speed 2018-05-08 16:53:58 +01:00
parent 44d755ae38
commit 043147e77c
No known key found for this signature in database
GPG key ID: 83695B8B3A376982

View file

@ -19,7 +19,6 @@ package provider
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -284,26 +283,30 @@ func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) {
}, l.updateInterval, stopChan) }, l.updateInterval, stopChan)
} }
type selectorSeries struct {
selector prom.Selector
series []prom.Series
}
func (l *cachingMetricsLister) updateMetrics() error { func (l *cachingMetricsLister) updateMetrics() error {
startTime := pmodel.Now().Add(-1 * l.updateInterval) startTime := pmodel.Now().Add(-1 * l.updateInterval)
// don't do duplicate queries when it's just the matchers that change // don't do duplicate queries when it's just the matchers that change
seriesCacheByQuery := make(map[prom.Selector][]prom.Series) seriesCacheByQuery := make(map[prom.Selector][]prom.Series)
seriesCacheByQueryLock := sync.RWMutex{}
// these can take a while on large clusters, so launch in parallel // these can take a while on large clusters, so launch in parallel
// and don't duplicate // and don't duplicate
selectors := make(map[prom.Selector]struct{}) selectors := make(map[prom.Selector]struct{})
selectorSeriesChan := make(chan selectorSeries, len(l.namers))
errs := make(chan error, len(l.namers)) errs := make(chan error, len(l.namers))
for _, namer := range l.namers { for _, namer := range l.namers {
sel := namer.Selector() sel := namer.Selector()
if _, ok := selectors[sel]; ok { if _, ok := selectors[sel]; ok {
errs <- nil errs <- nil
selectorSeriesChan <- selectorSeries{}
continue continue
} }
seriesCacheByQueryLock.Lock()
selectors[sel] = struct{}{} selectors[sel] = struct{}{}
seriesCacheByQueryLock.Unlock()
go func() { go func() {
series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel) series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel)
if err != nil { if err != nil {
@ -311,9 +314,10 @@ func (l *cachingMetricsLister) updateMetrics() error {
return return
} }
errs <- nil errs <- nil
seriesCacheByQueryLock.Lock() selectorSeriesChan <- selectorSeries{
seriesCacheByQuery[sel] = series selector: sel,
seriesCacheByQueryLock.Unlock() series: series,
}
}() }()
} }
@ -322,6 +326,9 @@ func (l *cachingMetricsLister) updateMetrics() error {
if err := <-errs; err != nil { if err := <-errs; err != nil {
return fmt.Errorf("unable to update list of all metrics: %v", err) return fmt.Errorf("unable to update list of all metrics: %v", err)
} }
if ss := <-selectorSeriesChan; ss.series != nil {
seriesCacheByQuery[ss.selector] = ss.series
}
} }
close(errs) close(errs)