Some more adjustment.

* Breaking out some types to make the functionality more composable and easier to digest. (e.g. `basicMetricLister` interacts with Prometheus and `periodicMetricLister` periodically invokes `basicMetricLister`.
* Pulling out some of the type embedding between `basicSeriesRegistry` and `MetricLister` to make it easier to digest.
* Deleting the `/metric-converter` code because I'm pretty certain it's not going to be necessary as things transition to using the namer-based configuration.
* Some light-ish refactoring in `metricNamer` to get some re-use out of query generation in preparation for using it with external metrics.
This commit is contained in:
Tony Compton 2018-07-17 15:50:32 -04:00
parent 277734dcdb
commit 76217a552b
15 changed files with 490 additions and 558 deletions

View file

@ -0,0 +1,148 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"context"
"fmt"
"time"
"github.com/golang/glog"
pmodel "github.com/prometheus/common/model"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
)
// Runnable represents something that can be run until told to stop.
type Runnable interface {
// Run runs the runnable forever.
Run()
// RunUntil runs the runnable until the given channel is closed.
RunUntil(stopChan <-chan struct{})
}
//A MetricLister provides a window into all of the metrics that are available within a given
//Prometheus instance, classified as either Custom or External metrics, but presented generically
//so that it can manage both types simultaneously.
type MetricLister interface {
// Run()
// UpdateMetrics() error
// GetAllMetrics() []GenericMetricInfo
// GetAllCustomMetrics() []GenericMetricInfo
// GetAllExternalMetrics() []GenericMetricInfo
// GetInfoForMetric(infoKey GenericMetricInfo) (seriesInfo, bool)
ListAllMetrics() (metricUpdateResult, error)
}
type MetricListerWithNotification interface {
//It can list metrics, just like a normal MetricLister.
MetricLister
//Because it periodically pulls metrics, it needs to be Runnable.
Runnable
//It provides notifications when it has new data to supply.
SetNotificationReceiver(func(metricUpdateResult))
}
type basicMetricLister struct {
promClient prom.Client
namers []MetricNamer
lookback time.Duration
}
func NewBasicMetricLister(promClient prom.Client, namers []MetricNamer, lookback time.Duration) MetricLister {
lister := basicMetricLister{
promClient: promClient,
namers: namers,
}
return &lister
}
type selectorSeries struct {
selector prom.Selector
series []prom.Series
}
func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) {
result := metricUpdateResult{
series: make([][]prom.Series, 0),
namers: make([]MetricNamer, 0),
}
startTime := pmodel.Now().Add(-1 * l.lookback)
// these can take a while on large clusters, so launch in parallel
// and don't duplicate
selectors := make(map[prom.Selector]struct{})
selectorSeriesChan := make(chan selectorSeries, len(l.namers))
errs := make(chan error, len(l.namers))
for _, namer := range l.namers {
sel := namer.Selector()
if _, ok := selectors[sel]; ok {
errs <- nil
selectorSeriesChan <- selectorSeries{}
continue
}
selectors[sel] = struct{}{}
go func() {
series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel)
if err != nil {
errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err)
return
}
errs <- nil
selectorSeriesChan <- selectorSeries{
selector: sel,
series: series,
}
}()
}
// don't do duplicate queries when it's just the matchers that change
seriesCacheByQuery := make(map[prom.Selector][]prom.Series)
// iterate through, blocking until we've got all results
for range l.namers {
if err := <-errs; err != nil {
return result, fmt.Errorf("unable to update list of all metrics: %v", err)
}
if ss := <-selectorSeriesChan; ss.series != nil {
seriesCacheByQuery[ss.selector] = ss.series
}
}
close(errs)
newSeries := make([][]prom.Series, len(l.namers))
for i, namer := range l.namers {
series, cached := seriesCacheByQuery[namer.Selector()]
if !cached {
return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector())
}
newSeries[i] = namer.FilterSeries(series)
}
glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries)
result.series = newSeries
result.namers = l.namers
return result, nil
}
type metricUpdateResult struct {
series [][]prom.Series
namers []MetricNamer
}

View file

