notify/transfer_reader_writer.go

248 lines
5.0 KiB
Go
Raw Permalink Normal View History

package notify
import (
"errors"
"fmt"
"io"
"sync"
)
var (
errTransferReaderNil = errors.New("transfer reader is nil")
errTransferWriterNil = errors.New("transfer writer is nil")
errTransferOffsetInvalid = errors.New("transfer offset must be non-negative")
errTransferSequentialSourceOrderedRead = errors.New("transfer sequential source only supports ordered reads")
errTransferSequentialSourceParallelism = errors.New("transfer sequential source does not support parallel reads")
errTransferSequentialSourceImplicitChecksum = errors.New("transfer sequential source requires explicit checksum when VerifyChecksum is enabled")
errTransferSequentialSinkOrderedWrite = errors.New("transfer sequential sink only supports ordered writes")
)
type transferSequentialReaderSource interface {
transferSequentialReaderSource()
}
type transferSourceFromReader struct {
mu sync.Mutex
src io.Reader
size int64
offset int64
discard []byte
}
type transferSinkFromWriter struct {
mu sync.Mutex
dst io.Writer
offset int64
}
type transferReaderFromSource struct {
mu sync.Mutex
source TransferReaderAt
offset int64
}
type transferWriterFromSink struct {
mu sync.Mutex
sink TransferWriterAt
offset int64
}
func NewTransferSourceFromReader(src io.Reader, size int64) (TransferReaderAt, error) {
if src == nil {
return nil, errTransferReaderNil
}
if size < 0 {
return nil, errTransferSizeInvalid
}
return &transferSourceFromReader{
src: src,
size: size,
}, nil
}
func NewTransferSinkFromWriter(dst io.Writer) (TransferWriterAt, error) {
if dst == nil {
return nil, errTransferWriterNil
}
return &transferSinkFromWriter{dst: dst}, nil
}
func NewTransferReaderFromSource(source TransferReaderAt, offset int64) (io.Reader, error) {
if source == nil {
return nil, errTransferSourceNil
}
if offset < 0 {
return nil, errTransferOffsetInvalid
}
return &transferReaderFromSource{
source: source,
offset: offset,
}, nil
}
func NewTransferWriterFromSink(sink TransferWriterAt, offset int64) (io.Writer, error) {
if sink == nil {
return nil, errTransferSinkNil
}
if offset < 0 {
return nil, errTransferOffsetInvalid
}
return &transferWriterFromSink{
sink: sink,
offset: offset,
}, nil
}
func (s *transferSourceFromReader) transferSequentialReaderSource() {}
func (s *transferSourceFromReader) Size() int64 {
if s == nil {
return 0
}
return s.size
}
func (s *transferSourceFromReader) ReadAt(p []byte, off int64) (int, error) {
if s == nil || s.src == nil {
return 0, io.ErrClosedPipe
}
if off < 0 {
return 0, errTransferOffsetInvalid
}
if len(p) == 0 {
return 0, nil
}
s.mu.Lock()
defer s.mu.Unlock()
if off < s.offset {
return 0, fmt.Errorf("%w: got %d want >= %d", errTransferSequentialSourceOrderedRead, off, s.offset)
}
if off >= s.size {
return 0, io.EOF
}
if err := s.discardUntilLocked(off); err != nil {
return 0, err
}
remaining := s.size - s.offset
limited := p
truncated := false
if remaining < int64(len(limited)) {
limited = limited[:remaining]
truncated = true
}
n, err := s.src.Read(limited)
if n > 0 {
s.offset += int64(n)
}
if err != nil {
return n, err
}
if n == 0 {
return 0, io.ErrNoProgress
}
if truncated {
return n, io.EOF
}
return n, nil
}
func (s *transferSourceFromReader) discardUntilLocked(target int64) error {
for s.offset < target {
if s.discard == nil {
s.discard = make([]byte, 32*1024)
}
want := len(s.discard)
remaining := target - s.offset
if remaining < int64(want) {
want = int(remaining)
}
n, err := s.src.Read(s.discard[:want])
if n > 0 {
s.offset += int64(n)
}
if err != nil {
return err
}
if n == 0 {
return io.ErrNoProgress
}
}
return nil
}
func (s *transferSinkFromWriter) WriteAt(p []byte, off int64) (int, error) {
if s == nil || s.dst == nil {
return 0, io.ErrClosedPipe
}
if off < 0 {
return 0, errTransferOffsetInvalid
}
if len(p) == 0 {
return 0, nil
}
s.mu.Lock()
defer s.mu.Unlock()
if off != s.offset {
return 0, fmt.Errorf("%w: got %d want %d", errTransferSequentialSinkOrderedWrite, off, s.offset)
}
n, err := s.dst.Write(p)
if n > 0 {
s.offset += int64(n)
}
if err != nil {
return n, err
}
if n != len(p) {
return n, io.ErrShortWrite
}
return n, nil
}
func (s *transferSinkFromWriter) NextOffset() int64 {
if s == nil {
return 0
}
s.mu.Lock()
defer s.mu.Unlock()
return s.offset
}
func (r *transferReaderFromSource) Read(p []byte) (int, error) {
if r == nil || r.source == nil {
return 0, io.ErrClosedPipe
}
if len(p) == 0 {
return 0, nil
}
r.mu.Lock()
defer r.mu.Unlock()
n, err := r.source.ReadAt(p, r.offset)
if n > 0 {
r.offset += int64(n)
}
return n, err
}
func (w *transferWriterFromSink) Write(p []byte) (int, error) {
if w == nil || w.sink == nil {
return 0, io.ErrClosedPipe
}
if len(p) == 0 {
return 0, nil
}
w.mu.Lock()
defer w.mu.Unlock()
n, err := w.sink.WriteAt(p, w.offset)
if n > 0 {
w.offset += int64(n)
}
return n, err
}