starssh/pool.go

467 lines
8.6 KiB
Go
Raw Permalink Normal View History

refactor: 重构 starssh 核心运行时并补强 ssh/exec/terminal/sftp 能力 - 拆分原有单体 ssh.go,按职责重组为 types、utils、transport、login、keepalive、session、exec、pool、shell、terminal、forward、hostkey、state 等模块,并补充平台相关实现 - 重做登录与连接运行时,补齐基于 context 的建连、jump/proxy 链路、可配置认证顺序,以及 Unix/Windows 下的 ssh-agent 支持 - 新增正式非交互执行模型 ExecRequest/ExecResult,支持流式输出、溢出统计、超时控制,以及 posix/powershell/cmd/raw 多方言执行 - 保留旧 shell 风格兼容接口,同时让路径/用户探测等 helper 具备跨 shell fallback,避免 Windows 目标继续硬依赖 POSIX 命令 - 新增 TerminalSession 作为原始交互终端基座,提供 IO attach、resize、signal/control、退出状态与关闭原因管理 - 重构端口转发语义,默认复用当前 SSH 连接,并显式提供 detached 的本地/动态转发模式承载隔离场景 - 梳理 keepalive 与取消语义,区分仅取消本次操作和关闭整条连接,并统一连接状态与传输关闭路径 - 围绕新的 session/连接生命周期重做执行池与运行时支撑 - 大幅增强 SFTP 传输链路,补齐更安全的原子替换、校验、进度回调、重试隔离、可复用 client 生命周期与失败语义 - 新增取消语义、keepalive、SFTP、forward、terminal input 等关键回归测试,提升核心链路稳定性
2026-04-26 10:45:39 +08:00
package starssh
import (
"context"
"errors"
"sync"
"time"
)
// defaultExecPoolMaxOpenConns is the open-connection limit NewExecPool applies
// when ExecPoolConfig.MaxOpenConns is zero or negative.
const defaultExecPoolMaxOpenConns = 4
// ErrExecPoolClosed is returned by Acquire and WarmUp once the pool has been
// closed.
var ErrExecPoolClosed = errors.New("exec pool is closed")
// ExecPoolConfig carries the settings NewExecPool uses to build an ExecPool.
type ExecPoolConfig struct {
// Login is handed to LoginContext whenever the pool opens a new connection.
Login LoginInput
// MaxOpenConns caps how many connection slots the pool hands out at once.
// Values <= 0 fall back to defaultExecPoolMaxOpenConns.
MaxOpenConns int
// MaxIdleConns sizes the idle queue. Values <= 0 or greater than the
// effective MaxOpenConns are replaced by the effective MaxOpenConns.
MaxIdleConns int
// MaxIdleTime is how long a client may sit idle before it is closed instead
// of being reused. Values <= 0 disable idle expiry.
MaxIdleTime time.Duration
// DisableHealthCheck turns off the ping performed on an idle client before
// it is handed out again.
DisableHealthCheck bool
// HealthCheckTimeout bounds that ping. Values <= 0 fall back to
// defaultKeepAliveTimeout.
HealthCheckTimeout time.Duration
}
// ExecPoolStats is the snapshot of pool limits and counters returned by
// ExecPool.Stats.
type ExecPoolStats struct {
// MaxOpenConns is the pool's effective open-connection limit.
MaxOpenConns int
// MaxIdleConns is the pool's effective idle-queue capacity.
MaxIdleConns int
// MaxIdleTime is the pool's idle expiry; zero means idle clients never expire.
MaxIdleTime time.Duration
// OpenConns is the number of connection slots currently reserved.
OpenConns int
// IdleConns is the number of entries currently waiting in the idle queue.
IdleConns int
// InUseConns is OpenConns minus IdleConns, clamped at zero.
InUseConns int
}
// ExecPool hands out SSH clients for command execution while limiting how many
// connection slots are reserved at once and parking released clients for reuse.
// Build one with NewExecPool.
type ExecPool struct {
// loginInfo is passed to LoginContext to open each new connection.
loginInfo LoginInput
// maxOpen is the upper bound for open.
maxOpen int
// maxIdle is the idle-queue capacity chosen by NewExecPool.
maxIdle int
// maxIdleTime is the idle expiry; <= 0 means parked clients never expire.
maxIdleTime time.Duration
// idle is the buffered queue of parked clients.
idle chan *pooledClient
// done is closed by Close so callers blocked in Acquire can return.
done chan struct{}
// closeOnce makes the shutdown sequence in Close run only once.
closeOnce sync.Once
// healthCheckOnAcquire enables the ping on idle clients before reuse.
healthCheckOnAcquire bool
// healthCheckTimeout bounds that ping when it is greater than zero.
healthCheckTimeout time.Duration
// mu is held while reading or writing open and closed, and around the
// non-blocking idle-queue operations in Release, tryAcquire and the drain.
mu sync.Mutex
// open counts reserved connection slots: being dialed, handed out, or idle.
open int
// closed is set by Close; afterwards Acquire and WarmUp fail with
// ErrExecPoolClosed and Release closes clients instead of parking them.
closed bool
}
// pooledClient is an idle-queue entry: a client plus the time it was parked.
type pooledClient struct {
// client is the parked SSH client.
client *StarSSH
// idleAt is the time Release parked the client; a zero value is never
// treated as expired.
idleAt time.Time
}
// NewExecPool builds an ExecPool from config, normalizing its limits.
//
// A non-positive MaxOpenConns becomes defaultExecPoolMaxOpenConns. A
// MaxIdleConns that is non-positive or larger than the effective open limit
// becomes that open limit. Health checks on reuse are enabled unless
// DisableHealthCheck is set. No connection is opened here.
func NewExecPool(config ExecPoolConfig) *ExecPool {
	openLimit := config.MaxOpenConns
	if openLimit <= 0 {
		openLimit = defaultExecPoolMaxOpenConns
	}

	idleLimit := config.MaxIdleConns
	if idleLimit <= 0 || idleLimit > openLimit {
		idleLimit = openLimit
	}

	pool := new(ExecPool)
	pool.loginInfo = config.Login
	pool.maxOpen = openLimit
	pool.maxIdle = idleLimit
	pool.maxIdleTime = normalizeMaxIdleTime(config.MaxIdleTime)
	pool.idle = make(chan *pooledClient, idleLimit)
	pool.done = make(chan struct{})
	pool.healthCheckOnAcquire = !config.DisableHealthCheck
	pool.healthCheckTimeout = normalizeHealthCheckTimeout(config.HealthCheckTimeout)
	return pool
}
// Exec acquires a client, runs req on it, and gives the client back.
//
// If acquisition fails the result is nil. If execution fails the client is
// discarded rather than returned to the pool, and the execution error is
// returned together with whatever result the client produced. On success the
// client is released and any release error is returned alongside the result.
func (p *ExecPool) Exec(ctx context.Context, req ExecRequest) (*ExecResult, error) {
	conn, acquireErr := p.Acquire(ctx)
	if acquireErr != nil {
		return nil, acquireErr
	}

	res, runErr := conn.Exec(ctx, req)
	if runErr == nil {
		return res, p.Release(conn)
	}

	// A failed run leaves the connection in an unknown state; do not reuse it.
	p.Discard(conn)
	return res, runErr
}
// ExecString is a convenience wrapper around Exec for a request that sets only
// the Command field.
func (p *ExecPool) ExecString(ctx context.Context, command string) (*ExecResult, error) {
	var req ExecRequest
	req.Command = command
	return p.Exec(ctx, req)
}
// ExecStream acquires a client, runs req on it with onChunk forwarded to the
// client's ExecStream, and gives the client back.
//
// Error handling mirrors Exec: an acquisition failure yields a nil result, an
// execution failure discards the client and returns the execution error with
// the client's result, and a successful run releases the client and returns
// any release error alongside the result.
func (p *ExecPool) ExecStream(ctx context.Context, req ExecRequest, onChunk func(ExecStreamChunk)) (*ExecResult, error) {
	conn, acquireErr := p.Acquire(ctx)
	if acquireErr != nil {
		return nil, acquireErr
	}

	res, runErr := conn.ExecStream(ctx, req, onChunk)
	if runErr == nil {
		return res, p.Release(conn)
	}

	// A failed run leaves the connection in an unknown state; do not reuse it.
	p.Discard(conn)
	return res, runErr
}
// WarmUp opens connections ahead of time until the idle queue holds targetIdle
// clients or the pool has no free slot left.
//
// targetIdle is normalized by normalizeWarmUpTarget: non-positive or oversized
// values become the pool's idle capacity. A nil ctx is treated as
// context.Background(). It returns the context error if ctx ends between
// connections, ErrExecPoolClosed if the pool is closed, or the login error of
// the first connection that fails to open.
func (p *ExecPool) WarmUp(ctx context.Context, targetIdle int) error {
	if p == nil {
		return errors.New("exec pool is nil")
	}
	if ctx == nil {
		ctx = context.Background()
	}

	want := p.normalizeWarmUpTarget(targetIdle)
	if want == 0 {
		return nil
	}

	for {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		have, reserved, err := p.tryWarmUp(want)
		switch {
		case err != nil:
			return err
		case have >= want, !reserved:
			// Either warm enough already, or every slot is taken.
			return nil
		}

		conn, loginErr := LoginContext(ctx, p.loginInfo)
		if loginErr != nil {
			// Give back the slot tryWarmUp reserved for this connection.
			p.releaseSlot()
			return loginErr
		}
		if releaseErr := p.Release(conn); releaseErr != nil {
			return releaseErr
		}
	}
}
// acquireRecheckInterval is how often a caller blocked in Acquire goes back to
// re-examine the pool for a free connection slot.
const acquireRecheckInterval = 50 * time.Millisecond

