notify/transport_binding_adaptive.go
starainrt f038a89771
fix: close stream adaptive gaps and switch notify to stario v0.1.1
- make stream fast path honor adaptive soft payload limits end-to-end
  - split oversized fast-stream payloads into sequential frames before batching
  - use adaptive soft cap when encoding stream batch payloads
  - move timeout-like error detection into production code for adaptive tx
  - tune notify FrameReader read size explicitly to avoid throughput regression
  - drop local stario replace and depend on released b612.me/stario v0.1.1
2026-04-18 16:05:57 +08:00

384 lines
10 KiB
Go

package notify
import (
"sync"
"time"
)
// Tuning parameters for the adaptive soft payload limits of the bulk and
// stream transmit paths.
const (
// Smallest soft payload size the bulk normalize/previous-step helpers return.
bulkAdaptiveSoftPayloadMinBytes = 256 * 1024
// Bulk soft payload size reported when the transport binding is nil.
bulkAdaptiveSoftPayloadFallbackBytes = 2 * 1024 * 1024
// Bulk soft payload size used while the state is still zero; also returned
// by the bulk helpers for sizes above every configured step.
bulkAdaptiveSoftPayloadStartBytes = bulkFastBatchMaxPlainBytes
// Flush duration the bulk goodput estimate is multiplied by to derive the
// target payload size.
bulkAdaptiveSoftPayloadTargetFlush = 4 * time.Millisecond
// An error-free, sampled bulk write that takes at least this long shrinks
// the bulk soft payload size.
bulkAdaptiveSoftPayloadSlowFlush = 16 * time.Millisecond
// Bulk writes smaller than this do not contribute a goodput sample.
bulkAdaptiveSoftPayloadMinSampleBytes = 256 * 1024
// Grow-streak length at which the bulk soft payload size is stepped up.
bulkAdaptiveSoftPayloadGrowSuccesses = 8
// Smallest soft payload size the stream normalize/previous-step helpers return.
streamAdaptiveSoftPayloadMinBytes = 256 * 1024
// Stream soft payload size reported when the transport binding is nil.
streamAdaptiveSoftPayloadFallbackBytes = streamBatchMaxPayloadBytes
// Stream soft payload size used while the state is still zero; also
// returned by the stream helpers for sizes above every configured step.
streamAdaptiveSoftPayloadStartBytes = streamBatchMaxPayloadBytes
// Flush duration the stream goodput estimate is multiplied by to derive the
// target payload size.
streamAdaptiveSoftPayloadTargetFlush = 4 * time.Millisecond
// An error-free, sampled stream write that takes at least this long shrinks
// the stream soft payload size.
streamAdaptiveSoftPayloadSlowFlush = 16 * time.Millisecond
// Stream writes smaller than this do not contribute a goodput sample.
streamAdaptiveSoftPayloadMinSampleBytes = 64 * 1024
// Grow-streak length at which the stream soft payload size is stepped up.
streamAdaptiveSoftPayloadGrowSuccesses = 8
// Lower bound applied to the adaptive stream wait threshold.
streamAdaptiveWaitThresholdMinBytes = 32 * 1024
// Flush delay used when the stream soft payload size is at least 1 MiB but
// below streamAdaptiveSoftPayloadStartBytes.
streamAdaptiveFlushDelayMid = 25 * time.Microsecond
)
// bulkAdaptiveSoftPayloadSteps lists the discrete soft payload sizes the bulk
// normalize/previous/next helpers choose from.
// NOTE(review): the helpers scan this array in order and appear to assume it is
// ascending, which requires bulkFastBatchMaxPlainBytes >= 4 MiB — confirm
// against its declaration.
var bulkAdaptiveSoftPayloadSteps = [...]int{
256 * 1024,
512 * 1024,
1024 * 1024,
2 * 1024 * 1024,
4 * 1024 * 1024,
bulkFastBatchMaxPlainBytes,
}
// streamAdaptiveSoftPayloadSteps lists the discrete soft payload sizes the
// stream normalize/previous/next helpers choose from.
// NOTE(review): the helpers scan this array in order and appear to assume it is
// ascending, which requires streamBatchMaxPayloadBytes >= 1 MiB — confirm
// against its declaration.
var streamAdaptiveSoftPayloadSteps = [...]int{
256 * 1024,
512 * 1024,
1024 * 1024,
streamBatchMaxPayloadBytes,
}
// adaptiveTxState holds the adaptive soft payload sizing state for the bulk
// and stream transmit paths. The snapshot and observe methods lock mu while
// they read or update the remaining fields.
type adaptiveTxState struct {
mu sync.Mutex
// Current bulk soft payload size; zero means "not yet initialized" and is
// replaced by bulkAdaptiveSoftPayloadStartBytes on first access.
bulkSoftPayloadBytes int
// Exponentially smoothed bulk goodput in bytes per second; <= 0 means no
// sample has been recorded yet.
bulkGoodputBytesPerS float64
// Consecutive bulk writes whose goodput target exceeded the current size.
bulkGrowStreak int
// Current stream soft payload size; zero means "not yet initialized" and is
// replaced by streamAdaptiveSoftPayloadStartBytes on first access.
streamSoftPayloadBytes int
// Exponentially smoothed stream goodput in bytes per second; <= 0 means no
// sample has been recorded yet.
streamGoodputBytesPerS float64
// Consecutive stream writes whose goodput target exceeded the current size.
streamGrowStreak int
}
// bulkAdaptiveSoftPayloadBytesSnapshot reports the bulk soft payload size held
// by the binding's adaptive state. A nil binding yields
// bulkAdaptiveSoftPayloadFallbackBytes.
func (b *transportBinding) bulkAdaptiveSoftPayloadBytesSnapshot() int {
	if b != nil {
		return b.adaptiveTx.bulkSoftPayloadBytesSnapshot()
	}
	return bulkAdaptiveSoftPayloadFallbackBytes
}
// observeBulkAdaptivePayloadWrite forwards the outcome of one bulk payload
// write (size, duration, timeout and error) to the binding's adaptive state.
// It is a no-op on a nil binding.
func (b *transportBinding) observeBulkAdaptivePayloadWrite(payloadBytes int, elapsed time.Duration, timeout time.Duration, err error) {
	if b != nil {
		b.adaptiveTx.observeBulkPayloadWrite(payloadBytes, elapsed, timeout, err)
	}
}
// streamAdaptiveSoftPayloadBytesSnapshot reports the stream soft payload size
// held by the binding's adaptive state. A nil binding yields
// streamAdaptiveSoftPayloadFallbackBytes.
func (b *transportBinding) streamAdaptiveSoftPayloadBytesSnapshot() int {
	if b != nil {
		return b.adaptiveTx.streamSoftPayloadBytesSnapshot()
	}
	return streamAdaptiveSoftPayloadFallbackBytes
}
// streamAdaptiveWaitThresholdBytesSnapshot reports the stream wait threshold
// derived from the binding's current stream soft payload size. A nil binding
// yields streamBatchWaitThreshold.
func (b *transportBinding) streamAdaptiveWaitThresholdBytesSnapshot() int {
	if b != nil {
		return b.adaptiveTx.streamWaitThresholdBytesSnapshot()
	}
	return streamBatchWaitThreshold
}
// streamAdaptiveFlushDelaySnapshot reports the stream flush delay derived from
// the binding's current stream soft payload size. A nil binding yields
// streamBatchMaxFlushDelay.
func (b *transportBinding) streamAdaptiveFlushDelaySnapshot() time.Duration {
	if b != nil {
		return b.adaptiveTx.streamFlushDelaySnapshot()
	}
	return streamBatchMaxFlushDelay
}
// observeStreamAdaptivePayloadWrite forwards the outcome of one stream payload
// write (size, duration, timeout and error) to the binding's adaptive state.
// It is a no-op on a nil binding.
func (b *transportBinding) observeStreamAdaptivePayloadWrite(payloadBytes int, elapsed time.Duration, timeout time.Duration, err error) {
	if b != nil {
		b.adaptiveTx.observeStreamPayloadWrite(payloadBytes, elapsed, timeout, err)
	}
}
// bulkSoftPayloadBytesSnapshot returns the normalized bulk soft payload size,
// reading it under s.mu. A nil state yields bulkAdaptiveSoftPayloadStartBytes.
func (s *adaptiveTxState) bulkSoftPayloadBytesSnapshot() int {
	if s != nil {
		s.mu.Lock()
		defer s.mu.Unlock()
		return s.bulkSoftPayloadBytesLocked()
	}
	return bulkAdaptiveSoftPayloadStartBytes
}
// observeBulkPayloadWrite feeds the outcome of one bulk payload write into the
// adaptive state and adjusts the bulk soft payload size accordingly.
//
// payloadBytes is the size of the written payload; the call is ignored when it
// is not positive or when s is nil. elapsed is the duration of the write,
// timeout is the write timeout (values <= 0 disable the near-timeout check),
// and err is the result of the write.
//
// Decisions are evaluated in this order:
//  1. isTimeoutLikeError(err) is true, or elapsed is at least three quarters
//     of timeout: reset the grow streak and shrink, either to the goodput
//     target when one exists below the current size, or by one step.
//  2. Any other non-nil err: reset the grow streak only.
//  3. No goodput sample was produced: leave the state untouched.
//  4. elapsed >= bulkAdaptiveSoftPayloadSlowFlush: shrink as in case 1.
//  5. Goodput target above the current size: extend the grow streak and step
//     up once it reaches bulkAdaptiveSoftPayloadGrowSuccesses.
//  6. Goodput target below the current size and elapsed at least twice
//     bulkAdaptiveSoftPayloadTargetFlush: drop to the target.
//  7. Otherwise: reset the grow streak.
func (s *adaptiveTxState) observeBulkPayloadWrite(payloadBytes int, elapsed time.Duration, timeout time.Duration, err error) {
	if s == nil || payloadBytes <= 0 {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	current := s.bulkSoftPayloadBytesLocked()
	// The goodput estimate is refreshed before err is inspected, so a sample
	// is recorded regardless of the write's outcome.
	target, hasSample := s.observeBulkGoodputLocked(payloadBytes, elapsed)

	// Three quarters of timeout, computed without forming timeout*3: that
	// product wraps negative for very large timeouts, which would classify
	// every write as near-timeout. For every non-overflowing input the value
	// equals (timeout*3)/4.
	threeQuarters := timeout/4*3 + timeout%4*3/4
	nearTimeout := timeout > 0 && elapsed >= threeQuarters

	if isTimeoutLikeError(err) || nearTimeout {
		s.bulkGrowStreak = 0
		if hasSample && target < current {
			s.bulkSoftPayloadBytes = target
			return
		}
		s.bulkSoftPayloadBytes = previousBulkAdaptiveSoftPayloadStep(current)
		return
	}
	if err != nil {
		s.bulkGrowStreak = 0
		return
	}
	if !hasSample {
		return
	}
	if elapsed >= bulkAdaptiveSoftPayloadSlowFlush {
		s.bulkGrowStreak = 0
		if target < current {
			s.bulkSoftPayloadBytes = target
			return
		}
		s.bulkSoftPayloadBytes = previousBulkAdaptiveSoftPayloadStep(current)
		return
	}
	if target > current {
		s.bulkGrowStreak++
		if s.bulkGrowStreak >= bulkAdaptiveSoftPayloadGrowSuccesses {
			s.bulkSoftPayloadBytes = nextBulkAdaptiveSoftPayloadStep(current, target)
			s.bulkGrowStreak = 0
		}
		return
	}
	if target < current && elapsed >= bulkAdaptiveSoftPayloadTargetFlush*2 {
		s.bulkSoftPayloadBytes = target
		s.bulkGrowStreak = 0
		return
	}
	s.bulkGrowStreak = 0
}
// bulkSoftPayloadBytesLocked returns the normalized bulk soft payload size,
// first replacing a zero stored value with bulkAdaptiveSoftPayloadStartBytes.
// Its callers in this file invoke it with s.mu held.
func (s *adaptiveTxState) bulkSoftPayloadBytesLocked() int {
	size := s.bulkSoftPayloadBytes
	if size == 0 {
		size = bulkAdaptiveSoftPayloadStartBytes
		s.bulkSoftPayloadBytes = size
	}
	return normalizeBulkAdaptiveSoftPayloadBytes(size)
}
// streamSoftPayloadBytesSnapshot returns the normalized stream soft payload
// size, reading it under s.mu. A nil state yields
// streamAdaptiveSoftPayloadStartBytes.
func (s *adaptiveTxState) streamSoftPayloadBytesSnapshot() int {
	if s != nil {
		s.mu.Lock()
		defer s.mu.Unlock()
		return s.streamSoftPayloadBytesLocked()
	}
	return streamAdaptiveSoftPayloadStartBytes
}
// streamWaitThresholdBytesSnapshot returns the wait threshold that corresponds
// to the current stream soft payload size, reading the size under s.mu. A nil
// state yields streamBatchWaitThreshold.
func (s *adaptiveTxState) streamWaitThresholdBytesSnapshot() int {
	if s != nil {
		s.mu.Lock()
		defer s.mu.Unlock()
		softPayload := s.streamSoftPayloadBytesLocked()
		return streamAdaptiveWaitThresholdBytesForSoftPayload(softPayload)
	}
	return streamBatchWaitThreshold
}
// streamFlushDelaySnapshot returns the flush delay that corresponds to the
// current stream soft payload size, reading the size under s.mu. A nil state
// yields streamBatchMaxFlushDelay.
func (s *adaptiveTxState) streamFlushDelaySnapshot() time.Duration {
	if s != nil {
		s.mu.Lock()
		defer s.mu.Unlock()
		softPayload := s.streamSoftPayloadBytesLocked()
		return streamAdaptiveFlushDelayForSoftPayload(softPayload)
	}
	return streamBatchMaxFlushDelay
}
// observeStreamPayloadWrite feeds the outcome of one stream payload write into
// the adaptive state and adjusts the stream soft payload size accordingly.
//
// payloadBytes is the size of the written payload; the call is ignored when it
// is not positive or when s is nil. elapsed is the duration of the write,
// timeout is the write timeout (values <= 0 disable the near-timeout check),
// and err is the result of the write.
//
// Decisions are evaluated in this order:
//  1. isTimeoutLikeError(err) is true, or elapsed is at least three quarters
//     of timeout: reset the grow streak and shrink, either to the goodput
//     target when one exists below the current size, or by one step.
//  2. Any other non-nil err: reset the grow streak only.
//  3. No goodput sample was produced: leave the state untouched.
//  4. elapsed >= streamAdaptiveSoftPayloadSlowFlush: shrink as in case 1.
//  5. Goodput target above the current size: extend the grow streak and step
//     up once it reaches streamAdaptiveSoftPayloadGrowSuccesses.
//  6. Goodput target below the current size and elapsed at least twice
//     streamAdaptiveSoftPayloadTargetFlush: drop to the target.
//  7. Otherwise: reset the grow streak.
func (s *adaptiveTxState) observeStreamPayloadWrite(payloadBytes int, elapsed time.Duration, timeout time.Duration, err error) {
	if s == nil || payloadBytes <= 0 {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	current := s.streamSoftPayloadBytesLocked()
	// The goodput estimate is refreshed before err is inspected, so a sample
	// is recorded regardless of the write's outcome.
	target, hasSample := s.observeStreamGoodputLocked(payloadBytes, elapsed)

	// Three quarters of timeout, computed without forming timeout*3: that
	// product wraps negative for very large timeouts, which would classify
	// every write as near-timeout. For every non-overflowing input the value
	// equals (timeout*3)/4.
	threeQuarters := timeout/4*3 + timeout%4*3/4
	nearTimeout := timeout > 0 && elapsed >= threeQuarters

	if isTimeoutLikeError(err) || nearTimeout {
		s.streamGrowStreak = 0
		if hasSample && target < current {
			s.streamSoftPayloadBytes = target
			return
		}
		s.streamSoftPayloadBytes = previousStreamAdaptiveSoftPayloadStep(current)
		return
	}
	if err != nil {
		s.streamGrowStreak = 0
		return
	}
	if !hasSample {
		return
	}
	if elapsed >= streamAdaptiveSoftPayloadSlowFlush {
		s.streamGrowStreak = 0
		if target < current {
			s.streamSoftPayloadBytes = target
			return
		}
		s.streamSoftPayloadBytes = previousStreamAdaptiveSoftPayloadStep(current)
		return
	}
	if target > current {
		s.streamGrowStreak++
		if s.streamGrowStreak >= streamAdaptiveSoftPayloadGrowSuccesses {
			s.streamSoftPayloadBytes = nextStreamAdaptiveSoftPayloadStep(current, target)
			s.streamGrowStreak = 0
		}
		return
	}
	if target < current && elapsed >= streamAdaptiveSoftPayloadTargetFlush*2 {
		s.streamSoftPayloadBytes = target
		s.streamGrowStreak = 0
		return
	}
	s.streamGrowStreak = 0
}
// streamSoftPayloadBytesLocked returns the normalized stream soft payload size,
// first replacing a zero stored value with streamAdaptiveSoftPayloadStartBytes.
// Its callers in this file invoke it with s.mu held.
func (s *adaptiveTxState) streamSoftPayloadBytesLocked() int {
	size := s.streamSoftPayloadBytes
	if size == 0 {
		size = streamAdaptiveSoftPayloadStartBytes
		s.streamSoftPayloadBytes = size
	}
	return normalizeStreamAdaptiveSoftPayloadBytes(size)
}
// observeBulkGoodputLocked folds one bulk write into the smoothed goodput
// estimate and returns the normalized payload size that the estimate would
// move within bulkAdaptiveSoftPayloadTargetFlush.
//
// The second result is false, and the estimate is left unchanged, when the
// write is smaller than bulkAdaptiveSoftPayloadMinSampleBytes, elapsed is not
// positive, or the computed rate is not positive. Its caller in this file
// invokes it with s.mu held.
func (s *adaptiveTxState) observeBulkGoodputLocked(payloadBytes int, elapsed time.Duration) (int, bool) {
	if elapsed <= 0 || payloadBytes < bulkAdaptiveSoftPayloadMinSampleBytes {
		return 0, false
	}
	rate := float64(payloadBytes) / elapsed.Seconds()
	if rate <= 0 {
		return 0, false
	}

	// The first sample seeds the estimate; later samples are blended in with
	// an exponential moving average.
	const weight = 0.25
	previous := s.bulkGoodputBytesPerS
	switch {
	case previous <= 0:
		s.bulkGoodputBytesPerS = rate
	default:
		s.bulkGoodputBytesPerS = previous*(1-weight) + rate*weight
	}

	budget := int(s.bulkGoodputBytesPerS * bulkAdaptiveSoftPayloadTargetFlush.Seconds())
	return normalizeBulkAdaptiveSoftPayloadBytes(budget), true
}
// observeStreamGoodputLocked folds one stream write into the smoothed goodput
// estimate and returns the normalized payload size that the estimate would
// move within streamAdaptiveSoftPayloadTargetFlush.
//
// The second result is false, and the estimate is left unchanged, when the
// write is smaller than streamAdaptiveSoftPayloadMinSampleBytes, elapsed is
// not positive, or the computed rate is not positive. Its caller in this file
// invokes it with s.mu held.
func (s *adaptiveTxState) observeStreamGoodputLocked(payloadBytes int, elapsed time.Duration) (int, bool) {
	if elapsed <= 0 || payloadBytes < streamAdaptiveSoftPayloadMinSampleBytes {
		return 0, false
	}
	rate := float64(payloadBytes) / elapsed.Seconds()
	if rate <= 0 {
		return 0, false
	}

	// The first sample seeds the estimate; later samples are blended in with
	// an exponential moving average.
	const weight = 0.25
	previous := s.streamGoodputBytesPerS
	switch {
	case previous <= 0:
		s.streamGoodputBytesPerS = rate
	default:
		s.streamGoodputBytesPerS = previous*(1-weight) + rate*weight
	}

	budget := int(s.streamGoodputBytesPerS * streamAdaptiveSoftPayloadTargetFlush.Seconds())
	return normalizeStreamAdaptiveSoftPayloadBytes(budget), true
}
// normalizeBulkAdaptiveSoftPayloadBytes maps size onto the bulk step table:
// sizes at or below bulkAdaptiveSoftPayloadMinBytes become that minimum, other
// sizes become the first step that is not smaller than size, and sizes above
// every step become bulkAdaptiveSoftPayloadStartBytes.
func normalizeBulkAdaptiveSoftPayloadBytes(size int) int {
	if size <= bulkAdaptiveSoftPayloadMinBytes {
		return bulkAdaptiveSoftPayloadMinBytes
	}
	for i := 0; i < len(bulkAdaptiveSoftPayloadSteps); i++ {
		if candidate := bulkAdaptiveSoftPayloadSteps[i]; candidate >= size {
			return candidate
		}
	}
	return bulkAdaptiveSoftPayloadStartBytes
}
// previousBulkAdaptiveSoftPayloadStep returns the step found first, scanning
// the bulk table from its end, that is strictly below the normalized current
// size. When no step is smaller it returns bulkAdaptiveSoftPayloadMinBytes.
func previousBulkAdaptiveSoftPayloadStep(current int) int {
	ceiling := normalizeBulkAdaptiveSoftPayloadBytes(current)
	for i := len(bulkAdaptiveSoftPayloadSteps); i > 0; i-- {
		if lower := bulkAdaptiveSoftPayloadSteps[i-1]; lower < ceiling {
			return lower
		}
	}
	return bulkAdaptiveSoftPayloadMinBytes
}
// nextBulkAdaptiveSoftPayloadStep returns the first bulk step above the
// normalized current size, capped at the normalized target. When no step lies
// above current it returns bulkAdaptiveSoftPayloadStartBytes.
func nextBulkAdaptiveSoftPayloadStep(current int, target int) int {
	from := normalizeBulkAdaptiveSoftPayloadBytes(current)
	limit := normalizeBulkAdaptiveSoftPayloadBytes(target)
	for _, candidate := range bulkAdaptiveSoftPayloadSteps {
		if candidate <= from {
			continue
		}
		// Only the first step above current is considered; it is capped at
		// the normalized target.
		if candidate > limit {
			return limit
		}
		return candidate
	}
	return bulkAdaptiveSoftPayloadStartBytes
}
// normalizeStreamAdaptiveSoftPayloadBytes maps size onto the stream step
// table: sizes at or below streamAdaptiveSoftPayloadMinBytes become that
// minimum, other sizes become the first step that is not smaller than size,
// and sizes above every step become streamAdaptiveSoftPayloadStartBytes.
func normalizeStreamAdaptiveSoftPayloadBytes(size int) int {
	if size <= streamAdaptiveSoftPayloadMinBytes {
		return streamAdaptiveSoftPayloadMinBytes
	}
	for i := 0; i < len(streamAdaptiveSoftPayloadSteps); i++ {
		if candidate := streamAdaptiveSoftPayloadSteps[i]; candidate >= size {
			return candidate
		}
	}
	return streamAdaptiveSoftPayloadStartBytes
}
// previousStreamAdaptiveSoftPayloadStep returns the step found first, scanning
// the stream table from its end, that is strictly below the normalized current
// size. When no step is smaller it returns streamAdaptiveSoftPayloadMinBytes.
func previousStreamAdaptiveSoftPayloadStep(current int) int {
	ceiling := normalizeStreamAdaptiveSoftPayloadBytes(current)
	for i := len(streamAdaptiveSoftPayloadSteps); i > 0; i-- {
		if lower := streamAdaptiveSoftPayloadSteps[i-1]; lower < ceiling {
			return lower
		}
	}
	return streamAdaptiveSoftPayloadMinBytes
}
// nextStreamAdaptiveSoftPayloadStep returns the first stream step above the
// normalized current size, capped at the normalized target. When no step lies
// above current it returns streamAdaptiveSoftPayloadStartBytes.
func nextStreamAdaptiveSoftPayloadStep(current int, target int) int {
	from := normalizeStreamAdaptiveSoftPayloadBytes(current)
	limit := normalizeStreamAdaptiveSoftPayloadBytes(target)
	for _, candidate := range streamAdaptiveSoftPayloadSteps {
		if candidate <= from {
			continue
		}
		// Only the first step above current is considered; it is capped at
		// the normalized target.
		if candidate > limit {
			return limit
		}
		return candidate
	}
	return streamAdaptiveSoftPayloadStartBytes
}
// streamAdaptiveWaitThresholdBytesForSoftPayload derives the stream wait
// threshold as one sixteenth of the normalized soft payload size. The lower
// bound streamAdaptiveWaitThresholdMinBytes is checked first, then the upper
// bound streamBatchWaitThreshold.
func streamAdaptiveWaitThresholdBytesForSoftPayload(size int) int {
	threshold := normalizeStreamAdaptiveSoftPayloadBytes(size) / 16
	switch {
	case threshold < streamAdaptiveWaitThresholdMinBytes:
		return streamAdaptiveWaitThresholdMinBytes
	case threshold > streamBatchWaitThreshold:
		return streamBatchWaitThreshold
	default:
		return threshold
	}
}
// streamAdaptiveFlushDelayForSoftPayload picks the stream flush delay for a
// soft payload size: streamBatchMaxFlushDelay once the normalized size reaches
// streamAdaptiveSoftPayloadStartBytes, streamAdaptiveFlushDelayMid from 1 MiB
// upward, and no delay below that.
func streamAdaptiveFlushDelayForSoftPayload(size int) time.Duration {
	normalized := normalizeStreamAdaptiveSoftPayloadBytes(size)
	if normalized >= streamAdaptiveSoftPayloadStartBytes {
		return streamBatchMaxFlushDelay
	}
	if normalized >= 1024*1024 {
		return streamAdaptiveFlushDelayMid
	}
	return 0
}