Fixing some refactoring bugs, first half-decent external metrics attempt.

Fixed:
* `basicMetricLister` wasn't applying the appropriate start time because I had forgotten to set the `lookback`.

There are still a number of issues:
* The `externalPrometheusProvider` is not hooked up to the web application yet, so it doesn't serve requests.
* The namespace and label approach used in `external_info_map.go` is horrifically incorrect. It doesn't appropriately store multiple series with the same name but different labels.
* The configuration is still not updated to appropriately handle external metrics, it's sort of half-piggy-backing on the pre-existing work.
This commit is contained in:
Tony Compton 2018-07-20 12:35:49 -04:00
parent 056cb7f7f2
commit 9641e70005
14 changed files with 637 additions and 77 deletions

View file

@ -54,7 +54,8 @@ type MetricListerWithNotification interface {
//Because it periodically pulls metrics, it needs to be Runnable. //Because it periodically pulls metrics, it needs to be Runnable.
Runnable Runnable
//It provides notifications when it has new data to supply. //It provides notifications when it has new data to supply.
SetNotificationReceiver(func(metricUpdateResult)) AddNotificationReceiver(func(metricUpdateResult))
UpdateNow()
} }
type basicMetricLister struct { type basicMetricLister struct {
@ -67,6 +68,7 @@ func NewBasicMetricLister(promClient prom.Client, namers []MetricNamer, lookback
lister := basicMetricLister{ lister := basicMetricLister{
promClient: promClient, promClient: promClient,
namers: namers, namers: namers,
lookback: lookback,
} }
return &lister return &lister

View file

@ -0,0 +1,108 @@
package provider
import (
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"k8s.io/apimachinery/pkg/labels"
)
type ExportedMetric struct {
MetricName string
Labels labels.Set
Namespace string
}
type ExternalInfoMap interface {
TrackMetric(metricName string, generatedBy MetricNamer) ExternalMetricData
ExportMetrics() []ExportedMetric
FindMetric(metricName string) (data ExternalMetricData, found bool)
}
type ExternalMetricData interface {
MetricName() string
WithSeries(labels labels.Set)
WithNamespacedSeries(namespace string, labels labels.Set)
ExportMetrics() []ExportedMetric
GenerateQuery(selector labels.Selector) (prom.Selector, error)
}
type externalInfoMap struct {
metrics map[string]ExternalMetricData
}
type externalMetricData struct {
metricName string
namespacedData map[string]labels.Set
generatedBy MetricNamer
}
func NewExternalMetricData(metricName string, generatedBy MetricNamer) ExternalMetricData {
return &externalMetricData{
metricName: metricName,
generatedBy: generatedBy,
namespacedData: map[string]labels.Set{},
}
}
func NewExternalInfoMap() ExternalInfoMap {
return &externalInfoMap{
metrics: map[string]ExternalMetricData{},
}
}
func (i *externalInfoMap) ExportMetrics() []ExportedMetric {
results := make([]ExportedMetric, 0)
for _, info := range i.metrics {
exported := info.ExportMetrics()
results = append(results, exported...)
}
return results
}
func (i *externalInfoMap) FindMetric(metricName string) (data ExternalMetricData, found bool) {
data, found = i.metrics[metricName]
return data, found
}
func (i *externalInfoMap) TrackMetric(metricName string, generatedBy MetricNamer) ExternalMetricData {
data, found := i.metrics[metricName]
if !found {
data = NewExternalMetricData(metricName, generatedBy)
i.metrics[metricName] = data
}
return data
}
func (d *externalMetricData) MetricName() string {
return d.metricName
}
func (d *externalMetricData) GenerateQuery(selector labels.Selector) (prom.Selector, error) {
return d.generatedBy.QueryForExternalSeries(d.metricName, selector)
}
func (d *externalMetricData) ExportMetrics() []ExportedMetric {
results := make([]ExportedMetric, 0)
for namespace, labels := range d.namespacedData {
results = append(results, ExportedMetric{
Labels: labels,
MetricName: d.metricName,
Namespace: namespace,
})
}
return results
}
func (d *externalMetricData) WithSeries(labels labels.Set) {
d.WithNamespacedSeries("", labels)
}
func (d *externalMetricData) WithNamespacedSeries(namespace string, labels labels.Set) {
data, found := d.namespacedData[namespace]
if !found {
data = labels
d.namespacedData[namespace] = data
}
}

View file

@ -1,81 +1,74 @@
package provider package provider
// import ( import (
// "context" "context"
// "time"
// pmodel "github.com/prometheus/common/model" pmodel "github.com/prometheus/common/model"
// "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
// apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/labels"
// "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics"
// "k8s.io/client-go/dynamic"
// "k8s.io/metrics/pkg/apis/external_metrics"
// prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
// )
// //TODO: Make sure everything has the proper licensing disclosure at the top. conv "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider/metric-converter"
// //TODO: I'd like to move these files into another directory, but the compiler was giving me )
// //some static around unexported types. I'm going to leave things as-is for now, but it
// //might be worthwhile to, once the shared components are discovered, move some things around.
// //TODO: Some of these members may not be necessary. //TODO: Make sure everything has the proper licensing disclosure at the top.
// //Some of them are definitely duplicated between the //TODO: I'd like to move these files into another directory, but the compiler was giving me
// //external and custom providers. They should probably share //some static around unexported types. I'm going to leave things as-is for now, but it
// //the same instances of these objects (especially the SeriesRegistry) //might be worthwhile to, once the shared components are discovered, move some things around.
// //to cut down on unnecessary chatter/bookkeeping.
// type externalPrometheusProvider struct {
// mapper apimeta.RESTMapper
// kubeClient dynamic.Interface
// promClient prom.Client
// queryBuilder ExternalMetricQueryBuilder
// metricConverter conv.MetricConverter
// seriesRegistry SeriesRegistry //TODO: Some of these members may not be necessary.
// } //Some of them are definitely duplicated between the
//external and custom providers. They should probably share
//the same instances of these objects (especially the SeriesRegistry)
//to cut down on unnecessary chatter/bookkeeping.
type externalPrometheusProvider struct {
promClient prom.Client
metricConverter conv.MetricConverter
// //TODO: It probably makes more sense to, once this is functional and complete, roll the seriesRegistry ExternalSeriesRegistry
// //prometheusProvider and externalPrometheusProvider up into a single type }
// //that implements both interfaces or provide a thin wrapper that composes them.
// //Just glancing at start.go looks like it would be much more straightforward
// //to do one of those two things instead of trying to run the two providers
// //independently.
// func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration, metricConverter conv.MetricConverter, seriesRegistry SeriesRegistry) (provider.ExternalMetricsProvider, Runnable) { //TODO: It probably makes more sense to, once this is functional and complete, roll the
//prometheusProvider and externalPrometheusProvider up into a single type
//that implements both interfaces or provide a thin wrapper that composes them.
//Just glancing at start.go looks like it would be much more straightforward
//to do one of those two things instead of trying to run the two providers
//independently.
// return &externalPrometheusProvider{ func NewExternalPrometheusProvider(seriesRegistry ExternalSeriesRegistry, promClient prom.Client, converter conv.MetricConverter) provider.ExternalMetricsProvider {
// mapper: mapper, return &externalPrometheusProvider{
// kubeClient: kubeClient, promClient: promClient,
// promClient: promClient, seriesRegistry: seriesRegistry,
// metricConverter: metricConverter, metricConverter: converter,
// seriesRegistry: seriesRegistry, }
// }, lister }
// }
// func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) { func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) {
// selector, found := p.seriesRegistry.QueryForExternalMetric(metricInfo, metricSelector) selector, found := p.seriesRegistry.QueryForMetric(metricName, metricSelector)
// if !found { if !found {
// return &external_metrics.ExternalMetricValueList{ return &external_metrics.ExternalMetricValueList{
// Items: []external_metrics.ExternalMetricValue{}, Items: []external_metrics.ExternalMetricValue{},
// }, nil }, nil
// } }
// // query := p.queryBuilder.BuildPrometheusQuery(namespace, metricName, metricSelector, queryMetadata) // query := p.queryBuilder.BuildPrometheusQuery(namespace, metricName, metricSelector, queryMetadata)
// //TODO: I don't yet know what a context is, but apparently I should use a real one. //TODO: I don't yet know what a context is, but apparently I should use a real one.
// queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector) queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), selector)
// if err != nil { if err != nil {
// //TODO: Is this how folks normally deal w/ errors? Just propagate them upwards? //TODO: Is this how folks normally deal w/ errors? Just propagate them upwards?
// //I should go look at what the customProvider does. //I should go look at what the customProvider does.
// return nil, err return nil, err
// } }
// return p.metricConverter.Convert(queryMetadata, queryResults) return p.metricConverter.Convert(queryResults)
// } }
// func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo { func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo {
// return p.seriesRegistry.ListAllExternalMetrics() return p.seriesRegistry.ListAllMetrics()
// } }

View file

@ -0,0 +1,165 @@
package provider
import (
"sync"
"github.com/prometheus/common/model"
"github.com/golang/glog"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
)
type ExternalSeriesRegistry interface {
// ListAllMetrics lists all metrics known to this registry
ListAllMetrics() []provider.ExternalMetricInfo
QueryForMetric(metricName string, metricSelector labels.Selector) (query prom.Selector, found bool)
}
// overridableSeriesRegistry is a basic SeriesRegistry
type externalSeriesRegistry struct {
mu sync.RWMutex
externalInfo map[string]seriesInfo
// metrics is the list of all known metrics
metrics []provider.ExternalMetricInfo
mapper apimeta.RESTMapper
metricLister MetricListerWithNotification
tonyExternalInfo ExternalInfoMap
}
func NewExternalSeriesRegistry(lister MetricListerWithNotification, mapper apimeta.RESTMapper) ExternalSeriesRegistry {
var registry = externalSeriesRegistry{
mapper: mapper,
metricLister: lister,
tonyExternalInfo: NewExternalInfoMap(),
}
lister.AddNotificationReceiver(registry.onNewDataAvailable)
return &registry
}
func (r *externalSeriesRegistry) filterMetrics(result metricUpdateResult) metricUpdateResult {
namers := make([]MetricNamer, 0)
series := make([][]prom.Series, 0)
targetType := config.MetricType("External")
for i, namer := range result.namers {
if namer.MetricType() == targetType {
namers = append(namers, namer)
series = append(series, result.series[i])
}
}
return metricUpdateResult{
namers: namers,
series: series,
}
}
func (r *externalSeriesRegistry) convertLabels(labels model.LabelSet) labels.Set {
set := map[string]string{}
for key, value := range labels {
set[string(key)] = string(value)
}
return set
}
func (r *externalSeriesRegistry) onNewDataAvailable(result metricUpdateResult) {
result = r.filterMetrics(result)
newSeriesSlices := result.series
namers := result.namers
// if len(newSeriesSlices) != len(namers) {
// return fmt.Errorf("need one set of series per namer")
// }
updatedCache := NewExternalInfoMap()
for i, newSeries := range newSeriesSlices {
namer := namers[i]
for _, series := range newSeries {
identity, err := namer.IdentifySeries(series)
if err != nil {
glog.Errorf("unable to name series %q, skipping: %v", series.String(), err)
continue
}
// resources := identity.resources
// namespaced := identity.namespaced
name := identity.name
labels := r.convertLabels(series.Labels)
//TODO: Figure out the namespace, if applicable
metricNs := ""
trackedMetric := updatedCache.TrackMetric(name, namer)
trackedMetric.WithNamespacedSeries(metricNs, labels)
}
}
// regenerate metrics
allMetrics := updatedCache.ExportMetrics()
convertedMetrics := r.convertMetrics(allMetrics)
r.mu.Lock()
defer r.mu.Unlock()
r.tonyExternalInfo = updatedCache
r.metrics = convertedMetrics
}
func (r *externalSeriesRegistry) convertMetrics(metrics []ExportedMetric) []provider.ExternalMetricInfo {
results := make([]provider.ExternalMetricInfo, len(metrics))
for i, info := range metrics {
results[i] = provider.ExternalMetricInfo{
Labels: info.Labels,
Metric: info.MetricName,
}
}
return results
}
func (r *externalSeriesRegistry) ListAllMetrics() []provider.ExternalMetricInfo {
r.mu.RLock()
defer r.mu.RUnlock()
return r.metrics
}
func (r *externalSeriesRegistry) QueryForMetric(metricName string, metricSelector labels.Selector) (query prom.Selector, found bool) {
r.mu.RLock()
defer r.mu.RUnlock()
metric, found := r.tonyExternalInfo.FindMetric(metricName)
if !found {
return "", false
}
query, err := metric.GenerateQuery(metricSelector)
// info, infoFound := r.info[metricInfo]
// if !infoFound {
// //TODO: Weird that it switches between types here.
// glog.V(10).Infof("metric %v not registered", metricInfo)
// return "", false
// }
// query, err := info.namer.QueryForExternalSeries(info.seriesName, metricSelector)
if err != nil {
//TODO: See what was being .String() and implement that for ExternalMetricInfo.
// errorVal := metricInfo.String()
errorVal := "something"
glog.Errorf("unable to construct query for metric %s: %v", errorVal, err)
return "", false
}
return query, true
}

View file

@ -0,0 +1,37 @@
package provider
import (
"errors"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/metrics/pkg/apis/external_metrics"
)
type matrixConverter struct {
}
//NewMatrixConverter creates a MatrixConverter capable of converting
//matrix Prometheus query results into external metric types.
func NewMatrixConverter() MetricConverter {
return &matrixConverter{}
}
func (c *matrixConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type != model.ValMatrix {
return nil, errors.New("matrixConverter can only convert scalar query results")
}
toConvert := queryResult.Matrix
if toConvert == nil {
return nil, errors.New("the provided input did not contain matrix query results")
}
return c.convert(toConvert)
}
func (c *matrixConverter) convert(result *model.Matrix) (*external_metrics.ExternalMetricValueList, error) {
//TODO: Implementation.
return nil, errors.New("converting Matrix results is not yet supported")
}

View file

@ -0,0 +1,52 @@
package provider
import (
"errors"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/metrics/pkg/apis/external_metrics"
)
//MetricConverter provides a unified interface for converting the results of
//Prometheus queries into external metric types.
type MetricConverter interface {
Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error)
}
type metricConverter struct {
scalarConverter MetricConverter
vectorConverter MetricConverter
matrixConverter MetricConverter
}
func DefaultMetricConverter() MetricConverter {
sampleConverter := NewSampleConverter()
return NewMetricConverter(NewScalarConverter(), NewVectorConverter(&sampleConverter), NewMatrixConverter())
}
//NewMetricConverter creates a MetricCoverter, capable of converting any of the three metric types
//returned by the Prometheus client into external metrics types.
func NewMetricConverter(scalar MetricConverter, vector MetricConverter, matrix MetricConverter) MetricConverter {
return &metricConverter{
scalarConverter: scalar,
vectorConverter: vector,
matrixConverter: matrix,
}
}
func (c *metricConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type == model.ValScalar {
return c.scalarConverter.Convert(queryResult)
}
if queryResult.Type == model.ValVector {
return c.vectorConverter.Convert(queryResult)
}
if queryResult.Type == model.ValMatrix {
return c.matrixConverter.Convert(queryResult)
}
return nil, errors.New("encountered an unexpected query result type")
}

View file

@ -0,0 +1,52 @@
package provider
import (
"github.com/prometheus/common/model"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)
type sampleConverter struct {
}
//SampleConverter is capable of translating Prometheus Sample objects
//into ExternamMetricValue objects.
type SampleConverter interface {
Convert(sample *model.Sample) (*external_metrics.ExternalMetricValue, error)
}
//NewSampleConverter creates a SampleConverter capable of translating Prometheus Sample objects
//into ExternamMetricValue objects.
func NewSampleConverter() SampleConverter {
return &sampleConverter{}
}
func (c *sampleConverter) Convert(sample *model.Sample) (*external_metrics.ExternalMetricValue, error) {
labels := c.convertLabels(sample.Metric)
singleMetric := external_metrics.ExternalMetricValue{
MetricName: string(sample.Metric[model.LabelName("__name__")]),
Timestamp: metav1.Time{
sample.Timestamp.Time(),
},
//TODO: I'm not so sure about this type/conversions.
//This can't possibly be the right way to convert this.
//Also, does K8S only deal win integer metrics?
Value: *resource.NewQuantity(int64(float64(sample.Value)), resource.DecimalSI),
MetricLabels: labels,
}
//TODO: Actual errors?
return &singleMetric, nil
}
func (c *sampleConverter) convertLabels(inLabels model.Metric) map[string]string {
numLabels := len(inLabels)
outLabels := make(map[string]string, numLabels)
for labelName, labelVal := range inLabels {
outLabels[string(labelName)] = string(labelVal)
}
return outLabels
}

View file

@ -0,0 +1,55 @@
package provider
import (
"errors"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)
type scalarConverter struct {
}
//NewScalarConverter creates a ScalarConverter capable of converting
//scalar Prometheus query results into external metric types.
func NewScalarConverter() MetricConverter {
return &scalarConverter{}
}
func (c *scalarConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type != model.ValScalar {
return nil, errors.New("scalarConverter can only convert scalar query results")
}
toConvert := queryResult.Scalar
if toConvert == nil {
return nil, errors.New("the provided input did not contain scalar query results")
}
return c.convert(toConvert)
}
func (c *scalarConverter) convert(input *model.Scalar) (*external_metrics.ExternalMetricValueList, error) {
result := external_metrics.ExternalMetricValueList{
//Using prometheusProvider.metricsFor(...) as an example,
//it seems that I don't need to provide values for
//TypeMeta and ListMeta.
//TODO: Get some confirmation on this.
Items: []external_metrics.ExternalMetricValue{
{
Timestamp: metav1.Time{
input.Timestamp.Time(),
},
//TODO: I'm not so sure about this type/conversions.
//Is there a meaningful loss of precision here?
//Does K8S only deal win integer metrics?
Value: *resource.NewMilliQuantity(int64(input.Value*1000.0), resource.DecimalSI),
},
},
}
return &result, nil
}

View file

@ -0,0 +1,58 @@
package provider
import (
"errors"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/prometheus/common/model"
"k8s.io/metrics/pkg/apis/external_metrics"
)
type vectorConverter struct {
SampleConverter SampleConverter
}
//NewVectorConverter creates a VectorConverter capable of converting
//vector Prometheus query results into external metric types.
func NewVectorConverter(sampleConverter *SampleConverter) MetricConverter {
return &vectorConverter{
SampleConverter: *sampleConverter,
}
}
func (c *vectorConverter) Convert(queryResult prom.QueryResult) (*external_metrics.ExternalMetricValueList, error) {
if queryResult.Type != model.ValVector {
return nil, errors.New("vectorConverter can only convert scalar query results")
}
toConvert := *queryResult.Vector
if toConvert == nil {
return nil, errors.New("the provided input did not contain vector query results")
}
return c.convert(toConvert)
}
func (c *vectorConverter) convert(result model.Vector) (*external_metrics.ExternalMetricValueList, error) {
items := []external_metrics.ExternalMetricValue{}
metricValueList := external_metrics.ExternalMetricValueList{
Items: items,
}
numSamples := result.Len()
if numSamples == 0 {
return &metricValueList, nil
}
for _, val := range result {
//TODO: Care about potential errors here.
singleMetric, _ := c.SampleConverter.Convert(val)
items = append(items, *singleMetric)
}
metricValueList = external_metrics.ExternalMetricValueList{
Items: items,
}
return &metricValueList, nil
}

View file

@ -36,6 +36,7 @@ type MetricNamer interface {
QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error) QueryForSeries(series string, resource schema.GroupResource, namespace string, names ...string) (prom.Selector, error)
QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error) QueryForExternalSeries(series string, metricSelector labels.Selector) (prom.Selector, error)
IdentifySeries(series prom.Series) (seriesIdentity, error) IdentifySeries(series prom.Series) (seriesIdentity, error)
MetricType() config.MetricType
} }
type seriesIdentity struct { type seriesIdentity struct {
@ -69,6 +70,10 @@ type queryTemplateArgs struct {
GroupBySlice []string GroupBySlice []string
} }
func (n *metricNamer) MetricType() config.MetricType {
return n.metricType
}
func (n *metricNamer) IdentifySeries(series prom.Series) (seriesIdentity, error) { func (n *metricNamer) IdentifySeries(series prom.Series) (seriesIdentity, error) {
// TODO: warn if it doesn't match any resources // TODO: warn if it doesn't match any resources
resources, namespaced := n.resourceConverter.ResourcesForSeries(series) resources, namespaced := n.resourceConverter.ResourcesForSeries(series)

View file

@ -27,7 +27,7 @@ type periodicMetricLister struct {
realLister MetricLister realLister MetricLister
updateInterval time.Duration updateInterval time.Duration
mostRecentResult metricUpdateResult mostRecentResult metricUpdateResult
callback func(metricUpdateResult) callbacks []func(metricUpdateResult)
} }
//NewPeriodicMetricLister creates a MetricLister that periodically pulls the list of available metrics //NewPeriodicMetricLister creates a MetricLister that periodically pulls the list of available metrics
@ -36,13 +36,14 @@ func NewPeriodicMetricLister(realLister MetricLister, updateInterval time.Durati
lister := periodicMetricLister{ lister := periodicMetricLister{
updateInterval: updateInterval, updateInterval: updateInterval,
realLister: realLister, realLister: realLister,
callbacks: make([]func(metricUpdateResult), 0),
} }
return &lister, &lister return &lister, &lister
} }
func (l *periodicMetricLister) SetNotificationReceiver(callback func(metricUpdateResult)) { func (l *periodicMetricLister) AddNotificationReceiver(callback func(metricUpdateResult)) {
l.callback = callback l.callbacks = append(l.callbacks, callback)
} }
func (l *periodicMetricLister) ListAllMetrics() (metricUpdateResult, error) { func (l *periodicMetricLister) ListAllMetrics() (metricUpdateResult, error) {
@ -70,13 +71,23 @@ func (l *periodicMetricLister) updateMetrics() error {
//Cache the result. //Cache the result.
l.mostRecentResult = result l.mostRecentResult = result
//Let our listener know we've got new data ready for them. //Let our listeners know we've got new data ready for them.
if l.callback != nil { l.notifyListeners()
l.callback(result)
}
return nil return nil
} }
func (l *periodicMetricLister) notifyListeners() {
for _, listener := range l.callbacks {
if listener != nil {
listener(l.mostRecentResult)
}
}
}
func (l *periodicMetricLister) UpdateNow() {
l.updateMetrics()
}
// func (l *periodicMetricLister) updateMetrics() (metricUpdateResult, error) { // func (l *periodicMetricLister) updateMetrics() (metricUpdateResult, error) {
// result := metricUpdateResult{ // result := metricUpdateResult{

View file

@ -36,7 +36,7 @@ func TestWhenNewMetricsAvailableCallbackIsInvoked(t *testing.T) {
callbackInvoked = true callbackInvoked = true
} }
periodicLister.SetNotificationReceiver(callback) periodicLister.AddNotificationReceiver(callback)
err := periodicLister.updateMetrics() err := periodicLister.updateMetrics()
require.NoError(t, err) require.NoError(t, err)
require.True(t, callbackInvoked) require.True(t, callbackInvoked)

View file

@ -95,14 +95,14 @@ func TestDetectsNonNamespaceResourcesFromOverrides(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []schema.GroupResource{ require.Equal(t, []schema.GroupResource{
schema.GroupResource{
Group: "",
Resource: "pods",
},
schema.GroupResource{ schema.GroupResource{
Group: "extensions", Group: "extensions",
Resource: "deployments", Resource: "deployments",
}, },
schema.GroupResource{
Group: "",
Resource: "pods",
},
}, resource) }, resource)
require.Equal(t, false, namespaced) require.Equal(t, false, namespaced)
} }

View file

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
"github.com/golang/glog" "github.com/golang/glog"
pmodel "github.com/prometheus/common/model" pmodel "github.com/prometheus/common/model"
) )
@ -83,12 +84,33 @@ func NewBasicSeriesRegistry(lister MetricListerWithNotification, mapper apimeta.
metricLister: lister, metricLister: lister,
} }
lister.SetNotificationReceiver(registry.onNewDataAvailable) lister.AddNotificationReceiver(registry.onNewDataAvailable)
return &registry return &registry
} }
func (r *basicSeriesRegistry) filterMetrics(result metricUpdateResult) metricUpdateResult {
namers := make([]MetricNamer, 0)
series := make([][]prom.Series, 0)
targetType := config.MetricType("Custom")
for i, namer := range result.namers {
if namer.MetricType() == targetType {
namers = append(namers, namer)
series = append(series, result.series[i])
}
}
return metricUpdateResult{
namers: namers,
series: series,
}
}
func (r *basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) { func (r *basicSeriesRegistry) onNewDataAvailable(result metricUpdateResult) {
result = r.filterMetrics(result)
newSeriesSlices := result.series newSeriesSlices := result.series
namers := result.namers namers := result.namers