Use channel for series aggregation

This fixes asynchronous read/write issues to when performing series
discovery by pushing series results onto a channel, instead of trying to
write them directly to a map.
This commit is contained in:
Joel Speed 2018-05-08 16:53:58 +01:00 committed by Solly Ross
parent 32e4c5b1c7
commit 6089fa8528

View file

@ -19,9 +19,9 @@ package provider
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/golang/glog"
"time" "time"
"github.com/golang/glog"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
pmodel "github.com/prometheus/common/model" pmodel "github.com/prometheus/common/model"
apierr "k8s.io/apimachinery/pkg/api/errors" apierr "k8s.io/apimachinery/pkg/api/errors"
@ -282,6 +282,11 @@ func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) {
}, l.updateInterval, stopChan) }, l.updateInterval, stopChan)
} }
type selectorSeries struct {
selector prom.Selector
series []prom.Series
}
func (l *cachingMetricsLister) updateMetrics() error { func (l *cachingMetricsLister) updateMetrics() error {
startTime := pmodel.Now().Add(-1 * l.updateInterval) startTime := pmodel.Now().Add(-1 * l.updateInterval)
@ -291,11 +296,13 @@ func (l *cachingMetricsLister) updateMetrics() error {
// these can take a while on large clusters, so launch in parallel // these can take a while on large clusters, so launch in parallel
// and don't duplicate // and don't duplicate
selectors := make(map[prom.Selector]struct{}) selectors := make(map[prom.Selector]struct{})
selectorSeriesChan := make(chan selectorSeries, len(l.namers))
errs := make(chan error, len(l.namers)) errs := make(chan error, len(l.namers))
for _, namer := range l.namers { for _, namer := range l.namers {
sel := namer.Selector() sel := namer.Selector()
if _, ok := selectors[sel]; ok { if _, ok := selectors[sel]; ok {
errs <- nil errs <- nil
selectorSeriesChan <- selectorSeries{}
continue continue
} }
selectors[sel] = struct{}{} selectors[sel] = struct{}{}
@ -306,7 +313,10 @@ func (l *cachingMetricsLister) updateMetrics() error {
return return
} }
errs <- nil errs <- nil
seriesCacheByQuery[sel] = series selectorSeriesChan <- selectorSeries{
selector: sel,
series: series,
}
}() }()
} }
@ -315,6 +325,9 @@ func (l *cachingMetricsLister) updateMetrics() error {
if err := <-errs; err != nil { if err := <-errs; err != nil {
return fmt.Errorf("unable to update list of all metrics: %v", err) return fmt.Errorf("unable to update list of all metrics: %v", err)
} }
if ss := <-selectorSeriesChan; ss.series != nil {
seriesCacheByQuery[ss.selector] = ss.series
}
} }
close(errs) close(errs)