Initial Functionality

The initial functionality works.  There's still a number of TODOs to
clean up, and some edge cases to work around, and some errors that could
be handled better.
This commit is contained in:
Solly Ross 2017-05-09 21:34:24 -04:00
commit 5bff503339
13 changed files with 2364 additions and 0 deletions

View file

@ -0,0 +1,360 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"fmt"
"strings"
"sync"
"k8s.io/apimachinery/pkg/runtime/schema"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/custom-metrics-boilerplate/pkg/provider"
"github.com/golang/glog"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
pmodel "github.com/prometheus/common/model"
)
// NB: container metrics sourced from cAdvisor don't consistently follow naming conventions,
// so we need to whitelist them and handle them on a case-by-case basis. Metrics ending in `_total`
// *should* be counters, but may actually be guages in this case.
// SeriesType represents the kind of series backing a metric.
type SeriesType int
const (
CounterSeries SeriesType = iota
SecondsCounterSeries
GaugeSeries
)
// SeriesRegistry provides conversions between Prometheus series and MetricInfo
type SeriesRegistry interface {
// SetSeries replaces the known series in this registry
SetSeries(series []prom.Series) error
// ListAllMetrics lists all metrics known to this registry
ListAllMetrics() []provider.MetricInfo
// SeriesForMetric looks up the minimum required series information to make a query for the given metric
// against the given resource (namespace may be empty for non-namespaced resources)
QueryForMetric(info provider.MetricInfo, namespace string, resourceNames ...string) (kind SeriesType, query prom.Selector, groupBy string, found bool)
// MatchValuesToNames matches result values to resource names for the given metric and value set
MatchValuesToNames(metricInfo provider.MetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool)
}
type seriesInfo struct {
// baseSeries represents the minimum information to access a particular series
baseSeries prom.Series
// kind is the type of this series
kind SeriesType
// isContainer indicates if the series is a cAdvisor container_ metric, and thus needs special handling
isContainer bool
}
// overridableSeriesRegistry is a basic SeriesRegistry
type basicSeriesRegistry struct {
mu sync.RWMutex
// info maps metric info to information about the corresponding series
info map[provider.MetricInfo]seriesInfo
// metrics is the list of all known metrics
metrics []provider.MetricInfo
// namer is the metricNamer responsible for converting series to metric names and information
namer metricNamer
}
func (r *basicSeriesRegistry) SetSeries(newSeries []prom.Series) error {
newInfo := make(map[provider.MetricInfo]seriesInfo)
for _, series := range newSeries {
if strings.HasPrefix(series.Name, "container_") {
r.namer.processContainerSeries(series, newInfo)
} else if namespaceLabel, hasNamespaceLabel := series.Labels["namespace"]; hasNamespaceLabel && namespaceLabel != "" {
// TODO: handle metrics describing a namespace
if err := r.namer.processNamespacedSeries(series, newInfo); err != nil {
// TODO: do we want to log this and continue, or abort?
return err
}
} else {
if err := r.namer.processRootScopedSeries(series, newInfo); err != nil {
// TODO: do we want to log this and continue, or abort?
return err
}
}
}
newMetrics := make([]provider.MetricInfo, 0, len(newInfo))
for info, _ := range newInfo {
newMetrics = append(newMetrics, info)
}
r.mu.Lock()
defer r.mu.Unlock()
r.info = newInfo
r.metrics = newMetrics
return nil
}
func (r *basicSeriesRegistry) ListAllMetrics() []provider.MetricInfo {
r.mu.RLock()
defer r.mu.RUnlock()
return r.metrics
}
func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.MetricInfo, namespace string, resourceNames ...string) (kind SeriesType, query prom.Selector, groupBy string, found bool) {
r.mu.RLock()
defer r.mu.RUnlock()
if len(resourceNames) == 0 {
// TODO: return error? panic?
}
metricInfo, singularResource, err := r.namer.normalizeInfo(metricInfo)
if err != nil {
glog.Errorf("unable to normalize group resource while producing a query: %v", err)
return 0, "", "", false
}
// TODO: support container metrics
if info, found := r.info[metricInfo]; found {
targetValue := resourceNames[0]
matcher := prom.LabelEq
if len(resourceNames) > 1 {
targetValue = strings.Join(resourceNames, "|")
matcher = prom.LabelMatches
}
var expressions []string
if info.isContainer {
expressions = []string{matcher("pod_name", targetValue), prom.LabelNeq("container_name", "POD")}
groupBy = "pod_name"
} else {
// TODO: copy base series labels?
expressions = []string{matcher(singularResource, targetValue)}
groupBy = singularResource
}
if metricInfo.Namespaced {
expressions = append(expressions, prom.LabelEq("namespace", namespace))
}
return info.kind, prom.MatchSeries(info.baseSeries.Name, expressions...), groupBy, true
}
glog.V(10).Infof("metric %v not registered", metricInfo)
return 0, "", "", false
}
func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.MetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool) {
r.mu.RLock()
defer r.mu.RUnlock()
metricInfo, singularResource, err := r.namer.normalizeInfo(metricInfo)
if err != nil {
glog.Errorf("unable to normalize group resource while matching values to names: %v", err)
return nil, false
}
if info, found := r.info[metricInfo]; found {
res := make(map[string]pmodel.SampleValue, len(values))
for _, val := range values {
if val == nil {
// skip empty values
continue
}
labelName := pmodel.LabelName(singularResource)
if info.isContainer {
labelName = pmodel.LabelName("pod_name")
}
res[string(val.Metric[labelName])] = val.Value
}
return res, true
}
return nil, false
}
// metricNamer knows how to construct MetricInfo out of raw prometheus series descriptions.
type metricNamer struct {
// overrides contains the list of container metrics whose naming we want to override.
// This is used to properly convert certain cAdvisor container metrics.
overrides map[string]seriesSpec
mapper apimeta.RESTMapper
}
// seriesSpec specifies how to produce metric info for a particular prometheus series source
type seriesSpec struct {
// metricName is the desired output API metric name
metricName string
// kind indicates whether or not this metric is cumulative,
// and thus has to be calculated as a rate when returning it
kind SeriesType
}
// normalizeInfo takes in some metricInfo an "normalizes" it to ensure a common GroupResource form.
func (r *metricNamer) normalizeInfo(metricInfo provider.MetricInfo) (provider.MetricInfo, string, error) {
// NB: we need to "normalize" the metricInfo's GroupResource so we have a consistent pluralization, etc
// TODO: move this to the boilerplate?
normalizedGroupRes, err := r.mapper.ResourceFor(metricInfo.GroupResource.WithVersion(""))
if err != nil {
return provider.MetricInfo{}, "", err
}
metricInfo.GroupResource = normalizedGroupRes.GroupResource()
singularResource, err := r.mapper.ResourceSingularizer(metricInfo.GroupResource.Resource)
if err != nil {
return provider.MetricInfo{}, "", err
}
return metricInfo, singularResource, nil
}
// processContainerSeries performs special work to extract metric definitions
// from cAdvisor-sourced container metrics, which don't particularly follow any useful conventions consistently.
func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) {
originalName := series.Name
var name string
metricKind := GaugeSeries
if override, hasOverride := n.overrides[series.Name]; hasOverride {
name = override.metricName
metricKind = override.kind
} else {
// chop of the "container_" prefix
series.Name = series.Name[10:]
name, metricKind = n.metricNameFromSeries(series)
}
info := provider.MetricInfo{
// TODO: is the plural correct?
GroupResource: schema.GroupResource{Resource: "pods"},
Namespaced: true,
Metric: name,
}
infos[info] = seriesInfo{
kind: metricKind,
baseSeries: prom.Series{Name: originalName},
isContainer: true,
}
}
// processNamespacedSeries adds the metric info for the given generic namespaced series to
// the map of metric info.
func (n *metricNamer) processNamespacedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error {
name, metricKind := n.metricNameFromSeries(series)
resources, err := n.groupResourcesFromSeries(series)
if err != nil {
return fmt.Errorf("unable to process prometheus series %s: %v", series.Name, err)
}
// we add one metric for each resource that this could describe
for _, resource := range resources {
info := provider.MetricInfo{
GroupResource: resource,
Namespaced: true,
Metric: name,
}
// metrics describing namespaces aren't considered to be namespaced
if resource == (schema.GroupResource{Resource: "namespaces"}) {
info.Namespaced = false
}
infos[info] = seriesInfo{
kind: metricKind,
baseSeries: prom.Series{Name: series.Name},
}
}
return nil
}
// processesRootScopedSeries adds the metric info for the given generic namespaced series to
// the map of metric info.
func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error {
name, metricKind := n.metricNameFromSeries(series)
resources, err := n.groupResourcesFromSeries(series)
if err != nil {
return fmt.Errorf("unable to process prometheus series %s: %v", series.Name, err)
}
// we add one metric for each resource that this could describe
for _, resource := range resources {
info := provider.MetricInfo{
GroupResource: resource,
Namespaced: false,
Metric: name,
}
infos[info] = seriesInfo{
kind: metricKind,
baseSeries: prom.Series{Name: series.Name},
}
}
return nil
}
// groupResourceFromSeries collects the possible group-resources that this series could describe by
// going through each label, checking to see if it corresponds to a known resource. For instance,
// a series `ingress_http_hits_total{pod="foo",service="bar",ingress="baz",namespace="ns"}`
// would return three GroupResources: "pods", "services", and "ingresses".
// Returned MetricInfo is equilavent to the "normalized" info produced by normalizeInfo.
func (n *metricNamer) groupResourcesFromSeries(series prom.Series) ([]schema.GroupResource, error) {
// TODO: do we need to cache this, or is ResourceFor good enough?
var res []schema.GroupResource
for label, _ := range series.Labels {
// TODO: figure out a way to let people specify a fully-qualified name in label-form
// TODO: will this work when missing a group?
gvr, err := n.mapper.ResourceFor(schema.GroupVersionResource{Resource: string(label)})
if err != nil {
if apimeta.IsNoMatchError(err) {
continue
}
return nil, err
}
res = append(res, gvr.GroupResource())
}
return res, nil
}
// metricNameFromSeries extracts a metric name from a series name, and indicates
// whether or not that series was a counter. It also has special logic to deal with time-based
// counters, which general get converted to milli-unit rate metrics.
func (n *metricNamer) metricNameFromSeries(series prom.Series) (name string, kind SeriesType) {
kind = GaugeSeries
name = series.Name
if strings.HasSuffix(name, "_total") {
kind = CounterSeries
name = name[:len(name)-6]
if strings.HasSuffix(name, "_seconds") {
kind = SecondsCounterSeries
name = name[:len(name)-8]
}
}
return
}

