notify/bulk_runtime.go

197 lines
4.0 KiB
Go
Raw Permalink Normal View History

package notify
import (
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
)
// bulkRuntime is a registry of bulk handles. ClientCommon and ServerCommon
// each expose one through getBulkRuntime. Every handle is indexed twice, once
// by its string bulk ID and once by its numeric data ID.
type bulkRuntime struct {
	// rolePrefix is the leading part of every ID produced by nextID.
	rolePrefix string

	// seq backs nextID and dataSeq backs nextDataID; both are advanced
	// atomically and are not protected by mu.
	seq     atomic.Uint64
	dataSeq atomic.Uint64

	// mu guards handler, bulks and data.
	mu      sync.RWMutex
	handler func(BulkAcceptInfo) error
	bulks   map[string]*bulkHandle // keyed by bulkRuntimeKey(scope, id)
	data    map[string]*bulkHandle // keyed by bulkRuntimeDataKey(scope, dataID)
}
// newBulkRuntime returns an empty runtime whose generated bulk IDs start with
// rolePrefix. Both index maps are allocated here so that register can write
// to them right away.
func newBulkRuntime(rolePrefix string) *bulkRuntime {
	rt := new(bulkRuntime)
	rt.rolePrefix = rolePrefix
	rt.bulks = map[string]*bulkHandle{}
	rt.data = map[string]*bulkHandle{}
	return rt
}
// nextID returns the next bulk ID: rolePrefix, a dash, and the value obtained
// by atomically incrementing seq. A nil runtime yields the empty string.
func (r *bulkRuntime) nextID() string {
	if r == nil {
		return ""
	}
	n := r.seq.Add(1)
	return fmt.Sprintf("%s-%d", r.rolePrefix, n)
}
// nextDataID atomically advances dataSeq and returns the new value. A nil
// runtime returns 0, the value register rejects as an empty data ID.
func (r *bulkRuntime) nextDataID() uint64 {
	if r != nil {
		return r.dataSeq.Add(1)
	}
	return 0
}
// setHandler stores fn as the accept handler, replacing whatever was set
// before; a nil fn is stored as-is. The write happens under the write lock.
// Calling it on a nil runtime does nothing.
func (r *bulkRuntime) setHandler(fn func(BulkAcceptInfo) error) {
	if r == nil {
		return
	}
	r.mu.Lock()
	r.handler = fn
	r.mu.Unlock()
}
// handlerSnapshot reads the current accept handler under the read lock and
// returns it. The result is nil when the runtime is nil or no handler is set.
func (r *bulkRuntime) handlerSnapshot() func(BulkAcceptInfo) error {
	if r == nil {
		return nil
	}
	r.mu.RLock()
	current := r.handler
	r.mu.RUnlock()
	return current
}
// register adds bulk to both indexes under scope.
//
// Errors, in the order they are checked:
//   - errBulkRuntimeNil when the runtime is nil;
//   - errBulkIDEmpty when bulk is nil or has an empty id;
//   - errBulkAlreadyExists when the (scope, id) key is already registered;
//   - errBulkDataIDEmpty when bulk.dataID is 0;
//   - errBulkAlreadyExists when the (scope, dataID) key is already registered.
//
// Nothing is stored unless every check passes.
func (r *bulkRuntime) register(scope string, bulk *bulkHandle) error {
	switch {
	case r == nil:
		return errBulkRuntimeNil
	case bulk == nil || bulk.id == "":
		return errBulkIDEmpty
	}

	idKey := bulkRuntimeKey(scope, bulk.id)
	dataKey := bulkRuntimeDataKey(scope, bulk.dataID)

	r.mu.Lock()
	defer r.mu.Unlock()

	_, idTaken := r.bulks[idKey]
	_, dataTaken := r.data[dataKey]
	switch {
	case idTaken:
		// A duplicate ID is reported even when the data ID is also invalid.
		return errBulkAlreadyExists
	case bulk.dataID == 0:
		return errBulkDataIDEmpty
	case dataTaken:
		return errBulkAlreadyExists
	}

	r.bulks[idKey], r.data[dataKey] = bulk, bulk
	return nil
}
// lookup returns the handle registered under (scope, bulkID) and whether the
// key was present. A nil runtime or an empty bulkID reports (nil, false)
// without touching the lock.
func (r *bulkRuntime) lookup(scope string, bulkID string) (*bulkHandle, bool) {
	if r == nil {
		return nil, false
	}
	if bulkID == "" {
		return nil, false
	}
	k := bulkRuntimeKey(scope, bulkID)

	r.mu.RLock()
	h, found := r.bulks[k]
	r.mu.RUnlock()
	return h, found
}
// lookupByDataID returns the handle registered under (scope, dataID) and
// whether the key was present. A nil runtime or a zero dataID reports
// (nil, false) without touching the lock.
func (r *bulkRuntime) lookupByDataID(scope string, dataID uint64) (*bulkHandle, bool) {
	if r == nil {
		return nil, false
	}
	if dataID == 0 {
		return nil, false
	}
	k := bulkRuntimeDataKey(scope, dataID)

	r.mu.RLock()
	h, found := r.data[k]
	r.mu.RUnlock()
	return h, found
}
// remove drops the entry for (scope, bulkID) from the ID index and, when the
// stored handle is non-nil and has a non-zero dataID, the matching entry from
// the data index as well. Unknown keys, an empty bulkID and a nil runtime are
// all no-ops.
func (r *bulkRuntime) remove(scope string, bulkID string) {
	if r == nil || bulkID == "" {
		return
	}
	idKey := bulkRuntimeKey(scope, bulkID)

	r.mu.Lock()
	defer r.mu.Unlock()

	h := r.bulks[idKey]
	delete(r.bulks, idKey)
	if h == nil || h.dataID == 0 {
		// No data-index entry can be derived from this handle.
		return
	}
	delete(r.data, bulkRuntimeDataKey(scope, h.dataID))
}
// closeAll hands every registered bulk to closeMatching, using a predicate
// that accepts any key. err is forwarded unchanged.
func (r *bulkRuntime) closeAll(err error) {
	acceptAny := func(string) bool { return true }
	r.closeMatching(acceptAny, err)
}
// closeScope hands to closeMatching every bulk whose ID-index key belongs to
// scope, i.e. starts with the normalized scope followed by the "\x00"
// separator that bulkRuntimeKey inserts. err is forwarded unchanged.
//
// The prefix is built once here rather than inside the predicate, so the
// concatenation is not repeated for every key the predicate is asked about.
func (r *bulkRuntime) closeScope(scope string, err error) {
	prefix := normalizeFileScope(scope) + "\x00"
	r.closeMatching(func(key string) bool {
		return strings.HasPrefix(key, prefix)
	}, err)
}
// closeMatching calls markReset on every non-nil handle in the ID index whose
// key satisfies match. The error passed to markReset is
// bulkRuntimeCloseError(err). A nil runtime or nil match is a no-op.
//
// Handles are collected under the read lock and reset only after it has been
// released. NOTE(review): presumably this lets markReset call back into the
// runtime without deadlocking; confirm against bulkHandle. Entries are not
// removed from the maps here.
func (r *bulkRuntime) closeMatching(match func(string) bool, err error) {
	if r == nil || match == nil {
		return
	}

	r.mu.RLock()
	targets := make([]*bulkHandle, 0, len(r.bulks))
	for key, h := range r.bulks {
		// match is consulted only for non-nil handles.
		if h != nil && match(key) {
			targets = append(targets, h)
		}
	}
	r.mu.RUnlock()

	cause := bulkRuntimeCloseError(err)
	for i := range targets {
		targets[i].markReset(cause)
	}
}
// snapshots returns one BulkSnapshot per non-nil handle in the ID index,
// ordered by sortBulkSnapshots. Each handle's snapshot method is called while
// the read lock is held; sorting happens after it is released. A nil runtime
// returns nil; an empty runtime returns an empty, non-nil slice.
func (r *bulkRuntime) snapshots() []BulkSnapshot {
	if r == nil {
		return nil
	}

	r.mu.RLock()
	out := make([]BulkSnapshot, 0, len(r.bulks))
	for _, h := range r.bulks {
		if h != nil {
			out = append(out, h.snapshot())
		}
	}
	r.mu.RUnlock()

	sortBulkSnapshots(out)
	return out
}
// bulkRuntimeKey builds the ID-index key: the normalized scope, a "\x00"
// separator, then bulkID. closeScope relies on this layout for its prefix
// match.
func bulkRuntimeKey(scope string, bulkID string) string {
	const sep = "\x00"
	normalized := normalizeFileScope(scope)
	return normalized + sep + bulkID
}
// bulkRuntimeDataKey builds the data-index key: the normalized scope, a
// "\x01" separator, then dataID in base 10. The separator differs from the
// one used by bulkRuntimeKey.
func bulkRuntimeDataKey(scope string, dataID uint64) string {
	const sep = "\x01"
	prefix := normalizeFileScope(scope) + sep
	return prefix + strconv.FormatUint(dataID, 10)
}
// bulkRuntimeCloseError picks the error used when closing bulks: err itself
// when it is non-nil, otherwise errServiceShutdown.
func bulkRuntimeCloseError(err error) error {
	if err == nil {
		return errServiceShutdown
	}
	return err
}
// getBulkRuntime returns the client's bulk runtime, or nil when c is nil.
// bulkRuntime methods accept a nil receiver, so callers can chain calls
// without checking the result.
func (c *ClientCommon) getBulkRuntime() *bulkRuntime {
	if c != nil {
		return c.bulkRuntime
	}
	return nil
}
// getBulkRuntime returns the server's bulk runtime, or nil when s is nil.
// bulkRuntime methods accept a nil receiver, so callers can chain calls
// without checking the result.
func (s *ServerCommon) getBulkRuntime() *bulkRuntime {
	var rt *bulkRuntime
	if s != nil {
		rt = s.bulkRuntime
	}
	return rt
}