package stario import ( "context" "errors" "sync" "time" ) var ErrDeadlineExceeded = errors.New("deadline exceeded") var ErrQueueDataFormat = errors.New("data format error") var ErrQueueUnsupportedVersion = errors.New("unsupported frame version") var ErrQueueUnsupportedFlags = errors.New("unsupported frame flags") var ErrQueueMessageTooLarge = errors.New("message too large") var ErrQueueFrameHandlerNil = errors.New("frame handler is nil") var ErrQueueConnKeyInvalid = errors.New("queue conn key must be comparable") var ErrQueueConnKeyNil = errors.New("queue conn key must not be nil") const ( queMagicSize = 8 queHeaderSize = 14 queVersionV1 = 1 queSupportedFlags = 0 ) // 识别头 var queMagic = []byte{11, 27, 19, 96, 12, 25, 02, 20} // MsgQueue 为基本的信息单位。 type MsgQueue struct { // Deprecated: frame-level IDs are no longer emitted by StarQueue v2 framing. ID uint16 Msg []byte Conn interface{} } // FrameView exposes a parsed payload without forcing StarQueue to clone it. // // The payload is only valid during the ParseMessageView callback. Callers must // copy it if they need to keep it after the callback returns. type FrameView struct { Payload []byte Conn interface{} } // StarQueue 为流数据中的消息队列分发。 type StarQueue struct { maxLength uint32 msgPool chan MsgQueue states sync.Map ctx context.Context cancel context.CancelFunc duration time.Duration sendMu sync.RWMutex stopOnce sync.Once // Deprecated: new code should keep StarQueue focused on framing only. Encode bool // Deprecated: new code should keep StarQueue focused on framing only. EncodeFunc func([]byte) []byte // Deprecated: new code should keep StarQueue focused on framing only. DecodeFunc func([]byte) []byte } type queHeader struct { Length uint32 Version uint8 Flags uint8 } type queConnState struct { mu sync.Mutex buf []byte } func NewQueueCtx(ctx context.Context, count int64, maxMsgLength uint32) *StarQueue { if count < 0 { panic("stario: negative queue count") } q := &StarQueue{ maxLength: maxMsgLength, msgPool: make(chan MsgQueue, count), } if ctx == nil { q.ctx, q.cancel = context.WithCancel(context.Background()) } else { q.ctx, q.cancel = context.WithCancel(ctx) } context.AfterFunc(q.ctx, q.shutdown) return q } func NewQueueWithCount(count int64) *StarQueue { return NewQueueCtx(nil, count, 0) } // NewQueue 建立一个新消息队列。 func NewQueue() *StarQueue { return NewQueueWithCount(32) }