View file

@ -0,0 +1,381 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"sort"
"testing"
"k8s.io/client-go/pkg/api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/custom-metrics-boilerplate/pkg/provider"
// install extensions so that our RESTMapper knows about it
_ "k8s.io/client-go/pkg/apis/extensions/install"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
)
func setupMetricNamer(t *testing.T) *metricNamer {
return &metricNamer{
overrides: map[string]seriesSpec{
"container_actually_gauge_seconds_total": seriesSpec{
metricName: "actually_gauge",
kind: GaugeSeries,
},
},
mapper: api.Registry.RESTMapper(),
}
}
func TestMetricNamerContainerSeries(t *testing.T) {
testCases := []struct{
input prom.Series
outputMetricName string
outputInfo seriesInfo
}{
{
input: prom.Series{
Name: "container_actually_gauge_seconds_total",
Labels: map[string]string{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
},
outputMetricName: "actually_gauge",
outputInfo: seriesInfo{
baseSeries: prom.Series{Name: "container_actually_gauge_seconds_total"},
kind: GaugeSeries,
isContainer: true,
},
},
{
input: prom.Series{
Name: "container_some_usage",
Labels: map[string]string{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
},
outputMetricName: "some_usage",
outputInfo: seriesInfo{
baseSeries: prom.Series{Name: "container_some_usage"},
kind: GaugeSeries,
isContainer: true,
},
},
{
input: prom.Series{
Name: "container_some_count_total",
Labels: map[string]string{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
},
outputMetricName: "some_count",
outputInfo: seriesInfo{
baseSeries: prom.Series{Name: "container_some_count_total"},
kind: CounterSeries,
isContainer: true,
},
},
{
input: prom.Series{
Name: "container_some_time_seconds_total",
Labels: map[string]string{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
},
outputMetricName: "some_time",
outputInfo: seriesInfo{
baseSeries: prom.Series{Name: "container_some_time_seconds_total"},
kind: SecondsCounterSeries,
isContainer: true,
},
},
}
assert := assert.New(t)
namer := setupMetricNamer(t)
resMap := map[provider.MetricInfo]seriesInfo{}
for _, test := range testCases {
namer.processContainerSeries(test.input, resMap)
metric := provider.MetricInfo{
Metric: test.outputMetricName,
GroupResource: schema.GroupResource{Resource: "pods"},
Namespaced: true,
}
if assert.Contains(resMap, metric) {
assert.Equal(test.outputInfo, resMap[metric])
}
}
}
func TestSeriesRegistry(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
namer := setupMetricNamer(t)
registry := &basicSeriesRegistry{
namer: *namer,
}
inputSeries := []prom.Series{
// container series
{
Name: "container_actually_gauge_seconds_total",
Labels: map[string]string{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
},
{
Name: "container_some_usage",
Labels: map[string]string{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
},
{
Name: "container_some_count_total",
Labels: map[string]string{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
},
{
Name: "container_some_time_seconds_total",
Labels: map[string]string{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
},
// namespaced series
// a series that should turn into multiple metrics
{
Name: "ingress_hits_total",
Labels: map[string]string{"ingress": "someingress", "service": "somesvc", "pod": "backend1", "namespace": "somens"},
},
{
Name: "ingress_hits_total",
Labels: map[string]string{"ingress": "someingress", "service": "somesvc", "pod": "backend2", "namespace": "somens"},
},
{
Name: "service_proxy_packets",
Labels: map[string]string{"service": "somesvc", "namespace": "somens"},
},
{
Name: "work_queue_wait_seconds_total",
Labels: map[string]string{"deployment": "somedep", "namespace": "somens"},
},
// non-namespaced series
{
Name: "node_gigawatts",
Labels: map[string]string{"node": "somenode"},
},
{
Name: "volume_claims_total",
Labels: map[string]string{"persistentvolume": "somepv"},
},
{
Name: "node_fan_seconds_total",
Labels: map[string]string{"node": "somenode"},
},
// unrelated series
{
Name: "admin_coffee_liters_total",
Labels: map[string]string{"admin": "some-admin"},
},
{
Name: "admin_unread_emails",
Labels: map[string]string{"admin": "some-admin"},
},
{
Name: "admin_reddit_seconds_total",
Labels: map[string]string{"admin": "some-admin"},
},
}
// set up the registry
require.NoError(registry.SetSeries(inputSeries))
// make sure each metric got registered and can form queries
testCases := []struct{
title string
info provider.MetricInfo
namespace string
resourceNames []string
expectedKind SeriesType
expectedQuery string
}{
// container metrics
{
title: "container metrics overrides / single resource name",
info: provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "actually_gauge"},
namespace: "somens",
resourceNames: []string{"somepod"},
expectedKind: GaugeSeries,
expectedQuery: "container_actually_gauge_seconds_total{pod_name=\"somepod\",container_name!=\"POD\",namespace=\"somens\"}",
},
{
title: "container metrics gauge / multiple resource names",
info: provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_usage"},
namespace: "somens",
resourceNames: []string{"somepod1", "somepod2"},
expectedKind: GaugeSeries,
expectedQuery: "container_some_usage{pod_name=~\"somepod1|somepod2\",container_name!=\"POD\",namespace=\"somens\"}",
},
{
title: "container metrics counter",
info: provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_count"},
namespace: "somens",
resourceNames: []string{"somepod1", "somepod2"},
expectedKind: CounterSeries,
expectedQuery: "container_some_count_total{pod_name=~\"somepod1|somepod2\",container_name!=\"POD\",namespace=\"somens\"}",
},
{
title: "container metrics seconds counter",
info: provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_time"},
namespace: "somens",
resourceNames: []string{"somepod1", "somepod2"},
expectedKind: SecondsCounterSeries,
expectedQuery: "container_some_time_seconds_total{pod_name=~\"somepod1|somepod2\",container_name!=\"POD\",namespace=\"somens\"}",
},
// namespaced metrics
{
title: "namespaced metrics counter / multidimensional (service)",
info: provider.MetricInfo{schema.GroupResource{Resource: "service"}, true, "ingress_hits"},
namespace: "somens",
resourceNames: []string{"somesvc"},
expectedKind: CounterSeries,
expectedQuery: "ingress_hits_total{service=\"somesvc\",namespace=\"somens\"}",
},
{
title: "namespaced metrics counter / multidimensional (ingress)",
info: provider.MetricInfo{schema.GroupResource{Group: "extensions", Resource: "ingress"}, true, "ingress_hits"},
namespace: "somens",
resourceNames: []string{"someingress"},
expectedKind: CounterSeries,
expectedQuery: "ingress_hits_total{ingress=\"someingress\",namespace=\"somens\"}",
},
{
title: "namespaced metrics counter / multidimensional (pod)",
info: provider.MetricInfo{schema.GroupResource{Resource: "pod"}, true, "ingress_hits"},
namespace: "somens",
resourceNames: []string{"somepod"},
expectedKind: CounterSeries,
expectedQuery: "ingress_hits_total{pod=\"somepod\",namespace=\"somens\"}",
},
{
title: "namespaced metrics gauge",
info: provider.MetricInfo{schema.GroupResource{Resource: "service"}, true, "service_proxy_packets"},
namespace: "somens",
resourceNames: []string{"somesvc"},
expectedKind: GaugeSeries,
expectedQuery: "service_proxy_packets{service=\"somesvc\",namespace=\"somens\"}",
},
{
title: "namespaced metrics seconds counter",
info: provider.MetricInfo{schema.GroupResource{Group: "extensions", Resource: "deployment"}, true, "work_queue_wait"},
namespace: "somens",
resourceNames: []string{"somedep"},
expectedKind: SecondsCounterSeries,
expectedQuery: "work_queue_wait_seconds_total{deployment=\"somedep\",namespace=\"somens\"}",
},
// non-namespaced series
{
title: "root scoped metrics gauge",
info: provider.MetricInfo{schema.GroupResource{Resource: "node"}, false, "node_gigawatts"},
resourceNames: []string{"somenode"},
expectedKind: GaugeSeries,
expectedQuery: "node_gigawatts{node=\"somenode\"}",
},
{
title: "root scoped metrics counter",
info: provider.MetricInfo{schema.GroupResource{Resource: "persistentvolume"}, false, "volume_claims"},
resourceNames: []string{"somepv"},
expectedKind: CounterSeries,
expectedQuery: "volume_claims_total{persistentvolume=\"somepv\"}",
},
{
title: "root scoped metrics seconds counter",
info: provider.MetricInfo{schema.GroupResource{Resource: "node"}, false, "node_fan"},
resourceNames: []string{"somenode"},
expectedKind: SecondsCounterSeries,
expectedQuery: "node_fan_seconds_total{node=\"somenode\"}",
},
}
for _, testCase := range testCases {
outputKind, outputQuery, found := registry.QueryForMetric(testCase.info, testCase.namespace, testCase.resourceNames...)
if !assert.True(found, "%s: metric %v should available", testCase.title, testCase.info) {
continue
}
assert.Equal(testCase.expectedKind, outputKind, "%s: metric %v should have had the right series type", testCase.title, testCase.info)
assert.Equal(prom.Selector(testCase.expectedQuery), outputQuery, "%s: metric %v should have produced the correct query for %v in namespace %s", testCase.title, testCase.info, testCase.resourceNames, testCase.namespace)
}
allMetrics := registry.ListAllMetrics()
expectedMetrics := []provider.MetricInfo{
provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "actually_gauge"},
provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_usage"},
provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_count"},
provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_time"},
provider.MetricInfo{schema.GroupResource{Resource: "services"}, true, "ingress_hits"},
provider.MetricInfo{schema.GroupResource{Group: "extensions", Resource: "ingresses"}, true, "ingress_hits"},
provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "ingress_hits"},
provider.MetricInfo{schema.GroupResource{Resource: "namespaces"}, false, "ingress_hits"},
provider.MetricInfo{schema.GroupResource{Resource: "services"}, true, "service_proxy_packets"},
provider.MetricInfo{schema.GroupResource{Resource: "namespaces"}, false, "service_proxy_packets"},
provider.MetricInfo{schema.GroupResource{Group: "extensions", Resource: "deployments"}, true, "work_queue_wait"},
provider.MetricInfo{schema.GroupResource{Resource: "namespaces"}, false, "work_queue_wait"},
provider.MetricInfo{schema.GroupResource{Resource: "nodes"}, false, "node_gigawatts"},
provider.MetricInfo{schema.GroupResource{Resource: "persistentvolumes"}, false, "volume_claims"},
provider.MetricInfo{schema.GroupResource{Resource: "nodes"}, false, "node_fan"},
}
// sort both for easy comparison
sort.Sort(metricInfoSorter(allMetrics))
sort.Sort(metricInfoSorter(expectedMetrics))
assert.Equal(expectedMetrics, allMetrics, "should have listed all expected metrics")
}
// metricInfoSorter is a sort.Interface for sorting provider.MetricInfos
type metricInfoSorter []provider.MetricInfo
func (s metricInfoSorter) Len() int {
return len(s)
}
func (s metricInfoSorter) Less(i, j int) bool {
infoI := s[i]
infoJ := s[j]
if infoI.Metric == infoJ.Metric {
if infoI.GroupResource == infoJ.GroupResource {
return infoI.Namespaced
}
if infoI.GroupResource.Group == infoJ.GroupResource.Group {
return infoI.GroupResource.Resource < infoJ.GroupResource.Resource
}
return infoI.GroupResource.Group < infoJ.GroupResource.Group
}
return infoI.Metric < infoJ.Metric
}
func (s metricInfoSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

View file

@ -0,0 +1,343 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"context"
"time"
"fmt"
"net/http"
"github.com/golang/glog"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/metrics/pkg/apis/custom_metrics"
"k8s.io/client-go/pkg/api"
_ "k8s.io/client-go/pkg/api/install"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/dynamic"
"k8s.io/custom-metrics-boilerplate/pkg/provider"
"k8s.io/apimachinery/pkg/util/wait"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
pmodel "github.com/prometheus/common/model"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
)
// newMetricNotFoundError returns a StatusError indicating the given metric could not be found.
// It is similar to NewNotFound, but more specialized
func newMetricNotFoundError(resource schema.GroupResource, metricName string) *apierr.StatusError {
return &apierr.StatusError{metav1.Status{
Status: metav1.StatusFailure,
Code: int32(http.StatusNotFound),
Reason: metav1.StatusReasonNotFound,
Message: fmt.Sprintf("the server could not find the metric %s for %s", metricName, resource.String()),
}}
}
// newMetricNotFoundForError returns a StatusError indicating the given metric could not be found for
// the given named object. It is similar to NewNotFound, but more specialized
func newMetricNotFoundForError(resource schema.GroupResource, metricName string, resourceName string) *apierr.StatusError {
return &apierr.StatusError{metav1.Status{
Status: metav1.StatusFailure,
Code: int32(http.StatusNotFound),
Reason: metav1.StatusReasonNotFound,
Message: fmt.Sprintf("the server could not find the metric %s for %s %s", metricName, resource.String(), resourceName),
}}
}
type prometheusProvider struct {
mapper apimeta.RESTMapper
kubeClient dynamic.ClientPool
promClient prom.Client
SeriesRegistry
rateInterval time.Duration
}
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, updateInterval time.Duration, rateInterval time.Duration) provider.CustomMetricsProvider {
lister := &cachingMetricsLister{
updateInterval: updateInterval,
promClient: promClient,
SeriesRegistry: &basicSeriesRegistry{
namer: metricNamer{
// TODO: populate this...
overrides: nil,
mapper: mapper,
},
},
}
// TODO: allow for RunUntil
lister.Run()
return &prometheusProvider{
mapper: mapper,
kubeClient: kubeClient,
promClient: promClient,
SeriesRegistry: lister,
rateInterval: rateInterval,
}
}
func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) {
kind, err := p.mapper.KindFor(groupResource.WithVersion(""))
if err != nil {
return nil, err
}
return &custom_metrics.MetricValue{
DescribedObject: api.ObjectReference{
APIVersion: groupResource.Group+"/"+runtime.APIVersionInternal,
Kind: kind.Kind,
Name: name,
Namespace: namespace,
},
MetricName: metricName,
Timestamp: metav1.Time{time.Now()},
Value: *resource.NewMilliQuantity(int64(value * 1000.0), resource.DecimalSI),
}, nil
}
func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.MetricInfo, list runtime.Object) (*custom_metrics.MetricValueList, error) {
if !apimeta.IsListType(list) {
// TODO: fix the error type here
return nil, fmt.Errorf("returned object was not a list")
}
values, found := p.MatchValuesToNames(info, valueSet)
if !found {
// TODO: throw error
}
res := []custom_metrics.MetricValue{}
// blech, EachListItem should pass an index --
// it's an implementation detail that it happens to be sequential
err := apimeta.EachListItem(list, func(item runtime.Object) error {
objUnstructured := item.(*unstructured.Unstructured)
objName := objUnstructured.GetName()
if _, found := values[objName]; !found {
return nil
}
value, err := p.metricFor(values[objName], info.GroupResource, objUnstructured.GetNamespace(), objName, info.Metric)
if err != nil {
return err
}
res = append(res, *value)
return nil
})
if err != nil {
return nil, err
}
return &custom_metrics.MetricValueList{
Items: res,
}, nil
}
func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace string, names ...string) (pmodel.Vector, error) {
kind, baseQuery, groupBy, found := p.QueryForMetric(info, namespace, names...)
if !found {
return nil, newMetricNotFoundError(info.GroupResource, info.Metric)
}
fullQuery := baseQuery
switch kind {
case CounterSeries:
fullQuery = prom.Selector(fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(p.rateInterval).String()))
case SecondsCounterSeries:
// TODO: futher modify for seconds?
fullQuery = prom.Selector(prom.Selector(fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(p.rateInterval).String())))
}
// TODO: too small of a rate interval will return no results...
// sum over all other dimensions of this query (e.g. if we select on route, sum across all pods,
// but if we select on pods, sum across all routes), and split by the dimension of our resource
// TODO: return/populate the by list in SeriesForMetric
fullQuery = prom.Selector(fmt.Sprintf("sum(%s) by (%s)", fullQuery, groupBy))
// TODO: use an actual context
queryResults, err := p.promClient.Query(context.Background(), pmodel.Now(), fullQuery)
if err != nil {
// TODO: interpret this somehow?
glog.Errorf("unable to fetch metrics from prometheus: %v", err)
// don't leak implementation details to the user
return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics"))
}
if queryResults.Type != pmodel.ValVector {
glog.Errorf("unexpected results from prometheus: expected %s, got %s on results %v", pmodel.ValVector, queryResults.Type, queryResults)
return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics"))
}
return *queryResults.Vector, nil
}
func (p *prometheusProvider) getSingle(info provider.MetricInfo, namespace, name string) (*custom_metrics.MetricValue, error) {
queryResults, err := p.buildQuery(info, namespace, name)
if err != nil {
return nil, err
}
if len(queryResults) < 1 {
return nil, newMetricNotFoundForError(info.GroupResource, info.Metric, name)
}
// TODO: check if lenght of results > 1?
// TODO: check if our output name is the same as our input name
resultValue := queryResults[0].Value
return p.metricFor(resultValue, info.GroupResource, "", name, info.Metric)
}
func (p *prometheusProvider) getMultiple(info provider.MetricInfo, namespace string, selector labels.Selector) (*custom_metrics.MetricValueList, error) {
// construct a client to list the names of objects matching the label selector
// TODO: figure out version?
client, err := p.kubeClient.ClientForGroupVersionResource(info.GroupResource.WithVersion(""))
if err != nil {
glog.Errorf("unable to construct dynamic client to list matching resource names: %v", err)
// TODO: check for resource not found error?
// don't leak implementation details to the user
return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources"))
}
// we can construct a this APIResource ourself, since the dynamic client only uses Name and Namespaced
// TODO: use discovery information instead
apiRes := &metav1.APIResource{
Name: info.GroupResource.Resource,
Namespaced: info.Namespaced,
}
// actually list the objects matching the label selector
// TODO: work for objects not in core v1
matchingObjectsRaw, err := client.Resource(apiRes, namespace).
List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
glog.Errorf("unable to list matching resource names: %v", err)
// TODO: check for resource not found error?
// don't leak implementation details to the user
return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources"))
}
// make sure we have a list
if !apimeta.IsListType(matchingObjectsRaw) {
// TODO: fix the error type here
return nil, fmt.Errorf("returned object was not a list")
}
// convert a list of objects into the corresponding list of names
resourceNames := []string{}
err = apimeta.EachListItem(matchingObjectsRaw, func(item runtime.Object) error {
objName := item.(*unstructured.Unstructured).GetName()
resourceNames = append(resourceNames, objName)
return nil
})
// construct the actual query
queryResults, err := p.buildQuery(info, namespace, resourceNames...)
if err != nil {
return nil, err
}
return p.metricsFor(queryResults, info, matchingObjectsRaw)
}
func (p *prometheusProvider) GetRootScopedMetricByName(groupResource schema.GroupResource, name string, metricName string) (*custom_metrics.MetricValue, error) {
info := provider.MetricInfo{
GroupResource: groupResource,
Metric: metricName,
Namespaced: false,
}
return p.getSingle(info, "", name)
}
func (p *prometheusProvider) GetRootScopedMetricBySelector(groupResource schema.GroupResource, selector labels.Selector, metricName string) (*custom_metrics.MetricValueList, error) {
info := provider.MetricInfo{
GroupResource: groupResource,
Metric: metricName,
Namespaced: false,
}
return p.getMultiple(info, "", selector)
}
func (p *prometheusProvider) GetNamespacedMetricByName(groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) {
info := provider.MetricInfo{
GroupResource: groupResource,
Metric: metricName,
Namespaced: true,
}
return p.getSingle(info, namespace, name)
}
func (p *prometheusProvider) GetNamespacedMetricBySelector(groupResource schema.GroupResource, namespace string, selector labels.Selector, metricName string) (*custom_metrics.MetricValueList, error) {
info := provider.MetricInfo{
GroupResource: groupResource,
Metric: metricName,
Namespaced: true,
}
return p.getMultiple(info, namespace, selector)
}
type cachingMetricsLister struct {
SeriesRegistry
promClient prom.Client
updateInterval time.Duration
}
func (l *cachingMetricsLister) Run() {
go wait.Forever(func () {
if err := l.updateMetrics(); err != nil {
utilruntime.HandleError(err)
}
}, l.updateInterval)
}
func (l *cachingMetricsLister) updateMetrics() error {
startTime := pmodel.Now().Add(-1*l.updateInterval)
// TODO: figure out a good way to add all Kubernetes-related metrics at once
// (i.e. how do we determine if something is a Kubernetes-related metric?)
// container-specific metrics from cAdvsior have their own form, and need special handling
containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", ""))
namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*"))
// TODO: figure out how to determine which metrics on non-namespaced objects are kubernetes-related
// TODO: use an actual context here
series, err := l.promClient.Series(context.Background(), pmodel.Interval{startTime, 0}, containerSel, namespacedSel)
if err != nil {
return fmt.Errorf("unable to update list of all available metrics: %v", err)
}
glog.V(10).Infof("Set available metric list from Prometheus to: %v", series)
l.SetSeries(series)
return nil
}

