From 5bff503339473d41246fcb4a8295a90ced39c6f8 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Tue, 9 May 2017 21:34:24 -0400 Subject: [PATCH] Initial Functionality The initial functionality works. There's still a number of TODOs to clean up, and some edge cases to work around, and some errors that could be handled better. --- .gitignore | 4 + cmd/adapter.go | 43 +++ cmd/app/start.go | 147 ++++++++ glide.lock | 433 +++++++++++++++++++++++ glide.yaml | 24 ++ pkg/client/api.go | 212 +++++++++++ pkg/client/helpers.go | 68 ++++ pkg/client/interfaces.go | 121 +++++++ pkg/client/types.go | 62 ++++ pkg/custom-provider/metric_namer.go | 360 +++++++++++++++++++ pkg/custom-provider/metric_namer_test.go | 381 ++++++++++++++++++++ pkg/custom-provider/provider.go | 343 ++++++++++++++++++ pkg/custom-provider/provider_test.go | 166 +++++++++ 13 files changed, 2364 insertions(+) create mode 100644 .gitignore create mode 100644 cmd/adapter.go create mode 100644 cmd/app/start.go create mode 100644 glide.lock create mode 100644 glide.yaml create mode 100644 pkg/client/api.go create mode 100644 pkg/client/helpers.go create mode 100644 pkg/client/interfaces.go create mode 100644 pkg/client/types.go create mode 100644 pkg/custom-provider/metric_namer.go create mode 100644 pkg/custom-provider/metric_namer_test.go create mode 100644 pkg/custom-provider/provider.go create mode 100644 pkg/custom-provider/provider_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..f56a2b9f --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +*.swp +*~ +vendor +adapter diff --git a/cmd/adapter.go b/cmd/adapter.go new file mode 100644 index 00000000..25229e1b --- /dev/null +++ b/cmd/adapter.go @@ -0,0 +1,43 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "os" + "runtime" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/util/logs" + + "github.com/directxman12/k8s-prometheus-adapter/cmd/app" +) + +func main() { + logs.InitLogs() + defer logs.FlushLogs() + + if len(os.Getenv("GOMAXPROCS")) == 0 { + runtime.GOMAXPROCS(runtime.NumCPU()) + } + + cmd := app.NewCommandStartPrometheusAdapterServer(os.Stdout, os.Stderr, wait.NeverStop) + cmd.Flags().AddGoFlagSet(flag.CommandLine) + if err := cmd.Execute(); err != nil { + panic(err) + } +} diff --git a/cmd/app/start.go b/cmd/app/start.go new file mode 100644 index 00000000..3354c399 --- /dev/null +++ b/cmd/app/start.go @@ -0,0 +1,147 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package app + +import ( + "net/http" + "net/url" + "fmt" + "io" + "time" + + "github.com/spf13/cobra" + "k8s.io/client-go/rest" + "k8s.io/client-go/pkg/api" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/discovery" + + "k8s.io/custom-metrics-boilerplate/pkg/cmd/server" + cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider" + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" +) + +// NewCommandStartPrometheusAdapterServer builds the cobra command that completes, validates and runs the Prometheus custom metrics adapter server. +func NewCommandStartPrometheusAdapterServer(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Command { + baseOpts := server.NewCustomMetricsAdapterServerOptions(out, errOut) + o := PrometheusAdapterServerOptions{ + CustomMetricsAdapterServerOptions: baseOpts, + MetricsRelistInterval: 10 * time.Minute, + RateInterval: 5 * time.Minute, + PrometheusURL: "https://localhost", + } + + cmd := &cobra.Command{ + Short: "Launch the custom metrics API adapter server", + Long: "Launch the custom metrics API adapter server", + RunE: func(c *cobra.Command, args []string) error { + if err := o.Complete(); err != nil { + return err + } + if err := o.Validate(args); err != nil { + return err + } + if err := o.RunCustomMetricsAdapterServer(stopCh); err != nil { + return err + } + return nil + }, + } + + flags := cmd.Flags() + o.SecureServing.AddFlags(flags) + o.Authentication.AddFlags(flags) + o.Authorization.AddFlags(flags) + o.Features.AddFlags(flags) + + flags.StringVar(&o.RemoteKubeConfigFile, "lister-kubeconfig", o.RemoteKubeConfigFile, ""+ + "kubeconfig file pointing at the 'core' kubernetes server with enough rights to list "+ + "any described objets") + flags.DurationVar(&o.MetricsRelistInterval, "metrics-relist-interval", o.MetricsRelistInterval, ""+ + "interval at which to re-list the set of all available metrics from Prometheus") + flags.DurationVar(&o.RateInterval, "rate-interval", o.RateInterval, ""+ + "period of 
time used to calculate rate metrics from cumulative metrics") + flags.StringVar(&o.PrometheusURL, "prometheus-url", o.PrometheusURL, + "URL and configuration for connecting to Prometheus. Query parameters are used to configure the connection") + + return cmd +} + +func (o PrometheusAdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct{}) error { + config, err := o.Config() + if err != nil { + return err + } + + var clientConfig *rest.Config + if len(o.RemoteKubeConfigFile) > 0 { + loadingRules := &clientcmd.ClientConfigLoadingRules{ExplicitPath: o.RemoteKubeConfigFile} + loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, &clientcmd.ConfigOverrides{}) + + clientConfig, err = loader.ClientConfig() + } else { + clientConfig, err = rest.InClusterConfig() + } + if err != nil { + return fmt.Errorf("unable to construct lister client config to initialize provider: %v", err) + } + + discoveryClient, err := discovery.NewDiscoveryClientForConfig(clientConfig) + if err != nil { + return fmt.Errorf("unable to construct discovery client for dynamic client: %v", err) + } + + // TODO: this needs to refresh its discovery info every once in a while + resources, err := discovery.GetAPIGroupResources(discoveryClient) + if err != nil { + return fmt.Errorf("unable to construct discovery REST mapper: unable to fetch list of resources: %v", err) + } + dynamicMapper := discovery.NewRESTMapper(resources, api.Registry.InterfacesFor) + + clientPool := dynamic.NewClientPool(clientConfig, dynamicMapper, dynamic.LegacyAPIPathResolverFunc) + if err != nil { // NOTE(review): dead check -- NewClientPool assigns no error here, and err was already checked above + return fmt.Errorf("unable to construct lister client to initialize provider: %v", err) + } + + // TODO: actually configure this client (strip query vars, etc) + baseURL, err := url.Parse(o.PrometheusURL) + if err != nil { + return fmt.Errorf("invalid Prometheus URL %q: %v", o.PrometheusURL, err) + } + promClient := prom.NewClient(http.DefaultClient, baseURL) + + cmProvider := 
cmprov.NewPrometheusProvider(dynamicMapper, clientPool, promClient, o.MetricsRelistInterval, o.RateInterval) + + server, err := config.Complete().New(cmProvider) + if err != nil { + return err + } + return server.GenericAPIServer.PrepareRun().Run(stopCh) +} + +type PrometheusAdapterServerOptions struct { + *server.CustomMetricsAdapterServerOptions + + // RemoteKubeConfigFile is the config used to list pods from the master API server + RemoteKubeConfigFile string + // MetricsRelistInterval is the interval at which to relist the set of available metrics + MetricsRelistInterval time.Duration + // RateInterval is the period of time used to calculate rate metrics + RateInterval time.Duration + // PrometheusURL is the URL describing how to connect to Prometheus. Query parameters configure connection options. + PrometheusURL string +} diff --git a/glide.lock b/glide.lock new file mode 100644 index 00000000..a2624daf --- /dev/null +++ b/glide.lock @@ -0,0 +1,433 @@ +hash: 392c27ea77f03fae3cba2775189e5e12f987b5bca8d897750ac83c407e84166f +updated: 2017-05-01T14:24:55.988198521-04:00 +imports: +- name: bitbucket.org/ww/goautoneg + version: 75cd24fc2f2c2a2088577d12123ddee5f54e0675 +- name: github.com/beorn7/perks + version: 3ac7bf7a47d159a033b107610db8a1b6575507a4 + subpackages: + - quantile +- name: github.com/coreos/etcd + version: cc198e22d3b8fd7ec98304c95e68ee375be54589 + subpackages: + - alarm + - auth + - auth/authpb + - client + - clientv3 + - compactor + - discovery + - error + - etcdserver + - etcdserver/api + - etcdserver/api/v2http + - etcdserver/api/v2http/httptypes + - etcdserver/api/v3rpc + - etcdserver/api/v3rpc/rpctypes + - etcdserver/auth + - etcdserver/etcdserverpb + - etcdserver/membership + - etcdserver/stats + - integration + - lease + - lease/leasehttp + - lease/leasepb + - mvcc + - mvcc/backend + - mvcc/mvccpb + - pkg/adt + - pkg/contention + - pkg/crc + - pkg/fileutil + - pkg/httputil + - pkg/idutil + - pkg/ioutil + - pkg/logutil + - pkg/netutil + - 
pkg/pathutil + - pkg/pbutil + - pkg/runtime + - pkg/schedule + - pkg/testutil + - pkg/tlsutil + - pkg/transport + - pkg/types + - pkg/wait + - raft + - raft/raftpb + - rafthttp + - snap + - snap/snappb + - store + - version + - wal + - wal/walpb +- name: github.com/coreos/go-systemd + version: 48702e0da86bd25e76cfef347e2adeb434a0d0a6 + subpackages: + - daemon + - journal +- name: github.com/coreos/pkg + version: fa29b1d70f0beaddd4c7021607cc3c3be8ce94b8 + subpackages: + - capnslog + - health + - httputil + - timeutil +- name: github.com/davecgh/go-spew + version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d + subpackages: + - spew +- name: github.com/docker/distribution + version: cd27f179f2c10c5d300e6d09025b538c475b0d51 + subpackages: + - digest + - reference +- name: github.com/elazarl/go-bindata-assetfs + version: 3dcc96556217539f50599357fb481ac0dc7439b9 +- name: github.com/emicklei/go-restful + version: 09691a3b6378b740595c1002f40c34dd5f218a22 + subpackages: + - log + - swagger +- name: github.com/evanphx/json-patch + version: ba18e35c5c1b36ef6334cad706eb681153d2d379 +- name: github.com/ghodss/yaml + version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee +- name: github.com/go-openapi/jsonpointer + version: 46af16f9f7b149af66e5d1bd010e3574dc06de98 +- name: github.com/go-openapi/jsonreference + version: 13c6e3589ad90f49bd3e3bbe2c2cb3d7a4142272 +- name: github.com/go-openapi/spec + version: 6aced65f8501fe1217321abf0749d354824ba2ff +- name: github.com/go-openapi/swag + version: 1d0bd113de87027671077d3c71eb3ac5d7dbba72 +- name: github.com/gogo/protobuf + version: e18d7aa8f8c624c915db340349aad4c49b10d173 + subpackages: + - proto + - sortkeys +- name: github.com/golang/glog + version: 44145f04b68cf362d9c4df2182967c2275eaefed +- name: github.com/golang/groupcache + version: 02826c3e79038b59d737d3b1c0a1d937f71a4433 + subpackages: + - lru +- name: github.com/golang/protobuf + version: 8616e8ee5e20a1704615e6c8d7afcdac06087a67 + subpackages: + - jsonpb + - proto +- name: 
github.com/google/gofuzz + version: 44d81051d367757e1c7c6a5a86423ece9afcf63c +- name: github.com/grpc-ecosystem/grpc-gateway + version: f52d055dc48aec25854ed7d31862f78913cf17d1 + subpackages: + - runtime + - runtime/internal + - utilities +- name: github.com/howeyc/gopass + version: 3ca23474a7c7203e0a0a070fd33508f6efdb9b3d +- name: github.com/imdario/mergo + version: 6633656539c1639d9d78127b7d47c622b5d7b6dc +- name: github.com/inconshreveable/mousetrap + version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 +- name: github.com/juju/ratelimit + version: 77ed1c8a01217656d2080ad51981f6e99adaa177 +- name: github.com/mailru/easyjson + version: d5b7844b561a7bc640052f1b935f7b800330d7e0 + subpackages: + - buffer + - jlexer + - jwriter +- name: github.com/matttproud/golang_protobuf_extensions + version: fc2b8d3a73c4867e51861bbdd5ae3c1f0869dd6a + subpackages: + - pbutil +- name: github.com/pborman/uuid + version: ca53cad383cad2479bbba7f7a1a05797ec1386e4 +- name: github.com/pkg/errors + version: a22138067af1c4942683050411a841ade67fe1eb +- name: github.com/prometheus/client_golang + version: e51041b3fa41cece0dca035740ba6411905be473 + subpackages: + - prometheus +- name: github.com/prometheus/client_model + version: fa8ad6fec33561be4280a8f0514318c79d7f6cb6 + subpackages: + - go +- name: github.com/prometheus/common + version: ffe929a3f4c4faeaa10f2b9535c2b1be3ad15650 + subpackages: + - expfmt + - model +- name: github.com/prometheus/procfs + version: 454a56f35412459b5e684fd5ec0f9211b94f002a +- name: github.com/PuerkitoBio/purell + version: 8a290539e2e8629dbc4e6bad948158f790ec31f4 +- name: github.com/PuerkitoBio/urlesc + version: 5bd2802263f21d8788851d5305584c82a5c75d7e +- name: github.com/spf13/cobra + version: f62e98d28ab7ad31d707ba837a966378465c7b57 +- name: github.com/spf13/pflag + version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7 +- name: github.com/stretchr/testify + version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0 + subpackages: + - assert +- name: github.com/ugorji/go + 
version: ded73eae5db7e7a0ef6f55aace87a2873c5d2b74 + subpackages: + - codec +- name: golang.org/x/crypto + version: d172538b2cfce0c13cee31e647d0367aa8cd2486 + subpackages: + - bcrypt + - blowfish + - ssh/terminal +- name: golang.org/x/net + version: e90d6d0afc4c315a0d87a568ae68577cc15149a0 + subpackages: + - context + - html + - html/atom + - http2 + - http2/hpack + - idna + - internal/timeseries + - lex/httplex + - trace + - websocket +- name: golang.org/x/sys + version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 + subpackages: + - unix +- name: golang.org/x/text + version: 2910a502d2bf9e43193af9d68ca516529614eed3 + subpackages: + - cases + - internal/tag + - language + - runes + - secure/bidirule + - secure/precis + - transform + - unicode/bidi + - unicode/norm + - width +- name: google.golang.org/grpc + version: 231b4cfea0e79843053a33f5fe90bd4d84b23cd3 + subpackages: + - codes + - credentials + - grpclog + - internal + - metadata + - naming + - peer + - transport +- name: gopkg.in/inf.v0 + version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 +- name: gopkg.in/natefinch/lumberjack.v2 + version: 20b71e5b60d756d3d2f80def009790325acc2b23 +- name: gopkg.in/yaml.v2 + version: 53feefa2559fb8dfa8d81baad31be332c97d6c77 +- name: k8s.io/apimachinery + version: 20e10d54608f05c3059443a6c0afb9979641e88d + subpackages: + - pkg/api/equality + - pkg/api/errors + - pkg/api/meta + - pkg/api/resource + - pkg/api/validation + - pkg/api/validation/path + - pkg/apimachinery + - pkg/apimachinery/announced + - pkg/apimachinery/registered + - pkg/apis/meta/internalversion + - pkg/apis/meta/v1 + - pkg/apis/meta/v1/unstructured + - pkg/apis/meta/v1/validation + - pkg/conversion + - pkg/conversion/queryparams + - pkg/conversion/unstructured + - pkg/fields + - pkg/labels + - pkg/openapi + - pkg/runtime + - pkg/runtime/schema + - pkg/runtime/serializer + - pkg/runtime/serializer/json + - pkg/runtime/serializer/protobuf + - pkg/runtime/serializer/recognizer + - pkg/runtime/serializer/streaming + - 
pkg/runtime/serializer/versioning + - pkg/selection + - pkg/types + - pkg/util/diff + - pkg/util/errors + - pkg/util/framer + - pkg/util/httpstream + - pkg/util/intstr + - pkg/util/json + - pkg/util/mergepatch + - pkg/util/net + - pkg/util/rand + - pkg/util/runtime + - pkg/util/sets + - pkg/util/strategicpatch + - pkg/util/uuid + - pkg/util/validation + - pkg/util/validation/field + - pkg/util/wait + - pkg/util/yaml + - pkg/version + - pkg/watch + - third_party/forked/golang/json + - third_party/forked/golang/netutil + - third_party/forked/golang/reflect +- name: k8s.io/apiserver + version: dcf548fbe26dacc3a78d18e1135adf17006552e9 + subpackages: + - pkg/admission + - pkg/apis/apiserver + - pkg/apis/apiserver/install + - pkg/apis/apiserver/v1alpha1 + - pkg/authentication/authenticator + - pkg/authentication/authenticatorfactory + - pkg/authentication/group + - pkg/authentication/request/anonymous + - pkg/authentication/request/bearertoken + - pkg/authentication/request/headerrequest + - pkg/authentication/request/union + - pkg/authentication/request/x509 + - pkg/authentication/serviceaccount + - pkg/authentication/token/tokenfile + - pkg/authentication/user + - pkg/authorization/authorizer + - pkg/authorization/authorizerfactory + - pkg/authorization/union + - pkg/endpoints + - pkg/endpoints/filters + - pkg/endpoints/handlers + - pkg/endpoints/handlers/negotiation + - pkg/endpoints/handlers/responsewriters + - pkg/endpoints/metrics + - pkg/endpoints/openapi + - pkg/endpoints/request + - pkg/features + - pkg/registry/generic + - pkg/registry/generic/registry + - pkg/registry/rest + - pkg/server + - pkg/server/filters + - pkg/server/healthz + - pkg/server/httplog + - pkg/server/mux + - pkg/server/openapi + - pkg/server/options + - pkg/server/routes + - pkg/server/routes/data/swagger + - pkg/server/storage + - pkg/storage + - pkg/storage/errors + - pkg/storage/etcd + - pkg/storage/etcd/metrics + - pkg/storage/etcd/util + - pkg/storage/etcd3 + - pkg/storage/names + - 
pkg/storage/storagebackend + - pkg/storage/storagebackend/factory + - pkg/util/cache + - pkg/util/feature + - pkg/util/flag + - pkg/util/flushwriter + - pkg/util/logs + - pkg/util/proxy + - pkg/util/trace + - pkg/util/trie + - pkg/util/webhook + - pkg/util/wsstream + - plugin/pkg/authenticator/token/webhook + - plugin/pkg/authorizer/webhook +- name: k8s.io/client-go + version: dabf37f5df16a224729883d9f616ce4a2c282e95 + subpackages: + - discovery + - dynamic + - kubernetes/scheme + - kubernetes/typed/authentication/v1beta1 + - kubernetes/typed/authorization/v1beta1 + - kubernetes/typed/core/v1 + - pkg/api + - pkg/api/install + - pkg/api/v1 + - pkg/apis/apps + - pkg/apis/apps/v1beta1 + - pkg/apis/authentication + - pkg/apis/authentication/install + - pkg/apis/authentication/v1 + - pkg/apis/authentication/v1beta1 + - pkg/apis/authorization + - pkg/apis/authorization/install + - pkg/apis/authorization/v1 + - pkg/apis/authorization/v1beta1 + - pkg/apis/autoscaling + - pkg/apis/autoscaling/v1 + - pkg/apis/autoscaling/v2alpha1 + - pkg/apis/batch + - pkg/apis/batch/v1 + - pkg/apis/batch/v2alpha1 + - pkg/apis/certificates + - pkg/apis/certificates/v1beta1 + - pkg/apis/extensions + - pkg/apis/extensions/v1beta1 + - pkg/apis/policy + - pkg/apis/policy/v1beta1 + - pkg/apis/rbac + - pkg/apis/rbac/v1alpha1 + - pkg/apis/rbac/v1beta1 + - pkg/apis/settings + - pkg/apis/settings/v1alpha1 + - pkg/apis/storage + - pkg/apis/storage/v1 + - pkg/apis/storage/v1beta1 + - pkg/util + - pkg/util/parsers + - pkg/version + - rest + - rest/watch + - tools/auth + - tools/cache + - tools/clientcmd + - tools/clientcmd/api + - tools/clientcmd/api/latest + - tools/clientcmd/api/v1 + - tools/metrics + - transport + - util/cert + - util/clock + - util/flowcontrol + - util/homedir + - util/integer +- name: k8s.io/custom-metrics-boilerplate + version: 0d451f3d16c623ca5b7025b34723e91e9ac543fc + repo: git@github.com:directxman12/custom-metrics-boilerplate.git + subpackages: + - pkg/apiserver + - 
pkg/apiserver/installer + - pkg/apiserver/installer/context + - pkg/cmd/server + - pkg/provider + - pkg/registry/custom_metrics +- name: k8s.io/metrics + version: fd2415bb9381a6731027b48a8c6b78f28e13f876 + subpackages: + - pkg/apis/custom_metrics + - pkg/apis/custom_metrics/install + - pkg/apis/custom_metrics/v1alpha1 +testImports: +- name: github.com/pmezard/go-difflib + version: d8ed2627bdf02c080bf22230dbb337003b7aba2d + subpackages: + - difflib diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 00000000..61b769e4 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,24 @@ +package: github.com/directxman12/k8s-prometheus-adapter +import: +- package: github.com/spf13/cobra +- package: k8s.io/apimachinery + subpackages: + - pkg/util/wait +- package: k8s.io/apiserver + subpackages: + - pkg/util/logs +- package: k8s.io/client-go + subpackages: + - kubernetes/typed/core/v1 + - rest + - tools/clientcmd +- package: k8s.io/custom-metrics-boilerplate + repo: git@github.com:directxman12/custom-metrics-boilerplate.git + subpackages: + - pkg/cmd/server + - pkg/provider +- package: github.com/stretchr/testify + version: ^1.1.4 + subpackages: + - assert + - require diff --git a/pkg/client/api.go b/pkg/client/api.go new file mode 100644 index 00000000..c5e843ff --- /dev/null +++ b/pkg/client/api.go @@ -0,0 +1,212 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// Package client provides bindings to the Prometheus HTTP API: +// http://prometheus.io/docs/querying/api/ +package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "path" + "io/ioutil" + "io" + + "github.com/prometheus/common/model" + "github.com/golang/glog" +) + +// GenericAPIClient is a raw client to the Prometheus Query API. +// It knows how to appropriately deal with generic Prometheus API +// responses, but does not know the specifics of different endpoints. +// You can use this to call query endpoints not represented in Client. +type GenericAPIClient interface { + // Do makes a request to the Prometheus HTTP API against a particular endpoint. Query + // parameters should be in `query`, not `endpoint`. An error will be returned on HTTP + // status errors or errors making or unmarshalling the request, as well as when the + // response has a Status of ResponseError. + Do(ctx context.Context, verb, endpoint string, query url.Values) (APIResponse, error) +} + +// httpAPIClient is a GenericAPIClient implemented in terms of an underlying http.Client. +type httpAPIClient struct { + client *http.Client + baseURL *url.URL +} + +func (c *httpAPIClient) Do(ctx context.Context, verb, endpoint string, query url.Values) (APIResponse, error) { + u := *c.baseURL + u.Path = path.Join(c.baseURL.Path, endpoint) + u.RawQuery = query.Encode() + req, err := http.NewRequest(verb, u.String(), nil) + if err != nil { + // TODO: fix this to return Error? 
+ return APIResponse{}, fmt.Errorf("error constructing HTTP request to Prometheus: %v", err) + } + req = req.WithContext(ctx) + + resp, err := c.client.Do(req) + defer func() { + if resp != nil { + resp.Body.Close() + } + }() + + if err != nil { + return APIResponse{}, err + } + + if glog.V(6) { + glog.Infof("%s %s %s", verb, u.String(), resp.Status) + } + + code := resp.StatusCode + + // codes that aren't 2xx, 400, 422, or 503 won't return JSON objects + if code/100 != 2 && code != 400 && code != 422 && code != 503 { + return APIResponse{}, &Error{ + Type: ErrBadResponse, + Msg: fmt.Sprintf("unknown response code %d", code), + } + } + + var body io.Reader = resp.Body + if glog.V(8) { + data, err := ioutil.ReadAll(body) + if err != nil { + return APIResponse{}, fmt.Errorf("unable to log response body: %v", err) + } + glog.Infof("Response Body: %s", string(data)) + body = bytes.NewReader(data) + } + + var res APIResponse + if err = json.NewDecoder(body).Decode(&res); err != nil { + // TODO: return what the body actually was? + return APIResponse{}, &Error{ + Type: ErrBadResponse, + Msg: err.Error(), + } + } + + if res.Status == ResponseError { + return res, &Error{ + Type: res.ErrorType, + Msg: res.Error, + } + } + + return res, nil +} + +// NewGenericAPIClient builds a new generic Prometheus API client for the given base URL and HTTP Client. +func NewGenericAPIClient(client *http.Client, baseURL *url.URL) GenericAPIClient { + return &httpAPIClient{ + client: client, + baseURL: baseURL, + } +} + +const ( + queryURL = "/api/v1/query" + queryRangeURL = "/api/v1/query_range" + seriesURL = "/api/v1/series" +) + +// queryClient is a Client that connects to the Prometheus HTTP API. +type queryClient struct { + api GenericAPIClient +} + +// NewClientForAPI creates a Client for the given generic Prometheus API client. 
+func NewClientForAPI(client GenericAPIClient) Client { + return &queryClient{ + api: client, + } +} + +// NewClient creates a Client for the given HTTP client and base URL (the location of the Prometheus server). +func NewClient(client *http.Client, baseURL *url.URL) Client { + genericClient := NewGenericAPIClient(client, baseURL) + return NewClientForAPI(genericClient) +} + +func (h *queryClient) Series(ctx context.Context, interval model.Interval, selectors ...Selector) ([]Series, error) { + vals := url.Values{} + if interval.Start != 0 { + vals.Set("start", interval.Start.String()) + } + if interval.End != 0 { + vals.Set("end", interval.End.String()) + } + + for _, selector := range selectors { + vals.Add("match[]", string(selector)) + } + + res, err := h.api.Do(ctx, "GET", seriesURL, vals) + if err != nil { + return nil, err + } + + var seriesRes []Series + err = json.Unmarshal(res.Data, &seriesRes) + return seriesRes, err +} + +func (h *queryClient) Query(ctx context.Context, t model.Time, query Selector) (QueryResult, error) { + vals := url.Values{} + vals.Set("query", string(query)) + if t != 0 { + vals.Set("time", t.String()) + } + // TODO: get timeout from context... + + res, err := h.api.Do(ctx, "GET", queryURL, vals) + if err != nil { + return QueryResult{}, err + } + + var queryRes QueryResult + err = json.Unmarshal(res.Data, &queryRes) + return queryRes, err +} + +func (h *queryClient) QueryRange(ctx context.Context, r Range, query Selector) (QueryResult, error) { + vals := url.Values{} + vals.Set("query", string(query)) + + if r.Start != 0 { + vals.Set("start", r.Start.String()) + } + if r.End != 0 { + vals.Set("end", r.End.String()) + } + if r.Step != 0 { + vals.Set("step", model.Duration(r.Step).String()) + } + // TODO: get timeout from context... 
+ + res, err := h.api.Do(ctx, "GET", queryRangeURL, vals) + if err != nil { + return QueryResult{}, err + } + + var queryRes QueryResult + err = json.Unmarshal(res.Data, &queryRes) + return queryRes, err +} diff --git a/pkg/client/helpers.go b/pkg/client/helpers.go new file mode 100644 index 00000000..706fc439 --- /dev/null +++ b/pkg/client/helpers.go @@ -0,0 +1,68 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package client + +import ( + "fmt" + "strings" +) + +// LabelNeq produces a not-equal label selector expression. +// Label is passed verbatim, and value is double-quote escaped +// using Go's escaping is used on value (as per the PromQL rules). +func LabelNeq(label string, value string) string { + return fmt.Sprintf("%s!=%q", label, value) +} + +// LabelEq produces a equal label selector expression. +// Label is passed verbatim, and value is double-quote escaped +// using Go's escaping is used on value (as per the PromQL rules). +func LabelEq(label string, value string) string { + return fmt.Sprintf("%s=%q", label, value) +} + +// LabelMatches produces a regexp-matching label selector expression. +// It has similar constraints to LabelNeq. +func LabelMatches(label string, expr string) string { + return fmt.Sprintf("%s=~%q", label, expr) +} + +// LabelNotMatches produces a inverse regexp-matching label selector expression (the opposite of LabelMatches). 
+func LabelNotMatches(label string, expr string) string { + return fmt.Sprintf("%s!~%q", label, expr) +} + +// NameMatches produces a label selector expression that checks that the series name matches the given expression. +// It's a convenience wrapper around LabelMatches. +func NameMatches(expr string) string { + return LabelMatches("__name__", expr) +} + +// NameNotMatches produces a label selector expression that checks that the series name doesn't match the given expression. +// It's a convenience wrapper around LabelNotMatches. +func NameNotMatches(expr string) string { + return LabelNotMatches("__name__", expr) +} + +// MatchSeries takes a series name, and optionally some label expressions, and returns a series selector. +// TODO: validate series name and expressions? +func MatchSeries(name string, labelExpressions ...string) Selector { + if len(labelExpressions) == 0 { + return Selector(name) + } + + return Selector(fmt.Sprintf("%s{%s}", name, strings.Join(labelExpressions, ","))) +} diff --git a/pkg/client/interfaces.go b/pkg/client/interfaces.go new file mode 100644 index 00000000..a988f091 --- /dev/null +++ b/pkg/client/interfaces.go @@ -0,0 +1,121 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +package client + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/prometheus/common/model" +) + +// NB: the official prometheus API client at https://github.com/prometheus/client_golang +// is rather lackluster -- as of the time of writing of this file, it lacked support +// for querying the series metadata, which we need for the adapter. Instead, we use +// this client. + +// Selector represents a series selector +type Selector string + +// Range represents a sliced time range with increments. +type Range struct { + // Start and End are the boundaries of the time range. + Start, End model.Time + // Step is the maximum time between two slices within the boundaries. + Step time.Duration +} + +// TODO: support timeout in the client? +type Client interface { + // Series lists the time series matching the given series selectors + Series(ctx context.Context, interval model.Interval, selectors ...Selector) ([]Series, error) + // Query runs a non-range query at the given time. + Query(ctx context.Context, t model.Time, query Selector) (QueryResult, error) + // QueryRange runs a range query at the given time. + QueryRange(ctx context.Context, r Range, query Selector) (QueryResult, error) +} + +// QueryResult is the result of a query. +// Type will always be set, as well as one of the other fields, matching the type. 
+type QueryResult struct { + Type model.ValueType + + Vector *model.Vector + Scalar *model.Scalar + Matrix *model.Matrix +} + +func (qr *QueryResult) UnmarshalJSON(b []byte) error { + v := struct { + Type model.ValueType `json:"resultType"` + Result json.RawMessage `json:"result"` + }{} + + err := json.Unmarshal(b, &v) + if err != nil { + return err + } + + qr.Type = v.Type + + switch v.Type { + case model.ValScalar: + var sv model.Scalar + err = json.Unmarshal(v.Result, &sv) + qr.Scalar = &sv + + case model.ValVector: + var vv model.Vector + err = json.Unmarshal(v.Result, &vv) + qr.Vector = &vv + + case model.ValMatrix: + var mv model.Matrix + err = json.Unmarshal(v.Result, &mv) + qr.Matrix = &mv + + default: + err = fmt.Errorf("unexpected value type %q", v.Type) + } + return err +} + +// Series represents a description of a series: a name and a set of labels. +// Series is roughly equivalent to model.Metrics, but has easy access to name +// and the set of non-name labels. +type Series struct { + Name string + Labels model.LabelSet +} + +func (s *Series) UnmarshalJSON(data []byte) error { + var rawMetric model.Metric + err := json.Unmarshal(data, &rawMetric) + if err != nil { + return err + } + + if name, ok := rawMetric[model.MetricNameLabel]; ok { + s.Name = string(name) + delete(rawMetric, model.MetricNameLabel) + } + + s.Labels = model.LabelSet(rawMetric) + + return nil +} diff --git a/pkg/client/types.go b/pkg/client/types.go new file mode 100644 index 00000000..5109111d --- /dev/null +++ b/pkg/client/types.go @@ -0,0 +1,62 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package client provides bindings to the Prometheus HTTP API:
+// http://prometheus.io/docs/querying/api/
+package client
+
+import (
+	"encoding/json"
+	"fmt"
+)
+
+// ErrorType is the type of the API error.
+type ErrorType string
+
+// Known values of ErrorType. Every constant is explicitly typed: in a const
+// block a type given on the first entry does not carry over to later entries
+// that have their own initializer, which would leave them as untyped strings.
+const (
+	ErrBadData     ErrorType = "bad_data"
+	ErrTimeout     ErrorType = "timeout"
+	ErrCanceled    ErrorType = "canceled"
+	ErrExec        ErrorType = "execution"
+	ErrBadResponse ErrorType = "bad_response"
+)
+
+// Error is an error returned by the API.
+type Error struct {
+	// Type is the category of the error.
+	Type ErrorType
+	// Msg is the human-readable error message.
+	Msg string
+}
+
+// Error implements the error interface, formatting as "<type>: <message>".
+func (e *Error) Error() string {
+	return fmt.Sprintf("%s: %s", e.Type, e.Msg)
+}
+
+// ResponseStatus is the type of response from the API: succeeded or error.
+type ResponseStatus string
+
+// Known values of ResponseStatus, matching the "status" field documented for
+// the Prometheus HTTP API ("success" or "error").
+const (
+	ResponseSucceeded ResponseStatus = "success"
+	ResponseError     ResponseStatus = "error"
+)
+
+// APIResponse represents the raw response returned by the API.
+type APIResponse struct {
+	// Status indicates whether this request was successful or whether it errored out.
+	Status ResponseStatus `json:"status"`
+	// Data contains the raw data response for this request.
+	Data json.RawMessage `json:"data"`
+
+	// ErrorType is the type of error, if this is an error response.
+	ErrorType ErrorType `json:"errorType"`
+	// Error is the error message, if this is an error response.
+	Error string `json:"error"`
+}
diff --git a/pkg/custom-provider/metric_namer.go b/pkg/custom-provider/metric_namer.go
new file mode 100644
index 00000000..7c9cb115
--- /dev/null
+++ b/pkg/custom-provider/metric_namer.go
@@ -0,0 +1,360 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package provider
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	apimeta "k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/custom-metrics-boilerplate/pkg/provider"
+
+	"github.com/golang/glog"
+	prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
+	pmodel "github.com/prometheus/common/model"
+)
+
+// NB: container metrics sourced from cAdvisor don't consistently follow naming conventions,
+// so we need to whitelist them and handle them on a case-by-case basis. Metrics ending in `_total`
+// *should* be counters, but may actually be gauges in this case.
+
+// SeriesType represents the kind of series backing a metric.
+type SeriesType int
+
+const (
+	// CounterSeries is a series whose name ends in `_total`.
+	CounterSeries SeriesType = iota
+	// SecondsCounterSeries is a series whose name ends in `_seconds_total`.
+	SecondsCounterSeries
+	// GaugeSeries is any series that is not treated as a counter.
+	GaugeSeries
+)
+
+// SeriesRegistry provides conversions between Prometheus series and MetricInfo
+type SeriesRegistry interface {
+	// SetSeries replaces the known series in this registry
+	SetSeries(series []prom.Series) error
+	// ListAllMetrics lists all metrics known to this registry
+	ListAllMetrics() []provider.MetricInfo
+	// QueryForMetric looks up the minimum required series information to make a query for the given metric
+	// against the given resource (namespace may be empty for non-namespaced resources)
+	QueryForMetric(info provider.MetricInfo, namespace string, resourceNames ...string) (kind SeriesType, query prom.Selector, groupBy string, found bool)
+	// MatchValuesToNames matches result values to resource names for the given metric and value set
+	MatchValuesToNames(metricInfo provider.MetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool)
+}
+
+// seriesInfo is what the registry remembers about the series backing a metric.
+type seriesInfo struct {
+	// baseSeries represents the minimum information to access a particular series
+	baseSeries prom.Series
+	// kind is the type of this series
+	kind SeriesType
+	// isContainer indicates if the series is a cAdvisor container_ metric, and thus needs special handling
+	isContainer bool
+}
+
+// basicSeriesRegistry is a basic SeriesRegistry
+type basicSeriesRegistry struct {
+	// mu guards info and metrics
+	mu sync.RWMutex
+
+	// info maps metric info to information about the corresponding series
+	info map[provider.MetricInfo]seriesInfo
+	// metrics is the list of all known metrics
+	metrics []provider.MetricInfo
+
+	// namer is the metricNamer responsible for converting series to metric names and information
+	namer metricNamer
+}
+
+// SetSeries rebuilds the registry from the given series and swaps the result
+// in under the write lock. If any series cannot be processed, the error is
+// returned and the previous contents are left untouched.
+func (r *basicSeriesRegistry) SetSeries(newSeries []prom.Series) error {
+	newInfo := make(map[provider.MetricInfo]seriesInfo)
+	for _, series := range newSeries {
+		if strings.HasPrefix(series.Name, "container_") {
+			r.namer.processContainerSeries(series, newInfo)
+		} else if namespaceLabel, hasNamespaceLabel := series.Labels["namespace"]; hasNamespaceLabel && namespaceLabel != "" {
+			// TODO: handle metrics describing a namespace
+			if err := r.namer.processNamespacedSeries(series, newInfo); err != nil {
+				// TODO: do we want to log this and continue, or abort?
+				return err
+			}
+		} else if err := r.namer.processRootScopedSeries(series, newInfo); err != nil {
+			// TODO: do we want to log this and continue, or abort?
+			return err
+		}
+	}
+
+	newMetrics := make([]provider.MetricInfo, 0, len(newInfo))
+	for info := range newInfo {
+		newMetrics = append(newMetrics, info)
+	}
+
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	r.info = newInfo
+	r.metrics = newMetrics
+
+	return nil
+}
+
+// ListAllMetrics returns a copy of the list of known metrics, so that callers
+// can sort or otherwise modify the result without touching state guarded by
+// the registry's lock.
+func (r *basicSeriesRegistry) ListAllMetrics() []provider.MetricInfo {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+
+	metrics := make([]provider.MetricInfo, len(r.metrics))
+	copy(metrics, r.metrics)
+	return metrics
+}
+
+// QueryForMetric builds the series selector for the given metric, restricted
+// to the given resource names (and namespace, for namespaced metrics), and
+// reports the series kind and the label the results should be grouped by.
+// found is false if no resource names were given, if the group-resource can't
+// be normalized, or if the metric isn't registered.
+func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.MetricInfo, namespace string, resourceNames ...string) (kind SeriesType, query prom.Selector, groupBy string, found bool) {
+	if len(resourceNames) == 0 {
+		// there is nothing to select on, and indexing resourceNames below would panic
+		glog.Errorf("no resource names given while producing a query for metric %v", metricInfo)
+		return 0, "", "", false
+	}
+
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+
+	metricInfo, singularResource, err := r.namer.normalizeInfo(metricInfo)
+	if err != nil {
+		glog.Errorf("unable to normalize group resource while producing a query: %v", err)
+		return 0, "", "", false
+	}
+
+	// TODO: support container metrics
+	info, registered := r.info[metricInfo]
+	if !registered {
+		glog.V(10).Infof("metric %v not registered", metricInfo)
+		return 0, "", "", false
+	}
+
+	// a single name is matched exactly; several names are matched as a regex alternation
+	targetValue := resourceNames[0]
+	matcher := prom.LabelEq
+	if len(resourceNames) > 1 {
+		targetValue = strings.Join(resourceNames, "|")
+		matcher = prom.LabelMatches
+	}
+
+	var expressions []string
+	if info.isContainer {
+		expressions = []string{matcher("pod_name", targetValue), prom.LabelNeq("container_name", "POD")}
+		groupBy = "pod_name"
+	} else {
+		// TODO: copy base series labels?
+		expressions = []string{matcher(singularResource, targetValue)}
+		groupBy = singularResource
+	}
+
+	if metricInfo.Namespaced {
+		expressions = append(expressions, prom.LabelEq("namespace", namespace))
+	}
+
+	return info.kind, prom.MatchSeries(info.baseSeries.Name, expressions...), groupBy, true
+}
+
+// MatchValuesToNames maps each sample in values to the name of the resource
+// it describes, read from the label that identifies that resource ("pod_name"
+// for container metrics, the singular resource name otherwise). found is
+// false if the group-resource can't be normalized or the metric isn't
+// registered.
+func (r *basicSeriesRegistry) MatchValuesToNames(metricInfo provider.MetricInfo, values pmodel.Vector) (matchedValues map[string]pmodel.SampleValue, found bool) {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+
+	metricInfo, singularResource, err := r.namer.normalizeInfo(metricInfo)
+	if err != nil {
+		glog.Errorf("unable to normalize group resource while matching values to names: %v", err)
+		return nil, false
+	}
+
+	info, registered := r.info[metricInfo]
+	if !registered {
+		return nil, false
+	}
+
+	// the identifying label is the same for every sample, so work it out once
+	labelName := pmodel.LabelName(singularResource)
+	if info.isContainer {
+		labelName = pmodel.LabelName("pod_name")
+	}
+
+	res := make(map[string]pmodel.SampleValue, len(values))
+	for _, val := range values {
+		if val == nil {
+			// skip empty values
+			continue
+		}
+		res[string(val.Metric[labelName])] = val.Value
+	}
+
+	return res, true
+}
+
+// metricNamer knows how to construct MetricInfo out of raw prometheus series descriptions.
+type metricNamer struct {
+	// overrides contains the list of container metrics whose naming we want to override.
+	// This is used to properly convert certain cAdvisor container metrics.
+	overrides map[string]seriesSpec
+
+	// mapper resolves label and resource names to canonical group-resources.
+	mapper apimeta.RESTMapper
+}
+
+// seriesSpec specifies how to produce metric info for a particular prometheus series source
+type seriesSpec struct {
+	// metricName is the desired output API metric name
+	metricName string
+	// kind indicates whether or not this metric is cumulative,
+	// and thus has to be calculated as a rate when returning it
+	kind SeriesType
+}
+
+// normalizeInfo takes in some metricInfo and "normalizes" it to ensure a common GroupResource form.
+// It also returns the singular form of the normalized resource name.
+func (n *metricNamer) normalizeInfo(metricInfo provider.MetricInfo) (provider.MetricInfo, string, error) {
+	// NB: we need to "normalize" the metricInfo's GroupResource so we have a consistent pluralization, etc
+	// TODO: move this to the boilerplate?
+	normalizedGroupRes, err := n.mapper.ResourceFor(metricInfo.GroupResource.WithVersion(""))
+	if err != nil {
+		return provider.MetricInfo{}, "", err
+	}
+	metricInfo.GroupResource = normalizedGroupRes.GroupResource()
+
+	singularResource, err := n.mapper.ResourceSingularizer(metricInfo.GroupResource.Resource)
+	if err != nil {
+		return provider.MetricInfo{}, "", err
+	}
+
+	return metricInfo, singularResource, nil
+}
+
+// processContainerSeries performs special work to extract metric definitions
+// from cAdvisor-sourced container metrics, which don't particularly follow any useful conventions consistently.
+// The resulting metric is always registered as a namespaced metric on pods.
+func (n *metricNamer) processContainerSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) {
+	originalName := series.Name
+
+	var name string
+	metricKind := GaugeSeries
+	if override, hasOverride := n.overrides[series.Name]; hasOverride {
+		name = override.metricName
+		metricKind = override.kind
+	} else {
+		// chop off the "container_" prefix
+		series.Name = strings.TrimPrefix(series.Name, "container_")
+		name, metricKind = n.metricNameFromSeries(series)
+	}
+
+	info := provider.MetricInfo{
+		// TODO: is the plural correct?
+		GroupResource: schema.GroupResource{Resource: "pods"},
+		Namespaced:    true,
+		Metric:        name,
+	}
+
+	infos[info] = seriesInfo{
+		kind:        metricKind,
+		baseSeries:  prom.Series{Name: originalName},
+		isContainer: true,
+	}
+}
+
+// processNamespacedSeries adds the metric info for the given generic namespaced series to
+// the map of metric info.
+func (n *metricNamer) processNamespacedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error {
+	name, metricKind := n.metricNameFromSeries(series)
+	resources, err := n.groupResourcesFromSeries(series)
+	if err != nil {
+		return fmt.Errorf("unable to process prometheus series %s: %v", series.Name, err)
+	}
+
+	// we add one metric for each resource that this could describe
+	for _, resource := range resources {
+		info := provider.MetricInfo{
+			GroupResource: resource,
+			Namespaced:    true,
+			Metric:        name,
+		}
+
+		// metrics describing namespaces aren't considered to be namespaced
+		if resource == (schema.GroupResource{Resource: "namespaces"}) {
+			info.Namespaced = false
+		}
+
+		infos[info] = seriesInfo{
+			kind:       metricKind,
+			baseSeries: prom.Series{Name: series.Name},
+		}
+	}
+
+	return nil
+}
+
+// processRootScopedSeries adds the metric info for the given root-scoped (non-namespaced) series to
+// the map of metric info.
+func (n *metricNamer) processRootScopedSeries(series prom.Series, infos map[provider.MetricInfo]seriesInfo) error {
+	metricName, seriesKind := n.metricNameFromSeries(series)
+	groupResources, err := n.groupResourcesFromSeries(series)
+	if err != nil {
+		return fmt.Errorf("unable to process prometheus series %s: %v", series.Name, err)
+	}
+
+	// every resource this series could describe shares the same series entry
+	entry := seriesInfo{
+		kind:       seriesKind,
+		baseSeries: prom.Series{Name: series.Name},
+	}
+	for _, groupResource := range groupResources {
+		key := provider.MetricInfo{
+			GroupResource: groupResource,
+			Namespaced:    false,
+			Metric:        metricName,
+		}
+		infos[key] = entry
+	}
+
+	return nil
+}
+
+// groupResourcesFromSeries collects the possible group-resources that this series could describe by
+// going through each label, checking to see if it corresponds to a known resource. For instance,
+// a series `ingress_http_hits_total{pod="foo",service="bar",ingress="baz",namespace="ns"}`
+// would yield "pods", "services", and "ingresses", plus "namespaces" if the
+// mapper also resolves the `namespace` label.
+// Labels that the mapper does not recognize are skipped; any other mapper error aborts the lookup.
+// Each returned GroupResource is taken from the mapper's ResourceFor result, as in normalizeInfo.
+func (n *metricNamer) groupResourcesFromSeries(series prom.Series) ([]schema.GroupResource, error) {
+	// TODO: do we need to cache this, or is ResourceFor good enough?
+	var matched []schema.GroupResource
+	for labelName := range series.Labels {
+		// TODO: figure out a way to let people specify a fully-qualified name in label-form
+		// TODO: will this work when missing a group?
+		candidate := schema.GroupVersionResource{Resource: string(labelName)}
+		gvr, err := n.mapper.ResourceFor(candidate)
+		switch {
+		case err == nil:
+			matched = append(matched, gvr.GroupResource())
+		case apimeta.IsNoMatchError(err):
+			// this label doesn't name a resource; move on to the next one
+		default:
+			return nil, err
+		}
+	}
+
+	return matched, nil
+}
+
+// metricNameFromSeries extracts a metric name from a series name, and indicates
+// whether or not that series was a counter. It also has special logic to deal with time-based
+// counters, which generally get converted to milli-unit rate metrics.
+func (n *metricNamer) metricNameFromSeries(series prom.Series) (name string, kind SeriesType) {
+	const (
+		counterSuffix = "_total"
+		secondsSuffix = "_seconds"
+	)
+
+	// anything not ending in "_total" is reported unchanged, as a gauge
+	if !strings.HasSuffix(series.Name, counterSuffix) {
+		return series.Name, GaugeSeries
+	}
+
+	base := strings.TrimSuffix(series.Name, counterSuffix)
+	if strings.HasSuffix(base, secondsSuffix) {
+		// "<base>_seconds_total": a time-based counter
+		return strings.TrimSuffix(base, secondsSuffix), SecondsCounterSeries
+	}
+
+	return base, CounterSeries
+}
diff --git a/pkg/custom-provider/metric_namer_test.go b/pkg/custom-provider/metric_namer_test.go
new file mode 100644
index 00000000..6ac4ebf7
--- /dev/null
+++ b/pkg/custom-provider/metric_namer_test.go
@@ -0,0 +1,381 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package provider
+
+import (
+	"sort"
+	"testing"
+
+	"k8s.io/client-go/pkg/api"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/custom-metrics-boilerplate/pkg/provider"
+
+	// install extensions so that our RESTMapper knows about it
+	_ "k8s.io/client-go/pkg/apis/extensions/install"
+
+	prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
+	pmodel "github.com/prometheus/common/model"
+)
+
+// setupMetricNamer returns a metricNamer with a single container-metric
+// override, backed by the RESTMapper from the API registry.
+func setupMetricNamer(t *testing.T) *metricNamer {
+	return &metricNamer{
+		overrides: map[string]seriesSpec{
+			"container_actually_gauge_seconds_total": seriesSpec{
+				metricName: "actually_gauge",
+				kind:       GaugeSeries,
+			},
+		},
+		mapper: api.Registry.RESTMapper(),
+	}
+}
+
+// TestMetricNamerContainerSeries checks the name and kind that
+// processContainerSeries derives for overridden, gauge, counter, and
+// seconds-counter container series.
+func TestMetricNamerContainerSeries(t *testing.T) {
+	testCases := []struct {
+		input            prom.Series
+		outputMetricName string
+		outputInfo       seriesInfo
+	}{
+		{
+			input: prom.Series{
+				Name:   "container_actually_gauge_seconds_total",
+				Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
+			},
+			outputMetricName: "actually_gauge",
+			outputInfo: seriesInfo{
+				baseSeries:  prom.Series{Name: "container_actually_gauge_seconds_total"},
+				kind:        GaugeSeries,
+				isContainer: true,
+			},
+		},
+		{
+			input: prom.Series{
+				Name:   "container_some_usage",
+				Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
+			},
+			outputMetricName: "some_usage",
+			outputInfo: seriesInfo{
+				baseSeries:  prom.Series{Name: "container_some_usage"},
+				kind:        GaugeSeries,
+				isContainer: true,
+			},
+		},
+		{
+			input: prom.Series{
+				Name:   "container_some_count_total",
+				Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
+			},
+			outputMetricName: "some_count",
+			outputInfo: seriesInfo{
+				baseSeries:  prom.Series{Name: "container_some_count_total"},
+				kind:        CounterSeries,
+				isContainer: true,
+			},
+		},
+		{
+			input: prom.Series{
+				Name:   "container_some_time_seconds_total",
+				Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
+			},
+			outputMetricName: "some_time",
+			outputInfo: seriesInfo{
+				baseSeries:  prom.Series{Name: "container_some_time_seconds_total"},
+				kind:        SecondsCounterSeries,
+				isContainer: true,
+			},
+		},
+	}
+
+	assert := assert.New(t)
+
+	namer := setupMetricNamer(t)
+	resMap := map[provider.MetricInfo]seriesInfo{}
+
+	for _, test := range testCases {
+		namer.processContainerSeries(test.input, resMap)
+		metric := provider.MetricInfo{
+			Metric:        test.outputMetricName,
+			GroupResource: schema.GroupResource{Resource: "pods"},
+			Namespaced:    true,
+		}
+		if assert.Contains(resMap, metric) {
+			assert.Equal(test.outputInfo, resMap[metric])
+		}
+	}
+}
+
+// TestSeriesRegistry loads a mix of container, namespaced, root-scoped, and
+// unrelated series into a registry, then checks the query produced for each
+// expected metric and the full list of registered metrics.
+func TestSeriesRegistry(t *testing.T) {
+	assert := assert.New(t)
+	require := require.New(t)
+
+	namer := setupMetricNamer(t)
+	registry := &basicSeriesRegistry{
+		namer: *namer,
+	}
+
+	inputSeries := []prom.Series{
+		// container series
+		{
+			Name:   "container_actually_gauge_seconds_total",
+			Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
+		},
+		{
+			Name:   "container_some_usage",
+			Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
+		},
+		{
+			Name:   "container_some_count_total",
+			Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
+		},
+		{
+			Name:   "container_some_time_seconds_total",
+			Labels: pmodel.LabelSet{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"},
+		},
+		// namespaced series
+		// a series that should turn into multiple metrics
+		{
+			Name:   "ingress_hits_total",
+			Labels: pmodel.LabelSet{"ingress": "someingress", "service": "somesvc", "pod": "backend1", "namespace": "somens"},
+		},
+		{
+			Name:   "ingress_hits_total",
+			Labels: pmodel.LabelSet{"ingress": "someingress", "service": "somesvc", "pod": "backend2", "namespace": "somens"},
+		},
+		{
+			Name:   "service_proxy_packets",
+			Labels: pmodel.LabelSet{"service": "somesvc", "namespace": "somens"},
+		},
+		{
+			Name:   "work_queue_wait_seconds_total",
+			Labels: pmodel.LabelSet{"deployment": "somedep", "namespace": "somens"},
+		},
+		// non-namespaced series
+		{
+			Name:   "node_gigawatts",
+			Labels: pmodel.LabelSet{"node": "somenode"},
+		},
+		{
+			Name:   "volume_claims_total",
+			Labels: pmodel.LabelSet{"persistentvolume": "somepv"},
+		},
+		{
+			Name:   "node_fan_seconds_total",
+			Labels: pmodel.LabelSet{"node": "somenode"},
+		},
+		// unrelated series
+		{
+			Name:   "admin_coffee_liters_total",
+			Labels: pmodel.LabelSet{"admin": "some-admin"},
+		},
+		{
+			Name:   "admin_unread_emails",
+			Labels: pmodel.LabelSet{"admin": "some-admin"},
+		},
+		{
+			Name:   "admin_reddit_seconds_total",
+			Labels: pmodel.LabelSet{"admin": "some-admin"},
+		},
+	}
+
+	// set up the registry
+	require.NoError(registry.SetSeries(inputSeries))
+
+	// make sure each metric got registered and can form queries
+	testCases := []struct {
+		title         string
+		info          provider.MetricInfo
+		namespace     string
+		resourceNames []string
+
+		expectedKind  SeriesType
+		expectedQuery string
+	}{
+		// container metrics
+		{
+			title:         "container metrics overrides / single resource name",
+			info:          provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "actually_gauge"},
+			namespace:     "somens",
+			resourceNames: []string{"somepod"},
+
+			expectedKind:  GaugeSeries,
+			expectedQuery: "container_actually_gauge_seconds_total{pod_name=\"somepod\",container_name!=\"POD\",namespace=\"somens\"}",
+		},
+		{
+			title:         "container metrics gauge / multiple resource names",
+			info:          provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_usage"},
+			namespace:     "somens",
+			resourceNames: []string{"somepod1", "somepod2"},
+
+			expectedKind:  GaugeSeries,
+			expectedQuery: "container_some_usage{pod_name=~\"somepod1|somepod2\",container_name!=\"POD\",namespace=\"somens\"}",
+		},
+		{
+			title:         "container metrics counter",
+			info:          provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_count"},
+			namespace:     "somens",
+			resourceNames: []string{"somepod1", "somepod2"},
+
+			expectedKind:  CounterSeries,
+			expectedQuery: "container_some_count_total{pod_name=~\"somepod1|somepod2\",container_name!=\"POD\",namespace=\"somens\"}",
+		},
+		{
+			title:         "container metrics seconds counter",
+			info:          provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_time"},
+			namespace:     "somens",
+			resourceNames: []string{"somepod1", "somepod2"},
+
+			expectedKind:  SecondsCounterSeries,
+			expectedQuery: "container_some_time_seconds_total{pod_name=~\"somepod1|somepod2\",container_name!=\"POD\",namespace=\"somens\"}",
+		},
+		// namespaced metrics
+		{
+			title:         "namespaced metrics counter / multidimensional (service)",
+			info:          provider.MetricInfo{schema.GroupResource{Resource: "service"}, true, "ingress_hits"},
+			namespace:     "somens",
+			resourceNames: []string{"somesvc"},
+
+			expectedKind:  CounterSeries,
+			expectedQuery: "ingress_hits_total{service=\"somesvc\",namespace=\"somens\"}",
+		},
+		{
+			title:         "namespaced metrics counter / multidimensional (ingress)",
+			info:          provider.MetricInfo{schema.GroupResource{Group: "extensions", Resource: "ingress"}, true, "ingress_hits"},
+			namespace:     "somens",
+			resourceNames: []string{"someingress"},
+
+			expectedKind:  CounterSeries,
+			expectedQuery: "ingress_hits_total{ingress=\"someingress\",namespace=\"somens\"}",
+		},
+		{
+			title:         "namespaced metrics counter / multidimensional (pod)",
+			info:          provider.MetricInfo{schema.GroupResource{Resource: "pod"}, true, "ingress_hits"},
+			namespace:     "somens",
+			resourceNames: []string{"somepod"},
+
+			expectedKind:  CounterSeries,
+			expectedQuery: "ingress_hits_total{pod=\"somepod\",namespace=\"somens\"}",
+		},
+		{
+			title:         "namespaced metrics gauge",
+			info:          provider.MetricInfo{schema.GroupResource{Resource: "service"}, true, "service_proxy_packets"},
+			namespace:     "somens",
+			resourceNames: []string{"somesvc"},
+
+			expectedKind:  GaugeSeries,
+			expectedQuery: "service_proxy_packets{service=\"somesvc\",namespace=\"somens\"}",
+		},
+		{
+			title:         "namespaced metrics seconds counter",
+			info:          provider.MetricInfo{schema.GroupResource{Group: "extensions", Resource: "deployment"}, true, "work_queue_wait"},
+			namespace:     "somens",
+			resourceNames: []string{"somedep"},
+
+			expectedKind:  SecondsCounterSeries,
+			expectedQuery: "work_queue_wait_seconds_total{deployment=\"somedep\",namespace=\"somens\"}",
+		},
+		// non-namespaced series
+		{
+			title:         "root scoped metrics gauge",
+			info:          provider.MetricInfo{schema.GroupResource{Resource: "node"}, false, "node_gigawatts"},
+			resourceNames: []string{"somenode"},
+
+			expectedKind:  GaugeSeries,
+			expectedQuery: "node_gigawatts{node=\"somenode\"}",
+		},
+		{
+			title:         "root scoped metrics counter",
+			info:          provider.MetricInfo{schema.GroupResource{Resource: "persistentvolume"}, false, "volume_claims"},
+			resourceNames: []string{"somepv"},
+
+			expectedKind:  CounterSeries,
+			expectedQuery: "volume_claims_total{persistentvolume=\"somepv\"}",
+		},
+		{
+			title:         "root scoped metrics seconds counter",
+			info:          provider.MetricInfo{schema.GroupResource{Resource: "node"}, false, "node_fan"},
+			resourceNames: []string{"somenode"},
+
+			expectedKind:  SecondsCounterSeries,
+			expectedQuery: "node_fan_seconds_total{node=\"somenode\"}",
+		},
+	}
+
+	for _, testCase := range testCases {
+		// QueryForMetric returns (kind, query, groupBy, found); groupBy isn't checked here
+		outputKind, outputQuery, _, found := registry.QueryForMetric(testCase.info, testCase.namespace, testCase.resourceNames...)
+		if !assert.True(found, "%s: metric %v should available", testCase.title, testCase.info) {
+			continue
+		}
+
+		assert.Equal(testCase.expectedKind, outputKind, "%s: metric %v should have had the right series type", testCase.title, testCase.info)
+		assert.Equal(prom.Selector(testCase.expectedQuery), outputQuery, "%s: metric %v should have produced the correct query for %v in namespace %s", testCase.title, testCase.info, testCase.resourceNames, testCase.namespace)
+	}
+
+	allMetrics := registry.ListAllMetrics()
+	expectedMetrics := []provider.MetricInfo{
+		provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "actually_gauge"},
+		provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_usage"},
+		provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_count"},
+		provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_time"},
+		provider.MetricInfo{schema.GroupResource{Resource: "services"}, true, "ingress_hits"},
+		provider.MetricInfo{schema.GroupResource{Group: "extensions", Resource: "ingresses"}, true, "ingress_hits"},
+		provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "ingress_hits"},
+		provider.MetricInfo{schema.GroupResource{Resource: "namespaces"}, false, "ingress_hits"},
+		provider.MetricInfo{schema.GroupResource{Resource: "services"}, true, "service_proxy_packets"},
+		provider.MetricInfo{schema.GroupResource{Resource: "namespaces"}, false, "service_proxy_packets"},
+		provider.MetricInfo{schema.GroupResource{Group: "extensions", Resource: "deployments"}, true, "work_queue_wait"},
+		provider.MetricInfo{schema.GroupResource{Resource: "namespaces"}, false, "work_queue_wait"},
+		provider.MetricInfo{schema.GroupResource{Resource: "nodes"}, false, "node_gigawatts"},
+		provider.MetricInfo{schema.GroupResource{Resource: "persistentvolumes"}, false, "volume_claims"},
+		provider.MetricInfo{schema.GroupResource{Resource: "nodes"}, false, "node_fan"},
+	}
+
+	// sort both for easy comparison
+	sort.Sort(metricInfoSorter(allMetrics))
+	sort.Sort(metricInfoSorter(expectedMetrics))
+
+	assert.Equal(expectedMetrics, allMetrics, "should have listed all expected metrics")
+}
+
+// metricInfoSorter is a sort.Interface for sorting provider.MetricInfos
+// by metric name, then group, then resource, then namespaced-before-not.
+type metricInfoSorter []provider.MetricInfo
+
+func (s metricInfoSorter) Len() int {
+	return len(s)
+}
+
+func (s metricInfoSorter) Less(i, j int) bool {
+	infoI := s[i]
+	infoJ := s[j]
+
+	if infoI.Metric != infoJ.Metric {
+		return infoI.Metric < infoJ.Metric
+	}
+
+	if infoI.GroupResource.Group != infoJ.GroupResource.Group {
+		return infoI.GroupResource.Group < infoJ.GroupResource.Group
+	}
+
+	if infoI.GroupResource.Resource != infoJ.GroupResource.Resource {
+		return infoI.GroupResource.Resource < infoJ.GroupResource.Resource
+	}
+
+	// equal elements must not compare as less than each other
+	return infoI.Namespaced && !infoJ.Namespaced
+}
+
+func (s metricInfoSorter) Swap(i, j int) {
+	s[i], s[j] = s[j], s[i]
+}
diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go
new file mode 100644
index 00000000..a40ce3ee
--- /dev/null
+++ b/pkg/custom-provider/provider.go
@@ -0,0 +1,343 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package provider
+
+import (
+	"context"
+	"time"
+	"fmt"
+	"net/http"
+	"github.com/golang/glog"
+
+	apierr "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	apimeta "k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/metrics/pkg/apis/custom_metrics"
+	"k8s.io/client-go/pkg/api"
+	_ "k8s.io/client-go/pkg/api/install"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/custom-metrics-boilerplate/pkg/provider"
+	"k8s.io/apimachinery/pkg/util/wait"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	pmodel "github.com/prometheus/common/model"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+
+	prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
+)
+
+// newMetricNotFoundError builds a 404 StatusError reporting that the given
+// metric does not exist for the given group-resource. It plays the same role
+// as NewNotFound, but with a metric-specific message.
+func newMetricNotFoundError(resource schema.GroupResource, metricName string) *apierr.StatusError {
+	message := fmt.Sprintf("the server could not find the metric %s for %s", metricName, resource.String())
+
+	return &apierr.StatusError{
+		ErrStatus: metav1.Status{
+			Status:  metav1.StatusFailure,
+			Code:    int32(http.StatusNotFound),
+			Reason:  metav1.StatusReasonNotFound,
+			Message: message,
+		},
+	}
+}
+
+// newMetricNotFoundForError returns a StatusError indicating the given metric could not be found for
+// the given named object.
It is similar to NewNotFound, but more specialized +func newMetricNotFoundForError(resource schema.GroupResource, metricName string, resourceName string) *apierr.StatusError { + return &apierr.StatusError{metav1.Status{ + Status: metav1.StatusFailure, + Code: int32(http.StatusNotFound), + Reason: metav1.StatusReasonNotFound, + Message: fmt.Sprintf("the server could not find the metric %s for %s %s", metricName, resource.String(), resourceName), + }} +} + +type prometheusProvider struct { + mapper apimeta.RESTMapper + kubeClient dynamic.ClientPool + promClient prom.Client + + SeriesRegistry + + rateInterval time.Duration +} + +func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.ClientPool, promClient prom.Client, updateInterval time.Duration, rateInterval time.Duration) provider.CustomMetricsProvider { + lister := &cachingMetricsLister{ + updateInterval: updateInterval, + promClient: promClient, + + SeriesRegistry: &basicSeriesRegistry{ + namer: metricNamer{ + // TODO: populate this... 
+ overrides: nil, + mapper: mapper, + }, + }, + } + + // TODO: allow for RunUntil + lister.Run() + + return &prometheusProvider{ + mapper: mapper, + kubeClient: kubeClient, + promClient: promClient, + + SeriesRegistry: lister, + + rateInterval: rateInterval, + } +} + +func (p *prometheusProvider) metricFor(value pmodel.SampleValue, groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) { + kind, err := p.mapper.KindFor(groupResource.WithVersion("")) + if err != nil { + return nil, err + } + + return &custom_metrics.MetricValue{ + DescribedObject: api.ObjectReference{ + APIVersion: groupResource.Group+"/"+runtime.APIVersionInternal, + Kind: kind.Kind, + Name: name, + Namespace: namespace, + }, + MetricName: metricName, + Timestamp: metav1.Time{time.Now()}, + Value: *resource.NewMilliQuantity(int64(value * 1000.0), resource.DecimalSI), + }, nil +} + +func (p *prometheusProvider) metricsFor(valueSet pmodel.Vector, info provider.MetricInfo, list runtime.Object) (*custom_metrics.MetricValueList, error) { + if !apimeta.IsListType(list) { + // TODO: fix the error type here + return nil, fmt.Errorf("returned object was not a list") + } + + values, found := p.MatchValuesToNames(info, valueSet) + if !found { + // TODO: throw error + } + res := []custom_metrics.MetricValue{} + + // blech, EachListItem should pass an index -- + // it's an implementation detail that it happens to be sequential + err := apimeta.EachListItem(list, func(item runtime.Object) error { + objUnstructured := item.(*unstructured.Unstructured) + objName := objUnstructured.GetName() + if _, found := values[objName]; !found { + return nil + } + value, err := p.metricFor(values[objName], info.GroupResource, objUnstructured.GetNamespace(), objName, info.Metric) + if err != nil { + return err + } + res = append(res, *value) + + return nil + }) + if err != nil { + return nil, err + } + + return &custom_metrics.MetricValueList{ + Items: res, + 
}, nil +} + +func (p *prometheusProvider) buildQuery(info provider.MetricInfo, namespace string, names ...string) (pmodel.Vector, error) { + kind, baseQuery, groupBy, found := p.QueryForMetric(info, namespace, names...) + if !found { + return nil, newMetricNotFoundError(info.GroupResource, info.Metric) + } + + fullQuery := baseQuery + switch kind { + case CounterSeries: + fullQuery = prom.Selector(fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(p.rateInterval).String())) + case SecondsCounterSeries: + // TODO: further modify for seconds? + fullQuery = prom.Selector(prom.Selector(fmt.Sprintf("rate(%s[%s])", baseQuery, pmodel.Duration(p.rateInterval).String()))) + } + + // TODO: too small of a rate interval will return no results... + + // sum over all other dimensions of this query (e.g. if we select on route, sum across all pods, + // but if we select on pods, sum across all routes), and split by the dimension of our resource + // TODO: return/populate the by list in SeriesForMetric + fullQuery = prom.Selector(fmt.Sprintf("sum(%s) by (%s)", fullQuery, groupBy)) + + // TODO: use an actual context + queryResults, err := p.promClient.Query(context.Background(), pmodel.Now(), fullQuery) + if err != nil { + // TODO: interpret this somehow?
+ glog.Errorf("unable to fetch metrics from prometheus: %v", err) + // don't leak implementation details to the user + return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) + } + + if queryResults.Type != pmodel.ValVector { + glog.Errorf("unexpected results from prometheus: expected %s, got %s on results %v", pmodel.ValVector, queryResults.Type, queryResults) + return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics")) + } + + return *queryResults.Vector, nil +} + +func (p *prometheusProvider) getSingle(info provider.MetricInfo, namespace, name string) (*custom_metrics.MetricValue, error) { + queryResults, err := p.buildQuery(info, namespace, name) + if err != nil { + return nil, err + } + + if len(queryResults) < 1 { + return nil, newMetricNotFoundForError(info.GroupResource, info.Metric, name) + } + // TODO: check if length of results > 1? + // TODO: check if our output name is the same as our input name + resultValue := queryResults[0].Value + return p.metricFor(resultValue, info.GroupResource, "", name, info.Metric) +} + +func (p *prometheusProvider) getMultiple(info provider.MetricInfo, namespace string, selector labels.Selector) (*custom_metrics.MetricValueList, error) { + // construct a client to list the names of objects matching the label selector + // TODO: figure out version? + client, err := p.kubeClient.ClientForGroupVersionResource(info.GroupResource.WithVersion("")) + if err != nil { + glog.Errorf("unable to construct dynamic client to list matching resource names: %v", err) + // TODO: check for resource not found error?
+ // don't leak implementation details to the user + return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources")) + } + + // we can construct this APIResource ourselves, since the dynamic client only uses Name and Namespaced + // TODO: use discovery information instead + apiRes := &metav1.APIResource{ + Name: info.GroupResource.Resource, + Namespaced: info.Namespaced, + } + + // actually list the objects matching the label selector + // TODO: work for objects not in core v1 + matchingObjectsRaw, err := client.Resource(apiRes, namespace). + List(metav1.ListOptions{LabelSelector: selector.String()}) + if err != nil { + glog.Errorf("unable to list matching resource names: %v", err) + // TODO: check for resource not found error? + // don't leak implementation details to the user + return nil, apierr.NewInternalError(fmt.Errorf("unable to list matching resources")) + } + + // make sure we have a list + if !apimeta.IsListType(matchingObjectsRaw) { + // TODO: fix the error type here + return nil, fmt.Errorf("returned object was not a list") + } + + // convert a list of objects into the corresponding list of names + resourceNames := []string{} + err = apimeta.EachListItem(matchingObjectsRaw, func(item runtime.Object) error { + objName := item.(*unstructured.Unstructured).GetName() + resourceNames = append(resourceNames, objName) + return nil + }) + + // construct the actual query + queryResults, err := p.buildQuery(info, namespace, resourceNames...)
+ if err != nil { + return nil, err + } + return p.metricsFor(queryResults, info, matchingObjectsRaw) +} + +func (p *prometheusProvider) GetRootScopedMetricByName(groupResource schema.GroupResource, name string, metricName string) (*custom_metrics.MetricValue, error) { + info := provider.MetricInfo{ + GroupResource: groupResource, + Metric: metricName, + Namespaced: false, + } + + return p.getSingle(info, "", name) +} + + +func (p *prometheusProvider) GetRootScopedMetricBySelector(groupResource schema.GroupResource, selector labels.Selector, metricName string) (*custom_metrics.MetricValueList, error) { + info := provider.MetricInfo{ + GroupResource: groupResource, + Metric: metricName, + Namespaced: false, + } + return p.getMultiple(info, "", selector) +} + +func (p *prometheusProvider) GetNamespacedMetricByName(groupResource schema.GroupResource, namespace string, name string, metricName string) (*custom_metrics.MetricValue, error) { + info := provider.MetricInfo{ + GroupResource: groupResource, + Metric: metricName, + Namespaced: true, + } + + return p.getSingle(info, namespace, name) +} + +func (p *prometheusProvider) GetNamespacedMetricBySelector(groupResource schema.GroupResource, namespace string, selector labels.Selector, metricName string) (*custom_metrics.MetricValueList, error) { + info := provider.MetricInfo{ + GroupResource: groupResource, + Metric: metricName, + Namespaced: true, + } + return p.getMultiple(info, namespace, selector) +} + +type cachingMetricsLister struct { + SeriesRegistry + + promClient prom.Client + updateInterval time.Duration +} + +func (l *cachingMetricsLister) Run() { + go wait.Forever(func () { + if err := l.updateMetrics(); err != nil { + utilruntime.HandleError(err) + } + }, l.updateInterval) +} + +func (l *cachingMetricsLister) updateMetrics() error { + startTime := pmodel.Now().Add(-1*l.updateInterval) + + // TODO: figure out a good way to add all Kubernetes-related metrics at once + // (i.e. 
how do we determine if something is a Kubernetes-related metric?) + + // container-specific metrics from cAdvisor have their own form, and need special handling + containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) + namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) + // TODO: figure out how to determine which metrics on non-namespaced objects are kubernetes-related + + // TODO: use an actual context here + series, err := l.promClient.Series(context.Background(), pmodel.Interval{startTime, 0}, containerSel, namespacedSel) + if err != nil { + return fmt.Errorf("unable to update list of all available metrics: %v", err) + } + + glog.V(10).Infof("Set available metric list from Prometheus to: %v", series) + + l.SetSeries(series) + + return nil +} diff --git a/pkg/custom-provider/provider_test.go b/pkg/custom-provider/provider_test.go new file mode 100644 index 00000000..95926c4c --- /dev/null +++ b/pkg/custom-provider/provider_test.go @@ -0,0 +1,166 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package provider + +import ( + "fmt" + "sort" + "time" + "testing" + + fakedyn "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/pkg/api" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/custom-metrics-boilerplate/pkg/provider" + + // install extensions so that our RESTMapper knows about it + _ "k8s.io/client-go/pkg/apis/extensions/install" + + prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client" + pmodel "github.com/prometheus/common/model" +) + +const fakeProviderUpdateInterval = 2*time.Second + +// fakePromClient is a fake instance of prom.Client +type fakePromClient struct { + // acceptibleInterval is the interval in which to return queries + acceptibleInterval pmodel.Interval + // errQueries are queries that result in an error (whether from Query or Series) + errQueries map[prom.Selector]error + // series are non-error responses to partial Series calls + series map[prom.Selector][]prom.Series + // queryResults are non-error responses to Query + queryResults map[prom.Selector]prom.QueryResult +} + +func (c *fakePromClient) Series(interval pmodel.Interval, selectors ...prom.Selector) ([]prom.Series, error) { + if (interval.Start != 0 && interval.Start < c.acceptibleInterval.Start) || (interval.End != 0 && interval.End > c.acceptibleInterval.End) { + return nil, fmt.Errorf("interval [%v, %v] for query is outside range [%v, %v]", interval.Start, interval.End, c.acceptibleInterval.Start, c.acceptibleInterval.End) + } + res := []prom.Series{} + for _, sel := range selectors { + if err, found := c.errQueries[sel]; found { + return nil, err + } + if series, found := c.series[sel]; found { + res = append(res, series...) 
+ } + } + + return res, nil +} + +func (c *fakePromClient) Query(t pmodel.Time, query prom.Selector) (prom.QueryResult, error) { + if t < c.acceptibleInterval.Start || t > c.acceptibleInterval.End { + return prom.QueryResult{}, fmt.Errorf("time %v for query is outside range [%v, %v]", t, c.acceptibleInterval.Start, c.acceptibleInterval.End) + } + + if err, found := c.errQueries[query]; found { + return prom.QueryResult{}, err + } + + if res, found := c.queryResults[query]; found { + return res, nil + } + + return prom.QueryResult{ + Type: pmodel.ValVector, + Vector: &pmodel.Vector{}, + }, nil +} + +func setupPrometheusProvider(t *testing.T) (provider.CustomMetricsProvider, *fakePromClient) { + fakeProm := &fakePromClient{} + fakeKubeClient := &fakedyn.FakeClientPool{} + + prov := NewPrometheusProvider(api.Registry.RESTMapper(), fakeKubeClient, fakeProm, fakeProviderUpdateInterval, 1*time.Minute) + + containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container_name", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod_name", "")) + namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*")) + fakeProm.series = map[prom.Selector][]prom.Series{ + containerSel: []prom.Series{ + { + Name: "container_actually_gauge_seconds_total", + Labels: map[string]string{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, + }, + { + Name: "container_some_usage", + Labels: map[string]string{"pod_name": "somepod", "namespace": "somens", "container_name": "somecont"}, + }, + }, + namespacedSel: []prom.Series{ + { + Name: "ingress_hits_total", + Labels: map[string]string{"ingress": "someingress", "service": "somesvc", "pod": "backend1", "namespace": "somens"}, + }, + { + Name: "ingress_hits_total", + Labels: map[string]string{"ingress": "someingress", "service": "somesvc", "pod": "backend2", "namespace": "somens"}, + }, + { + Name: "service_proxy_packets", + Labels: 
map[string]string{"service": "somesvc", "namespace": "somens"}, + }, + { + Name: "work_queue_wait_seconds_total", + Labels: map[string]string{"deployment": "somedep", "namespace": "somens"}, + }, + }, + } + + return prov, fakeProm +} + +func TestListAllMetrics(t *testing.T) { + // setup + prov, fakeProm := setupPrometheusProvider(t) + + // assume we have no updates + require.Len(t, prov.ListAllMetrics(), 0, "assume: should have no metrics updates at the start") + + // set the acceptible interval (now until the next update, with a bit of wiggle room) + startTime := pmodel.Now() + endTime := startTime.Add(fakeProviderUpdateInterval + fakeProviderUpdateInterval/10) + fakeProm.acceptibleInterval = pmodel.Interval{Start: startTime, End: endTime} + + // wait one update interval (with a bit of wiggle room) + time.Sleep(fakeProviderUpdateInterval + fakeProviderUpdateInterval/10) + + // list/sort the metrics + actualMetrics := prov.ListAllMetrics() + sort.Sort(metricInfoSorter(actualMetrics)) + + expectedMetrics := []provider.MetricInfo{ + provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "actually_gauge"}, + provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "some_usage"}, + provider.MetricInfo{schema.GroupResource{Resource: "services"}, true, "ingress_hits"}, + provider.MetricInfo{schema.GroupResource{Group: "extensions", Resource: "ingresses"}, true, "ingress_hits"}, + provider.MetricInfo{schema.GroupResource{Resource: "pods"}, true, "ingress_hits"}, + provider.MetricInfo{schema.GroupResource{Resource: "namespaces"}, false, "ingress_hits"}, + provider.MetricInfo{schema.GroupResource{Resource: "services"}, true, "service_proxy_packets"}, + provider.MetricInfo{schema.GroupResource{Resource: "namespaces"}, false, "service_proxy_packets"}, + provider.MetricInfo{schema.GroupResource{Group: "extensions", Resource: "deployments"}, true, "work_queue_wait"}, + provider.MetricInfo{schema.GroupResource{Resource: "namespaces"}, false, 
"work_queue_wait"}, + } + sort.Sort(metricInfoSorter(expectedMetrics)) + + // assert that we got what we expected + assert.Equal(t, expectedMetrics, actualMetrics) +}