notify/transport_write.go
starainrt 4f760f2807
fix: 修复 dedicated bulk attach 竞态并优化 short write 补写路径
- 客户端 dedicated attach 回复改为精确读取单帧,避免 attach reply 与后续 NBR1 数据粘连后被误解析
  - 服务端 accepted attach 改为先 detach transport,再直接回 attach reply,随后立即切入 dedicated bulk read loop
  - transport 读循环在 stop 或 transport ownership 失效后不再继续上推已读数据,避免 handoff 后首包被旧 reader 吃掉
  - dedicated bulk record 写路径改为 full-write,消除 short write 导致的 invalid bulk fast payload
  - 优化 vectored write 补写策略:先尝试一次 writev,未写完时直接顺序补完剩余 buffers,减少重复 WriteTo 开销
  - 放宽 vectored write 能力识别,支持通过 UnwrapConn/WriteBuffers 命中 fast path
  - 修复 dedicated batch 排队路径 payload 复用问题,改为深拷贝 queued items
  - 补齐 dedicated attach、short write、payload clone、transport stop/handoff 等回归测试
2026-04-16 17:27:48 +08:00

215 lines
4.6 KiB
Go

package notify
import (
"b612.me/stario"
"errors"
"io"
"net"
"strings"
"sync"
"time"
)
var transportConnWriteLocks sync.Map
var errTransportFrameQueueUnavailable = errors.New("transport frame queue is unavailable")
type vectoredBuffersWriter interface {
WriteBuffers(*net.Buffers) (int64, error)
}
type vectoredConnUnwrapper interface {
UnwrapConn() net.Conn
}
func writeFullToConn(conn net.Conn, data []byte) error {
if conn == nil {
return net.ErrClosed
}
return withRawConnWriteLock(conn, func(conn net.Conn) error {
return writeFullToConnUnlocked(conn, data)
})
}
func writeFullToConnUnlocked(conn net.Conn, data []byte) error {
if conn == nil {
return net.ErrClosed
}
return writeFullToWriterUnlocked(conn, data)
}
func writeFullToWriterUnlocked(writer io.Writer, data []byte) error {
if writer == nil {
return io.ErrClosedPipe
}
for len(data) > 0 {
n, err := writer.Write(data)
if n > 0 {
data = data[n:]
}
if err != nil {
return err
}
if n == 0 {
return io.ErrNoProgress
}
}
return nil
}
func writeNetBuffersFullUnlocked(conn net.Conn, buffers net.Buffers) error {
if conn == nil {
return net.ErrClosed
}
writer, writeFn := vectoredWriteStrategy(conn)
if writeFn == nil {
return writeRemainingBuffersUnlocked(conn, buffers)
}
n, err := writeFn(&buffers)
if err != nil {
return err
}
if len(buffers) == 0 {
return nil
}
if n == 0 {
return io.ErrNoProgress
}
return writeRemainingBuffersUnlocked(writer, buffers)
}
func vectoredWriteStrategy(conn net.Conn) (io.Writer, func(*net.Buffers) (int64, error)) {
current := conn
for depth := 0; depth < 8 && current != nil; depth++ {
if writer, ok := current.(vectoredBuffersWriter); ok {
target := current
return target, writer.WriteBuffers
}
switch target := current.(type) {
case *net.TCPConn:
return target, func(bufs *net.Buffers) (int64, error) {
return bufs.WriteTo(target)
}
case *net.UnixConn:
return target, func(bufs *net.Buffers) (int64, error) {
return bufs.WriteTo(target)
}
}
unwrapper, ok := current.(vectoredConnUnwrapper)
if !ok {
break
}
next := unwrapper.UnwrapConn()
if next == nil || next == current {
break
}
current = next
}
return nil, nil
}
func writeRemainingBuffersUnlocked(writer io.Writer, buffers net.Buffers) error {
for _, part := range buffers {
if len(part) == 0 {
continue
}
if err := writeFullToWriterUnlocked(writer, part); err != nil {
return err
}
}
return nil
}
func withRawConnWriteLock(conn net.Conn, fn func(net.Conn) error) error {
return withRawConnWriteLockDeadline(conn, time.Time{}, fn)
}
func withRawConnWriteLockDeadline(conn net.Conn, deadline time.Time, fn func(net.Conn) error) error {
if conn == nil {
return net.ErrClosed
}
lock := rawConnWriteLock(conn)
lock.Lock()
defer lock.Unlock()
if !deadline.IsZero() {
if err := conn.SetWriteDeadline(deadline); err != nil {
return err
}
defer func() {
_ = conn.SetWriteDeadline(time.Time{})
}()
}
return fn(conn)
}
func rawConnWriteLock(conn net.Conn) *sync.Mutex {
if conn == nil {
return &sync.Mutex{}
}
if lock, ok := transportConnWriteLocks.Load(conn); ok {
return lock.(*sync.Mutex)
}
lock := &sync.Mutex{}
actual, _ := transportConnWriteLocks.LoadOrStore(conn, lock)
return actual.(*sync.Mutex)
}
func writeFramedPayloadUnlocked(conn net.Conn, queue *stario.StarQueue, payload []byte) error {
if conn == nil {
return net.ErrClosed
}
if queue == nil {
return errTransportFrameQueueUnavailable
}
if isPacketTransportConn(conn) {
return writeFullToConnUnlocked(conn, queue.BuildMessage(payload))
}
return queue.WriteFrameBuffers(conn, payload)
}
func writeFramedPayloadBatchUnlocked(conn net.Conn, queue *stario.StarQueue, payloads [][]byte) error {
if conn == nil {
return net.ErrClosed
}
if queue == nil {
return errTransportFrameQueueUnavailable
}
if len(payloads) == 0 {
return nil
}
if isPacketTransportConn(conn) {
for _, payload := range payloads {
if err := writeFullToConnUnlocked(conn, queue.BuildMessage(payload)); err != nil {
return err
}
}
return nil
}
return queue.WriteFramesBuffers(conn, payloads...)
}
func isPacketTransportConn(conn net.Conn) bool {
if conn == nil {
return false
}
if _, ok := conn.(*net.UDPConn); ok {
return true
}
return isPacketNetwork(addrNetwork(conn.LocalAddr())) || isPacketNetwork(addrNetwork(conn.RemoteAddr()))
}
func addrNetwork(addr net.Addr) string {
if addr == nil {
return ""
}
return addr.Network()
}
func isPacketNetwork(network string) bool {
switch strings.ToLower(network) {
case "udp", "udp4", "udp6":
return true
default:
return false
}
}