|
|
@ -381,6 +381,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
|
|
|
tx.RowsCount += v.RowCount
|
|
|
|
tx.RowsCount += v.RowCount
|
|
|
|
tx.Txs[k] = v
|
|
|
|
tx.Txs[k] = v
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if filter.OnlyShowGtid {
|
|
|
|
|
|
|
|
tx.EndPos = tx.StartPos
|
|
|
|
|
|
|
|
}
|
|
|
|
tx.Size = tx.EndPos - tx.StartPos
|
|
|
|
tx.Size = tx.EndPos - tx.StartPos
|
|
|
|
callFn(tx)
|
|
|
|
callFn(tx)
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
@ -451,6 +454,14 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
|
|
|
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
|
|
|
|
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
|
|
|
|
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
|
|
|
|
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if filter.EndPos != 0 && startPos > filter.EndPos {
|
|
|
|
|
|
|
|
skipTillNext = true
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if filter.StartPos != 0 && startPos < filter.EndPos {
|
|
|
|
|
|
|
|
skipTillNext = true
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
}
|
|
|
|
switch ev.Type {
|
|
|
|
switch ev.Type {
|
|
|
|
case "gtid":
|
|
|
|
case "gtid":
|
|
|
|
if skipTillNext {
|
|
|
|
if skipTillNext {
|
|
|
|