From c5801455ecdbecec363bb2d87927172690a7d983 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Mon, 20 Aug 2018 17:03:44 -0400 Subject: [PATCH] Introduce support for the resource metrics API This introduces support for the resource metrics in the adapter. The individual queries and window sizes are fully customizable via a new config section. This uses just the generic machinery from metrics-server to serve the API. --- Gopkg.toml | 4 + cmd/adapter/adapter.go | 68 +++++- cmd/config-gen/utils/default.go | 20 ++ pkg/config/config.go | 35 ++- pkg/resourceprovider/provider.go | 385 +++++++++++++++++++++++++++++++ 5 files changed, 503 insertions(+), 9 deletions(-) create mode 100644 pkg/resourceprovider/provider.go diff --git a/Gopkg.toml b/Gopkg.toml index e118a149..bed28988 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -51,6 +51,10 @@ name = "github.com/kubernetes-incubator/custom-metrics-apiserver" version = "kubernetes-1.11.2" +[[constraint]] + name = "github.com/kubernetes-incubator/metrics-server" + branch = "master" + # Core Kubernetes deps [[constraint]] name = "k8s.io/api" diff --git a/cmd/adapter/adapter.go b/cmd/adapter/adapter.go index abc0338b..22f70374 100644 --- a/cmd/adapter/adapter.go +++ b/cmd/adapter/adapter.go @@ -27,6 +27,7 @@ import ( "github.com/golang/glog" basecmd "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" + resmetrics "github.com/kubernetes-incubator/metrics-server/pkg/apiserver/generic" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/util/logs" "k8s.io/client-go/rest" @@ -36,6 +37,7 @@ import ( mprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/metrics" adaptercfg "github.com/directxman12/k8s-prometheus-adapter/pkg/config" cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider" + resprov "github.com/directxman12/k8s-prometheus-adapter/pkg/resourceprovider" ) type PrometheusAdapter struct { @@ -51,6 +53,8 @@ type PrometheusAdapter struct { AdapterConfigFile string // MetricsRelistInterval is the interval at which to relist the set of available metrics MetricsRelistInterval time.Duration + + metricsConfig *adaptercfg.MetricsDiscoveryConfig } func (cmd *PrometheusAdapter) makePromClient() (prom.Client, error) { @@ -81,7 +85,7 @@ func (cmd *PrometheusAdapter) addFlags() { "interval at which to re-list the set of all available metrics from Prometheus") } -func (cmd *PrometheusAdapter) makeProvider(stopCh <-chan struct{}) (provider.CustomMetricsProvider, error) { +func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.CustomMetricsProvider, error) { // load metrics discovery configuration if cmd.AdapterConfigFile == "" { return nil, fmt.Errorf("no metrics discovery configuration file specified (make sure to use --config)") @@ -91,11 +95,7 @@ func (cmd *PrometheusAdapter) makeProvider(stopCh <-chan struct{}) (provider.Cus return nil, fmt.Errorf("unable to load metrics discovery configuration: %v", err) } - // make the prometheus client - promClient, err := cmd.makePromClient() - if err != nil { - return nil, fmt.Errorf("unable to construct Prometheus client: %v", err) - } + cmd.metricsConfig = metricsConfig // grab the mapper and dynamic client mapper, err := cmd.RESTMapper() @@ -120,6 +120,43 @@ func (cmd *PrometheusAdapter) makeProvider(stopCh <-chan struct{}) (provider.Cus return cmProvider, nil } +func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client) error { + if cmd.metricsConfig.ResourceRules == nil { + // bail if we don't have rules for setting things up + return nil + } + + mapper, err := cmd.RESTMapper() + if err != nil { + return err + } + + provider, err := resprov.NewProvider(promClient, mapper, cmd.metricsConfig.ResourceRules) + if err != nil { + return fmt.Errorf("unable to construct resource metrics API provider: %v", err) + } + + provCfg := &resmetrics.ProviderConfig{ + Node: provider, + Pod: provider, + } + informers, err := cmd.Informers() + if err != nil { + return err + } + + server, err := cmd.Server() + if err != nil { + return err + } + + if err := resmetrics.InstallStorage(provCfg, informers.Core().V1(), server.GenericAPIServer); err != nil { + return err + } + + return nil +} + func main() { logs.InitLogs() defer logs.FlushLogs() @@ -129,18 +166,33 @@ func main() { PrometheusURL: "https://localhost", MetricsRelistInterval: 10 * time.Minute, } + cmd.Name = "prometheus-metrics-adapter" cmd.addFlags() cmd.Flags().AddGoFlagSet(flag.CommandLine) // make sure we get the glog flags cmd.Flags().Parse(os.Args) + // make the prometheus client + promClient, err := cmd.makePromClient() + if err != nil { + glog.Fatalf("unable to construct Prometheus client: %v", err) + } + // construct the provider - cmProvider, err := cmd.makeProvider(wait.NeverStop) + cmProvider, err := cmd.makeProvider(promClient, wait.NeverStop) if err != nil { glog.Fatalf("unable to construct custom metrics provider: %v", err) } - // attach the provider to the server and run it + // attach the provider to the server cmd.WithCustomMetrics(cmProvider) + + // attach resource metrics support + // TODO: make this optional + if err := cmd.addResourceMetricsAPI(promClient); err != nil { + glog.Fatalf("unable to install resource metrics API: %v", err) + } + + // run the server if err := cmd.Run(wait.NeverStop); err != nil { glog.Fatalf("unable to run custom metrics adapter: %v", err) } diff --git a/cmd/config-gen/utils/default.go b/cmd/config-gen/utils/default.go index d8873d96..5a7ab0b2 100644 --- a/cmd/config-gen/utils/default.go +++ b/cmd/config-gen/utils/default.go @@ -89,5 +89,25 @@ func DefaultConfig(rateInterval time.Duration, labelPrefix string) *MetricsDisco MetricsQuery: fmt.Sprintf("sum(rate(<<.Series>>{<<.LabelMatchers>>}[%s])) by (<<.GroupBy>>)", pmodel.Duration(rateInterval).String()), }, }, + + ResourceRules: &ResourceRules{ + CPU: ResourceRule{ + ContainerQuery: fmt.Sprintf("sum(rate(container_cpu_usage_seconds_total{<<.LabelMatchers>>}[%s])) by (<<.GroupBy>>)", pmodel.Duration(rateInterval).String()), + NodeQuery: fmt.Sprintf("sum(rate(container_cpu_usage_seconds_total{<<.LabelMatchers>>, id='/'}[%s])) by (<<.GroupBy>>)", pmodel.Duration(rateInterval).String()), + Resources: ResourceMapping{ + Template: fmt.Sprintf("%s<<.Resource>>", labelPrefix), + }, + ContainerLabel: fmt.Sprintf("%scontainer_name", labelPrefix), + }, + Memory: ResourceRule{ + ContainerQuery: "sum(container_memory_working_set_bytes{<<.LabelMatchers>>}) by (<<.GroupBy>>)", + NodeQuery: "sum(container_memory_working_set_bytes{<<.LabelMatchers>>,id='/'}) by (<<.GroupBy>>)", + Resources: ResourceMapping{ + Template: fmt.Sprintf("%s<<.Resource>>", labelPrefix), + }, + ContainerLabel: fmt.Sprintf("%scontainer_name", labelPrefix), + }, + Window: pmodel.Duration(rateInterval), + }, } } diff --git a/pkg/config/config.go b/pkg/config/config.go index 6a31e8f1..5f1d5142 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,11 +1,16 @@ package config +import ( + pmodel "github.com/prometheus/common/model" +) + type MetricsDiscoveryConfig struct { // Rules specifies how to discover and map Prometheus metrics to // custom metrics API resources. The rules are applied independently, // and thus must be mutually exclusive. Rules will the same SeriesQuery // will make only a single API call. - Rules []DiscoveryRule `yaml:"rules"` + Rules []DiscoveryRule `yaml:"rules"` + ResourceRules *ResourceRules `yaml:"resourceRules,omitempty"` } // DiscoveryRule describes on set of rules for transforming Prometheus metrics to/from @@ -73,3 +78,31 @@ type NameMapping struct { // if only one is present, and will error if multiple are. As string `yaml:"as"` } + +// ResourceRules describe the rules for querying resource metrics +// API results. It's assumed that the same metrics can be used +// to aggregate across different resources. +type ResourceRules struct { + CPU ResourceRule `yaml:"cpu"` + Memory ResourceRule `yaml:"memory"` + // Window is the window size reported by the resource metrics API. It should match the value used + // in your containerQuery and nodeQuery if you use a `rate` function. + Window pmodel.Duration `yaml:"window"` +} + +// ResourceRule describes how to query metrics for some particular +// system resource metric. +type ResourceRule struct { + // Container is the query used to fetch the metrics for containers. + ContainerQuery string `yaml:"containerQuery"` + // NodeQuery is the query used to fetch the metrics for nodes + // (for instance, simply aggregating by node label is insufficient for + // cadvisor metrics -- you need to select the `/` container). + NodeQuery string `yaml:"nodeQuery"` + // Resources specifies how associated Kubernetes resources should be discovered for + // the given metrics. + Resources ResourceMapping `yaml:"resources"` + // ContainerLabel indicates the name of the Prometheus label containing the container name + // (since "container" is not a resource, this can't go in the `resources` block, but is similar). + ContainerLabel string `yaml:"containerLabel"` +} diff --git a/pkg/resourceprovider/provider.go b/pkg/resourceprovider/provider.go new file mode 100644 index 00000000..234f2ec0 --- /dev/null +++ b/pkg/resourceprovider/provider.go @@ -0,0 +1,385 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourceprovider + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/golang/glog" + "github.com/kubernetes-incubator/metrics-server/pkg/provider" + corev1 "k8s.io/api/core/v1" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/runtime/schema" + apitypes "k8s.io/apimachinery/pkg/types" + metrics "k8s.io/metrics/pkg/apis/metrics" + + "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + "github.com/directxman12/k8s-prometheus-adapter/pkg/config" + "github.com/directxman12/k8s-prometheus-adapter/pkg/naming" + pmodel "github.com/prometheus/common/model" +) + +var ( + nodeResource = schema.GroupResource{Resource: "nodes"} + nsResource = schema.GroupResource{Resource: "ns"} + podResource = schema.GroupResource{Resource: "pods"} +) + +// TODO(directxman12): consider support for nanocore values -- adjust scale if less than 1 millicore, or greater than max int64 + +// newResourceQuery instantiates query information from the give configuration rule for querying +// resource metrics for some resource. +func newResourceQuery(cfg config.ResourceRule, mapper apimeta.RESTMapper) (resourceQuery, error) { + converter, err := naming.NewResourceConverter(cfg.Resources.Template, cfg.Resources.Overrides, mapper) + if err != nil { + return resourceQuery{}, fmt.Errorf("unable to construct label-resource converter: %v", err) + } + + contQuery, err := naming.NewMetricsQuery(cfg.ContainerQuery, converter) + if err != nil { + return resourceQuery{}, fmt.Errorf("unable to construct container metrics query: %v", err) + } + nodeQuery, err := naming.NewMetricsQuery(cfg.NodeQuery, converter) + if err != nil { + return resourceQuery{}, fmt.Errorf("unable to construct node metrics query: %v", err) + } + + return resourceQuery{ + converter: converter, + contQuery: contQuery, + nodeQuery: nodeQuery, + containerLabel: cfg.ContainerLabel, + }, nil + +} + +// resourceQuery represents query information for querying resource metrics for some resource, +// like CPU or memory. +type resourceQuery struct { + converter naming.ResourceConverter + contQuery naming.MetricsQuery + nodeQuery naming.MetricsQuery + containerLabel string +} + +// NewProvider constructs a new MetricsProvider to provide resource metrics from Prometheus using the given rules. +func NewProvider(prom client.Client, mapper apimeta.RESTMapper, cfg *config.ResourceRules) (provider.MetricsProvider, error) { + cpuQuery, err := newResourceQuery(cfg.CPU, mapper) + if err != nil { + return nil, fmt.Errorf("unable to construct querier for CPU metrics: %v", err) + } + memQuery, err := newResourceQuery(cfg.Memory, mapper) + if err != nil { + return nil, fmt.Errorf("unable to construct querier for memory metrics: %v", err) + } + + return &resourceProvider{ + prom: prom, + cpu: cpuQuery, + mem: memQuery, + window: time.Duration(cfg.Window), + }, nil +} + +// resourceProvider is a MetricsProvider that contacts Prometheus to provide +// the resource metrics. +type resourceProvider struct { + prom client.Client + + cpu, mem resourceQuery + + window time.Duration +} + +// nsQueryResults holds the results of one set +// of queries necessary to construct a resource metrics +// API response for a single namespace. +type nsQueryResults struct { + namespace string + cpu, mem queryResults + err error +} + +func (p *resourceProvider) GetContainerMetrics(pods ...apitypes.NamespacedName) ([]provider.TimeInfo, [][]metrics.ContainerMetrics, error) { + if len(pods) == 0 { + return nil, nil, fmt.Errorf("no pods to fetch metrics for") + } + + // TODO(directxman12): figure out how well this scales if we go to list 1000+ pods + // (and consider adding timeouts) + + // group pods by namespace (we could be listing for all pods in the cluster) + podsByNs := make(map[string][]string, len(pods)) + for _, pod := range pods { + podsByNs[pod.Namespace] = append(podsByNs[pod.Namespace], pod.Name) + } + + // actually fetch the results for each namespace + now := pmodel.Now() + resChan := make(chan nsQueryResults, len(podsByNs)) + var wg sync.WaitGroup + wg.Add(len(podsByNs)) + + for ns, podNames := range podsByNs { + go func(ns string, podNames []string) { + defer wg.Done() + resChan <- p.queryBoth(now, podResource, ns, podNames...) + }(ns, podNames) + } + + wg.Wait() + close(resChan) + + // index those results in a map for easy lookup + resultsByNs := make(map[string]nsQueryResults, len(podsByNs)) + for result := range resChan { + if result.err != nil { + glog.Errorf("unable to fetch metrics for pods in namespace %q, skipping: %v", result.namespace, result.err) + continue + } + resultsByNs[result.namespace] = result + } + + // convert the unorganized per-container results into results grouped + // together by namespace, pod, and container + resTimes := make([]provider.TimeInfo, len(pods)) + resMetrics := make([][]metrics.ContainerMetrics, len(pods)) + for i, pod := range pods { + p.assignForPod(pod, resultsByNs, &resMetrics[i], &resTimes[i]) + } + + return resTimes, resMetrics, nil +} + +// assignForPod takes the resource metrics for all containers in the given pod +// from resultsByNs, and places them in MetricsProvider response format in resMetrics, +// also recording the earliest time in resTime. It will return without operating if +// any data is missing. +func (p *resourceProvider) assignForPod(pod apitypes.NamespacedName, resultsByNs map[string]nsQueryResults, resMetrics *[]metrics.ContainerMetrics, resTime *provider.TimeInfo) { + // check to make sure everything is present + nsRes, nsResPresent := resultsByNs[pod.Namespace] + if !nsResPresent { + glog.Errorf("unable to fetch metrics for pods in namespace %q, skipping pod %s", pod.Namespace, pod.String()) + return + } + cpuRes, hasResult := nsRes.cpu[pod.Name] + if !hasResult { + glog.Errorf("unable to fetch CPU metrics for pod %s, skipping", pod.String()) + return + } + memRes, hasResult := nsRes.mem[pod.Name] + if !hasResult { + glog.Errorf("unable to fetch memory metrics for pod %s, skipping", pod.String()) + return + } + + earliestTs := pmodel.Latest + containerMetrics := make(map[string]metrics.ContainerMetrics) + + // organize all the CPU results + for _, cpu := range cpuRes { + containerName := string(cpu.Metric[pmodel.LabelName(p.cpu.containerLabel)]) + if _, present := containerMetrics[containerName]; !present { + containerMetrics[containerName] = metrics.ContainerMetrics{ + Name: containerName, + Usage: corev1.ResourceList{}, + } + } + containerMetrics[containerName].Usage[corev1.ResourceCPU] = *resource.NewMilliQuantity(int64(cpu.Value*1000.0), resource.DecimalSI) + if cpu.Timestamp.Before(earliestTs) { + earliestTs = cpu.Timestamp + } + } + + // organize the memory results + for _, mem := range memRes { + containerName := string(mem.Metric[pmodel.LabelName(p.mem.containerLabel)]) + if _, present := containerMetrics[containerName]; !present { + containerMetrics[containerName] = metrics.ContainerMetrics{ + Name: containerName, + Usage: corev1.ResourceList{}, + } + } + containerMetrics[containerName].Usage[corev1.ResourceMemory] = *resource.NewMilliQuantity(int64(mem.Value*1000.0), resource.BinarySI) + if mem.Timestamp.Before(earliestTs) { + earliestTs = mem.Timestamp + } + } + + // store the time in the final format + *resTime = provider.TimeInfo{ + Timestamp: earliestTs.Time(), + Window: p.window, + } + + // store the container metrics in the final format + containerMetricsList := make([]metrics.ContainerMetrics, 0, len(containerMetrics)) + for _, containerMetric := range containerMetrics { + containerMetricsList = append(containerMetricsList, containerMetric) + } + *resMetrics = containerMetricsList +} + +func (p *resourceProvider) GetNodeMetrics(nodes ...string) ([]provider.TimeInfo, []corev1.ResourceList, error) { + if len(nodes) == 0 { + return nil, nil, fmt.Errorf("no nodes to fetch metrics for") + } + + now := pmodel.Now() + + // run the actual query + qRes := p.queryBoth(now, nodeResource, "", nodes...) + if qRes.err != nil { + return nil, nil, qRes.err + } + + resTimes := make([]provider.TimeInfo, len(nodes)) + resMetrics := make([]corev1.ResourceList, len(nodes)) + + // organize the results + for i, nodeName := range nodes { + // skip if any data is missing + rawCPUs, gotResult := qRes.cpu[nodeName] + if !gotResult { + glog.V(1).Infof("missing CPU for node %q, skipping", nodeName) + continue + } + rawMems, gotResult := qRes.mem[nodeName] + if !gotResult { + glog.V(1).Infof("missing memory for node %q, skipping", nodeName) + continue + } + + rawMem := rawMems[0] + rawCPU := rawCPUs[0] + + // store the results + resMetrics[i] = corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewMilliQuantity(int64(rawCPU.Value*1000.0), resource.DecimalSI), + corev1.ResourceMemory: *resource.NewMilliQuantity(int64(rawMem.Value*1000.0), resource.BinarySI), + } + + // use the earliest timestamp available (in order to be conservative + // when determining if metrics are tainted by startup) + if rawMem.Timestamp.Before(rawCPU.Timestamp) { + resTimes[i] = provider.TimeInfo{ + Timestamp: rawMem.Timestamp.Time(), + Window: p.window, + } + } else { + resTimes[i] = provider.TimeInfo{ + Timestamp: rawCPU.Timestamp.Time(), + Window: 1 * time.Minute, + } + } + } + + return resTimes, resMetrics, nil +} + +// queryBoth queries for both CPU and memory metrics on the given +// Kubernetes API resource (pods or nodes), and errors out if +// either query fails. +func (p *resourceProvider) queryBoth(now pmodel.Time, resource schema.GroupResource, namespace string, names ...string) nsQueryResults { + var cpuRes, memRes queryResults + var cpuErr, memErr error + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + cpuRes, cpuErr = p.runQuery(now, p.cpu, resource, "", names...) + }() + go func() { + defer wg.Done() + memRes, memErr = p.runQuery(now, p.mem, resource, "", names...) + }() + wg.Wait() + + if cpuErr != nil { + return nsQueryResults{ + namespace: namespace, + err: fmt.Errorf("unable to fetch node CPU metrics: %v", cpuErr), + } + } + if memErr != nil { + return nsQueryResults{ + namespace: namespace, + err: fmt.Errorf("unable to fetch node memory metrics: %v", memErr), + } + } + + return nsQueryResults{ + namespace: namespace, + cpu: cpuRes, + mem: memRes, + } +} + +// queryResults maps an object name to all the results matching that object +type queryResults map[string][]*pmodel.Sample + +// runQuery actually queries Prometheus for the metric represented by the given query information, on +// the given Kubernetes API resource (pods or nodes). +func (p *resourceProvider) runQuery(now pmodel.Time, queryInfo resourceQuery, resource schema.GroupResource, namespace string, names ...string) (queryResults, error) { + var query client.Selector + var err error + + // build the query, which needs the special "container" group by if this is for pod metrics + if resource == nodeResource { + query, err = queryInfo.nodeQuery.Build("", resource, namespace, nil, names...) + } else { + extraGroupBy := []string{queryInfo.containerLabel} + query, err = queryInfo.contQuery.Build("", resource, namespace, extraGroupBy, names...) + } + if err != nil { + return nil, fmt.Errorf("unable to construct query: %v", err) + } + + // run the query + rawRes, err := p.prom.Query(context.Background(), now, query) + if err != nil { + return nil, fmt.Errorf("unable to execute query: %v", err) + } + + if rawRes.Type != pmodel.ValVector || rawRes.Vector == nil { + return nil, fmt.Errorf("invalid or empty value of non-vector type (%s) returned", rawRes.Type) + } + + // check the appropriate label for the resource in question + resourceLbl, err := queryInfo.converter.LabelForResource(resource) + if err != nil { + return nil, fmt.Errorf("unable to find label for resource %s: %v", resource.String(), err) + } + + // associate the results back to each given pod or node + res := make(queryResults, len(*rawRes.Vector)) + for _, val := range *rawRes.Vector { + if val == nil { + // skip empty values + continue + } + resKey := string(val.Metric[resourceLbl]) + res[resKey] = append(res[resKey], val) + } + + return res, nil +}