@ -1,111 +0,0 @@
package provider
import (
"fmt"
s "strings"
provider "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider/metric-converter"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
)
type ExternalMetricQueryBuilder interface {
BuildPrometheusQuery(namespace string, metricName string, metricSelector labels.Selector, queryMetadata provider.QueryMetadata) string
}
type externalMetricQueryBuilder struct {
}
func NewExternalMetricQueryBuilder() ExternalMetricQueryBuilder {
return &externalMetricQueryBuilder{}
}
func (p *externalMetricQueryBuilder) BuildPrometheusQuery(namespace string, metricName string, metricSelector labels.Selector, queryMetadata provider.QueryMetadata) string {
//TODO: At least for my Prometheus install, the "namespace" label doesn't seem to be
//directly applied to the time series. I'm using prometheus-operator. The grafana dashboards
//seem to query for the pods in a namespace from kube_pod_info and then apply pod-specific
//label filters. This might need some more thought. Disabling for now.
// namespaceSelector := p.makeLabelFilter("namespace", "=", namespace)
labelSelectors := p.convertSelectors(metricSelector)
joinedLabels := s.Join(labelSelectors, ", ")
//TODO: Both the aggregation method and window should probably be configurable.
//I don't think we can make assumptions about the nature of someone's metrics.
//I'm guessing this might be covered by the recently added advanced configuration
//code, but I haven't yet had an opportunity to dig into that and understand it.
//We'll leave this here for testing purposes for now.
//As reasonable defaults, maybe:
//rate(...) for counters
//avg_over_time(...) for gauges
//I'm guessing that SeriesRegistry might store the metric type, but I haven't looked yet.
aggregation := queryMetadata.Aggregation
window := queryMetadata.WindowInSeconds
return fmt.Sprintf("%s(%s{%s}[%ds])", aggregation, metricName, joinedLabels, window)
}
func (p *externalMetricQueryBuilder) makeLabelFilter(labelName string, operator string, targetValue string) string {
return fmt.Sprintf("%s%s\"%s\"", labelName, operator, targetValue)
}
func (p *externalMetricQueryBuilder) convertSelectors(metricSelector labels.Selector) []string {
requirements, _ := metricSelector.Requirements()
selectors := []string{}
for i := 0; i < len(requirements); i++ {
selector := p.convertRequirement(requirements[i])
selectors = append(selectors, selector)
}
return selectors
}
func (p *externalMetricQueryBuilder) convertRequirement(requirement labels.Requirement) string {
labelName := requirement.Key()
values := requirement.Values().List()
stringValues := values[0]
valueCount := len(values)
if valueCount > 1 {
stringValues = s.Join(values, "|")
}
operator := p.selectOperator(requirement.Operator(), valueCount)
return p.makeLabelFilter(labelName, operator, stringValues)
}
func (p *externalMetricQueryBuilder) selectOperator(operator selection.Operator, valueCount int) string {
if valueCount > 1 {
return p.selectRegexOperator(operator)
}
return p.selectSingleValueOperator(operator)
}
func (p *externalMetricQueryBuilder) selectRegexOperator(operator selection.Operator) string {
switch operator {
case selection.Equals:
case selection.In:
return "=~"
case selection.NotIn:
case selection.NotEquals:
return "!~"
}
//TODO: Cover more cases, supply appropriate errors for any unhandled cases.
return "="
}
func (p *externalMetricQueryBuilder) selectSingleValueOperator(operator selection.Operator) string {
switch operator {
case selection.Equals:
return "="
case selection.NotEquals:
return "!="
}
//TODO: Cover more cases, supply appropriate errors for any unhandled cases.
return "="
}

View file

@ -1,30 +0,0 @@
package provider
import (
"testing"
conv "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider/metric-converter"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
)
var queryBuilder = NewExternalMetricQueryBuilder()
func TestBuildPrometheusQuery(t *testing.T) {
fakeSelector := labels.NewSelector()
metricName := "queue_name"
requirement, _ := labels.NewRequirement(metricName, selection.Equals, []string{"processing"})
fakeSelector = fakeSelector.Add(*requirement)
meta := conv.QueryMetadata{
Aggregation: "rate",
MetricName: metricName,
WindowInSeconds: 120,
}
result := queryBuilder.BuildPrometheusQuery("default", "queue_length", fakeSelector, meta)
expectedResult := "rate(queue_length{queue_name=\"processing\"}[120s])"
if result != expectedResult {
t.Errorf("Incorrect query generated. Expected: %s | Actual %s", result, expectedResult)
}
}

