// notify/pending_wait.go

package notify
import (
"strings"
"sync"
"time"
)
// pendingWaitShardCount is the number of shards in a pendingWaitPool. A wait
// is assigned to the shard at index (ID modulo this value).
const pendingWaitShardCount = 64
// pendingWaitShard is one bucket of a pendingWaitPool: a map of pending waits
// keyed by wait ID, together with the mutex the pool methods hold while
// reading or modifying that map.
type pendingWaitShard struct {
// mu is locked around every access to waits by the pool methods.
mu sync.Mutex
// waits maps a wait ID to its pending entry.
waits map[uint64]*WaitMsg
}
// pendingWaitPool stores pending waits across a fixed array of
// pendingWaitShardCount shards, each with its own lock. Its methods accept a
// nil receiver and treat it as an empty pool. Use newPendingWaitPool to get a
// pool whose shard maps are initialized.
type pendingWaitPool struct {
// shards is indexed by (wait ID % pendingWaitShardCount).
shards [pendingWaitShardCount]pendingWaitShard
}
// newPendingWaitPool returns a pool in which every shard already owns an
// empty, non-nil wait map, so later stores never write to a nil map.
func newPendingWaitPool() *pendingWaitPool {
	p := new(pendingWaitPool)
	for idx := 0; idx < len(p.shards); idx++ {
		p.shards[idx].waits = map[uint64]*WaitMsg{}
	}
	return p
}
// createAndStore registers a wait for msg with an empty scope. It is a
// convenience wrapper around createAndStoreWithScope.
func (p *pendingWaitPool) createAndStore(msg TransferMsg) WaitMsg {
	const unscoped = ""
	return p.createAndStoreWithScope(msg, unscoped)
}
// shard returns the shard responsible for the given wait ID, chosen as
// id modulo pendingWaitShardCount. A nil pool yields a nil shard.
func (p *pendingWaitPool) shard(id uint64) *pendingWaitShard {
	if p == nil {
		return nil
	}
	slot := id % pendingWaitShardCount
	return &p.shards[slot]
}
// normalizePendingWaitScope returns scope with leading and trailing white
// space removed. This is the canonical form used when scopes are stored and
// compared.
func normalizePendingWaitScope(scope string) string {
	trimmed := strings.TrimSpace(scope)
	return trimmed
}
// createAndStoreWithScope builds a WaitMsg for msg, stamped with the current
// time, a Reply channel of capacity 1 and the normalized scope, and registers
// it in the shard selected by the wait's ID.
//
// The pool keeps a pointer to its own copy of the entry; the value returned
// to the caller is a separate copy that shares the same Reply channel. When
// the pool is nil the wait is still built and returned, just not registered.
// An existing entry with the same ID is overwritten.
func (p *pendingWaitPool) createAndStoreWithScope(msg TransferMsg, scope string) WaitMsg {
	entry := WaitMsg{
		TransferMsg: msg,
		Time:        time.Now(),
		Reply:       make(chan Message, 1),
		scope:       normalizePendingWaitScope(scope),
	}

	bucket := p.shard(entry.ID)
	if bucket == nil {
		return entry
	}

	bucket.mu.Lock()
	bucket.waits[entry.ID] = &entry
	bucket.mu.Unlock()
	return entry
}
// deliver hands message to the wait registered under id, passing no scopes.
// With no scopes supplied, only a wait stored with an empty scope can match
// (see pendingWaitScopeMatches). It reports whether the message was sent.
func (p *pendingWaitPool) deliver(id uint64, message Message) bool {
	var noScopes []string
	return p.deliverWithScopes(id, noScopes, message)
}
// deliverWithScopes looks up the wait registered under id and, if its scope
// is acceptable for scopes, removes it from the pool and sends message on its
// Reply channel without blocking.
//
// It returns false when the pool is nil, no wait is registered under id, the
// scope does not match (the wait then stays registered), or the send itself
// fails. The send happens after the shard lock is released.
func (p *pendingWaitPool) deliverWithScopes(id uint64, scopes []string, message Message) bool {
	if p == nil {
		return false
	}
	bucket := p.shard(id)
	if bucket == nil {
		return false
	}

	bucket.mu.Lock()
	pending := bucket.waits[id]
	claimed := pending != nil && pendingWaitScopeMatches(pending.scope, scopes)
	if claimed {
		delete(bucket.waits, id)
	}
	bucket.mu.Unlock()

	if !claimed {
		return false
	}
	return safeSendWaitMessage(pending.Reply, message)
}
// pendingWaitScopeMatches reports whether a wait with scope waitScope may be
// served by a caller presenting scopes. A wait whose normalized scope is
// empty matches anything; otherwise at least one entry of scopes must equal
// it after normalization.
func pendingWaitScopeMatches(waitScope string, scopes []string) bool {
	want := normalizePendingWaitScope(waitScope)
	if want == "" {
		return true
	}
	for i := range scopes {
		if normalizePendingWaitScope(scopes[i]) == want {
			return true
		}
	}
	return false
}
// removeAndClose unregisters the wait stored under id, if any, and closes its
// Reply channel. The close is performed after the shard lock is released.
// A nil pool or an unknown id is a no-op.
func (p *pendingWaitPool) removeAndClose(id uint64) {
	if p == nil {
		return
	}
	bucket := p.shard(id)
	if bucket == nil {
		return
	}

	bucket.mu.Lock()
	pending := bucket.waits[id]
	removed := pending != nil
	if removed {
		delete(bucket.waits, id)
	}
	bucket.mu.Unlock()

	if removed {
		safeCloseWaitReply(pending.Reply)
	}
}
// closeAll empties every shard and closes the Reply channel of each wait that
// was registered. Shards are processed one at a time; for each shard the
// entries are collected under the lock and their channels are closed after
// the lock is released.
func (p *pendingWaitPool) closeAll() {
	if p == nil {
		return
	}
	for idx := range p.shards {
		bucket := &p.shards[idx]

		bucket.mu.Lock()
		drained := make([]*WaitMsg, 0, len(bucket.waits))
		for id, pending := range bucket.waits {
			delete(bucket.waits, id)
			if pending == nil {
				continue
			}
			drained = append(drained, pending)
		}
		bucket.mu.Unlock()

		for _, pending := range drained {
			safeCloseWaitReply(pending.Reply)
		}
	}
}
// closeScope removes every wait whose stored scope equals the normalized
// scope argument and closes its Reply channel. The comparison is exact, so an
// empty scope selects the waits that were stored without a scope. Channels
// are closed after the owning shard's lock has been released.
func (p *pendingWaitPool) closeScope(scope string) {
	if p == nil {
		return
	}
	target := normalizePendingWaitScope(scope)
	for idx := range p.shards {
		bucket := &p.shards[idx]

		var doomed []*WaitMsg
		bucket.mu.Lock()
		for id, pending := range bucket.waits {
			if pending == nil || pending.scope != target {
				continue
			}
			delete(bucket.waits, id)
			doomed = append(doomed, pending)
		}
		bucket.mu.Unlock()

		for _, pending := range doomed {
			safeCloseWaitReply(pending.Reply)
		}
	}
}
// closeServerScopeFamily removes every wait whose scope is accepted by
// scopeBelongsToServerFileScope for the base scope derived from scope via
// normalizeFileScope, and closes the Reply channel of each removed wait.
// Both helpers are defined elsewhere; their exact matching rules are not
// visible from this function. Channels are closed after the owning shard's
// lock has been released.
func (p *pendingWaitPool) closeServerScopeFamily(scope string) {
	if p == nil {
		return
	}
	family := normalizeFileScope(scope)
	for idx := range p.shards {
		bucket := &p.shards[idx]

		var doomed []*WaitMsg
		bucket.mu.Lock()
		for id, pending := range bucket.waits {
			if pending == nil || !scopeBelongsToServerFileScope(pending.scope, family) {
				continue
			}
			delete(bucket.waits, id)
			doomed = append(doomed, pending)
		}
		bucket.mu.Unlock()

		for _, pending := range doomed {
			safeCloseWaitReply(pending.Reply)
		}
	}
}
// cleanupExpired removes every wait whose creation time plus maxKeepSeconds
// lies before now and closes its Reply channel.
//
// A nil pool or a non-positive maxKeepSeconds disables the cleanup. A value
// too large to be expressed as a time.Duration is clamped to the maximum
// Duration instead of being multiplied out: the plain multiplication wraps
// around int64 (math.MaxInt64 seconds becomes -1s), which would make every
// pending wait look expired.
//
// Channels are closed after the owning shard's lock has been released.
func (p *pendingWaitPool) cleanupExpired(maxKeepSeconds int64, now time.Time) {
	if p == nil || maxKeepSeconds <= 0 {
		return
	}

	const (
		maxDuration     = time.Duration(1<<63 - 1)
		maxWholeSeconds = int64(maxDuration / time.Second)
	)
	maxKeep := maxDuration
	if maxKeepSeconds <= maxWholeSeconds {
		maxKeep = time.Duration(maxKeepSeconds) * time.Second
	}

	for i := range p.shards {
		shard := &p.shards[i]

		var expired []*WaitMsg
		shard.mu.Lock()
		for id, wait := range shard.waits {
			if wait != nil && wait.Time.Add(maxKeep).Before(now) {
				delete(shard.waits, id)
				expired = append(expired, wait)
			}
		}
		shard.mu.Unlock()

		for _, wait := range expired {
			safeCloseWaitReply(wait.Reply)
		}
	}
}
// safeSendWaitMessage attempts a non-blocking send of message on ch and
// reports whether the value was accepted. It returns false instead of
// blocking when the channel cannot take the value right now, and it recovers
// from the panic raised by sending on a closed channel, reporting false in
// that case as well.
func safeSendWaitMessage(ch chan Message, message Message) (sent bool) {
	defer func() {
		if r := recover(); r != nil {
			sent = false
		}
	}()

	select {
	case ch <- message:
		sent = true
	default:
		sent = false
	}
	return sent
}
// safeCloseWaitReply closes ch and swallows the panic that close raises for
// a nil or already-closed channel, so callers may close a reply channel
// without knowing whether someone else already did.
func safeCloseWaitReply(ch chan Message) {
	defer func() {
		if r := recover(); r != nil {
			// Deliberately ignored: a failed close is not an error here.
			return
		}
	}()
	close(ch)
}
// pendingWaitClosedError returns the error describing why a pending wait was
// closed, with no caller-supplied detach error. See
// pendingWaitClosedErrorWith for the selection rules.
func pendingWaitClosedError(stopCh <-chan struct{}) error {
	var noDetachErr error
	return pendingWaitClosedErrorWith(stopCh, noDetachErr)
}
// pendingWaitClosedErrorWith picks the error to report for a closed pending
// wait, in this order of precedence:
//
//  1. errServiceShutdown when stopCh is non-nil and a receive from it
//     succeeds immediately (it is closed or has a value ready);
//  2. detached, when it is non-nil;
//  3. errTransportDetached otherwise.
//
// The check on stopCh never blocks.
func pendingWaitClosedErrorWith(stopCh <-chan struct{}, detached error) error {
	stopped := false
	if stopCh != nil {
		select {
		case <-stopCh:
			stopped = true
		default:
		}
	}

	switch {
	case stopped:
		return errServiceShutdown
	case detached != nil:
		return detached
	default:
		return errTransportDetached
	}
}
// getPendingWaitPool returns the pending-wait pool held by the client's
// logical session state.
func (c *ClientCommon) getPendingWaitPool() *pendingWaitPool {
	state := c.getLogicalSessionState()
	return state.pendingWaits
}
// getPendingWaitPool returns the pending-wait pool held by the server's
// logical session state.
func (s *ServerCommon) getPendingWaitPool() *pendingWaitPool {
	state := s.getLogicalSessionState()
	return state.pendingWaits
}