From bda754ad2d6bf6cfadbda35d3017b4fb66d9eb8c Mon Sep 17 00:00:00 2001 From: pdbogen Date: Mon, 22 Jul 2019 11:35:28 -0700 Subject: [PATCH] dependency-injected non-global metrics object --- cmd/adapter/adapter.go | 36 ++++--- pkg/config/loader.go | 4 - pkg/custom-provider/provider.go | 64 ++++++++---- pkg/custom-provider/series_registry.go | 6 +- .../external_series_registry.go | 13 ++- pkg/external-provider/provider.go | 19 +++- pkg/metrics/metrics.go | 98 +++++++++++++------ 7 files changed, 162 insertions(+), 78 deletions(-) diff --git a/cmd/adapter/adapter.go b/cmd/adapter/adapter.go index dd801222..990845eb 100644 --- a/cmd/adapter/adapter.go +++ b/cmd/adapter/adapter.go @@ -27,7 +27,7 @@ import ( "os" "time" - "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/directxman12/k8s-prometheus-adapter/pkg/metrics" basecmd "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" @@ -68,9 +68,11 @@ type PrometheusAdapter struct { // MetricsMaxAge is the period to query available metrics for MetricsMaxAge time.Duration // MetricsPort is the port on which the adapter itself will expose metrics - MetricsPort int16 + MetricsPort uint16 metricsConfig *adaptercfg.MetricsDiscoveryConfig + + ServiceMetrics *metrics.ServiceMetrics } func (cmd *PrometheusAdapter) makePromClient() (prom.Client, error) { @@ -128,7 +130,7 @@ func (cmd *PrometheusAdapter) addFlags() { "interval at which to re-list the set of all available metrics from Prometheus") cmd.Flags().DurationVar(&cmd.MetricsMaxAge, "metrics-max-age", cmd.MetricsMaxAge, ""+ "period for which to query the set of available metrics from Prometheus") - cmd.Flags().Int16Var(&cmd.MetricsPort, "metrics-port", 9593, "port on which to expose prometheus "+ + cmd.Flags().Uint16Var(&cmd.MetricsPort, "metrics-port", 9593, "port on which to expose prometheus "+ "metrics about k8s-prometheus-adapter") } @@ -142,12 +144,17 @@ func (cmd 
*PrometheusAdapter) loadConfig() error { return fmt.Errorf("unable to load metrics discovery configuration: %v", err) } + if cmd.ServiceMetrics != nil { + cmd.ServiceMetrics.Rules.WithLabelValues("normal").Set(float64(len(metricsConfig.Rules))) + cmd.ServiceMetrics.Rules.WithLabelValues("external").Set(float64(len(metricsConfig.ExternalRules))) + } + cmd.metricsConfig = metricsConfig return nil } -func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.CustomMetricsProvider, error) { +func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan struct{}, serviceMetrics *metrics.ServiceMetrics) (provider.CustomMetricsProvider, error) { if len(cmd.metricsConfig.Rules) == 0 { return nil, nil } @@ -173,7 +180,8 @@ func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan } // construct the provider and start it - cmProvider, runner := cmprov.NewPrometheusProvider(mapper, dynClient, promClient, namers, cmd.MetricsRelistInterval, cmd.MetricsMaxAge) + cmProvider, runner := cmprov.NewPrometheusProvider(mapper, dynClient, promClient, namers, + cmd.MetricsRelistInterval, cmd.MetricsMaxAge, serviceMetrics) runner.RunUntil(stopCh) return cmProvider, nil @@ -197,7 +205,8 @@ func (cmd *PrometheusAdapter) makeExternalProvider(promClient prom.Client, stopC } // construct the provider and start it - emProvider, runner := extprov.NewExternalPrometheusProvider(promClient, namers, cmd.MetricsRelistInterval) + emProvider, runner := extprov.NewExternalPrometheusProvider(promClient, namers, cmd.MetricsRelistInterval, + cmd.ServiceMetrics) runner.RunUntil(stopCh) return emProvider, nil @@ -241,22 +250,23 @@ func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client) erro } func (cmd *PrometheusAdapter) runMetrics() { - go func() { - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.Handler()) - klog.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", cmd.MetricsPort), mux)) - }() } 
func main() { logs.InitLogs() defer logs.FlushLogs() + serviceMetrics, err := metrics.NewMetrics() + if err != nil { + klog.Fatalf("unable to construct Metrics registry: %v", err) + } + // set up flags cmd := &PrometheusAdapter{ PrometheusURL: "https://localhost", MetricsRelistInterval: 10 * time.Minute, MetricsMaxAge: 20 * time.Minute, + ServiceMetrics: serviceMetrics, } cmd.Name = "prometheus-metrics-adapter" cmd.addFlags() @@ -276,10 +286,10 @@ func main() { klog.Fatalf("unable to load metrics discovery config: %v", err) } - cmd.runMetrics() + serviceMetrics.Run(cmd.MetricsPort) // construct the provider - cmProvider, err := cmd.makeProvider(promClient, wait.NeverStop) + cmProvider, err := cmd.makeProvider(promClient, wait.NeverStop, serviceMetrics) if err != nil { klog.Fatalf("unable to construct custom metrics provider: %v", err) } diff --git a/pkg/config/loader.go b/pkg/config/loader.go index b0ec0c4c..0e54357c 100644 --- a/pkg/config/loader.go +++ b/pkg/config/loader.go @@ -5,8 +5,6 @@ import ( "io/ioutil" "os" - "github.com/directxman12/k8s-prometheus-adapter/pkg/metrics" - "gopkg.in/yaml.v2" ) @@ -30,7 +28,5 @@ func FromYAML(contents []byte) (*MetricsDiscoveryConfig, error) { if err := yaml.UnmarshalStrict(contents, &cfg); err != nil { return nil, fmt.Errorf("unable to parse metrics discovery config: %v", err) } - metrics.Rules.WithLabelValues("normal").Set(float64(len(cfg.Rules))) - metrics.Rules.WithLabelValues("external").Set(float64(len(cfg.ExternalRules))) return &cfg, nil } diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go index 3bd28664..a813fccc 100644 --- a/pkg/custom-provider/provider.go +++ b/pkg/custom-provider/provider.go @@ -52,14 +52,15 @@ type Runnable interface { } type prometheusProvider struct { - mapper apimeta.RESTMapper - kubeClient dynamic.Interface - promClient prom.Client + mapper apimeta.RESTMapper + kubeClient dynamic.Interface + promClient prom.Client + serviceMetrics *metrics.ServiceMetrics 
SeriesRegistry } -func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, maxAge time.Duration) (provider.CustomMetricsProvider, Runnable) { +func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, maxAge time.Duration, serviceMetrics *metrics.ServiceMetrics) (provider.CustomMetricsProvider, Runnable) { lister := &cachingMetricsLister{ updateInterval: updateInterval, maxAge: maxAge, @@ -67,15 +68,19 @@ func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interfa namers: namers, SeriesRegistry: &basicSeriesRegistry{ - name: "custom", - mapper: mapper, + name: "custom", + mapper: mapper, + serviceMetrics: serviceMetrics, }, + + serviceMetrics: serviceMetrics, } return &prometheusProvider{ - mapper: mapper, - kubeClient: kubeClient, - promClient: promClient, + mapper: mapper, + kubeClient: kubeClient, + promClient: promClient, + serviceMetrics: serviceMetrics, SeriesRegistry: lister, }, lister @@ -101,7 +106,9 @@ func (p *prometheusProvider) metricFor(value pmodel.SampleValue, name types.Name func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.CustomMetricInfo, namespace string, names []string) (*custom_metrics.MetricValueList, error) { values, found := p.MatchValuesToNames(info, valueSet) if !found { - metrics.Errors.WithLabelValues("not_found").Inc() + if p.serviceMetrics != nil { + p.serviceMetrics.Errors.WithLabelValues("not_found").Inc() + } return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) } res := []custom_metrics.MetricValue{} @@ -126,7 +133,9 @@ func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.Cu func (p *prometheusProvider) buildQuery(info provider.CustomMetricInfo, namespace string, names ...string) (pmodel.Vector, error) { query, found := 
p.QueryForMetric(info, namespace, names...) if !found { - metrics.Errors.WithLabelValues("not_found").Inc() + if p.serviceMetrics != nil { + p.serviceMetrics.Errors.WithLabelValues("not_found").Inc() + } return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) } @@ -134,14 +143,18 @@ func (p *prometheusProvider) buildQuery(info provider.CustomMetricInfo, namespac queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), query) if err != nil { klog.Errorf("unable to fetch metrics from prometheus: %v", err) + if p.serviceMetrics != nil { + p.serviceMetrics.Errors.WithLabelValues("internal").Inc() + } // don't leak implementation details to the user - metrics.Errors.WithLabelValues("internal").Inc() return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) } if queryResults.Type != pmodel.ValVector { klog.Errorf("unexpected results from prometheus: expected %s, got %s on results %v", pmodel.ValVector, queryResults.Type, queryResults) - metrics.Errors.WithLabelValues("internal").Inc() + if p.serviceMetrics != nil { + p.serviceMetrics.Errors.WithLabelValues("internal").Inc() + } return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) } @@ -157,13 +170,17 @@ func (p *prometheusProvider) GetMetricByName(name types.NamespacedName, info pro // associate the metrics if len(queryResults) < 1 { - metrics.Errors.WithLabelValues("not_found").Inc() + if p.serviceMetrics != nil { + p.serviceMetrics.Errors.WithLabelValues("not_found").Inc() + } return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name.Name) } namedValues, found := p.MatchValuesToNames(info, queryResults) if !found { - metrics.Errors.WithLabelValues("not_found").Inc() + if p.serviceMetrics != nil { + p.serviceMetrics.Errors.WithLabelValues("not_found").Inc() + } return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) } @@ -174,7 +191,9 @@ func (p *prometheusProvider) GetMetricByName(name 
types.NamespacedName, info pro resultValue, nameFound := namedValues[name.Name] if !nameFound { klog.Errorf("None of the results returned by when fetching metric %s for %q matched the resource name", info.String(), name) - metrics.Errors.WithLabelValues("not_found").Inc() + if p.serviceMetrics != nil { + p.serviceMetrics.Errors.WithLabelValues("not_found").Inc() + } return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name.Name) } @@ -187,8 +206,10 @@ func (p *prometheusProvider) GetMetricBySelector(namespace string, selector labe resourceNames, err := helpers.ListObjectNames(p.mapper, p.kubeClient, namespace, selector, info) if err != nil { klog.Errorf("unable to list matching resource names: %v", err) + if p.serviceMetrics != nil { + p.serviceMetrics.Errors.WithLabelValues("internal").Inc() + } // don't leak implementation details to the user - metrics.Errors.WithLabelValues("internal").Inc() return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources")) } @@ -209,6 +230,7 @@ type cachingMetricsLister struct { updateInterval time.Duration maxAge time.Duration namers []naming.MetricNamer + serviceMetrics *metrics.ServiceMetrics } func (l *cachingMetricsLister) Run() { @@ -264,7 +286,9 @@ func (l *cachingMetricsLister) updateMetrics() error { // iterate through, blocking until we've got all results for range l.namers { if err := <-errs; err != nil { - metrics.PrometheusUp.Set(0) + if l.serviceMetrics != nil { + l.serviceMetrics.PrometheusUp.Set(0) + } return fmt.Errorf("unable to update list of all metrics: %v", err) } if ss := <-selectorSeriesChan; ss.series != nil { @@ -272,7 +296,9 @@ func (l *cachingMetricsLister) updateMetrics() error { } } close(errs) - metrics.PrometheusUp.Set(1) + if l.serviceMetrics != nil { + l.serviceMetrics.PrometheusUp.Set(1) + } newSeries := make([][]prom.Series, len(l.namers)) for i, namer := range l.namers { diff --git a/pkg/custom-provider/series_registry.go 
b/pkg/custom-provider/series_registry.go index 345f91f7..75eb3754 100644 --- a/pkg/custom-provider/series_registry.go +++ b/pkg/custom-provider/series_registry.go @@ -79,6 +79,8 @@ type basicSeriesRegistry struct { metrics []provider.CustomMetricInfo mapper apimeta.RESTMapper + + serviceMetrics *metrics.ServiceMetrics } func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers []naming.MetricNamer) error { @@ -127,7 +129,9 @@ func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers r.mu.Lock() defer r.mu.Unlock() - metrics.RegistryMetrics.WithLabelValues(r.name).Set(float64(len(newMetrics))) + if r.serviceMetrics != nil { + r.serviceMetrics.RegistryMetrics.WithLabelValues(r.name).Set(float64(len(newMetrics))) + } r.info = newInfo r.metrics = newMetrics diff --git a/pkg/external-provider/external_series_registry.go b/pkg/external-provider/external_series_registry.go index 998fd54c..803683fd 100644 --- a/pkg/external-provider/external_series_registry.go +++ b/pkg/external-provider/external_series_registry.go @@ -46,6 +46,8 @@ type externalSeriesRegistry struct { metrics []provider.ExternalMetricInfo // metricsInfo is a lookup from a metric to SeriesConverter for the sake of generating queries metricsInfo map[string]seriesInfo + + serviceMetrics *metrics.ServiceMetrics } type seriesInfo struct { @@ -57,10 +59,11 @@ type seriesInfo struct { } // NewExternalSeriesRegistry creates an ExternalSeriesRegistry driven by the data from the provided MetricLister. 
-func NewExternalSeriesRegistry(lister MetricListerWithNotification) ExternalSeriesRegistry { +func NewExternalSeriesRegistry(lister MetricListerWithNotification, serviceMetrics *metrics.ServiceMetrics) ExternalSeriesRegistry { var registry = externalSeriesRegistry{ - metrics: make([]provider.ExternalMetricInfo, 0), - metricsInfo: map[string]seriesInfo{}, + metrics: make([]provider.ExternalMetricInfo, 0), + metricsInfo: map[string]seriesInfo{}, + serviceMetrics: serviceMetrics, } lister.AddNotificationReceiver(registry.filterAndStoreMetrics) @@ -105,7 +108,9 @@ func (r *externalSeriesRegistry) filterAndStoreMetrics(result MetricUpdateResult r.mu.Lock() defer r.mu.Unlock() - metrics.RegistryMetrics.WithLabelValues(r.name).Set(float64(len(apiMetricsCache))) + if r.serviceMetrics != nil { + r.serviceMetrics.RegistryMetrics.WithLabelValues(r.name).Set(float64(len(apiMetricsCache))) + } r.metrics = apiMetricsCache r.metricsInfo = rawMetricsCache diff --git a/pkg/external-provider/provider.go b/pkg/external-provider/provider.go index 686fdf91..efc83637 100644 --- a/pkg/external-provider/provider.go +++ b/pkg/external-provider/provider.go @@ -40,6 +40,8 @@ type externalPrometheusProvider struct { metricConverter MetricConverter seriesRegistry ExternalSeriesRegistry + + serviceMetrics *metrics.ServiceMetrics } func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricSelector labels.Selector, info provider.ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error) { @@ -47,12 +49,16 @@ func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricS if err != nil { klog.Errorf("unable to generate a query for the metric: %v", err) - metrics.Errors.WithLabelValues("internal").Inc() + if p.serviceMetrics != nil { + p.serviceMetrics.Errors.WithLabelValues("internal").Inc() + } return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) } if !found { - metrics.Errors.WithLabelValues("not_found").Inc() + if 
p.serviceMetrics != nil { + p.serviceMetrics.Errors.WithLabelValues("not_found").Inc() + } return nil, provider.NewMetricNotFoundError(p.selectGroupResource(namespace), info.Metric) } // Here is where we're making the query, need to be before here xD @@ -61,7 +67,9 @@ func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricS if err != nil { klog.Errorf("unable to fetch metrics from prometheus: %v", err) // don't leak implementation details to the user - metrics.Errors.WithLabelValues("internal").Inc() + if p.serviceMetrics != nil { + p.serviceMetrics.Errors.WithLabelValues("internal").Inc() + } return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) } return p.metricConverter.Convert(info, queryResults) @@ -83,14 +91,15 @@ func (p *externalPrometheusProvider) selectGroupResource(namespace string) schem } // NewExternalPrometheusProvider creates an ExternalMetricsProvider capable of responding to Kubernetes requests for external metric data -func NewExternalPrometheusProvider(promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration) (provider.ExternalMetricsProvider, Runnable) { +func NewExternalPrometheusProvider(promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, serviceMetrics *metrics.ServiceMetrics) (provider.ExternalMetricsProvider, Runnable) { metricConverter := NewMetricConverter() basicLister := NewBasicMetricLister(promClient, namers, updateInterval) periodicLister, _ := NewPeriodicMetricLister(basicLister, updateInterval) - seriesRegistry := NewExternalSeriesRegistry(periodicLister) + seriesRegistry := NewExternalSeriesRegistry(periodicLister, serviceMetrics) return &externalPrometheusProvider{ promClient: promClient, seriesRegistry: seriesRegistry, metricConverter: metricConverter, + serviceMetrics: serviceMetrics, }, periodicLister } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index d7448579..074b9168 100644 --- a/pkg/metrics/metrics.go +++ 
b/pkg/metrics/metrics.go
@@ -1,38 +1,93 @@
 package metrics
 
-import "github.com/prometheus/client_golang/prometheus"
+import (
+	"fmt"
+	"net/http"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"k8s.io/klog"
+)
 
 const MetricsNamespace = "adapter"
 
-var PrometheusUp = prometheus.NewGauge(prometheus.GaugeOpts{
-	Namespace: MetricsNamespace,
-	Name:      "prometheus_up",
-	Help:      "1 when adapter is able to reach prometheus, 0 otherwise",
-})
-
-var RegistryMetrics = prometheus.NewGaugeVec(prometheus.GaugeOpts{
-	Namespace: MetricsNamespace,
-	Name:      "registry_metrics",
-	Help:      "number of metrics entries in cache registry",
-}, []string{"registry"})
-
-var Errors = prometheus.NewCounterVec(prometheus.CounterOpts{
-	Namespace: MetricsNamespace,
-	Name:      "errors_total",
-	Help:      "number of errors served",
-}, []string{"type"})
-
-var Rules = prometheus.NewGaugeVec(prometheus.GaugeOpts{
-	Namespace: MetricsNamespace,
-	Name:      "roles",
-	Help:      "number of configured rules",
-}, []string{"type"})
-
-func init() {
-	prometheus.MustRegister(
-		PrometheusUp,
-		RegistryMetrics,
-		Errors,
-		Rules,
-	)
+// ServiceMetrics groups the adapter's own collectors with the registry they
+// are registered on, so the set can be handed to its consumers explicitly
+// rather than being reached through package-level globals.
+type ServiceMetrics struct {
+	// PrometheusUp is 1 when the adapter is able to reach prometheus, 0 otherwise.
+	PrometheusUp prometheus.Gauge
+	// RegistryMetrics is the number of metrics entries in a cache registry,
+	// labelled by "registry".
+	RegistryMetrics *prometheus.GaugeVec
+	// Errors is the number of errors served, labelled by "type".
+	Errors *prometheus.CounterVec
+	// Rules is the number of configured rules, labelled by "type".
+	Rules *prometheus.GaugeVec
+	// Registry is the registry on which NewMetrics registers the collectors.
+	Registry *prometheus.Registry
+}
+
+// NewMetrics builds the adapter's collectors and registers them, together
+// with a Go runtime collector, on a freshly created registry. It returns an
+// error naming the collector whose registration failed.
+func NewMetrics() (*ServiceMetrics, error) {
+	ret := &ServiceMetrics{
+		PrometheusUp: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: MetricsNamespace,
+			Name:      "prometheus_up",
+			Help:      "1 when adapter is able to reach prometheus, 0 otherwise",
+		}),
+
+		RegistryMetrics: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: MetricsNamespace,
+			Name:      "registry_metrics",
+			Help:      "number of metrics entries in cache registry",
+		}, []string{"registry"}),
+
+		Errors: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: MetricsNamespace,
+			Name:      "errors_total",
+			Help:      "number of errors served",
+		}, []string{"type"}),
+
+		// NOTE(review): the name "roles" does not match the help text
+		// ("rules"); it is carried over unchanged from the previous global
+		// metric so that the exported series name stays the same.
+		Rules: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: MetricsNamespace,
+			Name:      "roles",
+			Help:      "number of configured rules",
+		}, []string{"type"}),
+
+		Registry: prometheus.NewRegistry(),
+	}
+
+	for collectorName, collector := range map[string]prometheus.Collector{
+		"Go collector":     prometheus.NewGoCollector(),
+		"Prometheus Up":    ret.PrometheusUp,
+		"Registry Metrics": ret.RegistryMetrics,
+		"Errors":           ret.Errors,
+		"Rules":            ret.Rules,
+	} {
+		if err := ret.Registry.Register(collector); err != nil {
+			return nil, fmt.Errorf("during registration of %q: %v", collectorName, err)
+		}
+	}
+
+	return ret, nil
+}
+
+// Run starts, in a background goroutine, an HTTP server on the given port
+// that serves the collectors registered on m.Registry at /metrics. The
+// result of http.ListenAndServe is passed to klog.Fatal.
+func (m *ServiceMetrics) Run(port uint16) {
+	go func() {
+		mux := http.NewServeMux()
+		// Serve m.Registry explicitly: promhttp.Handler() exposes only the
+		// global default registry, on which none of these collectors are
+		// registered.
+		mux.Handle("/metrics", promhttp.HandlerFor(m.Registry, promhttp.HandlerOpts{}))
+		klog.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), mux))
+	}()
 }