diff --git a/Makefile b/Makefile index 935a0bbb..9efdb480 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,6 @@ ifeq ($(ARCH),ppc64le) endif ifeq ($(ARCH),s390x) BASEIMAGE?=s390x/busybox - GOIMAGE=s390x/golang:1.10 endif .PHONY: all docker-build push-% push test verify-gofmt gofmt verify build-local-image diff --git a/cmd/adapter/adapter.go b/cmd/adapter/adapter.go index d1afff24..24819f22 100644 --- a/cmd/adapter/adapter.go +++ b/cmd/adapter/adapter.go @@ -41,6 +41,7 @@ import ( mprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/metrics" adaptercfg "github.com/directxman12/k8s-prometheus-adapter/pkg/config" cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider" + extprov "github.com/directxman12/k8s-prometheus-adapter/pkg/external-provider" resprov "github.com/directxman12/k8s-prometheus-adapter/pkg/resourceprovider" ) @@ -171,6 +172,30 @@ func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan return cmProvider, nil } +func (cmd *PrometheusAdapter) makeExternalProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.ExternalMetricsProvider, error) { + if len(cmd.metricsConfig.ExternalRules) == 0 { + return nil, nil + } + + // grab the mapper + mapper, err := cmd.RESTMapper() + if err != nil { + return nil, fmt.Errorf("unable to construct RESTMapper: %v", err) + } + + // collect series converters for adapter + converters, errs := extprov.ConvertersFromConfig(cmd.metricsConfig, mapper) + if len(errs) > 0 { + return nil, fmt.Errorf("unable to construct naming scheme from metrics rules: %v", errs) + } + + // construct the provider and start it + emProvider, runner := extprov.NewExternalPrometheusProvider(mapper, promClient, converters, cmd.MetricsRelistInterval) + runner.RunUntil(stopCh) + + return emProvider, nil +} + func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client) error { if cmd.metricsConfig.ResourceRules == nil { // bail if we don't have rules for setting things up 
@@ -247,6 +272,17 @@ func main() { cmd.WithCustomMetrics(cmProvider) } + // construct the external provider + emProvider, err := cmd.makeExternalProvider(promClient, wait.NeverStop) + if err != nil { + glog.Fatalf("unable to construct external metrics provider: %v", err) + } + + // attach the provider to the server, if it's needed + if emProvider != nil { + cmd.WithExternalMetrics(emProvider) + } + // attach resource metrics support, if it's needed if err := cmd.addResourceMetricsAPI(promClient); err != nil { glog.Fatalf("unable to install resource metrics API: %v", err) diff --git a/pkg/config/config.go b/pkg/config/config.go index 8951923d..afeba8cd 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -11,6 +11,7 @@ type MetricsDiscoveryConfig struct { // will make only a single API call. Rules []DiscoveryRule `yaml:"rules"` ResourceRules *ResourceRules `yaml:"resourceRules,omitempty"` + ExternalRules []DiscoveryRule `yaml:"externalRules,omitempty"` } // DiscoveryRule describes a set of rules for transforming Prometheus metrics to/from diff --git a/pkg/external-provider/basic_metric_lister.go b/pkg/external-provider/basic_metric_lister.go new file mode 100644 index 00000000..789baa01 --- /dev/null +++ b/pkg/external-provider/basic_metric_lister.go @@ -0,0 +1,165 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package provider + +import ( + "context" + "fmt" + "time" + + "github.com/golang/glog" + pmodel "github.com/prometheus/common/model" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" +) + +// Runnable represents something that can be run until told to stop. +type Runnable interface { + // Run runs the runnable forever. + Run() + // RunUntil runs the runnable until the given channel is closed. + RunUntil(stopChan <-chan struct{}) +} + +// A MetricLister provides a window into all of the metrics that are available within a given +// Prometheus instance, classified as either Custom or External metrics, but presented generically +// so that it can manage both types simultaneously. +type MetricLister interface { + ListAllMetrics() (MetricUpdateResult, error) +} + +// A MetricListerWithNotification is a MetricLister that has the ability to notify listeners +// when new metric data is available. +type MetricListerWithNotification interface { + MetricLister + Runnable + + // AddNotificationReceiver registers a callback to be invoked when new metric data is available. + AddNotificationReceiver(MetricUpdateCallback) + // UpdateNow forces an immediate refresh from the source data. Primarily for test purposes. + UpdateNow() +} + +type basicMetricLister struct { + promClient prom.Client + converters []SeriesConverter + lookback time.Duration +} + +// NewBasicMetricLister creates a MetricLister that is capable of interacting directly with Prometheus to list metrics.
+func NewBasicMetricLister(promClient prom.Client, converters []SeriesConverter, lookback time.Duration) MetricLister { + lister := basicMetricLister{ + promClient: promClient, + converters: converters, + lookback: lookback, + } + + return &lister +} + +type selectorSeries struct { + selector prom.Selector + series []prom.Series +} + +func (l *basicMetricLister) ListAllMetrics() (MetricUpdateResult, error) { + result := MetricUpdateResult{ + series: make([][]prom.Series, 0), + converters: make([]SeriesConverter, 0), + } + + startTime := pmodel.Now().Add(-1 * l.lookback) + + // these can take a while on large clusters, so launch in parallel + // and don't duplicate + selectors := make(map[prom.Selector]struct{}) + selectorSeriesChan := make(chan selectorSeries, len(l.converters)) + errs := make(chan error, len(l.converters)) + for _, converter := range l.converters { + sel := converter.Selector() + if _, ok := selectors[sel]; ok { + errs <- nil + selectorSeriesChan <- selectorSeries{} + continue + } + selectors[sel] = struct{}{} + go func() { + series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel) + if err != nil { + errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err) + return + } + errs <- nil + // Push into the channel: "this selector produced these series" + selectorSeriesChan <- selectorSeries{ + selector: sel, + series: series, + } + }() + } + + // don't do duplicate queries when it's just the matchers that change + seriesCacheByQuery := make(map[prom.Selector][]prom.Series) + + // iterate through, blocking until we've got all results + // We know that, from above, we should have pushed one item into the channel + // for each converter. So here, we'll assume that we should receive one item per converter. 
+ for range l.converters { + if err := <-errs; err != nil { + return result, fmt.Errorf("unable to update list of all metrics: %v", err) + } + // Receive from the channel: "this selector produced these series" + // We stuff that into this map so that we can collect the data as it arrives + // and then, once we've received it all, we can process it below. + if ss := <-selectorSeriesChan; ss.series != nil { + seriesCacheByQuery[ss.selector] = ss.series + } + } + close(errs) + + // Now that we've collected all of the results into `seriesCacheByQuery` + // we can start processing them. + newSeries := make([][]prom.Series, len(l.converters)) + for i, converter := range l.converters { + series, cached := seriesCacheByQuery[converter.Selector()] + if !cached { + return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", converter.Selector()) + } + // Because converters provide a "post-filtering" option, it's not enough to + // simply take all the series that were produced. We need to further filter them. + newSeries[i] = converter.SeriesFilterer().FilterSeries(series) + } + + glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries) + + result.series = newSeries + result.converters = l.converters + return result, nil +} + +// MetricUpdateResult represents the output of a periodic inspection of metrics found to be +// available in Prometheus. +// It includes both the series data the Prometheus exposed, as well as the configurational +// object that led to their discovery. +type MetricUpdateResult struct { + series [][]prom.Series + converters []SeriesConverter +} + +// MetricUpdateCallback is a function signature for receiving periodic updates about +// available metrics. 
+type MetricUpdateCallback func(MetricUpdateResult) diff --git a/pkg/external-provider/errors.go b/pkg/external-provider/errors.go new file mode 100644 index 00000000..1459dd98 --- /dev/null +++ b/pkg/external-provider/errors.go @@ -0,0 +1,27 @@ +package provider + +import "errors" + +// NewOperatorNotSupportedByPrometheusError creates an error that represents the fact that we were requested to service a query that +// Prometheus would be unable to support. +func NewOperatorNotSupportedByPrometheusError() error { + return errors.New("operator not supported by prometheus") +} + +// NewOperatorRequiresValuesError creates an error that represents the fact that we were requested to service a query +// that was malformed in its operator/value combination. +func NewOperatorRequiresValuesError() error { + return errors.New("operator requires values") +} + +// NewOperatorDoesNotSupportValuesError creates an error that represents the fact that we were requested to service a query +// that was malformed in its operator/value combination. +func NewOperatorDoesNotSupportValuesError() error { + return errors.New("operator does not support values") +} + +// NewLabelNotSpecifiedError creates an error that represents the fact that we were requested to service a query +// that was malformed in its label specification. 
+func NewLabelNotSpecifiedError() error { + return errors.New("label not specified") +} diff --git a/pkg/external-provider/external_series_registry.go b/pkg/external-provider/external_series_registry.go new file mode 100644 index 00000000..e34769b7 --- /dev/null +++ b/pkg/external-provider/external_series_registry.go @@ -0,0 +1,109 @@ +package provider + +import ( + "sync" + + "github.com/golang/glog" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" +) + +// ExternalSeriesRegistry acts as the top-level converter for transforming Kubernetes requests +// for external metrics into Prometheus queries. +type ExternalSeriesRegistry interface { + // ListAllMetrics lists all metrics known to this registry + ListAllMetrics() []provider.ExternalMetricInfo + QueryForMetric(namespace string, metricName string, metricSelector labels.Selector) (prom.Selector, bool, error) +} + +// externalSeriesRegistry is a basic ExternalSeriesRegistry +type externalSeriesRegistry struct { + mu sync.RWMutex + + // metrics is the list of all known metrics, ready to return from the API + metrics []provider.ExternalMetricInfo + // rawMetrics is a lookup from a metric to SeriesConverter for the sake of generating queries + rawMetrics map[string]SeriesConverter + + mapper apimeta.RESTMapper + + metricLister MetricListerWithNotification +} + +// NewExternalSeriesRegistry creates an ExternalSeriesRegistry driven by the data from the provided MetricLister.
+func NewExternalSeriesRegistry(lister MetricListerWithNotification, mapper apimeta.RESTMapper) ExternalSeriesRegistry { + var registry = externalSeriesRegistry{ + mapper: mapper, + metricLister: lister, + metrics: make([]provider.ExternalMetricInfo, 0), + rawMetrics: map[string]SeriesConverter{}, + } + + lister.AddNotificationReceiver(registry.filterAndStoreMetrics) + + return ®istry +} + +func (r *externalSeriesRegistry) filterAndStoreMetrics(result MetricUpdateResult) { + + newSeriesSlices := result.series + converters := result.converters + + if len(newSeriesSlices) != len(converters) { + glog.Errorf("need one set of series per converter") + } + apiMetricsCache := make([]provider.ExternalMetricInfo, 0) + rawMetricsCache := make(map[string]SeriesConverter) + + for i, newSeries := range newSeriesSlices { + converter := converters[i] + for _, series := range newSeries { + identity, err := converter.IdentifySeries(series) + + if err != nil { + glog.Errorf("unable to name series %q, skipping: %v", series.String(), err) + continue + } + + name := identity.name + rawMetricsCache[name] = converter + } + } + + for metricName := range rawMetricsCache { + apiMetricsCache = append(apiMetricsCache, provider.ExternalMetricInfo{ + Metric: metricName, + }) + } + + r.mu.Lock() + defer r.mu.Unlock() + + r.metrics = apiMetricsCache + r.rawMetrics = rawMetricsCache +} + +func (r *externalSeriesRegistry) ListAllMetrics() []provider.ExternalMetricInfo { + r.mu.RLock() + defer r.mu.RUnlock() + + return r.metrics +} + +func (r *externalSeriesRegistry) QueryForMetric(namespace string, metricName string, metricSelector labels.Selector) (prom.Selector, bool, error) { + r.mu.RLock() + defer r.mu.RUnlock() + + converter, found := r.rawMetrics[metricName] + + if !found { + glog.V(10).Infof("external metric %q not found", metricName) + return "", false, nil + } + + query, err := converter.QueryForExternalSeries(namespace, metricName, metricSelector) + return query, found, err +} diff --git 
a/pkg/external-provider/metric_converter.go b/pkg/external-provider/metric_converter.go new file mode 100644 index 00000000..44238f1b --- /dev/null +++ b/pkg/external-provider/metric_converter.go @@ -0,0 +1,126 @@ +package provider + +import ( + "errors" + "fmt" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/prometheus/common/model" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +// MetricConverter provides a unified interface for converting the results of +// Prometheus queries into external metric types. +type MetricConverter interface { + Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) +} + +type metricConverter struct { +} + +// NewMetricConverter creates a MetricConverter, capable of converting the scalar and vector metric types +// returned by the Prometheus client into external metrics types. +func NewMetricConverter() MetricConverter { + return &metricConverter{} +} + +func (c *metricConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { + if queryResult.Type == model.ValScalar { + return c.convertScalar(queryResult) + } + + if queryResult.Type == model.ValVector { + return c.convertVector(queryResult) + } + + return nil, errors.New("encountered an unexpected query result type") + } + +func (c *metricConverter) convertSample(sample *model.Sample) (*external_metrics.ExternalMetricValue, error) { + labels := c.convertLabels(sample.Metric) + + singleMetric := external_metrics.ExternalMetricValue{ + MetricName: string(sample.Metric[model.LabelName("__name__")]), + Timestamp: metav1.Time{ + sample.Timestamp.Time(), + }, + Value: *resource.NewMilliQuantity(int64(sample.Value*1000.0), resource.DecimalSI), + MetricLabels: labels, + } + + return &singleMetric, nil +} + +func (c *metricConverter) convertLabels(inLabels model.Metric) map[string]string { +
numLabels := len(inLabels) + outLabels := make(map[string]string, numLabels) + for labelName, labelVal := range inLabels { + outLabels[string(labelName)] = string(labelVal) + } + + return outLabels +} + +func (c *metricConverter) convertVector(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { + if queryResult.Type != model.ValVector { + return nil, errors.New("incorrect query result type") + } + + toConvert := *queryResult.Vector + + if toConvert == nil { + return nil, errors.New("the provided input did not contain vector query results") + } + + items := []external_metrics.ExternalMetricValue{} + metricValueList := external_metrics.ExternalMetricValueList{ + Items: items, + } + + numSamples := toConvert.Len() + if numSamples == 0 { + return &metricValueList, nil + } + + for _, val := range toConvert { + + singleMetric, err := c.convertSample(val) + + if err != nil { + return nil, fmt.Errorf("unable to convert vector: %v", err) + } + + items = append(items, *singleMetric) + } + + metricValueList = external_metrics.ExternalMetricValueList{ + Items: items, + } + return &metricValueList, nil +} + +func (c *metricConverter) convertScalar(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) { + if queryResult.Type != model.ValScalar { + return nil, errors.New("scalarConverter can only convert scalar query results") + } + + toConvert := queryResult.Scalar + + if toConvert == nil { + return nil, errors.New("the provided input did not contain scalar query results") + } + + result := external_metrics.ExternalMetricValueList{ + Items: []external_metrics.ExternalMetricValue{ + { + Timestamp: metav1.Time{ + toConvert.Timestamp.Time(), + }, + Value: *resource.NewMilliQuantity(int64(toConvert.Value*1000.0), resource.DecimalSI), + }, + }, + } + return &result, nil +} diff --git a/pkg/external-provider/metric_namer.go b/pkg/external-provider/metric_namer.go new file mode 100644 index 00000000..64053344 --- /dev/null +++ 
b/pkg/external-provider/metric_namer.go @@ -0,0 +1,159 @@ +package provider + +import ( + "fmt" + "regexp" + "strings" + + "k8s.io/apimachinery/pkg/runtime/schema" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" + "github.com/directxman12/k8s-prometheus-adapter/pkg/naming" +) + +var nsGroupResource = schema.GroupResource{Resource: "namespaces"} +var groupNameSanitizer = strings.NewReplacer(".", "_", "-", "_") + +// MetricNamer knows how to convert Prometheus series names and label names to +// metrics API resources, and vice-versa. MetricNamers should be safe to access +// concurrently. Returned group-resources are "normalized" as per the +// MetricInfo#Normalized method. Group-resources passed as arguments must +// themselves be normalized. +type MetricNamer interface { + // Selector produces the appropriate Prometheus series selector to match all +	// series handlable by this namer. + Selector() prom.Selector + // FilterSeries checks to see which of the given series match any additional + // constraints beyond the series query. It's assumed that the series given + // already match the series query. + FilterSeries(series []prom.Series) []prom.Series + // MetricNameForSeries returns the name (as presented in the API) for a given series. + MetricNameForSeries(series prom.Series) (string, error) + // QueryForSeries returns the query for a given series (not API metric name), with + // the given namespace name (if relevant), resource, and resource names.
+ QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) + + naming.ResourceConverter +} + +func (n *metricNamer) Selector() prom.Selector { + return n.seriesQuery +} + +// reMatcher either positively or negatively matches a regex +type reMatcher struct { + regex *regexp.Regexp + positive bool +} + +func newReMatcher(cfg config.RegexFilter) (*reMatcher, error) { + if cfg.Is != "" && cfg.IsNot != "" { + return nil, fmt.Errorf("cannot have both an `is` (%q) and `isNot` (%q) expression in a single filter", cfg.Is, cfg.IsNot) + } + if cfg.Is == "" && cfg.IsNot == "" { + return nil, fmt.Errorf("must have either an `is` or `isNot` expression in a filter") + } + + var positive bool + var regexRaw string + if cfg.Is != "" { + positive = true + regexRaw = cfg.Is + } else { + positive = false + regexRaw = cfg.IsNot + } + + regex, err := regexp.Compile(regexRaw) + if err != nil { + return nil, fmt.Errorf("unable to compile series filter %q: %v", regexRaw, err) + } + + return &reMatcher{ + regex: regex, + positive: positive, + }, nil +} + +func (m *reMatcher) Matches(val string) bool { + return m.regex.MatchString(val) == m.positive +} + +type metricNamer struct { + seriesQuery prom.Selector + metricsQuery naming.MetricsQuery + nameMatches *regexp.Regexp + nameAs string + seriesMatchers []*reMatcher + + naming.ResourceConverter +} + +// queryTemplateArgs are the arguments for the metrics query template. 
+func (n *metricNamer) FilterSeries(initialSeries []prom.Series) []prom.Series { + if len(n.seriesMatchers) == 0 { + return initialSeries + } + + finalSeries := make([]prom.Series, 0, len(initialSeries)) +SeriesLoop: + for _, series := range initialSeries { + for _, matcher := range n.seriesMatchers { + if !matcher.Matches(series.Name) { + continue SeriesLoop + } + } + finalSeries = append(finalSeries, series) + } + + return finalSeries +} + +func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) { + return n.metricsQuery.Build(series, resource, namespace, nil, names...) +} + +func (n *metricNamer) MetricNameForSeries(series prom.Series) (string, error) { + matches := n.nameMatches.FindStringSubmatchIndex(series.Name) + if matches == nil { + return "", fmt.Errorf("series name %q did not match expected pattern %q", series.Name, n.nameMatches.String()) + } + outNameBytes := n.nameMatches.ExpandString(nil, n.nameAs, series.Name, matches) + return string(outNameBytes), nil +} + +// NewMetricNamer creates a MetricNamer capable of translating Prometheus series names +// into custom metric names. 
+func NewMetricNamer(mapping config.NameMapping) (MetricNamer, error) { + var nameMatches *regexp.Regexp + var err error + if mapping.Matches != "" { + nameMatches, err = regexp.Compile(mapping.Matches) + if err != nil { + return nil, fmt.Errorf("unable to compile series name match expression %q: %v", mapping.Matches, err) + } + } else { + // this will always succeed + nameMatches = regexp.MustCompile(".*") + } + nameAs := mapping.As + if nameAs == "" { + // check if we have an obvious default + subexpNames := nameMatches.SubexpNames() + if len(subexpNames) == 1 { + // no capture groups, use the whole thing + nameAs = "$0" + } else if len(subexpNames) == 2 { + // one capture group, use that + nameAs = "$1" + } else { + return nil, fmt.Errorf("must specify an 'as' value for name matcher %q", mapping.Matches) + } + } + + return &metricNamer{ + nameMatches: nameMatches, + nameAs: nameAs, + }, nil +} diff --git a/pkg/external-provider/periodic_metric_lister.go b/pkg/external-provider/periodic_metric_lister.go new file mode 100644 index 00000000..5a719490 --- /dev/null +++ b/pkg/external-provider/periodic_metric_lister.go @@ -0,0 +1,89 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package provider + +import ( + "time" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" +) + +type periodicMetricLister struct { + realLister MetricLister + updateInterval time.Duration + mostRecentResult MetricUpdateResult + callbacks []MetricUpdateCallback +} + +// NewPeriodicMetricLister creates a MetricLister that periodically pulls the list of available metrics +// at the provided interval, but defers the actual act of retrieving the metrics to the supplied MetricLister. +func NewPeriodicMetricLister(realLister MetricLister, updateInterval time.Duration) (MetricListerWithNotification, Runnable) { + lister := periodicMetricLister{ + updateInterval: updateInterval, + realLister: realLister, + callbacks: make([]MetricUpdateCallback, 0), + } + + return &lister, &lister +} + +func (l *periodicMetricLister) AddNotificationReceiver(callback MetricUpdateCallback) { + l.callbacks = append(l.callbacks, callback) +} + +func (l *periodicMetricLister) ListAllMetrics() (MetricUpdateResult, error) { + return l.mostRecentResult, nil +} + +func (l *periodicMetricLister) Run() { + l.RunUntil(wait.NeverStop) +} + +func (l *periodicMetricLister) RunUntil(stopChan <-chan struct{}) { + go wait.Until(func() { + if err := l.updateMetrics(); err != nil { + utilruntime.HandleError(err) + } + }, l.updateInterval, stopChan) +} + +func (l *periodicMetricLister) updateMetrics() error { + result, err := l.realLister.ListAllMetrics() + + if err != nil { + return err + } + + //Cache the result. + l.mostRecentResult = result + //Let our listeners know we've got new data ready for them. 
+ l.notifyListeners() + return nil +} + +func (l *periodicMetricLister) notifyListeners() { + for _, listener := range l.callbacks { + if listener != nil { + listener(l.mostRecentResult) + } + } +} + +func (l *periodicMetricLister) UpdateNow() { + l.updateMetrics() +} diff --git a/pkg/external-provider/periodic_metric_lister_test.go b/pkg/external-provider/periodic_metric_lister_test.go new file mode 100644 index 00000000..b276294f --- /dev/null +++ b/pkg/external-provider/periodic_metric_lister_test.go @@ -0,0 +1,69 @@ +package provider + +import ( + "testing" + "time" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/stretchr/testify/require" +) + +type fakeLister struct { + callCount int +} + +func (f *fakeLister) ListAllMetrics() (MetricUpdateResult, error) { + f.callCount++ + + return MetricUpdateResult{ + series: [][]prom.Series{ + { + { + Name: "a_series", + }, + }, + }, + }, nil +} + +func TestWhenNewMetricsAvailableCallbackIsInvoked(t *testing.T) { + fakeLister := &fakeLister{} + targetLister, _ := NewPeriodicMetricLister(fakeLister, time.Duration(1000)) + periodicLister := targetLister.(*periodicMetricLister) + + callbackInvoked := false + callback := func(r MetricUpdateResult) { + callbackInvoked = true + } + + periodicLister.AddNotificationReceiver(callback) + err := periodicLister.updateMetrics() + require.NoError(t, err) + require.True(t, callbackInvoked) +} + +func TestWhenListingMetricsReturnsCachedValues(t *testing.T) { + fakeLister := &fakeLister{} + targetLister, _ := NewPeriodicMetricLister(fakeLister, time.Duration(1000)) + periodicLister := targetLister.(*periodicMetricLister) + + // We haven't invoked the inner lister yet, so we should have no results. + resultBeforeUpdate, err := periodicLister.ListAllMetrics() + require.NoError(t, err) + require.Equal(t, 0, len(resultBeforeUpdate.series)) + require.Equal(t, 0, fakeLister.callCount) + + // We can simulate waiting for the update interval to pass...
+ // which should result in calling the inner lister to get the metrics. + err = periodicLister.updateMetrics() + require.NoError(t, err) + require.Equal(t, 1, fakeLister.callCount) + + // If we list now, we should return the cached values. + // Make sure we got some results this time + // as well as that we didn't unnecessarily invoke the inner lister. + resultAfterUpdate, err := periodicLister.ListAllMetrics() + require.NoError(t, err) + require.NotEqual(t, 0, len(resultAfterUpdate.series)) + require.Equal(t, 1, fakeLister.callCount) +} diff --git a/pkg/external-provider/provider.go b/pkg/external-provider/provider.go new file mode 100644 index 00000000..a800f4c7 --- /dev/null +++ b/pkg/external-provider/provider.go @@ -0,0 +1,79 @@ +package provider + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/golang/glog" + pmodel "github.com/prometheus/common/model" + + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" + + apierr "k8s.io/apimachinery/pkg/api/errors" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" +) + +// TODO: Make sure everything has the proper licensing disclosure at the top. 
+type externalPrometheusProvider struct { + promClient prom.Client + metricConverter MetricConverter + + seriesRegistry ExternalSeriesRegistry +} + +func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricSelector labels.Selector, info provider.ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error) { + selector, found, err := p.seriesRegistry.QueryForMetric(namespace, info.Metric, metricSelector) + + if err != nil { + glog.Errorf("unable to generate a query for the metric: %v", err) + return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) + } + + if !found { + return nil, provider.NewMetricNotFoundError(p.selectGroupResource(namespace), info.Metric) + } + + queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector) + + if err != nil { + glog.Errorf("unable to fetch metrics from prometheus: %v", err) + // don't leak implementation details to the user + return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) + } + return p.metricConverter.Convert(queryResults) +} + +func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo { + return p.seriesRegistry.ListAllMetrics() +} + +func (p *externalPrometheusProvider) selectGroupResource(namespace string) schema.GroupResource { + if namespace == "default" { + return nsGroupResource + } + + return schema.GroupResource{ + Group: "", + Resource: "", + } +} + +// NewExternalPrometheusProvider creates an ExternalMetricsProvider capable of responding to Kubernetes requests for external metric data +func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, promClient prom.Client, converters []SeriesConverter, updateInterval time.Duration) (provider.ExternalMetricsProvider, Runnable) { + metricConverter := NewMetricConverter() + basicLister := NewBasicMetricLister(promClient, converters, updateInterval) + periodicLister, _ := NewPeriodicMetricLister(basicLister, updateInterval) + seriesRegistry := 
NewExternalSeriesRegistry(periodicLister, mapper) + return &externalPrometheusProvider{ + promClient: promClient, + seriesRegistry: seriesRegistry, + metricConverter: metricConverter, + }, periodicLister +} diff --git a/pkg/external-provider/query_builder.go b/pkg/external-provider/query_builder.go new file mode 100644 index 00000000..bb9b6d9c --- /dev/null +++ b/pkg/external-provider/query_builder.go @@ -0,0 +1,206 @@ +package provider + +import ( + "bytes" + "errors" + "fmt" + "strings" + "text/template" + + "k8s.io/apimachinery/pkg/selection" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" +) + +// QueryBuilder provides functions for generating Prometheus queries. +type QueryBuilder interface { + BuildSelector(seriesName string, groupBy string, groupBySlice []string, queryParts []queryPart) (prom.Selector, error) +} + +type queryBuilder struct { + metricsQueryTemplate *template.Template +} + +// NewQueryBuilder creates a QueryBuilder. +func NewQueryBuilder(metricsQuery string) (QueryBuilder, error) { + metricsQueryTemplate, err := template.New("metrics-query").Delims("<<", ">>").Parse(metricsQuery) + if err != nil { + return nil, fmt.Errorf("unable to parse metrics query template %q: %v", metricsQuery, err) + } + + return &queryBuilder{ + metricsQueryTemplate: metricsQueryTemplate, + }, nil +} + +func (n *queryBuilder) BuildSelector(seriesName string, groupBy string, groupBySlice []string, queryParts []queryPart) (prom.Selector, error) { + // Convert our query parts into the types we need for our template. 
+ exprs, valuesByName, err := n.processQueryParts(queryParts) + + if err != nil { + return "", err + } + + args := queryTemplateArgs{ + Series: seriesName, + LabelMatchers: strings.Join(exprs, ","), + LabelValuesByName: valuesByName, + GroupBy: groupBy, + GroupBySlice: groupBySlice, + } + + selector, err := n.createSelectorFromTemplateArgs(args) + if err != nil { + return "", err + } + + return selector, nil +} + +func (n *queryBuilder) createSelectorFromTemplateArgs(args queryTemplateArgs) (prom.Selector, error) { + //Turn our template arguments into a Selector. + queryBuff := new(bytes.Buffer) + if err := n.metricsQueryTemplate.Execute(queryBuff, args); err != nil { + return "", err + } + + if queryBuff.Len() == 0 { + return "", fmt.Errorf("empty query produced by metrics query template") + } + + return prom.Selector(queryBuff.String()), nil +} + +func (n *queryBuilder) processQueryParts(queryParts []queryPart) ([]string, map[string][]string, error) { + // We've take the approach here that if we can't perfectly map their query into a Prometheus + // query that we should abandon the effort completely. + // The concern is that if we don't get a perfect match on their query parameters, the query result + // might contain unexpected data that would cause them to take an erroneous action based on the result. + + // Contains the expressions that we want to include as part of the query to Prometheus. + // e.g. "namespace=my-namespace" + // e.g. "some_label=some-value" + var exprs []string + + // Contains the list of label values we're targeting, by namespace. + // e.g. "some_label" => ["value-one", "value-two"] + valuesByName := map[string][]string{} + + // Convert our query parts into template arguments. + for _, qPart := range queryParts { + // Be resilient against bad inputs. + // We obviously can't generate label filters for these cases. 
+		// NOTE(review): dropped leftover debug fmt.Println of the query part.
+		if qPart.labelName == "" {
+			return nil, nil, NewLabelNotSpecifiedError()
+		}
+
+		if !n.operatorIsSupported(qPart.operator) {
+			return nil, nil, NewOperatorNotSupportedByPrometheusError()
+		}
+
+		matcher, err := n.selectMatcher(qPart.operator, qPart.values)
+
+		if err != nil {
+			return nil, nil, err
+		}
+
+		targetValue, err := n.selectTargetValue(qPart.operator, qPart.values)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		expression := matcher(qPart.labelName, targetValue)
+		exprs = append(exprs, expression)
+		valuesByName[qPart.labelName] = qPart.values
+	}
+
+	return exprs, valuesByName, nil
+}
+
+func (n *queryBuilder) selectMatcher(operator selection.Operator, values []string) (func(string, string) string, error) {
+
+	numValues := len(values)
+	if numValues == 0 {
+		switch operator {
+		case selection.Exists:
+			return prom.LabelNeq, nil
+		case selection.DoesNotExist:
+			return prom.LabelEq, nil
+		case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.In, selection.NotIn:
+			return nil, NewOperatorRequiresValuesError()
+		}
+	} else if numValues == 1 {
+		switch operator {
+		case selection.Equals, selection.DoubleEquals:
+			return prom.LabelEq, nil
+		case selection.NotEquals:
+			return prom.LabelNeq, nil
+		case selection.In, selection.Exists:
+			return prom.LabelMatches, nil
+		case selection.DoesNotExist, selection.NotIn:
+			return prom.LabelNotMatches, nil
+		}
+	} else {
+		// Since labels can only have one value, providing multiple
+		// values results in a regex match, even if that's not what the user
+		// asked for.
+ switch operator { + case selection.Equals, selection.DoubleEquals, selection.In, selection.Exists: + return prom.LabelMatches, nil + case selection.NotEquals, selection.DoesNotExist, selection.NotIn: + return prom.LabelNotMatches, nil + } + } + + return nil, errors.New("operator not supported by query builder") +} + +func (n *queryBuilder) selectTargetValue(operator selection.Operator, values []string) (string, error) { + numValues := len(values) + if numValues == 0 { + switch operator { + case selection.Exists, selection.DoesNotExist: + // Return an empty string when values are equal to 0 + // When the operator is LabelNotMatches this will select series without the label + // or with the label but a value of "". + // When the operator is LabelMatches this will select series with the label + // whose value is NOT "". + return "", nil + case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.In, selection.NotIn: + return "", NewOperatorRequiresValuesError() + } + } else if numValues == 1 { + switch operator { + case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.In, selection.NotIn: + // Pass the value through as-is. + // It's somewhat strange to do this for both the regex and equality + // operators, but if we do it this way it gives the user a little more control. + // They might choose to send an "IN" request and give a list of static values + // or they could send a single value that's a regex, giving them a passthrough + // for their label selector. + return values[0], nil + case selection.Exists, selection.DoesNotExist: + return "", errors.New("operator does not support values") + } + } else { + switch operator { + case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.In, selection.NotIn: + // Pass the value through as-is. + // It's somewhat strange to do this for both the regex and equality + // operators, but if we do it this way it gives the user a little more control. 
+ // They might choose to send an "IN" request and give a list of static values + // or they could send a single value that's a regex, giving them a passthrough + // for their label selector. + return strings.Join(values, "|"), nil + case selection.Exists, selection.DoesNotExist: + return "", NewOperatorDoesNotSupportValuesError() + } + } + + return "", errors.New("operator not supported by query builder") +} + +func (n *queryBuilder) operatorIsSupported(operator selection.Operator) bool { + return operator != selection.GreaterThan && operator != selection.LessThan +} diff --git a/pkg/external-provider/query_builder_test.go b/pkg/external-provider/query_builder_test.go new file mode 100644 index 00000000..ecdcc414 --- /dev/null +++ b/pkg/external-provider/query_builder_test.go @@ -0,0 +1,343 @@ +package provider + +import ( + "testing" + + "k8s.io/apimachinery/pkg/selection" + + "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/stretchr/testify/require" +) + +func TestBadQueryPartsDontBuildQueries(t *testing.T) { + builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])") + _, err := builder.BuildSelector("my_series", "", []string{}, []queryPart{ + { + labelName: "", + values: nil, + }, + { + labelName: "", + values: []string{}, + }, + }) + + require.Error(t, err) +} + +func runQueryBuilderTest(t *testing.T, queryParts []queryPart, expectation string) { + builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])") + selector, err := builder.BuildSelector("my_series", "", []string{}, queryParts) + + expectError := expectation == "" + + if expectError { + require.Error(t, err) + } else { + selectorExpectation := client.Selector(expectation) + require.NoError(t, err) + require.Equal(t, selector, selectorExpectation) + } +} + +/* +func TestSimpleQuery(t *testing.T) { + runQueryBuilderTest(t, []queryPart{}, "") +} +*/ + +//Equals +func TestEqualsQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, 
[]queryPart{ + { + labelName: "target_label", + values: []string{}, + operator: selection.Equals, + }, + }, "") +} + +func TestEqualsQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one"}, + operator: selection.Equals, + }, + }, "rate(my_series{target_label=\"one\"}[2m])") +} + +func TestEqualsQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.Equals, + }, + }, "rate(my_series{target_label=~\"one|two\"}[2m])") +} + +//Double Equals +func TestDoubleEqualsQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{}, + operator: selection.DoubleEquals, + }, + }, "") +} + +func TestDoubleEqualsQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one"}, + operator: selection.DoubleEquals, + }, + }, "rate(my_series{target_label=\"one\"}[2m])") +} + +func TestDoubleEqualsQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.DoubleEquals, + }, + }, "rate(my_series{target_label=~\"one|two\"}[2m])") +} + +//Not Equals +func TestNotEqualsQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{}, + operator: selection.NotEquals, + }, + }, "") +} + +func TestNotEqualsQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one"}, + operator: selection.NotEquals, + }, + }, "rate(my_series{target_label!=\"one\"}[2m])") +} + +func TestNotEqualsQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: 
[]string{"one", "two"}, + operator: selection.NotEquals, + }, + }, "rate(my_series{target_label!~\"one|two\"}[2m])") +} + +//In +func TestInQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{}, + operator: selection.In, + }, + }, "") +} + +func TestInQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one"}, + operator: selection.In, + }, + }, "rate(my_series{target_label=~\"one\"}[2m])") +} + +func TestInQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.In, + }, + }, "rate(my_series{target_label=~\"one|two\"}[2m])") +} + +//NotIn +func TestNotInQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{}, + operator: selection.NotIn, + }, + }, "") +} + +func TestNotInQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one"}, + operator: selection.NotIn, + }, + }, "rate(my_series{target_label!~\"one\"}[2m])") +} + +func TestNotInQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.NotIn, + }, + }, "rate(my_series{target_label!~\"one|two\"}[2m])") +} + +//Exists +func TestExistsQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{}, + operator: selection.Exists, + }, + }, "rate(my_series{target_label!=\"\"}[2m])") +} + +func TestExistsQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one"}, + operator: selection.Exists, + }, + }, "") +} + +func 
TestExistsQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.Exists, + }, + }, "") +} + +//DoesNotExist +func TestDoesNotExistsQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{}, + operator: selection.DoesNotExist, + }, + }, "rate(my_series{target_label=\"\"}[2m])") +} + +func TestDoesNotExistsQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one"}, + operator: selection.DoesNotExist, + }, + }, "") +} + +func TestDoesNotExistsQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.DoesNotExist, + }, + }, "") +} + +//GreaterThan +func TestGreaterThanQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{}, + operator: selection.GreaterThan, + }, + }, "") +} + +func TestGreaterThanQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one"}, + operator: selection.GreaterThan, + }, + }, "") +} + +func TestGreaterThanQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.GreaterThan, + }, + }, "") +} + +//LessThan +func TestLessThanQueryWithNoLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{}, + operator: selection.LessThan, + }, + }, "") +} + +func TestLessThanQueryWithOneLabelValue(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one"}, + operator: selection.LessThan, + }, + }, "") +} + +func 
TestLessThanQueryWithMultipleLabelValues(t *testing.T) { + runQueryBuilderTest(t, []queryPart{ + { + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.LessThan, + }, + }, "") +} + +func TestQueryWithGroupBy(t *testing.T) { + builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>}[2m])) by (<<.GroupBy>>)") + + selector, _ := builder.BuildSelector("my_series", "my_grouping", []string{}, []queryPart{ + { + labelName: "target_label", + values: []string{"one", "two"}, + operator: selection.In, + }, + }) + + expectation := client.Selector("sum(rate(my_series{target_label=~\"one|two\"}[2m])) by (my_grouping)") + require.Equal(t, selector, expectation) +} + +// TODO: Ensure that the LabelValuesByName and GroupBySlice placeholders function correctly. diff --git a/pkg/external-provider/regex_matcher_test.go b/pkg/external-provider/regex_matcher_test.go new file mode 100644 index 00000000..98d61400 --- /dev/null +++ b/pkg/external-provider/regex_matcher_test.go @@ -0,0 +1,49 @@ +package provider + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" +) + +func TestReMatcherIs(t *testing.T) { + filter := config.RegexFilter{ + Is: "my_.*", + } + + matcher, err := newReMatcher(filter) + require.NoError(t, err) + + result := matcher.Matches("my_label") + require.True(t, result) + + result = matcher.Matches("your_label") + require.False(t, result) +} + +func TestReMatcherIsNot(t *testing.T) { + filter := config.RegexFilter{ + IsNot: "my_.*", + } + + matcher, err := newReMatcher(filter) + require.NoError(t, err) + + result := matcher.Matches("my_label") + require.False(t, result) + + result = matcher.Matches("your_label") + require.True(t, result) +} + +func TestEnforcesIsOrIsNotButNotBoth(t *testing.T) { + filter := config.RegexFilter{ + Is: "my_.*", + IsNot: "your_.*", + } + + _, err := newReMatcher(filter) + require.Error(t, err) +} diff --git 
a/pkg/external-provider/series_converter.go b/pkg/external-provider/series_converter.go new file mode 100644 index 00000000..f4fe222c --- /dev/null +++ b/pkg/external-provider/series_converter.go @@ -0,0 +1,276 @@ +package provider + +import ( + "fmt" + + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/selection" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" + "github.com/directxman12/k8s-prometheus-adapter/pkg/naming" +) + +// SeriesConverter knows how to convert Prometheus series names and label names to +// metrics API resources, and vice-versa. SeriesConverters should be safe to access +// concurrently. Returned group-resources are "normalized" as per the +// MetricInfo#Normalized method. Group-resources passed as arguments must +// themselves be normalized. +type SeriesConverter interface { + // Selector produces the appropriate Prometheus series selector to match all + // series handlable by this converter. + Selector() prom.Selector + // FilterSeries checks to see which of the given series match any additional + // constrains beyond the series query. It's assumed that the series given + // already matche the series query. 
+	// FilterSeries(series []prom.Series) []prom.Series
+	SeriesFilterer() SeriesFilterer
+	ResourceConverter() naming.ResourceConverter
+
+	QueryForExternalSeries(namespace string, series string, metricSelector labels.Selector) (prom.Selector, error)
+	IdentifySeries(series prom.Series) (seriesIdentity, error)
+}
+
+type seriesIdentity struct {
+	resources  []schema.GroupResource
+	namespaced bool
+	name       string
+}
+
+func (c *seriesConverter) Selector() prom.Selector {
+	return c.seriesQuery
+}
+
+type seriesConverter struct {
+	seriesQuery prom.Selector
+
+	resourceConverter naming.ResourceConverter
+	queryBuilder      QueryBuilder
+	seriesFilterer    SeriesFilterer
+	metricNamer       MetricNamer
+	mapper            apimeta.RESTMapper
+}
+
+// queryTemplateArgs are the arguments for the metrics query template.
+type queryTemplateArgs struct {
+	Series            string
+	LabelMatchers     string
+	LabelValuesByName map[string][]string
+	GroupBy           string
+	GroupBySlice      []string
+}
+
+func (c *seriesConverter) IdentifySeries(series prom.Series) (seriesIdentity, error) {
+	// TODO: warn if it doesn't match any resources
+
+	resources, _ := c.resourceConverter.ResourcesForSeries(series)
+	name, err := c.metricNamer.MetricNameForSeries(series)
+
+	result := seriesIdentity{
+		resources:  resources,
+		namespaced: false,
+		name:       name,
+	}
+
+	return result, err
+}
+
+func (c *seriesConverter) SeriesFilterer() SeriesFilterer {
+	return c.seriesFilterer
+}
+
+func (c *seriesConverter) ResourceConverter() naming.ResourceConverter {
+	return c.resourceConverter
+}
+
+func (c *seriesConverter) createQueryPartsFromSelector(metricSelector labels.Selector) []queryPart {
+	requirements, _ := metricSelector.Requirements()
+
+	selectors := []queryPart{}
+	for i := 0; i < len(requirements); i++ {
+		// NOTE(review): dropped leftover debug fmt.Println of each requirement.
+		selector := c.convertRequirement(requirements[i])
+
+		selectors = append(selectors, selector)
+	}
+
+	
return selectors +} + +func (c *seriesConverter) convertRequirement(requirement labels.Requirement) queryPart { + labelName := requirement.Key() + values := requirement.Values().List() + + return queryPart{ + labelName: labelName, + values: values, + operator: requirement.Operator(), + } +} + +type queryPart struct { + labelName string + values []string + operator selection.Operator +} + +func (c *seriesConverter) buildNamespaceQueryPartForSeries(namespace string) (queryPart, error) { + result := queryPart{} + + // If we've been given a namespace, then we need to set up + // the label requirements to target that namespace. + if namespace != "default" { + namespaceLbl, err := c.resourceConverter.LabelForResource(nsGroupResource) + if err != nil { + return result, err + } + + values := []string{namespace} + + result = queryPart{ + values: values, + labelName: string(namespaceLbl), + operator: selection.Equals, + } + } + + return result, nil +} + +func (c *seriesConverter) buildResourceQueryPartForSeries(resource schema.GroupResource, names ...string) (queryPart, error) { + result := queryPart{} + + // If we've been given a resource, then we need to set up + // the label requirements to target that resource. + resourceLbl, err := c.resourceConverter.LabelForResource(resource) + if err != nil { + return result, err + } + + result = queryPart{ + labelName: string(resourceLbl), + values: names, + operator: selection.Equals, + } + + return result, nil +} + +func (c *seriesConverter) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) { + queryParts := []queryPart{} + + // Build up the namespace part of the query. + namespaceQueryPart, err := c.buildNamespaceQueryPartForSeries(namespace) + if err != nil { + return "", err + } + + if namespaceQueryPart.labelName != "" { + queryParts = append(queryParts, namespaceQueryPart) + } + + // Build up the resource part of the query. 
+ resourceQueryPart, err := c.buildResourceQueryPartForSeries(resource, names...) + if err != nil { + return "", err + } + + if resourceQueryPart.labelName != "" { + queryParts = append(queryParts, resourceQueryPart) + } + + return c.queryBuilder.BuildSelector(series, resourceQueryPart.labelName, []string{resourceQueryPart.labelName}, queryParts) +} + +// ConvertersFromConfig produces a MetricNamer for each rule in the given config. +func ConvertersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMapper) ([]SeriesConverter, []error) { + errs := []error{} + converters := []SeriesConverter{} + + for _, rule := range cfg.ExternalRules { + if externalConverter, err := converterFromRule(rule, mapper); err == nil { + converters = append(converters, externalConverter) + } else { + errs = append(errs, err) + } + } + return converters, errs +} + +func converterFromRule(rule config.DiscoveryRule, mapper apimeta.RESTMapper) (SeriesConverter, error) { + var ( + err error + ) + + resourceConverter, err := naming.NewResourceConverter(rule.Resources.Template, rule.Resources.Overrides, mapper) + if err != nil { + return nil, fmt.Errorf("unable to create ResourceConverter associated with series query %q: %v", rule.SeriesQuery, err) + } + + queryBuilder, err := NewQueryBuilder(rule.MetricsQuery) + if err != nil { + return nil, fmt.Errorf("unable to create a QueryBuilder associated with series query %q: %v", rule.SeriesQuery, err) + } + + seriesFilterer, err := NewSeriesFilterer(rule.SeriesFilters) + if err != nil { + return nil, fmt.Errorf("unable to create a SeriesFilter associated with series query %q: %v", rule.SeriesQuery, err) + } + + if rule.Name.Matches != "" { + err := seriesFilterer.AddRequirement(config.RegexFilter{Is: rule.Name.Matches}) + if err != nil { + return nil, fmt.Errorf("unable to apply the series name filter from name rules associated with series query %q: %v", rule.SeriesQuery, err) + } + } + + metricNamer, err := NewMetricNamer(rule.Name) + 
if err != nil { + return nil, fmt.Errorf("unable to create a MetricNamer associated with series query %q: %v", rule.SeriesQuery, err) + } + + return &seriesConverter{ + seriesQuery: prom.Selector(rule.SeriesQuery), + mapper: mapper, + resourceConverter: resourceConverter, + queryBuilder: queryBuilder, + seriesFilterer: seriesFilterer, + metricNamer: metricNamer, + }, nil +} + +func (c *seriesConverter) buildNamespaceQueryPartForExternalSeries(namespace string) (queryPart, error) { + namespaceLbl, _ := c.metricNamer.LabelForResource(nsGroupResource) + + return queryPart{ + labelName: string(namespaceLbl), + values: []string{namespace}, + operator: selection.Equals, + }, nil +} + +func (c *seriesConverter) QueryForExternalSeries(namespace string, series string, metricSelector labels.Selector) (prom.Selector, error) { + queryParts := []queryPart{} + + if namespace != "default" { + // Build up the namespace part of the query. + namespaceQueryPart, err := c.buildNamespaceQueryPartForExternalSeries(namespace) + if err != nil { + return "", err + } + + queryParts = append(queryParts, namespaceQueryPart) + } + + // Build up the query parts from the selector. + queryParts = append(queryParts, c.createQueryPartsFromSelector(metricSelector)...) + + selector, err := c.queryBuilder.BuildSelector(series, "", []string{}, queryParts) + if err != nil { + return "", err + } + + return selector, nil +} diff --git a/pkg/external-provider/series_filterer.go b/pkg/external-provider/series_filterer.go new file mode 100644 index 00000000..153dd6eb --- /dev/null +++ b/pkg/external-provider/series_filterer.go @@ -0,0 +1,65 @@ +package provider + +import ( + "fmt" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" +) + +// SeriesFilterer provides functions for filtering collections of Prometheus series +// to only those that meet certain requirements. 
+type SeriesFilterer interface { + FilterSeries(series []prom.Series) []prom.Series + AddRequirement(filter config.RegexFilter) error +} + +type seriesFilterer struct { + seriesMatchers []*reMatcher +} + +// NewSeriesFilterer creates a SeriesFilterer that will remove any series that do not +// meet the requirements of the provided RegexFilter(s). +func NewSeriesFilterer(filters []config.RegexFilter) (SeriesFilterer, error) { + seriesMatchers := make([]*reMatcher, len(filters)) + for i, filterRaw := range filters { + matcher, err := newReMatcher(filterRaw) + if err != nil { + return nil, fmt.Errorf("unable to generate series name filter: %v", err) + } + seriesMatchers[i] = matcher + } + + return &seriesFilterer{ + seriesMatchers: seriesMatchers, + }, nil +} + +func (n *seriesFilterer) AddRequirement(filterRaw config.RegexFilter) error { + matcher, err := newReMatcher(filterRaw) + if err != nil { + return fmt.Errorf("unable to generate series name filter: %v", err) + } + + n.seriesMatchers = append(n.seriesMatchers, matcher) + return nil +} + +func (n *seriesFilterer) FilterSeries(initialSeries []prom.Series) []prom.Series { + if len(n.seriesMatchers) == 0 { + return initialSeries + } + + finalSeries := make([]prom.Series, 0, len(initialSeries)) +SeriesLoop: + for _, series := range initialSeries { + for _, matcher := range n.seriesMatchers { + if !matcher.Matches(series.Name) { + continue SeriesLoop + } + } + finalSeries = append(finalSeries, series) + } + + return finalSeries +} diff --git a/pkg/external-provider/series_filterer_test.go b/pkg/external-provider/series_filterer_test.go new file mode 100644 index 00000000..75486570 --- /dev/null +++ b/pkg/external-provider/series_filterer_test.go @@ -0,0 +1,114 @@ +package provider + +import ( + "testing" + + "github.com/stretchr/testify/require" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" +) + +func TestPositiveFilter(t *testing.T) 
{ + filters := []config.RegexFilter{ + { + Is: "one_of_(yes|positive|ok)", + }, + } + + series := []string{"one_of_yes", "one_of_no"} + expectedSeries := []string{"one_of_yes"} + + RunSeriesFiltererTest(t, filters, series, expectedSeries) +} + +func TestNegativeFilter(t *testing.T) { + filters := []config.RegexFilter{ + { + IsNot: "one_of_(yes|positive|ok)", + }, + } + + series := []string{"one_of_yes", "one_of_no"} + expectedSeries := []string{"one_of_no"} + + RunSeriesFiltererTest(t, filters, series, expectedSeries) +} + +func TestPositiveAndNegativeFilterError(t *testing.T) { + filters := []config.RegexFilter{ + { + Is: "series_\\d+", + IsNot: "series_[2-3]+", + }, + } + + _, err := NewSeriesFilterer(filters) + require.Error(t, err) +} + +func TestAddRequirementAppliesFilter(t *testing.T) { + seriesNames := []string{"series_1", "series_2", "series_3"} + series := BuildSeriesFromNames(seriesNames) + + filters := []config.RegexFilter{ + { + Is: "series_\\d+", + }, + } + + filterer, err := NewSeriesFilterer(filters) + require.NoError(t, err) + + // Test it once with the default filters. + result := filterer.FilterSeries(series) + expectedSeries := []string{"series_1", "series_2", "series_3"} + VerifyMatches(t, result, expectedSeries) + + // Add a new filter and test again. 
+	// NOTE(review): AddRequirement returns an error; a test must not ignore it.
+	require.NoError(t, filterer.AddRequirement(config.RegexFilter{
+		Is: "series_[2-3]",
+	}))
+	result = filterer.FilterSeries(series)
+	expectedSeries = []string{"series_2", "series_3"}
+	VerifyMatches(t, result, expectedSeries)
+}
+
+func RunSeriesFiltererTest(t *testing.T, filters []config.RegexFilter, seriesNames []string, expectedResults []string) {
+	series := BuildSeriesFromNames(seriesNames)
+
+	filterer, err := NewSeriesFilterer(filters)
+	require.NoError(t, err)
+
+	matches := filterer.FilterSeries(series)
+
+	VerifyMatches(t, matches, expectedResults)
+}
+
+func VerifyMatches(t *testing.T, series []prom.Series, expectedResults []string) {
+	require.Equal(t, len(series), len(expectedResults))
+
+	existingSeries := make(map[string]bool)
+
+	for _, series := range series {
+		existingSeries[series.Name] = true
+	}
+
+	for _, expectedResult := range expectedResults {
+		_, exists := existingSeries[expectedResult]
+
+		require.True(t, exists)
+	}
+}
+
+func BuildSeriesFromNames(seriesNames []string) []prom.Series {
+	series := make([]prom.Series, len(seriesNames))
+
+	for i, name := range seriesNames {
+		series[i] = prom.Series{
+			Name: name,
+		}
+	}
+
+	return series
+}