2026-04-15 15:24:36 +08:00
|
|
|
package notify
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"fmt"
|
2026-04-18 16:05:57 +08:00
|
|
|
"net"
|
2026-04-15 15:24:36 +08:00
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type bulkRuntime struct {
|
|
|
|
|
rolePrefix string
|
|
|
|
|
seq atomic.Uint64
|
|
|
|
|
dataSeq atomic.Uint64
|
|
|
|
|
|
|
|
|
|
mu sync.RWMutex
|
|
|
|
|
handler func(BulkAcceptInfo) error
|
|
|
|
|
bulks map[string]*bulkHandle
|
2026-04-18 16:05:57 +08:00
|
|
|
data map[string]map[uint64]*bulkHandle
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newBulkRuntime(rolePrefix string) *bulkRuntime {
|
|
|
|
|
return &bulkRuntime{
|
|
|
|
|
rolePrefix: rolePrefix,
|
|
|
|
|
bulks: make(map[string]*bulkHandle),
|
2026-04-18 16:05:57 +08:00
|
|
|
data: make(map[string]map[uint64]*bulkHandle),
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) nextID() string {
|
|
|
|
|
if r == nil {
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
return fmt.Sprintf("%s-%d", r.rolePrefix, r.seq.Add(1))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) nextDataID() uint64 {
|
|
|
|
|
if r == nil {
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
return r.dataSeq.Add(1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) setHandler(fn func(BulkAcceptInfo) error) {
|
|
|
|
|
if r == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
r.mu.Lock()
|
|
|
|
|
defer r.mu.Unlock()
|
|
|
|
|
r.handler = fn
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) handlerSnapshot() func(BulkAcceptInfo) error {
|
|
|
|
|
if r == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
defer r.mu.RUnlock()
|
|
|
|
|
return r.handler
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) register(scope string, bulk *bulkHandle) error {
|
|
|
|
|
if r == nil {
|
|
|
|
|
return errBulkRuntimeNil
|
|
|
|
|
}
|
|
|
|
|
if bulk == nil || bulk.id == "" {
|
|
|
|
|
return errBulkIDEmpty
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
scope = normalizeFileScope(scope)
|
2026-04-15 15:24:36 +08:00
|
|
|
key := bulkRuntimeKey(scope, bulk.id)
|
|
|
|
|
r.mu.Lock()
|
|
|
|
|
defer r.mu.Unlock()
|
|
|
|
|
if _, ok := r.bulks[key]; ok {
|
|
|
|
|
return errBulkAlreadyExists
|
|
|
|
|
}
|
|
|
|
|
if bulk.dataID == 0 {
|
|
|
|
|
return errBulkDataIDEmpty
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
dataScope := r.data[scope]
|
|
|
|
|
if dataScope == nil {
|
|
|
|
|
dataScope = make(map[uint64]*bulkHandle)
|
|
|
|
|
r.data[scope] = dataScope
|
|
|
|
|
}
|
|
|
|
|
if _, ok := dataScope[bulk.dataID]; ok {
|
2026-04-15 15:24:36 +08:00
|
|
|
return errBulkAlreadyExists
|
|
|
|
|
}
|
|
|
|
|
r.bulks[key] = bulk
|
2026-04-18 16:05:57 +08:00
|
|
|
dataScope[bulk.dataID] = bulk
|
2026-04-15 15:24:36 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) lookup(scope string, bulkID string) (*bulkHandle, bool) {
|
|
|
|
|
if r == nil || bulkID == "" {
|
|
|
|
|
return nil, false
|
|
|
|
|
}
|
|
|
|
|
key := bulkRuntimeKey(scope, bulkID)
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
defer r.mu.RUnlock()
|
|
|
|
|
bulk, ok := r.bulks[key]
|
|
|
|
|
return bulk, ok
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) lookupByDataID(scope string, dataID uint64) (*bulkHandle, bool) {
|
|
|
|
|
if r == nil || dataID == 0 {
|
|
|
|
|
return nil, false
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
scope = normalizeFileScope(scope)
|
2026-04-15 15:24:36 +08:00
|
|
|
r.mu.RLock()
|
|
|
|
|
defer r.mu.RUnlock()
|
2026-04-18 16:05:57 +08:00
|
|
|
dataScope := r.data[scope]
|
|
|
|
|
if dataScope == nil {
|
|
|
|
|
return nil, false
|
|
|
|
|
}
|
|
|
|
|
bulk, ok := dataScope[dataID]
|
2026-04-15 15:24:36 +08:00
|
|
|
return bulk, ok
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) remove(scope string, bulkID string) {
|
|
|
|
|
if r == nil || bulkID == "" {
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
scope = normalizeFileScope(scope)
|
2026-04-15 15:24:36 +08:00
|
|
|
key := bulkRuntimeKey(scope, bulkID)
|
|
|
|
|
r.mu.Lock()
|
|
|
|
|
defer r.mu.Unlock()
|
|
|
|
|
if bulk := r.bulks[key]; bulk != nil && bulk.dataID != 0 {
|
2026-04-18 16:05:57 +08:00
|
|
|
if dataScope := r.data[scope]; dataScope != nil {
|
|
|
|
|
delete(dataScope, bulk.dataID)
|
|
|
|
|
if len(dataScope) == 0 {
|
|
|
|
|
delete(r.data, scope)
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
delete(r.bulks, key)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) closeAll(err error) {
|
|
|
|
|
r.closeMatching(func(string) bool { return true }, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) closeScope(scope string, err error) {
|
|
|
|
|
scope = normalizeFileScope(scope)
|
|
|
|
|
r.closeMatching(func(key string) bool {
|
|
|
|
|
return strings.HasPrefix(key, scope+"\x00")
|
|
|
|
|
}, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) closeMatching(match func(string) bool, err error) {
|
|
|
|
|
if r == nil || match == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
resetErr := bulkRuntimeCloseError(err)
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
bulks := make([]*bulkHandle, 0, len(r.bulks))
|
|
|
|
|
for key, bulk := range r.bulks {
|
|
|
|
|
if bulk == nil || !match(key) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
bulks = append(bulks, bulk)
|
|
|
|
|
}
|
|
|
|
|
r.mu.RUnlock()
|
|
|
|
|
for _, bulk := range bulks {
|
|
|
|
|
bulk.markReset(resetErr)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func (r *bulkRuntime) resetDedicatedByConn(scope string, conn net.Conn, err error) {
|
|
|
|
|
if r == nil || conn == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
scope = normalizeFileScope(scope)
|
|
|
|
|
resetErr := bulkRuntimeCloseError(err)
|
|
|
|
|
prefix := scope + "\x00"
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
bulks := make([]*bulkHandle, 0, len(r.bulks))
|
|
|
|
|
for key, bulk := range r.bulks {
|
|
|
|
|
if bulk == nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if !strings.HasPrefix(key, prefix) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if !bulk.Dedicated() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if bulk.dedicatedConnSnapshot() != conn {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
bulks = append(bulks, bulk)
|
|
|
|
|
}
|
|
|
|
|
r.mu.RUnlock()
|
|
|
|
|
for _, bulk := range bulks {
|
|
|
|
|
bulk.markReset(resetErr)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) handleDedicatedReadErrorByConn(scope string, conn net.Conn, err error) {
|
|
|
|
|
if r == nil || conn == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
scope = normalizeFileScope(scope)
|
|
|
|
|
prefix := scope + "\x00"
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
bulks := make([]*bulkHandle, 0, len(r.bulks))
|
|
|
|
|
for key, bulk := range r.bulks {
|
|
|
|
|
if bulk == nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if !strings.HasPrefix(key, prefix) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if !bulk.Dedicated() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if bulk.dedicatedConnSnapshot() != conn {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
bulks = append(bulks, bulk)
|
|
|
|
|
}
|
|
|
|
|
r.mu.RUnlock()
|
|
|
|
|
for _, bulk := range bulks {
|
|
|
|
|
handleDedicatedBulkReadError(bulk, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) attachSharedDedicatedConn(scope string, laneID uint32, conn net.Conn) {
|
|
|
|
|
if r == nil || conn == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
scope = normalizeFileScope(scope)
|
|
|
|
|
laneID = normalizeBulkDedicatedLaneID(laneID)
|
|
|
|
|
prefix := scope + "\x00"
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
bulks := make([]*bulkHandle, 0, len(r.bulks))
|
|
|
|
|
for key, bulk := range r.bulks {
|
|
|
|
|
if bulk == nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if !strings.HasPrefix(key, prefix) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if !bulk.Dedicated() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if bulk.dedicatedLaneIDSnapshot() != laneID {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
bulks = append(bulks, bulk)
|
|
|
|
|
}
|
|
|
|
|
r.mu.RUnlock()
|
|
|
|
|
for _, bulk := range bulks {
|
|
|
|
|
current := bulk.dedicatedConnSnapshot()
|
|
|
|
|
switch {
|
|
|
|
|
case current == conn:
|
|
|
|
|
_ = bulk.attachDedicatedConnShared(conn)
|
|
|
|
|
case current == nil:
|
|
|
|
|
_ = bulk.attachDedicatedConnShared(conn)
|
|
|
|
|
default:
|
|
|
|
|
oldConn, oldSender, err := bulk.replaceDedicatedConnShared(conn)
|
|
|
|
|
if err != nil {
|
|
|
|
|
bulk.markReset(err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if oldSender != nil {
|
|
|
|
|
oldSender.stop()
|
|
|
|
|
}
|
|
|
|
|
if oldConn != nil {
|
|
|
|
|
_ = oldConn.Close()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *bulkRuntime) dedicatedBulksForConn(scope string, conn net.Conn) []*bulkHandle {
|
|
|
|
|
if r == nil || conn == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
scope = normalizeFileScope(scope)
|
|
|
|
|
prefix := scope + "\x00"
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
bulks := make([]*bulkHandle, 0, len(r.bulks))
|
|
|
|
|
for key, bulk := range r.bulks {
|
|
|
|
|
if bulk == nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if !strings.HasPrefix(key, prefix) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if !bulk.Dedicated() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if bulk.dedicatedConnSnapshot() != conn {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
bulks = append(bulks, bulk)
|
|
|
|
|
}
|
|
|
|
|
r.mu.RUnlock()
|
|
|
|
|
return bulks
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func (r *bulkRuntime) snapshots() []BulkSnapshot {
|
|
|
|
|
if r == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
snapshots := make([]BulkSnapshot, 0, len(r.bulks))
|
|
|
|
|
for _, bulk := range r.bulks {
|
|
|
|
|
if bulk == nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
snapshots = append(snapshots, bulk.snapshot())
|
|
|
|
|
}
|
|
|
|
|
r.mu.RUnlock()
|
|
|
|
|
sortBulkSnapshots(snapshots)
|
|
|
|
|
return snapshots
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func bulkRuntimeKey(scope string, bulkID string) string {
|
|
|
|
|
return normalizeFileScope(scope) + "\x00" + bulkID
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func bulkRuntimeCloseError(err error) error {
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return errServiceShutdown
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) getBulkRuntime() *bulkRuntime {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return c.bulkRuntime
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) getBulkRuntime() *bulkRuntime {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return s.bulkRuntime
|
|
|
|
|
}
|