vendor: Update vendor logic

This commit is contained in:
Clayton Coleman 2020-04-08 14:34:43 -04:00
parent c6ac5cbc87
commit 4ca64b85f0
No known key found for this signature in database
GPG key ID: 3D16906B4F1C5CB3
1540 changed files with 265304 additions and 91616 deletions

View file

@ -24,8 +24,6 @@ import (
"sync"
"time"
"k8s.io/klog"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -41,16 +39,26 @@ import (
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog"
utiltrace "k8s.io/utils/trace"
"github.com/prometheus/client_golang/prometheus"
)
/*
* By default, all the following metrics are defined as falling under
* ALPHA stability level https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/20190404-kubernetes-control-plane-metrics-stability.md#stability-classes)
*
* Promoting the stability level of the metric is a responsibility of the component owner, since it
* involves explicitly acknowledging support for the metric across multiple releases, in accordance with
* the metric stability policy.
*/
var (
initCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "apiserver_init_events_total",
Help: "Counter of init events processed in watchcache broken by resource type",
initCounter = metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "apiserver_init_events_total",
Help: "Counter of init events processed in watchcache broken by resource type",
StabilityLevel: metrics.ALPHA,
},
[]string{"resource"},
)
@ -64,7 +72,7 @@ const (
)
func init() {
prometheus.MustRegister(initCounter)
legacyregistry.MustRegister(initCounter)
}
// Config contains the configuration for a given Cache.
@ -88,9 +96,9 @@ type Config struct {
// GetAttrsFunc is used to get object labels, fields
GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, err error)
// TriggerPublisherFunc is used for optimizing amount of watchers that
// IndexerFuncs is used for optimizing amount of watchers that
// needs to process an incoming event.
TriggerPublisherFunc storage.TriggerPublisherFunc
IndexerFuncs storage.IndexerFuncs
// NewFunc is a function that creates new empty object storing a object of type Type.
NewFunc func() runtime.Object
@ -154,17 +162,17 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cache
klog.Warningf("Terminating all watchers from cacher %v", objectType)
}
i.allWatchers.terminateAll(done)
for index, watchers := range i.valueWatchers {
for _, watchers := range i.valueWatchers {
watchers.terminateAll(done)
delete(i.valueWatchers, index)
}
i.valueWatchers = map[string]watchersMap{}
}
// As we don't need a high precision here, we keep all watchers timeout within a
// second in a bucket, and pop up them once at the timeout. To be more specific,
// if you set fire time at X, you can get the bookmark within (X-1,X+1) period.
// This is NOT thread-safe.
type watcherBookmarkTimeBuckets struct {
lock sync.Mutex
watchersBuckets map[int64][]*cacheWatcher
startBucketID int64
clock clock.Clock
@ -186,6 +194,8 @@ func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
return false
}
bucketID := nextTime.Unix()
t.lock.Lock()
defer t.lock.Unlock()
if bucketID < t.startBucketID {
bucketID = t.startBucketID
}
@ -198,6 +208,8 @@ func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher {
currentBucketID := t.clock.Now().Unix()
// There should be one or two elements in almost all cases
expiredWatchers := make([][]*cacheWatcher, 0, 2)
t.lock.Lock()
defer t.lock.Unlock()
for ; t.startBucketID <= currentBucketID; t.startBucketID++ {
if watchers, ok := t.watchersBuckets[t.startBucketID]; ok {
delete(t.watchersBuckets, t.startBucketID)
@ -209,6 +221,11 @@ func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher {
type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool
type indexedTriggerFunc struct {
indexName string
indexerFunc storage.IndexerFunc
}
// Cacher is responsible for serving WATCH and LIST requests for a given
// resource from its internal cache and updating its cache in the background
// based on the underlying storage contents.
@ -248,9 +265,9 @@ type Cacher struct {
// newFunc is a function that creates new empty object storing a object of type Type.
newFunc func() runtime.Object
// triggerFunc is used for optimizing amount of watchers that needs to process
// indexedTrigger is used for optimizing amount of watchers that needs to process
// an incoming event.
triggerFunc storage.TriggerPublisherFunc
indexedTrigger *indexedTriggerFunc
// watchers is mapping from the value of trigger function that a
// watcher is interested into the watchers
watcherIdx int
@ -276,37 +293,54 @@ type Cacher struct {
// watchersBuffer is a list of watchers potentially interested in currently
// dispatched event.
watchersBuffer []*cacheWatcher
// blockedWatchers is a list of watchers whose buffer is currently full.
blockedWatchers []*cacheWatcher
// watchersToStop is a list of watchers that were supposed to be stopped
// during current dispatching, but stopping was deferred to the end of
// dispatching that event to avoid race with closing channels in watchers.
watchersToStop []*cacheWatcher
// Maintain a timeout queue to send the bookmark event before the watcher times out.
bookmarkWatchers *watcherBookmarkTimeBuckets
// watchBookmark feature-gate
watchBookmarkEnabled bool
}
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config Config) *Cacher {
func NewCacherFromConfig(config Config) (*Cacher, error) {
stopCh := make(chan struct{})
obj := config.NewFunc()
// Give this error when it is constructed rather than when you get the
// first watch item, because it's much easier to track down that way.
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
panic("storage codec doesn't seem to match given type: " + err.Error())
return nil, fmt.Errorf("storage codec doesn't seem to match given type: %v", err)
}
var indexedTrigger *indexedTriggerFunc
if config.IndexerFuncs != nil {
// For now, we don't support multiple trigger functions defined
// for a given resource.
if len(config.IndexerFuncs) > 1 {
return nil, fmt.Errorf("cacher %s doesn't support more than one IndexerFunc: ", reflect.TypeOf(obj).String())
}
for key, value := range config.IndexerFuncs {
if value != nil {
indexedTrigger = &indexedTriggerFunc{
indexName: key,
indexerFunc: value,
}
}
}
}
clock := clock.RealClock{}
cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
objectType: reflect.TypeOf(obj),
versioner: config.Versioner,
newFunc: config.NewFunc,
triggerFunc: config.TriggerPublisherFunc,
watcherIdx: 0,
ready: newReady(),
storage: config.Storage,
objectType: reflect.TypeOf(obj),
versioner: config.Versioner,
newFunc: config.NewFunc,
indexedTrigger: indexedTrigger,
watcherIdx: 0,
watchers: indexedWatchers{
allWatchers: make(map[int]*cacheWatcher),
valueWatchers: make(map[string]watchersMap),
@ -319,11 +353,10 @@ func NewCacherFromConfig(config Config) *Cacher {
// - reflector.ListAndWatch
// and there are no guarantees on the order that they will stop.
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
stopCh: stopCh,
clock: clock,
timer: time.NewTimer(time.Duration(0)),
bookmarkWatchers: newTimeBucketWatchers(clock),
watchBookmarkEnabled: utilfeature.DefaultFeatureGate.Enabled(features.WatchBookmark),
stopCh: stopCh,
clock: clock,
timer: time.NewTimer(time.Duration(0)),
bookmarkWatchers: newTimeBucketWatchers(clock),
}
// Ensure that timer is stopped.
@ -361,7 +394,7 @@ func NewCacherFromConfig(config Config) *Cacher {
)
}()
return cacher
return cacher, nil
}
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
@ -417,23 +450,27 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
c.ready.wait()
triggerValue, triggerSupported := "", false
// TODO: Currently we assume that in a given Cacher object, any <predicate> that is
// passed here is aware of exactly the same trigger (at most one).
// Thus, either 0 or 1 values will be returned.
if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
triggerValue, triggerSupported = matchValues[0].Value, true
if c.indexedTrigger != nil {
for _, field := range pred.IndexFields {
if field == c.indexedTrigger.indexName {
if value, ok := pred.Field.RequiresExactMatch(field); ok {
triggerValue, triggerSupported = value, true
}
}
}
}
// If there is triggerFunc defined, but triggerSupported is false,
// If there is indexedTrigger defined, but triggerSupported is false,
// we can't narrow the amount of events significantly at this point.
//
// That said, currently triggerFunc is defined only for Pods and Nodes,
// and there is only constant number of watchers for which triggerSupported
// is false (excluding those issues explicitly by users).
// That said, currently indexedTrigger is defined only for couple resources:
// Pods, Nodes, Secrets and ConfigMaps and there is only a constant
// number of watchers for which triggerSupported is false (excluding those
// issued explicitly by users).
// Thus, to reduce the risk of those watchers blocking all watchers of a
// given resource in the system, we increase the sizes of buffers for them.
chanSize := 10
if c.triggerFunc != nil && !triggerSupported {
if c.indexedTrigger != nil && !triggerSupported {
// TODO: We should tune this value and ideally make it dependent on the
// number of objects of a given type and/or their churn.
chanSize = 1000
@ -476,8 +513,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
// Add it to the queue only when server and client support watch bookmarks.
if c.watchBookmarkEnabled && watcher.allowWatchBookmarks {
// Add it to the queue only when the client support watch bookmarks.
if watcher.allowWatchBookmarks {
c.bookmarkWatchers.addWatcher(watcher)
}
c.watcherIdx++
@ -572,7 +609,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
}
trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
defer trace.LogIfLong(500 * time.Millisecond)
c.ready.wait()
@ -584,7 +621,10 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil || listVal.Kind() != reflect.Slice {
if err != nil {
return err
}
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterWithAttrsFunction(key, pred)
@ -641,7 +681,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
}
trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
defer trace.LogIfLong(500 * time.Millisecond)
c.ready.wait()
@ -653,7 +693,10 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil || listVal.Kind() != reflect.Slice {
if err != nil {
return err
}
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterWithAttrsFunction(key, pred)
@ -662,7 +705,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
if err != nil {
return err
}
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
trace.Step("Listed items from cache", utiltrace.Field{"count", len(objs)})
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
// Resize the slice appropriately, since we already know that none
// of the elements will be filtered out.
@ -678,7 +721,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
trace.Step("Filtered items", utiltrace.Field{"count", listVal.Len()})
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
return err
@ -708,30 +751,29 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) {
return c.storage.Count(pathPrefix)
}
func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
// TODO: Currently we assume that in a given Cacher object, its <c.triggerFunc>
// is aware of exactly the same trigger (at most one). Thus calling:
// c.triggerFunc(<some object>)
// can return only 0 or 1 values.
// That means, that triggerValues itself may return up to 2 different values.
if c.triggerFunc == nil {
// baseObjectThreadUnsafe omits locking for cachingObject.
func baseObjectThreadUnsafe(object runtime.Object) runtime.Object {
if co, ok := object.(*cachingObject); ok {
return co.object
}
return object
}
func (c *Cacher) triggerValuesThreadUnsafe(event *watchCacheEvent) ([]string, bool) {
if c.indexedTrigger == nil {
return nil, false
}
result := make([]string, 0, 2)
matchValues := c.triggerFunc(event.Object)
if len(matchValues) > 0 {
result = append(result, matchValues[0].Value)
}
result = append(result, c.indexedTrigger.indexerFunc(baseObjectThreadUnsafe(event.Object)))
if event.PrevObject == nil {
return result, len(result) > 0
return result, true
}
prevMatchValues := c.triggerFunc(event.PrevObject)
if len(prevMatchValues) > 0 {
if len(result) == 0 || result[0] != prevMatchValues[0].Value {
result = append(result, prevMatchValues[0].Value)
}
prevTriggerValue := c.indexedTrigger.indexerFunc(baseObjectThreadUnsafe(event.PrevObject))
if result[0] != prevTriggerValue {
result = append(result, prevTriggerValue)
}
return result, len(result) > 0
return result, true
}
func (c *Cacher) processEvent(event *watchCacheEvent) {
@ -745,10 +787,6 @@ func (c *Cacher) processEvent(event *watchCacheEvent) {
func (c *Cacher) dispatchEvents() {
// Jitter to help level out any aggregate load.
bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25))
// Stop the timer when watchBookmarkFeatureGate is not enabled.
if !c.watchBookmarkEnabled && !bookmarkTimer.Stop() {
<-bookmarkTimer.C()
}
defer bookmarkTimer.Stop()
lastProcessedResourceVersion := uint64(0)
@ -765,6 +803,8 @@ func (c *Cacher) dispatchEvents() {
// Never send a bookmark event if we did not see an event here, this is fine
// because we don't provide any guarantees on sending bookmarks.
if lastProcessedResourceVersion == 0 {
// pop expired watchers in case there has been no update
c.bookmarkWatchers.popExpiredWatchers()
continue
}
bookmarkEvent := &watchCacheEvent{
@ -783,19 +823,99 @@ func (c *Cacher) dispatchEvents() {
}
}
func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
switch event.Type {
case watch.Added, watch.Modified:
if object, err := newCachingObject(event.Object); err == nil {
event.Object = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
}
// Don't wrap PrevObject for update event (for create events it is nil).
// We only encode those to deliver DELETE watch events, so if
// event.Object is not nil it can be used only for watchers for which
// selector was satisfied for its previous version and is no longer
// satisfied for the current version.
// This is rare enough that it doesn't justify making deep-copy of the
// object (done by newCachingObject) every time.
case watch.Deleted:
// Don't wrap Object for delete events - these are not to deliver any
// events. Only wrap PrevObject.
if object, err := newCachingObject(event.PrevObject); err == nil {
// Update resource version of the underlying object.
// event.PrevObject is used to deliver DELETE watch events and
// for them, we set resourceVersion to <current> instead of
// the resourceVersion of the last modification of the object.
updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
event.PrevObject = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
}
}
}
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
c.startDispatching(event)
defer c.finishDispatching()
// Watchers stopped after startDispatching will be delayed to finishDispatching,
// Since add() can block, we explicitly add when cacher is unlocked.
// Dispatching event in nonblocking way first, which make faster watchers
// not be blocked by slower ones.
if event.Type == watch.Bookmark {
for _, watcher := range c.watchersBuffer {
watcher.nonblockingAdd(event)
}
} else {
// Set up caching of object serializations only for dispatching this event.
//
// Storing serializations in memory would result in increased memory usage,
// but it would help for caching encodings for watches started from old
// versions. However, we still don't have a convincing data that the gain
// from it justifies increased memory usage, so for now we drop the cached
// serializations after dispatching this event.
//
// Given the deep-copies that are done to create cachingObjects,
// we try to cache serializations only if there are at least 3 watchers.
if len(c.watchersBuffer) >= 3 {
// Make a shallow copy to allow overwriting Object and PrevObject.
wcEvent := *event
setCachingObjects(&wcEvent, c.versioner)
event = &wcEvent
}
c.blockedWatchers = c.blockedWatchers[:0]
for _, watcher := range c.watchersBuffer {
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
if !watcher.nonblockingAdd(event) {
c.blockedWatchers = append(c.blockedWatchers, watcher)
}
}
if len(c.blockedWatchers) > 0 {
// dispatchEvent is called very often, so arrange
// to reuse timers instead of constantly allocating.
startTime := time.Now()
timeout := c.dispatchTimeoutBudget.takeAvailable()
c.timer.Reset(timeout)
// Make sure every watcher will try to send event without blocking first,
// even if the timer has already expired.
timer := c.timer
for _, watcher := range c.blockedWatchers {
if !watcher.add(event, timer) {
// fired, clean the timer by set it to nil.
timer = nil
}
}
// Stop the timer if it is not fired
if timer != nil && !timer.Stop() {
// Consume triggered (but not yet received) timer event
// so that future reuse does not get a spurious timeout.
<-timer.C
}
c.dispatchTimeoutBudget.returnUnused(timeout - time.Since(startTime))
}
}
}
@ -805,7 +925,8 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
// as we don't delete watcher from bookmarkWatchers when it is stopped.
for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() {
for _, watcher := range watchers {
// watcher.stop() is protected by c.Lock()
// c.Lock() is held here.
// watcher.stopThreadUnsafe() is protected by c.Lock()
if watcher.stopped {
continue
}
@ -820,7 +941,10 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
// startDispatching chooses watchers potentially interested in a given event
// a marks dispatching as true.
func (c *Cacher) startDispatching(event *watchCacheEvent) {
triggerValues, supported := c.triggerValues(event)
// It is safe to call triggerValuesThreadUnsafe here, because at this
// point only this thread can access this event (we create a separate
// watchCacheEvent for every dispatch).
triggerValues, supported := c.triggerValuesThreadUnsafe(event)
c.Lock()
defer c.Unlock()
@ -875,7 +999,7 @@ func (c *Cacher) finishDispatching() {
defer c.Unlock()
c.dispatching = false
for _, watcher := range c.watchersToStop {
watcher.stop()
watcher.stopThreadUnsafe()
}
c.watchersToStop = c.watchersToStop[:0]
}
@ -890,7 +1014,7 @@ func (c *Cacher) stopWatcherThreadUnsafe(watcher *cacheWatcher) {
if c.dispatching {
c.watchersToStop = append(c.watchersToStop, watcher)
} else {
watcher.stop()
watcher.stopThreadUnsafe()
}
}
@ -920,7 +1044,7 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
defer c.Unlock()
// It's possible that the watcher is already not in the structure (e.g. in case of
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stop()
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopThreadUnsafe()
// on a watcher multiple times.
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherThreadUnsafe)
}
@ -1022,8 +1146,8 @@ func (c *errWatcher) Stop() {
}
// cacheWatcher implements watch.Interface
// this is not thread-safe
type cacheWatcher struct {
sync.Mutex
input chan *watchCacheEvent
result chan watch.Event
done chan struct{}
@ -1064,12 +1188,8 @@ func (c *cacheWatcher) Stop() {
c.forget()
}
// TODO(#73958)
// stop() is protected by Cacher.Lock(), rename it to
// stopThreadUnsafe and remove the sync.Mutex.
func (c *cacheWatcher) stop() {
c.Lock()
defer c.Unlock()
// we rely on the fact that stopThredUnsafe is actually protected by Cacher.Lock()
func (c *cacheWatcher) stopThreadUnsafe() {
if !c.stopped {
c.stopped = true
close(c.done)
@ -1078,7 +1198,6 @@ func (c *cacheWatcher) stop() {
}
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
// If we can't send it, don't block on it.
select {
case c.input <- event:
return true
@ -1087,36 +1206,34 @@ func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
}
}
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher)
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
// Try to send the event immediately, without blocking.
if c.nonblockingAdd(event) {
return
return true
}
// OK, block sending, but only for up to <timeout>.
// cacheWatcher.add is called very often, so arrange
// to reuse timers instead of constantly allocating.
startTime := time.Now()
timeout := budget.takeAvailable()
timer.Reset(timeout)
select {
case c.input <- event:
if !timer.Stop() {
// Consume triggered (but not yet received) timer event
// so that future reuse does not get a spurious timeout.
<-timer.C
}
case <-timer.C:
closeFunc := func() {
// This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely,
// we simply terminate it.
klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", reflect.TypeOf(event.Object).String())
klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String())
c.forget()
}
budget.returnUnused(timeout - time.Since(startTime))
if timer == nil {
closeFunc()
return false
}
// OK, block sending, but only until timer fires.
select {
case c.input <- event:
return true
case <-timer.C:
closeFunc()
return false
}
}
func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {
@ -1128,6 +1245,25 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {
return c.deadline.Add(-2 * time.Second), true
}
func getEventObject(object runtime.Object) runtime.Object {
if _, ok := object.(runtime.CacheableObject); ok {
// It is safe to return without deep-copy, because the underlying
// object was already deep-copied during construction.
return object
}
return object.DeepCopyObject()
}
func updateResourceVersionIfNeeded(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
if _, ok := object.(*cachingObject); ok {
// We assume that for cachingObject resourceVersion was already propagated before.
return
}
if err := versioner.UpdateObject(object, resourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
}
}
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
if event.Type == watch.Bookmark {
return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
@ -1145,15 +1281,13 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event
switch {
case curObjPasses && !oldObjPasses:
return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
return &watch.Event{Type: watch.Added, Object: getEventObject(event.Object)}
case curObjPasses && oldObjPasses:
return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
return &watch.Event{Type: watch.Modified, Object: getEventObject(event.Object)}
case !curObjPasses && oldObjPasses:
// return a delete event with the previous object content, but with the event's resource version
oldObj := event.PrevObject.DeepCopyObject()
if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
}
oldObj := getEventObject(event.PrevObject)
updateResourceVersionIfNeeded(oldObj, c.versioner, event.ResourceVersion)
return &watch.Event{Type: watch.Deleted, Object: oldObj}
}

View file

@ -0,0 +1,397 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"bytes"
"fmt"
"io"
"reflect"
"runtime/debug"
"sync"
"sync/atomic"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
)
var _ runtime.CacheableObject = &cachingObject{}
// metaRuntimeInterface implements runtime.Object and
// metav1.Object interfaces.
type metaRuntimeInterface interface {
runtime.Object
metav1.Object
}
// serializationResult captures a result of serialization.
type serializationResult struct {
// once should be used to ensure serialization is computed once.
once sync.Once
// raw is serialized object.
raw []byte
// err is error from serialization.
err error
}
// serializationsCache is a type for caching serialization results.
type serializationsCache map[runtime.Identifier]*serializationResult
// cachingObject is an object that is able to cache its serializations
// so that each of those is computed exactly once.
//
// cachingObject implements the metav1.Object interface (accessors for
// all metadata fields). However, setters for all fields except from
// SelfLink (which is set lately in the path) are ignored.
type cachingObject struct {
lock sync.RWMutex
// Object for which serializations are cached.
object metaRuntimeInterface
// serializations is a cache containing object`s serializations.
// The value stored in atomic.Value is of type serializationsCache.
// The atomic.Value type is used to allow fast-path.
serializations atomic.Value
}
// newCachingObject performs a deep copy of the given object and wraps it
// into a cachingObject.
// An error is returned if it's not possible to cast the object to
// metav1.Object type.
func newCachingObject(object runtime.Object) (*cachingObject, error) {
if obj, ok := object.(metaRuntimeInterface); ok {
result := &cachingObject{object: obj.DeepCopyObject().(metaRuntimeInterface)}
result.serializations.Store(make(serializationsCache))
return result, nil
}
return nil, fmt.Errorf("can't cast object to metav1.Object: %#v", object)
}
func (o *cachingObject) getSerializationResult(id runtime.Identifier) *serializationResult {
// Fast-path for getting from cache.
serializations := o.serializations.Load().(serializationsCache)
if result, exists := serializations[id]; exists {
return result
}
// Slow-path (that may require insert).
o.lock.Lock()
defer o.lock.Unlock()
serializations = o.serializations.Load().(serializationsCache)
// Check if in the meantime it wasn't inserted.
if result, exists := serializations[id]; exists {
return result
}
// Insert an entry for <id>. This requires copy of existing map.
newSerializations := make(serializationsCache)
for k, v := range serializations {
newSerializations[k] = v
}
result := &serializationResult{}
newSerializations[id] = result
o.serializations.Store(newSerializations)
return result
}
// CacheEncode implements runtime.CacheableObject interface.
// It serializes the object and writes the result to given io.Writer trying
// to first use the already cached result and falls back to a given encode
// function in case of cache miss.
// It assumes that for a given identifier, the encode function always encodes
// each input object into the same output format.
func (o *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
result := o.getSerializationResult(id)
result.once.Do(func() {
buffer := bytes.NewBuffer(nil)
result.err = encode(o.GetObject(), buffer)
result.raw = buffer.Bytes()
})
// Once invoked, fields of serialization will not change.
if result.err != nil {
return result.err
}
_, err := w.Write(result.raw)
return err
}
// GetObject implements runtime.CacheableObject interface.
// It returns deep-copy of the wrapped object to return ownership of it
// to the called according to the contract of the interface.
func (o *cachingObject) GetObject() runtime.Object {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.DeepCopyObject().(metaRuntimeInterface)
}
// GetObjectKind implements runtime.Object interface.
func (o *cachingObject) GetObjectKind() schema.ObjectKind {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetObjectKind()
}
// DeepCopyObject implements runtime.Object interface.
func (o *cachingObject) DeepCopyObject() runtime.Object {
// DeepCopyObject on cachingObject is not expected to be called anywhere.
// However, to be on the safe-side, we implement it, though given the
// cache is only an optimization we ignore copying it.
result := &cachingObject{}
result.serializations.Store(make(serializationsCache))
o.lock.RLock()
defer o.lock.RUnlock()
result.object = o.object.DeepCopyObject().(metaRuntimeInterface)
return result
}
var (
invalidationCacheTimestampLock sync.Mutex
invalidationCacheTimestamp time.Time
)
// shouldLogCacheInvalidation allows for logging cache-invalidation
// at most once per second (to avoid spamming logs in case of issues).
func shouldLogCacheInvalidation(now time.Time) bool {
invalidationCacheTimestampLock.Lock()
defer invalidationCacheTimestampLock.Unlock()
if invalidationCacheTimestamp.Add(time.Second).Before(now) {
invalidationCacheTimestamp = now
return true
}
return false
}
func (o *cachingObject) invalidateCacheLocked() {
if cache, ok := o.serializations.Load().(serializationsCache); ok && len(cache) == 0 {
return
}
// We don't expect cache invalidation to happen - so we want
// to log the stacktrace to allow debugging if that will happen.
// OTOH, we don't want to spam logs with it.
// So we try to log it at most once per second.
if shouldLogCacheInvalidation(time.Now()) {
klog.Warningf("Unexpected cache invalidation for %#v\n%s", o.object, string(debug.Stack()))
}
o.serializations.Store(make(serializationsCache))
}
// The following functions implement metav1.Object interface:
// - getters simply delegate for the underlying object
// - setters check if operations isn't noop and if so,
// invalidate the cache and delegate for the underlying object
func (o *cachingObject) conditionalSet(isNoop func() bool, set func()) {
if fastPath := func() bool {
o.lock.RLock()
defer o.lock.RUnlock()
return isNoop()
}(); fastPath {
return
}
o.lock.Lock()
defer o.lock.Unlock()
if isNoop() {
return
}
o.invalidateCacheLocked()
set()
}
func (o *cachingObject) GetNamespace() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetNamespace()
}
func (o *cachingObject) SetNamespace(namespace string) {
o.conditionalSet(
func() bool { return o.object.GetNamespace() == namespace },
func() { o.object.SetNamespace(namespace) },
)
}
func (o *cachingObject) GetName() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetName()
}
func (o *cachingObject) SetName(name string) {
o.conditionalSet(
func() bool { return o.object.GetName() == name },
func() { o.object.SetName(name) },
)
}
func (o *cachingObject) GetGenerateName() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetGenerateName()
}
func (o *cachingObject) SetGenerateName(name string) {
o.conditionalSet(
func() bool { return o.object.GetGenerateName() == name },
func() { o.object.SetGenerateName(name) },
)
}
func (o *cachingObject) GetUID() types.UID {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetUID()
}
func (o *cachingObject) SetUID(uid types.UID) {
o.conditionalSet(
func() bool { return o.object.GetUID() == uid },
func() { o.object.SetUID(uid) },
)
}
func (o *cachingObject) GetResourceVersion() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetResourceVersion()
}
func (o *cachingObject) SetResourceVersion(version string) {
o.conditionalSet(
func() bool { return o.object.GetResourceVersion() == version },
func() { o.object.SetResourceVersion(version) },
)
}
func (o *cachingObject) GetGeneration() int64 {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetGeneration()
}
func (o *cachingObject) SetGeneration(generation int64) {
o.conditionalSet(
func() bool { return o.object.GetGeneration() == generation },
func() { o.object.SetGeneration(generation) },
)
}
func (o *cachingObject) GetSelfLink() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetSelfLink()
}
func (o *cachingObject) SetSelfLink(selfLink string) {
o.conditionalSet(
func() bool { return o.object.GetSelfLink() == selfLink },
func() { o.object.SetSelfLink(selfLink) },
)
}
func (o *cachingObject) GetCreationTimestamp() metav1.Time {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetCreationTimestamp()
}
func (o *cachingObject) SetCreationTimestamp(timestamp metav1.Time) {
o.conditionalSet(
func() bool { return o.object.GetCreationTimestamp() == timestamp },
func() { o.object.SetCreationTimestamp(timestamp) },
)
}
func (o *cachingObject) GetDeletionTimestamp() *metav1.Time {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetDeletionTimestamp()
}
func (o *cachingObject) SetDeletionTimestamp(timestamp *metav1.Time) {
o.conditionalSet(
func() bool { return o.object.GetDeletionTimestamp() == timestamp },
func() { o.object.SetDeletionTimestamp(timestamp) },
)
}
func (o *cachingObject) GetDeletionGracePeriodSeconds() *int64 {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetDeletionGracePeriodSeconds()
}
func (o *cachingObject) SetDeletionGracePeriodSeconds(gracePeriodSeconds *int64) {
o.conditionalSet(
func() bool { return o.object.GetDeletionGracePeriodSeconds() == gracePeriodSeconds },
func() { o.object.SetDeletionGracePeriodSeconds(gracePeriodSeconds) },
)
}
func (o *cachingObject) GetLabels() map[string]string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetLabels()
}
func (o *cachingObject) SetLabels(labels map[string]string) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetLabels(), labels) },
func() { o.object.SetLabels(labels) },
)
}
func (o *cachingObject) GetAnnotations() map[string]string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetAnnotations()
}
func (o *cachingObject) SetAnnotations(annotations map[string]string) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetAnnotations(), annotations) },
func() { o.object.SetAnnotations(annotations) },
)
}
func (o *cachingObject) GetFinalizers() []string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetFinalizers()
}
func (o *cachingObject) SetFinalizers(finalizers []string) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetFinalizers(), finalizers) },
func() { o.object.SetFinalizers(finalizers) },
)
}
func (o *cachingObject) GetOwnerReferences() []metav1.OwnerReference {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetOwnerReferences()
}
func (o *cachingObject) SetOwnerReferences(references []metav1.OwnerReference) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetOwnerReferences(), references) },
func() { o.object.SetOwnerReferences(references) },
)
}
func (o *cachingObject) GetClusterName() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetClusterName()
}
func (o *cachingObject) SetClusterName(clusterName string) {
o.conditionalSet(
func() bool { return o.object.GetClusterName() == clusterName },
func() { o.object.SetClusterName(clusterName) },
)
}
func (o *cachingObject) GetManagedFields() []metav1.ManagedFieldsEntry {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetManagedFields()
}
func (o *cachingObject) SetManagedFields(managedFields []metav1.ManagedFieldsEntry) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetManagedFields(), managedFields) },
func() { o.object.SetManagedFields(managedFields) },
)
}

