diff --git a/README.md b/README.md index bbadbfc7..4bfc29b5 100644 --- a/README.md +++ b/README.md @@ -78,3 +78,19 @@ aggregated over all non-requested metrics. The adapter does not consider resources consumed by the "POD" container, which exists as part of all Kubernetes pods running in Docker simply supports the existance of the pod's shared network namespace. + +Example +------- + +@luxas has an excellent example deployment of Prometheus, this adapter, +and a demo pod which serves a metric `http_requests_total`, which becomes +the custom metrics API metric `pods/http_requests`. It also autoscales on +that metric using the `autoscaling/v2alpha1` HorizontalPodAutoscaler. + +It can be found at https://github.com/luxas/kubeadm-workshop. Pay special +attention to: + +- [Deploying the Prometheus + Operator](https://github.com/luxas/kubeadm-workshop#deploying-the-prometheus-operator-for-monitoring-services-in-the-cluster) +- [Setting up the custom metrics adapter and sample + app](https://github.com/luxas/kubeadm-workshop#deploying-a-custom-metrics-api-server-and-a-sample-app) diff --git a/cmd/adapter/app/start.go b/cmd/adapter/app/start.go index df007cd1..f9c64a4c 100644 --- a/cmd/adapter/app/start.go +++ b/cmd/adapter/app/start.go @@ -30,10 +30,11 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "github.com/directxman12/custom-metrics-boilerplate/pkg/cmd/server" prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" mprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/metrics" cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd/server" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/dynamicmapper" ) // NewCommandStartPrometheusAdapterServer provides a CLI handler for 'start master' command @@ -44,6 +45,7 @@ func NewCommandStartPrometheusAdapterServer(out, errOut io.Writer, stopCh <-chan MetricsRelistInterval: 10 * 
time.Minute, RateInterval: 5 * time.Minute, PrometheusURL: "https://localhost", + DiscoveryInterval: 10 * time.Minute, } cmd := &cobra.Command{ @@ -76,6 +78,8 @@ func NewCommandStartPrometheusAdapterServer(out, errOut io.Writer, stopCh <-chan "interval at which to re-list the set of all available metrics from Prometheus") flags.DurationVar(&o.RateInterval, "rate-interval", o.RateInterval, ""+ "period of time used to calculate rate metrics from cumulative metrics") + flags.DurationVar(&o.DiscoveryInterval, "discovery-interval", o.DiscoveryInterval, ""+ + "interval at which to refresh API discovery information") flags.StringVar(&o.PrometheusURL, "prometheus-url", o.PrometheusURL, "URL and configuration for connecting to Prometheus. Query parameters are used to configure the connection") @@ -108,12 +112,10 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c return fmt.Errorf("unable to construct discovery client for dynamic client: %v", err) } - // TODO: this needs to refresh it's discovery info every once and a while - resources, err := discovery.GetAPIGroupResources(discoveryClient) + dynamicMapper, err := dynamicmapper.NewRESTMapper(discoveryClient, api.Registry.InterfacesFor, o.DiscoveryInterval) if err != nil { - return fmt.Errorf("unable to construct discovery REST mapper: unable to fetch list of resources: %v", err) + return fmt.Errorf("unable to construct dynamic discovery mapper: %v", err) } - dynamicMapper := discovery.NewRESTMapper(resources, api.Registry.InterfacesFor) clientPool := dynamic.NewClientPool(clientConfig, dynamicMapper, dynamic.LegacyAPIPathResolverFunc) if err != nil { @@ -129,7 +131,7 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String()) promClient := prom.NewClientForAPI(instrumentedGenericPromClient) - cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, 
promClient, o.MetricsRelistInterval, o.RateInterval) + cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.MetricsRelistInterval, o.RateInterval, stopCh) server, err := config.Complete().New(cmProvider) if err != nil { @@ -147,6 +149,8 @@ type PrometheusAdapterServerOptions struct { MetricsRelistInterval time.Duration // RateInterval is the period of time used to calculate rate metrics RateInterval time.Duration + // DiscoveryInterval is the interval at which discovery information is refreshed + DiscoveryInterval time.Duration // PrometheusURL is the URL describing how to connect to Prometheus. Query parameters configure connection options. PrometheusURL string } diff --git a/glide.lock b/glide.lock index fe4f1ea8..d389ed74 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 4432fd0d13b3a79f1febb9040cad9576793e44ab1c4d6e40abc664ad0582999f -updated: 2017-06-27T18:59:14.961201169-04:00 +hash: 815ac74b9c61bedb6cc960ca8b9de13ff7584a66f1a0f9fdb80c5c49986a10cf +updated: 2017-08-02T15:16:48.029828608-04:00 imports: - name: bitbucket.org/ww/goautoneg version: 75cd24fc2f2c2a2088577d12123ddee5f54e0675 @@ -82,14 +82,6 @@ imports: version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d subpackages: - spew -- name: github.com/directxman12/custom-metrics-boilerplate - version: 066dd03155e1cdb3dfa513f8bdcb3a30edda1137 - subpackages: - - pkg/apiserver - - pkg/apiserver/installer - - pkg/cmd/server - - pkg/provider - - pkg/registry/custom_metrics - name: github.com/docker/distribution version: cd27f179f2c10c5d300e6d09025b538c475b0d51 subpackages: @@ -163,6 +155,14 @@ imports: version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 - name: github.com/juju/ratelimit version: 77ed1c8a01217656d2080ad51981f6e99adaa177 +- name: github.com/kubernetes-incubator/custom-metrics-apiserver + version: e17b36ca6ebbec8acbc76d729d24058a09d042ee + subpackages: + - pkg/apiserver + - pkg/apiserver/installer + - pkg/cmd/server + - pkg/provider + - 
pkg/registry/custom_metrics - name: github.com/mailru/easyjson version: d5b7844b561a7bc640052f1b935f7b800330d7e0 subpackages: diff --git a/glide.yaml b/glide.yaml index 8e80033c..03b2e1e3 100644 --- a/glide.yaml +++ b/glide.yaml @@ -14,7 +14,7 @@ import: - kubernetes/typed/core/v1 - rest - tools/clientcmd -- package: github.com/directxman12/custom-metrics-boilerplate +- package: github.com/kubernetes-incubator/custom-metrics-apiserver subpackages: - pkg/cmd/server - pkg/provider diff --git a/pkg/client/api.go b/pkg/client/api.go index c79b68ac..1a5ef0aa 100644 --- a/pkg/client/api.go +++ b/pkg/client/api.go @@ -25,6 +25,7 @@ import ( "net/http" "net/url" "path" + "time" "github.com/golang/glog" "github.com/prometheus/common/model" @@ -54,7 +55,6 @@ func (c *httpAPIClient) Do(ctx context.Context, verb, endpoint string, query url u.RawQuery = query.Encode() req, err := http.NewRequest(verb, u.String(), nil) if err != nil { - // TODO: fix this to return Error? return APIResponse{}, fmt.Errorf("error constructing HTTP request to Prometheus: %v", err) } req.WithContext(ctx) @@ -96,7 +96,6 @@ func (c *httpAPIClient) Do(ctx context.Context, verb, endpoint string, query url var res APIResponse if err = json.NewDecoder(body).Decode(&res); err != nil { - // TODO: return what the body actually was? return APIResponse{}, &Error{ Type: ErrBadResponse, Msg: err.Error(), @@ -174,7 +173,9 @@ func (h *queryClient) Query(ctx context.Context, t model.Time, query Selector) ( if t != 0 { vals.Set("time", t.String()) } - // TODO: get timeout from context... + if timeout, hasTimeout := timeoutFromContext(ctx); hasTimeout { + vals.Set("timeout", model.Duration(timeout).String()) + } res, err := h.api.Do(ctx, "GET", queryURL, vals) if err != nil { @@ -199,7 +200,9 @@ func (h *queryClient) QueryRange(ctx context.Context, r Range, query Selector) ( if r.Step != 0 { vals.Set("step", model.Duration(r.Step).String()) } - // TODO: get timeout from context... 
+ if timeout, hasTimeout := timeoutFromContext(ctx); hasTimeout { + vals.Set("timeout", model.Duration(timeout).String()) + } res, err := h.api.Do(ctx, "GET", queryRangeURL, vals) if err != nil { @@ -210,3 +213,13 @@ func (h *queryClient) QueryRange(ctx context.Context, r Range, query Selector) ( err = json.Unmarshal(res.Data, &queryRes) return queryRes, err } + +// timeoutFromContext reports the time remaining until the context's deadline as a "timeout" duration, +// returning a zero duration and false when the context has no deadline +func timeoutFromContext(ctx context.Context) (time.Duration, bool) { + if deadline, hasDeadline := ctx.Deadline(); hasDeadline { + return deadline.Sub(time.Now()), true + } + + return time.Duration(0), false +} diff --git a/pkg/client/interfaces.go b/pkg/client/interfaces.go index 5afc30d9..00ee720d 100644 --- a/pkg/client/interfaces.go +++ b/pkg/client/interfaces.go @@ -40,7 +40,9 @@ type Range struct { Step time.Duration } -// TODO: support timeout in the client? +// Client is a Prometheus client for the Prometheus HTTP API. +// The "timeout" parameter for the HTTP API is set based on the context's deadline, +// when present and applicable. 
type Client interface { // Series lists the time series matching the given series selectors Series(ctx context.Context, interval model.Interval, selectors ...Selector) ([]Series, error) diff --git a/pkg/custom-provider/metric_namer.go b/pkg/custom-provider/metric_namer.go index fe71d24f..9fa031b2 100644 --- a/pkg/custom-provider/metric_namer.go +++ b/pkg/custom-provider/metric_namer.go @@ -21,7 +21,7 @@ import ( "strings" "sync" - "github.com/directxman12/custom-metrics-boilerplate/pkg/provider" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" @@ -84,15 +84,15 @@ func (r *basicSeriesRegistry) SetSeries(newSeries []prom.Series) error { if strings.HasPrefix(series.Name, "container_") { r.namer.processContainerSeries(series, newInfo) } else if namespaceLabel, hasNamespaceLabel := series.Labels["namespace"]; hasNamespaceLabel && namespaceLabel != "" { - // TODO: handle metrics describing a namespace + // we also handle namespaced metrics here as part of the resource-association logic if err := r.namer.processNamespacedSeries(series, newInfo); err != nil { - // TODO: do we want to log this and continue, or abort? - return err + glog.Errorf("Unable to process namespaced series %q: %v", series.Name, err) + continue } } else { if err := r.namer.processRootScopedSeries(series, newInfo); err != nil { - // TODO: do we want to log this and continue, or abort? - return err + glog.Errorf("Unable to process root-scoped series %q: %v", series.Name, err) + continue } } } @@ -123,10 +123,11 @@ func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.MetricInfo, nam defer r.mu.RUnlock() if len(resourceNames) == 0 { - // TODO: return error? panic? 
+ glog.Errorf("no resource names requested while producing a query for metric %s", metricInfo.String()) + return 0, "", "", false } - metricInfo, singularResource, err := r.namer.normalizeInfo(metricInfo) + metricInfo, singularResource, err := metricInfo.Normalized(r.namer.mapper) if err != nil { glog.Errorf("unable to normalize group resource while producing a query: %v", err) return 0, "", "", false @@ -166,7 +167,7 @@ func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.MetricInfo, r.mu.RLock() defer r.mu.RUnlock() - metricInfo, singularResource, err := r.namer.normalizeInfo(metricInfo) + metricInfo, singularResource, err := metricInfo.Normalized(r.namer.mapper) if err != nil { glog.Errorf("unable to normalize group resource while matching values to names: %v", err) return nil, false @@ -211,24 +212,6 @@ type seriesSpec struct { kind SeriesType } -// normalizeInfo takes in some metricInfo an "normalizes" it to ensure a common GroupResource form. -func (r *metricNamer) normalizeInfo(metricInfo provider.MetricInfo) (provider.MetricInfo, string, error) { - // NB: we need to "normalize" the metricInfo's GroupResource so we have a consistent pluralization, etc - // TODO: move this to the boilerplate? - normalizedGroupRes, err := r.mapper.ResourceFor(metricInfo.GroupResource.WithVersion("")) - if err != nil { - return provider.MetricInfo{}, "", err - } - metricInfo.GroupResource = normalizedGroupRes.GroupResource() - - singularResource, err := r.mapper.ResourceSingularizer(metricInfo.GroupResource.Resource) - if err != nil { - return provider.MetricInfo{}, "", err - } - - return metricInfo, singularResource, nil -} - // processContainerSeries performs special work to extract metric definitions // from cAdvisor-sourced container metrics, which don't particularly follow any useful conventions consistently. 
func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) { @@ -247,7 +230,6 @@ func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provi } info := provider.MetricInfo{ - // TODO: is the plural correct? GroupResource: schema.GroupResource{Resource: "pods"}, Namespaced: true, Metric: name, @@ -263,6 +245,7 @@ func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provi // processNamespacedSeries adds the metric info for the given generic namespaced series to // the map of metric info. func (n *metricNamer) processNamespacedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error { + // NB: all errors must occur *before* we save the series info name, metricKind := n.metricNameFromSeries(series) resources, err := n.groupResourcesFromSeries(series) if err != nil { @@ -294,6 +277,7 @@ func (n *metricNamer) processNamespacedSeries(series prom.Series, infos map[prov // processesRootScopedSeries adds the metric info for the given generic namespaced series to // the map of metric info. func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error { + // NB: all errors must occur *before* we save the series info name, metricKind := n.metricNameFromSeries(series) resources, err := n.groupResourcesFromSeries(series) if err != nil { @@ -321,13 +305,11 @@ func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[prov // going through each label, checking to see if it corresponds to a known resource. For instance, // a series `ingress_http_hits_total{pod="foo",service="bar",ingress="baz",namespace="ns"}` // would return three GroupResources: "pods", "services", and "ingresses". -// Returned MetricInfo is equilavent to the "normalized" info produced by normalizeInfo. +// Returned MetricInfo is equivalent to the "normalized" info produced by metricInfo.Normalized. 
func (n *metricNamer) groupResourcesFromSeries(series prom.Series) ([]schema.GroupResource, error) { - // TODO: do we need to cache this, or is ResourceFor good enough? var res []schema.GroupResource for label := range series.Labels { // TODO: figure out a way to let people specify a fully-qualified name in label-form - // TODO: will this work when missing a group? gvr, err := n.mapper.ResourceFor(schema.GroupVersionResource{Resource: string(label)}) if err != nil { if apimeta.IsNoMatchError(err) { diff --git a/pkg/custom-provider/metric_namer_test.go b/pkg/custom-provider/metric_namer_test.go index 001d743a..c185e8bb 100644 --- a/pkg/custom-provider/metric_namer_test.go +++ b/pkg/custom-provider/metric_namer_test.go @@ -20,7 +20,7 @@ import ( "sort" "testing" - "github.com/directxman12/custom-metrics-boilerplate/pkg/provider" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" pmodel "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index a6357058..2a75486d 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -20,10 +20,9 @@ import ( "context" "fmt" "github.com/golang/glog" - "net/http" "time" - "github.com/directxman12/custom-metrics-boilerplate/pkg/provider" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" pmodel "github.com/prometheus/common/model" apierr "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -43,28 +42,6 @@ import ( prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" ) -// newMetricNotFoundError returns a StatusError indicating the given metric could not be found. 
-// It is similar to NewNotFound, but more specialized -func newMetricNotFoundError(resource schema.GroupResource, metricName string) *apierr.StatusError { - return &apierr.StatusError{metav1.Status{ - Status: metav1.StatusFailure, - Code: int32(http.StatusNotFound), - Reason: metav1.StatusReasonNotFound, - Message: fmt.Sprintf("the server could not find the metric %s for %s", metricName, resource.String()), - }} -} - -// newMetricNotFoundForError returns a StatusError indicating the given metric could not be found for -// the given named object. It is similar to NewNotFound, but more specialized -func newMetricNotFoundForError(resource schema.GroupResource, metricName string, resourceName string) *apierr.StatusError { - return &apierr.StatusError{metav1.Status{ - Status: metav1.StatusFailure, - Code: int32(http.StatusNotFound), - Reason: metav1.StatusReasonNotFound, - Message: fmt.Sprintf("the server could not find the metric %s for %s %s", metricName, resource.String(), resourceName), - }} -} - type prometheusProvider struct { mapper apimeta.RESTMapper kubeClient dynamic.ClientPool @@ -75,22 +52,21 @@ type prometheusProvider struct { rateInterval time.Duration } -func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, updateInterval time.Duration, rateInterval time.Duration) provider.CustomMetricsProvider { +func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider { lister := &cachingMetricsLister{ updateInterval: updateInterval, promClient: promClient, SeriesRegistry: &basicSeriesRegistry{ namer: metricNamer{ - // TODO: populate this... 
+ // TODO: populate the overrides list overrides: nil, mapper: mapper, }, }, } - // TODO: allow for RunUntil - lister.Run() + lister.RunUntil(stopChan) return &prometheusProvider{ mapper: mapper, @@ -124,18 +100,15 @@ func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource s func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.MetricInfo, list runtime.Object) (*custom_metrics.MetricValueList, error) { if !apimeta.IsListType(list) { - // TODO: fix the error type here - return nil, fmt.Errorf("returned object was not a list") + return nil, apierr.NewInternalError(fmt.Errorf("result of label selector list operation was not a list")) } values, found := p.MatchValuesToNames(info, valueSet) if !found { - // TODO: throw error + return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) } res := []custom_metrics.MetricValue{} - // blech, EachListItem should pass an index -- - // it's an implementation detail that it happens to be sequential err := apimeta.EachListItem(list, func(item runtime.Object) error { objUnstructured := item.(*unstructured.Unstructured) objName := objUnstructured.GetName() @@ -162,7 +135,7 @@ func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.Me func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace string, names ...string) (pmodel.Vector, error) { kind, baseQuery, groupBy, found := p.QueryForMetric(info, namespace, names...) if !found { - return nil, newMetricNotFoundError(info.GroupResource, info.Metric) + return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) } fullQuery := baseQuery @@ -174,7 +147,7 @@ func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace stri fullQuery = prom.Selector(prom.Selector(fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(p.rateInterval).String()))) } - // TODO: too small of a rate interval will return no results... 
+ // NB: too small of a rate interval will return no results... // sum over all other dimensions of this query (e.g. if we select on route, sum across all pods, // but if we select on pods, sum across all routes), and split by the dimension of our resource @@ -184,7 +157,6 @@ func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace stri // TODO: use an actual context queryResults, err := p.promClient.Query(context.Background(), pmodel.Now(), fullQuery) if err != nil { - // TODO: interpret this somehow? glog.Errorf("unable to fetch metrics from prometheus: %v", err) // don't leak implementation details to the user return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) @@ -205,47 +177,54 @@ func (p *prometheusProvider) getSingle(info provider.MetricInfo, namespace, name } if len(queryResults) < 1 { - return nil, newMetricNotFoundForError(info.GroupResource, info.Metric, name) + return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name) } - // TODO: check if lenght of results > 1? 
- // TODO: check if our output name is the same as our input name - resultValue := queryResults[0].Value + + namedValues, found := p.MatchValuesToNames(info, queryResults) + if !found { + return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) + } + + if len(namedValues) > 1 { + glog.V(2).Infof("Got more than one result (%v results) when fetching metric %s for %q, using the first one with a matching name...", len(queryResults), info.String(), name) + } + + resultValue, nameFound := namedValues[name] + if !nameFound { + glog.Errorf("None of the results returned by when fetching metric %s for %q matched the resource name", info.String(), name) + return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name) + } + return p.metricFor(resultValue, info.GroupResource, "", name, info.Metric) } func (p *prometheusProvider) getMultiple(info provider.MetricInfo, namespace string, selector labels.Selector) (*custom_metrics.MetricValueList, error) { // construct a client to list the names of objects matching the label selector - // TODO: figure out version? client, err := p.kubeClient.ClientForGroupVersionResource(info.GroupResource.WithVersion("")) if err != nil { glog.Errorf("unable to construct dynamic client to list matching resource names: %v", err) - // TODO: check for resource not found error? // don't leak implementation details to the user return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources")) } // we can construct a this APIResource ourself, since the dynamic client only uses Name and Namespaced - // TODO: use discovery information instead apiRes := &metav1.APIResource{ Name: info.GroupResource.Resource, Namespaced: info.Namespaced, } // actually list the objects matching the label selector - // TODO: work for objects not in core v1 matchingObjectsRaw, err := client.Resource(apiRes, namespace). 
List(metav1.ListOptions{LabelSelector: selector.String()}) if err != nil { glog.Errorf("unable to list matching resource names: %v", err) - // TODO: check for resource not found error? // don't leak implementation details to the user return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources")) } // make sure we have a list if !apimeta.IsListType(matchingObjectsRaw) { - // TODO: fix the error type here - return nil, fmt.Errorf("returned object was not a list") + return nil, apierr.NewInternalError(fmt.Errorf("result of label selector list operation was not a list")) } // convert a list of objects into the corresponding list of names @@ -310,19 +289,20 @@ type cachingMetricsLister struct { } func (l *cachingMetricsLister) Run() { - go wait.Forever(func() { + l.RunUntil(wait.NeverStop) +} + +func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) { + go wait.Until(func() { if err := l.updateMetrics(); err != nil { utilruntime.HandleError(err) } - }, l.updateInterval) + }, l.updateInterval, stopChan) } func (l *cachingMetricsLister) updateMetrics() error { startTime := pmodel.Now().Add(-1 * l.updateInterval) - // TODO: figure out a good way to add all Kubernetes-related metrics at once - // (i.e. how do we determine if something is a Kubernetes-related metric?) 
- // container-specific metrics from cAdvsior have their own form, and need special handling containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) diff --git a/pkg/custom-provider/provider_test.go b/pkg/custom-provider/provider_test.go index 92ef15e1..c63211ea 100644 --- a/pkg/custom-provider/provider_test.go +++ b/pkg/custom-provider/provider_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "github.com/directxman12/custom-metrics-boilerplate/pkg/provider" + "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/runtime/schema" @@ -90,11 +90,11 @@ func (c *fakePromClient) QueryRange(_ context.Context, r prom.Range, query prom. return prom.QueryResult{}, nil } -func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fakePromClient) { +func setupPrometheusProvider(t *testing.T, stopCh <-chan struct{}) (provider.CustomMetricsProvider, *fakePromClient) { fakeProm := &fakePromClient{} fakeKubeClient := &fakedyn.FakeClientPool{} - prov := NewPrometheusProvider(api.Registry.RESTMapper(), fakeKubeClient, fakeProm, fakeProviderUpdateInterval, 1*time.Minute) + prov := NewPrometheusProvider(api.Registry.RESTMapper(), fakeKubeClient, fakeProm, fakeProviderUpdateInterval, 1*time.Minute, stopCh) containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) @@ -134,7 +134,9 @@ func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fak func TestListAllMetrics(t *testing.T) { // 
setup - prov, fakeProm := setupPrometheusProvider(t) + stopCh := make(chan struct{}) + defer close(stopCh) + prov, fakeProm := setupPrometheusProvider(t, stopCh) // assume we have no updates require.Len(t, prov.ListAllMetrics(), 0, "assume: should have no metrics updates at the start")