Tons of movement. See notes.

* Renamed `MetricNamer` to `SeriesConverter` and renamed `MetricNameConverter` to `MetricNamer`.
* Simplified the `metricConverter` code.
* Greatly simplified the `externalSeriesRegistry` and removed the `externalInfoMap` code.
* Fixed doc comment format.
* Still several `TODO`s to address.
This commit is contained in:
Tony Compton 2018-08-29 14:30:26 -04:00
parent a94494337e
commit d1827c5611
28 changed files with 1106 additions and 1006 deletions

View file

@ -36,7 +36,7 @@ type DiscoveryRule struct {
// as external or custom metrics.
MetricType MetricType `yaml:"metricType,omitempty"`
// ExternalMetricNamespaceLabelName identifies what Prometheus label should be examined
// to apply a namespace to metrics creates from this rule.
// to apply a namespace to metrics created from this rule.
ExternalMetricNamespaceLabelName string `yaml:"externalMetricNamespaceLabelName,omitempty"`
}
@ -80,7 +80,7 @@ type NameMapping struct {
As string `yaml:"as"`
}
//MetricType identifies whether a given metric should be handled and interpreted as a Custom or External metric.
// MetricType identifies whether a given metric should be handled and interpreted as a Custom or External metric.
type MetricType string
// Operator represents a key/field's relationship to value(s).

View file

