189 lines
3.2 KiB
Go
Raw Permalink Normal View History

package transfer
import "time"
type Channel string
const (
ControlChannel Channel = "control"
DataChannel Channel = "data"
)
type Direction uint8
const (
DirectionSend Direction = iota
DirectionReceive
)
type State uint8
const (
StateInit State = iota
StateNegotiating
StatePrepared
StateActive
StatePaused
StateCommitting
StateVerifying
StateDone
StateAborted
StateFailed
)
func (s State) Terminal() bool {
switch s {
case StateDone, StateAborted, StateFailed:
return true
default:
return false
}
}
type Range struct {
Offset int64
Length int64
}
type Metadata map[string]string
type TelemetryDelta struct {
SourceReadDuration time.Duration
StreamWriteDuration time.Duration
SinkWriteDuration time.Duration
SyncDuration time.Duration
VerifyDuration time.Duration
CommitDuration time.Duration
CommitWaitDuration time.Duration
SourceReadCount int
StreamWriteCount int
SinkWriteCount int
}
type Descriptor struct {
ID string
Direction Direction
Channel Channel
Size int64
Checksum string
Metadata Metadata
}
type Snapshot struct {
ID string
Direction Direction
Channel Channel
State State
Stage string
LastFailureStage string
Size int64
Checksum string
Metadata Metadata
SentBytes int64
AckedBytes int64
ReceivedBytes int64
InflightBytes int64
RetryCount int
TimeoutCount int
LastError string
SourceReadDuration time.Duration
StreamWriteDuration time.Duration
SinkWriteDuration time.Duration
SyncDuration time.Duration
VerifyDuration time.Duration
CommitDuration time.Duration
CommitWaitDuration time.Duration
SourceReadCount int
StreamWriteCount int
SinkWriteCount int
StartedAt int64
UpdatedAt int64
CompletedAt int64
}
type Begin struct {
TransferID string
Channel Channel
Size int64
Checksum string
Metadata Metadata
}
type BeginAck struct {
TransferID string
Accepted bool
NextOffset int64
Missing []Range
Error string
}
type Resume struct {
TransferID string
}
type ResumeAck struct {
TransferID string
Accepted bool
NextOffset int64
Missing []Range
Error string
}
type Commit struct {
TransferID string
Size int64
Checksum string
}
type CommitAck struct {
TransferID string
Accepted bool
Error string
}
type Abort struct {
TransferID string
Stage string
Offset int64
Error string
}
type Segment struct {
TransferID string
Channel Channel
Offset int64
Payload []byte
Flags uint32
}
type Ack struct {
TransferID string
NextOffset int64
Missing []Range
Final bool
Error string
}
func normalizeChannel(channel Channel) Channel {
if channel == "" {
return DataChannel
}
return channel
}
func cloneMetadata(src Metadata) Metadata {
if len(src) == 0 {
return nil
}
dst := make(Metadata, len(src))
for key, value := range src {
dst[key] = value
}
return dst
}
func cloneSnapshot(src Snapshot) Snapshot {
src.Metadata = cloneMetadata(src.Metadata)
return src
}