- make stream fast path honor adaptive soft payload limits end-to-end - split oversized fast-stream payloads into sequential frames before batching - use adaptive soft cap when encoding stream batch payloads - move timeout-like error detection into production code for adaptive tx - tune notify FrameReader read size explicitly to avoid throughput regression - drop local stario replace and depend on released b612.me/stario v0.1.1
384 lines
10 KiB
Go
384 lines
10 KiB
Go
package notify
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Tunables for the adaptive transmit (soft payload) controller. "bulk"
// constants govern the bulk fast-batch path, "stream" constants the stream
// batch path. The soft payload cap steps between the min and start sizes
// based on observed flush latency and goodput.
const (
	// Lower clamp for the bulk soft payload cap.
	bulkAdaptiveSoftPayloadMinBytes = 256 * 1024
	// Cap reported when no transport binding is available (nil receiver).
	bulkAdaptiveSoftPayloadFallbackBytes = 2 * 1024 * 1024
	// Initial (and largest stepped) bulk soft payload cap.
	bulkAdaptiveSoftPayloadStartBytes = bulkFastBatchMaxPlainBytes
	// Desired flush latency; the goodput target is sized so one payload
	// flushes in roughly this long.
	bulkAdaptiveSoftPayloadTargetFlush = 4 * time.Millisecond
	// Flushes taking at least this long force the cap to step down.
	bulkAdaptiveSoftPayloadSlowFlush = 16 * time.Millisecond
	// Writes smaller than this are ignored for goodput sampling.
	bulkAdaptiveSoftPayloadMinSampleBytes = 256 * 1024
	// Consecutive fast samples required before the cap grows one step.
	bulkAdaptiveSoftPayloadGrowSuccesses = 8

	// Stream-path equivalents of the bulk constants above.
	streamAdaptiveSoftPayloadMinBytes = 256 * 1024
	streamAdaptiveSoftPayloadFallbackBytes = streamBatchMaxPayloadBytes
	streamAdaptiveSoftPayloadStartBytes = streamBatchMaxPayloadBytes
	streamAdaptiveSoftPayloadTargetFlush = 4 * time.Millisecond
	streamAdaptiveSoftPayloadSlowFlush = 16 * time.Millisecond
	streamAdaptiveSoftPayloadMinSampleBytes = 64 * 1024
	streamAdaptiveSoftPayloadGrowSuccesses = 8

	// Lower clamp for the derived stream batch wait threshold.
	streamAdaptiveWaitThresholdMinBytes = 32 * 1024
	// Flush delay used for mid-sized (>= 1 MiB) stream soft payload caps.
	streamAdaptiveFlushDelayMid = 25 * time.Microsecond
)
|
|
|
|
// bulkAdaptiveSoftPayloadSteps is the ascending ladder of bulk soft payload
// cap sizes; the adaptive controller only ever moves the cap up or down to
// one of these values. The final entry equals the start/maximum cap.
var bulkAdaptiveSoftPayloadSteps = [...]int{
	256 * 1024,
	512 * 1024,
	1024 * 1024,
	2 * 1024 * 1024,
	4 * 1024 * 1024,
	bulkFastBatchMaxPlainBytes,
}
|
|
|
|
// streamAdaptiveSoftPayloadSteps is the ascending ladder of stream soft
// payload cap sizes; the adaptive controller only ever moves the cap up or
// down to one of these values. The final entry equals the start/maximum cap.
var streamAdaptiveSoftPayloadSteps = [...]int{
	256 * 1024,
	512 * 1024,
	1024 * 1024,
	streamBatchMaxPayloadBytes,
}
|
|
|
|
// adaptiveTxState tracks the adaptive soft payload caps for the bulk and
// stream transmit paths. The zero value is ready to use: the soft payload
// fields are lazily initialised to their start sizes on first locked read.
// It must not be copied after first use (contains a sync.Mutex).
type adaptiveTxState struct {
	mu sync.Mutex // guards all fields below

	bulkSoftPayloadBytes int     // current bulk soft cap; 0 = not yet initialised
	bulkGoodputBytesPerS float64 // EWMA of observed bulk write goodput (bytes/s)
	bulkGrowStreak       int     // consecutive samples qualifying for a step up

	streamSoftPayloadBytes int     // current stream soft cap; 0 = not yet initialised
	streamGoodputBytesPerS float64 // EWMA of observed stream write goodput (bytes/s)
	streamGrowStreak       int     // consecutive samples qualifying for a step up
}
|
|
|
|
func (b *transportBinding) bulkAdaptiveSoftPayloadBytesSnapshot() int {
|
|
if b == nil {
|
|
return bulkAdaptiveSoftPayloadFallbackBytes
|
|
}
|
|
return b.adaptiveTx.bulkSoftPayloadBytesSnapshot()
|
|
}
|
|
|
|
func (b *transportBinding) observeBulkAdaptivePayloadWrite(payloadBytes int, elapsed time.Duration, timeout time.Duration, err error) {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.adaptiveTx.observeBulkPayloadWrite(payloadBytes, elapsed, timeout, err)
|
|
}
|
|
|
|
func (b *transportBinding) streamAdaptiveSoftPayloadBytesSnapshot() int {
|
|
if b == nil {
|
|
return streamAdaptiveSoftPayloadFallbackBytes
|
|
}
|
|
return b.adaptiveTx.streamSoftPayloadBytesSnapshot()
|
|
}
|
|
|
|
func (b *transportBinding) streamAdaptiveWaitThresholdBytesSnapshot() int {
|
|
if b == nil {
|
|
return streamBatchWaitThreshold
|
|
}
|
|
return b.adaptiveTx.streamWaitThresholdBytesSnapshot()
|
|
}
|
|
|
|
func (b *transportBinding) streamAdaptiveFlushDelaySnapshot() time.Duration {
|
|
if b == nil {
|
|
return streamBatchMaxFlushDelay
|
|
}
|
|
return b.adaptiveTx.streamFlushDelaySnapshot()
|
|
}
|
|
|
|
func (b *transportBinding) observeStreamAdaptivePayloadWrite(payloadBytes int, elapsed time.Duration, timeout time.Duration, err error) {
|
|
if b == nil {
|
|
return
|
|
}
|
|
b.adaptiveTx.observeStreamPayloadWrite(payloadBytes, elapsed, timeout, err)
|
|
}
|
|
|
|
func (s *adaptiveTxState) bulkSoftPayloadBytesSnapshot() int {
|
|
if s == nil {
|
|
return bulkAdaptiveSoftPayloadStartBytes
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.bulkSoftPayloadBytesLocked()
|
|
}
|
|
|
|
// observeBulkPayloadWrite feeds one completed bulk payload write into the
// adaptive controller. payloadBytes is the number of payload bytes written,
// elapsed the wall time the write took, timeout the write deadline in effect
// (0 = none), and err the write result.
//
// The soft cap shrinks immediately on timeout-like errors, near-timeouts, or
// slow flushes, and grows only after bulkAdaptiveSoftPayloadGrowSuccesses
// consecutive fast samples whose goodput target exceeds the current cap.
func (s *adaptiveTxState) observeBulkPayloadWrite(payloadBytes int, elapsed time.Duration, timeout time.Duration, err error) {
	if s == nil || payloadBytes <= 0 {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	current := s.bulkSoftPayloadBytesLocked()
	// target is the payload size the measured goodput could flush within
	// bulkAdaptiveSoftPayloadTargetFlush; hasSample is false when the write
	// was too small (or the timing unusable) to sample.
	target, hasSample := s.observeBulkGoodputLocked(payloadBytes, elapsed)
	// Consuming >= 3/4 of the deadline counts as an effective timeout.
	nearTimeout := timeout > 0 && elapsed >= (timeout*3)/4
	if isTimeoutLikeError(err) || nearTimeout {
		s.bulkGrowStreak = 0
		// Prefer shrinking straight to the goodput target when it is lower.
		if hasSample && target < current {
			s.bulkSoftPayloadBytes = target
			return
		}
		// Otherwise back off exactly one ladder step.
		s.bulkSoftPayloadBytes = previousBulkAdaptiveSoftPayloadStep(current)
		return
	}
	if err != nil {
		// Non-timeout error: stop growing but keep the current cap.
		s.bulkGrowStreak = 0
		return
	}
	if !hasSample {
		// Write too small to judge; leave cap and streak untouched.
		return
	}
	if elapsed >= bulkAdaptiveSoftPayloadSlowFlush {
		// Slow flush: shrink toward the goodput target, or one step down.
		s.bulkGrowStreak = 0
		if target < current {
			s.bulkSoftPayloadBytes = target
			return
		}
		s.bulkSoftPayloadBytes = previousBulkAdaptiveSoftPayloadStep(current)
		return
	}
	if target > current {
		// Fast sample with headroom: grow only after a full streak of them.
		s.bulkGrowStreak++
		if s.bulkGrowStreak >= bulkAdaptiveSoftPayloadGrowSuccesses {
			s.bulkSoftPayloadBytes = nextBulkAdaptiveSoftPayloadStep(current, target)
			s.bulkGrowStreak = 0
		}
		return
	}
	if target < current && elapsed >= bulkAdaptiveSoftPayloadTargetFlush*2 {
		// Moderately slow flush with a lower target: shrink to the target.
		s.bulkSoftPayloadBytes = target
		s.bulkGrowStreak = 0
		return
	}
	// Steady state (target == current, or fast enough): reset the streak.
	s.bulkGrowStreak = 0
}
|
|
|
|
func (s *adaptiveTxState) bulkSoftPayloadBytesLocked() int {
|
|
if s.bulkSoftPayloadBytes == 0 {
|
|
s.bulkSoftPayloadBytes = bulkAdaptiveSoftPayloadStartBytes
|
|
}
|
|
return normalizeBulkAdaptiveSoftPayloadBytes(s.bulkSoftPayloadBytes)
|
|
}
|
|
|
|
func (s *adaptiveTxState) streamSoftPayloadBytesSnapshot() int {
|
|
if s == nil {
|
|
return streamAdaptiveSoftPayloadStartBytes
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.streamSoftPayloadBytesLocked()
|
|
}
|
|
|
|
func (s *adaptiveTxState) streamWaitThresholdBytesSnapshot() int {
|
|
if s == nil {
|
|
return streamBatchWaitThreshold
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return streamAdaptiveWaitThresholdBytesForSoftPayload(s.streamSoftPayloadBytesLocked())
|
|
}
|
|
|
|
func (s *adaptiveTxState) streamFlushDelaySnapshot() time.Duration {
|
|
if s == nil {
|
|
return streamBatchMaxFlushDelay
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return streamAdaptiveFlushDelayForSoftPayload(s.streamSoftPayloadBytesLocked())
|
|
}
|
|
|
|
// observeStreamPayloadWrite feeds one completed stream payload write into the
// adaptive controller. payloadBytes is the number of payload bytes written,
// elapsed the wall time the write took, timeout the write deadline in effect
// (0 = none), and err the write result.
//
// Mirrors observeBulkPayloadWrite: the soft cap shrinks immediately on
// timeout-like errors, near-timeouts, or slow flushes, and grows only after
// streamAdaptiveSoftPayloadGrowSuccesses consecutive fast samples.
func (s *adaptiveTxState) observeStreamPayloadWrite(payloadBytes int, elapsed time.Duration, timeout time.Duration, err error) {
	if s == nil || payloadBytes <= 0 {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	current := s.streamSoftPayloadBytesLocked()
	// target is the payload size the measured goodput could flush within
	// streamAdaptiveSoftPayloadTargetFlush; hasSample is false when the
	// write was too small (or the timing unusable) to sample.
	target, hasSample := s.observeStreamGoodputLocked(payloadBytes, elapsed)
	// Consuming >= 3/4 of the deadline counts as an effective timeout.
	nearTimeout := timeout > 0 && elapsed >= (timeout*3)/4
	if isTimeoutLikeError(err) || nearTimeout {
		s.streamGrowStreak = 0
		// Prefer shrinking straight to the goodput target when it is lower.
		if hasSample && target < current {
			s.streamSoftPayloadBytes = target
			return
		}
		// Otherwise back off exactly one ladder step.
		s.streamSoftPayloadBytes = previousStreamAdaptiveSoftPayloadStep(current)
		return
	}
	if err != nil {
		// Non-timeout error: stop growing but keep the current cap.
		s.streamGrowStreak = 0
		return
	}
	if !hasSample {
		// Write too small to judge; leave cap and streak untouched.
		return
	}
	if elapsed >= streamAdaptiveSoftPayloadSlowFlush {
		// Slow flush: shrink toward the goodput target, or one step down.
		s.streamGrowStreak = 0
		if target < current {
			s.streamSoftPayloadBytes = target
			return
		}
		s.streamSoftPayloadBytes = previousStreamAdaptiveSoftPayloadStep(current)
		return
	}
	if target > current {
		// Fast sample with headroom: grow only after a full streak of them.
		s.streamGrowStreak++
		if s.streamGrowStreak >= streamAdaptiveSoftPayloadGrowSuccesses {
			s.streamSoftPayloadBytes = nextStreamAdaptiveSoftPayloadStep(current, target)
			s.streamGrowStreak = 0
		}
		return
	}
	if target < current && elapsed >= streamAdaptiveSoftPayloadTargetFlush*2 {
		// Moderately slow flush with a lower target: shrink to the target.
		s.streamSoftPayloadBytes = target
		s.streamGrowStreak = 0
		return
	}
	// Steady state (target == current, or fast enough): reset the streak.
	s.streamGrowStreak = 0
}
|
|
|
|
func (s *adaptiveTxState) streamSoftPayloadBytesLocked() int {
|
|
if s.streamSoftPayloadBytes == 0 {
|
|
s.streamSoftPayloadBytes = streamAdaptiveSoftPayloadStartBytes
|
|
}
|
|
return normalizeStreamAdaptiveSoftPayloadBytes(s.streamSoftPayloadBytes)
|
|
}
|
|
|
|
func (s *adaptiveTxState) observeBulkGoodputLocked(payloadBytes int, elapsed time.Duration) (int, bool) {
|
|
if payloadBytes < bulkAdaptiveSoftPayloadMinSampleBytes || elapsed <= 0 {
|
|
return 0, false
|
|
}
|
|
sample := float64(payloadBytes) / elapsed.Seconds()
|
|
if sample <= 0 {
|
|
return 0, false
|
|
}
|
|
if s.bulkGoodputBytesPerS <= 0 {
|
|
s.bulkGoodputBytesPerS = sample
|
|
} else {
|
|
const alpha = 0.25
|
|
s.bulkGoodputBytesPerS = s.bulkGoodputBytesPerS*(1-alpha) + sample*alpha
|
|
}
|
|
target := int(s.bulkGoodputBytesPerS * bulkAdaptiveSoftPayloadTargetFlush.Seconds())
|
|
return normalizeBulkAdaptiveSoftPayloadBytes(target), true
|
|
}
|
|
|
|
func (s *adaptiveTxState) observeStreamGoodputLocked(payloadBytes int, elapsed time.Duration) (int, bool) {
|
|
if payloadBytes < streamAdaptiveSoftPayloadMinSampleBytes || elapsed <= 0 {
|
|
return 0, false
|
|
}
|
|
sample := float64(payloadBytes) / elapsed.Seconds()
|
|
if sample <= 0 {
|
|
return 0, false
|
|
}
|
|
if s.streamGoodputBytesPerS <= 0 {
|
|
s.streamGoodputBytesPerS = sample
|
|
} else {
|
|
const alpha = 0.25
|
|
s.streamGoodputBytesPerS = s.streamGoodputBytesPerS*(1-alpha) + sample*alpha
|
|
}
|
|
target := int(s.streamGoodputBytesPerS * streamAdaptiveSoftPayloadTargetFlush.Seconds())
|
|
return normalizeStreamAdaptiveSoftPayloadBytes(target), true
|
|
}
|
|
|
|
func normalizeBulkAdaptiveSoftPayloadBytes(size int) int {
|
|
if size <= bulkAdaptiveSoftPayloadMinBytes {
|
|
return bulkAdaptiveSoftPayloadMinBytes
|
|
}
|
|
for _, step := range bulkAdaptiveSoftPayloadSteps {
|
|
if size <= step {
|
|
return step
|
|
}
|
|
}
|
|
return bulkAdaptiveSoftPayloadStartBytes
|
|
}
|
|
|
|
func previousBulkAdaptiveSoftPayloadStep(current int) int {
|
|
current = normalizeBulkAdaptiveSoftPayloadBytes(current)
|
|
for index := len(bulkAdaptiveSoftPayloadSteps) - 1; index >= 0; index-- {
|
|
step := bulkAdaptiveSoftPayloadSteps[index]
|
|
if current > step {
|
|
return step
|
|
}
|
|
}
|
|
return bulkAdaptiveSoftPayloadMinBytes
|
|
}
|
|
|
|
func nextBulkAdaptiveSoftPayloadStep(current int, target int) int {
|
|
current = normalizeBulkAdaptiveSoftPayloadBytes(current)
|
|
target = normalizeBulkAdaptiveSoftPayloadBytes(target)
|
|
for _, step := range bulkAdaptiveSoftPayloadSteps {
|
|
if step > current {
|
|
if step > target {
|
|
return target
|
|
}
|
|
return step
|
|
}
|
|
}
|
|
return bulkAdaptiveSoftPayloadStartBytes
|
|
}
|
|
|
|
func normalizeStreamAdaptiveSoftPayloadBytes(size int) int {
|
|
if size <= streamAdaptiveSoftPayloadMinBytes {
|
|
return streamAdaptiveSoftPayloadMinBytes
|
|
}
|
|
for _, step := range streamAdaptiveSoftPayloadSteps {
|
|
if size <= step {
|
|
return step
|
|
}
|
|
}
|
|
return streamAdaptiveSoftPayloadStartBytes
|
|
}
|
|
|
|
func previousStreamAdaptiveSoftPayloadStep(current int) int {
|
|
current = normalizeStreamAdaptiveSoftPayloadBytes(current)
|
|
for index := len(streamAdaptiveSoftPayloadSteps) - 1; index >= 0; index-- {
|
|
step := streamAdaptiveSoftPayloadSteps[index]
|
|
if current > step {
|
|
return step
|
|
}
|
|
}
|
|
return streamAdaptiveSoftPayloadMinBytes
|
|
}
|
|
|
|
func nextStreamAdaptiveSoftPayloadStep(current int, target int) int {
|
|
current = normalizeStreamAdaptiveSoftPayloadBytes(current)
|
|
target = normalizeStreamAdaptiveSoftPayloadBytes(target)
|
|
for _, step := range streamAdaptiveSoftPayloadSteps {
|
|
if step > current {
|
|
if step > target {
|
|
return target
|
|
}
|
|
return step
|
|
}
|
|
}
|
|
return streamAdaptiveSoftPayloadStartBytes
|
|
}
|
|
|
|
func streamAdaptiveWaitThresholdBytesForSoftPayload(size int) int {
|
|
size = normalizeStreamAdaptiveSoftPayloadBytes(size)
|
|
threshold := size / 16
|
|
if threshold < streamAdaptiveWaitThresholdMinBytes {
|
|
return streamAdaptiveWaitThresholdMinBytes
|
|
}
|
|
if threshold > streamBatchWaitThreshold {
|
|
return streamBatchWaitThreshold
|
|
}
|
|
return threshold
|
|
}
|
|
|
|
func streamAdaptiveFlushDelayForSoftPayload(size int) time.Duration {
|
|
size = normalizeStreamAdaptiveSoftPayloadBytes(size)
|
|
switch {
|
|
case size >= streamAdaptiveSoftPayloadStartBytes:
|
|
return streamBatchMaxFlushDelay
|
|
case size >= 1024*1024:
|
|
return streamAdaptiveFlushDelayMid
|
|
default:
|
|
return 0
|
|
}
|
|
}
|