notify/transfer_snapshot.go

575 lines
18 KiB
Go
Raw Permalink Normal View History

package notify
import (
itransfer "b612.me/notify/internal/transfer"
"errors"
"strconv"
"time"
)
// TransferDirection is the direction reported in a TransferSnapshot.
type TransferDirection uint8
// Direction values. TransferDirectionSend is the zero value, and
// convertTransferDirection maps every internal direction other than
// itransfer.DirectionReceive to it.
const (
TransferDirectionSend TransferDirection = iota
TransferDirectionReceive
)
// TransferState is the state reported in a TransferSnapshot.
type TransferState uint8
// State values, numbered from zero in declaration order. TransferStateInit is
// the zero value and is also what convertTransferState returns for any
// internal state it does not explicitly map.
const (
TransferStateInit TransferState = iota
TransferStateNegotiating
TransferStatePrepared
TransferStateActive
TransferStatePaused
TransferStateCommitting
TransferStateVerifying
TransferStateDone
TransferStateAborted
TransferStateFailed
)
// TransferChannel names the channel reported in a TransferSnapshot.
// convertTransferSnapshot converts the internal channel value to this string
// type directly, so values other than the constants below are representable.
type TransferChannel string
// Channel names declared by this package.
const (
TransferChannelControl TransferChannel = "control"
TransferChannelData TransferChannel = "data"
)
// TransferSnapshot is the public view of a single transfer. Values are built
// from an itransfer.Snapshot by convertTransferSnapshot.
type TransferSnapshot struct {
// ID is the transfer ID stored in the metadata when that entry is non-empty,
// otherwise the internal snapshot's own ID.
ID string
// Scope is the normalized scope stored in the metadata, or defaultFileScope
// when the metadata has no non-empty scope entry.
Scope string
// RuntimeScope is the normalized runtime scope stored in the metadata; when
// that entry is missing or empty it takes the same value as Scope.
RuntimeScope string
// TransportGeneration is parsed from the metadata as a base-10 unsigned
// integer; it is 0 when the entry is missing, empty, or unparsable.
TransportGeneration uint64
// Direction and State are mapped from the internal enums; Channel is a direct
// string conversion of the internal channel.
Direction TransferDirection
Channel TransferChannel
State TransferState
// Stage, LastFailureStage, Size and Checksum are copied verbatim from the
// internal snapshot.
Stage string
LastFailureStage string
Size int64
Checksum string
// ChunkSize, Parallelism and MaxInflightBytes are parsed from the send-related
// metadata entries; each is 0 when its entry is missing, unparsable, or not
// positive.
ChunkSize int
Parallelism int
MaxInflightBytes int64
// Metadata is a copy of the internal metadata without the entries that are
// surfaced as dedicated fields above (ID, scopes, transport generation, send
// settings). It is nil when nothing remains.
Metadata map[string]string
// Byte counters, retry/timeout counters and the last error text are copied
// verbatim from the internal snapshot.
SentBytes int64
AckedBytes int64
ReceivedBytes int64
InflightBytes int64
RetryCount int
TimeoutCount int
LastError string
// Duration fields are copied verbatim from the internal snapshot;
// TelemetrySummary derives totals and rates from them.
SourceReadDuration time.Duration
StreamWriteDuration time.Duration
SinkWriteDuration time.Duration
SyncDuration time.Duration
VerifyDuration time.Duration
CommitDuration time.Duration
CommitWaitDuration time.Duration
// Operation counts are copied verbatim from the internal snapshot.
SourceReadCount int
StreamWriteCount int
SinkWriteCount int
// Timestamps are converted from Unix nanoseconds; a non-positive source value
// yields the zero time.Time.
StartedAt time.Time
UpdatedAt time.Time
CompletedAt time.Time
}
// TransferTelemetrySummary holds figures derived from a TransferSnapshot by
// its TelemetrySummary method.
type TransferTelemetrySummary struct {
// SourceReadBytes and StreamWriteBytes are each the snapshot's SentBytes when
// positive, otherwise its AckedBytes when positive, otherwise 0.
SourceReadBytes int64
StreamWriteBytes int64
// SinkWriteBytes is the snapshot's ReceivedBytes when positive, otherwise 0.
SinkWriteBytes int64
// Each throughput field is the matching byte figure divided by the matching
// duration in seconds; it is 0 when either operand is not positive.
SourceReadThroughputBPS float64
StreamWriteThroughputBPS float64
SinkWriteThroughputBPS float64
// WorkDuration is the sum of the source-read, stream-write, sink-write, sync,
// verify and commit durations.
WorkDuration time.Duration
// ObservedDuration is WorkDuration plus the commit-wait duration.
ObservedDuration time.Duration
// CommitWaitRatio is the commit-wait duration divided by ObservedDuration, or
// 0 when either is not positive.
CommitWaitRatio float64
}
// TransferSnapshotQuery narrows a lookup by transfer ID. A zero-valued query
// imposes no additional constraint (see transferSnapshotQueryMatch).
type TransferSnapshotQuery struct {
// Scope, when non-empty, must equal the snapshot's Scope after both values
// are passed through normalizeFileScope.
Scope string
// RuntimeScope, when non-empty, must equal the snapshot's RuntimeScope after
// both values are passed through normalizeFileScope.
RuntimeScope string
// TransportGeneration is compared with the snapshot's generation only when
// MatchTransportGeneration is true, so generation 0 can be matched explicitly.
TransportGeneration uint64
MatchTransportGeneration bool
}
// TelemetrySummary derives byte totals, throughput rates and duration totals
// from the snapshot's raw counters.
//
// WorkDuration sums the source-read, stream-write, sink-write, sync, verify
// and commit durations; ObservedDuration adds the commit-wait duration on top
// of that, and CommitWaitRatio is the commit-wait share of ObservedDuration.
// Rates and the ratio are 0 whenever one of their operands is not positive.
func (s TransferSnapshot) TelemetrySummary() TransferTelemetrySummary {
	var summary TransferTelemetrySummary

	// Byte totals; each one is reused below for the matching rate.
	summary.SourceReadBytes = transferSummarySourceReadBytes(s)
	summary.StreamWriteBytes = transferSummaryStreamWriteBytes(s)
	summary.SinkWriteBytes = transferSummarySinkWriteBytes(s)

	summary.SourceReadThroughputBPS = throughputBytesPerSecond(summary.SourceReadBytes, s.SourceReadDuration)
	summary.StreamWriteThroughputBPS = throughputBytesPerSecond(summary.StreamWriteBytes, s.StreamWriteDuration)
	summary.SinkWriteThroughputBPS = throughputBytesPerSecond(summary.SinkWriteBytes, s.SinkWriteDuration)

	summary.WorkDuration = s.SourceReadDuration +
		s.StreamWriteDuration +
		s.SinkWriteDuration +
		s.SyncDuration +
		s.VerifyDuration +
		s.CommitDuration
	summary.ObservedDuration = summary.WorkDuration + s.CommitWaitDuration
	summary.CommitWaitRatio = durationRatio(s.CommitWaitDuration, summary.ObservedDuration)

	return summary
}
// clientTransferSnapshotReader is the unexported capability that
// GetClientTransferSnapshots, GetClientTransferSnapshotByID and
// GetClientTransferSnapshotByIDScope type-assert a Client to.
// *ClientCommon implements it.
type clientTransferSnapshotReader interface {
clientTransferSnapshots() []TransferSnapshot
clientTransferSnapshotByID(string) (TransferSnapshot, bool)
clientTransferSnapshotByIDScope(string, string) (TransferSnapshot, bool)
}
// serverTransferSnapshotReader is the unexported capability that
// GetServerTransferSnapshots, GetServerTransferSnapshotByID and
// GetServerTransferSnapshotByIDScope type-assert a Server to.
// *ServerCommon implements it.
type serverTransferSnapshotReader interface {
serverTransferSnapshots() []TransferSnapshot
serverTransferSnapshotByID(string) (TransferSnapshot, bool)
serverTransferSnapshotByIDScope(string, string) (TransferSnapshot, bool)
}
// clientTransferSnapshotQueryReader is the unexported capability that
// GetClientTransferSnapshotByIDQuery type-asserts a Client to. It is kept
// separate from clientTransferSnapshotReader; *ClientCommon implements both.
type clientTransferSnapshotQueryReader interface {
clientTransferSnapshotByIDQuery(string, TransferSnapshotQuery) (TransferSnapshot, bool)
}
// serverTransferSnapshotQueryReader is the unexported capability that
// GetServerTransferSnapshotByIDQuery type-asserts a Server to. It is kept
// separate from serverTransferSnapshotReader; *ServerCommon implements both.
type serverTransferSnapshotQueryReader interface {
serverTransferSnapshotByIDQuery(string, TransferSnapshotQuery) (TransferSnapshot, bool)
}
// Errors returned by the Get*TransferSnapshot* helpers. The "Nil" errors are
// returned when the Client/Server argument is nil; the "Unsupported" errors
// are returned when the argument does not implement the unexported reader
// interface the helper needs.
var (
errClientTransferSnapshotNil = errors.New("client transfer snapshot target is nil")
errServerTransferSnapshotNil = errors.New("server transfer snapshot target is nil")
errClientTransferSnapshotUnsupported = errors.New("client transfer snapshot target type is unsupported")
errServerTransferSnapshotUnsupported = errors.New("server transfer snapshot target type is unsupported")
)
// GetClientTransferSnapshots returns the transfer snapshots reported by c.
//
// It fails with errClientTransferSnapshotNil when c is nil and with
// errClientTransferSnapshotUnsupported when c does not implement the
// package's client snapshot reader.
func GetClientTransferSnapshots(c Client) ([]TransferSnapshot, error) {
	if c == nil {
		return nil, errClientTransferSnapshotNil
	}
	if reader, supported := any(c).(clientTransferSnapshotReader); supported {
		return reader.clientTransferSnapshots(), nil
	}
	return nil, errClientTransferSnapshotUnsupported
}
// GetServerTransferSnapshots returns the transfer snapshots reported by s.
//
// It fails with errServerTransferSnapshotNil when s is nil and with
// errServerTransferSnapshotUnsupported when s does not implement the
// package's server snapshot reader.
func GetServerTransferSnapshots(s Server) ([]TransferSnapshot, error) {
	switch reader := any(s).(type) {
	case nil:
		// A nil interface value carries no dynamic type at all.
		return nil, errServerTransferSnapshotNil
	case serverTransferSnapshotReader:
		return reader.serverTransferSnapshots(), nil
	default:
		return nil, errServerTransferSnapshotUnsupported
	}
}
// GetClientTransferSnapshotByID looks up the snapshot of the transfer with the
// given ID on c.
//
// The boolean is the "found" flag reported by the client's reader. The error
// is errClientTransferSnapshotNil when c is nil, or
// errClientTransferSnapshotUnsupported when c does not implement the
// package's client snapshot reader; in both cases the snapshot is zero.
func GetClientTransferSnapshotByID(c Client, transferID string) (TransferSnapshot, bool, error) {
	var none TransferSnapshot
	if c == nil {
		return none, false, errClientTransferSnapshotNil
	}
	lookup, supported := any(c).(clientTransferSnapshotReader)
	if !supported {
		return none, false, errClientTransferSnapshotUnsupported
	}
	result, ok := lookup.clientTransferSnapshotByID(transferID)
	return result, ok, nil
}
// GetClientTransferSnapshotByIDScope looks up the snapshot of the transfer
// with the given ID and scope on c.
//
// The boolean is the "found" flag reported by the client's reader. The error
// is errClientTransferSnapshotNil when c is nil, or
// errClientTransferSnapshotUnsupported when c does not implement the
// package's client snapshot reader; in both cases the snapshot is zero.
func GetClientTransferSnapshotByIDScope(c Client, transferID string, scope string) (TransferSnapshot, bool, error) {
	switch reader := any(c).(type) {
	case nil:
		// A nil interface value carries no dynamic type at all.
		return TransferSnapshot{}, false, errClientTransferSnapshotNil
	case clientTransferSnapshotReader:
		result, ok := reader.clientTransferSnapshotByIDScope(transferID, scope)
		return result, ok, nil
	default:
		return TransferSnapshot{}, false, errClientTransferSnapshotUnsupported
	}
}
// GetServerTransferSnapshotByID looks up the snapshot of the transfer with the
// given ID on s.
//
// The boolean is the "found" flag reported by the server's reader. The error
// is errServerTransferSnapshotNil when s is nil, or
// errServerTransferSnapshotUnsupported when s does not implement the
// package's server snapshot reader; in both cases the snapshot is zero.
func GetServerTransferSnapshotByID(s Server, transferID string) (TransferSnapshot, bool, error) {
	if s == nil {
		return TransferSnapshot{}, false, errServerTransferSnapshotNil
	}
	if reader, supported := any(s).(serverTransferSnapshotReader); supported {
		result, ok := reader.serverTransferSnapshotByID(transferID)
		return result, ok, nil
	}
	return TransferSnapshot{}, false, errServerTransferSnapshotUnsupported
}
// GetServerTransferSnapshotByIDScope looks up the snapshot of the transfer
// with the given ID and scope on s.
//
// The boolean is the "found" flag reported by the server's reader. The error
// is errServerTransferSnapshotNil when s is nil, or
// errServerTransferSnapshotUnsupported when s does not implement the
// package's server snapshot reader; in both cases the snapshot is zero.
func GetServerTransferSnapshotByIDScope(s Server, transferID string, scope string) (TransferSnapshot, bool, error) {
	var none TransferSnapshot
	if s == nil {
		return none, false, errServerTransferSnapshotNil
	}
	lookup, supported := any(s).(serverTransferSnapshotReader)
	if !supported {
		return none, false, errServerTransferSnapshotUnsupported
	}
	result, ok := lookup.serverTransferSnapshotByIDScope(transferID, scope)
	return result, ok, nil
}
// GetClientTransferSnapshotByIDQuery looks up the snapshot of the transfer
// with the given ID on c, narrowed by query.
//
// The boolean is the "found" flag reported by the client's query reader. The
// error is errClientTransferSnapshotNil when c is nil, or
// errClientTransferSnapshotUnsupported when c does not implement the
// package's client snapshot query reader; in both cases the snapshot is zero.
func GetClientTransferSnapshotByIDQuery(c Client, transferID string, query TransferSnapshotQuery) (TransferSnapshot, bool, error) {
	switch reader := any(c).(type) {
	case nil:
		// A nil interface value carries no dynamic type at all.
		return TransferSnapshot{}, false, errClientTransferSnapshotNil
	case clientTransferSnapshotQueryReader:
		result, ok := reader.clientTransferSnapshotByIDQuery(transferID, query)
		return result, ok, nil
	default:
		return TransferSnapshot{}, false, errClientTransferSnapshotUnsupported
	}
}
// GetServerTransferSnapshotByIDQuery looks up the snapshot of the transfer
// with the given ID on s, narrowed by query.
//
// The boolean is the "found" flag reported by the server's query reader. The
// error is errServerTransferSnapshotNil when s is nil, or
// errServerTransferSnapshotUnsupported when s does not implement the
// package's server snapshot query reader; in both cases the snapshot is zero.
func GetServerTransferSnapshotByIDQuery(s Server, transferID string, query TransferSnapshotQuery) (TransferSnapshot, bool, error) {
	if s == nil {
		return TransferSnapshot{}, false, errServerTransferSnapshotNil
	}
	if reader, supported := any(s).(serverTransferSnapshotQueryReader); supported {
		result, ok := reader.serverTransferSnapshotByIDQuery(transferID, query)
		return result, ok, nil
	}
	return TransferSnapshot{}, false, errServerTransferSnapshotUnsupported
}
// clientTransferSnapshots implements clientTransferSnapshotReader by listing
// the snapshots of the client's transfer runtime.
func (c *ClientCommon) clientTransferSnapshots() []TransferSnapshot {
	rt := c.getTransferRuntime()
	return transferSnapshotsFromRuntime(rt)
}
// clientTransferSnapshotByID implements clientTransferSnapshotReader by
// searching the client's transfer runtime for transferID.
func (c *ClientCommon) clientTransferSnapshotByID(transferID string) (TransferSnapshot, bool) {
	rt := c.getTransferRuntime()
	return transferSnapshotByIDFromRuntime(rt, transferID)
}
// clientTransferSnapshotByIDScope implements clientTransferSnapshotReader by
// searching the client's transfer runtime for transferID within scope.
func (c *ClientCommon) clientTransferSnapshotByIDScope(transferID string, scope string) (TransferSnapshot, bool) {
	rt := c.getTransferRuntime()
	return transferSnapshotByIDScopeFromRuntime(rt, transferID, scope)
}
// clientTransferSnapshotByIDQuery implements
// clientTransferSnapshotQueryReader by searching the client's transfer
// runtime for transferID, narrowed by query.
func (c *ClientCommon) clientTransferSnapshotByIDQuery(transferID string, query TransferSnapshotQuery) (TransferSnapshot, bool) {
	rt := c.getTransferRuntime()
	return transferSnapshotByIDQueryFromRuntime(rt, transferID, query)
}
// serverTransferSnapshots implements serverTransferSnapshotReader by listing
// the snapshots of the server's transfer runtime.
func (s *ServerCommon) serverTransferSnapshots() []TransferSnapshot {
	rt := s.getTransferRuntime()
	return transferSnapshotsFromRuntime(rt)
}
// serverTransferSnapshotByID implements serverTransferSnapshotReader by
// searching the server's transfer runtime for transferID.
func (s *ServerCommon) serverTransferSnapshotByID(transferID string) (TransferSnapshot, bool) {
	rt := s.getTransferRuntime()
	return transferSnapshotByIDFromRuntime(rt, transferID)
}
// serverTransferSnapshotByIDScope implements serverTransferSnapshotReader by
// searching the server's transfer runtime for transferID within scope.
func (s *ServerCommon) serverTransferSnapshotByIDScope(transferID string, scope string) (TransferSnapshot, bool) {
	rt := s.getTransferRuntime()
	return transferSnapshotByIDScopeFromRuntime(rt, transferID, scope)
}
// serverTransferSnapshotByIDQuery implements
// serverTransferSnapshotQueryReader by searching the server's transfer
// runtime for transferID, narrowed by query.
func (s *ServerCommon) serverTransferSnapshotByIDQuery(transferID string, query TransferSnapshotQuery) (TransferSnapshot, bool) {
	rt := s.getTransferRuntime()
	return transferSnapshotByIDQueryFromRuntime(rt, transferID, query)
}
// transferSnapshotsFromRuntime converts every snapshot held by runtime into
// its public form, preserving order.
//
// It returns nil for a nil runtime and a non-nil (possibly empty) slice
// otherwise.
func transferSnapshotsFromRuntime(runtime *transferRuntime) []TransferSnapshot {
	if runtime == nil {
		return nil
	}
	internal := runtime.snapshots()
	converted := make([]TransferSnapshot, len(internal))
	for i, raw := range internal {
		converted[i] = convertTransferSnapshot(raw)
	}
	return converted
}
// transferSnapshotByIDFromRuntime returns the single snapshot in runtime whose
// public ID equals transferID.
//
// It reports false, with a zero snapshot, when runtime is nil, transferID is
// empty, nothing matches, or more than one snapshot matches (an ambiguous
// lookup is treated as a miss).
func transferSnapshotByIDFromRuntime(runtime *transferRuntime, transferID string) (TransferSnapshot, bool) {
	var none TransferSnapshot
	if transferID == "" || runtime == nil {
		return none, false
	}
	all := transferSnapshotsFromRuntime(runtime)
	hit := -1
	for i := range all {
		if all[i].ID != transferID {
			continue
		}
		if hit >= 0 {
			// Second match: the ID alone does not identify one transfer.
			return none, false
		}
		hit = i
	}
	if hit < 0 {
		return none, false
	}
	return all[hit], true
}
// transferSnapshotByIDScopeFromRuntime returns the single snapshot in runtime
// whose public ID equals transferID and whose scope equals scope once both
// scopes are normalized.
//
// It reports false, with a zero snapshot, when runtime is nil, transferID is
// empty, nothing matches, or more than one snapshot matches.
func transferSnapshotByIDScopeFromRuntime(runtime *transferRuntime, transferID string, scope string) (TransferSnapshot, bool) {
	if transferID == "" || runtime == nil {
		return TransferSnapshot{}, false
	}
	target := normalizeFileScope(scope)
	var (
		result TransferSnapshot
		hits   int
	)
	for _, candidate := range transferSnapshotsFromRuntime(runtime) {
		if candidate.ID == transferID && normalizeFileScope(candidate.Scope) == target {
			hits++
			if hits > 1 {
				// Ambiguous: more than one transfer shares this ID and scope.
				return TransferSnapshot{}, false
			}
			result = candidate
		}
	}
	return result, hits == 1
}
// transferSnapshotByIDQueryFromRuntime returns the single snapshot in runtime
// whose public ID equals transferID and which satisfies query according to
// transferSnapshotQueryMatch.
//
// It reports false, with a zero snapshot, when runtime is nil, transferID is
// empty, nothing matches, or more than one snapshot matches.
func transferSnapshotByIDQueryFromRuntime(runtime *transferRuntime, transferID string, query TransferSnapshotQuery) (TransferSnapshot, bool) {
	var none TransferSnapshot
	if transferID == "" || runtime == nil {
		return none, false
	}
	candidates := transferSnapshotsFromRuntime(runtime)
	hit := -1
	for i := range candidates {
		if candidates[i].ID != transferID || !transferSnapshotQueryMatch(candidates[i], query) {
			continue
		}
		if hit >= 0 {
			// Ambiguous: the query does not narrow the ID to one transfer.
			return none, false
		}
		hit = i
	}
	if hit < 0 {
		return none, false
	}
	return candidates[hit], true
}
// transferSnapshotQueryMatch reports whether snapshot satisfies every
// constraint that query actually sets.
//
// An empty Scope or RuntimeScope is not a constraint; scopes are compared
// after normalizeFileScope. TransportGeneration is compared only when
// MatchTransportGeneration is true.
func transferSnapshotQueryMatch(snapshot TransferSnapshot, query TransferSnapshotQuery) bool {
	switch {
	case query.Scope != "" && normalizeFileScope(snapshot.Scope) != normalizeFileScope(query.Scope):
		return false
	case query.RuntimeScope != "" && normalizeFileScope(snapshot.RuntimeScope) != normalizeFileScope(query.RuntimeScope):
		return false
	case query.MatchTransportGeneration && snapshot.TransportGeneration != query.TransportGeneration:
		return false
	}
	return true
}
// convertTransferSnapshot builds the public TransferSnapshot for an internal
// snapshot.
//
// ID, scopes, transport generation and the send settings are derived from the
// snapshot's metadata; the exposed Metadata is a filtered copy without those
// entries. Timestamps are converted from Unix nanoseconds, and all remaining
// fields are copied as they are.
func convertTransferSnapshot(snapshot itransfer.Snapshot) TransferSnapshot {
	meta := snapshot.Metadata

	var out TransferSnapshot

	// Values derived from metadata.
	out.Scope = transferSnapshotScope(meta)
	out.RuntimeScope = transferSnapshotRuntimeScope(meta)
	out.ID = transferSnapshotID(snapshot.ID, meta)
	out.Metadata = cloneTransferMetadata(meta)
	out.TransportGeneration = transferSnapshotTransportGeneration(meta)
	out.ChunkSize = transferSnapshotChunkSize(meta)
	out.Parallelism = transferSnapshotParallelism(meta)
	out.MaxInflightBytes = transferSnapshotMaxInflightBytes(meta)

	// Enum-like values mapped to their public types.
	out.Direction = convertTransferDirection(snapshot.Direction)
	out.Channel = TransferChannel(snapshot.Channel)
	out.State = convertTransferState(snapshot.State)

	// Descriptive fields.
	out.Stage = snapshot.Stage
	out.LastFailureStage = snapshot.LastFailureStage
	out.Size = snapshot.Size
	out.Checksum = snapshot.Checksum
	out.LastError = snapshot.LastError

	// Progress counters.
	out.SentBytes = snapshot.SentBytes
	out.AckedBytes = snapshot.AckedBytes
	out.ReceivedBytes = snapshot.ReceivedBytes
	out.InflightBytes = snapshot.InflightBytes
	out.RetryCount = snapshot.RetryCount
	out.TimeoutCount = snapshot.TimeoutCount

	// Durations and operation counts.
	out.SourceReadDuration = snapshot.SourceReadDuration
	out.StreamWriteDuration = snapshot.StreamWriteDuration
	out.SinkWriteDuration = snapshot.SinkWriteDuration
	out.SyncDuration = snapshot.SyncDuration
	out.VerifyDuration = snapshot.VerifyDuration
	out.CommitDuration = snapshot.CommitDuration
	out.CommitWaitDuration = snapshot.CommitWaitDuration
	out.SourceReadCount = snapshot.SourceReadCount
	out.StreamWriteCount = snapshot.StreamWriteCount
	out.SinkWriteCount = snapshot.SinkWriteCount

	// Timestamps.
	out.StartedAt = unixNanoTime(snapshot.StartedAt)
	out.UpdatedAt = unixNanoTime(snapshot.UpdatedAt)
	out.CompletedAt = unixNanoTime(snapshot.CompletedAt)

	return out
}
// convertTransferDirection maps an internal direction to its public value.
// Anything other than itransfer.DirectionReceive is reported as
// TransferDirectionSend.
func convertTransferDirection(direction itransfer.Direction) TransferDirection {
	if direction == itransfer.DirectionReceive {
		return TransferDirectionReceive
	}
	return TransferDirectionSend
}
// convertTransferState maps an internal state to its public value. States
// without an explicit mapping are reported as TransferStateInit.
func convertTransferState(state itransfer.State) TransferState {
	result := TransferStateInit
	switch state {
	case itransfer.StateNegotiating:
		result = TransferStateNegotiating
	case itransfer.StatePrepared:
		result = TransferStatePrepared
	case itransfer.StateActive:
		result = TransferStateActive
	case itransfer.StatePaused:
		result = TransferStatePaused
	case itransfer.StateCommitting:
		result = TransferStateCommitting
	case itransfer.StateVerifying:
		result = TransferStateVerifying
	case itransfer.StateDone:
		result = TransferStateDone
	case itransfer.StateAborted:
		result = TransferStateAborted
	case itransfer.StateFailed:
		result = TransferStateFailed
	}
	return result
}
// cloneTransferMetadata returns a copy of src without the entries that are
// surfaced as dedicated snapshot fields (ID, scope, runtime scope, transport
// generation and the three send settings).
//
// It returns nil when src is empty or when no entry survives the filter.
func cloneTransferMetadata(src map[string]string) map[string]string {
	if len(src) == 0 {
		return nil
	}
	kept := make(map[string]string, len(src))
	for name, val := range src {
		reserved := name == transferMetadataIDKey ||
			name == transferMetadataScopeKey ||
			name == transferMetadataRuntimeScopeKey ||
			name == transferMetadataTransportGenerationKey ||
			name == transferMetadataSendChunkSizeKey ||
			name == transferMetadataSendParallelismKey ||
			name == transferMetadataSendMaxInflightKey
		if !reserved {
			kept[name] = val
		}
	}
	if len(kept) > 0 {
		return kept
	}
	return nil
}
// transferSnapshotID returns the transfer ID stored in metadata, or fallback
// when metadata is nil or holds no non-empty ID entry.
func transferSnapshotID(fallback string, metadata map[string]string) string {
	// Indexing a nil map yields the zero value, so no separate nil check is needed.
	if id := metadata[transferMetadataIDKey]; id != "" {
		return id
	}
	return fallback
}
// transferSnapshotScope returns the normalized scope stored in metadata, or
// defaultFileScope when metadata is nil or holds no non-empty scope entry.
func transferSnapshotScope(metadata map[string]string) string {
	// Indexing a nil map yields the zero value, so no separate nil check is needed.
	raw := metadata[transferMetadataScopeKey]
	if raw == "" {
		return defaultFileScope
	}
	return normalizeFileScope(raw)
}
// transferSnapshotRuntimeScope returns the normalized runtime scope stored in
// metadata. When metadata is nil or holds no non-empty runtime scope entry it
// falls back to transferSnapshotScope.
func transferSnapshotRuntimeScope(metadata map[string]string) string {
	// Indexing a nil map yields the zero value, so no separate nil check is needed.
	raw := metadata[transferMetadataRuntimeScopeKey]
	if raw == "" {
		return transferSnapshotScope(metadata)
	}
	return normalizeFileScope(raw)
}
// transferSnapshotTransportGeneration parses the transport generation stored
// in metadata as a base-10 unsigned 64-bit integer.
//
// It returns 0 when metadata is nil, the entry is missing or empty, or the
// value does not parse.
func transferSnapshotTransportGeneration(metadata map[string]string) uint64 {
	// Indexing a nil map yields the zero value, so no separate nil check is needed.
	raw := metadata[transferMetadataTransportGenerationKey]
	if raw == "" {
		return 0
	}
	if generation, err := strconv.ParseUint(raw, 10, 64); err == nil {
		return generation
	}
	return 0
}
// transferSnapshotChunkSize returns the send chunk size stored in metadata,
// or 0 when transferSnapshotMetadataInt cannot produce a positive value.
func transferSnapshotChunkSize(metadata map[string]string) int {
	chunkSize := transferSnapshotMetadataInt(metadata, transferMetadataSendChunkSizeKey)
	return chunkSize
}
// transferSnapshotParallelism returns the send parallelism stored in
// metadata, or 0 when transferSnapshotMetadataInt cannot produce a positive
// value.
func transferSnapshotParallelism(metadata map[string]string) int {
	parallelism := transferSnapshotMetadataInt(metadata, transferMetadataSendParallelismKey)
	return parallelism
}
// transferSnapshotMaxInflightBytes returns the send max-inflight byte limit
// stored in metadata, or 0 when transferSnapshotMetadataInt64 cannot produce
// a positive value.
func transferSnapshotMaxInflightBytes(metadata map[string]string) int64 {
	limit := transferSnapshotMetadataInt64(metadata, transferMetadataSendMaxInflightKey)
	return limit
}
// transferSnapshotMetadataInt reads metadata[key] as a positive base-10
// integer that fits in an int.
//
// It returns 0 when metadata is nil, the entry is missing or empty, the value
// does not parse, the value is not positive, or the value does not fit in an
// int on the current platform.
func transferSnapshotMetadataInt(metadata map[string]string, key string) int {
	parsed := transferSnapshotMetadataInt64(metadata, key)
	if parsed <= 0 {
		return 0
	}
	narrowed := int(parsed)
	// Where int is 32 bits the conversion silently truncates and can even flip
	// the sign; a value that does not round-trip is treated like any other
	// invalid entry instead of being reported as a bogus number.
	if int64(narrowed) != parsed {
		return 0
	}
	return narrowed
}

