diff --git a/cmd/adapter/app/start.go b/cmd/adapter/app/start.go index ac13095e..507bebb1 100644 --- a/cmd/adapter/app/start.go +++ b/cmd/adapter/app/start.go @@ -82,6 +82,9 @@ func NewCommandStartPrometheusAdapterServer(out, errOut io.Writer, stopCh <-chan "interval at which to refresh API discovery information") flags.StringVar(&o.PrometheusURL, "prometheus-url", o.PrometheusURL, "URL and configuration for connecting to Prometheus. Query parameters are used to configure the connection") + flags.StringVar(&o.LabelPrefix, "label-prefix", o.LabelPrefix, + "Prefix to expect on labels referring to pod resources. For example, if the prefix is "+ + "'kube_', any series with the 'kube_pod' label would be considered a pod metric") return cmd } @@ -131,7 +134,7 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String()) promClient := prom.NewClientForAPI(instrumentedGenericPromClient) - cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.MetricsRelistInterval, o.RateInterval, stopCh) + cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.LabelPrefix, o.MetricsRelistInterval, o.RateInterval, stopCh) server, err := config.Complete().New("prometheus-custom-metrics-adapter", cmProvider) if err != nil { @@ -153,4 +156,7 @@ type PrometheusAdapterServerOptions struct { DiscoveryInterval time.Duration // PrometheusURL is the URL describing how to connect to Prometheus. Query parameters configure connection options. PrometheusURL string + // LabelPrefix is the prefix to expect on labels for Kubernetes resources + // (e.g. if the prefix is "kube_", we'd expect a "kube_pod" label for pod metrics). + LabelPrefix string } diff --git a/pkg/custom-provider/metric_namer.go b/pkg/custom-provider/metric_namer.go index 9fa031b2..90d7cdd5 100644 --- a/pkg/custom-provider/metric_namer.go +++ b/pkg/custom-provider/metric_namer.go @@ -45,6 +45,9 @@ const ( // SeriesRegistry provides conversions between Prometheus series and MetricInfo type SeriesRegistry interface { + // Selectors produces the appropriate Prometheus selectors to match all series handlable + // by this registry, as an optimization for SetSeries. + Selectors() []prom.Selector // SetSeries replaces the known series in this registry SetSeries(series []prom.Series) error // ListAllMetrics lists all metrics known to this registry @@ -78,12 +81,21 @@ type basicSeriesRegistry struct { namer metricNamer } +func (r *basicSeriesRegistry) Selectors() []prom.Selector { + // container-specific metrics from cAdvsior have their own form, and need special handling + // TODO: figure out how to determine which metrics on non-namespaced objects are kubernetes-related + containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) + namespacedSel := prom.MatchSeries("", prom.LabelNeq(r.namer.labelPrefix+"namespace", ""), prom.NameNotMatches("^container_.*")) + + return []prom.Selector{containerSel, namespacedSel} +} + func (r *basicSeriesRegistry) SetSeries(newSeries []prom.Series) error { newInfo := make(map[provider.MetricInfo]seriesInfo) for _, series := range newSeries { if strings.HasPrefix(series.Name, "container_") { r.namer.processContainerSeries(series, newInfo) - } else if namespaceLabel, hasNamespaceLabel := series.Labels["namespace"]; hasNamespaceLabel && namespaceLabel != "" { + } else if namespaceLabel, hasNamespaceLabel := series.Labels[pmodel.LabelName(r.namer.labelPrefix+"namespace")]; hasNamespaceLabel && namespaceLabel != "" { // we also handle namespaced metrics here as part of the resource-association logic if err := r.namer.processNamespacedSeries(series, newInfo); err != nil { glog.Errorf("Unable to process namespaced series %q: %v", series.Name, err) @@ -132,6 +144,7 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.MetricInfo, nam glog.Errorf("unable to normalize group resource while producing a query: %v", err) return 0, "", "", false } + resourceLbl := r.namer.labelPrefix + singularResource // TODO: support container metrics if info, found := r.info[metricInfo]; found { @@ -148,12 +161,16 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.MetricInfo, nam groupBy = "pod_name" } else { // TODO: copy base series labels? - expressions = []string{matcher(singularResource, targetValue)} - groupBy = singularResource + expressions = []string{matcher(resourceLbl, targetValue)} + groupBy = resourceLbl } if metricInfo.Namespaced { - expressions = append(expressions, prom.LabelEq("namespace", namespace)) + prefix := r.namer.labelPrefix + if info.isContainer { + prefix = "" + } + expressions = append(expressions, prom.LabelEq(prefix+"namespace", namespace)) } return info.kind, prom.MatchSeries(info.baseSeries.Name, expressions...), groupBy, true @@ -172,6 +189,7 @@ func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.MetricInfo, glog.Errorf("unable to normalize group resource while matching values to names: %v", err) return nil, false } + resourceLbl := r.namer.labelPrefix + singularResource if info, found := r.info[metricInfo]; found { res := make(map[string]pmodel.SampleValue, len(values)) @@ -181,7 +199,7 @@ func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.MetricInfo, continue } - labelName := pmodel.LabelName(singularResource) + labelName := pmodel.LabelName(resourceLbl) if info.isContainer { labelName = pmodel.LabelName("pod_name") } @@ -201,6 +219,8 @@ type metricNamer struct { overrides map[string]seriesSpec mapper apimeta.RESTMapper + + labelPrefix string } // seriesSpec specifies how to produce metric info for a particular prometheus series source @@ -309,6 +329,10 @@ func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[prov func (n *metricNamer) groupResourcesFromSeries(series prom.Series) ([]schema.GroupResource, error) { var res []schema.GroupResource for label := range series.Labels { + if !strings.HasPrefix(string(label), n.labelPrefix) { + continue + } + label = label[len(n.labelPrefix):] // TODO: figure out a way to let people specify a fully-qualified name in label-form gvr, err := n.mapper.ResourceFor(schema.GroupVersionResource{Resource: string(label)}) if err != nil { diff --git a/pkg/custom-provider/metric_namer_test.go b/pkg/custom-provider/metric_namer_test.go index 23cd04c0..c01e25e1 100644 --- a/pkg/custom-provider/metric_namer_test.go +++ b/pkg/custom-provider/metric_namer_test.go @@ -57,7 +57,8 @@ func setupMetricNamer(t *testing.T) *metricNamer { kind: GaugeSeries, }, }, - mapper: restMapper(), + labelPrefix: "kube_", + mapper: restMapper(), } } @@ -166,32 +167,32 @@ func TestSeriesRegistry(t *testing.T) { // a series that should turn into multiple metrics { Name: "ingress_hits_total", - Labels: pmodel.LabelSet{"ingress": "someingress", "service": "somesvc", "pod": "backend1", "namespace": "somens"}, + Labels: pmodel.LabelSet{"kube_ingress": "someingress", "kube_service": "somesvc", "kube_pod": "backend1", "kube_namespace": "somens"}, }, { Name: "ingress_hits_total", - Labels: pmodel.LabelSet{"ingress": "someingress", "service": "somesvc", "pod": "backend2", "namespace": "somens"}, + Labels: pmodel.LabelSet{"kube_ingress": "someingress", "kube_service": "somesvc", "kube_pod": "backend2", "kube_namespace": "somens"}, }, { Name: "service_proxy_packets", - Labels: pmodel.LabelSet{"service": "somesvc", "namespace": "somens"}, + Labels: pmodel.LabelSet{"kube_service": "somesvc", "kube_namespace": "somens"}, }, { Name: "work_queue_wait_seconds_total", - Labels: pmodel.LabelSet{"deployment": "somedep", "namespace": "somens"}, + Labels: pmodel.LabelSet{"kube_deployment": "somedep", "kube_namespace": "somens"}, }, // non-namespaced series { Name: "node_gigawatts", - Labels: pmodel.LabelSet{"node": "somenode"}, + Labels: pmodel.LabelSet{"kube_node": "somenode"}, }, { Name: "volume_claims_total", - Labels: pmodel.LabelSet{"persistentvolume": "somepv"}, + Labels: pmodel.LabelSet{"kube_persistentvolume": "somepv"}, }, { Name: "node_fan_seconds_total", - Labels: pmodel.LabelSet{"node": "somenode"}, + Labels: pmodel.LabelSet{"kube_node": "somenode"}, }, // unrelated series { @@ -204,7 +205,7 @@ func TestSeriesRegistry(t *testing.T) { }, { Name: "admin_reddit_seconds_total", - Labels: pmodel.LabelSet{"admin": "some-admin"}, + Labels: pmodel.LabelSet{"kube_admin": "some-admin"}, }, } @@ -271,7 +272,7 @@ func TestSeriesRegistry(t *testing.T) { resourceNames: []string{"somesvc"}, expectedKind: CounterSeries, - expectedQuery: "ingress_hits_total{service=\"somesvc\",namespace=\"somens\"}", + expectedQuery: "ingress_hits_total{kube_service=\"somesvc\",kube_namespace=\"somens\"}", }, { title: "namespaced metrics counter / multidimensional (ingress)", @@ -280,7 +281,7 @@ func TestSeriesRegistry(t *testing.T) { resourceNames: []string{"someingress"}, expectedKind: CounterSeries, - expectedQuery: "ingress_hits_total{ingress=\"someingress\",namespace=\"somens\"}", + expectedQuery: "ingress_hits_total{kube_ingress=\"someingress\",kube_namespace=\"somens\"}", }, { title: "namespaced metrics counter / multidimensional (pod)", @@ -289,7 +290,7 @@ func TestSeriesRegistry(t *testing.T) { resourceNames: []string{"somepod"}, expectedKind: CounterSeries, - expectedQuery: "ingress_hits_total{pod=\"somepod\",namespace=\"somens\"}", + expectedQuery: "ingress_hits_total{kube_pod=\"somepod\",kube_namespace=\"somens\"}", }, { title: "namespaced metrics gauge", @@ -298,7 +299,7 @@ func TestSeriesRegistry(t *testing.T) { resourceNames: []string{"somesvc"}, expectedKind: GaugeSeries, - expectedQuery: "service_proxy_packets{service=\"somesvc\",namespace=\"somens\"}", + expectedQuery: "service_proxy_packets{kube_service=\"somesvc\",kube_namespace=\"somens\"}", }, { title: "namespaced metrics seconds counter", @@ -307,7 +308,7 @@ func TestSeriesRegistry(t *testing.T) { resourceNames: []string{"somedep"}, expectedKind: SecondsCounterSeries, - expectedQuery: "work_queue_wait_seconds_total{deployment=\"somedep\",namespace=\"somens\"}", + expectedQuery: "work_queue_wait_seconds_total{kube_deployment=\"somedep\",kube_namespace=\"somens\"}", }, // non-namespaced series { @@ -316,7 +317,7 @@ func TestSeriesRegistry(t *testing.T) { resourceNames: []string{"somenode"}, expectedKind: GaugeSeries, - expectedQuery: "node_gigawatts{node=\"somenode\"}", + expectedQuery: "node_gigawatts{kube_node=\"somenode\"}", }, { title: "root scoped metrics counter", @@ -324,7 +325,7 @@ func TestSeriesRegistry(t *testing.T) { resourceNames: []string{"somepv"}, expectedKind: CounterSeries, - expectedQuery: "volume_claims_total{persistentvolume=\"somepv\"}", + expectedQuery: "volume_claims_total{kube_persistentvolume=\"somepv\"}", }, { title: "root scoped metrics seconds counter", @@ -332,7 +333,7 @@ func TestSeriesRegistry(t *testing.T) { resourceNames: []string{"somenode"}, expectedKind: SecondsCounterSeries, - expectedQuery: "node_fan_seconds_total{node=\"somenode\"}", + expectedQuery: "node_fan_seconds_total{kube_node=\"somenode\"}", }, } @@ -347,9 +348,9 @@ func TestSeriesRegistry(t *testing.T) { expectedGroupBy := testCase.expectedGroupBy if expectedGroupBy == "" { - expectedGroupBy = testCase.info.GroupResource.Resource + expectedGroupBy = registry.namer.labelPrefix + testCase.info.GroupResource.Resource } - assert.Equal(expectedGroupBy, groupBy, "%s: metric %v should have produced the correct groupBy clause", testCase.title) + assert.Equal(expectedGroupBy, groupBy, "%s: metric %v should have produced the correct groupBy clause", testCase.title, testCase.info) } allMetrics := registry.ListAllMetrics() diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index 801eed78..9a529104 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -50,7 +50,7 @@ type prometheusProvider struct { rateInterval time.Duration } -func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider { +func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, labelPrefix string, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider { lister := &cachingMetricsLister{ updateInterval: updateInterval, promClient: promClient, @@ -58,8 +58,9 @@ func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientP SeriesRegistry: &basicSeriesRegistry{ namer: metricNamer{ // TODO: populate the overrides list - overrides: nil, - mapper: mapper, + overrides: nil, + mapper: mapper, + labelPrefix: labelPrefix, }, }, } @@ -301,13 +302,10 @@ func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) { func (l *cachingMetricsLister) updateMetrics() error { startTime := pmodel.Now().Add(-1 * l.updateInterval) - // container-specific metrics from cAdvsior have their own form, and need special handling - containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) - namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) - // TODO: figure out how to determine which metrics on non-namespaced objects are kubernetes-related + sels := l.Selectors() // TODO: use an actual context here - series, err := l.promClient.Series(context.Background(), pmodel.Interval{startTime, 0}, containerSel, namespacedSel) + series, err := l.promClient.Series(context.Background(), pmodel.Interval{startTime, 0}, sels...) if err != nil { return fmt.Errorf("unable to update list of all available metrics: %v", err) } diff --git a/pkg/custom-provider/provider_test.go b/pkg/custom-provider/provider_test.go index 679b5854..b5a756e0 100644 --- a/pkg/custom-provider/provider_test.go +++ b/pkg/custom-provider/provider_test.go @@ -90,7 +90,7 @@ func setupPrometheusProvider(t *testing.T, stopCh <-chan struct{}) (provider.Cus fakeProm := &fakePromClient{} fakeKubeClient := &fakedyn.FakeClientPool{} - prov := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, fakeProviderUpdateInterval, 1*time.Minute, stopCh) + prov := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, "", fakeProviderUpdateInterval, 1*time.Minute, stopCh) containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*"))