@ -35,42 +35,36 @@ type Runnable interface {
RunUntil(stopChan <-chan struct{})
}
//A MetricLister provides a window into all of the metrics that are available within a given
//Prometheus instance, classified as either Custom or External metrics, but presented generically
//so that it can manage both types simultaneously.
// A MetricLister provides a window into all of the metrics that are available within a given
// Prometheus instance, classified as either Custom or External metrics, but presented generically
// so that it can manage both types simultaneously.
type MetricLister interface {
// Run()
// UpdateMetrics() error
// GetAllMetrics() []GenericMetricInfo
// GetAllCustomMetrics() []GenericMetricInfo
// GetAllExternalMetrics() []GenericMetricInfo
// GetInfoForMetric(infoKey GenericMetricInfo) (seriesInfo, bool)
ListAllMetrics() (metricUpdateResult, error)
ListAllMetrics() (MetricUpdateResult, error)
}
//A MetricListerWithNotification is a MetricLister that has the ability to notify listeners
//when new metric data is available.
// A MetricListerWithNotification is a MetricLister that has the ability to notify listeners
// when new metric data is available.
type MetricListerWithNotification interface {
MetricLister
Runnable
//AddNotificationReceiver registers a callback to be invoked when new metric data is available.
AddNotificationReceiver(func(metricUpdateResult))
//UpdateNow forces an immediate refresh from the source data. Primarily for test purposes.
// AddNotificationReceiver registers a callback to be invoked when new metric data is available.
AddNotificationReceiver(MetricUpdateCallback)
// UpdateNow forces an immediate refresh from the source data. Primarily for test purposes.
UpdateNow()
}
type basicMetricLister struct {
promClient prom.Client
namers []MetricNamer
converters []SeriesConverter
lookback time.Duration
}
//NewBasicMetricLister creates a MetricLister that is capable of interactly directly with Prometheus to list metrics.
func NewBasicMetricLister(promClient prom.Client, namers []MetricNamer, lookback time.Duration) MetricLister {
// NewBasicMetricLister creates a MetricLister that is capable of interactly directly with Prometheus to list metrics.
func NewBasicMetricLister(promClient prom.Client, converters []SeriesConverter, lookback time.Duration) MetricLister {
lister := basicMetricLister{
promClient: promClient,
namers: namers,
converters: converters,
lookback: lookback,
}
@ -82,10 +76,10 @@ type selectorSeries struct {
series []prom.Series
}
func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) {
result := metricUpdateResult{
series: make([][]prom.Series, 0),
namers: make([]MetricNamer, 0),
func (l *basicMetricLister) ListAllMetrics() (MetricUpdateResult, error) {
result := MetricUpdateResult{
series: make([][]prom.Series, 0),
converters: make([]SeriesConverter, 0),
}
startTime := pmodel.Now().Add(-1 * l.lookback)
@ -93,10 +87,10 @@ func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) {
// these can take a while on large clusters, so launch in parallel
// and don't duplicate
selectors := make(map[prom.Selector]struct{})
selectorSeriesChan := make(chan selectorSeries, len(l.namers))
errs := make(chan error, len(l.namers))
for _, namer := range l.namers {
sel := namer.Selector()
selectorSeriesChan := make(chan selectorSeries, len(l.converters))
errs := make(chan error, len(l.converters))
for _, converter := range l.converters {
sel := converter.Selector()
if _, ok := selectors[sel]; ok {
errs <- nil
selectorSeriesChan <- selectorSeries{}
@ -110,7 +104,7 @@ func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) {
return
}
errs <- nil
//Push into the channel: "this selector produced these series"
// Push into the channel: "this selector produced these series"
selectorSeriesChan <- selectorSeries{
selector: sel,
series: series,
@ -123,41 +117,49 @@ func (l *basicMetricLister) ListAllMetrics() (metricUpdateResult, error) {
// iterate through, blocking until we've got all results
// We know that, from above, we should have pushed one item into the channel
// for each namer. So here, we'll assume that we should receive one item per namer.
for range l.namers {
// for each converter. So here, we'll assume that we should receive one item per converter.
for range l.converters {
if err := <-errs; err != nil {
return result, fmt.Errorf("unable to update list of all metrics: %v", err)
}
//Receive from the channel: "this selector produced these series"
//We stuff that into this map so that we can collect the data as it arrives
//and then, once we've received it all, we can process it below.
// Receive from the channel: "this selector produced these series"
// We stuff that into this map so that we can collect the data as it arrives
// and then, once we've received it all, we can process it below.
if ss := <-selectorSeriesChan; ss.series != nil {
seriesCacheByQuery[ss.selector] = ss.series
}
}
close(errs)
//Now that we've collected all of the results into `seriesCacheByQuery`
//we can start processing them.
newSeries := make([][]prom.Series, len(l.namers))
for i, namer := range l.namers {
series, cached := seriesCacheByQuery[namer.Selector()]
// Now that we've collected all of the results into `seriesCacheByQuery`
// we can start processing them.
newSeries := make([][]prom.Series, len(l.converters))
for i, converter := range l.converters {
series, cached := seriesCacheByQuery[converter.Selector()]
if !cached {
return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector())
return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", converter.Selector())
}
//Because namers provide a "post-filtering" option, it's not enough to
//simply take all the series that were produced. We need to further filter them.
newSeries[i] = namer.SeriesFilterer().FilterSeries(series)
// Because converters provide a "post-filtering" option, it's not enough to
// simply take all the series that were produced. We need to further filter them.
newSeries[i] = converter.SeriesFilterer().FilterSeries(series)
}
glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries)
result.series = newSeries
result.namers = l.namers
result.converters = l.converters
return result, nil
}
type metricUpdateResult struct {
series [][]prom.Series
namers []MetricNamer
// MetricUpdateResult represents the output of a periodic inspection of metrics found to be
// available in Prometheus.
// It includes both the series data the Prometheus exposed, as well as the configurational
// object that led to their discovery.
type MetricUpdateResult struct {
series [][]prom.Series
converters []SeriesConverter
}
// MetricUpdateCallback is a function signature for receiving periodic updates about
// available metrics.
type MetricUpdateCallback func(MetricUpdateResult)

View file

@ -0,0 +1,27 @@
package provider
import "errors"
// NewOperatorNotSupportedByPrometheusError creates an error that represents the fact that we were requested to service a query that
// Prometheus would be unable to support.
func NewOperatorNotSupportedByPrometheusError() error {
return errors.New("operator not supported by prometheus")
}
// NewOperatorRequiresValuesError creates an error that represents the fact that we were requested to service a query
// that was malformed in its operator/value combination.
func NewOperatorRequiresValuesError() error {
return errors.New("operator requires values")
}
// NewOperatorDoesNotSupportValuesError creates an error that represents the fact that we were requested to service a query
// that was malformed in its operator/value combination.
func NewOperatorDoesNotSupportValuesError() error {
return errors.New("operator does not support values")
}
// NewLabelNotSpecifiedError creates an error that represents the fact that we were requested to service a query
// that was malformed in its label specification.
func NewLabelNotSpecifiedError() error {
return errors.New("label not specified")
}

View file

@ -1,128 +0,0 @@
package provider
import (
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"k8s.io/apimachinery/pkg/labels"
)
//ExportedMetric is a description of an available metric.
type ExportedMetric struct {
MetricName string
Labels labels.Set
Namespace string
}
//ExternalInfoMap is a data object that accepts and organizes information
//about available metrics.
type ExternalInfoMap interface {
//Begins tracking a metric, returning it to the caller.
TrackMetric(metricName string, generatedBy MetricNamer) ExternalMetricData
//Exports a collection of all of the metrics currently being tracked.
ExportMetrics() []ExportedMetric
//Finds a tracked metric with the given metric name, if it exists.
FindMetric(metricName string) (data ExternalMetricData, found bool)
}
//ExternalMetricData is a data object that accepts and organizes information
//about the various series/namespaces that a metric is associated with.
type ExternalMetricData interface {
//MetricName returns the name of the metric represented by this object.
MetricName() string
//WithSeries associates the provided labels with this metric.
WithSeries(labels labels.Set)
//WithNamespacedSeries associates the provided labels with this metric, but within a particular namespace.
WithNamespacedSeries(namespace string, labels labels.Set)
//Exports a collection of all the metrics currently being tracked.
ExportMetrics() []ExportedMetric
//Generates a query to select the series/values for the metric this object represents.
GenerateQuery(namespace string, selector labels.Selector) (prom.Selector, error)
}
type externalInfoMap struct {
metrics map[string]ExternalMetricData
}
type externalMetricData struct {
metricName string
namespacedData map[string][]labels.Set
generatedBy MetricNamer
}
//NewExternalMetricData creates an ExternalMetricData for the provided metric name and namer.
func NewExternalMetricData(metricName string, generatedBy MetricNamer) ExternalMetricData {
return &externalMetricData{
metricName: metricName,
generatedBy: generatedBy,
namespacedData: map[string][]labels.Set{},
}
}
//NewExternalInfoMap creates an empty ExternalInfoMap for storing external metric information.
func NewExternalInfoMap() ExternalInfoMap {
return &externalInfoMap{
metrics: map[string]ExternalMetricData{},
}
}
func (i *externalInfoMap) ExportMetrics() []ExportedMetric {
results := make([]ExportedMetric, 0)
for _, info := range i.metrics {
exported := info.ExportMetrics()
results = append(results, exported...)
}
return results
}
func (i *externalInfoMap) FindMetric(metricName string) (data ExternalMetricData, found bool) {
data, found = i.metrics[metricName]
return data, found
}
func (i *externalInfoMap) TrackMetric(metricName string, generatedBy MetricNamer) ExternalMetricData {
data, found := i.metrics[metricName]
if !found {
data = NewExternalMetricData(metricName, generatedBy)
i.metrics[metricName] = data
}
return data
}
func (d *externalMetricData) MetricName() string {
return d.metricName
}
func (d *externalMetricData) GenerateQuery(namespace string, selector labels.Selector) (prom.Selector, error) {
return d.generatedBy.QueryForExternalSeries(namespace, d.metricName, selector)
}
func (d *externalMetricData) ExportMetrics() []ExportedMetric {
results := make([]ExportedMetric, 0)
for namespace, labelSets := range d.namespacedData {
for _, labelSet := range labelSets {
results = append(results, ExportedMetric{
Labels: labelSet,
MetricName: d.metricName,
Namespace: namespace,
})
}
}
return results
}
func (d *externalMetricData) WithSeries(labels labels.Set) {
d.WithNamespacedSeries("", labels)
}
func (d *externalMetricData) WithNamespacedSeries(namespace string, seriesLabels labels.Set) {
data, found := d.namespacedData[namespace]
if !found {
data = []labels.Set{}
}
data = append(data, seriesLabels)
d.namespacedData[namespace] = data
}

View file

@ -2,29 +2,32 @@ package provider
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/golang/glog"
pmodel "github.com/prometheus/common/model"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
apierr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
conv "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider/metric-converter"
)
//TODO: AC - Make sure everything has the proper licensing disclosure at the top.
// TODO: Make sure everything has the proper licensing disclosure at the top.
type externalPrometheusProvider struct {
promClient prom.Client
metricConverter conv.MetricConverter
metricConverter MetricConverter
seriesRegistry ExternalSeriesRegistry
}
//NewExternalPrometheusProvider creates an ExternalMetricsProvider capable of responding to Kubernetes requests for external metric data.
func NewExternalPrometheusProvider(seriesRegistry ExternalSeriesRegistry, promClient prom.Client, converter conv.MetricConverter) provider.ExternalMetricsProvider {
// NewExternalPrometheusProvider creates an ExternalMetricsProvider capable of responding to Kubernetes requests for external metric data.
func NewExternalPrometheusProvider(seriesRegistry ExternalSeriesRegistry, promClient prom.Client, converter MetricConverter) provider.ExternalMetricsProvider {
return &externalPrometheusProvider{
promClient: promClient,
seriesRegistry: seriesRegistry,
@ -33,18 +36,23 @@ func NewExternalPrometheusProvider(seriesRegistry ExternalSeriesRegistry, promCl
}
func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) {
selector, found := p.seriesRegistry.QueryForMetric(namespace, metricName, metricSelector)
selector, found, err := p.seriesRegistry.QueryForMetric(namespace, metricName, metricSelector)
if err != nil {
glog.Errorf("unable to generate a query for the metric: %v", err)
return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics"))
}
if !found {
return &external_metrics.ExternalMetricValueList{
Items: []external_metrics.ExternalMetricValue{},
}, nil
return nil, provider.NewMetricNotFoundError(p.selectGroupResource(namespace), metricName)
}
queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector)
if err != nil {
return nil, err
glog.Errorf("unable to fetch metrics from prometheus: %v", err)
// don't leak implementation details to the user
return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics"))
}
return p.metricConverter.Convert(queryResults)
@ -53,3 +61,14 @@ func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricN
func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo {
return p.seriesRegistry.ListAllMetrics()
}
func (p *externalPrometheusProvider) selectGroupResource(namespace string) schema.GroupResource {
if namespace == "" {
return nsGroupResource
}
return schema.GroupResource{
Group: "",
Resource: "",
}
}

View file

@ -3,8 +3,6 @@ package provider
import (
"sync"
"github.com/prometheus/common/model"
"github.com/golang/glog"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
apimeta "k8s.io/apimachinery/pkg/api/meta"
@ -14,126 +12,99 @@ import (
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
)
//ExternalSeriesRegistry acts as the top-level converter for transforming Kubernetes requests
//for external metrics into Prometheus queries.
// ExternalSeriesRegistry acts as the top-level converter for transforming Kubernetes requests
// for external metrics into Prometheus queries.
type ExternalSeriesRegistry interface {
// ListAllMetrics lists all metrics known to this registry
ListAllMetrics() []provider.ExternalMetricInfo
QueryForMetric(namespace string, metricName string, metricSelector labels.Selector) (query prom.Selector, found bool)
QueryForMetric(namespace string, metricName string, metricSelector labels.Selector) (query prom.Selector, found bool, err error)
}
// overridableSeriesRegistry is a basic SeriesRegistry
type externalSeriesRegistry struct {
mu sync.RWMutex
// metrics is the list of all known metrics
// metrics is the list of all known metrics, ready to return from the API
metrics []provider.ExternalMetricInfo
// rawMetrics is a lookup from a metric to SeriesConverter for the sake of generating queries
rawMetrics map[string]SeriesConverter
mapper apimeta.RESTMapper
metricLister MetricListerWithNotification
externalMetricInfo ExternalInfoMap
metricLister MetricListerWithNotification
}
//NewExternalSeriesRegistry creates an ExternalSeriesRegistry driven by the data from the provided MetricLister.
// NewExternalSeriesRegistry creates an ExternalSeriesRegistry driven by the data from the provided MetricLister.
func NewExternalSeriesRegistry(lister MetricListerWithNotification, mapper apimeta.RESTMapper) ExternalSeriesRegistry {
var registry = externalSeriesRegistry{
mapper: mapper,
metricLister: lister,
externalMetricInfo: NewExternalInfoMap(),
mapper: mapper,
metricLister: lister,
metrics: make([]provider.ExternalMetricInfo, 0),
rawMetrics: map[string]SeriesConverter{},
}
lister.AddNotificationReceiver(registry.onNewDataAvailable)
lister.AddNotificationReceiver(registry.filterAndStoreMetrics)
return &registry
}
func (r *externalSeriesRegistry) filterMetrics(result metricUpdateResult) metricUpdateResult {
namers := make([]MetricNamer, 0)
func (r *externalSeriesRegistry) filterMetrics(result MetricUpdateResult) MetricUpdateResult {
converters := make([]SeriesConverter, 0)
series := make([][]prom.Series, 0)
targetType := config.External
for i, namer := range result.namers {
if namer.MetricType() == targetType {
namers = append(namers, namer)
for i, converter := range result.converters {
if converter.MetricType() == targetType {
converters = append(converters, converter)
series = append(series, result.series[i])
}
}
return metricUpdateResult{
namers: namers,
series: series,
return MetricUpdateResult{
converters: converters,
series: series,
}
}
func (r *externalSeriesRegistry) convertLabels(labels model.LabelSet) labels.Set {
set := map[string]string{}
for key, value := range labels {
set[string(key)] = string(value)
}
return set
}
func (r *externalSeriesRegistry) onNewDataAvailable(result metricUpdateResult) {
func (r *externalSeriesRegistry) filterAndStoreMetrics(result MetricUpdateResult) {
result = r.filterMetrics(result)
newSeriesSlices := result.series
namers := result.namers
converters := result.converters
// if len(newSeriesSlices) != len(namers) {
// return fmt.Errorf("need one set of series per namer")
// if len(newSeriesSlices) != len(converters) {
// return fmt.Errorf("need one set of series per converter")
// }
apiMetricsCache := make([]provider.ExternalMetricInfo, 0)
rawMetricsCache := make(map[string]SeriesConverter)
updatedCache := NewExternalInfoMap()
for i, newSeries := range newSeriesSlices {
namer := namers[i]
converter := converters[i]
for _, series := range newSeries {
identity, err := namer.IdentifySeries(series)
identity, err := converter.IdentifySeries(series)
if err != nil {
glog.Errorf("unable to name series %q, skipping: %v", series.String(), err)
continue
}
// resources := identity.resources
// namespaced := identity.namespaced
name := identity.name
labels := r.convertLabels(series.Labels)
//Check for a label indicating namespace.
metricNs, found := series.Labels[model.LabelName(namer.ExternalMetricNamespaceLabelName())]
if !found {
metricNs = ""
}
trackedMetric := updatedCache.TrackMetric(name, namer)
trackedMetric.WithNamespacedSeries(string(metricNs), labels)
rawMetricsCache[name] = converter
}
}
// regenerate metrics
allMetrics := updatedCache.ExportMetrics()
convertedMetrics := r.convertMetrics(allMetrics)
for metricName := range rawMetricsCache {
apiMetricsCache = append(apiMetricsCache, provider.ExternalMetricInfo{
Metric: metricName,
})
}
r.mu.Lock()
defer r.mu.Unlock()
r.externalMetricInfo = updatedCache
r.metrics = convertedMetrics
}
func (r *externalSeriesRegistry) convertMetrics(metrics []ExportedMetric) []provider.ExternalMetricInfo {
results := make([]provider.ExternalMetricInfo, len(metrics))
for i, info := range metrics {
results[i] = provider.ExternalMetricInfo{
Labels: info.Labels,
Metric: info.MetricName,
}
}
return results
r.metrics = apiMetricsCache
r.rawMetrics = rawMetricsCache
}
func (r *externalSeriesRegistry) ListAllMetrics() []provider.ExternalMetricInfo {
@ -143,22 +114,17 @@ func (r *externalSeriesRegistry) ListAllMetrics() []provider.ExternalMetricInfo
return r.metrics
}
func (r *externalSeriesRegistry) QueryForMetric(namespace string, metricName string, metricSelector labels.Selector) (query prom.Selector, found bool) {
func (r *externalSeriesRegistry) QueryForMetric(namespace string, metricName string, metricSelector labels.Selector) (query prom.Selector, found bool, err error) {
r.mu.RLock()
defer r.mu.RUnlock()
metric, found := r.externalMetricInfo.FindMetric(metricName)
converter, found := r.rawMetrics[metricName]
if !found {
glog.V(10).Infof("external metric %q not registered", metricName)
return "", false
glog.V(10).Infof("external metric %q not found", metricName)
return "", false, nil
}
query, err := metric.GenerateQuery(namespace, metricSelector)
if err != nil {
glog.Errorf("unable to construct query for external metric %s: %v", metricName, err)
return "", false
}
return query, true
query, err = converter.QueryForExternalSeries(namespace, metricName, metricSelector)
return query, found, err
}

View file

@ -1,37 +0,0 @@
package provider
import (
"errors"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/metrics/pkg/apis/external_metrics"
)
type matrixConverter struct {
}
//NewMatrixConverter creates a MatrixConverter capable of converting
//matrix Prometheus query results into external metric types.
func NewMatrixConverter() MetricConverter {
return &matrixConverter{}
}
func (c *matrixConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type != model.ValMatrix {
return nil, errors.New("matrixConverter can only convert scalar query results")
}
toConvert := queryResult.Matrix
if toConvert == nil {
return nil, errors.New("the provided input did not contain matrix query results")
}
return c.convert(toConvert)
}
func (c *matrixConverter) convert(result *model.Matrix) (*external_metrics.ExternalMetricValueList, error) {
//TODO: AC - Implementation.
return nil, errors.New("converting Matrix results is not yet supported")
}

View file

@ -1,52 +0,0 @@
package provider
import (
"errors"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/metrics/pkg/apis/external_metrics"
)
//MetricConverter provides a unified interface for converting the results of
//Prometheus queries into external metric types.
type MetricConverter interface {
Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error)
}
type metricConverter struct {
scalarConverter MetricConverter
vectorConverter MetricConverter
matrixConverter MetricConverter
}
func DefaultMetricConverter() MetricConverter {
sampleConverter := NewSampleConverter()
return NewMetricConverter(NewScalarConverter(), NewVectorConverter(&sampleConverter), NewMatrixConverter())
}
//NewMetricConverter creates a MetricCoverter, capable of converting any of the three metric types
//returned by the Prometheus client into external metrics types.
func NewMetricConverter(scalar MetricConverter, vector MetricConverter, matrix MetricConverter) MetricConverter {
return &metricConverter{
scalarConverter: scalar,
vectorConverter: vector,
matrixConverter: matrix,
}
}
func (c *metricConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type == model.ValScalar {
return c.scalarConverter.Convert(queryResult)
}
if queryResult.Type == model.ValVector {
return c.vectorConverter.Convert(queryResult)
}
if queryResult.Type == model.ValMatrix {
return c.matrixConverter.Convert(queryResult)
}
return nil, errors.New("encountered an unexpected query result type")
}

View file

@ -1,48 +0,0 @@
package provider
import (
"github.com/prometheus/common/model"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)
type sampleConverter struct {
}
//SampleConverter is capable of translating Prometheus Sample objects
//into ExternamMetricValue objects.
type SampleConverter interface {
Convert(sample *model.Sample) (*external_metrics.ExternalMetricValue, error)
}
//NewSampleConverter creates a SampleConverter capable of translating Prometheus Sample objects
//into ExternamMetricValue objects.
func NewSampleConverter() SampleConverter {
return &sampleConverter{}
}
func (c *sampleConverter) Convert(sample *model.Sample) (*external_metrics.ExternalMetricValue, error) {
labels := c.convertLabels(sample.Metric)
singleMetric := external_metrics.ExternalMetricValue{
MetricName: string(sample.Metric[model.LabelName("__name__")]),
Timestamp: metav1.Time{
sample.Timestamp.Time(),
},
Value: *resource.NewMilliQuantity(int64(sample.Value*1000.0), resource.DecimalSI),
MetricLabels: labels,
}
return &singleMetric, nil
}
func (c *sampleConverter) convertLabels(inLabels model.Metric) map[string]string {
numLabels := len(inLabels)
outLabels := make(map[string]string, numLabels)
for labelName, labelVal := range inLabels {
outLabels[string(labelName)] = string(labelVal)
}
return outLabels
}

View file

@ -1,48 +0,0 @@
package provider
import (
"errors"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)
type scalarConverter struct {
}
//NewScalarConverter creates a ScalarConverter capable of converting
//scalar Prometheus query results into external metric types.
func NewScalarConverter() MetricConverter {
return &scalarConverter{}
}
func (c *scalarConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type != model.ValScalar {
return nil, errors.New("scalarConverter can only convert scalar query results")
}
toConvert := queryResult.Scalar
if toConvert == nil {
return nil, errors.New("the provided input did not contain scalar query results")
}
return c.convert(toConvert)
}
func (c *scalarConverter) convert(input *model.Scalar) (*external_metrics.ExternalMetricValueList, error) {
result := external_metrics.ExternalMetricValueList{
Items: []external_metrics.ExternalMetricValue{
{
Timestamp: metav1.Time{
input.Timestamp.Time(),
},
Value: *resource.NewMilliQuantity(int64(input.Value*1000.0), resource.DecimalSI),
},
},
}
return &result, nil
}

View file

@ -1,64 +0,0 @@
package provider
import (
"errors"
"fmt"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/metrics/pkg/apis/external_metrics"
)
type vectorConverter struct {
SampleConverter SampleConverter
}
//NewVectorConverter creates a VectorConverter capable of converting
//vector Prometheus query results into external metric types.
func NewVectorConverter(sampleConverter *SampleConverter) MetricConverter {
return &vectorConverter{
SampleConverter: *sampleConverter,
}
}
func (c *vectorConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type != model.ValVector {
return nil, errors.New("vectorConverter can only convert scalar query results")
}
toConvert := *queryResult.Vector
if toConvert == nil {
return nil, errors.New("the provided input did not contain vector query results")
}
return c.convert(toConvert)
}
func (c *vectorConverter) convert(result model.Vector) (*external_metrics.ExternalMetricValueList, error) {
items := []external_metrics.ExternalMetricValue{}
metricValueList := external_metrics.ExternalMetricValueList{
Items: items,
}
numSamples := result.Len()
if numSamples == 0 {
return &metricValueList, nil
}
for _, val := range result {
singleMetric, err := c.SampleConverter.Convert(val)
if err != nil {
return nil, fmt.Errorf("unable to convert vector: %v", err)
}
items = append(items, *singleMetric)
}
metricValueList = external_metrics.ExternalMetricValueList{
Items: items,
}
return &metricValueList, nil
}

View file

@ -0,0 +1,126 @@
package provider
import (
"errors"
"fmt"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)
// MetricConverter provides a unified interface for converting the results of
// Prometheus queries into external metric types.
type MetricConverter interface {
Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error)
}
type metricConverter struct {
}
// NewMetricConverter creates a MetricCoverter, capable of converting any of the three metric types
// returned by the Prometheus client into external metrics types.
func NewMetricConverter() MetricConverter {
return &metricConverter{}
}
func (c *metricConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type == model.ValScalar {
return c.convertScalar(queryResult)
}
if queryResult.Type == model.ValVector {
return c.convertVector(queryResult)
}
return nil, errors.New("encountered an unexpected query result type")
}
func (c *metricConverter) convertSample(sample *model.Sample) (*external_metrics.ExternalMetricValue, error) {
labels := c.convertLabels(sample.Metric)
singleMetric := external_metrics.ExternalMetricValue{
MetricName: string(sample.Metric[model.LabelName("__name__")]),
Timestamp: metav1.Time{
sample.Timestamp.Time(),
},
Value: *resource.NewMilliQuantity(int64(sample.Value*1000.0), resource.DecimalSI),
MetricLabels: labels,
}
return &singleMetric, nil
}
func (c *metricConverter) convertLabels(inLabels model.Metric) map[string]string {
numLabels := len(inLabels)
outLabels := make(map[string]string, numLabels)
for labelName, labelVal := range inLabels {
outLabels[string(labelName)] = string(labelVal)
}
return outLabels
}
func (c *metricConverter) convertVector(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type != model.ValVector {
return nil, errors.New("incorrect query result type")
}
toConvert := *queryResult.Vector
if toConvert == nil {
return nil, errors.New("the provided input did not contain vector query results")
}
items := []external_metrics.ExternalMetricValue{}
metricValueList := external_metrics.ExternalMetricValueList{
Items: items,
}
numSamples := toConvert.Len()
if numSamples == 0 {
return &metricValueList, nil
}
for _, val := range toConvert {
singleMetric, err := c.convertSample(val)
if err != nil {
return nil, fmt.Errorf("unable to convert vector: %v", err)
}
items = append(items, *singleMetric)
}
metricValueList = external_metrics.ExternalMetricValueList{
Items: items,
}
return &metricValueList, nil
}
func (c *metricConverter) convertScalar(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type != model.ValScalar {
return nil, errors.New("scalarConverter can only convert scalar query results")
}
toConvert := queryResult.Scalar
if toConvert == nil {
return nil, errors.New("the provided input did not contain scalar query results")
}
result := external_metrics.ExternalMetricValueList{
Items: []external_metrics.ExternalMetricValue{
{
Timestamp: metav1.Time{
toConvert.Timestamp.Time(),
},
Value: *resource.NewMilliQuantity(int64(toConvert.Value*1000.0), resource.DecimalSI),
},
},
}
return &result, nil
}

View file

@ -1,63 +0,0 @@
package provider
import (
"fmt"
"regexp"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
)
//MetricNameConverter provides functions for naming custom metrics from Promethes series.
type MetricNameConverter interface {
GetMetricNameForSeries(series prom.Series) (string, error)
}
type metricNameConverter struct {
nameMatches *regexp.Regexp
nameAs string
}
//NewMetricNameConverter creates a MetricNameConverter capable of translating Prometheus series names
//into custom metric names.
func NewMetricNameConverter(mapping config.NameMapping) (MetricNameConverter, error) {
var nameMatches *regexp.Regexp
var err error
if mapping.Matches != "" {
nameMatches, err = regexp.Compile(mapping.Matches)
if err != nil {
return nil, fmt.Errorf("unable to compile series name match expression %q: %v", mapping.Matches, err)
}
} else {
// this will always succeed
nameMatches = regexp.MustCompile(".*")
}
nameAs := mapping.As
if nameAs == "" {
// check if we have an obvious default
subexpNames := nameMatches.SubexpNames()
if len(subexpNames) == 1 {
// no capture groups, use the whole thing
nameAs = "$0"
} else if len(subexpNames) == 2 {
// one capture group, use that
nameAs = "$1"
} else {
return nil, fmt.Errorf("must specify an 'as' value for name matcher %q", mapping.Matches)
}
}
return &metricNameConverter{
nameMatches: nameMatches,
nameAs: nameAs,
}, nil
}
func (c *metricNameConverter) GetMetricNameForSeries(series prom.Series) (string, error) {
matches := c.nameMatches.FindStringSubmatchIndex(series.Name)
if matches == nil {
return "", fmt.Errorf("series name %q did not match expected pattern %q", series.Name, c.nameMatches.String())
}
outNameBytes := c.nameMatches.ExpandString(nil, c.nameAs, series.Name, matches)
return string(outNameBytes), nil
}

View file

@ -11,8 +11,8 @@ import (
func TestWhenNoMappingMetricNameIsUnaltered(t *testing.T) {
emptyMapping := config.NameMapping{}
RunTest(t, emptyMapping, "my_series", "my_series")
RunTest(t, emptyMapping, "your_series", "your_series")
runTest(t, emptyMapping, "my_series", "my_series")
runTest(t, emptyMapping, "your_series", "your_series")
}
func TestWhenMappingWithOneCaptureGroupMetricNameIsCorrect(t *testing.T) {
@ -21,52 +21,52 @@ func TestWhenMappingWithOneCaptureGroupMetricNameIsCorrect(t *testing.T) {
As: "your_$1",
}
RunTest(t, mapping, "my_requests_per_second", "your_requests_per_second")
runTest(t, mapping, "my_requests_per_second", "your_requests_per_second")
}
func TestWhenMappingWithMultipleCaptureGroupsMetricNameIsCorrect(t *testing.T) {
//ExpandString has some strange behavior when using the $1, $2 syntax
//Specifically, it doesn't return the expected values for templates like:
//$1_$2
//You can work around it by using the ${1} syntax.
// ExpandString has some strange behavior when using the $1, $2 syntax
// Specifically, it doesn't return the expected values for templates like:
// $1_$2
// You can work around it by using the ${1} syntax.
mapping := config.NameMapping{
Matches: "my_([^_]+)_([^_]+)",
As: "your_${1}_is_${2}_large",
}
RunTest(t, mapping, "my_horse_very", "your_horse_is_very_large")
RunTest(t, mapping, "my_dog_not", "your_dog_is_not_large")
runTest(t, mapping, "my_horse_very", "your_horse_is_very_large")
runTest(t, mapping, "my_dog_not", "your_dog_is_not_large")
}
func TestAsCanBeInferred(t *testing.T) {
//When we've got one capture group, we should infer that as the target.
// When we've got one capture group, we should infer that as the target.
mapping := config.NameMapping{
Matches: "my_(.+)",
}
RunTest(t, mapping, "my_test_metric", "test_metric")
runTest(t, mapping, "my_test_metric", "test_metric")
//When we have no capture groups, we should infer that the whole thing as the target.
// When we have no capture groups, we should infer that the whole thing as the target.
mapping = config.NameMapping{
Matches: "my_metric",
}
RunTest(t, mapping, "my_metric", "my_metric")
runTest(t, mapping, "my_metric", "my_metric")
}
func TestWhenAsCannotBeInferredError(t *testing.T) {
//More than one capture group should
//result in us giving up on making an educated guess.
// More than one capture group should
// result in us giving up on making an educated guess.
mapping := config.NameMapping{
Matches: "my_([^_]+)_([^_]+)",
}
RunTestExpectingError(t, mapping, "my_horse_very")
RunTestExpectingError(t, mapping, "my_dog_not")
runTestExpectingError(t, mapping, "my_horse_very")
runTestExpectingError(t, mapping, "my_dog_not")
}
func RunTest(t *testing.T, mapping config.NameMapping, input string, expectedResult string) {
converter, err := NewMetricNameConverter(mapping)
func runTest(t *testing.T, mapping config.NameMapping, input string, expectedResult string) {
converter, err := NewMetricNamer(mapping)
require.NoError(t, err)
series := prom.Series{
@ -79,7 +79,7 @@ func RunTest(t *testing.T, mapping config.NameMapping, input string, expectedRes
require.Equal(t, expectedResult, actualResult)
}
func RunTestExpectingError(t *testing.T, mapping config.NameMapping, input string) {
_, err := NewMetricNameConverter(mapping)
func runTestExpectingError(t *testing.T, mapping config.NameMapping, input string) {
_, err := NewMetricNamer(mapping)
require.Error(t, err)
}

View file

@ -2,284 +2,62 @@ package provider
import (
"fmt"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"regexp"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
)
var nsGroupResource = schema.GroupResource{Resource: "namespaces"}
// MetricNamer knows how to convert Prometheus series names and label names to
// metrics API resources, and vice-versa. MetricNamers should be safe to access
// concurrently. Returned group-resources are "normalized" as per the
// MetricInfo#Normalized method. Group-resources passed as arguments must
// themselves be normalized.
// MetricNamer provides functions for naming custom metrics from Promethes series.
type MetricNamer interface {
// Selector produces the appropriate Prometheus series selector to match all
// series handlable by this namer.
Selector() prom.Selector
// FilterSeries checks to see which of the given series match any additional
// constrains beyond the series query. It's assumed that the series given
// already matche the series query.
// FilterSeries(series []prom.Series) []prom.Series
SeriesFilterer() SeriesFilterer
ResourceConverter() ResourceConverter
// MetricNameForSeries returns the name (as presented in the API) for a given series.
// MetricNameForSeries(series prom.Series) (string, error)
// QueryForSeries returns the query for a given series (not API metric name), with
// the given namespace name (if relevant), resource, and resource names.
QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error)
QueryForExternalSeries(namespace string, series string, metricSelector labels.Selector) (prom.Selector, error)
IdentifySeries(series prom.Series) (seriesIdentity, error)
MetricType() config.MetricType
ExternalMetricNamespaceLabelName() string
}
type seriesIdentity struct {
resources []schema.GroupResource
namespaced bool
name string
}
func (n *metricNamer) Selector() prom.Selector {
return n.seriesQuery
GetMetricNameForSeries(series prom.Series) (string, error)
}
type metricNamer struct {
seriesQuery prom.Selector
resourceConverter ResourceConverter
queryBuilder QueryBuilder
seriesFilterer SeriesFilterer
metricNameConverter MetricNameConverter
mapper apimeta.RESTMapper
metricType config.MetricType
externalMetricNamespaceLabel string
nameMatches *regexp.Regexp
nameAs string
}
// queryTemplateArgs are the arguments for the metrics query template.
type queryTemplateArgs struct {
Series string
LabelMatchers string
LabelValuesByName map[string][]string
GroupBy string
GroupBySlice []string
}
func (n *metricNamer) MetricType() config.MetricType {
return n.metricType
}
func (n *metricNamer) ExternalMetricNamespaceLabelName() string {
return n.externalMetricNamespaceLabel
}
func (n *metricNamer) IdentifySeries(series prom.Series) (seriesIdentity, error) {
// TODO: warn if it doesn't match any resources
resources, namespaced := n.resourceConverter.ResourcesForSeries(series)
name, err := n.metricNameConverter.GetMetricNameForSeries(series)
result := seriesIdentity{
resources: resources,
namespaced: namespaced,
name: name,
}
return result, err
}
func (n *metricNamer) SeriesFilterer() SeriesFilterer {
return n.seriesFilterer
}
func (n *metricNamer) ResourceConverter() ResourceConverter {
return n.resourceConverter
}
func (n *metricNamer) createQueryPartsFromSelector(metricSelector labels.Selector) []queryPart {
requirements, _ := metricSelector.Requirements()
selectors := []queryPart{}
for i := 0; i < len(requirements); i++ {
selector := n.convertRequirement(requirements[i])
selectors = append(selectors, selector)
}
return selectors
}
func (n *metricNamer) convertRequirement(requirement labels.Requirement) queryPart {
labelName := requirement.Key()
values := requirement.Values().List()
return queryPart{
labelName: labelName,
values: values,
}
}
type queryPart struct {
labelName string
values []string
}
func (n *metricNamer) buildNamespaceQueryPartForSeries(namespace string) (queryPart, error) {
result := queryPart{}
//If we've been given a namespace, then we need to set up
//the label requirements to target that namespace.
if namespace != "" {
namespaceLbl, err := n.ResourceConverter().LabelForResource(nsGroupResource)
// NewMetricNamer creates a MetricNamer capable of translating Prometheus series names
// into custom metric names.
func NewMetricNamer(mapping config.NameMapping) (MetricNamer, error) {
var nameMatches *regexp.Regexp
var err error
if mapping.Matches != "" {
nameMatches, err = regexp.Compile(mapping.Matches)
if err != nil {
return result, err
return nil, fmt.Errorf("unable to compile series name match expression %q: %v", mapping.Matches, err)
}
values := []string{namespace}
result = queryPart{
values: values,
labelName: string(namespaceLbl),
} else {
// this will always succeed
nameMatches = regexp.MustCompile(".*")
}
nameAs := mapping.As
if nameAs == "" {
// check if we have an obvious default
subexpNames := nameMatches.SubexpNames()
if len(subexpNames) == 1 {
// no capture groups, use the whole thing
nameAs = "$0"
} else if len(subexpNames) == 2 {
// one capture group, use that
nameAs = "$1"
} else {
return nil, fmt.Errorf("must specify an 'as' value for name matcher %q", mapping.Matches)
}
}
return result, nil
}
func (n *metricNamer) buildResourceQueryPartForSeries(resource schema.GroupResource, names ...string) (queryPart, error) {
result := queryPart{}
//If we've been given a resource, then we need to set up
//the label requirements to target that resource.
resourceLbl, err := n.ResourceConverter().LabelForResource(resource)
if err != nil {
return result, err
}
result = queryPart{
labelName: string(resourceLbl),
values: names,
}
return result, nil
}
func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) {
queryParts := []queryPart{}
//Build up the namespace part of the query.
namespaceQueryPart, err := n.buildNamespaceQueryPartForSeries(namespace)
if err != nil {
return "", err
}
queryParts = append(queryParts, namespaceQueryPart)
//Build up the resource part of the query.
resourceQueryPart, err := n.buildResourceQueryPartForSeries(resource, names...)
if err != nil {
return "", err
}
queryParts = append(queryParts, resourceQueryPart)
return n.queryBuilder.BuildSelector(series, resourceQueryPart.labelName, []string{resourceQueryPart.labelName}, queryParts)
}
// NamersFromConfig produces a MetricNamer for each rule in the given config.
func NamersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMapper) ([]MetricNamer, error) {
namers := make([]MetricNamer, len(cfg.Rules))
for i, rule := range cfg.Rules {
var err error
resourceConverter, err := NewResourceConverter(rule.Resources.Template, rule.Resources.Overrides, mapper)
if err != nil {
return nil, fmt.Errorf("unable to create ResourceConverter associated with series query %q: %v", rule.SeriesQuery, err)
}
queryBuilder, err := NewQueryBuilder(rule.MetricsQuery)
if err != nil {
return nil, fmt.Errorf("unable to create a QueryBuilder associated with series query %q: %v", rule.SeriesQuery, err)
}
seriesFilterer, err := NewSeriesFilterer(rule.SeriesFilters)
if err != nil {
return nil, fmt.Errorf("unable to create a SeriesFilter associated with series query %q: %v", rule.SeriesQuery, err)
}
if rule.Name.Matches != "" {
err := seriesFilterer.AddRequirement(config.RegexFilter{Is: rule.Name.Matches})
if err != nil {
return nil, fmt.Errorf("unable to apply the series name filter from name rules associated with series query %q: %v", rule.SeriesQuery, err)
}
}
metricNameConverter, err := NewMetricNameConverter(rule.Name)
if err != nil {
return nil, fmt.Errorf("unable to create a MetricNameConverter associated with series query %q: %v", rule.SeriesQuery, err)
}
namespaceLabel := ""
if rule.MetricType == config.External {
namespaceLabel = rule.ExternalMetricNamespaceLabelName
}
metricType := rule.MetricType
if metricType == config.MetricType("") {
metricType = config.Custom
}
namer := &metricNamer{
seriesQuery: prom.Selector(rule.SeriesQuery),
mapper: mapper,
resourceConverter: resourceConverter,
queryBuilder: queryBuilder,
seriesFilterer: seriesFilterer,
metricNameConverter: metricNameConverter,
metricType: metricType,
externalMetricNamespaceLabel: namespaceLabel,
}
namers[i] = namer
}
return namers, nil
}
func (n *metricNamer) buildNamespaceQueryPartForExternalSeries(namespace string) (queryPart, error) {
return queryPart{
labelName: n.externalMetricNamespaceLabel,
values: []string{namespace},
return &metricNamer{
nameMatches: nameMatches,
nameAs: nameAs,
}, nil
}
func (n *metricNamer) QueryForExternalSeries(namespace string, series string, metricSelector labels.Selector) (prom.Selector, error) {
queryParts := []queryPart{}
if namespace != "" {
//Build up the namespace part of the query.
namespaceQueryPart, err := n.buildNamespaceQueryPartForExternalSeries(namespace)
if err != nil {
return "", err
}
queryParts = append(queryParts, namespaceQueryPart)
func (c *metricNamer) GetMetricNameForSeries(series prom.Series) (string, error) {
matches := c.nameMatches.FindStringSubmatchIndex(series.Name)
if matches == nil {
return "", fmt.Errorf("series name %q did not match expected pattern %q", series.Name, c.nameMatches.String())
}
//Build up the query parts from the selector.
queryParts = append(queryParts, n.createQueryPartsFromSelector(metricSelector)...)
selector, err := n.queryBuilder.BuildSelector(series, "", []string{}, queryParts)
if err != nil {
return "", err
}
return selector, nil
outNameBytes := c.nameMatches.ExpandString(nil, c.nameAs, series.Name, matches)
return string(outNameBytes), nil
}

View file

@ -26,27 +26,27 @@ import (
type periodicMetricLister struct {
realLister MetricLister
updateInterval time.Duration
mostRecentResult metricUpdateResult
callbacks []func(metricUpdateResult)
mostRecentResult MetricUpdateResult
callbacks []MetricUpdateCallback
}
//NewPeriodicMetricLister creates a MetricLister that periodically pulls the list of available metrics
//at the provided interval, but defers the actual act of retrieving the metrics to the supplied MetricLister.
// NewPeriodicMetricLister creates a MetricLister that periodically pulls the list of available metrics
// at the provided interval, but defers the actual act of retrieving the metrics to the supplied MetricLister.
func NewPeriodicMetricLister(realLister MetricLister, updateInterval time.Duration) (MetricListerWithNotification, Runnable) {
lister := periodicMetricLister{
updateInterval: updateInterval,
realLister: realLister,
callbacks: make([]func(metricUpdateResult), 0),
callbacks: make([]MetricUpdateCallback, 0),
}
return &lister, &lister
}
func (l *periodicMetricLister) AddNotificationReceiver(callback func(metricUpdateResult)) {
func (l *periodicMetricLister) AddNotificationReceiver(callback MetricUpdateCallback) {
l.callbacks = append(l.callbacks, callback)
}
func (l *periodicMetricLister) ListAllMetrics() (metricUpdateResult, error) {
func (l *periodicMetricLister) ListAllMetrics() (MetricUpdateResult, error) {
return l.mostRecentResult, nil
}

View file

@ -12,10 +12,10 @@ type fakeLister struct {
callCount int
}
func (f *fakeLister) ListAllMetrics() (metricUpdateResult, error) {
func (f *fakeLister) ListAllMetrics() (MetricUpdateResult, error) {
f.callCount++
return metricUpdateResult{
return MetricUpdateResult{
series: [][]prom.Series{
[]prom.Series{
prom.Series{
@ -32,7 +32,7 @@ func TestWhenNewMetricsAvailableCallbackIsInvoked(t *testing.T) {
periodicLister := targetLister.(*periodicMetricLister)
callbackInvoked := false
callback := func(r metricUpdateResult) {
callback := func(r MetricUpdateResult) {
callbackInvoked = true
}
@ -47,21 +47,21 @@ func TestWhenListingMetricsReturnsCachedValues(t *testing.T) {
targetLister, _ := NewPeriodicMetricLister(fakeLister, time.Duration(1000))
periodicLister := targetLister.(*periodicMetricLister)
//We haven't invoked the inner lister yet, so we should have no results.
// We haven't invoked the inner lister yet, so we should have no results.
resultBeforeUpdate, err := periodicLister.ListAllMetrics()
require.NoError(t, err)
require.Equal(t, 0, len(resultBeforeUpdate.series))
require.Equal(t, 0, fakeLister.callCount)
//We can simulate waiting for the udpate interval to pass...
//which should result in calling the inner lister to get the metrics.
// We can simulate waiting for the udpate interval to pass...
// which should result in calling the inner lister to get the metrics.
err = periodicLister.updateMetrics()
require.NoError(t, err)
require.Equal(t, 1, fakeLister.callCount)
//If we list now, we should return the cached values.
//Make sure we got some results this time
//as well as that we didn't unnecessarily invoke the inner lister.
// If we list now, we should return the cached values.
// Make sure we got some results this time
// as well as that we didn't unnecessarily invoke the inner lister.
resultAfterUpdate, err := periodicLister.ListAllMetrics()
require.NoError(t, err)
require.NotEqual(t, 0, len(resultAfterUpdate.series))

View file

@ -46,10 +46,10 @@ type prometheusProvider struct {
SeriesRegistry
}
//NewPrometheusProvider creates an CustomMetricsProvider capable of responding to Kubernetes requests for custom metric data.
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration) (provider.CustomMetricsProvider, Runnable) {
//TODO: AC - Consider injecting these objects and calling .Run() before calling this function.
basicLister := NewBasicMetricLister(promClient, namers, updateInterval)
// NewPrometheusProvider creates an CustomMetricsProvider capable of responding to Kubernetes requests for custom metric data.
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, converters []SeriesConverter, updateInterval time.Duration) (provider.CustomMetricsProvider, Runnable) {
// TODO: Consider injecting these objects and calling .Run() on the runnables before calling this function.
basicLister := NewBasicMetricLister(promClient, converters, updateInterval)
periodicLister, _ := NewPeriodicMetricLister(basicLister, updateInterval)
seriesRegistry := NewBasicSeriesRegistry(periodicLister, mapper)

View file

@ -92,10 +92,10 @@ func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fak
fakeKubeClient := &fakedyn.FakeDynamicClient{}
cfg := config.DefaultConfig(1*time.Minute, "")
namers, err := NamersFromConfig(cfg, restMapper())
converters, err := ConvertersFromConfig(cfg, restMapper())
require.NoError(t, err)
prov, _ := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, namers, fakeProviderUpdateInterval)
prov, _ := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, converters, fakeProviderUpdateInterval)
containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", ""))
namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*"))

View file

@ -2,14 +2,17 @@ package provider
import (
"bytes"
"errors"
"fmt"
"strings"
"text/template"
"k8s.io/apimachinery/pkg/selection"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
)
//QueryBuilder provides functions for generating Prometheus queries.
// QueryBuilder provides functions for generating Prometheus queries.
type QueryBuilder interface {
BuildSelector(seriesName string, groupBy string, groupBySlice []string, queryParts []queryPart) (prom.Selector, error)
}
@ -18,7 +21,7 @@ type queryBuilder struct {
metricsQueryTemplate *template.Template
}
//NewQueryBuilder creates a QueryBuilder.
// NewQueryBuilder creates a QueryBuilder.
func NewQueryBuilder(metricsQuery string) (QueryBuilder, error) {
metricsQueryTemplate, err := template.New("metrics-query").Delims("<<", ">>").Parse(metricsQuery)
if err != nil {
@ -32,7 +35,11 @@ func NewQueryBuilder(metricsQuery string) (QueryBuilder, error) {
func (n *queryBuilder) BuildSelector(seriesName string, groupBy string, groupBySlice []string, queryParts []queryPart) (prom.Selector, error) {
//Convert our query parts into the types we need for our template.
exprs, valuesByName := n.processQueryParts(queryParts)
exprs, valuesByName, err := n.processQueryParts(queryParts)
if err != nil {
return "", err
}
args := queryTemplateArgs{
Series: seriesName,
@ -64,7 +71,12 @@ func (n *queryBuilder) createSelectorFromTemplateArgs(args queryTemplateArgs) (p
return prom.Selector(queryBuff.String()), nil
}
func (n *queryBuilder) processQueryParts(queryParts []queryPart) ([]string, map[string][]string) {
func (n *queryBuilder) processQueryParts(queryParts []queryPart) ([]string, map[string][]string, error) {
//We've take the approach here that if we can't perfectly map their query into a Prometheus
//query that we should abandon the effort completely.
//The concern is that if we don't get a perfect match on their query parameters, the query result
//might contain unexpected data that would cause them to take an erroneous action based on the result.
//Contains the expressions that we want to include as part of the query to Prometheus.
//e.g. "namespace=my-namespace"
//e.g. "some_label=some-value"
@ -78,15 +90,23 @@ func (n *queryBuilder) processQueryParts(queryParts []queryPart) ([]string, map[
for _, qPart := range queryParts {
//Be resilient against bad inputs.
//We obviously can't generate label filters for these cases.
if qPart.labelName == "" || len(qPart.values) == 0 {
continue
if qPart.labelName == "" {
return nil, nil, NewLabelNotSpecifiedError()
}
targetValue := qPart.values[0]
matcher := prom.LabelEq
if len(qPart.values) > 1 {
targetValue = strings.Join(qPart.values, "|")
matcher = prom.LabelMatches
if !n.operatorIsSupported(qPart.operator) {
return nil, nil, NewOperatorNotSupportedByPrometheusError()
}
matcher, err := n.selectMatcher(qPart.operator, qPart.values)
if err != nil {
return nil, nil, err
}
targetValue, err := n.selectTargetValue(qPart.operator, qPart.values)
if err != nil {
return nil, nil, err
}
expression := matcher(qPart.labelName, targetValue)
@ -94,5 +114,92 @@ func (n *queryBuilder) processQueryParts(queryParts []queryPart) ([]string, map[
valuesByName[qPart.labelName] = qPart.values
}
return exprs, valuesByName
return exprs, valuesByName, nil
}
func (n *queryBuilder) selectMatcher(operator selection.Operator, values []string) (func(string, string) string, error) {
numValues := len(values)
if numValues == 0 {
switch operator {
case selection.Exists:
return prom.LabelMatches, nil
case selection.DoesNotExist:
return prom.LabelNotMatches, nil
case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.In, selection.NotIn:
return nil, NewOperatorRequiresValuesError()
}
} else if numValues == 1 {
switch operator {
case selection.Equals, selection.DoubleEquals:
return prom.LabelEq, nil
case selection.NotEquals:
return prom.LabelNeq, nil
case selection.In, selection.Exists:
return prom.LabelMatches, nil
case selection.DoesNotExist, selection.NotIn:
return prom.LabelNotMatches, nil
}
} else {
//Since labels can only have one value, providing multiple
//values results in a regex match, even if that's not what the user
//asked for.
switch operator {
case selection.Equals, selection.DoubleEquals, selection.In, selection.Exists:
return prom.LabelMatches, nil
case selection.NotEquals, selection.DoesNotExist, selection.NotIn:
return prom.LabelNotMatches, nil
}
}
return nil, errors.New("operator not supported by query builder")
}
func (n *queryBuilder) selectTargetValue(operator selection.Operator, values []string) (string, error) {
numValues := len(values)
if numValues == 0 {
switch operator {
case selection.Exists, selection.DoesNotExist:
//Regex for any non-empty string.
//When the operator is LabelNotMatches this will select series without the label
//or with the label but a value of "".
//When the operator is LabelMatches this will select series with the label
//whose value is NOT "".
return ".+", nil
case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.In, selection.NotIn:
return "", NewOperatorRequiresValuesError()
}
} else if numValues == 1 {
switch operator {
case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.In, selection.NotIn:
//Pass the value through as-is.
//It's somewhat strange to do this for both the regex and equality
//operators, but if we do it this way it gives the user a little more control.
//They might choose to send an "IN" request and give a list of static values
//or they could send a single value that's a regex, giving them a passthrough
//for their label selector.
return values[0], nil
case selection.Exists, selection.DoesNotExist:
return "", errors.New("operator does not support values")
}
} else {
switch operator {
case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.In, selection.NotIn:
//Pass the value through as-is.
//It's somewhat strange to do this for both the regex and equality
//operators, but if we do it this way it gives the user a little more control.
//They might choose to send an "IN" request and give a list of static values
//or they could send a single value that's a regex, giving them a passthrough
//for their label selector.
return strings.Join(values, "|"), nil
case selection.Exists, selection.DoesNotExist:
return "", NewOperatorDoesNotSupportValuesError()
}
}
return "", errors.New("operator not supported by query builder")
}
func (n *queryBuilder) operatorIsSupported(operator selection.Operator) bool {
return operator != selection.GreaterThan && operator != selection.LessThan
}

View file

@ -3,13 +3,15 @@ package provider
import (
"testing"
"k8s.io/apimachinery/pkg/selection"
"github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/stretchr/testify/require"
)
func TestBadQueryPartsDontError(t *testing.T) {
func TestBadQueryPartsDontBuildQueries(t *testing.T) {
builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])")
selector, err := builder.BuildSelector("my_series", "", []string{}, []queryPart{
_, err := builder.BuildSelector("my_series", "", []string{}, []queryPart{
queryPart{
labelName: "",
values: nil,
@ -20,49 +22,305 @@ func TestBadQueryPartsDontError(t *testing.T) {
},
})
expectation := client.Selector("rate(my_series{}[2m])")
require.NoError(t, err)
require.Equal(t, selector, expectation)
require.Error(t, err)
}
func runQueryBuilderTest(t *testing.T, queryParts []queryPart, expectation string) {
builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])")
selector, err := builder.BuildSelector("my_series", "", []string{}, queryParts)
expectError := expectation == ""
if expectError {
require.Error(t, err)
} else {
selectorExpectation := client.Selector(expectation)
require.NoError(t, err)
require.Equal(t, selector, selectorExpectation)
}
}
func TestSimpleQuery(t *testing.T) {
builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])")
// builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>,static_label!=\"static_value\"}[2m])) by (<<.GroupBy>>)")
selector, _ := builder.BuildSelector("my_series", "", []string{}, []queryPart{})
expectation := client.Selector("rate(my_series{}[2m])")
require.Equal(t, selector, expectation)
runQueryBuilderTest(t, []queryPart{}, "")
}
func TestSimpleQueryWithOneLabelValue(t *testing.T) {
builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])")
//Equals
func TestEqualsQueryWithNoLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{},
operator: selection.Equals,
},
}, "")
}
// builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>,static_label!=\"static_value\"}[2m])) by (<<.GroupBy>>)")
selector, _ := builder.BuildSelector("my_series", "", []string{}, []queryPart{
func TestEqualsQueryWithOneLabelValue(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one"},
operator: selection.Equals,
},
})
expectation := client.Selector("rate(my_series{target_label=\"one\"}[2m])")
require.Equal(t, selector, expectation)
}, "rate(my_series{target_label=\"one\"}[2m])")
}
func TestSimpleQueryWithMultipleLabelValues(t *testing.T) {
builder, _ := NewQueryBuilder("rate(<<.Series>>{<<.LabelMatchers>>}[2m])")
// builder, _ := NewQueryBuilder("sum(rate(<<.Series>>{<<.LabelMatchers>>,static_label!=\"static_value\"}[2m])) by (<<.GroupBy>>)")
selector, _ := builder.BuildSelector("my_series", "", []string{}, []queryPart{
func TestEqualsQueryWithMultipleLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one", "two"},
operator: selection.Equals,
},
})
}, "rate(my_series{target_label=~\"one|two\"}[2m])")
}
expectation := client.Selector("rate(my_series{target_label=~\"one|two\"}[2m])")
require.Equal(t, selector, expectation)
//Double Equals
func TestDoubleEqualsQueryWithNoLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{},
operator: selection.DoubleEquals,
},
}, "")
}
func TestDoubleEqualsQueryWithOneLabelValue(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one"},
operator: selection.DoubleEquals,
},
}, "rate(my_series{target_label=\"one\"}[2m])")
}
func TestDoubleEqualsQueryWithMultipleLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one", "two"},
operator: selection.DoubleEquals,
},
}, "rate(my_series{target_label=~\"one|two\"}[2m])")
}
//Not Equals
func TestNotEqualsQueryWithNoLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{},
operator: selection.NotEquals,
},
}, "")
}
func TestNotEqualsQueryWithOneLabelValue(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one"},
operator: selection.NotEquals,
},
}, "rate(my_series{target_label!=\"one\"}[2m])")
}
func TestNotEqualsQueryWithMultipleLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one", "two"},
operator: selection.NotEquals,
},
}, "rate(my_series{target_label!~\"one|two\"}[2m])")
}
//In
func TestInQueryWithNoLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{},
operator: selection.In,
},
}, "")
}
func TestInQueryWithOneLabelValue(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one"},
operator: selection.In,
},
}, "rate(my_series{target_label=~\"one\"}[2m])")
}
func TestInQueryWithMultipleLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one", "two"},
operator: selection.In,
},
}, "rate(my_series{target_label=~\"one|two\"}[2m])")
}
//NotIn
func TestNotInQueryWithNoLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{},
operator: selection.NotIn,
},
}, "")
}
func TestNotInQueryWithOneLabelValue(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one"},
operator: selection.NotIn,
},
}, "rate(my_series{target_label!~\"one\"}[2m])")
}
func TestNotInQueryWithMultipleLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one", "two"},
operator: selection.NotIn,
},
}, "rate(my_series{target_label!~\"one|two\"}[2m])")
}
//Exists
func TestExistsQueryWithNoLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{},
operator: selection.Exists,
},
}, "rate(my_series{target_label=~\".+\"}[2m])")
}
func TestExistsQueryWithOneLabelValue(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one"},
operator: selection.Exists,
},
}, "")
}
func TestExistsQueryWithMultipleLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one", "two"},
operator: selection.Exists,
},
}, "")
}
//DoesNotExist
func TestDoesNotExistsQueryWithNoLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{},
operator: selection.DoesNotExist,
},
}, "rate(my_series{target_label!~\".+\"}[2m])")
}
func TestDoesNotExistsQueryWithOneLabelValue(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one"},
operator: selection.DoesNotExist,
},
}, "")
}
func TestDoesNotExistsQueryWithMultipleLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one", "two"},
operator: selection.DoesNotExist,
},
}, "")
}
//GreaterThan
func TestGreaterThanQueryWithNoLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{},
operator: selection.GreaterThan,
},
}, "")
}
func TestGreaterThanQueryWithOneLabelValue(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one"},
operator: selection.GreaterThan,
},
}, "")
}
func TestGreaterThanQueryWithMultipleLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one", "two"},
operator: selection.GreaterThan,
},
}, "")
}
//LessThan
func TestLessThanQueryWithNoLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{},
operator: selection.LessThan,
},
}, "")
}
func TestLessThanQueryWithOneLabelValue(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one"},
operator: selection.LessThan,
},
}, "")
}
func TestLessThanQueryWithMultipleLabelValues(t *testing.T) {
runQueryBuilderTest(t, []queryPart{
queryPart{
labelName: "target_label",
values: []string{"one", "two"},
operator: selection.LessThan,
},
}, "")
}
func TestQueryWithGroupBy(t *testing.T) {
@ -72,6 +330,7 @@ func TestQueryWithGroupBy(t *testing.T) {
queryPart{
labelName: "target_label",
values: []string{"one", "two"},
operator: selection.In,
},
})
@ -79,4 +338,4 @@ func TestQueryWithGroupBy(t *testing.T) {
require.Equal(t, selector, expectation)
}
//TODO: AC - Ensure that the LabelValuesByName and GroupBySlice placeholders function correctly.
// TODO: Ensure that the LabelValuesByName and GroupBySlice placeholders function correctly.