// transferSnapshotMetadataInt64 reads metadata[key] as a positive base-10
// 64-bit integer.
//
// It returns 0 when metadata is nil, the entry is missing or empty, the value
// does not parse (including out-of-range input), or the value is not positive.
func transferSnapshotMetadataInt64(metadata map[string]string, key string) int64 {
	// Indexing a nil map yields the zero value, so no separate nil check is needed.
	raw := metadata[key]
	if raw == "" {
		return 0
	}
	parsed, err := strconv.ParseInt(raw, 10, 64)
	if err != nil || parsed <= 0 {
		return 0
	}
	return parsed
}
// transferSummarySourceReadBytes returns the byte figure used for source-read
// telemetry: SentBytes when positive, otherwise AckedBytes when positive,
// otherwise 0.
func transferSummarySourceReadBytes(snapshot TransferSnapshot) int64 {
	switch {
	case snapshot.SentBytes > 0:
		return snapshot.SentBytes
	case snapshot.AckedBytes > 0:
		return snapshot.AckedBytes
	default:
		return 0
	}
}
// transferSummaryStreamWriteBytes returns the byte figure used for
// stream-write telemetry: SentBytes when positive, otherwise AckedBytes when
// positive, otherwise 0.
func transferSummaryStreamWriteBytes(snapshot TransferSnapshot) int64 {
	if sent := snapshot.SentBytes; sent > 0 {
		return sent
	}
	if acked := snapshot.AckedBytes; acked > 0 {
		return acked
	}
	return 0
}
// transferSummarySinkWriteBytes returns the byte figure used for sink-write
// telemetry: ReceivedBytes when positive, otherwise 0.
func transferSummarySinkWriteBytes(snapshot TransferSnapshot) int64 {
	received := snapshot.ReceivedBytes
	if received <= 0 {
		return 0
	}
	return received
}
// throughputBytesPerSecond returns bytes divided by dur expressed in seconds.
// It returns 0 when either argument is zero or negative.
func throughputBytesPerSecond(bytes int64, dur time.Duration) float64 {
	if bytes > 0 && dur > 0 {
		seconds := dur.Seconds()
		return float64(bytes) / seconds
	}
	return 0
}
// durationRatio returns part divided by whole as a fraction. It returns 0
// when either duration is zero or negative.
func durationRatio(part time.Duration, whole time.Duration) float64 {
	switch {
	case part <= 0, whole <= 0:
		return 0
	default:
		return float64(part) / float64(whole)
	}
}
// unixNanoTime converts a Unix timestamp in nanoseconds to a time.Time.
// Zero and negative values yield the zero time.Time.
func unixNanoTime(value int64) time.Time {
	var converted time.Time
	if value > 0 {
		converted = time.Unix(0, value)
	}
	return converted
}