package notify

import (
	"encoding/binary"
)

// Bulk fast-path protocol versions. bulkFastPathVersionCurrent is the highest
// version this code will normalize to; V2 is the version at which
// bulkFastPathSupportsSharedBatch starts reporting true.
const (
	bulkFastPathVersionV1      = 1
	bulkFastPathVersionV2      = 2
	bulkFastPathVersionCurrent = bulkFastPathVersionV2
)

// Batch container framing.
//
// Batch header (bulkFastBatchHeaderLen bytes):
//
//	[0:4]   bulkFastBatchMagic
//	[4]     bulkFastBatchVersion
//	[5:8]   reserved, written as zero, ignored by the decoder
//	[8:12]  item count, big-endian uint32
//
// Item header (bulkFastBatchItemHeaderLen bytes), followed by the payload:
//
//	[0]     frame type
//	[1]     frame flags
//	[2:4]   reserved, written as zero, ignored by the decoder
//	[4:12]  DataID, big-endian uint64
//	[12:20] Seq, big-endian uint64
//	[20:24] payload length, big-endian uint32
//
// NOTE(review): bulkFastBatchMaxItems and bulkFastBatchMaxPlainBytes are not
// referenced by any function in this file; presumably the batching caller
// enforces them — confirm.
const (
	bulkFastBatchMagic         = "NBF2"
	bulkFastBatchVersion       = 1
	bulkFastBatchHeaderLen     = 12
	bulkFastBatchItemHeaderLen = 24
	bulkFastBatchMaxItems      = 64
	bulkFastBatchMaxPlainBytes = 8 * 1024 * 1024
)

// normalizeBulkFastPathVersion clamps version into the inclusive range
// [bulkFastPathVersionV1, bulkFastPathVersionCurrent]. In particular, 0 maps
// to V1 and anything above the current version maps to the current version.
func normalizeBulkFastPathVersion(version uint8) uint8 {
	if version < bulkFastPathVersionV1 {
		return bulkFastPathVersionV1
	}
	if version > bulkFastPathVersionCurrent {
		return bulkFastPathVersionCurrent
	}
	return version
}

// negotiateBulkFastPathVersion returns the version to use given the version
// supplied by the caller. It is currently identical to
// normalizeBulkFastPathVersion.
func negotiateBulkFastPathVersion(version uint8) uint8 {
	return normalizeBulkFastPathVersion(version)
}

// bulkFastPathSupportsSharedBatch reports whether the normalized version is at
// least bulkFastPathVersionV2.
func bulkFastPathSupportsSharedBatch(version uint8) bool {
	return normalizeBulkFastPathVersion(version) >= bulkFastPathVersionV2
}

// bulkFastBatchFrameLen returns the number of bytes one frame occupies inside
// a batch: the fixed item header plus the frame payload.
func bulkFastBatchFrameLen(frame bulkFastFrame) int {
	return bulkFastBatchItemHeaderLen + len(frame.Payload)
}

// bulkFastBatchPlainLen returns the exact plaintext size of a batch holding
// frames: the batch header plus every frame's encoded length. For an empty
// slice it returns bulkFastBatchHeaderLen.
func bulkFastBatchPlainLen(frames []bulkFastFrame) int {
	total := bulkFastBatchHeaderLen
	for _, frame := range frames {
		total += bulkFastBatchFrameLen(frame)
	}
	return total
}

// encodeBulkFastFramePayload encodes a single frame by delegating to
// encodeBulkFastControlFrame with the frame's fields.
func encodeBulkFastFramePayload(frame bulkFastFrame) ([]byte, error) {
	return encodeBulkFastControlFrame(frame.Type, frame.Flags, frame.DataID, frame.Seq, frame.Payload)
}

// encodeBulkFastFramePayloadFast encodes a single frame through encode, which
// is handed secretKey, the plaintext length, and a callback that fills the
// destination buffer with the frame header followed by the payload.
//
// It returns errTransportPayloadEncryptFailed when encode is nil; any error
// from the header encoder or from encode itself is returned unchanged.
func encodeBulkFastFramePayloadFast(encode transportFastPlainEncoder, secretKey []byte, frame bulkFastFrame) ([]byte, error) {
	if encode == nil {
		return nil, errTransportPayloadEncryptFailed
	}
	plainLen := bulkFastPayloadHeaderLen + len(frame.Payload)
	return encode(secretKey, plainLen, func(dst []byte) error {
		if err := encodeBulkFastFrameHeader(dst, frame.Type, frame.Flags, frame.DataID, frame.Seq, len(frame.Payload)); err != nil {
			return err
		}
		copy(dst[bulkFastPayloadHeaderLen:], frame.Payload)
		return nil
	})
}

