229 lines
6.8 KiB
Go
229 lines
6.8 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"encoding/binary"
|
||
|
|
)
|
||
|
|
|
||
|
|
// Bulk fast-path protocol versions.
//
// normalizeBulkFastPathVersion clamps any advertised version into the range
// [bulkFastPathVersionV1, bulkFastPathVersionCurrent], and
// bulkFastPathSupportsSharedBatch reports true from bulkFastPathVersionV2 on.
const (
	bulkFastPathVersionV1 = 1
	bulkFastPathVersionV2 = 2

	// bulkFastPathVersionCurrent is the highest version this code accepts.
	bulkFastPathVersionCurrent = bulkFastPathVersionV2
)
|
||
|
|
|
||
|
|
// Wire-format parameters of a plain (pre-encryption) fast-path batch, as
// produced by writeBulkFastBatchPlain and parsed by walkBulkFastBatchPlain.
const (
	// bulkFastBatchMagic is the 4-byte tag at the very start of a batch.
	bulkFastBatchMagic = "NBF2"

	// bulkFastBatchVersion is the value stored in byte 4 of the batch header.
	bulkFastBatchVersion = 1

	// bulkFastBatchHeaderLen is the batch header size: magic (4), version (1),
	// 3 bytes the decoder does not read, and a big-endian item count (4).
	bulkFastBatchHeaderLen = 12

	// bulkFastBatchItemHeaderLen is the per-item header size: type (1),
	// flags (1), 2 bytes the decoder does not read, data ID (8), sequence (8)
	// and payload length (4), all multi-byte fields big-endian.
	bulkFastBatchItemHeaderLen = 24

	// Batch limits. NOTE(review): neither limit is checked by the encoder or
	// decoder in this file; presumably the batching code enforces them.
	bulkFastBatchMaxItems      = 64
	bulkFastBatchMaxPlainBytes = 8 << 20 // 8 MiB
)
|
||
|
|
|
||
|
|
func normalizeBulkFastPathVersion(version uint8) uint8 {
|
||
|
|
if version < bulkFastPathVersionV1 {
|
||
|
|
return bulkFastPathVersionV1
|
||
|
|
}
|
||
|
|
if version > bulkFastPathVersionCurrent {
|
||
|
|
return bulkFastPathVersionCurrent
|
||
|
|
}
|
||
|
|
return version
|
||
|
|
}
|
||
|
|
|
||
|
|
func negotiateBulkFastPathVersion(version uint8) uint8 {
|
||
|
|
return normalizeBulkFastPathVersion(version)
|
||
|
|
}
|
||
|
|
|
||
|
|
func bulkFastPathSupportsSharedBatch(version uint8) bool {
|
||
|
|
return normalizeBulkFastPathVersion(version) >= bulkFastPathVersionV2
|
||
|
|
}
|
||
|
|
|
||
|
|
func bulkFastBatchFrameLen(frame bulkFastFrame) int {
|
||
|
|
return bulkFastBatchItemHeaderLen + len(frame.Payload)
|
||
|
|
}
|
||
|
|
|
||
|
|
func bulkFastBatchPlainLen(frames []bulkFastFrame) int {
|
||
|
|
total := bulkFastBatchHeaderLen
|
||
|
|
for _, frame := range frames {
|
||
|
|
total += bulkFastBatchFrameLen(frame)
|
||
|
|
}
|
||
|
|
return total
|
||
|
|
}
|
||
|
|
|
||
|
|
func encodeBulkFastFramePayload(frame bulkFastFrame) ([]byte, error) {
|
||
|
|
return encodeBulkFastControlFrame(frame.Type, frame.Flags, frame.DataID, frame.Seq, frame.Payload)
|
||
|
|
}
|
||
|
|
|
||
|
|
func encodeBulkFastFramePayloadFast(encode transportFastPlainEncoder, secretKey []byte, frame bulkFastFrame) ([]byte, error) {
|
||
|
|
if encode == nil {
|
||
|
|
return nil, errTransportPayloadEncryptFailed
|
||
|
|
}
|
||
|
|
plainLen := bulkFastPayloadHeaderLen + len(frame.Payload)
|
||
|
|
return encode(secretKey, plainLen, func(dst []byte) error {
|
||
|
|
if err := encodeBulkFastFrameHeader(dst, frame.Type, frame.Flags, frame.DataID, frame.Seq, len(frame.Payload)); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
copy(dst[bulkFastPayloadHeaderLen:], frame.Payload)
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func encodeBulkFastFramePayloadPooled(runtime *modernPSKCodecRuntime, frame bulkFastFrame) ([]byte, func(), error) {
|
||
|
|
if runtime == nil {
|
||
|
|
return nil, nil, errTransportPayloadEncryptFailed
|
||
|
|
}
|
||
|
|
plainLen := bulkFastPayloadHeaderLen + len(frame.Payload)
|
||
|
|
return runtime.sealFilledPayloadPooled(plainLen, func(dst []byte) error {
|
||
|
|
if err := encodeBulkFastFrameHeader(dst, frame.Type, frame.Flags, frame.DataID, frame.Seq, len(frame.Payload)); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
copy(dst[bulkFastPayloadHeaderLen:], frame.Payload)
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func encodeBulkFastBatchPlain(frames []bulkFastFrame) ([]byte, error) {
|
||
|
|
if len(frames) == 0 {
|
||
|
|
return nil, errBulkFastPayloadInvalid
|
||
|
|
}
|
||
|
|
buf := make([]byte, bulkFastBatchPlainLen(frames))
|
||
|
|
if err := writeBulkFastBatchPlain(buf, frames); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return buf, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func encodeBulkFastBatchPayloadFast(encode transportFastPlainEncoder, secretKey []byte, frames []bulkFastFrame) ([]byte, error) {
|
||
|
|
if encode == nil {
|
||
|
|
return nil, errTransportPayloadEncryptFailed
|
||
|
|
}
|
||
|
|
plainLen := bulkFastBatchPlainLen(frames)
|
||
|
|
return encode(secretKey, plainLen, func(dst []byte) error {
|
||
|
|
return writeBulkFastBatchPlain(dst, frames)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func encodeBulkFastBatchPayloadPooled(runtime *modernPSKCodecRuntime, frames []bulkFastFrame) ([]byte, func(), error) {
|
||
|
|
if runtime == nil {
|
||
|
|
return nil, nil, errTransportPayloadEncryptFailed
|
||
|
|
}
|
||
|
|
return runtime.sealFilledPayloadPooled(bulkFastBatchPlainLen(frames), func(dst []byte) error {
|
||
|
|
return writeBulkFastBatchPlain(dst, frames)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func writeBulkFastBatchPlain(dst []byte, frames []bulkFastFrame) error {
|
||
|
|
if len(frames) == 0 || len(dst) != bulkFastBatchPlainLen(frames) {
|
||
|
|
return errBulkFastPayloadInvalid
|
||
|
|
}
|
||
|
|
copy(dst[:4], bulkFastBatchMagic)
|
||
|
|
dst[4] = bulkFastBatchVersion
|
||
|
|
binary.BigEndian.PutUint32(dst[8:12], uint32(len(frames)))
|
||
|
|
offset := bulkFastBatchHeaderLen
|
||
|
|
for _, frame := range frames {
|
||
|
|
if frame.DataID == 0 {
|
||
|
|
return errBulkFastPayloadInvalid
|
||
|
|
}
|
||
|
|
dst[offset] = frame.Type
|
||
|
|
dst[offset+1] = frame.Flags
|
||
|
|
binary.BigEndian.PutUint64(dst[offset+4:offset+12], frame.DataID)
|
||
|
|
binary.BigEndian.PutUint64(dst[offset+12:offset+20], frame.Seq)
|
||
|
|
binary.BigEndian.PutUint32(dst[offset+20:offset+24], uint32(len(frame.Payload)))
|
||
|
|
offset += bulkFastBatchItemHeaderLen
|
||
|
|
copy(dst[offset:offset+len(frame.Payload)], frame.Payload)
|
||
|
|
offset += len(frame.Payload)
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func walkBulkFastBatchPlain(payload []byte, fn func(bulkFastFrame) error) (bool, error) {
|
||
|
|
if len(payload) < 4 || string(payload[:4]) != bulkFastBatchMagic {
|
||
|
|
return false, nil
|
||
|
|
}
|
||
|
|
if len(payload) < bulkFastBatchHeaderLen {
|
||
|
|
return true, errBulkFastPayloadInvalid
|
||
|
|
}
|
||
|
|
if payload[4] != bulkFastBatchVersion {
|
||
|
|
return true, errBulkFastPayloadInvalid
|
||
|
|
}
|
||
|
|
count := int(binary.BigEndian.Uint32(payload[8:12]))
|
||
|
|
if count <= 0 {
|
||
|
|
return true, errBulkFastPayloadInvalid
|
||
|
|
}
|
||
|
|
offset := bulkFastBatchHeaderLen
|
||
|
|
for index := 0; index < count; index++ {
|
||
|
|
if len(payload)-offset < bulkFastBatchItemHeaderLen {
|
||
|
|
return true, errBulkFastPayloadInvalid
|
||
|
|
}
|
||
|
|
frameType := payload[offset]
|
||
|
|
switch frameType {
|
||
|
|
case bulkFastPayloadTypeData, bulkFastPayloadTypeClose, bulkFastPayloadTypeReset, bulkFastPayloadTypeRelease:
|
||
|
|
default:
|
||
|
|
return true, errBulkFastPayloadInvalid
|
||
|
|
}
|
||
|
|
flags := payload[offset+1]
|
||
|
|
dataID := binary.BigEndian.Uint64(payload[offset+4 : offset+12])
|
||
|
|
seq := binary.BigEndian.Uint64(payload[offset+12 : offset+20])
|
||
|
|
payloadLen := int(binary.BigEndian.Uint32(payload[offset+20 : offset+24]))
|
||
|
|
offset += bulkFastBatchItemHeaderLen
|
||
|
|
if dataID == 0 || payloadLen < 0 || len(payload)-offset < payloadLen {
|
||
|
|
return true, errBulkFastPayloadInvalid
|
||
|
|
}
|
||
|
|
if fn != nil {
|
||
|
|
if err := fn(bulkFastFrame{
|
||
|
|
Type: frameType,
|
||
|
|
Flags: flags,
|
||
|
|
DataID: dataID,
|
||
|
|
Seq: seq,
|
||
|
|
Payload: payload[offset : offset+payloadLen],
|
||
|
|
}); err != nil {
|
||
|
|
return true, err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
offset += payloadLen
|
||
|
|
}
|
||
|
|
if offset != len(payload) {
|
||
|
|
return true, errBulkFastPayloadInvalid
|
||
|
|
}
|
||
|
|
return true, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func decodeBulkFastBatchPlain(payload []byte) ([]bulkFastFrame, bool, error) {
|
||
|
|
frames := make([]bulkFastFrame, 0, 1)
|
||
|
|
matched, err := walkBulkFastBatchPlain(payload, func(frame bulkFastFrame) error {
|
||
|
|
frames = append(frames, frame)
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
if !matched || err != nil {
|
||
|
|
return nil, matched, err
|
||
|
|
}
|
||
|
|
return frames, true, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func walkBulkFastFrames(payload []byte, fn func(bulkFastFrame) error) (bool, error) {
|
||
|
|
if matched, err := walkBulkFastBatchPlain(payload, fn); matched {
|
||
|
|
return true, err
|
||
|
|
}
|
||
|
|
frame, matched, err := decodeBulkFastFrame(payload)
|
||
|
|
if !matched || err != nil {
|
||
|
|
return matched, err
|
||
|
|
}
|
||
|
|
if fn != nil {
|
||
|
|
if err := fn(frame); err != nil {
|
||
|
|
return true, err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return true, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func decodeBulkFastFrames(payload []byte) ([]bulkFastFrame, bool, error) {
|
||
|
|
frames := make([]bulkFastFrame, 0, 1)
|
||
|
|
matched, err := walkBulkFastFrames(payload, func(frame bulkFastFrame) error {
|
||
|
|
frames = append(frames, frame)
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
if !matched || err != nil {
|
||
|
|
return nil, matched, err
|
||
|
|
}
|
||
|
|
return frames, true, nil
|
||
|
|
}
|