2026-04-15 15:24:36 +08:00
|
|
|
package notify
|
|
|
|
|
|
|
|
|
|
import (
|
2026-04-18 16:05:57 +08:00
|
|
|
"b612.me/stario"
|
2026-04-15 15:24:36 +08:00
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"net"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type TransportConn struct {
|
|
|
|
|
logical *LogicalConn
|
|
|
|
|
generation uint64
|
|
|
|
|
remoteAddr net.Addr
|
|
|
|
|
attached bool
|
|
|
|
|
hasRuntimeConn bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Read-buffer sizes for the transport read paths.
const (
	// transportStreamReadBufferSize is 1 MiB. It sizes the buffers returned by
	// streamReadBuffer and is the size requested from the stario frame reader.
	transportStreamReadBufferSize = 1 << 20

	// transportPacketReadBufferSize is 64 KiB. It sizes the buffers returned
	// by packetReadBuffer.
	transportPacketReadBufferSize = 64 << 10
)
|
|
|
|
|
|
|
|
|
|
func streamReadBuffer() []byte {
|
|
|
|
|
return make([]byte, transportStreamReadBufferSize)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func packetReadBuffer() []byte {
|
|
|
|
|
return make([]byte, transportPacketReadBufferSize)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func newTransportFrameReader(conn net.Conn, queue *stario.StarQueue) *stario.FrameReader {
|
|
|
|
|
reader := stario.NewFrameReader(conn, queue)
|
|
|
|
|
if reader == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
if transportStreamReadBufferSize > stario.DefaultFrameReaderBufferSize {
|
|
|
|
|
reader.SetReadBufferSize(transportStreamReadBufferSize)
|
|
|
|
|
}
|
|
|
|
|
return reader
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
// TransportConnRuntimeSnapshot is the exported, plain-data view of a
// TransportConn produced by (*TransportConn).runtimeSnapshot. The transport
// fields come from the TransportConn accessors; the binding, logical and
// detach fields are copied from the logical connection's binding diagnostics.
type TransportConnRuntimeSnapshot struct {
	// ClientID is TransportConn.ClientID(). RemoteAddress is the string form
	// of TransportConn.RemoteAddr(), or empty when no address was captured.
	ClientID, RemoteAddress string

	// BindingOwner is copied from the diagnostics' BindingOwner.
	BindingOwner string

	// LogicalAlive and BindingCurrent are copied from the diagnostics'
	// BindingAlive and BindingCurrent.
	LogicalAlive, BindingCurrent bool

	// LogicalReason and LogicalError are copied from the diagnostics'
	// BindingReason and BindingError. runtimeSnapshot blanks both when
	// LogicalAlive is set, a detach reason is recorded and Current is false.
	LogicalReason, LogicalError string

	// TransportGeneration is TransportConn.TransportGeneration().
	TransportGeneration uint64

	// Attached, HasRuntimeConn, UsesStreamTransport and Current mirror the
	// TransportConn methods Attached, HasRuntimeConn, UsesStreamTransport and
	// IsCurrent.
	Attached, HasRuntimeConn, UsesStreamTransport, Current bool

	// TransportDetachReason and TransportDetachKind are copied from the
	// diagnostics.
	TransportDetachReason, TransportDetachKind string

	// TransportDetachGeneration is copied from the diagnostics.
	TransportDetachGeneration uint64

	// TransportDetachError is copied from the diagnostics.
	TransportDetachError string

	// TransportDetachedAt is copied from the diagnostics.
	TransportDetachedAt time.Time

	// ReattachEligible is copied from the diagnostics.
	ReattachEligible bool
}
|
|
|
|
|
|
|
|
|
|
type transportConnServerSender interface {
|
|
|
|
|
sendTransport(*TransportConn, TransferMsg) (WaitMsg, error)
|
|
|
|
|
sendTransportWait(*TransportConn, TransferMsg, time.Duration) (Message, error)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type transportConnServerAPI interface {
|
|
|
|
|
transportConnServerSender
|
|
|
|
|
SendCtxTransport(context.Context, *TransportConn, string, MsgVal) (Message, error)
|
|
|
|
|
SendObjTransport(*TransportConn, string, interface{}) error
|
|
|
|
|
SendObjCtxTransport(context.Context, *TransportConn, string, interface{}) (Message, error)
|
|
|
|
|
SendWaitObjTransport(*TransportConn, string, interface{}, time.Duration) (Message, error)
|
|
|
|
|
SendFileTransport(context.Context, *TransportConn, string) error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// serverUDPTransportRuntimeReader is implemented by servers that can report
// their UDP listener. (*LogicalConn).currentTransportConnSnapshot takes its
// UDP path when the logical connection's server implements this interface
// and the listener it returns is non-nil.
type serverUDPTransportRuntimeReader interface {
	// serverUDPListenerSnapshot returns the server's UDP listener, or nil.
	serverUDPListenerSnapshot() *net.UDPConn
}
|
|
|
|
|
|
|
|
|
|
var (
	// errTransportConnRuntimeSnapshotNil is returned by
	// GetTransportConnRuntimeSnapshot when it is given a nil *TransportConn.
	errTransportConnRuntimeSnapshotNil = errors.New("transport conn runtime snapshot target is nil")
)
|
|
|
|
|
|
|
|
|
|
func (c *ClientConn) clientConnRemoteAddrSnapshot() net.Addr {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return c.clientConnLogicalPeerStateSnapshot().clientAddr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientConn) CurrentTransportConn() *TransportConn {
|
|
|
|
|
return c.currentTransportConnSnapshot()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientConn) currentTransportConnSnapshot() *TransportConn {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
logical := c.LogicalConn()
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return logical.currentTransportConnSnapshot()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *LogicalConn) currentTransportConnSnapshot() *TransportConn {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
logical := c
|
|
|
|
|
remoteAddr := c.RemoteAddr()
|
|
|
|
|
hasRuntimeConn := c.transportSnapshot() != nil
|
|
|
|
|
server := c.Server()
|
|
|
|
|
if server != nil {
|
|
|
|
|
if reader, ok := server.(serverUDPTransportRuntimeReader); ok && reader.serverUDPListenerSnapshot() != nil {
|
|
|
|
|
if remoteAddr == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return &TransportConn{
|
|
|
|
|
logical: logical,
|
|
|
|
|
generation: c.transportGenerationSnapshot(),
|
|
|
|
|
remoteAddr: remoteAddr,
|
|
|
|
|
attached: true,
|
|
|
|
|
hasRuntimeConn: hasRuntimeConn,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !c.transportAttachedSnapshot() {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return &TransportConn{
|
|
|
|
|
logical: logical,
|
|
|
|
|
generation: c.transportGenerationSnapshot(),
|
|
|
|
|
remoteAddr: remoteAddr,
|
|
|
|
|
attached: true,
|
|
|
|
|
hasRuntimeConn: hasRuntimeConn,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) logicalConnSnapshot() *LogicalConn {
|
|
|
|
|
if t == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return t.logical
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) LogicalConn() *LogicalConn {
|
|
|
|
|
return t.logicalConnSnapshot()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) ClientID() string {
|
|
|
|
|
logical := t.logicalConnSnapshot()
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
return logical.ID()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) RemoteAddr() net.Addr {
|
|
|
|
|
if t == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return t.remoteAddr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) TransportGeneration() uint64 {
|
|
|
|
|
if t == nil {
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
return t.generation
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) Attached() bool {
|
|
|
|
|
return t != nil && t.attached
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) HasRuntimeConn() bool {
|
|
|
|
|
return t != nil && t.hasRuntimeConn
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) UsesStreamTransport() bool {
|
|
|
|
|
logical := t.logicalConnSnapshot()
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
return logical.usesStreamTransportSnapshot()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) IsCurrent() bool {
|
|
|
|
|
if t == nil {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
logical := t.logicalConnSnapshot()
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
current := logical.CurrentTransportConn()
|
|
|
|
|
if current == nil {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
if current.generation != t.generation {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
return transportConnAddrString(current.remoteAddr) == transportConnAddrString(t.remoteAddr)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// transportConnAddrString returns addr.String(), or the empty string when
// addr is a nil interface value.
func transportConnAddrString(addr net.Addr) string {
	if addr != nil {
		return addr.String()
	}
	return ""
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) transportScope() string {
|
|
|
|
|
logical := t.logicalConnSnapshot()
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return serverFileDomain + ":unknown"
|
|
|
|
|
}
|
|
|
|
|
return serverTransportScopeByGeneration(logical, t.TransportGeneration())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) deliveryScopes() []string {
|
|
|
|
|
logical := t.logicalConnSnapshot()
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return []string{serverFileDomain + ":unknown"}
|
|
|
|
|
}
|
|
|
|
|
base := serverFileScope(logical)
|
|
|
|
|
transport := t.transportScope()
|
|
|
|
|
if transport == base {
|
|
|
|
|
return []string{base}
|
|
|
|
|
}
|
|
|
|
|
return []string{transport, base}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) runtimeSnapshot() TransportConnRuntimeSnapshot {
|
|
|
|
|
snapshot := TransportConnRuntimeSnapshot{
|
|
|
|
|
ClientID: t.ClientID(),
|
|
|
|
|
TransportGeneration: t.TransportGeneration(),
|
|
|
|
|
Attached: t.Attached(),
|
|
|
|
|
HasRuntimeConn: t.HasRuntimeConn(),
|
|
|
|
|
UsesStreamTransport: t.UsesStreamTransport(),
|
|
|
|
|
Current: t.IsCurrent(),
|
|
|
|
|
}
|
|
|
|
|
if addr := t.RemoteAddr(); addr != nil {
|
|
|
|
|
snapshot.RemoteAddress = addr.String()
|
|
|
|
|
}
|
|
|
|
|
if logical := t.logicalConnSnapshot(); logical != nil {
|
|
|
|
|
diag := snapshotBindingDiagnosticsFromLogical(logical, t, t.TransportGeneration())
|
|
|
|
|
snapshot.BindingOwner = diag.BindingOwner
|
|
|
|
|
snapshot.LogicalAlive = diag.BindingAlive
|
|
|
|
|
snapshot.BindingCurrent = diag.BindingCurrent
|
|
|
|
|
snapshot.LogicalReason = diag.BindingReason
|
|
|
|
|
snapshot.LogicalError = diag.BindingError
|
|
|
|
|
snapshot.TransportDetachReason = diag.TransportDetachReason
|
|
|
|
|
snapshot.TransportDetachKind = diag.TransportDetachKind
|
|
|
|
|
snapshot.TransportDetachGeneration = diag.TransportDetachGeneration
|
|
|
|
|
snapshot.TransportDetachError = diag.TransportDetachError
|
|
|
|
|
snapshot.TransportDetachedAt = diag.TransportDetachedAt
|
|
|
|
|
snapshot.ReattachEligible = diag.ReattachEligible
|
2026-04-15 19:52:45 +08:00
|
|
|
if snapshot.LogicalAlive && snapshot.TransportDetachReason != "" && !snapshot.Current {
|
|
|
|
|
snapshot.LogicalReason = ""
|
|
|
|
|
snapshot.LogicalError = ""
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
return snapshot
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func GetTransportConnRuntimeSnapshot(t *TransportConn) (TransportConnRuntimeSnapshot, error) {
|
|
|
|
|
if t == nil {
|
|
|
|
|
return TransportConnRuntimeSnapshot{}, errTransportConnRuntimeSnapshotNil
|
|
|
|
|
}
|
|
|
|
|
return t.runtimeSnapshot(), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func GetCurrentTransportConnRuntimeSnapshot(c *ClientConn) (TransportConnRuntimeSnapshot, bool, error) {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return TransportConnRuntimeSnapshot{}, false, errClientConnRuntimeSnapshotNil
|
|
|
|
|
}
|
|
|
|
|
transport := c.CurrentTransportConn()
|
|
|
|
|
if transport == nil {
|
|
|
|
|
return TransportConnRuntimeSnapshot{}, false, nil
|
|
|
|
|
}
|
|
|
|
|
snapshot, err := GetTransportConnRuntimeSnapshot(transport)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return TransportConnRuntimeSnapshot{}, false, err
|
|
|
|
|
}
|
|
|
|
|
return snapshot, true, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) transportConnServerSenderSnapshot() transportConnServerSender {
|
|
|
|
|
logical := t.logicalConnSnapshot()
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
server := logical.Server()
|
|
|
|
|
if server == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
sender, _ := server.(transportConnServerSender)
|
|
|
|
|
return sender
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) transportConnServerAPISnapshot() transportConnServerAPI {
|
|
|
|
|
logical := t.logicalConnSnapshot()
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
server := logical.Server()
|
|
|
|
|
if server == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
api, _ := server.(transportConnServerAPI)
|
|
|
|
|
return api
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) sendTransfer(msg TransferMsg) (WaitMsg, error) {
|
|
|
|
|
sender := t.transportConnServerSenderSnapshot()
|
|
|
|
|
if sender == nil {
|
|
|
|
|
return WaitMsg{}, transportDetachedErrorForTransport(t)
|
|
|
|
|
}
|
|
|
|
|
return sender.sendTransport(t, msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) sendTransferWait(msg TransferMsg, timeout time.Duration) (Message, error) {
|
|
|
|
|
sender := t.transportConnServerSenderSnapshot()
|
|
|
|
|
if sender == nil {
|
|
|
|
|
return Message{}, transportDetachedErrorForTransport(t)
|
|
|
|
|
}
|
|
|
|
|
return sender.sendTransportWait(t, msg, timeout)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) Send(key string, value MsgVal) error {
|
|
|
|
|
_, err := t.sendTransfer(TransferMsg{
|
|
|
|
|
Key: key,
|
|
|
|
|
Value: value,
|
|
|
|
|
Type: MSG_ASYNC,
|
|
|
|
|
})
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) SendWait(key string, value MsgVal, timeout time.Duration) (Message, error) {
|
|
|
|
|
return t.sendTransferWait(TransferMsg{
|
|
|
|
|
Key: key,
|
|
|
|
|
Value: value,
|
|
|
|
|
Type: MSG_SYNC_ASK,
|
|
|
|
|
}, timeout)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) SendCtx(ctx context.Context, key string, value MsgVal) (Message, error) {
|
|
|
|
|
api := t.transportConnServerAPISnapshot()
|
|
|
|
|
if api == nil {
|
|
|
|
|
return Message{}, transportDetachedErrorForTransport(t)
|
|
|
|
|
}
|
|
|
|
|
return api.SendCtxTransport(ctx, t, key, value)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) SendObj(key string, value interface{}) error {
|
|
|
|
|
api := t.transportConnServerAPISnapshot()
|
|
|
|
|
if api == nil {
|
|
|
|
|
return transportDetachedErrorForTransport(t)
|
|
|
|
|
}
|
|
|
|
|
return api.SendObjTransport(t, key, value)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) SendObjCtx(ctx context.Context, key string, value interface{}) (Message, error) {
|
|
|
|
|
api := t.transportConnServerAPISnapshot()
|
|
|
|
|
if api == nil {
|
|
|
|
|
return Message{}, transportDetachedErrorForTransport(t)
|
|
|
|
|
}
|
|
|
|
|
return api.SendObjCtxTransport(ctx, t, key, value)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) SendWaitObj(key string, value interface{}, timeout time.Duration) (Message, error) {
|
|
|
|
|
api := t.transportConnServerAPISnapshot()
|
|
|
|
|
if api == nil {
|
|
|
|
|
return Message{}, transportDetachedErrorForTransport(t)
|
|
|
|
|
}
|
|
|
|
|
return api.SendWaitObjTransport(t, key, value, timeout)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TransportConn) SendFile(ctx context.Context, filePath string) error {
|
|
|
|
|
api := t.transportConnServerAPISnapshot()
|
|
|
|
|
if api == nil {
|
|
|
|
|
return transportDetachedErrorForTransport(t)
|
|
|
|
|
}
|
|
|
|
|
return api.SendFileTransport(ctx, t, filePath)
|
|
|
|
|
}
|