stario/que_runtime.go

81 lines
1.8 KiB
Go
Raw Permalink Normal View History

package stario
import (
"os"
"time"
)
// closedRestoreErr picks the error Restore reports after it observes that
// msgPool has been closed: the queue context's error when that context is
// already done, otherwise os.ErrClosed.
func (q *StarQueue) closedRestoreErr() error {
	ctxErr := q.ctx.Err()
	if ctxErr == nil {
		// The channel was closed while the context is still live.
		return os.ErrClosed
	}
	return ctxErr
}
// Restore waits for the next message from the queue.
//
// It returns as soon as one of the following happens:
//   - a message is received from msgPool: that message and a nil error;
//   - the queue context is done: the zero MsgQueue and the context's error;
//   - msgPool is found closed: the zero MsgQueue and the error chosen by
//     closedRestoreErr;
//   - the configured duration elapses (only when it is positive): the zero
//     MsgQueue and ErrDeadlineExceeded.
//
// When more than one of these is ready at the same time, select chooses among
// them pseudo-randomly.
func (q *StarQueue) Restore() (MsgQueue, error) {
	// Receiving from a nil channel never becomes ready in a select, so
	// leaving expired nil disables the timeout case entirely when no positive
	// duration is configured.
	var expired <-chan time.Time
	if q.duration > 0 {
		deadline := time.NewTimer(q.duration)
		defer deadline.Stop()
		expired = deadline.C
	}

	select {
	case <-q.ctx.Done():
		return MsgQueue{}, q.ctx.Err()
	case <-expired:
		return MsgQueue{}, ErrDeadlineExceeded
	case msg, open := <-q.msgPool:
		if open {
			return msg, nil
		}
		return MsgQueue{}, q.closedRestoreErr()
	}
}
// RestoreOne returns one received message.
//
// It simply delegates to Restore and is kept for compatibility with existing
// callers.
func (q *StarQueue) RestoreOne() (MsgQueue, error) {
return q.Restore()
}
// Stop shuts the queue down.
//
// It first calls q.cancel and then shutdown, which closes the message channel
// under stopOnce. Once Stop has returned, the channel handed out by
// RestoreChan is closed and a Restore call that was blocked waiting for a
// message returns with an error.
//
// NOTE(review): q.cancel is presumably the CancelFunc of q.ctx, in which case
// that error is normally context.Canceled — confirm where the queue is
// constructed.
func (q *StarQueue) Stop() {
q.cancel()
q.shutdown()
}
// shutdown closes msgPool. The close is routed through stopOnce so that
// repeated calls do not attempt to close the channel again.
func (q *StarQueue) shutdown() {
	// The close happens while holding sendMu.
	// NOTE(review): presumably senders take sendMu before writing to msgPool,
	// so that no send races with the close — confirm against the send path.
	closePool := func() {
		q.sendMu.Lock()
		close(q.msgPool)
		q.sendMu.Unlock()
	}
	q.stopOnce.Do(closePool)
}
// RestoreDuration sets the timeout used by Restore.
//
// A positive tm makes Restore give up with ErrDeadlineExceeded after tm has
// elapsed. A non-positive tm disables the timeout: Restore then waits until a
// message arrives, the queue context is done, or the message channel is
// closed.
//
// The field is written without any locking here, so set it before Restore is
// called from other goroutines.
func (q *StarQueue) RestoreDuration(tm time.Duration) {
q.duration = tm
}
// RestoreChan exposes the parsed message stream as a receive-only channel.
//
// The returned channel is the queue's internal msgPool, the same channel
// Restore reads from, so a message is delivered to only one of the two. The
// channel is closed by Stop.
// NOTE(review): it is presumably also closed when the queue context is
// canceled by other means — confirm against the code that watches the
// context.
//
// New code should still prefer Restore when it needs timeout/error
// classification instead of a plain stream.
func (q *StarQueue) RestoreChan() <-chan MsgQueue {
return q.msgPool
}