81 lines
1.8 KiB
Go
81 lines
1.8 KiB
Go
|
|
package stario
|
||
|
|
|
||
|
|
import (
|
||
|
|
"os"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
func (q *StarQueue) closedRestoreErr() error {
|
||
|
|
if err := q.ctx.Err(); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
return os.ErrClosed
|
||
|
|
}
|
||
|
|
|
||
|
|
// Restore blocks until one message is available, the queue is stopped, or the
|
||
|
|
// configured timeout expires.
|
||
|
|
func (q *StarQueue) Restore() (MsgQueue, error) {
|
||
|
|
if q.duration <= 0 {
|
||
|
|
select {
|
||
|
|
case <-q.ctx.Done():
|
||
|
|
return MsgQueue{}, q.ctx.Err()
|
||
|
|
case data, ok := <-q.msgPool:
|
||
|
|
if !ok {
|
||
|
|
return MsgQueue{}, q.closedRestoreErr()
|
||
|
|
}
|
||
|
|
return data, nil
|
||
|
|
}
|
||
|
|
}
|
||
|
|
timer := time.NewTimer(q.duration)
|
||
|
|
defer timer.Stop()
|
||
|
|
select {
|
||
|
|
case <-q.ctx.Done():
|
||
|
|
return MsgQueue{}, q.ctx.Err()
|
||
|
|
case <-timer.C:
|
||
|
|
return MsgQueue{}, ErrDeadlineExceeded
|
||
|
|
case data, ok := <-q.msgPool:
|
||
|
|
if !ok {
|
||
|
|
return MsgQueue{}, q.closedRestoreErr()
|
||
|
|
}
|
||
|
|
return data, nil
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// RestoreOne 获取收到的一个信息。
|
||
|
|
// 兼容性修改。
|
||
|
|
func (q *StarQueue) RestoreOne() (MsgQueue, error) {
|
||
|
|
return q.Restore()
|
||
|
|
}
|
||
|
|
|
||
|
|
// Stop cancels the queue runtime.
|
||
|
|
//
|
||
|
|
// After Stop returns, Restore unblocks with context.Canceled and RestoreChan is
|
||
|
|
// eventually closed.
|
||
|
|
func (q *StarQueue) Stop() {
|
||
|
|
q.cancel()
|
||
|
|
q.shutdown()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (q *StarQueue) shutdown() {
|
||
|
|
q.stopOnce.Do(func() {
|
||
|
|
q.sendMu.Lock()
|
||
|
|
close(q.msgPool)
|
||
|
|
q.sendMu.Unlock()
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
// RestoreDuration sets the Restore timeout. A non-positive duration means wait
|
||
|
|
// forever until a message arrives or Stop is called.
|
||
|
|
func (q *StarQueue) RestoreDuration(tm time.Duration) {
|
||
|
|
q.duration = tm
|
||
|
|
}
|
||
|
|
|
||
|
|
// RestoreChan exposes the parsed message stream.
|
||
|
|
//
|
||
|
|
// The returned channel is closed after Stop or when the queue context is
|
||
|
|
// canceled. New code should still prefer Restore when it needs timeout/error
|
||
|
|
// classification instead of a plain stream.
|
||
|
|
func (q *StarQueue) RestoreChan() <-chan MsgQueue {
|
||
|
|
return q.msgPool
|
||
|
|
}
|