2026-04-15 15:24:36 +08:00
|
|
|
package notify
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"b612.me/stario"
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"net"
|
|
|
|
|
"os"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func batchedControlEnvelope(env Envelope) bool {
|
|
|
|
|
switch env.Kind {
|
|
|
|
|
case EnvelopeSignal, EnvelopeSignalAck:
|
|
|
|
|
return true
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// writeDeadlineFromTimeout converts a relative timeout into an absolute
// deadline measured from the current time.
//
// A zero or negative timeout yields the zero time.Time; any positive timeout
// yields time.Now() plus that timeout.
func writeDeadlineFromTimeout(timeout time.Duration) time.Time {
	var deadline time.Time
	if timeout > 0 {
		deadline = time.Now().Add(timeout)
	}
	return deadline
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) sendHeartbeat() error {
|
|
|
|
|
_, err := c.sendWait(TransferMsg{
|
|
|
|
|
ID: 10000,
|
|
|
|
|
Key: "heartbeat",
|
|
|
|
|
Value: nil,
|
|
|
|
|
Type: MSG_SYS_WAIT,
|
|
|
|
|
}, time.Second*5)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) handleHeartbeatResult(err error, failedCount int) (int, bool) {
|
|
|
|
|
return c.handleHeartbeatResultWithSession(c.currentClientSessionEpoch(), err, failedCount)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) handleHeartbeatResultWithSession(epoch uint64, err error, failedCount int) (int, bool) {
|
|
|
|
|
if err == nil {
|
|
|
|
|
c.lastHeartbeat = time.Now().Unix()
|
|
|
|
|
return 0, false
|
|
|
|
|
}
|
|
|
|
|
if c.debugMode {
|
|
|
|
|
fmt.Println("failed to recv heartbeat,timeout!")
|
|
|
|
|
}
|
|
|
|
|
failedCount++
|
|
|
|
|
if failedCount < 3 {
|
|
|
|
|
return failedCount, false
|
|
|
|
|
}
|
|
|
|
|
if c.debugMode {
|
|
|
|
|
fmt.Println("heatbeat failed more than 3 times,stop client")
|
|
|
|
|
}
|
|
|
|
|
if !c.stopClientSessionIfCurrent(epoch, "heartbeat failed more than 3 times", errors.New("heartbeat failed more than 3 times")) {
|
|
|
|
|
return failedCount, true
|
|
|
|
|
}
|
|
|
|
|
return failedCount, true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) readFromTransport() (int, []byte, error) {
|
|
|
|
|
return c.readFromTransportBinding(c.clientTransportBindingSnapshot())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) readFromTransportConn(conn net.Conn) (int, []byte, error) {
|
|
|
|
|
return c.readFromTransportBinding(newTransportBinding(conn, nil))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) readFromTransportBinding(binding *transportBinding) (int, []byte, error) {
|
|
|
|
|
return c.readFromTransportBindingWithBuffer(binding, streamReadBuffer())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) readFromTransportBindingWithBuffer(binding *transportBinding, data []byte) (int, []byte, error) {
|
|
|
|
|
if len(data) == 0 {
|
|
|
|
|
data = streamReadBuffer()
|
|
|
|
|
}
|
|
|
|
|
if binding == nil {
|
|
|
|
|
return 0, data, net.ErrClosed
|
|
|
|
|
}
|
|
|
|
|
conn := binding.connSnapshot()
|
|
|
|
|
if conn == nil {
|
|
|
|
|
return 0, data, net.ErrClosed
|
|
|
|
|
}
|
|
|
|
|
if c.maxReadTimeout.Seconds() != 0 {
|
|
|
|
|
_ = conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout))
|
|
|
|
|
}
|
|
|
|
|
readNum, err := conn.Read(data)
|
|
|
|
|
return readNum, data, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) handleTransportReadResult(readNum int, data []byte, err error) bool {
|
|
|
|
|
return c.handleTransportReadResultWithSession(c.clientStopContextSnapshot(), c.clientTransportConnSnapshot(), c.clientQueueSnapshot(), readNum, data, err, c.currentClientSessionEpoch())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) handleTransportReadResultWithSession(stopCtx context.Context, conn net.Conn, queue *stario.StarQueue, readNum int, data []byte, err error, epoch uint64) bool {
|
|
|
|
|
return c.handleTransportReadResultWithSessionDispatcher(stopCtx, conn, queue, readNum, data, err, epoch, c.clientInboundDispatcherSnapshot())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) handleTransportReadResultWithSessionDispatcher(stopCtx context.Context, conn net.Conn, queue *stario.StarQueue, readNum int, data []byte, err error, epoch uint64, dispatcher *inboundDispatcher) bool {
|
|
|
|
|
binding := newTransportBinding(conn, queue)
|
|
|
|
|
if err == os.ErrDeadlineExceeded {
|
|
|
|
|
if readNum != 0 && queue != nil {
|
|
|
|
|
if !c.pushMessageFast(queue, data[:readNum], dispatcher) {
|
|
|
|
|
queue.ParseMessage(data[:readNum], "b612")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
if c.showError || c.debugMode {
|
|
|
|
|
fmt.Println("client read error", err)
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-sessionStopChan(stopCtx):
|
|
|
|
|
c.closeClientTransportBinding(binding)
|
|
|
|
|
return false
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
c.stopClientSessionIfCurrent(epoch, "client read error", err)
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
if queue != nil {
|
|
|
|
|
if !c.pushMessageFast(queue, data[:readNum], dispatcher) {
|
|
|
|
|
queue.ParseMessage(data[:readNum], "b612")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func (c *ClientCommon) readTransportPayloadPooled(conn net.Conn, reader *stario.FrameReader) ([]byte, func(), error) {
|
|
|
|
|
if reader == nil {
|
|
|
|
|
return nil, nil, net.ErrClosed
|
|
|
|
|
}
|
|
|
|
|
if conn == nil {
|
|
|
|
|
return nil, nil, net.ErrClosed
|
|
|
|
|
}
|
|
|
|
|
if c.maxReadTimeout.Seconds() != 0 {
|
|
|
|
|
_ = conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout))
|
|
|
|
|
}
|
|
|
|
|
return reader.NextPooled()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) handleTransportPayloadReadResultWithSession(stopCtx context.Context, binding *transportBinding, payload []byte, release func(), err error, epoch uint64, dispatcher *inboundDispatcher) bool {
|
|
|
|
|
if err == os.ErrDeadlineExceeded {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
if release != nil {
|
|
|
|
|
release()
|
|
|
|
|
}
|
|
|
|
|
if c.showError || c.debugMode {
|
|
|
|
|
fmt.Println("client read error", err)
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-sessionStopChan(stopCtx):
|
|
|
|
|
c.closeClientTransportBinding(binding)
|
|
|
|
|
return false
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
c.stopClientSessionIfCurrent(epoch, "client read error", err)
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
c.dispatchTransportPayloadFast(payload, release, dispatcher)
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) dispatchTransportPayloadFast(payload []byte, release func(), dispatcher *inboundDispatcher) {
|
|
|
|
|
if len(payload) == 0 {
|
|
|
|
|
if release != nil {
|
|
|
|
|
release()
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
plain, plainRelease, err := c.decryptTransportPayloadPooled(payload, release)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if c.showError || c.debugMode {
|
|
|
|
|
fmt.Println("client decode transport payload error", err)
|
2026-04-18 16:05:57 +08:00
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
if c.tryDispatchBorrowedTransportPlain(plain, plainRelease) {
|
|
|
|
|
return
|
2026-04-18 16:05:57 +08:00
|
|
|
}
|
|
|
|
|
if dispatcher == nil {
|
|
|
|
|
now := time.Now()
|
2026-04-20 16:35:44 +08:00
|
|
|
err := c.dispatchInboundTransportPlain(plain, now)
|
|
|
|
|
if plainRelease != nil {
|
|
|
|
|
plainRelease()
|
|
|
|
|
}
|
|
|
|
|
if err != nil && (c.showError || c.debugMode) {
|
2026-04-18 16:05:57 +08:00
|
|
|
fmt.Println("client decode envelope error", err)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
owned := plain
|
|
|
|
|
if plainRelease != nil {
|
|
|
|
|
owned = append([]byte(nil), plain...)
|
|
|
|
|
plainRelease()
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
c.wg.Add(1)
|
|
|
|
|
if !dispatcher.Dispatch(clientInboundDispatchSource(), func() {
|
|
|
|
|
defer c.wg.Done()
|
|
|
|
|
now := time.Now()
|
2026-04-20 16:35:44 +08:00
|
|
|
if err := c.dispatchInboundTransportPlain(owned, now); err != nil && (c.showError || c.debugMode) {
|
2026-04-18 16:05:57 +08:00
|
|
|
fmt.Println("client decode envelope error", err)
|
|
|
|
|
}
|
|
|
|
|
}) {
|
|
|
|
|
c.wg.Done()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func (c *ClientCommon) pushMessageFast(queue *stario.StarQueue, data []byte, dispatcher *inboundDispatcher) bool {
|
|
|
|
|
if queue == nil || dispatcher == nil || len(data) == 0 {
|
|
|
|
|
return false
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if err := queue.ParseMessageView(data, "b612", func(frame stario.FrameView) error {
|
|
|
|
|
c.dispatchTransportPayloadFast(frame.Payload, nil, dispatcher)
|
2026-04-15 15:24:36 +08:00
|
|
|
return nil
|
|
|
|
|
}); err != nil && (c.showError || c.debugMode) {
|
|
|
|
|
fmt.Println("client parse inbound frame error", err)
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) writeToTransport(data []byte) error {
|
|
|
|
|
binding := c.clientTransportBindingSnapshot()
|
|
|
|
|
if binding == nil {
|
|
|
|
|
return net.ErrClosed
|
|
|
|
|
}
|
|
|
|
|
return binding.withConnWriteLock(func(conn net.Conn) error {
|
|
|
|
|
if c.maxWriteTimeout.Seconds() != 0 {
|
|
|
|
|
_ = conn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
|
|
|
|
|
}
|
|
|
|
|
return writeFullToConnUnlocked(conn, data)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) writePayloadToTransport(payload []byte) error {
|
|
|
|
|
binding := c.clientTransportBindingSnapshot()
|
|
|
|
|
if binding == nil {
|
|
|
|
|
return net.ErrClosed
|
|
|
|
|
}
|
|
|
|
|
queue := binding.queueSnapshot()
|
|
|
|
|
if queue == nil {
|
|
|
|
|
return errClientSessionQueueUnavailable
|
|
|
|
|
}
|
|
|
|
|
return binding.withConnWriteLock(func(conn net.Conn) error {
|
|
|
|
|
if c.maxWriteTimeout.Seconds() != 0 {
|
|
|
|
|
_ = conn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
|
|
|
|
|
}
|
|
|
|
|
return writeFramedPayloadUnlocked(conn, queue, payload)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) writeControlPayloadToTransport(payload []byte) error {
|
|
|
|
|
binding := c.clientTransportBindingSnapshot()
|
|
|
|
|
if binding == nil {
|
|
|
|
|
return net.ErrClosed
|
|
|
|
|
}
|
|
|
|
|
queue := binding.queueSnapshot()
|
|
|
|
|
if queue == nil {
|
|
|
|
|
return errClientSessionQueueUnavailable
|
|
|
|
|
}
|
|
|
|
|
conn := binding.connSnapshot()
|
|
|
|
|
if conn == nil || isPacketTransportConn(conn) {
|
|
|
|
|
return c.writePayloadToTransport(payload)
|
|
|
|
|
}
|
|
|
|
|
sender := binding.controlBatchSenderSnapshot()
|
|
|
|
|
if sender == nil {
|
|
|
|
|
return c.writePayloadToTransport(payload)
|
|
|
|
|
}
|
|
|
|
|
return sender.submit(payload, writeDeadlineFromTimeout(c.maxWriteTimeout))
|
|
|
|
|
}
|