Introduce support for the resource metrics API

This introduces support for the resource metrics API in the adapter.
The individual queries and window sizes are fully customizable via a new
config section.  This uses just the generic machinery from
metrics-server to serve the API.
This commit is contained in:
Solly Ross 2018-08-20 17:03:44 -04:00
parent 74c0c53e4f
commit c5801455ec
5 changed files with 503 additions and 9 deletions

View file

@ -51,6 +51,10 @@
name = "github.com/kubernetes-incubator/custom-metrics-apiserver"
version = "kubernetes-1.11.2"
[[constraint]]
name = "github.com/kubernetes-incubator/metrics-server"
branch = "master"
# Core Kubernetes deps
[[constraint]]
name = "k8s.io/api"

View file

@ -27,6 +27,7 @@ import (
"github.com/golang/glog"
basecmd "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
resmetrics "github.com/kubernetes-incubator/metrics-server/pkg/apiserver/generic"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/client-go/rest"
@ -36,6 +37,7 @@ import (
mprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/metrics"
adaptercfg "github.com/directxman12/k8s-prometheus-adapter/pkg/config"
cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider"
resprov "github.com/directxman12/k8s-prometheus-adapter/pkg/resourceprovider"
)
type PrometheusAdapter struct {
@ -51,6 +53,8 @@ type PrometheusAdapter struct {
AdapterConfigFile string
// MetricsRelistInterval is the interval at which to relist the set of available metrics
MetricsRelistInterval time.Duration
metricsConfig *adaptercfg.MetricsDiscoveryConfig
}
func (cmd *PrometheusAdapter) makePromClient() (prom.Client, error) {
@ -81,7 +85,7 @@ func (cmd *PrometheusAdapter) addFlags() {
"interval at which to re-list the set of all available metrics from Prometheus")
}
func (cmd *PrometheusAdapter) makeProvider(stopCh <-chan struct{}) (provider.CustomMetricsProvider, error) {
func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.CustomMetricsProvider, error) {
// load metrics discovery configuration
if cmd.AdapterConfigFile == "" {
return nil, fmt.Errorf("no metrics discovery configuration file specified (make sure to use --config)")
@ -91,11 +95,7 @@ func (cmd *PrometheusAdapter) makeProvider(stopCh <-chan struct{}) (provider.Cus
return nil, fmt.Errorf("unable to load metrics discovery configuration: %v", err)
}
// make the prometheus client
promClient, err := cmd.makePromClient()
if err != nil {
return nil, fmt.Errorf("unable to construct Prometheus client: %v", err)
}
cmd.metricsConfig = metricsConfig
// grab the mapper and dynamic client
mapper, err := cmd.RESTMapper()
@ -120,6 +120,43 @@ func (cmd *PrometheusAdapter) makeProvider(stopCh <-chan struct{}) (provider.Cus
return cmProvider, nil
}
// addResourceMetricsAPI wires the resource metrics API (metrics.k8s.io)
// into the adapter's API server, using the generic storage machinery from
// metrics-server. It is a no-op when no resourceRules section is configured.
// NOTE(review): relies on cmd.metricsConfig being populated beforehand
// (by makeProvider) -- confirm the call order in main.
func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client) error {
	if cmd.metricsConfig.ResourceRules == nil {
		// bail if we don't have rules for setting things up
		return nil
	}
	mapper, err := cmd.RESTMapper()
	if err != nil {
		return err
	}

	// a single provider serves both the node and the pod metrics endpoints
	provider, err := resprov.NewProvider(promClient, mapper, cmd.metricsConfig.ResourceRules)
	if err != nil {
		return fmt.Errorf("unable to construct resource metrics API provider: %v", err)
	}
	provCfg := &resmetrics.ProviderConfig{
		Node: provider,
		Pod:  provider,
	}
	informers, err := cmd.Informers()
	if err != nil {
		return err
	}
	server, err := cmd.Server()
	if err != nil {
		return err
	}
	// install the metrics.k8s.io storage and handlers onto the existing API server
	if err := resmetrics.InstallStorage(provCfg, informers.Core().V1(), server.GenericAPIServer); err != nil {
		return err
	}

	return nil
}
func main() {
logs.InitLogs()
defer logs.FlushLogs()
@ -129,18 +166,33 @@ func main() {
PrometheusURL: "https://localhost",
MetricsRelistInterval: 10 * time.Minute,
}
cmd.Name = "prometheus-metrics-adapter"
cmd.addFlags()
cmd.Flags().AddGoFlagSet(flag.CommandLine) // make sure we get the glog flags
cmd.Flags().Parse(os.Args)
// make the prometheus client
promClient, err := cmd.makePromClient()
if err != nil {
glog.Fatalf("unable to construct Prometheus client: %v", err)
}
// construct the provider
cmProvider, err := cmd.makeProvider(wait.NeverStop)
cmProvider, err := cmd.makeProvider(promClient, wait.NeverStop)
if err != nil {
glog.Fatalf("unable to construct custom metrics provider: %v", err)
}
// attach the provider to the server and run it
// attach the provider to the server
cmd.WithCustomMetrics(cmProvider)
// attach resource metrics support
// TODO: make this optional
if err := cmd.addResourceMetricsAPI(promClient); err != nil {
glog.Fatalf("unable to install resource metrics API: %v", err)
}
// run the server
if err := cmd.Run(wait.NeverStop); err != nil {
glog.Fatalf("unable to run custom metrics adapter: %v", err)
}

View file

@ -89,5 +89,25 @@ func DefaultConfig(rateInterval time.Duration, labelPrefix string) *MetricsDisco
MetricsQuery: fmt.Sprintf("sum(rate(<<.Series>>{<<.LabelMatchers>>}[%s])) by (<<.GroupBy>>)", pmodel.Duration(rateInterval).String()),
},
},
ResourceRules: &ResourceRules{
CPU: ResourceRule{
ContainerQuery: fmt.Sprintf("sum(rate(container_cpu_usage_seconds_total{<<.LabelMatchers>>}[%s])) by (<<.GroupBy>>)", pmodel.Duration(rateInterval).String()),
NodeQuery: fmt.Sprintf("sum(rate(container_cpu_usage_seconds_total{<<.LabelMatchers>>, id='/'}[%s])) by (<<.GroupBy>>)", pmodel.Duration(rateInterval).String()),
Resources: ResourceMapping{
Template: fmt.Sprintf("%s<<.Resource>>", labelPrefix),
},
ContainerLabel: fmt.Sprintf("%scontainer_name", labelPrefix),
},
Memory: ResourceRule{
ContainerQuery: "sum(container_memory_working_set_bytes{<<.LabelMatchers>>}) by (<<.GroupBy>>)",
NodeQuery: "sum(container_memory_working_set_bytes{<<.LabelMatchers>>,id='/'}) by (<<.GroupBy>>)",
Resources: ResourceMapping{
Template: fmt.Sprintf("%s<<.Resource>>", labelPrefix),
},
ContainerLabel: fmt.Sprintf("%scontainer_name", labelPrefix),
},
Window: pmodel.Duration(rateInterval),
},
}
}