View file

@ -40,6 +40,10 @@ const (
// before terminating request and returning Timeout error with retry
// after suggestion.
blockTimeout = 3 * time.Second
// resourceVersionTooHighRetrySeconds is the seconds before a operation should be retried by the client
// after receiving a 'too high resource version' error.
resourceVersionTooHighRetrySeconds = 1
)
// watchCacheEvent is a single "watch event" that is send to users of
@ -219,7 +223,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
return err
}
watchCacheEvent := &watchCacheEvent{
wcEvent := &watchCacheEvent{
Type: event.Type,
Object: elem.Object,
ObjLabels: elem.Labels,
@ -242,12 +246,12 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
}
if exists {
previousElem := previous.(*storeElement)
watchCacheEvent.PrevObject = previousElem.Object
watchCacheEvent.PrevObjLabels = previousElem.Labels
watchCacheEvent.PrevObjFields = previousElem.Fields
wcEvent.PrevObject = previousElem.Object
wcEvent.PrevObjLabels = previousElem.Labels
wcEvent.PrevObjFields = previousElem.Fields
}
w.updateCache(watchCacheEvent)
w.updateCache(wcEvent)
w.resourceVersion = resourceVersion
defer w.cond.Broadcast()
@ -260,7 +264,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
// This is safe as long as there is at most one call to processEvent in flight
// at any point in time.
if w.eventHandler != nil {
w.eventHandler(watchCacheEvent)
w.eventHandler(wcEvent)
}
return nil
}
@ -303,8 +307,8 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utilt
}
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= blockTimeout {
// Timeout with retry after 1 second.
return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1)
// Request that the client retry after 'resourceVersionTooHighRetrySeconds' seconds.
return storage.NewTooLargeResourceVersionError(resourceVersion, w.resourceVersion, resourceVersionTooHighRetrySeconds)
}
w.cond.Wait()
}
@ -464,7 +468,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
return result, nil
}
if resourceVersion < oldest-1 {
return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
}
// Binary search the smallest index at which resourceVersion is greater than the given one.