vendored changes

This commit is contained in:
Sergii Koshel 2020-02-12 17:56:04 +02:00
parent d091fff18b
commit 128f9a29f5
522 changed files with 29974 additions and 25705 deletions

View file

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@ -53,6 +54,13 @@ var (
},
[]string{"resource"},
)
emptyFunc = func() {}
)
const (
// storageWatchListPageSize is the page (chunk) size the cacher requests
// from storage for its initial and resync watch lists.
storageWatchListPageSize = int64(10000)
)
func init() {
@ -72,7 +80,6 @@ type Config struct {
// The Cache will be caching objects of a given Type and assumes that they
// are all stored under ResourcePrefix directory in the underlying database.
Type interface{}
ResourcePrefix string
// KeyFunc is used to get a key in the underlying storage for a given object.
@ -85,6 +92,9 @@ type Config struct {
// needs to process an incoming event.
TriggerPublisherFunc storage.TriggerPublisherFunc
// NewFunc is a function that creates a new empty object for storing an object of type Type.
NewFunc func() runtime.Object
// NewListFunc is a function that creates a new empty object for storing a
// list of objects of type Type.
NewListFunc func() runtime.Object
@ -150,6 +160,53 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cache
}
}
// watcherBookmarkTimeBuckets groups watchers by the Unix second at which their
// next bookmark event is due.
//
// High precision is not needed here, so all watchers whose bookmark time falls
// within the same second share one bucket and are popped together once that
// second has been reached. To be more specific, if you set the fire time at X,
// you can get the bookmark within the (X-1,X+1) period.
// This is NOT thread-safe.
type watcherBookmarkTimeBuckets struct {
// watchersBuckets maps a bucket ID (Unix seconds) to the watchers filed under it.
watchersBuckets map[int64][]*cacheWatcher
// startBucketID is the lowest bucket ID that has not been popped yet.
startBucketID int64
// clock supplies the current time used to compute bucket IDs.
clock clock.Clock
}
// newTimeBucketWatchers returns an empty set of bookmark buckets whose first
// bucket is the current second as reported by the given clock.
func newTimeBucketWatchers(clock clock.Clock) *watcherBookmarkTimeBuckets {
	buckets := &watcherBookmarkTimeBuckets{clock: clock}
	// Buckets are keyed by Unix seconds; nothing earlier than "now" is tracked.
	buckets.startBucketID = clock.Now().Unix()
	buckets.watchersBuckets = make(map[int64][]*cacheWatcher)
	return buckets
}
// addWatcher files w under the bucket for the second at which its next
// bookmark is due and reports whether the watcher was added.
// A watcher for which nextBookmarkTime reports false is not added. A watcher
// whose due time falls before startBucketID is placed in the start bucket.
func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
	due, scheduled := w.nextBookmarkTime(t.clock.Now())
	if !scheduled {
		return false
	}
	id := due.Unix()
	if t.startBucketID > id {
		id = t.startBucketID
	}
	// Appending to the nil slice of a missing bucket creates that bucket.
	t.watchersBuckets[id] = append(t.watchersBuckets[id], w)
	return true
}
// popExpiredWatchers removes and returns every bucket whose ID lies between
// startBucketID and the current second (inclusive), one slice per non-empty
// bucket in increasing bucket order. On return, startBucketID is one past the
// current second.
func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher {
	nowID := t.clock.Now().Unix()
	// There should be one or two elements in almost all cases
	expired := make([][]*cacheWatcher, 0, 2)
	for t.startBucketID <= nowID {
		id := t.startBucketID
		t.startBucketID++
		bucket, found := t.watchersBuckets[id]
		if !found {
			continue
		}
		delete(t.watchersBuckets, id)
		expired = append(expired, bucket)
	}
	return expired
}
type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool
// Cacher is responsible for serving WATCH and LIST requests for a given
@ -188,6 +245,9 @@ type Cacher struct {
// Versioner is used to handle resource versions.
versioner storage.Versioner
// newFunc is a function that creates a new empty object for storing an object of type Type.
newFunc func() runtime.Object
// triggerFunc is used for optimizing amount of watchers that needs to process
// an incoming event.
triggerFunc storage.TriggerPublisherFunc
@ -206,6 +266,7 @@ type Cacher struct {
stopCh chan struct{}
stopWg sync.WaitGroup
clock clock.Clock
// timer is used to avoid unnecessary allocations in underlying watchers.
timer *time.Timer
@ -219,32 +280,31 @@ type Cacher struct {
// during current dispatching, but stopping was deferred to the end of
// dispatching that event to avoid race with closing channels in watchers.
watchersToStop []*cacheWatcher
// Maintain a timeout queue to send the bookmark event before the watcher times out.
bookmarkWatchers *watcherBookmarkTimeBuckets
// watchBookmark feature-gate
watchBookmarkEnabled bool
}
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config Config) *Cacher {
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
stopCh := make(chan struct{})
obj := config.NewFunc()
// Give this error when it is constructed rather than when you get the
// first watch item, because it's much easier to track down that way.
if obj, ok := config.Type.(runtime.Object); ok {
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
panic("storage codec doesn't seem to match given type: " + err.Error())
}
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
panic("storage codec doesn't seem to match given type: " + err.Error())
}
stopCh := make(chan struct{})
clock := clock.RealClock{}
cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
objectType: reflect.TypeOf(config.Type),
watchCache: watchCache,
reflector: cache.NewNamedReflector(reflectorName, listerWatcher, config.Type, watchCache, 0),
objectType: reflect.TypeOf(obj),
versioner: config.Versioner,
newFunc: config.NewFunc,
triggerFunc: config.TriggerPublisherFunc,
watcherIdx: 0,
watchers: indexedWatchers{
@ -259,15 +319,39 @@ func NewCacherFromConfig(config Config) *Cacher {
// - reflector.ListAndWatch
// and there are no guarantees on the order that they will stop.
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
stopCh: stopCh,
timer: time.NewTimer(time.Duration(0)),
stopCh: stopCh,
clock: clock,
timer: time.NewTimer(time.Duration(0)),
bookmarkWatchers: newTimeBucketWatchers(clock),
watchBookmarkEnabled: utilfeature.DefaultFeatureGate.Enabled(features.WatchBookmark),
}
watchCache.SetOnEvent(cacher.processEvent)
// Ensure that timer is stopped.
if !cacher.timer.Stop() {
// Consume triggered (but not yet received) timer event
// so that future reuse does not get a spurious timeout.
<-cacher.timer.C
}
watchCache := newWatchCache(
config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
// Configure the reflector's pager with an appropriate pagination chunk size for fetching data from
// storage. The pager falls back to a full list if paginated list calls fail due to an "Expired" error.
reflector.WatchListPageSize = storageWatchListPageSize
cacher.watchCache = watchCache
cacher.reflector = reflector
go cacher.dispatchEvents()
cacher.stopWg.Add(1)
go func() {
defer cacher.stopWg.Done()
defer cacher.terminateAllWatchers()
wait.Until(
func() {
if !cacher.isStopped() {
@ -277,13 +361,6 @@ func NewCacherFromConfig(config Config) *Cacher {
)
}()
// Ensure that timer is stopped.
if !cacher.timer.Stop() {
// Consume triggered (but not yet received) timer event
// so that future reuse does not get a spurious timeout.
<-cacher.timer.C
}
return cacher
}
@ -326,8 +403,8 @@ func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object
}
// Delete implements storage.Interface.
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
return c.storage.Delete(ctx, key, out, preconditions)
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion)
}
// Watch implements storage.Interface.
@ -339,21 +416,6 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
c.ready.wait()
// We explicitly use thread unsafe version and do locking ourself to ensure that
// no new events will be processed in the meantime. The watchCache will be unlocked
// on return from this function.
// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
// underlying watchCache is calling processEvent under its lock.
c.watchCache.RLock()
defer c.watchCache.RUnlock()
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
// rather than a directly returned error.
return newErrWatcher(err), nil
}
triggerValue, triggerSupported := "", false
// TODO: Currently we assume that in a given Cacher object, any <predicate> that is
// passed here is aware of exactly the same trigger (at most one).
@ -377,6 +439,29 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
chanSize = 1000
}
// Determine watch timeout('0' means deadline is not set, ignore checking)
deadline, _ := ctx.Deadline()
// Create a watcher here to reduce memory allocations under lock,
// given that memory allocation may trigger GC and block the thread.
// Also note that emptyFunc is a placeholder, until we will be able
// to compute watcher.forget function (which has to happen under lock).
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType)
// We explicitly use thread unsafe version and do locking ourself to ensure that
// no new events will be processed in the meantime. The watchCache will be unlocked
// on return from this function.
// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
// underlying watchCache is calling processEvent under its lock.
c.watchCache.RLock()
defer c.watchCache.RUnlock()
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
// rather than a directly returned error.
return newErrWatcher(err), nil
}
// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher second time causing going back in time.
@ -384,13 +469,21 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
watchRV = initEvents[len(initEvents)-1].ResourceVersion
}
c.Lock()
defer c.Unlock()
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunction(key, pred), forget, c.versioner)
func() {
c.Lock()
defer c.Unlock()
// Update watcher.forget function once we can compute it.
watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watcherIdx++
// Add it to the queue only when server and client support watch bookmarks.
if c.watchBookmarkEnabled && watcher.allowWatchBookmarks {
c.bookmarkWatchers.addWatcher(watcher)
}
c.watcherIdx++
}()
go watcher.process(ctx, initEvents, watchRV)
return watcher, nil
}
@ -512,7 +605,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
}
}
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
return err
}
}
@ -587,7 +680,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
}
trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
return err
}
}
@ -650,6 +743,15 @@ func (c *Cacher) processEvent(event *watchCacheEvent) {
}
func (c *Cacher) dispatchEvents() {
// Jitter to help level out any aggregate load.
bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25))
// Stop the timer when watchBookmarkFeatureGate is not enabled.
if !c.watchBookmarkEnabled && !bookmarkTimer.Stop() {
<-bookmarkTimer.C()
}
defer bookmarkTimer.Stop()
lastProcessedResourceVersion := uint64(0)
for {
select {
case event, ok := <-c.incoming:
@ -657,6 +759,24 @@ func (c *Cacher) dispatchEvents() {
return
}
c.dispatchEvent(&event)
lastProcessedResourceVersion = event.ResourceVersion
case <-bookmarkTimer.C():
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
// Never send a bookmark event if we did not see an event here, this is fine
// because we don't provide any guarantees on sending bookmarks.
if lastProcessedResourceVersion == 0 {
continue
}
bookmarkEvent := &watchCacheEvent{
Type: watch.Bookmark,
Object: c.newFunc(),
ResourceVersion: lastProcessedResourceVersion,
}
if err := c.versioner.UpdateObject(bookmarkEvent.Object, bookmarkEvent.ResourceVersion); err != nil {
klog.Errorf("failure to set resourceVersion to %d on bookmark event %+v", bookmarkEvent.ResourceVersion, bookmarkEvent.Object)
continue
}
c.dispatchEvent(bookmarkEvent)
case <-c.stopCh:
return
}
@ -665,13 +785,36 @@ func (c *Cacher) dispatchEvents() {
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
c.startDispatching(event)
defer c.finishDispatching()
// Watchers stopped after startDispatching will be delayed to finishDispatching,
// Since add() can block, we explicitly add when cacher is unlocked.
for _, watcher := range c.watchersBuffer {
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
if event.Type == watch.Bookmark {
for _, watcher := range c.watchersBuffer {
watcher.nonblockingAdd(event)
}
} else {
for _, watcher := range c.watchersBuffer {
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
}
}
}
c.finishDispatching()
// startDispatchingBookmarkEvents appends to watchersBuffer every watcher popped
// from the expired bookmark buckets that has not been stopped.
func (c *Cacher) startDispatchingBookmarkEvents() {
	// Stopped watchers are never removed from bookmarkWatchers, so they can
	// still show up here and have to be filtered out explicitly.
	expired := c.bookmarkWatchers.popExpiredWatchers()
	for i := range expired {
		for _, w := range expired[i] {
			// watcher.stop() is protected by c.Lock()
			if !w.stopped {
				c.watchersBuffer = append(c.watchersBuffer, w)
			}
			// A bookmark is sent only once, at deadline-2s, so a popped watcher
			// is not pushed back into the buckets. A more sophisticated
			// strategy may need to re-add it here.
		}
	}
}
// startDispatching chooses watchers potentially interested in a given event
@ -690,6 +833,12 @@ func (c *Cacher) startDispatching(event *watchCacheEvent) {
// gain from avoiding memory allocations is much bigger.
c.watchersBuffer = c.watchersBuffer[:0]
if event.Type == watch.Bookmark {
c.startDispatchingBookmarkEvents()
// return here to reduce following code indentation and diff
return
}
// Iterate over "allWatchers" no matter what the trigger function is.
for _, watcher := range c.watchers.allWatchers {
c.watchersBuffer = append(c.watchersBuffer, watcher)
@ -753,12 +902,9 @@ func (c *Cacher) isStopped() bool {
// Stop implements the graceful termination.
func (c *Cacher) Stop() {
// avoid stopping twice (note: cachers are shared with subresources)
if c.isStopped() {
return
}
c.stopLock.Lock()
if c.stopped {
// avoid stopping twice (note: cachers are shared with subresources)
c.stopLock.Unlock()
return
}
@ -805,7 +951,8 @@ type cacherListerWatcher struct {
newListFunc func() runtime.Object
}
func newCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
// NewCacherListerWatcher returns a storage.Interface backed ListerWatcher.
func NewCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
return &cacherListerWatcher{
storage: storage,
resourcePrefix: resourcePrefix,
@ -816,7 +963,14 @@ func newCacherListerWatcher(storage storage.Interface, resourcePrefix string, ne
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
list := lw.newListFunc()
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", storage.Everything, list); err != nil {
pred := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: options.Limit,
Continue: options.Continue,
}
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", pred, list); err != nil {
return nil, err
}
return list, nil
@ -877,20 +1031,27 @@ type cacheWatcher struct {
stopped bool
forget func()
versioner storage.Versioner
// The watcher will be closed by server after the deadline,
// save it here to send bookmark events before that.
deadline time.Time
allowWatchBookmarks bool
// Object type of the cache watcher interests
objectType reflect.Type
}
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
done: make(chan struct{}),
filter: filter,
stopped: false,
forget: forget,
versioner: versioner,
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type) *cacheWatcher {
return &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
done: make(chan struct{}),
filter: filter,
stopped: false,
forget: forget,
versioner: versioner,
deadline: deadline,
allowWatchBookmarks: allowWatchBookmarks,
objectType: objectType,
}
go watcher.process(initEvents, resourceVersion)
return watcher
}
// Implements watch.Interface.
@ -903,6 +1064,9 @@ func (c *cacheWatcher) Stop() {
c.forget()
}
// TODO(#73958)
// stop() is protected by Cacher.Lock(), rename it to
// stopThreadUnsafe and remove the sync.Mutex.
func (c *cacheWatcher) stop() {
c.Lock()
defer c.Unlock()
@ -913,12 +1077,20 @@ func (c *cacheWatcher) stop() {
}
}
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
// Try to send the event immediately, without blocking.
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
// If we can't send it, don't block on it.
select {
case c.input <- event:
return
return true
default:
return false
}
}
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
// Try to send the event immediately, without blocking.
if c.nonblockingAdd(event) {
return
}
// OK, block sending, but only for up to <timeout>.
@ -947,8 +1119,20 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *ti
budget.returnUnused(timeout - time.Since(startTime))
}
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
// nextBookmarkTime returns the time at which the next bookmark event should be
// sent to this watcher, and false when no deadline is set.
// For now the result is always 2s before the deadline, which may already be in
// the past. The now argument is currently unused; together with the boolean
// result it leaves room for other scheduling strategies in the future.
func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {
	if deadline := c.deadline; !deadline.IsZero() {
		return deadline.Add(-2 * time.Second), true
	}
	return c.deadline, false
}
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
if event.Type == watch.Bookmark {
return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
}
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
oldObjPasses := false
if event.PrevObject != nil {
@ -956,22 +1140,32 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
}
if !curObjPasses && !oldObjPasses {
// Watcher is not interested in that object.
return
return nil
}
var watchEvent watch.Event
switch {
case curObjPasses && !oldObjPasses:
watchEvent = watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
case curObjPasses && oldObjPasses:
watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
case !curObjPasses && oldObjPasses:
// return a delete event with the previous object content, but with the event's resource version
oldObj := event.PrevObject.DeepCopyObject()
if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
}
watchEvent = watch.Event{Type: watch.Deleted, Object: oldObj}
return &watch.Event{Type: watch.Deleted, Object: oldObj}
}
return nil
}
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
watchEvent := c.convertToWatchEvent(event)
if watchEvent == nil {
// Watcher is not interested in that object.
return
}
// We need to ensure that if we put event X to the c.result, all
@ -993,12 +1187,12 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
}
select {
case c.result <- watchEvent:
case c.result <- *watchEvent:
case <-c.done:
}
}
func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
defer utilruntime.HandleCrash()
// Check how long we are processing initEvents.
@ -1019,25 +1213,29 @@ func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion ui
for _, event := range initEvents {
c.sendWatchCacheEvent(event)
}
objType := c.objectType.String()
if len(initEvents) > 0 {
objType := reflect.TypeOf(initEvents[0].Object).String()
initCounter.WithLabelValues(objType).Add(float64(len(initEvents)))
}
processingTime := time.Since(startTime)
if processingTime > initProcessThreshold {
objType := "<null>"
if len(initEvents) > 0 {
objType = reflect.TypeOf(initEvents[0].Object).String()
}
klog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
}
defer close(c.result)
defer c.Stop()
for event := range c.input {
// only send events newer than resourceVersion
if event.ResourceVersion > resourceVersion {
c.sendWatchCacheEvent(event)
for {
select {
case event, ok := <-c.input:
if !ok {
return
}
// only send events newer than resourceVersion
if event.ResourceVersion > resourceVersion {
c.sendWatchCacheEvent(event)
}
case <-ctx.Done():
return
}
}
}
@ -1048,7 +1246,7 @@ type ready struct {
}
func newReady() *ready {
return &ready{c: sync.NewCond(&sync.Mutex{})}
return &ready{c: sync.NewCond(&sync.RWMutex{})}
}
func (r *ready) wait() {
@ -1062,8 +1260,9 @@ func (r *ready) wait() {
// TODO: Make check() function more sophisticated, in particular
// allow it to behave as "waitWithTimeout".
func (r *ready) check() bool {
r.c.L.Lock()
defer r.c.L.Unlock()
rwMutex := r.c.L.(*sync.RWMutex)
rwMutex.RLock()
defer rwMutex.RUnlock()
return r.ok
}

View file

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
utiltrace "k8s.io/utils/trace"
)
@ -77,14 +78,6 @@ func storeElementKey(obj interface{}) (string, error) {
return elem.Key, nil
}
// watchCacheElement is a single "watch event" stored in a cache.
// It contains the resource version of the object and the object
// itself.
type watchCacheElement struct {
resourceVersion uint64
watchCacheEvent *watchCacheEvent
}
// watchCache implements a Store interface.
// However, it depends on the elements implementing runtime.Object interface.
//
@ -111,7 +104,7 @@ type watchCache struct {
// by endIndex (if cache is full it will be startIndex + capacity).
// Both startIndex and endIndex can be greater than buffer capacity -
// you should always apply modulo capacity to get an index in cache array.
cache []watchCacheElement
cache []*watchCacheEvent
startIndex int
endIndex int
@ -132,7 +125,7 @@ type watchCache struct {
// This handler is run at the end of every Add/Update/Delete method
// and additionally gets the previous value of the object.
onEvent func(*watchCacheEvent)
eventHandler func(*watchCacheEvent)
// for testing timeouts.
clock clock.Clock
@ -144,18 +137,20 @@ type watchCache struct {
func newWatchCache(
capacity int,
keyFunc func(runtime.Object) (string, error),
eventHandler func(*watchCacheEvent),
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
versioner storage.Versioner) *watchCache {
wc := &watchCache{
capacity: capacity,
keyFunc: keyFunc,
getAttrsFunc: getAttrsFunc,
cache: make([]watchCacheElement, capacity),
cache: make([]*watchCacheEvent, capacity),
startIndex: 0,
endIndex: 0,
store: cache.NewStore(storeElementKey),
resourceVersion: 0,
listResourceVersion: 0,
eventHandler: eventHandler,
clock: clock.RealClock{},
versioner: versioner,
}
@ -211,6 +206,8 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob
return object, resourceVersion, nil
}
// processEvent is safe as long as there is at most one call to it in flight
// at any point in time.
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
key, err := w.keyFunc(event.Object)
if err != nil {
@ -231,39 +228,50 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
ResourceVersion: resourceVersion,
}
// TODO: We should consider moving this lock below after the watchCacheEvent
// is created. In such situation, the only problematic scenario is Replace(
// happening after getting object from store and before acquiring a lock.
// Maybe introduce another lock for this purpose.
w.Lock()
defer w.Unlock()
previous, exists, err := w.store.Get(elem)
if err != nil {
if err := func() error {
// TODO: We should consider moving this lock below after the watchCacheEvent
// is created. In such situation, the only problematic scenario is Replace(
// happening after getting object from store and before acquiring a lock.
// Maybe introduce another lock for this purpose.
w.Lock()
defer w.Unlock()
previous, exists, err := w.store.Get(elem)
if err != nil {
return err
}
if exists {
previousElem := previous.(*storeElement)
watchCacheEvent.PrevObject = previousElem.Object
watchCacheEvent.PrevObjLabels = previousElem.Labels
watchCacheEvent.PrevObjFields = previousElem.Fields
}
w.updateCache(watchCacheEvent)
w.resourceVersion = resourceVersion
defer w.cond.Broadcast()
return updateFunc(elem)
}(); err != nil {
return err
}
if exists {
previousElem := previous.(*storeElement)
watchCacheEvent.PrevObject = previousElem.Object
watchCacheEvent.PrevObjLabels = previousElem.Labels
watchCacheEvent.PrevObjFields = previousElem.Fields
}
if w.onEvent != nil {
w.onEvent(watchCacheEvent)
// Avoid calling event handler under lock.
// This is safe as long as there is at most one call to processEvent in flight
// at any point in time.
if w.eventHandler != nil {
w.eventHandler(watchCacheEvent)
}
w.updateCache(resourceVersion, watchCacheEvent)
w.resourceVersion = resourceVersion
w.cond.Broadcast()
return updateFunc(elem)
return nil
}
// Assumes that lock is already held for write.
func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) {
func (w *watchCache) updateCache(event *watchCacheEvent) {
if w.endIndex == w.startIndex+w.capacity {
// Cache is full - remove the oldest element.
w.startIndex++
}
w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
w.cache[w.endIndex%w.capacity] = event
w.endIndex++
}
@ -394,6 +402,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
w.onReplace()
}
w.cond.Broadcast()
klog.V(3).Infof("Replace watchCache (rev: %v) ", resourceVersion)
return nil
}
@ -403,12 +412,6 @@ func (w *watchCache) SetOnReplace(onReplace func()) {
w.onReplace = onReplace
}
// SetOnEvent installs onEvent as the watchCache's onEvent handler.
// The assignment is made while holding the watchCache's write lock.
func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
w.Lock()
defer w.Unlock()
w.onEvent = onEvent
}
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
size := w.endIndex - w.startIndex
var oldest uint64
@ -416,7 +419,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
case size >= w.capacity:
// Once the watch event buffer is full, the oldest watch event we can deliver
// is the first one in the buffer.
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
case w.listResourceVersion > 0:
// If the watch event buffer isn't full, the oldest watch event we can deliver
// is one greater than the resource version of the last full list.
@ -426,7 +429,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
// in the buffer.
// This should only happen in unit tests that populate the buffer without
// performing list/replace operations.
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
default:
return nil, fmt.Errorf("watch cache isn't correctly initialized")
}
@ -466,12 +469,12 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
// Binary search the smallest index at which resourceVersion is greater than the given one.
f := func(i int) bool {
return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
return w.cache[(w.startIndex+i)%w.capacity].ResourceVersion > resourceVersion
}
first := sort.Search(size, f)
result := make([]*watchCacheEvent, size-first)
for i := 0; i < size-first; i++ {
result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
result[i] = w.cache[(w.startIndex+first+i)%w.capacity]
}
return result, nil
}