notify/client_bulk.go

344 lines
10 KiB
Go
Raw Permalink Normal View History

package notify
import (
"context"
"errors"
)
// SetBulkHandler installs fn as the handler on the client's bulk runtime.
// The call does nothing when the client has no bulk runtime.
func (c *ClientCommon) SetBulkHandler(fn func(BulkAcceptInfo) error) {
	if rt := c.getBulkRuntime(); rt != nil {
		rt.setHandler(fn)
	}
}
// OpenSharedBulk opens a bulk with the shared open mode forced onto opt.
// Whatever Mode and Dedicated values the caller supplied are overwritten
// before the options are handed to OpenBulk.
func (c *ClientCommon) OpenSharedBulk(ctx context.Context, opt BulkOpenOptions) (Bulk, error) {
	opt.Mode, opt.Dedicated = BulkOpenModeShared, false
	return c.OpenBulk(ctx, opt)
}
// OpenDedicatedBulk opens a bulk with the dedicated open mode forced onto opt.
// Whatever Mode and Dedicated values the caller supplied are overwritten
// before the options are handed to OpenBulk.
func (c *ClientCommon) OpenDedicatedBulk(ctx context.Context, opt BulkOpenOptions) (Bulk, error) {
	opt.Mode, opt.Dedicated = BulkOpenModeDedicated, true
	return c.OpenBulk(ctx, opt)
}
// OpenBulk opens a bulk using the mode selected by opt.
//
// When opt carries the default mode and does not request a dedicated bulk,
// the client's default open mode snapshot is substituted first. After the
// options are normalized, the mode selects the path:
//
//   - BulkOpenModeDedicated opens with Dedicated forced to true.
//   - BulkOpenModeAuto tries a dedicated open first and retries as shared
//     when that fails; if both attempts fail, the two errors are joined.
//     When clientDedicatedBulkSupportError reports an error, the dedicated
//     attempt is skipped and the shared open is used directly.
//   - Every other mode opens as shared.
//
// A nil receiver returns errBulkClientNil.
func (c *ClientCommon) OpenBulk(ctx context.Context, opt BulkOpenOptions) (Bulk, error) {
	if c == nil {
		// The auto path reads c.bulkAttachFallbackCount, so a nil client has
		// to be rejected here rather than left to openBulkWithDedicatedMode.
		return nil, errBulkClientNil
	}
	if normalizeBulkOpenMode(opt.Mode) == BulkOpenModeDefault && !opt.Dedicated {
		opt.Mode = c.bulkDefaultOpenModeSnapshot()
	}
	opt = normalizeBulkOpenOptions(opt)

	switch opt.Mode {
	case BulkOpenModeDedicated:
		opt.Dedicated = true
		return c.openBulkWithDedicatedMode(ctx, opt)

	case BulkOpenModeAuto:
		// Auto mode prefers the dedicated path and falls back to shared if
		// dedicated is unavailable or fails.
		sharedOpt := opt
		sharedOpt.Mode = BulkOpenModeShared
		sharedOpt.Dedicated = false

		if err := clientDedicatedBulkSupportError(c); err != nil {
			// Dedicated is not available at all. The fallback is counted
			// before the shared attempt, regardless of its outcome.
			// NOTE(review): the other fallback below only counts on
			// success — confirm which accounting is intended.
			c.bulkAttachFallbackCount.Add(1)
			return c.openBulkWithDedicatedMode(ctx, sharedOpt)
		}

		dedicatedOpt := opt
		dedicatedOpt.Mode = BulkOpenModeDedicated
		dedicatedOpt.Dedicated = true
		bulk, dedicatedErr := c.openBulkWithDedicatedMode(ctx, dedicatedOpt)
		if dedicatedErr == nil {
			return bulk, nil
		}

		sharedBulk, sharedErr := c.openBulkWithDedicatedMode(ctx, sharedOpt)
		if sharedErr != nil {
			// Report both failures so the caller can see why each path
			// was rejected.
			return nil, errors.Join(dedicatedErr, sharedErr)
		}
		c.bulkAttachFallbackCount.Add(1)
		return sharedBulk, nil

	default:
		// Shared, default, and any unrecognized mode all open as shared.
		opt.Mode = BulkOpenModeShared
		opt.Dedicated = false
		return c.openBulkWithDedicatedMode(ctx, opt)
	}
}
// openBulkWithDedicatedMode builds an open request from opt and opens the
// bulk on either the dedicated or the shared path, depending on
// req.Dedicated after the request has been built (despite its name, the
// function serves both paths).
//
// It returns errBulkClientNil for a nil receiver, errBulkRuntimeNil when the
// client has no bulk runtime, errBulkIDEmpty when no bulk ID could be
// determined, errBulkRangeInvalid when validBulkRange rejects the range, and
// errBulkAlreadyExists when the runtime already has a bulk with the same ID
// in the client file scope. Errors from the open request, registration, and
// sidecar attach are returned as-is.
func (c *ClientCommon) openBulkWithDedicatedMode(ctx context.Context, opt BulkOpenOptions) (Bulk, error) {
if c == nil {
return nil, errBulkClientNil
}
// Fill unset tuning values in opt from the client's tuning snapshot.
opt = applyBulkOpenTuningDefaults(opt, c.bulkOpenTuningSnapshot())
runtime := c.getBulkRuntime()
if runtime == nil {
return nil, errBulkRuntimeNil
}
req := clientBulkRequest(runtime, opt)
if req.BulkID == "" {
return nil, errBulkIDEmpty
}
// A dedicated request is rejected up front when the client cannot support it.
if req.Dedicated {
if err := clientDedicatedBulkSupportError(c); err != nil {
return nil, err
}
}
if !validBulkRange(req.Range) {
return nil, errBulkRangeInvalid
}
if _, exists := runtime.lookup(clientFileScope(), req.BulkID); exists {
return nil, errBulkAlreadyExists
}
// Dedicated path: the handle is created and registered locally BEFORE the
// open request is sent, using a locally reserved lane, data ID, and attach
// token.
if req.Dedicated {
req.DedicatedLaneID = c.reserveBulkDedicatedLane()
if req.DataID == 0 {
req.DataID = runtime.nextDataID()
}
if req.AttachToken == "" {
req.AttachToken = newBulkAttachToken()
}
bulk := newBulkHandle(c.clientStopContextSnapshot(), runtime, clientFileScope(), req, c.currentClientSessionEpoch(), nil, nil, 0, clientBulkCloseSender(c), clientBulkResetSender(c), clientBulkDataSender(c, c.currentClientSessionEpoch()), clientBulkWriteSender(c, c.currentClientSessionEpoch()), clientBulkReleaseSender(c))
bulk.setClientSnapshotOwner(c)
bulk.markAcceptHandled()
if err := runtime.register(clientFileScope(), bulk); err != nil {
// Registration failed, so the lane reserved above is handed back.
c.releaseBulkDedicatedLane(req.DedicatedLaneID)
return nil, err
}
resp, err := sendBulkOpenClient(ctx, c, req)
if err != nil {
// NOTE(review): from here on, failures only call bulk.markReset; the
// lane is not released and the handle is not removed from the runtime
// explicitly. Presumably markReset takes care of both — confirm.
bulk.markReset(err)
return nil, err
}
// The peer answered with a different non-zero data ID than the one chosen
// locally: tell the peer to reset and fail the open.
if resp.DataID != 0 && resp.DataID != req.DataID {
err = errBulkAlreadyExists
// The reset uses context.Background() rather than ctx, presumably so it
// is still sent when ctx is already cancelled; its result is ignored.
_, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{
BulkID: req.BulkID,
DataID: req.DataID,
Error: "bulk dedicated data id mismatch",
})
bulk.markReset(err)
return nil, err
}
// Adopt the non-zero/non-empty values the response carries.
if resp.TransportGeneration != 0 {
bulk.transportGeneration = resp.TransportGeneration
}
if resp.FastPathVersion != 0 {
bulk.fastPathVersion = normalizeBulkFastPathVersion(resp.FastPathVersion)
}
if resp.AttachToken != "" {
req.AttachToken = resp.AttachToken
bulk.setDedicatedAttachToken(resp.AttachToken)
}
if err := c.attachDedicatedBulkSidecar(ctx, bulk); err != nil {
_, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{
BulkID: req.BulkID,
DataID: req.DataID,
Error: err.Error(),
})
bulk.markReset(err)
return nil, err
}
if err := bulk.waitAcceptReady(ctx); err != nil {
_, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{
BulkID: req.BulkID,
DataID: req.DataID,
Error: err.Error(),
})
bulk.markReset(err)
return nil, err
}
return bulk, nil
}
// Shared path: the open request is sent first and the handle is created
// only after the response arrives.
resp, err := sendBulkOpenClient(ctx, c, req)
if err != nil {
return nil, err
}
// Take over the values from the response; Dedicated is copied
// unconditionally, so the response can turn this into a dedicated bulk.
if resp.DataID != 0 {
req.DataID = resp.DataID
}
if resp.FastPathVersion != 0 {
req.FastPathVersion = resp.FastPathVersion
}
req.Dedicated = resp.Dedicated
if resp.AttachToken != "" {
req.AttachToken = resp.AttachToken
}
if req.DataID == 0 {
return nil, errBulkDataIDEmpty
}
bulk := newBulkHandle(c.clientStopContextSnapshot(), runtime, clientFileScope(), req, c.currentClientSessionEpoch(), nil, nil, resp.TransportGeneration, clientBulkCloseSender(c), clientBulkResetSender(c), clientBulkDataSender(c, c.currentClientSessionEpoch()), clientBulkWriteSender(c, c.currentClientSessionEpoch()), clientBulkReleaseSender(c))
bulk.setClientSnapshotOwner(c)
bulk.markAcceptHandled()
if err := runtime.register(clientFileScope(), bulk); err != nil {
// NOTE(review): no lane is reserved on this path within this function, so
// req.DedicatedLaneID is whatever clientBulkRequest produced — confirm
// that releasing it here is a harmless no-op.
c.releaseBulkDedicatedLane(req.DedicatedLaneID)
_, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{
BulkID: req.BulkID,
DataID: req.DataID,
Error: err.Error(),
})
return nil, err
}
// The handle reports itself as dedicated (req.Dedicated came from the
// response), so the sidecar is attached; on failure the handle is removed
// from the runtime and the peer is told to reset.
if bulk.Dedicated() {
if err := c.attachDedicatedBulkSidecar(ctx, bulk); err != nil {
runtime.remove(clientFileScope(), bulk.ID())
_, _ = sendBulkResetClient(context.Background(), c, BulkResetRequest{
BulkID: bulk.ID(),
DataID: bulk.dataIDSnapshot(),
Error: err.Error(),
})
return nil, err
}
}
return bulk, nil
}
// clientBulkRequest turns open options into a normalized BulkOpenRequest.
//
// The options are normalized first. When they carry no ID and runtime is
// non-nil, the ID comes from runtime.nextID(). The request always carries
// bulkFastPathVersionCurrent, and the metadata is passed through
// cloneBulkMetadata rather than used directly.
func clientBulkRequest(runtime *bulkRuntime, opt BulkOpenOptions) BulkOpenRequest {
	opt = normalizeBulkOpenOptions(opt)

	bulkID := opt.ID
	if bulkID == "" && runtime != nil {
		bulkID = runtime.nextID()
	}

	var req BulkOpenRequest
	req.BulkID = bulkID
	req.FastPathVersion = bulkFastPathVersionCurrent
	req.Range = opt.Range
	req.Metadata = cloneBulkMetadata(opt.Metadata)
	req.ReadTimeout = opt.ReadTimeout
	req.WriteTimeout = opt.WriteTimeout
	req.Dedicated = opt.Dedicated
	req.ChunkSize = opt.ChunkSize
	req.WindowBytes = opt.WindowBytes
	req.MaxInFlight = opt.MaxInFlight
	return normalizeBulkOpenRequest(req)
}
// clientBulkCloseSender returns the close callback for bulk handles owned by
// client c.
//
// The callback returns errBulkClientNil when c is nil and errBulkRuntimeNil
// when bulk is nil, matching the guards in the data and write senders. For a
// dedicated bulk it waits on bulk.waitDedicatedReady and then sends the close
// through c.sendDedicatedBulkClose; for any other bulk it sends a
// BulkCloseRequest carrying the bulk ID and the full flag.
func clientBulkCloseSender(c *ClientCommon) bulkCloseSender {
	return func(ctx context.Context, bulk *bulkHandle, full bool) error {
		if c == nil {
			return errBulkClientNil
		}
		if bulk == nil {
			// Both branches below read from the handle (Dedicated, ID), so
			// nil is rejected once here instead of only on the dedicated
			// branch.
			return errBulkRuntimeNil
		}
		if bulk.Dedicated() {
			if err := bulk.waitDedicatedReady(ctx); err != nil {
				return err
			}
			return c.sendDedicatedBulkClose(ctx, bulk, full)
		}
		_, err := sendBulkCloseClient(ctx, c, BulkCloseRequest{
			BulkID: bulk.ID(),
			Full:   full,
		})
		return err
	}
}
// clientBulkResetSender returns the reset callback for bulk handles owned by
// client c.
//
// The callback returns errBulkClientNil when c is nil and errBulkRuntimeNil
// when bulk is nil, matching the guards in the data and write senders. For a
// dedicated bulk it waits on bulk.waitDedicatedReady and then sends the reset
// through c.sendDedicatedBulkReset; for any other bulk it sends a
// BulkResetRequest carrying the bulk ID, the current data ID, and message as
// the error text.
func clientBulkResetSender(c *ClientCommon) bulkResetSender {
	return func(ctx context.Context, bulk *bulkHandle, message string) error {
		if c == nil {
			return errBulkClientNil
		}
		if bulk == nil {
			// Both branches below read from the handle (Dedicated, ID,
			// dataIDSnapshot), so nil is rejected once here instead of only
			// on the dedicated branch.
			return errBulkRuntimeNil
		}
		if bulk.Dedicated() {
			if err := bulk.waitDedicatedReady(ctx); err != nil {
				return err
			}
			return c.sendDedicatedBulkReset(ctx, bulk, message)
		}
		_, err := sendBulkResetClient(ctx, c, BulkResetRequest{
			BulkID: bulk.ID(),
			DataID: bulk.dataIDSnapshot(),
			Error:  message,
		})
		return err
	}
}
// clientBulkDataSender returns the callback that sends one data chunk for a
// bulk handle owned by client c. epoch is the client session epoch captured
// when the handle was created; a zero epoch disables the epoch check.
//
// The callback fails with:
//   - errBulkClientNil when c is nil;
//   - ctx.Err() when a non-nil ctx is already done;
//   - errTransportDetached when epoch is non-zero and no longer current
//     (shared bulks only);
//   - errBulkRuntimeNil when bulk is nil, as in clientBulkWriteSender;
//   - errBulkDataPathNotReady when the bulk has no data ID yet.
//
// A dedicated bulk waits on bulk.waitDedicatedReady and sends through
// c.sendDedicatedBulkData; any other bulk sends through c.sendFastBulkData
// with the bulk's next outbound sequence number.
func clientBulkDataSender(c *ClientCommon, epoch uint64) bulkDataSender {
	return func(ctx context.Context, bulk *bulkHandle, chunk []byte) error {
		if c == nil {
			return errBulkClientNil
		}
		if ctx != nil {
			// Non-blocking check so a cancelled context fails fast before
			// any send work is started.
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
		}
		if bulk != nil && bulk.Dedicated() {
			if err := bulk.waitDedicatedReady(ctx); err != nil {
				return err
			}
			return c.sendDedicatedBulkData(ctx, bulk, chunk)
		}
		if epoch != 0 && !c.isClientSessionEpochCurrent(epoch) {
			return errTransportDetached
		}
		if bulk == nil {
			// The shared path below reads several values from the handle;
			// reject nil at the same point clientBulkWriteSender does.
			return errBulkRuntimeNil
		}
		dataID := bulk.dataIDSnapshot()
		if dataID == 0 {
			return errBulkDataPathNotReady
		}
		return c.sendFastBulkData(ctx, dataID, bulk.nextOutboundDataSeq(), chunk, bulk.fastPathVersionSnapshot())
	}
}
// clientBulkWriteSender returns the callback that writes a payload starting
// at startSeq for a bulk handle owned by client c. epoch is the client
// session epoch captured when the handle was created; a zero epoch disables
// the epoch check.
//
// The callback returns 0 together with errBulkClientNil for a nil client, the
// context error when a non-nil ctx is already done, errTransportDetached when
// a non-zero epoch is no longer current, errBulkRuntimeNil for a nil bulk,
// and errBulkDataPathNotReady when the bulk has no data ID yet. A dedicated
// bulk waits on bulk.waitDedicatedReady and writes through
// c.sendDedicatedBulkWrite; any other bulk writes through c.sendFastBulkWrite
// using the bulk's chunk size.
func clientBulkWriteSender(c *ClientCommon, epoch uint64) bulkWriteSender {
	return func(ctx context.Context, bulk *bulkHandle, startSeq uint64, payload []byte, payloadOwned bool) (int, error) {
		if c == nil {
			return 0, errBulkClientNil
		}
		if ctx != nil {
			// Err is non-nil exactly when Done is closed, so this is the
			// same fail-fast check without blocking.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return 0, ctxErr
			}
		}

		if bulk != nil && bulk.Dedicated() {
			if waitErr := bulk.waitDedicatedReady(ctx); waitErr != nil {
				return 0, waitErr
			}
			return c.sendDedicatedBulkWrite(ctx, bulk, startSeq, payload, payloadOwned)
		}

		// Shared path guards, checked in order.
		switch {
		case epoch != 0 && !c.isClientSessionEpochCurrent(epoch):
			return 0, errTransportDetached
		case bulk == nil:
			return 0, errBulkRuntimeNil
		}

		id := bulk.dataIDSnapshot()
		if id == 0 {
			return 0, errBulkDataPathNotReady
		}
		return c.sendFastBulkWrite(ctx, id, startSeq, bulk.chunkSize, bulk.fastPathVersionSnapshot(), payload, payloadOwned)
	}
}
// clientBulkReleaseSender returns the callback that reports a release of
// bytes and chunks for a bulk handle owned by client c.
//
// The callback returns errBulkClientNil when either c or bulk is nil, and
// returns nil without sending anything when both counts are non-positive.
// Otherwise it derives a write context from the bulk's own context and write
// timeout and picks the transport:
//   - a dedicated bulk sends through c.sendDedicatedBulkRelease;
//   - a bulk whose fast-path version is at least bulkFastPathVersionV2 sends
//     an encoded release payload through c.sendFastBulkControl;
//   - any other bulk sends a BulkReleaseRequest through sendBulkReleaseClient,
//     which is not given the write context.
func clientBulkReleaseSender(c *ClientCommon) bulkReleaseSender {
	return func(bulk *bulkHandle, byteCount int64, chunkCount int) error {
		switch {
		case c == nil, bulk == nil:
			return errBulkClientNil
		case byteCount <= 0 && chunkCount <= 0:
			// Nothing to report.
			return nil
		}

		writeCtx, stop, err := bulk.newWriteContext(bulk.Context(), bulk.writeTimeout)
		if err != nil {
			return err
		}
		defer stop()

		if bulk.Dedicated() {
			return c.sendDedicatedBulkRelease(writeCtx, bulk, byteCount, chunkCount)
		}

		if bulk.fastPathVersionSnapshot() < bulkFastPathVersionV2 {
			// Older fast-path versions use the request-style release.
			return sendBulkReleaseClient(c, BulkReleaseRequest{
				BulkID: bulk.ID(),
				DataID: bulk.dataIDSnapshot(),
				Bytes:  byteCount,
				Chunks: chunkCount,
			})
		}

		payload, err := encodeBulkDedicatedReleasePayload(byteCount, chunkCount)
		if err != nil {
			return err
		}
		return c.sendFastBulkControl(writeCtx, bulkFastPayloadTypeRelease, 0, bulk.dataIDSnapshot(), 0, bulk.fastPathVersionSnapshot(), payload)
	}
}