Some conversation-starting progress.

Still tons of work to do, but it's probably far enough along to get some feedback.
This commit is contained in:
Tony Compton 2018-06-27 15:34:05 -04:00
parent f49892b097
commit 0af14dc93d
5 changed files with 225 additions and 55 deletions

View file

@ -176,7 +176,7 @@ func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-c
instrumentedGenericPromClient := mprom.InstrumentGenericAPIClient(genericPromClient, baseURL.String())
promClient := prom.NewClientForAPI(instrumentedGenericPromClient)
cmProvider := cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.LabelPrefix, o.MetricsRelistInterval, o.RateInterval, stopCh)
cmProvider := cmprov.NewCustomPrometheusProvider(dynamicMapper, clientPool, promClient, o.LabelPrefix, o.MetricsRelistInterval, o.RateInterval, stopCh)
server, err := config.Complete().New("prometheus-custom-metrics-adapter", cmProvider)
if err != nil {

View file

@ -33,15 +33,13 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/metrics/pkg/apis/custom_metrics"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
)
type prometheusProvider struct {
type customPrometheusProvider struct {
mapper apimeta.RESTMapper
kubeClient dynamic.ClientPool
promClient prom.Client
@ -51,10 +49,7 @@ type prometheusProvider struct {
rateInterval time.Duration
}
type DoubleMetricProvider interface {
}
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, labelPrefix string, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider {
func NewCustomPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, labelPrefix string, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.CustomMetricsProvider {
lister := &cachingMetricsLister{
updateInterval: updateInterval,
promClient: promClient,
@ -71,7 +66,7 @@ func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientP
lister.RunUntil(stopChan)
return &prometheusProvider{
return &customPrometheusProvider{
mapper: mapper,
kubeClient: kubeClient,
promClient: promClient,
@ -82,7 +77,7 @@ func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientP
}
}
func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) {
func (p *customPrometheusProvider) metricFor(value pmodel.SampleValue, groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) {
kind, err := p.mapper.KindFor(groupResource.WithVersion(""))
if err != nil {
return nil, err
@ -101,7 +96,7 @@ func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource s
}, nil
}
func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.CustomMetricInfo, list runtime.Object) (*custom_metrics.MetricValueList, error) {
func (p *customPrometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.CustomMetricInfo, list runtime.Object) (*custom_metrics.MetricValueList, error) {
if !apimeta.IsListType(list) {
return nil, apierr.NewInternalError(fmt.Errorf("result of label selector list operation was not a list"))
}
@ -135,7 +130,7 @@ func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.Cu
}, nil
}
func (p *prometheusProvider) buildQuery(info provider.CustomMetricInfo, namespace string, names ...string) (pmodel.Vector, error) {
func (p *customPrometheusProvider) buildQuery(info provider.CustomMetricInfo, namespace string, names ...string) (pmodel.Vector, error) {
kind, baseQuery, groupBy, found := p.QueryForMetric(info, namespace, names...)
if !found {
return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric)
@ -173,7 +168,7 @@ func (p *prometheusProvider) buildQuery(info provider.CustomMetricInfo, namespac
return *queryResults.Vector, nil
}
func (p *prometheusProvider) getSingle(info provider.CustomMetricInfo, namespace, name string) (*custom_metrics.MetricValue, error) {
func (p *customPrometheusProvider) getSingle(info provider.CustomMetricInfo, namespace, name string) (*custom_metrics.MetricValue, error) {
queryResults, err := p.buildQuery(info, namespace, name)
if err != nil {
return nil, err
@ -201,7 +196,7 @@ func (p *prometheusProvider) getSingle(info provider.CustomMetricInfo, namespace
return p.metricFor(resultValue, info.GroupResource, "", name, info.Metric)
}
func (p *prometheusProvider) getMultiple(info provider.CustomMetricInfo, namespace string, selector labels.Selector) (*custom_metrics.MetricValueList, error) {
func (p *customPrometheusProvider) getMultiple(info provider.CustomMetricInfo, namespace string, selector labels.Selector) (*custom_metrics.MetricValueList, error) {
// construct a client to list the names of objects matching the label selector
client, err := p.kubeClient.ClientForGroupVersionResource(info.GroupResource.WithVersion(""))
if err != nil {
@ -246,7 +241,7 @@ func (p *prometheusProvider) getMultiple(info provider.CustomMetricInfo, namespa
return p.metricsFor(queryResults, info, matchingObjectsRaw)
}
func (p *prometheusProvider) GetRootScopedMetricByName(groupResource schema.GroupResource, name string, metricName string) (*custom_metrics.MetricValue, error) {
func (p *customPrometheusProvider) GetRootScopedMetricByName(groupResource schema.GroupResource, name string, metricName string) (*custom_metrics.MetricValue, error) {
info := provider.CustomMetricInfo{
GroupResource: groupResource,
Metric: metricName,
@ -256,7 +251,7 @@ func (p *prometheusProvider) GetRootScopedMetricByName(groupResource schema.Grou
return p.getSingle(info, "", name)
}
func (p *prometheusProvider) GetRootScopedMetricBySelector(groupResource schema.GroupResource, selector labels.Selector, metricName string) (*custom_metrics.MetricValueList, error) {
func (p *customPrometheusProvider) GetRootScopedMetricBySelector(groupResource schema.GroupResource, selector labels.Selector, metricName string) (*custom_metrics.MetricValueList, error) {
info := provider.CustomMetricInfo{
GroupResource: groupResource,
Metric: metricName,
@ -265,7 +260,7 @@ func (p *prometheusProvider) GetRootScopedMetricBySelector(groupResource schema.
return p.getMultiple(info, "", selector)
}
func (p *prometheusProvider) GetNamespacedMetricByName(groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) {
func (p *customPrometheusProvider) GetNamespacedMetricByName(groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) {
info := provider.CustomMetricInfo{
GroupResource: groupResource,
Metric: metricName,
@ -275,7 +270,7 @@ func (p *prometheusProvider) GetNamespacedMetricByName(groupResource schema.Grou
return p.getSingle(info, namespace, name)
}
func (p *prometheusProvider) GetNamespacedMetricBySelector(groupResource schema.GroupResource, namespace string, selector labels.Selector, metricName string) (*custom_metrics.MetricValueList, error) {
func (p *customPrometheusProvider) GetNamespacedMetricBySelector(groupResource schema.GroupResource, namespace string, selector labels.Selector, metricName string) (*custom_metrics.MetricValueList, error) {
info := provider.CustomMetricInfo{
GroupResource: groupResource,
Metric: metricName,
@ -283,40 +278,3 @@ func (p *prometheusProvider) GetNamespacedMetricBySelector(groupResource schema.
}
return p.getMultiple(info, namespace, selector)
}
type cachingMetricsLister struct {
SeriesRegistry
promClient prom.Client
updateInterval time.Duration
}
func (l *cachingMetricsLister) Run() {
l.RunUntil(wait.NeverStop)
}
func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) {
go wait.Until(func() {
if err := l.updateMetrics(); err != nil {
utilruntime.HandleError(err)
}
}, l.updateInterval, stopChan)
}
func (l *cachingMetricsLister) updateMetrics() error {
startTime := pmodel.Now().Add(-1 * l.updateInterval)
sels := l.Selectors()
// TODO: use an actual context here
series, err := l.promClient.Series(context.Background(), pmodel.Interval{startTime, 0}, sels...)
if err != nil {
return fmt.Errorf("unable to update list of all available metrics: %v", err)
}
glog.V(10).Infof("Set available metric list from Prometheus to: %v", series)
l.SetSeries(series)
return nil
}

View file

@ -0,0 +1,162 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"fmt"
s "strings"
"time"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/dynamic"
"k8s.io/metrics/pkg/apis/external_metrics"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
)
type externalPrometheusProvider struct {
mapper apimeta.RESTMapper
kubeClient dynamic.ClientPool
promClient prom.Client
SeriesRegistry
rateInterval time.Duration
}
func NewExternalPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, labelPrefix string, updateInterval time.Duration, rateInterval time.Duration, stopChan <-chan struct{}) provider.ExternalMetricsProvider {
lister := &cachingMetricsLister{
updateInterval: updateInterval,
promClient: promClient,
SeriesRegistry: &basicSeriesRegistry{
namer: metricNamer{
// TODO: populate the overrides list
overrides: nil,
mapper: mapper,
labelPrefix: labelPrefix,
},
},
}
lister.RunUntil(stopChan)
return &externalPrometheusProvider{
mapper: mapper,
kubeClient: kubeClient,
promClient: promClient,
SeriesRegistry: lister,
rateInterval: rateInterval,
}
}
func (p *externalPrometheusProvider) GetExternalMetric(namespace string, metricName string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) {
//TODO: Steps
//1. Generate a Prometheus Query.
// Something like my_metric{namespace="namespace" some_label="some_value"}
//2. Send that query to Prometheus.
//3. Adapt the results.
//The query generation for external metrics is much more straightforward
//than for custom metrics because no renaming is applied.
//So we'll just start with some simple string operations and see how far that gets us.
//Then I'll circle back and figure out how much code reuse I can get out of the original implementation.
namespaceSelector := p.makeLabelFilter("namespace", "=", namespace)
otherSelectors := p.convertSelectors(metricSelector)
finalTargets := append([]string{namespaceSelector}, otherSelectors...)
//TODO: Only here to stop compiler issues in this incomplete code.
fmt.Printf("len=%d", len(finalTargets))
//TODO: Construct a real result.
return nil, nil
}
func (p *externalPrometheusProvider) makeLabelFilter(labelName string, operator string, targetValue string) string {
return fmt.Sprintf("%s%s\"%s\"", labelName, operator, targetValue)
}
func (p *externalPrometheusProvider) convertSelectors(metricSelector labels.Selector) []string {
requirements, _ := metricSelector.Requirements()
selectors := []string{}
for i := 0; i < len(requirements); i++ {
selector := p.convertRequirement(requirements[i])
selectors = append(selectors, selector)
}
return selectors
}
func (p *externalPrometheusProvider) convertRequirement(requirement labels.Requirement) string {
labelName := requirement.Key()
values := requirement.Values().List()
stringValues := values[0]
valueCount := len(values)
if valueCount > 1 {
stringValues = s.Join(values, "|")
}
operator := p.selectOperator(requirement.Operator(), valueCount)
return p.makeLabelFilter(labelName, operator, stringValues)
}
func (p *externalPrometheusProvider) selectOperator(operator selection.Operator, valueCount int) string {
if valueCount > 1 {
return p.selectRegexOperator(operator)
}
return p.selectSingleValueOperator(operator)
}
func (p *externalPrometheusProvider) selectRegexOperator(operator selection.Operator) string {
switch operator {
case selection.Equals:
return "=~"
case selection.NotEquals:
return "!~"
}
//TODO: Cover more cases, supply appropriate errors for any unhandled cases.
return "="
}
func (p *externalPrometheusProvider) selectSingleValueOperator(operator selection.Operator) string {
switch operator {
case selection.Equals:
return "="
case selection.NotEquals:
return "!="
}
//TODO: Cover more cases, supply appropriate errors for any unhandled cases.
return "="
}
func (p *externalPrometheusProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo {
//TODO: Provide a real response.
return nil
}

View file

@ -0,0 +1,50 @@
package provider
import (
"context"
"fmt"
"time"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/golang/glog"
pmodel "github.com/prometheus/common/model"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
)
type cachingMetricsLister struct {
SeriesRegistry
promClient prom.Client
updateInterval time.Duration
}
func (l *cachingMetricsLister) Run() {
l.RunUntil(wait.NeverStop)
}
func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) {
go wait.Until(func() {
if err := l.updateMetrics(); err != nil {
utilruntime.HandleError(err)
}
}, l.updateInterval, stopChan)
}
func (l *cachingMetricsLister) updateMetrics() error {
startTime := pmodel.Now().Add(-1 * l.updateInterval)
sels := l.Selectors()
// TODO: use an actual context here
series, err := l.promClient.Series(context.Background(), pmodel.Interval{startTime, 0}, sels...)
if err != nil {
return fmt.Errorf("unable to update list of all available metrics: %v", err)
}
glog.V(10).Infof("Set available metric list from Prometheus to: %v", series)
l.SetSeries(series)
return nil
}