View file

@ -6,7 +6,6 @@ import (
pmodel "github.com/prometheus/common/model"
conv "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider/metric-converter"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
apimeta "k8s.io/apimachinery/pkg/api/meta"
@ -34,7 +33,7 @@ type externalPrometheusProvider struct {
queryBuilder ExternalMetricQueryBuilder
metricConverter conv.MetricConverter
SeriesRegistry
seriesRegistry SeriesRegistry
}
//TODO: It probably makes more sense to, once this is functional and complete, roll the
@ -44,37 +43,26 @@ type externalPrometheusProvider struct {
//to do one of those two things instead of trying to run the two providers
//independently.
func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration, queryBuilder ExternalMetricQueryBuilder, metricConverter conv.MetricConverter) (provider.ExternalMetricsProvider, Runnable) {
lister := &cachingMetricsLister{
updateInterval: updateInterval,
promClient: promClient,
namers: namers,
SeriesRegistry: &basicSeriesRegistry{
mapper: mapper,
},
}
func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration, metricConverter conv.MetricConverter, seriesRegistry SeriesRegistry) (provider.ExternalMetricsProvider, Runnable) {
return &externalPrometheusProvider{
mapper: mapper,
kubeClient: kubeClient,
promClient: promClient,
metricConverter: metricConverter,
SeriesRegistry: lister,
seriesRegistry: seriesRegistry,
}, lister
}
func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) {
//TODO: Get the appropriate time window and aggregation type from somewhere
//based on the metric being selected. Does SeriesRegistry have the metric type cached?
queryMetadata := conv.QueryMetadata{
MetricName: metricName,
WindowInSeconds: 120,
Aggregation: "rate",
selector, found := p.seriesRegistry.QueryForExternalMetric(metricInfo, metricSelector)
if !found {
return &external_metrics.ExternalMetricValueList{
Items: []external_metrics.ExternalMetricValue{},
}, nil
}
query := p.queryBuilder.BuildPrometheusQuery(namespace, metricName, metricSelector, queryMetadata)
selector := prom.Selector(query)
// query := p.queryBuilder.BuildPrometheusQuery(namespace, metricName, metricSelector, queryMetadata)
//TODO: I don't yet know what a context is, but apparently I should use a real one.
queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector)
@ -89,6 +77,5 @@ func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricN
}
func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo {
//TODO: Provide a real response.
return nil
return p.seriesRegistry.ListAllExternalMetrics()
}

View file

@ -1,37 +0,0 @@
package provider
import (
"errors"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/metrics/pkg/apis/external_metrics"
)
type matrixConverter struct {
}
//NewMatrixConverter creates a MatrixConverter capable of converting
//matrix Prometheus query results into external metric types.
func NewMatrixConverter() MetricConverter {
return &matrixConverter{}
}
func (c *matrixConverter) Convert(metadata QueryMetadata, queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type != model.ValMatrix {
return nil, errors.New("matrixConverter can only convert scalar query results")
}
toConvert := queryResult.Matrix
if toConvert == nil {
return nil, errors.New("the provided input did not contain matrix query results")
}
return c.convert(toConvert)
}
func (c *matrixConverter) convert(result *model.Matrix) (*external_metrics.ExternalMetricValueList, error) {
//TODO: Implementation.
return nil, errors.New("converting Matrix results is not yet supported")
}

View file

@ -1,47 +0,0 @@
package provider
import (
"errors"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/metrics/pkg/apis/external_metrics"
)
//MetricConverter provides a unified interface for converting the results of
//Prometheus queries into external metric types.
type MetricConverter interface {
Convert(metadata QueryMetadata, queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error)
}
type metricConverter struct {
scalarConverter MetricConverter
vectorConverter MetricConverter
matrixConverter MetricConverter
}
//NewMetricConverter creates a MetricCoverter, capable of converting any of the three metric types
//returned by the Prometheus client into external metrics types.
func NewMetricConverter(scalar MetricConverter, vector MetricConverter, matrix MetricConverter) MetricConverter {
return &metricConverter{
scalarConverter: scalar,
vectorConverter: vector,
matrixConverter: matrix,
}
}
func (c *metricConverter) Convert(metadata QueryMetadata, queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type == model.ValScalar {
return c.scalarConverter.Convert(metadata, queryResult)
}
if queryResult.Type == model.ValVector {
return c.vectorConverter.Convert(metadata, queryResult)
}
if queryResult.Type == model.ValMatrix {
return c.matrixConverter.Convert(metadata, queryResult)
}
return nil, errors.New("encountered an unexpected query result type")
}

