package notify import "time" func defaultBulkOpenTuning() BulkOpenTuning { return normalizeBulkOpenTuning(BulkOpenTuning{ ChunkSize: defaultBulkChunkSize, WindowBytes: defaultBulkOpenWindowBytes, MaxInFlight: defaultBulkOpenMaxInFlight, }) } func normalizeBulkOpenTuning(tuning BulkOpenTuning) BulkOpenTuning { if tuning.ChunkSize <= 0 { tuning.ChunkSize = defaultBulkChunkSize } if tuning.WindowBytes <= 0 { tuning.WindowBytes = defaultBulkOpenWindowBytes } if tuning.WindowBytes < tuning.ChunkSize { tuning.WindowBytes = tuning.ChunkSize } if tuning.MaxInFlight <= 0 { tuning.MaxInFlight = defaultBulkOpenMaxInFlight } return tuning } func applyBulkOpenTuningDefaults(opt BulkOpenOptions, tuning BulkOpenTuning) BulkOpenOptions { tuning = normalizeBulkOpenTuning(tuning) if opt.ChunkSize <= 0 { opt.ChunkSize = tuning.ChunkSize } if opt.WindowBytes <= 0 { opt.WindowBytes = tuning.WindowBytes } if opt.MaxInFlight <= 0 { opt.MaxInFlight = tuning.MaxInFlight } return opt } func normalizeBulkNetworkProfile(profile BulkNetworkProfile) BulkNetworkProfile { switch profile { case BulkNetworkProfileDefault, BulkNetworkProfileLAN, BulkNetworkProfileWAN, BulkNetworkProfileConstrained: return profile default: return BulkNetworkProfileDefault } } func bulkNetworkProfileName(profile BulkNetworkProfile) string { switch normalizeBulkNetworkProfile(profile) { case BulkNetworkProfileLAN: return "lan" case BulkNetworkProfileWAN: return "wan" case BulkNetworkProfileConstrained: return "constrained" case BulkNetworkProfileDefault: fallthrough default: return "default" } } func bulkNetworkProfilePreset(profile BulkNetworkProfile) (BulkOpenMode, BulkDedicatedAttachConfig, BulkOpenTuning) { switch normalizeBulkNetworkProfile(profile) { case BulkNetworkProfileLAN: return BulkOpenModeAuto, BulkDedicatedAttachConfig{ AttachLimit: defaultBulkDedicatedAttachLimit, ActiveLimit: defaultBulkDedicatedActiveLimit, LaneLimit: 4, Retry: defaultBulkDedicatedAttachRetry, Backoff: defaultBulkDedicatedAttachBackoff, DialTimeout: defaultBulkDedicatedDialTimeout, HelloTimeout: defaultBulkDedicatedHelloTimeout, }, BulkOpenTuning{ ChunkSize: 1024 * 1024, WindowBytes: 32 * 1024 * 1024, MaxInFlight: 64, } case BulkNetworkProfileWAN: return BulkOpenModeAuto, BulkDedicatedAttachConfig{ AttachLimit: 2, ActiveLimit: defaultBulkDedicatedActiveLimit, LaneLimit: 2, Retry: 4, Backoff: 250 * time.Millisecond, DialTimeout: 15 * time.Second, HelloTimeout: 20 * time.Second, }, BulkOpenTuning{ ChunkSize: 512 * 1024, WindowBytes: 8 * 1024 * 1024, MaxInFlight: 16, } case BulkNetworkProfileConstrained: return BulkOpenModeShared, BulkDedicatedAttachConfig{ AttachLimit: 1, ActiveLimit: 2, LaneLimit: 1, Retry: 5, Backoff: 400 * time.Millisecond, DialTimeout: 20 * time.Second, HelloTimeout: 30 * time.Second, }, BulkOpenTuning{ ChunkSize: 128 * 1024, WindowBytes: 1 * 1024 * 1024, MaxInFlight: 8, } default: return BulkOpenModeShared, BulkDedicatedAttachConfig{ AttachLimit: defaultBulkDedicatedAttachLimit, ActiveLimit: defaultBulkDedicatedActiveLimit, LaneLimit: defaultBulkDedicatedLaneLimit, Retry: defaultBulkDedicatedAttachRetry, Backoff: defaultBulkDedicatedAttachBackoff, DialTimeout: defaultBulkDedicatedDialTimeout, HelloTimeout: defaultBulkDedicatedHelloTimeout, }, defaultBulkOpenTuning() } } func normalizeBulkDedicatedAttachConfig(cfg BulkDedicatedAttachConfig) BulkDedicatedAttachConfig { if cfg.AttachLimit < 0 { cfg.AttachLimit = 0 } if cfg.ActiveLimit < 0 { cfg.ActiveLimit = 0 } if cfg.LaneLimit < 0 { cfg.LaneLimit = 0 } if cfg.Retry < 0 { cfg.Retry = 0 } if cfg.Backoff <= 0 { cfg.Backoff = defaultBulkDedicatedAttachBackoff } if cfg.DialTimeout <= 0 { cfg.DialTimeout = defaultBulkDedicatedDialTimeout } if cfg.HelloTimeout <= 0 { cfg.HelloTimeout = defaultBulkDedicatedHelloTimeout } return cfg } func (c *ClientCommon) setBulkDedicatedAttachSemaphoreLocked(limit int) { if limit <= 0 { c.bulkDedicatedAttachSem = nil return } c.bulkDedicatedAttachSem = make(chan struct{}, limit) } func (c *ClientCommon) applyBulkDedicatedAttachConfigLocked(cfg BulkDedicatedAttachConfig) { cfg = normalizeBulkDedicatedAttachConfig(cfg) c.bulkDedicatedAttachLimit = cfg.AttachLimit c.setBulkDedicatedAttachSemaphoreLocked(cfg.AttachLimit) c.bulkDedicatedActiveLimit = cfg.ActiveLimit c.notifyBulkDedicatedActiveWaitersLocked() c.bulkDedicatedLaneLimit = cfg.LaneLimit c.bulkDedicatedAttachRetry = cfg.Retry c.bulkDedicatedAttachBackoff = cfg.Backoff c.bulkDedicatedDialTimeout = cfg.DialTimeout c.bulkDedicatedHelloTimeout = cfg.HelloTimeout } func (c *ClientCommon) ensureBulkDedicatedActiveWaitLocked() chan struct{} { if c == nil { return nil } if c.bulkDedicatedActiveWait == nil { c.bulkDedicatedActiveWait = make(chan struct{}) } return c.bulkDedicatedActiveWait } func (c *ClientCommon) notifyBulkDedicatedActiveWaitersLocked() { if c == nil { return } waitCh := c.ensureBulkDedicatedActiveWaitLocked() close(waitCh) c.bulkDedicatedActiveWait = make(chan struct{}) } func (c *ClientCommon) applyBulkOpenTuningLocked(tuning BulkOpenTuning) { c.bulkOpenTuning = normalizeBulkOpenTuning(tuning) } func (c *ClientCommon) bulkDefaultOpenModeSnapshot() BulkOpenMode { if c == nil { return BulkOpenModeShared } c.mu.Lock() defer c.mu.Unlock() mode := normalizeBulkOpenMode(c.bulkDefaultOpenMode) if mode == BulkOpenModeDefault { mode = BulkOpenModeShared } return mode } func (c *ClientCommon) bulkDedicatedAttachSemaphoreSnapshot() chan struct{} { if c == nil { return nil } c.mu.Lock() defer c.mu.Unlock() return c.bulkDedicatedAttachSem } func (c *ClientCommon) bulkDedicatedActiveLimitSnapshot() int { if c == nil { return 0 } c.mu.Lock() defer c.mu.Unlock() if c.bulkDedicatedActiveLimit < 0 { return 0 } return c.bulkDedicatedActiveLimit } func (c *ClientCommon) bulkDedicatedActiveWaitSnapshot() chan struct{} { if c == nil { return nil } c.mu.Lock() defer c.mu.Unlock() return c.ensureBulkDedicatedActiveWaitLocked() } func (c *ClientCommon) bulkDedicatedLaneLimitSnapshot() int { if c == nil { return defaultBulkDedicatedLaneLimit } c.mu.Lock() defer c.mu.Unlock() limit := c.bulkDedicatedLaneLimit if limit < 0 { return 0 } if limit == 0 { return 0 } return limit } func (c *ClientCommon) SetBulkDefaultOpenMode(mode BulkOpenMode) { if c == nil { return } mode = normalizeBulkOpenMode(mode) if mode == BulkOpenModeDefault { mode = BulkOpenModeShared } c.mu.Lock() c.bulkDefaultOpenMode = mode c.mu.Unlock() } func (c *ClientCommon) BulkDefaultOpenMode() BulkOpenMode { return c.bulkDefaultOpenModeSnapshot() } func (c *ClientCommon) bulkOpenTuningSnapshot() BulkOpenTuning { if c == nil { return defaultBulkOpenTuning() } c.mu.Lock() defer c.mu.Unlock() return normalizeBulkOpenTuning(c.bulkOpenTuning) } func (c *ClientCommon) SetBulkOpenTuning(tuning BulkOpenTuning) { if c == nil { return } c.mu.Lock() c.applyBulkOpenTuningLocked(tuning) c.mu.Unlock() } func (c *ClientCommon) BulkOpenTuning() BulkOpenTuning { return c.bulkOpenTuningSnapshot() } func (c *ClientCommon) SetBulkDedicatedAttachConfig(cfg BulkDedicatedAttachConfig) { if c == nil { return } c.mu.Lock() c.applyBulkDedicatedAttachConfigLocked(cfg) c.mu.Unlock() } func (c *ClientCommon) BulkDedicatedAttachConfig() BulkDedicatedAttachConfig { if c == nil { return normalizeBulkDedicatedAttachConfig(BulkDedicatedAttachConfig{}) } c.mu.Lock() defer c.mu.Unlock() return normalizeBulkDedicatedAttachConfig(BulkDedicatedAttachConfig{ AttachLimit: c.bulkDedicatedAttachLimit, ActiveLimit: c.bulkDedicatedActiveLimit, LaneLimit: c.bulkDedicatedLaneLimit, Retry: c.bulkDedicatedAttachRetry, Backoff: c.bulkDedicatedAttachBackoff, DialTimeout: c.bulkDedicatedDialTimeout, HelloTimeout: c.bulkDedicatedHelloTimeout, }) } func (c *ClientCommon) SetBulkNetworkProfile(profile BulkNetworkProfile) { if c == nil { return } profile = normalizeBulkNetworkProfile(profile) mode, cfg, tuning := bulkNetworkProfilePreset(profile) c.mu.Lock() c.bulkNetworkProfile = profile c.bulkDefaultOpenMode = mode c.applyBulkDedicatedAttachConfigLocked(cfg) c.applyBulkOpenTuningLocked(tuning) c.mu.Unlock() } func (c *ClientCommon) BulkNetworkProfile() BulkNetworkProfile { if c == nil { return BulkNetworkProfileDefault } c.mu.Lock() defer c.mu.Unlock() return normalizeBulkNetworkProfile(c.bulkNetworkProfile) } func (s *ServerCommon) applyBulkOpenTuningLocked(tuning BulkOpenTuning) { s.bulkOpenTuning = normalizeBulkOpenTuning(tuning) } func (s *ServerCommon) bulkOpenTuningSnapshot() BulkOpenTuning { if s == nil { return defaultBulkOpenTuning() } s.mu.RLock() defer s.mu.RUnlock() return normalizeBulkOpenTuning(s.bulkOpenTuning) } func (s *ServerCommon) SetBulkOpenTuning(tuning BulkOpenTuning) { if s == nil { return } s.mu.Lock() s.applyBulkOpenTuningLocked(tuning) s.mu.Unlock() } func (s *ServerCommon) BulkOpenTuning() BulkOpenTuning { return s.bulkOpenTuningSnapshot() }