130 lines
3.2 KiB
Go
130 lines
3.2 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
const defaultStreamInboundQueueLimit = 128
|
||
|
|
|
||
|
|
const defaultStreamInboundBufferedBytesLimit = 8 * 1024 * 1024
|
||
|
|
|
||
|
|
const defaultStreamOutboundWindowBytes = 512 * 1024
|
||
|
|
|
||
|
|
const defaultStreamOutboundMaxInFlightChunks = 8
|
||
|
|
|
||
|
|
type StreamConfig struct {
|
||
|
|
ChunkSize int
|
||
|
|
InboundQueueLimit int
|
||
|
|
InboundBufferedBytesLimit int
|
||
|
|
OutboundWindowBytes int
|
||
|
|
OutboundMaxInFlightChunks int
|
||
|
|
}
|
||
|
|
|
||
|
|
type streamConfig struct {
|
||
|
|
ChunkSize int
|
||
|
|
InboundQueueLimit int
|
||
|
|
InboundBufferedBytesLimit int
|
||
|
|
OutboundWindowBytes int
|
||
|
|
OutboundMaxInFlightChunks int
|
||
|
|
}
|
||
|
|
|
||
|
|
func defaultStreamConfig() streamConfig {
|
||
|
|
return streamConfig{
|
||
|
|
ChunkSize: defaultFileChunkSize,
|
||
|
|
InboundQueueLimit: defaultStreamInboundQueueLimit,
|
||
|
|
InboundBufferedBytesLimit: defaultStreamInboundBufferedBytesLimit,
|
||
|
|
OutboundWindowBytes: defaultStreamOutboundWindowBytes,
|
||
|
|
OutboundMaxInFlightChunks: defaultStreamOutboundMaxInFlightChunks,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func normalizeStreamConfig(cfg streamConfig) streamConfig {
|
||
|
|
defaults := defaultStreamConfig()
|
||
|
|
if cfg.ChunkSize <= 0 {
|
||
|
|
cfg.ChunkSize = defaults.ChunkSize
|
||
|
|
}
|
||
|
|
if cfg.InboundQueueLimit <= 0 {
|
||
|
|
cfg.InboundQueueLimit = defaults.InboundQueueLimit
|
||
|
|
}
|
||
|
|
if cfg.InboundBufferedBytesLimit <= 0 {
|
||
|
|
cfg.InboundBufferedBytesLimit = defaults.InboundBufferedBytesLimit
|
||
|
|
}
|
||
|
|
if cfg.OutboundWindowBytes <= 0 {
|
||
|
|
cfg.OutboundWindowBytes = defaults.OutboundWindowBytes
|
||
|
|
}
|
||
|
|
if cfg.OutboundMaxInFlightChunks <= 0 {
|
||
|
|
cfg.OutboundMaxInFlightChunks = defaults.OutboundMaxInFlightChunks
|
||
|
|
}
|
||
|
|
return cfg
|
||
|
|
}
|
||
|
|
|
||
|
|
func normalizePublicStreamConfig(cfg StreamConfig) StreamConfig {
|
||
|
|
return StreamConfig(normalizeStreamConfig(streamConfig(cfg)))
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *streamRuntime) configSnapshot() streamConfig {
|
||
|
|
if r == nil {
|
||
|
|
return defaultStreamConfig()
|
||
|
|
}
|
||
|
|
r.mu.RLock()
|
||
|
|
cfg := normalizeStreamConfig(r.cfg)
|
||
|
|
r.mu.RUnlock()
|
||
|
|
return cfg
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *streamRuntime) applyConfig(cfg streamConfig) {
|
||
|
|
if r == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
cfg = normalizeStreamConfig(cfg)
|
||
|
|
r.mu.Lock()
|
||
|
|
r.cfg = cfg
|
||
|
|
flow := r.flow
|
||
|
|
r.mu.Unlock()
|
||
|
|
if flow != nil {
|
||
|
|
flow.applyConfig(cfg)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) GetStreamConfig() StreamConfig {
|
||
|
|
if c == nil {
|
||
|
|
return normalizePublicStreamConfig(StreamConfig{})
|
||
|
|
}
|
||
|
|
if runtime := c.getStreamRuntime(); runtime != nil {
|
||
|
|
return normalizePublicStreamConfig(StreamConfig(runtime.configSnapshot()))
|
||
|
|
}
|
||
|
|
return normalizePublicStreamConfig(StreamConfig{})
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) GetStreamConfig() StreamConfig {
|
||
|
|
if s == nil {
|
||
|
|
return normalizePublicStreamConfig(StreamConfig{})
|
||
|
|
}
|
||
|
|
if runtime := s.getStreamRuntime(); runtime != nil {
|
||
|
|
return normalizePublicStreamConfig(StreamConfig(runtime.configSnapshot()))
|
||
|
|
}
|
||
|
|
return normalizePublicStreamConfig(StreamConfig{})
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) SetStreamConfig(cfg StreamConfig) {
|
||
|
|
c.setStreamConfig(streamConfig(cfg))
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) SetStreamConfig(cfg StreamConfig) {
|
||
|
|
s.setStreamConfig(streamConfig(cfg))
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) setStreamConfig(cfg streamConfig) {
|
||
|
|
if c == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
if runtime := c.getStreamRuntime(); runtime != nil {
|
||
|
|
runtime.applyConfig(cfg)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) setStreamConfig(cfg streamConfig) {
|
||
|
|
if s == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
if runtime := s.getStreamRuntime(); runtime != nil {
|
||
|
|
runtime.applyConfig(cfg)
|
||
|
|
}
|
||
|
|
}
|