2026-04-15 15:24:36 +08:00
|
|
|
package notify
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
)
|
2026-04-15 15:24:36 +08:00
|
|
|
|
|
|
|
|
func (s *ServerCommon) SetBulkHandler(fn func(BulkAcceptInfo) error) {
|
|
|
|
|
runtime := s.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime.setHandler(fn)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) OpenBulkLogical(ctx context.Context, logical *LogicalConn, opt BulkOpenOptions) (Bulk, error) {
|
2026-04-18 16:05:57 +08:00
|
|
|
opt = normalizeBulkOpenOptions(opt)
|
|
|
|
|
switch opt.Mode {
|
|
|
|
|
case BulkOpenModeDedicated:
|
|
|
|
|
opt.Dedicated = true
|
|
|
|
|
return s.openBulkLogicalWithMode(ctx, logical, opt)
|
|
|
|
|
case BulkOpenModeAuto:
|
|
|
|
|
if err := logicalDedicatedBulkSupportError(logical); err == nil {
|
|
|
|
|
dedicatedOpt := opt
|
|
|
|
|
dedicatedOpt.Mode = BulkOpenModeDedicated
|
|
|
|
|
dedicatedOpt.Dedicated = true
|
|
|
|
|
bulk, dedicatedErr := s.openBulkLogicalWithMode(ctx, logical, dedicatedOpt)
|
|
|
|
|
if dedicatedErr == nil {
|
|
|
|
|
return bulk, nil
|
|
|
|
|
}
|
|
|
|
|
sharedOpt := opt
|
|
|
|
|
sharedOpt.Mode = BulkOpenModeShared
|
|
|
|
|
sharedOpt.Dedicated = false
|
|
|
|
|
sharedBulk, sharedErr := s.openBulkLogicalWithMode(ctx, logical, sharedOpt)
|
|
|
|
|
if sharedErr == nil {
|
|
|
|
|
return sharedBulk, nil
|
|
|
|
|
}
|
|
|
|
|
return nil, errors.Join(dedicatedErr, sharedErr)
|
|
|
|
|
}
|
|
|
|
|
opt.Mode = BulkOpenModeShared
|
|
|
|
|
opt.Dedicated = false
|
|
|
|
|
return s.openBulkLogicalWithMode(ctx, logical, opt)
|
|
|
|
|
case BulkOpenModeShared, BulkOpenModeDefault:
|
|
|
|
|
opt.Mode = BulkOpenModeShared
|
|
|
|
|
opt.Dedicated = false
|
|
|
|
|
return s.openBulkLogicalWithMode(ctx, logical, opt)
|
|
|
|
|
default:
|
|
|
|
|
opt.Mode = BulkOpenModeShared
|
|
|
|
|
opt.Dedicated = false
|
|
|
|
|
return s.openBulkLogicalWithMode(ctx, logical, opt)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) openBulkLogicalWithMode(ctx context.Context, logical *LogicalConn, opt BulkOpenOptions) (Bulk, error) {
|
2026-04-15 15:24:36 +08:00
|
|
|
if s == nil {
|
|
|
|
|
return nil, errBulkServerNil
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
opt = applyBulkOpenTuningDefaults(opt, s.bulkOpenTuningSnapshot())
|
2026-04-15 15:24:36 +08:00
|
|
|
if logical == nil {
|
|
|
|
|
return nil, errBulkLogicalConnNil
|
|
|
|
|
}
|
|
|
|
|
runtime := s.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
return nil, errBulkRuntimeNil
|
|
|
|
|
}
|
|
|
|
|
req := serverBulkRequest(runtime, opt)
|
|
|
|
|
scope := serverFileScope(logical)
|
|
|
|
|
if req.Dedicated {
|
|
|
|
|
if err := logicalDedicatedBulkSupportError(logical); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !validBulkRange(req.Range) {
|
|
|
|
|
return nil, errBulkRangeInvalid
|
|
|
|
|
}
|
|
|
|
|
if _, exists := runtime.lookup(scope, req.BulkID); exists {
|
|
|
|
|
return nil, errBulkAlreadyExists
|
|
|
|
|
}
|
|
|
|
|
if req.Dedicated {
|
|
|
|
|
if req.DataID == 0 {
|
|
|
|
|
req.DataID = runtime.nextDataID()
|
|
|
|
|
}
|
|
|
|
|
if req.AttachToken == "" {
|
|
|
|
|
req.AttachToken = newBulkAttachToken()
|
|
|
|
|
}
|
|
|
|
|
bulk := newBulkHandle(logical.stopContextSnapshot(), runtime, scope, req, 0, logical, logical.CurrentTransportConn(), logical.transportGenerationSnapshot(), serverBulkCloseSender(s, logical, nil), serverBulkResetSender(s, logical, nil), serverBulkDataSender(s, logical.CurrentTransportConn()), serverBulkWriteSender(s, logical, logical.CurrentTransportConn()), serverBulkReleaseSender(s, logical, logical.CurrentTransportConn()))
|
2026-04-18 16:05:57 +08:00
|
|
|
bulk.markAcceptHandled()
|
2026-04-15 15:24:36 +08:00
|
|
|
if err := runtime.register(scope, bulk); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
s.attachServerDedicatedSidecarIfExists(logical, bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
resp, err := sendBulkOpenServerLogical(ctx, s, logical, req)
|
|
|
|
|
if err != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
bulk.markReset(err)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if resp.DataID != 0 && resp.DataID != req.DataID {
|
|
|
|
|
err = errBulkAlreadyExists
|
|
|
|
|
_, _ = sendBulkResetServerLogical(context.Background(), s, logical, BulkResetRequest{
|
|
|
|
|
BulkID: req.BulkID,
|
|
|
|
|
DataID: req.DataID,
|
|
|
|
|
Error: "bulk dedicated data id mismatch",
|
|
|
|
|
})
|
|
|
|
|
bulk.markReset(err)
|
2026-04-15 15:24:36 +08:00
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if resp.TransportGeneration != 0 {
|
|
|
|
|
bulk.transportGeneration = resp.TransportGeneration
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if resp.FastPathVersion != 0 {
|
|
|
|
|
bulk.fastPathVersion = normalizeBulkFastPathVersion(resp.FastPathVersion)
|
|
|
|
|
}
|
|
|
|
|
if resp.AttachToken != "" {
|
|
|
|
|
bulk.setDedicatedAttachToken(resp.AttachToken)
|
|
|
|
|
}
|
|
|
|
|
if err := bulk.waitAcceptReady(ctx); err != nil {
|
|
|
|
|
_, _ = sendBulkResetServerLogical(context.Background(), s, logical, BulkResetRequest{
|
|
|
|
|
BulkID: req.BulkID,
|
|
|
|
|
DataID: req.DataID,
|
|
|
|
|
Error: err.Error(),
|
|
|
|
|
})
|
|
|
|
|
bulk.markReset(err)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
return bulk, nil
|
|
|
|
|
}
|
|
|
|
|
resp, err := sendBulkOpenServerLogical(ctx, s, logical, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if resp.DataID != 0 {
|
|
|
|
|
req.DataID = resp.DataID
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if resp.FastPathVersion != 0 {
|
|
|
|
|
req.FastPathVersion = resp.FastPathVersion
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
req.Dedicated = resp.Dedicated
|
|
|
|
|
if resp.AttachToken != "" {
|
|
|
|
|
req.AttachToken = resp.AttachToken
|
|
|
|
|
}
|
|
|
|
|
if req.DataID == 0 {
|
|
|
|
|
return nil, errBulkDataIDEmpty
|
|
|
|
|
}
|
|
|
|
|
transport := logical.CurrentTransportConn()
|
|
|
|
|
bulk := newBulkHandle(logical.stopContextSnapshot(), runtime, scope, req, 0, logical, transport, resp.TransportGeneration, serverBulkCloseSender(s, logical, nil), serverBulkResetSender(s, logical, nil), serverBulkDataSender(s, transport), serverBulkWriteSender(s, logical, transport), serverBulkReleaseSender(s, logical, transport))
|
2026-04-18 16:05:57 +08:00
|
|
|
bulk.markAcceptHandled()
|
2026-04-15 15:24:36 +08:00
|
|
|
if err := runtime.register(scope, bulk); err != nil {
|
|
|
|
|
_, _ = sendBulkResetServerLogical(context.Background(), s, logical, BulkResetRequest{
|
|
|
|
|
BulkID: req.BulkID,
|
|
|
|
|
DataID: req.DataID,
|
|
|
|
|
Error: err.Error(),
|
|
|
|
|
})
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
s.attachServerDedicatedSidecarIfExists(logical, bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
return bulk, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) OpenBulkTransport(ctx context.Context, transport *TransportConn, opt BulkOpenOptions) (Bulk, error) {
|
2026-04-18 16:05:57 +08:00
|
|
|
opt = normalizeBulkOpenOptions(opt)
|
|
|
|
|
switch opt.Mode {
|
|
|
|
|
case BulkOpenModeDedicated:
|
|
|
|
|
opt.Dedicated = true
|
|
|
|
|
return s.openBulkTransportWithMode(ctx, transport, opt)
|
|
|
|
|
case BulkOpenModeAuto:
|
|
|
|
|
if err := transportDedicatedBulkSupportError(transport); err == nil {
|
|
|
|
|
dedicatedOpt := opt
|
|
|
|
|
dedicatedOpt.Mode = BulkOpenModeDedicated
|
|
|
|
|
dedicatedOpt.Dedicated = true
|
|
|
|
|
bulk, dedicatedErr := s.openBulkTransportWithMode(ctx, transport, dedicatedOpt)
|
|
|
|
|
if dedicatedErr == nil {
|
|
|
|
|
return bulk, nil
|
|
|
|
|
}
|
|
|
|
|
sharedOpt := opt
|
|
|
|
|
sharedOpt.Mode = BulkOpenModeShared
|
|
|
|
|
sharedOpt.Dedicated = false
|
|
|
|
|
sharedBulk, sharedErr := s.openBulkTransportWithMode(ctx, transport, sharedOpt)
|
|
|
|
|
if sharedErr == nil {
|
|
|
|
|
return sharedBulk, nil
|
|
|
|
|
}
|
|
|
|
|
return nil, errors.Join(dedicatedErr, sharedErr)
|
|
|
|
|
}
|
|
|
|
|
opt.Mode = BulkOpenModeShared
|
|
|
|
|
opt.Dedicated = false
|
|
|
|
|
return s.openBulkTransportWithMode(ctx, transport, opt)
|
|
|
|
|
case BulkOpenModeShared, BulkOpenModeDefault:
|
|
|
|
|
opt.Mode = BulkOpenModeShared
|
|
|
|
|
opt.Dedicated = false
|
|
|
|
|
return s.openBulkTransportWithMode(ctx, transport, opt)
|
|
|
|
|
default:
|
|
|
|
|
opt.Mode = BulkOpenModeShared
|
|
|
|
|
opt.Dedicated = false
|
|
|
|
|
return s.openBulkTransportWithMode(ctx, transport, opt)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) openBulkTransportWithMode(ctx context.Context, transport *TransportConn, opt BulkOpenOptions) (Bulk, error) {
|
2026-04-15 15:24:36 +08:00
|
|
|
if s == nil {
|
|
|
|
|
return nil, errBulkServerNil
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
opt = applyBulkOpenTuningDefaults(opt, s.bulkOpenTuningSnapshot())
|
2026-04-15 15:24:36 +08:00
|
|
|
if transport == nil {
|
|
|
|
|
return nil, errBulkTransportNil
|
|
|
|
|
}
|
|
|
|
|
logical := transport.LogicalConn()
|
|
|
|
|
if logical == nil {
|
|
|
|
|
return nil, errBulkLogicalConnNil
|
|
|
|
|
}
|
|
|
|
|
runtime := s.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
return nil, errBulkRuntimeNil
|
|
|
|
|
}
|
|
|
|
|
req := serverBulkRequest(runtime, opt)
|
|
|
|
|
scope := serverFileScope(logical)
|
|
|
|
|
if req.Dedicated {
|
|
|
|
|
if err := transportDedicatedBulkSupportError(transport); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !validBulkRange(req.Range) {
|
|
|
|
|
return nil, errBulkRangeInvalid
|
|
|
|
|
}
|
|
|
|
|
if _, exists := runtime.lookup(scope, req.BulkID); exists {
|
|
|
|
|
return nil, errBulkAlreadyExists
|
|
|
|
|
}
|
|
|
|
|
if req.Dedicated {
|
|
|
|
|
if req.DataID == 0 {
|
|
|
|
|
req.DataID = runtime.nextDataID()
|
|
|
|
|
}
|
|
|
|
|
if req.AttachToken == "" {
|
|
|
|
|
req.AttachToken = newBulkAttachToken()
|
|
|
|
|
}
|
|
|
|
|
bulk := newBulkHandle(logical.stopContextSnapshot(), runtime, scope, req, 0, logical, transport, transport.TransportGeneration(), serverBulkCloseSender(s, logical, transport), serverBulkResetSender(s, logical, transport), serverBulkDataSender(s, transport), serverBulkWriteSender(s, logical, transport), serverBulkReleaseSender(s, logical, transport))
|
2026-04-18 16:05:57 +08:00
|
|
|
bulk.markAcceptHandled()
|
2026-04-15 15:24:36 +08:00
|
|
|
if err := runtime.register(scope, bulk); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
s.attachServerDedicatedSidecarIfExists(logical, bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
resp, err := sendBulkOpenServerTransport(ctx, s, transport, req)
|
|
|
|
|
if err != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
bulk.markReset(err)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if resp.DataID != 0 && resp.DataID != req.DataID {
|
|
|
|
|
err = errBulkAlreadyExists
|
|
|
|
|
_, _ = sendBulkResetServerTransport(context.Background(), s, transport, BulkResetRequest{
|
|
|
|
|
BulkID: req.BulkID,
|
|
|
|
|
DataID: req.DataID,
|
|
|
|
|
Error: "bulk dedicated data id mismatch",
|
|
|
|
|
})
|
|
|
|
|
bulk.markReset(err)
|
2026-04-15 15:24:36 +08:00
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if resp.TransportGeneration != 0 {
|
|
|
|
|
bulk.transportGeneration = resp.TransportGeneration
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if resp.FastPathVersion != 0 {
|
|
|
|
|
bulk.fastPathVersion = normalizeBulkFastPathVersion(resp.FastPathVersion)
|
|
|
|
|
}
|
|
|
|
|
if resp.AttachToken != "" {
|
|
|
|
|
bulk.setDedicatedAttachToken(resp.AttachToken)
|
|
|
|
|
}
|
|
|
|
|
if err := bulk.waitAcceptReady(ctx); err != nil {
|
|
|
|
|
_, _ = sendBulkResetServerTransport(context.Background(), s, transport, BulkResetRequest{
|
|
|
|
|
BulkID: req.BulkID,
|
|
|
|
|
DataID: req.DataID,
|
|
|
|
|
Error: err.Error(),
|
|
|
|
|
})
|
|
|
|
|
bulk.markReset(err)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
return bulk, nil
|
|
|
|
|
}
|
|
|
|
|
resp, err := sendBulkOpenServerTransport(ctx, s, transport, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if resp.DataID != 0 {
|
|
|
|
|
req.DataID = resp.DataID
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if resp.FastPathVersion != 0 {
|
|
|
|
|
req.FastPathVersion = resp.FastPathVersion
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
req.Dedicated = resp.Dedicated
|
|
|
|
|
if resp.AttachToken != "" {
|
|
|
|
|
req.AttachToken = resp.AttachToken
|
|
|
|
|
}
|
|
|
|
|
if req.DataID == 0 {
|
|
|
|
|
return nil, errBulkDataIDEmpty
|
|
|
|
|
}
|
|
|
|
|
bulk := newBulkHandle(logical.stopContextSnapshot(), runtime, scope, req, 0, logical, transport, resp.TransportGeneration, serverBulkCloseSender(s, logical, transport), serverBulkResetSender(s, logical, transport), serverBulkDataSender(s, transport), serverBulkWriteSender(s, logical, transport), serverBulkReleaseSender(s, logical, transport))
|
2026-04-18 16:05:57 +08:00
|
|
|
bulk.markAcceptHandled()
|
2026-04-15 15:24:36 +08:00
|
|
|
if err := runtime.register(scope, bulk); err != nil {
|
|
|
|
|
_, _ = sendBulkResetServerTransport(context.Background(), s, transport, BulkResetRequest{
|
|
|
|
|
BulkID: req.BulkID,
|
|
|
|
|
DataID: req.DataID,
|
|
|
|
|
Error: err.Error(),
|
|
|
|
|
})
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
s.attachServerDedicatedSidecarIfExists(logical, bulk)
|
2026-04-15 15:24:36 +08:00
|
|
|
return bulk, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func serverBulkRequest(runtime *bulkRuntime, opt BulkOpenOptions) BulkOpenRequest {
|
|
|
|
|
opt = normalizeBulkOpenOptions(opt)
|
|
|
|
|
id := opt.ID
|
|
|
|
|
if id == "" && runtime != nil {
|
|
|
|
|
id = runtime.nextID()
|
|
|
|
|
}
|
|
|
|
|
return normalizeBulkOpenRequest(BulkOpenRequest{
|
2026-04-18 16:05:57 +08:00
|
|
|
BulkID: id,
|
|
|
|
|
FastPathVersion: bulkFastPathVersionCurrent,
|
|
|
|
|
Range: opt.Range,
|
|
|
|
|
Metadata: cloneBulkMetadata(opt.Metadata),
|
|
|
|
|
ReadTimeout: opt.ReadTimeout,
|
|
|
|
|
WriteTimeout: opt.WriteTimeout,
|
|
|
|
|
Dedicated: opt.Dedicated,
|
|
|
|
|
ChunkSize: opt.ChunkSize,
|
|
|
|
|
WindowBytes: opt.WindowBytes,
|
|
|
|
|
MaxInFlight: opt.MaxInFlight,
|
2026-04-15 15:24:36 +08:00
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func serverBulkCloseSender(s *ServerCommon, logical *LogicalConn, transport *TransportConn) bulkCloseSender {
|
|
|
|
|
return func(ctx context.Context, bulk *bulkHandle, full bool) error {
|
|
|
|
|
if bulk != nil && bulk.Dedicated() {
|
|
|
|
|
if err := bulk.waitDedicatedReady(ctx); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return s.sendDedicatedBulkClose(ctx, bulk.LogicalConn(), bulk, full)
|
|
|
|
|
}
|
|
|
|
|
req := BulkCloseRequest{
|
|
|
|
|
BulkID: bulk.ID(),
|
|
|
|
|
Full: full,
|
|
|
|
|
}
|
|
|
|
|
if logical != nil {
|
|
|
|
|
_, err := sendBulkCloseServerLogical(ctx, s, logical, req)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
_, err := sendBulkCloseServerTransport(ctx, s, transport, req)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func serverBulkResetSender(s *ServerCommon, logical *LogicalConn, transport *TransportConn) bulkResetSender {
|
|
|
|
|
return func(ctx context.Context, bulk *bulkHandle, message string) error {
|
|
|
|
|
if bulk != nil && bulk.Dedicated() {
|
|
|
|
|
if err := bulk.waitDedicatedReady(ctx); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return s.sendDedicatedBulkReset(ctx, bulk.LogicalConn(), bulk, message)
|
|
|
|
|
}
|
|
|
|
|
req := BulkResetRequest{
|
|
|
|
|
BulkID: bulk.ID(),
|
|
|
|
|
DataID: bulk.dataIDSnapshot(),
|
|
|
|
|
Error: message,
|
|
|
|
|
}
|
|
|
|
|
if logical != nil {
|
|
|
|
|
_, err := sendBulkResetServerLogical(ctx, s, logical, req)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
_, err := sendBulkResetServerTransport(ctx, s, transport, req)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func serverBulkDataSender(s *ServerCommon, transport *TransportConn) bulkDataSender {
|
|
|
|
|
return func(ctx context.Context, bulk *bulkHandle, chunk []byte) error {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if ctx != nil {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if bulk != nil && bulk.Dedicated() {
|
|
|
|
|
if err := bulk.waitDedicatedReady(ctx); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return s.sendDedicatedBulkData(ctx, bulk.LogicalConn(), bulk, chunk)
|
|
|
|
|
}
|
|
|
|
|
if transport == nil {
|
|
|
|
|
return errBulkTransportNil
|
|
|
|
|
}
|
|
|
|
|
if !transport.IsCurrent() {
|
|
|
|
|
return errTransportDetached
|
|
|
|
|
}
|
|
|
|
|
dataID := bulk.dataIDSnapshot()
|
|
|
|
|
if dataID == 0 {
|
|
|
|
|
return errBulkDataPathNotReady
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return s.sendFastBulkDataTransport(ctx, bulk.LogicalConn(), transport, dataID, bulk.nextOutboundDataSeq(), chunk, bulk.fastPathVersionSnapshot())
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func serverBulkWriteSender(s *ServerCommon, logical *LogicalConn, transport *TransportConn) bulkWriteSender {
|
2026-04-18 16:05:57 +08:00
|
|
|
return func(ctx context.Context, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) {
|
2026-04-15 15:24:36 +08:00
|
|
|
if s == nil {
|
|
|
|
|
return 0, errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if ctx != nil {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return 0, ctx.Err()
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if bulk != nil && bulk.Dedicated() {
|
|
|
|
|
if err := bulk.waitDedicatedReady(ctx); err != nil {
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
return s.sendDedicatedBulkWrite(ctx, bulk.LogicalConn(), bulk, startSeq, payload, payloadOwned)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
if transport == nil {
|
|
|
|
|
return 0, errBulkTransportNil
|
|
|
|
|
}
|
|
|
|
|
if !transport.IsCurrent() {
|
|
|
|
|
return 0, errTransportDetached
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if bulk == nil {
|
|
|
|
|
return 0, errBulkRuntimeNil
|
|
|
|
|
}
|
|
|
|
|
dataID := bulk.dataIDSnapshot()
|
|
|
|
|
if dataID == 0 {
|
|
|
|
|
return 0, errBulkDataPathNotReady
|
|
|
|
|
}
|
|
|
|
|
return s.sendFastBulkWriteTransport(ctx, bulk.LogicalConn(), transport, dataID, startSeq, bulk.chunkSize, bulk.fastPathVersionSnapshot(), payload, payloadOwned)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func serverBulkReleaseSender(s *ServerCommon, logical *LogicalConn, transport *TransportConn) bulkReleaseSender {
|
|
|
|
|
return func(bulk *bulkHandle, bytes int64, chunks int) error {
|
|
|
|
|
if s == nil || bulk == nil {
|
|
|
|
|
return errBulkServerNil
|
|
|
|
|
}
|
|
|
|
|
if bytes <= 0 && chunks <= 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
ctx, cancel, err := bulk.newWriteContext(bulk.Context(), bulk.writeTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer cancel()
|
2026-04-15 15:24:36 +08:00
|
|
|
if bulk.Dedicated() {
|
2026-04-18 16:05:57 +08:00
|
|
|
return s.sendDedicatedBulkRelease(ctx, logical, bulk, bytes, chunks)
|
|
|
|
|
}
|
|
|
|
|
if transport != nil && transport.IsCurrent() && bulk.fastPathVersionSnapshot() >= bulkFastPathVersionV2 {
|
|
|
|
|
payload, err := encodeBulkDedicatedReleasePayload(bytes, chunks)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return s.sendFastBulkControlTransport(ctx, logical, transport, bulkFastPayloadTypeRelease, 0, bulk.dataIDSnapshot(), 0, bulk.fastPathVersionSnapshot(), payload)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
req := BulkReleaseRequest{
|
|
|
|
|
BulkID: bulk.ID(),
|
|
|
|
|
DataID: bulk.dataIDSnapshot(),
|
|
|
|
|
Bytes: bytes,
|
|
|
|
|
Chunks: chunks,
|
|
|
|
|
}
|
|
|
|
|
if transport != nil && transport.IsCurrent() {
|
|
|
|
|
return sendBulkReleaseServerTransport(s, transport, req)
|
|
|
|
|
}
|
|
|
|
|
return sendBulkReleaseServerLogical(s, logical, req)
|
|
|
|
|
}
|
|
|
|
|
}
|