notify/stream_fastpath.go

128 lines
3.9 KiB
Go
Raw Permalink Normal View History

package notify
import (
"encoding/binary"
"errors"
)
var (
errStreamFastPayloadInvalid = errors.New("invalid stream fast payload")
errStreamFastDataIDEmpty = errors.New("stream data id is empty")
)
const (
streamFastPayloadMagic = "NSF1"
streamFastPayloadVersion = 1
streamFastPayloadTypeData = 1
streamFastPayloadHeaderLen = 28
)
type streamFastDataFrame struct {
Flags uint8
DataID uint64
Seq uint64
Payload []byte
}
func encodeStreamFastDataFrameHeader(dst []byte, dataID uint64, seq uint64, payloadLen int) error {
if dataID == 0 {
return errStreamFastDataIDEmpty
}
if len(dst) < streamFastPayloadHeaderLen {
return errStreamFastPayloadInvalid
}
copy(dst[:4], streamFastPayloadMagic)
dst[4] = streamFastPayloadVersion
dst[5] = streamFastPayloadTypeData
dst[6] = 0
dst[7] = 0
binary.BigEndian.PutUint64(dst[8:16], dataID)
binary.BigEndian.PutUint64(dst[16:24], seq)
binary.BigEndian.PutUint32(dst[24:28], uint32(payloadLen))
return nil
}
func encodeStreamFastDataFrame(dataID uint64, seq uint64, payload []byte) ([]byte, error) {
frame := make([]byte, streamFastPayloadHeaderLen+len(payload))
if err := encodeStreamFastDataFrameHeader(frame, dataID, seq, len(payload)); err != nil {
return nil, err
}
copy(frame[streamFastPayloadHeaderLen:], payload)
return frame, nil
}
func decodeStreamFastDataFrame(payload []byte) (streamFastDataFrame, bool, error) {
if len(payload) < 4 || string(payload[:4]) != streamFastPayloadMagic {
return streamFastDataFrame{}, false, nil
}
if len(payload) < streamFastPayloadHeaderLen {
return streamFastDataFrame{}, true, errStreamFastPayloadInvalid
}
if payload[4] != streamFastPayloadVersion || payload[5] != streamFastPayloadTypeData {
return streamFastDataFrame{}, true, errStreamFastPayloadInvalid
}
dataLen := int(binary.BigEndian.Uint32(payload[24:28]))
if dataLen < 0 || len(payload) != streamFastPayloadHeaderLen+dataLen {
return streamFastDataFrame{}, true, errStreamFastPayloadInvalid
}
dataID := binary.BigEndian.Uint64(payload[8:16])
if dataID == 0 {
return streamFastDataFrame{}, true, errStreamFastPayloadInvalid
}
return streamFastDataFrame{
Flags: payload[6],
DataID: dataID,
Seq: binary.BigEndian.Uint64(payload[16:24]),
Payload: payload[streamFastPayloadHeaderLen:],
}, true, nil
}
func (c *ClientCommon) encodeFastStreamDataPayload(dataID uint64, seq uint64, chunk []byte) ([]byte, error) {
if c != nil && c.fastStreamEncode != nil {
return c.fastStreamEncode(c.SecretKey, dataID, seq, chunk)
}
plain, err := encodeStreamFastDataFrame(dataID, seq, chunk)
if err != nil {
return nil, err
}
return c.encryptTransportPayload(plain)
}
func (c *ClientCommon) sendFastStreamData(dataID uint64, seq uint64, chunk []byte) error {
payload, err := c.encodeFastStreamDataPayload(dataID, seq, chunk)
if err != nil {
return err
}
return c.writePayloadToTransport(payload)
}
func (s *ServerCommon) encodeFastStreamDataPayloadLogical(logical *LogicalConn, dataID uint64, seq uint64, chunk []byte) ([]byte, error) {
if logical != nil {
if fastStreamEncode := logical.fastStreamEncodeSnapshot(); fastStreamEncode != nil {
return fastStreamEncode(logical.secretKeySnapshot(), dataID, seq, chunk)
}
}
plain, err := encodeStreamFastDataFrame(dataID, seq, chunk)
if err != nil {
return nil, err
}
return s.encryptTransportPayloadLogical(logical, plain)
}
func (s *ServerCommon) sendFastStreamDataTransport(logical *LogicalConn, transport *TransportConn, dataID uint64, seq uint64, chunk []byte) error {
if err := s.ensureServerTransportSendReady(transport); err != nil {
return err
}
if logical == nil && transport != nil {
logical = transport.logicalConnSnapshot()
}
if logical == nil {
return errTransportDetached
}
payload, err := s.encodeFastStreamDataPayloadLogical(logical, dataID, seq, chunk)
if err != nil {
return err
}
return s.writeEnvelopePayload(logical, transport, nil, payload)
}