View file

@ -17,9 +17,9 @@ import (
pmodel "github.com/prometheus/common/model"
)
//ResourceConverter is a type for extracting associated Kubernetes GroupResource objects from
//Prometheus series and generating appropriate labels to target specific Kubernetes GroupResource
//objects.
// ResourceConverter is a type for extracting associated Kubernetes GroupResource objects from
// Prometheus series and generating appropriate labels to target specific Kubernetes GroupResource
// objects.
type ResourceConverter interface {
// ResourcesForSeries returns the group-resources associated with the given series,
// as well as whether or not the given series has the "namespace" resource).
@ -37,7 +37,7 @@ type resourceConverter struct {
labelTemplate *template.Template
}
//NewResourceConverter creates a ResourceConverter that will use the provided parameters to map data between Prometheus and Kubernetes.
// NewResourceConverter creates a ResourceConverter that will use the provided parameters to map data between Prometheus and Kubernetes.
func NewResourceConverter(resourceTemplate string, overrides map[string]config.GroupResource, mapper apimeta.RESTMapper) (ResourceConverter, error) {
converter := &resourceConverter{
labelToResource: make(map[pmodel.LabelName]schema.GroupResource),

View file

@ -0,0 +1,295 @@
package provider
import (
"fmt"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
)
var nsGroupResource = schema.GroupResource{Resource: "namespaces"}
// SeriesConverter knows how to convert Prometheus series names and label names to
// metrics API resources, and vice-versa. SeriesConverters should be safe to access
// concurrently. Returned group-resources are "normalized" as per the
// MetricInfo#Normalized method. Group-resources passed as arguments must
// themselves be normalized.
type SeriesConverter interface {
// Selector produces the appropriate Prometheus series selector to match all
// series handlable by this converter.
Selector() prom.Selector
// FilterSeries checks to see which of the given series match any additional
// constrains beyond the series query. It's assumed that the series given
// already matche the series query.
// FilterSeries(series []prom.Series) []prom.Series
SeriesFilterer() SeriesFilterer
ResourceConverter() ResourceConverter
// MetricNameForSeries returns the name (as presented in the API) for a given series.
// MetricNameForSeries(series prom.Series) (string, error)
// QueryForSeries returns the query for a given series (not API metric name), with
// the given namespace name (if relevant), resource, and resource names.
QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error)
QueryForExternalSeries(namespace string, series string, metricSelector labels.Selector) (prom.Selector, error)
IdentifySeries(series prom.Series) (seriesIdentity, error)
MetricType() config.MetricType
ExternalMetricNamespaceLabelName() string
}
type seriesIdentity struct {
resources []schema.GroupResource
namespaced bool
name string
}
func (c *seriesConverter) Selector() prom.Selector {
return c.seriesQuery
}
type seriesConverter struct {
seriesQuery prom.Selector
resourceConverter ResourceConverter
queryBuilder QueryBuilder
seriesFilterer SeriesFilterer
metricNamer MetricNamer
mapper apimeta.RESTMapper
metricType config.MetricType
externalMetricNamespaceLabel string
}
// queryTemplateArgs are the arguments for the metrics query template.
type queryTemplateArgs struct {
Series string
LabelMatchers string
LabelValuesByName map[string][]string
GroupBy string
GroupBySlice []string
}
func (c *seriesConverter) MetricType() config.MetricType {
return c.metricType
}
func (c *seriesConverter) ExternalMetricNamespaceLabelName() string {
return c.externalMetricNamespaceLabel
}
func (c *seriesConverter) IdentifySeries(series prom.Series) (seriesIdentity, error) {
// TODO: warn if it doesn't match any resources
resources, namespaced := c.resourceConverter.ResourcesForSeries(series)
name, err := c.metricNamer.GetMetricNameForSeries(series)
result := seriesIdentity{
resources: resources,
namespaced: namespaced,
name: name,
}
return result, err
}
func (c *seriesConverter) SeriesFilterer() SeriesFilterer {
return c.seriesFilterer
}
func (c *seriesConverter) ResourceConverter() ResourceConverter {
return c.resourceConverter
}
func (c *seriesConverter) createQueryPartsFromSelector(metricSelector labels.Selector) []queryPart {
requirements, _ := metricSelector.Requirements()
selectors := []queryPart{}
for i := 0; i < len(requirements); i++ {
selector := c.convertRequirement(requirements[i])
selectors = append(selectors, selector)
}
return selectors
}
func (c *seriesConverter) convertRequirement(requirement labels.Requirement) queryPart {
labelName := requirement.Key()
values := requirement.Values().List()
return queryPart{
labelName: labelName,
values: values,
operator: requirement.Operator(),
}
}
type queryPart struct {
labelName string
values []string
operator selection.Operator
}
func (c *seriesConverter) buildNamespaceQueryPartForSeries(namespace string) (queryPart, error) {
result := queryPart{}
// If we've been given a namespace, then we need to set up
// the label requirements to target that namespace.
if namespace != "" {
namespaceLbl, err := c.resourceConverter.LabelForResource(nsGroupResource)
if err != nil {
return result, err
}
values := []string{namespace}
result = queryPart{
values: values,
labelName: string(namespaceLbl),
operator: selection.Equals,
}
}
return result, nil
}
func (c *seriesConverter) buildResourceQueryPartForSeries(resource schema.GroupResource, names ...string) (queryPart, error) {
result := queryPart{}
// If we've been given a resource, then we need to set up
// the label requirements to target that resource.
resourceLbl, err := c.resourceConverter.LabelForResource(resource)
if err != nil {
return result, err
}
result = queryPart{
labelName: string(resourceLbl),
values: names,
operator: selection.Equals,
}
return result, nil
}
func (c *seriesConverter) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) {
queryParts := []queryPart{}
// Build up the namespace part of the query.
namespaceQueryPart, err := c.buildNamespaceQueryPartForSeries(namespace)
if err != nil {
return "", err
}
if namespaceQueryPart.labelName != "" {
queryParts = append(queryParts, namespaceQueryPart)
}
// Build up the resource part of the query.
resourceQueryPart, err := c.buildResourceQueryPartForSeries(resource, names...)
if err != nil {
return "", err
}
if resourceQueryPart.labelName != "" {
queryParts = append(queryParts, resourceQueryPart)
}
return c.queryBuilder.BuildSelector(series, resourceQueryPart.labelName, []string{resourceQueryPart.labelName}, queryParts)
}
// ConvertersFromConfig produces a MetricNamer for each rule in the given config.
func ConvertersFromConfig(cfg *config.MetricsDiscoveryConfig, mapper apimeta.RESTMapper) ([]SeriesConverter, error) {
converters := make([]SeriesConverter, len(cfg.Rules))
for i, rule := range cfg.Rules {
var err error
resourceConverter, err := NewResourceConverter(rule.Resources.Template, rule.Resources.Overrides, mapper)
if err != nil {
return nil, fmt.Errorf("unable to create ResourceConverter associated with series query %q: %v", rule.SeriesQuery, err)
}
queryBuilder, err := NewQueryBuilder(rule.MetricsQuery)
if err != nil {
return nil, fmt.Errorf("unable to create a QueryBuilder associated with series query %q: %v", rule.SeriesQuery, err)
}
seriesFilterer, err := NewSeriesFilterer(rule.SeriesFilters)
if err != nil {
return nil, fmt.Errorf("unable to create a SeriesFilter associated with series query %q: %v", rule.SeriesQuery, err)
}
if rule.Name.Matches != "" {
err := seriesFilterer.AddRequirement(config.RegexFilter{Is: rule.Name.Matches})
if err != nil {
return nil, fmt.Errorf("unable to apply the series name filter from name rules associated with series query %q: %v", rule.SeriesQuery, err)
}
}
metricNamer, err := NewMetricNamer(rule.Name)
if err != nil {
return nil, fmt.Errorf("unable to create a MetricNamer associated with series query %q: %v", rule.SeriesQuery, err)
}
namespaceLabel := ""
if rule.MetricType == config.External {
namespaceLabel = rule.ExternalMetricNamespaceLabelName
}
metricType := rule.MetricType
if metricType == config.MetricType("") {
metricType = config.Custom
}
converter := &seriesConverter{
seriesQuery: prom.Selector(rule.SeriesQuery),
mapper: mapper,
resourceConverter: resourceConverter,
queryBuilder: queryBuilder,
seriesFilterer: seriesFilterer,
metricNamer: metricNamer,
metricType: metricType,
externalMetricNamespaceLabel: namespaceLabel,
}
converters[i] = converter
}
return converters, nil
}
func (c *seriesConverter) buildNamespaceQueryPartForExternalSeries(namespace string) (queryPart, error) {
return queryPart{
labelName: c.externalMetricNamespaceLabel,
values: []string{namespace},
operator: selection.Equals,
}, nil
}
func (c *seriesConverter) QueryForExternalSeries(namespace string, series string, metricSelector labels.Selector) (prom.Selector, error) {
queryParts := []queryPart{}
if namespace != "" {
// Build up the namespace part of the query.
namespaceQueryPart, err := c.buildNamespaceQueryPartForExternalSeries(namespace)
if err != nil {
return "", err
}
queryParts = append(queryParts, namespaceQueryPart)
}
// Build up the query parts from the selector.
queryParts = append(queryParts, c.createQueryPartsFromSelector(metricSelector)...)
selector, err := c.queryBuilder.BuildSelector(series, "", []string{}, queryParts)
if err != nil {
return "", err
}
return selector, nil
}

View file

@ -7,8 +7,8 @@ import (
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
)
//SeriesFilterer provides functions for filtering collections of Prometheus series
//to only those that meet certain requirements.
// SeriesFilterer provides functions for filtering collections of Prometheus series
// to only those that meet certain requirements.
type SeriesFilterer interface {
FilterSeries(series []prom.Series) []prom.Series
AddRequirement(filter config.RegexFilter) error
@ -18,8 +18,8 @@ type seriesFilterer struct {
seriesMatchers []*reMatcher
}
//NewSeriesFilterer creates a SeriesFilterer that will remove any series that do not
//meet the requirements of the provided RegexFilter(s).
// NewSeriesFilterer creates a SeriesFilterer that will remove any series that do not
// meet the requirements of the provided RegexFilter(s).
func NewSeriesFilterer(filters []config.RegexFilter) (SeriesFilterer, error) {
seriesMatchers := make([]*reMatcher, len(filters))
for i, filterRaw := range filters {

View file

@ -60,12 +60,12 @@ func TestAddRequirementAppliesFilter(t *testing.T) {
filterer, err := NewSeriesFilterer(filters)
require.NoError(t, err)
//Test it once with the default filters.
// Test it once with the default filters.
result := filterer.FilterSeries(series)
expectedSeries := []string{"series_1", "series_2", "series_3"}
VerifyMatches(t, result, expectedSeries)
//Add a new filter and test again.
// Add a new filter and test again.
filterer.AddRequirement(config.RegexFilter{
Is: "series_[2-3]",
})

View file

@ -56,8 +56,8 @@ type seriesInfo struct {
// seriesName is the name of the corresponding Prometheus series
seriesName string
// namer is the MetricNamer used to name this series
namer MetricNamer
// converter is the SeriesConverter used to name this series
converter SeriesConverter
}
// overridableSeriesRegistry is a basic SeriesRegistry
@ -75,52 +75,52 @@ type basicSeriesRegistry struct {
metricLister MetricListerWithNotification
}
//NewBasicSeriesRegistry creates a SeriesRegistry driven by the data from the provided MetricLister.
// NewBasicSeriesRegistry creates a SeriesRegistry driven by the data from the provided MetricLister.
func NewBasicSeriesRegistry(lister MetricListerWithNotification, mapper apimeta.RESTMapper) SeriesRegistry {
var registry = basicSeriesRegistry{
mapper: mapper,
metricLister: lister,
}
lister.AddNotificationReceiver(registry.onNewDataAvailable)
lister.AddNotificationReceiver(registry.filterAndStoreMetrics)
return &registry
}
func (r *basicSeriesRegistry) filterMetrics(result metricUpdateResult) metricUpdateResult {
namers := make([]MetricNamer, 0)
func (r *basicSeriesRegistry) filterMetrics(result MetricUpdateResult) MetricUpdateResult {
converters := make([]SeriesConverter, 0)
series := make([][]prom.Series, 0)
targetType := config.Custom
for i, namer := range result.namers {
if namer.MetricType() == targetType {
namers = append(namers, namer)
for i, converter := range result.converters {
if converter.MetricType() == targetType {
converters = append(converters, converter)
series = append(series, result.series[i])
}
}
return metricUpdateResult{
namers: namers,
series: series,
return MetricUpdateResult{
converters: converters,
series: series,
}
}
func (r *basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) {
func (r *basicSeriesRegistry) filterAndStoreMetrics(result MetricUpdateResult) {
result = r.filterMetrics(result)
newSeriesSlices := result.series
namers := result.namers
converters := result.converters
// if len(newSeriesSlices) != len(namers) {
// return fmt.Errorf("need one set of series per namer")
// if len(newSeriesSlices) != len(converters) {
// return fmt.Errorf("need one set of series per converter")
// }
newInfo := make(map[provider.CustomMetricInfo]seriesInfo)
for i, newSeries := range newSeriesSlices {
namer := namers[i]
converter := converters[i]
for _, series := range newSeries {
identity, err := namer.IdentifySeries(series)
identity, err := converter.IdentifySeries(series)
if err != nil {
glog.Errorf("unable to name series %q, skipping: %v", series.String(), err)
@ -147,7 +147,7 @@ func (r *basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) {
// we don't need to re-normalize, because the metric namer should have already normalized for us
newInfo[info] = seriesInfo{
seriesName: series.Name,
namer: namer,
converter: converter,
}
}
}
@ -194,7 +194,7 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.CustomMetricInf
return "", false
}
query, err := info.namer.QueryForSeries(info.seriesName, metricInfo.GroupResource, namespace, resourceNames...)
query, err := info.converter.QueryForSeries(info.seriesName, metricInfo.GroupResource, namespace, resourceNames...)
if err != nil {
glog.Errorf("unable to construct query for metric %s: %v", metricInfo.String(), err)
return "", false
@ -218,7 +218,7 @@ func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.CustomMetri
return nil, false
}
resourceLbl, err := info.namer.ResourceConverter().LabelForResource(metricInfo.GroupResource)
resourceLbl, err := info.converter.ResourceConverter().LabelForResource(metricInfo.GroupResource)
if err != nil {
glog.Errorf("unable to construct resource label for metric %s: %v", metricInfo.String(), err)
return nil, false

View file

@ -51,11 +51,11 @@ func restMapper() apimeta.RESTMapper {
return mapper
}
func setupMetricNamer(t testing.TB) []MetricNamer {
func setupMetricNamer(t testing.TB) []SeriesConverter {
cfg := config.DefaultConfig(1*time.Minute, "kube_")
namers, err := NamersFromConfig(cfg, restMapper())
converters, err := ConvertersFromConfig(cfg, restMapper())
require.NoError(t, err)
return namers
return converters
}
var seriesRegistryTestSeries = [][]prom.Series{
@ -117,60 +117,21 @@ var seriesRegistryTestSeries = [][]prom.Series{
},
}
type myType struct {
a int
b string
m map[string]int
}
type mapWrapper struct {
item map[string]int
}
func (o *myType) Mutate(newMap mapWrapper) {
o.a = 2
o.b = "two"
o.m = newMap.item
}
func TestWeirdStuff(t *testing.T) {
o := myType{
a: 1,
b: "one",
m: map[string]int{
"one": 1,
},
}
oldMap := o.m
newMap := map[string]int{
"two": 2,
}
newWrapper := mapWrapper{
item: newMap,
}
oldWrapper := mapWrapper{
item: oldMap,
}
o.Mutate(newWrapper)
o.Mutate(oldWrapper)
}
func TestSeriesRegistry(t *testing.T) {
assert := assert.New(t)
namers := setupMetricNamer(t)
converters := setupMetricNamer(t)
registry := &basicSeriesRegistry{
mapper: restMapper(),
}
updateResult := metricUpdateResult{
series: seriesRegistryTestSeries,
namers: namers,
updateResult := MetricUpdateResult{
series: seriesRegistryTestSeries,
converters: converters,
}
// set up the registry
registry.onNewDataAvailable(updateResult)
registry.filterAndStoreMetrics(updateResult)
// make sure each metric got registered and can form queries
testCases := []struct {
@ -309,7 +270,7 @@ func TestSeriesRegistry(t *testing.T) {
}
func BenchmarkSetSeries(b *testing.B) {
namers := setupMetricNamer(b)
converters := setupMetricNamer(b)
registry := &basicSeriesRegistry{
mapper: restMapper(),
}
@ -328,11 +289,11 @@ func BenchmarkSetSeries(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
updateResult := metricUpdateResult{
series: newSeriesSlices,
namers: namers,
updateResult := MetricUpdateResult{
series: newSeriesSlices,
converters: converters,
}
registry.onNewDataAvailable(updateResult)
registry.filterAndStoreMetrics(updateResult)
}
}