notify/stream_shared_batch.go

178 lines
5.1 KiB
Go
Raw Permalink Normal View History

package notify
import "encoding/binary"
const (
streamFastPathVersionV1 = 1
streamFastPathVersionV2 = 2
streamFastPathVersionCurrent = streamFastPathVersionV2
)
const (
streamFastBatchMagic = "NSB1"
streamFastBatchVersion = 1
streamFastBatchHeaderLen = 12
streamFastBatchItemHeaderLen = 24
streamFastBatchMaxItems = 64
streamFastBatchMaxPlainBytes = 8 * 1024 * 1024
)
func normalizeStreamFastPathVersion(version uint8) uint8 {
if version < streamFastPathVersionV1 {
return streamFastPathVersionV1
}
if version > streamFastPathVersionCurrent {
return streamFastPathVersionCurrent
}
return version
}
func negotiateStreamFastPathVersion(version uint8) uint8 {
return normalizeStreamFastPathVersion(version)
}
func streamFastPathSupportsBatch(version uint8) bool {
return normalizeStreamFastPathVersion(version) >= streamFastPathVersionV2
}
func streamFastBatchFrameLen(frame streamFastDataFrame) int {
return streamFastBatchItemHeaderLen + len(frame.Payload)
}
func streamFastBatchPlainLen(frames []streamFastDataFrame) int {
total := streamFastBatchHeaderLen
for _, frame := range frames {
total += streamFastBatchFrameLen(frame)
}
return total
}
func encodeStreamFastBatchPlain(frames []streamFastDataFrame) ([]byte, error) {
if len(frames) == 0 {
return nil, errStreamFastPayloadInvalid
}
buf := make([]byte, streamFastBatchPlainLen(frames))
if err := writeStreamFastBatchPlain(buf, frames); err != nil {
return nil, err
}
return buf, nil
}
func encodeStreamFastBatchPayloadFast(encode transportFastPlainEncoder, secretKey []byte, frames []streamFastDataFrame) ([]byte, error) {
if encode == nil {
return nil, errTransportPayloadEncryptFailed
}
plainLen := streamFastBatchPlainLen(frames)
return encode(secretKey, plainLen, func(dst []byte) error {
return writeStreamFastBatchPlain(dst, frames)
})
}
func writeStreamFastBatchPlain(dst []byte, frames []streamFastDataFrame) error {
if len(frames) == 0 || len(dst) != streamFastBatchPlainLen(frames) {
return errStreamFastPayloadInvalid
}
copy(dst[:4], streamFastBatchMagic)
dst[4] = streamFastBatchVersion
binary.BigEndian.PutUint32(dst[8:12], uint32(len(frames)))
offset := streamFastBatchHeaderLen
for _, frame := range frames {
if frame.DataID == 0 {
return errStreamFastPayloadInvalid
}
dst[offset] = frame.Flags
binary.BigEndian.PutUint64(dst[offset+4:offset+12], frame.DataID)
binary.BigEndian.PutUint64(dst[offset+12:offset+20], frame.Seq)
binary.BigEndian.PutUint32(dst[offset+20:offset+24], uint32(len(frame.Payload)))
offset += streamFastBatchItemHeaderLen
copy(dst[offset:offset+len(frame.Payload)], frame.Payload)
offset += len(frame.Payload)
}
return nil
}
func walkStreamFastBatchPlain(payload []byte, fn func(streamFastDataFrame) error) (bool, error) {
if len(payload) < 4 || string(payload[:4]) != streamFastBatchMagic {
return false, nil
}
if len(payload) < streamFastBatchHeaderLen {
return true, errStreamFastPayloadInvalid
}
if payload[4] != streamFastBatchVersion {
return true, errStreamFastPayloadInvalid
}
count := int(binary.BigEndian.Uint32(payload[8:12]))
if count <= 0 {
return true, errStreamFastPayloadInvalid
}
offset := streamFastBatchHeaderLen
for index := 0; index < count; index++ {
if len(payload)-offset < streamFastBatchItemHeaderLen {
return true, errStreamFastPayloadInvalid
}
flags := payload[offset]
dataID := binary.BigEndian.Uint64(payload[offset+4 : offset+12])
seq := binary.BigEndian.Uint64(payload[offset+12 : offset+20])
payloadLen := int(binary.BigEndian.Uint32(payload[offset+20 : offset+24]))
offset += streamFastBatchItemHeaderLen
if dataID == 0 || payloadLen < 0 || len(payload)-offset < payloadLen {
return true, errStreamFastPayloadInvalid
}
if fn != nil {
if err := fn(streamFastDataFrame{
Flags: flags,
DataID: dataID,
Seq: seq,
Payload: payload[offset : offset+payloadLen],
}); err != nil {
return true, err
}
}
offset += payloadLen
}
if offset != len(payload) {
return true, errStreamFastPayloadInvalid
}
return true, nil
}
func decodeStreamFastBatchPlain(payload []byte) ([]streamFastDataFrame, bool, error) {
frames := make([]streamFastDataFrame, 0, 1)
matched, err := walkStreamFastBatchPlain(payload, func(frame streamFastDataFrame) error {
frames = append(frames, frame)
return nil
})
if !matched || err != nil {
return nil, matched, err
}
return frames, true, nil
}
func walkStreamFastFrames(payload []byte, fn func(streamFastDataFrame) error) (bool, error) {
if matched, err := walkStreamFastBatchPlain(payload, fn); matched {
return true, err
}
frame, matched, err := decodeStreamFastDataFrame(payload)
if !matched || err != nil {
return matched, err
}
if fn != nil {
if err := fn(frame); err != nil {
return true, err
}
}
return true, nil
}
func decodeStreamFastDataFrames(payload []byte) ([]streamFastDataFrame, bool, error) {
frames := make([]streamFastDataFrame, 0, 1)
matched, err := walkStreamFastFrames(payload, func(frame streamFastDataFrame) error {
frames = append(frames, frame)
return nil
})
if !matched || err != nil {
return nil, matched, err
}
return frames, true, nil
}