package notify import ( "fmt" "net" "sync" ) const defaultInboundDispatchSource = "_notify.default_inbound_source" type inboundDispatcher struct { mu sync.Mutex closed bool workers map[string]*inboundDispatchWorker wg sync.WaitGroup } type inboundDispatchWorker struct { queue []func() running bool } func newInboundDispatcher() *inboundDispatcher { return &inboundDispatcher{ workers: make(map[string]*inboundDispatchWorker), } } func (d *inboundDispatcher) Dispatch(source string, fn func()) bool { if d == nil || fn == nil { return false } if source == "" { source = defaultInboundDispatchSource } d.mu.Lock() if d.closed { d.mu.Unlock() return false } worker := d.workers[source] if worker == nil { worker = &inboundDispatchWorker{} d.workers[source] = worker } worker.queue = append(worker.queue, fn) if worker.running { d.mu.Unlock() return true } worker.running = true d.wg.Add(1) d.mu.Unlock() go d.run(source, worker) return true } func (d *inboundDispatcher) run(source string, worker *inboundDispatchWorker) { defer d.wg.Done() for { d.mu.Lock() if len(worker.queue) == 0 { worker.running = false if current := d.workers[source]; current == worker { delete(d.workers, source) } d.mu.Unlock() return } fn := worker.queue[0] worker.queue[0] = nil worker.queue = worker.queue[1:] d.mu.Unlock() fn() } } func (d *inboundDispatcher) CloseAndWait() { if d == nil { return } d.mu.Lock() d.closed = true d.mu.Unlock() d.wg.Wait() } func clientInboundDispatchSource() string { return "client" } func serverInboundDispatchSource(source interface{}) string { switch data := source.(type) { case serverInboundSource: return serverInboundDispatchSourceKey(data) case *serverInboundSource: if data == nil { return defaultInboundDispatchSource } return serverInboundDispatchSourceKey(*data) case net.Conn: return fmt.Sprintf("conn:%p", data) case string: if data == "" { return defaultInboundDispatchSource } return "peer:" + data default: return defaultInboundDispatchSource } } func serverInboundDispatchSourceKey(source serverInboundSource) string { if source.Conn != nil { return fmt.Sprintf("conn:%p:%d", source.Conn, source.TransportGeneration) } if source.Logical != nil { return fmt.Sprintf("logical:%s:%d", source.Logical.ID(), source.TransportGeneration) } if source.Source != "" { return fmt.Sprintf("peer:%s:%d", source.Source, source.TransportGeneration) } if source.RemoteAddr != nil { return fmt.Sprintf("addr:%s:%d", source.RemoteAddr.String(), source.TransportGeneration) } return defaultInboundDispatchSource }