View file

@ -1,11 +1,16 @@
package config
import (
pmodel "github.com/prometheus/common/model"
)
// MetricsDiscoveryConfig is the top-level adapter configuration, describing
// how to discover Prometheus metrics and map them to Kubernetes metrics APIs.
type MetricsDiscoveryConfig struct {
	// Rules specifies how to discover and map Prometheus metrics to
	// custom metrics API resources. The rules are applied independently,
	// and thus must be mutually exclusive. Rules with the same SeriesQuery
	// will make only a single API call.
	Rules []DiscoveryRule `yaml:"rules"`
	// ResourceRules, when present, configures serving the resource metrics
	// API (CPU and memory usage for pods and nodes) from Prometheus queries.
	ResourceRules *ResourceRules `yaml:"resourceRules,omitempty"`
}
// DiscoveryRule describes on set of rules for transforming Prometheus metrics to/from
@ -73,3 +78,31 @@ type NameMapping struct {
// if only one is present, and will error if multiple are.
As string `yaml:"as"`
}
// ResourceRules describe the rules for querying resource metrics
// API results. It's assumed that the same metrics can be used
// to aggregate across different resources.
type ResourceRules struct {
	// CPU describes how to fetch CPU usage metrics.
	CPU ResourceRule `yaml:"cpu"`
	// Memory describes how to fetch memory usage metrics.
	Memory ResourceRule `yaml:"memory"`
	// Window is the window size reported by the resource metrics API. It should match the value used
	// in your containerQuery and nodeQuery if you use a `rate` function.
	Window pmodel.Duration `yaml:"window"`
}
// ResourceRule describes how to query metrics for some particular
// system resource metric.
type ResourceRule struct {
	// ContainerQuery is the query used to fetch the metrics for containers.
	ContainerQuery string `yaml:"containerQuery"`
	// NodeQuery is the query used to fetch the metrics for nodes
	// (for instance, simply aggregating by node label is insufficient for
	// cadvisor metrics -- you need to select the `/` container).
	NodeQuery string `yaml:"nodeQuery"`
	// Resources specifies how associated Kubernetes resources should be discovered for
	// the given metrics.
	Resources ResourceMapping `yaml:"resources"`
	// ContainerLabel indicates the name of the Prometheus label containing the container name
	// (since "container" is not a resource, this can't go in the `resources` block, but is similar).
	ContainerLabel string `yaml:"containerLabel"`
}