View file

@ -0,0 +1,166 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"fmt"
"sort"
"time"
"testing"
fakedyn "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/pkg/api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/custom-metrics-boilerplate/pkg/provider"
// install extensions so that our RESTMapper knows about it
_ "k8s.io/client-go/pkg/apis/extensions/install"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
pmodel "github.com/prometheus/common/model"
)
const fakeProviderUpdateInterval = 2*time.Second
// fakePromClient is a fake instance of prom.Client
type fakePromClient struct {
// acceptibleInterval is the interval in which to return queries
acceptibleInterval pmodel.Interval
// errQueries are queries that result in an error (whether from Query or Series)
errQueries map[prom.Selector]error
// series are non-error responses to partial Series calls
series map[prom.Selector][]prom.Series
// queryResults are non-error responses to Query
queryResults map[prom.Selector]prom.QueryResult
}
func (c *fakePromClient) Series(interval pmodel.Interval, selectors ...prom.Selector) ([]prom.Series, error) {
if (interval.Start != 0 && interval.Start < c.acceptibleInterval.Start) || (interval.End != 0 && interval.End > c.acceptibleInterval.End) {
return nil, fmt.Errorf("interval [%v, %v] for query is outside range [%v, %v]", interval.Start, interval.End, c.acceptibleInterval.Start, c.acceptibleInterval.End)
}
res := []prom.Series{}
for _, sel := range selectors {
if err, found := c.errQueries[sel]; found {
return nil, err
}
if series, found := c.series[sel]; found {
res = append(res, series...)
}
}
return res, nil
}
func (c *fakePromClient) Query(t pmodel.Time, query prom.Selector) (prom.QueryResult, error) {
if t < c.acceptibleInterval.Start || t > c.acceptibleInterval.End {
return prom.QueryResult{}, fmt.Errorf("time %v for query is outside range [%v, %v]", t, c.acceptibleInterval.Start, c.acceptibleInterval.End)
}
if err, found := c.errQueries[query]; found {
return prom.QueryResult{}, err
}
if res, found := c.queryResults[query]; found {
return res, nil
}
return prom.QueryResult{
Type: pmodel.ValVector,
Vector: &pmodel.Vector{},
}, nil
}
func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fakePromClient) {
fakeProm := &fakePromClient{}
fakeKubeClient := &fakedyn.FakeClientPool{}
prov := NewPrometheusProvider(api.Registry.RESTMapper(), fakeKubeClient, fakeProm, fakeProviderUpdateInterval, 1*time.Minute)
containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", ""))
namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*"))
fakeProm.series = map[prom.Selector][]prom.Series{
containerSel: []prom.Series{
{
Name: "container_actually_gauge_seconds_total",
Labels: map[string]string{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
},
{
Name: "container_some_usage",
Labels: map[string]string{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
},
},
namespacedSel: []prom.Series{
{
Name: "ingress_hits_total",
Labels: map[string]string{"ingress": "someingress", "service": "somesvc", "pod": "backend1", "namespace": "somens"},
},
{
Name: "ingress_hits_total",
Labels: map[string]string{"ingress": "someingress", "service": "somesvc", "pod": "backend2", "namespace": "somens"},
},
{
Name: "service_proxy_packets",
Labels: map[string]string{"service": "somesvc", "namespace": "somens"},
},
{
Name: "work_queue_wait_seconds_total",
Labels: map[string]string{"deployment": "somedep", "namespace": "somens"},
},
},
}
return prov, fakeProm
}
func TestListAllMetrics(t *testing.T) {
// setup
prov, fakeProm := setupPrometheusProvider(t)
// assume we have no updates
require.Len(t, prov.ListAllMetrics(), 0, "assume: should have no metrics updates at the start")
// set the acceptible interval (now until the next update, with a bit of wiggle room)
startTime := pmodel.Now()
endTime := startTime.Add(fakeProviderUpdateInterval + fakeProviderUpdateInterval/10)
fakeProm.acceptibleInterval = pmodel.Interval{Start: startTime, End: endTime}
// wait one update interval (with a bit of wiggle room)
time.Sleep(fakeProviderUpdateInterval + fakeProviderUpdateInterval/10)
// list/sort the metrics
actualMetrics := prov.ListAllMetrics()
sort.Sort(metricInfoSorter(actualMetrics))
expectedMetrics := []provider.MetricInfo{
provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "actually_gauge"},
provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_usage"},
provider.MetricInfo{schema.GroupResource{Resource: "services"}, true, "ingress_hits"},
provider.MetricInfo{schema.GroupResource{Group: "extensions", Resource: "ingresses"}, true, "ingress_hits"},
provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "ingress_hits"},
provider.MetricInfo{schema.GroupResource{Resource: "namespaces"}, false, "ingress_hits"},
provider.MetricInfo{schema.GroupResource{Resource: "services"}, true, "service_proxy_packets"},
provider.MetricInfo{schema.GroupResource{Resource: "namespaces"}, false, "service_proxy_packets"},
provider.MetricInfo{schema.GroupResource{Group: "extensions", Resource: "deployments"}, true, "work_queue_wait"},
provider.MetricInfo{schema.GroupResource{Resource: "namespaces"}, false, "work_queue_wait"},
}
sort.Sort(metricInfoSorter(expectedMetrics))
// assert that we got what we expected
assert.Equal(t, expectedMetrics, actualMetrics)
}