365 lines
9.2 KiB
Go
365 lines
9.2 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import "time"
|
||
|
|
|
||
|
|
func defaultBulkOpenTuning() BulkOpenTuning {
|
||
|
|
return normalizeBulkOpenTuning(BulkOpenTuning{
|
||
|
|
ChunkSize: defaultBulkChunkSize,
|
||
|
|
WindowBytes: defaultBulkOpenWindowBytes,
|
||
|
|
MaxInFlight: defaultBulkOpenMaxInFlight,
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func normalizeBulkOpenTuning(tuning BulkOpenTuning) BulkOpenTuning {
|
||
|
|
if tuning.ChunkSize <= 0 {
|
||
|
|
tuning.ChunkSize = defaultBulkChunkSize
|
||
|
|
}
|
||
|
|
if tuning.WindowBytes <= 0 {
|
||
|
|
tuning.WindowBytes = defaultBulkOpenWindowBytes
|
||
|
|
}
|
||
|
|
if tuning.WindowBytes < tuning.ChunkSize {
|
||
|
|
tuning.WindowBytes = tuning.ChunkSize
|
||
|
|
}
|
||
|
|
if tuning.MaxInFlight <= 0 {
|
||
|
|
tuning.MaxInFlight = defaultBulkOpenMaxInFlight
|
||
|
|
}
|
||
|
|
return tuning
|
||
|
|
}
|
||
|
|
|
||
|
|
func applyBulkOpenTuningDefaults(opt BulkOpenOptions, tuning BulkOpenTuning) BulkOpenOptions {
|
||
|
|
tuning = normalizeBulkOpenTuning(tuning)
|
||
|
|
if opt.ChunkSize <= 0 {
|
||
|
|
opt.ChunkSize = tuning.ChunkSize
|
||
|
|
}
|
||
|
|
if opt.WindowBytes <= 0 {
|
||
|
|
opt.WindowBytes = tuning.WindowBytes
|
||
|
|
}
|
||
|
|
if opt.MaxInFlight <= 0 {
|
||
|
|
opt.MaxInFlight = tuning.MaxInFlight
|
||
|
|
}
|
||
|
|
return opt
|
||
|
|
}
|
||
|
|
|
||
|
|
func normalizeBulkNetworkProfile(profile BulkNetworkProfile) BulkNetworkProfile {
|
||
|
|
switch profile {
|
||
|
|
case BulkNetworkProfileDefault, BulkNetworkProfileLAN, BulkNetworkProfileWAN, BulkNetworkProfileConstrained:
|
||
|
|
return profile
|
||
|
|
default:
|
||
|
|
return BulkNetworkProfileDefault
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func bulkNetworkProfileName(profile BulkNetworkProfile) string {
|
||
|
|
switch normalizeBulkNetworkProfile(profile) {
|
||
|
|
case BulkNetworkProfileLAN:
|
||
|
|
return "lan"
|
||
|
|
case BulkNetworkProfileWAN:
|
||
|
|
return "wan"
|
||
|
|
case BulkNetworkProfileConstrained:
|
||
|
|
return "constrained"
|
||
|
|
case BulkNetworkProfileDefault:
|
||
|
|
fallthrough
|
||
|
|
default:
|
||
|
|
return "default"
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func bulkNetworkProfilePreset(profile BulkNetworkProfile) (BulkOpenMode, BulkDedicatedAttachConfig, BulkOpenTuning) {
|
||
|
|
switch normalizeBulkNetworkProfile(profile) {
|
||
|
|
case BulkNetworkProfileLAN:
|
||
|
|
return BulkOpenModeAuto, BulkDedicatedAttachConfig{
|
||
|
|
AttachLimit: defaultBulkDedicatedAttachLimit,
|
||
|
|
ActiveLimit: defaultBulkDedicatedActiveLimit,
|
||
|
|
LaneLimit: 4,
|
||
|
|
Retry: defaultBulkDedicatedAttachRetry,
|
||
|
|
Backoff: defaultBulkDedicatedAttachBackoff,
|
||
|
|
DialTimeout: defaultBulkDedicatedDialTimeout,
|
||
|
|
HelloTimeout: defaultBulkDedicatedHelloTimeout,
|
||
|
|
}, BulkOpenTuning{
|
||
|
|
ChunkSize: 1024 * 1024,
|
||
|
|
WindowBytes: 32 * 1024 * 1024,
|
||
|
|
MaxInFlight: 64,
|
||
|
|
}
|
||
|
|
case BulkNetworkProfileWAN:
|
||
|
|
return BulkOpenModeAuto, BulkDedicatedAttachConfig{
|
||
|
|
AttachLimit: 2,
|
||
|
|
ActiveLimit: defaultBulkDedicatedActiveLimit,
|
||
|
|
LaneLimit: 2,
|
||
|
|
Retry: 4,
|
||
|
|
Backoff: 250 * time.Millisecond,
|
||
|
|
DialTimeout: 15 * time.Second,
|
||
|
|
HelloTimeout: 20 * time.Second,
|
||
|
|
}, BulkOpenTuning{
|
||
|
|
ChunkSize: 512 * 1024,
|
||
|
|
WindowBytes: 8 * 1024 * 1024,
|
||
|
|
MaxInFlight: 16,
|
||
|
|
}
|
||
|
|
case BulkNetworkProfileConstrained:
|
||
|
|
return BulkOpenModeShared, BulkDedicatedAttachConfig{
|
||
|
|
AttachLimit: 1,
|
||
|
|
ActiveLimit: 2,
|
||
|
|
LaneLimit: 1,
|
||
|
|
Retry: 5,
|
||
|
|
Backoff: 400 * time.Millisecond,
|
||
|
|
DialTimeout: 20 * time.Second,
|
||
|
|
HelloTimeout: 30 * time.Second,
|
||
|
|
}, BulkOpenTuning{
|
||
|
|
ChunkSize: 128 * 1024,
|
||
|
|
WindowBytes: 1 * 1024 * 1024,
|
||
|
|
MaxInFlight: 8,
|
||
|
|
}
|
||
|
|
default:
|
||
|
|
return BulkOpenModeShared, BulkDedicatedAttachConfig{
|
||
|
|
AttachLimit: defaultBulkDedicatedAttachLimit,
|
||
|
|
ActiveLimit: defaultBulkDedicatedActiveLimit,
|
||
|
|
LaneLimit: defaultBulkDedicatedLaneLimit,
|
||
|
|
Retry: defaultBulkDedicatedAttachRetry,
|
||
|
|
Backoff: defaultBulkDedicatedAttachBackoff,
|
||
|
|
DialTimeout: defaultBulkDedicatedDialTimeout,
|
||
|
|
HelloTimeout: defaultBulkDedicatedHelloTimeout,
|
||
|
|
}, defaultBulkOpenTuning()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func normalizeBulkDedicatedAttachConfig(cfg BulkDedicatedAttachConfig) BulkDedicatedAttachConfig {
|
||
|
|
if cfg.AttachLimit < 0 {
|
||
|
|
cfg.AttachLimit = 0
|
||
|
|
}
|
||
|
|
if cfg.ActiveLimit < 0 {
|
||
|
|
cfg.ActiveLimit = 0
|
||
|
|
}
|
||
|
|
if cfg.LaneLimit < 0 {
|
||
|
|
cfg.LaneLimit = 0
|
||
|
|
}
|
||
|
|
if cfg.Retry < 0 {
|
||
|
|
cfg.Retry = 0
|
||
|
|
}
|
||
|
|
if cfg.Backoff <= 0 {
|
||
|
|
cfg.Backoff = defaultBulkDedicatedAttachBackoff
|
||
|
|
}
|
||
|
|
if cfg.DialTimeout <= 0 {
|
||
|
|
cfg.DialTimeout = defaultBulkDedicatedDialTimeout
|
||
|
|
}
|
||
|
|
if cfg.HelloTimeout <= 0 {
|
||
|
|
cfg.HelloTimeout = defaultBulkDedicatedHelloTimeout
|
||
|
|
}
|
||
|
|
return cfg
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) setBulkDedicatedAttachSemaphoreLocked(limit int) {
|
||
|
|
if limit <= 0 {
|
||
|
|
c.bulkDedicatedAttachSem = nil
|
||
|
|
return
|
||
|
|
}
|
||
|
|
c.bulkDedicatedAttachSem = make(chan struct{}, limit)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) applyBulkDedicatedAttachConfigLocked(cfg BulkDedicatedAttachConfig) {
|
||
|
|
cfg = normalizeBulkDedicatedAttachConfig(cfg)
|
||
|
|
c.bulkDedicatedAttachLimit = cfg.AttachLimit
|
||
|
|
c.setBulkDedicatedAttachSemaphoreLocked(cfg.AttachLimit)
|
||
|
|
c.bulkDedicatedActiveLimit = cfg.ActiveLimit
|
||
|
|
c.notifyBulkDedicatedActiveWaitersLocked()
|
||
|
|
c.bulkDedicatedLaneLimit = cfg.LaneLimit
|
||
|
|
c.bulkDedicatedAttachRetry = cfg.Retry
|
||
|
|
c.bulkDedicatedAttachBackoff = cfg.Backoff
|
||
|
|
c.bulkDedicatedDialTimeout = cfg.DialTimeout
|
||
|
|
c.bulkDedicatedHelloTimeout = cfg.HelloTimeout
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) ensureBulkDedicatedActiveWaitLocked() chan struct{} {
|
||
|
|
if c == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
if c.bulkDedicatedActiveWait == nil {
|
||
|
|
c.bulkDedicatedActiveWait = make(chan struct{})
|
||
|
|
}
|
||
|
|
return c.bulkDedicatedActiveWait
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) notifyBulkDedicatedActiveWaitersLocked() {
|
||
|
|
if c == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
waitCh := c.ensureBulkDedicatedActiveWaitLocked()
|
||
|
|
close(waitCh)
|
||
|
|
c.bulkDedicatedActiveWait = make(chan struct{})
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) applyBulkOpenTuningLocked(tuning BulkOpenTuning) {
|
||
|
|
c.bulkOpenTuning = normalizeBulkOpenTuning(tuning)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) bulkDefaultOpenModeSnapshot() BulkOpenMode {
|
||
|
|
if c == nil {
|
||
|
|
return BulkOpenModeShared
|
||
|
|
}
|
||
|
|
c.mu.Lock()
|
||
|
|
defer c.mu.Unlock()
|
||
|
|
mode := normalizeBulkOpenMode(c.bulkDefaultOpenMode)
|
||
|
|
if mode == BulkOpenModeDefault {
|
||
|
|
mode = BulkOpenModeShared
|
||
|
|
}
|
||
|
|
return mode
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) bulkDedicatedAttachSemaphoreSnapshot() chan struct{} {
|
||
|
|
if c == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
c.mu.Lock()
|
||
|
|
defer c.mu.Unlock()
|
||
|
|
return c.bulkDedicatedAttachSem
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) bulkDedicatedActiveLimitSnapshot() int {
|
||
|
|
if c == nil {
|
||
|
|
return 0
|
||
|
|
}
|
||
|
|
c.mu.Lock()
|
||
|
|
defer c.mu.Unlock()
|
||
|
|
if c.bulkDedicatedActiveLimit < 0 {
|
||
|
|
return 0
|
||
|
|
}
|
||
|
|
return c.bulkDedicatedActiveLimit
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) bulkDedicatedActiveWaitSnapshot() chan struct{} {
|
||
|
|
if c == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
c.mu.Lock()
|
||
|
|
defer c.mu.Unlock()
|
||
|
|
return c.ensureBulkDedicatedActiveWaitLocked()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) bulkDedicatedLaneLimitSnapshot() int {
|
||
|
|
if c == nil {
|
||
|
|
return defaultBulkDedicatedLaneLimit
|
||
|
|
}
|
||
|
|
c.mu.Lock()
|
||
|
|
defer c.mu.Unlock()
|
||
|
|
limit := c.bulkDedicatedLaneLimit
|
||
|
|
if limit < 0 {
|
||
|
|
return 0
|
||
|
|
}
|
||
|
|
if limit == 0 {
|
||
|
|
return 0
|
||
|
|
}
|
||
|
|
return limit
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) SetBulkDefaultOpenMode(mode BulkOpenMode) {
|
||
|
|
if c == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
mode = normalizeBulkOpenMode(mode)
|
||
|
|
if mode == BulkOpenModeDefault {
|
||
|
|
mode = BulkOpenModeShared
|
||
|
|
}
|
||
|
|
c.mu.Lock()
|
||
|
|
c.bulkDefaultOpenMode = mode
|
||
|
|
c.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) BulkDefaultOpenMode() BulkOpenMode {
|
||
|
|
return c.bulkDefaultOpenModeSnapshot()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) bulkOpenTuningSnapshot() BulkOpenTuning {
|
||
|
|
if c == nil {
|
||
|
|
return defaultBulkOpenTuning()
|
||
|
|
}
|
||
|
|
c.mu.Lock()
|
||
|
|
defer c.mu.Unlock()
|
||
|
|
return normalizeBulkOpenTuning(c.bulkOpenTuning)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) SetBulkOpenTuning(tuning BulkOpenTuning) {
|
||
|
|
if c == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
c.mu.Lock()
|
||
|
|
c.applyBulkOpenTuningLocked(tuning)
|
||
|
|
c.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) BulkOpenTuning() BulkOpenTuning {
|
||
|
|
return c.bulkOpenTuningSnapshot()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) SetBulkDedicatedAttachConfig(cfg BulkDedicatedAttachConfig) {
|
||
|
|
if c == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
c.mu.Lock()
|
||
|
|
c.applyBulkDedicatedAttachConfigLocked(cfg)
|
||
|
|
c.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) BulkDedicatedAttachConfig() BulkDedicatedAttachConfig {
|
||
|
|
if c == nil {
|
||
|
|
return normalizeBulkDedicatedAttachConfig(BulkDedicatedAttachConfig{})
|
||
|
|
}
|
||
|
|
c.mu.Lock()
|
||
|
|
defer c.mu.Unlock()
|
||
|
|
return normalizeBulkDedicatedAttachConfig(BulkDedicatedAttachConfig{
|
||
|
|
AttachLimit: c.bulkDedicatedAttachLimit,
|
||
|
|
ActiveLimit: c.bulkDedicatedActiveLimit,
|
||
|
|
LaneLimit: c.bulkDedicatedLaneLimit,
|
||
|
|
Retry: c.bulkDedicatedAttachRetry,
|
||
|
|
Backoff: c.bulkDedicatedAttachBackoff,
|
||
|
|
DialTimeout: c.bulkDedicatedDialTimeout,
|
||
|
|
HelloTimeout: c.bulkDedicatedHelloTimeout,
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) SetBulkNetworkProfile(profile BulkNetworkProfile) {
|
||
|
|
if c == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
profile = normalizeBulkNetworkProfile(profile)
|
||
|
|
mode, cfg, tuning := bulkNetworkProfilePreset(profile)
|
||
|
|
c.mu.Lock()
|
||
|
|
c.bulkNetworkProfile = profile
|
||
|
|
c.bulkDefaultOpenMode = mode
|
||
|
|
c.applyBulkDedicatedAttachConfigLocked(cfg)
|
||
|
|
c.applyBulkOpenTuningLocked(tuning)
|
||
|
|
c.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) BulkNetworkProfile() BulkNetworkProfile {
|
||
|
|
if c == nil {
|
||
|
|
return BulkNetworkProfileDefault
|
||
|
|
}
|
||
|
|
c.mu.Lock()
|
||
|
|
defer c.mu.Unlock()
|
||
|
|
return normalizeBulkNetworkProfile(c.bulkNetworkProfile)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) applyBulkOpenTuningLocked(tuning BulkOpenTuning) {
|
||
|
|
s.bulkOpenTuning = normalizeBulkOpenTuning(tuning)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) bulkOpenTuningSnapshot() BulkOpenTuning {
|
||
|
|
if s == nil {
|
||
|
|
return defaultBulkOpenTuning()
|
||
|
|
}
|
||
|
|
s.mu.RLock()
|
||
|
|
defer s.mu.RUnlock()
|
||
|
|
return normalizeBulkOpenTuning(s.bulkOpenTuning)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) SetBulkOpenTuning(tuning BulkOpenTuning) {
|
||
|
|
if s == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
s.mu.Lock()
|
||
|
|
s.applyBulkOpenTuningLocked(tuning)
|
||
|
|
s.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) BulkOpenTuning() BulkOpenTuning {
|
||
|
|
return s.bulkOpenTuningSnapshot()
|
||
|
|
}
|