240 lines
5.9 KiB
Go
240 lines
5.9 KiB
Go
|
|
package binlog
|
||
|
|
|
||
|
|
import (
|
||
|
|
"strings"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
type TransactionOutcome string
|
||
|
|
|
||
|
|
const (
|
||
|
|
TransactionOutcomeCommit TransactionOutcome = "commit"
|
||
|
|
TransactionOutcomeRollback TransactionOutcome = "rollback"
|
||
|
|
TransactionOutcomeAutocommit TransactionOutcome = "autocommit"
|
||
|
|
TransactionOutcomeOpen TransactionOutcome = "open"
|
||
|
|
TransactionOutcomeOther TransactionOutcome = "other"
|
||
|
|
)
|
||
|
|
|
||
|
|
type TransactionSummary struct {
|
||
|
|
GTID string
|
||
|
|
SeenTime time.Time
|
||
|
|
LastEventTime time.Time
|
||
|
|
BeginTime time.Time
|
||
|
|
EndTime time.Time
|
||
|
|
Duration time.Duration
|
||
|
|
HasBeginBoundary bool
|
||
|
|
HasEndBoundary bool
|
||
|
|
HasExplicitDuration bool
|
||
|
|
HasDuration bool
|
||
|
|
Outcome TransactionOutcome
|
||
|
|
StartPos int
|
||
|
|
EndPos int
|
||
|
|
Size int
|
||
|
|
RowsCount int
|
||
|
|
StatementsCount int
|
||
|
|
Tables []string
|
||
|
|
SampleSQL string
|
||
|
|
LastCommitted int64
|
||
|
|
SequenceNumber int64
|
||
|
|
TransactionLength uint64
|
||
|
|
ImmediateCommitTimestamp uint64
|
||
|
|
OriginalCommitTimestamp uint64
|
||
|
|
}
|
||
|
|
|
||
|
|
func SummarizeTransaction(tx Transaction) TransactionSummary {
|
||
|
|
fillTimeLazy(&tx)
|
||
|
|
s := TransactionSummary{
|
||
|
|
GTID: strings.TrimSpace(tx.GTID),
|
||
|
|
SeenTime: tx.Time,
|
||
|
|
LastEventTime: tx.Time,
|
||
|
|
StartPos: tx.StartPos,
|
||
|
|
EndPos: tx.EndPos,
|
||
|
|
Size: tx.Size,
|
||
|
|
RowsCount: tx.RowsCount,
|
||
|
|
LastCommitted: tx.LastCommitted,
|
||
|
|
SequenceNumber: tx.SequenceNumber,
|
||
|
|
TransactionLength: tx.TransactionLength,
|
||
|
|
ImmediateCommitTimestamp: tx.ImmediateCommitTimestamp,
|
||
|
|
OriginalCommitTimestamp: tx.OriginalCommitTimestamp,
|
||
|
|
}
|
||
|
|
|
||
|
|
tableSeen := make(map[string]struct{}, 8)
|
||
|
|
hasBegin := false
|
||
|
|
hasCommit := false
|
||
|
|
hasRollback := false
|
||
|
|
hasNonBoundary := false
|
||
|
|
|
||
|
|
for _, detail := range tx.Txs {
|
||
|
|
if detail.Time.IsZero() && detail.Timestamp != 0 {
|
||
|
|
detail.Time = time.Unix(detail.Timestamp, 0)
|
||
|
|
}
|
||
|
|
if !detail.Time.IsZero() {
|
||
|
|
s.LastEventTime = maxSummaryTime(s.LastEventTime, detail.Time)
|
||
|
|
switch boundaryKind(detail.Sql) {
|
||
|
|
case "begin":
|
||
|
|
s.BeginTime = minSummaryTime(s.BeginTime, detail.Time)
|
||
|
|
s.HasBeginBoundary = true
|
||
|
|
case "commit", "rollback":
|
||
|
|
s.EndTime = maxSummaryTime(s.EndTime, detail.Time)
|
||
|
|
s.HasEndBoundary = true
|
||
|
|
}
|
||
|
|
}
|
||
|
|
s.RowsCount += detail.RowCount
|
||
|
|
if strings.TrimSpace(detail.Sql) != "" && !isBoundaryDetail(detail) {
|
||
|
|
s.StatementsCount++
|
||
|
|
if s.SampleSQL == "" {
|
||
|
|
s.SampleSQL = compactSampleSQL(detail.Sql)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
tableKey := summaryTableKey(detail.Db, detail.Table)
|
||
|
|
if tableKey != "" {
|
||
|
|
if _, ok := tableSeen[tableKey]; !ok {
|
||
|
|
tableSeen[tableKey] = struct{}{}
|
||
|
|
s.Tables = append(s.Tables, tableKey)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
switch boundaryKind(detail.Sql) {
|
||
|
|
case "begin":
|
||
|
|
hasBegin = true
|
||
|
|
case "commit":
|
||
|
|
hasCommit = true
|
||
|
|
case "rollback":
|
||
|
|
hasRollback = true
|
||
|
|
default:
|
||
|
|
if !strings.EqualFold(strings.TrimSpace(detail.SqlType), "query") || strings.TrimSpace(detail.Sql) != "" {
|
||
|
|
hasNonBoundary = true
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if tx.RowsCount > 0 {
|
||
|
|
s.RowsCount = tx.RowsCount
|
||
|
|
}
|
||
|
|
if s.SeenTime.IsZero() {
|
||
|
|
s.SeenTime = firstDetailTime(tx)
|
||
|
|
}
|
||
|
|
if s.BeginTime.IsZero() {
|
||
|
|
s.BeginTime = firstNonZeroSummaryTime(firstDetailTime(tx), s.SeenTime)
|
||
|
|
}
|
||
|
|
if s.EndTime.IsZero() {
|
||
|
|
s.EndTime = firstNonZeroSummaryTime(s.LastEventTime, s.BeginTime, s.SeenTime)
|
||
|
|
}
|
||
|
|
if !s.BeginTime.IsZero() && !s.EndTime.IsZero() && !s.EndTime.Before(s.BeginTime) {
|
||
|
|
s.Duration = s.EndTime.Sub(s.BeginTime)
|
||
|
|
s.HasDuration = true
|
||
|
|
s.HasExplicitDuration = s.HasBeginBoundary && s.HasEndBoundary
|
||
|
|
}
|
||
|
|
s.Outcome = summarizeOutcome(tx, hasBegin, hasCommit, hasRollback, hasNonBoundary)
|
||
|
|
return s
|
||
|
|
}
|
||
|
|
|
||
|
|
func summarizeOutcome(tx Transaction, hasBegin bool, hasCommit bool, hasRollback bool, hasNonBoundary bool) TransactionOutcome {
|
||
|
|
switch {
|
||
|
|
case hasCommit:
|
||
|
|
return TransactionOutcomeCommit
|
||
|
|
case hasRollback:
|
||
|
|
return TransactionOutcomeRollback
|
||
|
|
case hasBegin:
|
||
|
|
return TransactionOutcomeOpen
|
||
|
|
}
|
||
|
|
switch tx.Status {
|
||
|
|
case STATUS_COMMIT:
|
||
|
|
return TransactionOutcomeCommit
|
||
|
|
case STATUS_ROLLBACK:
|
||
|
|
return TransactionOutcomeRollback
|
||
|
|
case STATUS_BEGIN:
|
||
|
|
return TransactionOutcomeOpen
|
||
|
|
case STATUS_PREPARE:
|
||
|
|
if hasNonBoundary {
|
||
|
|
return TransactionOutcomeAutocommit
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if hasNonBoundary {
|
||
|
|
return TransactionOutcomeAutocommit
|
||
|
|
}
|
||
|
|
return TransactionOutcomeOther
|
||
|
|
}
|
||
|
|
|
||
|
|
func isBoundaryDetail(detail TxDetail) bool {
|
||
|
|
if !strings.EqualFold(strings.TrimSpace(detail.SqlType), "query") {
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
return boundaryKind(detail.Sql) != ""
|
||
|
|
}
|
||
|
|
|
||
|
|
func boundaryKind(sql string) string {
|
||
|
|
switch strings.ToLower(strings.TrimSpace(sql)) {
|
||
|
|
case "begin":
|
||
|
|
return "begin"
|
||
|
|
case "commit":
|
||
|
|
return "commit"
|
||
|
|
case "rollback":
|
||
|
|
return "rollback"
|
||
|
|
default:
|
||
|
|
return ""
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func summaryTableKey(db string, table string) string {
|
||
|
|
db = strings.ToLower(strings.TrimSpace(db))
|
||
|
|
table = strings.ToLower(strings.TrimSpace(table))
|
||
|
|
if db == "" || table == "" {
|
||
|
|
return ""
|
||
|
|
}
|
||
|
|
return db + "." + table
|
||
|
|
}
|
||
|
|
|
||
|
|
func compactSampleSQL(sql string) string {
|
||
|
|
sql = strings.Join(strings.Fields(sql), " ")
|
||
|
|
if len(sql) <= 200 {
|
||
|
|
return sql
|
||
|
|
}
|
||
|
|
return sql[:197] + "..."
|
||
|
|
}
|
||
|
|
|
||
|
|
func firstDetailTime(tx Transaction) time.Time {
|
||
|
|
var ret time.Time
|
||
|
|
for _, detail := range tx.Txs {
|
||
|
|
t := detail.Time
|
||
|
|
if t.IsZero() && detail.Timestamp != 0 {
|
||
|
|
t = time.Unix(detail.Timestamp, 0)
|
||
|
|
}
|
||
|
|
ret = minSummaryTime(ret, t)
|
||
|
|
}
|
||
|
|
return ret
|
||
|
|
}
|
||
|
|
|
||
|
|
func minSummaryTime(a time.Time, b time.Time) time.Time {
|
||
|
|
if a.IsZero() {
|
||
|
|
return b
|
||
|
|
}
|
||
|
|
if b.IsZero() {
|
||
|
|
return a
|
||
|
|
}
|
||
|
|
if b.Before(a) {
|
||
|
|
return b
|
||
|
|
}
|
||
|
|
return a
|
||
|
|
}
|
||
|
|
|
||
|
|
func maxSummaryTime(a time.Time, b time.Time) time.Time {
|
||
|
|
if a.IsZero() {
|
||
|
|
return b
|
||
|
|
}
|
||
|
|
if b.IsZero() {
|
||
|
|
return a
|
||
|
|
}
|
||
|
|
if b.After(a) {
|
||
|
|
return b
|
||
|
|
}
|
||
|
|
return a
|
||
|
|
}
|
||
|
|
|
||
|
|
func firstNonZeroSummaryTime(items ...time.Time) time.Time {
|
||
|
|
for _, item := range items {
|
||
|
|
if !item.IsZero() {
|
||
|
|
return item
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return time.Time{}
|
||
|
|
}
|