2026-04-18 16:05:57 +08:00
|
|
|
package notify
|
|
|
|
|
|
|
|
|
|
import "encoding/binary"
|
|
|
|
|
|
|
|
|
|
// Stream fast-path protocol versions. normalizeStreamFastPathVersion clamps
// every version it is given into the range [V1, Current].
const (
	// streamFastPathVersionV1 is the lowest version; anything below it is
	// treated as V1.
	streamFastPathVersionV1 = 1

	// streamFastPathVersionV2 is the first version for which
	// streamFastPathSupportsBatch reports true.
	streamFastPathVersionV2 = 2

	// streamFastPathVersionCurrent is the highest version handled here;
	// anything above it is treated as this version.
	streamFastPathVersionCurrent = streamFastPathVersionV2
)
|
|
|
|
|
|
|
|
|
|
// Wire-format constants for the plaintext batch encoding produced by
// writeStreamFastBatchPlain and parsed by walkStreamFastBatchPlain.
const (
	// streamFastBatchMagic is the 4-byte marker at the start of a batch.
	streamFastBatchMagic = "NSB1"

	// streamFastBatchVersion is the format version stored at byte 4.
	streamFastBatchVersion = 1

	// streamFastBatchHeaderLen is the batch header size: magic (4), version
	// (1), 3 bytes the parser ignores, and a big-endian uint32 item count (4).
	streamFastBatchHeaderLen = 12

	// streamFastBatchItemHeaderLen is the per-item header size: flags (1),
	// 3 bytes the parser ignores, DataID (8), Seq (8), payload length (4).
	streamFastBatchItemHeaderLen = 24

	// streamFastBatchMaxItems is not referenced in this part of the file;
	// presumably the per-batch item cap applied by the batching code —
	// TODO confirm at the use sites.
	streamFastBatchMaxItems = 64

	// streamFastBatchMaxPlainBytes (8 MiB) is not referenced in this part of
	// the file; presumably the cap on a batch's plaintext size — TODO
	// confirm at the use sites.
	streamFastBatchMaxPlainBytes = 8 << 20
)
|
|
|
|
|
|
|
|
|
|
func normalizeStreamFastPathVersion(version uint8) uint8 {
|
|
|
|
|
if version < streamFastPathVersionV1 {
|
|
|
|
|
return streamFastPathVersionV1
|
|
|
|
|
}
|
|
|
|
|
if version > streamFastPathVersionCurrent {
|
|
|
|
|
return streamFastPathVersionCurrent
|
|
|
|
|
}
|
|
|
|
|
return version
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func negotiateStreamFastPathVersion(version uint8) uint8 {
|
|
|
|
|
return normalizeStreamFastPathVersion(version)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func streamFastPathSupportsBatch(version uint8) bool {
|
|
|
|
|
return normalizeStreamFastPathVersion(version) >= streamFastPathVersionV2
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func streamFastBatchFrameLen(frame streamFastDataFrame) int {
|
|
|
|
|
return streamFastBatchItemHeaderLen + len(frame.Payload)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func streamFastBatchPlainLen(frames []streamFastDataFrame) int {
|
|
|
|
|
total := streamFastBatchHeaderLen
|
|
|
|
|
for _, frame := range frames {
|
|
|
|
|
total += streamFastBatchFrameLen(frame)
|
|
|
|
|
}
|
|
|
|
|
return total
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func encodeStreamFastBatchPlain(frames []streamFastDataFrame) ([]byte, error) {
|
|
|
|
|
if len(frames) == 0 {
|
|
|
|
|
return nil, errStreamFastPayloadInvalid
|
|
|
|
|
}
|
|
|
|
|
buf := make([]byte, streamFastBatchPlainLen(frames))
|
|
|
|
|
if err := writeStreamFastBatchPlain(buf, frames); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return buf, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func encodeStreamFastBatchPayloadFast(encode transportFastPlainEncoder, secretKey []byte, frames []streamFastDataFrame) ([]byte, error) {
|
|
|
|
|
if encode == nil {
|
|
|
|
|
return nil, errTransportPayloadEncryptFailed
|
|
|
|
|
}
|
|
|
|
|
plainLen := streamFastBatchPlainLen(frames)
|
|
|
|
|
return encode(secretKey, plainLen, func(dst []byte) error {
|
|
|
|
|
return writeStreamFastBatchPlain(dst, frames)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func writeStreamFastBatchPlain(dst []byte, frames []streamFastDataFrame) error {
|
|
|
|
|
if len(frames) == 0 || len(dst) != streamFastBatchPlainLen(frames) {
|
|
|
|
|
return errStreamFastPayloadInvalid
|
|
|
|
|
}
|
|
|
|
|
copy(dst[:4], streamFastBatchMagic)
|
|
|
|
|
dst[4] = streamFastBatchVersion
|
|
|
|
|
binary.BigEndian.PutUint32(dst[8:12], uint32(len(frames)))
|
|
|
|
|
offset := streamFastBatchHeaderLen
|
|
|
|
|
for _, frame := range frames {
|
|
|
|
|
if frame.DataID == 0 {
|
|
|
|
|
return errStreamFastPayloadInvalid
|
|
|
|
|
}
|
|
|
|
|
dst[offset] = frame.Flags
|
|
|
|
|
binary.BigEndian.PutUint64(dst[offset+4:offset+12], frame.DataID)
|
|
|
|
|
binary.BigEndian.PutUint64(dst[offset+12:offset+20], frame.Seq)
|
|
|
|
|
binary.BigEndian.PutUint32(dst[offset+20:offset+24], uint32(len(frame.Payload)))
|
|
|
|
|
offset += streamFastBatchItemHeaderLen
|
|
|
|
|
copy(dst[offset:offset+len(frame.Payload)], frame.Payload)
|
|
|
|
|
offset += len(frame.Payload)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-20 16:35:44 +08:00
|
|
|
func walkStreamFastBatchPlain(payload []byte, fn func(streamFastDataFrame) error) (bool, error) {
|
2026-04-18 16:05:57 +08:00
|
|
|
if len(payload) < 4 || string(payload[:4]) != streamFastBatchMagic {
|
2026-04-20 16:35:44 +08:00
|
|
|
return false, nil
|
2026-04-18 16:05:57 +08:00
|
|
|
}
|
|
|
|
|
if len(payload) < streamFastBatchHeaderLen {
|
2026-04-20 16:35:44 +08:00
|
|
|
return true, errStreamFastPayloadInvalid
|
2026-04-18 16:05:57 +08:00
|
|
|
}
|
|
|
|
|
if payload[4] != streamFastBatchVersion {
|
2026-04-20 16:35:44 +08:00
|
|
|
return true, errStreamFastPayloadInvalid
|
2026-04-18 16:05:57 +08:00
|
|
|
}
|
|
|
|
|
count := int(binary.BigEndian.Uint32(payload[8:12]))
|
|
|
|
|
if count <= 0 {
|
2026-04-20 16:35:44 +08:00
|
|
|
return true, errStreamFastPayloadInvalid
|
2026-04-18 16:05:57 +08:00
|
|
|
}
|
|
|
|
|
offset := streamFastBatchHeaderLen
|
|
|
|
|
for index := 0; index < count; index++ {
|
|
|
|
|
if len(payload)-offset < streamFastBatchItemHeaderLen {
|
2026-04-20 16:35:44 +08:00
|
|
|
return true, errStreamFastPayloadInvalid
|
2026-04-18 16:05:57 +08:00
|
|
|
}
|
|
|
|
|
flags := payload[offset]
|
|
|
|
|
dataID := binary.BigEndian.Uint64(payload[offset+4 : offset+12])
|
|
|
|
|
seq := binary.BigEndian.Uint64(payload[offset+12 : offset+20])
|
|
|
|
|
payloadLen := int(binary.BigEndian.Uint32(payload[offset+20 : offset+24]))
|
|
|
|
|
offset += streamFastBatchItemHeaderLen
|
|
|
|
|
if dataID == 0 || payloadLen < 0 || len(payload)-offset < payloadLen {
|
2026-04-20 16:35:44 +08:00
|
|
|
return true, errStreamFastPayloadInvalid
|
|
|
|
|
}
|
|
|
|
|
if fn != nil {
|
|
|
|
|
if err := fn(streamFastDataFrame{
|
|
|
|
|
Flags: flags,
|
|
|
|
|
DataID: dataID,
|
|
|
|
|
Seq: seq,
|
|
|
|
|
Payload: payload[offset : offset+payloadLen],
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return true, err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
}
|
|
|
|
|
offset += payloadLen
|
|
|
|
|
}
|
|
|
|
|
if offset != len(payload) {
|
2026-04-20 16:35:44 +08:00
|
|
|
return true, errStreamFastPayloadInvalid
|
|
|
|
|
}
|
|
|
|
|
return true, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func decodeStreamFastBatchPlain(payload []byte) ([]streamFastDataFrame, bool, error) {
|
|
|
|
|
frames := make([]streamFastDataFrame, 0, 1)
|
|
|
|
|
matched, err := walkStreamFastBatchPlain(payload, func(frame streamFastDataFrame) error {
|
|
|
|
|
frames = append(frames, frame)
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
if !matched || err != nil {
|
|
|
|
|
return nil, matched, err
|
2026-04-18 16:05:57 +08:00
|
|
|
}
|
|
|
|
|
return frames, true, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-20 16:35:44 +08:00
|
|
|
func walkStreamFastFrames(payload []byte, fn func(streamFastDataFrame) error) (bool, error) {
|
|
|
|
|
if matched, err := walkStreamFastBatchPlain(payload, fn); matched {
|
|
|
|
|
return true, err
|
2026-04-18 16:05:57 +08:00
|
|
|
}
|
|
|
|
|
frame, matched, err := decodeStreamFastDataFrame(payload)
|
2026-04-20 16:35:44 +08:00
|
|
|
if !matched || err != nil {
|
|
|
|
|
return matched, err
|
|
|
|
|
}
|
|
|
|
|
if fn != nil {
|
|
|
|
|
if err := fn(frame); err != nil {
|
|
|
|
|
return true, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func decodeStreamFastDataFrames(payload []byte) ([]streamFastDataFrame, bool, error) {
|
|
|
|
|
frames := make([]streamFastDataFrame, 0, 1)
|
|
|
|
|
matched, err := walkStreamFastFrames(payload, func(frame streamFastDataFrame) error {
|
|
|
|
|
frames = append(frames, frame)
|
|
|
|
|
return nil
|
|
|
|
|
})
|
2026-04-18 16:05:57 +08:00
|
|
|
if !matched || err != nil {
|
|
|
|
|
return nil, matched, err
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
return frames, true, nil
|
2026-04-18 16:05:57 +08:00
|
|
|
}
|