diff --git a/cmd/adapter/app/start.go b/cmd/adapter/app/start.go index 9859365d..d4b18e04 100644 --- a/cmd/adapter/app/start.go +++ b/cmd/adapter/app/start.go @@ -184,12 +184,12 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String()) promClient := prom.NewClientForAPI(instrumentedGenericPromClient) - namers, err := cmprov.NamersFromConfig(metricsConfig, dynamicMapper) + converters, err := cmprov.ConvertersFromConfig(metricsConfig, dynamicMapper) if err != nil { return fmt.Errorf("unable to construct naming scheme from metrics rules: %v", err) } - cmProvider, runner := cmprov.NewPrometheusProvider(dynamicMapper, dynamicClient, promClient, namers, o.MetricsRelistInterval) + cmProvider, runner := cmprov.NewPrometheusProvider(dynamicMapper, dynamicClient, promClient, converters, o.MetricsRelistInterval) runner.RunUntil(stopCh) server, err := config.Complete().New("prometheus-custom-metrics-adapter", cmProvider, nil) diff --git a/pkg/config/config.go b/pkg/config/config.go index c79e2752..f167970f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -36,7 +36,7 @@ type DiscoveryRule struct { // as external or custom metrics. MetricType MetricType `yaml:"metricType,omitempty"` // ExternalMetricNamespaceLabelName identifies what Prometheus label should be examined - // to apply a namespace to metrics creates from this rule. + // to apply a namespace to metrics created from this rule. ExternalMetricNamespaceLabelName string `yaml:"externalMetricNamespaceLabelName,omitempty"` } @@ -80,7 +80,7 @@ type NameMapping struct { As string `yaml:"as"` } -//MetricType identifies whether a given metric should be handled and interpreted as a Custom or External metric. +// MetricType identifies whether a given metric should be handled and interpreted as a Custom or External metric. type MetricType string // Operator represents a key/field's relationship to value(s). diff --git a/pkg/custom-provider/basic_metric_lister.go b/pkg/custom-provider/basic_metric_lister.go index 1bbbd15d..789baa01 100644 --- a/pkg/custom-provider/basic_metric_lister.go +++ b/pkg/custom-provider/basic_metric_lister.go @@ -35,42 +35,36 @@ type Runnable interface { RunUntil(stopChan <-chan struct{}) } -//A MetricLister provides a window into all of the metrics that are available within a given -//Prometheus instance, classified as either Custom or External metrics, but presented generically -//so that it can manage both types simultaneously. +// A MetricLister provides a window into all of the metrics that are available within a given +// Prometheus instance, classified as either Custom or External metrics, but presented generically +// so that it can manage both types simultaneously. type MetricLister interface { - // Run() - // UpdateMetrics() error - // GetAllMetrics() []GenericMetricInfo - // GetAllCustomMetrics() []GenericMetricInfo - // GetAllExternalMetrics() []GenericMetricInfo - // GetInfoForMetric(infoKey GenericMetricInfo) (seriesInfo, bool) - ListAllMetrics() (metricUpdateResult, error) + ListAllMetrics() (MetricUpdateResult, error) } -//A MetricListerWithNotification is a MetricLister that has the ability to notify listeners -//when new metric data is available. +// A MetricListerWithNotification is a MetricLister that has the ability to notify listeners +// when new metric data is available. type MetricListerWithNotification interface { MetricLister Runnable - //AddNotificationReceiver registers a callback to be invoked when new metric data is available. - AddNotificationReceiver(func(metricUpdateResult)) - //UpdateNow forces an immediate refresh from the source data. Primarily for test purposes. + // AddNotificationReceiver registers a callback to be invoked when new metric data is available. + AddNotificationReceiver(MetricUpdateCallback) + // UpdateNow forces an immediate refresh from the source data. Primarily for test purposes. UpdateNow() } type basicMetricLister struct { promClient prom.Client - namers []MetricNamer + converters []SeriesConverter lookback time.Duration } -//NewBasicMetricLister creates a MetricLister that is capable of interactly directly with Prometheus to list metrics. -func NewBasicMetricLister(promClient prom.Client, namers []MetricNamer, lookback time.Duration) MetricLister { +// NewBasicMetricLister creates a MetricLister that is capable of interactly directly with Prometheus to list metrics. +func NewBasicMetricLister(promClient prom.Client, converters []SeriesConverter, lookback time.Duration) MetricLister { lister := basicMetricLister{ promClient: promClient, - namers: namers, + converters: converters, lookback: lookback, } @@ -82,10 +76,10 @@ type selectorSeries struct { series []prom.Series } -func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) { - result := metricUpdateResult{ - series: make([][]prom.Series, 0), - namers: make([]MetricNamer, 0), +func (l *basicMetricLister) ListAllMetrics() (MetricUpdateResult, error) { + result := MetricUpdateResult{ + series: make([][]prom.Series, 0), + converters: make([]SeriesConverter, 0), } startTime := pmodel.Now().Add(-1 * l.lookback) @@ -93,10 +87,10 @@ func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) { // these can take a while on large clusters, so launch in parallel // and don't duplicate selectors := make(map[prom.Selector]struct{}) - selectorSeriesChan := make(chan selectorSeries, len(l.namers)) - errs := make(chan error, len(l.namers)) - for _, namer := range l.namers { - sel := namer.Selector() + selectorSeriesChan := make(chan selectorSeries, len(l.converters)) + errs := make(chan error, len(l.converters)) + for _, converter := range l.converters { + sel := converter.Selector() if _, ok := selectors[sel]; ok { errs <- nil selectorSeriesChan <- selectorSeries{} @@ -110,7 +104,7 @@ func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) { return } errs <- nil - //Push into the channel: "this selector produced these series" + // Push into the channel: "this selector produced these series" selectorSeriesChan <- selectorSeries{ selector: sel, series: series, @@ -123,41 +117,49 @@ func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) { // iterate through, blocking until we've got all results // We know that, from above, we should have pushed one item into the channel - // for each namer. So here, we'll assume that we should receive one item per namer. - for range l.namers { + // for each converter. So here, we'll assume that we should receive one item per converter. + for range l.converters { if err := <-errs; err != nil { return result, fmt.Errorf("unable to update list of all metrics: %v", err) } - //Receive from the channel: "this selector produced these series" - //We stuff that into this map so that we can collect the data as it arrives - //and then, once we've received it all, we can process it below. + // Receive from the channel: "this selector produced these series" + // We stuff that into this map so that we can collect the data as it arrives + // and then, once we've received it all, we can process it below. if ss := <-selectorSeriesChan; ss.series != nil { seriesCacheByQuery[ss.selector] = ss.series } } close(errs) - //Now that we've collected all of the results into `seriesCacheByQuery` - //we can start processing them. - newSeries := make([][]prom.Series, len(l.namers)) - for i, namer := range l.namers { - series, cached := seriesCacheByQuery[namer.Selector()] + // Now that we've collected all of the results into `seriesCacheByQuery` + // we can start processing them. + newSeries := make([][]prom.Series, len(l.converters)) + for i, converter := range l.converters { + series, cached := seriesCacheByQuery[converter.Selector()] if !cached { - return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector()) + return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", converter.Selector()) } - //Because namers provide a "post-filtering" option, it's not enough to - //simply take all the series that were produced. We need to further filter them. - newSeries[i] = namer.SeriesFilterer().FilterSeries(series) + // Because converters provide a "post-filtering" option, it's not enough to + // simply take all the series that were produced. We need to further filter them. + newSeries[i] = converter.SeriesFilterer().FilterSeries(series) } glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries) result.series = newSeries - result.namers = l.namers + result.converters = l.converters return result, nil } -type metricUpdateResult struct { - series [][]prom.Series - namers []MetricNamer +// MetricUpdateResult represents the output of a periodic inspection of metrics found to be +// available in Prometheus. +// It includes both the series data the Prometheus exposed, as well as the configurational +// object that led to their discovery. +type MetricUpdateResult struct { + series [][]prom.Series + converters []SeriesConverter } + +// MetricUpdateCallback is a function signature for receiving periodic updates about +// available metrics. +type MetricUpdateCallback func(MetricUpdateResult) diff --git a/pkg/custom-provider/errors.go b/pkg/custom-provider/errors.go new file mode 100644 index 00000000..1459dd98 --- /dev/null +++ b/pkg/custom-provider/errors.go @@ -0,0 +1,27 @@ +package provider + +import "errors" + +// NewOperatorNotSupportedByPrometheusError creates an error that represents the fact that we were requested to service a query that +// Prometheus would be unable to support. +func NewOperatorNotSupportedByPrometheusError() error { + return errors.New("operator not supported by prometheus") +} + +// NewOperatorRequiresValuesError creates an error that represents the fact that we were requested to service a query +// that was malformed in its operator/value combination. +func NewOperatorRequiresValuesError() error { + return errors.New("operator requires values") +} + +// NewOperatorDoesNotSupportValuesError creates an error that represents the fact that we were requested to service a query +// that was malformed in its operator/value combination. +func NewOperatorDoesNotSupportValuesError() error { + return errors.New("operator does not support values") +} + +// NewLabelNotSpecifiedError creates an error that represents the fact that we were requested to service a query +// that was malformed in its label specification. +func NewLabelNotSpecifiedError() error { + return errors.New("label not specified") +} diff --git a/pkg/custom-provider/external_info_map.go b/pkg/custom-provider/external_info_map.go deleted file mode 100644 index 03933baf..00000000 --- a/pkg/custom-provider/external_info_map.go +++ /dev/null @@ -1,128 +0,0 @@ -package provider - -import ( - prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" - "k8s.io/apimachinery/pkg/labels" -) - -//ExportedMetric is a description of an available metric. -type ExportedMetric struct { - MetricName string - Labels labels.Set - Namespace string -} - -//ExternalInfoMap is a data object that accepts and organizes information -//about available metrics. -type ExternalInfoMap interface { - //Begins tracking a metric, returning it to the caller. - TrackMetric(metricName string, generatedBy MetricNamer) ExternalMetricData - //Exports a collection of all of the metrics currently being tracked. - ExportMetrics() []ExportedMetric - //Finds a tracked metric with the given metric name, if it exists. - FindMetric(metricName string) (data ExternalMetricData, found bool) -} - -//ExternalMetricData is a data object that accepts and organizes information -//about the various series/namespaces that a metric is associated with. -type ExternalMetricData interface { - //MetricName returns the name of the metric represented by this object. - MetricName() string - //WithSeries associates the provided labels with this metric. - WithSeries(labels labels.Set) - //WithNamespacedSeries associates the provided labels with this metric, but within a particular namespace. - WithNamespacedSeries(namespace string, labels labels.Set) - //Exports a collection of all the metrics currently being tracked. - ExportMetrics() []ExportedMetric - //Generates a query to select the series/values for the metric this object represents. - GenerateQuery(namespace string, selector labels.Selector) (prom.Selector, error) -} - -type externalInfoMap struct { - metrics map[string]ExternalMetricData -} - -type externalMetricData struct { - metricName string - namespacedData map[string][]labels.Set - generatedBy MetricNamer -} - -//NewExternalMetricData creates an ExternalMetricData for the provided metric name and namer. -func NewExternalMetricData(metricName string, generatedBy MetricNamer) ExternalMetricData { - return &externalMetricData{ - metricName: metricName, - generatedBy: generatedBy, - namespacedData: map[string][]labels.Set{}, - } -} - -//NewExternalInfoMap creates an empty ExternalInfoMap for storing external metric information. -func NewExternalInfoMap() ExternalInfoMap { - return &externalInfoMap{ - metrics: map[string]ExternalMetricData{}, - } -} - -func (i *externalInfoMap) ExportMetrics() []ExportedMetric { - results := make([]ExportedMetric, 0) - for _, info := range i.metrics { - exported := info.ExportMetrics() - results = append(results, exported...) - } - - return results -} - -func (i *externalInfoMap) FindMetric(metricName string) (data ExternalMetricData, found bool) { - data, found = i.metrics[metricName] - return data, found -} - -func (i *externalInfoMap) TrackMetric(metricName string, generatedBy MetricNamer) ExternalMetricData { - data, found := i.metrics[metricName] - if !found { - data = NewExternalMetricData(metricName, generatedBy) - i.metrics[metricName] = data - } - - return data -} - -func (d *externalMetricData) MetricName() string { - return d.metricName -} - -func (d *externalMetricData) GenerateQuery(namespace string, selector labels.Selector) (prom.Selector, error) { - return d.generatedBy.QueryForExternalSeries(namespace, d.metricName, selector) -} - -func (d *externalMetricData) ExportMetrics() []ExportedMetric { - results := make([]ExportedMetric, 0) - for namespace, labelSets := range d.namespacedData { - for _, labelSet := range labelSets { - results = append(results, ExportedMetric{ - Labels: labelSet, - MetricName: d.metricName, - Namespace: namespace, - }) - } - } - - return results -} - -func (d *externalMetricData) WithSeries(labels labels.Set) { - d.WithNamespacedSeries("", labels) -} - -func (d *externalMetricData) WithNamespacedSeries(namespace string, seriesLabels labels.Set) { - data, found := d.namespacedData[namespace] - if !found { - data = []labels.Set{} - } - - data = append(data, seriesLabels) - d.namespacedData[namespace] = data - -} diff --git a/pkg/custom-provider/external_provider.go b/pkg/custom-provider/external_provider.go index b10a98ce..ba3bcc92 100644 --- a/pkg/custom-provider/external_provider.go +++ b/pkg/custom-provider/external_provider.go @@ -2,29 +2,32 @@ package provider import ( "context" + "fmt" + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/golang/glog" pmodel "github.com/prometheus/common/model" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" + apierr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" - - conv "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider/metric-converter" ) -//TODO: AC - Make sure everything has the proper licensing disclosure at the top. +// TODO: Make sure everything has the proper licensing disclosure at the top. type externalPrometheusProvider struct { promClient prom.Client - metricConverter conv.MetricConverter + metricConverter MetricConverter seriesRegistry ExternalSeriesRegistry } -//NewExternalPrometheusProvider creates an ExternalMetricsProvider capable of responding to Kubernetes requests for external metric data. -func NewExternalPrometheusProvider(seriesRegistry ExternalSeriesRegistry, promClient prom.Client, converter conv.MetricConverter) provider.ExternalMetricsProvider { +// NewExternalPrometheusProvider creates an ExternalMetricsProvider capable of responding to Kubernetes requests for external metric data. +func NewExternalPrometheusProvider(seriesRegistry ExternalSeriesRegistry, promClient prom.Client, converter MetricConverter) provider.ExternalMetricsProvider { return &externalPrometheusProvider{ promClient: promClient, seriesRegistry: seriesRegistry, @@ -33,18 +36,23 @@ func NewExternalPrometheusProvider(seriesRegistry ExternalSeriesRegistry, promCl } func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) { - selector, found := p.seriesRegistry.QueryForMetric(namespace, metricName, metricSelector) + selector, found, err := p.seriesRegistry.QueryForMetric(namespace, metricName, metricSelector) + + if err != nil { + glog.Errorf("unable to generate a query for the metric: %v", err) + return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) + } if !found { - return &external_metrics.ExternalMetricValueList{ - Items: []external_metrics.ExternalMetricValue{}, - }, nil + return nil, provider.NewMetricNotFoundError(p.selectGroupResource(namespace), metricName) } queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector) if err != nil { - return nil, err + glog.Errorf("unable to fetch metrics from prometheus: %v", err) + // don't leak implementation details to the user + return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) } return p.metricConverter.Convert(queryResults) @@ -53,3 +61,14 @@ func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricN func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo { return p.seriesRegistry.ListAllMetrics() } + +func (p *externalPrometheusProvider) selectGroupResource(namespace string) schema.GroupResource { + if namespace == "" { + return nsGroupResource + } + + return schema.GroupResource{ + Group: "", + Resource: "", + } +} diff --git a/pkg/custom-provider/external_series_registry.go b/pkg/custom-provider/external_series_registry.go index ad1752c9..f87e1425 100644 --- a/pkg/custom-provider/external_series_registry.go +++ b/pkg/custom-provider/external_series_registry.go @@ -3,8 +3,6 @@ package provider import ( "sync" - "github.com/prometheus/common/model" - "github.com/golang/glog" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -14,126 +12,99 @@ import ( "github.com/directxman12/k8s-prometheus-adapter/pkg/config" ) -//ExternalSeriesRegistry acts as the top-level converter for transforming Kubernetes requests -//for external metrics into Prometheus queries. +// ExternalSeriesRegistry acts as the top-level converter for transforming Kubernetes requests +// for external metrics into Prometheus queries. type ExternalSeriesRegistry interface { // ListAllMetrics lists all metrics known to this registry ListAllMetrics() []provider.ExternalMetricInfo - QueryForMetric(namespace string, metricName string, metricSelector labels.Selector) (query prom.Selector, found bool) + QueryForMetric(namespace string, metricName string, metricSelector labels.Selector) (query prom.Selector, found bool, err error) } // overridableSeriesRegistry is a basic SeriesRegistry type externalSeriesRegistry struct { mu sync.RWMutex - // metrics is the list of all known metrics + // metrics is the list of all known metrics, ready to return from the API metrics []provider.ExternalMetricInfo + // rawMetrics is a lookup from a metric to SeriesConverter for the sake of generating queries + rawMetrics map[string]SeriesConverter mapper apimeta.RESTMapper - metricLister MetricListerWithNotification - externalMetricInfo ExternalInfoMap + metricLister MetricListerWithNotification } -//NewExternalSeriesRegistry creates an ExternalSeriesRegistry driven by the data from the provided MetricLister. +// NewExternalSeriesRegistry creates an ExternalSeriesRegistry driven by the data from the provided MetricLister. func NewExternalSeriesRegistry(lister MetricListerWithNotification, mapper apimeta.RESTMapper) ExternalSeriesRegistry { var registry = externalSeriesRegistry{ - mapper: mapper, - metricLister: lister, - externalMetricInfo: NewExternalInfoMap(), + mapper: mapper, + metricLister: lister, + metrics: make([]provider.ExternalMetricInfo, 0), + rawMetrics: map[string]SeriesConverter{}, } - lister.AddNotificationReceiver(registry.onNewDataAvailable) + lister.AddNotificationReceiver(registry.filterAndStoreMetrics) return ®istry } -func (r *externalSeriesRegistry) filterMetrics(result metricUpdateResult) metricUpdateResult { - namers := make([]MetricNamer, 0) +func (r *externalSeriesRegistry) filterMetrics(result MetricUpdateResult) MetricUpdateResult { + converters := make([]SeriesConverter, 0) series := make([][]prom.Series, 0) targetType := config.External - for i, namer := range result.namers { - if namer.MetricType() == targetType { - namers = append(namers, namer) + for i, converter := range result.converters { + if converter.MetricType() == targetType { + converters = append(converters, converter) series = append(series, result.series[i]) } } - return metricUpdateResult{ - namers: namers, - series: series, + return MetricUpdateResult{ + converters: converters, + series: series, } } -func (r *externalSeriesRegistry) convertLabels(labels model.LabelSet) labels.Set { - set := map[string]string{} - for key, value := range labels { - set[string(key)] = string(value) - } - return set -} - -func (r *externalSeriesRegistry) onNewDataAvailable(result metricUpdateResult) { +func (r *externalSeriesRegistry) filterAndStoreMetrics(result MetricUpdateResult) { result = r.filterMetrics(result) newSeriesSlices := result.series - namers := result.namers + converters := result.converters - // if len(newSeriesSlices) != len(namers) { - // return fmt.Errorf("need one set of series per namer") + // if len(newSeriesSlices) != len(converters) { + // return fmt.Errorf("need one set of series per converter") // } + apiMetricsCache := make([]provider.ExternalMetricInfo, 0) + rawMetricsCache := make(map[string]SeriesConverter) - updatedCache := NewExternalInfoMap() for i, newSeries := range newSeriesSlices { - namer := namers[i] + converter := converters[i] for _, series := range newSeries { - identity, err := namer.IdentifySeries(series) + identity, err := converter.IdentifySeries(series) if err != nil { glog.Errorf("unable to name series %q, skipping: %v", series.String(), err) continue } - // resources := identity.resources - // namespaced := identity.namespaced name := identity.name - labels := r.convertLabels(series.Labels) - - //Check for a label indicating namespace. - metricNs, found := series.Labels[model.LabelName(namer.ExternalMetricNamespaceLabelName())] - - if !found { - metricNs = "" - } - - trackedMetric := updatedCache.TrackMetric(name, namer) - trackedMetric.WithNamespacedSeries(string(metricNs), labels) + rawMetricsCache[name] = converter } } - // regenerate metrics - allMetrics := updatedCache.ExportMetrics() - convertedMetrics := r.convertMetrics(allMetrics) + for metricName := range rawMetricsCache { + apiMetricsCache = append(apiMetricsCache, provider.ExternalMetricInfo{ + Metric: metricName, + }) + } r.mu.Lock() defer r.mu.Unlock() - r.externalMetricInfo = updatedCache - r.metrics = convertedMetrics -} - -func (r *externalSeriesRegistry) convertMetrics(metrics []ExportedMetric) []provider.ExternalMetricInfo { - results := make([]provider.ExternalMetricInfo, len(metrics)) - for i, info := range metrics { - results[i] = provider.ExternalMetricInfo{ - Labels: info.Labels, - Metric: info.MetricName, - } - } - - return results + r.metrics = apiMetricsCache + r.rawMetrics = rawMetricsCache } func (r *externalSeriesRegistry) ListAllMetrics() []provider.ExternalMetricInfo { @@ -143,22 +114,17 @@ func (r *externalSeriesRegistry) ListAllMetrics() []provider.ExternalMetricInfo return r.metrics } -func (r *externalSeriesRegistry) QueryForMetric(namespace string, metricName string, metricSelector labels.Selector) (query prom.Selector, found bool) { +func (r *externalSeriesRegistry) QueryForMetric(namespace string, metricName string, metricSelector labels.Selector) (query prom.Selector, found bool, err error) { r.mu.RLock() defer r.mu.RUnlock() - metric, found := r.externalMetricInfo.FindMetric(metricName) + + converter, found := r.rawMetrics[metricName] if !found { - glog.V(10).Infof("external metric %q not registered", metricName) - return "", false + glog.V(10).Infof("external metric %q not found", metricName) + return "", false, nil } - query, err := metric.GenerateQuery(namespace, metricSelector) - - if err != nil { - glog.Errorf("unable to construct query for external metric %s: %v", metricName, err) - return "", false - } - - return query, true + query, err = converter.QueryForExternalSeries(namespace, metricName, metricSelector) + return query, found, err } diff --git a/pkg/custom-provider/metric-converter/matrix_converter.go b/pkg/custom-provider/metric-converter/matrix_converter.go deleted file mode 100644 index bd5f9c9b..00000000 --- a/pkg/custom-provider/metric-converter/matrix_converter.go +++ /dev/null @@ -1,37 +0,0 @@ -package provider - -import ( - "errors" - - prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" - "github.com/prometheus/common/model" - "k8s.io/metrics/pkg/apis/external_metrics" -) - -type matrixConverter struct { -} - -//NewMatrixConverter creates a MatrixConverter capable of converting -//matrix Prometheus query results into external metric types. -func NewMatrixConverter() MetricConverter { - return &matrixConverter{} -} - -func (c *matrixConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { - if queryResult.Type != model.ValMatrix { - return nil, errors.New("matrixConverter can only convert scalar query results") - } - - toConvert := queryResult.Matrix - - if toConvert == nil { - return nil, errors.New("the provided input did not contain matrix query results") - } - - return c.convert(toConvert) -} - -func (c *matrixConverter) convert(result *model.Matrix) (*external_metrics.ExternalMetricValueList, error) { - //TODO: AC - Implementation. - return nil, errors.New("converting Matrix results is not yet supported") -} diff --git a/pkg/custom-provider/metric-converter/metric_converter.go b/pkg/custom-provider/metric-converter/metric_converter.go deleted file mode 100644 index 410588b4..00000000 --- a/pkg/custom-provider/metric-converter/metric_converter.go +++ /dev/null @@ -1,52 +0,0 @@ -package provider - -import ( - "errors" - - prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" - "github.com/prometheus/common/model" - "k8s.io/metrics/pkg/apis/external_metrics" -) - -//MetricConverter provides a unified interface for converting the results of -//Prometheus queries into external metric types. -type MetricConverter interface { - Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) -} - -type metricConverter struct { - scalarConverter MetricConverter - vectorConverter MetricConverter - matrixConverter MetricConverter -} - -func DefaultMetricConverter() MetricConverter { - sampleConverter := NewSampleConverter() - return NewMetricConverter(NewScalarConverter(), NewVectorConverter(&sampleConverter), NewMatrixConverter()) -} - -//NewMetricConverter creates a MetricCoverter, capable of converting any of the three metric types -//returned by the Prometheus client into external metrics types. -func NewMetricConverter(scalar MetricConverter, vector MetricConverter, matrix MetricConverter) MetricConverter { - return &metricConverter{ - scalarConverter: scalar, - vectorConverter: vector, - matrixConverter: matrix, - } -} - -func (c *metricConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { - if queryResult.Type == model.ValScalar { - return c.scalarConverter.Convert(queryResult) - } - - if queryResult.Type == model.ValVector { - return c.vectorConverter.Convert(queryResult) - } - - if queryResult.Type == model.ValMatrix { - return c.matrixConverter.Convert(queryResult) - } - - return nil, errors.New("encountered an unexpected query result type") -} diff --git a/pkg/custom-provider/metric-converter/sample_converter.go b/pkg/custom-provider/metric-converter/sample_converter.go deleted file mode 100644 index 29444eb3..00000000 --- a/pkg/custom-provider/metric-converter/sample_converter.go +++ /dev/null @@ -1,48 +0,0 @@ -package provider - -import ( - "github.com/prometheus/common/model" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/metrics/pkg/apis/external_metrics" -) - -type sampleConverter struct { -} - -//SampleConverter is capable of translating Prometheus Sample objects -//into ExternamMetricValue objects. -type SampleConverter interface { - Convert(sample *model.Sample) (*external_metrics.ExternalMetricValue, error) -} - -//NewSampleConverter creates a SampleConverter capable of translating Prometheus Sample objects -//into ExternamMetricValue objects. -func NewSampleConverter() SampleConverter { - return &sampleConverter{} -} - -func (c *sampleConverter) Convert(sample *model.Sample) (*external_metrics.ExternalMetricValue, error) { - labels := c.convertLabels(sample.Metric) - - singleMetric := external_metrics.ExternalMetricValue{ - MetricName: string(sample.Metric[model.LabelName("__name__")]), - Timestamp: metav1.Time{ - sample.Timestamp.Time(), - }, - Value: *resource.NewMilliQuantity(int64(sample.Value*1000.0), resource.DecimalSI), - MetricLabels: labels, - } - - return &singleMetric, nil -} - -func (c *sampleConverter) convertLabels(inLabels model.Metric) map[string]string { - numLabels := len(inLabels) - outLabels := make(map[string]string, numLabels) - for labelName, labelVal := range inLabels { - outLabels[string(labelName)] = string(labelVal) - } - - return outLabels -} diff --git a/pkg/custom-provider/metric-converter/scalar_converter.go b/pkg/custom-provider/metric-converter/scalar_converter.go deleted file mode 100644 index 284f55d5..00000000 --- a/pkg/custom-provider/metric-converter/scalar_converter.go +++ /dev/null @@ -1,48 +0,0 @@ -package provider - -import ( - "errors" - - prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" - "github.com/prometheus/common/model" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/metrics/pkg/apis/external_metrics" -) - -type scalarConverter struct { -} - -//NewScalarConverter creates a ScalarConverter capable of converting -//scalar Prometheus query results into external metric types. -func NewScalarConverter() MetricConverter { - return &scalarConverter{} -} - -func (c *scalarConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { - if queryResult.Type != model.ValScalar { - return nil, errors.New("scalarConverter can only convert scalar query results") - } - - toConvert := queryResult.Scalar - - if toConvert == nil { - return nil, errors.New("the provided input did not contain scalar query results") - } - - return c.convert(toConvert) -} - -func (c *scalarConverter) convert(input *model.Scalar) (*external_metrics.ExternalMetricValueList, error) { - result := external_metrics.ExternalMetricValueList{ - Items: []external_metrics.ExternalMetricValue{ - { - Timestamp: metav1.Time{ - input.Timestamp.Time(), - }, - Value: *resource.NewMilliQuantity(int64(input.Value*1000.0), resource.DecimalSI), - }, - }, - } - return &result, nil -} diff --git a/pkg/custom-provider/metric-converter/vector_converter.go b/pkg/custom-provider/metric-converter/vector_converter.go deleted file mode 100644 index 9171d5ca..00000000 --- a/pkg/custom-provider/metric-converter/vector_converter.go +++ /dev/null @@ -1,64 +0,0 @@ -package provider - -import ( - "errors" - "fmt" - - prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" - "github.com/prometheus/common/model" - "k8s.io/metrics/pkg/apis/external_metrics" -) - -type vectorConverter struct { - SampleConverter SampleConverter -} - -//NewVectorConverter creates a VectorConverter capable of converting -//vector Prometheus query results into external metric types. -func NewVectorConverter(sampleConverter *SampleConverter) MetricConverter { - return &vectorConverter{ - SampleConverter: *sampleConverter, - } -} - -func (c *vectorConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { - if queryResult.Type != model.ValVector { - return nil, errors.New("vectorConverter can only convert scalar query results") - } - - toConvert := *queryResult.Vector - - if toConvert == nil { - return nil, errors.New("the provided input did not contain vector query results") - } - - return c.convert(toConvert) -} - -func (c *vectorConverter) convert(result model.Vector) (*external_metrics.ExternalMetricValueList, error) { - items := []external_metrics.ExternalMetricValue{} - metricValueList := external_metrics.ExternalMetricValueList{ - Items: items, - } - - numSamples := result.Len() - if numSamples == 0 { - return &metricValueList, nil - } - - for _, val := range result { - - singleMetric, err := c.SampleConverter.Convert(val) - - if err != nil { - return nil, fmt.Errorf("unable to convert vector: %v", err) - } - - items = append(items, *singleMetric) - } - - metricValueList = external_metrics.ExternalMetricValueList{ - Items: items, - } - return &metricValueList, nil -} diff --git a/pkg/custom-provider/metric_converter.go b/pkg/custom-provider/metric_converter.go new file mode 100644 index 00000000..44238f1b --- /dev/null +++ b/pkg/custom-provider/metric_converter.go @@ -0,0 +1,126 @@ +package provider + +import ( + "errors" + "fmt" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/prometheus/common/model" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +// MetricConverter provides a unified interface for converting the results of +// Prometheus queries into external metric types. +type MetricConverter interface { + Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) +} + +type metricConverter struct { +} + +// NewMetricConverter creates a MetricCoverter, capable of converting any of the three metric types +// returned by the Prometheus client into external metrics types. +func NewMetricConverter() MetricConverter { + return &metricConverter{} +} + +func (c *metricConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { + if queryResult.Type == model.ValScalar { + return c.convertScalar(queryResult) + } + + if queryResult.Type == model.ValVector { + return c.convertVector(queryResult) + } + + return nil, errors.New("encountered an unexpected query result type") +} + +func (c *metricConverter) convertSample(sample *model.Sample) (*external_metrics.ExternalMetricValue, error) { + labels := c.convertLabels(sample.Metric) + + singleMetric := external_metrics.ExternalMetricValue{ + MetricName: string(sample.Metric[model.LabelName("__name__")]), + Timestamp: metav1.Time{ + sample.Timestamp.Time(), + }, + Value: *resource.NewMilliQuantity(int64(sample.Value*1000.0), resource.DecimalSI), + MetricLabels: labels, + } + + return &singleMetric, nil +} + +func (c *metricConverter) convertLabels(inLabels model.Metric) map[string]string { + numLabels := len(inLabels) + outLabels := make(map[string]string, numLabels) + for labelName, labelVal := range inLabels { + outLabels[string(labelName)] = string(labelVal) + } + + return outLabels +} + +func (c *metricConverter) convertVector(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { + if queryResult.Type != model.ValVector { + return nil, errors.New("incorrect query result type") + } + + toConvert := *queryResult.Vector + + if toConvert == nil { + return nil, errors.New("the provided input did not contain vector query results") + } + + items := []external_metrics.ExternalMetricValue{} + metricValueList := external_metrics.ExternalMetricValueList{ + Items: items, + } + + numSamples := toConvert.Len() + if numSamples == 0 { + return &metricValueList, nil + } + + for _, val := range toConvert { + + singleMetric, err := c.convertSample(val) + + if err != nil { + return nil, fmt.Errorf("unable to convert vector: %v", err) + } + + items = append(items, *singleMetric) + } + + metricValueList = external_metrics.ExternalMetricValueList{ + Items: items, + } + return &metricValueList, nil +} + +func (c *metricConverter) convertScalar(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { + if queryResult.Type != model.ValScalar { + return nil, errors.New("scalarConverter can only convert scalar query results") + } + + toConvert := queryResult.Scalar + + if toConvert == nil { + return nil, errors.New("the provided input did not contain scalar query results") + } + + result := external_metrics.ExternalMetricValueList{ + Items: []external_metrics.ExternalMetricValue{ + { + Timestamp: metav1.Time{ + toConvert.Timestamp.Time(), + }, + Value: *resource.NewMilliQuantity(int64(toConvert.Value*1000.0), resource.DecimalSI), + }, + }, + } + return &result, nil +} diff --git a/pkg/custom-provider/metric_name_converter.go b/pkg/custom-provider/metric_name_converter.go deleted file mode 100644 index 6a3c26f6..00000000 --- a/pkg/custom-provider/metric_name_converter.go +++ /dev/null @@ -1,63 +0,0 @@ -package provider - -import ( - "fmt" - "regexp" - - prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" - "github.com/directxman12/k8s-prometheus-adapter/pkg/config" -) - -//MetricNameConverter provides functions for naming custom metrics from Promethes series. -type MetricNameConverter interface { - GetMetricNameForSeries(series prom.Series) (string, error) -} - -type metricNameConverter struct { - nameMatches *regexp.Regexp - nameAs string -} - -//NewMetricNameConverter creates a MetricNameConverter capable of translating Prometheus series names -//into custom metric names. -func NewMetricNameConverter(mapping config.NameMapping) (MetricNameConverter, error) { - var nameMatches *regexp.Regexp - var err error - if mapping.Matches != "" { - nameMatches, err = regexp.Compile(mapping.Matches) - if err != nil { - return nil, fmt.Errorf("unable to compile series name match expression %q: %v", mapping.Matches, err) - } - } else { - // this will always succeed - nameMatches = regexp.MustCompile(".*") - } - nameAs := mapping.As - if nameAs == "" { - // check if we have an obvious default - subexpNames := nameMatches.SubexpNames() - if len(subexpNames) == 1 { - // no capture groups, use the whole thing - nameAs = "$0" - } else if len(subexpNames) == 2 { - // one capture group, use that - nameAs = "$1" - } else { - return nil, fmt.Errorf("must specify an 'as' value for name matcher %q", mapping.Matches) - } - } - - return &metricNameConverter{ - nameMatches: nameMatches, - nameAs: nameAs, - }, nil -} - -func (c *metricNameConverter) GetMetricNameForSeries(series prom.Series) (string, error) { - matches := c.nameMatches.FindStringSubmatchIndex(series.Name) - if matches == nil { - return "", fmt.Errorf("series name %q did not match expected pattern %q", series.Name, c.nameMatches.String()) - } - outNameBytes := c.nameMatches.ExpandString(nil, c.nameAs, series.Name, matches) - return string(outNameBytes), nil -} diff --git a/pkg/custom-provider/metric_name_converter_test.go b/pkg/custom-provider/metric_name_converter_test.go index 8a082987..db2b4c18 100644 --- a/pkg/custom-provider/metric_name_converter_test.go +++ b/pkg/custom-provider/metric_name_converter_test.go @@ -11,8 +11,8 @@ import ( func TestWhenNoMappingMetricNameIsUnaltered(t *testing.T) { emptyMapping := config.NameMapping{} - RunTest(t, emptyMapping, "my_series", "my_series") - RunTest(t, emptyMapping, "your_series", "your_series") + runTest(t, emptyMapping, "my_series", "my_series") + runTest(t, emptyMapping, "your_series", "your_series") } func TestWhenMappingWithOneCaptureGroupMetricNameIsCorrect(t *testing.T) { @@ -21,52 +21,52 @@ func TestWhenMappingWithOneCaptureGroupMetricNameIsCorrect(t *testing.T) { As: "your_$1", } - RunTest(t, mapping, "my_requests_per_second", "your_requests_per_second") + runTest(t, mapping, "my_requests_per_second", "your_requests_per_second") } func TestWhenMappingWithMultipleCaptureGroupsMetricNameIsCorrect(t *testing.T) { - //ExpandString has some strange behavior when using the $1, $2 syntax - //Specifically, it doesn't return the expected values for templates like: - //$1_$2 - //You can work around it by using the ${1} syntax. + // ExpandString has some strange behavior when using the $1, $2 syntax + // Specifically, it doesn't return the expected values for templates like: + // $1_$2 + // You can work around it by using the ${1} syntax. mapping := config.NameMapping{ Matches: "my_([^_]+)_([^_]+)", As: "your_${1}_is_${2}_large", } - RunTest(t, mapping, "my_horse_very", "your_horse_is_very_large") - RunTest(t, mapping, "my_dog_not", "your_dog_is_not_large") + runTest(t, mapping, "my_horse_very", "your_horse_is_very_large") + runTest(t, mapping, "my_dog_not", "your_dog_is_not_large") } func TestAsCanBeInferred(t *testing.T) { - //When we've got one capture group, we should infer that as the target. + // When we've got one capture group, we should infer that as the target. mapping := config.NameMapping{ Matches: "my_(.+)", } - RunTest(t, mapping, "my_test_metric", "test_metric") + runTest(t, mapping, "my_test_metric", "test_metric") - //When we have no capture groups, we should infer that the whole thing as the target. + // When we have no capture groups, we should infer that the whole thing as the target. mapping = config.NameMapping{ Matches: "my_metric", } - RunTest(t, mapping, "my_metric", "my_metric") + runTest(t, mapping, "my_metric", "my_metric") } func TestWhenAsCannotBeInferredError(t *testing.T) { - //More than one capture group should - //result in us giving up on making an educated guess. + // More than one capture group should + // result in us giving up on making an educated guess. mapping := config.NameMapping{ Matches: "my_([^_]+)_([^_]+)", } - RunTestExpectingError(t, mapping, "my_horse_very") - RunTestExpectingError(t, mapping, "my_dog_not") + runTestExpectingError(t, mapping, "my_horse_very") + runTestExpectingError(t, mapping, "my_dog_not") } -func RunTest(t *testing.T, mapping config.NameMapping, input string, expectedResult string) { - converter, err := NewMetricNameConverter(mapping) +func runTest(t *testing.T, mapping config.NameMapping, input string, expectedResult string) { + converter, err := NewMetricNamer(mapping) require.NoError(t, err) series := prom.Series{ @@ -79,7 +79,7 @@ func RunTest(t *testing.T, mapping config.NameMapping, input string, expectedRes require.Equal(t, expectedResult, actualResult) } -func RunTestExpectingError(t *testing.T, mapping config.NameMapping, input string) { - _, err := NewMetricNameConverter(mapping) +func runTestExpectingError(t *testing.T, mapping config.NameMapping, input string) { + _, err := NewMetricNamer(mapping) require.Error(t, err) } diff --git a/pkg/custom-provider/metric_namer.go b/pkg/custom-provider/metric_namer.go index 248a9988..3fce9473 100644 --- a/pkg/custom-provider/metric_namer.go +++ b/pkg/custom-provider/metric_namer.go @@ -2,284 +2,62 @@ package provider import ( "fmt" - - apimeta "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" + "regexp" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" "github.com/directxman12/k8s-prometheus-adapter/pkg/config" ) -var nsGroupResource = schema.GroupResource{Resource: "namespaces"} - -// MetricNamer knows how to convert Prometheus series names and label names to -// metrics API resources, and vice-versa. MetricNamers should be safe to access -// concurrently. Returned group-resources are "normalized" as per the -// MetricInfo#Normalized method. Group-resources passed as arguments must -// themselves be normalized. +// MetricNamer provides functions for naming custom metrics from Promethes series. type MetricNamer interface { - // Selector produces the appropriate Prometheus series selector to match all - // series handlable by this namer. - Selector() prom.Selector - // FilterSeries checks to see which of the given series match any additional - // constrains beyond the series query. It's assumed that the series given - // already matche the series query. - // FilterSeries(series []prom.Series) []prom.Series - SeriesFilterer() SeriesFilterer - ResourceConverter() ResourceConverter - - // MetricNameForSeries returns the name (as presented in the API) for a given series. - // MetricNameForSeries(series prom.Series) (string, error) - // QueryForSeries returns the query for a given series (not API metric name), with - // the given namespace name (if relevant), resource, and resource names. - QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) - QueryForExternalSeries(namespace string, series string, metricSelector labels.Selector) (prom.Selector, error) - IdentifySeries(series prom.Series) (seriesIdentity, error) - MetricType() config.MetricType - ExternalMetricNamespaceLabelName() string -} - -type seriesIdentity struct { - resources []schema.GroupResource - namespaced bool - name string -} - -func (n *metricNamer) Selector() prom.Selector { - return n.seriesQuery + GetMetricNameForSeries(series prom.Series) (string, error) } type metricNamer struct { - seriesQuery prom.Selector - - resourceConverter ResourceConverter - queryBuilder QueryBuilder - seriesFilterer SeriesFilterer - metricNameConverter MetricNameConverter - mapper apimeta.RESTMapper - - metricType config.MetricType - externalMetricNamespaceLabel string + nameMatches *regexp.Regexp + nameAs string } -// queryTemplateArgs are the arguments for the metrics query template. -type queryTemplateArgs struct { - Series string - LabelMatchers string - LabelValuesByName map[string][]string - GroupBy string - GroupBySlice []string -} - -func (n *metricNamer) MetricType() config.MetricType { - return n.metricType -} - -func (n *metricNamer) ExternalMetricNamespaceLabelName() string { - return n.externalMetricNamespaceLabel -} - -func (n *metricNamer) IdentifySeries(series prom.Series) (seriesIdentity, error) { - // TODO: warn if it doesn't match any resources - resources, namespaced := n.resourceConverter.ResourcesForSeries(series) - name, err := n.metricNameConverter.GetMetricNameForSeries(series) - - result := seriesIdentity{ - resources: resources, - namespaced: namespaced, - name: name, - } - - return result, err -} - -func (n *metricNamer) SeriesFilterer() SeriesFilterer { - return n.seriesFilterer -} - -func (n *metricNamer) ResourceConverter() ResourceConverter { - return n.resourceConverter -} - -func (n *metricNamer) createQueryPartsFromSelector(metricSelector labels.Selector) []queryPart { - requirements, _ := metricSelector.Requirements() - - selectors := []queryPart{} - for i := 0; i < len(requirements); i++ { - selector := n.convertRequirement(requirements[i]) - - selectors = append(selectors, selector) - } - - return selectors -} - -func (n *metricNamer) convertRequirement(requirement labels.Requirement) queryPart { - labelName := requirement.Key() - values := requirement.Values().List() - - return queryPart{ - labelName: labelName, - values: values, - } -} - -type queryPart struct { - labelName string - values []string -} - -func (n *metricNamer) buildNamespaceQueryPartForSeries(namespace string) (queryPart, error) { - result := queryPart{} - - //If we've been given a namespace, then we need to set up - //the label requirements to target that namespace. - if namespace != "" { - namespaceLbl, err := n.ResourceConverter().LabelForResource(nsGroupResource) +// NewMetricNamer creates a MetricNamer capable of translating Prometheus series names +// into custom metric names. +func NewMetricNamer(mapping config.NameMapping) (MetricNamer, error) { + var nameMatches *regexp.Regexp + var err error + if mapping.Matches != "" { + nameMatches, err = regexp.Compile(mapping.Matches) if err != nil { - return result, err + return nil, fmt.Errorf("unable to compile series name match expression %q: %v", mapping.Matches, err) } - - values := []string{namespace} - - result = queryPart{ - values: values, - labelName: string(namespaceLbl), + } else { + // this will always succeed + nameMatches = regexp.MustCompile(".*") + } + nameAs := mapping.As + if nameAs == "" { + // check if we have an obvious default + subexpNames := nameMatches.SubexpNames() + if len(subexpNames) == 1 { + // no capture groups, use the whole thing + nameAs = "$0" + } else if len(subexpNames) == 2 { + // one capture group, use that + nameAs = "$1" + } else { + return nil, fmt.Errorf("must specify an 'as' value for name matcher %q", mapping.Matches) } } - return result, nil -} - -func (n *metricNamer) buildResourceQueryPartForSeries(resource schema.GroupResource, names ...string) (queryPart, error) { - result := queryPart{} - - //If we've been given a resource, then we need to set up - //the label requirements to target that resource. - resourceLbl, err := n.ResourceConverter().LabelForResource(resource) - if err != nil { - return result, err - } - - result = queryPart{ - labelName: string(resourceLbl), - values: names, - } - - return result, nil -} - -func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) { - queryParts := []queryPart{} - - //Build up the namespace part of the query. - namespaceQueryPart, err := n.buildNamespaceQueryPartForSeries(namespace) - if err != nil { - return "", err - } - - queryParts = append(queryParts, namespaceQueryPart) - - //Build up the resource part of the query. - resourceQueryPart, err := n.buildResourceQueryPartForSeries(resource, names...) - if err != nil { - return "", err - } - - queryParts = append(queryParts, resourceQueryPart) - - return n.queryBuilder.BuildSelector(series, resourceQueryPart.labelName, []string{resourceQueryPart.labelName}, queryParts) -} - -// NamersFromConfig produces a MetricNamer for each rule in the given config. -func NamersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMapper) ([]MetricNamer, error) { - namers := make([]MetricNamer, len(cfg.Rules)) - for i, rule := range cfg.Rules { - var err error - - resourceConverter, err := NewResourceConverter(rule.Resources.Template, rule.Resources.Overrides, mapper) - if err != nil { - return nil, fmt.Errorf("unable to create ResourceConverter associated with series query %q: %v", rule.SeriesQuery, err) - } - - queryBuilder, err := NewQueryBuilder(rule.MetricsQuery) - if err != nil { - return nil, fmt.Errorf("unable to create a QueryBuilder associated with series query %q: %v", rule.SeriesQuery, err) - } - - seriesFilterer, err := NewSeriesFilterer(rule.SeriesFilters) - if err != nil { - return nil, fmt.Errorf("unable to create a SeriesFilter associated with series query %q: %v", rule.SeriesQuery, err) - } - - if rule.Name.Matches != "" { - err := seriesFilterer.AddRequirement(config.RegexFilter{Is: rule.Name.Matches}) - if err != nil { - return nil, fmt.Errorf("unable to apply the series name filter from name rules associated with series query %q: %v", rule.SeriesQuery, err) - } - } - - metricNameConverter, err := NewMetricNameConverter(rule.Name) - if err != nil { - return nil, fmt.Errorf("unable to create a MetricNameConverter associated with series query %q: %v", rule.SeriesQuery, err) - } - - namespaceLabel := "" - if rule.MetricType == config.External { - namespaceLabel = rule.ExternalMetricNamespaceLabelName - } - - metricType := rule.MetricType - if metricType == config.MetricType("") { - metricType = config.Custom - } - - namer := &metricNamer{ - seriesQuery: prom.Selector(rule.SeriesQuery), - mapper: mapper, - - resourceConverter: resourceConverter, - queryBuilder: queryBuilder, - seriesFilterer: seriesFilterer, - metricNameConverter: metricNameConverter, - metricType: metricType, - externalMetricNamespaceLabel: namespaceLabel, - } - - namers[i] = namer - } - - return namers, nil -} - -func (n *metricNamer) buildNamespaceQueryPartForExternalSeries(namespace string) (queryPart, error) { - return queryPart{ - labelName: n.externalMetricNamespaceLabel, - values: []string{namespace}, + return &metricNamer{ + nameMatches: nameMatches, + nameAs: nameAs, }, nil } -func (n *metricNamer) QueryForExternalSeries(namespace string, series string, metricSelector labels.Selector) (prom.Selector, error) { - queryParts := []queryPart{} - - if namespace != "" { - //Build up the namespace part of the query. - namespaceQueryPart, err := n.buildNamespaceQueryPartForExternalSeries(namespace) - if err != nil { - return "", err - } - - queryParts = append(queryParts, namespaceQueryPart) +func (c *metricNamer) GetMetricNameForSeries(series prom.Series) (string, error) { + matches := c.nameMatches.FindStringSubmatchIndex(series.Name) + if matches == nil { + return "", fmt.Errorf("series name %q did not match expected pattern %q", series.Name, c.nameMatches.String()) } - - //Build up the query parts from the selector. - queryParts = append(queryParts, n.createQueryPartsFromSelector(metricSelector)...) - - selector, err := n.queryBuilder.BuildSelector(series, "", []string{}, queryParts) - if err != nil { - return "", err - } - - return selector, nil + outNameBytes := c.nameMatches.ExpandString(nil, c.nameAs, series.Name, matches) + return string(outNameBytes), nil } diff --git a/pkg/custom-provider/periodic_metric_lister.go b/pkg/custom-provider/periodic_metric_lister.go index 2100865b..5a719490 100644 --- a/pkg/custom-provider/periodic_metric_lister.go +++ b/pkg/custom-provider/periodic_metric_lister.go @@ -26,27 +26,27 @@ import ( type periodicMetricLister struct { realLister MetricLister updateInterval time.Duration - mostRecentResult metricUpdateResult - callbacks []func(metricUpdateResult) + mostRecentResult MetricUpdateResult + callbacks []MetricUpdateCallback } -//NewPeriodicMetricLister creates a MetricLister that periodically pulls the list of available metrics -//at the provided interval, but defers the actual act of retrieving the metrics to the supplied MetricLister. +// NewPeriodicMetricLister creates a MetricLister that periodically pulls the list of available metrics +// at the provided interval, but defers the actual act of retrieving the metrics to the supplied MetricLister. func NewPeriodicMetricLister(realLister MetricLister, updateInterval time.Duration) (MetricListerWithNotification, Runnable) { lister := periodicMetricLister{ updateInterval: updateInterval, realLister: realLister, - callbacks: make([]func(metricUpdateResult), 0), + callbacks: make([]MetricUpdateCallback, 0), } return &lister, &lister } -func (l *periodicMetricLister) AddNotificationReceiver(callback func(metricUpdateResult)) { +func (l *periodicMetricLister) AddNotificationReceiver(callback MetricUpdateCallback) { l.callbacks = append(l.callbacks, callback) } -func (l *periodicMetricLister) ListAllMetrics() (metricUpdateResult, error) { +func (l *periodicMetricLister) ListAllMetrics() (MetricUpdateResult, error) { return l.mostRecentResult, nil } diff --git a/pkg/custom-provider/periodic_metric_lister_test.go b/pkg/custom-provider/periodic_metric_lister_test.go index 2aa1b098..e26d208c 100644 --- a/pkg/custom-provider/periodic_metric_lister_test.go +++ b/pkg/custom-provider/periodic_metric_lister_test.go @@ -12,10 +12,10 @@ type fakeLister struct { callCount int } -func (f *fakeLister) ListAllMetrics() (metricUpdateResult, error) { +func (f *fakeLister) ListAllMetrics() (MetricUpdateResult, error) { f.callCount++ - return metricUpdateResult{ + return MetricUpdateResult{ series: [][]prom.Series{ []prom.Series{ prom.Series{ @@ -32,7 +32,7 @@ func TestWhenNewMetricsAvailableCallbackIsInvoked(t *testing.T) { periodicLister := targetLister.(*periodicMetricLister) callbackInvoked := false - callback := func(r metricUpdateResult) { + callback := func(r MetricUpdateResult) { callbackInvoked = true } @@ -47,21 +47,21 @@ func TestWhenListingMetricsReturnsCachedValues(t *testing.T) { targetLister, _ := NewPeriodicMetricLister(fakeLister, time.Duration(1000)) periodicLister := targetLister.(*periodicMetricLister) - //We haven't invoked the inner lister yet, so we should have no results. + // We haven't invoked the inner lister yet, so we should have no results. resultBeforeUpdate, err := periodicLister.ListAllMetrics() require.NoError(t, err) require.Equal(t, 0, len(resultBeforeUpdate.series)) require.Equal(t, 0, fakeLister.callCount) - //We can simulate waiting for the udpate interval to pass... - //which should result in calling the inner lister to get the metrics. + // We can simulate waiting for the udpate interval to pass... + // which should result in calling the inner lister to get the metrics. err = periodicLister.updateMetrics() require.NoError(t, err) require.Equal(t, 1, fakeLister.callCount) - //If we list now, we should return the cached values. - //Make sure we got some results this time - //as well as that we didn't unnecessarily invoke the inner lister. + // If we list now, we should return the cached values. + // Make sure we got some results this time + // as well as that we didn't unnecessarily invoke the inner lister. resultAfterUpdate, err := periodicLister.ListAllMetrics() require.NoError(t, err) require.NotEqual(t, 0, len(resultAfterUpdate.series)) diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index 15ec3cec..a22de1e2 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -46,10 +46,10 @@ type prometheusProvider struct { SeriesRegistry } -//NewPrometheusProvider creates an CustomMetricsProvider capable of responding to Kubernetes requests for custom metric data. -func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration) (provider.CustomMetricsProvider, Runnable) { - //TODO: AC - Consider injecting these objects and calling .Run() before calling this function. - basicLister := NewBasicMetricLister(promClient, namers, updateInterval) +// NewPrometheusProvider creates an CustomMetricsProvider capable of responding to Kubernetes requests for custom metric data. +func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, converters []SeriesConverter, updateInterval time.Duration) (provider.CustomMetricsProvider, Runnable) { + // TODO: Consider injecting these objects and calling .Run() on the runnables before calling this function. + basicLister := NewBasicMetricLister(promClient, converters, updateInterval) periodicLister, _ := NewPeriodicMetricLister(basicLister, updateInterval) seriesRegistry := NewBasicSeriesRegistry(periodicLister, mapper) diff --git a/pkg/custom-provider/provider_test.go b/pkg/custom-provider/provider_test.go index 2ef6361d..d050ab4a 100644 --- a/pkg/custom-provider/provider_test.go +++ b/pkg/custom-provider/provider_test.go @@ -92,10 +92,10 @@ func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fak fakeKubeClient := &fakedyn.FakeDynamicClient{} cfg := config.DefaultConfig(1*time.Minute, "") - namers, err := NamersFromConfig(cfg, restMapper()) + converters, err := ConvertersFromConfig(cfg, restMapper()) require.NoError(t, err) - prov, _ := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, namers, fakeProviderUpdateInterval) + prov, _ := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, converters, fakeProviderUpdateInterval) containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) diff --git a/pkg/custom-provider/query_builder.go b/pkg/custom-provider/query_builder.go index ebaa8037..50481c6d 100644 --- a/pkg/custom-provider/query_builder.go +++ b/pkg/custom-provider/query_builder.go @@ -2,14 +2,17 @@ package provider import ( "bytes" + "errors" "fmt" "strings" "text/template" + "k8s.io/apimachinery/pkg/selection" + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" ) -//QueryBuilder provides functions for generating Prometheus queries. +// QueryBuilder provides functions for generating Prometheus queries. type QueryBuilder interface { BuildSelector(seriesName string, groupBy string, groupBySlice []string, queryParts []queryPart) (prom.Selector, error) } @@ -18,7 +21,7 @@ type queryBuilder struct { metricsQueryTemplate *template.Template } -//NewQueryBuilder creates a QueryBuilder. +// NewQueryBuilder creates a QueryBuilder. func NewQueryBuilder(metricsQuery string) (QueryBuilder, error) { metricsQueryTemplate, err := template.New("metrics-query").Delims("<<", ">>").Parse(metricsQuery) if err != nil { @@ -32,7 +35,11 @@ func NewQueryBuilder(metricsQuery string) (QueryBuilder, error) { func (n *queryBuilder) BuildSelector(seriesName string, groupBy string, groupBySlice []string, queryParts []queryPart) (prom.Selector, error) { //Convert our query parts into the types we need for our template. - exprs, valuesByName := n.processQueryParts(queryParts) + exprs, valuesByName, err := n.processQueryParts(queryParts) + + if err != nil { + return "", err + } args := queryTemplateArgs{ Series: seriesName, @@ -64,7 +71,12 @@ func (n *queryBuilder) createSelectorFromTemplateArgs(args queryTemplateArgs) (p return prom.Selector(queryBuff.String()), nil } -func (n *queryBuilder) processQueryParts(queryParts []queryPart) ([]string, map[string][]string) { +func (n *queryBuilder) processQueryParts(queryParts []queryPart) ([]string, map[string][]string, error) { + //We've take the approach here that if we can't perfectly map their query into a Prometheus + //query that we should abandon the effort completely. + //The concern is that if we don't get a perfect match on their query parameters, the query result + //might contain unexpected data that would cause them to take an erroneous action based on the result. + //Contains the expressions that we want to include as part of the query to Prometheus. //e.g. "namespace=my-namespace" //e.g. "some_label=some-value" @@ -78,15 +90,23 @@ func (n *queryBuilder) processQueryParts(queryParts []queryPart) ([]string, map[ for _, qPart := range queryParts { //Be resilient against bad inputs. //We obviously can't generate label filters for these cases. - if qPart.labelName == "" || len(qPart.values) == 0 { - continue + if qPart.labelName == "" { + return nil, nil, NewLabelNotSpecifiedError() } - targetValue := qPart.values[0] - matcher := prom.LabelEq - if len(qPart.values) > 1 { - targetValue = strings.Join(qPart.values, "|") - matcher = prom.LabelMatches + if !n.operatorIsSupported(qPart.operator) { + return nil, nil, NewOperatorNotSupportedByPrometheusError() + } + + matcher, err := n.selectMatcher(qPart.operator, qPart.values) + + if err != nil { + return nil, nil, err + } + + targetValue, err := n.selectTargetValue(qPart.operator, qPart.values) + if err != nil { + return nil, nil, err } expression := matcher(qPart.labelName, targetValue) @@ -94,5 +114,92 @@ func (n *queryBuilder) processQueryParts(queryParts []queryPart) ([]string, map[ valuesByName[qPart.labelName] = qPart.values } - return exprs, valuesByName + return exprs, valuesByName, nil +} + +func (n *queryBuilder) selectMatcher(operator selection.Operator, values []string) (func(string, string) string, error) { + + numValues := len(values) + if numValues == 0 { + switch operator { + case selection.Exists: + return prom.LabelMatches, nil + case selection.DoesNotExist: + return prom.LabelNotMatches, nil + case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.In, selection.NotIn: + return nil, NewOperatorRequiresValuesError() + } + } else if numValues == 1 { + switch operator { + case selection.Equals, selection.DoubleEquals: + return prom.LabelEq, nil + case selection.NotEquals: + return prom.LabelNeq, nil + case selection.In, selection.Exists: + return prom.LabelMatches, nil + case selection.DoesNotExist, selection.NotIn: + return prom.LabelNotMatches, nil + } + } else { + //Since labels can only have one value, providing multiple + //values results in a regex match, even if that's not what the user + //asked for. + switch operator { + case selection.Equals, selection.DoubleEquals, selection.In, selection.Exists: + return prom.LabelMatches, nil + case selection.NotEquals, selection.DoesNotExist, selection.NotIn: + return prom.LabelNotMatches, nil + } + } + + return nil, errors.New("operator not supported by query builder") +} + +func (n *queryBuilder) selectTargetValue(operator selection.Operator, values []string) (string, error) { + numValues := len(values) + if numValues == 0 { + switch operator { + case selection.Exists, selection.DoesNotExist: + //Regex for any non-empty string. + //When the operator is LabelNotMatches this will select series without the label + //or with the label but a value of "". + //When the operator is LabelMatches this will select series with the label + //whose value is NOT "". + return ".+", nil + case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.In, selection.NotIn: + return "", NewOperatorRequiresValuesError() + } + } else if numValues == 1 { + switch operator { + case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.In, selection.NotIn: + //Pass the value through as-is. + //It's somewhat strange to do this for both the regex and equality + //operators, but if we do it this way it gives the user a little more control. + //They might choose to send an "IN" request and give a list of static values + //or they could send a single value that's a regex, giving them a passthrough + //for their label selector. + return values[0], nil + case selection.Exists, selection.DoesNotExist: + return "", errors.New("operator does not support values") + } + } else { + switch operator { + case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.In, selection.NotIn: + //Pass the value through as-is. + //It's somewhat strange to do this for both the regex and equality + //operators, but if we do it this way it gives the user a little more control. + //They might choose to send an "IN" request and give a list of static values + //or they could send a single value that's a regex, giving them a passthrough + //for their label selector. + return strings.Join(values, "|"), nil + case selection.Exists, selection.DoesNotExist: + return "", NewOperatorDoesNotSupportValuesError() + } + } + + return "", errors.New("operator not supported by query builder") +} + +func (n *queryBuilder) operatorIsSupported(operator selection.Operator) bool { + return operator != selection.GreaterThan && operator != selection.LessThan } diff --git a/pkg/custom-provider/query_builder_test.go b/pkg/custom-provider/query_builder_test.go index 308794c7..e9bc25cc 100644 --- a/pkg/custom-provider/query_builder_test.go +++ b/pkg/custom-provider/query_builder_test.go @@ -3,13 +3,15 @@ package provider import ( "testing" + "k8s.io/apimachinery/pkg/selection" + "github.com/directxman12/k8s-prometheus-adapter/pkg/client" "github.com/stretchr/testify/require" ) -func TestBadQueryPartsDontError(t *testing.T) { +func TestBadQueryPartsDontBuildQueries(t *testing.T) { builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])") - selector, err := builder.BuildSelector("my_series", "", []string{}, []queryPart{ + _, err := builder.BuildSelector("my_series", "", []string{}, []queryPart{ queryPart{ labelName: "", values: nil, @@ -20,49 +22,305 @@ func TestBadQueryPartsDontError(t *testing.T) { }, }) - expectation := client.Selector("rate(my_series{}[2m])") - require.NoError(t, err) - require.Equal(t, selector, expectation) + require.Error(t, err) +} + +func runQueryBuilderTest(t *testing.T, queryParts []queryPart, expectation string) { + builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])") + selector, err := builder.BuildSelector("my_series", "", []string{}, queryParts) + + expectError := expectation == "" + + if expectError { + require.Error(t, err) + } else { + selectorExpectation := client.Selector(expectation) + require.NoError(t, err) + require.Equal(t, selector, selectorExpectation) + } } func TestSimpleQuery(t *testing.T) { - builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])") - - // builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>,static_label!=\"static_value\"}[2m])) by (<<.GroupBy>>)") - selector, _ := builder.BuildSelector("my_series", "", []string{}, []queryPart{}) - - expectation := client.Selector("rate(my_series{}[2m])") - require.Equal(t, selector, expectation) + runQueryBuilderTest(t, []queryPart{}, "") } -func TestSimpleQueryWithOneLabelValue(t *testing.T) { - builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])") +//Equals +func TestEqualsQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{}, + operator: selection.Equals, + }, + }, "") +} - // builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>,static_label!=\"static_value\"}[2m])) by (<<.GroupBy>>)") - selector, _ := builder.BuildSelector("my_series", "", []string{}, []queryPart{ +func TestEqualsQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ queryPart{ labelName: "target_label", values: []string{"one"}, + operator: selection.Equals, }, - }) - - expectation := client.Selector("rate(my_series{target_label=\"one\"}[2m])") - require.Equal(t, selector, expectation) + }, "rate(my_series{target_label=\"one\"}[2m])") } -func TestSimpleQueryWithMultipleLabelValues(t *testing.T) { - builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])") - - // builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>,static_label!=\"static_value\"}[2m])) by (<<.GroupBy>>)") - selector, _ := builder.BuildSelector("my_series", "", []string{}, []queryPart{ +func TestEqualsQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ queryPart{ labelName: "target_label", values: []string{"one", "two"}, + operator: selection.Equals, }, - }) + }, "rate(my_series{target_label=~\"one|two\"}[2m])") +} - expectation := client.Selector("rate(my_series{target_label=~\"one|two\"}[2m])") - require.Equal(t, selector, expectation) +//Double Equals +func TestDoubleEqualsQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{}, + operator: selection.DoubleEquals, + }, + }, "") +} + +func TestDoubleEqualsQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one"}, + operator: selection.DoubleEquals, + }, + }, "rate(my_series{target_label=\"one\"}[2m])") +} + +func TestDoubleEqualsQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.DoubleEquals, + }, + }, "rate(my_series{target_label=~\"one|two\"}[2m])") +} + +//Not Equals +func TestNotEqualsQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{}, + operator: selection.NotEquals, + }, + }, "") +} + +func TestNotEqualsQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one"}, + operator: selection.NotEquals, + }, + }, "rate(my_series{target_label!=\"one\"}[2m])") +} + +func TestNotEqualsQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.NotEquals, + }, + }, "rate(my_series{target_label!~\"one|two\"}[2m])") +} + +//In +func TestInQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{}, + operator: selection.In, + }, + }, "") +} + +func TestInQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one"}, + operator: selection.In, + }, + }, "rate(my_series{target_label=~\"one\"}[2m])") +} + +func TestInQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.In, + }, + }, "rate(my_series{target_label=~\"one|two\"}[2m])") +} + +//NotIn +func TestNotInQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{}, + operator: selection.NotIn, + }, + }, "") +} + +func TestNotInQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one"}, + operator: selection.NotIn, + }, + }, "rate(my_series{target_label!~\"one\"}[2m])") +} + +func TestNotInQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.NotIn, + }, + }, "rate(my_series{target_label!~\"one|two\"}[2m])") +} + +//Exists +func TestExistsQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{}, + operator: selection.Exists, + }, + }, "rate(my_series{target_label=~\".+\"}[2m])") +} + +func TestExistsQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one"}, + operator: selection.Exists, + }, + }, "") +} + +func TestExistsQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.Exists, + }, + }, "") +} + +//DoesNotExist +func TestDoesNotExistsQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{}, + operator: selection.DoesNotExist, + }, + }, "rate(my_series{target_label!~\".+\"}[2m])") +} + +func TestDoesNotExistsQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one"}, + operator: selection.DoesNotExist, + }, + }, "") +} + +func TestDoesNotExistsQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.DoesNotExist, + }, + }, "") +} + +//GreaterThan +func TestGreaterThanQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{}, + operator: selection.GreaterThan, + }, + }, "") +} + +func TestGreaterThanQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one"}, + operator: selection.GreaterThan, + }, + }, "") +} + +func TestGreaterThanQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.GreaterThan, + }, + }, "") +} + +//LessThan +func TestLessThanQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{}, + operator: selection.LessThan, + }, + }, "") +} + +func TestLessThanQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one"}, + operator: selection.LessThan, + }, + }, "") +} + +func TestLessThanQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.LessThan, + }, + }, "") } func TestQueryWithGroupBy(t *testing.T) { @@ -72,6 +330,7 @@ func TestQueryWithGroupBy(t *testing.T) { queryPart{ labelName: "target_label", values: []string{"one", "two"}, + operator: selection.In, }, }) @@ -79,4 +338,4 @@ func TestQueryWithGroupBy(t *testing.T) { require.Equal(t, selector, expectation) } -//TODO: AC - Ensure that the LabelValuesByName and GroupBySlice placeholders function correctly. +// TODO: Ensure that the LabelValuesByName and GroupBySlice placeholders function correctly. diff --git a/pkg/custom-provider/resource_converter.go b/pkg/custom-provider/resource_converter.go index 6008ac02..92aa9a75 100644 --- a/pkg/custom-provider/resource_converter.go +++ b/pkg/custom-provider/resource_converter.go @@ -17,9 +17,9 @@ import ( pmodel "github.com/prometheus/common/model" ) -//ResourceConverter is a type for extracting associated Kubernetes GroupResource objects from -//Prometheus series and generating appropriate labels to target specific Kubernetes GroupResource -//objects. +// ResourceConverter is a type for extracting associated Kubernetes GroupResource objects from +// Prometheus series and generating appropriate labels to target specific Kubernetes GroupResource +// objects. type ResourceConverter interface { // ResourcesForSeries returns the group-resources associated with the given series, // as well as whether or not the given series has the "namespace" resource). @@ -37,7 +37,7 @@ type resourceConverter struct { labelTemplate *template.Template } -//NewResourceConverter creates a ResourceConverter that will use the provided parameters to map data between Prometheus and Kubernetes. +// NewResourceConverter creates a ResourceConverter that will use the provided parameters to map data between Prometheus and Kubernetes. func NewResourceConverter(resourceTemplate string, overrides map[string]config.GroupResource, mapper apimeta.RESTMapper) (ResourceConverter, error) { converter := &resourceConverter{ labelToResource: make(map[pmodel.LabelName]schema.GroupResource), diff --git a/pkg/custom-provider/series_converter.go b/pkg/custom-provider/series_converter.go new file mode 100644 index 00000000..e9434836 --- /dev/null +++ b/pkg/custom-provider/series_converter.go @@ -0,0 +1,295 @@ +package provider + +import ( + "fmt" + + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/selection" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" +) + +var nsGroupResource = schema.GroupResource{Resource: "namespaces"} + +// SeriesConverter knows how to convert Prometheus series names and label names to +// metrics API resources, and vice-versa. SeriesConverters should be safe to access +// concurrently. Returned group-resources are "normalized" as per the +// MetricInfo#Normalized method. Group-resources passed as arguments must +// themselves be normalized. +type SeriesConverter interface { + // Selector produces the appropriate Prometheus series selector to match all + // series handlable by this converter. + Selector() prom.Selector + // FilterSeries checks to see which of the given series match any additional + // constrains beyond the series query. It's assumed that the series given + // already matche the series query. + // FilterSeries(series []prom.Series) []prom.Series + SeriesFilterer() SeriesFilterer + ResourceConverter() ResourceConverter + + // MetricNameForSeries returns the name (as presented in the API) for a given series. + // MetricNameForSeries(series prom.Series) (string, error) + // QueryForSeries returns the query for a given series (not API metric name), with + // the given namespace name (if relevant), resource, and resource names. + QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) + QueryForExternalSeries(namespace string, series string, metricSelector labels.Selector) (prom.Selector, error) + IdentifySeries(series prom.Series) (seriesIdentity, error) + MetricType() config.MetricType + ExternalMetricNamespaceLabelName() string +} + +type seriesIdentity struct { + resources []schema.GroupResource + namespaced bool + name string +} + +func (c *seriesConverter) Selector() prom.Selector { + return c.seriesQuery +} + +type seriesConverter struct { + seriesQuery prom.Selector + + resourceConverter ResourceConverter + queryBuilder QueryBuilder + seriesFilterer SeriesFilterer + metricNamer MetricNamer + mapper apimeta.RESTMapper + + metricType config.MetricType + externalMetricNamespaceLabel string +} + +// queryTemplateArgs are the arguments for the metrics query template. +type queryTemplateArgs struct { + Series string + LabelMatchers string + LabelValuesByName map[string][]string + GroupBy string + GroupBySlice []string +} + +func (c *seriesConverter) MetricType() config.MetricType { + return c.metricType +} + +func (c *seriesConverter) ExternalMetricNamespaceLabelName() string { + return c.externalMetricNamespaceLabel +} + +func (c *seriesConverter) IdentifySeries(series prom.Series) (seriesIdentity, error) { + // TODO: warn if it doesn't match any resources + resources, namespaced := c.resourceConverter.ResourcesForSeries(series) + name, err := c.metricNamer.GetMetricNameForSeries(series) + + result := seriesIdentity{ + resources: resources, + namespaced: namespaced, + name: name, + } + + return result, err +} + +func (c *seriesConverter) SeriesFilterer() SeriesFilterer { + return c.seriesFilterer +} + +func (c *seriesConverter) ResourceConverter() ResourceConverter { + return c.resourceConverter +} + +func (c *seriesConverter) createQueryPartsFromSelector(metricSelector labels.Selector) []queryPart { + requirements, _ := metricSelector.Requirements() + + selectors := []queryPart{} + for i := 0; i < len(requirements); i++ { + selector := c.convertRequirement(requirements[i]) + + selectors = append(selectors, selector) + } + + return selectors +} + +func (c *seriesConverter) convertRequirement(requirement labels.Requirement) queryPart { + labelName := requirement.Key() + values := requirement.Values().List() + + return queryPart{ + labelName: labelName, + values: values, + operator: requirement.Operator(), + } +} + +type queryPart struct { + labelName string + values []string + operator selection.Operator +} + +func (c *seriesConverter) buildNamespaceQueryPartForSeries(namespace string) (queryPart, error) { + result := queryPart{} + + // If we've been given a namespace, then we need to set up + // the label requirements to target that namespace. + if namespace != "" { + namespaceLbl, err := c.resourceConverter.LabelForResource(nsGroupResource) + if err != nil { + return result, err + } + + values := []string{namespace} + + result = queryPart{ + values: values, + labelName: string(namespaceLbl), + operator: selection.Equals, + } + } + + return result, nil +} + +func (c *seriesConverter) buildResourceQueryPartForSeries(resource schema.GroupResource, names ...string) (queryPart, error) { + result := queryPart{} + + // If we've been given a resource, then we need to set up + // the label requirements to target that resource. + resourceLbl, err := c.resourceConverter.LabelForResource(resource) + if err != nil { + return result, err + } + + result = queryPart{ + labelName: string(resourceLbl), + values: names, + operator: selection.Equals, + } + + return result, nil +} + +func (c *seriesConverter) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) { + queryParts := []queryPart{} + + // Build up the namespace part of the query. + namespaceQueryPart, err := c.buildNamespaceQueryPartForSeries(namespace) + if err != nil { + return "", err + } + + if namespaceQueryPart.labelName != "" { + queryParts = append(queryParts, namespaceQueryPart) + } + + // Build up the resource part of the query. + resourceQueryPart, err := c.buildResourceQueryPartForSeries(resource, names...) + if err != nil { + return "", err + } + + if resourceQueryPart.labelName != "" { + queryParts = append(queryParts, resourceQueryPart) + } + + return c.queryBuilder.BuildSelector(series, resourceQueryPart.labelName, []string{resourceQueryPart.labelName}, queryParts) +} + +// ConvertersFromConfig produces a MetricNamer for each rule in the given config. +func ConvertersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMapper) ([]SeriesConverter, error) { + converters := make([]SeriesConverter, len(cfg.Rules)) + for i, rule := range cfg.Rules { + var err error + + resourceConverter, err := NewResourceConverter(rule.Resources.Template, rule.Resources.Overrides, mapper) + if err != nil { + return nil, fmt.Errorf("unable to create ResourceConverter associated with series query %q: %v", rule.SeriesQuery, err) + } + + queryBuilder, err := NewQueryBuilder(rule.MetricsQuery) + if err != nil { + return nil, fmt.Errorf("unable to create a QueryBuilder associated with series query %q: %v", rule.SeriesQuery, err) + } + + seriesFilterer, err := NewSeriesFilterer(rule.SeriesFilters) + if err != nil { + return nil, fmt.Errorf("unable to create a SeriesFilter associated with series query %q: %v", rule.SeriesQuery, err) + } + + if rule.Name.Matches != "" { + err := seriesFilterer.AddRequirement(config.RegexFilter{Is: rule.Name.Matches}) + if err != nil { + return nil, fmt.Errorf("unable to apply the series name filter from name rules associated with series query %q: %v", rule.SeriesQuery, err) + } + } + + metricNamer, err := NewMetricNamer(rule.Name) + if err != nil { + return nil, fmt.Errorf("unable to create a MetricNamer associated with series query %q: %v", rule.SeriesQuery, err) + } + + namespaceLabel := "" + if rule.MetricType == config.External { + namespaceLabel = rule.ExternalMetricNamespaceLabelName + } + + metricType := rule.MetricType + if metricType == config.MetricType("") { + metricType = config.Custom + } + + converter := &seriesConverter{ + seriesQuery: prom.Selector(rule.SeriesQuery), + mapper: mapper, + + resourceConverter: resourceConverter, + queryBuilder: queryBuilder, + seriesFilterer: seriesFilterer, + metricNamer: metricNamer, + metricType: metricType, + externalMetricNamespaceLabel: namespaceLabel, + } + + converters[i] = converter + } + + return converters, nil +} + +func (c *seriesConverter) buildNamespaceQueryPartForExternalSeries(namespace string) (queryPart, error) { + return queryPart{ + labelName: c.externalMetricNamespaceLabel, + values: []string{namespace}, + operator: selection.Equals, + }, nil +} + +func (c *seriesConverter) QueryForExternalSeries(namespace string, series string, metricSelector labels.Selector) (prom.Selector, error) { + queryParts := []queryPart{} + + if namespace != "" { + // Build up the namespace part of the query. + namespaceQueryPart, err := c.buildNamespaceQueryPartForExternalSeries(namespace) + if err != nil { + return "", err + } + + queryParts = append(queryParts, namespaceQueryPart) + } + + // Build up the query parts from the selector. + queryParts = append(queryParts, c.createQueryPartsFromSelector(metricSelector)...) + + selector, err := c.queryBuilder.BuildSelector(series, "", []string{}, queryParts) + if err != nil { + return "", err + } + + return selector, nil +} diff --git a/pkg/custom-provider/series_filterer.go b/pkg/custom-provider/series_filterer.go index 303b572d..153dd6eb 100644 --- a/pkg/custom-provider/series_filterer.go +++ b/pkg/custom-provider/series_filterer.go @@ -7,8 +7,8 @@ import ( "github.com/directxman12/k8s-prometheus-adapter/pkg/config" ) -//SeriesFilterer provides functions for filtering collections of Prometheus series -//to only those that meet certain requirements. +// SeriesFilterer provides functions for filtering collections of Prometheus series +// to only those that meet certain requirements. type SeriesFilterer interface { FilterSeries(series []prom.Series) []prom.Series AddRequirement(filter config.RegexFilter) error @@ -18,8 +18,8 @@ type seriesFilterer struct { seriesMatchers []*reMatcher } -//NewSeriesFilterer creates a SeriesFilterer that will remove any series that do not -//meet the requirements of the provided RegexFilter(s). +// NewSeriesFilterer creates a SeriesFilterer that will remove any series that do not +// meet the requirements of the provided RegexFilter(s). func NewSeriesFilterer(filters []config.RegexFilter) (SeriesFilterer, error) { seriesMatchers := make([]*reMatcher, len(filters)) for i, filterRaw := range filters { diff --git a/pkg/custom-provider/series_filterer_test.go b/pkg/custom-provider/series_filterer_test.go index 57db97ac..7c8aa0a1 100644 --- a/pkg/custom-provider/series_filterer_test.go +++ b/pkg/custom-provider/series_filterer_test.go @@ -60,12 +60,12 @@ func TestAddRequirementAppliesFilter(t *testing.T) { filterer, err := NewSeriesFilterer(filters) require.NoError(t, err) - //Test it once with the default filters. + // Test it once with the default filters. result := filterer.FilterSeries(series) expectedSeries := []string{"series_1", "series_2", "series_3"} VerifyMatches(t, result, expectedSeries) - //Add a new filter and test again. + // Add a new filter and test again. filterer.AddRequirement(config.RegexFilter{ Is: "series_[2-3]", }) diff --git a/pkg/custom-provider/series_registry.go b/pkg/custom-provider/series_registry.go index 77b58726..04b773d0 100644 --- a/pkg/custom-provider/series_registry.go +++ b/pkg/custom-provider/series_registry.go @@ -56,8 +56,8 @@ type seriesInfo struct { // seriesName is the name of the corresponding Prometheus series seriesName string - // namer is the MetricNamer used to name this series - namer MetricNamer + // converter is the SeriesConverter used to name this series + converter SeriesConverter } // overridableSeriesRegistry is a basic SeriesRegistry @@ -75,52 +75,52 @@ type basicSeriesRegistry struct { metricLister MetricListerWithNotification } -//NewBasicSeriesRegistry creates a SeriesRegistry driven by the data from the provided MetricLister. +// NewBasicSeriesRegistry creates a SeriesRegistry driven by the data from the provided MetricLister. func NewBasicSeriesRegistry(lister MetricListerWithNotification, mapper apimeta.RESTMapper) SeriesRegistry { var registry = basicSeriesRegistry{ mapper: mapper, metricLister: lister, } - lister.AddNotificationReceiver(registry.onNewDataAvailable) + lister.AddNotificationReceiver(registry.filterAndStoreMetrics) return ®istry } -func (r *basicSeriesRegistry) filterMetrics(result metricUpdateResult) metricUpdateResult { - namers := make([]MetricNamer, 0) +func (r *basicSeriesRegistry) filterMetrics(result MetricUpdateResult) MetricUpdateResult { + converters := make([]SeriesConverter, 0) series := make([][]prom.Series, 0) targetType := config.Custom - for i, namer := range result.namers { - if namer.MetricType() == targetType { - namers = append(namers, namer) + for i, converter := range result.converters { + if converter.MetricType() == targetType { + converters = append(converters, converter) series = append(series, result.series[i]) } } - return metricUpdateResult{ - namers: namers, - series: series, + return MetricUpdateResult{ + converters: converters, + series: series, } } -func (r *basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) { +func (r *basicSeriesRegistry) filterAndStoreMetrics(result MetricUpdateResult) { result = r.filterMetrics(result) newSeriesSlices := result.series - namers := result.namers + converters := result.converters - // if len(newSeriesSlices) != len(namers) { - // return fmt.Errorf("need one set of series per namer") + // if len(newSeriesSlices) != len(converters) { + // return fmt.Errorf("need one set of series per converter") // } newInfo := make(map[provider.CustomMetricInfo]seriesInfo) for i, newSeries := range newSeriesSlices { - namer := namers[i] + converter := converters[i] for _, series := range newSeries { - identity, err := namer.IdentifySeries(series) + identity, err := converter.IdentifySeries(series) if err != nil { glog.Errorf("unable to name series %q, skipping: %v", series.String(), err) @@ -147,7 +147,7 @@ func (r *basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) { // we don't need to re-normalize, because the metric namer should have already normalized for us newInfo[info] = seriesInfo{ seriesName: series.Name, - namer: namer, + converter: converter, } } } @@ -194,7 +194,7 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.CustomMetricInf return "", false } - query, err := info.namer.QueryForSeries(info.seriesName, metricInfo.GroupResource, namespace, resourceNames...) + query, err := info.converter.QueryForSeries(info.seriesName, metricInfo.GroupResource, namespace, resourceNames...) if err != nil { glog.Errorf("unable to construct query for metric %s: %v", metricInfo.String(), err) return "", false @@ -218,7 +218,7 @@ func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.CustomMetri return nil, false } - resourceLbl, err := info.namer.ResourceConverter().LabelForResource(metricInfo.GroupResource) + resourceLbl, err := info.converter.ResourceConverter().LabelForResource(metricInfo.GroupResource) if err != nil { glog.Errorf("unable to construct resource label for metric %s: %v", metricInfo.String(), err) return nil, false diff --git a/pkg/custom-provider/series_registry_test.go b/pkg/custom-provider/series_registry_test.go index 21dde5b9..e8ea17e1 100644 --- a/pkg/custom-provider/series_registry_test.go +++ b/pkg/custom-provider/series_registry_test.go @@ -51,11 +51,11 @@ func restMapper() apimeta.RESTMapper { return mapper } -func setupMetricNamer(t testing.TB) []MetricNamer { +func setupMetricNamer(t testing.TB) []SeriesConverter { cfg := config.DefaultConfig(1*time.Minute, "kube_") - namers, err := NamersFromConfig(cfg, restMapper()) + converters, err := ConvertersFromConfig(cfg, restMapper()) require.NoError(t, err) - return namers + return converters } var seriesRegistryTestSeries = [][]prom.Series{ @@ -117,60 +117,21 @@ var seriesRegistryTestSeries = [][]prom.Series{ }, } -type myType struct { - a int - b string - m map[string]int -} - -type mapWrapper struct { - item map[string]int -} - -func (o *myType) Mutate(newMap mapWrapper) { - o.a = 2 - o.b = "two" - o.m = newMap.item -} - -func TestWeirdStuff(t *testing.T) { - o := myType{ - a: 1, - b: "one", - m: map[string]int{ - "one": 1, - }, - } - - oldMap := o.m - newMap := map[string]int{ - "two": 2, - } - newWrapper := mapWrapper{ - item: newMap, - } - oldWrapper := mapWrapper{ - item: oldMap, - } - o.Mutate(newWrapper) - o.Mutate(oldWrapper) -} - func TestSeriesRegistry(t *testing.T) { assert := assert.New(t) - namers := setupMetricNamer(t) + converters := setupMetricNamer(t) registry := &basicSeriesRegistry{ mapper: restMapper(), } - updateResult := metricUpdateResult{ - series: seriesRegistryTestSeries, - namers: namers, + updateResult := MetricUpdateResult{ + series: seriesRegistryTestSeries, + converters: converters, } // set up the registry - registry.onNewDataAvailable(updateResult) + registry.filterAndStoreMetrics(updateResult) // make sure each metric got registered and can form queries testCases := []struct { @@ -309,7 +270,7 @@ func TestSeriesRegistry(t *testing.T) { } func BenchmarkSetSeries(b *testing.B) { - namers := setupMetricNamer(b) + converters := setupMetricNamer(b) registry := &basicSeriesRegistry{ mapper: restMapper(), } @@ -328,11 +289,11 @@ func BenchmarkSetSeries(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - updateResult := metricUpdateResult{ - series: newSeriesSlices, - namers: namers, + updateResult := MetricUpdateResult{ + series: newSeriesSlices, + converters: converters, } - registry.onNewDataAvailable(updateResult) + registry.filterAndStoreMetrics(updateResult) } }