stario/sync.go
starainrt c8facb5a03
stario: 提升 Go 1.20 基线与交互/队列稳定性
- 提升 go.mod 基线到 Go 1.20,并补齐对应测试
  - 修正 Passwd / PasswdResponseSignal 语义,Ctrl+C 默认退出当前流程
  - 优化 raw terminal redraw、Restore 与 StopUntil 的边界行为
  - 新增 StarPipe、FrameReader/FrameWriter、ReadFullContext/WriteFullContext/CopyContext、IsTerminal/ReadPasswordContext
  - 收口 StarQueue / StarBuffer 语义,删除 EndWrite,统一 Close / Abort 行为
  - 补齐 signal、timeout、queue、terminal、pipe、buffer 的回归测试与 race 覆盖
2026-04-15 14:35:19 +08:00

160 lines
3.2 KiB
Go

package stario
import (
"fmt"
"sync"
)
// waitGroupAddMode selects how a batch Add (delta > 1) is handled when it
// would push the running count past the concurrency limit.
type waitGroupAddMode uint8
const (
// waitGroupAddModeStrict makes an over-limit batch Add panic. It is the zero
// value of the type and the mode NewWaitGroup sets explicitly.
waitGroupAddModeStrict waitGroupAddMode = iota
// waitGroupAddModeLoose makes an over-limit batch Add raise the limit so the
// whole batch fits.
waitGroupAddModeLoose
)
// WaitGroup is a concurrency-limited sync.WaitGroup variant.
//
// A limit of zero means unlimited concurrency; this is also what the zero
// value provides. Internally any non-positive limit is treated as unlimited,
// but NewWaitGroup and SetMaxWaitNum panic when given a negative limit.
// WaitGroup must not be copied after first use.
type WaitGroup struct {
// wg counts outstanding tasks and is what Wait blocks on.
wg sync.WaitGroup
// mu guards maxCount, running and addMode.
mu sync.Mutex
// cond is bound to mu and is created lazily by init. It is broadcast when
// slots are released and when the limit or add mode changes.
cond *sync.Cond
// initOnce ensures cond is created exactly once.
initOnce sync.Once
// maxCount is the concurrency limit; values <= 0 disable the limit.
maxCount int
// running is the number of tasks currently holding a slot.
running int
// addMode selects strict or loose handling of over-limit batch adds.
addMode waitGroupAddMode
}
// NewWaitGroup returns a WaitGroup limited to maxCount concurrently running
// tasks, with strict batch-add mode selected.
//
// A maxCount of zero means no limit. NewWaitGroup panics if maxCount is
// negative.
func NewWaitGroup(maxCount int) WaitGroup {
	if maxCount >= 0 {
		return WaitGroup{
			addMode:  waitGroupAddModeStrict,
			maxCount: maxCount,
		}
	}
	panic("stario: negative max wait count")
}
// init lazily creates the condition variable bound to w.mu. It is safe to call
// repeatedly; only the first call does any work.
func (w *WaitGroup) init() {
	bindCond := func() {
		w.cond = sync.NewCond(&w.mu)
	}
	w.initOnce.Do(bindCond)
}
// Add adjusts the running task count by delta.
//
// A positive delta claims that many slots and may block while the concurrency
// limit is reached. A negative delta gives back -delta slots. A zero delta is
// a no-op.
func (w *WaitGroup) Add(delta int) {
	w.init()
	switch {
	case delta > 0:
		w.acquire(delta)
	case delta < 0:
		w.release(-delta)
	}
}
// acquire claims delta (> 0) slots while holding w.mu.
//
// With no limit configured the slots are granted immediately. A single-slot
// request blocks until a slot is free. A batch request never blocks: if it
// does not fit it panics in strict mode, or raises the limit in loose mode.
func (w *WaitGroup) acquire(delta int) {
	w.mu.Lock()
	defer w.mu.Unlock()

	switch {
	case w.maxCount <= 0:
		// Unlimited: just account for the new tasks.
		w.running += delta
		w.wg.Add(delta)

	case delta == 1:
		// The task is registered with wg before waiting for a slot, so it is
		// already counted by Wait while this call is blocked.
		w.wg.Add(1)
		for w.maxCount > 0 && w.maxCount <= w.running {
			w.cond.Wait()
		}
		w.running++

	default:
		if total := w.running + delta; total > w.maxCount {
			if w.addMode == waitGroupAddModeStrict {
				panic(fmt.Sprintf("stario: WaitGroup.Add(%d) exceeds max limit %d with %d running", delta, w.maxCount, w.running))
			}
			// Loose mode: grow the limit just enough for the batch.
			w.maxCount = total
		}
		w.running += delta
		w.wg.Add(delta)
	}
}
// release gives back delta (> 0) slots and wakes every goroutine waiting for
// one. It panics if more slots are released than are currently running.
func (w *WaitGroup) release(delta int) {
	w.mu.Lock()
	defer w.mu.Unlock()

	if active := w.running; active < delta {
		panic(fmt.Sprintf("stario: WaitGroup.Done releases %d tasks but only %d running", delta, active))
	}
	w.wg.Add(-delta)
	w.running -= delta
	// Broadcast while still holding the lock so waiters re-check the limit.
	w.cond.Broadcast()
}
// Done marks one running task as finished and frees its slot. It is
// equivalent to Add(-1).
func (w *WaitGroup) Done() {
	w.init()
	w.release(1)
}
// Go claims one slot, blocking if the concurrency limit is reached, and then
// runs fn in a new goroutine. The slot is released when fn returns.
func (w *WaitGroup) Go(fn func()) {
	w.Add(1)
	task := func() {
		defer w.Done()
		fn()
	}
	go task()
}
// Wait blocks until every task registered through Add or Go has been released.
func (w *WaitGroup) Wait() {
	w.init()
	pending := &w.wg
	pending.Wait()
}
// GetMaxWaitNum reports the concurrency limit currently in effect. Zero means
// no limit.
func (w *WaitGroup) GetMaxWaitNum() int {
	w.init()
	w.mu.Lock()
	limit := w.maxCount
	w.mu.Unlock()
	return limit
}
// SetMaxWaitNum replaces the concurrency limit with num and wakes goroutines
// blocked in Add so they re-check it. Zero removes the limit.
//
// SetMaxWaitNum panics if num is negative.
func (w *WaitGroup) SetMaxWaitNum(num int) {
	if num < 0 {
		panic("stario: negative max wait count")
	}
	w.init()
	func() {
		w.mu.Lock()
		defer w.mu.Unlock()
		w.maxCount = num
	}()
	w.cond.Broadcast()
}
// SetStrictAddMode chooses what Add(n>1) does when the batch does not fit in
// the current limit: panic when strict is true, or grow the limit when it is
// false. Goroutines blocked in Add are woken afterwards.
func (w *WaitGroup) SetStrictAddMode(strict bool) {
	mode := waitGroupAddModeLoose
	if strict {
		mode = waitGroupAddModeStrict
	}

	w.init()
	w.mu.Lock()
	w.addMode = mode
	w.mu.Unlock()
	w.cond.Broadcast()
}
// StrictAddMode reports whether over-limit batch adds currently panic (true)
// or expand the limit (false).
func (w *WaitGroup) StrictAddMode() bool {
	w.init()
	w.mu.Lock()
	mode := w.addMode
	w.mu.Unlock()
	return mode == waitGroupAddModeStrict
}