notify/client_bulk_config.go

365 lines
9.2 KiB
Go
Raw Permalink Normal View History

package notify
import "time"
// defaultBulkOpenTuning returns the package-default bulk open tuning: the
// defaultBulkChunkSize, defaultBulkOpenWindowBytes and
// defaultBulkOpenMaxInFlight values passed through normalizeBulkOpenTuning.
func defaultBulkOpenTuning() BulkOpenTuning {
	var defaults BulkOpenTuning
	defaults.ChunkSize = defaultBulkChunkSize
	defaults.WindowBytes = defaultBulkOpenWindowBytes
	defaults.MaxInFlight = defaultBulkOpenMaxInFlight
	return normalizeBulkOpenTuning(defaults)
}
// normalizeBulkOpenTuning returns a copy of tuning with every field made
// usable:
//
//   - a zero or negative ChunkSize becomes defaultBulkChunkSize;
//   - a zero or negative WindowBytes becomes defaultBulkOpenWindowBytes;
//   - WindowBytes is then raised to ChunkSize if it is smaller;
//   - a zero or negative MaxInFlight becomes defaultBulkOpenMaxInFlight.
func normalizeBulkOpenTuning(tuning BulkOpenTuning) BulkOpenTuning {
	chunk := tuning.ChunkSize
	if chunk <= 0 {
		chunk = defaultBulkChunkSize
	}

	window := tuning.WindowBytes
	if window <= 0 {
		window = defaultBulkOpenWindowBytes
	}
	// The window is compared against the already-defaulted chunk size, so a
	// defaulted window can still be widened here.
	if window < chunk {
		window = chunk
	}

	inFlight := tuning.MaxInFlight
	if inFlight <= 0 {
		inFlight = defaultBulkOpenMaxInFlight
	}

	tuning.ChunkSize, tuning.WindowBytes, tuning.MaxInFlight = chunk, window, inFlight
	return tuning
}
// applyBulkOpenTuningDefaults fills the unset (zero or negative) ChunkSize,
// WindowBytes and MaxInFlight fields of opt from tuning, after tuning itself
// has been passed through normalizeBulkOpenTuning. Fields of opt that are
// already positive are returned as given.
//
// No cross-field check is applied to the merged result in this function, so
// an explicit opt.ChunkSize combined with a defaulted WindowBytes is returned
// as-is even if the window is the smaller of the two.
func applyBulkOpenTuningDefaults(opt BulkOpenOptions, tuning BulkOpenTuning) BulkOpenOptions {
	fallback := normalizeBulkOpenTuning(tuning)

	if opt.ChunkSize <= 0 {
		opt.ChunkSize = fallback.ChunkSize
	}
	if opt.WindowBytes <= 0 {
		opt.WindowBytes = fallback.WindowBytes
	}
	if opt.MaxInFlight <= 0 {
		opt.MaxInFlight = fallback.MaxInFlight
	}

	return opt
}
// normalizeBulkNetworkProfile returns profile unchanged when it is one of the
// four known profiles (Default, LAN, WAN, Constrained) and
// BulkNetworkProfileDefault for any other value.
func normalizeBulkNetworkProfile(profile BulkNetworkProfile) BulkNetworkProfile {
	recognized := profile == BulkNetworkProfileDefault ||
		profile == BulkNetworkProfileLAN ||
		profile == BulkNetworkProfileWAN ||
		profile == BulkNetworkProfileConstrained
	if recognized {
		return profile
	}
	return BulkNetworkProfileDefault
}
// bulkNetworkProfileName returns the lowercase name of profile: "lan", "wan"
// or "constrained" for the matching profiles, and "default" for
// BulkNetworkProfileDefault and for any value normalizeBulkNetworkProfile
// does not recognize.
func bulkNetworkProfileName(profile BulkNetworkProfile) string {
	name := "default"
	switch normalizeBulkNetworkProfile(profile) {
	case BulkNetworkProfileLAN:
		name = "lan"
	case BulkNetworkProfileWAN:
		name = "wan"
	case BulkNetworkProfileConstrained:
		name = "constrained"
	}
	return name
}
// bulkNetworkProfilePreset returns the default open mode, dedicated attach
// configuration and open tuning associated with profile.
//
// The profile is normalized first, so unrecognized values receive the same
// preset as BulkNetworkProfileDefault: shared mode, the package-default attach
// settings and defaultBulkOpenTuning(). LAN and WAN select
// BulkOpenModeAuto; Constrained selects BulkOpenModeShared.
func bulkNetworkProfilePreset(profile BulkNetworkProfile) (BulkOpenMode, BulkDedicatedAttachConfig, BulkOpenTuning) {
	const (
		kib = 1024
		mib = 1024 * kib
	)

	// Start from the package defaults; each profile below overrides only the
	// fields in which it differs from them.
	attach := BulkDedicatedAttachConfig{
		AttachLimit:  defaultBulkDedicatedAttachLimit,
		ActiveLimit:  defaultBulkDedicatedActiveLimit,
		LaneLimit:    defaultBulkDedicatedLaneLimit,
		Retry:        defaultBulkDedicatedAttachRetry,
		Backoff:      defaultBulkDedicatedAttachBackoff,
		DialTimeout:  defaultBulkDedicatedDialTimeout,
		HelloTimeout: defaultBulkDedicatedHelloTimeout,
	}

	switch normalizeBulkNetworkProfile(profile) {
	case BulkNetworkProfileLAN:
		attach.LaneLimit = 4
		return BulkOpenModeAuto, attach, BulkOpenTuning{
			ChunkSize:   1 * mib,
			WindowBytes: 32 * mib,
			MaxInFlight: 64,
		}

	case BulkNetworkProfileWAN:
		attach.AttachLimit = 2
		attach.LaneLimit = 2
		attach.Retry = 4
		attach.Backoff = 250 * time.Millisecond
		attach.DialTimeout = 15 * time.Second
		attach.HelloTimeout = 20 * time.Second
		return BulkOpenModeAuto, attach, BulkOpenTuning{
			ChunkSize:   512 * kib,
			WindowBytes: 8 * mib,
			MaxInFlight: 16,
		}

	case BulkNetworkProfileConstrained:
		attach.AttachLimit = 1
		attach.ActiveLimit = 2
		attach.LaneLimit = 1
		attach.Retry = 5
		attach.Backoff = 400 * time.Millisecond
		attach.DialTimeout = 20 * time.Second
		attach.HelloTimeout = 30 * time.Second
		return BulkOpenModeShared, attach, BulkOpenTuning{
			ChunkSize:   128 * kib,
			WindowBytes: 1 * mib,
			MaxInFlight: 8,
		}
	}

	return BulkOpenModeShared, attach, defaultBulkOpenTuning()
}
// normalizeBulkDedicatedAttachConfig returns a copy of cfg with out-of-range
// values replaced.
//
// AttachLimit, ActiveLimit, LaneLimit and Retry are clamped to zero when
// negative; zero is left unchanged. Backoff, DialTimeout and HelloTimeout are
// replaced by their package defaults when zero or negative.
func normalizeBulkDedicatedAttachConfig(cfg BulkDedicatedAttachConfig) BulkDedicatedAttachConfig {
	normalized := cfg

	// Counters: only negative values are rewritten.
	if normalized.AttachLimit < 0 {
		normalized.AttachLimit = 0
	}
	if normalized.ActiveLimit < 0 {
		normalized.ActiveLimit = 0
	}
	if normalized.LaneLimit < 0 {
		normalized.LaneLimit = 0
	}
	if normalized.Retry < 0 {
		normalized.Retry = 0
	}

	// Durations: anything that is not strictly positive falls back to the
	// package default.
	if normalized.Backoff <= 0 {
		normalized.Backoff = defaultBulkDedicatedAttachBackoff
	}
	if normalized.DialTimeout <= 0 {
		normalized.DialTimeout = defaultBulkDedicatedDialTimeout
	}
	if normalized.HelloTimeout <= 0 {
		normalized.HelloTimeout = defaultBulkDedicatedHelloTimeout
	}

	return normalized
}
// setBulkDedicatedAttachSemaphoreLocked sizes the buffered channel stored in
// c.bulkDedicatedAttachSem so that it holds limit slots.
//
// A limit of zero or less clears the field to nil. When the field already
// holds a channel whose capacity equals limit, that channel is kept: slots
// currently occupied in it stay counted, whereas swapping in a fresh empty
// channel of the same size would forget them and let more holders in than
// limit allows. Any other limit installs a new, empty channel.
//
// The callers visible in this file invoke it with c.mu held.
func (c *ClientCommon) setBulkDedicatedAttachSemaphoreLocked(limit int) {
	if limit <= 0 {
		c.bulkDedicatedAttachSem = nil
		return
	}
	// Re-applying an unchanged limit must not reset the in-flight accounting.
	if c.bulkDedicatedAttachSem != nil && cap(c.bulkDedicatedAttachSem) == limit {
		return
	}
	c.bulkDedicatedAttachSem = make(chan struct{}, limit)
}
// applyBulkDedicatedAttachConfigLocked normalizes cfg and copies every field
// into the client. Storing the attach limit also updates the attach semaphore
// via setBulkDedicatedAttachSemaphoreLocked, and storing the active limit is
// followed by notifyBulkDedicatedActiveWaitersLocked.
//
// The callers visible in this file invoke it with c.mu held.
func (c *ClientCommon) applyBulkDedicatedAttachConfigLocked(cfg BulkDedicatedAttachConfig) {
	normalized := normalizeBulkDedicatedAttachConfig(cfg)

	// Attach limit and the semaphore derived from it.
	c.bulkDedicatedAttachLimit = normalized.AttachLimit
	c.setBulkDedicatedAttachSemaphoreLocked(normalized.AttachLimit)

	// Active limit; the notification runs after the new value is stored.
	c.bulkDedicatedActiveLimit = normalized.ActiveLimit
	c.notifyBulkDedicatedActiveWaitersLocked()

	// Remaining plain settings.
	c.bulkDedicatedLaneLimit = normalized.LaneLimit
	c.bulkDedicatedAttachRetry = normalized.Retry
	c.bulkDedicatedAttachBackoff = normalized.Backoff
	c.bulkDedicatedDialTimeout = normalized.DialTimeout
	c.bulkDedicatedHelloTimeout = normalized.HelloTimeout
}
// ensureBulkDedicatedActiveWaitLocked returns the channel stored in
// c.bulkDedicatedActiveWait, creating and storing an unbuffered one first if
// the field is nil. It returns nil on a nil receiver.
//
// The callers visible in this file invoke it with c.mu held.
func (c *ClientCommon) ensureBulkDedicatedActiveWaitLocked() chan struct{} {
	switch {
	case c == nil:
		return nil
	case c.bulkDedicatedActiveWait != nil:
		return c.bulkDedicatedActiveWait
	}
	waitCh := make(chan struct{})
	c.bulkDedicatedActiveWait = waitCh
	return waitCh
}
// notifyBulkDedicatedActiveWaitersLocked closes the current
// c.bulkDedicatedActiveWait channel, which releases every goroutine receiving
// from it, and installs a fresh unbuffered channel for subsequent waiters.
// It is a no-op on a nil receiver.
//
// The callers visible in this file invoke it with c.mu held.
func (c *ClientCommon) notifyBulkDedicatedActiveWaitersLocked() {
	if c == nil {
		return
	}
	// When no channel exists yet nobody can be waiting on one, so there is
	// nothing to close before installing the replacement.
	if current := c.bulkDedicatedActiveWait; current != nil {
		close(current)
	}
	c.bulkDedicatedActiveWait = make(chan struct{})
}
// applyBulkOpenTuningLocked stores the normalized form of tuning in
// c.bulkOpenTuning. The callers visible in this file invoke it with c.mu
// held.
func (c *ClientCommon) applyBulkOpenTuningLocked(tuning BulkOpenTuning) {
	normalized := normalizeBulkOpenTuning(tuning)
	c.bulkOpenTuning = normalized
}
// bulkDefaultOpenModeSnapshot reads the client's default bulk open mode under
// c.mu and returns it after normalizeBulkOpenMode. A normalized value of
// BulkOpenModeDefault is reported as BulkOpenModeShared, which is also the
// result on a nil receiver.
func (c *ClientCommon) bulkDefaultOpenModeSnapshot() BulkOpenMode {
	if c == nil {
		return BulkOpenModeShared
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if resolved := normalizeBulkOpenMode(c.bulkDefaultOpenMode); resolved != BulkOpenModeDefault {
		return resolved
	}
	return BulkOpenModeShared
}
// bulkDedicatedAttachSemaphoreSnapshot returns the channel currently stored in
// c.bulkDedicatedAttachSem, read under c.mu. The result is nil on a nil
// receiver or when no semaphore is installed.
func (c *ClientCommon) bulkDedicatedAttachSemaphoreSnapshot() chan struct{} {
	var sem chan struct{}
	if c != nil {
		c.mu.Lock()
		sem = c.bulkDedicatedAttachSem
		c.mu.Unlock()
	}
	return sem
}
// bulkDedicatedActiveLimitSnapshot returns the stored active limit, read
// under c.mu. A negative stored value is reported as 0, as is a nil receiver.
func (c *ClientCommon) bulkDedicatedActiveLimitSnapshot() int {
	if c == nil {
		return 0
	}

	c.mu.Lock()
	limit := c.bulkDedicatedActiveLimit
	c.mu.Unlock()

	if limit < 0 {
		return 0
	}
	return limit
}
// bulkDedicatedActiveWaitSnapshot returns the current active-wait channel,
// obtained through ensureBulkDedicatedActiveWaitLocked while holding c.mu, so
// the channel is created on first use. It returns nil on a nil receiver.
func (c *ClientCommon) bulkDedicatedActiveWaitSnapshot() chan struct{} {
	if c == nil {
		return nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	waitCh := c.ensureBulkDedicatedActiveWaitLocked()
	return waitCh
}
// bulkDedicatedLaneLimitSnapshot returns the stored lane limit, read under
// c.mu. A zero or negative stored value is reported as 0. On a nil receiver
// it returns defaultBulkDedicatedLaneLimit instead.
func (c *ClientCommon) bulkDedicatedLaneLimitSnapshot() int {
	if c == nil {
		return defaultBulkDedicatedLaneLimit
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if limit := c.bulkDedicatedLaneLimit; limit > 0 {
		return limit
	}
	return 0
}
// SetBulkDefaultOpenMode stores mode as the client's default bulk open mode.
// The value is passed through normalizeBulkOpenMode first, and a normalized
// BulkOpenModeDefault is stored as BulkOpenModeShared. It is a no-op on a nil
// receiver.
func (c *ClientCommon) SetBulkDefaultOpenMode(mode BulkOpenMode) {
	if c == nil {
		return
	}

	effective := normalizeBulkOpenMode(mode)
	if effective == BulkOpenModeDefault {
		effective = BulkOpenModeShared
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	c.bulkDefaultOpenMode = effective
}
// BulkDefaultOpenMode returns the client's default bulk open mode as reported
// by bulkDefaultOpenModeSnapshot.
func (c *ClientCommon) BulkDefaultOpenMode() BulkOpenMode {
	mode := c.bulkDefaultOpenModeSnapshot()
	return mode
}
// bulkOpenTuningSnapshot returns the client's bulk open tuning, read under
// c.mu and normalized again on the way out, so a client whose tuning was
// never set still reports the package defaults. A nil receiver yields
// defaultBulkOpenTuning().
func (c *ClientCommon) bulkOpenTuningSnapshot() BulkOpenTuning {
	if c == nil {
		return defaultBulkOpenTuning()
	}

	c.mu.Lock()
	stored := c.bulkOpenTuning
	c.mu.Unlock()

	return normalizeBulkOpenTuning(stored)
}
// SetBulkOpenTuning stores the normalized form of tuning on the client while
// holding c.mu. It is a no-op on a nil receiver.
func (c *ClientCommon) SetBulkOpenTuning(tuning BulkOpenTuning) {
	if c == nil {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	c.applyBulkOpenTuningLocked(tuning)
}
// BulkOpenTuning returns the client's bulk open tuning as reported by
// bulkOpenTuningSnapshot.
func (c *ClientCommon) BulkOpenTuning() BulkOpenTuning {
	tuning := c.bulkOpenTuningSnapshot()
	return tuning
}
// SetBulkDedicatedAttachConfig applies cfg to the client through
// applyBulkDedicatedAttachConfigLocked while holding c.mu. It is a no-op on a
// nil receiver.
func (c *ClientCommon) SetBulkDedicatedAttachConfig(cfg BulkDedicatedAttachConfig) {
	if c == nil {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	c.applyBulkDedicatedAttachConfigLocked(cfg)
}
// BulkDedicatedAttachConfig returns the client's dedicated attach settings as
// a normalized BulkDedicatedAttachConfig. The fields are read under c.mu.
//
// On a nil receiver the result is the normalized zero config: limits and
// retry at 0, durations at their package defaults.
func (c *ClientCommon) BulkDedicatedAttachConfig() BulkDedicatedAttachConfig {
	var snapshot BulkDedicatedAttachConfig
	if c != nil {
		c.mu.Lock()
		snapshot = BulkDedicatedAttachConfig{
			AttachLimit:  c.bulkDedicatedAttachLimit,
			ActiveLimit:  c.bulkDedicatedActiveLimit,
			LaneLimit:    c.bulkDedicatedLaneLimit,
			Retry:        c.bulkDedicatedAttachRetry,
			Backoff:      c.bulkDedicatedAttachBackoff,
			DialTimeout:  c.bulkDedicatedDialTimeout,
			HelloTimeout: c.bulkDedicatedHelloTimeout,
		}
		c.mu.Unlock()
	}
	return normalizeBulkDedicatedAttachConfig(snapshot)
}
// SetBulkNetworkProfile switches the client to profile. The profile is
// normalized, its preset is looked up with bulkNetworkProfilePreset, and then
// under c.mu the profile, default open mode, dedicated attach configuration
// and open tuning are all replaced by the preset values. It is a no-op on a
// nil receiver.
func (c *ClientCommon) SetBulkNetworkProfile(profile BulkNetworkProfile) {
	if c == nil {
		return
	}

	normalized := normalizeBulkNetworkProfile(profile)
	mode, attachCfg, tuning := bulkNetworkProfilePreset(normalized)

	c.mu.Lock()
	defer c.mu.Unlock()

	c.bulkNetworkProfile = normalized
	c.bulkDefaultOpenMode = mode
	c.applyBulkDedicatedAttachConfigLocked(attachCfg)
	c.applyBulkOpenTuningLocked(tuning)
}
// BulkNetworkProfile returns the client's stored network profile, read under
// c.mu and normalized. A nil receiver yields BulkNetworkProfileDefault.
func (c *ClientCommon) BulkNetworkProfile() BulkNetworkProfile {
	if c == nil {
		return BulkNetworkProfileDefault
	}

	c.mu.Lock()
	stored := c.bulkNetworkProfile
	c.mu.Unlock()

	return normalizeBulkNetworkProfile(stored)
}
// applyBulkOpenTuningLocked stores the normalized form of tuning in
// s.bulkOpenTuning. The caller visible in this file invokes it with s.mu
// held for writing.
func (s *ServerCommon) applyBulkOpenTuningLocked(tuning BulkOpenTuning) {
	normalized := normalizeBulkOpenTuning(tuning)
	s.bulkOpenTuning = normalized
}
// bulkOpenTuningSnapshot returns the server's bulk open tuning, read under
// the read side of s.mu and normalized again on the way out, so a server
// whose tuning was never set still reports the package defaults. A nil
// receiver yields defaultBulkOpenTuning().
func (s *ServerCommon) bulkOpenTuningSnapshot() BulkOpenTuning {
	if s == nil {
		return defaultBulkOpenTuning()
	}

	s.mu.RLock()
	stored := s.bulkOpenTuning
	s.mu.RUnlock()

	return normalizeBulkOpenTuning(stored)
}
// SetBulkOpenTuning stores the normalized form of tuning on the server while
// holding s.mu for writing. It is a no-op on a nil receiver.
func (s *ServerCommon) SetBulkOpenTuning(tuning BulkOpenTuning) {
	if s == nil {
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.applyBulkOpenTuningLocked(tuning)
}
// BulkOpenTuning returns the server's bulk open tuning as reported by
// bulkOpenTuningSnapshot.
func (s *ServerCommon) BulkOpenTuning() BulkOpenTuning {
	tuning := s.bulkOpenTuningSnapshot()
	return tuning
}