mirror of
https://github.com/kubernetes-sigs/prometheus-adapter.git
synced 2026-04-07 10:17:51 +00:00
Check in the vendor directory
Travis seems to be having issues pulling deps, so we'll have to check in the vendor directory and prevent the makefile from trying to regenerate it normally.
This commit is contained in:
parent
98e16bc315
commit
a293b2bf94
2526 changed files with 930931 additions and 4 deletions
25
vendor/k8s.io/apiserver/pkg/storage/etcd/OWNERS
generated
vendored
Executable file
25
vendor/k8s.io/apiserver/pkg/storage/etcd/OWNERS
generated
vendored
Executable file
|
|
@ -0,0 +1,25 @@
|
|||
reviewers:
|
||||
- lavalamp
|
||||
- smarterclayton
|
||||
- wojtek-t
|
||||
- deads2k
|
||||
- derekwaynecarr
|
||||
- caesarxuchao
|
||||
- mikedanese
|
||||
- liggitt
|
||||
- davidopp
|
||||
- pmorie
|
||||
- luxas
|
||||
- janetkuo
|
||||
- roberthbailey
|
||||
- tallclair
|
||||
- timothysc
|
||||
- dims
|
||||
- hongchaodeng
|
||||
- krousey
|
||||
- fgrzadkowski
|
||||
- resouer
|
||||
- pweil-
|
||||
- mqliang
|
||||
- feihujiang
|
||||
- enj
|
||||
148
vendor/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go
generated
vendored
Normal file
148
vendor/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go
generated
vendored
Normal file
|
|
@ -0,0 +1,148 @@
|
|||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
)
|
||||
|
||||
// APIObjectVersioner implements versioning and extracting etcd node information
|
||||
// for objects that have an embedded ObjectMeta or ListMeta field.
|
||||
type APIObjectVersioner struct{}
|
||||
|
||||
// UpdateObject implements Versioner
|
||||
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
versionString := ""
|
||||
if resourceVersion != 0 {
|
||||
versionString = strconv.FormatUint(resourceVersion, 10)
|
||||
}
|
||||
accessor.SetResourceVersion(versionString)
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateList implements Versioner
|
||||
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string) error {
|
||||
listAccessor, err := meta.ListAccessor(obj)
|
||||
if err != nil || listAccessor == nil {
|
||||
return err
|
||||
}
|
||||
versionString := ""
|
||||
if resourceVersion != 0 {
|
||||
versionString = strconv.FormatUint(resourceVersion, 10)
|
||||
}
|
||||
listAccessor.SetResourceVersion(versionString)
|
||||
listAccessor.SetContinue(nextKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
// PrepareObjectForStorage clears resource version and self link prior to writing to etcd.
|
||||
func (a APIObjectVersioner) PrepareObjectForStorage(obj runtime.Object) error {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
accessor.SetResourceVersion("")
|
||||
accessor.SetSelfLink("")
|
||||
return nil
|
||||
}
|
||||
|
||||
// ObjectResourceVersion implements Versioner
|
||||
func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
version := accessor.GetResourceVersion()
|
||||
if len(version) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
return strconv.ParseUint(version, 10, 64)
|
||||
}
|
||||
|
||||
// ParseWatchResourceVersion takes a resource version argument and converts it to
|
||||
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
|
||||
// an opaque value, the default watch behavior for non-zero watch is to watch
|
||||
// the next value (if you pass "1", you will see updates from "2" onwards).
|
||||
func (a APIObjectVersioner) ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
|
||||
if resourceVersion == "" || resourceVersion == "0" {
|
||||
return 0, nil
|
||||
}
|
||||
version, err := strconv.ParseUint(resourceVersion, 10, 64)
|
||||
if err != nil {
|
||||
return 0, storage.NewInvalidError(field.ErrorList{
|
||||
// Validation errors are supposed to return version-specific field
|
||||
// paths, but this is probably close enough.
|
||||
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
|
||||
})
|
||||
}
|
||||
return version, nil
|
||||
}
|
||||
|
||||
// ParseListResourceVersion takes a resource version argument and converts it to
|
||||
// the etcd version.
|
||||
// TODO: reevaluate whether it is really clearer to have both this and the
|
||||
// Watch version of this function, since they perform the same logic.
|
||||
func (a APIObjectVersioner) ParseListResourceVersion(resourceVersion string) (uint64, error) {
|
||||
if resourceVersion == "" {
|
||||
return 0, nil
|
||||
}
|
||||
version, err := strconv.ParseUint(resourceVersion, 10, 64)
|
||||
if err != nil {
|
||||
return 0, storage.NewInvalidError(field.ErrorList{
|
||||
// Validation errors are supposed to return version-specific field
|
||||
// paths, but this is probably close enough.
|
||||
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
|
||||
})
|
||||
}
|
||||
return version, nil
|
||||
}
|
||||
|
||||
// APIObjectVersioner implements Versioner
|
||||
var Versioner storage.Versioner = APIObjectVersioner{}
|
||||
|
||||
// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
|
||||
// but etcd resource versions are special, they're actually ints, so we can easily compare them.
|
||||
func (a APIObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
|
||||
lhsVersion, err := Versioner.ObjectResourceVersion(lhs)
|
||||
if err != nil {
|
||||
// coder error
|
||||
panic(err)
|
||||
}
|
||||
rhsVersion, err := Versioner.ObjectResourceVersion(rhs)
|
||||
if err != nil {
|
||||
// coder error
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if lhsVersion == rhsVersion {
|
||||
return 0
|
||||
}
|
||||
if lhsVersion < rhsVersion {
|
||||
return -1
|
||||
}
|
||||
|
||||
return 1
|
||||
}
|
||||
17
vendor/k8s.io/apiserver/pkg/storage/etcd/doc.go
generated
vendored
Normal file
17
vendor/k8s.io/apiserver/pkg/storage/etcd/doc.go
generated
vendored
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd // import "k8s.io/apiserver/pkg/storage/etcd"
|
||||
652
vendor/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go
generated
vendored
Normal file
652
vendor/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go
generated
vendored
Normal file
|
|
@ -0,0 +1,652 @@
|
|||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/conversion"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilcache "k8s.io/apimachinery/pkg/util/cache"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd/metrics"
|
||||
etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
|
||||
utiltrace "k8s.io/apiserver/pkg/util/trace"
|
||||
)
|
||||
|
||||
// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
|
||||
// must be able to undo the transformation caused by the other.
|
||||
type ValueTransformer interface {
|
||||
// TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error.
|
||||
// Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
|
||||
// have not changed.
|
||||
TransformStringFromStorage(string) (value string, stale bool, err error)
|
||||
// TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error.
|
||||
TransformStringToStorage(string) (value string, err error)
|
||||
}
|
||||
|
||||
type identityTransformer struct{}
|
||||
|
||||
func (identityTransformer) TransformStringFromStorage(s string) (string, bool, error) {
|
||||
return s, false, nil
|
||||
}
|
||||
func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil }
|
||||
|
||||
// IdentityTransformer performs no transformation on the provided values.
|
||||
var IdentityTransformer ValueTransformer = identityTransformer{}
|
||||
|
||||
// Creates a new storage interface from the client
|
||||
// TODO: deprecate in favor of storage.Config abstraction over time
|
||||
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, transformer ValueTransformer) storage.Interface {
|
||||
return &etcdHelper{
|
||||
etcdMembersAPI: etcd.NewMembersAPI(client),
|
||||
etcdKeysAPI: etcd.NewKeysAPI(client),
|
||||
codec: codec,
|
||||
versioner: APIObjectVersioner{},
|
||||
transformer: transformer,
|
||||
pathPrefix: path.Join("/", prefix),
|
||||
quorum: quorum,
|
||||
cache: utilcache.NewCache(cacheSize),
|
||||
}
|
||||
}
|
||||
|
||||
// etcdHelper is the reference implementation of storage.Interface.
|
||||
type etcdHelper struct {
|
||||
etcdMembersAPI etcd.MembersAPI
|
||||
etcdKeysAPI etcd.KeysAPI
|
||||
codec runtime.Codec
|
||||
transformer ValueTransformer
|
||||
// Note that versioner is required for etcdHelper to work correctly.
|
||||
// The public constructors (NewStorage & NewEtcdStorage) are setting it
|
||||
// correctly, so be careful when manipulating with it manually.
|
||||
// optional, has to be set to perform any atomic operations
|
||||
versioner storage.Versioner
|
||||
// prefix for all etcd keys
|
||||
pathPrefix string
|
||||
// if true, perform quorum read
|
||||
quorum bool
|
||||
|
||||
// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
|
||||
// to resourceVersion.
|
||||
// This depends on etcd's indexes being globally unique across all objects/types. This will
|
||||
// have to revisited if we decide to do things like multiple etcd clusters, or etcd will
|
||||
// support multi-object transaction that will result in many objects with the same index.
|
||||
// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
|
||||
// TODO: Measure how much this cache helps after the conversion code is optimized.
|
||||
cache utilcache.Cache
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (h *etcdHelper) Versioner() storage.Versioner {
|
||||
return h.versioner
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
|
||||
trace := utiltrace.New("etcdHelper::Create " + getTypeName(obj))
|
||||
defer trace.LogIfLong(250 * time.Millisecond)
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
key = path.Join(h.pathPrefix, key)
|
||||
data, err := runtime.Encode(h.codec, obj)
|
||||
trace.Step("Object encoded")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||
return errors.New("resourceVersion may not be set on objects to be created")
|
||||
}
|
||||
if err := h.versioner.PrepareObjectForStorage(obj); err != nil {
|
||||
return fmt.Errorf("PrepareObjectForStorage returned an error: %v", err)
|
||||
}
|
||||
trace.Step("Version checked")
|
||||
|
||||
startTime := time.Now()
|
||||
opts := etcd.SetOptions{
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
PrevExist: etcd.PrevNoExist,
|
||||
}
|
||||
|
||||
newBody, err := h.transformer.TransformStringToStorage(string(data))
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
|
||||
response, err := h.etcdKeysAPI.Set(ctx, key, newBody, &opts)
|
||||
trace.Step("Object created")
|
||||
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
||||
if err != nil {
|
||||
return toStorageErr(err, key, 0)
|
||||
}
|
||||
if out != nil {
|
||||
if _, err := conversion.EnforcePtr(out); err != nil {
|
||||
panic("unable to convert output object to pointer")
|
||||
}
|
||||
_, _, _, err = h.extractObj(response, err, out, false, false)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error {
|
||||
if preconditions == nil {
|
||||
return nil
|
||||
}
|
||||
objMeta, err := meta.Accessor(out)
|
||||
if err != nil {
|
||||
return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
|
||||
}
|
||||
if preconditions.UID != nil && *preconditions.UID != objMeta.GetUID() {
|
||||
errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", preconditions.UID, objMeta.GetUID())
|
||||
return storage.NewInvalidObjError(key, errMsg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
key = path.Join(h.pathPrefix, key)
|
||||
v, err := conversion.EnforcePtr(out)
|
||||
if err != nil {
|
||||
panic("unable to convert output object to pointer")
|
||||
}
|
||||
|
||||
if preconditions == nil {
|
||||
startTime := time.Now()
|
||||
response, err := h.etcdKeysAPI.Delete(ctx, key, nil)
|
||||
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
// if the object that existed prior to the delete is returned by etcd, update the out object.
|
||||
if err != nil || response.PrevNode != nil {
|
||||
_, _, _, err = h.extractObj(response, err, out, false, true)
|
||||
}
|
||||
}
|
||||
return toStorageErr(err, key, 0)
|
||||
}
|
||||
|
||||
// Check the preconditions match.
|
||||
obj := reflect.New(v.Type()).Interface().(runtime.Object)
|
||||
for {
|
||||
_, node, res, _, err := h.bodyAndExtractObj(ctx, key, obj, false)
|
||||
if err != nil {
|
||||
return toStorageErr(err, key, 0)
|
||||
}
|
||||
if err := checkPreconditions(key, preconditions, obj); err != nil {
|
||||
return toStorageErr(err, key, 0)
|
||||
}
|
||||
index := uint64(0)
|
||||
if node != nil {
|
||||
index = node.ModifiedIndex
|
||||
} else if res != nil {
|
||||
index = res.Index
|
||||
}
|
||||
opt := etcd.DeleteOptions{PrevIndex: index}
|
||||
startTime := time.Now()
|
||||
response, err := h.etcdKeysAPI.Delete(ctx, key, &opt)
|
||||
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
|
||||
if !etcdutil.IsEtcdTestFailed(err) {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
// if the object that existed prior to the delete is returned by etcd, update the out object.
|
||||
if err != nil || response.PrevNode != nil {
|
||||
_, _, _, err = h.extractObj(response, err, out, false, true)
|
||||
}
|
||||
}
|
||||
return toStorageErr(err, key, 0)
|
||||
}
|
||||
|
||||
glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
|
||||
}
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
key = path.Join(h.pathPrefix, key)
|
||||
w := newEtcdWatcher(false, h.quorum, nil, pred, h.codec, h.versioner, nil, h.transformer, h)
|
||||
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
key = path.Join(h.pathPrefix, key)
|
||||
w := newEtcdWatcher(true, h.quorum, exceptKey(key), pred, h.codec, h.versioner, nil, h.transformer, h)
|
||||
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
key = path.Join(h.pathPrefix, key)
|
||||
_, _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
|
||||
return err
|
||||
}
|
||||
|
||||
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
|
||||
// about the response, like the current etcd index and the ttl.
|
||||
func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, stale bool, err error) {
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
startTime := time.Now()
|
||||
|
||||
opts := &etcd.GetOptions{
|
||||
Quorum: h.quorum,
|
||||
}
|
||||
|
||||
response, err := h.etcdKeysAPI.Get(ctx, key, opts)
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
|
||||
if err != nil && !etcdutil.IsEtcdNotFound(err) {
|
||||
return "", nil, nil, false, toStorageErr(err, key, 0)
|
||||
}
|
||||
body, node, stale, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
|
||||
return body, node, response, stale, toStorageErr(err, key, 0)
|
||||
}
|
||||
|
||||
func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, stale bool, err error) {
|
||||
if response != nil {
|
||||
if prevNode {
|
||||
node = response.PrevNode
|
||||
} else {
|
||||
node = response.Node
|
||||
}
|
||||
}
|
||||
if inErr != nil || node == nil || len(node.Value) == 0 {
|
||||
if ignoreNotFound {
|
||||
v, err := conversion.EnforcePtr(objPtr)
|
||||
if err != nil {
|
||||
return "", nil, false, err
|
||||
}
|
||||
v.Set(reflect.Zero(v.Type()))
|
||||
return "", nil, false, nil
|
||||
} else if inErr != nil {
|
||||
return "", nil, false, inErr
|
||||
}
|
||||
return "", nil, false, fmt.Errorf("unable to locate a value on the response: %#v", response)
|
||||
}
|
||||
|
||||
body, stale, err = h.transformer.TransformStringFromStorage(node.Value)
|
||||
if err != nil {
|
||||
return body, nil, stale, storage.NewInternalError(err.Error())
|
||||
}
|
||||
out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
|
||||
if err != nil {
|
||||
return body, nil, stale, err
|
||||
}
|
||||
if out != objPtr {
|
||||
return body, nil, stale, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
|
||||
}
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
_ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
|
||||
return body, node, stale, err
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
trace := utiltrace.New("GetToList " + getTypeName(listObj))
|
||||
listPtr, err := meta.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key = path.Join(h.pathPrefix, key)
|
||||
startTime := time.Now()
|
||||
trace.Step("About to read etcd node")
|
||||
|
||||
opts := &etcd.GetOptions{
|
||||
Quorum: h.quorum,
|
||||
}
|
||||
response, err := h.etcdKeysAPI.Get(ctx, key, opts)
|
||||
trace.Step("Etcd node read")
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
|
||||
if err != nil {
|
||||
if etcdutil.IsEtcdNotFound(err) {
|
||||
if etcdErr, ok := err.(etcd.Error); ok {
|
||||
return h.versioner.UpdateList(listObj, etcdErr.Index, "")
|
||||
}
|
||||
return fmt.Errorf("unexpected error from storage: %#v", err)
|
||||
}
|
||||
return toStorageErr(err, key, 0)
|
||||
}
|
||||
|
||||
nodes := make([]*etcd.Node, 0)
|
||||
nodes = append(nodes, response.Node)
|
||||
|
||||
if err := h.decodeNodeList(nodes, pred, listPtr); err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("Object decoded")
|
||||
if err := h.versioner.UpdateList(listObj, response.Index, ""); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
|
||||
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, pred storage.SelectionPredicate, slicePtr interface{}) error {
|
||||
trace := utiltrace.New("decodeNodeList " + getTypeName(slicePtr))
|
||||
defer trace.LogIfLong(400 * time.Millisecond)
|
||||
v, err := conversion.EnforcePtr(slicePtr)
|
||||
if err != nil || v.Kind() != reflect.Slice {
|
||||
// This should not happen at runtime.
|
||||
panic("need ptr to slice")
|
||||
}
|
||||
for _, node := range nodes {
|
||||
if node.Dir {
|
||||
// IMPORTANT: do not log each key as a discrete step in the trace log
|
||||
// as it produces an immense amount of log spam when there is a large
|
||||
// amount of content in the list.
|
||||
if err := h.decodeNodeList(node.Nodes, pred, slicePtr); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
if obj, found := h.getFromCache(node.ModifiedIndex, pred); found {
|
||||
// obj != nil iff it matches the pred function.
|
||||
if obj != nil {
|
||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||
}
|
||||
} else {
|
||||
body, _, err := h.transformer.TransformStringFromStorage(node.Value)
|
||||
if err != nil {
|
||||
// omit items from lists and watches that cannot be transformed, but log the error
|
||||
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err))
|
||||
continue
|
||||
}
|
||||
|
||||
obj, _, err := h.codec.Decode([]byte(body), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
_ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
|
||||
if matched, err := pred.Matches(obj); err == nil && matched {
|
||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||
}
|
||||
if node.ModifiedIndex != 0 {
|
||||
h.addToCache(node.ModifiedIndex, obj)
|
||||
}
|
||||
}
|
||||
}
|
||||
trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
trace := utiltrace.New("List " + getTypeName(listObj))
|
||||
defer trace.LogIfLong(400 * time.Millisecond)
|
||||
listPtr, err := meta.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key = path.Join(h.pathPrefix, key)
|
||||
startTime := time.Now()
|
||||
trace.Step("About to list etcd node")
|
||||
nodes, index, err := h.listEtcdNode(ctx, key)
|
||||
trace.Step("Etcd node listed")
|
||||
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := h.decodeNodeList(nodes, pred, listPtr); err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("Node list decoded")
|
||||
if err := h.versioner.UpdateList(listObj, index, ""); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node, uint64, error) {
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
opts := etcd.GetOptions{
|
||||
Recursive: true,
|
||||
Sort: true,
|
||||
Quorum: h.quorum,
|
||||
}
|
||||
result, err := h.etcdKeysAPI.Get(ctx, key, &opts)
|
||||
if err != nil {
|
||||
var index uint64
|
||||
if etcdError, ok := err.(etcd.Error); ok {
|
||||
index = etcdError.Index
|
||||
}
|
||||
nodes := make([]*etcd.Node, 0)
|
||||
if etcdutil.IsEtcdNotFound(err) {
|
||||
return nodes, index, nil
|
||||
} else {
|
||||
return nodes, index, toStorageErr(err, key, 0)
|
||||
}
|
||||
}
|
||||
return result.Node.Nodes, result.Index, nil
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (h *etcdHelper) GuaranteedUpdate(
|
||||
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
|
||||
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ ...runtime.Object) error {
|
||||
// Ignore the suggestion about current object.
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
v, err := conversion.EnforcePtr(ptrToType)
|
||||
if err != nil {
|
||||
// Panic is appropriate, because this is a programming error.
|
||||
panic("need ptr to type")
|
||||
}
|
||||
key = path.Join(h.pathPrefix, key)
|
||||
for {
|
||||
obj := reflect.New(v.Type()).Interface().(runtime.Object)
|
||||
origBody, node, res, stale, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
|
||||
if err != nil {
|
||||
return toStorageErr(err, key, 0)
|
||||
}
|
||||
if err := checkPreconditions(key, preconditions, obj); err != nil {
|
||||
return toStorageErr(err, key, 0)
|
||||
}
|
||||
meta := storage.ResponseMeta{}
|
||||
if node != nil {
|
||||
meta.TTL = node.TTL
|
||||
meta.ResourceVersion = node.ModifiedIndex
|
||||
}
|
||||
// Get the object to be written by calling tryUpdate.
|
||||
ret, newTTL, err := tryUpdate(obj, meta)
|
||||
if err != nil {
|
||||
return toStorageErr(err, key, 0)
|
||||
}
|
||||
|
||||
index := uint64(0)
|
||||
ttl := uint64(0)
|
||||
if node != nil {
|
||||
index = node.ModifiedIndex
|
||||
if node.TTL != 0 {
|
||||
ttl = uint64(node.TTL)
|
||||
}
|
||||
if node.Expiration != nil && ttl == 0 {
|
||||
ttl = 1
|
||||
}
|
||||
} else if res != nil {
|
||||
index = res.Index
|
||||
}
|
||||
|
||||
if newTTL != nil {
|
||||
if ttl != 0 && *newTTL == 0 {
|
||||
// TODO: remove this after we have verified this is no longer an issue
|
||||
glog.V(4).Infof("GuaranteedUpdate is clearing TTL for %q, may not be intentional", key)
|
||||
}
|
||||
ttl = *newTTL
|
||||
}
|
||||
|
||||
// Since update object may have a resourceVersion set, we need to clear it here.
|
||||
if err := h.versioner.PrepareObjectForStorage(ret); err != nil {
|
||||
return errors.New("resourceVersion cannot be set on objects store in etcd")
|
||||
}
|
||||
|
||||
newBodyData, err := runtime.Encode(h.codec, ret)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newBody := string(newBodyData)
|
||||
data, err := h.transformer.TransformStringToStorage(newBody)
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
|
||||
// First time this key has been used, try creating new value.
|
||||
if index == 0 {
|
||||
startTime := time.Now()
|
||||
opts := etcd.SetOptions{
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
PrevExist: etcd.PrevNoExist,
|
||||
}
|
||||
response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts)
|
||||
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
|
||||
if etcdutil.IsEtcdNodeExist(err) {
|
||||
continue
|
||||
}
|
||||
_, _, _, err = h.extractObj(response, err, ptrToType, false, false)
|
||||
return toStorageErr(err, key, 0)
|
||||
}
|
||||
|
||||
// If we don't send an update, we simply return the currently existing
|
||||
// version of the object. However, the value transformer may indicate that
|
||||
// the on disk representation has changed and that we must commit an update.
|
||||
if newBody == origBody && !stale {
|
||||
_, _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
|
||||
return err
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
// Swap origBody with data, if origBody is the latest etcd data.
|
||||
opts := etcd.SetOptions{
|
||||
PrevIndex: index,
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
}
|
||||
response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts)
|
||||
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
|
||||
if etcdutil.IsEtcdTestFailed(err) {
|
||||
// Try again.
|
||||
continue
|
||||
}
|
||||
_, _, _, err = h.extractObj(response, err, ptrToType, false, false)
|
||||
return toStorageErr(err, key, int64(index))
|
||||
}
|
||||
}
|
||||
|
||||
func (*etcdHelper) Count(pathPerfix string) (int64, error) {
|
||||
return 0, fmt.Errorf("Count is unimplemented for etcd2!")
|
||||
}
|
||||
|
||||
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
|
||||
// their Node.ModifiedIndex, which is unique across all types.
|
||||
// All implementations must be thread-safe.
|
||||
type etcdCache interface {
|
||||
getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool)
|
||||
addToCache(index uint64, obj runtime.Object)
|
||||
}
|
||||
|
||||
func getTypeName(obj interface{}) string {
|
||||
return reflect.TypeOf(obj).String()
|
||||
}
|
||||
|
||||
func (h *etcdHelper) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
metrics.ObserveGetCache(startTime)
|
||||
}()
|
||||
obj, found := h.cache.Get(index)
|
||||
if found {
|
||||
if matched, err := pred.Matches(obj.(runtime.Object)); err != nil || !matched {
|
||||
return nil, true
|
||||
}
|
||||
// We should not return the object itself to avoid polluting the cache if someone
|
||||
// modifies returned values.
|
||||
objCopy := obj.(runtime.Object).DeepCopyObject()
|
||||
metrics.ObserveCacheHit()
|
||||
return objCopy.(runtime.Object), true
|
||||
}
|
||||
metrics.ObserveCacheMiss()
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (h *etcdHelper) addToCache(index uint64, obj runtime.Object) {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
metrics.ObserveAddCache(startTime)
|
||||
}()
|
||||
objCopy := obj.DeepCopyObject()
|
||||
isOverwrite := h.cache.Add(index, objCopy)
|
||||
if !isOverwrite {
|
||||
metrics.ObserveNewEntry()
|
||||
}
|
||||
}
|
||||
|
||||
func toStorageErr(err error, key string, rv int64) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
switch {
|
||||
case etcdutil.IsEtcdNotFound(err):
|
||||
return storage.NewKeyNotFoundError(key, rv)
|
||||
case etcdutil.IsEtcdNodeExist(err):
|
||||
return storage.NewKeyExistsError(key, rv)
|
||||
case etcdutil.IsEtcdTestFailed(err):
|
||||
return storage.NewResourceVersionConflictsError(key, rv)
|
||||
case etcdutil.IsEtcdUnreachable(err):
|
||||
return storage.NewUnreachableError(key, rv)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
500
vendor/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go
generated
vendored
Normal file
500
vendor/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go
generated
vendored
Normal file
|
|
@ -0,0 +1,500 @@
|
|||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// Etcd watch event actions
|
||||
const (
|
||||
EtcdCreate = "create"
|
||||
EtcdGet = "get"
|
||||
EtcdSet = "set"
|
||||
EtcdCAS = "compareAndSwap"
|
||||
EtcdDelete = "delete"
|
||||
EtcdCAD = "compareAndDelete"
|
||||
EtcdExpire = "expire"
|
||||
)
|
||||
|
||||
// TransformFunc attempts to convert an object to another object for use with a watcher.
|
||||
type TransformFunc func(runtime.Object) (runtime.Object, error)
|
||||
|
||||
// includeFunc returns true if the given key should be considered part of a watch
|
||||
type includeFunc func(key string) bool
|
||||
|
||||
// exceptKey is an includeFunc that returns false when the provided key matches the watched key
|
||||
func exceptKey(except string) includeFunc {
|
||||
return func(key string) bool {
|
||||
return key != except
|
||||
}
|
||||
}
|
||||
|
||||
// etcdWatcher converts a native etcd watch to a watch.Interface.
|
||||
type etcdWatcher struct {
|
||||
// HighWaterMarks for performance debugging.
|
||||
// Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
|
||||
// See: https://golang.org/pkg/sync/atomic/ for more information
|
||||
incomingHWM storage.HighWaterMark
|
||||
outgoingHWM storage.HighWaterMark
|
||||
|
||||
encoding runtime.Codec
|
||||
// Note that versioner is required for etcdWatcher to work correctly.
|
||||
// There is no public constructor of it, so be careful when manipulating
|
||||
// with it manually.
|
||||
versioner storage.Versioner
|
||||
transform TransformFunc
|
||||
valueTransformer ValueTransformer
|
||||
|
||||
list bool // If we're doing a recursive watch, should be true.
|
||||
quorum bool // If we enable quorum, should be true
|
||||
include includeFunc
|
||||
pred storage.SelectionPredicate
|
||||
|
||||
etcdIncoming chan *etcd.Response
|
||||
etcdError chan error
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
etcdCallEnded chan struct{}
|
||||
|
||||
outgoing chan watch.Event
|
||||
userStop chan struct{}
|
||||
stopped bool
|
||||
stopLock sync.Mutex
|
||||
// wg is used to avoid calls to etcd after Stop(), and to make sure
|
||||
// that the translate goroutine is not leaked.
|
||||
wg sync.WaitGroup
|
||||
|
||||
// Injectable for testing. Send the event down the outgoing channel.
|
||||
emit func(watch.Event)
|
||||
|
||||
cache etcdCache
|
||||
}
|
||||
|
||||
// watchWaitDuration is the amount of time to wait for an error from watch.
|
||||
const watchWaitDuration = 100 * time.Millisecond
|
||||
|
||||
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
|
||||
// The versioner must be able to handle the objects that transform creates.
|
||||
func newEtcdWatcher(list bool, quorum bool, include includeFunc, pred storage.SelectionPredicate,
|
||||
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
|
||||
valueTransformer ValueTransformer, cache etcdCache) *etcdWatcher {
|
||||
w := &etcdWatcher{
|
||||
encoding: encoding,
|
||||
versioner: versioner,
|
||||
transform: transform,
|
||||
valueTransformer: valueTransformer,
|
||||
|
||||
list: list,
|
||||
quorum: quorum,
|
||||
include: include,
|
||||
pred: pred,
|
||||
// Buffer this channel, so that the etcd client is not forced
|
||||
// to context switch with every object it gets, and so that a
|
||||
// long time spent decoding an object won't block the *next*
|
||||
// object. Basically, we see a lot of "401 window exceeded"
|
||||
// errors from etcd, and that's due to the client not streaming
|
||||
// results but rather getting them one at a time. So we really
|
||||
// want to never block the etcd client, if possible. The 100 is
|
||||
// mostly arbitrary--we know it goes as high as 50, though.
|
||||
// There's a V(2) log message that prints the length so we can
|
||||
// monitor how much of this buffer is actually used.
|
||||
etcdIncoming: make(chan *etcd.Response, 100),
|
||||
etcdError: make(chan error, 1),
|
||||
// Similarly to etcdIncomming, we don't want to force context
|
||||
// switch on every new incoming object.
|
||||
outgoing: make(chan watch.Event, 100),
|
||||
userStop: make(chan struct{}),
|
||||
stopped: false,
|
||||
wg: sync.WaitGroup{},
|
||||
cache: cache,
|
||||
ctx: nil,
|
||||
cancel: nil,
|
||||
}
|
||||
w.emit = func(e watch.Event) {
|
||||
if curLen := int64(len(w.outgoing)); w.outgoingHWM.Update(curLen) {
|
||||
// Monitor if this gets backed up, and how much.
|
||||
glog.V(1).Infof("watch (%v): %v objects queued in outgoing channel.", reflect.TypeOf(e.Object).String(), curLen)
|
||||
}
|
||||
// Give up on user stop, without this we leak a lot of goroutines in tests.
|
||||
select {
|
||||
case w.outgoing <- e:
|
||||
case <-w.userStop:
|
||||
}
|
||||
}
|
||||
// translate will call done. We need to Add() here because otherwise,
|
||||
// if Stop() gets called before translate gets started, there'd be a
|
||||
// problem.
|
||||
w.wg.Add(1)
|
||||
go w.translate()
|
||||
return w
|
||||
}
|
||||
|
||||
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
|
||||
// as a goroutine.
|
||||
func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer close(w.etcdError)
|
||||
defer close(w.etcdIncoming)
|
||||
|
||||
// All calls to etcd are coming from this function - once it is finished
|
||||
// no other call to etcd should be generated by this watcher.
|
||||
done := func() {}
|
||||
|
||||
// We need to be prepared, that Stop() can be called at any time.
|
||||
// It can potentially also be called, even before this function is called.
|
||||
// If that is the case, we simply skip all the code here.
|
||||
// See #18928 for more details.
|
||||
var watcher etcd.Watcher
|
||||
returned := func() bool {
|
||||
w.stopLock.Lock()
|
||||
defer w.stopLock.Unlock()
|
||||
if w.stopped {
|
||||
// Watcher has already been stopped - don't event initiate it here.
|
||||
return true
|
||||
}
|
||||
w.wg.Add(1)
|
||||
done = w.wg.Done
|
||||
// Perform initialization of watcher under lock - we want to avoid situation when
|
||||
// Stop() is called in the meantime (which in tests can cause etcd termination and
|
||||
// strange behavior here).
|
||||
if resourceVersion == 0 {
|
||||
latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.quorum, w.etcdIncoming)
|
||||
if err != nil {
|
||||
w.etcdError <- err
|
||||
return true
|
||||
}
|
||||
resourceVersion = latest
|
||||
}
|
||||
|
||||
opts := etcd.WatcherOptions{
|
||||
Recursive: w.list,
|
||||
AfterIndex: resourceVersion,
|
||||
}
|
||||
watcher = client.Watcher(key, &opts)
|
||||
w.ctx, w.cancel = context.WithCancel(ctx)
|
||||
return false
|
||||
}()
|
||||
defer done()
|
||||
if returned {
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
resp, err := watcher.Next(w.ctx)
|
||||
if err != nil {
|
||||
w.etcdError <- err
|
||||
return
|
||||
}
|
||||
w.etcdIncoming <- resp
|
||||
}
|
||||
}
|
||||
|
||||
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
|
||||
func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, quorum bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
|
||||
opts := etcd.GetOptions{
|
||||
Recursive: recursive,
|
||||
Sort: false,
|
||||
Quorum: quorum,
|
||||
}
|
||||
resp, err := client.Get(ctx, key, &opts)
|
||||
if err != nil {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
utilruntime.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err))
|
||||
return resourceVersion, toStorageErr(err, key, 0)
|
||||
}
|
||||
if etcdError, ok := err.(etcd.Error); ok {
|
||||
resourceVersion = etcdError.Index
|
||||
}
|
||||
return resourceVersion, nil
|
||||
}
|
||||
resourceVersion = resp.Index
|
||||
convertRecursiveResponse(resp.Node, resp, incoming)
|
||||
return
|
||||
}
|
||||
|
||||
// convertRecursiveResponse turns a recursive get response from etcd into individual response objects
|
||||
// by copying the original response. This emulates the behavior of a recursive watch.
|
||||
func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) {
|
||||
if node.Dir {
|
||||
for i := range node.Nodes {
|
||||
convertRecursiveResponse(node.Nodes[i], response, incoming)
|
||||
}
|
||||
return
|
||||
}
|
||||
copied := *response
|
||||
copied.Action = "get"
|
||||
copied.Node = node
|
||||
incoming <- &copied
|
||||
}
|
||||
|
||||
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
|
||||
// called as a goroutine.
|
||||
func (w *etcdWatcher) translate() {
|
||||
defer w.wg.Done()
|
||||
defer close(w.outgoing)
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
for {
|
||||
select {
|
||||
case err := <-w.etcdError:
|
||||
if err != nil {
|
||||
var status *metav1.Status
|
||||
switch {
|
||||
case etcdutil.IsEtcdWatchExpired(err):
|
||||
status = &metav1.Status{
|
||||
Status: metav1.StatusFailure,
|
||||
Message: err.Error(),
|
||||
Code: http.StatusGone, // Gone
|
||||
Reason: metav1.StatusReasonExpired,
|
||||
}
|
||||
// TODO: need to generate errors using api/errors which has a circular dependency on this package
|
||||
// no other way to inject errors
|
||||
// case etcdutil.IsEtcdUnreachable(err):
|
||||
// status = errors.NewServerTimeout(...)
|
||||
default:
|
||||
status = &metav1.Status{
|
||||
Status: metav1.StatusFailure,
|
||||
Message: err.Error(),
|
||||
Code: http.StatusInternalServerError,
|
||||
Reason: metav1.StatusReasonInternalError,
|
||||
}
|
||||
}
|
||||
w.emit(watch.Event{
|
||||
Type: watch.Error,
|
||||
Object: status,
|
||||
})
|
||||
}
|
||||
return
|
||||
case <-w.userStop:
|
||||
return
|
||||
case res, ok := <-w.etcdIncoming:
|
||||
if ok {
|
||||
if curLen := int64(len(w.etcdIncoming)); w.incomingHWM.Update(curLen) {
|
||||
// Monitor if this gets backed up, and how much.
|
||||
glog.V(1).Infof("watch: %v objects queued in incoming channel.", curLen)
|
||||
}
|
||||
w.sendResult(res)
|
||||
}
|
||||
// If !ok, don't return here-- must wait for etcdError channel
|
||||
// to give an error or be closed.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// decodeObject extracts an object from the provided etcd node or returns an error.
|
||||
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
|
||||
if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.Everything); found {
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
body, _, err := w.valueTransformer.TransformStringFromStorage(node.Value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
obj, err := runtime.Decode(w.encoding, []byte(body))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// ensure resource version is set on the object we load from etcd
|
||||
if err := w.versioner.UpdateObject(obj, node.ModifiedIndex); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
|
||||
}
|
||||
|
||||
// perform any necessary transformation
|
||||
if w.transform != nil {
|
||||
obj, err = w.transform(obj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err))
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if node.ModifiedIndex != 0 {
|
||||
w.cache.addToCache(node.ModifiedIndex, obj)
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (w *etcdWatcher) sendAdd(res *etcd.Response) {
|
||||
if res.Node == nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected nil node: %#v", res))
|
||||
return
|
||||
}
|
||||
if w.include != nil && !w.include(res.Node.Key) {
|
||||
return
|
||||
}
|
||||
obj, err := w.decodeObject(res.Node)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
|
||||
// TODO: expose an error through watch.Interface?
|
||||
// Ignore this value. If we stop the watch on a bad value, a client that uses
|
||||
// the resourceVersion to resume will never be able to get past a bad value.
|
||||
return
|
||||
}
|
||||
if matched, err := w.pred.Matches(obj); err != nil || !matched {
|
||||
return
|
||||
}
|
||||
action := watch.Added
|
||||
w.emit(watch.Event{
|
||||
Type: action,
|
||||
Object: obj,
|
||||
})
|
||||
}
|
||||
|
||||
func (w *etcdWatcher) sendModify(res *etcd.Response) {
|
||||
if res.Node == nil {
|
||||
glog.Errorf("unexpected nil node: %#v", res)
|
||||
return
|
||||
}
|
||||
if w.include != nil && !w.include(res.Node.Key) {
|
||||
return
|
||||
}
|
||||
curObj, err := w.decodeObject(res.Node)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
|
||||
// TODO: expose an error through watch.Interface?
|
||||
// Ignore this value. If we stop the watch on a bad value, a client that uses
|
||||
// the resourceVersion to resume will never be able to get past a bad value.
|
||||
return
|
||||
}
|
||||
curObjPasses := false
|
||||
if matched, err := w.pred.Matches(curObj); err == nil && matched {
|
||||
curObjPasses = true
|
||||
}
|
||||
oldObjPasses := false
|
||||
var oldObj runtime.Object
|
||||
if res.PrevNode != nil && res.PrevNode.Value != "" {
|
||||
// Ignore problems reading the old object.
|
||||
if oldObj, err = w.decodeObject(res.PrevNode); err == nil {
|
||||
if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err))
|
||||
}
|
||||
if matched, err := w.pred.Matches(oldObj); err == nil && matched {
|
||||
oldObjPasses = true
|
||||
}
|
||||
}
|
||||
}
|
||||
// Some changes to an object may cause it to start or stop matching a pred.
|
||||
// We need to report those as adds/deletes. So we have to check both the previous
|
||||
// and current value of the object.
|
||||
switch {
|
||||
case curObjPasses && oldObjPasses:
|
||||
w.emit(watch.Event{
|
||||
Type: watch.Modified,
|
||||
Object: curObj,
|
||||
})
|
||||
case curObjPasses && !oldObjPasses:
|
||||
w.emit(watch.Event{
|
||||
Type: watch.Added,
|
||||
Object: curObj,
|
||||
})
|
||||
case !curObjPasses && oldObjPasses:
|
||||
w.emit(watch.Event{
|
||||
Type: watch.Deleted,
|
||||
Object: oldObj,
|
||||
})
|
||||
}
|
||||
// Do nothing if neither new nor old object passed the pred.
|
||||
}
|
||||
|
||||
func (w *etcdWatcher) sendDelete(res *etcd.Response) {
|
||||
if res.PrevNode == nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected nil prev node: %#v", res))
|
||||
return
|
||||
}
|
||||
if w.include != nil && !w.include(res.PrevNode.Key) {
|
||||
return
|
||||
}
|
||||
node := *res.PrevNode
|
||||
if res.Node != nil {
|
||||
// Note that this sends the *old* object with the etcd index for the time at
|
||||
// which it gets deleted. This will allow users to restart the watch at the right
|
||||
// index.
|
||||
node.ModifiedIndex = res.Node.ModifiedIndex
|
||||
}
|
||||
obj, err := w.decodeObject(&node)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\nfrom %#v %#v", err, res, res.Node))
|
||||
// TODO: expose an error through watch.Interface?
|
||||
// Ignore this value. If we stop the watch on a bad value, a client that uses
|
||||
// the resourceVersion to resume will never be able to get past a bad value.
|
||||
return
|
||||
}
|
||||
if matched, err := w.pred.Matches(obj); err != nil || !matched {
|
||||
return
|
||||
}
|
||||
w.emit(watch.Event{
|
||||
Type: watch.Deleted,
|
||||
Object: obj,
|
||||
})
|
||||
}
|
||||
|
||||
func (w *etcdWatcher) sendResult(res *etcd.Response) {
|
||||
switch res.Action {
|
||||
case EtcdCreate, EtcdGet:
|
||||
// "Get" will only happen in watch 0 case, where we explicitly want ADDED event
|
||||
// for initial state.
|
||||
w.sendAdd(res)
|
||||
case EtcdSet, EtcdCAS:
|
||||
w.sendModify(res)
|
||||
case EtcdDelete, EtcdExpire, EtcdCAD:
|
||||
w.sendDelete(res)
|
||||
default:
|
||||
utilruntime.HandleError(fmt.Errorf("unknown action: %v", res.Action))
|
||||
}
|
||||
}
|
||||
|
||||
// ResultChan implements watch.Interface.
|
||||
func (w *etcdWatcher) ResultChan() <-chan watch.Event {
|
||||
return w.outgoing
|
||||
}
|
||||
|
||||
// Stop implements watch.Interface.
|
||||
func (w *etcdWatcher) Stop() {
|
||||
w.stopLock.Lock()
|
||||
if w.cancel != nil {
|
||||
w.cancel()
|
||||
w.cancel = nil
|
||||
}
|
||||
if !w.stopped {
|
||||
w.stopped = true
|
||||
close(w.userStop)
|
||||
}
|
||||
w.stopLock.Unlock()
|
||||
|
||||
// Wait until all calls to etcd are finished and no other
|
||||
// will be issued.
|
||||
w.wg.Wait()
|
||||
}
|
||||
122
vendor/k8s.io/apiserver/pkg/storage/etcd/metrics/metrics.go
generated
vendored
Normal file
122
vendor/k8s.io/apiserver/pkg/storage/etcd/metrics/metrics.go
generated
vendored
Normal file
|
|
@ -0,0 +1,122 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
var (
|
||||
cacheHitCounterOpts = prometheus.CounterOpts{
|
||||
Name: "etcd_helper_cache_hit_count",
|
||||
Help: "Counter of etcd helper cache hits.",
|
||||
}
|
||||
cacheHitCounter = prometheus.NewCounter(cacheHitCounterOpts)
|
||||
cacheMissCounterOpts = prometheus.CounterOpts{
|
||||
Name: "etcd_helper_cache_miss_count",
|
||||
Help: "Counter of etcd helper cache miss.",
|
||||
}
|
||||
cacheMissCounter = prometheus.NewCounter(cacheMissCounterOpts)
|
||||
cacheEntryCounterOpts = prometheus.CounterOpts{
|
||||
Name: "etcd_helper_cache_entry_count",
|
||||
Help: "Counter of etcd helper cache entries. This can be different from etcd_helper_cache_miss_count " +
|
||||
"because two concurrent threads can miss the cache and generate the same entry twice.",
|
||||
}
|
||||
cacheEntryCounter = prometheus.NewCounter(cacheEntryCounterOpts)
|
||||
cacheGetLatency = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "etcd_request_cache_get_latencies_summary",
|
||||
Help: "Latency in microseconds of getting an object from etcd cache",
|
||||
},
|
||||
)
|
||||
cacheAddLatency = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "etcd_request_cache_add_latencies_summary",
|
||||
Help: "Latency in microseconds of adding an object to etcd cache",
|
||||
},
|
||||
)
|
||||
etcdRequestLatenciesSummary = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "etcd_request_latencies_summary",
|
||||
Help: "Etcd request latency summary in microseconds for each operation and object type.",
|
||||
},
|
||||
[]string{"operation", "type"},
|
||||
)
|
||||
objectCounts = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "etcd_object_counts",
|
||||
Help: "Number of stored objects at the time of last check split by kind.",
|
||||
},
|
||||
[]string{"resource"},
|
||||
)
|
||||
)
|
||||
|
||||
var registerMetrics sync.Once
|
||||
|
||||
// Register all metrics.
|
||||
func Register() {
|
||||
// Register the metrics.
|
||||
registerMetrics.Do(func() {
|
||||
prometheus.MustRegister(cacheHitCounter)
|
||||
prometheus.MustRegister(cacheMissCounter)
|
||||
prometheus.MustRegister(cacheEntryCounter)
|
||||
prometheus.MustRegister(cacheAddLatency)
|
||||
prometheus.MustRegister(cacheGetLatency)
|
||||
prometheus.MustRegister(etcdRequestLatenciesSummary)
|
||||
prometheus.MustRegister(objectCounts)
|
||||
})
|
||||
}
|
||||
|
||||
func UpdateObjectCount(resourcePrefix string, count int64) {
|
||||
objectCounts.WithLabelValues(resourcePrefix).Set(float64(count))
|
||||
}
|
||||
|
||||
func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
|
||||
etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
|
||||
}
|
||||
|
||||
func ObserveGetCache(startTime time.Time) {
|
||||
cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
|
||||
}
|
||||
|
||||
func ObserveAddCache(startTime time.Time) {
|
||||
cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
|
||||
}
|
||||
|
||||
func ObserveCacheHit() {
|
||||
cacheHitCounter.Inc()
|
||||
}
|
||||
|
||||
func ObserveCacheMiss() {
|
||||
cacheMissCounter.Inc()
|
||||
}
|
||||
|
||||
func ObserveNewEntry() {
|
||||
cacheEntryCounter.Inc()
|
||||
}
|
||||
|
||||
func Reset() {
|
||||
cacheHitCounter = prometheus.NewCounter(cacheHitCounterOpts)
|
||||
cacheMissCounter = prometheus.NewCounter(cacheMissCounterOpts)
|
||||
cacheEntryCounter = prometheus.NewCounter(cacheEntryCounterOpts)
|
||||
// TODO: Reset cacheAddLatency.
|
||||
// TODO: Reset cacheGetLatency.
|
||||
etcdRequestLatenciesSummary.Reset()
|
||||
}
|
||||
19
vendor/k8s.io/apiserver/pkg/storage/etcd/util/doc.go
generated
vendored
Normal file
19
vendor/k8s.io/apiserver/pkg/storage/etcd/util/doc.go
generated
vendored
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package util holds generic etcd-related utility functions that any user of ectd might want to
|
||||
// use, without pulling in kubernetes-specific code.
|
||||
package util // import "k8s.io/apiserver/pkg/storage/etcd/util"
|
||||
99
vendor/k8s.io/apiserver/pkg/storage/etcd/util/etcd_util.go
generated
vendored
Normal file
99
vendor/k8s.io/apiserver/pkg/storage/etcd/util/etcd_util.go
generated
vendored
Normal file
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
)
|
||||
|
||||
// IsEtcdNotFound returns true if and only if err is an etcd not found error.
|
||||
func IsEtcdNotFound(err error) bool {
|
||||
return isEtcdErrorNum(err, etcd.ErrorCodeKeyNotFound)
|
||||
}
|
||||
|
||||
// IsEtcdNodeExist returns true if and only if err is an etcd node already exist error.
|
||||
func IsEtcdNodeExist(err error) bool {
|
||||
return isEtcdErrorNum(err, etcd.ErrorCodeNodeExist)
|
||||
}
|
||||
|
||||
// IsEtcdTestFailed returns true if and only if err is an etcd write conflict.
|
||||
func IsEtcdTestFailed(err error) bool {
|
||||
return isEtcdErrorNum(err, etcd.ErrorCodeTestFailed)
|
||||
}
|
||||
|
||||
// IsEtcdWatchExpired returns true if and only if err indicates the watch has expired.
|
||||
func IsEtcdWatchExpired(err error) bool {
|
||||
// NOTE: This seems weird why it wouldn't be etcd.ErrorCodeWatcherCleared
|
||||
// I'm using the previous matching value
|
||||
return isEtcdErrorNum(err, etcd.ErrorCodeEventIndexCleared)
|
||||
}
|
||||
|
||||
// IsEtcdUnreachable returns true if and only if err indicates the server could not be reached.
|
||||
func IsEtcdUnreachable(err error) bool {
|
||||
// NOTE: The logic has changed previous error code no longer applies
|
||||
return err == etcd.ErrClusterUnavailable
|
||||
}
|
||||
|
||||
// isEtcdErrorNum returns true if and only if err is an etcd error, whose errorCode matches errorCode
|
||||
func isEtcdErrorNum(err error, errorCode int) bool {
|
||||
if err != nil {
|
||||
if etcdError, ok := err.(etcd.Error); ok {
|
||||
return etcdError.Code == errorCode
|
||||
}
|
||||
// NOTE: There are other error types returned
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// GetEtcdVersion performs a version check against the provided Etcd server,
|
||||
// returning the string response, and error (if any).
|
||||
func GetEtcdVersion(host string) (string, error) {
|
||||
response, err := http.Get(host + "/version")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer response.Body.Close()
|
||||
if response.StatusCode != http.StatusOK {
|
||||
return "", fmt.Errorf("unsuccessful response from etcd server %q: %v", host, err)
|
||||
}
|
||||
versionBytes, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(versionBytes), nil
|
||||
}
|
||||
|
||||
type etcdHealth struct {
|
||||
// Note this has to be public so the json library can modify it.
|
||||
Health string `json:"health"`
|
||||
}
|
||||
|
||||
func EtcdHealthCheck(data []byte) error {
|
||||
obj := etcdHealth{}
|
||||
if err := json.Unmarshal(data, &obj); err != nil {
|
||||
return err
|
||||
}
|
||||
if obj.Health != "true" {
|
||||
return fmt.Errorf("Unhealthy status: %s", obj.Health)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue