package notify import ( "context" "errors" ) func (c *ClientCommon) SetBulkHandler(fn func(BulkAcceptInfo) error) { runtime := c.getBulkRuntime() if runtime == nil { return } runtime.setHandler(fn) } func (c *ClientCommon) OpenSharedBulk(ctx context.Context, opt BulkOpenOptions) (Bulk, error) { opt.Mode = BulkOpenModeShared opt.Dedicated = false return c.OpenBulk(ctx, opt) } func (c *ClientCommon) OpenDedicatedBulk(ctx context.Context, opt BulkOpenOptions) (Bulk, error) { opt.Mode = BulkOpenModeDedicated opt.Dedicated = true return c.OpenBulk(ctx, opt) } func (c *ClientCommon) OpenBulk(ctx context.Context, opt BulkOpenOptions) (Bulk, error) { if normalizeBulkOpenMode(opt.Mode) == BulkOpenModeDefault && !opt.Dedicated { opt.Mode = c.bulkDefaultOpenModeSnapshot() } opt = normalizeBulkOpenOptions(opt) switch opt.Mode { case BulkOpenModeDedicated: opt.Dedicated = true return c.openBulkWithDedicatedMode(ctx, opt) case BulkOpenModeAuto: // Auto mode prefers dedicated path and falls back to shared if dedicated fails. if err := clientDedicatedBulkSupportError(c); err == nil { dedicatedOpt := opt dedicatedOpt.Mode = BulkOpenModeDedicated dedicatedOpt.Dedicated = true bulk, dedicatedErr := c.openBulkWithDedicatedMode(ctx, dedicatedOpt) if dedicatedErr == nil { return bulk, nil } sharedOpt := opt sharedOpt.Mode = BulkOpenModeShared sharedOpt.Dedicated = false sharedBulk, sharedErr := c.openBulkWithDedicatedMode(ctx, sharedOpt) if sharedErr == nil { c.bulkAttachFallbackCount.Add(1) return sharedBulk, nil } return nil, errors.Join(dedicatedErr, sharedErr) } opt.Mode = BulkOpenModeShared opt.Dedicated = false c.bulkAttachFallbackCount.Add(1) return c.openBulkWithDedicatedMode(ctx, opt) case BulkOpenModeShared, BulkOpenModeDefault: opt.Mode = BulkOpenModeShared opt.Dedicated = false return c.openBulkWithDedicatedMode(ctx, opt) default: opt.Mode = BulkOpenModeShared opt.Dedicated = false return c.openBulkWithDedicatedMode(ctx, opt) } } func (c *ClientCommon) openBulkWithDedicatedMode(ctx context.Context, opt BulkOpenOptions) (Bulk, error) { if c == nil { return nil, errBulkClientNil } opt = applyBulkOpenTuningDefaults(opt, c.bulkOpenTuningSnapshot()) runtime := c.getBulkRuntime() if runtime == nil { return nil, errBulkRuntimeNil } req := clientBulkRequest(runtime, opt) if req.BulkID == "" { return nil, errBulkIDEmpty } if req.Dedicated { if err := clientDedicatedBulkSupportError(c); err != nil { return nil, err } } if !validBulkRange(req.Range) { return nil, errBulkRangeInvalid } if _, exists := runtime.lookup(clientFileScope(), req.BulkID); exists { return nil, errBulkAlreadyExists } if req.Dedicated { req.DedicatedLaneID = c.reserveBulkDedicatedLane() if req.DataID == 0 { req.DataID = runtime.nextDataID() } if req.AttachToken == "" { req.AttachToken = newBulkAttachToken() } bulk := newBulkHandle(c.clientStopContextSnapshot(), runtime, clientFileScope(), req, c.currentClientSessionEpoch(), nil, nil, 0, clientBulkCloseSender(c), clientBulkResetSender(c), clientBulkDataSender(c, c.currentClientSessionEpoch()), clientBulkWriteSender(c, c.currentClientSessionEpoch()), clientBulkReleaseSender(c)) bulk.setClientSnapshotOwner(c) bulk.markAcceptHandled() if err := runtime.register(clientFileScope(), bulk); err != nil { c.releaseBulkDedicatedLane(req.DedicatedLaneID) return nil, err } resp, err := sendBulkOpenClient(ctx, c, req) if err != nil { bulk.markReset(err) return nil, err } if resp.DataID != 0 && resp.DataID != req.DataID { err = errBulkAlreadyExists _, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{ BulkID: req.BulkID, DataID: req.DataID, Error: "bulk dedicated data id mismatch", }) bulk.markReset(err) return nil, err } if resp.TransportGeneration != 0 { bulk.transportGeneration = resp.TransportGeneration } if resp.FastPathVersion != 0 { bulk.fastPathVersion = normalizeBulkFastPathVersion(resp.FastPathVersion) } if resp.AttachToken != "" { req.AttachToken = resp.AttachToken bulk.setDedicatedAttachToken(resp.AttachToken) } if err := c.attachDedicatedBulkSidecar(ctx, bulk); err != nil { _, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{ BulkID: req.BulkID, DataID: req.DataID, Error: err.Error(), }) bulk.markReset(err) return nil, err } if err := bulk.waitAcceptReady(ctx); err != nil { _, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{ BulkID: req.BulkID, DataID: req.DataID, Error: err.Error(), }) bulk.markReset(err) return nil, err } return bulk, nil } resp, err := sendBulkOpenClient(ctx, c, req) if err != nil { return nil, err } if resp.DataID != 0 { req.DataID = resp.DataID } if resp.FastPathVersion != 0 { req.FastPathVersion = resp.FastPathVersion } req.Dedicated = resp.Dedicated if resp.AttachToken != "" { req.AttachToken = resp.AttachToken } if req.DataID == 0 { return nil, errBulkDataIDEmpty } bulk := newBulkHandle(c.clientStopContextSnapshot(), runtime, clientFileScope(), req, c.currentClientSessionEpoch(), nil, nil, resp.TransportGeneration, clientBulkCloseSender(c), clientBulkResetSender(c), clientBulkDataSender(c, c.currentClientSessionEpoch()), clientBulkWriteSender(c, c.currentClientSessionEpoch()), clientBulkReleaseSender(c)) bulk.setClientSnapshotOwner(c) bulk.markAcceptHandled() if err := runtime.register(clientFileScope(), bulk); err != nil { c.releaseBulkDedicatedLane(req.DedicatedLaneID) _, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{ BulkID: req.BulkID, DataID: req.DataID, Error: err.Error(), }) return nil, err } if bulk.Dedicated() { if err := c.attachDedicatedBulkSidecar(ctx, bulk); err != nil { runtime.remove(clientFileScope(), bulk.ID()) _, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{ BulkID: bulk.ID(), DataID: bulk.dataIDSnapshot(), Error: err.Error(), }) return nil, err } } return bulk, nil } func clientBulkRequest(runtime *bulkRuntime, opt BulkOpenOptions) BulkOpenRequest { opt = normalizeBulkOpenOptions(opt) id := opt.ID if id == "" && runtime != nil { id = runtime.nextID() } return normalizeBulkOpenRequest(BulkOpenRequest{ BulkID: id, FastPathVersion: bulkFastPathVersionCurrent, Range: opt.Range, Metadata: cloneBulkMetadata(opt.Metadata), ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, Dedicated: opt.Dedicated, ChunkSize: opt.ChunkSize, WindowBytes: opt.WindowBytes, MaxInFlight: opt.MaxInFlight, }) } func clientBulkCloseSender(c *ClientCommon) bulkCloseSender { return func(ctx context.Context, bulk *bulkHandle, full bool) error { if bulk != nil && bulk.Dedicated() { if err := bulk.waitDedicatedReady(ctx); err != nil { return err } return c.sendDedicatedBulkClose(ctx, bulk, full) } _, err := sendBulkCloseClient(ctx, c, BulkCloseRequest{ BulkID: bulk.ID(), Full: full, }) return err } } func clientBulkResetSender(c *ClientCommon) bulkResetSender { return func(ctx context.Context, bulk *bulkHandle, message string) error { if bulk != nil && bulk.Dedicated() { if err := bulk.waitDedicatedReady(ctx); err != nil { return err } return c.sendDedicatedBulkReset(ctx, bulk, message) } _, err := sendBulkResetClient(ctx, c, BulkResetRequest{ BulkID: bulk.ID(), DataID: bulk.dataIDSnapshot(), Error: message, }) return err } } func clientBulkDataSender(c *ClientCommon, epoch uint64) bulkDataSender { return func(ctx context.Context, bulk *bulkHandle, chunk []byte) error { if c == nil { return errBulkClientNil } if ctx != nil { select { case <-ctx.Done(): return ctx.Err() default: } } if bulk != nil && bulk.Dedicated() { if err := bulk.waitDedicatedReady(ctx); err != nil { return err } return c.sendDedicatedBulkData(ctx, bulk, chunk) } if epoch != 0 && !c.isClientSessionEpochCurrent(epoch) { return errTransportDetached } dataID := bulk.dataIDSnapshot() if dataID == 0 { return errBulkDataPathNotReady } return c.sendFastBulkData(ctx, dataID, bulk.nextOutboundDataSeq(), chunk, bulk.fastPathVersionSnapshot()) } } func clientBulkWriteSender(c *ClientCommon, epoch uint64) bulkWriteSender { return func(ctx context.Context, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) { if c == nil { return 0, errBulkClientNil } if ctx != nil { select { case <-ctx.Done(): return 0, ctx.Err() default: } } if bulk != nil && bulk.Dedicated() { if err := bulk.waitDedicatedReady(ctx); err != nil { return 0, err } return c.sendDedicatedBulkWrite(ctx, bulk, startSeq, payload, payloadOwned) } if epoch != 0 && !c.isClientSessionEpochCurrent(epoch) { return 0, errTransportDetached } if bulk == nil { return 0, errBulkRuntimeNil } dataID := bulk.dataIDSnapshot() if dataID == 0 { return 0, errBulkDataPathNotReady } return c.sendFastBulkWrite(ctx, dataID, startSeq, bulk.chunkSize, bulk.fastPathVersionSnapshot(), payload, payloadOwned) } } func clientBulkReleaseSender(c *ClientCommon) bulkReleaseSender { return func(bulk *bulkHandle, bytes int64, chunks int) error { if c == nil || bulk == nil { return errBulkClientNil } if bytes <= 0 && chunks <= 0 { return nil } ctx, cancel, err := bulk.newWriteContext(bulk.Context(), bulk.writeTimeout) if err != nil { return err } defer cancel() if bulk.Dedicated() { return c.sendDedicatedBulkRelease(ctx, bulk, bytes, chunks) } if bulk.fastPathVersionSnapshot() >= bulkFastPathVersionV2 { payload, err := encodeBulkDedicatedReleasePayload(bytes, chunks) if err != nil { return err } return c.sendFastBulkControl(ctx, bulkFastPayloadTypeRelease, 0, bulk.dataIDSnapshot(), 0, bulk.fastPathVersionSnapshot(), payload) } return sendBulkReleaseClient(c, BulkReleaseRequest{ BulkID: bulk.ID(), DataID: bulk.dataIDSnapshot(), Bytes: bytes, Chunks: chunks, }) } }