- 引入 LogicalConn/TransportConn 分层,ClientConn 保留兼容适配层 - 新增 Stream、Bulk、RecordStream 三条数据面能力及对应控制路径 - 完成 transfer/file 传输内核与状态快照、诊断能力 - 补齐 reconnect、inbound dispatcher、modern psk 等基础模块 - 增加大规模回归、并发与基准测试覆盖 - 更新依赖库
207 lines
5.9 KiB
Go
207 lines
5.9 KiB
Go
package notify
|
|
|
|
import (
|
|
"b612.me/stario"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"time"
|
|
)
|
|
|
|
func batchedControlEnvelope(env Envelope) bool {
|
|
switch env.Kind {
|
|
case EnvelopeSignal, EnvelopeSignalAck:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// writeDeadlineFromTimeout converts a relative timeout into an absolute
// deadline. A zero or negative timeout yields the zero time.Time; a positive
// timeout yields time.Now() plus that timeout.
func writeDeadlineFromTimeout(timeout time.Duration) time.Time {
	var deadline time.Time
	if timeout > 0 {
		deadline = time.Now().Add(timeout)
	}
	return deadline
}
|
|
|
|
func (c *ClientCommon) sendHeartbeat() error {
|
|
_, err := c.sendWait(TransferMsg{
|
|
ID: 10000,
|
|
Key: "heartbeat",
|
|
Value: nil,
|
|
Type: MSG_SYS_WAIT,
|
|
}, time.Second*5)
|
|
return err
|
|
}
|
|
|
|
func (c *ClientCommon) handleHeartbeatResult(err error, failedCount int) (int, bool) {
|
|
return c.handleHeartbeatResultWithSession(c.currentClientSessionEpoch(), err, failedCount)
|
|
}
|
|
|
|
func (c *ClientCommon) handleHeartbeatResultWithSession(epoch uint64, err error, failedCount int) (int, bool) {
|
|
if err == nil {
|
|
c.lastHeartbeat = time.Now().Unix()
|
|
return 0, false
|
|
}
|
|
if c.debugMode {
|
|
fmt.Println("failed to recv heartbeat,timeout!")
|
|
}
|
|
failedCount++
|
|
if failedCount < 3 {
|
|
return failedCount, false
|
|
}
|
|
if c.debugMode {
|
|
fmt.Println("heatbeat failed more than 3 times,stop client")
|
|
}
|
|
if !c.stopClientSessionIfCurrent(epoch, "heartbeat failed more than 3 times", errors.New("heartbeat failed more than 3 times")) {
|
|
return failedCount, true
|
|
}
|
|
return failedCount, true
|
|
}
|
|
|
|
func (c *ClientCommon) readFromTransport() (int, []byte, error) {
|
|
return c.readFromTransportBinding(c.clientTransportBindingSnapshot())
|
|
}
|
|
|
|
func (c *ClientCommon) readFromTransportConn(conn net.Conn) (int, []byte, error) {
|
|
return c.readFromTransportBinding(newTransportBinding(conn, nil))
|
|
}
|
|
|
|
func (c *ClientCommon) readFromTransportBinding(binding *transportBinding) (int, []byte, error) {
|
|
return c.readFromTransportBindingWithBuffer(binding, streamReadBuffer())
|
|
}
|
|
|
|
func (c *ClientCommon) readFromTransportBindingWithBuffer(binding *transportBinding, data []byte) (int, []byte, error) {
|
|
if len(data) == 0 {
|
|
data = streamReadBuffer()
|
|
}
|
|
if binding == nil {
|
|
return 0, data, net.ErrClosed
|
|
}
|
|
conn := binding.connSnapshot()
|
|
if conn == nil {
|
|
return 0, data, net.ErrClosed
|
|
}
|
|
if c.maxReadTimeout.Seconds() != 0 {
|
|
_ = conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout))
|
|
}
|
|
readNum, err := conn.Read(data)
|
|
return readNum, data, err
|
|
}
|
|
|
|
func (c *ClientCommon) handleTransportReadResult(readNum int, data []byte, err error) bool {
|
|
return c.handleTransportReadResultWithSession(c.clientStopContextSnapshot(), c.clientTransportConnSnapshot(), c.clientQueueSnapshot(), readNum, data, err, c.currentClientSessionEpoch())
|
|
}
|
|
|
|
func (c *ClientCommon) handleTransportReadResultWithSession(stopCtx context.Context, conn net.Conn, queue *stario.StarQueue, readNum int, data []byte, err error, epoch uint64) bool {
|
|
return c.handleTransportReadResultWithSessionDispatcher(stopCtx, conn, queue, readNum, data, err, epoch, c.clientInboundDispatcherSnapshot())
|
|
}
|
|
|
|
func (c *ClientCommon) handleTransportReadResultWithSessionDispatcher(stopCtx context.Context, conn net.Conn, queue *stario.StarQueue, readNum int, data []byte, err error, epoch uint64, dispatcher *inboundDispatcher) bool {
|
|
binding := newTransportBinding(conn, queue)
|
|
if err == os.ErrDeadlineExceeded {
|
|
if readNum != 0 && queue != nil {
|
|
if !c.pushMessageFast(queue, data[:readNum], dispatcher) {
|
|
queue.ParseMessage(data[:readNum], "b612")
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
if err != nil {
|
|
if c.showError || c.debugMode {
|
|
fmt.Println("client read error", err)
|
|
}
|
|
select {
|
|
case <-sessionStopChan(stopCtx):
|
|
c.closeClientTransportBinding(binding)
|
|
return false
|
|
default:
|
|
}
|
|
c.stopClientSessionIfCurrent(epoch, "client read error", err)
|
|
return false
|
|
}
|
|
if queue != nil {
|
|
if !c.pushMessageFast(queue, data[:readNum], dispatcher) {
|
|
queue.ParseMessage(data[:readNum], "b612")
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *ClientCommon) pushMessageFast(queue *stario.StarQueue, data []byte, dispatcher *inboundDispatcher) bool {
|
|
if queue == nil || dispatcher == nil || len(data) == 0 {
|
|
return false
|
|
}
|
|
if err := queue.ParseMessageOwned(data, "b612", func(msg stario.MsgQueue) error {
|
|
payload := msg.Msg
|
|
c.wg.Add(1)
|
|
if !dispatcher.Dispatch(clientInboundDispatchSource(), func() {
|
|
defer c.wg.Done()
|
|
now := time.Now()
|
|
if err := c.dispatchInboundTransportPayload(payload, now); err != nil {
|
|
if c.showError || c.debugMode {
|
|
fmt.Println("client decode envelope error", err)
|
|
}
|
|
}
|
|
}) {
|
|
c.wg.Done()
|
|
}
|
|
return nil
|
|
}); err != nil && (c.showError || c.debugMode) {
|
|
fmt.Println("client parse inbound frame error", err)
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *ClientCommon) writeToTransport(data []byte) error {
|
|
binding := c.clientTransportBindingSnapshot()
|
|
if binding == nil {
|
|
return net.ErrClosed
|
|
}
|
|
return binding.withConnWriteLock(func(conn net.Conn) error {
|
|
if c.maxWriteTimeout.Seconds() != 0 {
|
|
_ = conn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
|
|
}
|
|
return writeFullToConnUnlocked(conn, data)
|
|
})
|
|
}
|
|
|
|
func (c *ClientCommon) writePayloadToTransport(payload []byte) error {
|
|
binding := c.clientTransportBindingSnapshot()
|
|
if binding == nil {
|
|
return net.ErrClosed
|
|
}
|
|
queue := binding.queueSnapshot()
|
|
if queue == nil {
|
|
return errClientSessionQueueUnavailable
|
|
}
|
|
return binding.withConnWriteLock(func(conn net.Conn) error {
|
|
if c.maxWriteTimeout.Seconds() != 0 {
|
|
_ = conn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
|
|
}
|
|
return writeFramedPayloadUnlocked(conn, queue, payload)
|
|
})
|
|
}
|
|
|
|
func (c *ClientCommon) writeControlPayloadToTransport(payload []byte) error {
|
|
binding := c.clientTransportBindingSnapshot()
|
|
if binding == nil {
|
|
return net.ErrClosed
|
|
}
|
|
queue := binding.queueSnapshot()
|
|
if queue == nil {
|
|
return errClientSessionQueueUnavailable
|
|
}
|
|
conn := binding.connSnapshot()
|
|
if conn == nil || isPacketTransportConn(conn) {
|
|
return c.writePayloadToTransport(payload)
|
|
}
|
|
sender := binding.controlBatchSenderSnapshot()
|
|
if sender == nil {
|
|
return c.writePayloadToTransport(payload)
|
|
}
|
|
return sender.submit(payload, writeDeadlineFromTimeout(c.maxWriteTimeout))
|
|
}
|