102 lines
2.5 KiB
Go
102 lines
2.5 KiB
Go
|
|
package stario
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"errors"
|
||
|
|
"sync"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
// Sentinel errors exported by this package. Each is a distinct value created
// with errors.New, so callers can match them with errors.Is.
var (
	// ErrDeadlineExceeded is the "deadline exceeded" sentinel.
	ErrDeadlineExceeded = errors.New("deadline exceeded")

	// ErrQueueDataFormat is the "data format error" sentinel.
	ErrQueueDataFormat = errors.New("data format error")

	// ErrQueueUnsupportedVersion is the "unsupported frame version" sentinel.
	ErrQueueUnsupportedVersion = errors.New("unsupported frame version")

	// ErrQueueUnsupportedFlags is the "unsupported frame flags" sentinel.
	ErrQueueUnsupportedFlags = errors.New("unsupported frame flags")

	// ErrQueueMessageTooLarge is the "message too large" sentinel.
	ErrQueueMessageTooLarge = errors.New("message too large")

	// ErrQueueFrameHandlerNil is the "frame handler is nil" sentinel.
	ErrQueueFrameHandlerNil = errors.New("frame handler is nil")

	// ErrQueueConnKeyInvalid signals that a queue conn key is not comparable.
	ErrQueueConnKeyInvalid = errors.New("queue conn key must be comparable")

	// ErrQueueConnKeyNil signals that a queue conn key is nil.
	ErrQueueConnKeyNil = errors.New("queue conn key must not be nil")
)
|
||
|
|
|
||
|
|
// Frame layout and protocol constants.
const (
	// queMagicSize is 8, the same as the number of bytes in queMagic.
	queMagicSize = 8

	// queHeaderSize is the frame header size in bytes.
	// NOTE(review): 14 equals queMagicSize plus the field widths of queHeader
	// (uint32 + uint8 + uint8) — confirm against the encoder.
	queHeaderSize = 14

	// queVersionV1 is frame version number 1.
	queVersionV1 = 1

	// queSupportedFlags is zero.
	// NOTE(review): presumably a bitmask of accepted frame flags, i.e. none
	// are accepted — confirm against the parser.
	queSupportedFlags = 0
)
|
||
|
|
|
||
|
|
// queMagic is the identification header (magic bytes).
// NOTE(review): presumably written in front of every frame so the parser can
// locate frame boundaries — confirm against the framing code.
//
// The seventh byte was originally spelled 02, a legacy octal literal; it has
// the value 2.
var queMagic = []byte{11, 27, 19, 96, 12, 25, 2, 20}
|
||
|
|
|
||
|
|
// MsgQueue is the basic unit of information handled by the queue.
type MsgQueue struct {
	// Deprecated: frame-level IDs are no longer emitted by StarQueue v2 framing.
	ID uint16

	// Msg holds the message bytes.
	Msg []byte

	// Conn is an arbitrary caller-supplied value carried with the message.
	// NOTE(review): presumably identifies the connection the message belongs
	// to — confirm against the parser.
	Conn any
}
|
||
|
|
|
||
|
|
// FrameView hands out a parsed payload without making StarQueue copy it first.
//
// Payload may only be used while the ParseMessageView callback is running.
// A caller that needs the bytes afterwards must take its own copy before the
// callback returns.
type FrameView struct {
	// Payload is the parsed payload; see the lifetime rule above.
	Payload []byte

	// Conn is an arbitrary caller-supplied value carried with the frame.
	// NOTE(review): presumably the connection key the frame was parsed for —
	// confirm against ParseMessageView.
	Conn any
}
|
||
|
|
|
||
|
|
// StarQueue 为流数据中的消息队列分发。
|
||
|
|
type StarQueue struct {
|
||
|
|
maxLength uint32
|
||
|
|
msgPool chan MsgQueue
|
||
|
|
states sync.Map
|
||
|
|
ctx context.Context
|
||
|
|
cancel context.CancelFunc
|
||
|
|
duration time.Duration
|
||
|
|
sendMu sync.RWMutex
|
||
|
|
stopOnce sync.Once
|
||
|
|
|
||
|
|
// Deprecated: new code should keep StarQueue focused on framing only.
|
||
|
|
Encode bool
|
||
|
|
// Deprecated: new code should keep StarQueue focused on framing only.
|
||
|
|
EncodeFunc func([]byte) []byte
|
||
|
|
// Deprecated: new code should keep StarQueue focused on framing only.
|
||
|
|
DecodeFunc func([]byte) []byte
|
||
|
|
}
|
||
|
|
|
||
|
|
// queHeader holds the three header fields of a frame.
// NOTE(review): the field meanings below are read off the names; confirm
// against the encoder and parser.
type queHeader struct {
	// Length is presumably the payload length in bytes.
	Length uint32

	// Version is presumably the frame version (see queVersionV1); Flags is
	// presumably the frame flag bits (see queSupportedFlags).
	Version, Flags uint8
}
|
||
|
|
|
||
|
|
// queConnState pairs a byte buffer with the mutex that protects it.
// NOTE(review): presumably one instance exists per connection and buf holds
// bytes that do not yet form a complete frame — confirm against the parser.
type queConnState struct {
	// mu is presumably held while buf is read or modified.
	mu sync.Mutex

	// buf is the buffered data.
	buf []byte
}
|
||
|
|
|
||
|
|
func NewQueueCtx(ctx context.Context, count int64, maxMsgLength uint32) *StarQueue {
|
||
|
|
if count < 0 {
|
||
|
|
panic("stario: negative queue count")
|
||
|
|
}
|
||
|
|
q := &StarQueue{
|
||
|
|
maxLength: maxMsgLength,
|
||
|
|
msgPool: make(chan MsgQueue, count),
|
||
|
|
}
|
||
|
|
if ctx == nil {
|
||
|
|
q.ctx, q.cancel = context.WithCancel(context.Background())
|
||
|
|
} else {
|
||
|
|
q.ctx, q.cancel = context.WithCancel(ctx)
|
||
|
|
}
|
||
|
|
context.AfterFunc(q.ctx, q.shutdown)
|
||
|
|
return q
|
||
|
|
}
|
||
|
|
|
||
|
|
func NewQueueWithCount(count int64) *StarQueue {
|
||
|
|
return NewQueueCtx(nil, count, 0)
|
||
|
|
}
|
||
|
|
|
||
|
|
// NewQueue 建立一个新消息队列。
|
||
|
|
func NewQueue() *StarQueue {
|
||
|
|
return NewQueueWithCount(32)
|
||
|
|
}
|