View file

@ -1,13 +0,0 @@
package provider
//QueryMetadata is a data object the holds information about what inputs
//were used to generate Prometheus query results. In most cases it's not
//necessary, as the Prometheus result come back with enough information
//to determine the metric name. However, for scalar results, Prometheus
//only provides the value.
type QueryMetadata struct {
MetricName string
WindowInSeconds int64
//TODO: Type this?
Aggregation string
}

View file

@ -1,53 +0,0 @@
package provider
import (
"github.com/prometheus/common/model"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)
type sampleConverter struct {
}
//SampleConverter is capable of translating Prometheus Sample objects
//into ExternamMetricValue objects.
type SampleConverter interface {
Convert(metadata QueryMetadata, sample *model.Sample) (*external_metrics.ExternalMetricValue, error)
}
//NewSampleConverter creates a SampleConverter capable of translating Prometheus Sample objects
//into ExternamMetricValue objects.
func NewSampleConverter() SampleConverter {
return &sampleConverter{}
}
func (c *sampleConverter) Convert(metadata QueryMetadata, sample *model.Sample) (*external_metrics.ExternalMetricValue, error) {
labels := c.convertLabels(sample.Metric)
singleMetric := external_metrics.ExternalMetricValue{
MetricName: string(sample.Metric[model.LabelName("__name__")]),
Timestamp: metav1.Time{
sample.Timestamp.Time(),
},
WindowSeconds: &metadata.WindowInSeconds,
//TODO: I'm not so sure about this type/conversions.
//This can't possibly be the right way to convert this.
//Also, does K8S only deal win integer metrics?
Value: *resource.NewQuantity(int64(float64(sample.Value)), resource.DecimalSI),
MetricLabels: labels,
}
//TODO: Actual errors?
return &singleMetric, nil
}
func (c *sampleConverter) convertLabels(inLabels model.Metric) map[string]string {
numLabels := len(inLabels)
outLabels := make(map[string]string, numLabels)
for labelName, labelVal := range inLabels {
outLabels[string(labelName)] = string(labelVal)
}
return outLabels
}

View file

@ -1,57 +0,0 @@
package provider
import (
"errors"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)
type scalarConverter struct {
}
//NewScalarConverter creates a ScalarConverter capable of converting
//scalar Prometheus query results into external metric types.
func NewScalarConverter() MetricConverter {
return &scalarConverter{}
}
func (c *scalarConverter) Convert(metadata QueryMetadata, queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type != model.ValScalar {
return nil, errors.New("scalarConverter can only convert scalar query results")
}
toConvert := queryResult.Scalar
if toConvert == nil {
return nil, errors.New("the provided input did not contain scalar query results")
}
return c.convert(metadata, toConvert)
}
func (c *scalarConverter) convert(metadata QueryMetadata, input *model.Scalar) (*external_metrics.ExternalMetricValueList, error) {
result := external_metrics.ExternalMetricValueList{
//Using prometheusProvider.metricsFor(...) as an example,
//it seems that I don't need to provide values for
//TypeMeta and ListMeta.
//TODO: Get some confirmation on this.
Items: []external_metrics.ExternalMetricValue{
{
MetricName: metadata.MetricName,
Timestamp: metav1.Time{
input.Timestamp.Time(),
},
WindowSeconds: &metadata.WindowInSeconds,
//TODO: I'm not so sure about this type/conversions.
//Is there a meaningful loss of precision here?
//Does K8S only deal win integer metrics?
Value: *resource.NewQuantity(int64(input.Value), resource.DecimalSI),
},
},
}
return &result, nil
}

View file

