package stario

import (
	"os"
	"time"
)

// closedRestoreErr picks the error Restore reports when it finds the message
// channel closed: the queue context's error if the context has already ended,
// otherwise os.ErrClosed.
func (q *StarQueue) closedRestoreErr() error {
	ctxErr := q.ctx.Err()
	if ctxErr == nil {
		return os.ErrClosed
	}
	return ctxErr
}

// Restore waits for the next message from the queue.
//
// It returns as soon as one of the following happens:
//   - a message is received: the message and a nil error are returned;
//   - the queue context is done: the context's error is returned;
//   - the message channel is closed: the result of closedRestoreErr is returned;
//   - the configured timeout (only when positive) elapses: ErrDeadlineExceeded
//     is returned.
//
// On every error path the returned MsgQueue is the zero value.
func (q *StarQueue) Restore() (MsgQueue, error) {
	// Receiving from a nil channel blocks forever, so leaving expired nil
	// switches the timeout case off when no positive duration is configured.
	var expired <-chan time.Time
	if q.duration > 0 {
		deadline := time.NewTimer(q.duration)
		defer deadline.Stop()
		expired = deadline.C
	}

	select {
	case msg, open := <-q.msgPool:
		if open {
			return msg, nil
		}
		return MsgQueue{}, q.closedRestoreErr()
	case <-expired:
		return MsgQueue{}, ErrDeadlineExceeded
	case <-q.ctx.Done():
		return MsgQueue{}, q.ctx.Err()
	}
}

// RestoreOne returns one received message.
//
// It is kept for compatibility and simply forwards to Restore.
func (q *StarQueue) RestoreOne() (MsgQueue, error) {
	return q.Restore()
}

// Stop shuts the queue down: it cancels the queue context first and then
// closes the message channel through shutdown.
//
// Once the context is canceled, a blocked Restore returns with the context's
// error, and receivers on RestoreChan observe the channel being closed.
func (q *StarQueue) Stop() {
	q.cancel()
	q.shutdown()
}

// shutdown closes the message channel exactly once, no matter how many times
// it is invoked.
func (q *StarQueue) shutdown() {
	// The close happens while sendMu is held.
	// NOTE(review): presumably producers take sendMu before sending so they
	// never write to a closed channel — confirm against the send path.
	closePool := func() {
		q.sendMu.Lock()
		close(q.msgPool)
		q.sendMu.Unlock()
	}
	q.stopOnce.Do(closePool)
}

// RestoreDuration configures how long Restore waits for a message. Zero or a
// negative value disables the timeout, so Restore waits until a message
// arrives or the queue is stopped.
func (q *StarQueue) RestoreDuration(tm time.Duration) {
	q.duration = tm
}

// RestoreChan returns the receive-only view of the internal message channel.
//
// The channel is closed when the queue shuts down (Stop does this).
// NOTE(review): the previous comment also stated that canceling the queue
// context closes it; that path is not visible here — confirm.
// Prefer Restore when the caller needs timeouts or wants to tell the
// different termination errors apart rather than consume a plain stream.
func (q *StarQueue) RestoreChan() <-chan MsgQueue {
	return q.msgPool
}