notify/stream_flow.go

166 lines
3.0 KiB
Go
Raw Permalink Normal View History

package notify
import (
"context"
"sync"
)
// streamFlowController gates outbound stream chunks behind a byte window and
// an in-flight chunk cap. Requests that cannot be admitted yet wait in a FIFO
// queue and are released in order by drainLocked.
type streamFlowController struct {
	// mu is held by applyConfig, acquire and the release callbacks while they
	// read or modify the fields below.
	mu sync.Mutex

	// queue holds requests that have not been admitted yet, oldest first.
	queue []*streamFlowRequest

	// Current usage: total bytes and number of chunks admitted but not yet
	// released.
	inFlightBytes, inFlightChunks int

	// Limits: windowBytes <= 0 disables the byte window and maxChunks <= 0
	// disables the chunk cap.
	windowBytes, maxChunks int
}
// streamFlowRequest is a single pending admission tracked by
// streamFlowController.
type streamFlowRequest struct {
	// size is the number of bytes this request adds to inFlightBytes once
	// it is admitted.
	size int

	// ready is closed by drainLocked at the moment the request is admitted.
	ready chan struct{}

	// admitted is set by drainLocked just before ready is closed; acquire
	// reads it under the controller mutex to resolve a cancellation race.
	admitted bool
}
// newStreamFlowController returns a controller with an empty queue and zero
// in-flight usage. Its byte window and chunk cap are taken from cfg after it
// has been passed through normalizeStreamConfig.
func newStreamFlowController(cfg streamConfig) *streamFlowController {
	cfg = normalizeStreamConfig(cfg)

	ctrl := new(streamFlowController)
	ctrl.windowBytes = cfg.OutboundWindowBytes
	ctrl.maxChunks = cfg.OutboundMaxInFlightChunks
	return ctrl
}
// applyConfig swaps in the limits from cfg (after normalizeStreamConfig) and
// immediately re-runs admission, so queued requests that fit under the new
// limits are released. Calling it on a nil controller does nothing.
func (c *streamFlowController) applyConfig(cfg streamConfig) {
	if c == nil {
		return
	}
	cfg = normalizeStreamConfig(cfg)
	windowBytes, maxChunks := cfg.OutboundWindowBytes, cfg.OutboundMaxInFlightChunks

	c.mu.Lock()
	defer c.mu.Unlock()

	c.windowBytes, c.maxChunks = windowBytes, maxChunks
	c.drainLocked()
}
// acquire waits until size bytes can be admitted under the controller's
// limits, or until ctx is done.
//
// On success it returns a release function that gives the capacity back and
// wakes any queued requests that now fit. The release function is idempotent:
// calls after the first are no-ops.
//
// Special cases:
//   - A nil controller or size <= 0 is admitted immediately with a no-op
//     release function.
//   - A nil ctx is treated as context.Background().
//   - If ctx ends at the same time the request is admitted, the admission
//     wins and the release function is returned with a nil error, so the
//     reserved capacity is never leaked.
//   - If ctx ends before admission, the request is removed from the queue
//     and (nil, ctx.Err()) is returned.
func (c *streamFlowController) acquire(ctx context.Context, size int) (func(), error) {
	if c == nil || size <= 0 {
		return func() {}, nil
	}
	if ctx == nil {
		ctx = context.Background()
	}

	req := &streamFlowRequest{
		size:  size,
		ready: make(chan struct{}),
	}

	c.mu.Lock()
	c.queue = append(c.queue, req)
	c.drainLocked()
	c.mu.Unlock()

	// A single release closure serves both admission paths below, so the
	// accounting logic exists in exactly one place.
	released := false
	release := func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		if released {
			return
		}
		released = true

		// Clamp at zero so the counters can never go negative.
		c.inFlightBytes -= size
		if c.inFlightBytes < 0 {
			c.inFlightBytes = 0
		}
		if c.inFlightChunks > 0 {
			c.inFlightChunks--
		}
		c.drainLocked()
	}

	select {
	case <-req.ready:
		return release, nil
	case <-ctx.Done():
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// drainLocked may have admitted the request just as ctx fired; the
	// capacity is already reserved, so hand it to the caller.
	if req.admitted {
		return release, nil
	}

	// Removing a blocked head-of-line request can unblock the ones behind it.
	c.removeLocked(req)
	c.drainLocked()
	return nil, ctx.Err()
}
// removeLocked deletes the first occurrence of req from the wait queue,
// keeping the order of the remaining entries. It does nothing when the
// controller or req is nil, or when req is not queued. The vacated tail slot
// is set to nil so the backing array does not keep the request alive.
// acquire calls it with c.mu held.
func (c *streamFlowController) removeLocked(req *streamFlowRequest) {
	if c == nil || req == nil {
		return
	}

	idx := -1
	for i := range c.queue {
		if c.queue[i] == req {
			idx = i
			break
		}
	}
	if idx < 0 {
		return
	}

	tail := len(c.queue) - 1
	// The append stays within the existing capacity, so it shifts in place.
	shifted := append(c.queue[:idx], c.queue[idx+1:]...)
	c.queue[tail] = nil
	c.queue = shifted
}
// drainLocked admits queued requests strictly in FIFO order until the head
// of the queue no longer fits. Admission stops when the chunk cap is reached
// (only enforced when maxChunks > 0) or when canAdmitLocked rejects the
// head's size. Each admitted request is popped, marked admitted, added to
// the in-flight counters, and has its ready channel closed. Nil queue
// entries are skipped. applyConfig, acquire and the release callbacks call
// it with c.mu held.
func (c *streamFlowController) drainLocked() {
	if c == nil {
		return
	}
	for len(c.queue) > 0 {
		head := c.queue[0]
		if head == nil {
			c.queue = c.queue[1:]
			continue
		}

		chunkCapReached := c.maxChunks > 0 && c.inFlightChunks >= c.maxChunks
		if chunkCapReached || !c.canAdmitLocked(head.size) {
			// The head blocks everything behind it, preserving FIFO order.
			return
		}

		// Pop the head by shifting left and clearing the vacated tail slot.
		last := len(c.queue) - 1
		copy(c.queue, c.queue[1:])
		c.queue[last] = nil
		c.queue = c.queue[:last]

		head.admitted = true
		c.inFlightChunks++
		c.inFlightBytes += head.size
		close(head.ready)
	}
}
// canAdmitLocked reports whether a request of size bytes may be admitted
// given the current in-flight usage. It reads fields that the rest of the
// controller accesses under c.mu and is called from drainLocked.
//
// It returns true when:
//   - the controller is nil, size <= 0, or the byte window is disabled
//     (windowBytes <= 0);
//   - size fits in the remaining window; or
//   - nothing is in flight, so a request larger than the whole window can
//     still make progress instead of waiting forever.
func (c *streamFlowController) canAdmitLocked(size int) bool {
	if c == nil || size <= 0 || c.windowBytes <= 0 {
		return true
	}
	// Compare against the remaining headroom rather than computing
	// inFlightBytes+size: that sum can overflow int for a very large size,
	// wrap negative, and wrongly admit the request while data is in flight.
	if size <= c.windowBytes-c.inFlightBytes {
		return true
	}
	return c.inFlightBytes == 0 && c.inFlightChunks == 0
}