2026-04-15 15:24:36 +08:00
|
|
|
package notify
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
)
|
2026-04-15 15:24:36 +08:00
|
|
|
|
|
|
|
|
func (c *ClientCommon) SetBulkHandler(fn func(BulkAcceptInfo) error) {
|
|
|
|
|
runtime := c.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
runtime.setHandler(fn)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-18 16:05:57 +08:00
|
|
|
func (c *ClientCommon) OpenSharedBulk(ctx context.Context, opt BulkOpenOptions) (Bulk, error) {
|
|
|
|
|
opt.Mode = BulkOpenModeShared
|
|
|
|
|
opt.Dedicated = false
|
|
|
|
|
return c.OpenBulk(ctx, opt)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) OpenDedicatedBulk(ctx context.Context, opt BulkOpenOptions) (Bulk, error) {
|
|
|
|
|
opt.Mode = BulkOpenModeDedicated
|
|
|
|
|
opt.Dedicated = true
|
|
|
|
|
return c.OpenBulk(ctx, opt)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-15 15:24:36 +08:00
|
|
|
func (c *ClientCommon) OpenBulk(ctx context.Context, opt BulkOpenOptions) (Bulk, error) {
|
2026-04-18 16:05:57 +08:00
|
|
|
if normalizeBulkOpenMode(opt.Mode) == BulkOpenModeDefault && !opt.Dedicated {
|
|
|
|
|
opt.Mode = c.bulkDefaultOpenModeSnapshot()
|
|
|
|
|
}
|
|
|
|
|
opt = normalizeBulkOpenOptions(opt)
|
|
|
|
|
switch opt.Mode {
|
|
|
|
|
case BulkOpenModeDedicated:
|
|
|
|
|
opt.Dedicated = true
|
|
|
|
|
return c.openBulkWithDedicatedMode(ctx, opt)
|
|
|
|
|
case BulkOpenModeAuto:
|
|
|
|
|
// Auto mode prefers dedicated path and falls back to shared if dedicated fails.
|
|
|
|
|
if err := clientDedicatedBulkSupportError(c); err == nil {
|
|
|
|
|
dedicatedOpt := opt
|
|
|
|
|
dedicatedOpt.Mode = BulkOpenModeDedicated
|
|
|
|
|
dedicatedOpt.Dedicated = true
|
|
|
|
|
bulk, dedicatedErr := c.openBulkWithDedicatedMode(ctx, dedicatedOpt)
|
|
|
|
|
if dedicatedErr == nil {
|
|
|
|
|
return bulk, nil
|
|
|
|
|
}
|
|
|
|
|
sharedOpt := opt
|
|
|
|
|
sharedOpt.Mode = BulkOpenModeShared
|
|
|
|
|
sharedOpt.Dedicated = false
|
|
|
|
|
sharedBulk, sharedErr := c.openBulkWithDedicatedMode(ctx, sharedOpt)
|
|
|
|
|
if sharedErr == nil {
|
|
|
|
|
c.bulkAttachFallbackCount.Add(1)
|
|
|
|
|
return sharedBulk, nil
|
|
|
|
|
}
|
|
|
|
|
return nil, errors.Join(dedicatedErr, sharedErr)
|
|
|
|
|
}
|
|
|
|
|
opt.Mode = BulkOpenModeShared
|
|
|
|
|
opt.Dedicated = false
|
|
|
|
|
c.bulkAttachFallbackCount.Add(1)
|
|
|
|
|
return c.openBulkWithDedicatedMode(ctx, opt)
|
|
|
|
|
case BulkOpenModeShared, BulkOpenModeDefault:
|
|
|
|
|
opt.Mode = BulkOpenModeShared
|
|
|
|
|
opt.Dedicated = false
|
|
|
|
|
return c.openBulkWithDedicatedMode(ctx, opt)
|
|
|
|
|
default:
|
|
|
|
|
opt.Mode = BulkOpenModeShared
|
|
|
|
|
opt.Dedicated = false
|
|
|
|
|
return c.openBulkWithDedicatedMode(ctx, opt)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ClientCommon) openBulkWithDedicatedMode(ctx context.Context, opt BulkOpenOptions) (Bulk, error) {
|
2026-04-15 15:24:36 +08:00
|
|
|
if c == nil {
|
|
|
|
|
return nil, errBulkClientNil
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
opt = applyBulkOpenTuningDefaults(opt, c.bulkOpenTuningSnapshot())
|
2026-04-15 15:24:36 +08:00
|
|
|
runtime := c.getBulkRuntime()
|
|
|
|
|
if runtime == nil {
|
|
|
|
|
return nil, errBulkRuntimeNil
|
|
|
|
|
}
|
|
|
|
|
req := clientBulkRequest(runtime, opt)
|
|
|
|
|
if req.BulkID == "" {
|
|
|
|
|
return nil, errBulkIDEmpty
|
|
|
|
|
}
|
|
|
|
|
if req.Dedicated {
|
|
|
|
|
if err := clientDedicatedBulkSupportError(c); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !validBulkRange(req.Range) {
|
|
|
|
|
return nil, errBulkRangeInvalid
|
|
|
|
|
}
|
|
|
|
|
if _, exists := runtime.lookup(clientFileScope(), req.BulkID); exists {
|
|
|
|
|
return nil, errBulkAlreadyExists
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if req.Dedicated {
|
|
|
|
|
req.DedicatedLaneID = c.reserveBulkDedicatedLane()
|
|
|
|
|
if req.DataID == 0 {
|
|
|
|
|
req.DataID = runtime.nextDataID()
|
|
|
|
|
}
|
|
|
|
|
if req.AttachToken == "" {
|
|
|
|
|
req.AttachToken = newBulkAttachToken()
|
|
|
|
|
}
|
|
|
|
|
bulk := newBulkHandle(c.clientStopContextSnapshot(), runtime, clientFileScope(), req, c.currentClientSessionEpoch(), nil, nil, 0, clientBulkCloseSender(c), clientBulkResetSender(c), clientBulkDataSender(c, c.currentClientSessionEpoch()), clientBulkWriteSender(c, c.currentClientSessionEpoch()), clientBulkReleaseSender(c))
|
|
|
|
|
bulk.setClientSnapshotOwner(c)
|
|
|
|
|
bulk.markAcceptHandled()
|
|
|
|
|
if err := runtime.register(clientFileScope(), bulk); err != nil {
|
|
|
|
|
c.releaseBulkDedicatedLane(req.DedicatedLaneID)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
resp, err := sendBulkOpenClient(ctx, c, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
bulk.markReset(err)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if resp.DataID != 0 && resp.DataID != req.DataID {
|
|
|
|
|
err = errBulkAlreadyExists
|
|
|
|
|
_, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{
|
|
|
|
|
BulkID: req.BulkID,
|
|
|
|
|
DataID: req.DataID,
|
|
|
|
|
Error: "bulk dedicated data id mismatch",
|
|
|
|
|
})
|
|
|
|
|
bulk.markReset(err)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if resp.TransportGeneration != 0 {
|
|
|
|
|
bulk.transportGeneration = resp.TransportGeneration
|
|
|
|
|
}
|
|
|
|
|
if resp.FastPathVersion != 0 {
|
|
|
|
|
bulk.fastPathVersion = normalizeBulkFastPathVersion(resp.FastPathVersion)
|
|
|
|
|
}
|
|
|
|
|
if resp.AttachToken != "" {
|
|
|
|
|
req.AttachToken = resp.AttachToken
|
|
|
|
|
bulk.setDedicatedAttachToken(resp.AttachToken)
|
|
|
|
|
}
|
|
|
|
|
if err := c.attachDedicatedBulkSidecar(ctx, bulk); err != nil {
|
|
|
|
|
_, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{
|
|
|
|
|
BulkID: req.BulkID,
|
|
|
|
|
DataID: req.DataID,
|
|
|
|
|
Error: err.Error(),
|
|
|
|
|
})
|
|
|
|
|
bulk.markReset(err)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if err := bulk.waitAcceptReady(ctx); err != nil {
|
|
|
|
|
_, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{
|
|
|
|
|
BulkID: req.BulkID,
|
|
|
|
|
DataID: req.DataID,
|
|
|
|
|
Error: err.Error(),
|
|
|
|
|
})
|
|
|
|
|
bulk.markReset(err)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return bulk, nil
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
resp, err := sendBulkOpenClient(ctx, c, req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if resp.DataID != 0 {
|
|
|
|
|
req.DataID = resp.DataID
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if resp.FastPathVersion != 0 {
|
|
|
|
|
req.FastPathVersion = resp.FastPathVersion
|
|
|
|
|
}
|
2026-04-15 15:24:36 +08:00
|
|
|
req.Dedicated = resp.Dedicated
|
|
|
|
|
if resp.AttachToken != "" {
|
|
|
|
|
req.AttachToken = resp.AttachToken
|
|
|
|
|
}
|
|
|
|
|
if req.DataID == 0 {
|
|
|
|
|
return nil, errBulkDataIDEmpty
|
|
|
|
|
}
|
|
|
|
|
bulk := newBulkHandle(c.clientStopContextSnapshot(), runtime, clientFileScope(), req, c.currentClientSessionEpoch(), nil, nil, resp.TransportGeneration, clientBulkCloseSender(c), clientBulkResetSender(c), clientBulkDataSender(c, c.currentClientSessionEpoch()), clientBulkWriteSender(c, c.currentClientSessionEpoch()), clientBulkReleaseSender(c))
|
|
|
|
|
bulk.setClientSnapshotOwner(c)
|
2026-04-18 16:05:57 +08:00
|
|
|
bulk.markAcceptHandled()
|
2026-04-15 15:24:36 +08:00
|
|
|
if err := runtime.register(clientFileScope(), bulk); err != nil {
|
2026-04-18 16:05:57 +08:00
|
|
|
c.releaseBulkDedicatedLane(req.DedicatedLaneID)
|
2026-04-15 15:24:36 +08:00
|
|
|
_, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{
|
|
|
|
|
BulkID: req.BulkID,
|
|
|
|
|
DataID: req.DataID,
|
|
|
|
|
Error: err.Error(),
|
|
|
|
|
})
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if bulk.Dedicated() {
|
|
|
|
|
if err := c.attachDedicatedBulkSidecar(ctx, bulk); err != nil {
|
|
|
|
|
runtime.remove(clientFileScope(), bulk.ID())
|
|
|
|
|
_, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{
|
|
|
|
|
BulkID: bulk.ID(),
|
|
|
|
|
DataID: bulk.dataIDSnapshot(),
|
|
|
|
|
Error: err.Error(),
|
|
|
|
|
})
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return bulk, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func clientBulkRequest(runtime *bulkRuntime, opt BulkOpenOptions) BulkOpenRequest {
|
|
|
|
|
opt = normalizeBulkOpenOptions(opt)
|
|
|
|
|
id := opt.ID
|
|
|
|
|
if id == "" && runtime != nil {
|
|
|
|
|
id = runtime.nextID()
|
|
|
|
|
}
|
|
|
|
|
return normalizeBulkOpenRequest(BulkOpenRequest{
|
2026-04-18 16:05:57 +08:00
|
|
|
BulkID: id,
|
|
|
|
|
FastPathVersion: bulkFastPathVersionCurrent,
|
|
|
|
|
Range: opt.Range,
|
|
|
|
|
Metadata: cloneBulkMetadata(opt.Metadata),
|
|
|
|
|
ReadTimeout: opt.ReadTimeout,
|
|
|
|
|
WriteTimeout: opt.WriteTimeout,
|
|
|
|
|
Dedicated: opt.Dedicated,
|
|
|
|
|
ChunkSize: opt.ChunkSize,
|
|
|
|
|
WindowBytes: opt.WindowBytes,
|
|
|
|
|
MaxInFlight: opt.MaxInFlight,
|
2026-04-15 15:24:36 +08:00
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func clientBulkCloseSender(c *ClientCommon) bulkCloseSender {
|
|
|
|
|
return func(ctx context.Context, bulk *bulkHandle, full bool) error {
|
|
|
|
|
if bulk != nil && bulk.Dedicated() {
|
|
|
|
|
if err := bulk.waitDedicatedReady(ctx); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return c.sendDedicatedBulkClose(ctx, bulk, full)
|
|
|
|
|
}
|
|
|
|
|
_, err := sendBulkCloseClient(ctx, c, BulkCloseRequest{
|
|
|
|
|
BulkID: bulk.ID(),
|
|
|
|
|
Full: full,
|
|
|
|
|
})
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func clientBulkResetSender(c *ClientCommon) bulkResetSender {
|
|
|
|
|
return func(ctx context.Context, bulk *bulkHandle, message string) error {
|
|
|
|
|
if bulk != nil && bulk.Dedicated() {
|
|
|
|
|
if err := bulk.waitDedicatedReady(ctx); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return c.sendDedicatedBulkReset(ctx, bulk, message)
|
|
|
|
|
}
|
|
|
|
|
_, err := sendBulkResetClient(ctx, c, BulkResetRequest{
|
|
|
|
|
BulkID: bulk.ID(),
|
|
|
|
|
DataID: bulk.dataIDSnapshot(),
|
|
|
|
|
Error: message,
|
|
|
|
|
})
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func clientBulkDataSender(c *ClientCommon, epoch uint64) bulkDataSender {
|
|
|
|
|
return func(ctx context.Context, bulk *bulkHandle, chunk []byte) error {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
if ctx != nil {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if bulk != nil && bulk.Dedicated() {
|
|
|
|
|
if err := bulk.waitDedicatedReady(ctx); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return c.sendDedicatedBulkData(ctx, bulk, chunk)
|
|
|
|
|
}
|
|
|
|
|
if epoch != 0 && !c.isClientSessionEpochCurrent(epoch) {
|
|
|
|
|
return errTransportDetached
|
|
|
|
|
}
|
|
|
|
|
dataID := bulk.dataIDSnapshot()
|
|
|
|
|
if dataID == 0 {
|
|
|
|
|
return errBulkDataPathNotReady
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
return c.sendFastBulkData(ctx, dataID, bulk.nextOutboundDataSeq(), chunk, bulk.fastPathVersionSnapshot())
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func clientBulkWriteSender(c *ClientCommon, epoch uint64) bulkWriteSender {
|
2026-04-18 16:05:57 +08:00
|
|
|
return func(ctx context.Context, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) {
|
2026-04-15 15:24:36 +08:00
|
|
|
if c == nil {
|
|
|
|
|
return 0, errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
if ctx != nil {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return 0, ctx.Err()
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if bulk != nil && bulk.Dedicated() {
|
|
|
|
|
if err := bulk.waitDedicatedReady(ctx); err != nil {
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
2026-04-20 16:35:44 +08:00
|
|
|
return c.sendDedicatedBulkWrite(ctx, bulk, startSeq, payload, payloadOwned)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
if epoch != 0 && !c.isClientSessionEpochCurrent(epoch) {
|
|
|
|
|
return 0, errTransportDetached
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
if bulk == nil {
|
|
|
|
|
return 0, errBulkRuntimeNil
|
|
|
|
|
}
|
|
|
|
|
dataID := bulk.dataIDSnapshot()
|
|
|
|
|
if dataID == 0 {
|
|
|
|
|
return 0, errBulkDataPathNotReady
|
|
|
|
|
}
|
|
|
|
|
return c.sendFastBulkWrite(ctx, dataID, startSeq, bulk.chunkSize, bulk.fastPathVersionSnapshot(), payload, payloadOwned)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func clientBulkReleaseSender(c *ClientCommon) bulkReleaseSender {
|
|
|
|
|
return func(bulk *bulkHandle, bytes int64, chunks int) error {
|
|
|
|
|
if c == nil || bulk == nil {
|
|
|
|
|
return errBulkClientNil
|
|
|
|
|
}
|
|
|
|
|
if bytes <= 0 && chunks <= 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
2026-04-18 16:05:57 +08:00
|
|
|
ctx, cancel, err := bulk.newWriteContext(bulk.Context(), bulk.writeTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer cancel()
|
2026-04-15 15:24:36 +08:00
|
|
|
if bulk.Dedicated() {
|
2026-04-18 16:05:57 +08:00
|
|
|
return c.sendDedicatedBulkRelease(ctx, bulk, bytes, chunks)
|
|
|
|
|
}
|
|
|
|
|
if bulk.fastPathVersionSnapshot() >= bulkFastPathVersionV2 {
|
|
|
|
|
payload, err := encodeBulkDedicatedReleasePayload(bytes, chunks)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return c.sendFastBulkControl(ctx, bulkFastPayloadTypeRelease, 0, bulk.dataIDSnapshot(), 0, bulk.fastPathVersionSnapshot(), payload)
|
2026-04-15 15:24:36 +08:00
|
|
|
}
|
|
|
|
|
return sendBulkReleaseClient(c, BulkReleaseRequest{
|
|
|
|
|
BulkID: bulk.ID(),
|
|
|
|
|
DataID: bulk.dataIDSnapshot(),
|
|
|
|
|
Bytes: bytes,
|
|
|
|
|
Chunks: chunks,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|