mirror of
https://github.com/kubernetes-sigs/prometheus-adapter.git
synced 2026-04-07 22:25:03 +00:00
Update deps to Kube 1.11.3
This updates the dependencies to Kube 1.11.3 to pull in a fix allowing requestheader auth to be used without normal client auth (which makes things work on clusters that don't enable client auth normally, like EKS).
This commit is contained in:
parent
262493780f
commit
c916572aca
474 changed files with 40067 additions and 18326 deletions
635
vendor/google.golang.org/grpc/clientconn.go
generated
vendored
635
vendor/google.golang.org/grpc/clientconn.go
generated
vendored
|
|
@ -26,6 +26,7 @@ import (
|
|||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
|
@ -36,16 +37,14 @@ import (
|
|||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/backoff"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/transport"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/resolver"
|
||||
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
|
||||
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -66,8 +65,6 @@ var (
|
|||
errConnDrain = errors.New("grpc: the connection is drained")
|
||||
// errConnClosing indicates that the connection is closing.
|
||||
errConnClosing = errors.New("grpc: the connection is closing")
|
||||
// errConnUnavailable indicates that the connection is unavailable.
|
||||
errConnUnavailable = errors.New("grpc: the connection is unavailable")
|
||||
// errBalancerClosed indicates that the balancer is closed.
|
||||
errBalancerClosed = errors.New("grpc: balancer is closed")
|
||||
// We use an accessor so that minConnectTimeout can be
|
||||
|
|
@ -90,351 +87,16 @@ var (
|
|||
// errCredentialsConflict indicates that grpc.WithTransportCredentials()
|
||||
// and grpc.WithInsecure() are both called for a connection.
|
||||
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
|
||||
// errNetworkIO indicates that the connection is down due to some network I/O error.
|
||||
errNetworkIO = errors.New("grpc: failed with network I/O error")
|
||||
)
|
||||
|
||||
// dialOptions configure a Dial call. dialOptions are set by the DialOption
|
||||
// values passed to Dial.
|
||||
type dialOptions struct {
|
||||
unaryInt UnaryClientInterceptor
|
||||
streamInt StreamClientInterceptor
|
||||
cp Compressor
|
||||
dc Decompressor
|
||||
bs backoff.Strategy
|
||||
block bool
|
||||
insecure bool
|
||||
timeout time.Duration
|
||||
scChan <-chan ServiceConfig
|
||||
copts transport.ConnectOptions
|
||||
callOptions []CallOption
|
||||
// This is used by v1 balancer dial option WithBalancer to support v1
|
||||
// balancer, and also by WithBalancerName dial option.
|
||||
balancerBuilder balancer.Builder
|
||||
// This is to support grpclb.
|
||||
resolverBuilder resolver.Builder
|
||||
waitForHandshake bool
|
||||
channelzParentID int64
|
||||
disableServiceConfig bool
|
||||
}
|
||||
|
||||
const (
|
||||
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
|
||||
defaultClientMaxSendMessageSize = math.MaxInt32
|
||||
// http2IOBufSize specifies the buffer size for sending frames.
|
||||
defaultWriteBufSize = 32 * 1024
|
||||
defaultReadBufSize = 32 * 1024
|
||||
)
|
||||
|
||||
// RegisterChannelz turns on channelz service.
|
||||
// This is an EXPERIMENTAL API.
|
||||
func RegisterChannelz() {
|
||||
channelz.TurnOn()
|
||||
}
|
||||
|
||||
// DialOption configures how we set up the connection.
|
||||
type DialOption func(*dialOptions)
|
||||
|
||||
// WithWaitForHandshake blocks until the initial settings frame is received from the
|
||||
// server before assigning RPCs to the connection.
|
||||
// Experimental API.
|
||||
func WithWaitForHandshake() DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.waitForHandshake = true
|
||||
}
|
||||
}
|
||||
|
||||
// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
|
||||
// before doing a write on the wire.
|
||||
func WithWriteBufferSize(s int) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.WriteBufferSize = s
|
||||
}
|
||||
}
|
||||
|
||||
// WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
|
||||
// for each read syscall.
|
||||
func WithReadBufferSize(s int) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.ReadBufferSize = s
|
||||
}
|
||||
}
|
||||
|
||||
// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
|
||||
// The lower bound for window size is 64K and any value smaller than that will be ignored.
|
||||
func WithInitialWindowSize(s int32) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.InitialWindowSize = s
|
||||
}
|
||||
}
|
||||
|
||||
// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
|
||||
// The lower bound for window size is 64K and any value smaller than that will be ignored.
|
||||
func WithInitialConnWindowSize(s int32) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.InitialConnWindowSize = s
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
|
||||
//
|
||||
// Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
|
||||
func WithMaxMsgSize(s int) DialOption {
|
||||
return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
|
||||
}
|
||||
|
||||
// WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
|
||||
func WithDefaultCallOptions(cos ...CallOption) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.callOptions = append(o.callOptions, cos...)
|
||||
}
|
||||
}
|
||||
|
||||
// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
|
||||
//
|
||||
// Deprecated: use WithDefaultCallOptions(CallCustomCodec(c)) instead.
|
||||
func WithCodec(c Codec) DialOption {
|
||||
return WithDefaultCallOptions(CallCustomCodec(c))
|
||||
}
|
||||
|
||||
// WithCompressor returns a DialOption which sets a Compressor to use for
|
||||
// message compression. It has lower priority than the compressor set by
|
||||
// the UseCompressor CallOption.
|
||||
//
|
||||
// Deprecated: use UseCompressor instead.
|
||||
func WithCompressor(cp Compressor) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.cp = cp
|
||||
}
|
||||
}
|
||||
|
||||
// WithDecompressor returns a DialOption which sets a Decompressor to use for
|
||||
// incoming message decompression. If incoming response messages are encoded
|
||||
// using the decompressor's Type(), it will be used. Otherwise, the message
|
||||
// encoding will be used to look up the compressor registered via
|
||||
// encoding.RegisterCompressor, which will then be used to decompress the
|
||||
// message. If no compressor is registered for the encoding, an Unimplemented
|
||||
// status error will be returned.
|
||||
//
|
||||
// Deprecated: use encoding.RegisterCompressor instead.
|
||||
func WithDecompressor(dc Decompressor) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.dc = dc
|
||||
}
|
||||
}
|
||||
|
||||
// WithBalancer returns a DialOption which sets a load balancer with the v1 API.
|
||||
// Name resolver will be ignored if this DialOption is specified.
|
||||
//
|
||||
// Deprecated: use the new balancer APIs in balancer package and WithBalancerName.
|
||||
func WithBalancer(b Balancer) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.balancerBuilder = &balancerWrapperBuilder{
|
||||
b: b,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithBalancerName sets the balancer that the ClientConn will be initialized
|
||||
// with. Balancer registered with balancerName will be used. This function
|
||||
// panics if no balancer was registered by balancerName.
|
||||
//
|
||||
// The balancer cannot be overridden by balancer option specified by service
|
||||
// config.
|
||||
//
|
||||
// This is an EXPERIMENTAL API.
|
||||
func WithBalancerName(balancerName string) DialOption {
|
||||
builder := balancer.Get(balancerName)
|
||||
if builder == nil {
|
||||
panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
|
||||
}
|
||||
return func(o *dialOptions) {
|
||||
o.balancerBuilder = builder
|
||||
}
|
||||
}
|
||||
|
||||
// withResolverBuilder is only for grpclb.
|
||||
func withResolverBuilder(b resolver.Builder) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.resolverBuilder = b
|
||||
}
|
||||
}
|
||||
|
||||
// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
|
||||
//
|
||||
// Deprecated: service config should be received through name resolver, as specified here.
|
||||
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
|
||||
func WithServiceConfig(c <-chan ServiceConfig) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.scChan = c
|
||||
}
|
||||
}
|
||||
|
||||
// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
|
||||
// when backing off after failed connection attempts.
|
||||
func WithBackoffMaxDelay(md time.Duration) DialOption {
|
||||
return WithBackoffConfig(BackoffConfig{MaxDelay: md})
|
||||
}
|
||||
|
||||
// WithBackoffConfig configures the dialer to use the provided backoff
|
||||
// parameters after connection failures.
|
||||
//
|
||||
// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
|
||||
// for use.
|
||||
func WithBackoffConfig(b BackoffConfig) DialOption {
|
||||
|
||||
return withBackoff(backoff.Exponential{
|
||||
MaxDelay: b.MaxDelay,
|
||||
})
|
||||
}
|
||||
|
||||
// withBackoff sets the backoff strategy used for connectRetryNum after a
|
||||
// failed connection attempt.
|
||||
//
|
||||
// This can be exported if arbitrary backoff strategies are allowed by gRPC.
|
||||
func withBackoff(bs backoff.Strategy) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.bs = bs
|
||||
}
|
||||
}
|
||||
|
||||
// WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
|
||||
// connection is up. Without this, Dial returns immediately and connecting the server
|
||||
// happens in background.
|
||||
func WithBlock() DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.block = true
|
||||
}
|
||||
}
|
||||
|
||||
// WithInsecure returns a DialOption which disables transport security for this ClientConn.
|
||||
// Note that transport security is required unless WithInsecure is set.
|
||||
func WithInsecure() DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.insecure = true
|
||||
}
|
||||
}
|
||||
|
||||
// WithTransportCredentials returns a DialOption which configures a
|
||||
// connection level security credentials (e.g., TLS/SSL).
|
||||
func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.TransportCredentials = creds
|
||||
}
|
||||
}
|
||||
|
||||
// WithPerRPCCredentials returns a DialOption which sets
|
||||
// credentials and places auth state on each outbound RPC.
|
||||
func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
|
||||
}
|
||||
}
|
||||
|
||||
// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
|
||||
// initially. This is valid if and only if WithBlock() is present.
|
||||
//
|
||||
// Deprecated: use DialContext and context.WithTimeout instead.
|
||||
func WithTimeout(d time.Duration) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.timeout = d
|
||||
}
|
||||
}
|
||||
|
||||
func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.Dialer = f
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
internal.WithContextDialer = withContextDialer
|
||||
internal.WithResolverBuilder = withResolverBuilder
|
||||
}
|
||||
|
||||
// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
|
||||
// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
|
||||
// Temporary() method to decide if it should try to reconnect to the network address.
|
||||
func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
|
||||
return withContextDialer(
|
||||
func(ctx context.Context, addr string) (net.Conn, error) {
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
return f(addr, deadline.Sub(time.Now()))
|
||||
}
|
||||
return f(addr, 0)
|
||||
})
|
||||
}
|
||||
|
||||
// WithStatsHandler returns a DialOption that specifies the stats handler
|
||||
// for all the RPCs and underlying network connections in this ClientConn.
|
||||
func WithStatsHandler(h stats.Handler) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.StatsHandler = h
|
||||
}
|
||||
}
|
||||
|
||||
// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
|
||||
// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
|
||||
// address and won't try to reconnect.
|
||||
// The default value of FailOnNonTempDialError is false.
|
||||
// This is an EXPERIMENTAL API.
|
||||
func FailOnNonTempDialError(f bool) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.FailOnNonTempDialError = f
|
||||
}
|
||||
}
|
||||
|
||||
// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
|
||||
func WithUserAgent(s string) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.UserAgent = s
|
||||
}
|
||||
}
|
||||
|
||||
// WithKeepaliveParams returns a DialOption that specifies keepalive parameters for the client transport.
|
||||
func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.KeepaliveParams = kp
|
||||
}
|
||||
}
|
||||
|
||||
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
|
||||
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.unaryInt = f
|
||||
}
|
||||
}
|
||||
|
||||
// WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
|
||||
func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.streamInt = f
|
||||
}
|
||||
}
|
||||
|
||||
// WithAuthority returns a DialOption that specifies the value to be used as
|
||||
// the :authority pseudo-header. This value only works with WithInsecure and
|
||||
// has no effect if TransportCredentials are present.
|
||||
func WithAuthority(a string) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.Authority = a
|
||||
}
|
||||
}
|
||||
|
||||
// WithChannelzParentID returns a DialOption that specifies the channelz ID of current ClientConn's
|
||||
// parent. This function is used in nested channel creation (e.g. grpclb dial).
|
||||
func WithChannelzParentID(id int64) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.channelzParentID = id
|
||||
}
|
||||
}
|
||||
|
||||
// WithDisableServiceConfig returns a DialOption that causes grpc to ignore any
|
||||
// service config provided by the resolver and provides a hint to the resolver
|
||||
// to not fetch service configs.
|
||||
func WithDisableServiceConfig() DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.disableServiceConfig = true
|
||||
}
|
||||
}
|
||||
|
||||
// Dial creates a client connection to the given target.
|
||||
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
||||
return DialContext(context.Background(), target, opts...)
|
||||
|
|
@ -458,23 +120,25 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
|||
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
|
||||
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
|
||||
cc := &ClientConn{
|
||||
target: target,
|
||||
csMgr: &connectivityStateManager{},
|
||||
conns: make(map[*addrConn]struct{}),
|
||||
|
||||
target: target,
|
||||
csMgr: &connectivityStateManager{},
|
||||
conns: make(map[*addrConn]struct{}),
|
||||
dopts: defaultDialOptions(),
|
||||
blockingpicker: newPickerWrapper(),
|
||||
czData: new(channelzData),
|
||||
}
|
||||
cc.retryThrottler.Store((*retryThrottler)(nil))
|
||||
cc.ctx, cc.cancel = context.WithCancel(context.Background())
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&cc.dopts)
|
||||
opt.apply(&cc.dopts)
|
||||
}
|
||||
|
||||
if channelz.IsOn() {
|
||||
if cc.dopts.channelzParentID != 0 {
|
||||
cc.channelzID = channelz.RegisterChannel(cc, cc.dopts.channelzParentID, target)
|
||||
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
|
||||
} else {
|
||||
cc.channelzID = channelz.RegisterChannel(cc, 0, target)
|
||||
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -567,8 +231,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
|||
creds := cc.dopts.copts.TransportCredentials
|
||||
if creds != nil && creds.Info().ServerName != "" {
|
||||
cc.authority = creds.Info().ServerName
|
||||
} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
|
||||
cc.authority = cc.dopts.copts.Authority
|
||||
} else if cc.dopts.insecure && cc.dopts.authority != "" {
|
||||
cc.authority = cc.dopts.authority
|
||||
} else {
|
||||
// Use endpoint from "scheme://authority/endpoint" as the default
|
||||
// authority for ClientConn.
|
||||
|
|
@ -620,6 +284,13 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
|||
s := cc.GetState()
|
||||
if s == connectivity.Ready {
|
||||
break
|
||||
} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
|
||||
if err = cc.blockingpicker.connectionError(); err != nil {
|
||||
terr, ok := err.(interface{ Temporary() bool })
|
||||
if ok && !terr.Temporary() {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
if !cc.WaitForStateChange(ctx, s) {
|
||||
// ctx got timeout or canceled.
|
||||
|
|
@ -699,13 +370,10 @@ type ClientConn struct {
|
|||
preBalancerName string // previous balancer name.
|
||||
curAddresses []resolver.Address
|
||||
balancerWrapper *ccBalancerWrapper
|
||||
retryThrottler atomic.Value
|
||||
|
||||
channelzID int64 // channelz unique identification number
|
||||
czmu sync.RWMutex
|
||||
callsStarted int64
|
||||
callsSucceeded int64
|
||||
callsFailed int64
|
||||
lastCallStartedTime time.Time
|
||||
channelzID int64 // channelz unique identification number
|
||||
czData *channelzData
|
||||
}
|
||||
|
||||
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
|
||||
|
|
@ -830,8 +498,6 @@ func (cc *ClientConn) switchBalancer(name string) {
|
|||
if cc.balancerWrapper != nil {
|
||||
cc.balancerWrapper.close()
|
||||
}
|
||||
// Clear all stickiness state.
|
||||
cc.blockingpicker.clearStickinessState()
|
||||
|
||||
builder := balancer.Get(name)
|
||||
if builder == nil {
|
||||
|
|
@ -860,9 +526,11 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
|
|||
// Caller needs to make sure len(addrs) > 0.
|
||||
func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
|
||||
ac := &addrConn{
|
||||
cc: cc,
|
||||
addrs: addrs,
|
||||
dopts: cc.dopts,
|
||||
cc: cc,
|
||||
addrs: addrs,
|
||||
dopts: cc.dopts,
|
||||
czData: new(channelzData),
|
||||
resetBackoff: make(chan struct{}),
|
||||
}
|
||||
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
|
||||
// Track ac in cc. This needs to be done before any getTransport(...) is called.
|
||||
|
|
@ -892,40 +560,34 @@ func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
|
|||
ac.tearDown(err)
|
||||
}
|
||||
|
||||
// ChannelzMetric returns ChannelInternalMetric of current ClientConn.
|
||||
// This is an EXPERIMENTAL API.
|
||||
func (cc *ClientConn) ChannelzMetric() *channelz.ChannelInternalMetric {
|
||||
state := cc.GetState()
|
||||
cc.czmu.RLock()
|
||||
defer cc.czmu.RUnlock()
|
||||
func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
|
||||
return &channelz.ChannelInternalMetric{
|
||||
State: state,
|
||||
State: cc.GetState(),
|
||||
Target: cc.target,
|
||||
CallsStarted: cc.callsStarted,
|
||||
CallsSucceeded: cc.callsSucceeded,
|
||||
CallsFailed: cc.callsFailed,
|
||||
LastCallStartedTimestamp: cc.lastCallStartedTime,
|
||||
CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
|
||||
CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
|
||||
CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
|
||||
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
|
||||
}
|
||||
}
|
||||
|
||||
// Target returns the target string of the ClientConn.
|
||||
// This is an EXPERIMENTAL API.
|
||||
func (cc *ClientConn) Target() string {
|
||||
return cc.target
|
||||
}
|
||||
|
||||
func (cc *ClientConn) incrCallsStarted() {
|
||||
cc.czmu.Lock()
|
||||
cc.callsStarted++
|
||||
// TODO(yuxuanli): will make this a time.Time pointer improve performance?
|
||||
cc.lastCallStartedTime = time.Now()
|
||||
cc.czmu.Unlock()
|
||||
atomic.AddInt64(&cc.czData.callsStarted, 1)
|
||||
atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
func (cc *ClientConn) incrCallsSucceeded() {
|
||||
cc.czmu.Lock()
|
||||
cc.callsSucceeded++
|
||||
cc.czmu.Unlock()
|
||||
atomic.AddInt64(&cc.czData.callsSucceeded, 1)
|
||||
}
|
||||
|
||||
func (cc *ClientConn) incrCallsFailed() {
|
||||
cc.czmu.Lock()
|
||||
cc.callsFailed++
|
||||
cc.czmu.Unlock()
|
||||
atomic.AddInt64(&cc.czData.callsFailed, 1)
|
||||
}
|
||||
|
||||
// connect starts to creating transport and also starts the transport monitor
|
||||
|
|
@ -1012,8 +674,10 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
|
|||
return m
|
||||
}
|
||||
|
||||
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transport.ClientTransport, func(balancer.DoneInfo), error) {
|
||||
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{})
|
||||
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
|
||||
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
|
||||
FullMethodName: method,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, toRPCErr(err)
|
||||
}
|
||||
|
|
@ -1033,6 +697,19 @@ func (cc *ClientConn) handleServiceConfig(js string) error {
|
|||
cc.mu.Lock()
|
||||
cc.scRaw = js
|
||||
cc.sc = sc
|
||||
|
||||
if sc.retryThrottling != nil {
|
||||
newThrottler := &retryThrottler{
|
||||
tokens: sc.retryThrottling.MaxTokens,
|
||||
max: sc.retryThrottling.MaxTokens,
|
||||
thresh: sc.retryThrottling.MaxTokens / 2,
|
||||
ratio: sc.retryThrottling.TokenRatio,
|
||||
}
|
||||
cc.retryThrottler.Store(newThrottler)
|
||||
} else {
|
||||
cc.retryThrottler.Store((*retryThrottler)(nil))
|
||||
}
|
||||
|
||||
if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
|
||||
if cc.curBalancerName == grpclbName {
|
||||
// If current balancer is grpclb, there's at least one grpclb
|
||||
|
|
@ -1047,17 +724,6 @@ func (cc *ClientConn) handleServiceConfig(js string) error {
|
|||
}
|
||||
}
|
||||
|
||||
if envConfigStickinessOn {
|
||||
var newStickinessMDKey string
|
||||
if sc.stickinessMetadataKey != nil && *sc.stickinessMetadataKey != "" {
|
||||
newStickinessMDKey = *sc.stickinessMetadataKey
|
||||
}
|
||||
// newStickinessMDKey is "" if one of the following happens:
|
||||
// - stickinessMetadataKey is set to ""
|
||||
// - stickinessMetadataKey field doesn't exist in service config
|
||||
cc.blockingpicker.updateStickinessMDKey(strings.ToLower(newStickinessMDKey))
|
||||
}
|
||||
|
||||
cc.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
|
@ -1072,6 +738,24 @@ func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
|
|||
go r.resolveNow(o)
|
||||
}
|
||||
|
||||
// ResetConnectBackoff wakes up all subchannels in transient failure and causes
|
||||
// them to attempt another connection immediately. It also resets the backoff
|
||||
// times used for subsequent attempts regardless of the current state.
|
||||
//
|
||||
// In general, this function should not be used. Typical service or network
|
||||
// outages result in a reasonable client reconnection strategy by default.
|
||||
// However, if a previously unavailable network becomes available, this may be
|
||||
// used to trigger an immediate reconnect.
|
||||
//
|
||||
// This API is EXPERIMENTAL.
|
||||
func (cc *ClientConn) ResetConnectBackoff() {
|
||||
cc.mu.Lock()
|
||||
defer cc.mu.Unlock()
|
||||
for ac := range cc.conns {
|
||||
ac.resetConnectBackoff()
|
||||
}
|
||||
}
|
||||
|
||||
// Close tears down the ClientConn and all underlying connections.
|
||||
func (cc *ClientConn) Close() error {
|
||||
defer cc.cancel()
|
||||
|
|
@ -1140,12 +824,10 @@ type addrConn struct {
|
|||
// negotiations must complete.
|
||||
connectDeadline time.Time
|
||||
|
||||
channelzID int64 // channelz unique identification number
|
||||
czmu sync.RWMutex
|
||||
callsStarted int64
|
||||
callsSucceeded int64
|
||||
callsFailed int64
|
||||
lastCallStartedTime time.Time
|
||||
resetBackoff chan struct{}
|
||||
|
||||
channelzID int64 // channelz unique identification number
|
||||
czData *channelzData
|
||||
}
|
||||
|
||||
// adjustParams updates parameters used to create transports upon
|
||||
|
|
@ -1170,14 +852,6 @@ func (ac *addrConn) printf(format string, a ...interface{}) {
|
|||
}
|
||||
}
|
||||
|
||||
// errorf records an error in ac's event log, unless ac has been closed.
|
||||
// REQUIRES ac.mu is held.
|
||||
func (ac *addrConn) errorf(format string, a ...interface{}) {
|
||||
if ac.events != nil {
|
||||
ac.events.Errorf(format, a...)
|
||||
}
|
||||
}
|
||||
|
||||
// resetTransport recreates a transport to the address for ac. The old
|
||||
// transport will close itself on error or when the clientconn is closed.
|
||||
// The created transport must receive initial settings frame from the server.
|
||||
|
|
@ -1208,6 +882,7 @@ func (ac *addrConn) resetTransport() error {
|
|||
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
|
||||
ac.cc.mu.RUnlock()
|
||||
var backoffDeadline, connectDeadline time.Time
|
||||
var resetBackoff chan struct{}
|
||||
for connectRetryNum := 0; ; connectRetryNum++ {
|
||||
ac.mu.Lock()
|
||||
if ac.backoffDeadline.IsZero() {
|
||||
|
|
@ -1215,6 +890,7 @@ func (ac *addrConn) resetTransport() error {
|
|||
// or this is the first time this addrConn is trying to establish a
|
||||
// connection.
|
||||
backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration.
|
||||
resetBackoff = ac.resetBackoff
|
||||
// This will be the duration that dial gets to finish.
|
||||
dialDuration := getMinConnectTimeout()
|
||||
if backoffFor > dialDuration {
|
||||
|
|
@ -1248,7 +924,7 @@ func (ac *addrConn) resetTransport() error {
|
|||
copy(addrsIter, ac.addrs)
|
||||
copts := ac.dopts.copts
|
||||
ac.mu.Unlock()
|
||||
connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
|
||||
connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts, resetBackoff)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -1260,7 +936,7 @@ func (ac *addrConn) resetTransport() error {
|
|||
|
||||
// createTransport creates a connection to one of the backends in addrs.
|
||||
// It returns true if a connection was established.
|
||||
func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) {
|
||||
func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions, resetBackoff chan struct{}) (bool, error) {
|
||||
for i := ridx; i < len(addrs); i++ {
|
||||
addr := addrs[i]
|
||||
target := transport.TargetInfo{
|
||||
|
|
@ -1311,7 +987,7 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline,
|
|||
// Didn't receive server preface, must kill this new transport now.
|
||||
grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
|
||||
newTr.Close()
|
||||
break
|
||||
continue
|
||||
case <-ac.ctx.Done():
|
||||
}
|
||||
}
|
||||
|
|
@ -1360,6 +1036,8 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline,
|
|||
timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-resetBackoff:
|
||||
timer.Stop()
|
||||
case <-ac.ctx.Done():
|
||||
timer.Stop()
|
||||
return false, ac.ctx.Err()
|
||||
|
|
@ -1367,6 +1045,14 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline,
|
|||
return false, nil
|
||||
}
|
||||
|
||||
func (ac *addrConn) resetConnectBackoff() {
|
||||
ac.mu.Lock()
|
||||
close(ac.resetBackoff)
|
||||
ac.resetBackoff = make(chan struct{})
|
||||
ac.connectRetryNum = 0
|
||||
ac.mu.Unlock()
|
||||
}
|
||||
|
||||
// Run in a goroutine to track the error in transport and create the
|
||||
// new transport if an error happens. It returns when the channel is closing.
|
||||
func (ac *addrConn) transportMonitor() {
|
||||
|
|
@ -1447,46 +1133,6 @@ func (ac *addrConn) transportMonitor() {
|
|||
}
|
||||
}
|
||||
|
||||
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
|
||||
// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
|
||||
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
|
||||
for {
|
||||
ac.mu.Lock()
|
||||
switch {
|
||||
case ac.state == connectivity.Shutdown:
|
||||
if failfast || !hasBalancer {
|
||||
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
|
||||
err := ac.tearDownErr
|
||||
ac.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
return nil, errConnClosing
|
||||
case ac.state == connectivity.Ready:
|
||||
ct := ac.transport
|
||||
ac.mu.Unlock()
|
||||
return ct, nil
|
||||
case ac.state == connectivity.TransientFailure:
|
||||
if failfast || hasBalancer {
|
||||
ac.mu.Unlock()
|
||||
return nil, errConnUnavailable
|
||||
}
|
||||
}
|
||||
ready := ac.ready
|
||||
if ready == nil {
|
||||
ready = make(chan struct{})
|
||||
ac.ready = ready
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, toRPCErr(ctx.Err())
|
||||
// Wait until the new transport is ready or failed.
|
||||
case <-ready:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getReadyTransport returns the transport if ac's state is READY.
|
||||
// Otherwise it returns nil, false.
|
||||
// If ac's state is IDLE, it will trigger ac to connect.
|
||||
|
|
@ -1551,47 +1197,76 @@ func (ac *addrConn) getState() connectivity.State {
|
|||
return ac.state
|
||||
}
|
||||
|
||||
func (ac *addrConn) getCurAddr() (ret resolver.Address) {
|
||||
ac.mu.Lock()
|
||||
ret = ac.curAddr
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
|
||||
ac.mu.Lock()
|
||||
addr := ac.curAddr.Addr
|
||||
ac.mu.Unlock()
|
||||
state := ac.getState()
|
||||
ac.czmu.RLock()
|
||||
defer ac.czmu.RUnlock()
|
||||
return &channelz.ChannelInternalMetric{
|
||||
State: state,
|
||||
State: ac.getState(),
|
||||
Target: addr,
|
||||
CallsStarted: ac.callsStarted,
|
||||
CallsSucceeded: ac.callsSucceeded,
|
||||
CallsFailed: ac.callsFailed,
|
||||
LastCallStartedTimestamp: ac.lastCallStartedTime,
|
||||
CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
|
||||
CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
|
||||
CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
|
||||
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
|
||||
}
|
||||
}
|
||||
|
||||
func (ac *addrConn) incrCallsStarted() {
|
||||
ac.czmu.Lock()
|
||||
ac.callsStarted++
|
||||
ac.lastCallStartedTime = time.Now()
|
||||
ac.czmu.Unlock()
|
||||
atomic.AddInt64(&ac.czData.callsStarted, 1)
|
||||
atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
func (ac *addrConn) incrCallsSucceeded() {
|
||||
ac.czmu.Lock()
|
||||
ac.callsSucceeded++
|
||||
ac.czmu.Unlock()
|
||||
atomic.AddInt64(&ac.czData.callsSucceeded, 1)
|
||||
}
|
||||
|
||||
func (ac *addrConn) incrCallsFailed() {
|
||||
ac.czmu.Lock()
|
||||
ac.callsFailed++
|
||||
ac.czmu.Unlock()
|
||||
atomic.AddInt64(&ac.czData.callsFailed, 1)
|
||||
}
|
||||
|
||||
type retryThrottler struct {
|
||||
max float64
|
||||
thresh float64
|
||||
ratio float64
|
||||
|
||||
mu sync.Mutex
|
||||
tokens float64 // TODO(dfawley): replace with atomic and remove lock.
|
||||
}
|
||||
|
||||
// throttle subtracts a retry token from the pool and returns whether a retry
|
||||
// should be throttled (disallowed) based upon the retry throttling policy in
|
||||
// the service config.
|
||||
func (rt *retryThrottler) throttle() bool {
|
||||
if rt == nil {
|
||||
return false
|
||||
}
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.tokens--
|
||||
if rt.tokens < 0 {
|
||||
rt.tokens = 0
|
||||
}
|
||||
return rt.tokens <= rt.thresh
|
||||
}
|
||||
|
||||
func (rt *retryThrottler) successfulRPC() {
|
||||
if rt == nil {
|
||||
return
|
||||
}
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.tokens += rt.ratio
|
||||
if rt.tokens > rt.max {
|
||||
rt.tokens = rt.max
|
||||
}
|
||||
}
|
||||
|
||||
type channelzChannel struct {
|
||||
cc *ClientConn
|
||||
}
|
||||
|
||||
func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
|
||||
return c.cc.channelzMetric()
|
||||
}
|
||||
|
||||
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue