Allow setting a resource label prefix

This allows setting a prefix on the labels used to determine which
resources a series belongs to.  The prefix may be set using the
`--label-prefix` flag.
This commit is contained in:
Solly Ross 2018-01-30 17:56:36 -05:00
parent 842b850fcd
commit 0fa0d36e17
5 changed files with 63 additions and 34 deletions

View file

@ -82,6 +82,9 @@ func NewCommandStartPrometheusAdapterServer(out, errOut io.Writer, stopCh <-chan
"interval at which to refresh API discovery information") "interval at which to refresh API discovery information")
flags.StringVar(&o.PrometheusURL, "prometheus-url", o.PrometheusURL, flags.StringVar(&o.PrometheusURL, "prometheus-url", o.PrometheusURL,
"URL and configuration for connecting to Prometheus. Query parameters are used to configure the connection") "URL and configuration for connecting to Prometheus. Query parameters are used to configure the connection")
flags.StringVar(&o.LabelPrefix, "label-prefix", o.LabelPrefix,
"Prefix to expect on labels referring to pod resources. For example, if the prefix is "+
"'kube_', any series with the 'kube_pod' label would be considered a pod metric")
return cmd return cmd
} }
@ -131,7 +134,7 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c
instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String()) instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String())
promClient := prom.NewClientForAPI(instrumentedGenericPromClient) promClient := prom.NewClientForAPI(instrumentedGenericPromClient)
cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.MetricsRelistInterval, o.RateInterval, stopCh) cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.LabelPrefix, o.MetricsRelistInterval, o.RateInterval, stopCh)
server, err := config.Complete().New("prometheus-custom-metrics-adapter", cmProvider) server, err := config.Complete().New("prometheus-custom-metrics-adapter", cmProvider)
if err != nil { if err != nil {
@ -153,4 +156,7 @@ type PrometheusAdapterServerOptions struct {
DiscoveryInterval time.Duration DiscoveryInterval time.Duration
// PrometheusURL is the URL describing how to connect to Prometheus. Query parameters configure connection options. // PrometheusURL is the URL describing how to connect to Prometheus. Query parameters configure connection options.
PrometheusURL string PrometheusURL string
// LabelPrefix is the prefix to expect on labels for Kubernetes resources
// (e.g. if the prefix is "kube_", we'd expect a "kube_pod" label for pod metrics).
LabelPrefix string
} }

View file

@ -45,6 +45,9 @@ const (
// SeriesRegistry provides conversions between Prometheus series and MetricInfo // SeriesRegistry provides conversions between Prometheus series and MetricInfo
type SeriesRegistry interface { type SeriesRegistry interface {
// Selectors produces the appropriate Prometheus selectors to match all series handlable
// by this registry, as an optimization for SetSeries.
Selectors() []prom.Selector
// SetSeries replaces the known series in this registry // SetSeries replaces the known series in this registry
SetSeries(series []prom.Series) error SetSeries(series []prom.Series) error
// ListAllMetrics lists all metrics known to this registry // ListAllMetrics lists all metrics known to this registry
@ -78,12 +81,21 @@ type basicSeriesRegistry struct {
namer metricNamer namer metricNamer
} }
func (r *basicSeriesRegistry) Selectors() []prom.Selector {
// container-specific metrics from cAdvsior have their own form, and need special handling
// TODO: figure out how to determine which metrics on non-namespaced objects are kubernetes-related
containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", ""))
namespacedSel := prom.MatchSeries("", prom.LabelNeq(r.namer.labelPrefix+"namespace", ""), prom.NameNotMatches("^container_.*"))
return []prom.Selector{containerSel, namespacedSel}
}
func (r *basicSeriesRegistry) SetSeries(newSeries []prom.Series) error { func (r *basicSeriesRegistry) SetSeries(newSeries []prom.Series) error {
newInfo := make(map[provider.MetricInfo]seriesInfo) newInfo := make(map[provider.MetricInfo]seriesInfo)
for _, series := range newSeries { for _, series := range newSeries {
if strings.HasPrefix(series.Name, "container_") { if strings.HasPrefix(series.Name, "container_") {
r.namer.processContainerSeries(series, newInfo) r.namer.processContainerSeries(series, newInfo)
} else if namespaceLabel, hasNamespaceLabel := series.Labels["namespace"]; hasNamespaceLabel && namespaceLabel != "" { } else if namespaceLabel, hasNamespaceLabel := series.Labels[pmodel.LabelName(r.namer.labelPrefix+"namespace")]; hasNamespaceLabel && namespaceLabel != "" {
// we also handle namespaced metrics here as part of the resource-association logic // we also handle namespaced metrics here as part of the resource-association logic
if err := r.namer.processNamespacedSeries(series, newInfo); err != nil { if err := r.namer.processNamespacedSeries(series, newInfo); err != nil {
glog.Errorf("Unable to process namespaced series %q: %v", series.Name, err) glog.Errorf("Unable to process namespaced series %q: %v", series.Name, err)
@ -132,6 +144,7 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.MetricInfo, nam
glog.Errorf("unable to normalize group resource while producing a query: %v", err) glog.Errorf("unable to normalize group resource while producing a query: %v", err)
return 0, "", "", false return 0, "", "", false
} }
resourceLbl := r.namer.labelPrefix + singularResource
// TODO: support container metrics // TODO: support container metrics
if info, found := r.info[metricInfo]; found { if info, found := r.info[metricInfo]; found {
@ -148,12 +161,16 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.MetricInfo, nam
groupBy = "pod_name" groupBy = "pod_name"
} else { } else {
// TODO: copy base series labels? // TODO: copy base series labels?
expressions = []string{matcher(singularResource, targetValue)} expressions = []string{matcher(resourceLbl, targetValue)}
groupBy = singularResource groupBy = resourceLbl
} }
if metricInfo.Namespaced { if metricInfo.Namespaced {
expressions = append(expressions, prom.LabelEq("namespace", namespace)) prefix := r.namer.labelPrefix
if info.isContainer {
prefix = ""
}
expressions = append(expressions, prom.LabelEq(prefix+"namespace", namespace))
} }
return info.kind, prom.MatchSeries(info.baseSeries.Name, expressions...), groupBy, true return info.kind, prom.MatchSeries(info.baseSeries.Name, expressions...), groupBy, true
@ -172,6 +189,7 @@ func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.MetricInfo,
glog.Errorf("unable to normalize group resource while matching values to names: %v", err) glog.Errorf("unable to normalize group resource while matching values to names: %v", err)
return nil, false return nil, false
} }
resourceLbl := r.namer.labelPrefix + singularResource
if info, found := r.info[metricInfo]; found { if info, found := r.info[metricInfo]; found {
res := make(map[string]pmodel.SampleValue, len(values)) res := make(map[string]pmodel.SampleValue, len(values))
@ -181,7 +199,7 @@ func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.MetricInfo,
continue continue
} }
labelName := pmodel.LabelName(singularResource) labelName := pmodel.LabelName(resourceLbl)
if info.isContainer { if info.isContainer {
labelName = pmodel.LabelName("pod_name") labelName = pmodel.LabelName("pod_name")
} }
@ -201,6 +219,8 @@ type metricNamer struct {
overrides map[string]seriesSpec overrides map[string]seriesSpec
mapper apimeta.RESTMapper mapper apimeta.RESTMapper
labelPrefix string
} }
// seriesSpec specifies how to produce metric info for a particular prometheus series source // seriesSpec specifies how to produce metric info for a particular prometheus series source
@ -309,6 +329,10 @@ func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[prov
func (n *metricNamer) groupResourcesFromSeries(series prom.Series) ([]schema.GroupResource, error) { func (n *metricNamer) groupResourcesFromSeries(series prom.Series) ([]schema.GroupResource, error) {
var res []schema.GroupResource var res []schema.GroupResource
for label := range series.Labels { for label := range series.Labels {
if !strings.HasPrefix(string(label), n.labelPrefix) {
continue
}
label = label[len(n.labelPrefix):]
// TODO: figure out a way to let people specify a fully-qualified name in label-form // TODO: figure out a way to let people specify a fully-qualified name in label-form
gvr, err := n.mapper.ResourceFor(schema.GroupVersionResource{Resource: string(label)}) gvr, err := n.mapper.ResourceFor(schema.GroupVersionResource{Resource: string(label)})
if err != nil { if err != nil {

View file

@ -57,7 +57,8 @@ func setupMetricNamer(t *testing.T) *metricNamer {
kind: GaugeSeries, kind: GaugeSeries,
}, },
}, },
mapper: restMapper(), labelPrefix: "kube_",
mapper: restMapper(),
} }
} }
@ -166,32 +167,32 @@ func TestSeriesRegistry(t *testing.T) {
// a series that should turn into multiple metrics // a series that should turn into multiple metrics
{ {
Name: "ingress_hits_total", Name: "ingress_hits_total",
Labels: pmodel.LabelSet{"ingress": "someingress", "service": "somesvc", "pod": "backend1", "namespace": "somens"}, Labels: pmodel.LabelSet{"kube_ingress": "someingress", "kube_service": "somesvc", "kube_pod": "backend1", "kube_namespace": "somens"},
}, },
{ {
Name: "ingress_hits_total", Name: "ingress_hits_total",
Labels: pmodel.LabelSet{"ingress": "someingress", "service": "somesvc", "pod": "backend2", "namespace": "somens"}, Labels: pmodel.LabelSet{"kube_ingress": "someingress", "kube_service": "somesvc", "kube_pod": "backend2", "kube_namespace": "somens"},
}, },
{ {
Name: "service_proxy_packets", Name: "service_proxy_packets",
Labels: pmodel.LabelSet{"service": "somesvc", "namespace": "somens"}, Labels: pmodel.LabelSet{"kube_service": "somesvc", "kube_namespace": "somens"},
}, },
{ {
Name: "work_queue_wait_seconds_total", Name: "work_queue_wait_seconds_total",
Labels: pmodel.LabelSet{"deployment": "somedep", "namespace": "somens"}, Labels: pmodel.LabelSet{"kube_deployment": "somedep", "kube_namespace": "somens"},
}, },
// non-namespaced series // non-namespaced series
{ {
Name: "node_gigawatts", Name: "node_gigawatts",
Labels: pmodel.LabelSet{"node": "somenode"}, Labels: pmodel.LabelSet{"kube_node": "somenode"},
}, },
{ {
Name: "volume_claims_total", Name: "volume_claims_total",
Labels: pmodel.LabelSet{"persistentvolume": "somepv"}, Labels: pmodel.LabelSet{"kube_persistentvolume": "somepv"},
}, },
{ {
Name: "node_fan_seconds_total", Name: "node_fan_seconds_total",
Labels: pmodel.LabelSet{"node": "somenode"}, Labels: pmodel.LabelSet{"kube_node": "somenode"},
}, },
// unrelated series // unrelated series
{ {
@ -204,7 +205,7 @@ func TestSeriesRegistry(t *testing.T) {
}, },
{ {
Name: "admin_reddit_seconds_total", Name: "admin_reddit_seconds_total",
Labels: pmodel.LabelSet{"admin": "some-admin"}, Labels: pmodel.LabelSet{"kube_admin": "some-admin"},
}, },
} }
@ -271,7 +272,7 @@ func TestSeriesRegistry(t *testing.T) {
resourceNames: []string{"somesvc"}, resourceNames: []string{"somesvc"},
expectedKind: CounterSeries, expectedKind: CounterSeries,
expectedQuery: "ingress_hits_total{service=\"somesvc\",namespace=\"somens\"}", expectedQuery: "ingress_hits_total{kube_service=\"somesvc\",kube_namespace=\"somens\"}",
}, },
{ {
title: "namespaced metrics counter / multidimensional (ingress)", title: "namespaced metrics counter / multidimensional (ingress)",
@ -280,7 +281,7 @@ func TestSeriesRegistry(t *testing.T) {
resourceNames: []string{"someingress"}, resourceNames: []string{"someingress"},
expectedKind: CounterSeries, expectedKind: CounterSeries,
expectedQuery: "ingress_hits_total{ingress=\"someingress\",namespace=\"somens\"}", expectedQuery: "ingress_hits_total{kube_ingress=\"someingress\",kube_namespace=\"somens\"}",
}, },
{ {
title: "namespaced metrics counter / multidimensional (pod)", title: "namespaced metrics counter / multidimensional (pod)",
@ -289,7 +290,7 @@ func TestSeriesRegistry(t *testing.T) {
resourceNames: []string{"somepod"}, resourceNames: []string{"somepod"},
expectedKind: CounterSeries, expectedKind: CounterSeries,
expectedQuery: "ingress_hits_total{pod=\"somepod\",namespace=\"somens\"}", expectedQuery: "ingress_hits_total{kube_pod=\"somepod\",kube_namespace=\"somens\"}",
}, },
{ {
title: "namespaced metrics gauge", title: "namespaced metrics gauge",
@ -298,7 +299,7 @@ func TestSeriesRegistry(t *testing.T) {
resourceNames: []string{"somesvc"}, resourceNames: []string{"somesvc"},
expectedKind: GaugeSeries, expectedKind: GaugeSeries,
expectedQuery: "service_proxy_packets{service=\"somesvc\",namespace=\"somens\"}", expectedQuery: "service_proxy_packets{kube_service=\"somesvc\",kube_namespace=\"somens\"}",
}, },
{ {
title: "namespaced metrics seconds counter", title: "namespaced metrics seconds counter",
@ -307,7 +308,7 @@ func TestSeriesRegistry(t *testing.T) {
resourceNames: []string{"somedep"}, resourceNames: []string{"somedep"},
expectedKind: SecondsCounterSeries, expectedKind: SecondsCounterSeries,
expectedQuery: "work_queue_wait_seconds_total{deployment=\"somedep\",namespace=\"somens\"}", expectedQuery: "work_queue_wait_seconds_total{kube_deployment=\"somedep\",kube_namespace=\"somens\"}",
}, },
// non-namespaced series // non-namespaced series
{ {
@ -316,7 +317,7 @@ func TestSeriesRegistry(t *testing.T) {
resourceNames: []string{"somenode"}, resourceNames: []string{"somenode"},
expectedKind: GaugeSeries, expectedKind: GaugeSeries,
expectedQuery: "node_gigawatts{node=\"somenode\"}", expectedQuery: "node_gigawatts{kube_node=\"somenode\"}",
}, },
{ {
title: "root scoped metrics counter", title: "root scoped metrics counter",
@ -324,7 +325,7 @@ func TestSeriesRegistry(t *testing.T) {
resourceNames: []string{"somepv"}, resourceNames: []string{"somepv"},
expectedKind: CounterSeries, expectedKind: CounterSeries,
expectedQuery: "volume_claims_total{persistentvolume=\"somepv\"}", expectedQuery: "volume_claims_total{kube_persistentvolume=\"somepv\"}",
}, },
{ {
title: "root scoped metrics seconds counter", title: "root scoped metrics seconds counter",
@ -332,7 +333,7 @@ func TestSeriesRegistry(t *testing.T) {
resourceNames: []string{"somenode"}, resourceNames: []string{"somenode"},
expectedKind: SecondsCounterSeries, expectedKind: SecondsCounterSeries,
expectedQuery: "node_fan_seconds_total{node=\"somenode\"}", expectedQuery: "node_fan_seconds_total{kube_node=\"somenode\"}",
}, },
} }
@ -347,9 +348,9 @@ func TestSeriesRegistry(t *testing.T) {
expectedGroupBy := testCase.expectedGroupBy expectedGroupBy := testCase.expectedGroupBy
if expectedGroupBy == "" { if expectedGroupBy == "" {
expectedGroupBy = testCase.info.GroupResource.Resource expectedGroupBy = registry.namer.labelPrefix + testCase.info.GroupResource.Resource
} }
assert.Equal(expectedGroupBy, groupBy, "%s: metric %v should have produced the correct groupBy clause", testCase.title) assert.Equal(expectedGroupBy, groupBy, "%s: metric %v should have produced the correct groupBy clause", testCase.title, testCase.info)
} }
allMetrics := registry.ListAllMetrics() allMetrics := registry.ListAllMetrics()

View file

@ -50,7 +50,7 @@ type prometheusProvider struct {
rateInterval time.Duration rateInterval time.Duration
} }
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider { func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, labelPrefix string, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider {
lister := &cachingMetricsLister{ lister := &cachingMetricsLister{
updateInterval: updateInterval, updateInterval: updateInterval,
promClient: promClient, promClient: promClient,
@ -58,8 +58,9 @@ func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientP
SeriesRegistry: &basicSeriesRegistry{ SeriesRegistry: &basicSeriesRegistry{
namer: metricNamer{ namer: metricNamer{
// TODO: populate the overrides list // TODO: populate the overrides list
overrides: nil, overrides: nil,
mapper: mapper, mapper: mapper,
labelPrefix: labelPrefix,
}, },
}, },
} }
@ -301,13 +302,10 @@ func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) {
func (l *cachingMetricsLister) updateMetrics() error { func (l *cachingMetricsLister) updateMetrics() error {
startTime := pmodel.Now().Add(-1 * l.updateInterval) startTime := pmodel.Now().Add(-1 * l.updateInterval)
// container-specific metrics from cAdvsior have their own form, and need special handling sels := l.Selectors()
containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", ""))
namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*"))
// TODO: figure out how to determine which metrics on non-namespaced objects are kubernetes-related
// TODO: use an actual context here // TODO: use an actual context here
series, err := l.promClient.Series(context.Background(), pmodel.Interval{startTime, 0}, containerSel, namespacedSel) series, err := l.promClient.Series(context.Background(), pmodel.Interval{startTime, 0}, sels...)
if err != nil { if err != nil {
return fmt.Errorf("unable to update list of all available metrics: %v", err) return fmt.Errorf("unable to update list of all available metrics: %v", err)
} }

View file

@ -90,7 +90,7 @@ func setupPrometheusProvider(t *testing.T, stopCh <-chan struct{}) (provider.Cus
fakeProm := &fakePromClient{} fakeProm := &fakePromClient{}
fakeKubeClient := &fakedyn.FakeClientPool{} fakeKubeClient := &fakedyn.FakeClientPool{}
prov := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, fakeProviderUpdateInterval, 1*time.Minute, stopCh) prov := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, "", fakeProviderUpdateInterval, 1*time.Minute, stopCh)
containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", ""))
namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*"))