Breaking down some more components, adding tests.

* Some bug fixes found during testing/test repair.
* Trying to tease apart the various responsibilities of `metricNamer` into smaller chunks and adding tests for each individual chunk.
* Updating the `provider_test` and `series_registry_test` to fix failures.
This commit is contained in:
Tony Compton 2018-07-19 10:53:48 -04:00
parent 76217a552b
commit fc88e6e57a
18 changed files with 1038 additions and 516 deletions

View file

@ -105,6 +105,7 @@ func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) {
return return
} }
errs <- nil errs <- nil
//Push into the channel: "this selector produced these series"
selectorSeriesChan <- selectorSeries{ selectorSeriesChan <- selectorSeries{
selector: sel, selector: sel,
series: series, series: series,
@ -116,23 +117,32 @@ func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) {
seriesCacheByQuery := make(map[prom.Selector][]prom.Series) seriesCacheByQuery := make(map[prom.Selector][]prom.Series)
// iterate through, blocking until we've got all results // iterate through, blocking until we've got all results
// We know that, from above, we should have pushed one item into the channel
// for each namer. So here, we'll assume that we should receive one item per namer.
for range l.namers { for range l.namers {
if err := <-errs; err != nil { if err := <-errs; err != nil {
return result, fmt.Errorf("unable to update list of all metrics: %v", err) return result, fmt.Errorf("unable to update list of all metrics: %v", err)
} }
//Receive from the channel: "this selector produced these series"
//We stuff that into this map so that we can collect the data as it arrives
//and then, once we've received it all, we can process it below.
if ss := <-selectorSeriesChan; ss.series != nil { if ss := <-selectorSeriesChan; ss.series != nil {
seriesCacheByQuery[ss.selector] = ss.series seriesCacheByQuery[ss.selector] = ss.series
} }
} }
close(errs) close(errs)
//Now that we've collected all of the results into `seriesCacheByQuery`
//we can start processing them.
newSeries := make([][]prom.Series, len(l.namers)) newSeries := make([][]prom.Series, len(l.namers))
for i, namer := range l.namers { for i, namer := range l.namers {
series, cached := seriesCacheByQuery[namer.Selector()] series, cached := seriesCacheByQuery[namer.Selector()]
if !cached { if !cached {
return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector()) return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector())
} }
newSeries[i] = namer.FilterSeries(series) //Because namers provide a "post-filtering" option, it's not enough to
//simply take all the series that were produced. We need to further filter them.
newSeries[i] = namer.SeriesFilterer().FilterSeries(series)
} }
glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries) glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries)

Binary file not shown.

View file

@ -1,81 +1,81 @@
package provider package provider
import ( // import (
"context" // "context"
"time" // "time"
pmodel "github.com/prometheus/common/model" // pmodel "github.com/prometheus/common/model"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" // "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
apimeta "k8s.io/apimachinery/pkg/api/meta" // apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels" // "k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/dynamic" // "k8s.io/client-go/dynamic"
"k8s.io/metrics/pkg/apis/external_metrics" // "k8s.io/metrics/pkg/apis/external_metrics"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" // prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
) // )
//TODO: Make sure everything has the proper licensing disclosure at the top. // //TODO: Make sure everything has the proper licensing disclosure at the top.
//TODO: I'd like to move these files into another directory, but the compiler was giving me // //TODO: I'd like to move these files into another directory, but the compiler was giving me
//some static around unexported types. I'm going to leave things as-is for now, but it // //some static around unexported types. I'm going to leave things as-is for now, but it
//might be worthwhile to, once the shared components are discovered, move some things around. // //might be worthwhile to, once the shared components are discovered, move some things around.
//TODO: Some of these members may not be necessary. // //TODO: Some of these members may not be necessary.
//Some of them are definitely duplicated between the // //Some of them are definitely duplicated between the
//external and custom providers. They should probably share // //external and custom providers. They should probably share
//the same instances of these objects (especially the SeriesRegistry) // //the same instances of these objects (especially the SeriesRegistry)
//to cut down on unnecessary chatter/bookkeeping. // //to cut down on unnecessary chatter/bookkeeping.
type externalPrometheusProvider struct { // type externalPrometheusProvider struct {
mapper apimeta.RESTMapper // mapper apimeta.RESTMapper
kubeClient dynamic.Interface // kubeClient dynamic.Interface
promClient prom.Client // promClient prom.Client
queryBuilder ExternalMetricQueryBuilder // queryBuilder ExternalMetricQueryBuilder
metricConverter conv.MetricConverter // metricConverter conv.MetricConverter
seriesRegistry SeriesRegistry // seriesRegistry SeriesRegistry
} // }
//TODO: It probably makes more sense to, once this is functional and complete, roll the // //TODO: It probably makes more sense to, once this is functional and complete, roll the
//prometheusProvider and externalPrometheusProvider up into a single type // //prometheusProvider and externalPrometheusProvider up into a single type
//that implements both interfaces or provide a thin wrapper that composes them. // //that implements both interfaces or provide a thin wrapper that composes them.
//Just glancing at start.go looks like it would be much more straightforward // //Just glancing at start.go looks like it would be much more straightforward
//to do one of those two things instead of trying to run the two providers // //to do one of those two things instead of trying to run the two providers
//independently. // //independently.
func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration, metricConverter conv.MetricConverter, seriesRegistry SeriesRegistry) (provider.ExternalMetricsProvider, Runnable) { // func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration, metricConverter conv.MetricConverter, seriesRegistry SeriesRegistry) (provider.ExternalMetricsProvider, Runnable) {
return &externalPrometheusProvider{ // return &externalPrometheusProvider{
mapper: mapper, // mapper: mapper,
kubeClient: kubeClient, // kubeClient: kubeClient,
promClient: promClient, // promClient: promClient,
metricConverter: metricConverter, // metricConverter: metricConverter,
seriesRegistry: seriesRegistry, // seriesRegistry: seriesRegistry,
}, lister // }, lister
} // }
func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) { // func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) {
selector, found := p.seriesRegistry.QueryForExternalMetric(metricInfo, metricSelector) // selector, found := p.seriesRegistry.QueryForExternalMetric(metricInfo, metricSelector)
if !found { // if !found {
return &external_metrics.ExternalMetricValueList{ // return &external_metrics.ExternalMetricValueList{
Items: []external_metrics.ExternalMetricValue{}, // Items: []external_metrics.ExternalMetricValue{},
}, nil // }, nil
} // }
// query := p.queryBuilder.BuildPrometheusQuery(namespace, metricName, metricSelector, queryMetadata) // // query := p.queryBuilder.BuildPrometheusQuery(namespace, metricName, metricSelector, queryMetadata)
//TODO: I don't yet know what a context is, but apparently I should use a real one. // //TODO: I don't yet know what a context is, but apparently I should use a real one.
queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector) // queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector)
if err != nil { // if err != nil {
//TODO: Is this how folks normally deal w/ errors? Just propagate them upwards? // //TODO: Is this how folks normally deal w/ errors? Just propagate them upwards?
//I should go look at what the customProvider does. // //I should go look at what the customProvider does.
return nil, err // return nil, err
} // }
return p.metricConverter.Convert(queryMetadata, queryResults) // return p.metricConverter.Convert(queryMetadata, queryResults)
} // }
func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo { // func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo {
return p.seriesRegistry.ListAllExternalMetrics() // return p.seriesRegistry.ListAllExternalMetrics()
} // }

View file