@ -1,58 +0,0 @@
package provider
import (
"errors"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/metrics/pkg/apis/external_metrics"
)
type vectorConverter struct {
SampleConverter SampleConverter
}
//NewVectorConverter creates a VectorConverter capable of converting
//vector Prometheus query results into external metric types.
func NewVectorConverter(sampleConverter *SampleConverter) MetricConverter {
return &vectorConverter{
SampleConverter: *sampleConverter,
}
}
func (c *vectorConverter) Convert(metadata QueryMetadata, queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type != model.ValVector {
return nil, errors.New("vectorConverter can only convert scalar query results")
}
toConvert := *queryResult.Vector
if toConvert == nil {
return nil, errors.New("the provided input did not contain vector query results")
}
return c.convert(metadata, toConvert)
}
func (c *vectorConverter) convert(metadata QueryMetadata, result model.Vector) (*external_metrics.ExternalMetricValueList, error) {
items := []external_metrics.ExternalMetricValue{}
metricValueList := external_metrics.ExternalMetricValueList{
Items: items,
}
numSamples := result.Len()
if numSamples == 0 {
return &metricValueList, nil
}
for _, val := range result {
//TODO: Care about potential errors here.
singleMetric, _ := c.SampleConverter.Convert(metadata, val)
items = append(items, *singleMetric)
}
metricValueList = external_metrics.ExternalMetricValueList{
Items: items,
}
return &metricValueList, nil
}

View file

@ -11,6 +11,7 @@ import (
"github.com/golang/glog"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
@ -44,6 +45,7 @@ type MetricNamer interface {
// QueryForSeries returns the query for a given series (not API metric name), with
// the given namespace name (if relevant), resource, and resource names.
QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error)
QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error)
}
// labelGroupResExtractor extracts schema.GroupResources from series labels.
@ -172,6 +174,8 @@ type metricNamer struct {
labelToResource map[pmodel.LabelName]schema.GroupResource
resourceToLabel map[schema.GroupResource]pmodel.LabelName
mapper apimeta.RESTMapper
metricType config.MetricType
}
// queryTemplateArgs are the arguments for the metrics query template.
@ -202,39 +206,142 @@ SeriesLoop:
return finalSeries
}
func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) {
var exprs []string
valuesByName := map[string][]string{}
func (n *metricNamer) createQueryPartsFromSelector(metricSelector labels.Selector) []queryPart {
requirements, _ := metricSelector.Requirements()
selectors := []queryPart{}
for i := 0; i < len(requirements); i++ {
selector := n.convertRequirement(requirements[i])
selectors = append(selectors, selector)
}
return selectors
}
func (n *metricNamer) convertRequirement(requirement labels.Requirement) queryPart {
labelName := requirement.Key()
values := requirement.Values().List()
return queryPart{
labelName: labelName,
values: values,
}
}
type queryPart struct {
labelName string
values []string
}
func (n *metricNamer) buildNamespaceQueryPartForSeries(namespace string) (queryPart, error) {
result := queryPart{}
//If we've been given a namespace, then we need to set up
//the label requirements to target that namespace.
if namespace != "" {
namespaceLbl, err := n.LabelForResource(nsGroupResource)
if err != nil {
return "", err
return result, err
}
values := []string{namespace}
result = queryPart{
values: values,
labelName: string(namespaceLbl),
}
exprs = append(exprs, prom.LabelEq(string(namespaceLbl), namespace))
valuesByName[string(namespaceLbl)] = []string{namespace}
}
return result, nil
}
func (n *metricNamer) buildResourceQueryPartForSeries(resource schema.GroupResource, names ...string) (queryPart, error) {
result := queryPart{}
//If we've been given a resource, then we need to set up
//the label requirements to target that resource.
resourceLbl, err := n.LabelForResource(resource)
if err != nil {
return result, err
}
result = queryPart{
labelName: string(resourceLbl),
values: names,
}
return result, nil
}
func (n *metricNamer) processQueryParts(queryParts []queryPart) ([]string, map[string][]string) {
//Contains the expressions that we want to include as part of the query to Prometheus.
//e.g. "namespace=my-namespace"
//e.g. "some_label=some-value"
var exprs []string
//Contains the list of label values we're targeting, by namespace.
//e.g. "some_label" => ["value-one", "value-two"]
valuesByName := map[string][]string{}
//Convert our query parts into template arguments.
for _, qPart := range queryParts {
targetValue := qPart.values[0]
matcher := prom.LabelEq
if len(qPart.values) > 1 {
targetValue = strings.Join(qPart.values, "|")
matcher = prom.LabelMatches
}
expression := matcher(qPart.labelName, targetValue)
exprs = append(exprs, expression)
valuesByName[qPart.labelName] = qPart.values
}
return exprs, valuesByName
}
func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) {
queryParts := []queryPart{}
//Build up the namespace part of the query.
namespaceQueryPart, err := n.buildNamespaceQueryPartForSeries(namespace)
if err != nil {
return "", err
}
matcher := prom.LabelEq
targetValue := names[0]
if len(names) > 1 {
matcher = prom.LabelMatches
targetValue = strings.Join(names, "|")
queryParts = append(queryParts, namespaceQueryPart)
//Build up the resource part of the query.
resourceQueryPart, err := n.buildResourceQueryPartForSeries(resource, names...)
if err != nil {
return "", err
}
exprs = append(exprs, matcher(string(resourceLbl), targetValue))
valuesByName[string(resourceLbl)] = names
queryParts = append(queryParts, resourceQueryPart)
//Convert our query parts into the types we need for our template.
exprs, valuesByName := n.processQueryParts(queryParts)
args := queryTemplateArgs{
Series: series,
LabelMatchers: strings.Join(exprs, ","),
LabelValuesByName: valuesByName,
GroupBy: string(resourceLbl),
GroupBySlice: []string{string(resourceLbl)},
GroupBy: resourceQueryPart.labelName,
GroupBySlice: []string{resourceQueryPart.labelName},
}
selector, err := n.createSelectorFromTemplateArgs(args)
if err != nil {
return "", err
}
return selector, nil
}
func (n *metricNamer) createSelectorFromTemplateArgs(args queryTemplateArgs) (prom.Selector, error) {
//Turn our template arguments into a Selector.
queryBuff := new(bytes.Buffer)
if err := n.metricsQueryTemplate.Execute(queryBuff, args); err != nil {
return "", err
@ -448,6 +555,8 @@ func NamersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMap
labelToResource: make(map[pmodel.LabelName]schema.GroupResource),
resourceToLabel: make(map[schema.GroupResource]pmodel.LabelName),
metricType: rule.MetricType,
}
// invert the structure for consistency with the template
@ -472,3 +581,22 @@ func NamersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMap
return namers, nil
}
func (n *metricNamer) QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error) {
queryParts := n.createQueryPartsFromSelector(metricSelector)
exprs, valuesByName := n.processQueryParts(queryParts)
args := queryTemplateArgs{
Series: series,
LabelMatchers: strings.Join(exprs, ","),
LabelValuesByName: valuesByName,
}
selector, err := n.createSelectorFromTemplateArgs(args)
if err != nil {
return "", err
}
return selector, nil
}

