diff --git a/pkg/custom-provider/basic_metric_lister.go b/pkg/custom-provider/basic_metric_lister.go index b4a54d7f..13ad0d75 100644 --- a/pkg/custom-provider/basic_metric_lister.go +++ b/pkg/custom-provider/basic_metric_lister.go @@ -105,6 +105,7 @@ func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) { return } errs <- nil + //Push into the channel: "this selector produced these series" selectorSeriesChan <- selectorSeries{ selector: sel, series: series, @@ -116,23 +117,32 @@ func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) { seriesCacheByQuery := make(map[prom.Selector][]prom.Series) // iterate through, blocking until we've got all results + // We know that, from above, we should have pushed one item into the channel + // for each namer. So here, we'll assume that we should receive one item per namer. for range l.namers { if err := <-errs; err != nil { return result, fmt.Errorf("unable to update list of all metrics: %v", err) } + //Receive from the channel: "this selector produced these series" + //We stuff that into this map so that we can collect the data as it arrives + //and then, once we've received it all, we can process it below. if ss := <-selectorSeriesChan; ss.series != nil { seriesCacheByQuery[ss.selector] = ss.series } } close(errs) + //Now that we've collected all of the results into `seriesCacheByQuery` + //we can start processing them. newSeries := make([][]prom.Series, len(l.namers)) for i, namer := range l.namers { series, cached := seriesCacheByQuery[namer.Selector()] if !cached { return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector()) } - newSeries[i] = namer.FilterSeries(series) + //Because namers provide a "post-filtering" option, it's not enough to + //simply take all the series that were produced. We need to further filter them. + newSeries[i] = namer.SeriesFilterer().FilterSeries(series) } glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries) diff --git a/pkg/custom-provider/debug.test b/pkg/custom-provider/debug.test new file mode 100644 index 00000000..be9b85c7 Binary files /dev/null and b/pkg/custom-provider/debug.test differ diff --git a/pkg/custom-provider/external_provider.go b/pkg/custom-provider/external_provider.go index 2a6fe95a..b4d3b8d9 100644 --- a/pkg/custom-provider/external_provider.go +++ b/pkg/custom-provider/external_provider.go @@ -1,81 +1,81 @@ package provider -import ( - "context" - "time" +// import ( +// "context" +// "time" - pmodel "github.com/prometheus/common/model" +// pmodel "github.com/prometheus/common/model" - "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" +// "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" - apimeta "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/dynamic" - "k8s.io/metrics/pkg/apis/external_metrics" +// apimeta "k8s.io/apimachinery/pkg/api/meta" +// "k8s.io/apimachinery/pkg/labels" +// "k8s.io/client-go/dynamic" +// "k8s.io/metrics/pkg/apis/external_metrics" - prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" -) +// prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" +// ) -//TODO: Make sure everything has the proper licensing disclosure at the top. -//TODO: I'd like to move these files into another directory, but the compiler was giving me -//some static around unexported types. I'm going to leave things as-is for now, but it -//might be worthwhile to, once the shared components are discovered, move some things around. +// //TODO: Make sure everything has the proper licensing disclosure at the top. +// //TODO: I'd like to move these files into another directory, but the compiler was giving me +// //some static around unexported types. I'm going to leave things as-is for now, but it +// //might be worthwhile to, once the shared components are discovered, move some things around. -//TODO: Some of these members may not be necessary. -//Some of them are definitely duplicated between the -//external and custom providers. They should probably share -//the same instances of these objects (especially the SeriesRegistry) -//to cut down on unnecessary chatter/bookkeeping. -type externalPrometheusProvider struct { - mapper apimeta.RESTMapper - kubeClient dynamic.Interface - promClient prom.Client - queryBuilder ExternalMetricQueryBuilder - metricConverter conv.MetricConverter +// //TODO: Some of these members may not be necessary. +// //Some of them are definitely duplicated between the +// //external and custom providers. They should probably share +// //the same instances of these objects (especially the SeriesRegistry) +// //to cut down on unnecessary chatter/bookkeeping. +// type externalPrometheusProvider struct { +// mapper apimeta.RESTMapper +// kubeClient dynamic.Interface +// promClient prom.Client +// queryBuilder ExternalMetricQueryBuilder +// metricConverter conv.MetricConverter - seriesRegistry SeriesRegistry -} +// seriesRegistry SeriesRegistry +// } -//TODO: It probably makes more sense to, once this is functional and complete, roll the -//prometheusProvider and externalPrometheusProvider up into a single type -//that implements both interfaces or provide a thin wrapper that composes them. -//Just glancing at start.go looks like it would be much more straightforward -//to do one of those two things instead of trying to run the two providers -//independently. +// //TODO: It probably makes more sense to, once this is functional and complete, roll the +// //prometheusProvider and externalPrometheusProvider up into a single type +// //that implements both interfaces or provide a thin wrapper that composes them. +// //Just glancing at start.go looks like it would be much more straightforward +// //to do one of those two things instead of trying to run the two providers +// //independently. -func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration, metricConverter conv.MetricConverter, seriesRegistry SeriesRegistry) (provider.ExternalMetricsProvider, Runnable) { +// func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration, metricConverter conv.MetricConverter, seriesRegistry SeriesRegistry) (provider.ExternalMetricsProvider, Runnable) { - return &externalPrometheusProvider{ - mapper: mapper, - kubeClient: kubeClient, - promClient: promClient, - metricConverter: metricConverter, - seriesRegistry: seriesRegistry, - }, lister -} +// return &externalPrometheusProvider{ +// mapper: mapper, +// kubeClient: kubeClient, +// promClient: promClient, +// metricConverter: metricConverter, +// seriesRegistry: seriesRegistry, +// }, lister +// } -func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) { - selector, found := p.seriesRegistry.QueryForExternalMetric(metricInfo, metricSelector) +// func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) { +// selector, found := p.seriesRegistry.QueryForExternalMetric(metricInfo, metricSelector) - if !found { - return &external_metrics.ExternalMetricValueList{ - Items: []external_metrics.ExternalMetricValue{}, - }, nil - } - // query := p.queryBuilder.BuildPrometheusQuery(namespace, metricName, metricSelector, queryMetadata) +// if !found { +// return &external_metrics.ExternalMetricValueList{ +// Items: []external_metrics.ExternalMetricValue{}, +// }, nil +// } +// // query := p.queryBuilder.BuildPrometheusQuery(namespace, metricName, metricSelector, queryMetadata) - //TODO: I don't yet know what a context is, but apparently I should use a real one. - queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector) +// //TODO: I don't yet know what a context is, but apparently I should use a real one. +// queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector) - if err != nil { - //TODO: Is this how folks normally deal w/ errors? Just propagate them upwards? - //I should go look at what the customProvider does. - return nil, err - } +// if err != nil { +// //TODO: Is this how folks normally deal w/ errors? Just propagate them upwards? +// //I should go look at what the customProvider does. +// return nil, err +// } - return p.metricConverter.Convert(queryMetadata, queryResults) -} +// return p.metricConverter.Convert(queryMetadata, queryResults) +// } -func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo { - return p.seriesRegistry.ListAllExternalMetrics() -} +// func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo { +// return p.seriesRegistry.ListAllExternalMetrics() +// } diff --git a/pkg/custom-provider/label_group_resource_extractor.go b/pkg/custom-provider/label_group_resource_extractor.go new file mode 100644 index 00000000..ac744f9e --- /dev/null +++ b/pkg/custom-provider/label_group_resource_extractor.go @@ -0,0 +1,83 @@ +package provider + +import ( + "bytes" + "fmt" + "regexp" + "text/template" + + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + + pmodel "github.com/prometheus/common/model" +) + +// labelGroupResExtractor extracts schema.GroupResources from series labels. +type labelGroupResExtractor struct { + regex *regexp.Regexp + + resourceInd int + groupInd *int + mapper apimeta.RESTMapper +} + +// newLabelGroupResExtractor creates a new labelGroupResExtractor for labels whose form +// matches the given template. It does so by creating a regular expression from the template, +// so anything in the template which limits resource or group name length will cause issues. +func newLabelGroupResExtractor(labelTemplate *template.Template) (*labelGroupResExtractor, error) { + labelRegexBuff := new(bytes.Buffer) + if err := labelTemplate.Execute(labelRegexBuff, schema.GroupResource{"(?P.+?)", "(?P.+?)"}); err != nil { + return nil, fmt.Errorf("unable to convert label template to matcher: %v", err) + } + if labelRegexBuff.Len() == 0 { + return nil, fmt.Errorf("unable to convert label template to matcher: empty template") + } + labelRegexRaw := "^" + labelRegexBuff.String() + "$" + labelRegex, err := regexp.Compile(labelRegexRaw) + if err != nil { + return nil, fmt.Errorf("unable to convert label template to matcher: %v", err) + } + + var groupInd *int + var resInd *int + + for i, name := range labelRegex.SubexpNames() { + switch name { + case "group": + ind := i // copy to avoid iteration variable reference + groupInd = &ind + case "resource": + ind := i // copy to avoid iteration variable reference + resInd = &ind + } + } + + if resInd == nil { + return nil, fmt.Errorf("must include at least `{{.Resource}}` in the label template") + } + + return &labelGroupResExtractor{ + regex: labelRegex, + resourceInd: *resInd, + groupInd: groupInd, + }, nil +} + +// GroupResourceForLabel extracts a schema.GroupResource from the given label, if possible. +// The second argument indicates whether or not a potential group-resource was found in this label. +func (e *labelGroupResExtractor) GroupResourceForLabel(lbl pmodel.LabelName) (schema.GroupResource, bool) { + matchGroups := e.regex.FindStringSubmatch(string(lbl)) + if matchGroups != nil { + group := "" + if e.groupInd != nil { + group = matchGroups[*e.groupInd] + } + + return schema.GroupResource{ + Group: group, + Resource: matchGroups[e.resourceInd], + }, true + } + + return schema.GroupResource{}, false +} diff --git a/pkg/custom-provider/metric_name_converter.go b/pkg/custom-provider/metric_name_converter.go new file mode 100644 index 00000000..a35d6b2d --- /dev/null +++ b/pkg/custom-provider/metric_name_converter.go @@ -0,0 +1,60 @@ +package provider + +import ( + "fmt" + "regexp" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" +) + +type MetricNameConverter interface { + GetMetricNameForSeries(series prom.Series) (string, error) +} + +type metricNameConverter struct { + nameMatches *regexp.Regexp + nameAs string +} + +func NewMetricNameConverter(mapping config.NameMapping) (MetricNameConverter, error) { + var nameMatches *regexp.Regexp + var err error + if mapping.Matches != "" { + nameMatches, err = regexp.Compile(mapping.Matches) + if err != nil { + return nil, fmt.Errorf("unable to compile series name match expression %q: %v", mapping.Matches, err) + } + } else { + // this will always succeed + nameMatches = regexp.MustCompile(".*") + } + nameAs := mapping.As + if nameAs == "" { + // check if we have an obvious default + subexpNames := nameMatches.SubexpNames() + if len(subexpNames) == 1 { + // no capture groups, use the whole thing + nameAs = "$0" + } else if len(subexpNames) == 2 { + // one capture group, use that + nameAs = "$1" + } else { + return nil, fmt.Errorf("must specify an 'as' value for name matcher %q", mapping.Matches) + } + } + + return &metricNameConverter{ + nameMatches: nameMatches, + nameAs: nameAs, + }, nil +} + +func (c *metricNameConverter) GetMetricNameForSeries(series prom.Series) (string, error) { + matches := c.nameMatches.FindStringSubmatchIndex(series.Name) + if matches == nil { + return "", fmt.Errorf("series name %q did not match expected pattern %q", series.Name, c.nameMatches.String()) + } + outNameBytes := c.nameMatches.ExpandString(nil, c.nameAs, series.Name, matches) + return string(outNameBytes), nil +} diff --git a/pkg/custom-provider/metric_name_converter_test.go b/pkg/custom-provider/metric_name_converter_test.go new file mode 100644 index 00000000..8a082987 --- /dev/null +++ b/pkg/custom-provider/metric_name_converter_test.go @@ -0,0 +1,85 @@ +package provider + +import ( + "testing" + + "github.com/stretchr/testify/require" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + config "github.com/directxman12/k8s-prometheus-adapter/pkg/config" +) + +func TestWhenNoMappingMetricNameIsUnaltered(t *testing.T) { + emptyMapping := config.NameMapping{} + RunTest(t, emptyMapping, "my_series", "my_series") + RunTest(t, emptyMapping, "your_series", "your_series") +} + +func TestWhenMappingWithOneCaptureGroupMetricNameIsCorrect(t *testing.T) { + mapping := config.NameMapping{ + Matches: "my_(.*)", + As: "your_$1", + } + + RunTest(t, mapping, "my_requests_per_second", "your_requests_per_second") +} + +func TestWhenMappingWithMultipleCaptureGroupsMetricNameIsCorrect(t *testing.T) { + //ExpandString has some strange behavior when using the $1, $2 syntax + //Specifically, it doesn't return the expected values for templates like: + //$1_$2 + //You can work around it by using the ${1} syntax. + mapping := config.NameMapping{ + Matches: "my_([^_]+)_([^_]+)", + As: "your_${1}_is_${2}_large", + } + + RunTest(t, mapping, "my_horse_very", "your_horse_is_very_large") + RunTest(t, mapping, "my_dog_not", "your_dog_is_not_large") +} + +func TestAsCanBeInferred(t *testing.T) { + //When we've got one capture group, we should infer that as the target. + mapping := config.NameMapping{ + Matches: "my_(.+)", + } + + RunTest(t, mapping, "my_test_metric", "test_metric") + + //When we have no capture groups, we should infer that the whole thing as the target. + mapping = config.NameMapping{ + Matches: "my_metric", + } + + RunTest(t, mapping, "my_metric", "my_metric") +} + +func TestWhenAsCannotBeInferredError(t *testing.T) { + //More than one capture group should + //result in us giving up on making an educated guess. + mapping := config.NameMapping{ + Matches: "my_([^_]+)_([^_]+)", + } + + RunTestExpectingError(t, mapping, "my_horse_very") + RunTestExpectingError(t, mapping, "my_dog_not") +} + +func RunTest(t *testing.T, mapping config.NameMapping, input string, expectedResult string) { + converter, err := NewMetricNameConverter(mapping) + require.NoError(t, err) + + series := prom.Series{ + Name: input, + } + + actualResult, err := converter.GetMetricNameForSeries(series) + require.NoError(t, err) + + require.Equal(t, expectedResult, actualResult) +} + +func RunTestExpectingError(t *testing.T, mapping config.NameMapping, input string) { + _, err := NewMetricNameConverter(mapping) + require.Error(t, err) +} diff --git a/pkg/custom-provider/metric_namer.go b/pkg/custom-provider/metric_namer.go index 172b7b64..7e566e4b 100644 --- a/pkg/custom-provider/metric_namer.go +++ b/pkg/custom-provider/metric_namer.go @@ -1,26 +1,17 @@ package provider import ( - "bytes" "fmt" - "regexp" - "strings" - "sync" - "text/template" - "github.com/golang/glog" - "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" "github.com/directxman12/k8s-prometheus-adapter/pkg/config" - pmodel "github.com/prometheus/common/model" ) var nsGroupResource = schema.GroupResource{Resource: "namespaces"} -var groupNameSanitizer = strings.NewReplacer(".", "_", "-", "_") // MetricNamer knows how to convert Prometheus series names and label names to // metrics API resources, and vice-versa. MetricNamers should be safe to access @@ -34,146 +25,37 @@ type MetricNamer interface { // FilterSeries checks to see which of the given series match any additional // constrains beyond the series query. It's assumed that the series given // already matche the series query. - FilterSeries(series []prom.Series) []prom.Series - // ResourcesForSeries returns the group-resources associated with the given series, - // as well as whether or not the given series has the "namespace" resource). - ResourcesForSeries(series prom.Series) (res []schema.GroupResource, namespaced bool) - // LabelForResource returns the appropriate label for the given resource. - LabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) + // FilterSeries(series []prom.Series) []prom.Series + SeriesFilterer() SeriesFilterer + ResourceConverter() ResourceConverter + // MetricNameForSeries returns the name (as presented in the API) for a given series. - MetricNameForSeries(series prom.Series) (string, error) + // MetricNameForSeries(series prom.Series) (string, error) // QueryForSeries returns the query for a given series (not API metric name), with // the given namespace name (if relevant), resource, and resource names. QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error) + IdentifySeries(series prom.Series) (seriesIdentity, error) } -// labelGroupResExtractor extracts schema.GroupResources from series labels. -type labelGroupResExtractor struct { - regex *regexp.Regexp - - resourceInd int - groupInd *int - mapper apimeta.RESTMapper +type seriesIdentity struct { + resources []schema.GroupResource + namespaced bool + name string } -// newLabelGroupResExtractor creates a new labelGroupResExtractor for labels whose form -// matches the given template. It does so by creating a regular expression from the template, -// so anything in the template which limits resource or group name length will cause issues. -func newLabelGroupResExtractor(labelTemplate *template.Template) (*labelGroupResExtractor, error) { - labelRegexBuff := new(bytes.Buffer) - if err := labelTemplate.Execute(labelRegexBuff, schema.GroupResource{"(?P.+?)", "(?P.+?)"}); err != nil { - return nil, fmt.Errorf("unable to convert label template to matcher: %v", err) - } - if labelRegexBuff.Len() == 0 { - return nil, fmt.Errorf("unable to convert label template to matcher: empty template") - } - labelRegexRaw := "^" + labelRegexBuff.String() + "$" - labelRegex, err := regexp.Compile(labelRegexRaw) - if err != nil { - return nil, fmt.Errorf("unable to convert label template to matcher: %v", err) - } - - var groupInd *int - var resInd *int - - for i, name := range labelRegex.SubexpNames() { - switch name { - case "group": - ind := i // copy to avoid iteration variable reference - groupInd = &ind - case "resource": - ind := i // copy to avoid iteration variable reference - resInd = &ind - } - } - - if resInd == nil { - return nil, fmt.Errorf("must include at least `{{.Resource}}` in the label template") - } - - return &labelGroupResExtractor{ - regex: labelRegex, - resourceInd: *resInd, - groupInd: groupInd, - }, nil -} - -// GroupResourceForLabel extracts a schema.GroupResource from the given label, if possible. -// The second argument indicates whether or not a potential group-resource was found in this label. -func (e *labelGroupResExtractor) GroupResourceForLabel(lbl pmodel.LabelName) (schema.GroupResource, bool) { - matchGroups := e.regex.FindStringSubmatch(string(lbl)) - if matchGroups != nil { - group := "" - if e.groupInd != nil { - group = matchGroups[*e.groupInd] - } - - return schema.GroupResource{ - Group: group, - Resource: matchGroups[e.resourceInd], - }, true - } - - return schema.GroupResource{}, false -} - -func (r *metricNamer) Selector() prom.Selector { - return r.seriesQuery -} - -// reMatcher either positively or negatively matches a regex -type reMatcher struct { - regex *regexp.Regexp - positive bool -} - -func newReMatcher(cfg config.RegexFilter) (*reMatcher, error) { - if cfg.Is != "" && cfg.IsNot != "" { - return nil, fmt.Errorf("cannot have both an `is` (%q) and `isNot` (%q) expression in a single filter", cfg.Is, cfg.IsNot) - } - if cfg.Is == "" && cfg.IsNot == "" { - return nil, fmt.Errorf("must have either an `is` or `isNot` expression in a filter") - } - - var positive bool - var regexRaw string - if cfg.Is != "" { - positive = true - regexRaw = cfg.Is - } else { - positive = false - regexRaw = cfg.IsNot - } - - regex, err := regexp.Compile(regexRaw) - if err != nil { - return nil, fmt.Errorf("unable to compile series filter %q: %v", regexRaw, err) - } - - return &reMatcher{ - regex: regex, - positive: positive, - }, nil -} - -func (m *reMatcher) Matches(val string) bool { - return m.regex.MatchString(val) == m.positive +func (n *metricNamer) Selector() prom.Selector { + return n.seriesQuery } type metricNamer struct { - seriesQuery prom.Selector - labelTemplate *template.Template - labelResExtractor *labelGroupResExtractor - metricsQueryTemplate *template.Template - nameMatches *regexp.Regexp - nameAs string - seriesMatchers []*reMatcher + seriesQuery prom.Selector - labelResourceMu sync.RWMutex - labelToResource map[pmodel.LabelName]schema.GroupResource - resourceToLabel map[schema.GroupResource]pmodel.LabelName - mapper apimeta.RESTMapper + resourceConverter ResourceConverter + queryBuilder QueryBuilder + seriesFilterer SeriesFilterer + metricNameConverter MetricNameConverter + mapper apimeta.RESTMapper metricType config.MetricType } @@ -187,23 +69,26 @@ type queryTemplateArgs struct { GroupBySlice []string } -func (n *metricNamer) FilterSeries(initialSeries []prom.Series) []prom.Series { - if len(n.seriesMatchers) == 0 { - return initialSeries +func (n *metricNamer) IdentifySeries(series prom.Series) (seriesIdentity, error) { + // TODO: warn if it doesn't match any resources + resources, namespaced := n.resourceConverter.ResourcesForSeries(series) + name, err := n.metricNameConverter.GetMetricNameForSeries(series) + + result := seriesIdentity{ + resources: resources, + namespaced: namespaced, + name: name, } - finalSeries := make([]prom.Series, 0, len(initialSeries)) -SeriesLoop: - for _, series := range initialSeries { - for _, matcher := range n.seriesMatchers { - if !matcher.Matches(series.Name) { - continue SeriesLoop - } - } - finalSeries = append(finalSeries, series) - } + return result, err +} - return finalSeries +func (n *metricNamer) SeriesFilterer() SeriesFilterer { + return n.seriesFilterer +} + +func (n *metricNamer) ResourceConverter() ResourceConverter { + return n.resourceConverter } func (n *metricNamer) createQueryPartsFromSelector(metricSelector labels.Selector) []queryPart { @@ -240,7 +125,7 @@ func (n *metricNamer) buildNamespaceQueryPartForSeries(namespace string) (queryP //If we've been given a namespace, then we need to set up //the label requirements to target that namespace. if namespace != "" { - namespaceLbl, err := n.LabelForResource(nsGroupResource) + namespaceLbl, err := n.ResourceConverter().LabelForResource(nsGroupResource) if err != nil { return result, err } @@ -261,7 +146,7 @@ func (n *metricNamer) buildResourceQueryPartForSeries(resource schema.GroupResou //If we've been given a resource, then we need to set up //the label requirements to target that resource. - resourceLbl, err := n.LabelForResource(resource) + resourceLbl, err := n.ResourceConverter().LabelForResource(resource) if err != nil { return result, err } @@ -274,34 +159,6 @@ func (n *metricNamer) buildResourceQueryPartForSeries(resource schema.GroupResou return result, nil } -func (n *metricNamer) processQueryParts(queryParts []queryPart) ([]string, map[string][]string) { - //Contains the expressions that we want to include as part of the query to Prometheus. - //e.g. "namespace=my-namespace" - //e.g. "some_label=some-value" - var exprs []string - - //Contains the list of label values we're targeting, by namespace. - //e.g. "some_label" => ["value-one", "value-two"] - valuesByName := map[string][]string{} - - //Convert our query parts into template arguments. - for _, qPart := range queryParts { - targetValue := qPart.values[0] - matcher := prom.LabelEq - - if len(qPart.values) > 1 { - targetValue = strings.Join(qPart.values, "|") - matcher = prom.LabelMatches - } - - expression := matcher(qPart.labelName, targetValue) - exprs = append(exprs, expression) - valuesByName[qPart.labelName] = qPart.values - } - - return exprs, valuesByName -} - func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) { queryParts := []queryPart{} @@ -321,259 +178,51 @@ func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResourc queryParts = append(queryParts, resourceQueryPart) - //Convert our query parts into the types we need for our template. - exprs, valuesByName := n.processQueryParts(queryParts) - - args := queryTemplateArgs{ - Series: series, - LabelMatchers: strings.Join(exprs, ","), - LabelValuesByName: valuesByName, - GroupBy: resourceQueryPart.labelName, - GroupBySlice: []string{resourceQueryPart.labelName}, - } - - selector, err := n.createSelectorFromTemplateArgs(args) - if err != nil { - return "", err - } - - return selector, nil -} - -func (n *metricNamer) createSelectorFromTemplateArgs(args queryTemplateArgs) (prom.Selector, error) { - //Turn our template arguments into a Selector. - queryBuff := new(bytes.Buffer) - if err := n.metricsQueryTemplate.Execute(queryBuff, args); err != nil { - return "", err - } - - if queryBuff.Len() == 0 { - return "", fmt.Errorf("empty query produced by metrics query template") - } - - return prom.Selector(queryBuff.String()), nil -} - -func (n *metricNamer) ResourcesForSeries(series prom.Series) ([]schema.GroupResource, bool) { - // use an updates map to avoid having to drop the read lock to update the cache - // until the end. Since we'll probably have few updates after the first run, - // this should mean that we rarely have to hold the write lock. - var resources []schema.GroupResource - updates := make(map[pmodel.LabelName]schema.GroupResource) - namespaced := false - - // use an anon func to get the right defer behavior - func() { - n.labelResourceMu.RLock() - defer n.labelResourceMu.RUnlock() - - for lbl := range series.Labels { - var groupRes schema.GroupResource - var ok bool - - // check if we have an override - if groupRes, ok = n.labelToResource[lbl]; ok { - resources = append(resources, groupRes) - } else if groupRes, ok = updates[lbl]; ok { - resources = append(resources, groupRes) - } else if n.labelResExtractor != nil { - // if not, check if it matches the form we expect, and if so, - // convert to a group-resource. - if groupRes, ok = n.labelResExtractor.GroupResourceForLabel(lbl); ok { - info, _, err := provider.CustomMetricInfo{GroupResource: groupRes}.Normalized(n.mapper) - if err != nil { - glog.Errorf("unable to normalize group-resource %s from label %q, skipping: %v", groupRes.String(), lbl, err) - continue - } - - groupRes = info.GroupResource - resources = append(resources, groupRes) - updates[lbl] = groupRes - } - } - - if groupRes == nsGroupResource { - namespaced = true - } - } - }() - - // update the cache for next time. This should only be called by discovery, - // so we don't really have to worry about the grap between read and write locks - // (plus, we don't care if someone else updates the cache first, since the results - // are necessarily the same, so at most we've done extra work). - if len(updates) > 0 { - n.labelResourceMu.Lock() - defer n.labelResourceMu.Unlock() - - for lbl, groupRes := range updates { - n.labelToResource[lbl] = groupRes - } - } - - return resources, namespaced -} - -func (n *metricNamer) LabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) { - n.labelResourceMu.RLock() - // check if we have a cached copy or override - lbl, ok := n.resourceToLabel[resource] - n.labelResourceMu.RUnlock() // release before we call makeLabelForResource - if ok { - return lbl, nil - } - - // NB: we don't actually care about the gap between releasing read lock - // and acquiring the write lock -- if we do duplicate work sometimes, so be - // it, as long as we're correct. - - // otherwise, use the template and save the result - lbl, err := n.makeLabelForResource(resource) - if err != nil { - return "", fmt.Errorf("unable to convert resource %s into label: %v", resource.String(), err) - } - return lbl, nil -} - -// makeLabelForResource constructs a label name for the given resource, and saves the result. -// It must *not* be called under an existing lock. -func (n *metricNamer) makeLabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) { - if n.labelTemplate == nil { - return "", fmt.Errorf("no generic resource label form specified for this metric") - } - buff := new(bytes.Buffer) - - singularRes, err := n.mapper.ResourceSingularizer(resource.Resource) - if err != nil { - return "", fmt.Errorf("unable to singularize resource %s: %v", resource.String(), err) - } - convResource := schema.GroupResource{ - Group: groupNameSanitizer.Replace(resource.Group), - Resource: singularRes, - } - - if err := n.labelTemplate.Execute(buff, convResource); err != nil { - return "", err - } - if buff.Len() == 0 { - return "", fmt.Errorf("empty label produced by label template") - } - lbl := pmodel.LabelName(buff.String()) - - n.labelResourceMu.Lock() - defer n.labelResourceMu.Unlock() - - n.resourceToLabel[resource] = lbl - n.labelToResource[lbl] = resource - return lbl, nil -} - -func (n *metricNamer) MetricNameForSeries(series prom.Series) (string, error) { - matches := n.nameMatches.FindStringSubmatchIndex(series.Name) - if matches == nil { - return "", fmt.Errorf("series name %q did not match expected pattern %q", series.Name, n.nameMatches.String()) - } - outNameBytes := n.nameMatches.ExpandString(nil, n.nameAs, series.Name, matches) - return string(outNameBytes), nil + return n.queryBuilder.BuildSelector(series, resourceQueryPart.labelName, []string{resourceQueryPart.labelName}, queryParts) } // NamersFromConfig produces a MetricNamer for each rule in the given config. func NamersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMapper) ([]MetricNamer, error) { namers := make([]MetricNamer, len(cfg.Rules)) - for i, rule := range cfg.Rules { - var labelTemplate *template.Template - var labelResExtractor *labelGroupResExtractor var err error - if rule.Resources.Template != "" { - labelTemplate, err = template.New("resource-label").Delims("<<", ">>").Parse(rule.Resources.Template) - if err != nil { - return nil, fmt.Errorf("unable to parse label template %q associated with series query %q: %v", rule.Resources.Template, rule.SeriesQuery, err) - } - labelResExtractor, err = newLabelGroupResExtractor(labelTemplate) - if err != nil { - return nil, fmt.Errorf("unable to generate label format from template %q associated with series query %q: %v", rule.Resources.Template, rule.SeriesQuery, err) - } - } - - metricsQueryTemplate, err := template.New("metrics-query").Delims("<<", ">>").Parse(rule.MetricsQuery) + resourceConverter, err := NewResourceConverter(rule.Resources.Template, rule.Resources.Overrides, mapper) if err != nil { - return nil, fmt.Errorf("unable to parse metrics query template %q associated with series query %q: %v", rule.MetricsQuery, rule.SeriesQuery, err) + return nil, fmt.Errorf("unable to create ResourceConverter associated with series query %q: %v", rule.SeriesQuery, err) } - seriesMatchers := make([]*reMatcher, len(rule.SeriesFilters)) - for i, filterRaw := range rule.SeriesFilters { - matcher, err := newReMatcher(filterRaw) - if err != nil { - return nil, fmt.Errorf("unable to generate series name filter associated with series query %q: %v", rule.SeriesQuery, err) - } - seriesMatchers[i] = matcher - } - if rule.Name.Matches != "" { - matcher, err := newReMatcher(config.RegexFilter{Is: rule.Name.Matches}) - if err != nil { - return nil, fmt.Errorf("unable to generate series name filter from name rules associated with series query %q: %v", rule.SeriesQuery, err) - } - seriesMatchers = append(seriesMatchers, matcher) + queryBuilder, err := NewQueryBuilder(rule.MetricsQuery) + if err != nil { + return nil, fmt.Errorf("unable to create a QueryBuilder associated with series query %q: %v", rule.SeriesQuery, err) } - var nameMatches *regexp.Regexp - if rule.Name.Matches != "" { - nameMatches, err = regexp.Compile(rule.Name.Matches) - if err != nil { - return nil, fmt.Errorf("unable to compile series name match expression %q associated with series query %q: %v", rule.Name.Matches, rule.SeriesQuery, err) - } - } else { - // this will always succeed - nameMatches = regexp.MustCompile(".*") + seriesFilterer, err := NewSeriesFilterer(rule.SeriesFilters) + if err != nil { + return nil, fmt.Errorf("unable to create a SeriesFilter associated with series query %q: %v", rule.SeriesQuery, err) } - nameAs := rule.Name.As - if nameAs == "" { - // check if we have an obvious default - subexpNames := nameMatches.SubexpNames() - if len(subexpNames) == 1 { - // no capture groups, use the whole thing - nameAs = "$0" - } else if len(subexpNames) == 2 { - // one capture group, use that - nameAs = "$1" - } else { - return nil, fmt.Errorf("must specify an 'as' value for name matcher %q associated with series query %q", rule.Name.Matches, rule.SeriesQuery) + + if rule.Name.Matches != "" { + err := seriesFilterer.AddRequirement(config.RegexFilter{Is: rule.Name.Matches}) + if err != nil { + return nil, fmt.Errorf("unable to apply the series name filter from name rules associated with series query %q: %v", rule.SeriesQuery, err) } } + metricNameConverter, err := NewMetricNameConverter(rule.Name) + if err != nil { + return nil, fmt.Errorf("unable to create a MetricNameConverter associated with series query %q: %v", rule.SeriesQuery, err) + } + namer := &metricNamer{ - seriesQuery: prom.Selector(rule.SeriesQuery), - labelTemplate: labelTemplate, - labelResExtractor: labelResExtractor, - metricsQueryTemplate: metricsQueryTemplate, - mapper: mapper, - nameMatches: nameMatches, - nameAs: nameAs, - seriesMatchers: seriesMatchers, + seriesQuery: prom.Selector(rule.SeriesQuery), + mapper: mapper, - labelToResource: make(map[pmodel.LabelName]schema.GroupResource), - resourceToLabel: make(map[schema.GroupResource]pmodel.LabelName), - - metricType: rule.MetricType, - } - - // invert the structure for consistency with the template - for lbl, groupRes := range rule.Resources.Overrides { - infoRaw := provider.CustomMetricInfo{ - GroupResource: schema.GroupResource{ - Group: groupRes.Group, - Resource: groupRes.Resource, - }, - } - info, _, err := infoRaw.Normalized(mapper) - if err != nil { - return nil, fmt.Errorf("unable to normalize group-resource %v: %v", groupRes, err) - } - - namer.labelToResource[pmodel.LabelName(lbl)] = info.GroupResource - namer.resourceToLabel[info.GroupResource] = pmodel.LabelName(lbl) + resourceConverter: resourceConverter, + queryBuilder: queryBuilder, + seriesFilterer: seriesFilterer, + metricNameConverter: metricNameConverter, + metricType: rule.MetricType, } namers[i] = namer @@ -585,15 +234,7 @@ func NamersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMap func (n *metricNamer) QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error) { queryParts := n.createQueryPartsFromSelector(metricSelector) - exprs, valuesByName := n.processQueryParts(queryParts) - - args := queryTemplateArgs{ - Series: series, - LabelMatchers: strings.Join(exprs, ","), - LabelValuesByName: valuesByName, - } - - selector, err := n.createSelectorFromTemplateArgs(args) + selector, err := n.queryBuilder.BuildSelector(series, "", []string{}, queryParts) if err != nil { return "", err } diff --git a/pkg/custom-provider/periodic_metric_lister.go b/pkg/custom-provider/periodic_metric_lister.go index 84e6a77c..ce87f5a3 100644 --- a/pkg/custom-provider/periodic_metric_lister.go +++ b/pkg/custom-provider/periodic_metric_lister.go @@ -55,14 +55,26 @@ func (l *periodicMetricLister) Run() { func (l *periodicMetricLister) RunUntil(stopChan <-chan struct{}) { go wait.Until(func() { - if result, err := l.realLister.ListAllMetrics(); err != nil { + if err := l.updateMetrics(); err != nil { utilruntime.HandleError(err) - } else { - l.mostRecentResult = result } }, l.updateInterval, stopChan) } +func (l *periodicMetricLister) updateMetrics() error { + result, err := l.realLister.ListAllMetrics() + + if err != nil { + return err + } + + //Cache the result. + l.mostRecentResult = result + //Let our listener know we've got new data ready for them. + l.callback(result) + return nil +} + // func (l *periodicMetricLister) updateMetrics() (metricUpdateResult, error) { // result := metricUpdateResult{ diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index 91682150..011456a9 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -48,7 +48,9 @@ type prometheusProvider struct { func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration) (provider.CustomMetricsProvider, Runnable) { basicLister := NewBasicMetricLister(promClient, namers, updateInterval) - periodicLister, periodicRunnable := NewPeriodicMetricLister(basicLister, updateInterval) + //TODO: Be sure to run this runnable. + // periodicLister, periodicRunnable := NewPeriodicMetricLister(basicLister, updateInterval) + periodicLister, _ := NewPeriodicMetricLister(basicLister, updateInterval) seriesRegistry := NewBasicSeriesRegistry(periodicLister, mapper) diff --git a/pkg/custom-provider/provider_test.go b/pkg/custom-provider/provider_test.go index a62bc3ac..2ef6361d 100644 --- a/pkg/custom-provider/provider_test.go +++ b/pkg/custom-provider/provider_test.go @@ -141,7 +141,9 @@ func TestListAllMetrics(t *testing.T) { fakeProm.acceptibleInterval = pmodel.Interval{Start: startTime, End: 0} // update the metrics (without actually calling RunUntil, so we can avoid timing issues) - lister := prov.(*prometheusProvider).SeriesRegistry.(*cachingMetricsLister) + lister := prov.(*prometheusProvider).SeriesRegistry.(*basicSeriesRegistry).metricLister.(*periodicMetricLister) + + // lister := prov.(*prometheusProvider).SeriesRegistry.(*basicSeriesRegistry).metricLister.(*periodicMetricLister).realLister.(*basicMetricLister) require.NoError(t, lister.updateMetrics()) // list/sort the metrics diff --git a/pkg/custom-provider/query_builder.go b/pkg/custom-provider/query_builder.go new file mode 100644 index 00000000..64db037b --- /dev/null +++ b/pkg/custom-provider/query_builder.go @@ -0,0 +1,96 @@ +package provider + +import ( + "bytes" + "fmt" + "strings" + "text/template" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" +) + +type QueryBuilder interface { + BuildSelector(seriesName string, groupBy string, groupBySlice []string, queryParts []queryPart) (prom.Selector, error) +} + +type queryBuilder struct { + metricsQueryTemplate *template.Template +} + +func NewQueryBuilder(metricsQuery string) (QueryBuilder, error) { + metricsQueryTemplate, err := template.New("metrics-query").Delims("<<", ">>").Parse(metricsQuery) + if err != nil { + return nil, fmt.Errorf("unable to parse metrics query template %q: %v", metricsQuery, err) + } + + return &queryBuilder{ + metricsQueryTemplate: metricsQueryTemplate, + }, nil +} + +func (n *queryBuilder) BuildSelector(seriesName string, groupBy string, groupBySlice []string, queryParts []queryPart) (prom.Selector, error) { + //Convert our query parts into the types we need for our template. + exprs, valuesByName := n.processQueryParts(queryParts) + + args := queryTemplateArgs{ + Series: seriesName, + LabelMatchers: strings.Join(exprs, ","), + LabelValuesByName: valuesByName, + GroupBy: groupBy, + GroupBySlice: groupBySlice, + } + + selector, err := n.createSelectorFromTemplateArgs(args) + if err != nil { + return "", err + } + + return selector, nil +} + +func (n *queryBuilder) createSelectorFromTemplateArgs(args queryTemplateArgs) (prom.Selector, error) { + //Turn our template arguments into a Selector. + queryBuff := new(bytes.Buffer) + if err := n.metricsQueryTemplate.Execute(queryBuff, args); err != nil { + return "", err + } + + if queryBuff.Len() == 0 { + return "", fmt.Errorf("empty query produced by metrics query template") + } + + return prom.Selector(queryBuff.String()), nil +} + +func (n *queryBuilder) processQueryParts(queryParts []queryPart) ([]string, map[string][]string) { + //Contains the expressions that we want to include as part of the query to Prometheus. + //e.g. "namespace=my-namespace" + //e.g. "some_label=some-value" + var exprs []string + + //Contains the list of label values we're targeting, by namespace. + //e.g. "some_label" => ["value-one", "value-two"] + valuesByName := map[string][]string{} + + //Convert our query parts into template arguments. + for _, qPart := range queryParts { + //Be resilient against bad inputs. + //We obviously can't generate label filters for these cases. + if qPart.labelName == "" || len(qPart.values) == 0 { + continue + } + targetValue := qPart.values[0] + matcher := prom.LabelEq + + if len(qPart.values) > 1 { + targetValue = strings.Join(qPart.values, "|") + matcher = prom.LabelMatches + } + + expression := matcher(qPart.labelName, targetValue) + exprs = append(exprs, expression) + valuesByName[qPart.labelName] = qPart.values + } + + return exprs, valuesByName +} diff --git a/pkg/custom-provider/query_builder_test.go b/pkg/custom-provider/query_builder_test.go new file mode 100644 index 00000000..46d949a6 --- /dev/null +++ b/pkg/custom-provider/query_builder_test.go @@ -0,0 +1,64 @@ +package provider + +import ( + "testing" + + "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/stretchr/testify/require" +) + +func TestSimpleQuery(t *testing.T) { + builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])") + + // builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>,static_label!=\"static_value\"}[2m])) by (<<.GroupBy>>)") + selector, _ := builder.BuildSelector("my_series", "", []string{}, []queryPart{}) + + expectation := client.Selector("rate(my_series{}[2m])") + require.Equal(t, selector, expectation) +} + +func TestSimpleQueryWithOneLabelValue(t *testing.T) { + builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])") + + // builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>,static_label!=\"static_value\"}[2m])) by (<<.GroupBy>>)") + selector, _ := builder.BuildSelector("my_series", "", []string{}, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one"}, + }, + }) + + expectation := client.Selector("rate(my_series{target_label=\"one\"}[2m])") + require.Equal(t, selector, expectation) +} + +func TestSimpleQueryWithMultipleLabelValues(t *testing.T) { + builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])") + + // builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>,static_label!=\"static_value\"}[2m])) by (<<.GroupBy>>)") + selector, _ := builder.BuildSelector("my_series", "", []string{}, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one", "two"}, + }, + }) + + expectation := client.Selector("rate(my_series{target_label=~\"one|two\"}[2m])") + require.Equal(t, selector, expectation) +} + +func TestQueryWithGroupBy(t *testing.T) { + builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>}[2m])) by (<<.GroupBy>>)") + + selector, _ := builder.BuildSelector("my_series", "my_grouping", []string{}, []queryPart{ + queryPart{ + labelName: "target_label", + values: []string{"one", "two"}, + }, + }) + + expectation := client.Selector("sum(rate(my_series{target_label=~\"one|two\"}[2m])) by (my_grouping)") + require.Equal(t, selector, expectation) +} + +//TODO: Ensure that the LabelValuesByName and GroupBySlice placeholders function correctly. diff --git a/pkg/custom-provider/regex_matcher.go b/pkg/custom-provider/regex_matcher.go new file mode 100644 index 00000000..69dfa1f0 --- /dev/null +++ b/pkg/custom-provider/regex_matcher.go @@ -0,0 +1,47 @@ +package provider + +import ( + "fmt" + "regexp" + + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" +) + +// reMatcher either positively or negatively matches a regex +type reMatcher struct { + regex *regexp.Regexp + positive bool +} + +func newReMatcher(cfg config.RegexFilter) (*reMatcher, error) { + if cfg.Is != "" && cfg.IsNot != "" { + return nil, fmt.Errorf("cannot have both an `is` (%q) and `isNot` (%q) expression in a single filter", cfg.Is, cfg.IsNot) + } + if cfg.Is == "" && cfg.IsNot == "" { + return nil, fmt.Errorf("must have either an `is` or `isNot` expression in a filter") + } + + var positive bool + var regexRaw string + if cfg.Is != "" { + positive = true + regexRaw = cfg.Is + } else { + positive = false + regexRaw = cfg.IsNot + } + + regex, err := regexp.Compile(regexRaw) + if err != nil { + return nil, fmt.Errorf("unable to compile series filter %q: %v", regexRaw, err) + } + + return &reMatcher{ + regex: regex, + positive: positive, + }, nil +} + +func (m *reMatcher) Matches(val string) bool { + return m.regex.MatchString(val) == m.positive +} diff --git a/pkg/custom-provider/resource_converter.go b/pkg/custom-provider/resource_converter.go new file mode 100644 index 00000000..958fbd40 --- /dev/null +++ b/pkg/custom-provider/resource_converter.go @@ -0,0 +1,192 @@ +package provider + +import ( + "bytes" + "fmt" + "strings" + "sync" + "text/template" + + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" + "github.com/golang/glog" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" + pmodel "github.com/prometheus/common/model" +) + +type ResourceConverter interface { + // ResourcesForSeries returns the group-resources associated with the given series, + // as well as whether or not the given series has the "namespace" resource). + ResourcesForSeries(series prom.Series) (res []schema.GroupResource, namespaced bool) + // LabelForResource returns the appropriate label for the given resource. + LabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) +} + +type resourceConverter struct { + labelResourceMu sync.RWMutex + labelToResource map[pmodel.LabelName]schema.GroupResource + resourceToLabel map[schema.GroupResource]pmodel.LabelName + labelResExtractor *labelGroupResExtractor + mapper apimeta.RESTMapper + labelTemplate *template.Template +} + +func NewResourceConverter(resourceTemplate string, overrides map[string]config.GroupResource, mapper apimeta.RESTMapper) (ResourceConverter, error) { + converter := &resourceConverter{ + labelToResource: make(map[pmodel.LabelName]schema.GroupResource), + resourceToLabel: make(map[schema.GroupResource]pmodel.LabelName), + mapper: mapper, + } + + if resourceTemplate != "" { + labelTemplate, err := template.New("resource-label").Delims("<<", ">>").Parse(resourceTemplate) + if err != nil { + return converter, fmt.Errorf("unable to parse label template %q: %v", resourceTemplate, err) + } + converter.labelTemplate = labelTemplate + + labelResExtractor, err := newLabelGroupResExtractor(labelTemplate) + if err != nil { + return converter, fmt.Errorf("unable to generate label format from template %q: %v", resourceTemplate, err) + } + converter.labelResExtractor = labelResExtractor + } + + // invert the structure for consistency with the template + for lbl, groupRes := range overrides { + infoRaw := provider.CustomMetricInfo{ + GroupResource: schema.GroupResource{ + Group: groupRes.Group, + Resource: groupRes.Resource, + }, + } + info, _, err := infoRaw.Normalized(converter.mapper) + if err != nil { + return nil, fmt.Errorf("unable to normalize group-resource %v: %v", groupRes, err) + } + + converter.labelToResource[pmodel.LabelName(lbl)] = info.GroupResource + converter.resourceToLabel[info.GroupResource] = pmodel.LabelName(lbl) + } + + return converter, nil +} + +func (n *resourceConverter) LabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) { + n.labelResourceMu.RLock() + // check if we have a cached copy or override + lbl, ok := n.resourceToLabel[resource] + n.labelResourceMu.RUnlock() // release before we call makeLabelForResource + if ok { + return lbl, nil + } + + // NB: we don't actually care about the gap between releasing read lock + // and acquiring the write lock -- if we do duplicate work sometimes, so be + // it, as long as we're correct. + + // otherwise, use the template and save the result + lbl, err := n.makeLabelForResource(resource) + if err != nil { + return "", fmt.Errorf("unable to convert resource %s into label: %v", resource.String(), err) + } + return lbl, nil +} + +var groupNameSanitizer = strings.NewReplacer(".", "_", "-", "_") + +// makeLabelForResource constructs a label name for the given resource, and saves the result. +// It must *not* be called under an existing lock. +func (n *resourceConverter) makeLabelForResource(resource schema.GroupResource) (pmodel.LabelName, error) { + if n.labelTemplate == nil { + return "", fmt.Errorf("no generic resource label form specified for this metric") + } + buff := new(bytes.Buffer) + + singularRes, err := n.mapper.ResourceSingularizer(resource.Resource) + if err != nil { + return "", fmt.Errorf("unable to singularize resource %s: %v", resource.String(), err) + } + convResource := schema.GroupResource{ + Group: groupNameSanitizer.Replace(resource.Group), + Resource: singularRes, + } + + if err := n.labelTemplate.Execute(buff, convResource); err != nil { + return "", err + } + if buff.Len() == 0 { + return "", fmt.Errorf("empty label produced by label template") + } + lbl := pmodel.LabelName(buff.String()) + + n.labelResourceMu.Lock() + defer n.labelResourceMu.Unlock() + + n.resourceToLabel[resource] = lbl + n.labelToResource[lbl] = resource + return lbl, nil +} + +func (n *resourceConverter) ResourcesForSeries(series prom.Series) ([]schema.GroupResource, bool) { + // use an updates map to avoid having to drop the read lock to update the cache + // until the end. Since we'll probably have few updates after the first run, + // this should mean that we rarely have to hold the write lock. + var resources []schema.GroupResource + updates := make(map[pmodel.LabelName]schema.GroupResource) + namespaced := false + + // use an anon func to get the right defer behavior + func() { + n.labelResourceMu.RLock() + defer n.labelResourceMu.RUnlock() + + for lbl := range series.Labels { + var groupRes schema.GroupResource + var ok bool + + // check if we have an override + if groupRes, ok = n.labelToResource[lbl]; ok { + resources = append(resources, groupRes) + } else if groupRes, ok = updates[lbl]; ok { + resources = append(resources, groupRes) + } else if n.labelResExtractor != nil { + // if not, check if it matches the form we expect, and if so, + // convert to a group-resource. + if groupRes, ok = n.labelResExtractor.GroupResourceForLabel(lbl); ok { + info, _, err := provider.CustomMetricInfo{GroupResource: groupRes}.Normalized(n.mapper) + if err != nil { + glog.Errorf("unable to normalize group-resource %s from label %q, skipping: %v", groupRes.String(), lbl, err) + continue + } + + groupRes = info.GroupResource + resources = append(resources, groupRes) + updates[lbl] = groupRes + } + } + + if groupRes == nsGroupResource { + namespaced = true + } + } + }() + + // update the cache for next time. This should only be called by discovery, + // so we don't really have to worry about the gap between read and write locks + // (plus, we don't care if someone else updates the cache first, since the results + // are necessarily the same, so at most we've done extra work). + if len(updates) > 0 { + n.labelResourceMu.Lock() + defer n.labelResourceMu.Unlock() + + for lbl, groupRes := range updates { + n.labelToResource[lbl] = groupRes + } + } + + return resources, namespaced +} diff --git a/pkg/custom-provider/series_filterer.go b/pkg/custom-provider/series_filterer.go new file mode 100644 index 00000000..5a05444f --- /dev/null +++ b/pkg/custom-provider/series_filterer.go @@ -0,0 +1,61 @@ +package provider + +import ( + "fmt" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" +) + +type SeriesFilterer interface { + FilterSeries(series []prom.Series) []prom.Series + AddRequirement(filter config.RegexFilter) error +} + +type seriesFilterer struct { + seriesMatchers []*reMatcher +} + +func NewSeriesFilterer(filters []config.RegexFilter) (SeriesFilterer, error) { + seriesMatchers := make([]*reMatcher, len(filters)) + for i, filterRaw := range filters { + matcher, err := newReMatcher(filterRaw) + if err != nil { + return nil, fmt.Errorf("unable to generate series name filter: %v", err) + } + seriesMatchers[i] = matcher + } + + return &seriesFilterer{ + seriesMatchers: seriesMatchers, + }, nil +} + +func (n *seriesFilterer) AddRequirement(filterRaw config.RegexFilter) error { + matcher, err := newReMatcher(filterRaw) + if err != nil { + return fmt.Errorf("unable to generate series name filter: %v", err) + } + + n.seriesMatchers = append(n.seriesMatchers, matcher) + return nil +} + +func (n *seriesFilterer) FilterSeries(initialSeries []prom.Series) []prom.Series { + if len(n.seriesMatchers) == 0 { + return initialSeries + } + + finalSeries := make([]prom.Series, 0, len(initialSeries)) +SeriesLoop: + for _, series := range initialSeries { + for _, matcher := range n.seriesMatchers { + if !matcher.Matches(series.Name) { + continue SeriesLoop + } + } + finalSeries = append(finalSeries, series) + } + + return finalSeries +} diff --git a/pkg/custom-provider/series_filterer_test.go b/pkg/custom-provider/series_filterer_test.go new file mode 100644 index 00000000..57db97ac --- /dev/null +++ b/pkg/custom-provider/series_filterer_test.go @@ -0,0 +1,114 @@ +package provider + +import ( + "testing" + + "github.com/stretchr/testify/require" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" +) + +func TestPositiveFilter(t *testing.T) { + filters := []config.RegexFilter{ + config.RegexFilter{ + Is: "one_of_(yes|positive|ok)", + }, + } + + series := []string{"one_of_yes", "one_of_no"} + expectedSeries := []string{"one_of_yes"} + + RunSeriesFiltererTest(t, filters, series, expectedSeries) +} + +func TestNegativeFilter(t *testing.T) { + filters := []config.RegexFilter{ + config.RegexFilter{ + IsNot: "one_of_(yes|positive|ok)", + }, + } + + series := []string{"one_of_yes", "one_of_no"} + expectedSeries := []string{"one_of_no"} + + RunSeriesFiltererTest(t, filters, series, expectedSeries) +} + +func TestPositiveAndNegativeFilterError(t *testing.T) { + filters := []config.RegexFilter{ + config.RegexFilter{ + Is: "series_\\d+", + IsNot: "series_[2-3]+", + }, + } + + _, err := NewSeriesFilterer(filters) + require.Error(t, err) +} + +func TestAddRequirementAppliesFilter(t *testing.T) { + seriesNames := []string{"series_1", "series_2", "series_3"} + series := BuildSeriesFromNames(seriesNames) + + filters := []config.RegexFilter{ + config.RegexFilter{ + Is: "series_\\d+", + }, + } + + filterer, err := NewSeriesFilterer(filters) + require.NoError(t, err) + + //Test it once with the default filters. + result := filterer.FilterSeries(series) + expectedSeries := []string{"series_1", "series_2", "series_3"} + VerifyMatches(t, result, expectedSeries) + + //Add a new filter and test again. + filterer.AddRequirement(config.RegexFilter{ + Is: "series_[2-3]", + }) + result = filterer.FilterSeries(series) + expectedSeries = []string{"series_2", "series_3"} + VerifyMatches(t, result, expectedSeries) +} + +func RunSeriesFiltererTest(t *testing.T, filters []config.RegexFilter, seriesNames []string, expectedResults []string) { + series := BuildSeriesFromNames(seriesNames) + + filterer, err := NewSeriesFilterer(filters) + require.NoError(t, err) + + matches := filterer.FilterSeries(series) + + VerifyMatches(t, matches, expectedResults) +} + +func VerifyMatches(t *testing.T, series []prom.Series, expectedResults []string) { + require.Equal(t, len(series), len(expectedResults)) + + existingSeries := make(map[string]bool) + + for _, series := range series { + existingSeries[series.Name] = true + } + + for _, expectedResult := range expectedResults { + _, exists := existingSeries[expectedResult] + + require.True(t, exists) + } +} + +func BuildSeriesFromNames(seriesNames []string) []prom.Series { + series := make([]prom.Series, len(seriesNames)) + + for i, name := range seriesNames { + series[i] = prom.Series{ + Name: name, + } + } + + return series +} diff --git a/pkg/custom-provider/series_registry.go b/pkg/custom-provider/series_registry.go index 28ea08fd..0b5be55d 100644 --- a/pkg/custom-provider/series_registry.go +++ b/pkg/custom-provider/series_registry.go @@ -88,7 +88,7 @@ func NewBasicSeriesRegistry(lister MetricListerWithNotification, mapper apimeta. return ®istry } -func (r basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) { +func (r *basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) { newSeriesSlices := result.series namers := result.namers @@ -100,13 +100,18 @@ func (r basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) { for i, newSeries := range newSeriesSlices { namer := namers[i] for _, series := range newSeries { - // TODO: warn if it doesn't match any resources - resources, namespaced := namer.ResourcesForSeries(series) - name, err := namer.MetricNameForSeries(series) + identity, err := namer.IdentifySeries(series) + if err != nil { glog.Errorf("unable to name series %q, skipping: %v", series.String(), err) continue } + + // TODO: warn if it doesn't match any resources + resources := identity.resources + namespaced := identity.namespaced + name := identity.name + for _, resource := range resources { info := provider.CustomMetricInfo{ GroupResource: resource, @@ -181,24 +186,25 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.CustomMetricInf func (r *basicSeriesRegistry) QueryForExternalMetric(metricInfo provider.ExternalMetricInfo, metricSelector labels.Selector) (query prom.Selector, found bool) { r.mu.RLock() defer r.mu.RUnlock() + //TODO: Implementation + return "", false + // info, infoFound := r.info[metricInfo] + // if !infoFound { + // //TODO: Weird that it switches between types here. + // glog.V(10).Infof("metric %v not registered", metricInfo) + // return "", false + // } - info, infoFound := r.info[metricInfo] - if !infoFound { - //TODO: Weird that it switches between types here. - glog.V(10).Infof("metric %v not registered", metricInfo) - return "", false - } + // query, err := info.namer.QueryForExternalSeries(info.seriesName, metricSelector) + // if err != nil { + // //TODO: See what was being .String() and implement that for ExternalMetricInfo. + // // errorVal := metricInfo.String() + // errorVal := "something" + // glog.Errorf("unable to construct query for metric %s: %v", errorVal, err) + // return "", false + // } - query, err := info.namer.QueryForExternalSeries(info.seriesName, metricSelector) - if err != nil { - //TODO: See what was being .String() and implement that for ExternalMetricInfo. - // errorVal := metricInfo.String() - errorVal := "something" - glog.Errorf("unable to construct query for metric %s: %v", errorVal, err) - return "", false - } - - return query, true + // return query, true } func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.CustomMetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool) { @@ -216,7 +222,7 @@ func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.CustomMetri return nil, false } - resourceLbl, err := info.namer.LabelForResource(metricInfo.GroupResource) + resourceLbl, err := info.namer.ResourceConverter().LabelForResource(metricInfo.GroupResource) if err != nil { glog.Errorf("unable to construct resource label for metric %s: %v", metricInfo.String(), err) return nil, false diff --git a/pkg/custom-provider/series_registry_test.go b/pkg/custom-provider/series_registry_test.go index ed5eb11f..21dde5b9 100644 --- a/pkg/custom-provider/series_registry_test.go +++ b/pkg/custom-provider/series_registry_test.go @@ -117,17 +117,60 @@ var seriesRegistryTestSeries = [][]prom.Series{ }, } +type myType struct { + a int + b string + m map[string]int +} + +type mapWrapper struct { + item map[string]int +} + +func (o *myType) Mutate(newMap mapWrapper) { + o.a = 2 + o.b = "two" + o.m = newMap.item +} + +func TestWeirdStuff(t *testing.T) { + o := myType{ + a: 1, + b: "one", + m: map[string]int{ + "one": 1, + }, + } + + oldMap := o.m + newMap := map[string]int{ + "two": 2, + } + newWrapper := mapWrapper{ + item: newMap, + } + oldWrapper := mapWrapper{ + item: oldMap, + } + o.Mutate(newWrapper) + o.Mutate(oldWrapper) +} + func TestSeriesRegistry(t *testing.T) { assert := assert.New(t) - require := require.New(t) namers := setupMetricNamer(t) registry := &basicSeriesRegistry{ mapper: restMapper(), } + updateResult := metricUpdateResult{ + series: seriesRegistryTestSeries, + namers: namers, + } + // set up the registry - require.NoError(registry.SetSeries(seriesRegistryTestSeries, namers)) + registry.onNewDataAvailable(updateResult) // make sure each metric got registered and can form queries testCases := []struct { @@ -285,7 +328,11 @@ func BenchmarkSetSeries(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - registry.SetSeries(newSeriesSlices, namers) + updateResult := metricUpdateResult{ + series: newSeriesSlices, + namers: namers, + } + registry.onNewDataAvailable(updateResult) } }