@ -0,0 +1,83 @@
package provider
import (
"bytes"
"fmt"
"regexp"
"text/template"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
pmodel "github.com/prometheus/common/model"
)
// labelGroupResExtractor extracts schema.GroupResources from series labels.
type labelGroupResExtractor struct {
regex *regexp.Regexp
resourceInd int
groupInd *int
mapper apimeta.RESTMapper
}
// newLabelGroupResExtractor creates a new labelGroupResExtractor for labels whose form
// matches the given template. It does so by creating a regular expression from the template,
// so anything in the template which limits resource or group name length will cause issues.
func newLabelGroupResExtractor(labelTemplate *template.Template) (*labelGroupResExtractor, error) {
labelRegexBuff := new(bytes.Buffer)
if err := labelTemplate.Execute(labelRegexBuff, schema.GroupResource{"(?P<group>.+?)", "(?P<resource>.+?)"}); err != nil {
return nil, fmt.Errorf("unable to convert label template to matcher: %v", err)
}
if labelRegexBuff.Len() == 0 {
return nil, fmt.Errorf("unable to convert label template to matcher: empty template")
}
labelRegexRaw := "^" + labelRegexBuff.String() + "$"
labelRegex, err := regexp.Compile(labelRegexRaw)
if err != nil {
return nil, fmt.Errorf("unable to convert label template to matcher: %v", err)
}
var groupInd *int
var resInd *int
for i, name := range labelRegex.SubexpNames() {
switch name {
case "group":
ind := i // copy to avoid iteration variable reference
groupInd = &ind
case "resource":
ind := i // copy to avoid iteration variable reference
resInd = &ind
}
}
if resInd == nil {
return nil, fmt.Errorf("must include at least `{{.Resource}}` in the label template")
}
return &labelGroupResExtractor{
regex: labelRegex,
resourceInd: *resInd,
groupInd: groupInd,
}, nil
}
// GroupResourceForLabel extracts a schema.GroupResource from the given label, if possible.
// The second argument indicates whether or not a potential group-resource was found in this label.
func (e *labelGroupResExtractor) GroupResourceForLabel(lbl pmodel.LabelName) (schema.GroupResource, bool) {
matchGroups := e.regex.FindStringSubmatch(string(lbl))
if matchGroups != nil {
group := ""
if e.groupInd != nil {
group = matchGroups[*e.groupInd]
}
return schema.GroupResource{
Group: group,
Resource: matchGroups[e.resourceInd],
}, true
}
return schema.GroupResource{}, false
}

View file

@ -0,0 +1,60 @@
package provider
import (
"fmt"
"regexp"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
)
type MetricNameConverter interface {
GetMetricNameForSeries(series prom.Series) (string, error)
}
type metricNameConverter struct {
nameMatches *regexp.Regexp
nameAs string
}
func NewMetricNameConverter(mapping config.NameMapping) (MetricNameConverter, error) {
var nameMatches *regexp.Regexp
var err error
if mapping.Matches != "" {
nameMatches, err = regexp.Compile(mapping.Matches)
if err != nil {
return nil, fmt.Errorf("unable to compile series name match expression %q: %v", mapping.Matches, err)
}
} else {
// this will always succeed
nameMatches = regexp.MustCompile(".*")
}
nameAs := mapping.As
if nameAs == "" {
// check if we have an obvious default
subexpNames := nameMatches.SubexpNames()
if len(subexpNames) == 1 {
// no capture groups, use the whole thing
nameAs = "$0"
} else if len(subexpNames) == 2 {
// one capture group, use that
nameAs = "$1"
} else {
return nil, fmt.Errorf("must specify an 'as' value for name matcher %q", mapping.Matches)
}
}
return &metricNameConverter{
nameMatches: nameMatches,
nameAs: nameAs,
}, nil
}
func (c *metricNameConverter) GetMetricNameForSeries(series prom.Series) (string, error) {
matches := c.nameMatches.FindStringSubmatchIndex(series.Name)
if matches == nil {
return "", fmt.Errorf("series name %q did not match expected pattern %q", series.Name, c.nameMatches.String())
}
outNameBytes := c.nameMatches.ExpandString(nil, c.nameAs, series.Name, matches)
return string(outNameBytes), nil
}

View file

@ -0,0 +1,85 @@
package provider
import (
"testing"
"github.com/stretchr/testify/require"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
config "github.com/directxman12/k8s-prometheus-adapter/pkg/config"
)
func TestWhenNoMappingMetricNameIsUnaltered(t *testing.T) {
emptyMapping := config.NameMapping{}
RunTest(t, emptyMapping, "my_series", "my_series")
RunTest(t, emptyMapping, "your_series", "your_series")
}
func TestWhenMappingWithOneCaptureGroupMetricNameIsCorrect(t *testing.T) {
mapping := config.NameMapping{
Matches: "my_(.*)",
As: "your_$1",
}
RunTest(t, mapping, "my_requests_per_second", "your_requests_per_second")
}
func TestWhenMappingWithMultipleCaptureGroupsMetricNameIsCorrect(t *testing.T) {
//ExpandString has some strange behavior when using the $1, $2 syntax
//Specifically, it doesn't return the expected values for templates like:
//$1_$2
//You can work around it by using the ${1} syntax.
mapping := config.NameMapping{
Matches: "my_([^_]+)_([^_]+)",
As: "your_${1}_is_${2}_large",
}
RunTest(t, mapping, "my_horse_very", "your_horse_is_very_large")
RunTest(t, mapping, "my_dog_not", "your_dog_is_not_large")
}
func TestAsCanBeInferred(t *testing.T) {
//When we've got one capture group, we should infer that as the target.
mapping := config.NameMapping{
Matches: "my_(.+)",
}
RunTest(t, mapping, "my_test_metric", "test_metric")
//When we have no capture groups, we should infer that the whole thing as the target.
mapping = config.NameMapping{
Matches: "my_metric",
}
RunTest(t, mapping, "my_metric", "my_metric")
}
func TestWhenAsCannotBeInferredError(t *testing.T) {
//More than one capture group should
//result in us giving up on making an educated guess.
mapping := config.NameMapping{
Matches: "my_([^_]+)_([^_]+)",
}
RunTestExpectingError(t, mapping, "my_horse_very")
RunTestExpectingError(t, mapping, "my_dog_not")
}
func RunTest(t *testing.T, mapping config.NameMapping, input string, expectedResult string) {
converter, err := NewMetricNameConverter(mapping)
require.NoError(t, err)
series := prom.Series{
Name: input,
}
actualResult, err := converter.GetMetricNameForSeries(series)
require.NoError(t, err)
require.Equal(t, expectedResult, actualResult)
}
func RunTestExpectingError(t *testing.T, mapping config.NameMapping, input string) {
_, err := NewMetricNameConverter(mapping)
require.Error(t, err)
}

View file

