From 095cb3dbdfaebee3a97c536bad66e3330b6cd889 Mon Sep 17 00:00:00 2001 From: starainrt Date: Sun, 8 Mar 2026 20:07:59 +0800 Subject: [PATCH] rewrite parse.go --- go.mod | 39 +- go.sum | 155 +++--- parse.go | 1406 ++++++++++++++++++++++++++++++------------------------ 3 files changed, 870 insertions(+), 730 deletions(-) diff --git a/go.mod b/go.mod index eba5000..3cdb533 100644 --- a/go.mod +++ b/go.mod @@ -1,26 +1,31 @@ module b612.me/mysql/binlog -go 1.20 +go 1.24.0 require ( b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 - b612.me/staros v1.1.7 - github.com/starainrt/go-mysql v0.0.0-20230425104003-c669a16617e3 + b612.me/staros v1.1.8 + github.com/starainrt/go-mysql v0.0.2 ) require ( - b612.me/stario v0.0.9 // indirect - b612.me/win32api v0.0.2 // indirect - b612.me/wincmd v0.0.3 // indirect - github.com/DataDog/zstd v1.5.2 // indirect - github.com/Masterminds/semver v1.5.0 // indirect - github.com/google/uuid v1.3.0 // indirect - github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 // indirect - github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect - github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect - github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed // indirect - go.uber.org/atomic v1.7.0 // indirect - golang.org/x/crypto v0.21.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/term v0.18.0 // indirect + b612.me/stario v0.0.10 // indirect + b612.me/win32api v0.0.3 // indirect + b612.me/wincmd v0.0.4 // indirect + filippo.io/edwards25519 v1.2.0 // indirect + github.com/goccy/go-json v0.10.5 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/klauspost/compress v1.18.4 // indirect + github.com/pingcap/errors v0.11.5-0.20250523034308-74f78ae071ee // indirect + github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a // indirect + github.com/pingcap/tidb/pkg/parser v0.0.0-20260219190905-9b9281fa8d6d // indirect + github.com/shopspring/decimal v1.4.0 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.1 // indirect + golang.org/x/crypto v0.48.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/term v0.40.0 // indirect + golang.org/x/text v0.34.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/go.sum b/go.sum index d1d31e3..5d7be86 100644 --- a/go.sum +++ b/go.sum @@ -1,155 +1,150 @@ b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 h1:sJrYUl9Sb1tij6ROahFE3r/36Oag3kI92OXDjOKsdwA= b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044/go.mod h1:3EHq1jvlm3a92UxagMjfqSSVYb3KW2H3aT5nd4SiD94= -b612.me/stario v0.0.9 h1:bFDlejUJMwZ12a09snZJspQsOlkqpDAl9qKPEYOGWCk= -b612.me/stario v0.0.9/go.mod h1:x4D/x8zA5SC0pj/uJAi4FyG5p4j5UZoMEZfvuRR6VNw= -b612.me/staros v1.1.7 h1:GkQp5sBPRqo3pOh6nKyKffJydyYrjlfzpsPxNeVJ26g= -b612.me/staros v1.1.7/go.mod h1:Yi/WfvIqRAPQEf/eiaaIwrL5LNcUbqzMIuFIyJJOU40= -b612.me/win32api v0.0.2 h1:5PwvPR5fYs3a/v+LjYdtRif+5Q04zRGLTVxmCYNjCpA= +b612.me/stario v0.0.10 h1:+cIyiDCBCjUfodMJDp4FLs+2E1jo7YENkN+sMEe6550= +b612.me/stario v0.0.10/go.mod h1:1Owmu9jzKWgs4VsmeI8YWlGwLrCwPNM/bYpxkyn+MMk= +b612.me/staros v1.1.8 h1:5Bpuf9q2nH75S2ekmieJuH3Y8LTqg/voxXCOiMAC3kk= +b612.me/staros v1.1.8/go.mod h1:4KmokjKXFW5h1hbA4aIv5O+2FptVzBubCo7IPirfqm8= b612.me/win32api v0.0.2/go.mod h1:sj66sFJDKElEjOR+0YhdSW6b4kq4jsXu4T5/Hnpyot0= -b612.me/wincmd v0.0.3 h1:GYrkYnNun39yfNcA2+u0h4VW/BYbTrJK39QW4W1LCYA= -b612.me/wincmd v0.0.3/go.mod h1:nWdNREHO6F+2PngEUcyYN3Eo7DzYEVa/fO6czd9d/fo= -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= -github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= -github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= -github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= +b612.me/win32api v0.0.3 h1:TfINlv9BBmWC/YbkJ0MTpN1NzTFPnnTGB5Dux6iRWIA= +b612.me/win32api v0.0.3/go.mod h1:sj66sFJDKElEjOR+0YhdSW6b4kq4jsXu4T5/Hnpyot0= +b612.me/wincmd v0.0.4 h1:fv9p1V8mw2HdUjaoZBWZy0T41JftueyLxAuch1MgtdI= +b612.me/wincmd v0.0.4/go.mod h1:o3yPoE+DpVPHGKl/q1WT1C8OaIVwHEnpeNgMFqzlwD8= +filippo.io/edwards25519 v1.2.0 h1:crnVqOiS4jqYleHd9vaKZ+HKtHfllngJIiOpNpoJsjo= +filippo.io/edwards25519 v1.2.0/go.mod h1:xzAOLCNug/yB62zG1bQ8uziwrIqIuxhctzJT18Q77mc= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= -github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8/go.mod h1:q2w6Bg5jeox1B+QkJ6Wp/+Vn0G/bo3f1uY7Fn3vivIQ= -github.com/cznic/strutil v0.0.0-20171016134553-529a34b1c186/go.mod h1:AHHPPPXTw0h6pVabbcbyGRK1DckRn7r/STdZEeIDzZc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= -github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= -github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg= -github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 h1:+FZIDR/D97YOPik4N4lPDaUcLDF/EQPogxtlHB2ZZRM= -github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= -github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= -github.com/pingcap/tidb/parser v0.0.0-20221126021158-6b02a5d8ba7d/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI= +github.com/pingcap/errors v0.11.5-0.20250523034308-74f78ae071ee h1:/IDPbpzkzA97t1/Z1+C3KlxbevjMeaI6BQYxvivu4u8= +github.com/pingcap/errors v0.11.5-0.20250523034308-74f78ae071ee/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= +github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8= +github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA= +github.com/pingcap/tidb/pkg/parser v0.0.0-20260219190905-9b9281fa8d6d h1:jD97s7AVHGuKGqvbJkTcNpMlcSx5Qv/sZF0XHENK+0w= +github.com/pingcap/tidb/pkg/parser v0.0.0-20260219190905-9b9281fa8d6d/go.mod h1:oHE+ub2QaDERd+UNHe4z2BhFV2jZrm7VNOe6atR9AF4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE= -github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= -github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM= -github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= -github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed h1:KMgQoLJGCq1IoZpLZE3AIffh9veYWoVlsvA4ib55TMM= -github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= -github.com/starainrt/go-mysql v0.0.0-20230425104003-c669a16617e3 h1:uG2HtVhi/UtC+sUiArLV8cc8SDlUrRkqLChriV5WYZw= -github.com/starainrt/go-mysql v0.0.0-20230425104003-c669a16617e3/go.mod h1:nNTDdEbTDJ9dK0b1NJ+xo804OQuZw39GfNfpryd9d3w= +github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= +github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/starainrt/go-mysql v0.0.2 h1:PcQX1kNekew1LEqWExN71NEjAWdZ8Cpuau0F1C29JQY= +github.com/starainrt/go-mysql v0.0.2/go.mod h1:O1TXVhZd1iT/vU027lg5mxAPU5b8UrG1SRkQqa3bgww= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= +go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= -golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= -golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= +golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= +golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg= +golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8= -modernc.org/golex v1.0.1/go.mod h1:QCA53QtsT1NdGkaZZkF5ezFwk4IXh4BGNafAARTC254= -modernc.org/lex v1.0.0/go.mod h1:G6rxMTy3cH2iA0iXL/HRRv4Znu8MK4higxph/lE7ypk= -modernc.org/lexer v1.0.0/go.mod h1:F/Dld0YKYdZCLQ7bD0USbWL4YKCyTDRDHiDTOs0q0vk= -modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= -modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= -modernc.org/parser v1.0.0/go.mod h1:H20AntYJ2cHHL6MHthJ8LZzXCdDCHMWt1KZXtIMjejA= -modernc.org/parser v1.0.2/go.mod h1:TXNq3HABP3HMaqLK7brD1fLA/LfN0KS6JxZn71QdDqs= -modernc.org/scanner v1.0.1/go.mod h1:OIzD2ZtjYk6yTuyqZr57FmifbM9fIH74SumloSsajuE= -modernc.org/sortutil v1.0.0/go.mod h1:1QO0q8IlIlmjBIwm6t/7sof874+xCfZouyqZMLIAtxM= -modernc.org/strutil v1.0.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= -modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= -modernc.org/y v1.0.1/go.mod h1:Ho86I+LVHEI+LYXoUKlmOMAM1JTXOCfj8qi1T8PsClE= diff --git a/parse.go b/parse.go index 3e5f0f2..ad3642f 100644 --- a/parse.go +++ b/parse.go @@ -3,27 +3,45 @@ package binlog import ( "b612.me/mysql/gtid" "b612.me/staros" + "bufio" "bytes" + "errors" "fmt" "github.com/starainrt/go-mysql/replication" "io" "os" "strings" + "sync" "time" ) +var ( + ErrInvalidBinlogHeader = errors.New("invalid binlog file header") + ErrEventTooSmall = errors.New("event size too small") +) + +const ( + CompressionNone uint64 = 255 + CompressionZSTD uint64 = 0 +) + +const ( + maxPooledBodyCap = 4 << 20 // 4MB + defaultReadBufSize = 1 << 20 // 1MB +) + type TxDetail struct { - StartPos int `json:"startPos"` - EndPos int `json:"endPos"` - RowCount int `json:"rowCount"` - Timestamp int64 `json:"timestamp"` - Time time.Time `json:"time"` - Sql string `json:"sql"` - Db string `json:"db"` - Table string `json:"table"` - SqlType string `json:"sqlType"` - CompressionType string `json:"compressionType"` - Rows [][]interface{} + StartPos int `json:"startPos"` + EndPos int `json:"endPos"` + RowCount int `json:"rowCount"` + Timestamp int64 `json:"timestamp"` + Time time.Time `json:"time"` + Sql string `json:"sql"` + Db string `json:"db"` + Table string `json:"table"` + SqlType string `json:"sqlType"` + CompressionType string `json:"compressionType"` + Rows [][]interface{} `json:"rows"` } const ( @@ -34,306 +52,25 @@ const ( ) type Transaction struct { - GTID string `json:"gtid"` - Timestamp int64 `json:"timestamp"` - Time time.Time `json:"time"` - StartPos int `json:"startPos"` - EndPos int `json:"endPos"` - Size int `json:"size"` - RowsCount int `json:"rowsCount"` - Status uint8 `json:"status"` - TxStartTime int64 `json:"txStartTime"` - TxEndTime int64 `json:"txEndTime"` - sqlOrigin []string `json:"sqlOrigin"` - Txs []TxDetail `json:"txs"` - validSchemaCount int + GTID string `json:"gtid"` + Timestamp int64 `json:"timestamp"` + Time time.Time `json:"time"` + StartPos int `json:"startPos"` + EndPos int `json:"endPos"` + Size int `json:"size"` + RowsCount int `json:"rowsCount"` + Status uint8 `json:"status"` + TxStartTime int64 `json:"txStartTime"` + TxEndTime int64 `json:"txEndTime"` + sqlOrigin []string `json:"sqlOrigin"` + Txs []TxDetail `json:"txs"` + dmlEventCount int } func (t Transaction) GetSqlOrigin() []string { return t.sqlOrigin } -func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error { - return parseOneBinlog(path, fx) -} -func parseOneBinlog(path string, fx func(Transaction) bool) error { - if !staros.Exists(path) { - return os.ErrNotExist - } - f, err := os.Open(path) - if f != nil { - defer f.Close() - } - if err != nil { - return err - } - - fileTypeBytes := int64(4) - - b := make([]byte, fileTypeBytes) - // 读取binlog头 - if _, err = f.Read(b); err != nil { - return err - } else if !bytes.Equal(b, replication.BinLogFileHeader) { - //不是binlog格式 - return err - } - // must not seek to other position, otherwise the program may panic because formatevent, table map event is skipped - if _, err = f.Seek(fileTypeBytes, os.SEEK_SET); err != nil { - return err - } - return parseBinlogDetail(f, fx) -} - -func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error { - parse := replication.NewBinlogParser() - parse.SetParseTime(false) - parse.SetUseDecimal(false) - // process: 0, continue: 1, break: 2, EOF: 3 - var ( - err error - n int64 - tbMapPos uint32 = 0 - ) - var tx Transaction - currentGtid := "" - for { - headBuf := make([]byte, replication.EventHeaderSize) - if _, err = io.ReadFull(r, headBuf); err == io.EOF { - idx := 0 - for k, v := range tx.Txs { - if v.SqlType != "query" && len(tx.sqlOrigin) > idx { - v.Sql = tx.sqlOrigin[idx] - idx++ - } - tx.RowsCount += v.RowCount - tx.Txs[k] = v - } - tx.Size = tx.EndPos - tx.StartPos - if f != nil { - f(tx) - } - return nil - } else if err != nil { - return err - } - - var h *replication.EventHeader - h, err = parse.ParseHeader(headBuf) - if err != nil { - return err - } - //fmt.Printf("parsing %s %d %s\n", *binlog, h.LogPos, GetDatetimeStr(int64(h.Timestamp), int64(0), DATETIME_FORMAT)) - - if h.EventSize <= uint32(replication.EventHeaderSize) { - err = fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize) - return err - } - - var buf bytes.Buffer - if n, err = io.CopyN(&buf, r, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil { - err = fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n) - return err - } - - //h.Dump(os.Stdout) - - data := buf.Bytes() - var rawData []byte - rawData = append(rawData, headBuf...) - rawData = append(rawData, data...) - - eventLen := int(h.EventSize) - replication.EventHeaderSize - - if len(data) != eventLen { - err = fmt.Errorf("invalid data size %d in event %s, less event length %d", len(data), h.EventType, eventLen) - return err - } - - var e replication.Event - e, err = parse.ParseEvent(h, data, rawData) - if err != nil { - return err - } - if h.EventType == replication.TABLE_MAP_EVENT { - tbMapPos = h.LogPos - h.EventSize // avoid mysqlbing mask the row event as unknown table row event - } - - //e.Dump(os.Stdout) - - //binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e} - binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data - - evs := ParseBinlogEvent(binEvent) - for _, ev := range evs { - startPos := 0 - if ev.Type == "query" || ev.Type == "gtid" { - startPos = int(h.LogPos - h.EventSize) - //fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) - // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos, - // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} - } else { - startPos = int(tbMapPos) - //fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) - // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos, - // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} - } - switch ev.Type { - case "gtid": - if currentGtid != "" { - idx := 0 - for k, v := range tx.Txs { - if v.SqlType != "query" && len(tx.sqlOrigin) > idx { - v.Sql = tx.sqlOrigin[idx] - idx++ - } - tx.RowsCount += v.RowCount - tx.Txs[k] = v - } - tx.Size = tx.EndPos - tx.StartPos - if f != nil { - if !f(tx) { - return nil - } - } - } - currentGtid = ev.Data - tx = Transaction{ - GTID: ev.Data, - StartPos: startPos, - Timestamp: int64(h.Timestamp), - Time: time.Unix(int64(h.Timestamp), 0), - } - case "": - tx.EndPos = int(h.LogPos) - continue - case "rowsquery": - tx.EndPos = int(h.LogPos) - tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) - default: - status := STATUS_PREPARE - if ev.Type == "query" { - switch strings.ToLower(ev.Data) { - case "begin": - status = STATUS_BEGIN - case "commit": - status = STATUS_COMMIT - case "rollback": - status = STATUS_ROLLBACK - } - tx.Status = status - } - tx.EndPos = int(h.LogPos) - tx.Txs = append(tx.Txs, TxDetail{ - StartPos: startPos, - EndPos: int(h.LogPos), - Db: ev.DB, - Table: ev.TB, - Sql: ev.Data, - SqlType: ev.Type, - Rows: ev.Rows, - RowCount: int(ev.RowCnt), - Timestamp: int64(h.Timestamp), - Time: time.Unix(int64(h.Timestamp), 0), - CompressionType: ev.CompressionType, - }) - } - } - } - -} - -type BinlogEvent struct { - Type string - DB string - TB string - Data string - RowCnt uint32 - Rows [][]interface{} - CompressionType string -} - -func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { - var res []BinlogEvent - var sig BinlogEvent - switch ev.Header.EventType { - case replication.ANONYMOUS_GTID_EVENT: - //ge := ev.Event.(*replication.GTIDEvent) - sig.Data = "anonymous-gtid-event:1" - sig.Type = "gtid" - case replication.WRITE_ROWS_EVENTv1, - replication.WRITE_ROWS_EVENTv2: - wrEvent := ev.Event.(*replication.RowsEvent) - sig.DB = string(wrEvent.Table.Schema) - sig.TB = string(wrEvent.Table.Table) - sig.Type = "insert" - sig.RowCnt = uint32(len(wrEvent.Rows)) - sig.Rows = wrEvent.Rows - case replication.UPDATE_ROWS_EVENTv1, - replication.UPDATE_ROWS_EVENTv2: - wrEvent := ev.Event.(*replication.RowsEvent) - sig.DB = string(wrEvent.Table.Schema) - sig.TB = string(wrEvent.Table.Table) - sig.Type = "update" - sig.RowCnt = uint32(len(wrEvent.Rows)) / 2 - sig.Rows = wrEvent.Rows - case replication.DELETE_ROWS_EVENTv1, - replication.DELETE_ROWS_EVENTv2: - - //replication.XID_EVENT, - //replication.TABLE_MAP_EVENT: - - wrEvent := ev.Event.(*replication.RowsEvent) - sig.DB = string(wrEvent.Table.Schema) - sig.TB = string(wrEvent.Table.Table) - sig.Type = "delete" - sig.RowCnt = uint32(len(wrEvent.Rows)) - sig.Rows = wrEvent.Rows - case replication.ROWS_QUERY_EVENT: - queryEvent := ev.Event.(*replication.RowsQueryEvent) - sig.Data = string(queryEvent.Query) - sig.Type = "rowsquery" - case replication.QUERY_EVENT: - queryEvent := ev.Event.(*replication.QueryEvent) - sig.DB = string(queryEvent.Schema) - sig.Data = string(queryEvent.Query) - sig.Type = "query" - - case replication.MARIADB_GTID_EVENT: - // For global transaction ID, used to start a new transaction event group, instead of the old BEGIN query event, and also to mark stand-alone (ddl). - //https://mariadb.com/kb/en/library/gtid_event/ - sig.Data = "begin" - sig.Type = "query" - - case replication.XID_EVENT: - // XID_EVENT represents commit。rollback transaction not in binlog - sig.Data = "commit" - sig.Type = "query" - case replication.GTID_EVENT: - ge := ev.Event.(*replication.GTIDEvent) - gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO)) - if err == nil { - sig.Data = gid.String() - } - sig.Type = "gtid" - case replication.TRANSACTION_PAYLOAD_EVENT: - ge := ev.Event.(*replication.TransactionPayloadEvent) - for _, val := range ge.Events { - res = append(res, ParseBinlogEvent(val)...) - } - for idx := range res { - if ge.CompressionType == 0 { - res[idx].CompressionType = "ZSTD" - } else if ge.CompressionType != 255 { - res[idx].CompressionType = "UNKNOWN" - } - } - return res - } - res = append(res, sig) - return res -} - type BinlogFilter struct { IncludeGtid string ExcludeGtid string @@ -352,307 +89,244 @@ type BinlogFilter struct { IncludeBlank bool } -func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error { - var subGtid, inGtid, exGtid *gtid.Gtid - var err error - var includeMap = make(map[string]bool) - var excludeMap = make(map[string]bool) - if len(filter.IncludeTables) != 0 { - for _, v := range filter.IncludeTables { - if len(strings.Split(v, ".")) != 2 { - return fmt.Errorf("IncludeTable Name Is Invalid:%s", v) - } - includeMap[v] = true +type BinlogEvent struct { + Type string + DB string + TB string + Data string + RowCnt uint32 + Rows [][]interface{} + CompressionType string +} + +type tableMatcher struct { + exactMatch map[string]bool + dbWildcard map[string]bool + tbWildcard map[string]bool + matchAll bool +} + +func (m *tableMatcher) match(db, tb string) bool { + if m.matchAll { + return true + } + if m.exactMatch[db+"."+tb] { + return true + } + if m.dbWildcard[db] { + return true + } + if m.tbWildcard[tb] { + return true + } + return false +} + +var bodyBufPool = sync.Pool{ + New: func() any { + b := make([]byte, 0, 64*1024) + return &b + }, +} + +func getBodyBuf(n int) []byte { + p := bodyBufPool.Get().(*[]byte) + if cap(*p) < n { + b := make([]byte, n) + return b + } + return (*p)[:n] +} + +func putBodyBuf(b []byte) { + if cap(b) > maxPooledBodyCap { + return + } + b = b[:0] + bodyBufPool.Put(&b) +} + +func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error { + return parseOneBinlog(path, fx) +} + +func parseOneBinlog(path string, fx func(Transaction) bool) error { + if !staros.Exists(path) { + return os.ErrNotExist + } + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + + if err := validateBinlogHeader(f); err != nil { + return err + } + + br := bufio.NewReaderSize(f, defaultReadBufSize) + return parseBinlogDetail(br, fx) +} + +func validateBinlogHeader(f *os.File) error { + const fileTypeBytes = int64(4) + b := make([]byte, fileTypeBytes) + + if _, err := f.Read(b); err != nil { + return fmt.Errorf("read binlog header failed: %w", err) + } + if !bytes.Equal(b, replication.BinLogFileHeader) { + return ErrInvalidBinlogHeader + } + if _, err := f.Seek(fileTypeBytes, io.SeekStart); err != nil { + return fmt.Errorf("seek after header failed: %w", err) + } + return nil +} + +func readEventHeader(r io.Reader, parser *replication.BinlogParser, headBuf []byte) (*replication.EventHeader, error) { + if _, err := io.ReadFull(r, headBuf); err != nil { + return nil, err + } + h, err := parser.ParseHeader(headBuf) + if err != nil { + return nil, fmt.Errorf("parse header failed: %w", err) + } + if h.EventSize <= uint32(replication.EventHeaderSize) { + return nil, fmt.Errorf("%w: event size is %d", ErrEventTooSmall, h.EventSize) + } + return h, nil +} + +func readEventBody(r io.Reader, h *replication.EventHeader) ([]byte, error) { + bodyLen := int(h.EventSize) - replication.EventHeaderSize + body := getBodyBuf(bodyLen) + if _, err := io.ReadFull(r, body); err != nil { + putBodyBuf(body) + return nil, fmt.Errorf("read event body failed: %w (need %d bytes)", err, bodyLen) + } + return body, nil +} + +func skipEventBody(r io.Reader, h *replication.EventHeader) error { + bodyLen := int64(h.EventSize) - int64(replication.EventHeaderSize) + if bodyLen <= 0 { + return nil + } + if _, err := io.CopyN(io.Discard, r, bodyLen); err != nil { + return fmt.Errorf("skip event body failed: %w", err) + } + return nil +} + +func parseEvent(parser *replication.BinlogParser, h *replication.EventHeader, body []byte) (replication.Event, error) { + e, err := parser.ParseEvent(h, body, nil) + if err != nil { + return nil, fmt.Errorf("parse event failed at pos %d: %w", h.LogPos, err) + } + return e, nil +} + +func finalizeTx(tx *Transaction, onlyShowGtid bool) { + idx := 0 + for k, v := range tx.Txs { + if v.SqlType != "query" && len(tx.sqlOrigin) > idx { + v.Sql = tx.sqlOrigin[idx] + idx++ + } + tx.RowsCount += v.RowCount + tx.Txs[k] = v + } + + if onlyShowGtid { + tx.Size = 0 + } else { + tx.Size = tx.EndPos - tx.StartPos + } +} + +func fillTimeLazy(tx *Transaction) { + if tx.Timestamp != 0 && tx.Time.IsZero() { + tx.Time = time.Unix(tx.Timestamp, 0) + } + for i := range tx.Txs { + if tx.Txs[i].Timestamp != 0 && tx.Txs[i].Time.IsZero() { + tx.Txs[i].Time = time.Unix(tx.Txs[i].Timestamp, 0) } } - if len(filter.ExcludeTables) != 0 { - for _, v := range filter.ExcludeTables { - if len(strings.Split(v, ".")) != 2 { - return fmt.Errorf("ExcludeTable Name Is Invalid:%s", v) - } - excludeMap[v] = true - } - } - matchInclude := func(db, tb string) bool { - if len(filter.IncludeTables) == 0 { - return false - } - if db == "" && tb == "" { - if filter.IncludeBlank { - return true - } - return false - } - if _, ok := includeMap["*.*"]; ok { - return true - } - if _, ok := includeMap[db+"."+tb]; ok { - return true - } - if _, ok := includeMap[db+".*"]; ok { - return true - } - if _, ok := includeMap["*."+tb]; ok { - return true - } - return false - } - matchExclude := func(db, tb string) bool { - if len(filter.ExcludeTables) == 0 { - return false - } - if db == "" && tb == "" { - if filter.ExcludeBlank { - return true - } - return false - } - if _, ok := excludeMap["*.*"]; ok { - return true - } - if _, ok := excludeMap[db+"."+tb]; ok { - return true - } - if _, ok := excludeMap[db+".*"]; ok { - return true - } - if _, ok := excludeMap["*."+tb]; ok { - return true - } - return false - } - if filter.IncludeGtid != "" { - inGtid, err = gtid.Parse(filter.IncludeGtid) - if err != nil { - return err - } - subGtid = inGtid.Clone() - } - if filter.ExcludeGtid != "" { - exGtid, err = gtid.Parse(filter.ExcludeGtid) - if err != nil { - return err - } - } - // process: 0, continue: 1, break: 2, EOF: 3 +} + +func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error { + parser := replication.NewBinlogParser() + parser.SetParseTime(false) + parser.SetUseDecimal(false) + var ( - n int64 - tbMapPos uint32 = 0 - skipTillNext bool = false + tbMapPos uint32 + tx Transaction + headBuf = make([]byte, replication.EventHeaderSize) ) - var tx Transaction - currentGtid := "" - callFn := func(tx Transaction) bool { - if fn == nil { - return true - } - if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) { - return true - } - if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) { - return true - } - if filter.StartPos != 0 && filter.StartPos > tx.StartPos { - return true - } - if filter.EndPos != 0 && filter.EndPos < tx.EndPos { - return true - } - if filter.BigThan != 0 && filter.BigThan > tx.Size { - return true - } - if filter.SmallThan != 0 && filter.SmallThan < tx.Size { - return true - } - if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.validSchemaCount == 0 { - return true - } - var txs []TxDetail - var matched bool - for _, t := range tx.Txs { - if len(includeMap) != 0 { - if matchInclude(t.Db, t.Table) { - matched = true - if filter.PickTxAllIfMatch { - return fn(tx) - } - txs = append(txs, t) - } - } - if len(excludeMap) != 0 { - if matchExclude(t.Db, t.Table) { - matched = true - if filter.PickTxAllIfMatch { - return true - } - } else { - txs = append(txs, t) - } - } - } - if matched { - tx.Txs = txs - } - if !matched && len(filter.IncludeTables) > 0 { - return true - } - if len(tx.Txs) == 0 && matched { - return true - } - return fn(tx) - } + for { - headBuf := make([]byte, replication.EventHeaderSize) - if _, err = io.ReadFull(r, headBuf); err == io.EOF { - if tx.Time.IsZero() { - return nil - } - idx := 0 - for k, v := range tx.Txs { - if v.SqlType != "query" && len(tx.sqlOrigin) > idx { - v.Sql = tx.sqlOrigin[idx] - idx++ + h, err := readEventHeader(r, parser, headBuf) + if err == io.EOF { + if currentGtid != "" { + finalizeTx(&tx, false) + fillTimeLazy(&tx) + if f != nil { + f(tx) } - tx.RowsCount += v.RowCount - tx.Txs[k] = v } - if filter.OnlyShowGtid { - tx.EndPos = tx.StartPos - } - tx.Size = tx.EndPos - tx.StartPos - callFn(tx) return nil - } else if err != nil { - return err } - var h *replication.EventHeader - h, err = parse.ParseHeader(headBuf) if err != nil { return err } - //fmt.Printf("parsing %s %d %s\n", *binlog, h.LogPos, GetDatetimeStr(int64(h.Timestamp), int64(0), DATETIME_FORMAT)) - if h.EventSize <= uint32(replication.EventHeaderSize) { - err = fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize) - return err - } - var buf bytes.Buffer - if n, err = io.CopyN(&buf, r, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil { - err = fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n) - return err - } - if skipTillNext && h.EventType != replication.GTID_EVENT { - continue - } - if filter.OnlyShowGtid && h.EventType != replication.GTID_EVENT { - continue - } - //h.Dump(os.Stdout) - - data := buf.Bytes() - var rawData []byte - rawData = append(rawData, headBuf...) - rawData = append(rawData, data...) - - eventLen := int(h.EventSize) - replication.EventHeaderSize - - if len(data) != eventLen { - err = fmt.Errorf("invalid data size %d in event %s, less event length %d", len(data), h.EventType, eventLen) - return err - } - - var e replication.Event - e, err = parse.ParseEvent(h, data, rawData) + body, err := readEventBody(r, h) if err != nil { return err } + + e, err := parseEvent(parser, h, body) + putBodyBuf(body) + if err != nil { + return err + } + if h.EventType == replication.TABLE_MAP_EVENT { - tbMapPos = h.LogPos - h.EventSize // avoid mysqlbing mask the row event as unknown table row event + tbMapPos = h.LogPos - h.EventSize } - //e.Dump(os.Stdout) - - //binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e} - binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data - - evs := ParseBinlogEvent(binEvent) + evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) for _, ev := range evs { startPos := 0 if ev.Type == "query" || ev.Type == "gtid" { startPos = int(h.LogPos - h.EventSize) - //fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) - // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos, - // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} } else { startPos = int(tbMapPos) - //fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) - // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos, - // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} - } - if filter.EndPos != 0 && startPos > filter.EndPos { - skipTillNext = true - continue - } - if filter.StartPos != 0 && startPos < filter.EndPos { - skipTillNext = true - continue } + switch ev.Type { case "gtid": - if skipTillNext { - skipTillNext = false - } if currentGtid != "" { - idx := 0 - for k, v := range tx.Txs { - if v.SqlType != "query" && len(tx.sqlOrigin) > idx { - v.Sql = tx.sqlOrigin[idx] - idx++ - } - tx.RowsCount += v.RowCount - tx.Txs[k] = v - } - if filter.OnlyShowGtid { - tx.EndPos = startPos - 1 - } - tx.Size = tx.EndPos - tx.StartPos - if !callFn(tx) { + finalizeTx(&tx, false) + fillTimeLazy(&tx) + if f != nil && !f(tx) { return nil } - if subGtid != nil { - subGtid.Sub(tx.GTID) - if subGtid.EventCount() == 0 { - return nil - } - } - tx = Transaction{} } currentGtid = ev.Data - if inGtid != nil { - if c, _ := inGtid.Contain(ev.Data); !c { - tx = Transaction{} - currentGtid = "" - skipTillNext = true - continue - } - } - if exGtid != nil { - if c, _ := exGtid.Contain(ev.Data); c { - currentGtid = "" - skipTillNext = true - continue - } - } tx = Transaction{ GTID: ev.Data, StartPos: startPos, Timestamp: int64(h.Timestamp), - Time: time.Unix(int64(h.Timestamp), 0), + Txs: make([]TxDetail, 0, 8), + sqlOrigin: make([]string, 0, 4), } case "": tx.EndPos = int(h.LogPos) - continue case "rowsquery": tx.EndPos = int(h.LogPos) tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) @@ -676,7 +350,7 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter tx.Status = status } if ev.DB != "" && ev.TB != "" { - tx.validSchemaCount++ + tx.dmlEventCount++ } tx.Txs = append(tx.Txs, TxDetail{ StartPos: startPos, @@ -688,7 +362,6 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter Rows: ev.Rows, RowCount: int(ev.RowCnt), Timestamp: int64(h.Timestamp), - Time: time.Unix(int64(h.Timestamp), 0), CompressionType: ev.CompressionType, }) } @@ -696,88 +369,555 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter } } +func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { + var res []BinlogEvent + var sig BinlogEvent + + switch ev.Header.EventType { + case replication.ANONYMOUS_GTID_EVENT: + sig.Data = "anonymous-gtid-event:1" + sig.Type = "gtid" + + case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: + wrEvent, ok := ev.Event.(*replication.RowsEvent) + if !ok { + return res + } + sig.DB = string(wrEvent.Table.Schema) + sig.TB = string(wrEvent.Table.Table) + sig.Type = "insert" + sig.RowCnt = uint32(len(wrEvent.Rows)) + sig.Rows = wrEvent.Rows + + case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: + wrEvent, ok := ev.Event.(*replication.RowsEvent) + if !ok { + return res + } + sig.DB = string(wrEvent.Table.Schema) + sig.TB = string(wrEvent.Table.Table) + sig.Type = "update" + sig.RowCnt = uint32(len(wrEvent.Rows)) / 2 + sig.Rows = wrEvent.Rows + + case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: + wrEvent, ok := ev.Event.(*replication.RowsEvent) + if !ok { + return res + } + sig.DB = string(wrEvent.Table.Schema) + sig.TB = string(wrEvent.Table.Table) + sig.Type = "delete" + sig.RowCnt = uint32(len(wrEvent.Rows)) + sig.Rows = wrEvent.Rows + + case replication.ROWS_QUERY_EVENT: + queryEvent, ok := ev.Event.(*replication.RowsQueryEvent) + if !ok { + return res + } + sig.Data = string(queryEvent.Query) + sig.Type = "rowsquery" + + case replication.QUERY_EVENT: + queryEvent, ok := ev.Event.(*replication.QueryEvent) + if !ok { + return res + } + sig.DB = string(queryEvent.Schema) + sig.Data = string(queryEvent.Query) + sig.Type = "query" + + case replication.MARIADB_GTID_EVENT: + sig.Data = "begin" + sig.Type = "query" + + case replication.XID_EVENT: + sig.Data = "commit" + sig.Type = "query" + + case replication.GTID_EVENT: + ge, ok := ev.Event.(*replication.GTIDEvent) + if !ok { + return res + } + gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO)) + if err != nil { + sig.Data = fmt.Sprintf("invalid-gtid:%s:%d", bytesToUuid(ge.SID), ge.GNO) + } else { + sig.Data = gid.String() + } + sig.Type = "gtid" + + case replication.TRANSACTION_PAYLOAD_EVENT: + ge, ok := ev.Event.(*replication.TransactionPayloadEvent) + if !ok { + return res + } + for _, val := range ge.Events { + res = append(res, ParseBinlogEvent(val)...) + } + compressionType := getCompressionTypeName(ge.CompressionType) + for idx := range res { + res[idx].CompressionType = compressionType + } + return res + } + + res = append(res, sig) + return res +} + +func getCompressionTypeName(code uint64) string { + switch code { + case CompressionZSTD: + return "ZSTD" + case CompressionNone: + return "" + default: + return fmt.Sprintf("UNKNOWN(%d)", code) + } +} + func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error { - defer func() { - recover() - }() if !staros.Exists(path) { return os.ErrNotExist } + f, err := os.Open(path) - if f != nil { - defer f.Close() - } if err != nil { return err } - parse := replication.NewBinlogParser() - parse.SetParseTime(false) - parse.SetUseDecimal(false) - seekZore := func() error { - fileTypeBytes := int64(4) - b := make([]byte, fileTypeBytes) - // 读取binlog头 - if _, err = f.Read(b); err != nil { - return err - } else if !bytes.Equal(b, replication.BinLogFileHeader) { - //不是binlog格式 - return err - } - // must not seek to other position, otherwise the program may panic because formatevent, table map event is skipped - if _, err = f.Seek(fileTypeBytes, os.SEEK_SET); err != nil { - return err - } - return nil - } + defer f.Close() + + parser := replication.NewBinlogParser() + parser.SetParseTime(false) + parser.SetUseDecimal(false) + if pos != 0 { - if err = seekZore(); err != nil { + if err := seekToPosition(f, parser, pos); err != nil { return err } - for { - headBuf := make([]byte, replication.EventHeaderSize) - if _, err = io.ReadFull(f, headBuf); err != nil { - return err - } - var h *replication.EventHeader - h, err = parse.ParseHeader(headBuf) - if err != nil { - return err - } - if h.EventSize <= uint32(replication.EventHeaderSize) { - err = fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize) - return err - } - var buf bytes.Buffer - if n, err := io.CopyN(&buf, f, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil { - err = fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n) - return err - } - data := buf.Bytes() - var rawData []byte - rawData = append(rawData, headBuf...) - rawData = append(rawData, data...) - eventLen := int(h.EventSize) - replication.EventHeaderSize - if len(data) != eventLen { - err = fmt.Errorf("invalid data size %d in event %s, less event length %d", len(data), h.EventType, eventLen) - return err - } - _, err = parse.ParseEvent(h, data, rawData) - if err != nil { - return err - } - if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT { - break - } - } - if _, err = f.Seek(pos, 0); err != nil { + } else { + if err := validateBinlogHeader(f); err != nil { return err } } - if pos == 0 { - if err = seekZore(); err != nil { + br := bufio.NewReaderSize(f, defaultReadBufSize) + return parseBinlogWithFilter(br, parser, filter, fx) +} + +func seekToPosition(f *os.File, parser *replication.BinlogParser, pos int64) error { + if err := validateBinlogHeader(f); err != nil { + return err + } + + headBuf := make([]byte, replication.EventHeaderSize) + for { + h, err := readEventHeader(f, parser, headBuf) + if err != nil { + return fmt.Errorf("seek to position failed: %w", err) + } + body, err := readEventBody(f, h) + if err != nil { return err } + _, err = parseEvent(parser, h, body) + putBodyBuf(body) + if err != nil { + return err + } + if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT { + break + } } - return parseBinlogWithFilter(f, parse, filter, fx) + + if _, err := f.Seek(pos, io.SeekStart); err != nil { + return fmt.Errorf("seek to pos %d failed: %w", pos, err) + } + return nil +} + +func parseBinlogWithFilter(r io.Reader, parser *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error { + var subGtid, inGtid, exGtid *gtid.Gtid + var err error + + includeMatcher, excludeMatcher := prepareTableMatchers(filter) + + if filter.IncludeGtid != "" { + inGtid, err = gtid.Parse(filter.IncludeGtid) + if err != nil { + return fmt.Errorf("parse include gtid failed: %w", err) + } + subGtid = inGtid.Clone() + } + if filter.ExcludeGtid != "" { + exGtid, err = gtid.Parse(filter.ExcludeGtid) + if err != nil { + return fmt.Errorf("parse exclude gtid failed: %w", err) + } + } + + var ( + tbMapPos uint32 + skipCurrentTxn bool + tx Transaction + headBuf = make([]byte, replication.EventHeaderSize) + ) + currentGtid := "" + + callFn := func(tx Transaction) bool { + if fn == nil { + return true + } + + fillTimeLazy(&tx) + + if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) { + return true + } + if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) { + return true + } + if filter.StartPos != 0 && filter.StartPos > tx.StartPos { + return true + } + if filter.EndPos != 0 && filter.EndPos < tx.EndPos { + return true + } + if filter.BigThan != 0 && filter.BigThan > tx.Size { + return true + } + if filter.SmallThan != 0 && filter.SmallThan < tx.Size { + return true + } + if !filter.OnlyShowGtid && filter.OnlyShowDML && tx.dmlEventCount == 0 { + return true + } + + var txs []TxDetail + var matched bool + + for _, t := range tx.Txs { + includeMatch := includeMatcher != nil && includeMatcher.match(t.Db, t.Table) + excludeMatch := excludeMatcher != nil && excludeMatcher.match(t.Db, t.Table) + + if t.Db == "" && t.Table == "" { + if includeMatcher != nil && !filter.IncludeBlank { + continue + } + if excludeMatcher != nil && filter.ExcludeBlank { + matched = true + if filter.PickTxAllIfMatch { + return true + } + continue + } + } + + if includeMatcher != nil { + if includeMatch { + matched = true + if filter.PickTxAllIfMatch { + return fn(tx) + } + txs = append(txs, t) + } + } else if excludeMatcher != nil { + if excludeMatch { + matched = true + if filter.PickTxAllIfMatch { + return true + } + } else { + txs = append(txs, t) + } + } else { + txs = append(txs, t) + } + } + + if matched { + tx.Txs = txs + } + if !matched && includeMatcher != nil { + return true + } + if len(tx.Txs) == 0 && matched { + return true + } + return fn(tx) + } + + for { + h, err := readEventHeader(r, parser, headBuf) + if err == io.EOF { + if !tx.Time.IsZero() || tx.Timestamp != 0 { + finalizeTx(&tx, filter.OnlyShowGtid) + callFn(tx) + } + return nil + } + if err != nil { + return err + } + + // GTID-only fast path + if filter.OnlyShowGtid { + if h.EventType != replication.GTID_EVENT && h.EventType != replication.ANONYMOUS_GTID_EVENT { + if err := skipEventBody(r, h); err != nil { + return err + } + continue + } + + body, err := readEventBody(r, h) + if err != nil { + return err + } + e, err := parseEvent(parser, h, body) + putBodyBuf(body) + if err != nil { + return err + } + + evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) + for _, ev := range evs { + if ev.Type != "gtid" { + continue + } + startPos := int(h.LogPos - h.EventSize) + + if filter.EndPos != 0 && startPos > filter.EndPos { + continue + } + if filter.StartPos != 0 && startPos < filter.StartPos { + continue + } + + if currentGtid != "" { + tx.EndPos = startPos - 1 + finalizeTx(&tx, true) + if !callFn(tx) { + return nil + } + if subGtid != nil { + if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 { + return nil + } + } + tx = Transaction{} + } + + currentGtid = ev.Data + + if inGtid != nil { + if c, _ := inGtid.Contain(ev.Data); !c { + tx = Transaction{} + currentGtid = "" + continue + } + } + if exGtid != nil { + if c, _ := exGtid.Contain(ev.Data); c { + currentGtid = "" + tx = Transaction{} + continue + } + } + + tx = Transaction{ + GTID: ev.Data, + StartPos: startPos, + EndPos: startPos, + Timestamp: int64(h.Timestamp), + } + } + continue + } + + // 先处理GTID事件(决定当前事务是否命中) + if h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT { + body, err := readEventBody(r, h) + if err != nil { + return err + } + e, err := parseEvent(parser, h, body) + putBodyBuf(body) + if err != nil { + return err + } + + evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) + for _, ev := range evs { + if ev.Type != "gtid" { + continue + } + startPos := int(h.LogPos - h.EventSize) + + if currentGtid != "" { + finalizeTx(&tx, false) + if !callFn(tx) { + return nil + } + if subGtid != nil { + if err := subGtid.Sub(tx.GTID); err == nil && subGtid.EventCount() == 0 { + return nil + } + } + tx = Transaction{} + } + + currentGtid = ev.Data + skipCurrentTxn = false + + if filter.EndPos != 0 && startPos > filter.EndPos { + skipCurrentTxn = true + } + if filter.StartPos != 0 && startPos < filter.StartPos { + skipCurrentTxn = true + } + if inGtid != nil { + if c, _ := inGtid.Contain(ev.Data); !c { + skipCurrentTxn = true + } + } + if exGtid != nil { + if c, _ := exGtid.Contain(ev.Data); c { + skipCurrentTxn = true + } + } + + if !skipCurrentTxn { + tx = Transaction{ + GTID: ev.Data, + StartPos: startPos, + Timestamp: int64(h.Timestamp), + Txs: make([]TxDetail, 0, 8), + sqlOrigin: make([]string, 0, 4), + } + } else { + tx = Transaction{} + } + } + continue + } + + // 未命中事务:零解析到底 + if skipCurrentTxn { + if err := skipEventBody(r, h); err != nil { + return err + } + continue + } + + body, err := readEventBody(r, h) + if err != nil { + return err + } + e, err := parseEvent(parser, h, body) + putBodyBuf(body) + if err != nil { + return err + } + + if h.EventType == replication.TABLE_MAP_EVENT { + tbMapPos = h.LogPos - h.EventSize + } + + evs := ParseBinlogEvent(&replication.BinlogEvent{Header: h, Event: e}) + for _, ev := range evs { + startPos := 0 + if ev.Type == "query" || ev.Type == "gtid" { + startPos = int(h.LogPos - h.EventSize) + } else { + startPos = int(tbMapPos) + } + + switch ev.Type { + case "": + tx.EndPos = int(h.LogPos) + + case "rowsquery": + tx.EndPos = int(h.LogPos) + tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) + + default: + tx.EndPos = int(h.LogPos) + status := STATUS_PREPARE + if ev.Type == "query" { + switch strings.ToLower(ev.Data) { + case "begin": + if tx.TxStartTime == 0 { + tx.TxStartTime = int64(h.Timestamp) + } + status = STATUS_BEGIN + case "commit": + status = STATUS_COMMIT + tx.TxEndTime = int64(h.Timestamp) + case "rollback": + status = STATUS_ROLLBACK + tx.TxEndTime = int64(h.Timestamp) + } + tx.Status = status + } + if ev.DB != "" && ev.TB != "" { + tx.dmlEventCount++ + } + tx.Txs = append(tx.Txs, TxDetail{ + StartPos: startPos, + EndPos: int(h.LogPos), + Db: ev.DB, + Table: ev.TB, + Sql: ev.Data, + SqlType: ev.Type, + Rows: ev.Rows, + RowCount: int(ev.RowCnt), + Timestamp: int64(h.Timestamp), + CompressionType: ev.CompressionType, + }) + } + } + } +} + +func prepareTableMatchers(filter BinlogFilter) (includeMatcher, excludeMatcher *tableMatcher) { + if len(filter.IncludeTables) > 0 { + includeMatcher = buildTableMatcher(filter.IncludeTables) + } + if len(filter.ExcludeTables) > 0 { + excludeMatcher = buildTableMatcher(filter.ExcludeTables) + } + return includeMatcher, excludeMatcher +} + +func buildTableMatcher(patterns []string) *tableMatcher { + m := &tableMatcher{ + exactMatch: make(map[string]bool), + dbWildcard: make(map[string]bool), + tbWildcard: make(map[string]bool), + } + + for _, pattern := range patterns { + if pattern == "*.*" { + m.matchAll = true + continue + } + parts := strings.Split(pattern, ".") + if len(parts) != 2 { + continue + } + db, tb := parts[0], parts[1] + if db == "*" && tb == "*" { + m.matchAll = true + } else if db == "*" { + m.tbWildcard[tb] = true + } else if tb == "*" { + m.dbWildcard[db] = true + } else { + m.exactMatch[pattern] = true + } + } + return m }