View file

@ -0,0 +1,130 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
)
type periodicMetricLister struct {
realLister MetricLister
updateInterval time.Duration
mostRecentResult metricUpdateResult
callback func(metricUpdateResult)
}
//NewPeriodicMetricLister creates a MetricLister that periodically pulls the list of available metrics
//at the provided interval, but defers the actual act of retrieving the metrics to the supplied MetricLister.
func NewPeriodicMetricLister(realLister MetricLister, updateInterval time.Duration) (MetricListerWithNotification, Runnable) {
lister := periodicMetricLister{
updateInterval: updateInterval,
realLister: realLister,
}
return &lister, &lister
}
func (l *periodicMetricLister) SetNotificationReceiver(callback func(metricUpdateResult)) {
l.callback = callback
}
func (l *periodicMetricLister) ListAllMetrics() (metricUpdateResult, error) {
return l.mostRecentResult, nil
}
func (l *periodicMetricLister) Run() {
l.RunUntil(wait.NeverStop)
}
func (l *periodicMetricLister) RunUntil(stopChan <-chan struct{}) {
go wait.Until(func() {
if result, err := l.realLister.ListAllMetrics(); err != nil {
utilruntime.HandleError(err)
} else {
l.mostRecentResult = result
}
}, l.updateInterval, stopChan)
}
// func (l *periodicMetricLister) updateMetrics() (metricUpdateResult, error) {
// result := metricUpdateResult{
// series: make([][]prom.Series, 0),
// namers: make([]MetricNamer, 0),
// }
// startTime := pmodel.Now().Add(-1 * l.updateInterval)
// // these can take a while on large clusters, so launch in parallel
// // and don't duplicate
// selectors := make(map[prom.Selector]struct{})
// selectorSeriesChan := make(chan selectorSeries, len(l.namers))
// errs := make(chan error, len(l.namers))
// for _, namer := range l.namers {
// sel := namer.Selector()
// if _, ok := selectors[sel]; ok {
// errs <- nil
// selectorSeriesChan <- selectorSeries{}
// continue
// }
// selectors[sel] = struct{}{}
// go func() {
// series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel)
// if err != nil {
// errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err)
// return
// }
// errs <- nil
// selectorSeriesChan <- selectorSeries{
// selector: sel,
// series: series,
// }
// }()
// }
// // don't do duplicate queries when it's just the matchers that change
// seriesCacheByQuery := make(map[prom.Selector][]prom.Series)
// // iterate through, blocking until we've got all results
// for range l.namers {
// if err := <-errs; err != nil {
// return result, fmt.Errorf("unable to update list of all metrics: %v", err)
// }
// if ss := <-selectorSeriesChan; ss.series != nil {
// seriesCacheByQuery[ss.selector] = ss.series
// }
// }
// close(errs)
// newSeries := make([][]prom.Series, len(l.namers))
// for i, namer := range l.namers {
// series, cached := seriesCacheByQuery[namer.Selector()]
// if !cached {
// return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector())
// }
// newSeries[i] = namer.FilterSeries(series)
// }
// glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries)
// result.series = newSeries
// result.namers = l.namers
// return result, nil
// }