@ -1,26 +1,17 @@
package provider package provider
import ( import (
"bytes"
"fmt" "fmt"
"regexp"
"strings"
"sync"
"text/template"
"github.com/golang/glog"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
apimeta "k8s.io/apimachinery/pkg/api/meta" apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/config" "github.com/directxman12/k8s-prometheus-adapter/pkg/config"
pmodel "github.com/prometheus/common/model"
) )
var nsGroupResource = schema.GroupResource{Resource: "namespaces"} var nsGroupResource = schema.GroupResource{Resource: "namespaces"}
var groupNameSanitizer = strings.NewReplacer(".", "_", "-", "_")
// MetricNamer knows how to convert Prometheus series names and label names to // MetricNamer knows how to convert Prometheus series names and label names to
// metrics API resources, and vice-versa. MetricNamers should be safe to access // metrics API resources, and vice-versa. MetricNamers should be safe to access
@ -34,146 +25,37 @@ type MetricNamer interface {
// FilterSeries checks to see which of the given series match any additional // FilterSeries checks to see which of the given series match any additional
// constrains beyond the series query. It's assumed that the series given // constrains beyond the series query. It's assumed that the series given
// already matche the series query. // already matche the series query.
FilterSeries(series []prom.Series) []prom.Series // FilterSeries(series []prom.Series) []prom.Series
// ResourcesForSeries returns the group-resources associated with the given series, SeriesFilterer() SeriesFilterer
// as well as whether or not the given series has the "namespace" resource). ResourceConverter() ResourceConverter
ResourcesForSeries(series prom.Series) (res []schema.GroupResource, namespaced bool)
// LabelForResource returns the appropriate label for the given resource.
LabelForResource(resource schema.GroupResource) (pmodel.LabelName, error)
// MetricNameForSeries returns the name (as presented in the API) for a given series. // MetricNameForSeries returns the name (as presented in the API) for a given series.
MetricNameForSeries(series prom.Series) (string, error) // MetricNameForSeries(series prom.Series) (string, error)
// QueryForSeries returns the query for a given series (not API metric name), with // QueryForSeries returns the query for a given series (not API metric name), with
// the given namespace name (if relevant), resource, and resource names. // the given namespace name (if relevant), resource, and resource names.
QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error)
QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error) QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error)
IdentifySeries(series prom.Series) (seriesIdentity, error)
} }
// labelGroupResExtractor extracts schema.GroupResources from series labels. type seriesIdentity struct {
type labelGroupResExtractor struct { resources []schema.GroupResource
regex *regexp.Regexp namespaced bool
name string
resourceInd int
groupInd *int
mapper apimeta.RESTMapper
} }
// newLabelGroupResExtractor creates a new labelGroupResExtractor for labels whose form func (n *metricNamer) Selector() prom.Selector {
// matches the given template. It does so by creating a regular expression from the template, return n.seriesQuery
// so anything in the template which limits resource or group name length will cause issues.
func newLabelGroupResExtractor(labelTemplate *template.Template) (*labelGroupResExtractor, error) {
labelRegexBuff := new(bytes.Buffer)
if err := labelTemplate.Execute(labelRegexBuff, schema.GroupResource{"(?P<group>.+?)", "(?P<resource>.+?)"}); err != nil {
return nil, fmt.Errorf("unable to convert label template to matcher: %v", err)
}
if labelRegexBuff.Len() == 0 {
return nil, fmt.Errorf("unable to convert label template to matcher: empty template")
}
labelRegexRaw := "^" + labelRegexBuff.String() + "$"
labelRegex, err := regexp.Compile(labelRegexRaw)
if err != nil {
return nil, fmt.Errorf("unable to convert label template to matcher: %v", err)
}
var groupInd *int
var resInd *int
for i, name := range labelRegex.SubexpNames() {
switch name {
case "group":
ind := i // copy to avoid iteration variable reference
groupInd = &ind
case "resource":
ind := i // copy to avoid iteration variable reference
resInd = &ind
}
}
if resInd == nil {
return nil, fmt.Errorf("must include at least `{{.Resource}}` in the label template")
}
return &labelGroupResExtractor{
regex: labelRegex,
resourceInd: *resInd,
groupInd: groupInd,
}, nil
}
// GroupResourceForLabel extracts a schema.GroupResource from the given label, if possible.
// The second argument indicates whether or not a potential group-resource was found in this label.
func (e *labelGroupResExtractor) GroupResourceForLabel(lbl pmodel.LabelName) (schema.GroupResource, bool) {
matchGroups := e.regex.FindStringSubmatch(string(lbl))
if matchGroups != nil {
group := ""
if e.groupInd != nil {
group = matchGroups[*e.groupInd]
}
return schema.GroupResource{
Group: group,
Resource: matchGroups[e.resourceInd],
}, true
}
return schema.GroupResource{}, false
}
func (r *metricNamer) Selector() prom.Selector {
return r.seriesQuery
}
// reMatcher either positively or negatively matches a regex
type reMatcher struct {
regex *regexp.Regexp
positive bool
}
func newReMatcher(cfg config.RegexFilter) (*reMatcher, error) {
if cfg.Is != "" && cfg.IsNot != "" {
return nil, fmt.Errorf("cannot have both an `is` (%q) and `isNot` (%q) expression in a single filter", cfg.Is, cfg.IsNot)
}
if cfg.Is == "" && cfg.IsNot == "" {
return nil, fmt.Errorf("must have either an `is` or `isNot` expression in a filter")
}
var positive bool
var regexRaw string
if cfg.Is != "" {
positive = true
regexRaw = cfg.Is
} else {
positive = false
regexRaw = cfg.IsNot
}
regex, err := regexp.Compile(regexRaw)
if err != nil {
return nil, fmt.Errorf("unable to compile series filter %q: %v", regexRaw, err)
}
return &reMatcher{
regex: regex,
positive: positive,
}, nil
}
func (m *reMatcher) Matches(val string) bool {
return m.regex.MatchString(val) == m.positive
} }
type metricNamer struct { type metricNamer struct {
seriesQuery prom.Selector seriesQuery prom.Selector
labelTemplate *template.Template
labelResExtractor *labelGroupResExtractor
metricsQueryTemplate *template.Template
nameMatches *regexp.Regexp
nameAs string
seriesMatchers []*reMatcher
labelResourceMu sync.RWMutex resourceConverter ResourceConverter
labelToResource map[pmodel.LabelName]schema.GroupResource queryBuilder QueryBuilder
resourceToLabel map[schema.GroupResource]pmodel.LabelName seriesFilterer SeriesFilterer
mapper apimeta.RESTMapper metricNameConverter MetricNameConverter
mapper apimeta.RESTMapper
metricType config.MetricType metricType config.MetricType
} }
@ -187,23 +69,26 @@ type queryTemplateArgs struct {
GroupBySlice []string GroupBySlice []string
} }
func (n *metricNamer) FilterSeries(initialSeries []prom.Series) []prom.Series { func (n *metricNamer) IdentifySeries(series prom.Series) (seriesIdentity, error) {
if len(n.seriesMatchers) == 0 { // TODO: warn if it doesn't match any resources
return initialSeries resources, namespaced := n.resourceConverter.ResourcesForSeries(series)
name, err := n.metricNameConverter.GetMetricNameForSeries(series)
result := seriesIdentity{
resources: resources,
namespaced: namespaced,
name: name,
} }
finalSeries := make([]prom.Series, 0, len(initialSeries)) return result, err
SeriesLoop: }
for _, series := range initialSeries {
for _, matcher := range n.seriesMatchers {
if !matcher.Matches(series.Name) {
continue SeriesLoop
}
}
finalSeries = append(finalSeries, series)
}
return finalSeries func (n *metricNamer) SeriesFilterer() SeriesFilterer {
return n.seriesFilterer
}
func (n *metricNamer) ResourceConverter() ResourceConverter {
return n.resourceConverter
} }
func (n *metricNamer) createQueryPartsFromSelector(metricSelector labels.Selector) []queryPart { func (n *metricNamer) createQueryPartsFromSelector(metricSelector labels.Selector) []queryPart {
@ -240,7 +125,7 @@ func (n *metricNamer) buildNamespaceQueryPartForSeries(namespace string) (queryP
//If we've been given a namespace, then we need to set up //If we've been given a namespace, then we need to set up
//the label requirements to target that namespace. //the label requirements to target that namespace.
if namespace != "" { if namespace != "" {
namespaceLbl, err := n.LabelForResource(nsGroupResource) namespaceLbl, err := n.ResourceConverter().LabelForResource(nsGroupResource)
if err != nil { if err != nil {
return result, err return result, err
} }
@ -261,7 +146,7 @@ func (n *metricNamer) buildResourceQueryPartForSeries(resource schema.GroupResou
//If we've been given a resource, then we need to set up //If we've been given a resource, then we need to set up
//the label requirements to target that resource. //the label requirements to target that resource.
resourceLbl, err := n.LabelForResource(resource) resourceLbl, err := n.ResourceConverter().LabelForResource(resource)
if err != nil { if err != nil {
return result, err return result, err
} }
@ -274,34 +159,6 @@ func (n *metricNamer) buildResourceQueryPartForSeries(resource schema.GroupResou
return result, nil return result, nil
} }
func (n *metricNamer) processQueryParts(queryParts []queryPart) ([]string, map[string][]string) {
//Contains the expressions that we want to include as part of the query to Prometheus.
//e.g. "namespace=my-namespace"
//e.g. "some_label=some-value"
var exprs []string
//Contains the list of label values we're targeting, by namespace.
//e.g. "some_label" => ["value-one", "value-two"]
valuesByName := map[string][]string{}
//Convert our query parts into template arguments.
for _, qPart := range queryParts {
targetValue := qPart.values[0]
matcher := prom.LabelEq
if len(qPart.values) > 1 {
targetValue = strings.Join(qPart.values, "|")
matcher = prom.LabelMatches
}
expression := matcher(qPart.labelName, targetValue)
exprs = append(exprs, expression)
valuesByName[qPart.labelName] = qPart.values
}
return exprs, valuesByName
}
func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) { func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) {
queryParts := []queryPart{} queryParts := []queryPart{}
@ -321,259 +178,51 @@ func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResourc
queryParts = append(queryParts, resourceQueryPart) queryParts = append(queryParts, resourceQueryPart)
//Convert our query parts into the types we need for our template. return n.queryBuilder.BuildSelector(series, resourceQueryPart.labelName, []string{resourceQueryPart.labelName}, queryParts)
exprs, valuesByName := n.processQueryParts(queryParts)
args := queryTemplateArgs{
Series: series,
LabelMatchers: strings.Join(exprs, ","),
LabelValuesByName: valuesByName,
GroupBy: resourceQueryPart.labelName,
GroupBySlice: []string{resourceQueryPart.labelName},
}
selector, err := n.createSelectorFromTemplateArgs(args)
if err != nil {
return "", err
}
return selector, nil
}
func (n *metricNamer) createSelectorFromTemplateArgs(args queryTemplateArgs) (prom.Selector, error) {
//Turn our template arguments into a Selector.
queryBuff := new(bytes.Buffer)
if err := n.metricsQueryTemplate.Execute(queryBuff, args); err != nil {
return "", err
}
if queryBuff.Len() == 0 {
return "", fmt.Errorf("empty query produced by metrics query template")
}
return prom.Selector(queryBuff.String()), nil
}
func (n *metricNamer) ResourcesForSeries(series prom.Series) ([]schema.GroupResource, bool) {
// use an updates map to avoid having to drop the read lock to update the cache
// until the end. Since we'll probably have few updates after the first run,
// this should mean that we rarely have to hold the write lock.
var resources []schema.GroupResource
updates := make(map[pmodel.LabelName]schema.GroupResource)
namespaced := false
// use an anon func to get the right defer behavior
func() {
n.labelResourceMu.RLock()
defer n.labelResourceMu.RUnlock()
for lbl := range series.Labels {
var groupRes schema.GroupResource
var ok bool
// check if we have an override
if groupRes, ok = n.labelToResource[lbl]; ok {
resources = append(resources, groupRes)
} else if groupRes, ok = updates[lbl]; ok {
resources = append(resources, groupRes)
} else if n.labelResExtractor != nil {
// if not, check if it matches the form we expect, and if so,
// convert to a group-resource.
if groupRes, ok = n.labelResExtractor.GroupResourceForLabel(lbl); ok {
info, _, err := provider.CustomMetricInfo{GroupResource: groupRes}.Normalized(n.mapper)
if err != nil {
glog.Errorf("unable to normalize group-resource %s from label %q, skipping: %v", groupRes.String(), lbl, err)
continue
}
groupRes = info.GroupResource
resources = append(resources, groupRes)
updates[lbl] = groupRes
}
}
if groupRes == nsGroupResource {
namespaced = true
}
}
}()
// update the cache for next time. This should only be called by discovery,
// so we don't really have to worry about the grap between read and write locks
// (plus, we don't care if someone else updates the cache first, since the results
// are necessarily the same, so at most we've done extra work).
if len(updates) > 0 {
n.labelResourceMu.Lock()
defer n.labelResourceMu.Unlock()
for lbl, groupRes := range updates {
n.labelToResource[lbl] = groupRes
}
}
return resources, namespaced
}
func (n *metricNamer) LabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) {
n.labelResourceMu.RLock()
// check if we have a cached copy or override
lbl, ok := n.resourceToLabel[resource]
n.labelResourceMu.RUnlock() // release before we call makeLabelForResource
if ok {
return lbl, nil
}
// NB: we don't actually care about the gap between releasing read lock
// and acquiring the write lock -- if we do duplicate work sometimes, so be
// it, as long as we're correct.
// otherwise, use the template and save the result
lbl, err := n.makeLabelForResource(resource)
if err != nil {
return "", fmt.Errorf("unable to convert resource %s into label: %v", resource.String(), err)
}
return lbl, nil
}
// makeLabelForResource constructs a label name for the given resource, and saves the result.
// It must *not* be called under an existing lock.
func (n *metricNamer) makeLabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) {
if n.labelTemplate == nil {
return "", fmt.Errorf("no generic resource label form specified for this metric")
}
buff := new(bytes.Buffer)
singularRes, err := n.mapper.ResourceSingularizer(resource.Resource)
if err != nil {
return "", fmt.Errorf("unable to singularize resource %s: %v", resource.String(), err)
}
convResource := schema.GroupResource{
Group: groupNameSanitizer.Replace(resource.Group),
Resource: singularRes,
}
if err := n.labelTemplate.Execute(buff, convResource); err != nil {
return "", err
}
if buff.Len() == 0 {
return "", fmt.Errorf("empty label produced by label template")
}
lbl := pmodel.LabelName(buff.String())
n.labelResourceMu.Lock()
defer n.labelResourceMu.Unlock()
n.resourceToLabel[resource] = lbl
n.labelToResource[lbl] = resource
return lbl, nil
}
func (n *metricNamer) MetricNameForSeries(series prom.Series) (string, error) {
matches := n.nameMatches.FindStringSubmatchIndex(series.Name)
if matches == nil {
return "", fmt.Errorf("series name %q did not match expected pattern %q", series.Name, n.nameMatches.String())
}
outNameBytes := n.nameMatches.ExpandString(nil, n.nameAs, series.Name, matches)
return string(outNameBytes), nil
} }
// NamersFromConfig produces a MetricNamer for each rule in the given config. // NamersFromConfig produces a MetricNamer for each rule in the given config.
func NamersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMapper) ([]MetricNamer, error) { func NamersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMapper) ([]MetricNamer, error) {
namers := make([]MetricNamer, len(cfg.Rules)) namers := make([]MetricNamer, len(cfg.Rules))
for i, rule := range cfg.Rules { for i, rule := range cfg.Rules {
var labelTemplate *template.Template
var labelResExtractor *labelGroupResExtractor
var err error var err error
if rule.Resources.Template != "" {
labelTemplate, err = template.New("resource-label").Delims("<<", ">>").Parse(rule.Resources.Template)
if err != nil {
return nil, fmt.Errorf("unable to parse label template %q associated with series query %q: %v", rule.Resources.Template, rule.SeriesQuery, err)
}
labelResExtractor, err = newLabelGroupResExtractor(labelTemplate) resourceConverter, err := NewResourceConverter(rule.Resources.Template, rule.Resources.Overrides, mapper)
if err != nil {
return nil, fmt.Errorf("unable to generate label format from template %q associated with series query %q: %v", rule.Resources.Template, rule.SeriesQuery, err)
}
}
metricsQueryTemplate, err := template.New("metrics-query").Delims("<<", ">>").Parse(rule.MetricsQuery)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to parse metrics query template %q associated with series query %q: %v", rule.MetricsQuery, rule.SeriesQuery, err) return nil, fmt.Errorf("unable to create ResourceConverter associated with series query %q: %v", rule.SeriesQuery, err)
} }
seriesMatchers := make([]*reMatcher, len(rule.SeriesFilters)) queryBuilder, err := NewQueryBuilder(rule.MetricsQuery)
for i, filterRaw := range rule.SeriesFilters { if err != nil {
matcher, err := newReMatcher(filterRaw) return nil, fmt.Errorf("unable to create a QueryBuilder associated with series query %q: %v", rule.SeriesQuery, err)
if err != nil {
return nil, fmt.Errorf("unable to generate series name filter associated with series query %q: %v", rule.SeriesQuery, err)
}
seriesMatchers[i] = matcher
}
if rule.Name.Matches != "" {
matcher, err := newReMatcher(config.RegexFilter{Is: rule.Name.Matches})
if err != nil {
return nil, fmt.Errorf("unable to generate series name filter from name rules associated with series query %q: %v", rule.SeriesQuery, err)
}
seriesMatchers = append(seriesMatchers, matcher)
} }
var nameMatches *regexp.Regexp seriesFilterer, err := NewSeriesFilterer(rule.SeriesFilters)
if rule.Name.Matches != "" { if err != nil {
nameMatches, err = regexp.Compile(rule.Name.Matches) return nil, fmt.Errorf("unable to create a SeriesFilter associated with series query %q: %v", rule.SeriesQuery, err)
if err != nil {
return nil, fmt.Errorf("unable to compile series name match expression %q associated with series query %q: %v", rule.Name.Matches, rule.SeriesQuery, err)
}
} else {
// this will always succeed
nameMatches = regexp.MustCompile(".*")
} }
nameAs := rule.Name.As
if nameAs == "" { if rule.Name.Matches != "" {
// check if we have an obvious default err := seriesFilterer.AddRequirement(config.RegexFilter{Is: rule.Name.Matches})
subexpNames := nameMatches.SubexpNames() if err != nil {
if len(subexpNames) == 1 { return nil, fmt.Errorf("unable to apply the series name filter from name rules associated with series query %q: %v", rule.SeriesQuery, err)
// no capture groups, use the whole thing
nameAs = "$0"
} else if len(subexpNames) == 2 {
// one capture group, use that
nameAs = "$1"
} else {
return nil, fmt.Errorf("must specify an 'as' value for name matcher %q associated with series query %q", rule.Name.Matches, rule.SeriesQuery)
} }
} }
metricNameConverter, err := NewMetricNameConverter(rule.Name)
if err != nil {
return nil, fmt.Errorf("unable to create a MetricNameConverter associated with series query %q: %v", rule.SeriesQuery, err)
}
namer := &metricNamer{ namer := &metricNamer{
seriesQuery: prom.Selector(rule.SeriesQuery), seriesQuery: prom.Selector(rule.SeriesQuery),
labelTemplate: labelTemplate, mapper: mapper,
labelResExtractor: labelResExtractor,
metricsQueryTemplate: metricsQueryTemplate,
mapper: mapper,
nameMatches: nameMatches,
nameAs: nameAs,
seriesMatchers: seriesMatchers,
labelToResource: make(map[pmodel.LabelName]schema.GroupResource), resourceConverter: resourceConverter,
resourceToLabel: make(map[schema.GroupResource]pmodel.LabelName), queryBuilder: queryBuilder,
seriesFilterer: seriesFilterer,
metricType: rule.MetricType, metricNameConverter: metricNameConverter,
} metricType: rule.MetricType,
// invert the structure for consistency with the template
for lbl, groupRes := range rule.Resources.Overrides {
infoRaw := provider.CustomMetricInfo{
GroupResource: schema.GroupResource{
Group: groupRes.Group,
Resource: groupRes.Resource,
},
}
info, _, err := infoRaw.Normalized(mapper)
if err != nil {
return nil, fmt.Errorf("unable to normalize group-resource %v: %v", groupRes, err)
}
namer.labelToResource[pmodel.LabelName(lbl)] = info.GroupResource
namer.resourceToLabel[info.GroupResource] = pmodel.LabelName(lbl)
} }
namers[i] = namer namers[i] = namer
@ -585,15 +234,7 @@ func NamersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMap
func (n *metricNamer) QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error) { func (n *metricNamer) QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error) {
queryParts := n.createQueryPartsFromSelector(metricSelector) queryParts := n.createQueryPartsFromSelector(metricSelector)
exprs, valuesByName := n.processQueryParts(queryParts) selector, err := n.queryBuilder.BuildSelector(series, "", []string{}, queryParts)
args := queryTemplateArgs{
Series: series,
LabelMatchers: strings.Join(exprs, ","),
LabelValuesByName: valuesByName,
}
selector, err := n.createSelectorFromTemplateArgs(args)
if err != nil { if err != nil {
return "", err return "", err
} }

View file

@ -55,14 +55,26 @@ func (l *periodicMetricLister) Run() {
func (l *periodicMetricLister) RunUntil(stopChan <-chan struct{}) { func (l *periodicMetricLister) RunUntil(stopChan <-chan struct{}) {
go wait.Until(func() { go wait.Until(func() {
if result, err := l.realLister.ListAllMetrics(); err != nil { if err := l.updateMetrics(); err != nil {
utilruntime.HandleError(err) utilruntime.HandleError(err)
} else {
l.mostRecentResult = result
} }
}, l.updateInterval, stopChan) }, l.updateInterval, stopChan)
} }
func (l *periodicMetricLister) updateMetrics() error {
result, err := l.realLister.ListAllMetrics()
if err != nil {
return err
}
//Cache the result.
l.mostRecentResult = result
//Let our listener know we've got new data ready for them.
l.callback(result)
return nil
}
// func (l *periodicMetricLister) updateMetrics() (metricUpdateResult, error) { // func (l *periodicMetricLister) updateMetrics() (metricUpdateResult, error) {
// result := metricUpdateResult{ // result := metricUpdateResult{

View file

@ -48,7 +48,9 @@ type prometheusProvider struct {
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration) (provider.CustomMetricsProvider, Runnable) { func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration) (provider.CustomMetricsProvider, Runnable) {
basicLister := NewBasicMetricLister(promClient, namers, updateInterval) basicLister := NewBasicMetricLister(promClient, namers, updateInterval)
periodicLister, periodicRunnable := NewPeriodicMetricLister(basicLister, updateInterval) //TODO: Be sure to run this runnable.
// periodicLister, periodicRunnable := NewPeriodicMetricLister(basicLister, updateInterval)
periodicLister, _ := NewPeriodicMetricLister(basicLister, updateInterval)
seriesRegistry := NewBasicSeriesRegistry(periodicLister, mapper) seriesRegistry := NewBasicSeriesRegistry(periodicLister, mapper)

View file

@ -141,7 +141,9 @@ func TestListAllMetrics(t *testing.T) {
fakeProm.acceptibleInterval = pmodel.Interval{Start: startTime, End: 0} fakeProm.acceptibleInterval = pmodel.Interval{Start: startTime, End: 0}
// update the metrics (without actually calling RunUntil, so we can avoid timing issues) // update the metrics (without actually calling RunUntil, so we can avoid timing issues)
lister := prov.(*prometheusProvider).SeriesRegistry.(*cachingMetricsLister) lister := prov.(*prometheusProvider).SeriesRegistry.(*basicSeriesRegistry).metricLister.(*periodicMetricLister)
// lister := prov.(*prometheusProvider).SeriesRegistry.(*basicSeriesRegistry).metricLister.(*periodicMetricLister).realLister.(*basicMetricLister)
require.NoError(t, lister.updateMetrics()) require.NoError(t, lister.updateMetrics())
// list/sort the metrics // list/sort the metrics

View file

@ -0,0 +1,96 @@
package provider
import (
"bytes"
"fmt"
"strings"
"text/template"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
)
type QueryBuilder interface {
BuildSelector(seriesName string, groupBy string, groupBySlice []string, queryParts []queryPart) (prom.Selector, error)
}
type queryBuilder struct {
metricsQueryTemplate *template.Template
}
func NewQueryBuilder(metricsQuery string) (QueryBuilder, error) {
metricsQueryTemplate, err := template.New("metrics-query").Delims("<<", ">>").Parse(metricsQuery)
if err != nil {
return nil, fmt.Errorf("unable to parse metrics query template %q: %v", metricsQuery, err)
}
return &queryBuilder{
metricsQueryTemplate: metricsQueryTemplate,
}, nil
}
func (n *queryBuilder) BuildSelector(seriesName string, groupBy string, groupBySlice []string, queryParts []queryPart) (prom.Selector, error) {
//Convert our query parts into the types we need for our template.
exprs, valuesByName := n.processQueryParts(queryParts)
args := queryTemplateArgs{
Series: seriesName,
LabelMatchers: strings.Join(exprs, ","),
LabelValuesByName: valuesByName,
GroupBy: groupBy,
GroupBySlice: groupBySlice,
}
selector, err := n.createSelectorFromTemplateArgs(args)
if err != nil {
return "", err
}
return selector, nil
}
func (n *queryBuilder) createSelectorFromTemplateArgs(args queryTemplateArgs) (prom.Selector, error) {
//Turn our template arguments into a Selector.
queryBuff := new(bytes.Buffer)
if err := n.metricsQueryTemplate.Execute(queryBuff, args); err != nil {
return "", err
}
if queryBuff.Len() == 0 {
return "", fmt.Errorf("empty query produced by metrics query template")
}
return prom.Selector(queryBuff.String()), nil
}
func (n *queryBuilder) processQueryParts(queryParts []queryPart) ([]string, map[string][]string) {
//Contains the expressions that we want to include as part of the query to Prometheus.
//e.g. "namespace=my-namespace"
//e.g. "some_label=some-value"
var exprs []string
//Contains the list of label values we're targeting, by namespace.
//e.g. "some_label" => ["value-one", "value-two"]
valuesByName := map[string][]string{}
//Convert our query parts into template arguments.
for _, qPart := range queryParts {
//Be resilient against bad inputs.
//We obviously can't generate label filters for these cases.
if qPart.labelName == "" || len(qPart.values) == 0 {
continue
}
targetValue := qPart.values[0]
matcher := prom.LabelEq
if len(qPart.values) > 1 {
targetValue = strings.Join(qPart.values, "|")
matcher = prom.LabelMatches
}
expression := matcher(qPart.labelName, targetValue)
exprs = append(exprs, expression)
valuesByName[qPart.labelName] = qPart.values
}
return exprs, valuesByName
}

View file

@ -0,0 +1,64 @@
package provider
import (
"testing"
"github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/stretchr/testify/require"
)
func TestSimpleQuery(t *testing.T) {
builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])")
// builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>,static_label!=\"static_value\"}[2m])) by (<<.GroupBy>>)")
selector, _ := builder.BuildSelector("my_series", "", []string{}, []queryPart{})
expectation := client.Selector("rate(my_series{}[2m])")
require.Equal(t, selector, expectation)
}
func TestSimpleQueryWithOneLabelValue(t *testing.T) {
builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])")
// builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>,static_label!=\"static_value\"}[2m])) by (<<.GroupBy>>)")
selector, _ := builder.BuildSelector("my_series", "", []string{}, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one"},
},
})
expectation := client.Selector("rate(my_series{target_label=\"one\"}[2m])")
require.Equal(t, selector, expectation)
}
func TestSimpleQueryWithMultipleLabelValues(t *testing.T) {
builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])")
// builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>,static_label!=\"static_value\"}[2m])) by (<<.GroupBy>>)")
selector, _ := builder.BuildSelector("my_series", "", []string{}, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one", "two"},
},
})
expectation := client.Selector("rate(my_series{target_label=~\"one|two\"}[2m])")
require.Equal(t, selector, expectation)
}
func TestQueryWithGroupBy(t *testing.T) {
builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>}[2m])) by (<<.GroupBy>>)")
selector, _ := builder.BuildSelector("my_series", "my_grouping", []string{}, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one", "two"},
},
})
expectation := client.Selector("sum(rate(my_series{target_label=~\"one|two\"}[2m])) by (my_grouping)")
require.Equal(t, selector, expectation)
}
//TODO: Ensure that the LabelValuesByName and GroupBySlice placeholders function correctly.

