notify/inbound_dispatcher.go

128 lines
2.6 KiB
Go
Raw Permalink Normal View History

package notify
import (
"fmt"
"net"
"sync"
)
// defaultInboundDispatchSource is the fallback dispatch key. It is used when
// Dispatch receives an empty source string, and when the key-building helpers
// in this file are given a nil pointer, an empty string, an unrecognized type,
// or a source with no identifying field set.
const defaultInboundDispatchSource = "_notify.default_inbound_source"
// inboundDispatcher runs submitted callbacks on per-source worker goroutines.
// Callbacks that share a source key are queued on the same worker and executed
// one at a time in submission order; each distinct key gets its own worker.
type inboundDispatcher struct {
// mu guards closed, workers, and the fields of every worker in the map.
mu sync.Mutex
// closed is set by CloseAndWait; once true, Dispatch rejects new work.
closed bool
// workers maps a source key to its worker. An entry is removed by the
// worker's goroutine when its queue drains.
workers map[string]*inboundDispatchWorker
// wg is incremented for each worker goroutine started and decremented when
// that goroutine returns; CloseAndWait blocks on it.
wg sync.WaitGroup
}
// inboundDispatchWorker holds the pending work for a single source key.
// Its fields are only read or written while the owning dispatcher's mutex
// is held.
type inboundDispatchWorker struct {
// queue holds callbacks not yet started, oldest first.
queue []func()
// running reports whether a goroutine is currently draining queue.
running bool
}
// newInboundDispatcher returns an open dispatcher with an empty worker table,
// ready to accept Dispatch calls.
func newInboundDispatcher() *inboundDispatcher {
	dispatcher := new(inboundDispatcher)
	dispatcher.workers = map[string]*inboundDispatchWorker{}
	return dispatcher
}
// Dispatch queues fn to run asynchronously on the worker for source.
//
// Callbacks submitted with the same source run one at a time, in the order
// they were dispatched; callbacks for different sources run on separate
// goroutines. An empty source is mapped to defaultInboundDispatchSource.
//
// It returns true when fn was accepted, and false when the receiver is nil,
// fn is nil, or the dispatcher has already been closed by CloseAndWait.
func (d *inboundDispatcher) Dispatch(source string, fn func()) bool {
	if d == nil || fn == nil {
		return false
	}
	if source == "" {
		source = defaultInboundDispatchSource
	}

	d.mu.Lock()
	if d.closed {
		d.mu.Unlock()
		return false
	}
	// A dispatcher that was not built by newInboundDispatcher (zero value)
	// has a nil map; create it lazily so the store below cannot panic.
	if d.workers == nil {
		d.workers = make(map[string]*inboundDispatchWorker)
	}
	worker := d.workers[source]
	if worker == nil {
		worker = &inboundDispatchWorker{}
		d.workers[source] = worker
	}
	worker.queue = append(worker.queue, fn)

	// Only start a goroutine when none is draining this worker. The
	// WaitGroup is incremented while the lock is still held so CloseAndWait
	// cannot observe closed == true and miss this goroutine.
	start := !worker.running
	if start {
		worker.running = true
		d.wg.Add(1)
	}
	d.mu.Unlock()

	if start {
		go d.run(source, worker)
	}
	return true
}
// run is the goroutine body for one worker. It executes queued callbacks in
// FIFO order, calling each one without the dispatcher lock held. When the
// queue is empty it marks the worker as not running, removes it from the
// dispatcher's table (if it is still the registered worker for source), and
// returns. The WaitGroup is released on exit.
func (d *inboundDispatcher) run(source string, worker *inboundDispatchWorker) {
	defer d.wg.Done()

	// dequeue pops the oldest pending callback under the lock. When nothing
	// is pending it retires the worker and reports ok == false.
	dequeue := func() (task func(), ok bool) {
		d.mu.Lock()
		defer d.mu.Unlock()

		if len(worker.queue) > 0 {
			task = worker.queue[0]
			// Clear the slot so the backing array does not keep the
			// callback reachable after it has been handed out.
			worker.queue[0] = nil
			worker.queue = worker.queue[1:]
			return task, true
		}

		worker.running = false
		if d.workers[source] == worker {
			delete(d.workers, source)
		}
		return nil, false
	}

	for task, ok := dequeue(); ok; task, ok = dequeue() {
		task()
	}
}
// CloseAndWait marks the dispatcher as closed, so later Dispatch calls return
// false, and then blocks until every worker goroutine that was already started
// has returned. Calling it on a nil receiver is a no-op.
func (d *inboundDispatcher) CloseAndWait() {
	if d == nil {
		return
	}

	// Flip the flag inside its own scope so the lock is released before
	// waiting; workers need the lock to drain their queues.
	func() {
		d.mu.Lock()
		defer d.mu.Unlock()
		d.closed = true
	}()

	d.wg.Wait()
}
// clientInboundDispatchSource returns the fixed dispatch key "client".
func clientInboundDispatchSource() string {
	const key = "client"
	return key
}
// serverInboundDispatchSource derives a dispatch key from source, whose
// dynamic type selects the key format:
//
//   - serverInboundSource: the key from serverInboundDispatchSourceKey.
//   - *serverInboundSource: the same, for the pointed-to value; a nil pointer
//     yields defaultInboundDispatchSource.
//   - net.Conn: "conn:" followed by the value formatted with %p.
//   - string: "peer:" followed by the string; an empty string yields
//     defaultInboundDispatchSource.
//
// Any other type, including a nil interface, yields
// defaultInboundDispatchSource.
func serverInboundDispatchSource(source interface{}) string {
	if value, ok := source.(serverInboundSource); ok {
		return serverInboundDispatchSourceKey(value)
	}
	if ptr, ok := source.(*serverInboundSource); ok {
		if ptr != nil {
			return serverInboundDispatchSourceKey(*ptr)
		}
		return defaultInboundDispatchSource
	}
	if conn, ok := source.(net.Conn); ok {
		return fmt.Sprintf("conn:%p", conn)
	}
	if peer, ok := source.(string); ok && peer != "" {
		return "peer:" + peer
	}
	return defaultInboundDispatchSource
}
// serverInboundDispatchSourceKey builds a dispatch key from the first
// identifying field that is set on source, checked in this order: Conn,
// Logical, Source, RemoteAddr. Every such key has TransportGeneration
// appended as a decimal suffix. When none of the fields is set it returns
// defaultInboundDispatchSource.
func serverInboundDispatchSourceKey(source serverInboundSource) string {
	generation := source.TransportGeneration

	switch {
	case source.Conn != nil:
		return fmt.Sprintf("conn:%p:%d", source.Conn, generation)
	case source.Logical != nil:
		return fmt.Sprintf("logical:%s:%d", source.Logical.ID(), generation)
	case source.Source != "":
		return fmt.Sprintf("peer:%s:%d", source.Source, generation)
	case source.RemoteAddr != nil:
		return fmt.Sprintf("addr:%s:%d", source.RemoteAddr.String(), generation)
	default:
		return defaultInboundDispatchSource
	}
}