notify/stream_dispatcher.go

189 lines
5.8 KiB
Go
Raw Permalink Normal View History

package notify
import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
)
// streamDispatchRejectTimeout bounds the context under which the client
// sends a best-effort stream reset for rejected inbound stream data.
const streamDispatchRejectTimeout = 300 * time.Millisecond
// dispatchStreamEnvelope hands the chunk carried by env to the stream
// registered under env.Stream.StreamID in the client file scope.
//
// Envelopes without a stream ID are ignored, and nothing happens when no
// stream runtime is available. Every failure is answered with a best-effort
// stream reset, except a push that fails with io.EOF, which is at most
// printed when showError or debugMode is set.
func (c *ClientCommon) dispatchStreamEnvelope(env Envelope) {
	id := env.Stream.StreamID
	if id == "" {
		return
	}
	rt := c.getStreamRuntime()
	if rt == nil {
		return
	}

	target, found := rt.lookup(clientFileScope(), id)
	switch {
	case !found:
		if c.showError || c.debugMode {
			fmt.Println("client stream data for unknown stream", id)
		}
		// There is no stream to take a data ID from, so the reset carries
		// the stream ID only.
		c.bestEffortRejectInboundStreamData(id, 0, errStreamNotFound.Error())

	case !target.acceptsClientSessionEpoch(c.currentClientSessionEpoch()):
		if c.showError || c.debugMode {
			fmt.Println("client stream data rejected by stale session epoch", id)
		}
		// Mark the stream reset locally first, then send the reset.
		cause := transportDetachedSessionEpochError()
		target.markReset(cause)
		c.bestEffortRejectInboundStreamData(id, target.dataIDSnapshot(), cause.Error())

	default:
		err := target.pushChunk(env.Stream.Chunk)
		if err == nil {
			return
		}
		if c.showError || c.debugMode {
			fmt.Println("client stream push chunk error", err)
		}
		// io.EOF from the push is not answered with a reset.
		if errors.Is(err, io.EOF) {
			return
		}
		c.bestEffortRejectInboundStreamData(id, target.dataIDSnapshot(), err.Error())
	}
}
// dispatchStreamEnvelope hands the chunk carried by env to the stream
// registered under env.Stream.StreamID in the server file scope of logical.
//
// The call is a no-op when the stream ID is empty, logical is nil, or no
// stream runtime is available. An unknown stream, a stream that does not
// accept transport, and a push failure other than io.EOF are each answered
// with a best-effort stream reset sent via logical, transport and conn.
func (s *ServerCommon) dispatchStreamEnvelope(logical *LogicalConn, transport *TransportConn, conn net.Conn, env Envelope) {
	id := env.Stream.StreamID
	if id == "" {
		return
	}
	if logical == nil {
		return
	}
	rt := s.getStreamRuntime()
	if rt == nil {
		return
	}

	target, found := rt.lookup(serverFileScope(logical), id)
	switch {
	case !found:
		if s.showError || s.debugMode {
			fmt.Println("server stream data for unknown stream", id)
		}
		// There is no stream to take a data ID from, so the reset carries
		// the stream ID only.
		s.bestEffortRejectInboundStreamData(logical, transport, conn, id, 0, errStreamNotFound.Error())

	case !target.acceptsTransportGeneration(transport):
		if s.showError || s.debugMode {
			fmt.Println("server stream data rejected by transport generation mismatch", id)
		}
		// Only the sender is notified here; the stream itself is not
		// marked reset on this path.
		cause := transportDetachedGenerationMismatchError(target.TransportGeneration(), transport)
		s.bestEffortRejectInboundStreamData(logical, transport, conn, id, target.dataIDSnapshot(), cause.Error())

	default:
		err := target.pushChunk(env.Stream.Chunk)
		if err == nil {
			return
		}
		if s.showError || s.debugMode {
			fmt.Println("server stream push chunk error", err)
		}
		// io.EOF from the push is not answered with a reset.
		if errors.Is(err, io.EOF) {
			return
		}
		s.bestEffortRejectInboundStreamData(logical, transport, conn, id, target.dataIDSnapshot(), err.Error())
	}
}
// dispatchFastStreamData hands frame.Payload to the stream registered under
// frame.DataID in the client file scope.
//
// Frames with a zero data ID are ignored, and nothing happens when no stream
// runtime is available. An unknown data ID, a stream that does not accept
// the current client session epoch, and a push failure other than io.EOF
// are each answered with a best-effort stream reset.
func (c *ClientCommon) dispatchFastStreamData(frame streamFastDataFrame) {
	dataID := frame.DataID
	if dataID == 0 {
		return
	}
	rt := c.getStreamRuntime()
	if rt == nil {
		return
	}

	target, found := rt.lookupByDataID(clientFileScope(), dataID)
	switch {
	case !found:
		if c.showError || c.debugMode {
			fmt.Println("client stream data for unknown data id", dataID)
		}
		// No stream was resolved, so the reset carries the data ID only.
		c.bestEffortRejectInboundStreamData("", dataID, errStreamNotFound.Error())

	case !target.acceptsClientSessionEpoch(c.currentClientSessionEpoch()):
		if c.showError || c.debugMode {
			fmt.Println("client stream data rejected by stale session epoch", dataID)
		}
		// Mark the stream reset locally first, then send the reset.
		cause := transportDetachedSessionEpochError()
		target.markReset(cause)
		c.bestEffortRejectInboundStreamData(target.ID(), dataID, cause.Error())

	default:
		err := target.pushOwnedChunk(frame.Payload)
		if err == nil {
			return
		}
		if c.showError || c.debugMode {
			fmt.Println("client stream push chunk error", err)
		}
		// io.EOF from the push is not answered with a reset.
		if errors.Is(err, io.EOF) {
			return
		}
		c.bestEffortRejectInboundStreamData(target.ID(), dataID, err.Error())
	}
}
// dispatchFastStreamData hands frame.Payload to the stream registered under
// frame.DataID in the server file scope of logical.
//
// The call is a no-op when logical is nil, the data ID is zero, or no stream
// runtime is available. An unknown data ID, a stream that does not accept
// transport, and a push failure other than io.EOF are each answered with a
// best-effort stream reset sent via logical, transport and conn.
func (s *ServerCommon) dispatchFastStreamData(logical *LogicalConn, transport *TransportConn, conn net.Conn, frame streamFastDataFrame) {
	if logical == nil {
		return
	}
	dataID := frame.DataID
	if dataID == 0 {
		return
	}
	rt := s.getStreamRuntime()
	if rt == nil {
		return
	}

	target, found := rt.lookupByDataID(serverFileScope(logical), dataID)
	switch {
	case !found:
		if s.showError || s.debugMode {
			fmt.Println("server stream data for unknown data id", dataID)
		}
		// No stream was resolved, so the reset carries the data ID only.
		s.bestEffortRejectInboundStreamData(logical, transport, conn, "", dataID, errStreamNotFound.Error())

	case !target.acceptsTransportGeneration(transport):
		if s.showError || s.debugMode {
			fmt.Println("server stream data rejected by transport generation mismatch", dataID)
		}
		// Only the sender is notified here; the stream itself is not
		// marked reset on this path.
		cause := transportDetachedGenerationMismatchError(target.TransportGeneration(), transport)
		s.bestEffortRejectInboundStreamData(logical, transport, conn, target.ID(), dataID, cause.Error())

	default:
		err := target.pushOwnedChunk(frame.Payload)
		if err == nil {
			return
		}
		if s.showError || s.debugMode {
			fmt.Println("server stream push chunk error", err)
		}
		// io.EOF from the push is not answered with a reset.
		if errors.Is(err, io.EOF) {
			return
		}
		s.bestEffortRejectInboundStreamData(logical, transport, conn, target.ID(), dataID, err.Error())
	}
}
// bestEffortRejectInboundStreamData sends a StreamResetRequest built from
// streamID, dataID and message through sendStreamResetClient, under a
// context limited to streamDispatchRejectTimeout.
//
// Nothing is sent when c is nil or when neither a stream ID nor a data ID
// is given. The outcome of the send is discarded.
func (c *ClientCommon) bestEffortRejectInboundStreamData(streamID string, dataID uint64, message string) {
	if c == nil {
		return
	}
	// A reset that names no stream at all cannot be routed; skip it.
	if streamID == "" && dataID == 0 {
		return
	}

	reset := StreamResetRequest{
		StreamID: streamID,
		DataID:   dataID,
		Error:    message,
	}

	sendCtx, release := context.WithTimeout(context.Background(), streamDispatchRejectTimeout)
	defer release()

	// Best effort: both the response and the error are intentionally ignored.
	_, _ = sendStreamResetClient(sendCtx, c, reset)
}
// bestEffortRejectInboundStreamData encodes a StreamResetRequest built from
// streamID, dataID and message, wraps it in an MSG_ASYNC TransferMsg keyed
// by StreamResetSignalKey, and sends the resulting envelope through
// sendEnvelopeInboundTransport using logical, transport and conn.
//
// Nothing is sent when s or logical is nil, or when neither a stream ID nor
// a data ID is given. Encoding, wrapping and send failures are all dropped
// without being reported.
func (s *ServerCommon) bestEffortRejectInboundStreamData(logical *LogicalConn, transport *TransportConn, conn net.Conn, streamID string, dataID uint64, message string) {
	if s == nil || logical == nil {
		return
	}
	// A reset that names no stream at all cannot be routed; skip it.
	if streamID == "" && dataID == 0 {
		return
	}

	reset := StreamResetRequest{
		StreamID: streamID,
		DataID:   dataID,
		Error:    message,
	}
	body, encodeErr := encode(reset)
	if encodeErr != nil {
		return
	}

	signal := TransferMsg{
		Key:   StreamResetSignalKey,
		Value: body,
		Type:  MSG_ASYNC,
	}
	wrapped, wrapErr := wrapTransferMsgEnvelope(signal, s.sequenceEn)
	if wrapErr != nil {
		return
	}

	// Best effort: the send error is intentionally ignored.
	_ = s.sendEnvelopeInboundTransport(logical, transport, conn, wrapped)
}