Merge pull request #15 from DirectXMan12/refactor/update-todos

Cleanups from initial development
This commit is contained in:
Solly Ross 2017-08-04 15:14:59 -04:00 committed by GitHub
commit b6602e1724
10 changed files with 109 additions and 110 deletions

View file

@ -78,3 +78,19 @@ aggregated over all non-requested metrics.
The adapter does not consider resources consumed by the "POD" container, The adapter does not consider resources consumed by the "POD" container,
which exists as part of all Kubernetes pods running in Docker simply which exists as part of all Kubernetes pods running in Docker simply
supports the existence of the pod's shared network namespace. supports the existence of the pod's shared network namespace.
Example
-------
@luxas has an excellent example deployment of Prometheus, this adapter,
and a demo pod which serves a metric `http_requests_total`, which becomes
the custom metrics API metric `pods/http_requests`. It also autoscales on
that metric using the `autoscaling/v2alpha1` HorizontalPodAutoscaler.
It can be found at https://github.com/luxas/kubeadm-workshop. Pay special
attention to:
- [Deploying the Prometheus
Operator](https://github.com/luxas/kubeadm-workshop#deploying-the-prometheus-operator-for-monitoring-services-in-the-cluster)
- [Setting up the custom metrics adapter and sample
app](https://github.com/luxas/kubeadm-workshop#deploying-a-custom-metrics-api-server-and-a-sample-app)

View file

@ -30,10 +30,11 @@ import (
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"github.com/directxman12/custom-metrics-boilerplate/pkg/cmd/server"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
mprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/metrics" mprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/metrics"
cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider" cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd/server"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/dynamicmapper"
) )
// NewCommandStartPrometheusAdapterServer provides a CLI handler for 'start master' command // NewCommandStartPrometheusAdapterServer provides a CLI handler for 'start master' command
@ -44,6 +45,7 @@ func NewCommandStartPrometheusAdapterServer(out, errOut io.Writer, stopCh <-chan
MetricsRelistInterval: 10 * time.Minute, MetricsRelistInterval: 10 * time.Minute,
RateInterval: 5 * time.Minute, RateInterval: 5 * time.Minute,
PrometheusURL: "https://localhost", PrometheusURL: "https://localhost",
DiscoveryInterval: 10 * time.Minute,
} }
cmd := &cobra.Command{ cmd := &cobra.Command{
@ -76,6 +78,8 @@ func NewCommandStartPrometheusAdapterServer(out, errOut io.Writer, stopCh <-chan
"interval at which to re-list the set of all available metrics from Prometheus") "interval at which to re-list the set of all available metrics from Prometheus")
flags.DurationVar(&o.RateInterval, "rate-interval", o.RateInterval, ""+ flags.DurationVar(&o.RateInterval, "rate-interval", o.RateInterval, ""+
"period of time used to calculate rate metrics from cumulative metrics") "period of time used to calculate rate metrics from cumulative metrics")
flags.DurationVar(&o.DiscoveryInterval, "discovery-interval", o.DiscoveryInterval, ""+
"interval at which to refresh API discovery information")
flags.StringVar(&o.PrometheusURL, "prometheus-url", o.PrometheusURL, flags.StringVar(&o.PrometheusURL, "prometheus-url", o.PrometheusURL,
"URL and configuration for connecting to Prometheus. Query parameters are used to configure the connection") "URL and configuration for connecting to Prometheus. Query parameters are used to configure the connection")
@ -108,12 +112,10 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c
return fmt.Errorf("unable to construct discovery client for dynamic client: %v", err) return fmt.Errorf("unable to construct discovery client for dynamic client: %v", err)
} }
// TODO: this needs to refresh it's discovery info every once and a while dynamicMapper, err := dynamicmapper.NewRESTMapper(discoveryClient, api.Registry.InterfacesFor, o.DiscoveryInterval)
resources, err := discovery.GetAPIGroupResources(discoveryClient)
if err != nil { if err != nil {
return fmt.Errorf("unable to construct discovery REST mapper: unable to fetch list of resources: %v", err) return fmt.Errorf("unable to construct dynamic discovery mapper: %v", err)
} }
dynamicMapper := discovery.NewRESTMapper(resources, api.Registry.InterfacesFor)
clientPool := dynamic.NewClientPool(clientConfig, dynamicMapper, dynamic.LegacyAPIPathResolverFunc) clientPool := dynamic.NewClientPool(clientConfig, dynamicMapper, dynamic.LegacyAPIPathResolverFunc)
if err != nil { if err != nil {
@ -129,7 +131,7 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c
instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String()) instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String())
promClient := prom.NewClientForAPI(instrumentedGenericPromClient) promClient := prom.NewClientForAPI(instrumentedGenericPromClient)
cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.MetricsRelistInterval, o.RateInterval) cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.MetricsRelistInterval, o.RateInterval, stopCh)
server, err := config.Complete().New(cmProvider) server, err := config.Complete().New(cmProvider)
if err != nil { if err != nil {
@ -147,6 +149,8 @@ type PrometheusAdapterServerOptions struct {
MetricsRelistInterval time.Duration MetricsRelistInterval time.Duration
// RateInterval is the period of time used to calculate rate metrics // RateInterval is the period of time used to calculate rate metrics
RateInterval time.Duration RateInterval time.Duration
// DiscoveryInterval is the interval at which discovery information is refreshed
DiscoveryInterval time.Duration
// PrometheusURL is the URL describing how to connect to Prometheus. Query parameters configure connection options. // PrometheusURL is the URL describing how to connect to Prometheus. Query parameters configure connection options.
PrometheusURL string PrometheusURL string
} }

20
glide.lock generated
View file

@ -1,5 +1,5 @@
hash: 4432fd0d13b3a79f1febb9040cad9576793e44ab1c4d6e40abc664ad0582999f hash: 815ac74b9c61bedb6cc960ca8b9de13ff7584a66f1a0f9fdb80c5c49986a10cf
updated: 2017-06-27T18:59:14.961201169-04:00 updated: 2017-08-02T15:16:48.029828608-04:00
imports: imports:
- name: bitbucket.org/ww/goautoneg - name: bitbucket.org/ww/goautoneg
version: 75cd24fc2f2c2a2088577d12123ddee5f54e0675 version: 75cd24fc2f2c2a2088577d12123ddee5f54e0675
@ -82,14 +82,6 @@ imports:
version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
subpackages: subpackages:
- spew - spew
- name: github.com/directxman12/custom-metrics-boilerplate
version: 066dd03155e1cdb3dfa513f8bdcb3a30edda1137
subpackages:
- pkg/apiserver
- pkg/apiserver/installer
- pkg/cmd/server
- pkg/provider
- pkg/registry/custom_metrics
- name: github.com/docker/distribution - name: github.com/docker/distribution
version: cd27f179f2c10c5d300e6d09025b538c475b0d51 version: cd27f179f2c10c5d300e6d09025b538c475b0d51
subpackages: subpackages:
@ -163,6 +155,14 @@ imports:
version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75
- name: github.com/juju/ratelimit - name: github.com/juju/ratelimit
version: 77ed1c8a01217656d2080ad51981f6e99adaa177 version: 77ed1c8a01217656d2080ad51981f6e99adaa177
- name: github.com/kubernetes-incubator/custom-metrics-apiserver
version: e17b36ca6ebbec8acbc76d729d24058a09d042ee
subpackages:
- pkg/apiserver
- pkg/apiserver/installer
- pkg/cmd/server
- pkg/provider
- pkg/registry/custom_metrics
- name: github.com/mailru/easyjson - name: github.com/mailru/easyjson
version: d5b7844b561a7bc640052f1b935f7b800330d7e0 version: d5b7844b561a7bc640052f1b935f7b800330d7e0
subpackages: subpackages:

View file

@ -14,7 +14,7 @@ import:
- kubernetes/typed/core/v1 - kubernetes/typed/core/v1
- rest - rest
- tools/clientcmd - tools/clientcmd
- package: github.com/directxman12/custom-metrics-boilerplate - package: github.com/kubernetes-incubator/custom-metrics-apiserver
subpackages: subpackages:
- pkg/cmd/server - pkg/cmd/server
- pkg/provider - pkg/provider

View file

@ -25,6 +25,7 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"path" "path"
"time"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
@ -54,7 +55,6 @@ func (c *httpAPIClient) Do(ctx context.Context, verb, endpoint string, query url
u.RawQuery = query.Encode() u.RawQuery = query.Encode()
req, err := http.NewRequest(verb, u.String(), nil) req, err := http.NewRequest(verb, u.String(), nil)
if err != nil { if err != nil {
// TODO: fix this to return Error?
return APIResponse{}, fmt.Errorf("error constructing HTTP request to Prometheus: %v", err) return APIResponse{}, fmt.Errorf("error constructing HTTP request to Prometheus: %v", err)
} }
req.WithContext(ctx) req.WithContext(ctx)
@ -96,7 +96,6 @@ func (c *httpAPIClient) Do(ctx context.Context, verb, endpoint string, query url
var res APIResponse var res APIResponse
if err = json.NewDecoder(body).Decode(&res); err != nil { if err = json.NewDecoder(body).Decode(&res); err != nil {
// TODO: return what the body actually was?
return APIResponse{}, &Error{ return APIResponse{}, &Error{
Type: ErrBadResponse, Type: ErrBadResponse,
Msg: err.Error(), Msg: err.Error(),
@ -174,7 +173,9 @@ func (h *queryClient) Query(ctx context.Context, t model.Time, query Selector) (
if t != 0 { if t != 0 {
vals.Set("time", t.String()) vals.Set("time", t.String())
} }
// TODO: get timeout from context... if timeout, hasTimeout := timeoutFromContext(ctx); hasTimeout {
vals.Set("timeout", model.Duration(timeout).String())
}
res, err := h.api.Do(ctx, "GET", queryURL, vals) res, err := h.api.Do(ctx, "GET", queryURL, vals)
if err != nil { if err != nil {
@ -199,7 +200,9 @@ func (h *queryClient) QueryRange(ctx context.Context, r Range, query Selector) (
if r.Step != 0 { if r.Step != 0 {
vals.Set("step", model.Duration(r.Step).String()) vals.Set("step", model.Duration(r.Step).String())
} }
// TODO: get timeout from context... if timeout, hasTimeout := timeoutFromContext(ctx); hasTimeout {
vals.Set("timeout", model.Duration(timeout).String())
}
res, err := h.api.Do(ctx, "GET", queryRangeURL, vals) res, err := h.api.Do(ctx, "GET", queryRangeURL, vals)
if err != nil { if err != nil {
@ -210,3 +213,13 @@ func (h *queryClient) QueryRange(ctx context.Context, r Range, query Selector) (
err = json.Unmarshal(res.Data, &queryRes) err = json.Unmarshal(res.Data, &queryRes)
return queryRes, err return queryRes, err
} }
// timeoutFromContext checks the context for a deadline and calculates a "timeout" duration from it,
// when present
func timeoutFromContext(ctx context.Context) (time.Duration, bool) {
if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
return time.Now().Sub(deadline), true
}
return time.Duration(0), false
}

View file

@ -40,7 +40,9 @@ type Range struct {
Step time.Duration Step time.Duration
} }
// TODO: support timeout in the client? // Client is a Prometheus client for the Prometheus HTTP API.
// The "timeout" parameter for the HTTP API is set based on the context's deadline,
// when present and applicable.
type Client interface { type Client interface {
// Series lists the time series matching the given series selectors // Series lists the time series matching the given series selectors
Series(ctx context.Context, interval model.Interval, selectors ...Selector) ([]Series, error) Series(ctx context.Context, interval model.Interval, selectors ...Selector) ([]Series, error)

View file

@ -21,7 +21,7 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/directxman12/custom-metrics-boilerplate/pkg/provider" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
apimeta "k8s.io/apimachinery/pkg/api/meta" apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@ -84,15 +84,15 @@ func (r *basicSeriesRegistry) SetSeries(newSeries []prom.Series) error {
if strings.HasPrefix(series.Name, "container_") { if strings.HasPrefix(series.Name, "container_") {
r.namer.processContainerSeries(series, newInfo) r.namer.processContainerSeries(series, newInfo)
} else if namespaceLabel, hasNamespaceLabel := series.Labels["namespace"]; hasNamespaceLabel && namespaceLabel != "" { } else if namespaceLabel, hasNamespaceLabel := series.Labels["namespace"]; hasNamespaceLabel && namespaceLabel != "" {
// TODO: handle metrics describing a namespace // we also handle namespaced metrics here as part of the resource-association logic
if err := r.namer.processNamespacedSeries(series, newInfo); err != nil { if err := r.namer.processNamespacedSeries(series, newInfo); err != nil {
// TODO: do we want to log this and continue, or abort? glog.Errorf("Unable to process namespaced series %q: %v", series.Name, err)
return err continue
} }
} else { } else {
if err := r.namer.processRootScopedSeries(series, newInfo); err != nil { if err := r.namer.processRootScopedSeries(series, newInfo); err != nil {
// TODO: do we want to log this and continue, or abort? glog.Errorf("Unable to process root-scoped series %q: %v", series.Name, err)
return err continue
} }
} }
} }
@ -123,10 +123,11 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.MetricInfo, nam
defer r.mu.RUnlock() defer r.mu.RUnlock()
if len(resourceNames) == 0 { if len(resourceNames) == 0 {
// TODO: return error? panic? glog.Errorf("no resource names requested while producing a query for metric %s", metricInfo.String())
return 0, "", "", false
} }
metricInfo, singularResource, err := r.namer.normalizeInfo(metricInfo) metricInfo, singularResource, err := metricInfo.Normalized(r.namer.mapper)
if err != nil { if err != nil {
glog.Errorf("unable to normalize group resource while producing a query: %v", err) glog.Errorf("unable to normalize group resource while producing a query: %v", err)
return 0, "", "", false return 0, "", "", false
@ -166,7 +167,7 @@ func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.MetricInfo,
r.mu.RLock() r.mu.RLock()
defer r.mu.RUnlock() defer r.mu.RUnlock()
metricInfo, singularResource, err := r.namer.normalizeInfo(metricInfo) metricInfo, singularResource, err := metricInfo.Normalized(r.namer.mapper)
if err != nil { if err != nil {
glog.Errorf("unable to normalize group resource while matching values to names: %v", err) glog.Errorf("unable to normalize group resource while matching values to names: %v", err)
return nil, false return nil, false
@ -211,24 +212,6 @@ type seriesSpec struct {
kind SeriesType kind SeriesType
} }
// normalizeInfo takes in some metricInfo an "normalizes" it to ensure a common GroupResource form.
func (r *metricNamer) normalizeInfo(metricInfo provider.MetricInfo) (provider.MetricInfo, string, error) {
// NB: we need to "normalize" the metricInfo's GroupResource so we have a consistent pluralization, etc
// TODO: move this to the boilerplate?
normalizedGroupRes, err := r.mapper.ResourceFor(metricInfo.GroupResource.WithVersion(""))
if err != nil {
return provider.MetricInfo{}, "", err
}
metricInfo.GroupResource = normalizedGroupRes.GroupResource()
singularResource, err := r.mapper.ResourceSingularizer(metricInfo.GroupResource.Resource)
if err != nil {
return provider.MetricInfo{}, "", err
}
return metricInfo, singularResource, nil
}
// processContainerSeries performs special work to extract metric definitions // processContainerSeries performs special work to extract metric definitions
// from cAdvisor-sourced container metrics, which don't particularly follow any useful conventions consistently. // from cAdvisor-sourced container metrics, which don't particularly follow any useful conventions consistently.
func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) { func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) {
@ -247,7 +230,6 @@ func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provi
} }
info := provider.MetricInfo{ info := provider.MetricInfo{
// TODO: is the plural correct?
GroupResource: schema.GroupResource{Resource: "pods"}, GroupResource: schema.GroupResource{Resource: "pods"},
Namespaced: true, Namespaced: true,
Metric: name, Metric: name,
@ -263,6 +245,7 @@ func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provi
// processNamespacedSeries adds the metric info for the given generic namespaced series to // processNamespacedSeries adds the metric info for the given generic namespaced series to
// the map of metric info. // the map of metric info.
func (n *metricNamer) processNamespacedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error { func (n *metricNamer) processNamespacedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error {
// NB: all errors must occur *before* we save the series info
name, metricKind := n.metricNameFromSeries(series) name, metricKind := n.metricNameFromSeries(series)
resources, err := n.groupResourcesFromSeries(series) resources, err := n.groupResourcesFromSeries(series)
if err != nil { if err != nil {
@ -294,6 +277,7 @@ func (n *metricNamer) processNamespacedSeries(series prom.Series, infos map[prov
// processesRootScopedSeries adds the metric info for the given generic namespaced series to // processesRootScopedSeries adds the metric info for the given generic namespaced series to
// the map of metric info. // the map of metric info.
func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error { func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error {
// NB: all errors must occur *before* we save the series info
name, metricKind := n.metricNameFromSeries(series) name, metricKind := n.metricNameFromSeries(series)
resources, err := n.groupResourcesFromSeries(series) resources, err := n.groupResourcesFromSeries(series)
if err != nil { if err != nil {
@ -321,13 +305,11 @@ func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[prov
// going through each label, checking to see if it corresponds to a known resource. For instance, // going through each label, checking to see if it corresponds to a known resource. For instance,
// a series `ingress_http_hits_total{pod="foo",service="bar",ingress="baz",namespace="ns"}` // a series `ingress_http_hits_total{pod="foo",service="bar",ingress="baz",namespace="ns"}`
// would return three GroupResources: "pods", "services", and "ingresses". // would return three GroupResources: "pods", "services", and "ingresses".
// Returned MetricInfo is equivalent to the "normalized" info produced by normalizeInfo. // Returned MetricInfo is equivalent to the "normalized" info produced by metricInfo.Normalized.
func (n *metricNamer) groupResourcesFromSeries(series prom.Series) ([]schema.GroupResource, error) { func (n *metricNamer) groupResourcesFromSeries(series prom.Series) ([]schema.GroupResource, error) {
// TODO: do we need to cache this, or is ResourceFor good enough?
var res []schema.GroupResource var res []schema.GroupResource
for label := range series.Labels { for label := range series.Labels {
// TODO: figure out a way to let people specify a fully-qualified name in label-form // TODO: figure out a way to let people specify a fully-qualified name in label-form
// TODO: will this work when missing a group?
gvr, err := n.mapper.ResourceFor(schema.GroupVersionResource{Resource: string(label)}) gvr, err := n.mapper.ResourceFor(schema.GroupVersionResource{Resource: string(label)})
if err != nil { if err != nil {
if apimeta.IsNoMatchError(err) { if apimeta.IsNoMatchError(err) {

View file

@ -20,7 +20,7 @@ import (
"sort" "sort"
"testing" "testing"
"github.com/directxman12/custom-metrics-boilerplate/pkg/provider" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
pmodel "github.com/prometheus/common/model" pmodel "github.com/prometheus/common/model"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"

View file

@ -20,10 +20,9 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/golang/glog" "github.com/golang/glog"
"net/http"
"time" "time"
"github.com/directxman12/custom-metrics-boilerplate/pkg/provider" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
pmodel "github.com/prometheus/common/model" pmodel "github.com/prometheus/common/model"
apierr "k8s.io/apimachinery/pkg/api/errors" apierr "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta" apimeta "k8s.io/apimachinery/pkg/api/meta"
@ -43,28 +42,6 @@ import (
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
) )
// newMetricNotFoundError returns a StatusError indicating the given metric could not be found.
// It is similar to NewNotFound, but more specialized
func newMetricNotFoundError(resource schema.GroupResource, metricName string) *apierr.StatusError {
return &apierr.StatusError{metav1.Status{
Status: metav1.StatusFailure,
Code: int32(http.StatusNotFound),
Reason: metav1.StatusReasonNotFound,
Message: fmt.Sprintf("the server could not find the metric %s for %s", metricName, resource.String()),
}}
}
// newMetricNotFoundForError returns a StatusError indicating the given metric could not be found for
// the given named object. It is similar to NewNotFound, but more specialized
func newMetricNotFoundForError(resource schema.GroupResource, metricName string, resourceName string) *apierr.StatusError {
return &apierr.StatusError{metav1.Status{
Status: metav1.StatusFailure,
Code: int32(http.StatusNotFound),
Reason: metav1.StatusReasonNotFound,
Message: fmt.Sprintf("the server could not find the metric %s for %s %s", metricName, resource.String(), resourceName),
}}
}
type prometheusProvider struct { type prometheusProvider struct {
mapper apimeta.RESTMapper mapper apimeta.RESTMapper
kubeClient dynamic.ClientPool kubeClient dynamic.ClientPool
@ -75,22 +52,21 @@ type prometheusProvider struct {
rateInterval time.Duration rateInterval time.Duration
} }
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, updateInterval time.Duration, rateInterval time.Duration) provider.CustomMetricsProvider { func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider {
lister := &cachingMetricsLister{ lister := &cachingMetricsLister{
updateInterval: updateInterval, updateInterval: updateInterval,
promClient: promClient, promClient: promClient,
SeriesRegistry: &basicSeriesRegistry{ SeriesRegistry: &basicSeriesRegistry{
namer: metricNamer{ namer: metricNamer{
// TODO: populate this... // TODO: populate the overrides list
overrides: nil, overrides: nil,
mapper: mapper, mapper: mapper,
}, },
}, },
} }
// TODO: allow for RunUntil lister.RunUntil(stopChan)
lister.Run()
return &prometheusProvider{ return &prometheusProvider{
mapper: mapper, mapper: mapper,
@ -124,18 +100,15 @@ func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource s
func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.MetricInfo, list runtime.Object) (*custom_metrics.MetricValueList, error) { func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.MetricInfo, list runtime.Object) (*custom_metrics.MetricValueList, error) {
if !apimeta.IsListType(list) { if !apimeta.IsListType(list) {
// TODO: fix the error type here return nil, apierr.NewInternalError(fmt.Errorf("result of label selector list operation was not a list"))
return nil, fmt.Errorf("returned object was not a list")
} }
values, found := p.MatchValuesToNames(info, valueSet) values, found := p.MatchValuesToNames(info, valueSet)
if !found { if !found {
// TODO: throw error return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric)
} }
res := []custom_metrics.MetricValue{} res := []custom_metrics.MetricValue{}
// blech, EachListItem should pass an index --
// it's an implementation detail that it happens to be sequential
err := apimeta.EachListItem(list, func(item runtime.Object) error { err := apimeta.EachListItem(list, func(item runtime.Object) error {
objUnstructured := item.(*unstructured.Unstructured) objUnstructured := item.(*unstructured.Unstructured)
objName := objUnstructured.GetName() objName := objUnstructured.GetName()
@ -162,7 +135,7 @@ func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.Me
func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace string, names ...string) (pmodel.Vector, error) { func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace string, names ...string) (pmodel.Vector, error) {
kind, baseQuery, groupBy, found := p.QueryForMetric(info, namespace, names...) kind, baseQuery, groupBy, found := p.QueryForMetric(info, namespace, names...)
if !found { if !found {
return nil, newMetricNotFoundError(info.GroupResource, info.Metric) return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric)
} }
fullQuery := baseQuery fullQuery := baseQuery
@ -174,7 +147,7 @@ func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace stri
fullQuery = prom.Selector(prom.Selector(fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(p.rateInterval).String()))) fullQuery = prom.Selector(prom.Selector(fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(p.rateInterval).String())))
} }
// TODO: too small of a rate interval will return no results... // NB: too small of a rate interval will return no results...
// sum over all other dimensions of this query (e.g. if we select on route, sum across all pods, // sum over all other dimensions of this query (e.g. if we select on route, sum across all pods,
// but if we select on pods, sum across all routes), and split by the dimension of our resource // but if we select on pods, sum across all routes), and split by the dimension of our resource
@ -184,7 +157,6 @@ func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace stri
// TODO: use an actual context // TODO: use an actual context
queryResults, err := p.promClient.Query(context.Background(), pmodel.Now(), fullQuery) queryResults, err := p.promClient.Query(context.Background(), pmodel.Now(), fullQuery)
if err != nil { if err != nil {
// TODO: interpret this somehow?
glog.Errorf("unable to fetch metrics from prometheus: %v", err) glog.Errorf("unable to fetch metrics from prometheus: %v", err)
// don't leak implementation details to the user // don't leak implementation details to the user
return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics"))
@ -205,47 +177,54 @@ func (p *prometheusProvider) getSingle(info provider.MetricInfo, namespace, name
} }
if len(queryResults) < 1 { if len(queryResults) < 1 {
return nil, newMetricNotFoundForError(info.GroupResource, info.Metric, name) return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name)
} }
// TODO: check if lenght of results > 1?
// TODO: check if our output name is the same as our input name namedValues, found := p.MatchValuesToNames(info, queryResults)
resultValue := queryResults[0].Value if !found {
return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric)
}
if len(namedValues) > 1 {
glog.V(2).Infof("Got more than one result (%v results) when fetching metric %s for %q, using the first one with a matching name...", len(queryResults), info.String(), name)
}
resultValue, nameFound := namedValues[name]
if !nameFound {
glog.Errorf("None of the results returned by when fetching metric %s for %q matched the resource name", info.String(), name)
return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name)
}
return p.metricFor(resultValue, info.GroupResource, "", name, info.Metric) return p.metricFor(resultValue, info.GroupResource, "", name, info.Metric)
} }
func (p *prometheusProvider) getMultiple(info provider.MetricInfo, namespace string, selector labels.Selector) (*custom_metrics.MetricValueList, error) { func (p *prometheusProvider) getMultiple(info provider.MetricInfo, namespace string, selector labels.Selector) (*custom_metrics.MetricValueList, error) {
// construct a client to list the names of objects matching the label selector // construct a client to list the names of objects matching the label selector
// TODO: figure out version?
client, err := p.kubeClient.ClientForGroupVersionResource(info.GroupResource.WithVersion("")) client, err := p.kubeClient.ClientForGroupVersionResource(info.GroupResource.WithVersion(""))
if err != nil { if err != nil {
glog.Errorf("unable to construct dynamic client to list matching resource names: %v", err) glog.Errorf("unable to construct dynamic client to list matching resource names: %v", err)
// TODO: check for resource not found error?
// don't leak implementation details to the user // don't leak implementation details to the user
return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources")) return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources"))
} }
// we can construct a this APIResource ourself, since the dynamic client only uses Name and Namespaced // we can construct a this APIResource ourself, since the dynamic client only uses Name and Namespaced
// TODO: use discovery information instead
apiRes := &metav1.APIResource{ apiRes := &metav1.APIResource{
Name: info.GroupResource.Resource, Name: info.GroupResource.Resource,
Namespaced: info.Namespaced, Namespaced: info.Namespaced,
} }
// actually list the objects matching the label selector // actually list the objects matching the label selector
// TODO: work for objects not in core v1
matchingObjectsRaw, err := client.Resource(apiRes, namespace). matchingObjectsRaw, err := client.Resource(apiRes, namespace).
List(metav1.ListOptions{LabelSelector: selector.String()}) List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil { if err != nil {
glog.Errorf("unable to list matching resource names: %v", err) glog.Errorf("unable to list matching resource names: %v", err)
// TODO: check for resource not found error?
// don't leak implementation details to the user // don't leak implementation details to the user
return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources")) return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources"))
} }
// make sure we have a list // make sure we have a list
if !apimeta.IsListType(matchingObjectsRaw) { if !apimeta.IsListType(matchingObjectsRaw) {
// TODO: fix the error type here return nil, apierr.NewInternalError(fmt.Errorf("result of label selector list operation was not a list"))
return nil, fmt.Errorf("returned object was not a list")
} }
// convert a list of objects into the corresponding list of names // convert a list of objects into the corresponding list of names
@ -310,19 +289,20 @@ type cachingMetricsLister struct {
} }
func (l *cachingMetricsLister) Run() { func (l *cachingMetricsLister) Run() {
go wait.Forever(func() { l.RunUntil(wait.NeverStop)
}
func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) {
go wait.Until(func() {
if err := l.updateMetrics(); err != nil { if err := l.updateMetrics(); err != nil {
utilruntime.HandleError(err) utilruntime.HandleError(err)
} }
}, l.updateInterval) }, l.updateInterval, stopChan)
} }
func (l *cachingMetricsLister) updateMetrics() error { func (l *cachingMetricsLister) updateMetrics() error {
startTime := pmodel.Now().Add(-1 * l.updateInterval) startTime := pmodel.Now().Add(-1 * l.updateInterval)
// TODO: figure out a good way to add all Kubernetes-related metrics at once
// (i.e. how do we determine if something is a Kubernetes-related metric?)
// container-specific metrics from cAdvisor have their own form, and need special handling // container-specific metrics from cAdvisor have their own form, and need special handling
containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", ""))
namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*"))

View file

@ -23,7 +23,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/directxman12/custom-metrics-boilerplate/pkg/provider" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@ -90,11 +90,11 @@ func (c *fakePromClient) QueryRange(_ context.Context, r prom.Range, query prom.
return prom.QueryResult{}, nil return prom.QueryResult{}, nil
} }
func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fakePromClient) { func setupPrometheusProvider(t *testing.T, stopCh <-chan struct{}) (provider.CustomMetricsProvider, *fakePromClient) {
fakeProm := &fakePromClient{} fakeProm := &fakePromClient{}
fakeKubeClient := &fakedyn.FakeClientPool{} fakeKubeClient := &fakedyn.FakeClientPool{}
prov := NewPrometheusProvider(api.Registry.RESTMapper(), fakeKubeClient, fakeProm, fakeProviderUpdateInterval, 1*time.Minute) prov := NewPrometheusProvider(api.Registry.RESTMapper(), fakeKubeClient, fakeProm, fakeProviderUpdateInterval, 1*time.Minute, stopCh)
containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", ""))
namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*"))
@ -134,7 +134,9 @@ func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fak
func TestListAllMetrics(t *testing.T) { func TestListAllMetrics(t *testing.T) {
// setup // setup
prov, fakeProm := setupPrometheusProvider(t) stopCh := make(chan struct{})
defer close(stopCh)
prov, fakeProm := setupPrometheusProvider(t, stopCh)
// assume we have no updates // assume we have no updates
require.Len(t, prov.ListAllMetrics(), 0, "assume: should have no metrics updates at the start") require.Len(t, prov.ListAllMetrics(), 0, "assume: should have no metrics updates at the start")