mirror of
https://github.com/kubernetes-sigs/prometheus-adapter.git
synced 2026-04-06 17:57:51 +00:00
vendor: revendor
This commit is contained in:
parent
269295a414
commit
9f0440be0f
669 changed files with 58447 additions and 20021 deletions
169
vendor/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
generated
vendored
169
vendor/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
generated
vendored
|
|
@ -47,9 +47,9 @@ import (
|
|||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
fctypesv1a1 "k8s.io/api/flowcontrol/v1alpha1"
|
||||
fcclientv1a1 "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1"
|
||||
fclistersv1a1 "k8s.io/client-go/listers/flowcontrol/v1alpha1"
|
||||
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
|
||||
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
|
||||
flowcontrollister "k8s.io/client-go/listers/flowcontrol/v1beta1"
|
||||
)
|
||||
|
||||
// This file contains a simple local (to the apiserver) controller
|
||||
|
|
@ -91,13 +91,13 @@ type configController struct {
|
|||
// objects need to be reprocessed.
|
||||
configQueue workqueue.RateLimitingInterface
|
||||
|
||||
plLister fclistersv1a1.PriorityLevelConfigurationLister
|
||||
plLister flowcontrollister.PriorityLevelConfigurationLister
|
||||
plInformerSynced cache.InformerSynced
|
||||
|
||||
fsLister fclistersv1a1.FlowSchemaLister
|
||||
fsLister flowcontrollister.FlowSchemaLister
|
||||
fsInformerSynced cache.InformerSynced
|
||||
|
||||
flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface
|
||||
flowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface
|
||||
|
||||
// serverConcurrencyLimit is the limit on the server's total
|
||||
// number of non-exempt requests being served at once. This comes
|
||||
|
|
@ -127,7 +127,7 @@ type configController struct {
|
|||
type priorityLevelState struct {
|
||||
// the API object or prototype prescribing this level. Nothing
|
||||
// reached through this pointer is mutable.
|
||||
pl *fctypesv1a1.PriorityLevelConfiguration
|
||||
pl *flowcontrol.PriorityLevelConfiguration
|
||||
|
||||
// qsCompleter holds the QueueSetCompleter derived from `config`
|
||||
// and `queues` if config is not exempt, nil otherwise.
|
||||
|
|
@ -153,7 +153,7 @@ type priorityLevelState struct {
|
|||
// NewTestableController is extra flexible to facilitate testing
|
||||
func newTestableController(
|
||||
informerFactory kubeinformers.SharedInformerFactory,
|
||||
flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface,
|
||||
flowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface,
|
||||
serverConcurrencyLimit int,
|
||||
requestWaitLimit time.Duration,
|
||||
obsPairGenerator metrics.TimedObserverPairGenerator,
|
||||
|
|
@ -178,7 +178,7 @@ func newTestableController(
|
|||
// config API objects.
|
||||
func (cfgCtlr *configController) initializeConfigController(informerFactory kubeinformers.SharedInformerFactory) {
|
||||
cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
|
||||
fci := informerFactory.Flowcontrol().V1alpha1()
|
||||
fci := informerFactory.Flowcontrol().V1beta1()
|
||||
pli := fci.PriorityLevelConfigurations()
|
||||
fsi := fci.FlowSchemas()
|
||||
cfgCtlr.plLister = pli.Lister()
|
||||
|
|
@ -187,13 +187,13 @@ func (cfgCtlr *configController) initializeConfigController(informerFactory kube
|
|||
cfgCtlr.fsInformerSynced = fsi.Informer().HasSynced
|
||||
pli.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
pl := obj.(*fctypesv1a1.PriorityLevelConfiguration)
|
||||
pl := obj.(*flowcontrol.PriorityLevelConfiguration)
|
||||
klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of PLC %s", pl.Name)
|
||||
cfgCtlr.configQueue.Add(0)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
newPL := newObj.(*fctypesv1a1.PriorityLevelConfiguration)
|
||||
oldPL := oldObj.(*fctypesv1a1.PriorityLevelConfiguration)
|
||||
newPL := newObj.(*flowcontrol.PriorityLevelConfiguration)
|
||||
oldPL := oldObj.(*flowcontrol.PriorityLevelConfiguration)
|
||||
if !apiequality.Semantic.DeepEqual(oldPL.Spec, newPL.Spec) {
|
||||
klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of PLC %s", newPL.Name)
|
||||
cfgCtlr.configQueue.Add(0)
|
||||
|
|
@ -207,13 +207,13 @@ func (cfgCtlr *configController) initializeConfigController(informerFactory kube
|
|||
}})
|
||||
fsi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
fs := obj.(*fctypesv1a1.FlowSchema)
|
||||
fs := obj.(*flowcontrol.FlowSchema)
|
||||
klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of FS %s", fs.Name)
|
||||
cfgCtlr.configQueue.Add(0)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
newFS := newObj.(*fctypesv1a1.FlowSchema)
|
||||
oldFS := oldObj.(*fctypesv1a1.FlowSchema)
|
||||
newFS := newObj.(*flowcontrol.FlowSchema)
|
||||
oldFS := oldObj.(*flowcontrol.FlowSchema)
|
||||
if !apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) {
|
||||
klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of FS %s", newFS.Name)
|
||||
cfgCtlr.configQueue.Add(0)
|
||||
|
|
@ -332,14 +332,14 @@ type cfgMeal struct {
|
|||
|
||||
// A buffered set of status updates for a FlowSchema
|
||||
type fsStatusUpdate struct {
|
||||
flowSchema *fctypesv1a1.FlowSchema
|
||||
condition fctypesv1a1.FlowSchemaCondition
|
||||
oldValue fctypesv1a1.FlowSchemaCondition
|
||||
flowSchema *flowcontrol.FlowSchema
|
||||
condition flowcontrol.FlowSchemaCondition
|
||||
oldValue flowcontrol.FlowSchemaCondition
|
||||
}
|
||||
|
||||
// digestConfigObjects is given all the API objects that configure
|
||||
// cfgCtlr and writes its consequent new configState.
|
||||
func (cfgCtlr *configController) digestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) error {
|
||||
func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) error {
|
||||
fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs)
|
||||
var errs []error
|
||||
for _, fsu := range fsStatusUpdates {
|
||||
|
|
@ -360,7 +360,7 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*fctypesv1a1.Prior
|
|||
return apierrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) []fsStatusUpdate {
|
||||
func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) []fsStatusUpdate {
|
||||
cfgCtlr.lock.Lock()
|
||||
defer cfgCtlr.lock.Unlock()
|
||||
meal := cfgMeal{
|
||||
|
|
@ -390,7 +390,7 @@ func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a
|
|||
|
||||
// Digest the new set of PriorityLevelConfiguration objects.
|
||||
// Pretend broken ones do not exist.
|
||||
func (meal *cfgMeal) digestNewPLsLocked(newPLs []*fctypesv1a1.PriorityLevelConfiguration) {
|
||||
func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfiguration) {
|
||||
for _, pl := range newPLs {
|
||||
state := meal.cfgCtlr.priorityLevelStates[pl.Name]
|
||||
if state == nil {
|
||||
|
|
@ -411,8 +411,8 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*fctypesv1a1.PriorityLevelConfi
|
|||
if state.pl.Spec.Limited != nil {
|
||||
meal.shareSum += float64(state.pl.Spec.Limited.AssuredConcurrencyShares)
|
||||
}
|
||||
meal.haveExemptPL = meal.haveExemptPL || pl.Name == fctypesv1a1.PriorityLevelConfigurationNameExempt
|
||||
meal.haveCatchAllPL = meal.haveCatchAllPL || pl.Name == fctypesv1a1.PriorityLevelConfigurationNameCatchAll
|
||||
meal.haveExemptPL = meal.haveExemptPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt
|
||||
meal.haveCatchAllPL = meal.haveCatchAllPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameCatchAll
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -423,9 +423,9 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*fctypesv1a1.PriorityLevelConfi
|
|||
// reflect this. This function also adds any missing mandatory
|
||||
// FlowSchema objects. The given objects must all have distinct
|
||||
// names.
|
||||
func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*fctypesv1a1.FlowSchema) {
|
||||
func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*flowcontrol.FlowSchema) {
|
||||
fsSeq := make(apihelpers.FlowSchemaSequence, 0, len(newFSs))
|
||||
fsMap := make(map[string]*fctypesv1a1.FlowSchema, len(newFSs))
|
||||
fsMap := make(map[string]*flowcontrol.FlowSchema, len(newFSs))
|
||||
var haveExemptFS, haveCatchAllFS bool
|
||||
for i, fs := range newFSs {
|
||||
otherFS := fsMap[fs.Name]
|
||||
|
|
@ -448,8 +448,8 @@ func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*fctypesv1a1.FlowSchema) {
|
|||
continue
|
||||
}
|
||||
fsSeq = append(fsSeq, newFSs[i])
|
||||
haveExemptFS = haveExemptFS || fs.Name == fctypesv1a1.FlowSchemaNameExempt
|
||||
haveCatchAllFS = haveCatchAllFS || fs.Name == fctypesv1a1.FlowSchemaNameCatchAll
|
||||
haveExemptFS = haveExemptFS || fs.Name == flowcontrol.FlowSchemaNameExempt
|
||||
haveCatchAllFS = haveCatchAllFS || fs.Name == flowcontrol.FlowSchemaNameCatchAll
|
||||
}
|
||||
// sort into the order to be used for matching
|
||||
sort.Sort(fsSeq)
|
||||
|
|
@ -481,7 +481,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
|
|||
// Still desired and already updated
|
||||
continue
|
||||
}
|
||||
if plName == fctypesv1a1.PriorityLevelConfigurationNameExempt && !meal.haveExemptPL || plName == fctypesv1a1.PriorityLevelConfigurationNameCatchAll && !meal.haveCatchAllPL {
|
||||
if plName == flowcontrol.PriorityLevelConfigurationNameExempt && !meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll && !meal.haveCatchAllPL {
|
||||
// BTW, we know the Spec has not changed because the
|
||||
// mandatory objects have immutable Specs
|
||||
klog.V(3).Infof("Retaining mandatory priority level %q despite lack of API object", plName)
|
||||
|
|
@ -513,8 +513,8 @@ func (meal *cfgMeal) processOldPLsLocked() {
|
|||
// regular way.
|
||||
meal.shareSum += float64(plState.pl.Spec.Limited.AssuredConcurrencyShares)
|
||||
}
|
||||
meal.haveExemptPL = meal.haveExemptPL || plName == fctypesv1a1.PriorityLevelConfigurationNameExempt
|
||||
meal.haveCatchAllPL = meal.haveCatchAllPL || plName == fctypesv1a1.PriorityLevelConfigurationNameCatchAll
|
||||
meal.haveExemptPL = meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameExempt
|
||||
meal.haveCatchAllPL = meal.haveCatchAllPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll
|
||||
meal.newPLStates[plName] = plState
|
||||
}
|
||||
}
|
||||
|
|
@ -548,18 +548,18 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
|
|||
// given priority level configuration. Returns nil if that config
|
||||
// does not call for limiting. Returns nil and an error if the given
|
||||
// object is malformed in a way that is a problem for this package.
|
||||
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration, intPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
|
||||
if (pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
|
||||
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, intPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
|
||||
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
|
||||
return nil, errors.New("broken union structure at the top")
|
||||
}
|
||||
if (pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt) != (pl.Name == fctypesv1a1.PriorityLevelConfigurationNameExempt) {
|
||||
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt) {
|
||||
// This package does not attempt to cope with a priority level dynamically switching between exempt and not.
|
||||
return nil, errors.New("non-alignment between name and type")
|
||||
}
|
||||
if pl.Spec.Limited == nil {
|
||||
return nil, nil
|
||||
}
|
||||
if (pl.Spec.Limited.LimitResponse.Type == fctypesv1a1.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
|
||||
if (pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
|
||||
return nil, errors.New("broken union structure for limit response")
|
||||
}
|
||||
qcAPI := pl.Spec.Limited.LimitResponse.Queuing
|
||||
|
|
@ -585,17 +585,17 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *fcty
|
|||
return qsc, err
|
||||
}
|
||||
|
||||
func (meal *cfgMeal) presyncFlowSchemaStatus(fs *fctypesv1a1.FlowSchema, isDangling bool, plName string) {
|
||||
danglingCondition := apihelpers.GetFlowSchemaConditionByType(fs, fctypesv1a1.FlowSchemaConditionDangling)
|
||||
func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangling bool, plName string) {
|
||||
danglingCondition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling)
|
||||
if danglingCondition == nil {
|
||||
danglingCondition = &fctypesv1a1.FlowSchemaCondition{
|
||||
Type: fctypesv1a1.FlowSchemaConditionDangling,
|
||||
danglingCondition = &flowcontrol.FlowSchemaCondition{
|
||||
Type: flowcontrol.FlowSchemaConditionDangling,
|
||||
}
|
||||
}
|
||||
desiredStatus := fctypesv1a1.ConditionFalse
|
||||
desiredStatus := flowcontrol.ConditionFalse
|
||||
var desiredReason, desiredMessage string
|
||||
if isDangling {
|
||||
desiredStatus = fctypesv1a1.ConditionTrue
|
||||
desiredStatus = flowcontrol.ConditionTrue
|
||||
desiredReason = "NotFound"
|
||||
desiredMessage = fmt.Sprintf("This FlowSchema references the PriorityLevelConfiguration object named %q but there is no such object", plName)
|
||||
} else {
|
||||
|
|
@ -607,8 +607,8 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *fctypesv1a1.FlowSchema, isDangl
|
|||
}
|
||||
meal.fsStatusUpdates = append(meal.fsStatusUpdates, fsStatusUpdate{
|
||||
flowSchema: fs,
|
||||
condition: fctypesv1a1.FlowSchemaCondition{
|
||||
Type: fctypesv1a1.FlowSchemaConditionDangling,
|
||||
condition: flowcontrol.FlowSchemaCondition{
|
||||
Type: flowcontrol.FlowSchemaConditionDangling,
|
||||
Status: desiredStatus,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: desiredReason,
|
||||
|
|
@ -619,7 +619,7 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *fctypesv1a1.FlowSchema, isDangl
|
|||
|
||||
// imaginePL adds a priority level based on one of the mandatory ones
|
||||
// that does not actually exist (right now) as a real API object.
|
||||
func (meal *cfgMeal) imaginePL(proto *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
|
||||
func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
|
||||
klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
|
||||
obsPair := meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{proto.Name})
|
||||
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, obsPair)
|
||||
|
|
@ -651,48 +651,59 @@ func (immediateRequest) Finish(execute func()) bool {
|
|||
// The returned bool indicates whether the request is exempt from
|
||||
// limitation. The startWaitingTime is when the request started
|
||||
// waiting in its queue, or `Time{}` if this did not happen.
|
||||
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
|
||||
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
|
||||
klog.V(7).Infof("startRequest(%#+v)", rd)
|
||||
cfgCtlr.lock.Lock()
|
||||
defer cfgCtlr.lock.Unlock()
|
||||
var selectedFlowSchema *flowcontrol.FlowSchema
|
||||
for _, fs := range cfgCtlr.flowSchemas {
|
||||
if matchesFlowSchema(rd, fs) {
|
||||
plName := fs.Spec.PriorityLevelConfiguration.Name
|
||||
plState := cfgCtlr.priorityLevelStates[plName]
|
||||
if plState.pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt {
|
||||
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, fs.Name, fs.Spec.DistinguisherMethod, plName)
|
||||
return fs, plState.pl, true, immediateRequest{}, time.Time{}
|
||||
}
|
||||
var numQueues int32
|
||||
if plState.pl.Spec.Limited.LimitResponse.Type == fctypesv1a1.LimitResponseTypeQueue {
|
||||
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
|
||||
|
||||
}
|
||||
var flowDistinguisher string
|
||||
var hashValue uint64
|
||||
if numQueues > 1 {
|
||||
flowDistinguisher = computeFlowDistinguisher(rd, fs.Spec.DistinguisherMethod)
|
||||
hashValue = hashFlowID(fs.Name, flowDistinguisher)
|
||||
}
|
||||
startWaitingTime = time.Now()
|
||||
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues)
|
||||
req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, fs.Name, rd.RequestInfo, rd.User, queueNoteFn)
|
||||
if idle {
|
||||
cfgCtlr.maybeReapLocked(plName, plState)
|
||||
}
|
||||
return fs, plState.pl, false, req, startWaitingTime
|
||||
selectedFlowSchema = fs
|
||||
break
|
||||
}
|
||||
}
|
||||
// This can never happen because every configState has a
|
||||
// FlowSchema that matches everything. If somehow control reaches
|
||||
// here, panic with some relevant information.
|
||||
var catchAll *fctypesv1a1.FlowSchema
|
||||
for _, fs := range cfgCtlr.flowSchemas {
|
||||
if fs.Name == fctypesv1a1.FlowSchemaNameCatchAll {
|
||||
catchAll = fs
|
||||
if selectedFlowSchema == nil {
|
||||
// This should never happen. If the requestDigest's User is a part of
|
||||
// system:authenticated or system:unauthenticated, the catch-all flow
|
||||
// schema should match it. However, if that invariant somehow fails,
|
||||
// fallback to the catch-all flow schema anyway.
|
||||
for _, fs := range cfgCtlr.flowSchemas {
|
||||
if fs.Name == flowcontrol.FlowSchemaNameCatchAll {
|
||||
selectedFlowSchema = fs
|
||||
break
|
||||
}
|
||||
}
|
||||
if selectedFlowSchema == nil {
|
||||
// This should absolutely never, ever happen! APF guarantees two
|
||||
// undeletable flow schemas at all times: an exempt flow schema and a
|
||||
// catch-all flow schema.
|
||||
panic(fmt.Sprintf("no fallback catch-all flow schema found for request %#+v and user %#+v", rd.RequestInfo, rd.User))
|
||||
}
|
||||
klog.Warningf("no match found for request %#+v and user %#+v; selecting catchAll=%s as fallback flow schema", rd.RequestInfo, rd.User, fcfmt.Fmt(selectedFlowSchema))
|
||||
}
|
||||
panic(fmt.Sprintf("No match; rd=%#+v, catchAll=%s", rd, fcfmt.Fmt(catchAll)))
|
||||
plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name
|
||||
plState := cfgCtlr.priorityLevelStates[plName]
|
||||
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
|
||||
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
|
||||
return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
|
||||
}
|
||||
var numQueues int32
|
||||
if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
|
||||
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
|
||||
}
|
||||
var flowDistinguisher string
|
||||
var hashValue uint64
|
||||
if numQueues > 1 {
|
||||
flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
|
||||
hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
|
||||
}
|
||||
startWaitingTime = time.Now()
|
||||
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
|
||||
req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
|
||||
if idle {
|
||||
cfgCtlr.maybeReapLocked(plName, plState)
|
||||
}
|
||||
return selectedFlowSchema, plState.pl, false, req, startWaitingTime
|
||||
}
|
||||
|
||||
// Call this after getting a clue that the given priority level is undesired and idle
|
||||
|
|
@ -726,14 +737,14 @@ func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorit
|
|||
}
|
||||
|
||||
// computeFlowDistinguisher extracts the flow distinguisher according to the given method
|
||||
func computeFlowDistinguisher(rd RequestDigest, method *fctypesv1a1.FlowDistinguisherMethod) string {
|
||||
func computeFlowDistinguisher(rd RequestDigest, method *flowcontrol.FlowDistinguisherMethod) string {
|
||||
if method == nil {
|
||||
return ""
|
||||
}
|
||||
switch method.Type {
|
||||
case fctypesv1a1.FlowDistinguisherMethodByUserType:
|
||||
case flowcontrol.FlowDistinguisherMethodByUserType:
|
||||
return rd.User.GetName()
|
||||
case fctypesv1a1.FlowDistinguisherMethodByNamespaceType:
|
||||
case flowcontrol.FlowDistinguisherMethodByNamespaceType:
|
||||
return rd.RequestInfo.Namespace
|
||||
default:
|
||||
// this line shall never reach
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue