From d1d60f7f5efa34b26fa7ff481f966cfd7ca8a18b Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Tue, 27 Jun 2017 19:50:08 -0400 Subject: [PATCH 1/9] Have provider constructor take a stop channel This causes the Prometheus provider to take a stop channel as an argument, which allows us to stop the lister (which keeps the series list up to date) in the unit tests. --- cmd/adapter/app/start.go | 2 +- pkg/custom-provider/provider.go | 13 ++++++++----- pkg/custom-provider/provider_test.go | 8 +++++--- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/cmd/adapter/app/start.go b/cmd/adapter/app/start.go index df007cd1..8c19ec56 100644 --- a/cmd/adapter/app/start.go +++ b/cmd/adapter/app/start.go @@ -129,7 +129,7 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String()) promClient := prom.NewClientForAPI(instrumentedGenericPromClient) - cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.MetricsRelistInterval, o.RateInterval) + cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.MetricsRelistInterval, o.RateInterval, stopCh) server, err := config.Complete().New(cmProvider) if err != nil { diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index a6357058..d46d38da 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -75,7 +75,7 @@ type prometheusProvider struct { rateInterval time.Duration } -func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, updateInterval time.Duration, rateInterval time.Duration) provider.CustomMetricsProvider { +func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider { 
lister := &cachingMetricsLister{ updateInterval: updateInterval, promClient: promClient, @@ -89,8 +89,7 @@ func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientP }, } - // TODO: allow for RunUntil - lister.Run() + lister.RunUntil(stopChan) return &prometheusProvider{ mapper: mapper, @@ -310,11 +309,15 @@ type cachingMetricsLister struct { } func (l *cachingMetricsLister) Run() { - go wait.Forever(func() { + l.RunUntil(wait.NeverStop) +} + +func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) { + go wait.Until(func() { if err := l.updateMetrics(); err != nil { utilruntime.HandleError(err) } - }, l.updateInterval) + }, l.updateInterval, stopChan) } func (l *cachingMetricsLister) updateMetrics() error { diff --git a/pkg/custom-provider/provider_test.go b/pkg/custom-provider/provider_test.go index 92ef15e1..27c6ceb3 100644 --- a/pkg/custom-provider/provider_test.go +++ b/pkg/custom-provider/provider_test.go @@ -90,11 +90,11 @@ func (c *fakePromClient) QueryRange(_ context.Context, r prom.Range, query prom. 
return prom.QueryResult{}, nil } -func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fakePromClient) { +func setupPrometheusProvider(t *testing.T, stopCh <-chan struct{}) (provider.CustomMetricsProvider, *fakePromClient) { fakeProm := &fakePromClient{} fakeKubeClient := &fakedyn.FakeClientPool{} - prov := NewPrometheusProvider(api.Registry.RESTMapper(), fakeKubeClient, fakeProm, fakeProviderUpdateInterval, 1*time.Minute) + prov := NewPrometheusProvider(api.Registry.RESTMapper(), fakeKubeClient, fakeProm, fakeProviderUpdateInterval, 1*time.Minute, stopCh) containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) @@ -134,7 +134,9 @@ func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fak func TestListAllMetrics(t *testing.T) { // setup - prov, fakeProm := setupPrometheusProvider(t) + stopCh := make(chan struct{}) + defer close(stopCh) + prov, fakeProm := setupPrometheusProvider(t, stopCh) // assume we have no updates require.Len(t, prov.ListAllMetrics(), 0, "assume: should have no metrics updates at the start") From 01755d5acb989e1f1d65811675d5a8929606cb36 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Tue, 27 Jun 2017 19:55:45 -0400 Subject: [PATCH 2/9] Extract timeout from context in Prom client For vector and range queries, the Prometheus HTTP API takes a timeout parameter. Previously, we ignored this parameter. Now, we check to see if the context passed to the query contains a deadline, and if so, compare that to the current time to calculate an appropriate timeout for query evaluation. 
--- pkg/client/api.go | 21 +++++++++++++++++---- pkg/client/interfaces.go | 4 +++- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/pkg/client/api.go b/pkg/client/api.go index c79b68ac..1a5ef0aa 100644 --- a/pkg/client/api.go +++ b/pkg/client/api.go @@ -25,6 +25,7 @@ import ( "net/http" "net/url" "path" + "time" "github.com/golang/glog" "github.com/prometheus/common/model" @@ -54,7 +55,6 @@ func (c *httpAPIClient) Do(ctx context.Context, verb, endpoint string, query url u.RawQuery = query.Encode() req, err := http.NewRequest(verb, u.String(), nil) if err != nil { - // TODO: fix this to return Error? return APIResponse{}, fmt.Errorf("error constructing HTTP request to Prometheus: %v", err) } req.WithContext(ctx) @@ -96,7 +96,6 @@ func (c *httpAPIClient) Do(ctx context.Context, verb, endpoint string, query url var res APIResponse if err = json.NewDecoder(body).Decode(&res); err != nil { - // TODO: return what the body actually was? return APIResponse{}, &Error{ Type: ErrBadResponse, Msg: err.Error(), @@ -174,7 +173,9 @@ func (h *queryClient) Query(ctx context.Context, t model.Time, query Selector) ( if t != 0 { vals.Set("time", t.String()) } - // TODO: get timeout from context... + if timeout, hasTimeout := timeoutFromContext(ctx); hasTimeout { + vals.Set("timeout", model.Duration(timeout).String()) + } res, err := h.api.Do(ctx, "GET", queryURL, vals) if err != nil { @@ -199,7 +200,9 @@ func (h *queryClient) QueryRange(ctx context.Context, r Range, query Selector) ( if r.Step != 0 { vals.Set("step", model.Duration(r.Step).String()) } - // TODO: get timeout from context... 
+ if timeout, hasTimeout := timeoutFromContext(ctx); hasTimeout { + vals.Set("timeout", model.Duration(timeout).String()) + } res, err := h.api.Do(ctx, "GET", queryRangeURL, vals) if err != nil { @@ -210,3 +213,13 @@ func (h *queryClient) QueryRange(ctx context.Context, r Range, query Selector) ( err = json.Unmarshal(res.Data, &queryRes) return queryRes, err } + +// timeoutFromContext checks the context for a deadline and calculates a "timeout" duration from it, +// when present +func timeoutFromContext(ctx context.Context) (time.Duration, bool) { + if deadline, hasDeadline := ctx.Deadline(); hasDeadline { + return deadline.Sub(time.Now()), true + } + + return time.Duration(0), false +} diff --git a/pkg/client/interfaces.go b/pkg/client/interfaces.go index 5afc30d9..00ee720d 100644 --- a/pkg/client/interfaces.go +++ b/pkg/client/interfaces.go @@ -40,7 +40,9 @@ type Range struct { Step time.Duration } -// TODO: support timeout in the client? +// Client is a Prometheus client for the Prometheus HTTP API. +// The "timeout" parameter for the HTTP API is set based on the context's deadline, +// when present and applicable. type Client interface { // Series lists the time series matching the given series selectors Series(ctx context.Context, interval model.Interval, selectors ...Selector) ([]Series, error) From 823b8051c95f4a49c0209ee645354568be74fcf6 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Tue, 27 Jun 2017 19:59:39 -0400 Subject: [PATCH 3/9] Continue on error when processing series Previously, if we encountered an error while trying to update our series list, we'd return an error, aborting the processing of the entire batch. This could lead to the list of available metrics being severely out of date. Instead, we simply log an error when we fail to process a metric name, and skip it. 
--- pkg/custom-provider/metric_namer.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/custom-provider/metric_namer.go b/pkg/custom-provider/metric_namer.go index fe71d24f..f65a115e 100644 --- a/pkg/custom-provider/metric_namer.go +++ b/pkg/custom-provider/metric_namer.go @@ -86,13 +86,13 @@ func (r *basicSeriesRegistry) SetSeries(newSeries []prom.Series) error { } else if namespaceLabel, hasNamespaceLabel := series.Labels["namespace"]; hasNamespaceLabel && namespaceLabel != "" { // TODO: handle metrics describing a namespace if err := r.namer.processNamespacedSeries(series, newInfo); err != nil { - // TODO: do we want to log this and continue, or abort? - return err + glog.Errorf("Unable to process namespaced series %q: %v", series.Name, err) + continue } } else { if err := r.namer.processRootScopedSeries(series, newInfo); err != nil { - // TODO: do we want to log this and continue, or abort? - return err + glog.Errorf("Unable to process root-scoped series %q: %v", series.Name, err) + continue } } } @@ -263,6 +263,7 @@ func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provi // processNamespacedSeries adds the metric info for the given generic namespaced series to // the map of metric info. func (n *metricNamer) processNamespacedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error { + // NB: all errors must occur *before* we save the series info name, metricKind := n.metricNameFromSeries(series) resources, err := n.groupResourcesFromSeries(series) if err != nil { @@ -294,6 +295,7 @@ func (n *metricNamer) processNamespacedSeries(series prom.Series, infos map[prov // processesRootScopedSeries adds the metric info for the given generic namespaced series to // the map of metric info. 
func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error { + // NB: all errors must occur *before* we save the series info name, metricKind := n.metricNameFromSeries(series) resources, err := n.groupResourcesFromSeries(series) if err != nil { From 696fe9015acd3746acc589b64caeacac154ce4ea Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Tue, 27 Jun 2017 20:01:23 -0400 Subject: [PATCH 4/9] Clean up minor TODOs This commit cleans up some TODOs that were done or no longer applicable, and fixes a couple other minor TODOs, such as returning proper errors. --- pkg/custom-provider/metric_namer.go | 10 ++++------ pkg/custom-provider/provider.go | 15 ++------------- 2 files changed, 6 insertions(+), 19 deletions(-) diff --git a/pkg/custom-provider/metric_namer.go b/pkg/custom-provider/metric_namer.go index f65a115e..e65799a7 100644 --- a/pkg/custom-provider/metric_namer.go +++ b/pkg/custom-provider/metric_namer.go @@ -84,7 +84,7 @@ func (r *basicSeriesRegistry) SetSeries(newSeries []prom.Series) error { if strings.HasPrefix(series.Name, "container_") { r.namer.processContainerSeries(series, newInfo) } else if namespaceLabel, hasNamespaceLabel := series.Labels["namespace"]; hasNamespaceLabel && namespaceLabel != "" { - // TODO: handle metrics describing a namespace + // we also handle namespaced metrics here as part of the resource-association logic if err := r.namer.processNamespacedSeries(series, newInfo); err != nil { glog.Errorf("Unable to process namespaced series %q: %v", series.Name, err) continue @@ -123,7 +123,8 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.MetricInfo, nam defer r.mu.RUnlock() if len(resourceNames) == 0 { - // TODO: return error? panic? 
+ glog.Errorf("no resource names requested while producing a query for metric %s", metricInfo.String()) + return 0, "", "", false } metricInfo, singularResource, err := r.namer.normalizeInfo(metricInfo) @@ -214,7 +215,7 @@ type seriesSpec struct { // normalizeInfo takes in some metricInfo an "normalizes" it to ensure a common GroupResource form. func (r *metricNamer) normalizeInfo(metricInfo provider.MetricInfo) (provider.MetricInfo, string, error) { // NB: we need to "normalize" the metricInfo's GroupResource so we have a consistent pluralization, etc - // TODO: move this to the boilerplate? + // TODO: move this to the boilerplate normalizedGroupRes, err := r.mapper.ResourceFor(metricInfo.GroupResource.WithVersion("")) if err != nil { return provider.MetricInfo{}, "", err @@ -247,7 +248,6 @@ func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provi } info := provider.MetricInfo{ - // TODO: is the plural correct? GroupResource: schema.GroupResource{Resource: "pods"}, Namespaced: true, Metric: name, @@ -325,11 +325,9 @@ func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[prov // would return three GroupResources: "pods", "services", and "ingresses". // Returned MetricInfo is equilavent to the "normalized" info produced by normalizeInfo. func (n *metricNamer) groupResourcesFromSeries(series prom.Series) ([]schema.GroupResource, error) { - // TODO: do we need to cache this, or is ResourceFor good enough? var res []schema.GroupResource for label := range series.Labels { // TODO: figure out a way to let people specify a fully-qualified name in label-form - // TODO: will this work when missing a group? 
gvr, err := n.mapper.ResourceFor(schema.GroupVersionResource{Resource: string(label)}) if err != nil { if apimeta.IsNoMatchError(err) { diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index d46d38da..c0e67bd3 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -82,7 +82,7 @@ func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientP SeriesRegistry: &basicSeriesRegistry{ namer: metricNamer{ - // TODO: populate this... + // TODO: populate the overrides list overrides: nil, mapper: mapper, }, @@ -133,8 +133,6 @@ func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.Me } res := []custom_metrics.MetricValue{} - // blech, EachListItem should pass an index -- - // it's an implementation detail that it happens to be sequential err := apimeta.EachListItem(list, func(item runtime.Object) error { objUnstructured := item.(*unstructured.Unstructured) objName := objUnstructured.GetName() @@ -173,7 +171,7 @@ func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace stri fullQuery = prom.Selector(prom.Selector(fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(p.rateInterval).String()))) } - // TODO: too small of a rate interval will return no results... + // NB: too small of a rate interval will return no results... // sum over all other dimensions of this query (e.g. if we select on route, sum across all pods, // but if we select on pods, sum across all routes), and split by the dimension of our resource @@ -183,7 +181,6 @@ func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace stri // TODO: use an actual context queryResults, err := p.promClient.Query(context.Background(), pmodel.Now(), fullQuery) if err != nil { - // TODO: interpret this somehow? 
glog.Errorf("unable to fetch metrics from prometheus: %v", err) // don't leak implementation details to the user return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) @@ -214,29 +211,24 @@ func (p *prometheusProvider) getSingle(info provider.MetricInfo, namespace, name func (p *prometheusProvider) getMultiple(info provider.MetricInfo, namespace string, selector labels.Selector) (*custom_metrics.MetricValueList, error) { // construct a client to list the names of objects matching the label selector - // TODO: figure out version? client, err := p.kubeClient.ClientForGroupVersionResource(info.GroupResource.WithVersion("")) if err != nil { glog.Errorf("unable to construct dynamic client to list matching resource names: %v", err) - // TODO: check for resource not found error? // don't leak implementation details to the user return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources")) } // we can construct a this APIResource ourself, since the dynamic client only uses Name and Namespaced - // TODO: use discovery information instead apiRes := &metav1.APIResource{ Name: info.GroupResource.Resource, Namespaced: info.Namespaced, } // actually list the objects matching the label selector - // TODO: work for objects not in core v1 matchingObjectsRaw, err := client.Resource(apiRes, namespace). List(metav1.ListOptions{LabelSelector: selector.String()}) if err != nil { glog.Errorf("unable to list matching resource names: %v", err) - // TODO: check for resource not found error? // don't leak implementation details to the user return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources")) } @@ -323,9 +315,6 @@ func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) { func (l *cachingMetricsLister) updateMetrics() error { startTime := pmodel.Now().Add(-1 * l.updateInterval) - // TODO: figure out a good way to add all Kubernetes-related metrics at once - // (i.e. 
how do we determine if something is a Kubernetes-related metric?) - // container-specific metrics from cAdvsior have their own form, and need special handling containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) From c912e1e3be1f8a294abce937c8e40b5da09c7b2a Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Tue, 27 Jun 2017 20:06:25 -0400 Subject: [PATCH 5/9] Return proper errors from provider All errors returned by the provider should be proper Kube API status errors. This commit cleans up a couple spots that either returned an invalid error, or were missing an error (such as the case in when no results matched the requested object). --- pkg/custom-provider/provider.go | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index c0e67bd3..75e9d107 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -123,13 +123,12 @@ func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource s func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.MetricInfo, list runtime.Object) (*custom_metrics.MetricValueList, error) { if !apimeta.IsListType(list) { - // TODO: fix the error type here - return nil, fmt.Errorf("returned object was not a list") + return nil, apierr.NewInternalError(fmt.Errorf("result of label selector list operation was not a list")) } values, found := p.MatchValuesToNames(info, valueSet) if !found { - // TODO: throw error + return nil, newMetricNotFoundError(info.GroupResource, info.Metric) } res := []custom_metrics.MetricValue{} @@ -203,9 +202,22 @@ func (p *prometheusProvider) getSingle(info provider.MetricInfo, namespace, name if len(queryResults) < 1 { 
return nil, newMetricNotFoundForError(info.GroupResource, info.Metric, name) } - // TODO: check if lenght of results > 1? - // TODO: check if our output name is the same as our input name - resultValue := queryResults[0].Value + + namedValues, found := p.MatchValuesToNames(info, queryResults) + if !found { + return nil, newMetricNotFoundError(info.GroupResource, info.Metric) + } + + if len(namedValues) > 1 { + glog.V(2).Infof("Got more than one result (%v results) when fetching metric %s for %q, using the first one with a matching name...", len(queryResults), info.String(), name) + } + + resultValue, nameFound := namedValues[name] + if !nameFound { + glog.Errorf("None of the results returned by when fetching metric %s for %q matched the resource name", info.String(), name) + return nil, newMetricNotFoundForError(info.GroupResource, info.Metric, name) + } + return p.metricFor(resultValue, info.GroupResource, "", name, info.Metric) } @@ -235,8 +247,7 @@ func (p *prometheusProvider) getMultiple(info provider.MetricInfo, namespace str // make sure we have a list if !apimeta.IsListType(matchingObjectsRaw) { - // TODO: fix the error type here - return nil, fmt.Errorf("returned object was not a list") + return nil, apierr.NewInternalError(fmt.Errorf("result of label selector list operation was not a list")) } // convert a list of objects into the corresponding list of names From 766cd07c3a96718b646e1e6aad4b9ee89aaac230 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Fri, 30 Jun 2017 17:45:21 -0400 Subject: [PATCH 6/9] Link to luxas's demo luxas has an excellent demo up, tying everything together. This adds a link (with a brief description) to that demo repo. --- README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.md b/README.md index bbadbfc7..4bfc29b5 100644 --- a/README.md +++ b/README.md @@ -78,3 +78,19 @@ aggregated over all non-requested metrics. 
The adapter does not consider resources consumed by the "POD" container, which exists as part of all Kubernetes pods running in Docker simply supports the existance of the pod's shared network namespace. + +Example +------- + +@luxas has an excellent example deployment of Prometheus, this adapter, +and a demo pod which serves a metric `http_requests_total`, which becomes +the custom metrics API metric `pods/http_requests`. It also autoscales on +that metric using the `autoscaling/v2alpha1` HorizontalPodAutoscaler. + +It can be found at https://github.com/luxas/kubeadm-workshop. Pay special +attention to: + +- [Deploying the Prometheus + Operator](https://github.com/luxas/kubeadm-workshop#deploying-the-prometheus-operator-for-monitoring-services-in-the-cluster) +- [Setting up the custom metrics adapter and sample + app](https://github.com/luxas/kubeadm-workshop#deploying-a-custom-metrics-api-server-and-a-sample-app) From 703e9ecf099314983e8b0736f0bd20c67ff44c0d Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Wed, 2 Aug 2017 15:20:19 -0400 Subject: [PATCH 7/9] Use new kubernetes-incubator version of boilerplate The boilerplate moved to github.com/kubernetes-incubator/custom-metrics-apiserver, so we should use that now. 
--- cmd/adapter/app/start.go | 2 +- glide.lock | 20 ++++++++++---------- glide.yaml | 2 +- pkg/custom-provider/metric_namer.go | 2 +- pkg/custom-provider/metric_namer_test.go | 2 +- pkg/custom-provider/provider.go | 2 +- pkg/custom-provider/provider_test.go | 2 +- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/cmd/adapter/app/start.go b/cmd/adapter/app/start.go index 8c19ec56..a1464f07 100644 --- a/cmd/adapter/app/start.go +++ b/cmd/adapter/app/start.go @@ -30,10 +30,10 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "github.com/directxman12/custom-metrics-boilerplate/pkg/cmd/server" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" mprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/metrics" cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd/server" ) // NewCommandStartPrometheusAdapterServer provides a CLI handler for 'start master' command diff --git a/glide.lock b/glide.lock index fe4f1ea8..d389ed74 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 4432fd0d13b3a79f1febb9040cad9576793e44ab1c4d6e40abc664ad0582999f -updated: 2017-06-27T18:59:14.961201169-04:00 +hash: 815ac74b9c61bedb6cc960ca8b9de13ff7584a66f1a0f9fdb80c5c49986a10cf +updated: 2017-08-02T15:16:48.029828608-04:00 imports: - name: bitbucket.org/ww/goautoneg version: 75cd24fc2f2c2a2088577d12123ddee5f54e0675 @@ -82,14 +82,6 @@ imports: version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d subpackages: - spew -- name: github.com/directxman12/custom-metrics-boilerplate - version: 066dd03155e1cdb3dfa513f8bdcb3a30edda1137 - subpackages: - - pkg/apiserver - - pkg/apiserver/installer - - pkg/cmd/server - - pkg/provider - - pkg/registry/custom_metrics - name: github.com/docker/distribution version: cd27f179f2c10c5d300e6d09025b538c475b0d51 subpackages: @@ -163,6 +155,14 @@ imports: version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 - name: 
github.com/juju/ratelimit version: 77ed1c8a01217656d2080ad51981f6e99adaa177 +- name: github.com/kubernetes-incubator/custom-metrics-apiserver + version: e17b36ca6ebbec8acbc76d729d24058a09d042ee + subpackages: + - pkg/apiserver + - pkg/apiserver/installer + - pkg/cmd/server + - pkg/provider + - pkg/registry/custom_metrics - name: github.com/mailru/easyjson version: d5b7844b561a7bc640052f1b935f7b800330d7e0 subpackages: diff --git a/glide.yaml b/glide.yaml index 8e80033c..03b2e1e3 100644 --- a/glide.yaml +++ b/glide.yaml @@ -14,7 +14,7 @@ import: - kubernetes/typed/core/v1 - rest - tools/clientcmd -- package: github.com/directxman12/custom-metrics-boilerplate +- package: github.com/kubernetes-incubator/custom-metrics-apiserver subpackages: - pkg/cmd/server - pkg/provider diff --git a/pkg/custom-provider/metric_namer.go b/pkg/custom-provider/metric_namer.go index e65799a7..17f29d6f 100644 --- a/pkg/custom-provider/metric_namer.go +++ b/pkg/custom-provider/metric_namer.go @@ -21,7 +21,7 @@ import ( "strings" "sync" - "github.com/directxman12/custom-metrics-boilerplate/pkg/provider" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" diff --git a/pkg/custom-provider/metric_namer_test.go b/pkg/custom-provider/metric_namer_test.go index 001d743a..c185e8bb 100644 --- a/pkg/custom-provider/metric_namer_test.go +++ b/pkg/custom-provider/metric_namer_test.go @@ -20,7 +20,7 @@ import ( "sort" "testing" - "github.com/directxman12/custom-metrics-boilerplate/pkg/provider" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" pmodel "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index 75e9d107..2aa96f49 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -23,7 +23,7 @@ import ( 
"net/http" "time" - "github.com/directxman12/custom-metrics-boilerplate/pkg/provider" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" pmodel "github.com/prometheus/common/model" apierr "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" diff --git a/pkg/custom-provider/provider_test.go b/pkg/custom-provider/provider_test.go index 27c6ceb3..c63211ea 100644 --- a/pkg/custom-provider/provider_test.go +++ b/pkg/custom-provider/provider_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "github.com/directxman12/custom-metrics-boilerplate/pkg/provider" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/runtime/schema" From 8e5cb77e7f789e17ee4d1f1b1f29a4ec391b3e93 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Fri, 30 Jun 2017 16:58:49 -0400 Subject: [PATCH 8/9] Use periodically updating RESTMapper Currently, we fetch the discovery information once at the start of the adapter, and then never update it. This could prove problematic, since other API servers might come only after the adapter is started up. The boilerplate contains a periodically updating RESTMapper that solves this issue by refreshing the discovery information at an interval (as specified by a flag), so that we have a chance to fetch new resources. 
--- cmd/adapter/app/start.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/cmd/adapter/app/start.go b/cmd/adapter/app/start.go index a1464f07..f9c64a4c 100644 --- a/cmd/adapter/app/start.go +++ b/cmd/adapter/app/start.go @@ -34,6 +34,7 @@ import ( mprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/metrics" cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd/server" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/dynamicmapper" ) // NewCommandStartPrometheusAdapterServer provides a CLI handler for 'start master' command @@ -44,6 +45,7 @@ func NewCommandStartPrometheusAdapterServer(out, errOut io.Writer, stopCh <-chan MetricsRelistInterval: 10 * time.Minute, RateInterval: 5 * time.Minute, PrometheusURL: "https://localhost", + DiscoveryInterval: 10 * time.Minute, } cmd := &cobra.Command{ @@ -76,6 +78,8 @@ func NewCommandStartPrometheusAdapterServer(out, errOut io.Writer, stopCh <-chan "interval at which to re-list the set of all available metrics from Prometheus") flags.DurationVar(&o.RateInterval, "rate-interval", o.RateInterval, ""+ "period of time used to calculate rate metrics from cumulative metrics") + flags.DurationVar(&o.DiscoveryInterval, "discovery-interval", o.DiscoveryInterval, ""+ + "interval at which to refresh API discovery information") flags.StringVar(&o.PrometheusURL, "prometheus-url", o.PrometheusURL, "URL and configuration for connecting to Prometheus. 
Query parameters are used to configure the connection") @@ -108,12 +112,10 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c return fmt.Errorf("unable to construct discovery client for dynamic client: %v", err) } - // TODO: this needs to refresh it's discovery info every once and a while - resources, err := discovery.GetAPIGroupResources(discoveryClient) + dynamicMapper, err := dynamicmapper.NewRESTMapper(discoveryClient, api.Registry.InterfacesFor, o.DiscoveryInterval) if err != nil { - return fmt.Errorf("unable to construct discovery REST mapper: unable to fetch list of resources: %v", err) + return fmt.Errorf("unable to construct dynamic discovery mapper: %v", err) } - dynamicMapper := discovery.NewRESTMapper(resources, api.Registry.InterfacesFor) clientPool := dynamic.NewClientPool(clientConfig, dynamicMapper, dynamic.LegacyAPIPathResolverFunc) if err != nil { @@ -147,6 +149,8 @@ type PrometheusAdapterServerOptions struct { MetricsRelistInterval time.Duration // RateInterval is the period of time used to calculate rate metrics RateInterval time.Duration + // DiscoveryInterval is the interval at which discovery information is refreshed + DiscoveryInterval time.Duration // PrometheusURL is the URL describing how to connect to Prometheus. Query parameters configure connection options. PrometheusURL string } From 58a9769eaabceabb060ca9c78d274de176ae6777 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Wed, 2 Aug 2017 15:48:12 -0400 Subject: [PATCH 9/9] Refactor to remove duplicate code from boilerplate This commit switches to using the boilerplate versions of a couple different utilities, including the metric info normalization and the common error types. 
--- pkg/custom-provider/metric_namer.go | 24 +++------------------ pkg/custom-provider/provider.go | 33 +++++------------------------ 2 files changed, 8 insertions(+), 49 deletions(-) diff --git a/pkg/custom-provider/metric_namer.go b/pkg/custom-provider/metric_namer.go index 17f29d6f..9fa031b2 100644 --- a/pkg/custom-provider/metric_namer.go +++ b/pkg/custom-provider/metric_namer.go @@ -127,7 +127,7 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.MetricInfo, nam return 0, "", "", false } - metricInfo, singularResource, err := r.namer.normalizeInfo(metricInfo) + metricInfo, singularResource, err := metricInfo.Normalized(r.namer.mapper) if err != nil { glog.Errorf("unable to normalize group resource while producing a query: %v", err) return 0, "", "", false @@ -167,7 +167,7 @@ func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.MetricInfo, r.mu.RLock() defer r.mu.RUnlock() - metricInfo, singularResource, err := r.namer.normalizeInfo(metricInfo) + metricInfo, singularResource, err := metricInfo.Normalized(r.namer.mapper) if err != nil { glog.Errorf("unable to normalize group resource while matching values to names: %v", err) return nil, false @@ -212,24 +212,6 @@ type seriesSpec struct { kind SeriesType } -// normalizeInfo takes in some metricInfo an "normalizes" it to ensure a common GroupResource form. 
-func (r *metricNamer) normalizeInfo(metricInfo provider.MetricInfo) (provider.MetricInfo, string, error) { - // NB: we need to "normalize" the metricInfo's GroupResource so we have a consistent pluralization, etc - // TODO: move this to the boilerplate - normalizedGroupRes, err := r.mapper.ResourceFor(metricInfo.GroupResource.WithVersion("")) - if err != nil { - return provider.MetricInfo{}, "", err - } - metricInfo.GroupResource = normalizedGroupRes.GroupResource() - - singularResource, err := r.mapper.ResourceSingularizer(metricInfo.GroupResource.Resource) - if err != nil { - return provider.MetricInfo{}, "", err - } - - return metricInfo, singularResource, nil -} - // processContainerSeries performs special work to extract metric definitions // from cAdvisor-sourced container metrics, which don't particularly follow any useful conventions consistently. func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) { @@ -323,7 +305,7 @@ func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[prov // going through each label, checking to see if it corresponds to a known resource. For instance, // a series `ingress_http_hits_total{pod="foo",service="bar",ingress="baz",namespace="ns"}` // would return three GroupResources: "pods", "services", and "ingresses". -// Returned MetricInfo is equilavent to the "normalized" info produced by normalizeInfo. +// Returned MetricInfo is equilavent to the "normalized" info produced by metricInfo.Normalized. 
func (n *metricNamer) groupResourcesFromSeries(series prom.Series) ([]schema.GroupResource, error) { var res []schema.GroupResource for label := range series.Labels { diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index 2aa96f49..2a75486d 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "github.com/golang/glog" - "net/http" "time" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" @@ -43,28 +42,6 @@ import ( prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" ) -// newMetricNotFoundError returns a StatusError indicating the given metric could not be found. -// It is similar to NewNotFound, but more specialized -func newMetricNotFoundError(resource schema.GroupResource, metricName string) *apierr.StatusError { - return &apierr.StatusError{metav1.Status{ - Status: metav1.StatusFailure, - Code: int32(http.StatusNotFound), - Reason: metav1.StatusReasonNotFound, - Message: fmt.Sprintf("the server could not find the metric %s for %s", metricName, resource.String()), - }} -} - -// newMetricNotFoundForError returns a StatusError indicating the given metric could not be found for -// the given named object. 
It is similar to NewNotFound, but more specialized -func newMetricNotFoundForError(resource schema.GroupResource, metricName string, resourceName string) *apierr.StatusError { - return &apierr.StatusError{metav1.Status{ - Status: metav1.StatusFailure, - Code: int32(http.StatusNotFound), - Reason: metav1.StatusReasonNotFound, - Message: fmt.Sprintf("the server could not find the metric %s for %s %s", metricName, resource.String(), resourceName), - }} -} - type prometheusProvider struct { mapper apimeta.RESTMapper kubeClient dynamic.ClientPool @@ -128,7 +105,7 @@ func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.Me values, found := p.MatchValuesToNames(info, valueSet) if !found { - return nil, newMetricNotFoundError(info.GroupResource, info.Metric) + return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) } res := []custom_metrics.MetricValue{} @@ -158,7 +135,7 @@ func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.Me func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace string, names ...string) (pmodel.Vector, error) { kind, baseQuery, groupBy, found := p.QueryForMetric(info, namespace, names...) 
if !found { - return nil, newMetricNotFoundError(info.GroupResource, info.Metric) + return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) } fullQuery := baseQuery @@ -200,12 +177,12 @@ func (p *prometheusProvider) getSingle(info provider.MetricInfo, namespace, name } if len(queryResults) < 1 { - return nil, newMetricNotFoundForError(info.GroupResource, info.Metric, name) + return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name) } namedValues, found := p.MatchValuesToNames(info, queryResults) if !found { - return nil, newMetricNotFoundError(info.GroupResource, info.Metric) + return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) } if len(namedValues) > 1 { @@ -215,7 +192,7 @@ func (p *prometheusProvider) getSingle(info provider.MetricInfo, namespace, name resultValue, nameFound := namedValues[name] if !nameFound { glog.Errorf("None of the results returned by when fetching metric %s for %q matched the resource name", info.String(), name) - return nil, newMetricNotFoundForError(info.GroupResource, info.Metric, name) + return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name) } return p.metricFor(resultValue, info.GroupResource, "", name, info.Metric)