// encodeBulkFastFramePayloadPooled encodes a single frame through
// runtime.sealFilledPayloadPooled, using the same fill callback as
// encodeBulkFastFramePayloadFast. The three results are passed through from
// sealFilledPayloadPooled unchanged.
//
// NOTE(review): the func() result is presumably a release hook for the pooled
// buffer — confirm against sealFilledPayloadPooled.
//
// It returns errTransportPayloadEncryptFailed when runtime is nil.
func encodeBulkFastFramePayloadPooled(runtime *modernPSKCodecRuntime, frame bulkFastFrame) ([]byte, func(), error) {
	if runtime == nil {
		return nil, nil, errTransportPayloadEncryptFailed
	}
	plainLen := bulkFastPayloadHeaderLen + len(frame.Payload)
	return runtime.sealFilledPayloadPooled(plainLen, func(dst []byte) error {
		if err := encodeBulkFastFrameHeader(dst, frame.Type, frame.Flags, frame.DataID, frame.Seq, len(frame.Payload)); err != nil {
			return err
		}
		copy(dst[bulkFastPayloadHeaderLen:], frame.Payload)
		return nil
	})
}

// encodeBulkFastBatchPlain serializes frames into a freshly allocated batch
// plaintext. It returns errBulkFastPayloadInvalid for an empty slice or when
// writeBulkFastBatchPlain rejects a frame.
func encodeBulkFastBatchPlain(frames []bulkFastFrame) ([]byte, error) {
	if len(frames) == 0 {
		return nil, errBulkFastPayloadInvalid
	}
	buf := make([]byte, bulkFastBatchPlainLen(frames))
	if err := writeBulkFastBatchPlain(buf, frames); err != nil {
		return nil, err
	}
	return buf, nil
}

// encodeBulkFastBatchPayloadFast encodes a batch through encode, which is
// handed secretKey, the batch plaintext length, and a callback that writes the
// batch into the destination buffer.
//
// It returns errTransportPayloadEncryptFailed when encode is nil. An empty
// frames slice is rejected by the fill callback (errBulkFastPayloadInvalid),
// assuming encode invokes it and propagates its error.
func encodeBulkFastBatchPayloadFast(encode transportFastPlainEncoder, secretKey []byte, frames []bulkFastFrame) ([]byte, error) {
	if encode == nil {
		return nil, errTransportPayloadEncryptFailed
	}
	plainLen := bulkFastBatchPlainLen(frames)
	return encode(secretKey, plainLen, func(dst []byte) error {
		return writeBulkFastBatchPlain(dst, frames)
	})
}

// encodeBulkFastBatchPayloadPooled encodes a batch through
// runtime.sealFilledPayloadPooled, passing its three results through
// unchanged. It returns errTransportPayloadEncryptFailed when runtime is nil.
func encodeBulkFastBatchPayloadPooled(runtime *modernPSKCodecRuntime, frames []bulkFastFrame) ([]byte, func(), error) {
	if runtime == nil {
		return nil, nil, errTransportPayloadEncryptFailed
	}
	return runtime.sealFilledPayloadPooled(bulkFastBatchPlainLen(frames), func(dst []byte) error {
		return writeBulkFastBatchPlain(dst, frames)
	})
}

// writeBulkFastBatchPlain serializes frames into dst using the batch layout
// described on the bulkFastBatch* constants.
//
// dst must be exactly bulkFastBatchPlainLen(frames) bytes long and frames must
// be non-empty; otherwise errBulkFastPayloadInvalid is returned. A frame with
// DataID == 0 also yields errBulkFastPayloadInvalid, in which case dst has
// already been partially written and must be discarded by the caller.
func writeBulkFastBatchPlain(dst []byte, frames []bulkFastFrame) error {
	if len(frames) == 0 || len(dst) != bulkFastBatchPlainLen(frames) {
		return errBulkFastPayloadInvalid
	}
	copy(dst[:4], bulkFastBatchMagic)
	dst[4] = bulkFastBatchVersion
	// dst may be a reused buffer handed in by an encoder callback, so the
	// reserved header bytes are cleared explicitly rather than assumed zero.
	dst[5], dst[6], dst[7] = 0, 0, 0
	binary.BigEndian.PutUint32(dst[8:12], uint32(len(frames)))
	offset := bulkFastBatchHeaderLen
	for _, frame := range frames {
		if frame.DataID == 0 {
			return errBulkFastPayloadInvalid
		}
		dst[offset] = frame.Type
		dst[offset+1] = frame.Flags
		// Reserved item-header bytes: same reasoning as the batch header.
		dst[offset+2], dst[offset+3] = 0, 0
		binary.BigEndian.PutUint64(dst[offset+4:offset+12], frame.DataID)
		binary.BigEndian.PutUint64(dst[offset+12:offset+20], frame.Seq)
		binary.BigEndian.PutUint32(dst[offset+20:offset+24], uint32(len(frame.Payload)))
		offset += bulkFastBatchItemHeaderLen
		copy(dst[offset:offset+len(frame.Payload)], frame.Payload)
		offset += len(frame.Payload)
	}
	return nil
}

