dependency-injected non-global metrics object

This commit is contained in:
pdbogen 2019-07-22 11:35:28 -07:00
parent b394496f5c
commit bda754ad2d
7 changed files with 162 additions and 78 deletions

View file

@ -27,7 +27,7 @@ import (
"os"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/directxman12/k8s-prometheus-adapter/pkg/metrics"
basecmd "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
@ -68,9 +68,11 @@ type PrometheusAdapter struct {
// MetricsMaxAge is the period to query available metrics for
MetricsMaxAge time.Duration
// MetricsPort is the port on which the adapter itself will expose metrics
MetricsPort int16
MetricsPort uint16
metricsConfig *adaptercfg.MetricsDiscoveryConfig
ServiceMetrics *metrics.ServiceMetrics
}
func (cmd *PrometheusAdapter) makePromClient() (prom.Client, error) {
@ -128,7 +130,7 @@ func (cmd *PrometheusAdapter) addFlags() {
"interval at which to re-list the set of all available metrics from Prometheus")
cmd.Flags().DurationVar(&cmd.MetricsMaxAge, "metrics-max-age", cmd.MetricsMaxAge, ""+
"period for which to query the set of available metrics from Prometheus")
cmd.Flags().Int16Var(&cmd.MetricsPort, "metrics-port", 9593, "port on which to expose prometheus "+
cmd.Flags().Uint16Var(&cmd.MetricsPort, "metrics-port", 9593, "port on which to expose prometheus "+
"metrics about k8s-prometheus-adapter")
}
@ -142,12 +144,17 @@ func (cmd *PrometheusAdapter) loadConfig() error {
return fmt.Errorf("unable to load metrics discovery configuration: %v", err)
}
if cmd.ServiceMetrics != nil {
cmd.ServiceMetrics.Rules.WithLabelValues("normal").Set(float64(len(metricsConfig.Rules)))
cmd.ServiceMetrics.Rules.WithLabelValues("external").Set(float64(len(metricsConfig.ExternalRules)))
}
cmd.metricsConfig = metricsConfig
return nil
}
func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.CustomMetricsProvider, error) {
func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan struct{}, serviceMetrics *metrics.ServiceMetrics) (provider.CustomMetricsProvider, error) {
if len(cmd.metricsConfig.Rules) == 0 {
return nil, nil
}
@ -173,7 +180,8 @@ func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan
}
// construct the provider and start it
cmProvider, runner := cmprov.NewPrometheusProvider(mapper, dynClient, promClient, namers, cmd.MetricsRelistInterval, cmd.MetricsMaxAge)
cmProvider, runner := cmprov.NewPrometheusProvider(mapper, dynClient, promClient, namers,
cmd.MetricsRelistInterval, cmd.MetricsMaxAge, serviceMetrics)
runner.RunUntil(stopCh)
return cmProvider, nil
@ -197,7 +205,8 @@ func (cmd *PrometheusAdapter) makeExternalProvider(promClient prom.Client, stopC
}
// construct the provider and start it
emProvider, runner := extprov.NewExternalPrometheusProvider(promClient, namers, cmd.MetricsRelistInterval)
emProvider, runner := extprov.NewExternalPrometheusProvider(promClient, namers, cmd.MetricsRelistInterval,
cmd.ServiceMetrics)
runner.RunUntil(stopCh)
return emProvider, nil
@ -241,22 +250,23 @@ func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client) erro
}
func (cmd *PrometheusAdapter) runMetrics() {
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
klog.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", cmd.MetricsPort), mux))
}()
}
func main() {
logs.InitLogs()
defer logs.FlushLogs()
serviceMetrics, err := metrics.NewMetrics()
if err != nil {
klog.Fatalf("unable to construct Metrics registry: %v", err)
}
// set up flags
cmd := &PrometheusAdapter{
PrometheusURL: "https://localhost",
MetricsRelistInterval: 10 * time.Minute,
MetricsMaxAge: 20 * time.Minute,
ServiceMetrics: serviceMetrics,
}
cmd.Name = "prometheus-metrics-adapter"
cmd.addFlags()
@ -276,10 +286,10 @@ func main() {
klog.Fatalf("unable to load metrics discovery config: %v", err)
}
cmd.runMetrics()
serviceMetrics.Run(cmd.MetricsPort)
// construct the provider
cmProvider, err := cmd.makeProvider(promClient, wait.NeverStop)
cmProvider, err := cmd.makeProvider(promClient, wait.NeverStop, serviceMetrics)
if err != nil {
klog.Fatalf("unable to construct custom metrics provider: %v", err)
}

View file

@ -5,8 +5,6 @@ import (
"io/ioutil"
"os"
"github.com/directxman12/k8s-prometheus-adapter/pkg/metrics"
"gopkg.in/yaml.v2"
)
@ -30,7 +28,5 @@ func FromYAML(contents []byte) (*MetricsDiscoveryConfig, error) {
if err := yaml.UnmarshalStrict(contents, &cfg); err != nil {
return nil, fmt.Errorf("unable to parse metrics discovery config: %v", err)
}
metrics.Rules.WithLabelValues("normal").Set(float64(len(cfg.Rules)))
metrics.Rules.WithLabelValues("external").Set(float64(len(cfg.ExternalRules)))
return &cfg, nil
}

View file

@ -52,14 +52,15 @@ type Runnable interface {
}
type prometheusProvider struct {
mapper apimeta.RESTMapper
kubeClient dynamic.Interface
promClient prom.Client
mapper apimeta.RESTMapper
kubeClient dynamic.Interface
promClient prom.Client
serviceMetrics *metrics.ServiceMetrics
SeriesRegistry
}
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, maxAge time.Duration) (provider.CustomMetricsProvider, Runnable) {
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, maxAge time.Duration, serviceMetrics *metrics.ServiceMetrics) (provider.CustomMetricsProvider, Runnable) {
lister := &cachingMetricsLister{
updateInterval: updateInterval,
maxAge: maxAge,
@ -67,15 +68,19 @@ func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interfa
namers: namers,
SeriesRegistry: &basicSeriesRegistry{
name: "custom",
mapper: mapper,
name: "custom",
mapper: mapper,
serviceMetrics: serviceMetrics,
},
serviceMetrics: serviceMetrics,
}
return &prometheusProvider{
mapper: mapper,
kubeClient: kubeClient,
promClient: promClient,
mapper: mapper,
kubeClient: kubeClient,
promClient: promClient,
serviceMetrics: serviceMetrics,
SeriesRegistry: lister,
}, lister
@ -101,7 +106,9 @@ func (p *prometheusProvider) metricFor(value pmodel.SampleValue, name types.Name
func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.CustomMetricInfo, namespace string, names []string) (*custom_metrics.MetricValueList, error) {
values, found := p.MatchValuesToNames(info, valueSet)
if !found {
metrics.Errors.WithLabelValues("not_found").Inc()
if p.serviceMetrics != nil {
p.serviceMetrics.Errors.WithLabelValues("not_found").Inc()
}
return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric)
}
res := []custom_metrics.MetricValue{}
@ -126,7 +133,9 @@ func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.Cu
func (p *prometheusProvider) buildQuery(info provider.CustomMetricInfo, namespace string, names ...string) (pmodel.Vector, error) {
query, found := p.QueryForMetric(info, namespace, names...)
if !found {
metrics.Errors.WithLabelValues("not_found").Inc()
if p.serviceMetrics != nil {
p.serviceMetrics.Errors.WithLabelValues("not_found").Inc()
}
return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric)
}
@ -134,14 +143,18 @@ func (p *prometheusProvider) buildQuery(info provider.CustomMetricInfo, namespac
queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), query)
if err != nil {
klog.Errorf("unable to fetch metrics from prometheus: %v", err)
if p.serviceMetrics != nil {
p.serviceMetrics.Errors.WithLabelValues("internal").Inc()
}
// don't leak implementation details to the user
metrics.Errors.WithLabelValues("internal").Inc()
return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics"))
}
if queryResults.Type != pmodel.ValVector {
klog.Errorf("unexpected results from prometheus: expected %s, got %s on results %v", pmodel.ValVector, queryResults.Type, queryResults)
metrics.Errors.WithLabelValues("internal").Inc()
if p.serviceMetrics != nil {
p.serviceMetrics.Errors.WithLabelValues("internal").Inc()
}
return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics"))
}
@ -157,13 +170,17 @@ func (p *prometheusProvider) GetMetricByName(name types.NamespacedName, info pro
// associate the metrics
if len(queryResults) < 1 {
metrics.Errors.WithLabelValues("not_found").Inc()
if p.serviceMetrics != nil {
p.serviceMetrics.Errors.WithLabelValues("not_found").Inc()
}
return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name.Name)
}
namedValues, found := p.MatchValuesToNames(info, queryResults)
if !found {
metrics.Errors.WithLabelValues("not_found").Inc()
if p.serviceMetrics != nil {
p.serviceMetrics.Errors.WithLabelValues("not_found").Inc()
}
return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric)
}
@ -174,7 +191,9 @@ func (p *prometheusProvider) GetMetricByName(name types.NamespacedName, info pro
resultValue, nameFound := namedValues[name.Name]
if !nameFound {
klog.Errorf("None of the results returned by when fetching metric %s for %q matched the resource name", info.String(), name)
metrics.Errors.WithLabelValues("not_found").Inc()
if p.serviceMetrics != nil {
p.serviceMetrics.Errors.WithLabelValues("not_found").Inc()
}
return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name.Name)
}
@ -187,8 +206,10 @@ func (p *prometheusProvider) GetMetricBySelector(namespace string, selector labe
resourceNames, err := helpers.ListObjectNames(p.mapper, p.kubeClient, namespace, selector, info)
if err != nil {
klog.Errorf("unable to list matching resource names: %v", err)
if p.serviceMetrics != nil {
p.serviceMetrics.Errors.WithLabelValues("internal").Inc()
}
// don't leak implementation details to the user
metrics.Errors.WithLabelValues("internal").Inc()
return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources"))
}
@ -209,6 +230,7 @@ type cachingMetricsLister struct {
updateInterval time.Duration
maxAge time.Duration
namers []naming.MetricNamer
serviceMetrics *metrics.ServiceMetrics
}
func (l *cachingMetricsLister) Run() {
@ -264,7 +286,9 @@ func (l *cachingMetricsLister) updateMetrics() error {
// iterate through, blocking until we've got all results
for range l.namers {
if err := <-errs; err != nil {
metrics.PrometheusUp.Set(0)
if l.serviceMetrics != nil {
l.serviceMetrics.PrometheusUp.Set(0)
}
return fmt.Errorf("unable to update list of all metrics: %v", err)
}
if ss := <-selectorSeriesChan; ss.series != nil {
@ -272,7 +296,9 @@ func (l *cachingMetricsLister) updateMetrics() error {
}
}
close(errs)
metrics.PrometheusUp.Set(1)
if l.serviceMetrics != nil {
l.serviceMetrics.PrometheusUp.Set(1)
}
newSeries := make([][]prom.Series, len(l.namers))
for i, namer := range l.namers {

View file

@ -79,6 +79,8 @@ type basicSeriesRegistry struct {
metrics []provider.CustomMetricInfo
mapper apimeta.RESTMapper
serviceMetrics *metrics.ServiceMetrics
}
func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers []naming.MetricNamer) error {
@ -127,7 +129,9 @@ func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers
r.mu.Lock()
defer r.mu.Unlock()
metrics.RegistryMetrics.WithLabelValues(r.name).Set(float64(len(newMetrics)))
if r.serviceMetrics != nil {
r.serviceMetrics.RegistryMetrics.WithLabelValues(r.name).Set(float64(len(newMetrics)))
}
r.info = newInfo
r.metrics = newMetrics

View file

@ -46,6 +46,8 @@ type externalSeriesRegistry struct {
metrics []provider.ExternalMetricInfo
// metricsInfo is a lookup from a metric to SeriesConverter for the sake of generating queries
metricsInfo map[string]seriesInfo
serviceMetrics *metrics.ServiceMetrics
}
type seriesInfo struct {
@ -57,10 +59,11 @@ type seriesInfo struct {
}
// NewExternalSeriesRegistry creates an ExternalSeriesRegistry driven by the data from the provided MetricLister.
func NewExternalSeriesRegistry(lister MetricListerWithNotification) ExternalSeriesRegistry {
func NewExternalSeriesRegistry(lister MetricListerWithNotification, serviceMetrics *metrics.ServiceMetrics) ExternalSeriesRegistry {
var registry = externalSeriesRegistry{
metrics: make([]provider.ExternalMetricInfo, 0),
metricsInfo: map[string]seriesInfo{},
metrics: make([]provider.ExternalMetricInfo, 0),
metricsInfo: map[string]seriesInfo{},
serviceMetrics: serviceMetrics,
}
lister.AddNotificationReceiver(registry.filterAndStoreMetrics)
@ -105,7 +108,9 @@ func (r *externalSeriesRegistry) filterAndStoreMetrics(result MetricUpdateResult
r.mu.Lock()
defer r.mu.Unlock()
metrics.RegistryMetrics.WithLabelValues(r.name).Set(float64(len(apiMetricsCache)))
if r.serviceMetrics != nil {
r.serviceMetrics.RegistryMetrics.WithLabelValues(r.name).Set(float64(len(apiMetricsCache)))
}
r.metrics = apiMetricsCache
r.metricsInfo = rawMetricsCache

View file

@ -40,6 +40,8 @@ type externalPrometheusProvider struct {
metricConverter MetricConverter
seriesRegistry ExternalSeriesRegistry
serviceMetrics *metrics.ServiceMetrics
}
func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricSelector labels.Selector, info provider.ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error) {
@ -47,12 +49,16 @@ func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricS
if err != nil {
klog.Errorf("unable to generate a query for the metric: %v", err)
metrics.Errors.WithLabelValues("internal").Inc()
if p.serviceMetrics != nil {
p.serviceMetrics.Errors.WithLabelValues("internal").Inc()
}
return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics"))
}
if !found {
metrics.Errors.WithLabelValues("not_found").Inc()
if p.serviceMetrics != nil {
p.serviceMetrics.Errors.WithLabelValues("not_found").Inc()
}
return nil, provider.NewMetricNotFoundError(p.selectGroupResource(namespace), info.Metric)
}
// Here is where we're making the query, need to be before here xD
@ -61,7 +67,9 @@ func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricS
if err != nil {
klog.Errorf("unable to fetch metrics from prometheus: %v", err)
// don't leak implementation details to the user
metrics.Errors.WithLabelValues("internal").Inc()
if p.serviceMetrics != nil {
p.serviceMetrics.Errors.WithLabelValues("internal").Inc()
}
return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics"))
}
return p.metricConverter.Convert(info, queryResults)
@ -83,14 +91,15 @@ func (p *externalPrometheusProvider) selectGroupResource(namespace string) schem
}
// NewExternalPrometheusProvider creates an ExternalMetricsProvider capable of responding to Kubernetes requests for external metric data
func NewExternalPrometheusProvider(promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration) (provider.ExternalMetricsProvider, Runnable) {
func NewExternalPrometheusProvider(promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, serviceMetrics *metrics.ServiceMetrics) (provider.ExternalMetricsProvider, Runnable) {
metricConverter := NewMetricConverter()
basicLister := NewBasicMetricLister(promClient, namers, updateInterval)
periodicLister, _ := NewPeriodicMetricLister(basicLister, updateInterval)
seriesRegistry := NewExternalSeriesRegistry(periodicLister)
seriesRegistry := NewExternalSeriesRegistry(periodicLister, serviceMetrics)
return &externalPrometheusProvider{
promClient: promClient,
seriesRegistry: seriesRegistry,
metricConverter: metricConverter,
serviceMetrics: serviceMetrics,
}, periodicLister
}

View file

@ -1,38 +1,72 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
import (
"fmt"
"net/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/klog"
)
const MetricsNamespace = "adapter"
var PrometheusUp = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "prometheus_up",
Help: "1 when adapter is able to reach prometheus, 0 otherwise",
})
var RegistryMetrics = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "registry_metrics",
Help: "number of metrics entries in cache registry",
}, []string{"registry"})
var Errors = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "errors_total",
Help: "number of errors served",
}, []string{"type"})
var Rules = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "roles",
Help: "number of configured rules",
}, []string{"type"})
func init() {
prometheus.MustRegister(
PrometheusUp,
RegistryMetrics,
Errors,
Rules,
)
type ServiceMetrics struct {
PrometheusUp prometheus.Gauge
RegistryMetrics *prometheus.GaugeVec
Errors *prometheus.CounterVec
Rules *prometheus.GaugeVec
Registry *prometheus.Registry
}
func NewMetrics() (*ServiceMetrics, error) {
ret := &ServiceMetrics{
PrometheusUp: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "prometheus_up",
Help: "1 when adapter is able to reach prometheus, 0 otherwise",
}),
RegistryMetrics: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "registry_metrics",
Help: "number of metrics entries in cache registry",
}, []string{"registry"}),
Errors: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "errors_total",
Help: "number of errors served",
}, []string{"type"}),
Rules: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "roles",
Help: "number of configured rules",
}, []string{"type"}),
Registry: prometheus.NewRegistry(),
}
for collectorName, collector := range map[string]prometheus.Collector{
"Go collector": prometheus.NewGoCollector(),
"Prometheus Up": ret.PrometheusUp,
"Registry Metrics": ret.RegistryMetrics,
"Errors": ret.Errors,
"Rules": ret.Rules,
} {
if err := ret.Registry.Register(collector); err != nil {
return nil, fmt.Errorf("during registration of %q: %v", collectorName, err)
}
}
return ret, nil
}
func (m *ServiceMetrics) Run(port uint16) {
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
klog.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), mux))
}()
}