notify/bulk_dispatcher.go

258 lines
7.8 KiB
Go
Raw Permalink Normal View History

package notify
import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
)
const bulkDispatchRejectTimeout = 300 * time.Millisecond
func (c *ClientCommon) dispatchFastBulkFrame(frame bulkFastFrame) {
c.dispatchFastBulkFrameWithOwner(frame, nil)
}
func (c *ClientCommon) dispatchFastBulkFrameWithOwner(frame bulkFastFrame, owner *bulkReadPayloadOwner) {
if frame.DataID == 0 {
return
}
runtime := c.getBulkRuntime()
if runtime == nil {
return
}
bulk, ok := runtime.lookupByDataID(clientFileScope(), frame.DataID)
if !ok {
if c.showError || c.debugMode {
fmt.Println("client bulk data for unknown data id", frame.DataID)
}
c.bestEffortRejectInboundBulkData("", frame.DataID, errBulkNotFound.Error())
return
}
if !bulk.acceptsClientSessionEpoch(c.currentClientSessionEpoch()) {
if c.showError || c.debugMode {
fmt.Println("client bulk data rejected by stale session epoch", frame.DataID)
}
detachErr := transportDetachedSessionEpochError()
bulk.markReset(detachErr)
c.bestEffortRejectInboundBulkData(bulk.ID(), frame.DataID, detachErr.Error())
return
}
switch frame.Type {
case bulkFastPayloadTypeData:
var err error
if owner != nil {
err = bulk.pushChunkWithOwnershipOptionsAndRelease(frame.Payload, true, true, owner.retainChunk())
} else {
err = bulk.pushOwnedChunk(frame.Payload)
}
if err != nil {
if c.showError || c.debugMode {
fmt.Println("client bulk push chunk error", err)
}
if !errors.Is(err, io.EOF) {
c.bestEffortRejectInboundBulkData(bulk.ID(), frame.DataID, err.Error())
}
}
case bulkFastPayloadTypeClose:
if frame.Flags&bulkFastPayloadFlagFullClose != 0 {
bulk.markPeerClosed()
return
}
bulk.markRemoteClosed()
case bulkFastPayloadTypeReset:
resetErr := errBulkReset
if len(frame.Payload) > 0 {
resetErr = bulkRemoteResetError(string(frame.Payload))
}
bulk.markReset(bulkResetError(resetErr))
case bulkFastPayloadTypeRelease:
bytes, chunks, err := decodeBulkDedicatedReleasePayload(frame.Payload)
if err != nil {
if c.showError || c.debugMode {
fmt.Println("client bulk release decode error", err)
}
c.bestEffortRejectInboundBulkData(bulk.ID(), frame.DataID, err.Error())
return
}
bulk.releaseOutboundWindow(bytes, chunks)
}
}
func (c *ClientCommon) dispatchFastBulkData(frame bulkFastDataFrame) {
c.dispatchFastBulkFrameWithOwner(frame, nil)
}
func (s *ServerCommon) dispatchFastBulkFrame(logical *LogicalConn, transport *TransportConn, conn net.Conn, frame bulkFastFrame) {
s.dispatchFastBulkFrameWithOwner(logical, transport, conn, frame, nil)
}
func (s *ServerCommon) dispatchFastBulkFrameWithOwner(logical *LogicalConn, transport *TransportConn, conn net.Conn, frame bulkFastFrame, owner *bulkReadPayloadOwner) {
if logical == nil || frame.DataID == 0 {
return
}
runtime := s.getBulkRuntime()
if runtime == nil {
return
}
bulk, ok := runtime.lookupByDataID(serverFileScope(logical), frame.DataID)
if !ok {
if s.showError || s.debugMode {
fmt.Println("server bulk data for unknown data id", frame.DataID)
}
s.bestEffortRejectInboundBulkData(logical, transport, conn, "", frame.DataID, errBulkNotFound.Error())
return
}
if !bulk.acceptsTransportGeneration(transport) {
if s.showError || s.debugMode {
fmt.Println("server bulk data rejected by transport generation mismatch", frame.DataID)
}
detachErr := transportDetachedGenerationMismatchError(bulk.TransportGeneration(), transport)
s.bestEffortRejectInboundBulkData(logical, transport, conn, bulk.ID(), frame.DataID, detachErr.Error())
return
}
switch frame.Type {
case bulkFastPayloadTypeData:
var err error
if owner != nil {
err = bulk.pushChunkWithOwnershipOptionsAndRelease(frame.Payload, true, true, owner.retainChunk())
} else {
err = bulk.pushOwnedChunk(frame.Payload)
}
if err != nil {
if s.showError || s.debugMode {
fmt.Println("server bulk push chunk error", err)
}
if !errors.Is(err, io.EOF) {
s.bestEffortRejectInboundBulkData(logical, transport, conn, bulk.ID(), frame.DataID, err.Error())
}
}
case bulkFastPayloadTypeClose:
if frame.Flags&bulkFastPayloadFlagFullClose != 0 {
bulk.markPeerClosed()
return
}
bulk.markRemoteClosed()
case bulkFastPayloadTypeReset:
resetErr := errBulkReset
if len(frame.Payload) > 0 {
resetErr = bulkRemoteResetError(string(frame.Payload))
}
bulk.markReset(bulkResetError(resetErr))
case bulkFastPayloadTypeRelease:
bytes, chunks, err := decodeBulkDedicatedReleasePayload(frame.Payload)
if err != nil {
if s.showError || s.debugMode {
fmt.Println("server bulk release decode error", err)
}
s.bestEffortRejectInboundBulkData(logical, transport, conn, bulk.ID(), frame.DataID, err.Error())
return
}
bulk.releaseOutboundWindow(bytes, chunks)
}
}
func (s *ServerCommon) dispatchFastBulkData(logical *LogicalConn, transport *TransportConn, conn net.Conn, frame bulkFastDataFrame) {
s.dispatchFastBulkFrameWithOwner(logical, transport, conn, frame, nil)
}
func (c *ClientCommon) tryDispatchBorrowedBulkTransportPayload(payload []byte) bool {
if c == nil || len(payload) == 0 {
return false
}
profile := c.clientTransportProtectionSnapshot()
plain, plainRelease, err := decryptTransportPayloadCodecOwnedPooled(profile.mode, profile.runtime, profile.msgDe, profile.secretKey, payload)
if err != nil {
if c.showError || c.debugMode {
fmt.Println("client decode transport payload error", err)
}
return true
}
owner := newBulkReadPayloadOwner(plainRelease)
matched, walkErr := walkBulkFastFrames(plain, func(frame bulkFastFrame) error {
c.dispatchFastBulkFrameWithOwner(frame, owner)
return nil
})
if owner != nil {
owner.done()
}
if !matched {
return false
}
if walkErr != nil && (c.showError || c.debugMode) {
fmt.Println("client decode bulk fast payload error", walkErr)
}
return true
}
func (s *ServerCommon) tryDispatchBorrowedBulkTransportPayload(source interface{}, payload []byte) bool {
if s == nil || len(payload) == 0 {
return false
}
logical, transport := s.resolveInboundSource(source)
if logical == nil {
return false
}
plain, plainRelease, err := decryptTransportPayloadCodecOwnedPooled(logical.protectionModeSnapshot(), logical.modernPSKRuntimeSnapshot(), logical.msgDeSnapshot(), logical.secretKeySnapshot(), payload)
if err != nil {
if s.showError || s.debugMode {
fmt.Println("server decode transport payload error", err)
}
return true
}
conn := serverInboundConn(source)
owner := newBulkReadPayloadOwner(plainRelease)
matched, walkErr := walkBulkFastFrames(plain, func(frame bulkFastFrame) error {
s.dispatchFastBulkFrameWithOwner(logical, transport, conn, frame, owner)
return nil
})
if owner != nil {
owner.done()
}
if !matched {
return false
}
if walkErr != nil && (s.showError || s.debugMode) {
fmt.Println("server decode bulk fast payload error", walkErr)
}
return true
}
func (c *ClientCommon) bestEffortRejectInboundBulkData(bulkID string, dataID uint64, message string) {
if c == nil || (bulkID == "" && dataID == 0) {
return
}
ctx, cancel := context.WithTimeout(context.Background(), bulkDispatchRejectTimeout)
defer cancel()
_, _ = sendBulkResetClient(ctx, c, BulkResetRequest{
BulkID: bulkID,
DataID: dataID,
Error: message,
})
}
func (s *ServerCommon) bestEffortRejectInboundBulkData(logical *LogicalConn, transport *TransportConn, conn net.Conn, bulkID string, dataID uint64, message string) {
if s == nil || logical == nil || (bulkID == "" && dataID == 0) {
return
}
payload, err := encode(BulkResetRequest{
BulkID: bulkID,
DataID: dataID,
Error: message,
})
if err != nil {
return
}
env, err := wrapTransferMsgEnvelope(TransferMsg{
Key: BulkResetSignalKey,
Value: payload,
Type: MSG_ASYNC,
}, s.sequenceEn)
if err != nil {
return
}
_ = s.sendEnvelopeInboundTransport(logical, transport, conn, env)
}