// walkBulkFastBatchPlain parses payload as a batch and calls fn (when non-nil)
// once per frame, in order.
//
// The bool result reports whether payload starts with bulkFastBatchMagic. When
// it is false the error is always nil and payload is not a batch. When it is
// true, a nil error means the whole payload was consumed exactly;
// errBulkFastPayloadInvalid means the batch is malformed (short header, wrong
// version, non-positive count, unknown frame type, zero DataID, truncated item,
// or trailing bytes). An error returned by fn stops the walk and is returned
// unchanged.
//
// Frames passed to fn alias payload; their Payload slices are not copied. fn
// may already have been invoked for earlier frames by the time a later frame
// or the trailing-bytes check fails.
func walkBulkFastBatchPlain(payload []byte, fn func(bulkFastFrame) error) (bool, error) {
	if len(payload) < 4 || string(payload[:4]) != bulkFastBatchMagic {
		return false, nil
	}
	if len(payload) < bulkFastBatchHeaderLen {
		return true, errBulkFastPayloadInvalid
	}
	if payload[4] != bulkFastBatchVersion {
		return true, errBulkFastPayloadInvalid
	}
	count := int(binary.BigEndian.Uint32(payload[8:12]))
	// count can only be negative where int is 32 bits wide and the wire value
	// exceeds MaxInt32; zero is rejected as well.
	if count <= 0 {
		return true, errBulkFastPayloadInvalid
	}
	offset := bulkFastBatchHeaderLen
	for index := 0; index < count; index++ {
		if len(payload)-offset < bulkFastBatchItemHeaderLen {
			return true, errBulkFastPayloadInvalid
		}
		frameType := payload[offset]
		switch frameType {
		case bulkFastPayloadTypeData, bulkFastPayloadTypeClose, bulkFastPayloadTypeReset, bulkFastPayloadTypeRelease:
		default:
			return true, errBulkFastPayloadInvalid
		}
		flags := payload[offset+1]
		dataID := binary.BigEndian.Uint64(payload[offset+4 : offset+12])
		seq := binary.BigEndian.Uint64(payload[offset+12 : offset+20])
		payloadLen := int(binary.BigEndian.Uint32(payload[offset+20 : offset+24]))
		offset += bulkFastBatchItemHeaderLen
		if dataID == 0 || payloadLen < 0 || len(payload)-offset < payloadLen {
			return true, errBulkFastPayloadInvalid
		}
		if fn != nil {
			if err := fn(bulkFastFrame{
				Type:    frameType,
				Flags:   flags,
				DataID:  dataID,
				Seq:     seq,
				Payload: payload[offset : offset+payloadLen],
			}); err != nil {
				return true, err
			}
		}
		offset += payloadLen
	}
	// Bytes left over after the declared number of items make the batch invalid.
	if offset != len(payload) {
		return true, errBulkFastPayloadInvalid
	}
	return true, nil
}

// decodeBulkFastBatchPlain parses payload as a batch and returns its frames.
// The bool result has the same meaning as in walkBulkFastBatchPlain. The
// frames slice is nil whenever payload is not a batch or an error occurred.
func decodeBulkFastBatchPlain(payload []byte) ([]bulkFastFrame, bool, error) {
	frames := make([]bulkFastFrame, 0, 1)
	matched, err := walkBulkFastBatchPlain(payload, func(frame bulkFastFrame) error {
		frames = append(frames, frame)
		return nil
	})
	if !matched || err != nil {
		return nil, matched, err
	}
	return frames, true, nil
}

// walkBulkFastFrames visits every frame in payload, accepting either a batch
// or a single frame. The batch form is tried first; if payload does not carry
// the batch magic, it is handed to decodeBulkFastFrame and fn (when non-nil)
// is called with the one resulting frame.
//
// The bool result reports whether payload was recognized as either form.
func walkBulkFastFrames(payload []byte, fn func(bulkFastFrame) error) (bool, error) {
	if matched, err := walkBulkFastBatchPlain(payload, fn); matched {
		return true, err
	}
	frame, matched, err := decodeBulkFastFrame(payload)
	if !matched || err != nil {
		return matched, err
	}
	if fn != nil {
		if err := fn(frame); err != nil {
			return true, err
		}
	}
	return true, nil
}

// decodeBulkFastFrames collects every frame in payload, accepting either a
// batch or a single frame (see walkBulkFastFrames). The frames slice is nil
// whenever payload is not recognized or an error occurred.
func decodeBulkFastFrames(payload []byte) ([]bulkFastFrame, bool, error) {
	frames := make([]bulkFastFrame, 0, 1)
	matched, err := walkBulkFastFrames(payload, func(frame bulkFastFrame) error {
		frames = append(frames, frame)
		return nil
	})
	if !matched || err != nil {
		return nil, matched, err
	}
	return frames, true, nil
}