vendor dependencies

This commit is contained in:
Sergiusz Urbaniak 2019-04-24 11:06:03 +02:00
parent 604208ef4f
commit 72abf135d6
1156 changed files with 78178 additions and 105799 deletions

View file

@ -31,16 +31,6 @@ import (
// PluginName is the name reported in error metrics.
const PluginName = "buffered"
const (
// Default configuration values for ModeBatch.
defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding.
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
)
// BatchConfig represents batching delegate audit backend configuration.
type BatchConfig struct {
// BufferSize defines a size of the buffering queue.
@ -57,19 +47,9 @@ type BatchConfig struct {
// ThrottleBurst defines the maximum number of requests sent to the delegate backend at the same moment in case
// the capacity defined by ThrottleQPS was not utilized.
ThrottleBurst int
}
// NewDefaultBatchConfig returns new Config objects populated by default values.
func NewDefaultBatchConfig() BatchConfig {
return BatchConfig{
BufferSize: defaultBatchBufferSize,
MaxBatchSize: defaultBatchMaxSize,
MaxBatchWait: defaultBatchMaxWait,
ThrottleEnable: true,
ThrottleQPS: defaultBatchThrottleQPS,
ThrottleBurst: defaultBatchThrottleBurst,
}
// Whether the delegate backend should be called asynchronously.
AsyncDelegate bool
}
type bufferedBackend struct {
@ -85,6 +65,9 @@ type bufferedBackend struct {
// Receiving maxBatchSize events will always trigger sending a batch, regardless of the amount of time passed.
maxBatchWait time.Duration
// Whether the delegate backend should be called asynchronously.
asyncDelegate bool
// Channel to signal that the batching routine has processed all remaining events and exited.
// Once `shutdownCh` is closed no new events will be sent to the delegate backend.
shutdownCh chan struct{}
@ -113,6 +96,7 @@ func NewBackend(delegate audit.Backend, config BatchConfig) audit.Backend {
buffer: make(chan *auditinternal.Event, config.BufferSize),
maxBatchSize: config.MaxBatchSize,
maxBatchWait: config.MaxBatchWait,
asyncDelegate: config.AsyncDelegate,
shutdownCh: make(chan struct{}),
wg: sync.WaitGroup{},
throttle: throttle,
@ -169,8 +153,17 @@ func (b *bufferedBackend) Shutdown() {
// b.stopCh is closed, processIncomingEvents stops and closes the buffer.
func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) {
defer close(b.buffer)
t := time.NewTimer(b.maxBatchWait)
defer t.Stop()
var (
maxWaitChan <-chan time.Time
maxWaitTimer *time.Timer
)
// Only use max wait batching if batching is enabled.
if b.maxBatchSize > 1 {
maxWaitTimer = time.NewTimer(b.maxBatchWait)
maxWaitChan = maxWaitTimer.C
defer maxWaitTimer.Stop()
}
for {
func() {
@ -178,8 +171,10 @@ func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) {
// goroutine can't bring down the main routine.
defer runtime.HandleCrash()
t.Reset(b.maxBatchWait)
b.processEvents(b.collectEvents(t.C, stopCh))
if b.maxBatchSize > 1 {
maxWaitTimer.Reset(b.maxBatchWait)
}
b.processEvents(b.collectEvents(maxWaitChan, stopCh))
}()
select {
@ -235,18 +230,28 @@ func (b *bufferedBackend) processEvents(events []*auditinternal.Event) {
b.throttle.Accept()
}
b.wg.Add(1)
go func() {
defer b.wg.Done()
defer runtime.HandleCrash()
if b.asyncDelegate {
b.wg.Add(1)
go func() {
defer b.wg.Done()
defer runtime.HandleCrash()
// Execute the real processing in a goroutine to keep it from blocking.
// This lets the batching routine continue draining the queue immediately.
b.delegateBackend.ProcessEvents(events...)
}()
// Execute the real processing in a goroutine to keep it from blocking.
// This lets the batching routine continue draining the queue immediately.
b.delegateBackend.ProcessEvents(events...)
}()
} else {
func() {
defer runtime.HandleCrash()
// Execute the real processing in a goroutine to keep it from blocking.
// This lets the batching routine continue draining the queue immediately.
b.delegateBackend.ProcessEvents(events...)
}()
}
}
func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) bool {
// The following mechanism is in place to support the situation when audit
// events are still coming after the backend was stopped.
var sendErr error
@ -274,9 +279,10 @@ func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
case b.buffer <- event:
default:
sendErr = fmt.Errorf("audit buffer queue blocked")
return
return true
}
}
return true
}
func (b *bufferedBackend) String() string {