View file

@ -0,0 +1,47 @@
package provider
import (
"fmt"
"regexp"
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
)
// reMatcher either positively or negatively matches a regex
type reMatcher struct {
regex *regexp.Regexp
positive bool
}
func newReMatcher(cfg config.RegexFilter) (*reMatcher, error) {
if cfg.Is != "" && cfg.IsNot != "" {
return nil, fmt.Errorf("cannot have both an `is` (%q) and `isNot` (%q) expression in a single filter", cfg.Is, cfg.IsNot)
}
if cfg.Is == "" && cfg.IsNot == "" {
return nil, fmt.Errorf("must have either an `is` or `isNot` expression in a filter")
}
var positive bool
var regexRaw string
if cfg.Is != "" {
positive = true
regexRaw = cfg.Is
} else {
positive = false
regexRaw = cfg.IsNot
}
regex, err := regexp.Compile(regexRaw)
if err != nil {
return nil, fmt.Errorf("unable to compile series filter %q: %v", regexRaw, err)
}
return &reMatcher{
regex: regex,
positive: positive,
}, nil
}
func (m *reMatcher) Matches(val string) bool {
return m.regex.MatchString(val) == m.positive
}

View file

@ -0,0 +1,192 @@
package provider
import (
"bytes"
"fmt"
"strings"
"sync"
"text/template"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
"github.com/golang/glog"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
pmodel "github.com/prometheus/common/model"
)
type ResourceConverter interface {
// ResourcesForSeries returns the group-resources associated with the given series,
// as well as whether or not the given series has the "namespace" resource).
ResourcesForSeries(series prom.Series) (res []schema.GroupResource, namespaced bool)
// LabelForResource returns the appropriate label for the given resource.
LabelForResource(resource schema.GroupResource) (pmodel.LabelName, error)
}
type resourceConverter struct {
labelResourceMu sync.RWMutex
labelToResource map[pmodel.LabelName]schema.GroupResource
resourceToLabel map[schema.GroupResource]pmodel.LabelName
labelResExtractor *labelGroupResExtractor
mapper apimeta.RESTMapper
labelTemplate *template.Template
}
func NewResourceConverter(resourceTemplate string, overrides map[string]config.GroupResource, mapper apimeta.RESTMapper) (ResourceConverter, error) {
converter := &resourceConverter{
labelToResource: make(map[pmodel.LabelName]schema.GroupResource),
resourceToLabel: make(map[schema.GroupResource]pmodel.LabelName),
mapper: mapper,
}
if resourceTemplate != "" {
labelTemplate, err := template.New("resource-label").Delims("<<", ">>").Parse(resourceTemplate)
if err != nil {
return converter, fmt.Errorf("unable to parse label template %q: %v", resourceTemplate, err)
}
converter.labelTemplate = labelTemplate
labelResExtractor, err := newLabelGroupResExtractor(labelTemplate)
if err != nil {
return converter, fmt.Errorf("unable to generate label format from template %q: %v", resourceTemplate, err)
}
converter.labelResExtractor = labelResExtractor
}
// invert the structure for consistency with the template
for lbl, groupRes := range overrides {
infoRaw := provider.CustomMetricInfo{
GroupResource: schema.GroupResource{
Group: groupRes.Group,
Resource: groupRes.Resource,
},
}
info, _, err := infoRaw.Normalized(converter.mapper)
if err != nil {
return nil, fmt.Errorf("unable to normalize group-resource %v: %v", groupRes, err)
}
converter.labelToResource[pmodel.LabelName(lbl)] = info.GroupResource
converter.resourceToLabel[info.GroupResource] = pmodel.LabelName(lbl)
}
return converter, nil
}
func (n *resourceConverter) LabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) {
n.labelResourceMu.RLock()
// check if we have a cached copy or override
lbl, ok := n.resourceToLabel[resource]
n.labelResourceMu.RUnlock() // release before we call makeLabelForResource
if ok {
return lbl, nil
}
// NB: we don't actually care about the gap between releasing read lock
// and acquiring the write lock -- if we do duplicate work sometimes, so be
// it, as long as we're correct.
// otherwise, use the template and save the result
lbl, err := n.makeLabelForResource(resource)
if err != nil {
return "", fmt.Errorf("unable to convert resource %s into label: %v", resource.String(), err)
}
return lbl, nil
}
var groupNameSanitizer = strings.NewReplacer(".", "_", "-", "_")
// makeLabelForResource constructs a label name for the given resource, and saves the result.
// It must *not* be called under an existing lock.
func (n *resourceConverter) makeLabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) {
if n.labelTemplate == nil {
return "", fmt.Errorf("no generic resource label form specified for this metric")
}
buff := new(bytes.Buffer)
singularRes, err := n.mapper.ResourceSingularizer(resource.Resource)
if err != nil {
return "", fmt.Errorf("unable to singularize resource %s: %v", resource.String(), err)
}
convResource := schema.GroupResource{
Group: groupNameSanitizer.Replace(resource.Group),
Resource: singularRes,
}
if err := n.labelTemplate.Execute(buff, convResource); err != nil {
return "", err
}
if buff.Len() == 0 {
return "", fmt.Errorf("empty label produced by label template")
}
lbl := pmodel.LabelName(buff.String())
n.labelResourceMu.Lock()
defer n.labelResourceMu.Unlock()
n.resourceToLabel[resource] = lbl
n.labelToResource[lbl] = resource
return lbl, nil
}
func (n *resourceConverter) ResourcesForSeries(series prom.Series) ([]schema.GroupResource, bool) {
// use an updates map to avoid having to drop the read lock to update the cache
// until the end. Since we'll probably have few updates after the first run,
// this should mean that we rarely have to hold the write lock.
var resources []schema.GroupResource
updates := make(map[pmodel.LabelName]schema.GroupResource)
namespaced := false
// use an anon func to get the right defer behavior
func() {
n.labelResourceMu.RLock()
defer n.labelResourceMu.RUnlock()
for lbl := range series.Labels {
var groupRes schema.GroupResource
var ok bool
// check if we have an override
if groupRes, ok = n.labelToResource[lbl]; ok {
resources = append(resources, groupRes)
} else if groupRes, ok = updates[lbl]; ok {
resources = append(resources, groupRes)
} else if n.labelResExtractor != nil {
// if not, check if it matches the form we expect, and if so,
// convert to a group-resource.
if groupRes, ok = n.labelResExtractor.GroupResourceForLabel(lbl); ok {
info, _, err := provider.CustomMetricInfo{GroupResource: groupRes}.Normalized(n.mapper)
if err != nil {
glog.Errorf("unable to normalize group-resource %s from label %q, skipping: %v", groupRes.String(), lbl, err)
continue
}
groupRes = info.GroupResource
resources = append(resources, groupRes)
updates[lbl] = groupRes
}
}
if groupRes == nsGroupResource {
namespaced = true
}
}
}()
// update the cache for next time. This should only be called by discovery,
// so we don't really have to worry about the gap between read and write locks
// (plus, we don't care if someone else updates the cache first, since the results
// are necessarily the same, so at most we've done extra work).
if len(updates) > 0 {
n.labelResourceMu.Lock()
defer n.labelResourceMu.Unlock()
for lbl, groupRes := range updates {
n.labelToResource[lbl] = groupRes
}
}
return resources, namespaced
}

