258 lines
7.8 KiB
Go
258 lines
7.8 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"io"
|
||
|
|
"testing"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
itransfer "b612.me/notify/internal/transfer"
|
||
|
|
)
|
||
|
|
|
||
|
|
type transferDelayedSource struct {
|
||
|
|
*transferBytesSource
|
||
|
|
delay time.Duration
|
||
|
|
}
|
||
|
|
|
||
|
|
func newTransferDelayedSource(data []byte, delay time.Duration) *transferDelayedSource {
|
||
|
|
return &transferDelayedSource{
|
||
|
|
transferBytesSource: newTransferBytesSource(data),
|
||
|
|
delay: delay,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *transferDelayedSource) ReadAt(p []byte, off int64) (int, error) {
|
||
|
|
time.Sleep(s.delay)
|
||
|
|
return s.transferBytesSource.ReadAt(p, off)
|
||
|
|
}
|
||
|
|
|
||
|
|
type transferDelayedWriteStream struct {
|
||
|
|
transferWriteCountStream
|
||
|
|
delay time.Duration
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *transferDelayedWriteStream) Write(p []byte) (int, error) {
|
||
|
|
time.Sleep(s.delay)
|
||
|
|
return s.transferWriteCountStream.Write(p)
|
||
|
|
}
|
||
|
|
|
||
|
|
type transferDelayedCommitSink struct {
|
||
|
|
data []byte
|
||
|
|
writeDelay time.Duration
|
||
|
|
syncDelay time.Duration
|
||
|
|
commitDelay time.Duration
|
||
|
|
}
|
||
|
|
|
||
|
|
func newTransferDelayedCommitSink(size int, writeDelay time.Duration, syncDelay time.Duration, commitDelay time.Duration) *transferDelayedCommitSink {
|
||
|
|
return &transferDelayedCommitSink{
|
||
|
|
data: make([]byte, size),
|
||
|
|
writeDelay: writeDelay,
|
||
|
|
syncDelay: syncDelay,
|
||
|
|
commitDelay: commitDelay,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *transferDelayedCommitSink) WriteAt(p []byte, off int64) (int, error) {
|
||
|
|
time.Sleep(s.writeDelay)
|
||
|
|
if off < 0 {
|
||
|
|
return 0, io.ErrShortWrite
|
||
|
|
}
|
||
|
|
if int(off) > len(s.data) || len(p) > len(s.data)-int(off) {
|
||
|
|
return 0, io.ErrShortWrite
|
||
|
|
}
|
||
|
|
copy(s.data[off:], p)
|
||
|
|
return len(p), nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *transferDelayedCommitSink) ReadAt(p []byte, off int64) (int, error) {
|
||
|
|
if off < 0 || off >= int64(len(s.data)) {
|
||
|
|
return 0, io.EOF
|
||
|
|
}
|
||
|
|
n := copy(p, s.data[off:])
|
||
|
|
if n < len(p) {
|
||
|
|
return n, io.EOF
|
||
|
|
}
|
||
|
|
return n, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *transferDelayedCommitSink) Sync(context.Context) error {
|
||
|
|
time.Sleep(s.syncDelay)
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *transferDelayedCommitSink) Commit(context.Context) error {
|
||
|
|
time.Sleep(s.commitDelay)
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func transferRuntimeSnapshotForTest(t *testing.T, runtime *transferRuntime, direction fileTransferDirection, scope string, transferID string) TransferSnapshot {
|
||
|
|
t.Helper()
|
||
|
|
snapshot, ok := runtime.snapshot(direction, scope, transferID)
|
||
|
|
if !ok {
|
||
|
|
t.Fatalf("runtime snapshot missing for %s", transferID)
|
||
|
|
}
|
||
|
|
return convertTransferSnapshot(snapshot)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestSendTransferSegmentsRecordsTelemetry(t *testing.T) {
|
||
|
|
const (
|
||
|
|
chunkSize = 4
|
||
|
|
readDelay = 5 * time.Millisecond
|
||
|
|
writeDelay = 4 * time.Millisecond
|
||
|
|
)
|
||
|
|
data := []byte("0123456789abcdef")
|
||
|
|
runtime := newTransferRuntime()
|
||
|
|
scope := clientFileScope()
|
||
|
|
transferID := "telemetry-send"
|
||
|
|
runtime.ensureTransferDescriptor(fileTransferDirectionSend, scope, scope, 0, itransfer.Descriptor{
|
||
|
|
ID: transferID,
|
||
|
|
Channel: itransfer.DataChannel,
|
||
|
|
Size: int64(len(data)),
|
||
|
|
})
|
||
|
|
|
||
|
|
target := transferSendTarget{
|
||
|
|
runtime: runtime,
|
||
|
|
runtimeScope: scope,
|
||
|
|
sequenceEn: func(value interface{}) ([]byte, error) {
|
||
|
|
segment, ok := value.(itransfer.Segment)
|
||
|
|
if !ok {
|
||
|
|
t.Fatalf("encoded value type = %T, want itransfer.Segment", value)
|
||
|
|
}
|
||
|
|
return append([]byte(nil), segment.Payload...), nil
|
||
|
|
},
|
||
|
|
}
|
||
|
|
stream := &transferDelayedWriteStream{delay: writeDelay}
|
||
|
|
opt := TransferSendOptions{
|
||
|
|
Descriptor: TransferDescriptor{
|
||
|
|
ID: transferID,
|
||
|
|
Channel: TransferChannelData,
|
||
|
|
Size: int64(len(data)),
|
||
|
|
},
|
||
|
|
Source: newTransferDelayedSource(data, readDelay),
|
||
|
|
ChunkSize: chunkSize,
|
||
|
|
}
|
||
|
|
|
||
|
|
if err := sendTransferSegments(context.Background(), stream, target, opt, 0, transferSendHooks{}); err != nil {
|
||
|
|
t.Fatalf("sendTransferSegments failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
snapshot := transferRuntimeSnapshotForTest(t, runtime, fileTransferDirectionSend, scope, transferID)
|
||
|
|
if got, want := snapshot.SourceReadCount, len(data)/chunkSize; got != want {
|
||
|
|
t.Fatalf("source read count = %d, want %d", got, want)
|
||
|
|
}
|
||
|
|
if got := snapshot.SourceReadDuration; got < time.Duration(snapshot.SourceReadCount)*readDelay {
|
||
|
|
t.Fatalf("source read duration = %v, want at least %v", got, time.Duration(snapshot.SourceReadCount)*readDelay)
|
||
|
|
}
|
||
|
|
if got := snapshot.StreamWriteCount; got < 1 {
|
||
|
|
t.Fatalf("stream write count = %d, want at least 1", got)
|
||
|
|
}
|
||
|
|
if got := snapshot.StreamWriteDuration; got < writeDelay {
|
||
|
|
t.Fatalf("stream write duration = %v, want at least %v", got, writeDelay)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestTransferReceiveSessionCommitRecordsTelemetry(t *testing.T) {
|
||
|
|
const (
|
||
|
|
writeDelay = 4 * time.Millisecond
|
||
|
|
syncDelay = 3 * time.Millisecond
|
||
|
|
commitDelay = 5 * time.Millisecond
|
||
|
|
)
|
||
|
|
data := []byte("abcdefgh")
|
||
|
|
transferID := "telemetry-receive"
|
||
|
|
scope := clientFileScope()
|
||
|
|
runtime := newTransferRuntime()
|
||
|
|
runtime.ensureTransferDescriptor(fileTransferDirectionReceive, scope, scope, 0, itransfer.Descriptor{
|
||
|
|
ID: transferID,
|
||
|
|
Channel: itransfer.DataChannel,
|
||
|
|
Size: int64(len(data)),
|
||
|
|
Checksum: transferTestChecksum(data),
|
||
|
|
})
|
||
|
|
|
||
|
|
sink := newTransferDelayedCommitSink(len(data), writeDelay, syncDelay, commitDelay)
|
||
|
|
session := newTransferReceiveSession(scope, scope, nil, nil, 0, TransferReceiveOptions{
|
||
|
|
Descriptor: TransferDescriptor{
|
||
|
|
ID: transferID,
|
||
|
|
Channel: TransferChannelData,
|
||
|
|
Size: int64(len(data)),
|
||
|
|
Checksum: transferTestChecksum(data),
|
||
|
|
},
|
||
|
|
Sink: sink,
|
||
|
|
SyncOnCheckpoint: true,
|
||
|
|
VerifyChecksum: true,
|
||
|
|
})
|
||
|
|
|
||
|
|
if err := session.writeSegment(runtime, transferID, 0, data[:4]); err != nil {
|
||
|
|
t.Fatalf("writeSegment first failed: %v", err)
|
||
|
|
}
|
||
|
|
if err := session.writeSegment(runtime, transferID, 4, data[4:]); err != nil {
|
||
|
|
t.Fatalf("writeSegment second failed: %v", err)
|
||
|
|
}
|
||
|
|
if err := session.commit(context.Background(), runtime, transferID); err != nil {
|
||
|
|
t.Fatalf("commit failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
snapshot := transferRuntimeSnapshotForTest(t, runtime, fileTransferDirectionReceive, scope, transferID)
|
||
|
|
if got, want := snapshot.SinkWriteCount, 2; got != want {
|
||
|
|
t.Fatalf("sink write count = %d, want %d", got, want)
|
||
|
|
}
|
||
|
|
if got := snapshot.SinkWriteDuration; got < 2*writeDelay {
|
||
|
|
t.Fatalf("sink write duration = %v, want at least %v", got, 2*writeDelay)
|
||
|
|
}
|
||
|
|
if got := snapshot.SyncDuration; got < 3*syncDelay {
|
||
|
|
t.Fatalf("sync duration = %v, want at least %v", got, 3*syncDelay)
|
||
|
|
}
|
||
|
|
if got := snapshot.VerifyDuration; got <= 0 {
|
||
|
|
t.Fatalf("verify duration = %v, want > 0", got)
|
||
|
|
}
|
||
|
|
if got := snapshot.CommitDuration; got < commitDelay {
|
||
|
|
t.Fatalf("commit duration = %v, want at least %v", got, commitDelay)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRunTransferSendRecordsCommitWaitTelemetry(t *testing.T) {
|
||
|
|
const commitDelay = 6 * time.Millisecond
|
||
|
|
data := []byte("commit-wait")
|
||
|
|
transferID := "telemetry-commit-wait"
|
||
|
|
scope := clientFileScope()
|
||
|
|
runtime := newTransferRuntime()
|
||
|
|
runtime.ensureTransferDescriptor(fileTransferDirectionSend, scope, scope, 0, itransfer.Descriptor{
|
||
|
|
ID: transferID,
|
||
|
|
Channel: itransfer.DataChannel,
|
||
|
|
Size: int64(len(data)),
|
||
|
|
})
|
||
|
|
|
||
|
|
target := transferSendTarget{
|
||
|
|
runtime: runtime,
|
||
|
|
runtimeScope: scope,
|
||
|
|
sequenceEn: func(value interface{}) ([]byte, error) {
|
||
|
|
segment, ok := value.(itransfer.Segment)
|
||
|
|
if !ok {
|
||
|
|
t.Fatalf("encoded value type = %T, want itransfer.Segment", value)
|
||
|
|
}
|
||
|
|
return append([]byte(nil), segment.Payload...), nil
|
||
|
|
},
|
||
|
|
sendCommit: func(context.Context, TransferCommitRequest) (TransferCommitResponse, error) {
|
||
|
|
time.Sleep(commitDelay)
|
||
|
|
return TransferCommitResponse{TransferID: transferID, Accepted: true}, nil
|
||
|
|
},
|
||
|
|
}
|
||
|
|
opt := TransferSendOptions{
|
||
|
|
Descriptor: TransferDescriptor{
|
||
|
|
ID: transferID,
|
||
|
|
Channel: TransferChannelData,
|
||
|
|
Size: int64(len(data)),
|
||
|
|
},
|
||
|
|
Source: newTransferBytesSource(data),
|
||
|
|
ChunkSize: len(data),
|
||
|
|
}
|
||
|
|
|
||
|
|
if err := runTransferSend(context.Background(), transferDiscardStream{}, opt, 0, target, transferSendHooks{}); err != nil {
|
||
|
|
t.Fatalf("runTransferSend failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
snapshot := transferRuntimeSnapshotForTest(t, runtime, fileTransferDirectionSend, scope, transferID)
|
||
|
|
if got := snapshot.CommitWaitDuration; got < commitDelay {
|
||
|
|
t.Fatalf("commit wait duration = %v, want at least %v", got, commitDelay)
|
||
|
|
}
|
||
|
|
}
|