notify/record_stream.go

1097 lines
23 KiB
Go
Raw Normal View History

package notify
import (
"context"
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
)
type RecordErrorCode string
const (
RecordErrorCodeApplyFailed RecordErrorCode = "apply_failed"
RecordErrorCodeProtocol RecordErrorCode = "protocol"
RecordErrorCodeCanceled RecordErrorCode = "canceled"
)
const (
defaultRecordMaxBatchRecords = 64
defaultRecordMaxBatchBytes = 128 * 1024
defaultRecordMaxBatchDelay = 2 * time.Millisecond
defaultRecordMaxUnackedRecords = 4096
defaultRecordMaxUnackedBytes = 8 * 1024 * 1024
defaultRecordInboundQueueLimit = 128
defaultRecordAckEveryRecords = 64
defaultRecordAckDelay = time.Millisecond
)
type RecordFailure struct {
FailedSeq uint64
Code RecordErrorCode
Retryable bool
Message string
}
func (f RecordFailure) Error() string {
if f.FailedSeq == 0 && f.Code == "" && f.Message == "" {
return "record stream failed"
}
if f.Message == "" {
return fmt.Sprintf("record stream failed: seq=%d code=%s retryable=%t", f.FailedSeq, f.Code, f.Retryable)
}
return fmt.Sprintf("record stream failed: seq=%d code=%s retryable=%t msg=%s", f.FailedSeq, f.Code, f.Retryable, f.Message)
}
type RecordMessage struct {
Seq uint64
Payload []byte
}
type RecordOpenOptions struct {
Stream StreamOpenOptions
MaxBatchRecords int
MaxBatchBytes int
MaxBatchDelay time.Duration
MaxUnackedRecords int
MaxUnackedBytes int
InboundQueueLimit int
AckEveryRecords int
AckDelay time.Duration
}
type RecordAcceptInfo struct {
ID string
Metadata StreamMetadata
LogicalConn *LogicalConn
TransportConn *TransportConn
TransportGeneration uint64
RecordStream RecordStream
}
type RecordStream interface {
ID() string
Metadata() StreamMetadata
Context() context.Context
ReadRecord(context.Context) (RecordMessage, error)
WriteRecord(context.Context, []byte) (uint64, error)
Flush(context.Context) error
BarrierTo(context.Context, uint64) (uint64, error)
Barrier(context.Context) (uint64, error)
AckRecord(uint64) error
FailRecord(uint64, RecordFailure) error
CloseWrite() error
Close() error
Reset(error) error
}
type recordConfig struct {
MaxBatchRecords int
MaxBatchBytes int
MaxBatchDelay time.Duration
MaxUnackedRecords int
MaxUnackedBytes int
InboundQueueLimit int
AckEveryRecords int
AckDelay time.Duration
}
type recordFlushRequest struct {
targetSeq uint64
forceAck bool
done chan error
}
type recordObservability struct {
batchFramesSent atomic.Int64
ackFramesSent atomic.Int64
errorFramesSent atomic.Int64
batchFramesReceived atomic.Int64
ackFramesReceived atomic.Int64
errorFramesReceived atomic.Int64
piggybackAckSent atomic.Int64
piggybackAckReceived atomic.Int64
barrierCount atomic.Int64
barrierFlushWaitNanos atomic.Int64
barrierApplyWaitNanos atomic.Int64
}
type recordStream struct {
stream Stream
ctx context.Context
cancel context.CancelFunc
cfg recordConfig
writeMu sync.Mutex
sendCh chan recordOutboundMessage
flushCh chan recordFlushRequest
recvCh chan RecordMessage
ackCh chan struct{}
readerCh chan struct{}
useBatchAck bool
obs recordObservability
mu sync.Mutex
stateNotify chan struct{}
runtime *recordRuntime
runtimeKey string
runtimeWatchOnce sync.Once
runtimeDetachOnce sync.Once
nextOutboundSeq uint64
enqueuedOutboundSeq uint64
flushedOutboundSeq uint64
ackedOutboundSeq uint64
outstandingRecords int
outstandingBytes int
outstandingSizes map[uint64]int
outboundClosed bool
inboundReceivedSeq uint64
inboundAppliedSeq uint64
inboundApplied map[uint64]struct{}
inboundAckSentSeq uint64
maxPendingApply int
remoteClosed bool
readErr error
terminalErr error
}
var (
errRecordStreamNil = errors.New("record stream is nil")
errRecordRuntimeNil = errors.New("record runtime is nil")
errRecordHandlerNotConfigured = errors.New("record handler is not configured")
errRecordWriteClosed = errors.New("record stream write side is closed")
errRecordSeqNotReceived = errors.New("record sequence not received")
)
func normalizeRecordOpenOptions(opt RecordOpenOptions) RecordOpenOptions {
if opt.MaxBatchRecords <= 0 {
opt.MaxBatchRecords = defaultRecordMaxBatchRecords
}
if opt.MaxBatchBytes <= 0 {
opt.MaxBatchBytes = defaultRecordMaxBatchBytes
}
if opt.MaxBatchDelay <= 0 {
opt.MaxBatchDelay = defaultRecordMaxBatchDelay
}
if opt.MaxUnackedRecords <= 0 {
opt.MaxUnackedRecords = defaultRecordMaxUnackedRecords
}
if opt.MaxUnackedBytes <= 0 {
opt.MaxUnackedBytes = defaultRecordMaxUnackedBytes
}
if opt.InboundQueueLimit <= 0 {
opt.InboundQueueLimit = defaultRecordInboundQueueLimit
}
if opt.AckEveryRecords <= 0 {
opt.AckEveryRecords = defaultRecordAckEveryRecords
}
if opt.AckDelay <= 0 {
opt.AckDelay = defaultRecordAckDelay
}
opt.Stream = normalizeRecordStreamOpenOptions(opt.Stream)
return opt
}
func recordConfigFromOptions(opt RecordOpenOptions) recordConfig {
return recordConfig{
MaxBatchRecords: opt.MaxBatchRecords,
MaxBatchBytes: opt.MaxBatchBytes,
MaxBatchDelay: opt.MaxBatchDelay,
MaxUnackedRecords: opt.MaxUnackedRecords,
MaxUnackedBytes: opt.MaxUnackedBytes,
InboundQueueLimit: opt.InboundQueueLimit,
AckEveryRecords: opt.AckEveryRecords,
AckDelay: opt.AckDelay,
}
}
func normalizeRecordStreamOpenOptions(opt StreamOpenOptions) StreamOpenOptions {
opt.Channel = StreamRecordChannel
opt.Metadata = advertiseRecordStreamOpenMetadata(opt.Metadata)
return opt
}
func WrapStreamAsRecord(stream Stream, opt RecordOpenOptions) (RecordStream, error) {
if stream == nil {
return nil, errRecordStreamNil
}
opt = normalizeRecordOpenOptions(opt)
parent := stream.Context()
if parent == nil {
parent = context.Background()
}
ctx, cancel := context.WithCancel(parent)
record := &recordStream{
stream: stream,
ctx: ctx,
cancel: cancel,
cfg: recordConfigFromOptions(opt),
sendCh: make(chan recordOutboundMessage, opt.MaxBatchRecords*2),
flushCh: make(chan recordFlushRequest),
recvCh: make(chan RecordMessage, opt.InboundQueueLimit),
ackCh: make(chan struct{}, 1),
readerCh: make(chan struct{}),
useBatchAck: recordStreamUseBatchAck(stream.Metadata()),
stateNotify: make(chan struct{}),
outstandingSizes: make(map[uint64]int),
inboundApplied: make(map[uint64]struct{}),
}
go record.writerLoop()
go record.readLoop()
return record, nil
}
func (r *recordStream) ID() string {
if r == nil || r.stream == nil {
return ""
}
return r.stream.ID()
}
func (r *recordStream) Metadata() StreamMetadata {
if r == nil || r.stream == nil {
return nil
}
return cloneStreamMetadata(r.stream.Metadata())
}
func (r *recordStream) Context() context.Context {
if r == nil {
return context.Background()
}
return r.ctx
}
func (r *recordStream) WriteRecord(ctx context.Context, payload []byte) (uint64, error) {
if r == nil {
return 0, errRecordStreamNil
}
if ctx == nil {
ctx = context.Background()
}
size := len(payload)
for {
r.mu.Lock()
if err := r.streamErrorLocked(); err != nil {
r.mu.Unlock()
return 0, err
}
if r.outboundClosed {
r.mu.Unlock()
return 0, errRecordWriteClosed
}
if r.outstandingRecords >= r.cfg.MaxUnackedRecords || r.outstandingBytes+size > r.cfg.MaxUnackedBytes {
wait := r.stateNotify
r.mu.Unlock()
select {
case <-r.ctx.Done():
return 0, r.streamError()
case <-ctx.Done():
return 0, ctx.Err()
case <-wait:
}
continue
}
r.nextOutboundSeq++
msg := recordOutboundMessage{
Seq: r.nextOutboundSeq,
Payload: append([]byte(nil), payload...),
}
r.outstandingRecords++
r.outstandingBytes += size
r.outstandingSizes[msg.Seq] = size
select {
case <-r.ctx.Done():
r.rollbackReservedOutboundLocked(msg.Seq)
err := r.streamErrorLocked()
r.mu.Unlock()
return 0, err
case <-ctx.Done():
r.rollbackReservedOutboundLocked(msg.Seq)
r.mu.Unlock()
return 0, ctx.Err()
case r.sendCh <- msg:
r.enqueuedOutboundSeq = msg.Seq
r.signalStateLocked()
r.mu.Unlock()
return msg.Seq, nil
}
}
}
func (r *recordStream) Flush(ctx context.Context) error {
if r == nil {
return errRecordStreamNil
}
if ctx == nil {
ctx = context.Background()
}
if err := r.streamError(); err != nil {
return err
}
req := recordFlushRequest{
targetSeq: r.flushTargetSeq(),
done: make(chan error, 1),
}
select {
case <-r.ctx.Done():
return r.streamError()
case <-ctx.Done():
return ctx.Err()
case r.flushCh <- req:
}
select {
case <-r.ctx.Done():
return r.streamError()
case <-ctx.Done():
return ctx.Err()
case err := <-req.done:
return err
}
}
func (r *recordStream) Barrier(ctx context.Context) (uint64, error) {
if r == nil {
return 0, errRecordStreamNil
}
if ctx == nil {
ctx = context.Background()
}
return r.BarrierTo(ctx, r.flushTargetSeq())
}
func (r *recordStream) BarrierTo(ctx context.Context, target uint64) (uint64, error) {
if r == nil {
return 0, errRecordStreamNil
}
if ctx == nil {
ctx = context.Background()
}
current := r.flushTargetSeq()
if target == 0 {
target = current
}
if target > current {
return 0, errRecordSeqInvalid
}
r.obs.barrierCount.Add(1)
flushStart := time.Now()
err := r.Flush(ctx)
r.obs.barrierFlushWaitNanos.Add(time.Since(flushStart).Nanoseconds())
if err != nil {
return 0, err
}
if target == 0 {
return 0, nil
}
applyStart := time.Now()
err = r.waitAckedAtLeast(ctx, target)
r.obs.barrierApplyWaitNanos.Add(time.Since(applyStart).Nanoseconds())
if err != nil {
return 0, err
}
return target, nil
}
func (r *recordStream) ReadRecord(ctx context.Context) (RecordMessage, error) {
if r == nil {
return RecordMessage{}, errRecordStreamNil
}
if ctx == nil {
ctx = context.Background()
}
select {
case <-r.ctx.Done():
return RecordMessage{}, r.readError()
case <-ctx.Done():
return RecordMessage{}, ctx.Err()
case msg, ok := <-r.recvCh:
if ok {
return msg, nil
}
return RecordMessage{}, r.readError()
}
}
func (r *recordStream) AckRecord(seq uint64) error {
if r == nil {
return errRecordStreamNil
}
if seq == 0 {
return errRecordSeqInvalid
}
r.mu.Lock()
if seq > r.inboundReceivedSeq {
r.mu.Unlock()
return errRecordSeqNotReceived
}
if seq <= r.inboundAppliedSeq {
r.mu.Unlock()
return nil
}
r.inboundApplied[seq] = struct{}{}
advanced := false
for {
next := r.inboundAppliedSeq + 1
if _, ok := r.inboundApplied[next]; !ok {
break
}
delete(r.inboundApplied, next)
r.inboundAppliedSeq = next
advanced = true
}
if advanced {
r.signalStateLocked()
}
r.mu.Unlock()
if advanced {
r.notifyAckLoop()
}
return nil
}
func (r *recordStream) FailRecord(seq uint64, failure RecordFailure) error {
if r == nil {
return errRecordStreamNil
}
if seq == 0 {
return errRecordSeqInvalid
}
if failure.FailedSeq == 0 {
failure.FailedSeq = seq
}
if failure.Code == "" {
failure.Code = RecordErrorCodeApplyFailed
}
err := r.sendFailureFrame(failure)
if err != nil {
return err
}
r.setTerminalError(failure)
return r.stream.Reset(failure)
}
func (r *recordStream) CloseWrite() error {
if r == nil {
return errRecordStreamNil
}
if err := r.Flush(context.Background()); err != nil {
return err
}
if err := r.flushAckNow(); err != nil {
return err
}
r.mu.Lock()
r.outboundClosed = true
r.signalStateLocked()
r.mu.Unlock()
return r.stream.CloseWrite()
}
func (r *recordStream) Close() error {
if r == nil {
return nil
}
_ = r.flushAckNow()
r.cancel()
return r.stream.Close()
}
func (r *recordStream) Reset(err error) error {
if r == nil {
return nil
}
r.setTerminalError(err)
return r.stream.Reset(err)
}
func (r *recordStream) flushTargetSeq() uint64 {
if r == nil {
return 0
}
r.mu.Lock()
defer r.mu.Unlock()
return r.enqueuedOutboundSeq
}
func (r *recordStream) waitAckedAtLeast(ctx context.Context, target uint64) error {
for {
r.mu.Lock()
if err := r.streamErrorLocked(); err != nil {
r.mu.Unlock()
return err
}
if r.ackedOutboundSeq >= target {
r.mu.Unlock()
return nil
}
if r.remoteClosed {
r.mu.Unlock()
return io.EOF
}
wait := r.stateNotify
r.mu.Unlock()
select {
case <-r.ctx.Done():
return r.streamError()
case <-ctx.Done():
return ctx.Err()
case <-wait:
}
}
}
func (r *recordStream) writerLoop() {
var (
batch []recordOutboundMessage
batches int
bytes int
batchTimer *time.Timer
batchTimerCh <-chan time.Time
ackTimer *time.Timer
ackTimerCh <-chan time.Time
)
stopBatchTimer := func() {
if batchTimer == nil {
return
}
if !batchTimer.Stop() {
select {
case <-batchTimer.C:
default:
}
}
batchTimerCh = nil
}
stopAckTimer := func() {
if ackTimer == nil {
return
}
if !ackTimer.Stop() {
select {
case <-ackTimer.C:
default:
}
}
ackTimerCh = nil
}
scheduleAck := func(hasPendingBatch bool, force bool) (uint64, bool) {
ackSeq := r.pendingAckSeq()
if ackSeq == 0 {
stopAckTimer()
return 0, false
}
if force {
stopAckTimer()
return ackSeq, true
}
if hasPendingBatch && r.useBatchAck {
stopAckTimer()
return 0, false
}
if r.shouldSendAckNow() || r.cfg.AckDelay <= 0 {
stopAckTimer()
return ackSeq, true
}
if ackTimer == nil {
ackTimer = time.NewTimer(r.cfg.AckDelay)
} else {
ackTimer.Reset(r.cfg.AckDelay)
}
ackTimerCh = ackTimer.C
return 0, false
}
sendStandaloneAck := func(ackSeq uint64) error {
if ackSeq == 0 {
return nil
}
payload, err := encodeRecordAckFrame(ackSeq)
if err != nil {
return err
}
if err := r.writePayloadFrame(payload); err != nil {
return err
}
r.obs.ackFramesSent.Add(1)
r.markAckSent(ackSeq)
return nil
}
flushBatch := func() error {
if len(batch) == 0 {
return nil
}
ackSeq := r.pendingAckSeq()
payload, err := encodeRecordBatchFrame(batch, ackSeq, r.useBatchAck)
if err != nil {
return err
}
if err := r.writePayloadFrame(payload); err != nil {
return err
}
r.obs.batchFramesSent.Add(1)
if r.useBatchAck && ackSeq != 0 {
r.obs.piggybackAckSent.Add(1)
r.markAckSent(ackSeq)
}
r.markFlushed(batch[len(batch)-1].Seq)
batch = nil
batches = 0
bytes = 0
stopBatchTimer()
if ackSeq, sendNow := scheduleAck(false, false); sendNow {
return sendStandaloneAck(ackSeq)
}
return nil
}
flushUntil := func(target uint64) error {
for {
if target == 0 {
return flushBatch()
}
if r.flushedAtLeast(target) {
return nil
}
if len(batch) > 0 && batch[len(batch)-1].Seq >= target {
if err := flushBatch(); err != nil {
return err
}
if r.flushedAtLeast(target) {
return nil
}
continue
}
req, ok := r.nextOutboundForFlush()
if !ok {
return r.streamError()
}
batch = append(batch, req)
batches++
bytes += len(req.Payload)
if batches >= r.cfg.MaxBatchRecords || bytes >= r.cfg.MaxBatchBytes {
if err := flushBatch(); err != nil {
return err
}
}
}
}
for {
select {
case <-r.ctx.Done():
return
case req := <-r.sendCh:
batch = append(batch, req)
batches++
bytes += len(req.Payload)
if len(batch) == 1 && r.cfg.MaxBatchDelay > 0 {
if batchTimer == nil {
batchTimer = time.NewTimer(r.cfg.MaxBatchDelay)
} else {
batchTimer.Reset(r.cfg.MaxBatchDelay)
}
batchTimerCh = batchTimer.C
}
if batches >= r.cfg.MaxBatchRecords || bytes >= r.cfg.MaxBatchBytes {
if err := flushBatch(); err != nil {
r.setTerminalError(err)
return
}
continue
}
if ackSeq, sendNow := scheduleAck(len(batch) > 0, false); sendNow {
if err := sendStandaloneAck(ackSeq); err != nil {
r.setTerminalError(err)
return
}
}
case req := <-r.flushCh:
err := flushUntil(req.targetSeq)
if err == nil && req.forceAck {
if ackSeq, sendNow := scheduleAck(len(batch) > 0, true); sendNow {
err = sendStandaloneAck(ackSeq)
}
}
req.done <- err
case <-batchTimerCh:
if err := flushBatch(); err != nil {
r.setTerminalError(err)
return
}
case <-r.ackCh:
if ackSeq, sendNow := scheduleAck(len(batch) > 0, false); sendNow {
if err := sendStandaloneAck(ackSeq); err != nil {
r.setTerminalError(err)
return
}
}
case <-ackTimerCh:
stopAckTimer()
if ackSeq, sendNow := scheduleAck(len(batch) > 0, true); sendNow {
if err := sendStandaloneAck(ackSeq); err != nil {
r.setTerminalError(err)
return
}
}
}
}
}
func (r *recordStream) readLoop() {
defer close(r.recvCh)
defer close(r.readerCh)
for {
payload, err := readTransferFrame(r.stream)
if err != nil {
if errors.Is(err, io.EOF) {
r.markRemoteClosed(nil)
return
}
r.setReadError(err)
return
}
frame, err := decodeRecordFrame(payload)
if err != nil {
_ = r.sendFailureFrame(RecordFailure{
FailedSeq: r.nextInboundFailureSeq(),
Code: RecordErrorCodeProtocol,
Message: err.Error(),
})
r.setReadError(err)
_ = r.stream.Reset(err)
return
}
switch frame.Type {
case recordFrameTypeBatch:
r.obs.batchFramesReceived.Add(1)
if frame.AckSeq != 0 {
r.obs.piggybackAckReceived.Add(1)
if err := r.handleAckFrame(frame.AckSeq); err != nil {
r.setReadError(err)
_ = r.stream.Reset(err)
return
}
}
if err := r.handleBatchFrame(frame.Batch); err != nil {
_ = r.sendFailureFrame(RecordFailure{
FailedSeq: r.nextInboundFailureSeq(),
Code: RecordErrorCodeProtocol,
Message: err.Error(),
})
r.setReadError(err)
_ = r.stream.Reset(err)
return
}
case recordFrameTypeAck:
r.obs.ackFramesReceived.Add(1)
if err := r.handleAckFrame(frame.AckSeq); err != nil {
r.setReadError(err)
_ = r.stream.Reset(err)
return
}
case recordFrameTypeError:
r.obs.errorFramesReceived.Add(1)
r.setReadError(frame.Failure)
return
default:
r.setReadError(errRecordFrameInvalid)
return
}
}
}
func (r *recordStream) handleBatchFrame(batch []recordOutboundMessage) error {
if len(batch) == 0 {
return errRecordFrameInvalid
}
r.mu.Lock()
expected := r.inboundReceivedSeq + 1
if batch[0].Seq != expected {
r.mu.Unlock()
return errRecordSeqInvalid
}
lastSeq := batch[len(batch)-1].Seq
r.inboundReceivedSeq = lastSeq
r.updatePendingApplyLocked()
r.signalStateLocked()
r.mu.Unlock()
for _, item := range batch {
select {
case <-r.ctx.Done():
return r.streamError()
case r.recvCh <- RecordMessage{Seq: item.Seq, Payload: item.Payload}:
}
}
return nil
}
func (r *recordStream) handleAckFrame(ackSeq uint64) error {
r.mu.Lock()
defer r.mu.Unlock()
if ackSeq < r.ackedOutboundSeq || ackSeq > r.nextOutboundSeq {
return errRecordSeqInvalid
}
for seq := r.ackedOutboundSeq + 1; seq <= ackSeq; seq++ {
if size, ok := r.outstandingSizes[seq]; ok {
delete(r.outstandingSizes, seq)
r.outstandingBytes -= size
if r.outstandingBytes < 0 {
r.outstandingBytes = 0
}
r.outstandingRecords--
if r.outstandingRecords < 0 {
r.outstandingRecords = 0
}
}
}
r.ackedOutboundSeq = ackSeq
r.signalStateLocked()
return nil
}
func (r *recordStream) markFlushed(seq uint64) {
if r == nil || seq == 0 {
return
}
r.mu.Lock()
if seq > r.flushedOutboundSeq {
r.flushedOutboundSeq = seq
r.signalStateLocked()
}
r.mu.Unlock()
}
func (r *recordStream) flushedAtLeast(target uint64) bool {
if r == nil || target == 0 {
return true
}
r.mu.Lock()
defer r.mu.Unlock()
return r.flushedOutboundSeq >= target
}
func (r *recordStream) markRemoteClosed(err error) {
r.mu.Lock()
r.remoteClosed = true
if err != nil && r.readErr == nil {
r.readErr = err
}
r.signalStateLocked()
r.mu.Unlock()
}
func (r *recordStream) setReadError(err error) {
if err == nil {
return
}
r.mu.Lock()
if r.readErr == nil {
r.readErr = err
}
r.signalStateLocked()
r.mu.Unlock()
r.cancel()
}
func (r *recordStream) setTerminalError(err error) {
if err == nil {
return
}
r.mu.Lock()
if r.terminalErr == nil {
r.terminalErr = err
}
if r.readErr == nil {
r.readErr = err
}
r.signalStateLocked()
r.mu.Unlock()
r.cancel()
}
func (r *recordStream) rollbackReservedOutboundLocked(seq uint64) {
if r == nil || seq == 0 {
return
}
if size, ok := r.outstandingSizes[seq]; ok {
delete(r.outstandingSizes, seq)
r.outstandingBytes -= size
if r.outstandingBytes < 0 {
r.outstandingBytes = 0
}
r.outstandingRecords--
if r.outstandingRecords < 0 {
r.outstandingRecords = 0
}
}
if r.nextOutboundSeq == seq {
r.nextOutboundSeq--
}
r.signalStateLocked()
}
func (r *recordStream) readError() error {
if r == nil {
return errRecordStreamNil
}
r.mu.Lock()
defer r.mu.Unlock()
if r.readErr != nil {
return r.readErr
}
if r.terminalErr != nil {
return r.terminalErr
}
return io.EOF
}
func (r *recordStream) streamError() error {
if r == nil {
return errRecordStreamNil
}
r.mu.Lock()
defer r.mu.Unlock()
return r.streamErrorLocked()
}
func (r *recordStream) streamErrorLocked() error {
if r.readErr != nil {
return r.readErr
}
if r.terminalErr != nil {
return r.terminalErr
}
return nil
}
func (r *recordStream) shouldSendAckNow() bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.inboundAppliedSeq > r.inboundAckSentSeq && int(r.inboundAppliedSeq-r.inboundAckSentSeq) >= r.cfg.AckEveryRecords
}
func (r *recordStream) pendingAckSeq() uint64 {
if r == nil {
return 0
}
r.mu.Lock()
defer r.mu.Unlock()
if r.inboundAppliedSeq <= r.inboundAckSentSeq {
return 0
}
return r.inboundAppliedSeq
}
func (r *recordStream) markAckSent(ackSeq uint64) {
if r == nil || ackSeq == 0 {
return
}
r.mu.Lock()
if ackSeq > r.inboundAckSentSeq {
r.inboundAckSentSeq = ackSeq
r.signalStateLocked()
}
r.mu.Unlock()
}
func (r *recordStream) flushAckNow() error {
if r == nil {
return errRecordStreamNil
}
req := recordFlushRequest{
forceAck: true,
done: make(chan error, 1),
}
select {
case <-r.ctx.Done():
return r.streamError()
case r.flushCh <- req:
}
select {
case <-r.ctx.Done():
return r.streamError()
case err := <-req.done:
return err
}
}
func (r *recordStream) sendFailureFrame(failure RecordFailure) error {
payload, err := encodeRecordErrorFrame(failure)
if err != nil {
return err
}
if err := r.writePayloadFrame(payload); err != nil {
return err
}
r.obs.errorFramesSent.Add(1)
return nil
}
func (r *recordStream) writePayloadFrame(payload []byte) error {
if r == nil {
return errRecordStreamNil
}
if payload == nil {
return nil
}
frame := buildTransferFrame(payload)
r.writeMu.Lock()
defer r.writeMu.Unlock()
return writeTransferFrames(r.stream, frame)
}
func (r *recordStream) notifyAckLoop() {
if r == nil {
return
}
select {
case r.ackCh <- struct{}{}:
default:
}
}
func (r *recordStream) nextOutboundForFlush() (recordOutboundMessage, bool) {
if r == nil {
return recordOutboundMessage{}, false
}
select {
case <-r.ctx.Done():
return recordOutboundMessage{}, false
case req := <-r.sendCh:
return req, true
}
}
func (r *recordStream) nextInboundFailureSeq() uint64 {
if r == nil {
return 1
}
r.mu.Lock()
defer r.mu.Unlock()
return r.inboundReceivedSeq + 1
}
func (r *recordStream) signalStateLocked() {
close(r.stateNotify)
r.stateNotify = make(chan struct{})
}
func (r *recordStream) updatePendingApplyLocked() {
if r == nil {
return
}
pending := recordPendingCount(r.inboundReceivedSeq, r.inboundAppliedSeq)
if pending > r.maxPendingApply {
r.maxPendingApply = pending
}
}