View file

@ -32,22 +32,12 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/metrics/pkg/apis/custom_metrics"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
)
// Runnable represents something that can be run until told to stop.
type Runnable interface {
// Run runs the runnable forever.
Run()
// RunUntil runs the runnable until the given channel is closed.
RunUntil(stopChan <-chan struct{})
}
type prometheusProvider struct {
mapper apimeta.RESTMapper
kubeClient dynamic.Interface
@ -57,23 +47,18 @@ type prometheusProvider struct {
}
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration) (provider.CustomMetricsProvider, Runnable) {
lister := &cachingMetricsLister{
updateInterval: updateInterval,
promClient: promClient,
namers: namers,
basicLister := NewBasicMetricLister(promClient, namers, updateInterval)
periodicLister, periodicRunnable := NewPeriodicMetricLister(basicLister, updateInterval)
SeriesRegistry: &basicSeriesRegistry{
mapper: mapper,
},
}
seriesRegistry := NewBasicSeriesRegistry(periodicLister, mapper)
return &prometheusProvider{
mapper: mapper,
kubeClient: kubeClient,
promClient: promClient,
SeriesRegistry: lister,
}, lister
SeriesRegistry: seriesRegistry,
}, periodicLister
}
func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) {
@ -262,86 +247,3 @@ func (p *prometheusProvider) GetNamespacedMetricBySelector(groupResource schema.
}
return p.getMultiple(info, namespace, selector)
}
type cachingMetricsLister struct {
SeriesRegistry
promClient prom.Client
updateInterval time.Duration
namers []MetricNamer
}
func (l *cachingMetricsLister) Run() {
l.RunUntil(wait.NeverStop)
}
func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) {
go wait.Until(func() {
if err := l.updateMetrics(); err != nil {
utilruntime.HandleError(err)
}
}, l.updateInterval, stopChan)
}
type selectorSeries struct {
selector prom.Selector
series []prom.Series
}
func (l *cachingMetricsLister) updateMetrics() error {
startTime := pmodel.Now().Add(-1 * l.updateInterval)
// don't do duplicate queries when it's just the matchers that change
seriesCacheByQuery := make(map[prom.Selector][]prom.Series)
// these can take a while on large clusters, so launch in parallel
// and don't duplicate
selectors := make(map[prom.Selector]struct{})
selectorSeriesChan := make(chan selectorSeries, len(l.namers))
errs := make(chan error, len(l.namers))
for _, namer := range l.namers {
sel := namer.Selector()
if _, ok := selectors[sel]; ok {
errs <- nil
selectorSeriesChan <- selectorSeries{}
continue
}
selectors[sel] = struct{}{}
go func() {
series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel)
if err != nil {
errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err)
return
}
errs <- nil
selectorSeriesChan <- selectorSeries{
selector: sel,
series: series,
}
}()
}
// iterate through, blocking until we've got all results
for range l.namers {
if err := <-errs; err != nil {
return fmt.Errorf("unable to update list of all metrics: %v", err)
}
if ss := <-selectorSeriesChan; ss.series != nil {
seriesCacheByQuery[ss.selector] = ss.series
}
}
close(errs)
newSeries := make([][]prom.Series, len(l.namers))
for i, namer := range l.namers {
series, cached := seriesCacheByQuery[namer.Selector()]
if !cached {
return fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector())
}
newSeries[i] = namer.FilterSeries(series)
}
glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries)
return l.SetSeries(newSeries, l.namers)
}