View file

@ -0,0 +1,61 @@
package provider
import (
"fmt"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
)
type SeriesFilterer interface {
FilterSeries(series []prom.Series) []prom.Series
AddRequirement(filter config.RegexFilter) error
}
type seriesFilterer struct {
seriesMatchers []*reMatcher
}
func NewSeriesFilterer(filters []config.RegexFilter) (SeriesFilterer, error) {
seriesMatchers := make([]*reMatcher, len(filters))
for i, filterRaw := range filters {
matcher, err := newReMatcher(filterRaw)
if err != nil {
return nil, fmt.Errorf("unable to generate series name filter: %v", err)
}
seriesMatchers[i] = matcher
}
return &seriesFilterer{
seriesMatchers: seriesMatchers,
}, nil
}
func (n *seriesFilterer) AddRequirement(filterRaw config.RegexFilter) error {
matcher, err := newReMatcher(filterRaw)
if err != nil {
return fmt.Errorf("unable to generate series name filter: %v", err)
}
n.seriesMatchers = append(n.seriesMatchers, matcher)
return nil
}
func (n *seriesFilterer) FilterSeries(initialSeries []prom.Series) []prom.Series {
if len(n.seriesMatchers) == 0 {
return initialSeries
}
finalSeries := make([]prom.Series, 0, len(initialSeries))
SeriesLoop:
for _, series := range initialSeries {
for _, matcher := range n.seriesMatchers {
if !matcher.Matches(series.Name) {
continue SeriesLoop
}
}
finalSeries = append(finalSeries, series)
}
return finalSeries
}

