From 9641e700054eaf0202c892c9f0ff0098ae439d51 Mon Sep 17 00:00:00 2001 From: Tony Compton Date: Fri, 20 Jul 2018 12:35:49 -0400 Subject: [PATCH] Fixing some refactoring bugs, first half-decent external metrics attempt. Fixed: * `basicMetricLister` wasn't applying the appropriate start time because I had forgotten to set the `lookback`. There are still a number of issues: * The `externalPrometheusProvider` is not hooked up to the web application yet, so it doesn't serve requests. * The namespace and label approach used in `external_info_map.go` is horrifically incorrect. It doesn't appropriately store multiple series with the same name but different labels. * The configuration is still not updated to appropriately handle external metrics, it's sort of half-piggy-backing on the pre-existing work. --- pkg/custom-provider/basic_metric_lister.go | 4 +- pkg/custom-provider/external_info_map.go | 108 ++++++++++++ pkg/custom-provider/external_provider.go | 119 ++++++------- .../external_series_registry.go | 165 ++++++++++++++++++ .../metric-converter/matrix_converter.go | 37 ++++ .../metric-converter/metric_converter.go | 52 ++++++ .../metric-converter/sample_converter.go | 52 ++++++ .../metric-converter/scalar_converter.go | 55 ++++++ .../metric-converter/vector_converter.go | 58 ++++++ pkg/custom-provider/metric_namer.go | 5 + pkg/custom-provider/periodic_metric_lister.go | 25 ++- .../periodic_metric_lister_test.go | 2 +- .../resource_converter_test.go | 8 +- pkg/custom-provider/series_registry.go | 24 ++- 14 files changed, 637 insertions(+), 77 deletions(-) create mode 100644 pkg/custom-provider/external_info_map.go create mode 100644 pkg/custom-provider/external_series_registry.go create mode 100644 pkg/custom-provider/metric-converter/matrix_converter.go create mode 100644 pkg/custom-provider/metric-converter/metric_converter.go create mode 100644 pkg/custom-provider/metric-converter/sample_converter.go create mode 100644 pkg/custom-provider/metric-converter/scalar_converter.go create mode 100644 pkg/custom-provider/metric-converter/vector_converter.go diff --git a/pkg/custom-provider/basic_metric_lister.go b/pkg/custom-provider/basic_metric_lister.go index 13ad0d75..bf640ee4 100644 --- a/pkg/custom-provider/basic_metric_lister.go +++ b/pkg/custom-provider/basic_metric_lister.go @@ -54,7 +54,8 @@ type MetricListerWithNotification interface { //Because it periodically pulls metrics, it needs to be Runnable. Runnable //It provides notifications when it has new data to supply. - SetNotificationReceiver(func(metricUpdateResult)) + AddNotificationReceiver(func(metricUpdateResult)) + UpdateNow() } type basicMetricLister struct { @@ -67,6 +68,7 @@ func NewBasicMetricLister(promClient prom.Client, namers []MetricNamer, lookback lister := basicMetricLister{ promClient: promClient, namers: namers, + lookback: lookback, } return &lister diff --git a/pkg/custom-provider/external_info_map.go b/pkg/custom-provider/external_info_map.go new file mode 100644 index 00000000..aae1f360 --- /dev/null +++ b/pkg/custom-provider/external_info_map.go @@ -0,0 +1,108 @@ +package provider + +import ( + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "k8s.io/apimachinery/pkg/labels" +) + +type ExportedMetric struct { + MetricName string + Labels labels.Set + Namespace string +} + +type ExternalInfoMap interface { + TrackMetric(metricName string, generatedBy MetricNamer) ExternalMetricData + ExportMetrics() []ExportedMetric + FindMetric(metricName string) (data ExternalMetricData, found bool) +} + +type ExternalMetricData interface { + MetricName() string + WithSeries(labels labels.Set) + WithNamespacedSeries(namespace string, labels labels.Set) + ExportMetrics() []ExportedMetric + GenerateQuery(selector labels.Selector) (prom.Selector, error) +} + +type externalInfoMap struct { + metrics map[string]ExternalMetricData +} + +type externalMetricData struct { + metricName string + namespacedData map[string]labels.Set + generatedBy MetricNamer +} + +func NewExternalMetricData(metricName string, generatedBy MetricNamer) ExternalMetricData { + return &externalMetricData{ + metricName: metricName, + generatedBy: generatedBy, + namespacedData: map[string]labels.Set{}, + } +} + +func NewExternalInfoMap() ExternalInfoMap { + return &externalInfoMap{ + metrics: map[string]ExternalMetricData{}, + } +} + +func (i *externalInfoMap) ExportMetrics() []ExportedMetric { + results := make([]ExportedMetric, 0) + for _, info := range i.metrics { + exported := info.ExportMetrics() + results = append(results, exported...) + } + + return results +} + +func (i *externalInfoMap) FindMetric(metricName string) (data ExternalMetricData, found bool) { + data, found = i.metrics[metricName] + return data, found +} + +func (i *externalInfoMap) TrackMetric(metricName string, generatedBy MetricNamer) ExternalMetricData { + data, found := i.metrics[metricName] + if !found { + data = NewExternalMetricData(metricName, generatedBy) + i.metrics[metricName] = data + } + + return data +} + +func (d *externalMetricData) MetricName() string { + return d.metricName +} + +func (d *externalMetricData) GenerateQuery(selector labels.Selector) (prom.Selector, error) { + return d.generatedBy.QueryForExternalSeries(d.metricName, selector) +} + +func (d *externalMetricData) ExportMetrics() []ExportedMetric { + results := make([]ExportedMetric, 0) + for namespace, labels := range d.namespacedData { + results = append(results, ExportedMetric{ + Labels: labels, + MetricName: d.metricName, + Namespace: namespace, + }) + } + + return results +} + +func (d *externalMetricData) WithSeries(labels labels.Set) { + d.WithNamespacedSeries("", labels) +} + +func (d *externalMetricData) WithNamespacedSeries(namespace string, labels labels.Set) { + data, found := d.namespacedData[namespace] + if !found { + data = labels + d.namespacedData[namespace] = data + } +} diff --git a/pkg/custom-provider/external_provider.go b/pkg/custom-provider/external_provider.go index b4d3b8d9..cceb6608 100644 --- a/pkg/custom-provider/external_provider.go +++ b/pkg/custom-provider/external_provider.go @@ -1,81 +1,74 @@ package provider -// import ( -// "context" -// "time" +import ( + "context" -// pmodel "github.com/prometheus/common/model" + pmodel "github.com/prometheus/common/model" -// "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" -// apimeta "k8s.io/apimachinery/pkg/api/meta" -// "k8s.io/apimachinery/pkg/labels" -// "k8s.io/client-go/dynamic" -// "k8s.io/metrics/pkg/apis/external_metrics" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" -// prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" -// ) + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" -// //TODO: Make sure everything has the proper licensing disclosure at the top. -// //TODO: I'd like to move these files into another directory, but the compiler was giving me -// //some static around unexported types. I'm going to leave things as-is for now, but it -// //might be worthwhile to, once the shared components are discovered, move some things around. + conv "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider/metric-converter" +) -// //TODO: Some of these members may not be necessary. -// //Some of them are definitely duplicated between the -// //external and custom providers. They should probably share -// //the same instances of these objects (especially the SeriesRegistry) -// //to cut down on unnecessary chatter/bookkeeping. -// type externalPrometheusProvider struct { -// mapper apimeta.RESTMapper -// kubeClient dynamic.Interface -// promClient prom.Client -// queryBuilder ExternalMetricQueryBuilder -// metricConverter conv.MetricConverter +//TODO: Make sure everything has the proper licensing disclosure at the top. +//TODO: I'd like to move these files into another directory, but the compiler was giving me +//some static around unexported types. I'm going to leave things as-is for now, but it +//might be worthwhile to, once the shared components are discovered, move some things around. -// seriesRegistry SeriesRegistry -// } +//TODO: Some of these members may not be necessary. +//Some of them are definitely duplicated between the +//external and custom providers. They should probably share +//the same instances of these objects (especially the SeriesRegistry) +//to cut down on unnecessary chatter/bookkeeping. +type externalPrometheusProvider struct { + promClient prom.Client + metricConverter conv.MetricConverter -// //TODO: It probably makes more sense to, once this is functional and complete, roll the -// //prometheusProvider and externalPrometheusProvider up into a single type -// //that implements both interfaces or provide a thin wrapper that composes them. -// //Just glancing at start.go looks like it would be much more straightforward -// //to do one of those two things instead of trying to run the two providers -// //independently. + seriesRegistry ExternalSeriesRegistry +} -// func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration, metricConverter conv.MetricConverter, seriesRegistry SeriesRegistry) (provider.ExternalMetricsProvider, Runnable) { +//TODO: It probably makes more sense to, once this is functional and complete, roll the +//prometheusProvider and externalPrometheusProvider up into a single type +//that implements both interfaces or provide a thin wrapper that composes them. +//Just glancing at start.go looks like it would be much more straightforward +//to do one of those two things instead of trying to run the two providers +//independently. -// return &externalPrometheusProvider{ -// mapper: mapper, -// kubeClient: kubeClient, -// promClient: promClient, -// metricConverter: metricConverter, -// seriesRegistry: seriesRegistry, -// }, lister -// } +func NewExternalPrometheusProvider(seriesRegistry ExternalSeriesRegistry, promClient prom.Client, converter conv.MetricConverter) provider.ExternalMetricsProvider { + return &externalPrometheusProvider{ + promClient: promClient, + seriesRegistry: seriesRegistry, + metricConverter: converter, + } +} -// func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) { -// selector, found := p.seriesRegistry.QueryForExternalMetric(metricInfo, metricSelector) +func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) { + selector, found := p.seriesRegistry.QueryForMetric(metricName, metricSelector) -// if !found { -// return &external_metrics.ExternalMetricValueList{ -// Items: []external_metrics.ExternalMetricValue{}, -// }, nil -// } -// // query := p.queryBuilder.BuildPrometheusQuery(namespace, metricName, metricSelector, queryMetadata) + if !found { + return &external_metrics.ExternalMetricValueList{ + Items: []external_metrics.ExternalMetricValue{}, + }, nil + } + // query := p.queryBuilder.BuildPrometheusQuery(namespace, metricName, metricSelector, queryMetadata) -// //TODO: I don't yet know what a context is, but apparently I should use a real one. -// queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector) + //TODO: I don't yet know what a context is, but apparently I should use a real one. + queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector) -// if err != nil { -// //TODO: Is this how folks normally deal w/ errors? Just propagate them upwards? -// //I should go look at what the customProvider does. -// return nil, err -// } + if err != nil { + //TODO: Is this how folks normally deal w/ errors? Just propagate them upwards? + //I should go look at what the customProvider does. + return nil, err + } -// return p.metricConverter.Convert(queryMetadata, queryResults) -// } + return p.metricConverter.Convert(queryResults) +} -// func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo { -// return p.seriesRegistry.ListAllExternalMetrics() -// } +func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo { + return p.seriesRegistry.ListAllMetrics() +} diff --git a/pkg/custom-provider/external_series_registry.go b/pkg/custom-provider/external_series_registry.go new file mode 100644 index 00000000..0d5fb200 --- /dev/null +++ b/pkg/custom-provider/external_series_registry.go @@ -0,0 +1,165 @@ +package provider + +import ( + "sync" + + "github.com/prometheus/common/model" + + "github.com/golang/glog" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" +) + +type ExternalSeriesRegistry interface { + // ListAllMetrics lists all metrics known to this registry + ListAllMetrics() []provider.ExternalMetricInfo + QueryForMetric(metricName string, metricSelector labels.Selector) (query prom.Selector, found bool) +} + +// overridableSeriesRegistry is a basic SeriesRegistry +type externalSeriesRegistry struct { + mu sync.RWMutex + + externalInfo map[string]seriesInfo + // metrics is the list of all known metrics + metrics []provider.ExternalMetricInfo + + mapper apimeta.RESTMapper + + metricLister MetricListerWithNotification + tonyExternalInfo ExternalInfoMap +} + +func NewExternalSeriesRegistry(lister MetricListerWithNotification, mapper apimeta.RESTMapper) ExternalSeriesRegistry { + var registry = externalSeriesRegistry{ + mapper: mapper, + metricLister: lister, + tonyExternalInfo: NewExternalInfoMap(), + } + + lister.AddNotificationReceiver(registry.onNewDataAvailable) + + return ®istry +} + +func (r *externalSeriesRegistry) filterMetrics(result metricUpdateResult) metricUpdateResult { + namers := make([]MetricNamer, 0) + series := make([][]prom.Series, 0) + + targetType := config.MetricType("External") + + for i, namer := range result.namers { + if namer.MetricType() == targetType { + namers = append(namers, namer) + series = append(series, result.series[i]) + } + } + + return metricUpdateResult{ + namers: namers, + series: series, + } +} + +func (r *externalSeriesRegistry) convertLabels(labels model.LabelSet) labels.Set { + set := map[string]string{} + for key, value := range labels { + set[string(key)] = string(value) + } + return set +} + +func (r *externalSeriesRegistry) onNewDataAvailable(result metricUpdateResult) { + result = r.filterMetrics(result) + + newSeriesSlices := result.series + namers := result.namers + + // if len(newSeriesSlices) != len(namers) { + // return fmt.Errorf("need one set of series per namer") + // } + + updatedCache := NewExternalInfoMap() + for i, newSeries := range newSeriesSlices { + namer := namers[i] + for _, series := range newSeries { + identity, err := namer.IdentifySeries(series) + + if err != nil { + glog.Errorf("unable to name series %q, skipping: %v", series.String(), err) + continue + } + + // resources := identity.resources + // namespaced := identity.namespaced + name := identity.name + labels := r.convertLabels(series.Labels) + //TODO: Figure out the namespace, if applicable + metricNs := "" + trackedMetric := updatedCache.TrackMetric(name, namer) + trackedMetric.WithNamespacedSeries(metricNs, labels) + } + } + + // regenerate metrics + allMetrics := updatedCache.ExportMetrics() + convertedMetrics := r.convertMetrics(allMetrics) + + r.mu.Lock() + defer r.mu.Unlock() + + r.tonyExternalInfo = updatedCache + r.metrics = convertedMetrics +} + +func (r *externalSeriesRegistry) convertMetrics(metrics []ExportedMetric) []provider.ExternalMetricInfo { + results := make([]provider.ExternalMetricInfo, len(metrics)) + for i, info := range metrics { + results[i] = provider.ExternalMetricInfo{ + Labels: info.Labels, + Metric: info.MetricName, + } + } + + return results +} + +func (r *externalSeriesRegistry) ListAllMetrics() []provider.ExternalMetricInfo { + r.mu.RLock() + defer r.mu.RUnlock() + + return r.metrics +} + +func (r *externalSeriesRegistry) QueryForMetric(metricName string, metricSelector labels.Selector) (query prom.Selector, found bool) { + r.mu.RLock() + defer r.mu.RUnlock() + metric, found := r.tonyExternalInfo.FindMetric(metricName) + + if !found { + return "", false + } + + query, err := metric.GenerateQuery(metricSelector) + // info, infoFound := r.info[metricInfo] + // if !infoFound { + // //TODO: Weird that it switches between types here. + // glog.V(10).Infof("metric %v not registered", metricInfo) + // return "", false + // } + + // query, err := info.namer.QueryForExternalSeries(info.seriesName, metricSelector) + if err != nil { + //TODO: See what was being .String() and implement that for ExternalMetricInfo. + // errorVal := metricInfo.String() + errorVal := "something" + glog.Errorf("unable to construct query for metric %s: %v", errorVal, err) + return "", false + } + + return query, true +} diff --git a/pkg/custom-provider/metric-converter/matrix_converter.go b/pkg/custom-provider/metric-converter/matrix_converter.go new file mode 100644 index 00000000..453ce7e3 --- /dev/null +++ b/pkg/custom-provider/metric-converter/matrix_converter.go @@ -0,0 +1,37 @@ +package provider + +import ( + "errors" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/prometheus/common/model" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +type matrixConverter struct { +} + +//NewMatrixConverter creates a MatrixConverter capable of converting +//matrix Prometheus query results into external metric types. +func NewMatrixConverter() MetricConverter { + return &matrixConverter{} +} + +func (c *matrixConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { + if queryResult.Type != model.ValMatrix { + return nil, errors.New("matrixConverter can only convert scalar query results") + } + + toConvert := queryResult.Matrix + + if toConvert == nil { + return nil, errors.New("the provided input did not contain matrix query results") + } + + return c.convert(toConvert) +} + +func (c *matrixConverter) convert(result *model.Matrix) (*external_metrics.ExternalMetricValueList, error) { + //TODO: Implementation. + return nil, errors.New("converting Matrix results is not yet supported") +} diff --git a/pkg/custom-provider/metric-converter/metric_converter.go b/pkg/custom-provider/metric-converter/metric_converter.go new file mode 100644 index 00000000..410588b4 --- /dev/null +++ b/pkg/custom-provider/metric-converter/metric_converter.go @@ -0,0 +1,52 @@ +package provider + +import ( + "errors" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/prometheus/common/model" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +//MetricConverter provides a unified interface for converting the results of +//Prometheus queries into external metric types. +type MetricConverter interface { + Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) +} + +type metricConverter struct { + scalarConverter MetricConverter + vectorConverter MetricConverter + matrixConverter MetricConverter +} + +func DefaultMetricConverter() MetricConverter { + sampleConverter := NewSampleConverter() + return NewMetricConverter(NewScalarConverter(), NewVectorConverter(&sampleConverter), NewMatrixConverter()) +} + +//NewMetricConverter creates a MetricCoverter, capable of converting any of the three metric types +//returned by the Prometheus client into external metrics types. +func NewMetricConverter(scalar MetricConverter, vector MetricConverter, matrix MetricConverter) MetricConverter { + return &metricConverter{ + scalarConverter: scalar, + vectorConverter: vector, + matrixConverter: matrix, + } +} + +func (c *metricConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { + if queryResult.Type == model.ValScalar { + return c.scalarConverter.Convert(queryResult) + } + + if queryResult.Type == model.ValVector { + return c.vectorConverter.Convert(queryResult) + } + + if queryResult.Type == model.ValMatrix { + return c.matrixConverter.Convert(queryResult) + } + + return nil, errors.New("encountered an unexpected query result type") +} diff --git a/pkg/custom-provider/metric-converter/sample_converter.go b/pkg/custom-provider/metric-converter/sample_converter.go new file mode 100644 index 00000000..8cd3bb23 --- /dev/null +++ b/pkg/custom-provider/metric-converter/sample_converter.go @@ -0,0 +1,52 @@ +package provider + +import ( + "github.com/prometheus/common/model" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +type sampleConverter struct { +} + +//SampleConverter is capable of translating Prometheus Sample objects +//into ExternamMetricValue objects. +type SampleConverter interface { + Convert(sample *model.Sample) (*external_metrics.ExternalMetricValue, error) +} + +//NewSampleConverter creates a SampleConverter capable of translating Prometheus Sample objects +//into ExternamMetricValue objects. +func NewSampleConverter() SampleConverter { + return &sampleConverter{} +} + +func (c *sampleConverter) Convert(sample *model.Sample) (*external_metrics.ExternalMetricValue, error) { + labels := c.convertLabels(sample.Metric) + + singleMetric := external_metrics.ExternalMetricValue{ + MetricName: string(sample.Metric[model.LabelName("__name__")]), + Timestamp: metav1.Time{ + sample.Timestamp.Time(), + }, + //TODO: I'm not so sure about this type/conversions. + //This can't possibly be the right way to convert this. + //Also, does K8S only deal win integer metrics? + Value: *resource.NewQuantity(int64(float64(sample.Value)), resource.DecimalSI), + MetricLabels: labels, + } + + //TODO: Actual errors? + return &singleMetric, nil +} + +func (c *sampleConverter) convertLabels(inLabels model.Metric) map[string]string { + numLabels := len(inLabels) + outLabels := make(map[string]string, numLabels) + for labelName, labelVal := range inLabels { + outLabels[string(labelName)] = string(labelVal) + } + + return outLabels +} diff --git a/pkg/custom-provider/metric-converter/scalar_converter.go b/pkg/custom-provider/metric-converter/scalar_converter.go new file mode 100644 index 00000000..2a4e2095 --- /dev/null +++ b/pkg/custom-provider/metric-converter/scalar_converter.go @@ -0,0 +1,55 @@ +package provider + +import ( + "errors" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/prometheus/common/model" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +type scalarConverter struct { +} + +//NewScalarConverter creates a ScalarConverter capable of converting +//scalar Prometheus query results into external metric types. +func NewScalarConverter() MetricConverter { + return &scalarConverter{} +} + +func (c *scalarConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { + if queryResult.Type != model.ValScalar { + return nil, errors.New("scalarConverter can only convert scalar query results") + } + + toConvert := queryResult.Scalar + + if toConvert == nil { + return nil, errors.New("the provided input did not contain scalar query results") + } + + return c.convert(toConvert) +} + +func (c *scalarConverter) convert(input *model.Scalar) (*external_metrics.ExternalMetricValueList, error) { + result := external_metrics.ExternalMetricValueList{ + //Using prometheusProvider.metricsFor(...) as an example, + //it seems that I don't need to provide values for + //TypeMeta and ListMeta. + //TODO: Get some confirmation on this. + Items: []external_metrics.ExternalMetricValue{ + { + Timestamp: metav1.Time{ + input.Timestamp.Time(), + }, + //TODO: I'm not so sure about this type/conversions. + //Is there a meaningful loss of precision here? + //Does K8S only deal win integer metrics? + Value: *resource.NewMilliQuantity(int64(input.Value*1000.0), resource.DecimalSI), + }, + }, + } + return &result, nil +} diff --git a/pkg/custom-provider/metric-converter/vector_converter.go b/pkg/custom-provider/metric-converter/vector_converter.go new file mode 100644 index 00000000..71b13631 --- /dev/null +++ b/pkg/custom-provider/metric-converter/vector_converter.go @@ -0,0 +1,58 @@ +package provider + +import ( + "errors" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/prometheus/common/model" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +type vectorConverter struct { + SampleConverter SampleConverter +} + +//NewVectorConverter creates a VectorConverter capable of converting +//vector Prometheus query results into external metric types. +func NewVectorConverter(sampleConverter *SampleConverter) MetricConverter { + return &vectorConverter{ + SampleConverter: *sampleConverter, + } +} + +func (c *vectorConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { + if queryResult.Type != model.ValVector { + return nil, errors.New("vectorConverter can only convert scalar query results") + } + + toConvert := *queryResult.Vector + + if toConvert == nil { + return nil, errors.New("the provided input did not contain vector query results") + } + + return c.convert(toConvert) +} + +func (c *vectorConverter) convert(result model.Vector) (*external_metrics.ExternalMetricValueList, error) { + items := []external_metrics.ExternalMetricValue{} + metricValueList := external_metrics.ExternalMetricValueList{ + Items: items, + } + + numSamples := result.Len() + if numSamples == 0 { + return &metricValueList, nil + } + + for _, val := range result { + //TODO: Care about potential errors here. + singleMetric, _ := c.SampleConverter.Convert(val) + items = append(items, *singleMetric) + } + + metricValueList = external_metrics.ExternalMetricValueList{ + Items: items, + } + return &metricValueList, nil +} diff --git a/pkg/custom-provider/metric_namer.go b/pkg/custom-provider/metric_namer.go index 7e566e4b..f1ee45d7 100644 --- a/pkg/custom-provider/metric_namer.go +++ b/pkg/custom-provider/metric_namer.go @@ -36,6 +36,7 @@ type MetricNamer interface { QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error) IdentifySeries(series prom.Series) (seriesIdentity, error) + MetricType() config.MetricType } type seriesIdentity struct { @@ -69,6 +70,10 @@ type queryTemplateArgs struct { GroupBySlice []string } +func (n *metricNamer) MetricType() config.MetricType { + return n.metricType +} + func (n *metricNamer) IdentifySeries(series prom.Series) (seriesIdentity, error) { // TODO: warn if it doesn't match any resources resources, namespaced := n.resourceConverter.ResourcesForSeries(series) diff --git a/pkg/custom-provider/periodic_metric_lister.go b/pkg/custom-provider/periodic_metric_lister.go index 75297982..81fceee7 100644 --- a/pkg/custom-provider/periodic_metric_lister.go +++ b/pkg/custom-provider/periodic_metric_lister.go @@ -27,7 +27,7 @@ type periodicMetricLister struct { realLister MetricLister updateInterval time.Duration mostRecentResult metricUpdateResult - callback func(metricUpdateResult) + callbacks []func(metricUpdateResult) } //NewPeriodicMetricLister creates a MetricLister that periodically pulls the list of available metrics @@ -36,13 +36,14 @@ func NewPeriodicMetricLister(realLister MetricLister, updateInterval time.Durati lister := periodicMetricLister{ updateInterval: updateInterval, realLister: realLister, + callbacks: make([]func(metricUpdateResult), 0), } return &lister, &lister } -func (l *periodicMetricLister) SetNotificationReceiver(callback func(metricUpdateResult)) { - l.callback = callback +func (l *periodicMetricLister) AddNotificationReceiver(callback func(metricUpdateResult)) { + l.callbacks = append(l.callbacks, callback) } func (l *periodicMetricLister) ListAllMetrics() (metricUpdateResult, error) { @@ -70,13 +71,23 @@ func (l *periodicMetricLister) updateMetrics() error { //Cache the result. l.mostRecentResult = result - //Let our listener know we've got new data ready for them. - if l.callback != nil { - l.callback(result) - } + //Let our listeners know we've got new data ready for them. + l.notifyListeners() return nil } +func (l *periodicMetricLister) notifyListeners() { + for _, listener := range l.callbacks { + if listener != nil { + listener(l.mostRecentResult) + } + } +} + +func (l *periodicMetricLister) UpdateNow() { + l.updateMetrics() +} + // func (l *periodicMetricLister) updateMetrics() (metricUpdateResult, error) { // result := metricUpdateResult{ diff --git a/pkg/custom-provider/periodic_metric_lister_test.go b/pkg/custom-provider/periodic_metric_lister_test.go index d9548b10..2aa1b098 100644 --- a/pkg/custom-provider/periodic_metric_lister_test.go +++ b/pkg/custom-provider/periodic_metric_lister_test.go @@ -36,7 +36,7 @@ func TestWhenNewMetricsAvailableCallbackIsInvoked(t *testing.T) { callbackInvoked = true } - periodicLister.SetNotificationReceiver(callback) + periodicLister.AddNotificationReceiver(callback) err := periodicLister.updateMetrics() require.NoError(t, err) require.True(t, callbackInvoked) diff --git a/pkg/custom-provider/resource_converter_test.go b/pkg/custom-provider/resource_converter_test.go index c4774733..8fe8cad9 100644 --- a/pkg/custom-provider/resource_converter_test.go +++ b/pkg/custom-provider/resource_converter_test.go @@ -95,14 +95,14 @@ func TestDetectsNonNamespaceResourcesFromOverrides(t *testing.T) { }) require.NoError(t, err) require.Equal(t, []schema.GroupResource{ - schema.GroupResource{ - Group: "", - Resource: "pods", - }, schema.GroupResource{ Group: "extensions", Resource: "deployments", }, + schema.GroupResource{ + Group: "", + Resource: "pods", + }, }, resource) require.Equal(t, false, namespaced) } diff --git a/pkg/custom-provider/series_registry.go b/pkg/custom-provider/series_registry.go index 0b5be55d..092f7fe4 100644 --- a/pkg/custom-provider/series_registry.go +++ b/pkg/custom-provider/series_registry.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/labels" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" "github.com/golang/glog" pmodel "github.com/prometheus/common/model" ) @@ -83,12 +84,33 @@ func NewBasicSeriesRegistry(lister MetricListerWithNotification, mapper apimeta. metricLister: lister, } - lister.SetNotificationReceiver(registry.onNewDataAvailable) + lister.AddNotificationReceiver(registry.onNewDataAvailable) return ®istry } +func (r *basicSeriesRegistry) filterMetrics(result metricUpdateResult) metricUpdateResult { + namers := make([]MetricNamer, 0) + series := make([][]prom.Series, 0) + + targetType := config.MetricType("Custom") + + for i, namer := range result.namers { + if namer.MetricType() == targetType { + namers = append(namers, namer) + series = append(series, result.series[i]) + } + } + + return metricUpdateResult{ + namers: namers, + series: series, + } +} + func (r *basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) { + result = r.filterMetrics(result) + newSeriesSlices := result.series namers := result.namers