2026-04-15 15:24:36 +08:00
|
|
|
package notify
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type streamRuntime struct {
|
|
|
|
|
rolePrefix string
|
|
|
|
|
seq atomic.Uint64
|
|
|
|
|
dataSeq atomic.Uint64
|
|
|
|
|
|
|
|
|
|
mu sync.RWMutex
|
|
|
|
|
handler func(StreamAcceptInfo) error
|
|
|
|
|
streams map[string]*streamHandle
|
2026-04-18 16:05:57 +08:00
|
|
|
data map[string]map[uint64]*streamHandle
|
2026-04-15 15:24:36 +08:00
|
|
|
cfg streamConfig
|
|
|
|
|
flow *streamFlowController
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newStreamRuntime(rolePrefix string) *streamRuntime {
|
|
|
|
|
cfg := defaultStreamConfig()
|
|
|
|
|
return &streamRuntime{
|
|
|
|
|
rolePrefix: rolePrefix,
|
|
|
|
|
streams: make(map[string]*streamHandle),
|
2026-04-18 16:05:57 +08:00
|
|
|
data: make(map[string]map[uint64]*streamHandle),
|
2026-04-15 15:24:36 +08:00
|
|
|
cfg: cfg,
|
|
|
|
|
flow: newStreamFlowController(cfg),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) nextID() string {
|
|
|
|
|
if r == nil {
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
return fmt.Sprintf("%s-%d", r.rolePrefix, r.seq.Add(1))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) nextDataID() uint64 {
|
|
|
|
|
if r == nil {
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
return r.dataSeq.Add(1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) setHandler(fn func(StreamAcceptInfo) error) {
|
|
|
|
|
if r == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
r.mu.Lock()
|
|
|
|
|
defer r.mu.Unlock()
|
|
|
|
|
r.handler = fn
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) handlerSnapshot() func(StreamAcceptInfo) error {
|
|
|
|
|
if r == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
defer r.mu.RUnlock()
|
|
|
|
|
return r.handler
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) register(scope string, stream *streamHandle) error {
|
|
|
|
|
if r == nil {
|
|
|
|
|
return errStreamRuntimeNil
|
|
|
|
|
}
|
|
|
|
|
if stream == nil || stream.id == "" {
|
|
|
|
|
return errStreamIDEmpty
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
scope = normalizeFileScope(scope)
|
2026-04-15 15:24:36 +08:00
|
|
|
key := streamRuntimeKey(scope, stream.id)
|
|
|
|
|
r.mu.Lock()
|
|
|
|
|
defer r.mu.Unlock()
|
|
|
|
|
if _, ok := r.streams[key]; ok {
|
|
|
|
|
return errStreamAlreadyExists
|
|
|
|
|
}
|
|
|
|
|
if stream.dataID != 0 {
|
2026-04-18 16:05:57 +08:00
|
|
|
dataScope := r.data[scope]
|
|
|
|
|
if dataScope == nil {
|
|
|
|
|
dataScope = make(map[uint64]*streamHandle)
|
|
|
|
|
r.data[scope] = dataScope
|
|
|
|
|
}
|
|
|
|
|
if _, ok := dataScope[stream.dataID]; ok {
|
2026-04-15 15:24:36 +08:00
|
|
|
return errStreamAlreadyExists
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
dataScope[stream.dataID] = stream
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
r.streams[key] = stream
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) lookup(scope string, streamID string) (*streamHandle, bool) {
|
|
|
|
|
if r == nil || streamID == "" {
|
|
|
|
|
return nil, false
|
|
|
|
|
}
|
|
|
|
|
key := streamRuntimeKey(scope, streamID)
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
defer r.mu.RUnlock()
|
|
|
|
|
stream, ok := r.streams[key]
|
|
|
|
|
return stream, ok
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) lookupByDataID(scope string, dataID uint64) (*streamHandle, bool) {
|
|
|
|
|
if r == nil || dataID == 0 {
|
|
|
|
|
return nil, false
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
scope = normalizeFileScope(scope)
|
2026-04-15 15:24:36 +08:00
|
|
|
r.mu.RLock()
|
|
|
|
|
defer r.mu.RUnlock()
|
2026-04-18 16:05:57 +08:00
|
|
|
dataScope := r.data[scope]
|
|
|
|
|
if dataScope == nil {
|
|
|
|
|
return nil, false
|
|
|
|
|
}
|
|
|
|
|
stream, ok := dataScope[dataID]
|
2026-04-15 15:24:36 +08:00
|
|
|
return stream, ok
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) remove(scope string, streamID string) {
|
|
|
|
|
if r == nil || streamID == "" {
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
scope = normalizeFileScope(scope)
|
2026-04-15 15:24:36 +08:00
|
|
|
key := streamRuntimeKey(scope, streamID)
|
|
|
|
|
r.mu.Lock()
|
|
|
|
|
defer r.mu.Unlock()
|
|
|
|
|
if stream := r.streams[key]; stream != nil && stream.dataID != 0 {
|
2026-04-18 16:05:57 +08:00
|
|
|
if dataScope := r.data[scope]; dataScope != nil {
|
|
|
|
|
delete(dataScope, stream.dataID)
|
|
|
|
|
if len(dataScope) == 0 {
|
|
|
|
|
delete(r.data, scope)
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
delete(r.streams, key)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) acquireOutbound(ctx context.Context, size int) (func(), error) {
|
|
|
|
|
if r == nil || r.flow == nil {
|
|
|
|
|
return func() {}, nil
|
|
|
|
|
}
|
|
|
|
|
return r.flow.acquire(ctx, size)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func (r *streamRuntime) tryAcquireOutbound(size int) bool {
|
|
|
|
|
if r == nil || r.flow == nil {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
return r.flow.tryAcquire(size)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) releaseOutbound(size int) {
|
|
|
|
|
if r == nil || r.flow == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
r.flow.release(size)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func (r *streamRuntime) snapshots() []StreamSnapshot {
|
|
|
|
|
if r == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
snapshots := make([]StreamSnapshot, 0, len(r.streams))
|
|
|
|
|
for _, stream := range r.streams {
|
|
|
|
|
if stream == nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
snapshots = append(snapshots, stream.snapshot())
|
|
|
|
|
}
|
|
|
|
|
r.mu.RUnlock()
|
|
|
|
|
sortStreamSnapshots(snapshots)
|
|
|
|
|
return snapshots
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) closeAll(err error) {
|
|
|
|
|
r.closeMatching(func(string) bool { return true }, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) closeScope(scope string, err error) {
|
|
|
|
|
scope = normalizeFileScope(scope)
|
|
|
|
|
r.closeMatching(func(key string) bool {
|
|
|
|
|
return strings.HasPrefix(key, scope+"\x00")
|
|
|
|
|
}, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *streamRuntime) closeMatching(match func(string) bool, err error) {
|
|
|
|
|
if r == nil || match == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
resetErr := streamRuntimeCloseError(err)
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
streams := make([]*streamHandle, 0, len(r.streams))
|
|
|
|
|
for key, stream := range r.streams {
|
|
|
|
|
if stream == nil || !match(key) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
streams = append(streams, stream)
|
|
|
|
|
}
|
|
|
|
|
r.mu.RUnlock()
|
|
|
|
|
for _, stream := range streams {
|
|
|
|
|
stream.markReset(resetErr)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func streamRuntimeKey(scope string, streamID string) string {
|
|
|
|
|
return normalizeFileScope(scope) + "\x00" + streamID
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) getStreamRuntime() *streamRuntime {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return c.streamRuntime
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) getStreamRuntime() *streamRuntime {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return s.streamRuntime
|
|
|
|
|
}
|