// Acquire returns a client for the caller's exclusive use. The caller must
// hand it back with Release, or drop it with Discard.
//
// An idle client is preferred; it is only returned after passing the expiry
// and health checks in takeIdleClient. If no idle client exists and a slot is
// free, a new connection is opened with LoginContext. Otherwise Acquire waits
// until a client is released, a slot becomes free, ctx ends, or the pool is
// closed.
//
// A nil ctx is treated as context.Background(). Errors: "exec pool is nil" for
// a nil receiver, ErrExecPoolClosed once the pool is closed, ctx.Err() when
// ctx ends while waiting, or the login error when opening a connection fails.
func (p *ExecPool) Acquire(ctx context.Context) (*StarSSH, error) {
	if p == nil {
		return nil, errors.New("exec pool is nil")
	}
	if ctx == nil {
		ctx = context.Background()
	}
	for {
		idleClient, create, err := p.tryAcquire()
		if err != nil {
			return nil, err
		}

		if idleClient == nil && !create {
			// Pool is saturated: wait for a released client or a re-check tick.
			idleClient, err = p.waitForIdleClient(ctx)
			if err != nil {
				return nil, err
			}
			if idleClient == nil {
				continue
			}
		}

		if idleClient != nil {
			if client, ok := p.takeIdleClient(ctx, idleClient); ok {
				return client, nil
			}
			// The idle client was rejected and closed; start over.
			continue
		}

		// tryAcquire reserved a slot for us; open the connection for it.
		conn, err := LoginContext(ctx, p.loginInfo)
		if err != nil {
			p.releaseSlot()
			return nil, err
		}
		return conn, nil
	}
}

