From 0af14dc93d17ae2ba52731124564c77f39cf7c2d Mon Sep 17 00:00:00 2001 From: Tony Compton Date: Wed, 27 Jun 2018 15:34:05 -0400 Subject: [PATCH] Some conversation-starting progress. Still tons of work to do, but it's probably far enough along to get some feedback. --- cmd/adapter/app/start.go | 2 +- .../{provider.go => custom_provider.go} | 66 ++----- ...ovider_test.go => custom_provider_test.go} | 0 pkg/custom-provider/external_provider.go | 162 ++++++++++++++++++ pkg/custom-provider/types.go | 50 ++++++ 5 files changed, 225 insertions(+), 55 deletions(-) rename pkg/custom-provider/{provider.go => custom_provider.go} (74%) rename pkg/custom-provider/{provider_test.go => custom_provider_test.go} (100%) create mode 100644 pkg/custom-provider/external_provider.go create mode 100644 pkg/custom-provider/types.go diff --git a/cmd/adapter/app/start.go b/cmd/adapter/app/start.go index 7c7a8b60..2c68e482 100644 --- a/cmd/adapter/app/start.go +++ b/cmd/adapter/app/start.go @@ -176,7 +176,7 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String()) promClient := prom.NewClientForAPI(instrumentedGenericPromClient) - cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.LabelPrefix, o.MetricsRelistInterval, o.RateInterval, stopCh) + cmProvider := cmprov.NewCustomPrometheusProvider(dynamicMapper, clientPool, promClient, o.LabelPrefix, o.MetricsRelistInterval, o.RateInterval, stopCh) server, err := config.Complete().New("prometheus-custom-metrics-adapter", cmProvider) if err != nil { diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/custom_provider.go similarity index 74% rename from pkg/custom-provider/provider.go rename to pkg/custom-provider/custom_provider.go index 4453dca8..74d4723c 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/custom_provider.go @@ -33,15 +33,13 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/metrics/pkg/apis/custom_metrics" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" ) -type prometheusProvider struct { +type customPrometheusProvider struct { mapper apimeta.RESTMapper kubeClient dynamic.ClientPool promClient prom.Client @@ -51,10 +49,7 @@ type prometheusProvider struct { rateInterval time.Duration } -type DoubleMetricProvider interface { -} - -func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, labelPrefix string, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider { +func NewCustomPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, labelPrefix string, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider { lister := &cachingMetricsLister{ updateInterval: updateInterval, promClient: promClient, @@ -71,7 +66,7 @@ func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientP lister.RunUntil(stopChan) - return &prometheusProvider{ + return &customPrometheusProvider{ mapper: mapper, kubeClient: kubeClient, promClient: promClient, @@ -82,7 +77,7 @@ func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientP } } -func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) { +func (p *customPrometheusProvider) metricFor(value pmodel.SampleValue, groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) { kind, err := p.mapper.KindFor(groupResource.WithVersion("")) if err != nil { return nil, err @@ -101,7 +96,7 @@ func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource s }, nil } -func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.CustomMetricInfo, list runtime.Object) (*custom_metrics.MetricValueList, error) { +func (p *customPrometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.CustomMetricInfo, list runtime.Object) (*custom_metrics.MetricValueList, error) { if !apimeta.IsListType(list) { return nil, apierr.NewInternalError(fmt.Errorf("result of label selector list operation was not a list")) } @@ -135,7 +130,7 @@ func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.Cu }, nil } -func (p *prometheusProvider) buildQuery(info provider.CustomMetricInfo, namespace string, names ...string) (pmodel.Vector, error) { +func (p *customPrometheusProvider) buildQuery(info provider.CustomMetricInfo, namespace string, names ...string) (pmodel.Vector, error) { kind, baseQuery, groupBy, found := p.QueryForMetric(info, namespace, names...) if !found { return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) @@ -173,7 +168,7 @@ func (p *prometheusProvider) buildQuery(info provider.CustomMetricInfo, namespac return *queryResults.Vector, nil } -func (p *prometheusProvider) getSingle(info provider.CustomMetricInfo, namespace, name string) (*custom_metrics.MetricValue, error) { +func (p *customPrometheusProvider) getSingle(info provider.CustomMetricInfo, namespace, name string) (*custom_metrics.MetricValue, error) { queryResults, err := p.buildQuery(info, namespace, name) if err != nil { return nil, err @@ -201,7 +196,7 @@ func (p *prometheusProvider) getSingle(info provider.CustomMetricInfo, namespace return p.metricFor(resultValue, info.GroupResource, "", name, info.Metric) } -func (p *prometheusProvider) getMultiple(info provider.CustomMetricInfo, namespace string, selector labels.Selector) (*custom_metrics.MetricValueList, error) { +func (p *customPrometheusProvider) getMultiple(info provider.CustomMetricInfo, namespace string, selector labels.Selector) (*custom_metrics.MetricValueList, error) { // construct a client to list the names of objects matching the label selector client, err := p.kubeClient.ClientForGroupVersionResource(info.GroupResource.WithVersion("")) if err != nil { @@ -246,7 +241,7 @@ func (p *prometheusProvider) getMultiple(info provider.CustomMetricInfo, namespa return p.metricsFor(queryResults, info, matchingObjectsRaw) } -func (p *prometheusProvider) GetRootScopedMetricByName(groupResource schema.GroupResource, name string, metricName string) (*custom_metrics.MetricValue, error) { +func (p *customPrometheusProvider) GetRootScopedMetricByName(groupResource schema.GroupResource, name string, metricName string) (*custom_metrics.MetricValue, error) { info := provider.CustomMetricInfo{ GroupResource: groupResource, Metric: metricName, @@ -256,7 +251,7 @@ func (p *prometheusProvider) GetRootScopedMetricByName(groupResource schema.Grou return p.getSingle(info, "", name) } -func (p *prometheusProvider) GetRootScopedMetricBySelector(groupResource schema.GroupResource, selector labels.Selector, metricName string) (*custom_metrics.MetricValueList, error) { +func (p *customPrometheusProvider) GetRootScopedMetricBySelector(groupResource schema.GroupResource, selector labels.Selector, metricName string) (*custom_metrics.MetricValueList, error) { info := provider.CustomMetricInfo{ GroupResource: groupResource, Metric: metricName, @@ -265,7 +260,7 @@ func (p *prometheusProvider) GetRootScopedMetricBySelector(groupResource schema. return p.getMultiple(info, "", selector) } -func (p *prometheusProvider) GetNamespacedMetricByName(groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) { +func (p *customPrometheusProvider) GetNamespacedMetricByName(groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) { info := provider.CustomMetricInfo{ GroupResource: groupResource, Metric: metricName, @@ -275,7 +270,7 @@ func (p *prometheusProvider) GetNamespacedMetricByName(groupResource schema.Grou return p.getSingle(info, namespace, name) } -func (p *prometheusProvider) GetNamespacedMetricBySelector(groupResource schema.GroupResource, namespace string, selector labels.Selector, metricName string) (*custom_metrics.MetricValueList, error) { +func (p *customPrometheusProvider) GetNamespacedMetricBySelector(groupResource schema.GroupResource, namespace string, selector labels.Selector, metricName string) (*custom_metrics.MetricValueList, error) { info := provider.CustomMetricInfo{ GroupResource: groupResource, Metric: metricName, @@ -283,40 +278,3 @@ func (p *prometheusProvider) GetNamespacedMetricBySelector(groupResource schema. } return p.getMultiple(info, namespace, selector) } - -type cachingMetricsLister struct { - SeriesRegistry - - promClient prom.Client - updateInterval time.Duration -} - -func (l *cachingMetricsLister) Run() { - l.RunUntil(wait.NeverStop) -} - -func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) { - go wait.Until(func() { - if err := l.updateMetrics(); err != nil { - utilruntime.HandleError(err) - } - }, l.updateInterval, stopChan) -} - -func (l *cachingMetricsLister) updateMetrics() error { - startTime := pmodel.Now().Add(-1 * l.updateInterval) - - sels := l.Selectors() - - // TODO: use an actual context here - series, err := l.promClient.Series(context.Background(), pmodel.Interval{startTime, 0}, sels...) - if err != nil { - return fmt.Errorf("unable to update list of all available metrics: %v", err) - } - - glog.V(10).Infof("Set available metric list from Prometheus to: %v", series) - - l.SetSeries(series) - - return nil -} diff --git a/pkg/custom-provider/provider_test.go b/pkg/custom-provider/custom_provider_test.go similarity index 100% rename from pkg/custom-provider/provider_test.go rename to pkg/custom-provider/custom_provider_test.go diff --git a/pkg/custom-provider/external_provider.go b/pkg/custom-provider/external_provider.go new file mode 100644 index 00000000..743829a8 --- /dev/null +++ b/pkg/custom-provider/external_provider.go @@ -0,0 +1,162 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "fmt" + s "strings" + "time" + + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/client-go/dynamic" + "k8s.io/metrics/pkg/apis/external_metrics" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" +) + +type externalPrometheusProvider struct { + mapper apimeta.RESTMapper + kubeClient dynamic.ClientPool + promClient prom.Client + + SeriesRegistry + + rateInterval time.Duration +} + +func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, labelPrefix string, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.ExternalMetricsProvider { + lister := &cachingMetricsLister{ + updateInterval: updateInterval, + promClient: promClient, + + SeriesRegistry: &basicSeriesRegistry{ + namer: metricNamer{ + // TODO: populate the overrides list + overrides: nil, + mapper: mapper, + labelPrefix: labelPrefix, + }, + }, + } + + lister.RunUntil(stopChan) + + return &externalPrometheusProvider{ + mapper: mapper, + kubeClient: kubeClient, + promClient: promClient, + + SeriesRegistry: lister, + + rateInterval: rateInterval, + } +} + +func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) { + //TODO: Steps + //1. Generate a Prometheus Query. + // Something like my_metric{namespace="namespace" some_label="some_value"} + //2. Send that query to Prometheus. + //3. Adapt the results. + //The query generation for external metrics is much more straightforward + //than for custom metrics because no renaming is applied. + //So we'll just start with some simple string operations and see how far that gets us. + //Then I'll circle back and figure out how much code reuse I can get out of the original implementation. + namespaceSelector := p.makeLabelFilter("namespace", "=", namespace) + otherSelectors := p.convertSelectors(metricSelector) + + finalTargets := append([]string{namespaceSelector}, otherSelectors...) + + //TODO: Only here to stop compiler issues in this incomplete code. + fmt.Printf("len=%d", len(finalTargets)) + + //TODO: Construct a real result. + return nil, nil +} + +func (p *externalPrometheusProvider) makeLabelFilter(labelName string, operator string, targetValue string) string { + return fmt.Sprintf("%s%s\"%s\"", labelName, operator, targetValue) +} + +func (p *externalPrometheusProvider) convertSelectors(metricSelector labels.Selector) []string { + requirements, _ := metricSelector.Requirements() + + selectors := []string{} + for i := 0; i < len(requirements); i++ { + selector := p.convertRequirement(requirements[i]) + + selectors = append(selectors, selector) + } + + return selectors +} + +func (p *externalPrometheusProvider) convertRequirement(requirement labels.Requirement) string { + labelName := requirement.Key() + values := requirement.Values().List() + + stringValues := values[0] + + valueCount := len(values) + if valueCount > 1 { + stringValues = s.Join(values, "|") + } + + operator := p.selectOperator(requirement.Operator(), valueCount) + + return p.makeLabelFilter(labelName, operator, stringValues) +} + +func (p *externalPrometheusProvider) selectOperator(operator selection.Operator, valueCount int) string { + if valueCount > 1 { + return p.selectRegexOperator(operator) + } + + return p.selectSingleValueOperator(operator) +} + +func (p *externalPrometheusProvider) selectRegexOperator(operator selection.Operator) string { + switch operator { + case selection.Equals: + return "=~" + case selection.NotEquals: + return "!~" + } + + //TODO: Cover more cases, supply appropriate errors for any unhandled cases. + return "=" +} + +func (p *externalPrometheusProvider) selectSingleValueOperator(operator selection.Operator) string { + switch operator { + case selection.Equals: + return "=" + case selection.NotEquals: + return "!=" + } + + //TODO: Cover more cases, supply appropriate errors for any unhandled cases. + return "=" +} + +func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo { + //TODO: Provide a real response. + return nil +} diff --git a/pkg/custom-provider/types.go b/pkg/custom-provider/types.go new file mode 100644 index 00000000..cf53f988 --- /dev/null +++ b/pkg/custom-provider/types.go @@ -0,0 +1,50 @@ +package provider + +import ( + "context" + "fmt" + "time" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/golang/glog" + pmodel "github.com/prometheus/common/model" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" +) + +type cachingMetricsLister struct { + SeriesRegistry + + promClient prom.Client + updateInterval time.Duration +} + +func (l *cachingMetricsLister) Run() { + l.RunUntil(wait.NeverStop) +} + +func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) { + go wait.Until(func() { + if err := l.updateMetrics(); err != nil { + utilruntime.HandleError(err) + } + }, l.updateInterval, stopChan) +} + +func (l *cachingMetricsLister) updateMetrics() error { + startTime := pmodel.Now().Add(-1 * l.updateInterval) + + sels := l.Selectors() + + // TODO: use an actual context here + series, err := l.promClient.Series(context.Background(), pmodel.Interval{startTime, 0}, sels...) + if err != nil { + return fmt.Errorf("unable to update list of all available metrics: %v", err) + } + + glog.V(10).Infof("Set available metric list from Prometheus to: %v", series) + + l.SetSeries(series) + + return nil +}