Have provider constructor take a stop channel

This causes the Prometheus provider to take a stop channel as an
argument, which allows us to stop the lister (which keeps the series list
up to date) in the unit tests.
This commit is contained in:
Solly Ross 2017-06-27 19:50:08 -04:00
parent 895183e503
commit d1d60f7f5e
3 changed files with 14 additions and 9 deletions

View file

@ -129,7 +129,7 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c
instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String()) instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String())
promClient := prom.NewClientForAPI(instrumentedGenericPromClient) promClient := prom.NewClientForAPI(instrumentedGenericPromClient)
cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.MetricsRelistInterval, o.RateInterval) cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.MetricsRelistInterval, o.RateInterval, stopCh)
server, err := config.Complete().New(cmProvider) server, err := config.Complete().New(cmProvider)
if err != nil { if err != nil {

View file

@ -75,7 +75,7 @@ type prometheusProvider struct {
rateInterval time.Duration rateInterval time.Duration
} }
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, updateInterval time.Duration, rateInterval time.Duration) provider.CustomMetricsProvider { func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider {
lister := &cachingMetricsLister{ lister := &cachingMetricsLister{
updateInterval: updateInterval, updateInterval: updateInterval,
promClient: promClient, promClient: promClient,
@ -89,8 +89,7 @@ func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientP
}, },
} }
// TODO: allow for RunUntil lister.RunUntil(stopChan)
lister.Run()
return &prometheusProvider{ return &prometheusProvider{
mapper: mapper, mapper: mapper,
@ -310,11 +309,15 @@ type cachingMetricsLister struct {
} }
func (l *cachingMetricsLister) Run() { func (l *cachingMetricsLister) Run() {
go wait.Forever(func() { l.RunUntil(wait.NeverStop)
}
func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) {
go wait.Until(func() {
if err := l.updateMetrics(); err != nil { if err := l.updateMetrics(); err != nil {
utilruntime.HandleError(err) utilruntime.HandleError(err)
} }
}, l.updateInterval) }, l.updateInterval, stopChan)
} }
func (l *cachingMetricsLister) updateMetrics() error { func (l *cachingMetricsLister) updateMetrics() error {

View file

@ -90,11 +90,11 @@ func (c *fakePromClient) QueryRange(_ context.Context, r prom.Range, query prom.
return prom.QueryResult{}, nil return prom.QueryResult{}, nil
} }
func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fakePromClient) { func setupPrometheusProvider(t *testing.T, stopCh <-chan struct{}) (provider.CustomMetricsProvider, *fakePromClient) {
fakeProm := &fakePromClient{} fakeProm := &fakePromClient{}
fakeKubeClient := &fakedyn.FakeClientPool{} fakeKubeClient := &fakedyn.FakeClientPool{}
prov := NewPrometheusProvider(api.Registry.RESTMapper(), fakeKubeClient, fakeProm, fakeProviderUpdateInterval, 1*time.Minute) prov := NewPrometheusProvider(api.Registry.RESTMapper(), fakeKubeClient, fakeProm, fakeProviderUpdateInterval, 1*time.Minute, stopCh)
containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", ""))
namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*"))
@ -134,7 +134,9 @@ func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fak
func TestListAllMetrics(t *testing.T) { func TestListAllMetrics(t *testing.T) {
// setup // setup
prov, fakeProm := setupPrometheusProvider(t) stopCh := make(chan struct{})
defer close(stopCh)
prov, fakeProm := setupPrometheusProvider(t, stopCh)
// assume we have no updates // assume we have no updates
require.Len(t, prov.ListAllMetrics(), 0, "assume: should have no metrics updates at the start") require.Len(t, prov.ListAllMetrics(), 0, "assume: should have no metrics updates at the start")