// waitForIdleClient blocks until a client arrives on the idle queue, ctx ends,
// the pool is closed, or acquireRecheckInterval elapses. A nil client with a
// nil error means the caller should re-run tryAcquire.
//
// The timed wake-up is required because a slot freed by closing a client (for
// example via Discard) is not announced on any channel. Without it a waiter
// could block forever while the pool has spare capacity.
func (p *ExecPool) waitForIdleClient(ctx context.Context) (*pooledClient, error) {
	recheck := time.NewTimer(acquireRecheckInterval)
	defer recheck.Stop()

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-p.done:
		return nil, ErrExecPoolClosed
	case idleClient := <-p.idle:
		return idleClient, nil
	case <-recheck.C:
		return nil, nil
	}
}
// Release hands a client back to the pool.
//
// The client is parked on the idle queue when the pool is open and the queue
// has room; otherwise it is closed and its slot is freed. Releasing a nil
// client just frees one slot. The only error returned is for a nil receiver.
func (p *ExecPool) Release(client *StarSSH) error {
	if p == nil {
		return errors.New("exec pool is nil")
	}
	if client == nil {
		p.releaseSlot()
		return nil
	}

	parked := false
	p.mu.Lock()
	if !p.closed {
		entry := &pooledClient{client: client, idleAt: time.Now()}
		select {
		case p.idle <- entry:
			parked = true
		default:
			// Idle queue is full; the client is closed below.
		}
	}
	p.mu.Unlock()

	if !parked {
		p.closeClient(client)
	}
	return nil
}
// Discard closes client and frees its slot instead of returning it to the
// pool. A nil client just frees one slot; a nil receiver is a no-op.
func (p *ExecPool) Discard(client *StarSSH) {
	switch {
	case p == nil:
		return
	case client == nil:
		p.releaseSlot()
	default:
		p.closeClient(client)
	}
}
// Stats returns a snapshot of the pool's limits and counters, taken under the
// pool mutex. A nil receiver yields the zero ExecPoolStats.
func (p *ExecPool) Stats() ExecPoolStats {
	var snapshot ExecPoolStats
	if p == nil {
		return snapshot
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	snapshot.MaxOpenConns = p.maxOpen
	snapshot.MaxIdleConns = p.maxIdle
	snapshot.MaxIdleTime = p.maxIdleTime
	snapshot.OpenConns = p.open
	snapshot.IdleConns = len(p.idle)
	// In-use is derived; it stays at zero if the difference is negative.
	if busy := snapshot.OpenConns - snapshot.IdleConns; busy > 0 {
		snapshot.InUseConns = busy
	}
	return snapshot
}
// Close shuts the pool down: it marks the pool closed, drains the idle queue,
// signals waiters through the done channel, and closes every drained client.
//
// The shutdown runs only on the first call. That call returns the first error
// reported while closing clients; later calls, and calls on a nil receiver,
// return nil. Clients currently handed out are not touched here.
func (p *ExecPool) Close() error {
	if p == nil {
		return nil
	}

	var firstErr error
	p.closeOnce.Do(func() {
		p.mu.Lock()
		p.closed = true
		drained := p.drainIdleLocked()
		p.mu.Unlock()

		close(p.done)

		for i := range drained {
			if err := drained[i].Close(); firstErr == nil && err != nil {
				firstErr = err
			}
		}
	})
	return firstErr
}
// CloseIdle drains the idle queue and closes every client found there, leaving
// the pool itself open. It returns the first close error encountered, or nil.
// A nil receiver is a no-op.
func (p *ExecPool) CloseIdle() error {
	if p == nil {
		return nil
	}

	p.mu.Lock()
	drained := p.drainIdleLocked()
	p.mu.Unlock()

	var firstErr error
	for i := range drained {
		if err := drained[i].Close(); firstErr == nil && err != nil {
			firstErr = err
		}
	}
	return firstErr
}
// tryAcquire makes one non-blocking attempt to obtain capacity, under the pool
// mutex.
//
// It returns ErrExecPoolClosed if the pool is closed. Otherwise it returns an
// idle entry when one is queued, or reports true after reserving a slot for a
// new connection when the open count is below the limit. A nil entry with
// false means the pool is saturated and the caller has to wait.
func (p *ExecPool) tryAcquire() (*pooledClient, bool, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return nil, false, ErrExecPoolClosed
	}

	select {
	case parked := <-p.idle:
		return parked, false, nil
	default:
	}

	reserve := p.open < p.maxOpen
	if reserve {
		p.open++
	}
	return nil, reserve, nil
}
// tryWarmUp decides, under the pool mutex, whether WarmUp should open another
// connection.
//
// It returns the current idle-queue length and true after reserving a slot
// when the queue is below targetIdle and the open count is below the limit.
// It returns false without reserving anything when either bound is already
// met, and ErrExecPoolClosed when the pool is closed.
func (p *ExecPool) tryWarmUp(targetIdle int) (int, bool, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return 0, false, ErrExecPoolClosed
	}

	parked := len(p.idle)
	if parked >= targetIdle || p.open >= p.maxOpen {
		return parked, false, nil
	}

	p.open++
	return parked, true, nil
}
// normalizeWarmUpTarget clamps a requested idle count to the pool's idle
// capacity. It returns 0 for a nil pool or a pool without idle capacity, the
// idle capacity when targetIdle is non-positive or too large, and targetIdle
// itself otherwise.
func (p *ExecPool) normalizeWarmUpTarget(targetIdle int) int {
	switch {
	case p == nil || p.maxIdle <= 0:
		return 0
	case targetIdle <= 0 || targetIdle > p.maxIdle:
		return p.maxIdle
	default:
		return targetIdle
	}
}
// takeIdleClient validates an entry taken from the idle queue and reports
// whether its client may be handed out.
//
// A nil entry is rejected without side effects. An entry without a client
// frees its slot. An entry that has expired or fails the health check is
// closed, which also frees its slot. The health check is skipped for expired
// entries.
func (p *ExecPool) takeIdleClient(ctx context.Context, idleClient *pooledClient) (*StarSSH, bool) {
	if idleClient == nil {
		return nil, false
	}

	conn := idleClient.client
	if conn == nil {
		p.releaseSlot()
		return nil, false
	}

	usable := !p.isIdleExpired(idleClient) && p.healthCheckClient(ctx, conn) == nil
	if !usable {
		p.closePooledClient(idleClient)
		return nil, false
	}
	return conn, true
}
// isIdleExpired reports whether client has been parked for at least the pool's
// maximum idle time. It is always false for nil arguments, when idle expiry is
// disabled, or when the entry carries no park timestamp.
func (p *ExecPool) isIdleExpired(client *pooledClient) bool {
	if p == nil || client == nil || client.client == nil {
		return false
	}
	ttl := p.maxIdleTime
	return ttl > 0 && !client.idleAt.IsZero() && time.Since(client.idleAt) >= ttl
}
// drainIdleLocked empties the idle queue without blocking and returns the
// clients it held. Every removed entry lowers the open count by one (never
// below zero); entries without a client are dropped from the result. The
// caller is expected to hold p.mu, as Close and CloseIdle do.
func (p *ExecPool) drainIdleLocked() []*StarSSH {
	drained := make([]*StarSSH, 0, len(p.idle))
	for {
		var entry *pooledClient
		select {
		case entry = <-p.idle:
		default:
			return drained
		}

		if p.open > 0 {
			p.open--
		}
		if entry != nil && entry.client != nil {
			drained = append(drained, entry.client)
		}
	}
}
// releaseSlot gives one reserved connection slot back by decrementing the open
// count under the pool mutex. The count never drops below zero.
func (p *ExecPool) releaseSlot() {
	p.mu.Lock()
	if p.open > 0 {
		p.open--
	}
	p.mu.Unlock()
}
// closeClient closes client, when it is non-nil, and then frees its slot. The
// close error is deliberately ignored: the connection is being thrown away.
func (p *ExecPool) closeClient(client *StarSSH) {
	if client == nil {
		p.releaseSlot()
		return
	}
	_ = client.Close()
	p.releaseSlot()
}
// closePooledClient disposes of an idle-queue entry. A nil entry is ignored,
// an entry without a client only frees its slot, and any other entry has its
// client closed and its slot freed.
func (p *ExecPool) closePooledClient(client *pooledClient) {
	switch {
	case client == nil:
		return
	case client.client == nil:
		p.releaseSlot()
	default:
		p.closeClient(client.client)
	}
}
// healthCheckClient pings client to decide whether it can be reused.
//
// It returns an error for a nil client and nil when health checks are
// disabled. Otherwise it returns the result of client.PingContext, run under
// ctx (context.Background() if ctx is nil) and additionally bounded by the
// pool's health-check timeout when that timeout is positive.
func (p *ExecPool) healthCheckClient(ctx context.Context, client *StarSSH) error {
	if client == nil {
		return errors.New("ssh client is nil")
	}
	if !p.healthCheckOnAcquire {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}

	if limit := p.healthCheckTimeout; limit > 0 {
		bounded, stop := context.WithTimeout(ctx, limit)
		defer stop()
		return client.PingContext(bounded)
	}
	return client.PingContext(ctx)
}
// normalizeHealthCheckTimeout returns timeout when it is positive and
// defaultKeepAliveTimeout otherwise.
func normalizeHealthCheckTimeout(timeout time.Duration) time.Duration {
	if timeout > 0 {
		return timeout
	}
	return defaultKeepAliveTimeout
}
// normalizeMaxIdleTime returns timeout when it is positive and 0 otherwise;
// 0 means idle clients never expire.
func normalizeMaxIdleTime(timeout time.Duration) time.Duration {
	if timeout > 0 {
		return timeout
	}
	return 0
}