package notify import ( "context" "errors" "fmt" "io" "net" "time" ) const bulkDispatchRejectTimeout = 300 * time.Millisecond func (c *ClientCommon) dispatchFastBulkFrame(frame bulkFastFrame) { c.dispatchFastBulkFrameWithOwner(frame, nil) } func (c *ClientCommon) dispatchFastBulkFrameWithOwner(frame bulkFastFrame, owner *bulkReadPayloadOwner) { if frame.DataID == 0 { return } runtime := c.getBulkRuntime() if runtime == nil { return } bulk, ok := runtime.lookupByDataID(clientFileScope(), frame.DataID) if !ok { if c.showError || c.debugMode { fmt.Println("client bulk data for unknown data id", frame.DataID) } c.bestEffortRejectInboundBulkData("", frame.DataID, errBulkNotFound.Error()) return } if !bulk.acceptsClientSessionEpoch(c.currentClientSessionEpoch()) { if c.showError || c.debugMode { fmt.Println("client bulk data rejected by stale session epoch", frame.DataID) } detachErr := transportDetachedSessionEpochError() bulk.markReset(detachErr) c.bestEffortRejectInboundBulkData(bulk.ID(), frame.DataID, detachErr.Error()) return } switch frame.Type { case bulkFastPayloadTypeData: var err error if owner != nil { err = bulk.pushChunkWithOwnershipOptionsAndRelease(frame.Payload, true, true, owner.retainChunk()) } else { err = bulk.pushOwnedChunk(frame.Payload) } if err != nil { if c.showError || c.debugMode { fmt.Println("client bulk push chunk error", err) } if !errors.Is(err, io.EOF) { c.bestEffortRejectInboundBulkData(bulk.ID(), frame.DataID, err.Error()) } } case bulkFastPayloadTypeClose: if frame.Flags&bulkFastPayloadFlagFullClose != 0 { bulk.markPeerClosed() return } bulk.markRemoteClosed() case bulkFastPayloadTypeReset: resetErr := errBulkReset if len(frame.Payload) > 0 { resetErr = bulkRemoteResetError(string(frame.Payload)) } bulk.markReset(bulkResetError(resetErr)) case bulkFastPayloadTypeRelease: bytes, chunks, err := decodeBulkDedicatedReleasePayload(frame.Payload) if err != nil { if c.showError || c.debugMode { fmt.Println("client bulk release decode error", err) } c.bestEffortRejectInboundBulkData(bulk.ID(), frame.DataID, err.Error()) return } bulk.releaseOutboundWindow(bytes, chunks) } } func (c *ClientCommon) dispatchFastBulkData(frame bulkFastDataFrame) { c.dispatchFastBulkFrameWithOwner(frame, nil) } func (s *ServerCommon) dispatchFastBulkFrame(logical *LogicalConn, transport *TransportConn, conn net.Conn, frame bulkFastFrame) { s.dispatchFastBulkFrameWithOwner(logical, transport, conn, frame, nil) } func (s *ServerCommon) dispatchFastBulkFrameWithOwner(logical *LogicalConn, transport *TransportConn, conn net.Conn, frame bulkFastFrame, owner *bulkReadPayloadOwner) { if logical == nil || frame.DataID == 0 { return } runtime := s.getBulkRuntime() if runtime == nil { return } bulk, ok := runtime.lookupByDataID(serverFileScope(logical), frame.DataID) if !ok { if s.showError || s.debugMode { fmt.Println("server bulk data for unknown data id", frame.DataID) } s.bestEffortRejectInboundBulkData(logical, transport, conn, "", frame.DataID, errBulkNotFound.Error()) return } if !bulk.acceptsTransportGeneration(transport) { if s.showError || s.debugMode { fmt.Println("server bulk data rejected by transport generation mismatch", frame.DataID) } detachErr := transportDetachedGenerationMismatchError(bulk.TransportGeneration(), transport) s.bestEffortRejectInboundBulkData(logical, transport, conn, bulk.ID(), frame.DataID, detachErr.Error()) return } switch frame.Type { case bulkFastPayloadTypeData: var err error if owner != nil { err = bulk.pushChunkWithOwnershipOptionsAndRelease(frame.Payload, true, true, owner.retainChunk()) } else { err = bulk.pushOwnedChunk(frame.Payload) } if err != nil { if s.showError || s.debugMode { fmt.Println("server bulk push chunk error", err) } if !errors.Is(err, io.EOF) { s.bestEffortRejectInboundBulkData(logical, transport, conn, bulk.ID(), frame.DataID, err.Error()) } } case bulkFastPayloadTypeClose: if frame.Flags&bulkFastPayloadFlagFullClose != 0 { bulk.markPeerClosed() return } bulk.markRemoteClosed() case bulkFastPayloadTypeReset: resetErr := errBulkReset if len(frame.Payload) > 0 { resetErr = bulkRemoteResetError(string(frame.Payload)) } bulk.markReset(bulkResetError(resetErr)) case bulkFastPayloadTypeRelease: bytes, chunks, err := decodeBulkDedicatedReleasePayload(frame.Payload) if err != nil { if s.showError || s.debugMode { fmt.Println("server bulk release decode error", err) } s.bestEffortRejectInboundBulkData(logical, transport, conn, bulk.ID(), frame.DataID, err.Error()) return } bulk.releaseOutboundWindow(bytes, chunks) } } func (s *ServerCommon) dispatchFastBulkData(logical *LogicalConn, transport *TransportConn, conn net.Conn, frame bulkFastDataFrame) { s.dispatchFastBulkFrameWithOwner(logical, transport, conn, frame, nil) } func (c *ClientCommon) tryDispatchBorrowedBulkTransportPayload(payload []byte) bool { if c == nil || len(payload) == 0 { return false } profile := c.clientTransportProtectionSnapshot() plain, plainRelease, err := decryptTransportPayloadCodecOwnedPooled(profile.mode, profile.runtime, profile.msgDe, profile.secretKey, payload) if err != nil { if c.showError || c.debugMode { fmt.Println("client decode transport payload error", err) } return true } owner := newBulkReadPayloadOwner(plainRelease) matched, walkErr := walkBulkFastFrames(plain, func(frame bulkFastFrame) error { c.dispatchFastBulkFrameWithOwner(frame, owner) return nil }) if owner != nil { owner.done() } if !matched { return false } if walkErr != nil && (c.showError || c.debugMode) { fmt.Println("client decode bulk fast payload error", walkErr) } return true } func (s *ServerCommon) tryDispatchBorrowedBulkTransportPayload(source interface{}, payload []byte) bool { if s == nil || len(payload) == 0 { return false } logical, transport := s.resolveInboundSource(source) if logical == nil { return false } plain, plainRelease, err := decryptTransportPayloadCodecOwnedPooled(logical.protectionModeSnapshot(), logical.modernPSKRuntimeSnapshot(), logical.msgDeSnapshot(), logical.secretKeySnapshot(), payload) if err != nil { if s.showError || s.debugMode { fmt.Println("server decode transport payload error", err) } return true } conn := serverInboundConn(source) owner := newBulkReadPayloadOwner(plainRelease) matched, walkErr := walkBulkFastFrames(plain, func(frame bulkFastFrame) error { s.dispatchFastBulkFrameWithOwner(logical, transport, conn, frame, owner) return nil }) if owner != nil { owner.done() } if !matched { return false } if walkErr != nil && (s.showError || s.debugMode) { fmt.Println("server decode bulk fast payload error", walkErr) } return true } func (c *ClientCommon) bestEffortRejectInboundBulkData(bulkID string, dataID uint64, message string) { if c == nil || (bulkID == "" && dataID == 0) { return } ctx, cancel := context.WithTimeout(context.Background(), bulkDispatchRejectTimeout) defer cancel() _, _ = sendBulkResetClient(ctx, c, BulkResetRequest{ BulkID: bulkID, DataID: dataID, Error: message, }) } func (s *ServerCommon) bestEffortRejectInboundBulkData(logical *LogicalConn, transport *TransportConn, conn net.Conn, bulkID string, dataID uint64, message string) { if s == nil || logical == nil || (bulkID == "" && dataID == 0) { return } payload, err := encode(BulkResetRequest{ BulkID: bulkID, DataID: dataID, Error: message, }) if err != nil { return } env, err := wrapTransferMsgEnvelope(TransferMsg{ Key: BulkResetSignalKey, Value: payload, Type: MSG_ASYNC, }, s.sequenceEn) if err != nil { return } _ = s.sendEnvelopeInboundTransport(logical, transport, conn, env) }