Advanced Configuration

This commit introduces advanced configuration. The rate-interval and
label-prefix flags are removed and replaced by a configuration file
that lets you specify series queries, along with the rules for
transforming the discovered series into metrics queries and API resources.
Solly Ross 2018-02-12 17:35:32 -05:00
parent 61b071c186
commit af9f11c817
12 changed files with 1077 additions and 548 deletions
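
The key new plumbing visible in the diff below is a slice of MetricNamer values, one per configured rule, on which the lister calls Selector() and FilterSeries(). The interface itself is defined elsewhere in this commit; the sketch below is only inferred from those call sites, and its exact method set is an assumption.

    package provider

    import (
        prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
    )

    // MetricNamer as inferred from its call sites in this diff; the real
    // definition added by this commit may carry more methods (for example,
    // whatever backs the fully-formed query returned by QueryForMetric).
    type MetricNamer interface {
        // Selector returns the Prometheus series selector used to discover
        // the series covered by this rule.
        Selector() prom.Selector
        // FilterSeries keeps only the discovered series that this rule
        // should actually expose as custom metrics.
        FilterSeries(series []prom.Series) []prom.Series
    }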


@@ -40,42 +40,40 @@ import (
 	prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
 )
 
+// Runnable represents something that can be run until told to stop.
+type Runnable interface {
+	// Run runs the runnable forever.
+	Run()
+	// RunUntil runs the runnable until the given channel is closed.
+	RunUntil(stopChan <-chan struct{})
+}
+
 type prometheusProvider struct {
 	mapper     apimeta.RESTMapper
 	kubeClient dynamic.ClientPool
 	promClient prom.Client
 	SeriesRegistry
-	rateInterval time.Duration
 }
 
-func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, labelPrefix string, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider {
+func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration) (provider.CustomMetricsProvider, Runnable) {
 	lister := &cachingMetricsLister{
 		updateInterval: updateInterval,
 		promClient:     promClient,
+		namers:         namers,
 		SeriesRegistry: &basicSeriesRegistry{
-			namer: metricNamer{
-				// TODO: populate the overrides list
-				overrides:   nil,
-				mapper:      mapper,
-				labelPrefix: labelPrefix,
-			},
+			mapper: mapper,
 		},
 	}
 
-	lister.RunUntil(stopChan)
-
 	return &prometheusProvider{
 		mapper:         mapper,
 		kubeClient:     kubeClient,
 		promClient:     promClient,
 		SeriesRegistry: lister,
-		rateInterval:   rateInterval,
-	}
+	}, lister
 }
 
 func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) {
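
Worth noting in the hunk above: the old constructor started the refresh loop itself via lister.RunUntil(stopChan), while the new one hands the lister back as a Runnable and leaves that to the caller. A rough caller-side sketch, assuming the same imports as this file and treating everything except NewPrometheusProvider and RunUntil as placeholder names:

    // Placeholder wiring; the adapter's real cmd setup may differ, and the
    // one-minute update interval is only an example value.
    func startProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, namers []MetricNamer, stopCh <-chan struct{}) provider.CustomMetricsProvider {
        cmProvider, runner := NewPrometheusProvider(mapper, kubeClient, promClient, namers, time.Minute)
        // The caller now owns the refresh loop instead of the constructor.
        go runner.RunUntil(stopCh)
        return cmProvider
    }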
@@ -132,29 +130,13 @@ func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.Me
 }
 
 func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace string, names ...string) (pmodel.Vector, error) {
-	kind, baseQuery, groupBy, found := p.QueryForMetric(info, namespace, names...)
+	query, found := p.QueryForMetric(info, namespace, names...)
 	if !found {
 		return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric)
 	}
 
-	fullQuery := baseQuery
-	switch kind {
-	case CounterSeries:
-		fullQuery = prom.Selector(fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(p.rateInterval).String()))
-	case SecondsCounterSeries:
-		// TODO: futher modify for seconds?
-		fullQuery = prom.Selector(prom.Selector(fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(p.rateInterval).String())))
-	}
-
-	// NB: too small of a rate interval will return no results...
-
-	// sum over all other dimensions of this query (e.g. if we select on route, sum across all pods,
-	// but if we select on pods, sum across all routes), and split by the dimension of our resource
-	// TODO: return/populate the by list in SeriesForMetric
-	fullQuery = prom.Selector(fmt.Sprintf("sum(%s) by (%s)", fullQuery, groupBy))
-
-	// TODO: use an actual context
-	queryResults, err := p.promClient.Query(context.Background(), pmodel.Now(), fullQuery)
+	queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), query)
 	if err != nil {
 		glog.Errorf("unable to fetch metrics from prometheus: %v", err)
 		// don't leak implementation details to the user
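
The deleted branch above is where query construction used to be hard-coded: wrap a counter series in rate() over the old rate-interval value, then sum by the resource's own label. Purely as an illustration of the query shape that logic produced (metric and label names are made up; imports are the same fmt, time, and pmodel used by this file):

    // Illustration only: the kind of expression the removed buildQuery logic
    // generated for a counter series. With this commit, an equivalent
    // expression is expected to come from the per-rule configuration instead.
    func exampleOldStyleQuery(rateInterval time.Duration) string {
        baseQuery := `http_requests_total{namespace="default"}` // made-up selector
        groupBy := "pod"                                        // made-up resource label
        rated := fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(rateInterval).String())
        // e.g. sum(rate(http_requests_total{namespace="default"}[5m])) by (pod)
        return fmt.Sprintf("sum(%s) by (%s)", rated, groupBy)
    }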
@@ -285,6 +267,7 @@ type cachingMetricsLister struct {
 	promClient     prom.Client
 	updateInterval time.Duration
+	namers         []MetricNamer
 }
 
 func (l *cachingMetricsLister) Run() {
@@ -302,17 +285,49 @@ func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) {
 func (l *cachingMetricsLister) updateMetrics() error {
 	startTime := pmodel.Now().Add(-1 * l.updateInterval)
-	sels := l.Selectors()
+
+	// don't do duplicate queries when it's just the matchers that change
+	seriesCacheByQuery := make(map[prom.Selector][]prom.Series)
 
-	// TODO: use an actual context here
-	series, err := l.promClient.Series(context.Background(), pmodel.Interval{startTime, 0}, sels...)
-	if err != nil {
-		return fmt.Errorf("unable to update list of all available metrics: %v", err)
+	// these can take a while on large clusters, so launch in parallel
+	// and don't duplicate
+	selectors := make(map[prom.Selector]struct{})
+	errs := make(chan error, len(l.namers))
+	for _, namer := range l.namers {
+		sel := namer.Selector()
+		if _, ok := selectors[sel]; ok {
+			errs <- nil
+			continue
+		}
+		selectors[sel] = struct{}{}
+		go func() {
+			series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel)
+			if err != nil {
+				errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err)
+				return
+			}
+			errs <- nil
+			seriesCacheByQuery[sel] = series
+		}()
 	}
 
-	glog.V(10).Infof("Set available metric list from Prometheus to: %v", series)
+	// iterate through, blocking until we've got all results
+	for range l.namers {
+		if err := <-errs; err != nil {
+			return fmt.Errorf("unable to update list of all metrics: %v", err)
+		}
+	}
+	close(errs)
 
-	l.SetSeries(series)
+	newSeries := make([][]prom.Series, len(l.namers))
+	for i, namer := range l.namers {
+		series, cached := seriesCacheByQuery[namer.Selector()]
+		if !cached {
+			return fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector())
+		}
+
+		newSeries[i] = namer.FilterSeries(series)
+	}
 
-	return nil
+	glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries)
+	return l.SetSeries(newSeries, l.namers)
 }
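
One detail in the new updateMetrics worth calling out: the errs channel is buffered to len(l.namers), and a nil is sent for every deduplicated selector, so the collection loop can simply receive exactly once per namer. A generic, self-contained sketch of that fan-out/fan-in shape (not adapter code, just the pattern):

    // fetchAllOnce runs fetch once per distinct input, in parallel, while
    // still producing exactly len(inputs) results for the receive loop.
    func fetchAllOnce(inputs []string, fetch func(string) error) error {
        seen := make(map[string]struct{})
        errs := make(chan error, len(inputs))
        for _, in := range inputs {
            if _, ok := seen[in]; ok {
                errs <- nil // duplicate: keep the result count aligned
                continue
            }
            seen[in] = struct{}{}
            go func(in string) { errs <- fetch(in) }(in)
        }
        for range inputs {
            if err := <-errs; err != nil {
                return err
            }
        }
        return nil
    }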