View file

@ -0,0 +1,114 @@
package provider
import (
"testing"
"github.com/stretchr/testify/require"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
)
func TestPositiveFilter(t *testing.T) {
filters := []config.RegexFilter{
config.RegexFilter{
Is: "one_of_(yes|positive|ok)",
},
}
series := []string{"one_of_yes", "one_of_no"}
expectedSeries := []string{"one_of_yes"}
RunSeriesFiltererTest(t, filters, series, expectedSeries)
}
func TestNegativeFilter(t *testing.T) {
filters := []config.RegexFilter{
config.RegexFilter{
IsNot: "one_of_(yes|positive|ok)",
},
}
series := []string{"one_of_yes", "one_of_no"}
expectedSeries := []string{"one_of_no"}
RunSeriesFiltererTest(t, filters, series, expectedSeries)
}
func TestPositiveAndNegativeFilterError(t *testing.T) {
filters := []config.RegexFilter{
config.RegexFilter{
Is: "series_\\d+",
IsNot: "series_[2-3]+",
},
}
_, err := NewSeriesFilterer(filters)
require.Error(t, err)
}
func TestAddRequirementAppliesFilter(t *testing.T) {
seriesNames := []string{"series_1", "series_2", "series_3"}
series := BuildSeriesFromNames(seriesNames)
filters := []config.RegexFilter{
config.RegexFilter{
Is: "series_\\d+",
},
}
filterer, err := NewSeriesFilterer(filters)
require.NoError(t, err)
//Test it once with the default filters.
result := filterer.FilterSeries(series)
expectedSeries := []string{"series_1", "series_2", "series_3"}
VerifyMatches(t, result, expectedSeries)
//Add a new filter and test again.
filterer.AddRequirement(config.RegexFilter{
Is: "series_[2-3]",
})
result = filterer.FilterSeries(series)
expectedSeries = []string{"series_2", "series_3"}
VerifyMatches(t, result, expectedSeries)
}
func RunSeriesFiltererTest(t *testing.T, filters []config.RegexFilter, seriesNames []string, expectedResults []string) {
series := BuildSeriesFromNames(seriesNames)
filterer, err := NewSeriesFilterer(filters)
require.NoError(t, err)
matches := filterer.FilterSeries(series)
VerifyMatches(t, matches, expectedResults)
}
func VerifyMatches(t *testing.T, series []prom.Series, expectedResults []string) {
require.Equal(t, len(series), len(expectedResults))
existingSeries := make(map[string]bool)
for _, series := range series {
existingSeries[series.Name] = true
}
for _, expectedResult := range expectedResults {
_, exists := existingSeries[expectedResult]
require.True(t, exists)
}
}
func BuildSeriesFromNames(seriesNames []string) []prom.Series {
series := make([]prom.Series, len(seriesNames))
for i, name := range seriesNames {
series[i] = prom.Series{
Name: name,
}
}
return series
}

