|
|
package client
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
"math"
|
|
|
"math/rand"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
"github.com/pingcap/errors"
|
|
|
)
|
|
|
|
|
|
/*
|
|
|
Pool for efficient reuse of connections.
|
|
|
|
|
|
Usage:
|
|
|
pool := client.NewPool(log.Debugf, 100, 400, 5, `127.0.0.1:3306`, `username`, `userpwd`, `dbname`)
|
|
|
...
|
|
|
conn, _ := pool.GetConn(ctx)
|
|
|
defer pool.PutConn(conn)
|
|
|
conn.Execute/conn.Begin/etc...
|
|
|
*/
|
|
|
|
|
|
type (
|
|
|
Timestamp int64
|
|
|
|
|
|
LogFunc func(format string, args ...interface{})
|
|
|
|
|
|
Pool struct {
|
|
|
logFunc LogFunc
|
|
|
minAlive int
|
|
|
maxAlive int
|
|
|
maxIdle int
|
|
|
idleCloseTimeout Timestamp
|
|
|
idlePingTimeout Timestamp
|
|
|
connect func() (*Conn, error)
|
|
|
|
|
|
synchro struct {
|
|
|
sync.Mutex
|
|
|
idleConnections []Connection
|
|
|
stats ConnectionStats
|
|
|
}
|
|
|
|
|
|
readyConnection chan Connection
|
|
|
}
|
|
|
|
|
|
ConnectionStats struct {
|
|
|
// Uses internally
|
|
|
TotalCount int
|
|
|
|
|
|
// Only for stats
|
|
|
IdleCount int
|
|
|
CreatedCount int64
|
|
|
}
|
|
|
|
|
|
Connection struct {
|
|
|
conn *Conn
|
|
|
lastUseAt Timestamp
|
|
|
}
|
|
|
)
|
|
|
|
|
|
// Tunables of the connection pool.
var (
	// MaxIdleTimeoutWithoutPing is the longest time a connection may stay idle
	// and still be handed out without a ping. A connection that has been idle
	// for longer is pinged first to check that it is still alive.
	MaxIdleTimeoutWithoutPing = 10 * time.Second

	// DefaultIdleTimeout is the idle time after which a connection may be closed
	// (keeping Pool.minAlive in mind).
	DefaultIdleTimeout = 30 * time.Second

	// MaxNewConnectionAtOnce caps how many connections are created in one go
	// when the pool needs more of them.
	// The cap does not apply while the pool is being initialized.
	MaxNewConnectionAtOnce = 5
)
|
|
|
|
|
|
// NewPool initializes new connection pool and uses params: addr, user, password, dbName and options.
|
|
|
// minAlive specifies the minimum number of open connections that the pool will try to maintain.
|
|
|
// maxAlive specifies the maximum number of open connections (for internal reasons,
|
|
|
// may be greater by 1 inside newConnectionProducer).
|
|
|
// maxIdle specifies the maximum number of idle connections (see DefaultIdleTimeout).
|
|
|
func NewPool(
|
|
|
logFunc LogFunc,
|
|
|
minAlive int,
|
|
|
maxAlive int,
|
|
|
maxIdle int,
|
|
|
addr string,
|
|
|
user string,
|
|
|
password string,
|
|
|
dbName string,
|
|
|
options ...func(conn *Conn),
|
|
|
) *Pool {
|
|
|
if minAlive > maxAlive {
|
|
|
minAlive = maxAlive
|
|
|
}
|
|
|
if maxIdle > maxAlive {
|
|
|
maxIdle = maxAlive
|
|
|
}
|
|
|
if maxIdle <= minAlive {
|
|
|
maxIdle = minAlive
|
|
|
}
|
|
|
|
|
|
pool := &Pool{
|
|
|
logFunc: logFunc,
|
|
|
minAlive: minAlive,
|
|
|
maxAlive: maxAlive,
|
|
|
maxIdle: maxIdle,
|
|
|
|
|
|
idleCloseTimeout: Timestamp(math.Ceil(DefaultIdleTimeout.Seconds())),
|
|
|
idlePingTimeout: Timestamp(math.Ceil(MaxIdleTimeoutWithoutPing.Seconds())),
|
|
|
|
|
|
connect: func() (*Conn, error) {
|
|
|
return Connect(addr, user, password, dbName, options...)
|
|
|
},
|
|
|
|
|
|
readyConnection: make(chan Connection),
|
|
|
}
|
|
|
|
|
|
pool.synchro.idleConnections = make([]Connection, 0, pool.maxIdle)
|
|
|
|
|
|
go pool.newConnectionProducer()
|
|
|
|
|
|
if pool.minAlive > 0 {
|
|
|
pool.logFunc(`Pool: Setup %d new connections (minimal pool size)...`, pool.minAlive)
|
|
|
pool.startNewConnections(pool.minAlive)
|
|
|
}
|
|
|
|
|
|
go pool.closeOldIdleConnections()
|
|
|
|
|
|
return pool
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) GetStats(stats *ConnectionStats) {
|
|
|
pool.synchro.Lock()
|
|
|
|
|
|
*stats = pool.synchro.stats
|
|
|
|
|
|
stats.IdleCount = len(pool.synchro.idleConnections)
|
|
|
|
|
|
pool.synchro.Unlock()
|
|
|
}
|
|
|
|
|
|
// GetConn returns connection from the pool or create new
|
|
|
func (pool *Pool) GetConn(ctx context.Context) (*Conn, error) {
|
|
|
for {
|
|
|
connection, err := pool.getConnection(ctx)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
// For long time idle connections, we do a ping check
|
|
|
if delta := pool.nowTs() - connection.lastUseAt; delta > pool.idlePingTimeout {
|
|
|
if err := pool.ping(connection.conn); err != nil {
|
|
|
pool.closeConn(connection.conn)
|
|
|
continue
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return connection.conn, nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// PutConn returns working connection back to pool
|
|
|
func (pool *Pool) PutConn(conn *Conn) {
|
|
|
pool.putConnection(Connection{
|
|
|
conn: conn,
|
|
|
lastUseAt: pool.nowTs(),
|
|
|
})
|
|
|
}
|
|
|
|
|
|
// DropConn closes the connection without any checks
|
|
|
func (pool *Pool) DropConn(conn *Conn) {
|
|
|
pool.closeConn(conn)
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) putConnection(connection Connection) {
|
|
|
pool.synchro.Lock()
|
|
|
defer pool.synchro.Unlock()
|
|
|
|
|
|
// If someone is already waiting for a connection, then we return it to him
|
|
|
select {
|
|
|
case pool.readyConnection <- connection:
|
|
|
return
|
|
|
default:
|
|
|
}
|
|
|
|
|
|
// Nobody needs this connection
|
|
|
|
|
|
pool.putConnectionUnsafe(connection)
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) nowTs() Timestamp {
|
|
|
return Timestamp(time.Now().Unix())
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) getConnection(ctx context.Context) (Connection, error) {
|
|
|
pool.synchro.Lock()
|
|
|
|
|
|
connection := pool.getIdleConnectionUnsafe()
|
|
|
if connection.conn != nil {
|
|
|
pool.synchro.Unlock()
|
|
|
return connection, nil
|
|
|
}
|
|
|
pool.synchro.Unlock()
|
|
|
|
|
|
// No idle connections are available
|
|
|
|
|
|
select {
|
|
|
case connection := <-pool.readyConnection:
|
|
|
return connection, nil
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
return Connection{}, ctx.Err()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) putConnectionUnsafe(connection Connection) {
|
|
|
if len(pool.synchro.idleConnections) == cap(pool.synchro.idleConnections) {
|
|
|
pool.synchro.stats.TotalCount--
|
|
|
_ = connection.conn.Close() // Could it be more effective to close older connections?
|
|
|
} else {
|
|
|
pool.synchro.idleConnections = append(pool.synchro.idleConnections, connection)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) newConnectionProducer() {
|
|
|
var connection Connection
|
|
|
var err error
|
|
|
|
|
|
for {
|
|
|
connection.conn = nil
|
|
|
|
|
|
pool.synchro.Lock()
|
|
|
|
|
|
connection = pool.getIdleConnectionUnsafe()
|
|
|
if connection.conn == nil {
|
|
|
if pool.synchro.stats.TotalCount >= pool.maxAlive {
|
|
|
// Can't create more connections
|
|
|
pool.synchro.Unlock()
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
continue
|
|
|
}
|
|
|
pool.synchro.stats.TotalCount++ // "Reserving" new connection
|
|
|
}
|
|
|
|
|
|
pool.synchro.Unlock()
|
|
|
|
|
|
if connection.conn == nil {
|
|
|
connection, err = pool.createNewConnection()
|
|
|
if err != nil {
|
|
|
pool.synchro.Lock()
|
|
|
pool.synchro.stats.TotalCount-- // Bad luck, should try again
|
|
|
pool.synchro.Unlock()
|
|
|
|
|
|
time.Sleep(time.Duration(10+rand.Intn(90)) * time.Millisecond)
|
|
|
continue
|
|
|
}
|
|
|
}
|
|
|
|
|
|
pool.readyConnection <- connection
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) createNewConnection() (Connection, error) {
|
|
|
var connection Connection
|
|
|
var err error
|
|
|
|
|
|
connection.conn, err = pool.connect()
|
|
|
if err != nil {
|
|
|
return Connection{}, errors.Errorf(`Could not connect to mysql: %s`, err)
|
|
|
}
|
|
|
connection.lastUseAt = pool.nowTs()
|
|
|
|
|
|
pool.synchro.Lock()
|
|
|
pool.synchro.stats.CreatedCount++
|
|
|
pool.synchro.Unlock()
|
|
|
|
|
|
return connection, nil
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) getIdleConnectionUnsafe() Connection {
|
|
|
cnt := len(pool.synchro.idleConnections)
|
|
|
if cnt == 0 {
|
|
|
return Connection{}
|
|
|
}
|
|
|
|
|
|
last := cnt - 1
|
|
|
connection := pool.synchro.idleConnections[last]
|
|
|
pool.synchro.idleConnections[last].conn = nil
|
|
|
pool.synchro.idleConnections = pool.synchro.idleConnections[:last]
|
|
|
|
|
|
return connection
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) closeOldIdleConnections() {
|
|
|
var toPing []Connection
|
|
|
|
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
|
|
|
|
for range ticker.C {
|
|
|
toPing = pool.getOldIdleConnections(toPing[:0])
|
|
|
if len(toPing) == 0 {
|
|
|
continue
|
|
|
}
|
|
|
pool.recheckConnections(toPing)
|
|
|
|
|
|
if !pool.spawnConnectionsIfNeeded() {
|
|
|
pool.closeIdleConnectionsIfCan()
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) getOldIdleConnections(dst []Connection) []Connection {
|
|
|
dst = dst[:0]
|
|
|
|
|
|
pool.synchro.Lock()
|
|
|
|
|
|
synchro := &pool.synchro
|
|
|
|
|
|
idleCnt := len(synchro.idleConnections)
|
|
|
checkBefore := pool.nowTs() - pool.idlePingTimeout
|
|
|
|
|
|
for i := idleCnt - 1; i >= 0; i-- {
|
|
|
if synchro.idleConnections[i].lastUseAt > checkBefore {
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
dst = append(dst, synchro.idleConnections[i])
|
|
|
|
|
|
last := idleCnt - 1
|
|
|
if i < last {
|
|
|
// Removing an item from the middle of a slice
|
|
|
synchro.idleConnections[i], synchro.idleConnections[last] = synchro.idleConnections[last], synchro.idleConnections[i]
|
|
|
}
|
|
|
|
|
|
synchro.idleConnections[last].conn = nil
|
|
|
synchro.idleConnections = synchro.idleConnections[:last]
|
|
|
idleCnt--
|
|
|
}
|
|
|
|
|
|
pool.synchro.Unlock()
|
|
|
|
|
|
return dst
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) recheckConnections(connections []Connection) {
|
|
|
const workerCnt = 2 // Heuristic :)
|
|
|
|
|
|
queue := make(chan Connection, len(connections))
|
|
|
for _, connection := range connections {
|
|
|
queue <- connection
|
|
|
}
|
|
|
close(queue)
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
wg.Add(workerCnt)
|
|
|
for worker := 0; worker < workerCnt; worker++ {
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
for connection := range queue {
|
|
|
if err := pool.ping(connection.conn); err != nil {
|
|
|
pool.closeConn(connection.conn)
|
|
|
} else {
|
|
|
pool.putConnection(connection)
|
|
|
}
|
|
|
}
|
|
|
}()
|
|
|
}
|
|
|
|
|
|
wg.Wait()
|
|
|
}
|
|
|
|
|
|
// spawnConnectionsIfNeeded creates new connections if there are not enough of them and returns true in this case
|
|
|
func (pool *Pool) spawnConnectionsIfNeeded() bool {
|
|
|
pool.synchro.Lock()
|
|
|
totalCount := pool.synchro.stats.TotalCount
|
|
|
idleCount := len(pool.synchro.idleConnections)
|
|
|
needSpanNew := pool.minAlive - totalCount
|
|
|
pool.synchro.Unlock()
|
|
|
|
|
|
if needSpanNew <= 0 {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
// Не хватает соединений, нужно создать еще
|
|
|
|
|
|
if needSpanNew > MaxNewConnectionAtOnce {
|
|
|
needSpanNew = MaxNewConnectionAtOnce
|
|
|
}
|
|
|
|
|
|
pool.logFunc(`Pool: Setup %d new connections (total: %d idle: %d)...`, needSpanNew, totalCount, idleCount)
|
|
|
pool.startNewConnections(needSpanNew)
|
|
|
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) closeIdleConnectionsIfCan() {
|
|
|
pool.synchro.Lock()
|
|
|
|
|
|
canCloseCnt := pool.synchro.stats.TotalCount - pool.minAlive
|
|
|
canCloseCnt-- // -1 to account for an open but unused connection (pool.readyConnection <- connection in newConnectionProducer)
|
|
|
|
|
|
idleCnt := len(pool.synchro.idleConnections)
|
|
|
|
|
|
inFly := pool.synchro.stats.TotalCount - idleCnt
|
|
|
|
|
|
// We can close no more than 10% connections at a time, but at least 1, if possible
|
|
|
idleCanCloseCnt := idleCnt / 10
|
|
|
if idleCanCloseCnt == 0 {
|
|
|
idleCanCloseCnt = 1
|
|
|
}
|
|
|
if canCloseCnt > idleCanCloseCnt {
|
|
|
canCloseCnt = idleCanCloseCnt
|
|
|
}
|
|
|
if canCloseCnt <= 0 {
|
|
|
pool.synchro.Unlock()
|
|
|
return
|
|
|
}
|
|
|
|
|
|
closeFromIdx := idleCnt - canCloseCnt
|
|
|
if closeFromIdx < 0 {
|
|
|
// If there are enough requests in the "flight" now, then we can close all unnecessary
|
|
|
closeFromIdx = 0
|
|
|
}
|
|
|
|
|
|
toClose := append([]Connection{}, pool.synchro.idleConnections[closeFromIdx:]...)
|
|
|
|
|
|
for i := closeFromIdx; i < idleCnt; i++ {
|
|
|
pool.synchro.idleConnections[i].conn = nil
|
|
|
}
|
|
|
pool.synchro.idleConnections = pool.synchro.idleConnections[:closeFromIdx]
|
|
|
|
|
|
pool.synchro.Unlock()
|
|
|
|
|
|
pool.logFunc(`Pool: Close %d idle connections (in fly %d)`, len(toClose), inFly)
|
|
|
for _, connection := range toClose {
|
|
|
pool.closeConn(connection.conn)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) closeConn(conn *Conn) {
|
|
|
pool.synchro.Lock()
|
|
|
pool.synchro.stats.TotalCount--
|
|
|
pool.synchro.Unlock()
|
|
|
|
|
|
_ = conn.Close() // Closing is not an instant action, so do it outside the lock
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) startNewConnections(count int) {
|
|
|
connections := make([]Connection, 0, count)
|
|
|
for i := 0; i < count; i++ {
|
|
|
if conn, err := pool.createNewConnection(); err == nil {
|
|
|
pool.synchro.Lock()
|
|
|
pool.synchro.stats.TotalCount++
|
|
|
pool.synchro.Unlock()
|
|
|
connections = append(connections, conn)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
pool.synchro.Lock()
|
|
|
for _, connection := range connections {
|
|
|
pool.putConnectionUnsafe(connection)
|
|
|
}
|
|
|
pool.synchro.Unlock()
|
|
|
}
|
|
|
|
|
|
func (pool *Pool) ping(conn *Conn) error {
|
|
|
deadline := time.Now().Add(100 * time.Millisecond)
|
|
|
_ = conn.SetDeadline(deadline)
|
|
|
err := conn.Ping()
|
|
|
if err != nil {
|
|
|
pool.logFunc(`Pool: ping query fail: %s`, err.Error())
|
|
|
} else {
|
|
|
_ = conn.SetDeadline(time.Time{})
|
|
|
}
|
|
|
return err
|
|
|
}
|