View file

@ -17,11 +17,11 @@ limitations under the License.
package provider
import (
"fmt"
"sync"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/golang/glog"
@ -43,14 +43,13 @@ const (
// SeriesRegistry provides conversions between Prometheus series and MetricInfo
type SeriesRegistry interface {
// SetSeries replaces the known series in this registry.
// Each slice in series should correspond to a MetricNamer in namers.
SetSeries(series [][]prom.Series, namers []MetricNamer) error
// ListAllMetrics lists all metrics known to this registry
ListAllMetrics() []provider.CustomMetricInfo
// SeriesForMetric looks up the minimum required series information to make a query for the given metric
// against the given resource (namespace may be empty for non-namespaced resources)
QueryForMetric(info provider.CustomMetricInfo, namespace string, resourceNames ...string) (query prom.Selector, found bool)
// TODO: Don't house the external metric stuff side-by-side with the custom metric stuff.
QueryForExternalMetric(info provider.ExternalMetricInfo, metricSelector labels.Selector) (query prom.Selector, found bool)
// MatchValuesToNames matches result values to resource names for the given metric and value set
MatchValuesToNames(metricInfo provider.CustomMetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool)
}
@ -68,18 +67,35 @@ type basicSeriesRegistry struct {
mu sync.RWMutex
// info maps metric info to information about the corresponding series
info map[provider.CustomMetricInfo]seriesInfo
info map[provider.CustomMetricInfo]seriesInfo
externalInfo map[string]seriesInfo
// metrics is the list of all known metrics
metrics []provider.CustomMetricInfo
mapper apimeta.RESTMapper
metricLister MetricListerWithNotification
}
func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers []MetricNamer) error {
if len(newSeriesSlices) != len(namers) {
return fmt.Errorf("need one set of series per namer")
func NewBasicSeriesRegistry(lister MetricListerWithNotification, mapper apimeta.RESTMapper) SeriesRegistry {
var registry = basicSeriesRegistry{
mapper: mapper,
metricLister: lister,
}
lister.SetNotificationReceiver(registry.onNewDataAvailable)
return &registry
}
func (r basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) {
newSeriesSlices := result.series
namers := result.namers
// if len(newSeriesSlices) != len(namers) {
// return fmt.Errorf("need one set of series per namer")
// }
newInfo := make(map[provider.CustomMetricInfo]seriesInfo)
for i, newSeries := range newSeriesSlices {
namer := namers[i]
@ -123,8 +139,6 @@ func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers
r.info = newInfo
r.metrics = newMetrics
return nil
}
func (r *basicSeriesRegistry) ListAllMetrics() []provider.CustomMetricInfo {
@ -164,6 +178,29 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.CustomMetricInf
return query, true
}
func (r *basicSeriesRegistry) QueryForExternalMetric(metricInfo provider.ExternalMetricInfo, metricSelector labels.Selector) (query prom.Selector, found bool) {
r.mu.RLock()
defer r.mu.RUnlock()
info, infoFound := r.info[metricInfo]
if !infoFound {
//TODO: Weird that it switches between types here.
glog.V(10).Infof("metric %v not registered", metricInfo)
return "", false
}
query, err := info.namer.QueryForExternalSeries(info.seriesName, metricSelector)
if err != nil {
//TODO: See what was being .String() and implement that for ExternalMetricInfo.
// errorVal := metricInfo.String()
errorVal := "something"
glog.Errorf("unable to construct query for metric %s: %v", errorVal, err)
return "", false
}
return query, true
}
func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.CustomMetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool) {
r.mu.RLock()
defer r.mu.RUnlock()