View file

@ -0,0 +1,385 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourceprovider
import (
"context"
"fmt"
"sync"
"time"
"github.com/golang/glog"
"github.com/kubernetes-incubator/metrics-server/pkg/provider"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime/schema"
apitypes "k8s.io/apimachinery/pkg/types"
metrics "k8s.io/metrics/pkg/apis/metrics"
"github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/config"
"github.com/directxman12/k8s-prometheus-adapter/pkg/naming"
pmodel "github.com/prometheus/common/model"
)
// Group-resources used to drive label/resource conversion for the
// node-scoped and pod-scoped query paths.
var (
	nodeResource = schema.GroupResource{Resource: "nodes"}
	// NOTE(review): "ns" is the short name for namespaces -- presumably the
	// RESTMapper-backed converter resolves short names; confirm against
	// naming.NewResourceConverter before relying on it.
	nsResource  = schema.GroupResource{Resource: "ns"}
	podResource = schema.GroupResource{Resource: "pods"}
)
// TODO(directxman12): consider support for nanocore values -- adjust scale if less than 1 millicore, or greater than max int64

// newResourceQuery assembles the query machinery for a single resource rule:
// a label-resource converter plus the container and node metrics queries.
func newResourceQuery(cfg config.ResourceRule, mapper apimeta.RESTMapper) (resourceQuery, error) {
	conv, err := naming.NewResourceConverter(cfg.Resources.Template, cfg.Resources.Overrides, mapper)
	if err != nil {
		return resourceQuery{}, fmt.Errorf("unable to construct label-resource converter: %v", err)
	}

	containerQuery, err := naming.NewMetricsQuery(cfg.ContainerQuery, conv)
	if err != nil {
		return resourceQuery{}, fmt.Errorf("unable to construct container metrics query: %v", err)
	}

	nodeQuery, err := naming.NewMetricsQuery(cfg.NodeQuery, conv)
	if err != nil {
		return resourceQuery{}, fmt.Errorf("unable to construct node metrics query: %v", err)
	}

	query := resourceQuery{
		converter:      conv,
		contQuery:      containerQuery,
		nodeQuery:      nodeQuery,
		containerLabel: cfg.ContainerLabel,
	}
	return query, nil
}
// resourceQuery represents query information for querying resource metrics for some resource,
// like CPU or memory.
type resourceQuery struct {
	// converter maps Kubernetes resources to/from Prometheus labels
	converter naming.ResourceConverter
	// contQuery fetches per-container usage; nodeQuery fetches per-node usage
	contQuery naming.MetricsQuery
	nodeQuery naming.MetricsQuery
	// containerLabel is the Prometheus label holding the container name
	containerLabel string
}
// NewProvider constructs a new MetricsProvider to provide resource metrics from Prometheus using the given rules.
func NewProvider(prom client.Client, mapper apimeta.RESTMapper, cfg *config.ResourceRules) (provider.MetricsProvider, error) {
	var (
		p   resourceProvider
		err error
	)

	if p.cpu, err = newResourceQuery(cfg.CPU, mapper); err != nil {
		return nil, fmt.Errorf("unable to construct querier for CPU metrics: %v", err)
	}
	if p.mem, err = newResourceQuery(cfg.Memory, mapper); err != nil {
		return nil, fmt.Errorf("unable to construct querier for memory metrics: %v", err)
	}

	p.prom = prom
	p.window = time.Duration(cfg.Window)
	return &p, nil
}
// resourceProvider is a MetricsProvider that contacts Prometheus to provide
// the resource metrics.
type resourceProvider struct {
	// prom is the Prometheus client used to execute the usage queries
	prom client.Client
	// cpu and mem hold the pre-built query machinery for each resource
	cpu, mem resourceQuery
	// window is the metrics window reported back via the resource metrics API
	window time.Duration
}
// nsQueryResults holds the results of one set
// of queries necessary to construct a resource metrics
// API response for a single namespace.
type nsQueryResults struct {
	namespace string
	// cpu and mem map object names to their matching samples
	cpu, mem queryResults
	// err, when non-nil, invalidates cpu and mem for this namespace
	err error
}
// GetContainerMetrics implements provider.MetricsProvider. It fetches current
// CPU and memory usage for every container of the given pods, returning one
// TimeInfo and one ContainerMetrics slice per pod, positionally matching the
// input; entries for pods whose data could not be fetched are left zero-valued.
func (p *resourceProvider) GetContainerMetrics(pods ...apitypes.NamespacedName) ([]provider.TimeInfo, [][]metrics.ContainerMetrics, error) {
	if len(pods) == 0 {
		return nil, nil, fmt.Errorf("no pods to fetch metrics for")
	}

	// TODO(directxman12): figure out how well this scales if we go to list 1000+ pods
	// (and consider adding timeouts)

	// group pods by namespace (we could be listing for all pods in the cluster)
	podsByNs := make(map[string][]string, len(pods))
	for _, pod := range pods {
		podsByNs[pod.Namespace] = append(podsByNs[pod.Namespace], pod.Name)
	}

	// actually fetch the results for each namespace, one goroutine per
	// namespace; the channel is buffered to len(podsByNs) so no sender blocks
	now := pmodel.Now()
	resChan := make(chan nsQueryResults, len(podsByNs))
	var wg sync.WaitGroup
	wg.Add(len(podsByNs))

	for ns, podNames := range podsByNs {
		go func(ns string, podNames []string) {
			defer wg.Done()
			resChan <- p.queryBoth(now, podResource, ns, podNames...)
		}(ns, podNames)
	}

	wg.Wait()
	close(resChan)

	// index those results in a map for easy lookup; namespaces whose queries
	// failed are logged and skipped rather than failing the whole request
	resultsByNs := make(map[string]nsQueryResults, len(podsByNs))
	for result := range resChan {
		if result.err != nil {
			glog.Errorf("unable to fetch metrics for pods in namespace %q, skipping: %v", result.namespace, result.err)
			continue
		}
		resultsByNs[result.namespace] = result
	}

	// convert the unorganized per-container results into results grouped
	// together by namespace, pod, and container
	resTimes := make([]provider.TimeInfo, len(pods))
	resMetrics := make([][]metrics.ContainerMetrics, len(pods))
	for i, pod := range pods {
		p.assignForPod(pod, resultsByNs, &resMetrics[i], &resTimes[i])
	}

	return resTimes, resMetrics, nil
}
// assignForPod takes the resource metrics for all containers in the given pod
// from resultsByNs, and places them in MetricsProvider response format in resMetrics,
// also recording the earliest time in resTime. It will return without operating if
// any data is missing.
func (p *resourceProvider) assignForPod(pod apitypes.NamespacedName, resultsByNs map[string]nsQueryResults, resMetrics *[]metrics.ContainerMetrics, resTime *provider.TimeInfo) {
	// check to make sure everything is present
	nsRes, nsResPresent := resultsByNs[pod.Namespace]
	if !nsResPresent {
		glog.Errorf("unable to fetch metrics for pods in namespace %q, skipping pod %s", pod.Namespace, pod.String())
		return
	}
	cpuRes, hasResult := nsRes.cpu[pod.Name]
	if !hasResult {
		glog.Errorf("unable to fetch CPU metrics for pod %s, skipping", pod.String())
		return
	}
	memRes, hasResult := nsRes.mem[pod.Name]
	if !hasResult {
		glog.Errorf("unable to fetch memory metrics for pod %s, skipping", pod.String())
		return
	}

	// track the earliest sample timestamp seen across both metrics, to be
	// conservative about how fresh the reported data actually is
	earliestTs := pmodel.Latest

	containerMetrics := make(map[string]metrics.ContainerMetrics)

	// organize all the CPU results, one entry per container name
	for _, cpu := range cpuRes {
		containerName := string(cpu.Metric[pmodel.LabelName(p.cpu.containerLabel)])
		if _, present := containerMetrics[containerName]; !present {
			containerMetrics[containerName] = metrics.ContainerMetrics{
				Name:  containerName,
				Usage: corev1.ResourceList{},
			}
		}
		// CPU cores reported at millicore precision
		containerMetrics[containerName].Usage[corev1.ResourceCPU] = *resource.NewMilliQuantity(int64(cpu.Value*1000.0), resource.DecimalSI)
		if cpu.Timestamp.Before(earliestTs) {
			earliestTs = cpu.Timestamp
		}
	}

	// organize the memory results
	for _, mem := range memRes {
		containerName := string(mem.Metric[pmodel.LabelName(p.mem.containerLabel)])
		if _, present := containerMetrics[containerName]; !present {
			containerMetrics[containerName] = metrics.ContainerMetrics{
				Name:  containerName,
				Usage: corev1.ResourceList{},
			}
		}
		// NOTE(review): memory is stored as a milli-quantity of bytes here,
		// mirroring the CPU path -- confirm API consumers canonicalize this.
		containerMetrics[containerName].Usage[corev1.ResourceMemory] = *resource.NewMilliQuantity(int64(mem.Value*1000.0), resource.BinarySI)
		if mem.Timestamp.Before(earliestTs) {
			earliestTs = mem.Timestamp
		}
	}

	// store the time in the final format
	*resTime = provider.TimeInfo{
		Timestamp: earliestTs.Time(),
		Window:    p.window,
	}

	// store the container metrics in the final format
	containerMetricsList := make([]metrics.ContainerMetrics, 0, len(containerMetrics))
	for _, containerMetric := range containerMetrics {
		containerMetricsList = append(containerMetricsList, containerMetric)
	}
	*resMetrics = containerMetricsList
}
// GetNodeMetrics implements provider.MetricsProvider. It fetches current CPU
// and memory usage for the given nodes, returning one TimeInfo and one
// ResourceList per node, positionally matching the input; entries for nodes
// whose data is missing are left zero-valued.
func (p *resourceProvider) GetNodeMetrics(nodes ...string) ([]provider.TimeInfo, []corev1.ResourceList, error) {
	if len(nodes) == 0 {
		return nil, nil, fmt.Errorf("no nodes to fetch metrics for")
	}
	now := pmodel.Now()

	// run the actual query (nodes are cluster-scoped, so no namespace)
	qRes := p.queryBoth(now, nodeResource, "", nodes...)
	if qRes.err != nil {
		return nil, nil, qRes.err
	}

	resTimes := make([]provider.TimeInfo, len(nodes))
	resMetrics := make([]corev1.ResourceList, len(nodes))

	// organize the results
	for i, nodeName := range nodes {
		// skip if any data is missing
		rawCPUs, gotResult := qRes.cpu[nodeName]
		if !gotResult {
			glog.V(1).Infof("missing CPU for node %q, skipping", nodeName)
			continue
		}
		rawMems, gotResult := qRes.mem[nodeName]
		if !gotResult {
			glog.V(1).Infof("missing memory for node %q, skipping", nodeName)
			continue
		}
		rawMem := rawMems[0]
		rawCPU := rawCPUs[0]

		// store the results
		resMetrics[i] = corev1.ResourceList{
			corev1.ResourceCPU:    *resource.NewMilliQuantity(int64(rawCPU.Value*1000.0), resource.DecimalSI),
			corev1.ResourceMemory: *resource.NewMilliQuantity(int64(rawMem.Value*1000.0), resource.BinarySI),
		}

		// use the earliest timestamp available (in order to be conservative
		// when determining if metrics are tainted by startup)
		earliestTs := rawCPU.Timestamp
		if rawMem.Timestamp.Before(earliestTs) {
			earliestTs = rawMem.Timestamp
		}
		// BUGFIX: always report the configured window; the CPU-earlier branch
		// previously hard-coded 1 minute, disagreeing with the memory-earlier
		// branch and with the container metrics path.
		resTimes[i] = provider.TimeInfo{
			Timestamp: earliestTs.Time(),
			Window:    p.window,
		}
	}

	return resTimes, resMetrics, nil
}
// queryBoth queries for both CPU and memory metrics concurrently on the given
// Kubernetes API resource (pods or nodes), scoped to the given namespace
// (empty for cluster-scoped resources), and errors out if either query fails.
func (p *resourceProvider) queryBoth(now pmodel.Time, resource schema.GroupResource, namespace string, names ...string) nsQueryResults {
	var cpuRes, memRes queryResults
	var cpuErr, memErr error

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		// BUGFIX: pass the namespace through -- it was previously dropped
		// (empty string), so pod queries were not scoped to their namespace
		cpuRes, cpuErr = p.runQuery(now, p.cpu, resource, namespace, names...)
	}()
	go func() {
		defer wg.Done()
		memRes, memErr = p.runQuery(now, p.mem, resource, namespace, names...)
	}()
	wg.Wait()

	// BUGFIX: these messages previously said "node ... metrics" even though
	// this path also serves pod queries
	if cpuErr != nil {
		return nsQueryResults{
			namespace: namespace,
			err:       fmt.Errorf("unable to fetch CPU metrics: %v", cpuErr),
		}
	}
	if memErr != nil {
		return nsQueryResults{
			namespace: namespace,
			err:       fmt.Errorf("unable to fetch memory metrics: %v", memErr),
		}
	}

	return nsQueryResults{
		namespace: namespace,
		cpu:       cpuRes,
		mem:       memRes,
	}
}
// queryResults maps an object name (pod or node) to all the samples
// matching that object in a single Prometheus query response.
type queryResults map[string][]*pmodel.Sample
// runQuery actually queries Prometheus for the metric represented by the given query information, on
// the given Kubernetes API resource (pods or nodes). The returned map is keyed by
// object name, as extracted from the converter's label for the resource.
func (p *resourceProvider) runQuery(now pmodel.Time, queryInfo resourceQuery, resource schema.GroupResource, namespace string, names ...string) (queryResults, error) {
	var query client.Selector
	var err error
	// build the query, which needs the special "container" group by if this is for pod metrics
	if resource == nodeResource {
		query, err = queryInfo.nodeQuery.Build("", resource, namespace, nil, names...)
	} else {
		// also group by the container label so per-container usage survives aggregation
		extraGroupBy := []string{queryInfo.containerLabel}
		query, err = queryInfo.contQuery.Build("", resource, namespace, extraGroupBy, names...)
	}
	if err != nil {
		return nil, fmt.Errorf("unable to construct query: %v", err)
	}

	// run the query (instant query at `now`)
	rawRes, err := p.prom.Query(context.Background(), now, query)
	if err != nil {
		return nil, fmt.Errorf("unable to execute query: %v", err)
	}
	// only vector results make sense here: one sample per object/container
	if rawRes.Type != pmodel.ValVector || rawRes.Vector == nil {
		return nil, fmt.Errorf("invalid or empty value of non-vector type (%s) returned", rawRes.Type)
	}

	// check the appropriate label for the resource in question
	resourceLbl, err := queryInfo.converter.LabelForResource(resource)
	if err != nil {
		return nil, fmt.Errorf("unable to find label for resource %s: %v", resource.String(), err)
	}

	// associate the results back to each given pod or node
	res := make(queryResults, len(*rawRes.Vector))
	for _, val := range *rawRes.Vector {
		if val == nil {
			// skip empty values
			continue
		}
		resKey := string(val.Metric[resourceLbl])
		res[resKey] = append(res[resKey], val)
	}

	return res, nil
}