diff --git a/pkg/config/config.go b/pkg/config/config.go index 6a31e8f1..9d6c0471 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -32,6 +32,9 @@ type DiscoveryRule struct { // `.GroupBy` is the comma-separated expected group-by label names. The delimeters // are `<<` and `>>`. MetricsQuery string `yaml:"metricsQuery,omitempty"` + // MetricType identifies whether the metrics derived from this rule should be classified + // as external or custom metrics. + MetricType MetricType `yaml:"metricType"` } // RegexFilter is a filter that matches positively or negatively against a regex. @@ -73,3 +76,6 @@ type NameMapping struct { // if only one is present, and will error if multiple are. As string `yaml:"as"` } + +//MetricType identifies whether a given metric should be handled and interpreted as a Custom or External metric. +type MetricType string diff --git a/pkg/custom-provider/basic_metric_lister.go b/pkg/custom-provider/basic_metric_lister.go new file mode 100644 index 00000000..b4a54d7f --- /dev/null +++ b/pkg/custom-provider/basic_metric_lister.go @@ -0,0 +1,148 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "context" + "fmt" + "time" + + "github.com/golang/glog" + pmodel "github.com/prometheus/common/model" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" +) + +// Runnable represents something that can be run until told to stop. +type Runnable interface { + // Run runs the runnable forever. + Run() + // RunUntil runs the runnable until the given channel is closed. + RunUntil(stopChan <-chan struct{}) +} + +//A MetricLister provides a window into all of the metrics that are available within a given +//Prometheus instance, classified as either Custom or External metrics, but presented generically +//so that it can manage both types simultaneously. +type MetricLister interface { + // Run() + // UpdateMetrics() error + // GetAllMetrics() []GenericMetricInfo + // GetAllCustomMetrics() []GenericMetricInfo + // GetAllExternalMetrics() []GenericMetricInfo + // GetInfoForMetric(infoKey GenericMetricInfo) (seriesInfo, bool) + ListAllMetrics() (metricUpdateResult, error) +} + +type MetricListerWithNotification interface { + //It can list metrics, just like a normal MetricLister. + MetricLister + //Because it periodically pulls metrics, it needs to be Runnable. + Runnable + //It provides notifications when it has new data to supply. + SetNotificationReceiver(func(metricUpdateResult)) +} + +type basicMetricLister struct { + promClient prom.Client + namers []MetricNamer + lookback time.Duration +} + +func NewBasicMetricLister(promClient prom.Client, namers []MetricNamer, lookback time.Duration) MetricLister { + lister := basicMetricLister{ + promClient: promClient, + namers: namers, + } + + return &lister +} + +type selectorSeries struct { + selector prom.Selector + series []prom.Series +} + +func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) { + result := metricUpdateResult{ + series: make([][]prom.Series, 0), + namers: make([]MetricNamer, 0), + } + + startTime := pmodel.Now().Add(-1 * l.lookback) + + // these can take a while on large clusters, so launch in parallel + // and don't duplicate + selectors := make(map[prom.Selector]struct{}) + selectorSeriesChan := make(chan selectorSeries, len(l.namers)) + errs := make(chan error, len(l.namers)) + for _, namer := range l.namers { + sel := namer.Selector() + if _, ok := selectors[sel]; ok { + errs <- nil + selectorSeriesChan <- selectorSeries{} + continue + } + selectors[sel] = struct{}{} + go func() { + series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel) + if err != nil { + errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err) + return + } + errs <- nil + selectorSeriesChan <- selectorSeries{ + selector: sel, + series: series, + } + }() + } + + // don't do duplicate queries when it's just the matchers that change + seriesCacheByQuery := make(map[prom.Selector][]prom.Series) + + // iterate through, blocking until we've got all results + for range l.namers { + if err := <-errs; err != nil { + return result, fmt.Errorf("unable to update list of all metrics: %v", err) + } + if ss := <-selectorSeriesChan; ss.series != nil { + seriesCacheByQuery[ss.selector] = ss.series + } + } + close(errs) + + newSeries := make([][]prom.Series, len(l.namers)) + for i, namer := range l.namers { + series, cached := seriesCacheByQuery[namer.Selector()] + if !cached { + return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector()) + } + newSeries[i] = namer.FilterSeries(series) + } + + glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries) + + result.series = newSeries + result.namers = l.namers + return result, nil +} + +type metricUpdateResult struct { + series [][]prom.Series + namers []MetricNamer +} diff --git a/pkg/custom-provider/external_metric_query_builder.go b/pkg/custom-provider/external_metric_query_builder.go deleted file mode 100644 index 00aa06a1..00000000 --- a/pkg/custom-provider/external_metric_query_builder.go +++ /dev/null @@ -1,111 +0,0 @@ -package provider - -import ( - "fmt" - s "strings" - - provider "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider/metric-converter" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" -) - -type ExternalMetricQueryBuilder interface { - BuildPrometheusQuery(namespace string, metricName string, metricSelector labels.Selector, queryMetadata provider.QueryMetadata) string -} - -type externalMetricQueryBuilder struct { -} - -func NewExternalMetricQueryBuilder() ExternalMetricQueryBuilder { - return &externalMetricQueryBuilder{} -} - -func (p *externalMetricQueryBuilder) BuildPrometheusQuery(namespace string, metricName string, metricSelector labels.Selector, queryMetadata provider.QueryMetadata) string { - //TODO: At least for my Prometheus install, the "namespace" label doesn't seem to be - //directly applied to the time series. I'm using prometheus-operator. The grafana dashboards - //seem to query for the pods in a namespace from kube_pod_info and then apply pod-specific - //label filters. This might need some more thought. Disabling for now. - // namespaceSelector := p.makeLabelFilter("namespace", "=", namespace) - labelSelectors := p.convertSelectors(metricSelector) - joinedLabels := s.Join(labelSelectors, ", ") - - //TODO: Both the aggregation method and window should probably be configurable. - //I don't think we can make assumptions about the nature of someone's metrics. - //I'm guessing this might be covered by the recently added advanced configuration - //code, but I haven't yet had an opportunity to dig into that and understand it. - //We'll leave this here for testing purposes for now. - //As reasonable defaults, maybe: - //rate(...) for counters - //avg_over_time(...) for gauges - //I'm guessing that SeriesRegistry might store the metric type, but I haven't looked yet. - aggregation := queryMetadata.Aggregation - window := queryMetadata.WindowInSeconds - return fmt.Sprintf("%s(%s{%s}[%ds])", aggregation, metricName, joinedLabels, window) -} - -func (p *externalMetricQueryBuilder) makeLabelFilter(labelName string, operator string, targetValue string) string { - return fmt.Sprintf("%s%s\"%s\"", labelName, operator, targetValue) -} - -func (p *externalMetricQueryBuilder) convertSelectors(metricSelector labels.Selector) []string { - requirements, _ := metricSelector.Requirements() - - selectors := []string{} - for i := 0; i < len(requirements); i++ { - selector := p.convertRequirement(requirements[i]) - - selectors = append(selectors, selector) - } - - return selectors -} - -func (p *externalMetricQueryBuilder) convertRequirement(requirement labels.Requirement) string { - labelName := requirement.Key() - values := requirement.Values().List() - - stringValues := values[0] - - valueCount := len(values) - if valueCount > 1 { - stringValues = s.Join(values, "|") - } - - operator := p.selectOperator(requirement.Operator(), valueCount) - - return p.makeLabelFilter(labelName, operator, stringValues) -} - -func (p *externalMetricQueryBuilder) selectOperator(operator selection.Operator, valueCount int) string { - if valueCount > 1 { - return p.selectRegexOperator(operator) - } - - return p.selectSingleValueOperator(operator) -} - -func (p *externalMetricQueryBuilder) selectRegexOperator(operator selection.Operator) string { - switch operator { - case selection.Equals: - case selection.In: - return "=~" - case selection.NotIn: - case selection.NotEquals: - return "!~" - } - - //TODO: Cover more cases, supply appropriate errors for any unhandled cases. - return "=" -} - -func (p *externalMetricQueryBuilder) selectSingleValueOperator(operator selection.Operator) string { - switch operator { - case selection.Equals: - return "=" - case selection.NotEquals: - return "!=" - } - - //TODO: Cover more cases, supply appropriate errors for any unhandled cases. - return "=" -} diff --git a/pkg/custom-provider/external_metric_query_builder_test.go b/pkg/custom-provider/external_metric_query_builder_test.go deleted file mode 100644 index 7d00d4ba..00000000 --- a/pkg/custom-provider/external_metric_query_builder_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package provider - -import ( - "testing" - - conv "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider/metric-converter" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" -) - -var queryBuilder = NewExternalMetricQueryBuilder() - -func TestBuildPrometheusQuery(t *testing.T) { - fakeSelector := labels.NewSelector() - metricName := "queue_name" - requirement, _ := labels.NewRequirement(metricName, selection.Equals, []string{"processing"}) - fakeSelector = fakeSelector.Add(*requirement) - meta := conv.QueryMetadata{ - Aggregation: "rate", - MetricName: metricName, - WindowInSeconds: 120, - } - - result := queryBuilder.BuildPrometheusQuery("default", "queue_length", fakeSelector, meta) - - expectedResult := "rate(queue_length{queue_name=\"processing\"}[120s])" - if result != expectedResult { - t.Errorf("Incorrect query generated. Expected: %s | Actual %s", result, expectedResult) - } -} diff --git a/pkg/custom-provider/external_provider.go b/pkg/custom-provider/external_provider.go index aabcec5b..2a6fe95a 100644 --- a/pkg/custom-provider/external_provider.go +++ b/pkg/custom-provider/external_provider.go @@ -6,7 +6,6 @@ import ( pmodel "github.com/prometheus/common/model" - conv "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider/metric-converter" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -34,7 +33,7 @@ type externalPrometheusProvider struct { queryBuilder ExternalMetricQueryBuilder metricConverter conv.MetricConverter - SeriesRegistry + seriesRegistry SeriesRegistry } //TODO: It probably makes more sense to, once this is functional and complete, roll the @@ -44,37 +43,26 @@ type externalPrometheusProvider struct { //to do one of those two things instead of trying to run the two providers //independently. -func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration, queryBuilder ExternalMetricQueryBuilder, metricConverter conv.MetricConverter) (provider.ExternalMetricsProvider, Runnable) { - lister := &cachingMetricsLister{ - updateInterval: updateInterval, - promClient: promClient, - namers: namers, - - SeriesRegistry: &basicSeriesRegistry{ - mapper: mapper, - }, - } +func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration, metricConverter conv.MetricConverter, seriesRegistry SeriesRegistry) (provider.ExternalMetricsProvider, Runnable) { return &externalPrometheusProvider{ mapper: mapper, kubeClient: kubeClient, promClient: promClient, metricConverter: metricConverter, - - SeriesRegistry: lister, + seriesRegistry: seriesRegistry, }, lister } func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) { - //TODO: Get the appropriate time window and aggregation type from somewhere - //based on the metric being selected. Does SeriesRegistry have the metric type cached? - queryMetadata := conv.QueryMetadata{ - MetricName: metricName, - WindowInSeconds: 120, - Aggregation: "rate", + selector, found := p.seriesRegistry.QueryForExternalMetric(metricInfo, metricSelector) + + if !found { + return &external_metrics.ExternalMetricValueList{ + Items: []external_metrics.ExternalMetricValue{}, + }, nil } - query := p.queryBuilder.BuildPrometheusQuery(namespace, metricName, metricSelector, queryMetadata) - selector := prom.Selector(query) + // query := p.queryBuilder.BuildPrometheusQuery(namespace, metricName, metricSelector, queryMetadata) //TODO: I don't yet know what a context is, but apparently I should use a real one. queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector) @@ -89,6 +77,5 @@ func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricN } func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo { - //TODO: Provide a real response. - return nil + return p.seriesRegistry.ListAllExternalMetrics() } diff --git a/pkg/custom-provider/metric-converter/matrix_converter.go b/pkg/custom-provider/metric-converter/matrix_converter.go deleted file mode 100644 index e13c8404..00000000 --- a/pkg/custom-provider/metric-converter/matrix_converter.go +++ /dev/null @@ -1,37 +0,0 @@ -package provider - -import ( - "errors" - - prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" - "github.com/prometheus/common/model" - "k8s.io/metrics/pkg/apis/external_metrics" -) - -type matrixConverter struct { -} - -//NewMatrixConverter creates a MatrixConverter capable of converting -//matrix Prometheus query results into external metric types. -func NewMatrixConverter() MetricConverter { - return &matrixConverter{} -} - -func (c *matrixConverter) Convert(metadata QueryMetadata, queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { - if queryResult.Type != model.ValMatrix { - return nil, errors.New("matrixConverter can only convert scalar query results") - } - - toConvert := queryResult.Matrix - - if toConvert == nil { - return nil, errors.New("the provided input did not contain matrix query results") - } - - return c.convert(toConvert) -} - -func (c *matrixConverter) convert(result *model.Matrix) (*external_metrics.ExternalMetricValueList, error) { - //TODO: Implementation. - return nil, errors.New("converting Matrix results is not yet supported") -} diff --git a/pkg/custom-provider/metric-converter/metric_converter.go b/pkg/custom-provider/metric-converter/metric_converter.go deleted file mode 100644 index a09f680b..00000000 --- a/pkg/custom-provider/metric-converter/metric_converter.go +++ /dev/null @@ -1,47 +0,0 @@ -package provider - -import ( - "errors" - - prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" - "github.com/prometheus/common/model" - "k8s.io/metrics/pkg/apis/external_metrics" -) - -//MetricConverter provides a unified interface for converting the results of -//Prometheus queries into external metric types. -type MetricConverter interface { - Convert(metadata QueryMetadata, queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) -} - -type metricConverter struct { - scalarConverter MetricConverter - vectorConverter MetricConverter - matrixConverter MetricConverter -} - -//NewMetricConverter creates a MetricCoverter, capable of converting any of the three metric types -//returned by the Prometheus client into external metrics types. -func NewMetricConverter(scalar MetricConverter, vector MetricConverter, matrix MetricConverter) MetricConverter { - return &metricConverter{ - scalarConverter: scalar, - vectorConverter: vector, - matrixConverter: matrix, - } -} - -func (c *metricConverter) Convert(metadata QueryMetadata, queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { - if queryResult.Type == model.ValScalar { - return c.scalarConverter.Convert(metadata, queryResult) - } - - if queryResult.Type == model.ValVector { - return c.vectorConverter.Convert(metadata, queryResult) - } - - if queryResult.Type == model.ValMatrix { - return c.matrixConverter.Convert(metadata, queryResult) - } - - return nil, errors.New("encountered an unexpected query result type") -} diff --git a/pkg/custom-provider/metric-converter/query_metadata.go b/pkg/custom-provider/metric-converter/query_metadata.go deleted file mode 100644 index 613cd3ea..00000000 --- a/pkg/custom-provider/metric-converter/query_metadata.go +++ /dev/null @@ -1,13 +0,0 @@ -package provider - -//QueryMetadata is a data object the holds information about what inputs -//were used to generate Prometheus query results. In most cases it's not -//necessary, as the Prometheus result come back with enough information -//to determine the metric name. However, for scalar results, Prometheus -//only provides the value. -type QueryMetadata struct { - MetricName string - WindowInSeconds int64 - //TODO: Type this? - Aggregation string -} diff --git a/pkg/custom-provider/metric-converter/sample_converter.go b/pkg/custom-provider/metric-converter/sample_converter.go deleted file mode 100644 index 8216ff80..00000000 --- a/pkg/custom-provider/metric-converter/sample_converter.go +++ /dev/null @@ -1,53 +0,0 @@ -package provider - -import ( - "github.com/prometheus/common/model" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/metrics/pkg/apis/external_metrics" -) - -type sampleConverter struct { -} - -//SampleConverter is capable of translating Prometheus Sample objects -//into ExternamMetricValue objects. -type SampleConverter interface { - Convert(metadata QueryMetadata, sample *model.Sample) (*external_metrics.ExternalMetricValue, error) -} - -//NewSampleConverter creates a SampleConverter capable of translating Prometheus Sample objects -//into ExternamMetricValue objects. -func NewSampleConverter() SampleConverter { - return &sampleConverter{} -} - -func (c *sampleConverter) Convert(metadata QueryMetadata, sample *model.Sample) (*external_metrics.ExternalMetricValue, error) { - labels := c.convertLabels(sample.Metric) - - singleMetric := external_metrics.ExternalMetricValue{ - MetricName: string(sample.Metric[model.LabelName("__name__")]), - Timestamp: metav1.Time{ - sample.Timestamp.Time(), - }, - WindowSeconds: &metadata.WindowInSeconds, - //TODO: I'm not so sure about this type/conversions. - //This can't possibly be the right way to convert this. - //Also, does K8S only deal win integer metrics? - Value: *resource.NewQuantity(int64(float64(sample.Value)), resource.DecimalSI), - MetricLabels: labels, - } - - //TODO: Actual errors? - return &singleMetric, nil -} - -func (c *sampleConverter) convertLabels(inLabels model.Metric) map[string]string { - numLabels := len(inLabels) - outLabels := make(map[string]string, numLabels) - for labelName, labelVal := range inLabels { - outLabels[string(labelName)] = string(labelVal) - } - - return outLabels -} diff --git a/pkg/custom-provider/metric-converter/scalar_converter.go b/pkg/custom-provider/metric-converter/scalar_converter.go deleted file mode 100644 index 3e61806d..00000000 --- a/pkg/custom-provider/metric-converter/scalar_converter.go +++ /dev/null @@ -1,57 +0,0 @@ -package provider - -import ( - "errors" - - prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" - "github.com/prometheus/common/model" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/metrics/pkg/apis/external_metrics" -) - -type scalarConverter struct { -} - -//NewScalarConverter creates a ScalarConverter capable of converting -//scalar Prometheus query results into external metric types. -func NewScalarConverter() MetricConverter { - return &scalarConverter{} -} - -func (c *scalarConverter) Convert(metadata QueryMetadata, queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { - if queryResult.Type != model.ValScalar { - return nil, errors.New("scalarConverter can only convert scalar query results") - } - - toConvert := queryResult.Scalar - - if toConvert == nil { - return nil, errors.New("the provided input did not contain scalar query results") - } - - return c.convert(metadata, toConvert) -} - -func (c *scalarConverter) convert(metadata QueryMetadata, input *model.Scalar) (*external_metrics.ExternalMetricValueList, error) { - result := external_metrics.ExternalMetricValueList{ - //Using prometheusProvider.metricsFor(...) as an example, - //it seems that I don't need to provide values for - //TypeMeta and ListMeta. - //TODO: Get some confirmation on this. - Items: []external_metrics.ExternalMetricValue{ - { - MetricName: metadata.MetricName, - Timestamp: metav1.Time{ - input.Timestamp.Time(), - }, - WindowSeconds: &metadata.WindowInSeconds, - //TODO: I'm not so sure about this type/conversions. - //Is there a meaningful loss of precision here? - //Does K8S only deal win integer metrics? - Value: *resource.NewQuantity(int64(input.Value), resource.DecimalSI), - }, - }, - } - return &result, nil -} diff --git a/pkg/custom-provider/metric-converter/vector_converter.go b/pkg/custom-provider/metric-converter/vector_converter.go deleted file mode 100644 index 4174e2bd..00000000 --- a/pkg/custom-provider/metric-converter/vector_converter.go +++ /dev/null @@ -1,58 +0,0 @@ -package provider - -import ( - "errors" - - prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" - "github.com/prometheus/common/model" - "k8s.io/metrics/pkg/apis/external_metrics" -) - -type vectorConverter struct { - SampleConverter SampleConverter -} - -//NewVectorConverter creates a VectorConverter capable of converting -//vector Prometheus query results into external metric types. -func NewVectorConverter(sampleConverter *SampleConverter) MetricConverter { - return &vectorConverter{ - SampleConverter: *sampleConverter, - } -} - -func (c *vectorConverter) Convert(metadata QueryMetadata, queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { - if queryResult.Type != model.ValVector { - return nil, errors.New("vectorConverter can only convert scalar query results") - } - - toConvert := *queryResult.Vector - - if toConvert == nil { - return nil, errors.New("the provided input did not contain vector query results") - } - - return c.convert(metadata, toConvert) -} - -func (c *vectorConverter) convert(metadata QueryMetadata, result model.Vector) (*external_metrics.ExternalMetricValueList, error) { - items := []external_metrics.ExternalMetricValue{} - metricValueList := external_metrics.ExternalMetricValueList{ - Items: items, - } - - numSamples := result.Len() - if numSamples == 0 { - return &metricValueList, nil - } - - for _, val := range result { - //TODO: Care about potential errors here. - singleMetric, _ := c.SampleConverter.Convert(metadata, val) - items = append(items, *singleMetric) - } - - metricValueList = external_metrics.ExternalMetricValueList{ - Items: items, - } - return &metricValueList, nil -} diff --git a/pkg/custom-provider/metric_namer.go b/pkg/custom-provider/metric_namer.go index 7de4f69d..172b7b64 100644 --- a/pkg/custom-provider/metric_namer.go +++ b/pkg/custom-provider/metric_namer.go @@ -11,6 +11,7 @@ import ( "github.com/golang/glog" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" @@ -44,6 +45,7 @@ type MetricNamer interface { // QueryForSeries returns the query for a given series (not API metric name), with // the given namespace name (if relevant), resource, and resource names. QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) + QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error) } // labelGroupResExtractor extracts schema.GroupResources from series labels. @@ -172,6 +174,8 @@ type metricNamer struct { labelToResource map[pmodel.LabelName]schema.GroupResource resourceToLabel map[schema.GroupResource]pmodel.LabelName mapper apimeta.RESTMapper + + metricType config.MetricType } // queryTemplateArgs are the arguments for the metrics query template. @@ -202,39 +206,142 @@ SeriesLoop: return finalSeries } -func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) { - var exprs []string - valuesByName := map[string][]string{} +func (n *metricNamer) createQueryPartsFromSelector(metricSelector labels.Selector) []queryPart { + requirements, _ := metricSelector.Requirements() + selectors := []queryPart{} + for i := 0; i < len(requirements); i++ { + selector := n.convertRequirement(requirements[i]) + + selectors = append(selectors, selector) + } + + return selectors +} + +func (n *metricNamer) convertRequirement(requirement labels.Requirement) queryPart { + labelName := requirement.Key() + values := requirement.Values().List() + + return queryPart{ + labelName: labelName, + values: values, + } +} + +type queryPart struct { + labelName string + values []string +} + +func (n *metricNamer) buildNamespaceQueryPartForSeries(namespace string) (queryPart, error) { + result := queryPart{} + + //If we've been given a namespace, then we need to set up + //the label requirements to target that namespace. if namespace != "" { namespaceLbl, err := n.LabelForResource(nsGroupResource) if err != nil { - return "", err + return result, err + } + + values := []string{namespace} + + result = queryPart{ + values: values, + labelName: string(namespaceLbl), } - exprs = append(exprs, prom.LabelEq(string(namespaceLbl), namespace)) - valuesByName[string(namespaceLbl)] = []string{namespace} } + return result, nil +} + +func (n *metricNamer) buildResourceQueryPartForSeries(resource schema.GroupResource, names ...string) (queryPart, error) { + result := queryPart{} + + //If we've been given a resource, then we need to set up + //the label requirements to target that resource. resourceLbl, err := n.LabelForResource(resource) + if err != nil { + return result, err + } + + result = queryPart{ + labelName: string(resourceLbl), + values: names, + } + + return result, nil +} + +func (n *metricNamer) processQueryParts(queryParts []queryPart) ([]string, map[string][]string) { + //Contains the expressions that we want to include as part of the query to Prometheus. + //e.g. "namespace=my-namespace" + //e.g. "some_label=some-value" + var exprs []string + + //Contains the list of label values we're targeting, by namespace. + //e.g. "some_label" => ["value-one", "value-two"] + valuesByName := map[string][]string{} + + //Convert our query parts into template arguments. + for _, qPart := range queryParts { + targetValue := qPart.values[0] + matcher := prom.LabelEq + + if len(qPart.values) > 1 { + targetValue = strings.Join(qPart.values, "|") + matcher = prom.LabelMatches + } + + expression := matcher(qPart.labelName, targetValue) + exprs = append(exprs, expression) + valuesByName[qPart.labelName] = qPart.values + } + + return exprs, valuesByName +} + +func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) { + queryParts := []queryPart{} + + //Build up the namespace part of the query. + namespaceQueryPart, err := n.buildNamespaceQueryPartForSeries(namespace) if err != nil { return "", err } - matcher := prom.LabelEq - targetValue := names[0] - if len(names) > 1 { - matcher = prom.LabelMatches - targetValue = strings.Join(names, "|") + + queryParts = append(queryParts, namespaceQueryPart) + + //Build up the resource part of the query. + resourceQueryPart, err := n.buildResourceQueryPartForSeries(resource, names...) + if err != nil { + return "", err } - exprs = append(exprs, matcher(string(resourceLbl), targetValue)) - valuesByName[string(resourceLbl)] = names + + queryParts = append(queryParts, resourceQueryPart) + + //Convert our query parts into the types we need for our template. + exprs, valuesByName := n.processQueryParts(queryParts) args := queryTemplateArgs{ Series: series, LabelMatchers: strings.Join(exprs, ","), LabelValuesByName: valuesByName, - GroupBy: string(resourceLbl), - GroupBySlice: []string{string(resourceLbl)}, + GroupBy: resourceQueryPart.labelName, + GroupBySlice: []string{resourceQueryPart.labelName}, } + + selector, err := n.createSelectorFromTemplateArgs(args) + if err != nil { + return "", err + } + + return selector, nil +} + +func (n *metricNamer) createSelectorFromTemplateArgs(args queryTemplateArgs) (prom.Selector, error) { + //Turn our template arguments into a Selector. queryBuff := new(bytes.Buffer) if err := n.metricsQueryTemplate.Execute(queryBuff, args); err != nil { return "", err @@ -448,6 +555,8 @@ func NamersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMap labelToResource: make(map[pmodel.LabelName]schema.GroupResource), resourceToLabel: make(map[schema.GroupResource]pmodel.LabelName), + + metricType: rule.MetricType, } // invert the structure for consistency with the template @@ -472,3 +581,22 @@ func NamersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMap return namers, nil } + +func (n *metricNamer) QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error) { + queryParts := n.createQueryPartsFromSelector(metricSelector) + + exprs, valuesByName := n.processQueryParts(queryParts) + + args := queryTemplateArgs{ + Series: series, + LabelMatchers: strings.Join(exprs, ","), + LabelValuesByName: valuesByName, + } + + selector, err := n.createSelectorFromTemplateArgs(args) + if err != nil { + return "", err + } + + return selector, nil +} diff --git a/pkg/custom-provider/periodic_metric_lister.go b/pkg/custom-provider/periodic_metric_lister.go new file mode 100644 index 00000000..84e6a77c --- /dev/null +++ b/pkg/custom-provider/periodic_metric_lister.go @@ -0,0 +1,130 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "time" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" +) + +type periodicMetricLister struct { + realLister MetricLister + updateInterval time.Duration + mostRecentResult metricUpdateResult + callback func(metricUpdateResult) +} + +//NewPeriodicMetricLister creates a MetricLister that periodically pulls the list of available metrics +//at the provided interval, but defers the actual act of retrieving the metrics to the supplied MetricLister. +func NewPeriodicMetricLister(realLister MetricLister, updateInterval time.Duration) (MetricListerWithNotification, Runnable) { + lister := periodicMetricLister{ + updateInterval: updateInterval, + realLister: realLister, + } + + return &lister, &lister +} + +func (l *periodicMetricLister) SetNotificationReceiver(callback func(metricUpdateResult)) { + l.callback = callback +} + +func (l *periodicMetricLister) ListAllMetrics() (metricUpdateResult, error) { + return l.mostRecentResult, nil +} + +func (l *periodicMetricLister) Run() { + l.RunUntil(wait.NeverStop) +} + +func (l *periodicMetricLister) RunUntil(stopChan <-chan struct{}) { + go wait.Until(func() { + if result, err := l.realLister.ListAllMetrics(); err != nil { + utilruntime.HandleError(err) + } else { + l.mostRecentResult = result + } + }, l.updateInterval, stopChan) +} + +// func (l *periodicMetricLister) updateMetrics() (metricUpdateResult, error) { + +// result := metricUpdateResult{ +// series: make([][]prom.Series, 0), +// namers: make([]MetricNamer, 0), +// } + +// startTime := pmodel.Now().Add(-1 * l.updateInterval) + +// // these can take a while on large clusters, so launch in parallel +// // and don't duplicate +// selectors := make(map[prom.Selector]struct{}) +// selectorSeriesChan := make(chan selectorSeries, len(l.namers)) +// errs := make(chan error, len(l.namers)) +// for _, namer := range l.namers { +// sel := namer.Selector() +// if _, ok := selectors[sel]; ok { +// errs <- nil +// selectorSeriesChan <- selectorSeries{} +// continue +// } +// selectors[sel] = struct{}{} +// go func() { +// series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel) +// if err != nil { +// errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err) +// return +// } +// errs <- nil +// selectorSeriesChan <- selectorSeries{ +// selector: sel, +// series: series, +// } +// }() +// } + +// // don't do duplicate queries when it's just the matchers that change +// seriesCacheByQuery := make(map[prom.Selector][]prom.Series) + +// // iterate through, blocking until we've got all results +// for range l.namers { +// if err := <-errs; err != nil { +// return result, fmt.Errorf("unable to update list of all metrics: %v", err) +// } +// if ss := <-selectorSeriesChan; ss.series != nil { +// seriesCacheByQuery[ss.selector] = ss.series +// } +// } +// close(errs) + +// newSeries := make([][]prom.Series, len(l.namers)) +// for i, namer := range l.namers { +// series, cached := seriesCacheByQuery[namer.Selector()] +// if !cached { +// return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector()) +// } +// newSeries[i] = namer.FilterSeries(series) +// } + +// glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries) + +// result.series = newSeries +// result.namers = l.namers +// return result, nil +// } diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index b8e122cf..91682150 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -32,22 +32,12 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/metrics/pkg/apis/custom_metrics" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" ) -// Runnable represents something that can be run until told to stop. -type Runnable interface { - // Run runs the runnable forever. - Run() - // RunUntil runs the runnable until the given channel is closed. - RunUntil(stopChan <-chan struct{}) -} - type prometheusProvider struct { mapper apimeta.RESTMapper kubeClient dynamic.Interface @@ -57,23 +47,18 @@ type prometheusProvider struct { } func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration) (provider.CustomMetricsProvider, Runnable) { - lister := &cachingMetricsLister{ - updateInterval: updateInterval, - promClient: promClient, - namers: namers, + basicLister := NewBasicMetricLister(promClient, namers, updateInterval) + periodicLister, periodicRunnable := NewPeriodicMetricLister(basicLister, updateInterval) - SeriesRegistry: &basicSeriesRegistry{ - mapper: mapper, - }, - } + seriesRegistry := NewBasicSeriesRegistry(periodicLister, mapper) return &prometheusProvider{ mapper: mapper, kubeClient: kubeClient, promClient: promClient, - SeriesRegistry: lister, - }, lister + SeriesRegistry: seriesRegistry, + }, periodicLister } func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) { @@ -262,86 +247,3 @@ func (p *prometheusProvider) GetNamespacedMetricBySelector(groupResource schema. } return p.getMultiple(info, namespace, selector) } - -type cachingMetricsLister struct { - SeriesRegistry - - promClient prom.Client - updateInterval time.Duration - namers []MetricNamer -} - -func (l *cachingMetricsLister) Run() { - l.RunUntil(wait.NeverStop) -} - -func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) { - go wait.Until(func() { - if err := l.updateMetrics(); err != nil { - utilruntime.HandleError(err) - } - }, l.updateInterval, stopChan) -} - -type selectorSeries struct { - selector prom.Selector - series []prom.Series -} - -func (l *cachingMetricsLister) updateMetrics() error { - startTime := pmodel.Now().Add(-1 * l.updateInterval) - - // don't do duplicate queries when it's just the matchers that change - seriesCacheByQuery := make(map[prom.Selector][]prom.Series) - - // these can take a while on large clusters, so launch in parallel - // and don't duplicate - selectors := make(map[prom.Selector]struct{}) - selectorSeriesChan := make(chan selectorSeries, len(l.namers)) - errs := make(chan error, len(l.namers)) - for _, namer := range l.namers { - sel := namer.Selector() - if _, ok := selectors[sel]; ok { - errs <- nil - selectorSeriesChan <- selectorSeries{} - continue - } - selectors[sel] = struct{}{} - go func() { - series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel) - if err != nil { - errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err) - return - } - errs <- nil - selectorSeriesChan <- selectorSeries{ - selector: sel, - series: series, - } - }() - } - - // iterate through, blocking until we've got all results - for range l.namers { - if err := <-errs; err != nil { - return fmt.Errorf("unable to update list of all metrics: %v", err) - } - if ss := <-selectorSeriesChan; ss.series != nil { - seriesCacheByQuery[ss.selector] = ss.series - } - } - close(errs) - - newSeries := make([][]prom.Series, len(l.namers)) - for i, namer := range l.namers { - series, cached := seriesCacheByQuery[namer.Selector()] - if !cached { - return fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector()) - } - newSeries[i] = namer.FilterSeries(series) - } - - glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries) - - return l.SetSeries(newSeries, l.namers) -} diff --git a/pkg/custom-provider/series_registry.go b/pkg/custom-provider/series_registry.go index a4701cf4..28ea08fd 100644 --- a/pkg/custom-provider/series_registry.go +++ b/pkg/custom-provider/series_registry.go @@ -17,11 +17,11 @@ limitations under the License. package provider import ( - "fmt" "sync" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" "github.com/golang/glog" @@ -43,14 +43,13 @@ const ( // SeriesRegistry provides conversions between Prometheus series and MetricInfo type SeriesRegistry interface { - // SetSeries replaces the known series in this registry. - // Each slice in series should correspond to a MetricNamer in namers. - SetSeries(series [][]prom.Series, namers []MetricNamer) error // ListAllMetrics lists all metrics known to this registry ListAllMetrics() []provider.CustomMetricInfo // SeriesForMetric looks up the minimum required series information to make a query for the given metric // against the given resource (namespace may be empty for non-namespaced resources) QueryForMetric(info provider.CustomMetricInfo, namespace string, resourceNames ...string) (query prom.Selector, found bool) + // TODO: Don't house the external metric stuff side-by-side with the custom metric stuff. + QueryForExternalMetric(info provider.ExternalMetricInfo, metricSelector labels.Selector) (query prom.Selector, found bool) // MatchValuesToNames matches result values to resource names for the given metric and value set MatchValuesToNames(metricInfo provider.CustomMetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool) } @@ -68,18 +67,35 @@ type basicSeriesRegistry struct { mu sync.RWMutex // info maps metric info to information about the corresponding series - info map[provider.CustomMetricInfo]seriesInfo + info map[provider.CustomMetricInfo]seriesInfo + externalInfo map[string]seriesInfo // metrics is the list of all known metrics metrics []provider.CustomMetricInfo mapper apimeta.RESTMapper + + metricLister MetricListerWithNotification } -func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers []MetricNamer) error { - if len(newSeriesSlices) != len(namers) { - return fmt.Errorf("need one set of series per namer") +func NewBasicSeriesRegistry(lister MetricListerWithNotification, mapper apimeta.RESTMapper) SeriesRegistry { + var registry = basicSeriesRegistry{ + mapper: mapper, + metricLister: lister, } + lister.SetNotificationReceiver(registry.onNewDataAvailable) + + return ®istry +} + +func (r basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) { + newSeriesSlices := result.series + namers := result.namers + + // if len(newSeriesSlices) != len(namers) { + // return fmt.Errorf("need one set of series per namer") + // } + newInfo := make(map[provider.CustomMetricInfo]seriesInfo) for i, newSeries := range newSeriesSlices { namer := namers[i] @@ -123,8 +139,6 @@ func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers r.info = newInfo r.metrics = newMetrics - - return nil } func (r *basicSeriesRegistry) ListAllMetrics() []provider.CustomMetricInfo { @@ -164,6 +178,29 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.CustomMetricInf return query, true } +func (r *basicSeriesRegistry) QueryForExternalMetric(metricInfo provider.ExternalMetricInfo, metricSelector labels.Selector) (query prom.Selector, found bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + info, infoFound := r.info[metricInfo] + if !infoFound { + //TODO: Weird that it switches between types here. + glog.V(10).Infof("metric %v not registered", metricInfo) + return "", false + } + + query, err := info.namer.QueryForExternalSeries(info.seriesName, metricSelector) + if err != nil { + //TODO: See what was being .String() and implement that for ExternalMetricInfo. + // errorVal := metricInfo.String() + errorVal := "something" + glog.Errorf("unable to construct query for metric %s: %v", errorVal, err) + return "", false + } + + return query, true +} + func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.CustomMetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool) { r.mu.RLock() defer r.mu.RUnlock()