128 lines
3.9 KiB
Go
128 lines
3.9 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"encoding/binary"
|
||
|
|
"errors"
|
||
|
|
)
|
||
|
|
|
||
|
|
// Sentinel errors returned by the fast-stream frame codec in this file.
var (
	// errStreamFastDataIDEmpty is returned by the header encoder when it is
	// asked to encode a frame whose data ID is 0.
	errStreamFastDataIDEmpty = errors.New("stream data id is empty")

	// errStreamFastPayloadInvalid is returned when a buffer is too short for
	// the frame header, or when a buffer that starts with the frame magic
	// fails header validation while decoding.
	errStreamFastPayloadInvalid = errors.New("invalid stream fast payload")
)
|
||
|
|
|
||
|
|
// Wire-format constants for the fast-stream data frame.
const (
	// streamFastPayloadMagic is the 4-byte marker at the start of every frame.
	streamFastPayloadMagic = "NSF1"

	// streamFastPayloadVersion is the value written to and required in
	// header byte 4.
	streamFastPayloadVersion = 1

	// streamFastPayloadTypeData is the value written to and required in
	// header byte 5.
	streamFastPayloadTypeData = 1

	// streamFastPayloadHeaderLen is the fixed header size in bytes:
	// magic(4) + version(1) + type(1) + flags(1) + one zero byte(1) +
	// data ID(8) + seq(8) + payload length(4).
	streamFastPayloadHeaderLen = 4 + 1 + 1 + 1 + 1 + 8 + 8 + 4
)
|
||
|
|
|
||
|
|
// streamFastDataFrame is the decoded form of a fast-stream data frame as
// produced by decodeStreamFastDataFrame.
type streamFastDataFrame struct {
	// Flags is header byte 6.
	Flags uint8
	// DataID is the big-endian value in header bytes 8-15; the decoder
	// never returns a frame with a zero DataID.
	DataID uint64
	// Seq is the big-endian value in header bytes 16-23.
	Seq uint64
	// Payload is the frame body after the header. The decoder sets it to a
	// sub-slice of its input, so it shares memory with that buffer.
	Payload []byte
}
|
||
|
|
|
||
|
|
func encodeStreamFastDataFrameHeader(dst []byte, dataID uint64, seq uint64, payloadLen int) error {
|
||
|
|
if dataID == 0 {
|
||
|
|
return errStreamFastDataIDEmpty
|
||
|
|
}
|
||
|
|
if len(dst) < streamFastPayloadHeaderLen {
|
||
|
|
return errStreamFastPayloadInvalid
|
||
|
|
}
|
||
|
|
copy(dst[:4], streamFastPayloadMagic)
|
||
|
|
dst[4] = streamFastPayloadVersion
|
||
|
|
dst[5] = streamFastPayloadTypeData
|
||
|
|
dst[6] = 0
|
||
|
|
dst[7] = 0
|
||
|
|
binary.BigEndian.PutUint64(dst[8:16], dataID)
|
||
|
|
binary.BigEndian.PutUint64(dst[16:24], seq)
|
||
|
|
binary.BigEndian.PutUint32(dst[24:28], uint32(payloadLen))
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func encodeStreamFastDataFrame(dataID uint64, seq uint64, payload []byte) ([]byte, error) {
|
||
|
|
frame := make([]byte, streamFastPayloadHeaderLen+len(payload))
|
||
|
|
if err := encodeStreamFastDataFrameHeader(frame, dataID, seq, len(payload)); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
copy(frame[streamFastPayloadHeaderLen:], payload)
|
||
|
|
return frame, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func decodeStreamFastDataFrame(payload []byte) (streamFastDataFrame, bool, error) {
|
||
|
|
if len(payload) < 4 || string(payload[:4]) != streamFastPayloadMagic {
|
||
|
|
return streamFastDataFrame{}, false, nil
|
||
|
|
}
|
||
|
|
if len(payload) < streamFastPayloadHeaderLen {
|
||
|
|
return streamFastDataFrame{}, true, errStreamFastPayloadInvalid
|
||
|
|
}
|
||
|
|
if payload[4] != streamFastPayloadVersion || payload[5] != streamFastPayloadTypeData {
|
||
|
|
return streamFastDataFrame{}, true, errStreamFastPayloadInvalid
|
||
|
|
}
|
||
|
|
dataLen := int(binary.BigEndian.Uint32(payload[24:28]))
|
||
|
|
if dataLen < 0 || len(payload) != streamFastPayloadHeaderLen+dataLen {
|
||
|
|
return streamFastDataFrame{}, true, errStreamFastPayloadInvalid
|
||
|
|
}
|
||
|
|
dataID := binary.BigEndian.Uint64(payload[8:16])
|
||
|
|
if dataID == 0 {
|
||
|
|
return streamFastDataFrame{}, true, errStreamFastPayloadInvalid
|
||
|
|
}
|
||
|
|
return streamFastDataFrame{
|
||
|
|
Flags: payload[6],
|
||
|
|
DataID: dataID,
|
||
|
|
Seq: binary.BigEndian.Uint64(payload[16:24]),
|
||
|
|
Payload: payload[streamFastPayloadHeaderLen:],
|
||
|
|
}, true, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) encodeFastStreamDataPayload(dataID uint64, seq uint64, chunk []byte) ([]byte, error) {
|
||
|
|
if c != nil && c.fastStreamEncode != nil {
|
||
|
|
return c.fastStreamEncode(c.SecretKey, dataID, seq, chunk)
|
||
|
|
}
|
||
|
|
plain, err := encodeStreamFastDataFrame(dataID, seq, chunk)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return c.encryptTransportPayload(plain)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) sendFastStreamData(dataID uint64, seq uint64, chunk []byte) error {
|
||
|
|
payload, err := c.encodeFastStreamDataPayload(dataID, seq, chunk)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
return c.writePayloadToTransport(payload)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) encodeFastStreamDataPayloadLogical(logical *LogicalConn, dataID uint64, seq uint64, chunk []byte) ([]byte, error) {
|
||
|
|
if logical != nil {
|
||
|
|
if fastStreamEncode := logical.fastStreamEncodeSnapshot(); fastStreamEncode != nil {
|
||
|
|
return fastStreamEncode(logical.secretKeySnapshot(), dataID, seq, chunk)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
plain, err := encodeStreamFastDataFrame(dataID, seq, chunk)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return s.encryptTransportPayloadLogical(logical, plain)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) sendFastStreamDataTransport(logical *LogicalConn, transport *TransportConn, dataID uint64, seq uint64, chunk []byte) error {
|
||
|
|
if err := s.ensureServerTransportSendReady(transport); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
if logical == nil && transport != nil {
|
||
|
|
logical = transport.logicalConnSnapshot()
|
||
|
|
}
|
||
|
|
if logical == nil {
|
||
|
|
return errTransportDetached
|
||
|
|
}
|
||
|
|
payload, err := s.encodeFastStreamDataPayloadLogical(logical, dataID, seq, chunk)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
return s.writeEnvelopePayload(logical, transport, nil, payload)
|
||
|
|
}
|