notify/bulk_shared_batch.go
starainrt f038a89771
fix: close stream adaptive gaps and switch notify to stario v0.1.1
- make stream fast path honor adaptive soft payload limits end-to-end
  - split oversized fast-stream payloads into sequential frames before batching
  - use adaptive soft cap when encoding stream batch payloads
  - move timeout-like error detection into production code for adaptive tx
  - tune notify FrameReader read size explicitly to avoid throughput regression
  - drop local stario replace and depend on released b612.me/stario v0.1.1
2026-04-18 16:05:57 +08:00

229 lines
6.8 KiB
Go

package notify
import (
"encoding/binary"
)
const (
bulkFastPathVersionV1 = 1
bulkFastPathVersionV2 = 2
bulkFastPathVersionCurrent = bulkFastPathVersionV2
)
const (
bulkFastBatchMagic = "NBF2"
bulkFastBatchVersion = 1
bulkFastBatchHeaderLen = 12
bulkFastBatchItemHeaderLen = 24
bulkFastBatchMaxItems = 64
bulkFastBatchMaxPlainBytes = 8 * 1024 * 1024
)
func normalizeBulkFastPathVersion(version uint8) uint8 {
if version < bulkFastPathVersionV1 {
return bulkFastPathVersionV1
}
if version > bulkFastPathVersionCurrent {
return bulkFastPathVersionCurrent
}
return version
}
func negotiateBulkFastPathVersion(version uint8) uint8 {
return normalizeBulkFastPathVersion(version)
}
func bulkFastPathSupportsSharedBatch(version uint8) bool {
return normalizeBulkFastPathVersion(version) >= bulkFastPathVersionV2
}
func bulkFastBatchFrameLen(frame bulkFastFrame) int {
return bulkFastBatchItemHeaderLen + len(frame.Payload)
}
func bulkFastBatchPlainLen(frames []bulkFastFrame) int {
total := bulkFastBatchHeaderLen
for _, frame := range frames {
total += bulkFastBatchFrameLen(frame)
}
return total
}
func encodeBulkFastFramePayload(frame bulkFastFrame) ([]byte, error) {
return encodeBulkFastControlFrame(frame.Type, frame.Flags, frame.DataID, frame.Seq, frame.Payload)
}
func encodeBulkFastFramePayloadFast(encode transportFastPlainEncoder, secretKey []byte, frame bulkFastFrame) ([]byte, error) {
if encode == nil {
return nil, errTransportPayloadEncryptFailed
}
plainLen := bulkFastPayloadHeaderLen + len(frame.Payload)
return encode(secretKey, plainLen, func(dst []byte) error {
if err := encodeBulkFastFrameHeader(dst, frame.Type, frame.Flags, frame.DataID, frame.Seq, len(frame.Payload)); err != nil {
return err
}
copy(dst[bulkFastPayloadHeaderLen:], frame.Payload)
return nil
})
}
func encodeBulkFastFramePayloadPooled(runtime *modernPSKCodecRuntime, frame bulkFastFrame) ([]byte, func(), error) {
if runtime == nil {
return nil, nil, errTransportPayloadEncryptFailed
}
plainLen := bulkFastPayloadHeaderLen + len(frame.Payload)
return runtime.sealFilledPayloadPooled(plainLen, func(dst []byte) error {
if err := encodeBulkFastFrameHeader(dst, frame.Type, frame.Flags, frame.DataID, frame.Seq, len(frame.Payload)); err != nil {
return err
}
copy(dst[bulkFastPayloadHeaderLen:], frame.Payload)
return nil
})
}
func encodeBulkFastBatchPlain(frames []bulkFastFrame) ([]byte, error) {
if len(frames) == 0 {
return nil, errBulkFastPayloadInvalid
}
buf := make([]byte, bulkFastBatchPlainLen(frames))
if err := writeBulkFastBatchPlain(buf, frames); err != nil {
return nil, err
}
return buf, nil
}
func encodeBulkFastBatchPayloadFast(encode transportFastPlainEncoder, secretKey []byte, frames []bulkFastFrame) ([]byte, error) {
if encode == nil {
return nil, errTransportPayloadEncryptFailed
}
plainLen := bulkFastBatchPlainLen(frames)
return encode(secretKey, plainLen, func(dst []byte) error {
return writeBulkFastBatchPlain(dst, frames)
})
}
func encodeBulkFastBatchPayloadPooled(runtime *modernPSKCodecRuntime, frames []bulkFastFrame) ([]byte, func(), error) {
if runtime == nil {
return nil, nil, errTransportPayloadEncryptFailed
}
return runtime.sealFilledPayloadPooled(bulkFastBatchPlainLen(frames), func(dst []byte) error {
return writeBulkFastBatchPlain(dst, frames)
})
}
func writeBulkFastBatchPlain(dst []byte, frames []bulkFastFrame) error {
if len(frames) == 0 || len(dst) != bulkFastBatchPlainLen(frames) {
return errBulkFastPayloadInvalid
}
copy(dst[:4], bulkFastBatchMagic)
dst[4] = bulkFastBatchVersion
binary.BigEndian.PutUint32(dst[8:12], uint32(len(frames)))
offset := bulkFastBatchHeaderLen
for _, frame := range frames {
if frame.DataID == 0 {
return errBulkFastPayloadInvalid
}
dst[offset] = frame.Type
dst[offset+1] = frame.Flags
binary.BigEndian.PutUint64(dst[offset+4:offset+12], frame.DataID)
binary.BigEndian.PutUint64(dst[offset+12:offset+20], frame.Seq)
binary.BigEndian.PutUint32(dst[offset+20:offset+24], uint32(len(frame.Payload)))
offset += bulkFastBatchItemHeaderLen
copy(dst[offset:offset+len(frame.Payload)], frame.Payload)
offset += len(frame.Payload)
}
return nil
}
func walkBulkFastBatchPlain(payload []byte, fn func(bulkFastFrame) error) (bool, error) {
if len(payload) < 4 || string(payload[:4]) != bulkFastBatchMagic {
return false, nil
}
if len(payload) < bulkFastBatchHeaderLen {
return true, errBulkFastPayloadInvalid
}
if payload[4] != bulkFastBatchVersion {
return true, errBulkFastPayloadInvalid
}
count := int(binary.BigEndian.Uint32(payload[8:12]))
if count <= 0 {
return true, errBulkFastPayloadInvalid
}
offset := bulkFastBatchHeaderLen
for index := 0; index < count; index++ {
if len(payload)-offset < bulkFastBatchItemHeaderLen {
return true, errBulkFastPayloadInvalid
}
frameType := payload[offset]
switch frameType {
case bulkFastPayloadTypeData, bulkFastPayloadTypeClose, bulkFastPayloadTypeReset, bulkFastPayloadTypeRelease:
default:
return true, errBulkFastPayloadInvalid
}
flags := payload[offset+1]
dataID := binary.BigEndian.Uint64(payload[offset+4 : offset+12])
seq := binary.BigEndian.Uint64(payload[offset+12 : offset+20])
payloadLen := int(binary.BigEndian.Uint32(payload[offset+20 : offset+24]))
offset += bulkFastBatchItemHeaderLen
if dataID == 0 || payloadLen < 0 || len(payload)-offset < payloadLen {
return true, errBulkFastPayloadInvalid
}
if fn != nil {
if err := fn(bulkFastFrame{
Type: frameType,
Flags: flags,
DataID: dataID,
Seq: seq,
Payload: payload[offset : offset+payloadLen],
}); err != nil {
return true, err
}
}
offset += payloadLen
}
if offset != len(payload) {
return true, errBulkFastPayloadInvalid
}
return true, nil
}
func decodeBulkFastBatchPlain(payload []byte) ([]bulkFastFrame, bool, error) {
frames := make([]bulkFastFrame, 0, 1)
matched, err := walkBulkFastBatchPlain(payload, func(frame bulkFastFrame) error {
frames = append(frames, frame)
return nil
})
if !matched || err != nil {
return nil, matched, err
}
return frames, true, nil
}
func walkBulkFastFrames(payload []byte, fn func(bulkFastFrame) error) (bool, error) {
if matched, err := walkBulkFastBatchPlain(payload, fn); matched {
return true, err
}
frame, matched, err := decodeBulkFastFrame(payload)
if !matched || err != nil {
return matched, err
}
if fn != nil {
if err := fn(frame); err != nil {
return true, err
}
}
return true, nil
}
func decodeBulkFastFrames(payload []byte) ([]bulkFastFrame, bool, error) {
frames := make([]bulkFastFrame, 0, 1)
matched, err := walkBulkFastFrames(payload, func(frame bulkFastFrame) error {
frames = append(frames, frame)
return nil
})
if !matched || err != nil {
return nil, matched, err
}
return frames, true, nil
}