package notify import ( "b612.me/stario" "context" "errors" "fmt" "net" "os" "time" ) func batchedControlEnvelope(env Envelope) bool { switch env.Kind { case EnvelopeSignal, EnvelopeSignalAck: return true default: return false } } func writeDeadlineFromTimeout(timeout time.Duration) time.Time { if timeout <= 0 { return time.Time{} } return time.Now().Add(timeout) } func (c *ClientCommon) sendHeartbeat() error { _, err := c.sendWait(TransferMsg{ ID: 10000, Key: "heartbeat", Value: nil, Type: MSG_SYS_WAIT, }, time.Second*5) return err } func (c *ClientCommon) handleHeartbeatResult(err error, failedCount int) (int, bool) { return c.handleHeartbeatResultWithSession(c.currentClientSessionEpoch(), err, failedCount) } func (c *ClientCommon) handleHeartbeatResultWithSession(epoch uint64, err error, failedCount int) (int, bool) { if err == nil { c.lastHeartbeat = time.Now().Unix() return 0, false } if c.debugMode { fmt.Println("failed to recv heartbeat,timeout!") } failedCount++ if failedCount < 3 { return failedCount, false } if c.debugMode { fmt.Println("heatbeat failed more than 3 times,stop client") } if !c.stopClientSessionIfCurrent(epoch, "heartbeat failed more than 3 times", errors.New("heartbeat failed more than 3 times")) { return failedCount, true } return failedCount, true } func (c *ClientCommon) readFromTransport() (int, []byte, error) { return c.readFromTransportBinding(c.clientTransportBindingSnapshot()) } func (c *ClientCommon) readFromTransportConn(conn net.Conn) (int, []byte, error) { return c.readFromTransportBinding(newTransportBinding(conn, nil)) } func (c *ClientCommon) readFromTransportBinding(binding *transportBinding) (int, []byte, error) { return c.readFromTransportBindingWithBuffer(binding, streamReadBuffer()) } func (c *ClientCommon) readFromTransportBindingWithBuffer(binding *transportBinding, data []byte) (int, []byte, error) { if len(data) == 0 { data = streamReadBuffer() } if binding == nil { return 0, data, net.ErrClosed } conn := binding.connSnapshot() if conn == nil { return 0, data, net.ErrClosed } if c.maxReadTimeout.Seconds() != 0 { _ = conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout)) } readNum, err := conn.Read(data) return readNum, data, err } func (c *ClientCommon) handleTransportReadResult(readNum int, data []byte, err error) bool { return c.handleTransportReadResultWithSession(c.clientStopContextSnapshot(), c.clientTransportConnSnapshot(), c.clientQueueSnapshot(), readNum, data, err, c.currentClientSessionEpoch()) } func (c *ClientCommon) handleTransportReadResultWithSession(stopCtx context.Context, conn net.Conn, queue *stario.StarQueue, readNum int, data []byte, err error, epoch uint64) bool { return c.handleTransportReadResultWithSessionDispatcher(stopCtx, conn, queue, readNum, data, err, epoch, c.clientInboundDispatcherSnapshot()) } func (c *ClientCommon) handleTransportReadResultWithSessionDispatcher(stopCtx context.Context, conn net.Conn, queue *stario.StarQueue, readNum int, data []byte, err error, epoch uint64, dispatcher *inboundDispatcher) bool { binding := newTransportBinding(conn, queue) if err == os.ErrDeadlineExceeded { if readNum != 0 && queue != nil { if !c.pushMessageFast(queue, data[:readNum], dispatcher) { queue.ParseMessage(data[:readNum], "b612") } } return true } if err != nil { if c.showError || c.debugMode { fmt.Println("client read error", err) } select { case <-sessionStopChan(stopCtx): c.closeClientTransportBinding(binding) return false default: } c.stopClientSessionIfCurrent(epoch, "client read error", err) return false } if queue != nil { if !c.pushMessageFast(queue, data[:readNum], dispatcher) { queue.ParseMessage(data[:readNum], "b612") } } return true } func (c *ClientCommon) pushMessageFast(queue *stario.StarQueue, data []byte, dispatcher *inboundDispatcher) bool { if queue == nil || dispatcher == nil || len(data) == 0 { return false } if err := queue.ParseMessageOwned(data, "b612", func(msg stario.MsgQueue) error { payload := msg.Msg c.wg.Add(1) if !dispatcher.Dispatch(clientInboundDispatchSource(), func() { defer c.wg.Done() now := time.Now() if err := c.dispatchInboundTransportPayload(payload, now); err != nil { if c.showError || c.debugMode { fmt.Println("client decode envelope error", err) } } }) { c.wg.Done() } return nil }); err != nil && (c.showError || c.debugMode) { fmt.Println("client parse inbound frame error", err) } return true } func (c *ClientCommon) writeToTransport(data []byte) error { binding := c.clientTransportBindingSnapshot() if binding == nil { return net.ErrClosed } return binding.withConnWriteLock(func(conn net.Conn) error { if c.maxWriteTimeout.Seconds() != 0 { _ = conn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout)) } return writeFullToConnUnlocked(conn, data) }) } func (c *ClientCommon) writePayloadToTransport(payload []byte) error { binding := c.clientTransportBindingSnapshot() if binding == nil { return net.ErrClosed } queue := binding.queueSnapshot() if queue == nil { return errClientSessionQueueUnavailable } return binding.withConnWriteLock(func(conn net.Conn) error { if c.maxWriteTimeout.Seconds() != 0 { _ = conn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout)) } return writeFramedPayloadUnlocked(conn, queue, payload) }) } func (c *ClientCommon) writeControlPayloadToTransport(payload []byte) error { binding := c.clientTransportBindingSnapshot() if binding == nil { return net.ErrClosed } queue := binding.queueSnapshot() if queue == nil { return errClientSessionQueueUnavailable } conn := binding.connSnapshot() if conn == nil || isPacketTransportConn(conn) { return c.writePayloadToTransport(payload) } sender := binding.controlBatchSenderSnapshot() if sender == nil { return c.writePayloadToTransport(payload) } return sender.submit(payload, writeDeadlineFromTimeout(c.maxWriteTimeout)) }