View file

@ -88,7 +88,7 @@ func NewBasicSeriesRegistry(lister MetricListerWithNotification, mapper apimeta.
return &registry return &registry
} }
func (r basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) { func (r *basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) {
newSeriesSlices := result.series newSeriesSlices := result.series
namers := result.namers namers := result.namers
@ -100,13 +100,18 @@ func (r basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) {
for i, newSeries := range newSeriesSlices { for i, newSeries := range newSeriesSlices {
namer := namers[i] namer := namers[i]
for _, series := range newSeries { for _, series := range newSeries {
// TODO: warn if it doesn't match any resources identity, err := namer.IdentifySeries(series)
resources, namespaced := namer.ResourcesForSeries(series)
name, err := namer.MetricNameForSeries(series)
if err != nil { if err != nil {
glog.Errorf("unable to name series %q, skipping: %v", series.String(), err) glog.Errorf("unable to name series %q, skipping: %v", series.String(), err)
continue continue
} }
// TODO: warn if it doesn't match any resources
resources := identity.resources
namespaced := identity.namespaced
name := identity.name
for _, resource := range resources { for _, resource := range resources {
info := provider.CustomMetricInfo{ info := provider.CustomMetricInfo{
GroupResource: resource, GroupResource: resource,
@ -181,24 +186,25 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.CustomMetricInf
func (r *basicSeriesRegistry) QueryForExternalMetric(metricInfo provider.ExternalMetricInfo, metricSelector labels.Selector) (query prom.Selector, found bool) { func (r *basicSeriesRegistry) QueryForExternalMetric(metricInfo provider.ExternalMetricInfo, metricSelector labels.Selector) (query prom.Selector, found bool) {
r.mu.RLock() r.mu.RLock()
defer r.mu.RUnlock() defer r.mu.RUnlock()
//TODO: Implementation
return "", false
// info, infoFound := r.info[metricInfo]
// if !infoFound {
// //TODO: Weird that it switches between types here.
// glog.V(10).Infof("metric %v not registered", metricInfo)
// return "", false
// }
info, infoFound := r.info[metricInfo] // query, err := info.namer.QueryForExternalSeries(info.seriesName, metricSelector)
if !infoFound { // if err != nil {
//TODO: Weird that it switches between types here. // //TODO: See what was being .String() and implement that for ExternalMetricInfo.
glog.V(10).Infof("metric %v not registered", metricInfo) // // errorVal := metricInfo.String()
return "", false // errorVal := "something"
} // glog.Errorf("unable to construct query for metric %s: %v", errorVal, err)
// return "", false
// }
query, err := info.namer.QueryForExternalSeries(info.seriesName, metricSelector) // return query, true
if err != nil {
//TODO: See what was being .String() and implement that for ExternalMetricInfo.
// errorVal := metricInfo.String()
errorVal := "something"
glog.Errorf("unable to construct query for metric %s: %v", errorVal, err)
return "", false
}
return query, true
} }
func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.CustomMetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool) { func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.CustomMetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool) {
@ -216,7 +222,7 @@ func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.CustomMetri
return nil, false return nil, false
} }
resourceLbl, err := info.namer.LabelForResource(metricInfo.GroupResource) resourceLbl, err := info.namer.ResourceConverter().LabelForResource(metricInfo.GroupResource)
if err != nil { if err != nil {
glog.Errorf("unable to construct resource label for metric %s: %v", metricInfo.String(), err) glog.Errorf("unable to construct resource label for metric %s: %v", metricInfo.String(), err)
return nil, false return nil, false

View file

@ -117,17 +117,60 @@ var seriesRegistryTestSeries = [][]prom.Series{
}, },
} }
type myType struct {
a int
b string
m map[string]int
}
type mapWrapper struct {
item map[string]int
}
func (o *myType) Mutate(newMap mapWrapper) {
o.a = 2
o.b = "two"
o.m = newMap.item
}
func TestWeirdStuff(t *testing.T) {
o := myType{
a: 1,
b: "one",
m: map[string]int{
"one": 1,
},
}
oldMap := o.m
newMap := map[string]int{
"two": 2,
}
newWrapper := mapWrapper{
item: newMap,
}
oldWrapper := mapWrapper{
item: oldMap,
}
o.Mutate(newWrapper)
o.Mutate(oldWrapper)
}
func TestSeriesRegistry(t *testing.T) { func TestSeriesRegistry(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
require := require.New(t)
namers := setupMetricNamer(t) namers := setupMetricNamer(t)
registry := &basicSeriesRegistry{ registry := &basicSeriesRegistry{
mapper: restMapper(), mapper: restMapper(),
} }
updateResult := metricUpdateResult{
series: seriesRegistryTestSeries,
namers: namers,
}
// set up the registry // set up the registry
require.NoError(registry.SetSeries(seriesRegistryTestSeries, namers)) registry.onNewDataAvailable(updateResult)
// make sure each metric got registered and can form queries // make sure each metric got registered and can form queries
testCases := []struct { testCases := []struct {
@ -285,7 +328,11 @@ func BenchmarkSetSeries(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
registry.SetSeries(newSeriesSlices, namers) updateResult := metricUpdateResult{
series: newSeriesSlices,
namers: namers,
}
registry.onNewDataAvailable(updateResult)
} }
} }