mirror of
https://github.com/kubernetes-sigs/prometheus-adapter.git
synced 2026-04-07 10:17:51 +00:00
Add vendor folder to git
This commit is contained in:
parent
66cf5eaafb
commit
183585f56f
6916 changed files with 2629581 additions and 1 deletions
69
vendor/k8s.io/apiserver/pkg/storage/etcd3/BUILD
generated
vendored
Normal file
69
vendor/k8s.io/apiserver/pkg/storage/etcd3/BUILD
generated
vendored
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"compact_test.go",
|
||||
"store_test.go",
|
||||
"watcher_test.go",
|
||||
],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/integration:go_default_library",
|
||||
"//vendor/golang.org/x/net/context:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/testing:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/apis/example:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/tests:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/value:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"compact.go",
|
||||
"event.go",
|
||||
"store.go",
|
||||
"watcher.go",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/mvcc/mvccpb:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/golang.org/x/net/context:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/value:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/trace:go_default_library",
|
||||
],
|
||||
)
|
||||
5
vendor/k8s.io/apiserver/pkg/storage/etcd3/OWNERS
generated
vendored
Normal file
5
vendor/k8s.io/apiserver/pkg/storage/etcd3/OWNERS
generated
vendored
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
reviewers:
|
||||
- wojtek-t
|
||||
- timothysc
|
||||
- madhusudancs
|
||||
- hongchaodeng
|
||||
161
vendor/k8s.io/apiserver/pkg/storage/etcd3/compact.go
generated
vendored
Normal file
161
vendor/k8s.io/apiserver/pkg/storage/etcd3/compact.go
generated
vendored
Normal file
|
|
@ -0,0 +1,161 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd3
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const (
|
||||
compactInterval = 5 * time.Minute
|
||||
compactRevKey = "compact_rev_key"
|
||||
)
|
||||
|
||||
var (
|
||||
endpointsMapMu sync.Mutex
|
||||
endpointsMap map[string]struct{}
|
||||
)
|
||||
|
||||
func init() {
|
||||
endpointsMap = make(map[string]struct{})
|
||||
}
|
||||
|
||||
// StartCompactor starts a compactor in the background to compact old version of keys that's not needed.
|
||||
// By default, we save the most recent 10 minutes data and compact versions > 10minutes ago.
|
||||
// It should be enough for slow watchers and to tolerate burst.
|
||||
// TODO: We might keep a longer history (12h) in the future once storage API can take advantage of past version of keys.
|
||||
func StartCompactor(ctx context.Context, client *clientv3.Client) {
|
||||
endpointsMapMu.Lock()
|
||||
defer endpointsMapMu.Unlock()
|
||||
|
||||
// In one process, we can have only one compactor for one cluster.
|
||||
// Currently we rely on endpoints to differentiate clusters.
|
||||
for _, ep := range client.Endpoints() {
|
||||
if _, ok := endpointsMap[ep]; ok {
|
||||
glog.V(4).Infof("compactor already exists for endpoints %v", client.Endpoints())
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, ep := range client.Endpoints() {
|
||||
endpointsMap[ep] = struct{}{}
|
||||
}
|
||||
|
||||
go compactor(ctx, client, compactInterval)
|
||||
}
|
||||
|
||||
// compactor periodically compacts historical versions of keys in etcd.
|
||||
// It will compact keys with versions older than given interval.
|
||||
// In other words, after compaction, it will only contain keys set during last interval.
|
||||
// Any API call for the older versions of keys will return error.
|
||||
// Interval is the time interval between each compaction. The first compaction happens after "interval".
|
||||
func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) {
|
||||
// Technical definitions:
|
||||
// We have a special key in etcd defined as *compactRevKey*.
|
||||
// compactRevKey's value will be set to the string of last compacted revision.
|
||||
// compactRevKey's version will be used as logical time for comparison. THe version is referred as compact time.
|
||||
// Initially, because the key doesn't exist, the compact time (version) is 0.
|
||||
//
|
||||
// Algorithm:
|
||||
// - Compare to see if (local compact_time) = (remote compact_time).
|
||||
// - If yes, increment both local and remote compact_time, and do a compaction.
|
||||
// - If not, set local to remote compact_time.
|
||||
//
|
||||
// Technical details/insights:
|
||||
//
|
||||
// The protocol here is lease based. If one compactor CAS successfully, the others would know it when they fail in
|
||||
// CAS later and would try again in 10 minutes. If an APIServer crashed, another one would "take over" the lease.
|
||||
//
|
||||
// For example, in the following diagram, we have a compactor C1 doing compaction in t1, t2. Another compactor C2
|
||||
// at t1' (t1 < t1' < t2) would CAS fail, set its known oldRev to rev at t1', and try again in t2' (t2' > t2).
|
||||
// If C1 crashed and wouldn't compact at t2, C2 would CAS successfully at t2'.
|
||||
//
|
||||
// oldRev(t2) curRev(t2)
|
||||
// +
|
||||
// oldRev curRev |
|
||||
// + + |
|
||||
// | | |
|
||||
// | | t1' | t2'
|
||||
// +---v-------------v----^---------v------^---->
|
||||
// t0 t1 t2
|
||||
//
|
||||
// We have the guarantees:
|
||||
// - in normal cases, the interval is 10 minutes.
|
||||
// - in failover, the interval is >10m and <20m
|
||||
//
|
||||
// FAQ:
|
||||
// - What if time is not accurate? We don't care as long as someone did the compaction. Atomicity is ensured using
|
||||
// etcd API.
|
||||
// - What happened under heavy load scenarios? Initially, each apiserver will do only one compaction
|
||||
// every 10 minutes. This is very unlikely affecting or affected w.r.t. server load.
|
||||
|
||||
var compactTime int64
|
||||
var rev int64
|
||||
var err error
|
||||
for {
|
||||
select {
|
||||
case <-time.After(interval):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
compactTime, rev, err = compact(ctx, client, compactTime, rev)
|
||||
if err != nil {
|
||||
glog.Errorf("etcd: endpoint (%v) compact failed: %v", client.Endpoints(), err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// compact compacts etcd store and returns current rev.
|
||||
// It will return the current compact time and global revision if no error occurred.
|
||||
// Note that CAS fail will not incur any error.
|
||||
func compact(ctx context.Context, client *clientv3.Client, t, rev int64) (int64, int64, error) {
|
||||
resp, err := client.KV.Txn(ctx).If(
|
||||
clientv3.Compare(clientv3.Version(compactRevKey), "=", t),
|
||||
).Then(
|
||||
clientv3.OpPut(compactRevKey, strconv.FormatInt(rev, 10)), // Expect side effect: increment Version
|
||||
).Else(
|
||||
clientv3.OpGet(compactRevKey),
|
||||
).Commit()
|
||||
if err != nil {
|
||||
return t, rev, err
|
||||
}
|
||||
|
||||
curRev := resp.Header.Revision
|
||||
|
||||
if !resp.Succeeded {
|
||||
curTime := resp.Responses[0].GetResponseRange().Kvs[0].Version
|
||||
return curTime, curRev, nil
|
||||
}
|
||||
curTime := t + 1
|
||||
|
||||
if rev == 0 {
|
||||
// We don't compact on bootstrap.
|
||||
return curTime, curRev, nil
|
||||
}
|
||||
if _, err = client.Compact(ctx, rev); err != nil {
|
||||
return curTime, curRev, err
|
||||
}
|
||||
glog.Infof("etcd: compacted rev (%d), endpoints (%v)", rev, client.Endpoints())
|
||||
return curTime, curRev, nil
|
||||
}
|
||||
87
vendor/k8s.io/apiserver/pkg/storage/etcd3/compact_test.go
generated
vendored
Normal file
87
vendor/k8s.io/apiserver/pkg/storage/etcd3/compact_test.go
generated
vendored
Normal file
|
|
@ -0,0 +1,87 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd3
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestCompact(t *testing.T) {
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
client := cluster.RandClient()
|
||||
ctx := context.Background()
|
||||
|
||||
putResp, err := client.Put(ctx, "/somekey", "data")
|
||||
if err != nil {
|
||||
t.Fatalf("Put failed: %v", err)
|
||||
}
|
||||
|
||||
putResp1, err := client.Put(ctx, "/somekey", "data2")
|
||||
if err != nil {
|
||||
t.Fatalf("Put failed: %v", err)
|
||||
}
|
||||
|
||||
_, _, err = compact(ctx, client, 0, putResp1.Header.Revision)
|
||||
if err != nil {
|
||||
t.Fatalf("compact failed: %v", err)
|
||||
}
|
||||
|
||||
obj, err := client.Get(ctx, "/somekey", clientv3.WithRev(putResp.Header.Revision))
|
||||
if err != etcdrpc.ErrCompacted {
|
||||
t.Errorf("Expecting ErrCompacted, but get=%v err=%v", obj, err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCompactConflict tests that two compactors (Let's use C1, C2) are trying to compact etcd cluster with the same
|
||||
// logical time.
|
||||
// - C1 compacts first. It will succeed.
|
||||
// - C2 compacts after. It will fail. But it will get latest logical time, which should be larger by one.
|
||||
func TestCompactConflict(t *testing.T) {
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
client := cluster.RandClient()
|
||||
ctx := context.Background()
|
||||
|
||||
putResp, err := client.Put(ctx, "/somekey", "data")
|
||||
if err != nil {
|
||||
t.Fatalf("Put failed: %v", err)
|
||||
}
|
||||
|
||||
// Compact first. It would do the compaction and return compact time which is incremented by 1.
|
||||
curTime, _, err := compact(ctx, client, 0, putResp.Header.Revision)
|
||||
if err != nil {
|
||||
t.Fatalf("compact failed: %v", err)
|
||||
}
|
||||
if curTime != 1 {
|
||||
t.Errorf("Expect current logical time = 1, get = %v", curTime)
|
||||
}
|
||||
|
||||
// Compact again with the same parameters. It won't do compaction but return the latest compact time.
|
||||
curTime2, _, err := compact(ctx, client, 0, putResp.Header.Revision)
|
||||
if err != nil {
|
||||
t.Fatalf("compact failed: %v", err)
|
||||
}
|
||||
if curTime != curTime2 {
|
||||
t.Errorf("Unexpected curTime (%v) != curTime2 (%v)", curTime, curTime2)
|
||||
}
|
||||
}
|
||||
57
vendor/k8s.io/apiserver/pkg/storage/etcd3/event.go
generated
vendored
Normal file
57
vendor/k8s.io/apiserver/pkg/storage/etcd3/event.go
generated
vendored
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd3
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
)
|
||||
|
||||
type event struct {
|
||||
key string
|
||||
value []byte
|
||||
prevValue []byte
|
||||
rev int64
|
||||
isDeleted bool
|
||||
isCreated bool
|
||||
}
|
||||
|
||||
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
|
||||
func parseKV(kv *mvccpb.KeyValue) *event {
|
||||
return &event{
|
||||
key: string(kv.Key),
|
||||
value: kv.Value,
|
||||
prevValue: nil,
|
||||
rev: kv.ModRevision,
|
||||
isDeleted: false,
|
||||
isCreated: true,
|
||||
}
|
||||
}
|
||||
|
||||
func parseEvent(e *clientv3.Event) *event {
|
||||
ret := &event{
|
||||
key: string(e.Kv.Key),
|
||||
value: e.Kv.Value,
|
||||
rev: e.Kv.ModRevision,
|
||||
isDeleted: e.Type == clientv3.EventTypeDelete,
|
||||
isCreated: e.IsCreate(),
|
||||
}
|
||||
if e.PrevKv != nil {
|
||||
ret.prevValue = e.PrevKv.Value
|
||||
}
|
||||
return ret
|
||||
}
|
||||
573
vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
generated
vendored
Normal file
573
vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
generated
vendored
Normal file
|
|
@ -0,0 +1,573 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd3
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/conversion"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
utiltrace "k8s.io/apiserver/pkg/util/trace"
|
||||
)
|
||||
|
||||
// authenticatedDataString satisfies the value.Context interface. It uses the key to
|
||||
// authenticate the stored data. This does not defend against reuse of previously
|
||||
// encrypted values under the same key, but will prevent an attacker from using an
|
||||
// encrypted value from a different key. A stronger authenticated data segment would
|
||||
// include the etcd3 Version field (which is incremented on each write to a key and
|
||||
// reset when the key is deleted), but an attacker with write access to etcd can
|
||||
// force deletion and recreation of keys to weaken that angle.
|
||||
type authenticatedDataString string
|
||||
|
||||
// AuthenticatedData implements the value.Context interface.
|
||||
func (d authenticatedDataString) AuthenticatedData() []byte {
|
||||
return []byte(string(d))
|
||||
}
|
||||
|
||||
var _ value.Context = authenticatedDataString("")
|
||||
|
||||
type store struct {
|
||||
client *clientv3.Client
|
||||
// getOpts contains additional options that should be passed
|
||||
// to all Get() calls.
|
||||
getOps []clientv3.OpOption
|
||||
codec runtime.Codec
|
||||
versioner storage.Versioner
|
||||
transformer value.Transformer
|
||||
pathPrefix string
|
||||
watcher *watcher
|
||||
}
|
||||
|
||||
type elemForDecode struct {
|
||||
data []byte
|
||||
rev uint64
|
||||
}
|
||||
|
||||
type objState struct {
|
||||
obj runtime.Object
|
||||
meta *storage.ResponseMeta
|
||||
rev int64
|
||||
data []byte
|
||||
stale bool
|
||||
}
|
||||
|
||||
// New returns an etcd3 implementation of storage.Interface.
|
||||
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer) storage.Interface {
|
||||
return newStore(c, true, codec, prefix, transformer)
|
||||
}
|
||||
|
||||
// NewWithNoQuorumRead returns etcd3 implementation of storage.Interface
|
||||
// where Get operations don't require quorum read.
|
||||
func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer) storage.Interface {
|
||||
return newStore(c, false, codec, prefix, transformer)
|
||||
}
|
||||
|
||||
func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
|
||||
versioner := etcd.APIObjectVersioner{}
|
||||
result := &store{
|
||||
client: c,
|
||||
codec: codec,
|
||||
versioner: versioner,
|
||||
transformer: transformer,
|
||||
// for compatibility with etcd2 impl.
|
||||
// no-op for default prefix of '/registry'.
|
||||
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
|
||||
pathPrefix: path.Join("/", prefix),
|
||||
watcher: newWatcher(c, codec, versioner, transformer),
|
||||
}
|
||||
if !quorumRead {
|
||||
// In case of non-quorum reads, we can set WithSerializable()
|
||||
// options for all Get operations.
|
||||
result.getOps = append(result.getOps, clientv3.WithSerializable())
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Versioner implements storage.Interface.Versioner.
|
||||
func (s *store) Versioner() storage.Versioner {
|
||||
return s.versioner
|
||||
}
|
||||
|
||||
// Get implements storage.Interface.Get.
|
||||
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(getResp.Kvs) == 0 {
|
||||
if ignoreNotFound {
|
||||
return runtime.SetZeroValue(out)
|
||||
}
|
||||
return storage.NewKeyNotFoundError(key, 0)
|
||||
}
|
||||
kv := getResp.Kvs[0]
|
||||
|
||||
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
|
||||
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
|
||||
}
|
||||
|
||||
// Create implements storage.Interface.Create.
|
||||
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
|
||||
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||
return errors.New("resourceVersion should not be set on objects to be created")
|
||||
}
|
||||
data, err := runtime.Encode(s.codec, obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
|
||||
opts, err := s.ttlOpts(ctx, int64(ttl))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
|
||||
txnResp, err := s.client.KV.Txn(ctx).If(
|
||||
notFound(key),
|
||||
).Then(
|
||||
clientv3.OpPut(key, string(newData), opts...),
|
||||
).Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !txnResp.Succeeded {
|
||||
return storage.NewKeyExistsError(key, 0)
|
||||
}
|
||||
|
||||
if out != nil {
|
||||
putResp := txnResp.Responses[0].GetResponsePut()
|
||||
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete implements storage.Interface.Delete.
|
||||
func (s *store) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
|
||||
v, err := conversion.EnforcePtr(out)
|
||||
if err != nil {
|
||||
panic("unable to convert output object to pointer")
|
||||
}
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
if preconditions == nil {
|
||||
return s.unconditionalDelete(ctx, key, out)
|
||||
}
|
||||
return s.conditionalDelete(ctx, key, out, v, preconditions)
|
||||
}
|
||||
|
||||
func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error {
|
||||
// We need to do get and delete in single transaction in order to
|
||||
// know the value and revision before deleting it.
|
||||
txnResp, err := s.client.KV.Txn(ctx).If().Then(
|
||||
clientv3.OpGet(key),
|
||||
clientv3.OpDelete(key),
|
||||
).Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
getResp := txnResp.Responses[0].GetResponseRange()
|
||||
if len(getResp.Kvs) == 0 {
|
||||
return storage.NewKeyNotFoundError(key, 0)
|
||||
}
|
||||
|
||||
kv := getResp.Kvs[0]
|
||||
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
|
||||
}
|
||||
|
||||
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions) error {
|
||||
getResp, err := s.client.KV.Get(ctx, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for {
|
||||
origState, err := s.getState(getResp, key, v, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := checkPreconditions(key, preconditions, origState.obj); err != nil {
|
||||
return err
|
||||
}
|
||||
txnResp, err := s.client.KV.Txn(ctx).If(
|
||||
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
|
||||
).Then(
|
||||
clientv3.OpDelete(key),
|
||||
).Else(
|
||||
clientv3.OpGet(key),
|
||||
).Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !txnResp.Succeeded {
|
||||
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
|
||||
glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
|
||||
continue
|
||||
}
|
||||
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
|
||||
}
|
||||
}
|
||||
|
||||
// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
|
||||
func (s *store) GuaranteedUpdate(
|
||||
ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
|
||||
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
|
||||
trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", reflect.TypeOf(out).String()))
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
|
||||
v, err := conversion.EnforcePtr(out)
|
||||
if err != nil {
|
||||
panic("unable to convert output object to pointer")
|
||||
}
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
|
||||
var origState *objState
|
||||
if len(suggestion) == 1 && suggestion[0] != nil {
|
||||
origState, err = s.getStateFromObject(suggestion[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
origState, err = s.getState(getResp, key, v, ignoreNotFound)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
trace.Step("initial value restored")
|
||||
|
||||
transformContext := authenticatedDataString(key)
|
||||
for {
|
||||
if err := checkPreconditions(key, preconditions, origState.obj); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ret, ttl, err := s.updateState(origState, tryUpdate)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := runtime.Encode(s.codec, ret)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !origState.stale && bytes.Equal(data, origState.data) {
|
||||
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
|
||||
}
|
||||
|
||||
newData, err := s.transformer.TransformToStorage(data, transformContext)
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
|
||||
opts, err := s.ttlOpts(ctx, int64(ttl))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("Transaction prepared")
|
||||
|
||||
txnResp, err := s.client.KV.Txn(ctx).If(
|
||||
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
|
||||
).Then(
|
||||
clientv3.OpPut(key, string(newData), opts...),
|
||||
).Else(
|
||||
clientv3.OpGet(key),
|
||||
).Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("Transaction committed")
|
||||
if !txnResp.Succeeded {
|
||||
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
|
||||
glog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
|
||||
origState, err = s.getState(getResp, key, v, ignoreNotFound)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("Retry value restored")
|
||||
continue
|
||||
}
|
||||
putResp := txnResp.Responses[0].GetResponsePut()
|
||||
|
||||
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
|
||||
}
|
||||
}
|
||||
|
||||
// GetToList implements storage.Interface.GetToList.
|
||||
func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
|
||||
listPtr, err := meta.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
|
||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(getResp.Kvs) == 0 {
|
||||
return nil
|
||||
}
|
||||
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
elems := []*elemForDecode{{
|
||||
data: data,
|
||||
rev: uint64(getResp.Kvs[0].ModRevision),
|
||||
}}
|
||||
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil {
|
||||
return err
|
||||
}
|
||||
// update version with cluster level revision
|
||||
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision))
|
||||
}
|
||||
|
||||
// List implements storage.Interface.List.
|
||||
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
|
||||
listPtr, err := meta.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
// We need to make sure the key ended with "/" so that we only get children "directories".
|
||||
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
|
||||
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
|
||||
if !strings.HasSuffix(key, "/") {
|
||||
key += "/"
|
||||
}
|
||||
getResp, err := s.client.KV.Get(ctx, key, clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
elems := make([]*elemForDecode, 0, len(getResp.Kvs))
|
||||
for _, kv := range getResp.Kvs {
|
||||
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", key, err))
|
||||
continue
|
||||
}
|
||||
|
||||
elems = append(elems, &elemForDecode{
|
||||
data: data,
|
||||
rev: uint64(kv.ModRevision),
|
||||
})
|
||||
}
|
||||
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil {
|
||||
return err
|
||||
}
|
||||
// update version with cluster level revision
|
||||
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision))
|
||||
}
|
||||
|
||||
// Watch implements storage.Interface.Watch.
|
||||
func (s *store) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
|
||||
return s.watch(ctx, key, resourceVersion, pred, false)
|
||||
}
|
||||
|
||||
// WatchList implements storage.Interface.WatchList.
|
||||
func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
|
||||
return s.watch(ctx, key, resourceVersion, pred, true)
|
||||
}
|
||||
|
||||
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) {
|
||||
rev, err := storage.ParseWatchResourceVersion(rv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
return s.watcher.Watch(ctx, key, int64(rev), recursive, pred)
|
||||
}
|
||||
|
||||
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
|
||||
state := &objState{
|
||||
obj: reflect.New(v.Type()).Interface().(runtime.Object),
|
||||
meta: &storage.ResponseMeta{},
|
||||
}
|
||||
if len(getResp.Kvs) == 0 {
|
||||
if !ignoreNotFound {
|
||||
return nil, storage.NewKeyNotFoundError(key, 0)
|
||||
}
|
||||
if err := runtime.SetZeroValue(state.obj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
data, stale, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
return nil, storage.NewInternalError(err.Error())
|
||||
}
|
||||
state.rev = getResp.Kvs[0].ModRevision
|
||||
state.meta.ResourceVersion = uint64(state.rev)
|
||||
state.data = data
|
||||
state.stale = stale
|
||||
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return state, nil
|
||||
}
|
||||
|
||||
func (s *store) getStateFromObject(obj runtime.Object) (*objState, error) {
|
||||
state := &objState{
|
||||
obj: obj,
|
||||
meta: &storage.ResponseMeta{},
|
||||
}
|
||||
|
||||
rv, err := s.versioner.ObjectResourceVersion(obj)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't get resource version: %v", err)
|
||||
}
|
||||
state.rev = int64(rv)
|
||||
state.meta.ResourceVersion = uint64(state.rev)
|
||||
|
||||
// Compute the serialized form - for that we need to temporarily clean
|
||||
// its resource version field (those are not stored in etcd).
|
||||
if err := s.versioner.UpdateObject(obj, 0); err != nil {
|
||||
return nil, errors.New("resourceVersion cannot be set on objects store in etcd")
|
||||
}
|
||||
state.data, err = runtime.Encode(s.codec, obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.versioner.UpdateObject(state.obj, uint64(rv))
|
||||
return state, nil
|
||||
}
|
||||
|
||||
func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
|
||||
ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
version, err := s.versioner.ObjectResourceVersion(ret)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if version != 0 {
|
||||
// We cannot store object with resourceVersion in etcd. We need to reset it.
|
||||
if err := s.versioner.UpdateObject(ret, 0); err != nil {
|
||||
return nil, 0, fmt.Errorf("UpdateObject failed: %v", err)
|
||||
}
|
||||
}
|
||||
var ttl uint64
|
||||
if ttlPtr != nil {
|
||||
ttl = *ttlPtr
|
||||
}
|
||||
return ret, ttl, nil
|
||||
}
|
||||
|
||||
// ttlOpts returns client options based on given ttl.
|
||||
// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
|
||||
func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
|
||||
if ttl == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
// TODO: one lease per ttl key is expensive. Based on current use case, we can have a long window to
|
||||
// put keys within into same lease. We shall benchmark this and optimize the performance.
|
||||
lcr, err := s.client.Lease.Grant(ctx, ttl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []clientv3.OpOption{clientv3.WithLease(clientv3.LeaseID(lcr.ID))}, nil
|
||||
}
|
||||
|
||||
// decode decodes value of bytes into object. It will also set the object resource version to rev.
|
||||
// On success, objPtr would be set to the object.
|
||||
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
|
||||
if _, err := conversion.EnforcePtr(objPtr); err != nil {
|
||||
panic("unable to convert output object to pointer")
|
||||
}
|
||||
_, _, err := codec.Decode(value, nil, objPtr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
versioner.UpdateObject(objPtr, uint64(rev))
|
||||
return nil
|
||||
}
|
||||
|
||||
// decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev.
|
||||
// On success, ListPtr would be set to the list of objects.
|
||||
func decodeList(elems []*elemForDecode, filter storage.FilterFunc, ListPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error {
|
||||
v, err := conversion.EnforcePtr(ListPtr)
|
||||
if err != nil || v.Kind() != reflect.Slice {
|
||||
panic("need ptr to slice")
|
||||
}
|
||||
for _, elem := range elems {
|
||||
obj, _, err := codec.Decode(elem.data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
versioner.UpdateObject(obj, elem.rev)
|
||||
if filter(obj) {
|
||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error {
|
||||
if preconditions == nil {
|
||||
return nil
|
||||
}
|
||||
objMeta, err := meta.Accessor(out)
|
||||
if err != nil {
|
||||
return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
|
||||
}
|
||||
if preconditions.UID != nil && *preconditions.UID != objMeta.GetUID() {
|
||||
errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", *preconditions.UID, objMeta.GetUID())
|
||||
return storage.NewInvalidObjError(key, errMsg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func notFound(key string) clientv3.Cmp {
|
||||
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
|
||||
}
|
||||
715
vendor/k8s.io/apiserver/pkg/storage/etcd3/store_test.go
generated
vendored
Normal file
715
vendor/k8s.io/apiserver/pkg/storage/etcd3/store_test.go
generated
vendored
Normal file
|
|
@ -0,0 +1,715 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd3
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
apitesting "k8s.io/apimachinery/pkg/api/testing"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
storagetests "k8s.io/apiserver/pkg/storage/tests"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
|
||||
"github.com/coreos/etcd/integration"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
var scheme = runtime.NewScheme()
|
||||
var codecs = serializer.NewCodecFactory(scheme)
|
||||
|
||||
func init() {
|
||||
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
|
||||
example.AddToScheme(scheme)
|
||||
examplev1.AddToScheme(scheme)
|
||||
}
|
||||
|
||||
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
|
||||
type prefixTransformer struct {
|
||||
prefix []byte
|
||||
stale bool
|
||||
err error
|
||||
}
|
||||
|
||||
func (p prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) {
|
||||
if ctx == nil {
|
||||
panic("no context provided")
|
||||
}
|
||||
if !bytes.HasPrefix(b, p.prefix) {
|
||||
return nil, false, fmt.Errorf("value does not have expected prefix: %s", string(b))
|
||||
}
|
||||
return bytes.TrimPrefix(b, p.prefix), p.stale, p.err
|
||||
}
|
||||
func (p prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) {
|
||||
if ctx == nil {
|
||||
panic("no context provided")
|
||||
}
|
||||
if len(b) > 0 {
|
||||
return append(append([]byte{}, p.prefix...), b...), p.err
|
||||
}
|
||||
return b, p.err
|
||||
}
|
||||
|
||||
func TestCreate(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
etcdClient := cluster.RandClient()
|
||||
|
||||
key := "/testkey"
|
||||
out := &example.Pod{}
|
||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
|
||||
// verify that kv pair is empty before set
|
||||
getResp, err := etcdClient.KV.Get(ctx, key)
|
||||
if err != nil {
|
||||
t.Fatalf("etcdClient.KV.Get failed: %v", err)
|
||||
}
|
||||
if len(getResp.Kvs) != 0 {
|
||||
t.Fatalf("expecting empty result on key: %s", key)
|
||||
}
|
||||
|
||||
err = store.Create(ctx, key, obj, out, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
// basic tests of the output
|
||||
if obj.ObjectMeta.Name != out.ObjectMeta.Name {
|
||||
t.Errorf("pod name want=%s, get=%s", obj.ObjectMeta.Name, out.ObjectMeta.Name)
|
||||
}
|
||||
if out.ResourceVersion == "" {
|
||||
t.Errorf("output should have non-empty resource version")
|
||||
}
|
||||
|
||||
// verify that kv pair is not empty after set
|
||||
getResp, err = etcdClient.KV.Get(ctx, key)
|
||||
if err != nil {
|
||||
t.Fatalf("etcdClient.KV.Get failed: %v", err)
|
||||
}
|
||||
if len(getResp.Kvs) == 0 {
|
||||
t.Fatalf("expecting non empty result on key: %s", key)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateWithTTL(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
key := "/somekey"
|
||||
|
||||
out := &example.Pod{}
|
||||
if err := store.Create(ctx, key, input, out, 1); err != nil {
|
||||
t.Fatalf("Create failed: %v", err)
|
||||
}
|
||||
|
||||
w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
testCheckEventType(t, watch.Deleted, w)
|
||||
}
|
||||
|
||||
func TestCreateWithKeyExist(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
key, _ := testPropogateStore(ctx, t, store, obj)
|
||||
out := &example.Pod{}
|
||||
err := store.Create(ctx, key, obj, out, 0)
|
||||
if err == nil || !storage.IsNodeExist(err) {
|
||||
t.Errorf("expecting key exists error, but get: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGet(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
tests := []struct {
|
||||
key string
|
||||
ignoreNotFound bool
|
||||
expectNotFoundErr bool
|
||||
expectedOut *example.Pod
|
||||
}{{ // test get on existing item
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
expectNotFoundErr: false,
|
||||
expectedOut: storedObj,
|
||||
}, { // test get on non-existing item with ignoreNotFound=false
|
||||
key: "/non-existing",
|
||||
ignoreNotFound: false,
|
||||
expectNotFoundErr: true,
|
||||
}, { // test get on non-existing item with ignoreNotFound=true
|
||||
key: "/non-existing",
|
||||
ignoreNotFound: true,
|
||||
expectNotFoundErr: false,
|
||||
expectedOut: &example.Pod{},
|
||||
}}
|
||||
|
||||
for i, tt := range tests {
|
||||
out := &example.Pod{}
|
||||
err := store.Get(ctx, tt.key, "", out, tt.ignoreNotFound)
|
||||
if tt.expectNotFoundErr {
|
||||
if err == nil || !storage.IsNotFound(err) {
|
||||
t.Errorf("#%d: expecting not found error, but get: %s", i, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("Get failed: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(tt.expectedOut, out) {
|
||||
t.Errorf("#%d: pod want=%#v, get=%#v", i, tt.expectedOut, out)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnconditionalDelete(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
tests := []struct {
|
||||
key string
|
||||
expectedObj *example.Pod
|
||||
expectNotFoundErr bool
|
||||
}{{ // test unconditional delete on existing key
|
||||
key: key,
|
||||
expectedObj: storedObj,
|
||||
expectNotFoundErr: false,
|
||||
}, { // test unconditional delete on non-existing key
|
||||
key: "/non-existing",
|
||||
expectedObj: nil,
|
||||
expectNotFoundErr: true,
|
||||
}}
|
||||
|
||||
for i, tt := range tests {
|
||||
out := &example.Pod{} // reset
|
||||
err := store.Delete(ctx, tt.key, out, nil)
|
||||
if tt.expectNotFoundErr {
|
||||
if err == nil || !storage.IsNotFound(err) {
|
||||
t.Errorf("#%d: expecting not found error, but get: %s", i, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("Delete failed: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(tt.expectedObj, out) {
|
||||
t.Errorf("#%d: pod want=%#v, get=%#v", i, tt.expectedObj, out)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConditionalDelete(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||
|
||||
tests := []struct {
|
||||
precondition *storage.Preconditions
|
||||
expectInvalidObjErr bool
|
||||
}{{ // test conditional delete with UID match
|
||||
precondition: storage.NewUIDPreconditions("A"),
|
||||
expectInvalidObjErr: false,
|
||||
}, { // test conditional delete with UID mismatch
|
||||
precondition: storage.NewUIDPreconditions("B"),
|
||||
expectInvalidObjErr: true,
|
||||
}}
|
||||
|
||||
for i, tt := range tests {
|
||||
out := &example.Pod{}
|
||||
err := store.Delete(ctx, key, out, tt.precondition)
|
||||
if tt.expectInvalidObjErr {
|
||||
if err == nil || !storage.IsInvalidObj(err) {
|
||||
t.Errorf("#%d: expecting invalid UID error, but get: %s", i, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("Delete failed: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(storedObj, out) {
|
||||
t.Errorf("#%d: pod want=%#v, get=%#v", i, storedObj, out)
|
||||
}
|
||||
key, storedObj = testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetToList(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
tests := []struct {
|
||||
key string
|
||||
pred storage.SelectionPredicate
|
||||
expectedOut []*example.Pod
|
||||
}{{ // test GetToList on existing key
|
||||
key: key,
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{storedObj},
|
||||
}, { // test GetToList on non-existing key
|
||||
key: "/non-existing",
|
||||
pred: storage.Everything,
|
||||
expectedOut: nil,
|
||||
}, { // test GetToList with matching pod name
|
||||
key: "/non-existing",
|
||||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
},
|
||||
expectedOut: nil,
|
||||
}}
|
||||
|
||||
for i, tt := range tests {
|
||||
out := &example.PodList{}
|
||||
err := store.GetToList(ctx, tt.key, "", tt.pred, out)
|
||||
if err != nil {
|
||||
t.Fatalf("GetToList failed: %v", err)
|
||||
}
|
||||
if len(out.Items) != len(tt.expectedOut) {
|
||||
t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items))
|
||||
continue
|
||||
}
|
||||
for j, wantPod := range tt.expectedOut {
|
||||
getPod := &out.Items[j]
|
||||
if !reflect.DeepEqual(wantPod, getPod) {
|
||||
t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdate(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storeObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||
|
||||
tests := []struct {
|
||||
key string
|
||||
ignoreNotFound bool
|
||||
precondition *storage.Preconditions
|
||||
expectNotFoundErr bool
|
||||
expectInvalidObjErr bool
|
||||
expectNoUpdate bool
|
||||
transformStale bool
|
||||
}{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false
|
||||
key: "/non-existing",
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: true,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: false,
|
||||
}, { // GuaranteedUpdate on non-existing key with ignoreNotFound=true
|
||||
key: "/non-existing",
|
||||
ignoreNotFound: true,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: false,
|
||||
}, { // GuaranteedUpdate on existing key
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: false,
|
||||
}, { // GuaranteedUpdate with same data
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: true,
|
||||
}, { // GuaranteedUpdate with same data but stale
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: false,
|
||||
transformStale: true,
|
||||
}, { // GuaranteedUpdate with UID match
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: storage.NewUIDPreconditions("A"),
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: true,
|
||||
}, { // GuaranteedUpdate with UID mismatch
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: storage.NewUIDPreconditions("B"),
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: true,
|
||||
expectNoUpdate: true,
|
||||
}}
|
||||
|
||||
for i, tt := range tests {
|
||||
out := &example.Pod{}
|
||||
name := fmt.Sprintf("foo-%d", i)
|
||||
if tt.expectNoUpdate {
|
||||
name = storeObj.Name
|
||||
}
|
||||
originalTransformer := store.transformer.(prefixTransformer)
|
||||
if tt.transformStale {
|
||||
transformer := originalTransformer
|
||||
transformer.stale = true
|
||||
store.transformer = transformer
|
||||
}
|
||||
version := storeObj.ResourceVersion
|
||||
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
|
||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||
if tt.expectNotFoundErr && tt.ignoreNotFound {
|
||||
if pod := obj.(*example.Pod); pod.Name != "" {
|
||||
t.Errorf("#%d: expecting zero value, but get=%#v", i, pod)
|
||||
}
|
||||
}
|
||||
pod := *storeObj
|
||||
pod.Name = name
|
||||
return &pod, nil
|
||||
}))
|
||||
store.transformer = originalTransformer
|
||||
|
||||
if tt.expectNotFoundErr {
|
||||
if err == nil || !storage.IsNotFound(err) {
|
||||
t.Errorf("#%d: expecting not found error, but get: %v", i, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if tt.expectInvalidObjErr {
|
||||
if err == nil || !storage.IsInvalidObj(err) {
|
||||
t.Errorf("#%d: expecting invalid UID error, but get: %s", i, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||
}
|
||||
if out.ObjectMeta.Name != name {
|
||||
t.Errorf("#%d: pod name want=%s, get=%s", i, name, out.ObjectMeta.Name)
|
||||
}
|
||||
switch tt.expectNoUpdate {
|
||||
case true:
|
||||
if version != out.ResourceVersion {
|
||||
t.Errorf("#%d: expect no version change, before=%s, after=%s", i, version, out.ResourceVersion)
|
||||
}
|
||||
case false:
|
||||
if version == out.ResourceVersion {
|
||||
t.Errorf("#%d: expect version change, but get the same version=%s", i, version)
|
||||
}
|
||||
}
|
||||
storeObj = out
|
||||
}
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
key := "/somekey"
|
||||
|
||||
out := &example.Pod{}
|
||||
err := store.GuaranteedUpdate(ctx, key, out, true, nil,
|
||||
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||
ttl := uint64(1)
|
||||
return input, &ttl, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Create failed: %v", err)
|
||||
}
|
||||
|
||||
w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
testCheckEventType(t, watch.Deleted, w)
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, _ := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
var firstToFinish sync.WaitGroup
|
||||
var secondToEnter sync.WaitGroup
|
||||
firstToFinish.Add(1)
|
||||
secondToEnter.Add(1)
|
||||
|
||||
go func() {
|
||||
err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil,
|
||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
pod.Name = "foo-1"
|
||||
secondToEnter.Wait()
|
||||
return pod, nil
|
||||
}))
|
||||
firstToFinish.Done()
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
updateCount := 0
|
||||
err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil,
|
||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||
if updateCount == 0 {
|
||||
secondToEnter.Done()
|
||||
firstToFinish.Wait()
|
||||
}
|
||||
updateCount++
|
||||
pod := obj.(*example.Pod)
|
||||
pod.Name = "foo-2"
|
||||
return pod, nil
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("Second GuaranteedUpdate error %#v", err)
|
||||
}
|
||||
if err := <-errChan; err != nil {
|
||||
t.Fatalf("First GuaranteedUpdate error %#v", err)
|
||||
}
|
||||
|
||||
if updateCount != 2 {
|
||||
t.Errorf("Should have conflict and called update func twice")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransformationFailure(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
ctx := context.Background()
|
||||
|
||||
preset := []struct {
|
||||
key string
|
||||
obj *example.Pod
|
||||
storedObj *example.Pod
|
||||
}{{
|
||||
key: "/one-level/test",
|
||||
obj: &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
|
||||
Spec: storagetests.DeepEqualSafePodSpec(),
|
||||
},
|
||||
}, {
|
||||
key: "/two-level/1/test",
|
||||
obj: &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
|
||||
Spec: storagetests.DeepEqualSafePodSpec(),
|
||||
},
|
||||
}}
|
||||
for i, ps := range preset[:1] {
|
||||
preset[i].storedObj = &example.Pod{}
|
||||
err := store.Create(ctx, ps.key, ps.obj, preset[:1][i].storedObj, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// create a second resource with an invalid prefix
|
||||
oldTransformer := store.transformer
|
||||
store.transformer = prefixTransformer{prefix: []byte("otherprefix!")}
|
||||
for i, ps := range preset[1:] {
|
||||
preset[1:][i].storedObj = &example.Pod{}
|
||||
err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
}
|
||||
store.transformer = oldTransformer
|
||||
|
||||
// only the first item is returned, and no error
|
||||
var got example.PodList
|
||||
if err := store.List(ctx, "/", "", storage.Everything, &got); err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
if e, a := []example.Pod{*preset[0].storedObj}, got.Items; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a))
|
||||
}
|
||||
|
||||
// Get should fail
|
||||
if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsInternalError(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
// GuaranteedUpdate without suggestion should return an error
|
||||
if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
|
||||
return input, nil, nil
|
||||
}); !storage.IsInternalError(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
// GuaranteedUpdate with suggestion should not return an error if we don't change the object
|
||||
if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
|
||||
return input, nil, nil
|
||||
}, preset[1].obj); err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Delete succeeds but reports an error because we cannot access the body
|
||||
if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil); !storage.IsInternalError(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup storage with the following structure:
|
||||
// /
|
||||
// - one-level/
|
||||
// | - test
|
||||
// |
|
||||
// - two-level/
|
||||
// - 1/
|
||||
// | - test
|
||||
// |
|
||||
// - 2/
|
||||
// - test
|
||||
preset := []struct {
|
||||
key string
|
||||
obj *example.Pod
|
||||
storedObj *example.Pod
|
||||
}{{
|
||||
key: "/one-level/test",
|
||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
||||
}, {
|
||||
key: "/two-level/1/test",
|
||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
||||
}, {
|
||||
key: "/two-level/2/test",
|
||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
|
||||
}}
|
||||
|
||||
for i, ps := range preset {
|
||||
preset[i].storedObj = &example.Pod{}
|
||||
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
prefix string
|
||||
pred storage.SelectionPredicate
|
||||
expectedOut []*example.Pod
|
||||
}{{ // test List on existing key
|
||||
prefix: "/one-level/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[0].storedObj},
|
||||
}, { // test List on non-existing key
|
||||
prefix: "/non-existing/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: nil,
|
||||
}, { // test List with pod name matching
|
||||
prefix: "/one-level/",
|
||||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
},
|
||||
expectedOut: nil,
|
||||
}, { // test List with multiple levels of directories and expect flattened result
|
||||
prefix: "/two-level/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj},
|
||||
}}
|
||||
|
||||
for i, tt := range tests {
|
||||
out := &example.PodList{}
|
||||
err := store.List(ctx, tt.prefix, "0", tt.pred, out)
|
||||
if err != nil {
|
||||
t.Fatalf("List failed: %v", err)
|
||||
}
|
||||
if len(tt.expectedOut) != len(out.Items) {
|
||||
t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items))
|
||||
continue
|
||||
}
|
||||
for j, wantPod := range tt.expectedOut {
|
||||
getPod := &out.Items[j]
|
||||
if !reflect.DeepEqual(wantPod, getPod) {
|
||||
t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
ctx := context.Background()
|
||||
return ctx, store, cluster
|
||||
}
|
||||
|
||||
// testPropogateStore helps propogates store with objects, automates key generation, and returns
|
||||
// keys and stored objects.
|
||||
func testPropogateStore(ctx context.Context, t *testing.T, store *store, obj *example.Pod) (string, *example.Pod) {
|
||||
// Setup store with a key and grab the output for returning.
|
||||
key := "/testkey"
|
||||
setOutput := &example.Pod{}
|
||||
err := store.Create(ctx, key, obj, setOutput, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
return key, setOutput
|
||||
}
|
||||
|
||||
func TestPrefix(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
transformer := prefixTransformer{prefix: []byte("test!")}
|
||||
testcases := map[string]string{
|
||||
"custom/prefix": "/custom/prefix",
|
||||
"/custom//prefix//": "/custom/prefix",
|
||||
"/registry": "/registry",
|
||||
}
|
||||
for configuredPrefix, effectivePrefix := range testcases {
|
||||
store := newStore(cluster.RandClient(), false, codec, configuredPrefix, transformer)
|
||||
if store.pathPrefix != effectivePrefix {
|
||||
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
|
||||
}
|
||||
}
|
||||
}
|
||||
386
vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
generated
vendored
Normal file
386
vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
generated
vendored
Normal file
|
|
@ -0,0 +1,386 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd3
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const (
|
||||
// We have set a buffer in order to reduce times of context switches.
|
||||
incomingBufSize = 100
|
||||
outgoingBufSize = 100
|
||||
)
|
||||
|
||||
type watcher struct {
|
||||
client *clientv3.Client
|
||||
codec runtime.Codec
|
||||
versioner storage.Versioner
|
||||
transformer value.Transformer
|
||||
}
|
||||
|
||||
// watchChan implements watch.Interface.
|
||||
type watchChan struct {
|
||||
watcher *watcher
|
||||
key string
|
||||
initialRev int64
|
||||
recursive bool
|
||||
internalFilter storage.FilterFunc
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
incomingEventChan chan *event
|
||||
resultChan chan watch.Event
|
||||
errChan chan error
|
||||
}
|
||||
|
||||
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer value.Transformer) *watcher {
|
||||
return &watcher{
|
||||
client: client,
|
||||
codec: codec,
|
||||
versioner: versioner,
|
||||
transformer: transformer,
|
||||
}
|
||||
}
|
||||
|
||||
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
|
||||
// If rev is zero, it will return the existing object(s) and then start watching from
|
||||
// the maximum revision+1 from returned objects.
|
||||
// If rev is non-zero, it will watch events happened after given revision.
|
||||
// If recursive is false, it watches on given key.
|
||||
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
|
||||
// pred must be non-nil. Only if pred matches the change, it will be returned.
|
||||
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) {
|
||||
if recursive && !strings.HasSuffix(key, "/") {
|
||||
key += "/"
|
||||
}
|
||||
wc := w.createWatchChan(ctx, key, rev, recursive, pred)
|
||||
go wc.run()
|
||||
return wc, nil
|
||||
}
|
||||
|
||||
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan {
|
||||
wc := &watchChan{
|
||||
watcher: w,
|
||||
key: key,
|
||||
initialRev: rev,
|
||||
recursive: recursive,
|
||||
internalFilter: storage.SimpleFilter(pred),
|
||||
incomingEventChan: make(chan *event, incomingBufSize),
|
||||
resultChan: make(chan watch.Event, outgoingBufSize),
|
||||
errChan: make(chan error, 1),
|
||||
}
|
||||
if pred.Label.Empty() && pred.Field.Empty() {
|
||||
// The filter doesn't filter out any object.
|
||||
wc.internalFilter = nil
|
||||
}
|
||||
wc.ctx, wc.cancel = context.WithCancel(ctx)
|
||||
return wc
|
||||
}
|
||||
|
||||
func (wc *watchChan) run() {
|
||||
watchClosedCh := make(chan struct{})
|
||||
go wc.startWatching(watchClosedCh)
|
||||
|
||||
var resultChanWG sync.WaitGroup
|
||||
resultChanWG.Add(1)
|
||||
go wc.processEvent(&resultChanWG)
|
||||
|
||||
select {
|
||||
case err := <-wc.errChan:
|
||||
if err == context.Canceled {
|
||||
break
|
||||
}
|
||||
errResult := parseError(err)
|
||||
if errResult != nil {
|
||||
// error result is guaranteed to be received by user before closing ResultChan.
|
||||
select {
|
||||
case wc.resultChan <- *errResult:
|
||||
case <-wc.ctx.Done(): // user has given up all results
|
||||
}
|
||||
}
|
||||
case <-watchClosedCh:
|
||||
case <-wc.ctx.Done(): // user cancel
|
||||
}
|
||||
|
||||
// We use wc.ctx to reap all goroutines. Under whatever condition, we should stop them all.
|
||||
// It's fine to double cancel.
|
||||
wc.cancel()
|
||||
|
||||
// we need to wait until resultChan wouldn't be used anymore
|
||||
resultChanWG.Wait()
|
||||
close(wc.resultChan)
|
||||
}
|
||||
|
||||
func (wc *watchChan) Stop() {
|
||||
wc.cancel()
|
||||
}
|
||||
|
||||
func (wc *watchChan) ResultChan() <-chan watch.Event {
|
||||
return wc.resultChan
|
||||
}
|
||||
|
||||
// sync tries to retrieve existing data and send them to process.
|
||||
// The revision to watch will be set to the revision in response.
|
||||
// All events sent will have isCreated=true
|
||||
func (wc *watchChan) sync() error {
|
||||
opts := []clientv3.OpOption{}
|
||||
if wc.recursive {
|
||||
opts = append(opts, clientv3.WithPrefix())
|
||||
}
|
||||
getResp, err := wc.watcher.client.Get(wc.ctx, wc.key, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
wc.initialRev = getResp.Header.Revision
|
||||
for _, kv := range getResp.Kvs {
|
||||
wc.sendEvent(parseKV(kv))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// startWatching does:
|
||||
// - get current objects if initialRev=0; set initialRev to current rev
|
||||
// - watch on given key and send events to process.
|
||||
func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
|
||||
if wc.initialRev == 0 {
|
||||
if err := wc.sync(); err != nil {
|
||||
glog.Errorf("failed to sync with latest state: %v", err)
|
||||
wc.sendError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
|
||||
if wc.recursive {
|
||||
opts = append(opts, clientv3.WithPrefix())
|
||||
}
|
||||
wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
|
||||
for wres := range wch {
|
||||
if wres.Err() != nil {
|
||||
err := wres.Err()
|
||||
// If there is an error on server (e.g. compaction), the channel will return it before closed.
|
||||
glog.Errorf("watch chan error: %v", err)
|
||||
wc.sendError(err)
|
||||
return
|
||||
}
|
||||
for _, e := range wres.Events {
|
||||
wc.sendEvent(parseEvent(e))
|
||||
}
|
||||
}
|
||||
// When we come to this point, it's only possible that client side ends the watch.
|
||||
// e.g. cancel the context, close the client.
|
||||
// If this watch chan is broken and context isn't cancelled, other goroutines will still hang.
|
||||
// We should notify the main thread that this goroutine has exited.
|
||||
close(watchClosedCh)
|
||||
}
|
||||
|
||||
// processEvent processes events from etcd watcher and sends results to resultChan.
|
||||
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case e := <-wc.incomingEventChan:
|
||||
res := wc.transform(e)
|
||||
if res == nil {
|
||||
continue
|
||||
}
|
||||
if len(wc.resultChan) == outgoingBufSize {
|
||||
glog.Warningf("Fast watcher, slow processing. Number of buffered events: %d."+
|
||||
"Probably caused by slow dispatching events to watchers", outgoingBufSize)
|
||||
}
|
||||
// If user couldn't receive results fast enough, we also block incoming events from watcher.
|
||||
// Because storing events in local will cause more memory usage.
|
||||
// The worst case would be closing the fast watcher.
|
||||
select {
|
||||
case wc.resultChan <- *res:
|
||||
case <-wc.ctx.Done():
|
||||
return
|
||||
}
|
||||
case <-wc.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *watchChan) filter(obj runtime.Object) bool {
|
||||
if wc.internalFilter == nil {
|
||||
return true
|
||||
}
|
||||
return wc.internalFilter(obj)
|
||||
}
|
||||
|
||||
func (wc *watchChan) acceptAll() bool {
|
||||
return wc.internalFilter == nil
|
||||
}
|
||||
|
||||
// transform transforms an event into a result for user if not filtered.
|
||||
func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
||||
curObj, oldObj, err := wc.prepareObjs(e)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to prepare current and previous objects: %v", err)
|
||||
wc.sendError(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
switch {
|
||||
case e.isDeleted:
|
||||
if !wc.filter(oldObj) {
|
||||
return nil
|
||||
}
|
||||
res = &watch.Event{
|
||||
Type: watch.Deleted,
|
||||
Object: oldObj,
|
||||
}
|
||||
case e.isCreated:
|
||||
if !wc.filter(curObj) {
|
||||
return nil
|
||||
}
|
||||
res = &watch.Event{
|
||||
Type: watch.Added,
|
||||
Object: curObj,
|
||||
}
|
||||
default:
|
||||
if wc.acceptAll() {
|
||||
res = &watch.Event{
|
||||
Type: watch.Modified,
|
||||
Object: curObj,
|
||||
}
|
||||
return res
|
||||
}
|
||||
curObjPasses := wc.filter(curObj)
|
||||
oldObjPasses := wc.filter(oldObj)
|
||||
switch {
|
||||
case curObjPasses && oldObjPasses:
|
||||
res = &watch.Event{
|
||||
Type: watch.Modified,
|
||||
Object: curObj,
|
||||
}
|
||||
case curObjPasses && !oldObjPasses:
|
||||
res = &watch.Event{
|
||||
Type: watch.Added,
|
||||
Object: curObj,
|
||||
}
|
||||
case !curObjPasses && oldObjPasses:
|
||||
res = &watch.Event{
|
||||
Type: watch.Deleted,
|
||||
Object: oldObj,
|
||||
}
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func parseError(err error) *watch.Event {
|
||||
var status *metav1.Status
|
||||
switch {
|
||||
case err == etcdrpc.ErrCompacted:
|
||||
status = &metav1.Status{
|
||||
Status: metav1.StatusFailure,
|
||||
Message: err.Error(),
|
||||
Code: http.StatusGone,
|
||||
Reason: metav1.StatusReasonExpired,
|
||||
}
|
||||
default:
|
||||
status = &metav1.Status{
|
||||
Status: metav1.StatusFailure,
|
||||
Message: err.Error(),
|
||||
Code: http.StatusInternalServerError,
|
||||
Reason: metav1.StatusReasonInternalError,
|
||||
}
|
||||
}
|
||||
|
||||
return &watch.Event{
|
||||
Type: watch.Error,
|
||||
Object: status,
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *watchChan) sendError(err error) {
|
||||
select {
|
||||
case wc.errChan <- err:
|
||||
case <-wc.ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *watchChan) sendEvent(e *event) {
|
||||
if len(wc.incomingEventChan) == incomingBufSize {
|
||||
glog.Warningf("Fast watcher, slow processing. Number of buffered events: %d."+
|
||||
"Probably caused by slow decoding, user not receiving fast, or other processing logic",
|
||||
incomingBufSize)
|
||||
}
|
||||
select {
|
||||
case wc.incomingEventChan <- e:
|
||||
case <-wc.ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
|
||||
if !e.isDeleted {
|
||||
data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
// We need to decode prevValue, only if this is deletion event or
|
||||
// the underlying filter doesn't accept all objects (otherwise we
|
||||
// know that the filter for previous object will return true and
|
||||
// we need the object only to compute whether it was filtered out
|
||||
// before).
|
||||
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
|
||||
data, _, err := wc.watcher.transformer.TransformFromStorage(e.prevValue, authenticatedDataString(e.key))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// Note that this sends the *old* object with the etcd revision for the time at
|
||||
// which it gets deleted.
|
||||
oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
return curObj, oldObj, nil
|
||||
}
|
||||
|
||||
func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (runtime.Object, error) {
|
||||
obj, err := runtime.Decode(codec, []byte(data))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// ensure resource version is set on the object we load from etcd
|
||||
if err := versioner.UpdateObject(obj, uint64(rev)); err != nil {
|
||||
return nil, fmt.Errorf("failure to version api object (%d) %#v: %v", rev, obj, err)
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
375
vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
generated
vendored
Normal file
375
vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
generated
vendored
Normal file
|
|
@ -0,0 +1,375 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd3
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
apitesting "k8s.io/apimachinery/pkg/api/testing"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
)
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
testWatch(t, false)
|
||||
}
|
||||
|
||||
func TestWatchList(t *testing.T) {
|
||||
testWatch(t, true)
|
||||
}
|
||||
|
||||
// It tests that
|
||||
// - first occurrence of objects should notify Add event
|
||||
// - update should trigger Modified event
|
||||
// - update that gets filtered should trigger Deleted event
|
||||
func testWatch(t *testing.T, recursive bool) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
|
||||
|
||||
tests := []struct {
|
||||
key string
|
||||
pred storage.SelectionPredicate
|
||||
watchTests []*testWatchStruct
|
||||
}{{ // create a key
|
||||
key: "/somekey-1",
|
||||
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}},
|
||||
pred: storage.Everything,
|
||||
}, { // create a key but obj gets filtered. Then update it with unfiltered obj
|
||||
key: "/somekey-3",
|
||||
watchTests: []*testWatchStruct{{podFoo, false, ""}, {podBar, true, watch.Added}},
|
||||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.ParseSelectorOrDie("metadata.name=bar"),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
},
|
||||
}, { // update
|
||||
key: "/somekey-4",
|
||||
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Modified}},
|
||||
pred: storage.Everything,
|
||||
}, { // delete because of being filtered
|
||||
key: "/somekey-5",
|
||||
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Deleted}},
|
||||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.ParseSelectorOrDie("metadata.name!=bar"),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
},
|
||||
}}
|
||||
for i, tt := range tests {
|
||||
w, err := store.watch(ctx, tt.key, "0", tt.pred, recursive)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
var prevObj *example.Pod
|
||||
for _, watchTest := range tt.watchTests {
|
||||
out := &example.Pod{}
|
||||
key := tt.key
|
||||
if recursive {
|
||||
key = key + "/item"
|
||||
}
|
||||
err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return watchTest.obj, nil
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||
}
|
||||
if watchTest.expectEvent {
|
||||
expectObj := out
|
||||
if watchTest.watchType == watch.Deleted {
|
||||
expectObj = prevObj
|
||||
expectObj.ResourceVersion = out.ResourceVersion
|
||||
}
|
||||
testCheckResult(t, i, watchTest.watchType, w, expectObj)
|
||||
}
|
||||
prevObj = out
|
||||
}
|
||||
w.Stop()
|
||||
testCheckStop(t, i, w)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteTriggerWatch(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
if err := store.Delete(ctx, key, &example.Pod{}, nil); err != nil {
|
||||
t.Fatalf("Delete failed: %v", err)
|
||||
}
|
||||
testCheckEventType(t, watch.Deleted, w)
|
||||
}
|
||||
|
||||
// TestWatchFromZero tests that
|
||||
// - watch from 0 should sync up and grab the object added before
|
||||
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
||||
func TestWatchFromZero(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
||||
|
||||
w, err := store.Watch(ctx, key, "0", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
testCheckResult(t, 0, watch.Added, w, storedObj)
|
||||
w.Stop()
|
||||
|
||||
// Update
|
||||
out := &example.Pod{}
|
||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||
}
|
||||
|
||||
// Make sure when we watch from 0 we receive an ADDED event
|
||||
w, err = store.Watch(ctx, key, "0", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
testCheckResult(t, 1, watch.Added, w, out)
|
||||
w.Stop()
|
||||
|
||||
// Update again
|
||||
out = &example.Pod{}
|
||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||
}
|
||||
|
||||
// Compact previous versions
|
||||
revToCompact, err := strconv.Atoi(out.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err)
|
||||
}
|
||||
_, err = cluster.RandClient().Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical())
|
||||
if err != nil {
|
||||
t.Fatalf("Error compacting: %v", err)
|
||||
}
|
||||
|
||||
// Make sure we can still watch from 0 and receive an ADDED event
|
||||
w, err = store.Watch(ctx, key, "0", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
testCheckResult(t, 2, watch.Added, w, out)
|
||||
}
|
||||
|
||||
// TestWatchFromNoneZero tests that
|
||||
// - watch from non-0 should just watch changes after given version
|
||||
func TestWatchFromNoneZero(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
out := &example.Pod{}
|
||||
store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, err
|
||||
}))
|
||||
testCheckResult(t, 0, watch.Modified, w, out)
|
||||
}
|
||||
|
||||
func TestWatchError(t *testing.T) {
|
||||
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
invalidStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
ctx := context.Background()
|
||||
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
validStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
|
||||
}))
|
||||
testCheckEventType(t, watch.Error, w)
|
||||
}
|
||||
|
||||
func TestWatchContextCancel(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
canceledCtx, cancel := context.WithCancel(ctx)
|
||||
cancel()
|
||||
// When we watch with a canceled context, we should detect that it's context canceled.
|
||||
// We won't take it as error and also close the watcher.
|
||||
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
select {
|
||||
case _, ok := <-w.ResultChan():
|
||||
if ok {
|
||||
t.Error("ResultChan() should be closed")
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("timeout after %v", wait.ForeverTestTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
||||
origCtx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, cancel := context.WithCancel(origCtx)
|
||||
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything)
|
||||
// make resutlChan and errChan blocking to ensure ordering.
|
||||
w.resultChan = make(chan watch.Event)
|
||||
w.errChan = make(chan error)
|
||||
// The event flow goes like:
|
||||
// - first we send an error, it should block on resultChan.
|
||||
// - Then we cancel ctx. The blocking on resultChan should be freed up
|
||||
// and run() goroutine should return.
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
w.run()
|
||||
wg.Done()
|
||||
}()
|
||||
w.errChan <- fmt.Errorf("some error")
|
||||
cancel()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix())
|
||||
|
||||
if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}); err != nil {
|
||||
t.Fatalf("Delete failed: %v", err)
|
||||
}
|
||||
|
||||
e := <-w.ResultChan()
|
||||
watchedDeleteObj := e.Object.(*example.Pod)
|
||||
var wres clientv3.WatchResponse
|
||||
wres = <-etcdW
|
||||
|
||||
watchedDeleteRev, err := storage.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("ParseWatchResourceVersion failed: %v", err)
|
||||
}
|
||||
if int64(watchedDeleteRev) != wres.Events[0].Kv.ModRevision {
|
||||
t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d",
|
||||
watchedDeleteRev, wres.Events[0].Kv.ModRevision)
|
||||
}
|
||||
}
|
||||
|
||||
type testWatchStruct struct {
|
||||
obj *example.Pod
|
||||
expectEvent bool
|
||||
watchType watch.EventType
|
||||
}
|
||||
|
||||
type testCodec struct {
|
||||
runtime.Codec
|
||||
}
|
||||
|
||||
func (c *testCodec) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
|
||||
return nil, nil, errors.New("Expected decoding failure")
|
||||
}
|
||||
|
||||
func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.Interface) {
|
||||
select {
|
||||
case res := <-w.ResultChan():
|
||||
if res.Type != expectEventType {
|
||||
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) {
|
||||
select {
|
||||
case res := <-w.ResultChan():
|
||||
if res.Type != expectEventType {
|
||||
t.Errorf("#%d: event type want=%v, get=%v", i, expectEventType, res.Type)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(expectObj, res.Object) {
|
||||
t.Errorf("#%d: obj want=\n%#v\nget=\n%#v", i, expectObj, res.Object)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("#%d: time out after waiting %v on ResultChan", i, wait.ForeverTestTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
func testCheckStop(t *testing.T, i int, w watch.Interface) {
|
||||
select {
|
||||
case e, ok := <-w.ResultChan():
|
||||
if ok {
|
||||
var obj string
|
||||
switch e.Object.(type) {
|
||||
case *example.Pod:
|
||||
obj = e.Object.(*example.Pod).Name
|
||||
case *metav1.Status:
|
||||
obj = e.Object.(*metav1.Status).Message
|
||||
}
|
||||
t.Errorf("#%d: ResultChan should have been closed. Event: %s. Object: %s", i, e.Type, obj)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("#%d: time out after waiting 1s on ResultChan", i)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue