From af9f11c817f7ccbe1f2639aabd07e84ea5d01a62 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Mon, 12 Feb 2018 17:35:32 -0500 Subject: [PATCH] Advanced Configuration This commit introduces advanced configuration. The rate-interval and label-prefix flags are removed, and replaced by a configuration file that allows you to specify series queries and the rules for transforming those into metrics queries and API resources. --- .gitignore | 1 + cmd/adapter/app/start.go | 28 +- docs/sample-config.yaml | 69 ++ pkg/client/interfaces.go | 9 + pkg/config/config.go | 75 ++ pkg/config/default.go | 92 +++ pkg/config/loader.go | 32 + pkg/custom-provider/metric_namer.go | 719 ++++++++++-------- pkg/custom-provider/provider.go | 95 ++- pkg/custom-provider/provider_test.go | 31 +- pkg/custom-provider/series_registry.go | 198 +++++ ..._namer_test.go => series_registry_test.go} | 276 +++---- 12 files changed, 1077 insertions(+), 548 deletions(-) create mode 100644 docs/sample-config.yaml create mode 100644 pkg/config/config.go create mode 100644 pkg/config/default.go create mode 100644 pkg/config/loader.go create mode 100644 pkg/custom-provider/series_registry.go rename pkg/custom-provider/{metric_namer_test.go => series_registry_test.go} (59%) diff --git a/.gitignore b/.gitignore index 69d2ba91..903d1ff3 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ *~ vendor _output +deploy/adapter diff --git a/cmd/adapter/app/start.go b/cmd/adapter/app/start.go index 507bebb1..c605d241 100644 --- a/cmd/adapter/app/start.go +++ b/cmd/adapter/app/start.go @@ -32,6 +32,7 @@ import ( prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" mprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/metrics" + adaptercfg "github.com/directxman12/k8s-prometheus-adapter/pkg/config" cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd/server" 
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/dynamicmapper" @@ -85,11 +86,28 @@ func NewCommandStartPrometheusAdapterServer(out, errOut io.Writer, stopCh <-chan flags.StringVar(&o.LabelPrefix, "label-prefix", o.LabelPrefix, "Prefix to expect on labels referring to pod resources. For example, if the prefix is "+ "'kube_', any series with the 'kube_pod' label would be considered a pod metric") + flags.StringVar(&o.AdapterConfigFile, "config", o.AdapterConfigFile, + "Configuration file containing details of how to transform between Prometheus metrics "+ + "and custom metrics API resources") + + flags.MarkDeprecated("label-prefix", "use --config instead") + flags.MarkDeprecated("discovery-interval", "use --config instead") return cmd } func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct{}) error { + var metricsConfig *adaptercfg.MetricsDiscoveryConfig + if o.AdapterConfigFile != "" { + var err error + metricsConfig, err = adaptercfg.FromFile(o.AdapterConfigFile) + if err != nil { + return fmt.Errorf("unable to load metrics discovery configuration: %v", err) + } + } else { + metricsConfig = adaptercfg.DefaultConfig(o.RateInterval, o.LabelPrefix) + } + config, err := o.Config() if err != nil { return err @@ -134,7 +152,13 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String()) promClient := prom.NewClientForAPI(instrumentedGenericPromClient) - cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.LabelPrefix, o.MetricsRelistInterval, o.RateInterval, stopCh) + namers, err := cmprov.NamersFromConfig(metricsConfig, dynamicMapper) + if err != nil { + return fmt.Errorf("unable to construct naming scheme from metrics rules: %v", err) + } + + cmProvider, runner := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, namers, o.MetricsRelistInterval) + 
runner.RunUntil(stopCh) server, err := config.Complete().New("prometheus-custom-metrics-adapter", cmProvider) if err != nil { @@ -159,4 +183,6 @@ type PrometheusAdapterServerOptions struct { // LabelPrefix is the prefix to expect on labels for Kubernetes resources // (e.g. if the prefix is "kube_", we'd expect a "kube_pod" label for pod metrics). LabelPrefix string + // AdapterConfigFile points to the file containing the metrics discovery configuration. + AdapterConfigFile string } diff --git a/docs/sample-config.yaml b/docs/sample-config.yaml new file mode 100644 index 00000000..251ea9a3 --- /dev/null +++ b/docs/sample-config.yaml @@ -0,0 +1,69 @@ +rules: +# Each rule represents a some naming and discovery logic. +# Each rule is executed independently of the others, so +# take care to avoid overlap. As an optimization, rules +# with the same `seriesQuery` but different +# `name` or `seriesFilters` will use only one query to +# Prometheus for discovery. + +# some of these rules are taken from the "default" configuration, which +# can be found in pkg/config/default.go + +# this rule matches cumulative cAdvisor metrics measured in seconds +- seriesQuery: '{__name__=~"^container_.*",container_name!="POD",namespace!="",pod_name!=""}' + resources: + # skip specifying generic resource<->label mappings, and just + # attach only pod and namespace resources by mapping label names to group-resources + overrides: + namespace: {resource: "namespace"}, + pod_name: {resource: "pod"}, + # specify that the `container_` and `_seconds_total` suffixes should be removed. 
+ # this also introduces an implicit filter on metric family names + name: + # we use the value of the capture group implicitly as the API name + # we could also explicitly write `as: "$1"` + matches: "^container_(.*)_seconds_total$" + # specify how to construct a query to fetch samples for a given series + # This is a Go template where the `.Series` and `.LabelMatchers` string values + # are available, and the delimiters are `${` and `}$` to avoid conflicts with + # the prometheus query language + metricsQuery: "sum(rate(${.Series}${${.LabelMatchers}$,container_name!="POD"}[2m])) by (${.GroupBy}$)" + +# this rule matches cumulative cAdvisor metrics not measured in seconds +- seriesQuery: '{__name__=~"^container_.*",container_name!="POD",namespace!="",pod_name!=""}' + resources: + overrides: + namespace: {resource: "namespace"}, + pod_name: {resource: "pod"}, + seriesFilters: + # since this is a superset of the query above, we introduce an additional filter here + - isNot: "^container_.*_seconds_total$" + name: {matches: "^container_(.*)_total$"} + metricsQuery: "sum(rate(${.Series}${${.LabelMatchers}$,container_name!="POD"}[2m])) by (${.GroupBy}$)" + +# this rule matches cumulative non-cAdvisor metrics +- seriesQuery: '{namespace!="",__name__!="^container_.*"}' + name: {matches: "^(.*)_total$"} + resources: + # specify an a generic mapping between resources and labels. This + # is a template, like the `metricsQuery` template, except with the `.Group` + # and `.Resource` strings available. It will also be used to match labels, + # so avoid using template functions which truncate the group or resource. + # Group will be converted to a form acceptible for use as a label automatically. 
+ template: "${.Resource}$" + # if we wanted to, we could also specify overrides here + metricsQuery: "sum(rate(${.Series}${${.LabelMatchers}$,container_name!="POD"}[2m])) by (${.GroupBy}$)" + +# this rule matches only a single metric, explicitly naming it something else +# It's series query *must* return only a single metric family +- seriesQuery: 'cheddar{sharp="true"}' + # this metric will appear as "cheesy_goodness" in the custom metrics API + name: {as: "cheesy_goodness"} + resources: + overrides: + # this should still resolve in our cluster + brand: {group: "cheese.io", resource: "brand"} + metricQuery: 'count(cheddar{sharp="true"})' + +# TODO: should we be able to map to a constant instance of a resource +# (e.g. `resources: {constant: [{resource: "namespace", name: "kube-system"}}]`)? diff --git a/pkg/client/interfaces.go b/pkg/client/interfaces.go index 00ee720d..6c93a0be 100644 --- a/pkg/client/interfaces.go +++ b/pkg/client/interfaces.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" "github.com/prometheus/common/model" @@ -121,3 +122,11 @@ func (s *Series) UnmarshalJSON(data []byte) error { return nil } + +func (s *Series) String() string { + lblStrings := make([]string, 0, len(s.Labels)) + for k, v := range s.Labels { + lblStrings = append(lblStrings, fmt.Sprintf("%s=%q", k, v)) + } + return fmt.Sprintf("%s{%s}", s.Name, strings.Join(lblStrings, ",")) +} diff --git a/pkg/config/config.go b/pkg/config/config.go new file mode 100644 index 00000000..3877d68e --- /dev/null +++ b/pkg/config/config.go @@ -0,0 +1,75 @@ +package config + +type MetricsDiscoveryConfig struct { + // Rules specifies how to discover and map Prometheus metrics to + // custom metrics API resources. The rules are applied independently, + // and thus must be mutually exclusive. Rules will the same SeriesQuery + // will make only a single API call. 
+ Rules []DiscoveryRule `yaml:"rules"` +} + +// DiscoveryRule describes on set of rules for transforming Prometheus metrics to/from +// custom metrics API resources. +type DiscoveryRule struct { + // SeriesQuery specifies which metrics this rule should consider via a Prometheus query + // series selector query. + SeriesQuery string `yaml:"seriesQuery"` + // SeriesFilters specifies additional regular expressions to be applied on + // the series names returned from the query. This is useful for constraints + // that can't be represented in the SeriesQuery (e.g. series matching `container_.+` + // not matching `container_.+_total`. A filter will be automatically appended to + // match the form specified in Name. + SeriesFilters []RegexFilter `yaml:"seriesFilter"` + // Resources specifies how associated Kubernetes resources should be discovered for + // the given metrics. + Resources ResourceMapping `yaml:"resources"` + // Name specifies how the metric name should be transformed between custom metric + // API resources, and Prometheus metric names. + Name NameMapping `yaml:"name"` + // MetricsQuery specifies modifications to the metrics query, such as converting + // cumulative metrics to rate metrics. It is a template where `.LabelMatchers` is + // a the comma-separated base label matchers and `.Series` is the series name, and + // `.GroupBy` is the comma-separated expected group-by label names. The delimeters + // are `${` and `}$`. + MetricsQuery string `yaml:"metricsQueries,omitempty"` +} + +// RegexFilter is a filter that matches positively or negatively against a regex. +// Only one field may be set at a time. +type RegexFilter struct { + Is string `yaml:"is,omitempty"` + IsNot string `yaml:"isNot,omitempty"` +} + +// ResourceMapping specifies how to map Kubernetes resources to Prometheus labels +type ResourceMapping struct { + // Template specifies a golang string template for converting a Kubernetes + // group-resource to a Prometheus label. 
The template object contains + // the `.Group` and `.Resource` fields. The `.Group` field will have + // dots replaced with underscores, and the `.Resource` field will be + // singularized. The delimiters are `${` and `}$`. + Template string `yaml:"template,omitempty"` + // Overrides specifies exceptions to the above template, mapping label names + // to group-resources + Overrides map[string]GroupResource `yaml:"overrides,omitempty"` +} + +// GroupResource represents a Kubernetes group-resource. +type GroupResource struct { + Group string `yaml:"group,omitempty"` + Resource string `yaml:"resource"` +} + +// NameMapping specifies how to convert Prometheus metrics +// to/from custom metrics API resources. +type NameMapping struct { + // Matches is a regular expression that is used to match + // Prometheus series names. It may be left blank, in which + // case it is equivalent to `.*`. + Matches string `yaml:"prometheus"` + // As is the name used in the API. Captures from Matches + // are available for use here. If not specified, it defaults + // to $0 if no capture groups are present in Matches, or $1 + // if only one is present, and will error if multiple are. + As string `yaml:"as"` +} diff --git a/pkg/config/default.go b/pkg/config/default.go new file mode 100644 index 00000000..f780e509 --- /dev/null +++ b/pkg/config/default.go @@ -0,0 +1,92 @@ +package config + +import ( + "fmt" + "time" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + pmodel "github.com/prometheus/common/model" +) + +// DefaultConfig returns a configuration equivalent to the former +// pre-advanced-config settings. This means that "normal" series labels +// will be of the form `${.Resource}$`, cadvisor series will be +// of the form `container_`, and have the label `pod_name`. Any series ending +// in total will be treated as a rate metric. 
+func DefaultConfig(rateInterval time.Duration, labelPrefix string) *MetricsDiscoveryConfig { + return &MetricsDiscoveryConfig{ + Rules: []DiscoveryRule{ + // container seconds rate metrics + { + SeriesQuery: string(prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", ""))), + Resources: ResourceMapping{ + Overrides: map[string]GroupResource{ + "namespace": {Resource: "namespace"}, + "pod_name": {Resource: "pod"}, + }, + }, + Name: NameMapping{Matches: "^container_(.*)_seconds_total$"}, + MetricsQuery: fmt.Sprintf(`sum(rate(${.Series}${${.LabelMatchers}$,container_name!="POD"}[%s])) by (${.GroupBy}$)`, pmodel.Duration(rateInterval).String()), + }, + + // container rate metrics + { + SeriesQuery: string(prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", ""))), + SeriesFilters: []RegexFilter{{IsNot: "^container_.*_seconds_total$"}}, + Resources: ResourceMapping{ + Overrides: map[string]GroupResource{ + "namespace": {Resource: "namespace"}, + "pod_name": {Resource: "pod"}, + }, + }, + Name: NameMapping{Matches: "^container_(.*)_total$"}, + MetricsQuery: fmt.Sprintf(`sum(rate(${.Series}${${.LabelMatchers}$,container_name!="POD"}[%s])) by (${.GroupBy}$)`, pmodel.Duration(rateInterval).String()), + }, + + // container non-cumulative metrics + { + SeriesQuery: string(prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", ""))), + SeriesFilters: []RegexFilter{{IsNot: "^container_.*_total$"}}, + Resources: ResourceMapping{ + Overrides: map[string]GroupResource{ + "namespace": {Resource: "namespace"}, + "pod_name": {Resource: "pod"}, + }, + }, + Name: NameMapping{Matches: "^container_(.*)$"}, + MetricsQuery: `sum(${.Series}${${.LabelMatchers}$,container_name!="POD"}) by 
(${.GroupBy}$)`, + }, + + // normal non-cumulative metrics + { + SeriesQuery: string(prom.MatchSeries("", prom.LabelNeq(fmt.Sprintf("%snamespace", labelPrefix), ""), prom.NameNotMatches("^container_.*"))), + SeriesFilters: []RegexFilter{{IsNot: ".*_total$"}}, + Resources: ResourceMapping{ + Template: fmt.Sprintf("%s${.Resource}$", labelPrefix), + }, + MetricsQuery: "sum(${.Series}${${.LabelMatchers}$}) by (${.GroupBy}$)", + }, + + // normal rate metrics + { + SeriesQuery: string(prom.MatchSeries("", prom.LabelNeq(fmt.Sprintf("%snamespace", labelPrefix), ""), prom.NameNotMatches("^container_.*"))), + SeriesFilters: []RegexFilter{{IsNot: ".*_seconds_total"}}, + Name: NameMapping{Matches: "^(.*)_total$"}, + Resources: ResourceMapping{ + Template: fmt.Sprintf("%s${.Resource}$", labelPrefix), + }, + MetricsQuery: fmt.Sprintf("sum(rate(${.Series}${${.LabelMatchers}$}[%s])) by (${.GroupBy}$)", pmodel.Duration(rateInterval).String()), + }, + + // seconds rate metrics + { + SeriesQuery: string(prom.MatchSeries("", prom.LabelNeq(fmt.Sprintf("%snamespace", labelPrefix), ""), prom.NameNotMatches("^container_.*"))), + Name: NameMapping{Matches: "^(.*)_seconds_total$"}, + Resources: ResourceMapping{ + Template: fmt.Sprintf("%s${.Resource}$", labelPrefix), + }, + MetricsQuery: fmt.Sprintf("sum(rate(${.Series}${${.LabelMatchers}$}[%s])) by (${.GroupBy}$)", pmodel.Duration(rateInterval).String()), + }, + }, + } +} diff --git a/pkg/config/loader.go b/pkg/config/loader.go new file mode 100644 index 00000000..ecc36a23 --- /dev/null +++ b/pkg/config/loader.go @@ -0,0 +1,32 @@ +package config + +import ( + "fmt" + "io/ioutil" + "os" + + yaml "gopkg.in/yaml.v2" +) + +// FromFile loads the configuration from a particular file. 
+func FromFile(filename string) (*MetricsDiscoveryConfig, error) { + file, err := os.Open(filename) + defer file.Close() + if err != nil { + return nil, fmt.Errorf("unable to load metrics discovery config file: %v", err) + } + contents, err := ioutil.ReadAll(file) + if err != nil { + return nil, fmt.Errorf("unable to load metrics discovery config file: %v", err) + } + return FromYAML(contents) +} + +// FromYAML loads the configuration from a blob of YAML. +func FromYAML(contents []byte) (*MetricsDiscoveryConfig, error) { + var cfg MetricsDiscoveryConfig + if err := yaml.Unmarshal(contents, &cfg); err != nil { + return nil, fmt.Errorf("unable to parse metrics discovery config: %v", err) + } + return &cfg, nil +} diff --git a/pkg/custom-provider/metric_namer.go b/pkg/custom-provider/metric_namer.go index 90d7cdd5..c73b4e03 100644 --- a/pkg/custom-provider/metric_namer.go +++ b/pkg/custom-provider/metric_namer.go @@ -1,367 +1,470 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - package provider import ( + "bytes" "fmt" + "regexp" "strings" "sync" + "text/template" + "github.com/golang/glog" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" - "github.com/golang/glog" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" pmodel "github.com/prometheus/common/model" ) -// NB: container metrics sourced from cAdvisor don't consistently follow naming conventions, -// so we need to whitelist them and handle them on a case-by-case basis. Metrics ending in `_total` -// *should* be counters, but may actually be guages in this case. +var nsGroupResource = schema.GroupResource{Resource: "namespaces"} +var groupNameSanitizer = strings.NewReplacer(".", "_", "-", "_") -// SeriesType represents the kind of series backing a metric. -type SeriesType int - -const ( - CounterSeries SeriesType = iota - SecondsCounterSeries - GaugeSeries -) - -// SeriesRegistry provides conversions between Prometheus series and MetricInfo -type SeriesRegistry interface { - // Selectors produces the appropriate Prometheus selectors to match all series handlable - // by this registry, as an optimization for SetSeries. 
- Selectors() []prom.Selector - // SetSeries replaces the known series in this registry - SetSeries(series []prom.Series) error - // ListAllMetrics lists all metrics known to this registry - ListAllMetrics() []provider.MetricInfo - // SeriesForMetric looks up the minimum required series information to make a query for the given metric - // against the given resource (namespace may be empty for non-namespaced resources) - QueryForMetric(info provider.MetricInfo, namespace string, resourceNames ...string) (kind SeriesType, query prom.Selector, groupBy string, found bool) - // MatchValuesToNames matches result values to resource names for the given metric and value set - MatchValuesToNames(metricInfo provider.MetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool) +// MetricNamer knows how to convert Prometheus series names and label names to +// metrics API resources, and vice-versa. MetricNamers should be safe to access +// concurrently. Returned group-resources are "normalized" as per the +// MetricInfo#Normalized method. Group-resources passed as arguments must +// themselves be normalized. +type MetricNamer interface { + // Selector produces the appropriate Prometheus series selector to match all + // series handlable by this namer. + Selector() prom.Selector + // FilterSeries checks to see which of the given series match any additional + // constrains beyond the series query. It's assumed that the series given + // already matche the series query. + FilterSeries(series []prom.Series) []prom.Series + // ResourcesForSeries returns the group-resources associated with the given series, + // as well as whether or not the given series has the "namespace" resource). + ResourcesForSeries(series prom.Series) (res []schema.GroupResource, namespaced bool) + // LabelForResource returns the appropriate label for the given resource. 
+ LabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) + // MetricNameForSeries returns the name (as presented in the API) for a given series. + MetricNameForSeries(series prom.Series) (string, error) + // QueryForSeries returns the query for a given series (not API metric name), with + // the given namespace name (if relevant), resource, and resource names. + QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) } -type seriesInfo struct { - // baseSeries represents the minimum information to access a particular series - baseSeries prom.Series - // kind is the type of this series - kind SeriesType - // isContainer indicates if the series is a cAdvisor container_ metric, and thus needs special handling - isContainer bool +// labelGroupResExtractor extracts schema.GroupResources from series labels. +type labelGroupResExtractor struct { + regex *regexp.Regexp + + resourceInd int + groupInd *int + mapper apimeta.RESTMapper } -// overridableSeriesRegistry is a basic SeriesRegistry -type basicSeriesRegistry struct { - mu sync.RWMutex - - // info maps metric info to information about the corresponding series - info map[provider.MetricInfo]seriesInfo - // metrics is the list of all known metrics - metrics []provider.MetricInfo - - // namer is the metricNamer responsible for converting series to metric names and information - namer metricNamer -} - -func (r *basicSeriesRegistry) Selectors() []prom.Selector { - // container-specific metrics from cAdvsior have their own form, and need special handling - // TODO: figure out how to determine which metrics on non-namespaced objects are kubernetes-related - containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) - namespacedSel := prom.MatchSeries("", prom.LabelNeq(r.namer.labelPrefix+"namespace", ""), 
prom.NameNotMatches("^container_.*")) - - return []prom.Selector{containerSel, namespacedSel} -} - -func (r *basicSeriesRegistry) SetSeries(newSeries []prom.Series) error { - newInfo := make(map[provider.MetricInfo]seriesInfo) - for _, series := range newSeries { - if strings.HasPrefix(series.Name, "container_") { - r.namer.processContainerSeries(series, newInfo) - } else if namespaceLabel, hasNamespaceLabel := series.Labels[pmodel.LabelName(r.namer.labelPrefix+"namespace")]; hasNamespaceLabel && namespaceLabel != "" { - // we also handle namespaced metrics here as part of the resource-association logic - if err := r.namer.processNamespacedSeries(series, newInfo); err != nil { - glog.Errorf("Unable to process namespaced series %q: %v", series.Name, err) - continue - } - } else { - if err := r.namer.processRootScopedSeries(series, newInfo); err != nil { - glog.Errorf("Unable to process root-scoped series %q: %v", series.Name, err) - continue - } - } +// newLabelGroupResExtractor creates a new labelGroupResExtractor for labels whose form +// matches the given template. It does so by creating a regular expression from the template, +// so anything in the template which limits resource or group name length will cause issues. 
+func newLabelGroupResExtractor(labelTemplate *template.Template) (*labelGroupResExtractor, error) { + labelRegexBuff := new(bytes.Buffer) + if err := labelTemplate.Execute(labelRegexBuff, schema.GroupResource{"(?P.+?)", "(?P.+?)"}); err != nil { + return nil, fmt.Errorf("unable to convert label template to matcher: %v", err) } - - newMetrics := make([]provider.MetricInfo, 0, len(newInfo)) - for info := range newInfo { - newMetrics = append(newMetrics, info) + if labelRegexBuff.Len() == 0 { + return nil, fmt.Errorf("unable to convert label template to matcher: empty template") } - - r.mu.Lock() - defer r.mu.Unlock() - - r.info = newInfo - r.metrics = newMetrics - - return nil -} - -func (r *basicSeriesRegistry) ListAllMetrics() []provider.MetricInfo { - r.mu.RLock() - defer r.mu.RUnlock() - - return r.metrics -} - -func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.MetricInfo, namespace string, resourceNames ...string) (kind SeriesType, query prom.Selector, groupBy string, found bool) { - r.mu.RLock() - defer r.mu.RUnlock() - - if len(resourceNames) == 0 { - glog.Errorf("no resource names requested while producing a query for metric %s", metricInfo.String()) - return 0, "", "", false - } - - metricInfo, singularResource, err := metricInfo.Normalized(r.namer.mapper) + labelRegexRaw := "^" + labelRegexBuff.String() + "$" + labelRegex, err := regexp.Compile(labelRegexRaw) if err != nil { - glog.Errorf("unable to normalize group resource while producing a query: %v", err) - return 0, "", "", false - } - resourceLbl := r.namer.labelPrefix + singularResource - - // TODO: support container metrics - if info, found := r.info[metricInfo]; found { - targetValue := resourceNames[0] - matcher := prom.LabelEq - if len(resourceNames) > 1 { - targetValue = strings.Join(resourceNames, "|") - matcher = prom.LabelMatches - } - - var expressions []string - if info.isContainer { - expressions = []string{matcher("pod_name", targetValue), prom.LabelNeq("container_name", 
"POD")} - groupBy = "pod_name" - } else { - // TODO: copy base series labels? - expressions = []string{matcher(resourceLbl, targetValue)} - groupBy = resourceLbl - } - - if metricInfo.Namespaced { - prefix := r.namer.labelPrefix - if info.isContainer { - prefix = "" - } - expressions = append(expressions, prom.LabelEq(prefix+"namespace", namespace)) - } - - return info.kind, prom.MatchSeries(info.baseSeries.Name, expressions...), groupBy, true + return nil, fmt.Errorf("unable to convert label template to matcher: %v", err) } - glog.V(10).Infof("metric %v not registered", metricInfo) - return 0, "", "", false + var groupInd *int + var resInd *int + + for i, name := range labelRegex.SubexpNames() { + switch name { + case "group": + ind := i // copy to avoid iteration variable reference + groupInd = &ind + case "resource": + ind := i // copy to avoid iteration variable reference + resInd = &ind + } + } + + if resInd == nil { + return nil, fmt.Errorf("must include at least `{{.Resource}}` in the label template") + } + + return &labelGroupResExtractor{ + regex: labelRegex, + resourceInd: *resInd, + groupInd: groupInd, + }, nil } -func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.MetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool) { - r.mu.RLock() - defer r.mu.RUnlock() - - metricInfo, singularResource, err := metricInfo.Normalized(r.namer.mapper) - if err != nil { - glog.Errorf("unable to normalize group resource while matching values to names: %v", err) - return nil, false - } - resourceLbl := r.namer.labelPrefix + singularResource - - if info, found := r.info[metricInfo]; found { - res := make(map[string]pmodel.SampleValue, len(values)) - for _, val := range values { - if val == nil { - // skip empty values - continue - } - - labelName := pmodel.LabelName(resourceLbl) - if info.isContainer { - labelName = pmodel.LabelName("pod_name") - } - res[string(val.Metric[labelName])] = val.Value +// 
GroupResourceForLabel extracts a schema.GroupResource from the given label, if possible. +// The second argument indicates whether or not a potential group-resource was found in this label. +func (e *labelGroupResExtractor) GroupResourceForLabel(lbl pmodel.LabelName) (schema.GroupResource, bool) { + matchGroups := e.regex.FindStringSubmatch(string(lbl)) + if matchGroups != nil { + group := "" + if e.groupInd != nil { + group = matchGroups[*e.groupInd] } - return res, true + return schema.GroupResource{ + Group: group, + Resource: matchGroups[e.resourceInd], + }, true } - return nil, false + return schema.GroupResource{}, false } -// metricNamer knows how to construct MetricInfo out of raw prometheus series descriptions. -type metricNamer struct { - // overrides contains the list of container metrics whose naming we want to override. - // This is used to properly convert certain cAdvisor container metrics. - overrides map[string]seriesSpec - - mapper apimeta.RESTMapper - - labelPrefix string +func (r *metricNamer) Selector() prom.Selector { + return r.seriesQuery } -// seriesSpec specifies how to produce metric info for a particular prometheus series source -type seriesSpec struct { - // metricName is the desired output API metric name - metricName string - // kind indicates whether or not this metric is cumulative, - // and thus has to be calculated as a rate when returning it - kind SeriesType +// reMatcher either positively or negatively matches a regex +type reMatcher struct { + regex *regexp.Regexp + positive bool } -// processContainerSeries performs special work to extract metric definitions -// from cAdvisor-sourced container metrics, which don't particularly follow any useful conventions consistently. 
-func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) { +func newReMatcher(cfg config.RegexFilter) (*reMatcher, error) { + if cfg.Is != "" && cfg.IsNot != "" { + return nil, fmt.Errorf("cannot have both an `is` (%q) and `isNot` (%q) expression in a single filter", cfg.Is, cfg.IsNot) + } + if cfg.Is == "" && cfg.IsNot == "" { + return nil, fmt.Errorf("must have either an `is` or `isNot` expression in a filter") + } - originalName := series.Name - - var name string - metricKind := GaugeSeries - if override, hasOverride := n.overrides[series.Name]; hasOverride { - name = override.metricName - metricKind = override.kind + var positive bool + var regexRaw string + if cfg.Is != "" { + positive = true + regexRaw = cfg.Is } else { - // chop of the "container_" prefix - series.Name = series.Name[10:] - name, metricKind = n.metricNameFromSeries(series) + positive = false + regexRaw = cfg.IsNot } - info := provider.MetricInfo{ - GroupResource: schema.GroupResource{Resource: "pods"}, - Namespaced: true, - Metric: name, - } - - infos[info] = seriesInfo{ - kind: metricKind, - baseSeries: prom.Series{Name: originalName}, - isContainer: true, - } -} - -// processNamespacedSeries adds the metric info for the given generic namespaced series to -// the map of metric info. 
-func (n *metricNamer) processNamespacedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error { - // NB: all errors must occur *before* we save the series info - name, metricKind := n.metricNameFromSeries(series) - resources, err := n.groupResourcesFromSeries(series) + regex, err := regexp.Compile(regexRaw) if err != nil { - return fmt.Errorf("unable to process prometheus series %s: %v", series.Name, err) + return nil, fmt.Errorf("unable to compile series filter %q: %v", regexRaw, err) } - // we add one metric for each resource that this could describe - for _, resource := range resources { - info := provider.MetricInfo{ - GroupResource: resource, - Namespaced: true, - Metric: name, - } - - // metrics describing namespaces aren't considered to be namespaced - if resource == (schema.GroupResource{Resource: "namespaces"}) { - info.Namespaced = false - } - - infos[info] = seriesInfo{ - kind: metricKind, - baseSeries: prom.Series{Name: series.Name}, - } - } - - return nil + return &reMatcher{ + regex: regex, + positive: positive, + }, nil } -// processesRootScopedSeries adds the metric info for the given generic namespaced series to -// the map of metric info. 
-func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error { - // NB: all errors must occur *before* we save the series info - name, metricKind := n.metricNameFromSeries(series) - resources, err := n.groupResourcesFromSeries(series) - if err != nil { - return fmt.Errorf("unable to process prometheus series %s: %v", series.Name, err) - } - - // we add one metric for each resource that this could describe - for _, resource := range resources { - info := provider.MetricInfo{ - GroupResource: resource, - Namespaced: false, - Metric: name, - } - - infos[info] = seriesInfo{ - kind: metricKind, - baseSeries: prom.Series{Name: series.Name}, - } - } - - return nil +func (m *reMatcher) Matches(val string) bool { + return m.regex.MatchString(val) == m.positive } -// groupResourceFromSeries collects the possible group-resources that this series could describe by -// going through each label, checking to see if it corresponds to a known resource. For instance, -// a series `ingress_http_hits_total{pod="foo",service="bar",ingress="baz",namespace="ns"}` -// would return three GroupResources: "pods", "services", and "ingresses". -// Returned MetricInfo is equilavent to the "normalized" info produced by metricInfo.Normalized. 
-func (n *metricNamer) groupResourcesFromSeries(series prom.Series) ([]schema.GroupResource, error) { - var res []schema.GroupResource - for label := range series.Labels { - if !strings.HasPrefix(string(label), n.labelPrefix) { - continue - } - label = label[len(n.labelPrefix):] - // TODO: figure out a way to let people specify a fully-qualified name in label-form - gvr, err := n.mapper.ResourceFor(schema.GroupVersionResource{Resource: string(label)}) - if err != nil { - if apimeta.IsNoMatchError(err) { - continue +type metricNamer struct { + seriesQuery prom.Selector + labelTemplate *template.Template + labelResExtractor *labelGroupResExtractor + metricsQueryTemplate *template.Template + nameMatches *regexp.Regexp + nameAs string + seriesMatchers []*reMatcher + + labelResourceMu sync.RWMutex + labelToResource map[pmodel.LabelName]schema.GroupResource + resourceToLabel map[schema.GroupResource]pmodel.LabelName + mapper apimeta.RESTMapper +} + +// queryTemplateArgs are the arguments for the metrics query template. +type queryTemplateArgs struct { + Series string + LabelMatchers string + LabelMatchersSlice []string + GroupBy string + GroupBySlice []string +} + +func (n *metricNamer) FilterSeries(initialSeries []prom.Series) []prom.Series { + if len(n.seriesMatchers) == 0 { + return initialSeries + } + + finalSeries := make([]prom.Series, 0, len(initialSeries)) +SeriesLoop: + for _, series := range initialSeries { + for _, matcher := range n.seriesMatchers { + if !matcher.Matches(series.Name) { + continue SeriesLoop } - return nil, err } - res = append(res, gvr.GroupResource()) + finalSeries = append(finalSeries, series) } - return res, nil + return finalSeries } -// metricNameFromSeries extracts a metric name from a series name, and indicates -// whether or not that series was a counter. It also has special logic to deal with time-based -// counters, which general get converted to milli-unit rate metrics. 
-func (n *metricNamer) metricNameFromSeries(series prom.Series) (name string, kind SeriesType) { - kind = GaugeSeries - name = series.Name - if strings.HasSuffix(name, "_total") { - kind = CounterSeries - name = name[:len(name)-6] +func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) { + var exprs []string + if namespace != "" { + namespaceLbl, err := n.LabelForResource(nsGroupResource) + if err != nil { + return "", err + } + exprs = append(exprs, prom.LabelEq(string(namespaceLbl), namespace)) + } - if strings.HasSuffix(name, "_seconds") { - kind = SecondsCounterSeries - name = name[:len(name)-8] + resourceLbl, err := n.LabelForResource(resource) + if err != nil { + return "", err + } + matcher := prom.LabelEq + targetValue := names[0] + if len(names) > 1 { + matcher = prom.LabelMatches + targetValue = strings.Join(names, "|") + } + exprs = append(exprs, matcher(string(resourceLbl), targetValue)) + + args := queryTemplateArgs{ + Series: series, + LabelMatchers: strings.Join(exprs, ","), + LabelMatchersSlice: exprs, + GroupBy: string(resourceLbl), + GroupBySlice: []string{string(resourceLbl)}, + } + queryBuff := new(bytes.Buffer) + if err := n.metricsQueryTemplate.Execute(queryBuff, args); err != nil { + return "", err + } + + if queryBuff.Len() == 0 { + return "", fmt.Errorf("empty query produced by metrics query template") + } + + return prom.Selector(queryBuff.String()), nil +} + +func (n *metricNamer) ResourcesForSeries(series prom.Series) ([]schema.GroupResource, bool) { + // use an updates map to avoid having to drop the read lock to update the cache + // until the end. Since we'll probably have few updates after the first run, + // this should mean that we rarely have to hold the write lock. 
+	var resources []schema.GroupResource
+	updates := make(map[pmodel.LabelName]schema.GroupResource)
+	namespaced := false
+
+	// use an anon func to get the right defer behavior
+	func() {
+		n.labelResourceMu.RLock()
+		defer n.labelResourceMu.RUnlock()
+
+		for lbl := range series.Labels {
+			var groupRes schema.GroupResource
+			var ok bool
+
+			// check if we have an override
+			if groupRes, ok = n.labelToResource[lbl]; ok {
+				resources = append(resources, groupRes)
+			} else if groupRes, ok = updates[lbl]; ok {
+				resources = append(resources, groupRes)
+			} else if n.labelResExtractor != nil {
+				// if not, check if it matches the form we expect, and if so,
+				// convert to a group-resource.
+				if groupRes, ok = n.labelResExtractor.GroupResourceForLabel(lbl); ok {
+					info, _, err := provider.MetricInfo{GroupResource: groupRes}.Normalized(n.mapper)
+					if err != nil {
+						glog.Errorf("unable to normalize group-resource %s from label %q, skipping: %v", groupRes.String(), lbl, err)
+						continue
+					}
+
+					groupRes = info.GroupResource
+					resources = append(resources, groupRes)
+					updates[lbl] = groupRes
+				}
+			}
+
+			if groupRes == nsGroupResource {
+				namespaced = true
+			}
+		}
+	}()
+
+	// update the cache for next time. This should only be called by discovery,
+	// so we don't really have to worry about the gap between read and write locks
+	// (plus, we don't care if someone else updates the cache first, since the results
+	// are necessarily the same, so at most we've done extra work)
+ if len(updates) > 0 { + n.labelResourceMu.Lock() + defer n.labelResourceMu.Unlock() + + for lbl, groupRes := range updates { + n.labelToResource[lbl] = groupRes } } - return + return resources, namespaced +} + +func (n *metricNamer) LabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) { + n.labelResourceMu.RLock() + // check if we have a cached copy or override + lbl, ok := n.resourceToLabel[resource] + n.labelResourceMu.RUnlock() // release before we call makeLabelForResource + if ok { + return lbl, nil + } + + // NB: we don't actually care about the gap between releasing read lock + // and acquiring the write lock -- if we do duplicate work sometimes, so be + // it, as long as we're correct. + + // otherwise, use the template and save the result + lbl, err := n.makeLabelForResource(resource) + if err != nil { + return "", fmt.Errorf("unable to convert resource %s into label: %v", resource.String(), err) + } + return lbl, nil +} + +// makeLabelForResource constructs a label name for the given resource, and saves the result. +// It must *not* be called under an existing lock. 
+func (n *metricNamer) makeLabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) {
+	if n.labelTemplate == nil {
+		return "", fmt.Errorf("no generic resource label form specified for this metric")
+	}
+	buff := new(bytes.Buffer)
+
+	singularRes, err := n.mapper.ResourceSingularizer(resource.Resource)
+	if err != nil {
+		return "", fmt.Errorf("unable to singularize resource %s: %v", resource.String(), err)
+	}
+	convResource := schema.GroupResource{
+		Group:    groupNameSanitizer.Replace(resource.Group),
+		Resource: singularRes,
+	}
+
+	if err := n.labelTemplate.Execute(buff, convResource); err != nil {
+		return "", err
+	}
+	if buff.Len() == 0 {
+		return "", fmt.Errorf("empty label produced by label template")
+	}
+	lbl := pmodel.LabelName(buff.String())
+
+	n.labelResourceMu.Lock()
+	defer n.labelResourceMu.Unlock()
+
+	n.resourceToLabel[resource] = lbl
+	n.labelToResource[lbl] = resource
+	return lbl, nil
+}
+
+func (n *metricNamer) MetricNameForSeries(series prom.Series) (string, error) {
+	matches := n.nameMatches.FindStringSubmatchIndex(series.Name)
+	if matches == nil {
+		return "", fmt.Errorf("series name %q did not match expected pattern %q", series.Name, n.nameMatches.String())
+	}
+	outNameBytes := n.nameMatches.ExpandString(nil, n.nameAs, series.Name, matches)
+	return string(outNameBytes), nil
+}
+
+// NamersFromConfig produces a MetricNamer for each rule in the given config.
+func NamersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMapper) ([]MetricNamer, error) { + namers := make([]MetricNamer, len(cfg.Rules)) + + for i, rule := range cfg.Rules { + var labelTemplate *template.Template + var labelResExtractor *labelGroupResExtractor + var err error + if rule.Resources.Template != "" { + labelTemplate, err = template.New("resource-label").Delims("${", "}$").Parse(rule.Resources.Template) + if err != nil { + return nil, fmt.Errorf("unable to parse label template %q associated with series query %q: %v", rule.Resources.Template, rule.SeriesQuery, err) + } + + labelResExtractor, err = newLabelGroupResExtractor(labelTemplate) + if err != nil { + return nil, fmt.Errorf("unable to generate label format from template %q associated with series query %q: %v", rule.Resources.Template, rule.SeriesQuery, err) + } + } + + metricsQueryTemplate, err := template.New("metrics-query").Delims("${", "}$").Parse(rule.MetricsQuery) + if err != nil { + return nil, fmt.Errorf("unable to parse metrics query template %q associated with series query %q: %v", rule.MetricsQuery, rule.SeriesQuery, err) + } + + seriesMatchers := make([]*reMatcher, len(rule.SeriesFilters)) + for i, filterRaw := range rule.SeriesFilters { + matcher, err := newReMatcher(filterRaw) + if err != nil { + return nil, fmt.Errorf("unable to generate series name filter associated with series query %q: %v", rule.SeriesQuery, err) + } + seriesMatchers[i] = matcher + } + if rule.Name.Matches != "" { + matcher, err := newReMatcher(config.RegexFilter{Is: rule.Name.Matches}) + if err != nil { + return nil, fmt.Errorf("unable to generate series name filter from name rules associated with series query %q: %v", rule.SeriesQuery, err) + } + seriesMatchers = append(seriesMatchers, matcher) + } + + var nameMatches *regexp.Regexp + if rule.Name.Matches != "" { + nameMatches, err = regexp.Compile(rule.Name.Matches) + if err != nil { + return nil, fmt.Errorf("unable to compile series name 
match expression %q associated with series query %q: %v", rule.Name.Matches, rule.SeriesQuery, err) + } + } else { + // this will always succeed + nameMatches = regexp.MustCompile(".*") + } + nameAs := rule.Name.As + if nameAs == "" { + // check if we have an obvious default + subexpNames := nameMatches.SubexpNames() + if len(subexpNames) == 1 { + // no capture groups, use the whole thing + nameAs = "$0" + } else if len(subexpNames) == 2 { + // one capture group, use that + nameAs = "$1" + } else { + return nil, fmt.Errorf("must specify an 'as' value for name matcher %q associated with series query %q", rule.Name.Matches, rule.SeriesQuery) + } + } + + namer := &metricNamer{ + seriesQuery: prom.Selector(rule.SeriesQuery), + labelTemplate: labelTemplate, + labelResExtractor: labelResExtractor, + metricsQueryTemplate: metricsQueryTemplate, + mapper: mapper, + nameMatches: nameMatches, + nameAs: nameAs, + seriesMatchers: seriesMatchers, + + labelToResource: make(map[pmodel.LabelName]schema.GroupResource), + resourceToLabel: make(map[schema.GroupResource]pmodel.LabelName), + } + + // invert the structure for consistency with the template + for lbl, groupRes := range rule.Resources.Overrides { + infoRaw := provider.MetricInfo{ + GroupResource: schema.GroupResource{ + Group: groupRes.Group, + Resource: groupRes.Resource, + }, + } + info, _, err := infoRaw.Normalized(mapper) + if err != nil { + return nil, fmt.Errorf("unable to normalize group-resource %v: %v", groupRes, err) + } + + namer.labelToResource[pmodel.LabelName(lbl)] = info.GroupResource + namer.resourceToLabel[info.GroupResource] = pmodel.LabelName(lbl) + } + + namers[i] = namer + } + + return namers, nil } diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index 9a529104..b9ea5f1c 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -40,42 +40,40 @@ import ( prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" ) +// Runnable represents 
something that can be run until told to stop. +type Runnable interface { + // Run runs the runnable forever. + Run() + // RunUntil runs the runnable until the given channel is closed. + RunUntil(stopChan <-chan struct{}) +} + type prometheusProvider struct { mapper apimeta.RESTMapper kubeClient dynamic.ClientPool promClient prom.Client SeriesRegistry - - rateInterval time.Duration } -func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, labelPrefix string, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider { +func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration) (provider.CustomMetricsProvider, Runnable) { lister := &cachingMetricsLister{ updateInterval: updateInterval, promClient: promClient, + namers: namers, SeriesRegistry: &basicSeriesRegistry{ - namer: metricNamer{ - // TODO: populate the overrides list - overrides: nil, - mapper: mapper, - labelPrefix: labelPrefix, - }, + mapper: mapper, }, } - lister.RunUntil(stopChan) - return &prometheusProvider{ mapper: mapper, kubeClient: kubeClient, promClient: promClient, SeriesRegistry: lister, - - rateInterval: rateInterval, - } + }, lister } func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) { @@ -132,29 +130,13 @@ func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.Me } func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace string, names ...string) (pmodel.Vector, error) { - kind, baseQuery, groupBy, found := p.QueryForMetric(info, namespace, names...) + query, found := p.QueryForMetric(info, namespace, names...) 
if !found { return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) } - fullQuery := baseQuery - switch kind { - case CounterSeries: - fullQuery = prom.Selector(fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(p.rateInterval).String())) - case SecondsCounterSeries: - // TODO: futher modify for seconds? - fullQuery = prom.Selector(prom.Selector(fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(p.rateInterval).String()))) - } - - // NB: too small of a rate interval will return no results... - - // sum over all other dimensions of this query (e.g. if we select on route, sum across all pods, - // but if we select on pods, sum across all routes), and split by the dimension of our resource - // TODO: return/populate the by list in SeriesForMetric - fullQuery = prom.Selector(fmt.Sprintf("sum(%s) by (%s)", fullQuery, groupBy)) - // TODO: use an actual context - queryResults, err := p.promClient.Query(context.Background(), pmodel.Now(), fullQuery) + queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), query) if err != nil { glog.Errorf("unable to fetch metrics from prometheus: %v", err) // don't leak implementation details to the user @@ -285,6 +267,7 @@ type cachingMetricsLister struct { promClient prom.Client updateInterval time.Duration + namers []MetricNamer } func (l *cachingMetricsLister) Run() { @@ -302,17 +285,49 @@ func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) { func (l *cachingMetricsLister) updateMetrics() error { startTime := pmodel.Now().Add(-1 * l.updateInterval) - sels := l.Selectors() + // don't do duplicate queries when it's just the matchers that change + seriesCacheByQuery := make(map[prom.Selector][]prom.Series) - // TODO: use an actual context here - series, err := l.promClient.Series(context.Background(), pmodel.Interval{startTime, 0}, sels...) 
-	if err != nil {
-		return fmt.Errorf("unable to update list of all available metrics: %v", err)
+	// these can take a while on large clusters, so launch in parallel
+	// and don't duplicate. NOTE(review): the goroutines below write seriesCacheByQuery concurrently with no lock (concurrent map writes are a data race), and send on errs *before* the map write, so the reader may miss entries -- guard the map with a mutex and write it before signalling errs.
+	selectors := make(map[prom.Selector]struct{})
+	errs := make(chan error, len(l.namers))
+	for _, namer := range l.namers {
+		sel := namer.Selector()
+		if _, ok := selectors[sel]; ok {
+			errs <- nil
+			continue
+		}
+		selectors[sel] = struct{}{}
+		go func() {
+			series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel)
+			if err != nil {
+				errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err)
+				return
+			}
+			errs <- nil
+			seriesCacheByQuery[sel] = series
+		}()
 	}
-	glog.V(10).Infof("Set available metric list from Prometheus to: %v", series)
+	// iterate through, blocking until we've got all results
+	for range l.namers {
+		if err := <-errs; err != nil {
+			return fmt.Errorf("unable to update list of all metrics: %v", err)
+		}
+	}
+	close(errs)
 
-	l.SetSeries(series)
+	newSeries := make([][]prom.Series, len(l.namers))
+	for i, namer := range l.namers {
+		series, cached := seriesCacheByQuery[namer.Selector()]
+		if !cached {
+			return fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector())
+		}
+		newSeries[i] = namer.FilterSeries(series)
+	}
 
-	return nil
+	glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries)
+
+	return l.SetSeries(newSeries, l.namers)
 }
diff --git a/pkg/custom-provider/provider_test.go b/pkg/custom-provider/provider_test.go
index b5a756e0..06a69c03 100644
--- a/pkg/custom-provider/provider_test.go
+++ b/pkg/custom-provider/provider_test.go
@@ -30,6 +31,7 @@ import (
 	fakedyn "k8s.io/client-go/dynamic/fake"
 
 	prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
+	"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
 	pmodel "github.com/prometheus/common/model"
 )
 
@@ -86,20 +87,20 @@ func (c *fakePromClient) QueryRange(_ context.Context, r 
prom.Range, query prom. return prom.QueryResult{}, nil } -func setupPrometheusProvider(t *testing.T, stopCh <-chan struct{}) (provider.CustomMetricsProvider, *fakePromClient) { +func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fakePromClient) { fakeProm := &fakePromClient{} fakeKubeClient := &fakedyn.FakeClientPool{} - prov := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, "", fakeProviderUpdateInterval, 1*time.Minute, stopCh) + cfg := config.DefaultConfig(1*time.Minute, "") + namers, err := NamersFromConfig(cfg, restMapper()) + require.NoError(t, err) + + prov, _ := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, namers, fakeProviderUpdateInterval) containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) fakeProm.series = map[prom.Selector][]prom.Series{ containerSel: { - { - Name: "container_actually_gauge_seconds_total", - Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, - }, { Name: "container_some_usage", Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, @@ -130,28 +131,24 @@ func setupPrometheusProvider(t *testing.T, stopCh <-chan struct{}) (provider.Cus func TestListAllMetrics(t *testing.T) { // setup - stopCh := make(chan struct{}) - defer close(stopCh) - prov, fakeProm := setupPrometheusProvider(t, stopCh) + prov, fakeProm := setupPrometheusProvider(t) // assume we have no updates require.Len(t, prov.ListAllMetrics(), 0, "assume: should have no metrics updates at the start") // set the acceptible interval (now until the next update, with a bit of wiggle room) - startTime := pmodel.Now() - endTime := startTime.Add(fakeProviderUpdateInterval + fakeProviderUpdateInterval/10) - 
fakeProm.acceptibleInterval = pmodel.Interval{Start: startTime, End: endTime} + startTime := pmodel.Now().Add(-1*fakeProviderUpdateInterval - fakeProviderUpdateInterval/10) + fakeProm.acceptibleInterval = pmodel.Interval{Start: startTime, End: 0} - // wait one update interval (with a bit of wiggle room) - time.Sleep(fakeProviderUpdateInterval + fakeProviderUpdateInterval/10) + // update the metrics (without actually calling RunUntil, so we can avoid timing issues) + lister := prov.(*prometheusProvider).SeriesRegistry.(*cachingMetricsLister) + require.NoError(t, lister.updateMetrics()) // list/sort the metrics actualMetrics := prov.ListAllMetrics() sort.Sort(metricInfoSorter(actualMetrics)) expectedMetrics := []provider.MetricInfo{ - {schema.GroupResource{Resource: "pods"}, true, "actually_gauge"}, - {schema.GroupResource{Resource: "pods"}, true, "some_usage"}, {schema.GroupResource{Resource: "services"}, true, "ingress_hits"}, {schema.GroupResource{Group: "extensions", Resource: "ingresses"}, true, "ingress_hits"}, {schema.GroupResource{Resource: "pods"}, true, "ingress_hits"}, @@ -160,6 +157,8 @@ func TestListAllMetrics(t *testing.T) { {schema.GroupResource{Resource: "namespaces"}, false, "service_proxy_packets"}, {schema.GroupResource{Group: "extensions", Resource: "deployments"}, true, "work_queue_wait"}, {schema.GroupResource{Resource: "namespaces"}, false, "work_queue_wait"}, + {schema.GroupResource{Resource: "namespaces"}, false, "some_usage"}, + {schema.GroupResource{Resource: "pods"}, true, "some_usage"}, } sort.Sort(metricInfoSorter(expectedMetrics)) diff --git a/pkg/custom-provider/series_registry.go b/pkg/custom-provider/series_registry.go new file mode 100644 index 00000000..a1c73a2f --- /dev/null +++ b/pkg/custom-provider/series_registry.go @@ -0,0 +1,198 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package provider
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
+	apimeta "k8s.io/apimachinery/pkg/api/meta"
+
+	prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
+	"github.com/golang/glog"
+	pmodel "github.com/prometheus/common/model"
+)
+
+// NB: container metrics sourced from cAdvisor don't consistently follow naming conventions,
+// so we need to whitelist them and handle them on a case-by-case basis. Metrics ending in `_total`
+// *should* be counters, but may actually be gauges in this case.
+
+// SeriesType represents the kind of series backing a metric.
+type SeriesType int
+
+const (
+	CounterSeries SeriesType = iota
+	SecondsCounterSeries
+	GaugeSeries
+)
+
+// SeriesRegistry provides conversions between Prometheus series and MetricInfo
+type SeriesRegistry interface {
+	// SetSeries replaces the known series in this registry.
+	// Each slice in series should correspond to a MetricNamer in namers.
+ SetSeries(series [][]prom.Series, namers []MetricNamer) error + // ListAllMetrics lists all metrics known to this registry + ListAllMetrics() []provider.MetricInfo + // SeriesForMetric looks up the minimum required series information to make a query for the given metric + // against the given resource (namespace may be empty for non-namespaced resources) + QueryForMetric(info provider.MetricInfo, namespace string, resourceNames ...string) (query prom.Selector, found bool) + // MatchValuesToNames matches result values to resource names for the given metric and value set + MatchValuesToNames(metricInfo provider.MetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool) +} + +type seriesInfo struct { + // seriesName is the name of the corresponding Prometheus series + seriesName string + + // namer is the MetricNamer used to name this series + namer MetricNamer +} + +// overridableSeriesRegistry is a basic SeriesRegistry +type basicSeriesRegistry struct { + mu sync.RWMutex + + // info maps metric info to information about the corresponding series + info map[provider.MetricInfo]seriesInfo + // metrics is the list of all known metrics + metrics []provider.MetricInfo + + mapper apimeta.RESTMapper +} + +func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers []MetricNamer) error { + if len(newSeriesSlices) != len(namers) { + return fmt.Errorf("need one set of series per namer") + } + + newInfo := make(map[provider.MetricInfo]seriesInfo) + for i, newSeries := range newSeriesSlices { + namer := namers[i] + for _, series := range newSeries { + // TODO: warn if it doesn't match any resources + resources, namespaced := namer.ResourcesForSeries(series) + name, err := namer.MetricNameForSeries(series) + if err != nil { + glog.Errorf("unable to name series %q, skipping: %v", series.String(), err) + continue + } + for _, resource := range resources { + info := provider.MetricInfo{ + GroupResource: resource, + Namespaced: 
namespaced, + Metric: name, + } + + // namespace metrics aren't counted as namespaced + if resource == nsGroupResource { + info.Namespaced = false + } + + // we don't need to re-normalize, because the metric namer should have already normalized for us + newInfo[info] = seriesInfo{ + seriesName: series.Name, + namer: namer, + } + } + } + } + + // regenerate metrics + newMetrics := make([]provider.MetricInfo, 0, len(newInfo)) + for info := range newInfo { + newMetrics = append(newMetrics, info) + } + + r.mu.Lock() + defer r.mu.Unlock() + + r.info = newInfo + r.metrics = newMetrics + + return nil +} + +func (r *basicSeriesRegistry) ListAllMetrics() []provider.MetricInfo { + r.mu.RLock() + defer r.mu.RUnlock() + + return r.metrics +} + +func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.MetricInfo, namespace string, resourceNames ...string) (prom.Selector, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + if len(resourceNames) == 0 { + glog.Errorf("no resource names requested while producing a query for metric %s", metricInfo.String()) + return "", false + } + + metricInfo, _, err := metricInfo.Normalized(r.mapper) + if err != nil { + glog.Errorf("unable to normalize group resource while producing a query: %v", err) + return "", false + } + + info, infoFound := r.info[metricInfo] + if !infoFound { + glog.V(10).Infof("metric %v not registered", metricInfo) + return "", false + } + + query, err := info.namer.QueryForSeries(info.seriesName, metricInfo.GroupResource, namespace, resourceNames...) 
+ if err != nil { + glog.Errorf("unable to construct query for metric %s: %v", metricInfo.String(), err) + return "", false + } + + return query, true +} + +func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.MetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + metricInfo, _, err := metricInfo.Normalized(r.mapper) + if err != nil { + glog.Errorf("unable to normalize group resource while matching values to names: %v", err) + return nil, false + } + + info, infoFound := r.info[metricInfo] + if !infoFound { + return nil, false + } + + resourceLbl, err := info.namer.LabelForResource(metricInfo.GroupResource) + if err != nil { + glog.Errorf("unable to construct resource label for metric %s: %v", metricInfo.String(), err) + return nil, false + } + + res := make(map[string]pmodel.SampleValue, len(values)) + for _, val := range values { + if val == nil { + // skip empty values + continue + } + res[string(val.Metric[resourceLbl])] = val.Value + } + + return res, true +} diff --git a/pkg/custom-provider/metric_namer_test.go b/pkg/custom-provider/series_registry_test.go similarity index 59% rename from pkg/custom-provider/metric_namer_test.go rename to pkg/custom-provider/series_registry_test.go index c01e25e1..6bef6c45 100644 --- a/pkg/custom-provider/metric_namer_test.go +++ b/pkg/custom-provider/series_registry_test.go @@ -19,6 +19,7 @@ package provider import ( "sort" "testing" + "time" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" pmodel "github.com/prometheus/common/model" @@ -30,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" ) // restMapper creates a RESTMapper with just the types we need for @@ -49,122 +51,46 @@ func restMapper() apimeta.RESTMapper { return mapper } -func setupMetricNamer(t *testing.T) 
*metricNamer { - return &metricNamer{ - overrides: map[string]seriesSpec{ - "container_actually_gauge_seconds_total": { - metricName: "actually_gauge", - kind: GaugeSeries, - }, - }, - labelPrefix: "kube_", - mapper: restMapper(), - } +func setupMetricNamer(t testing.TB) []MetricNamer { + cfg := config.DefaultConfig(1*time.Minute, "kube_") + namers, err := NamersFromConfig(cfg, restMapper()) + require.NoError(t, err) + return namers } -func TestMetricNamerContainerSeries(t *testing.T) { - testCases := []struct { - input prom.Series - outputMetricName string - outputInfo seriesInfo - }{ - { - input: prom.Series{ - Name: "container_actually_gauge_seconds_total", - Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, - }, - outputMetricName: "actually_gauge", - outputInfo: seriesInfo{ - baseSeries: prom.Series{Name: "container_actually_gauge_seconds_total"}, - kind: GaugeSeries, - isContainer: true, - }, - }, - { - input: prom.Series{ - Name: "container_some_usage", - Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, - }, - outputMetricName: "some_usage", - outputInfo: seriesInfo{ - baseSeries: prom.Series{Name: "container_some_usage"}, - kind: GaugeSeries, - isContainer: true, - }, - }, - { - input: prom.Series{ - Name: "container_some_count_total", - Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, - }, - outputMetricName: "some_count", - outputInfo: seriesInfo{ - baseSeries: prom.Series{Name: "container_some_count_total"}, - kind: CounterSeries, - isContainer: true, - }, - }, - { - input: prom.Series{ - Name: "container_some_time_seconds_total", - Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, - }, - outputMetricName: "some_time", - outputInfo: seriesInfo{ - baseSeries: prom.Series{Name: "container_some_time_seconds_total"}, - kind: SecondsCounterSeries, - 
isContainer: true, - }, - }, - } - - assert := assert.New(t) - - namer := setupMetricNamer(t) - resMap := map[provider.MetricInfo]seriesInfo{} - - for _, test := range testCases { - namer.processContainerSeries(test.input, resMap) - metric := provider.MetricInfo{ - Metric: test.outputMetricName, - GroupResource: schema.GroupResource{Resource: "pods"}, - Namespaced: true, - } - if assert.Contains(resMap, metric) { - assert.Equal(test.outputInfo, resMap[metric]) - } - } -} - -func TestSeriesRegistry(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - namer := setupMetricNamer(t) - registry := &basicSeriesRegistry{ - namer: *namer, - } - - inputSeries := []prom.Series{ - // container series - { - Name: "container_actually_gauge_seconds_total", - Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, - }, - { - Name: "container_some_usage", - Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, - }, - { - Name: "container_some_count_total", - Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, - }, +var seriesRegistryTestSeries = [][]prom.Series{ + // container series + { { Name: "container_some_time_seconds_total", Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, }, - // namespaced series - // a series that should turn into multiple metrics + }, + { + { + Name: "container_some_count_total", + Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, + }, + }, + { + { + Name: "container_some_usage", + Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, + }, + }, + { + // guage metrics + { + Name: "node_gigawatts", + Labels: pmodel.LabelSet{"kube_node": "somenode"}, + }, + { + Name: "service_proxy_packets", + Labels: pmodel.LabelSet{"kube_service": "somesvc", 
"kube_namespace": "somens"}, + }, + }, + { + // cumulative --> rate metrics { Name: "ingress_hits_total", Labels: pmodel.LabelSet{"kube_ingress": "someingress", "kube_service": "somesvc", "kube_pod": "backend1", "kube_namespace": "somens"}, @@ -174,43 +100,34 @@ func TestSeriesRegistry(t *testing.T) { Labels: pmodel.LabelSet{"kube_ingress": "someingress", "kube_service": "somesvc", "kube_pod": "backend2", "kube_namespace": "somens"}, }, { - Name: "service_proxy_packets", - Labels: pmodel.LabelSet{"kube_service": "somesvc", "kube_namespace": "somens"}, + Name: "volume_claims_total", + Labels: pmodel.LabelSet{"kube_persistentvolume": "somepv"}, }, + }, + { + // cumulative seconds --> rate metrics { Name: "work_queue_wait_seconds_total", Labels: pmodel.LabelSet{"kube_deployment": "somedep", "kube_namespace": "somens"}, }, - // non-namespaced series - { - Name: "node_gigawatts", - Labels: pmodel.LabelSet{"kube_node": "somenode"}, - }, - { - Name: "volume_claims_total", - Labels: pmodel.LabelSet{"kube_persistentvolume": "somepv"}, - }, { Name: "node_fan_seconds_total", Labels: pmodel.LabelSet{"kube_node": "somenode"}, }, - // unrelated series - { - Name: "admin_coffee_liters_total", - Labels: pmodel.LabelSet{"admin": "some-admin"}, - }, - { - Name: "admin_unread_emails", - Labels: pmodel.LabelSet{"admin": "some-admin"}, - }, - { - Name: "admin_reddit_seconds_total", - Labels: pmodel.LabelSet{"kube_admin": "some-admin"}, - }, + }, +} + +func TestSeriesRegistry(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + namers := setupMetricNamer(t) + registry := &basicSeriesRegistry{ + mapper: restMapper(), } // set up the registry - require.NoError(registry.SetSeries(inputSeries)) + require.NoError(registry.SetSeries(seriesRegistryTestSeries, namers)) // make sure each metric got registered and can form queries testCases := []struct { @@ -219,30 +136,16 @@ func TestSeriesRegistry(t *testing.T) { namespace string resourceNames []string - expectedKind 
SeriesType - expectedQuery string - expectedGroupBy string + expectedQuery string }{ // container metrics - { - title: "container metrics overrides / single resource name", - info: provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "actually_gauge"}, - namespace: "somens", - resourceNames: []string{"somepod"}, - - expectedKind: GaugeSeries, - expectedQuery: "container_actually_gauge_seconds_total{pod_name=\"somepod\",container_name!=\"POD\",namespace=\"somens\"}", - expectedGroupBy: "pod_name", - }, { title: "container metrics gauge / multiple resource names", info: provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_usage"}, namespace: "somens", resourceNames: []string{"somepod1", "somepod2"}, - expectedKind: GaugeSeries, - expectedQuery: "container_some_usage{pod_name=~\"somepod1|somepod2\",container_name!=\"POD\",namespace=\"somens\"}", - expectedGroupBy: "pod_name", + expectedQuery: "sum(container_some_usage{namespace=\"somens\",pod_name=~\"somepod1|somepod2\",container_name!=\"POD\"}) by (pod_name)", }, { title: "container metrics counter", @@ -250,9 +153,7 @@ func TestSeriesRegistry(t *testing.T) { namespace: "somens", resourceNames: []string{"somepod1", "somepod2"}, - expectedKind: CounterSeries, - expectedQuery: "container_some_count_total{pod_name=~\"somepod1|somepod2\",container_name!=\"POD\",namespace=\"somens\"}", - expectedGroupBy: "pod_name", + expectedQuery: "sum(rate(container_some_count_total{namespace=\"somens\",pod_name=~\"somepod1|somepod2\",container_name!=\"POD\"}[1m])) by (pod_name)", }, { title: "container metrics seconds counter", @@ -260,9 +161,7 @@ func TestSeriesRegistry(t *testing.T) { namespace: "somens", resourceNames: []string{"somepod1", "somepod2"}, - expectedKind: SecondsCounterSeries, - expectedQuery: "container_some_time_seconds_total{pod_name=~\"somepod1|somepod2\",container_name!=\"POD\",namespace=\"somens\"}", - expectedGroupBy: "pod_name", + expectedQuery: 
"sum(rate(container_some_time_seconds_total{namespace=\"somens\",pod_name=~\"somepod1|somepod2\",container_name!=\"POD\"}[1m])) by (pod_name)", }, // namespaced metrics { @@ -271,8 +170,7 @@ func TestSeriesRegistry(t *testing.T) { namespace: "somens", resourceNames: []string{"somesvc"}, - expectedKind: CounterSeries, - expectedQuery: "ingress_hits_total{kube_service=\"somesvc\",kube_namespace=\"somens\"}", + expectedQuery: "sum(rate(ingress_hits_total{kube_namespace=\"somens\",kube_service=\"somesvc\"}[1m])) by (kube_service)", }, { title: "namespaced metrics counter / multidimensional (ingress)", @@ -280,8 +178,7 @@ func TestSeriesRegistry(t *testing.T) { namespace: "somens", resourceNames: []string{"someingress"}, - expectedKind: CounterSeries, - expectedQuery: "ingress_hits_total{kube_ingress=\"someingress\",kube_namespace=\"somens\"}", + expectedQuery: "sum(rate(ingress_hits_total{kube_namespace=\"somens\",kube_ingress=\"someingress\"}[1m])) by (kube_ingress)", }, { title: "namespaced metrics counter / multidimensional (pod)", @@ -289,8 +186,7 @@ func TestSeriesRegistry(t *testing.T) { namespace: "somens", resourceNames: []string{"somepod"}, - expectedKind: CounterSeries, - expectedQuery: "ingress_hits_total{kube_pod=\"somepod\",kube_namespace=\"somens\"}", + expectedQuery: "sum(rate(ingress_hits_total{kube_namespace=\"somens\",kube_pod=\"somepod\"}[1m])) by (kube_pod)", }, { title: "namespaced metrics gauge", @@ -298,8 +194,7 @@ func TestSeriesRegistry(t *testing.T) { namespace: "somens", resourceNames: []string{"somesvc"}, - expectedKind: GaugeSeries, - expectedQuery: "service_proxy_packets{kube_service=\"somesvc\",kube_namespace=\"somens\"}", + expectedQuery: "sum(service_proxy_packets{kube_namespace=\"somens\",kube_service=\"somesvc\"}) by (kube_service)", }, { title: "namespaced metrics seconds counter", @@ -307,8 +202,7 @@ func TestSeriesRegistry(t *testing.T) { namespace: "somens", resourceNames: []string{"somedep"}, - expectedKind: SecondsCounterSeries, 
- expectedQuery: "work_queue_wait_seconds_total{kube_deployment=\"somedep\",kube_namespace=\"somens\"}", + expectedQuery: "sum(rate(work_queue_wait_seconds_total{kube_namespace=\"somens\",kube_deployment=\"somedep\"}[1m])) by (kube_deployment)", }, // non-namespaced series { @@ -316,49 +210,41 @@ func TestSeriesRegistry(t *testing.T) { info: provider.MetricInfo{schema.GroupResource{Resource: "node"}, false, "node_gigawatts"}, resourceNames: []string{"somenode"}, - expectedKind: GaugeSeries, - expectedQuery: "node_gigawatts{kube_node=\"somenode\"}", + expectedQuery: "sum(node_gigawatts{kube_node=\"somenode\"}) by (kube_node)", }, { title: "root scoped metrics counter", info: provider.MetricInfo{schema.GroupResource{Resource: "persistentvolume"}, false, "volume_claims"}, resourceNames: []string{"somepv"}, - expectedKind: CounterSeries, - expectedQuery: "volume_claims_total{kube_persistentvolume=\"somepv\"}", + expectedQuery: "sum(rate(volume_claims_total{kube_persistentvolume=\"somepv\"}[1m])) by (kube_persistentvolume)", }, { title: "root scoped metrics seconds counter", info: provider.MetricInfo{schema.GroupResource{Resource: "node"}, false, "node_fan"}, resourceNames: []string{"somenode"}, - expectedKind: SecondsCounterSeries, - expectedQuery: "node_fan_seconds_total{kube_node=\"somenode\"}", + expectedQuery: "sum(rate(node_fan_seconds_total{kube_node=\"somenode\"}[1m])) by (kube_node)", }, } for _, testCase := range testCases { - outputKind, outputQuery, groupBy, found := registry.QueryForMetric(testCase.info, testCase.namespace, testCase.resourceNames...) + outputQuery, found := registry.QueryForMetric(testCase.info, testCase.namespace, testCase.resourceNames...) 
if !assert.True(found, "%s: metric %v should available", testCase.title, testCase.info) { continue } - assert.Equal(testCase.expectedKind, outputKind, "%s: metric %v should have had the right series type", testCase.title, testCase.info) assert.Equal(prom.Selector(testCase.expectedQuery), outputQuery, "%s: metric %v should have produced the correct query for %v in namespace %s", testCase.title, testCase.info, testCase.resourceNames, testCase.namespace) - - expectedGroupBy := testCase.expectedGroupBy - if expectedGroupBy == "" { - expectedGroupBy = registry.namer.labelPrefix + testCase.info.GroupResource.Resource - } - assert.Equal(expectedGroupBy, groupBy, "%s: metric %v should have produced the correct groupBy clause", testCase.title, testCase.info) } allMetrics := registry.ListAllMetrics() expectedMetrics := []provider.MetricInfo{ - {schema.GroupResource{Resource: "pods"}, true, "actually_gauge"}, - {schema.GroupResource{Resource: "pods"}, true, "some_usage"}, {schema.GroupResource{Resource: "pods"}, true, "some_count"}, + {schema.GroupResource{Resource: "namespaces"}, false, "some_count"}, {schema.GroupResource{Resource: "pods"}, true, "some_time"}, + {schema.GroupResource{Resource: "namespaces"}, false, "some_time"}, + {schema.GroupResource{Resource: "pods"}, true, "some_usage"}, + {schema.GroupResource{Resource: "namespaces"}, false, "some_usage"}, {schema.GroupResource{Resource: "services"}, true, "ingress_hits"}, {schema.GroupResource{Group: "extensions", Resource: "ingresses"}, true, "ingress_hits"}, {schema.GroupResource{Resource: "pods"}, true, "ingress_hits"}, @@ -379,6 +265,30 @@ func TestSeriesRegistry(t *testing.T) { assert.Equal(expectedMetrics, allMetrics, "should have listed all expected metrics") } +func BenchmarkSetSeries(b *testing.B) { + namers := setupMetricNamer(b) + registry := &basicSeriesRegistry{ + mapper: restMapper(), + } + + numDuplicates := 10000 + newSeriesSlices := make([][]prom.Series, len(seriesRegistryTestSeries)) + for i, 
seriesSlice := range seriesRegistryTestSeries { + newSlice := make([]prom.Series, len(seriesSlice)*numDuplicates) + for j, series := range seriesSlice { + for k := 0; k < numDuplicates; k++ { + newSlice[j*numDuplicates+k] = series + } + } + newSeriesSlices[i] = newSlice + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + registry.SetSeries(newSeriesSlices, namers) + } +} + // metricInfoSorter is a sort.Interface for sorting provider.MetricInfos type metricInfoSorter []provider.MetricInfo