package notify import "encoding/binary" const ( streamFastPathVersionV1 = 1 streamFastPathVersionV2 = 2 streamFastPathVersionCurrent = streamFastPathVersionV2 ) const ( streamFastBatchMagic = "NSB1" streamFastBatchVersion = 1 streamFastBatchHeaderLen = 12 streamFastBatchItemHeaderLen = 24 streamFastBatchMaxItems = 64 streamFastBatchMaxPlainBytes = 8 * 1024 * 1024 ) func normalizeStreamFastPathVersion(version uint8) uint8 { if version < streamFastPathVersionV1 { return streamFastPathVersionV1 } if version > streamFastPathVersionCurrent { return streamFastPathVersionCurrent } return version } func negotiateStreamFastPathVersion(version uint8) uint8 { return normalizeStreamFastPathVersion(version) } func streamFastPathSupportsBatch(version uint8) bool { return normalizeStreamFastPathVersion(version) >= streamFastPathVersionV2 } func streamFastBatchFrameLen(frame streamFastDataFrame) int { return streamFastBatchItemHeaderLen + len(frame.Payload) } func streamFastBatchPlainLen(frames []streamFastDataFrame) int { total := streamFastBatchHeaderLen for _, frame := range frames { total += streamFastBatchFrameLen(frame) } return total } func encodeStreamFastBatchPlain(frames []streamFastDataFrame) ([]byte, error) { if len(frames) == 0 { return nil, errStreamFastPayloadInvalid } buf := make([]byte, streamFastBatchPlainLen(frames)) if err := writeStreamFastBatchPlain(buf, frames); err != nil { return nil, err } return buf, nil } func encodeStreamFastBatchPayloadFast(encode transportFastPlainEncoder, secretKey []byte, frames []streamFastDataFrame) ([]byte, error) { if encode == nil { return nil, errTransportPayloadEncryptFailed } plainLen := streamFastBatchPlainLen(frames) return encode(secretKey, plainLen, func(dst []byte) error { return writeStreamFastBatchPlain(dst, frames) }) } func writeStreamFastBatchPlain(dst []byte, frames []streamFastDataFrame) error { if len(frames) == 0 || len(dst) != streamFastBatchPlainLen(frames) { return errStreamFastPayloadInvalid } copy(dst[:4], streamFastBatchMagic) dst[4] = streamFastBatchVersion binary.BigEndian.PutUint32(dst[8:12], uint32(len(frames))) offset := streamFastBatchHeaderLen for _, frame := range frames { if frame.DataID == 0 { return errStreamFastPayloadInvalid } dst[offset] = frame.Flags binary.BigEndian.PutUint64(dst[offset+4:offset+12], frame.DataID) binary.BigEndian.PutUint64(dst[offset+12:offset+20], frame.Seq) binary.BigEndian.PutUint32(dst[offset+20:offset+24], uint32(len(frame.Payload))) offset += streamFastBatchItemHeaderLen copy(dst[offset:offset+len(frame.Payload)], frame.Payload) offset += len(frame.Payload) } return nil } func walkStreamFastBatchPlain(payload []byte, fn func(streamFastDataFrame) error) (bool, error) { if len(payload) < 4 || string(payload[:4]) != streamFastBatchMagic { return false, nil } if len(payload) < streamFastBatchHeaderLen { return true, errStreamFastPayloadInvalid } if payload[4] != streamFastBatchVersion { return true, errStreamFastPayloadInvalid } count := int(binary.BigEndian.Uint32(payload[8:12])) if count <= 0 { return true, errStreamFastPayloadInvalid } offset := streamFastBatchHeaderLen for index := 0; index < count; index++ { if len(payload)-offset < streamFastBatchItemHeaderLen { return true, errStreamFastPayloadInvalid } flags := payload[offset] dataID := binary.BigEndian.Uint64(payload[offset+4 : offset+12]) seq := binary.BigEndian.Uint64(payload[offset+12 : offset+20]) payloadLen := int(binary.BigEndian.Uint32(payload[offset+20 : offset+24])) offset += streamFastBatchItemHeaderLen if dataID == 0 || payloadLen < 0 || len(payload)-offset < payloadLen { return true, errStreamFastPayloadInvalid } if fn != nil { if err := fn(streamFastDataFrame{ Flags: flags, DataID: dataID, Seq: seq, Payload: payload[offset : offset+payloadLen], }); err != nil { return true, err } } offset += payloadLen } if offset != len(payload) { return true, errStreamFastPayloadInvalid } return true, nil } func decodeStreamFastBatchPlain(payload []byte) ([]streamFastDataFrame, bool, error) { frames := make([]streamFastDataFrame, 0, 1) matched, err := walkStreamFastBatchPlain(payload, func(frame streamFastDataFrame) error { frames = append(frames, frame) return nil }) if !matched || err != nil { return nil, matched, err } return frames, true, nil } func walkStreamFastFrames(payload []byte, fn func(streamFastDataFrame) error) (bool, error) { if matched, err := walkStreamFastBatchPlain(payload, fn); matched { return true, err } frame, matched, err := decodeStreamFastDataFrame(payload) if !matched || err != nil { return matched, err } if fn != nil { if err := fn(frame); err != nil { return true, err } } return true, nil } func decodeStreamFastDataFrames(payload []byte) ([]streamFastDataFrame, bool, error) { frames := make([]streamFastDataFrame, 0, 1) matched, err := walkStreamFastFrames(payload, func(frame streamFastDataFrame) error { frames = append(frames, frame) return nil }) if !matched || err != nil { return nil, matched, err } return frames, true, nil }