vendor: revendor metrics-server, custom-metrics-apiserver

Sergiusz Urbaniak 2020-10-28 15:52:52 +01:00
parent 752ce84723
commit 523aa52367
1010 changed files with 91458 additions and 29107 deletions

View file

@ -15,7 +15,6 @@ reviewers:
- mikedanese
- liggitt
- ncdc
- tallclair
- timothysc
- hongchaodeng
- krousey

View file

@ -39,29 +39,11 @@ import (
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
)
/*
* By default, all the following metrics are defined as falling under
* ALPHA stability level (https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/20190404-kubernetes-control-plane-metrics-stability.md#stability-classes)
*
* Promoting the stability level of the metric is a responsibility of the component owner, since it
* involves explicitly acknowledging support for the metric across multiple releases, in accordance with
* the metric stability policy.
*/
var (
initCounter = metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "apiserver_init_events_total",
Help: "Counter of init events processed in watchcache broken by resource type",
StabilityLevel: metrics.ALPHA,
},
[]string{"resource"},
)
emptyFunc = func() {}
)
@ -69,17 +51,15 @@ const (
// storageWatchListPageSize is the cacher's request chunk size of
// initial and resync watch lists to storage.
storageWatchListPageSize = int64(10000)
// defaultBookmarkFrequency defines how frequently watch bookmarks should be sent
// in addition to sending a bookmark right before watch deadline.
//
// NOTE: Update `eventFreshDuration` when changing this value.
defaultBookmarkFrequency = time.Minute
)
func init() {
legacyregistry.MustRegister(initCounter)
}
// Config contains the configuration for a given Cache.
type Config struct {
// Maximum size of the history cached in memory.
CacheCapacity int
// An underlying storage.Interface.
Storage storage.Interface
@ -112,6 +92,8 @@ type Config struct {
NewListFunc func() runtime.Object
Codec runtime.Codec
Clock clock.Clock
}
type watchersMap map[int]*cacheWatcher
@ -176,24 +158,26 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cache
// second in a bucket, and pop them up once at the timeout. To be more specific,
// if you set the fire time at X, you can get the bookmark within the (X-1,X+1) period.
type watcherBookmarkTimeBuckets struct {
lock sync.Mutex
watchersBuckets map[int64][]*cacheWatcher
startBucketID int64
clock clock.Clock
lock sync.Mutex
watchersBuckets map[int64][]*cacheWatcher
startBucketID int64
clock clock.Clock
bookmarkFrequency time.Duration
}
func newTimeBucketWatchers(clock clock.Clock) *watcherBookmarkTimeBuckets {
func newTimeBucketWatchers(clock clock.Clock, bookmarkFrequency time.Duration) *watcherBookmarkTimeBuckets {
return &watcherBookmarkTimeBuckets{
watchersBuckets: make(map[int64][]*cacheWatcher),
startBucketID: clock.Now().Unix(),
clock: clock,
watchersBuckets: make(map[int64][]*cacheWatcher),
startBucketID: clock.Now().Unix(),
clock: clock,
bookmarkFrequency: bookmarkFrequency,
}
}
// adds a watcher to the bucket; if the deadline is before the start, it will be
// added to the first one.
func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
nextTime, ok := w.nextBookmarkTime(t.clock.Now())
nextTime, ok := w.nextBookmarkTime(t.clock.Now(), t.bookmarkFrequency)
if !ok {
return false
}
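For orientation, a minimal self-contained sketch of the bucketing idea (illustrative names and types, not the vendored ones): watchers due within the same second share one bucket keyed by Unix time, and due buckets are popped as a whole.

package main

import (
	"fmt"
	"time"
)

type buckets struct {
	byID    map[int64][]string // watcher IDs, bucketed by Unix second
	startID int64              // first bucket that has not been popped yet
}

func newBuckets(now time.Time) *buckets {
	return &buckets{byID: map[int64][]string{}, startID: now.Unix()}
}

func (b *buckets) add(id string, due time.Time) {
	bucketID := due.Unix()
	if bucketID < b.startID {
		bucketID = b.startID // deadlines before the start land in the first bucket
	}
	b.byID[bucketID] = append(b.byID[bucketID], id)
}

func (b *buckets) pop(now time.Time) []string {
	var due []string
	for ; b.startID <= now.Unix(); b.startID++ {
		due = append(due, b.byID[b.startID]...)
		delete(b.byID, b.startID)
	}
	return due
}

func main() {
	now := time.Now()
	b := newBuckets(now)
	b.add("w1", now.Add(500*time.Millisecond))
	b.add("w2", now.Add(3*time.Second))
	fmt.Println(b.pop(now.Add(time.Second))) // [w1]
}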
@ -336,11 +320,14 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
}
}
clock := clock.RealClock{}
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
objType := reflect.TypeOf(obj)
cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
objectType: reflect.TypeOf(obj),
objectType: objType,
versioner: config.Versioner,
newFunc: config.NewFunc,
indexedTrigger: indexedTrigger,
@ -358,9 +345,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
// and there are no guarantees on the order that they will stop.
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
stopCh: stopCh,
clock: clock,
clock: config.Clock,
timer: time.NewTimer(time.Duration(0)),
bookmarkWatchers: newTimeBucketWatchers(clock),
bookmarkWatchers: newTimeBucketWatchers(config.Clock, defaultBookmarkFrequency),
}
// Ensure that timer is stopped.
@ -371,7 +358,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
}
watchCache := newWatchCache(
config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers)
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
@ -412,6 +399,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
c.watchCache.SetOnReplace(func() {
successfulList = true
c.ready.set(true)
klog.V(1).Infof("cacher (%v): initialized", c.objectType.String())
})
defer func() {
if successfulList {
@ -425,7 +413,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
// Also note that startCaching is called in a loop, so there's no need
// to have another loop here.
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
klog.Errorf("unexpected ListAndWatch error: %v", err)
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.objectType.String(), err)
}
}
@ -445,8 +433,9 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre
}
// Watch implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
watchRV, err := c.versioner.ParseResourceVersion(resourceVersion)
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
pred := opts.Predicate
watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return nil, err
}
@ -529,22 +518,22 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
}
// WatchList implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
return c.Watch(ctx, key, resourceVersion, pred)
func (c *Cacher) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return c.Watch(ctx, key, opts)
}
// Get implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
if resourceVersion == "" {
func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
if opts.ResourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
return c.storage.Get(ctx, key, opts, objPtr)
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least as
// fresh as the given resourceVersion.
getRV, err := c.versioner.ParseResourceVersion(resourceVersion)
getRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return err
}
@ -552,7 +541,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
if getRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
return c.storage.Get(ctx, key, opts, objPtr)
}
// Do not create a trace - it's not for free and there are tons
@ -577,7 +566,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
objVal.Set(reflect.ValueOf(elem.Object).Elem())
} else {
objVal.Set(reflect.Zero(objVal.Type()))
if !ignoreNotFound {
if !opts.IgnoreNotFound {
return storage.NewKeyNotFoundError(key, int64(readResourceVersion))
}
}
@ -585,18 +574,20 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
}
// GetToList implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
if resourceVersion == "" || hasContinuation || hasLimit {
if resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well.
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
return c.storage.GetToList(ctx, key, opts, listObj)
}
// If resourceVersion is specified, serve it from cache.
@ -610,7 +601,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
return c.storage.GetToList(ctx, key, opts, listObj)
}
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
@ -657,18 +648,20 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
}
// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
if resourceVersion == "" || hasContinuation || hasLimit {
if resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well.
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero.
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
return c.storage.List(ctx, key, opts, listObj)
}
// If resourceVersion is specified, serve it from cache.
@ -682,7 +675,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
return c.storage.List(ctx, key, opts, listObj)
}
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
@ -935,9 +928,8 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
continue
}
c.watchersBuffer = append(c.watchersBuffer, watcher)
// Given that we send a bookmark event once at deadline-2s, never push again
// after the watcher pops up from the buckets. Once we decide to change to a
// more sophisticated strategy, we may need it here.
// Requeue the watcher for the next bookmark if needed.
c.bookmarkWatchers.addWatcher(watcher)
}
}
}
@ -1098,7 +1090,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
Continue: options.Continue,
}
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", pred, list); err != nil {
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersionMatch: options.ResourceVersionMatch, Predicate: pred}, list); err != nil {
return nil, err
}
return list, nil
@ -1106,7 +1098,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, storage.Everything)
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: storage.Everything})
}
// errWatcher implements watch.Interface to return a single error
@ -1240,13 +1232,28 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
}
}
func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {
// For now we return 2s before the deadline (and that time may already have
// passed), but it gives us extensibility for the future (false when the
// deadline is not set).
func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) {
// We try to send bookmarks:
// (a) roughly every minute
// (b) right before the watcher timeout - for now we simply set it 2s before
// the deadline
// The former gives us periodicity if the watch breaks due to unexpected
// conditions; the latter ensures that on timeout the watcher is as close to
// now as possible - this covers 99% of cases.
heartbeatTime := now.Add(bookmarkFrequency)
if c.deadline.IsZero() {
return c.deadline, false
// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
// apiserver if properly configured. So this shouldn't happen in practice.
return heartbeatTime, true
}
return c.deadline.Add(-2 * time.Second), true
if pretimeoutTime := c.deadline.Add(-2 * time.Second); pretimeoutTime.Before(heartbeatTime) {
heartbeatTime = pretimeoutTime
}
if heartbeatTime.Before(now) {
return time.Time{}, false
}
return heartbeatTime, true
}
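A worked example of the scheduling rule above (illustrative values; assumes code inside the cacher package, since deadline is unexported):

now := time.Now()
w := &cacheWatcher{deadline: now.Add(30 * time.Second)}
// The periodic heartbeat would fire at now+60s, but deadline-2s = now+28s
// comes first, so the bookmark is scheduled for now+28s.
next, ok := w.nextBookmarkTime(now, time.Minute) // ok == true, next == now.Add(28 * time.Second)
// Had the deadline been less than 2s away, pretimeoutTime would already be in
// the past and the function would return (time.Time{}, false).
_, _ = next, ok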
func getEventObject(object runtime.Object) runtime.Object {

View file

@ -30,7 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
"k8s.io/klog/v2"
)
var _ runtime.CacheableObject = &cachingObject{}

vendor/k8s.io/apiserver/pkg/storage/cacher/metrics.go (74 lines) generated vendored Normal file
View file

@ -0,0 +1,74 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
/*
* By default, all the following metrics are defined as falling under
* ALPHA stability level (https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/20190404-kubernetes-control-plane-metrics-stability.md#stability-classes)
*
* Promoting the stability level of the metric is a responsibility of the component owner, since it
* involves explicitly acknowledging support for the metric across multiple releases, in accordance with
* the metric stability policy.
*/
var (
initCounter = metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "apiserver_init_events_total",
Help: "Counter of init events processed in watchcache broken by resource type.",
StabilityLevel: metrics.ALPHA,
},
[]string{"resource"},
)
watchCacheCapacityIncreaseTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "watch_cache_capacity_increase_total",
Help: "Total number of watch cache capacity increase events broken by resource type.",
StabilityLevel: metrics.ALPHA,
},
[]string{"resource"},
)
watchCacheCapacityDecreaseTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "watch_cache_capacity_decrease_total",
Help: "Total number of watch cache capacity decrease events broken by resource type.",
StabilityLevel: metrics.ALPHA,
},
[]string{"resource"},
)
)
func init() {
legacyregistry.MustRegister(initCounter)
legacyregistry.MustRegister(watchCacheCapacityIncreaseTotal)
legacyregistry.MustRegister(watchCacheCapacityDecreaseTotal)
}
// recordsWatchCacheCapacityChange records watchCache capacity resize (increase or decrease) operations.
func recordsWatchCacheCapacityChange(objType string, old, new int) {
if old < new {
watchCacheCapacityIncreaseTotal.WithLabelValues(objType).Inc()
return
}
watchCacheCapacityDecreaseTotal.WithLabelValues(objType).Inc()
}
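Usage is straightforward; hypothetical calls for a Pod cache whose reflect type string is "*core.Pod":

// Bumps watch_cache_capacity_increase_total{resource="*core.Pod"}.
recordsWatchCacheCapacityChange("*core.Pod", 100, 200)
// Bumps watch_cache_capacity_decrease_total{resource="*core.Pod"}.
recordsWatchCacheCapacityChange("*core.Pod", 200, 100)

Note that only old < new counts as an increase; equal old and new values would increment the decrease counter.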

View file

@ -44,3 +44,17 @@ func hasPathPrefix(s, pathPrefix string) bool {
}
return false
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
func min(a, b int) int {
if a < b {
return a
}
return b
}

View file

@ -18,6 +18,7 @@ package cacher
import (
"fmt"
"reflect"
"sort"
"sync"
"time"
@ -30,7 +31,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
)
@ -44,6 +45,19 @@ const (
// resourceVersionTooHighRetrySeconds is the seconds before an operation should be retried by the client
// after receiving a 'too high resource version' error.
resourceVersionTooHighRetrySeconds = 1
// eventFreshDuration is the time duration of events we want to keep.
// We set it to `defaultBookmarkFrequency` plus epsilon to maximize
// chances that the last bookmark was sent within the kept history while,
// at the same time, minimizing the needed memory usage.
eventFreshDuration = 75 * time.Second
// defaultLowerBoundCapacity is a default value for event cache capacity's lower bound.
// TODO: Figure out to what value we can decrease it.
defaultLowerBoundCapacity = 100
// defaultUpperBoundCapacity should be able to keep eventFreshDuration of history.
defaultUpperBoundCapacity = 100 * 1024
)
// watchCacheEvent is a single "watch event" that is sent to users of
@ -60,6 +74,7 @@ type watchCacheEvent struct {
PrevObjFields fields.Set
Key string
ResourceVersion uint64
RecordTime time.Time
}
// Computing a key of an object is generally non-trivial (it performs
@ -126,6 +141,12 @@ type watchCache struct {
// Maximum size of history window.
capacity int
// upper bound of capacity since event cache has a dynamic size.
upperBoundCapacity int
// lower bound of capacity since event cache has a dynamic size.
lowerBoundCapacity int
// keyFunc is used to get a key in the underlying storage for a given object.
keyFunc func(runtime.Object) (string, error)
@ -165,28 +186,35 @@ type watchCache struct {
// An underlying storage.Versioner.
versioner storage.Versioner
// cacher's objectType.
objectType reflect.Type
}
func newWatchCache(
capacity int,
keyFunc func(runtime.Object) (string, error),
eventHandler func(*watchCacheEvent),
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
versioner storage.Versioner,
indexers *cache.Indexers) *watchCache {
indexers *cache.Indexers,
clock clock.Clock,
objectType reflect.Type) *watchCache {
wc := &watchCache{
capacity: capacity,
capacity: defaultLowerBoundCapacity,
keyFunc: keyFunc,
getAttrsFunc: getAttrsFunc,
cache: make([]*watchCacheEvent, capacity),
cache: make([]*watchCacheEvent, defaultLowerBoundCapacity),
lowerBoundCapacity: defaultLowerBoundCapacity,
upperBoundCapacity: defaultUpperBoundCapacity,
startIndex: 0,
endIndex: 0,
store: cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)),
resourceVersion: 0,
listResourceVersion: 0,
eventHandler: eventHandler,
clock: clock.RealClock{},
clock: clock,
versioner: versioner,
objectType: objectType,
}
wc.cond = sync.NewCond(wc.RLocker())
return wc
@ -260,6 +288,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
ObjFields: elem.Fields,
Key: key,
ResourceVersion: resourceVersion,
RecordTime: w.clock.Now(),
}
if err := func() error {
@ -301,7 +330,8 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
// Assumes that lock is already held for write.
func (w *watchCache) updateCache(event *watchCacheEvent) {
if w.endIndex == w.startIndex+w.capacity {
w.resizeCacheLocked(event.RecordTime)
if w.isCacheFullLocked() {
// Cache is full - remove the oldest element.
w.startIndex++
}
@ -309,6 +339,48 @@ func (w *watchCache) updateCache(event *watchCacheEvent) {
w.endIndex++
}
// resizeCacheLocked resizes the cache if necessary:
// - increases capacity by 2x if the cache is full and all cached events occurred within the last eventFreshDuration.
// - decreases capacity by 2x when the most recent quarter of events occurred outside of eventFreshDuration (to protect watchCache from flapping).
func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
capacity := min(w.capacity*2, w.upperBoundCapacity)
if capacity > w.capacity {
w.doCacheResizeLocked(capacity)
}
return
}
if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
capacity := max(w.capacity/2, w.lowerBoundCapacity)
if capacity < w.capacity {
w.doCacheResizeLocked(capacity)
}
return
}
}
// isCacheFullLocked reports whether the event cache is full.
// Assumes that lock is already held for write.
func (w *watchCache) isCacheFullLocked() bool {
return w.endIndex == w.startIndex+w.capacity
}
// doCacheResizeLocked resizes watchCache's event array to a different capacity.
// Assumes that lock is already held for write.
func (w *watchCache) doCacheResizeLocked(capacity int) {
newCache := make([]*watchCacheEvent, capacity)
if capacity < w.capacity {
// adjust startIndex if the cache capacity shrinks.
w.startIndex = w.endIndex - capacity
}
for i := w.startIndex; i < w.endIndex; i++ {
newCache[i%capacity] = w.cache[i%w.capacity]
}
w.cache = newCache
recordsWatchCacheCapacityChange(w.objectType.String(), w.capacity, capacity)
w.capacity = capacity
}
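To see why the modulo copy preserves ordering, a standalone sketch with illustrative data (not the vendored types): absolute indices stay fixed and only the modulus changes when the backing array is reallocated.

// capacity 4, startIndex 2, endIndex 6: logical order is e2, e3, e4, e5,
// stored at old[i%4] for absolute index i in [2, 6).
old := []string{"e4", "e5", "e2", "e3"}
startIndex, endIndex, capacity, newCap := 2, 6, 4, 8
grown := make([]string, newCap)
for i := startIndex; i < endIndex; i++ {
	grown[i%newCap] = old[i%capacity]
}
// grown[2..5] now holds e2..e5; startIndex and endIndex are unchanged.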
// List returns list of pointers to <storeElement> objects.
func (w *watchCache) List() []interface{} {
return w.store.List()
@ -460,19 +532,16 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
size := w.endIndex - w.startIndex
var oldest uint64
switch {
case size >= w.capacity:
// Once the watch event buffer is full, the oldest watch event we can deliver
// is the first one in the buffer.
oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
case w.listResourceVersion > 0:
// If the watch event buffer isn't full, the oldest watch event we can deliver
// is one greater than the resource version of the last full list.
case w.listResourceVersion > 0 && w.startIndex == 0:
// If no event was removed from the buffer since the last relist, the oldest watch
// event we can deliver is one greater than the resource version of the list.
oldest = w.listResourceVersion + 1
case size > 0:
// If we've never completed a list, use the resourceVersion of the oldest event
// in the buffer.
// This should only happen in unit tests that populate the buffer without
// performing list/replace operations.
// If the previous condition is not satisfied (either some event was already
// removed from the buffer, or we've never completed a list; the latter can
// only happen in unit tests that populate the buffer without performing
// list/replace operations), the oldest watch event we can deliver is the
// first one in the buffer.
oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
default:
return nil, fmt.Errorf("watch cache isn't correctly initialized")

View file

@ -177,7 +177,12 @@ var tooLargeResourceVersionCauseMsg = "Too large resource version"
// a minimum resource version that is larger than the largest currently available resource version for a requested resource.
func NewTooLargeResourceVersionError(minimumResourceVersion, currentRevision uint64, retrySeconds int) error {
err := errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %d, current: %d", minimumResourceVersion, currentRevision), retrySeconds)
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Message: tooLargeResourceVersionCauseMsg}}
err.ErrStatus.Details.Causes = []metav1.StatusCause{
{
Type: metav1.CauseTypeResourceVersionTooLarge,
Message: tooLargeResourceVersionCauseMsg,
},
}
return err
}
@ -186,15 +191,5 @@ func IsTooLargeResourceVersion(err error) bool {
if !errors.IsTimeout(err) {
return false
}
switch t := err.(type) {
case errors.APIStatus:
if d := t.Status().Details; d != nil {
for _, cause := range d.Causes {
if cause.Message == tooLargeResourceVersionCauseMsg {
return true
}
}
}
}
return false
return errors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
}
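A quick sketch of the round trip this enables (assuming the usual fmt and k8s.io/apiserver/pkg/storage imports): the typed cause now survives, so detection no longer depends on matching the message string.

err := storage.NewTooLargeResourceVersionError(2000, 1500, 1)
fmt.Println(storage.IsTooLargeResourceVersion(err)) // true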

View file

@ -17,6 +17,7 @@ limitations under the License.
package etcd3
import (
"fmt"
"strconv"
"k8s.io/apimachinery/pkg/api/meta"
@ -45,14 +46,14 @@ func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uin
// UpdateList implements Versioner
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string, count *int64) error {
if resourceVersion == 0 {
return fmt.Errorf("illegal resource version from storage: %d", resourceVersion)
}
listAccessor, err := meta.ListAccessor(obj)
if err != nil || listAccessor == nil {
return err
}
versionString := ""
if resourceVersion != 0 {
versionString = strconv.FormatUint(resourceVersion, 10)
}
versionString := strconv.FormatUint(resourceVersion, 10)
listAccessor.SetResourceVersion(versionString)
listAccessor.SetContinue(nextKey)
listAccessor.SetRemainingItemCount(count)

View file

@ -23,7 +23,7 @@ import (
"time"
"go.etcd.io/etcd/clientv3"
"k8s.io/klog"
"k8s.io/klog/v2"
)
const (

View file

@ -20,7 +20,7 @@ import (
"fmt"
"go.etcd.io/etcd/clientv3"
"k8s.io/klog"
"k8s.io/klog/v2"
)
func init() {
@ -80,5 +80,5 @@ func (klogWrapper) Fatalf(format string, args ...interface{}) {
}
func (klogWrapper) V(l int) bool {
return bool(klog.V(klog.Level(l)))
return bool(klog.V(klog.Level(l)).Enabled())
}
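The extra .Enabled() is needed because klog/v2 changed the type of Verbose; a minimal sketch of the idiomatic v2 guard:

// klog v1: type Verbose bool, so bool(klog.V(l)) compiled directly.
// klog/v2: Verbose is a struct, and the boolean test moved to Enabled().
if klog.V(3).Enabled() {
	klog.Info("verbose-only diagnostics")
}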

View file

@ -49,6 +49,14 @@ var (
},
[]string{"resource"},
)
dbTotalSize = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Name: "etcd_db_total_size_in_bytes",
Help: "Total size of the etcd database file physically allocated in bytes.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"endpoint"},
)
)
var registerMetrics sync.Once
@ -59,6 +67,7 @@ func Register() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(etcdRequestLatency)
legacyregistry.MustRegister(objectCounts)
legacyregistry.MustRegister(dbTotalSize)
})
}
@ -81,3 +90,8 @@ func Reset() {
func sinceInSeconds(start time.Time) float64 {
return time.Since(start).Seconds()
}
// UpdateEtcdDbSize sets the etcd_db_total_size_in_bytes metric.
func UpdateEtcdDbSize(ep string, size int64) {
dbTotalSize.WithLabelValues(ep).Set(float64(size))
}

View file

@ -32,6 +32,8 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
@ -41,7 +43,7 @@ import (
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
)
@ -62,10 +64,7 @@ func (d authenticatedDataString) AuthenticatedData() []byte {
var _ value.Context = authenticatedDataString("")
type store struct {
client *clientv3.Client
// getOpts contains additional options that should be passed
// to all Get() calls.
getOps []clientv3.OpOption
client *clientv3.Client
codec runtime.Codec
versioner storage.Versioner
transformer value.Transformer
@ -112,20 +111,20 @@ func (s *store) Versioner() storage.Versioner {
}
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil {
return err
}
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
if len(getResp.Kvs) == 0 {
if ignoreNotFound {
if opts.IgnoreNotFound {
return runtime.SetZeroValue(out)
}
return storage.NewKeyNotFoundError(key, 0)
@ -251,7 +250,7 @@ func (s *store) GuaranteedUpdate(
getCurrentState := func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil {
return nil, err
@ -379,10 +378,14 @@ func (s *store) GuaranteedUpdate(
}
// GetToList implements storage.Interface.GetToList.
func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
func (s *store) GetToList(ctx context.Context, key string, listOpts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := listOpts.ResourceVersion
match := listOpts.ResourceVersionMatch
pred := listOpts.Predicate
trace := utiltrace.New("GetToList etcd3",
utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion},
utiltrace.Field{"resourceVersionMatch", match},
utiltrace.Field{"limit", pred.Limit},
utiltrace.Field{"continue", pred.Continue})
defer trace.LogIfLong(500 * time.Millisecond)
@ -399,12 +402,21 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
var opts []clientv3.OpOption
if len(resourceVersion) > 0 && match == metav1.ResourceVersionMatchExact {
rv, err := s.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
opts = append(opts, clientv3.WithRev(int64(rv)))
}
getResp, err := s.client.KV.Get(ctx, key, opts...)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
if err != nil {
return err
}
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
@ -440,6 +452,14 @@ func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Obje
func (s *store) Count(key string) (int64, error) {
key = path.Join(s.pathPrefix, key)
// We need to make sure the key ends with "/" so that we only get children "directories".
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
if !strings.HasSuffix(key, "/") {
key += "/"
}
startTime := time.Now()
getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly())
metrics.RecordEtcdRequestLatency("listWithCount", key, startTime)
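A sketch of why the trailing slash matters, with a hypothetical key (assumes the strings and clientv3 imports already present in this file):

key := "/registry/pods" // would also match the sibling prefix "/registry/podsecuritypolicies"
if !strings.HasSuffix(key, "/") {
	key += "/"
}
end := clientv3.GetPrefixRangeEnd(key) // "/registry/pods0": last byte '/' (0x2f) incremented to '0' (0x30)
_ = end                                // the range [key, end) covers exactly the children of /registry/pods/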
@ -510,10 +530,14 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error
}
// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch
pred := opts.Predicate
trace := utiltrace.New("List etcd3",
utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion},
utiltrace.Field{"resourceVersionMatch", match},
utiltrace.Field{"limit", pred.Limit},
utiltrace.Field{"continue", pred.Continue})
defer trace.LogIfLong(500 * time.Millisecond)
@ -547,7 +571,16 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
newItemFunc := getNewItemFunc(listObj, v)
var returnedRV, continueRV int64
var fromRV *uint64
if len(resourceVersion) > 0 {
parsedRV, err := s.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
fromRV = &parsedRV
}
var returnedRV, continueRV, withRev int64
var continueKey string
switch {
case s.pagingEnabled && len(pred.Continue) > 0:
@ -568,27 +601,50 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
options = append(options, clientv3.WithRev(continueRV))
withRev = continueRV
returnedRV = continueRV
}
case s.pagingEnabled && pred.Limit > 0:
if len(resourceVersion) > 0 {
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
if fromRV != nil {
switch match {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
returnedRV = int64(*fromRV)
withRev = returnedRV
case "": // legacy case
if *fromRV > 0 {
returnedRV = int64(*fromRV)
withRev = returnedRV
}
default:
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(int64(fromRV)))
}
returnedRV = int64(fromRV)
}
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
default:
if fromRV != nil {
switch match {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
returnedRV = int64(*fromRV)
withRev = returnedRV
case "": // legacy case
default:
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
}
}
options = append(options, clientv3.WithPrefix())
}
if withRev != 0 {
options = append(options, clientv3.WithRev(withRev))
}
// loop until we have filled the requested limit from etcd or there are no more results
var lastKey []byte
@ -601,7 +657,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
if err != nil {
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
}
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
hasMore = getResp.More
@ -650,6 +706,10 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
break
}
key = string(lastKey) + "\x00"
if withRev == 0 {
withRev = returnedRV
options = append(options, clientv3.WithRev(withRev))
}
}
// instruct the client to begin querying from immediately after the last key we returned
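Condensed, the ResourceVersionMatch handling in the paging branch above maps (resourceVersion, match) to an etcd read revision roughly as follows; a sketch, not the vendored code:

func revForMatch(fromRV *uint64, match metav1.ResourceVersionMatch) (int64, error) {
	if fromRV == nil {
		return 0, nil // no constraint: read at the latest revision
	}
	switch match {
	case metav1.ResourceVersionMatchNotOlderThan:
		return 0, nil // read latest; the constraint is validated against the response revision
	case metav1.ResourceVersionMatchExact:
		return int64(*fromRV), nil // pin the read to exactly this revision
	case "":
		if *fromRV > 0 {
			return int64(*fromRV), nil // legacy: a non-zero RV pins the revision
		}
		return 0, nil // legacy: "0" means any revision
	default:
		return 0, fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
	}
}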
@ -709,22 +769,22 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
}
// Watch implements storage.Interface.Watch.
func (s *store) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, pred, false)
func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.watch(ctx, key, opts, false)
}
// WatchList implements storage.Interface.WatchList.
func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, pred, true)
func (s *store) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.watch(ctx, key, opts, true)
}
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) {
rev, err := s.versioner.ParseResourceVersion(rv)
func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions, recursive bool) (watch.Interface, error) {
rev, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return nil, err
}
key = path.Join(s.pathPrefix, key)
return s.watcher.Watch(ctx, key, int64(rev), recursive, pred)
return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.Predicate)
}
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
@ -818,9 +878,9 @@ func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, er
return []clientv3.OpOption{clientv3.WithLease(id)}, nil
}
// ensureMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
// validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
// greater than the most recent actualRevision available from storage.
func (s *store) ensureMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
func (s *store) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
if minimumResourceVersion == "" {
return nil
}

View file

@ -32,7 +32,7 @@ import (
"k8s.io/apiserver/pkg/storage/value"
"go.etcd.io/etcd/clientv3"
"k8s.io/klog"
"k8s.io/klog/v2"
)
const (
@ -126,7 +126,15 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
// The filter doesn't filter out any object.
wc.internalPred = storage.Everything
}
wc.ctx, wc.cancel = context.WithCancel(ctx)
// The etcd server waits until it cannot find a leader for 3 election
// timeouts to cancel existing streams. 3 is currently a hard coded
// constant. The election timeout defaults to 1000ms. If the cluster is
// healthy, when the leader is stopped the leadership transfer should be
// smooth (the leader transfers its leadership before stopping). If the leader
// is hard-killed, the other servers will take an election timeout to realize
// the leader is lost and start a campaign.
wc.ctx, wc.cancel = context.WithCancel(clientv3.WithRequireLeader(ctx))
return wc
}
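A minimal sketch of what WithRequireLeader changes for a watch consumer (client construction elided; rpctypes refers to go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes):

ctx := clientv3.WithRequireLeader(context.Background())
wch := client.Watch(ctx, "/registry/", clientv3.WithPrefix()) // client assumed in scope
for resp := range wch {
	if err := resp.Err(); err != nil {
		// With RequireLeader, a leaderless server fails the stream promptly
		// (rpctypes.ErrNoLeader) instead of holding it open indefinitely.
		break
	}
	// handle resp.Events ...
}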
@ -253,8 +261,7 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
continue
}
if len(wc.resultChan) == outgoingBufSize {
klog.V(3).Infof("Fast watcher, slow processing. Number of buffered events: %d."+
"Probably caused by slow dispatching events to watchers", outgoingBufSize)
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
@ -360,9 +367,7 @@ func (wc *watchChan) sendError(err error) {
func (wc *watchChan) sendEvent(e *event) {
if len(wc.incomingEventChan) == incomingBufSize {
klog.V(3).Infof("Fast watcher, slow processing. Number of buffered events: %d."+
"Probably caused by slow decoding, user not receiving fast, or other processing logic",
incomingBufSize)
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize)
}
select {
case wc.incomingEventChan <- e:

View file

@ -21,6 +21,7 @@ import (
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@ -175,7 +176,7 @@ type Interface interface {
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will get current object at given key
// and send it in an "ADDED" event, before watch starts.
Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API
// objects, and any item selected by 'p' is sent down to the returned watch.Interface.
@ -184,26 +185,26 @@ type Interface interface {
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will list current objects directory defined by key
// and send them in "ADDED" events, before watch starts.
WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
// Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
// return a zero object of the requested type, or an error, depending on 'opts.IgnoreNotFound'.
// Treats empty responses and nil response nodes exactly like a not found error.
// The returned contents may be delayed, but it is guaranteed that they will
// have at least 'resourceVersion'.
Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
// match 'opts.ResourceVersion' according to 'opts.ResourceVersionMatch'.
Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
// GetToList unmarshals the JSON found at key and opaques it into a *List api object
// (an object that satisfies the runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will
// have at least 'resourceVersion'.
GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
// match 'opts.ResourceVersion' according to 'opts.ResourceVersionMatch'.
GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
// List unmarshals the JSONs found at the directory defined by key and opaques them
// into *List api object (an object that satisfies runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will
// have at least 'resourceVersion'.
List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
// match 'opts.ResourceVersion' according to 'opts.ResourceVersionMatch'.
List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is index conflict.
@ -243,3 +244,29 @@ type Interface interface {
// Count returns number of different entries under the key (generally being path prefix).
Count(key string) (int64, error)
}
// GetOptions provides the options that may be provided for storage get operations.
type GetOptions struct {
// IgnoreNotFound determines what is returned if the requested object is not found. If
// true, a zero object is returned. If false, an error is returned.
IgnoreNotFound bool
// ResourceVersion provides a resource version constraint to apply to the get operation
// as a "not older than" constraint: the result contains data at least as new as the provided
// ResourceVersion. The newest available data is preferred, but any data not older than this
// ResourceVersion may be served.
ResourceVersion string
}
// ListOptions provides the options that may be provided for storage list operations.
type ListOptions struct {
// ResourceVersion provides a resource version constraint to apply to the list operation
// as a "not older than" constraint: the result contains data at least as new as the provided
// ResourceVersion. The newest available data is preferred, but any data not older than this
// ResourceVersion may be served.
ResourceVersion string
// ResourceVersionMatch provides the rule for how the resource version constraint applies. If set
// to the default value "", the legacy resource version semantics apply.
ResourceVersionMatch metav1.ResourceVersionMatch
// Predicate provides the selection rules for the list operation.
Predicate SelectionPredicate
}
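For illustration, an exact-revision list phrased against the new options struct; store, ctx, and podList are assumed in scope, and the RV value is made up:

opts := storage.ListOptions{
	ResourceVersion:      "1500",
	ResourceVersionMatch: metav1.ResourceVersionMatchExact,
	Predicate:            storage.Everything,
}
// Serves the list at exactly revision 1500 (or errors, e.g. if it was compacted).
if err := store.List(ctx, "/registry/pods", opts, podList); err != nil {
	// e.g. "Too large resource version" or a compaction error
}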

View file

@ -28,7 +28,8 @@ const (
StorageTypeUnset = ""
StorageTypeETCD3 = "etcd3"
DefaultCompactInterval = 5 * time.Minute
DefaultCompactInterval = 5 * time.Minute
DefaultDBMetricPollInterval = 30 * time.Second
)
// TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to.
@ -71,13 +72,16 @@ type Config struct {
CompactionInterval time.Duration
// CountMetricPollPeriod specifies how often the count metric should be updated.
CountMetricPollPeriod time.Duration
// DBMetricPollInterval specifies how often the storage backend metric should be updated.
DBMetricPollInterval time.Duration
}
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
return &Config{
Paging: true,
Prefix: prefix,
Codec: codec,
CompactionInterval: DefaultCompactInterval,
Paging: true,
Prefix: prefix,
Codec: codec,
CompactionInterval: DefaultCompactInterval,
DBMetricPollInterval: DefaultDBMetricPollInterval,
}
}
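Callers can opt out of the new polling entirely: startDBSizeMonitorPerEndpoint (in the factory change further down) treats a zero interval as disabled. A sketch, with codec assumed in scope:

cfg := storagebackend.NewDefaultConfig("/registry", codec)
cfg.DBMetricPollInterval = 0 // 0 disables the DB size monitor started by newETCD3Storage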

View file

@ -36,20 +36,26 @@ import (
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
)
// The short keepalive timeout and interval have been chosen to aggressively
// detect a failed etcd server without introducing much overhead.
const keepaliveTime = 30 * time.Second
const keepaliveTimeout = 10 * time.Second
const (
// The short keepalive timeout and interval have been chosen to aggressively
// detect a failed etcd server without introducing much overhead.
keepaliveTime = 30 * time.Second
keepaliveTimeout = 10 * time.Second
// dialTimeout is the timeout for failing to establish a connection.
// It is set to 20 seconds as times shorter than that will cause TLS connections to fail
// on heavily loaded arm64 CPUs (issue #64649)
const dialTimeout = 20 * time.Second
// dialTimeout is the timeout for failing to establish a connection.
// It is set to 20 seconds as times shorter than that will cause TLS connections to fail
// on heavily loaded arm64 CPUs (issue #64649)
dialTimeout = 20 * time.Second
dbMetricsMonitorJitter = 0.5
)
func init() {
// grpcprom auto-registers (via an init function) their client metrics, since we are opting out of
@ -57,6 +63,7 @@ func init() {
// we need to explicitly register these metrics to our global registry here.
// For reference: https://github.com/kubernetes/kubernetes/pull/81387
legacyregistry.RawMustRegister(grpcprom.DefaultClientMetrics)
dbMetricsMonitors = make(map[string]struct{})
}
func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) {
@ -153,16 +160,20 @@ type runningCompactor struct {
}
var (
lock sync.Mutex
compactors = map[string]*runningCompactor{}
// compactorsMu guards access to compactors map
compactorsMu sync.Mutex
compactors = map[string]*runningCompactor{}
// dbMetricsMonitorsMu guards access to dbMetricsMonitors map
dbMetricsMonitorsMu sync.Mutex
dbMetricsMonitors map[string]struct{}
)
// startCompactorOnce starts one compactor per transport. If the interval gets smaller on repeated calls, the
// compactor is replaced. A destroy func is returned. If all destroy funcs with the same transport are called,
// the compactor is stopped.
func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration) (func(), error) {
lock.Lock()
defer lock.Unlock()
compactorsMu.Lock()
defer compactorsMu.Unlock()
key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile}
if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval {
@ -193,8 +204,8 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
compactors[key].refs++
return func() {
lock.Lock()
defer lock.Unlock()
compactorsMu.Lock()
defer compactorsMu.Unlock()
compactor := compactors[key]
compactor.refs--
@ -218,6 +229,11 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
return nil, nil, err
}
stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
if err != nil {
return nil, nil, err
}
var once sync.Once
destroyFunc := func() {
// we know that storage destroy funcs are called multiple times (due to reuse in subresources).
@ -225,6 +241,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
// TODO: fix duplicated storage destroy calls higher level
once.Do(func() {
stopCompactor()
stopDBSizeMonitor()
client.Close()
})
}
@ -234,3 +251,36 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
}
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
// corresponding metric etcd_db_total_size_in_bytes for each etcd server endpoint.
func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) {
if interval == 0 {
return func() {}, nil
}
dbMetricsMonitorsMu.Lock()
defer dbMetricsMonitorsMu.Unlock()
ctx, cancel := context.WithCancel(context.Background())
for _, ep := range client.Endpoints() {
if _, found := dbMetricsMonitors[ep]; found {
continue
}
dbMetricsMonitors[ep] = struct{}{}
endpoint := ep
klog.V(4).Infof("Start monitoring storage db size metric for endpoint %s with polling interval %v", endpoint, interval)
go wait.JitterUntilWithContext(ctx, func(context.Context) {
epStatus, err := client.Maintenance.Status(ctx, endpoint)
if err != nil {
klog.V(4).Infof("Failed to get storage db size for ep %s: %v", endpoint, err)
metrics.UpdateEtcdDbSize(endpoint, -1)
} else {
metrics.UpdateEtcdDbSize(endpoint, epStatus.DbSize)
}
}, interval, dbMetricsMonitorJitter, true)
}
return func() {
cancel()
}, nil
}

View file

@ -26,7 +26,7 @@ import (
"sync"
"time"
"k8s.io/klog"
"k8s.io/klog/v2"
"google.golang.org/grpc"