starlog/rate_limit.go
2026-03-19 16:37:57 +08:00

562 lines
13 KiB
Go

package starlog
import (
"math"
"strconv"
"strings"
"sync"
"time"
)
type RateLimitScope int
const (
RateLimitScopeGlobal RateLimitScope = iota
RateLimitScopeByKey
)
type RateLimitDropPolicy int
const (
RateLimitDrop RateLimitDropPolicy = iota
RateLimitPassThrough
)
type RateLimitDropData struct {
Time time.Time
Key string
Reason string
Level int
LevelName string
LoggerName string
Message string
PassThrough bool
DroppedCount uint64
PassedThroughCount uint64
Suppressed uint64
Summary bool
SummarySuppressed uint64
}
type RateLimitStats struct {
Enabled bool
Rate float64
Burst int
Scope RateLimitScope
Allowed uint64
Dropped uint64
PassedThrough uint64
Suppressed uint64
Summaries uint64
LastDropTime time.Time
LastDropKey string
LastReason string
CurrentKeys int
}
type RateLimitConfig struct {
Enable bool
Levels []int
Rate float64
Burst int
Scope RateLimitScope
KeyFunc func(*Entry) string
MaxKeys int
KeyTTL time.Duration
DropPolicy RateLimitDropPolicy
OnDrop func(RateLimitDropData)
ExemptLevels []int
ExemptMatcher func(*Entry) bool
SummaryInterval time.Duration
}
type rateLimitBucket struct {
tokens float64
lastRefill time.Time
lastSeen time.Time
}
type rateLimiter struct {
mu sync.Mutex
cfg RateLimitConfig
limitedLevel map[int]struct{}
exemptLevel map[int]struct{}
buckets map[string]*rateLimitBucket
nowFunc func() time.Time
lastCleanupTime time.Time
lastSummaryTime time.Time
allowedCount uint64
droppedCount uint64
passedThroughCount uint64
suppressedCount uint64
summaryCount uint64
lastDropTime time.Time
lastDropKey string
lastDropReason string
}
func DefaultRateLimitConfig() RateLimitConfig {
return RateLimitConfig{
Enable: false,
Levels: nil,
Rate: 0,
Burst: 1,
Scope: RateLimitScopeGlobal,
KeyFunc: nil,
MaxKeys: 4096,
KeyTTL: 10 * time.Minute,
DropPolicy: RateLimitDrop,
OnDrop: nil,
ExemptLevels: []int{LvError, LvCritical, LvPanic, LvFatal},
ExemptMatcher: nil,
SummaryInterval: 0,
}
}
func cloneIntSlice(values []int) []int {
if len(values) == 0 {
return nil
}
result := make([]int, len(values))
copy(result, values)
return result
}
func cloneRateLimitConfig(cfg RateLimitConfig) RateLimitConfig {
cloned := cfg
cloned.Levels = cloneIntSlice(cfg.Levels)
cloned.ExemptLevels = cloneIntSlice(cfg.ExemptLevels)
return cloned
}
func normalizeRateLimitConfig(cfg RateLimitConfig) RateLimitConfig {
if cfg.Rate < 0 {
cfg.Rate = 0
}
if cfg.Burst <= 0 {
if cfg.Rate > 0 {
cfg.Burst = int(math.Ceil(cfg.Rate))
}
if cfg.Burst <= 0 {
cfg.Burst = 1
}
}
switch cfg.Scope {
case RateLimitScopeGlobal, RateLimitScopeByKey:
default:
cfg.Scope = RateLimitScopeGlobal
}
switch cfg.DropPolicy {
case RateLimitDrop, RateLimitPassThrough:
default:
cfg.DropPolicy = RateLimitDrop
}
if cfg.MaxKeys <= 0 {
cfg.MaxKeys = 4096
}
if cfg.KeyTTL <= 0 {
cfg.KeyTTL = 10 * time.Minute
}
if cfg.SummaryInterval < 0 {
cfg.SummaryInterval = 0
}
return cloneRateLimitConfig(cfg)
}
func buildLevelSet(levels []int) map[int]struct{} {
result := make(map[int]struct{}, len(levels))
for _, level := range levels {
result[level] = struct{}{}
}
return result
}
func newRateLimiter() *rateLimiter {
cfg := normalizeRateLimitConfig(DefaultRateLimitConfig())
return &rateLimiter{
cfg: cfg,
limitedLevel: buildLevelSet(cfg.Levels),
exemptLevel: buildLevelSet(cfg.ExemptLevels),
buckets: make(map[string]*rateLimitBucket),
nowFunc: time.Now,
}
}
func (limiter *rateLimiter) setNowFuncForTest(nowFunc func() time.Time) {
if limiter == nil {
return
}
limiter.mu.Lock()
if nowFunc == nil {
limiter.nowFunc = time.Now
} else {
limiter.nowFunc = nowFunc
}
limiter.mu.Unlock()
}
func (limiter *rateLimiter) now() time.Time {
if limiter == nil || limiter.nowFunc == nil {
return time.Now()
}
return limiter.nowFunc()
}
func (limiter *rateLimiter) SetConfig(cfg RateLimitConfig) {
if limiter == nil {
return
}
normalized := normalizeRateLimitConfig(cfg)
limiter.mu.Lock()
limiter.cfg = normalized
limiter.limitedLevel = buildLevelSet(normalized.Levels)
limiter.exemptLevel = buildLevelSet(normalized.ExemptLevels)
limiter.buckets = make(map[string]*rateLimitBucket)
limiter.lastCleanupTime = time.Time{}
limiter.lastSummaryTime = time.Time{}
limiter.mu.Unlock()
}
func (limiter *rateLimiter) Config() RateLimitConfig {
if limiter == nil {
return normalizeRateLimitConfig(DefaultRateLimitConfig())
}
limiter.mu.Lock()
defer limiter.mu.Unlock()
return cloneRateLimitConfig(limiter.cfg)
}
func (limiter *rateLimiter) Stats() RateLimitStats {
if limiter == nil {
return RateLimitStats{}
}
limiter.mu.Lock()
defer limiter.mu.Unlock()
return RateLimitStats{
Enabled: limiter.cfg.Enable,
Rate: limiter.cfg.Rate,
Burst: limiter.cfg.Burst,
Scope: limiter.cfg.Scope,
Allowed: limiter.allowedCount,
Dropped: limiter.droppedCount,
PassedThrough: limiter.passedThroughCount,
Suppressed: limiter.suppressedCount,
Summaries: limiter.summaryCount,
LastDropTime: limiter.lastDropTime,
LastDropKey: limiter.lastDropKey,
LastReason: limiter.lastDropReason,
CurrentKeys: len(limiter.buckets),
}
}
func (limiter *rateLimiter) ResetStats() {
if limiter == nil {
return
}
limiter.mu.Lock()
limiter.allowedCount = 0
limiter.droppedCount = 0
limiter.passedThroughCount = 0
limiter.suppressedCount = 0
limiter.summaryCount = 0
limiter.lastDropTime = time.Time{}
limiter.lastDropKey = ""
limiter.lastDropReason = ""
limiter.mu.Unlock()
}
func (limiter *rateLimiter) isLimitedLevel(level int) bool {
if len(limiter.limitedLevel) == 0 {
return true
}
_, ok := limiter.limitedLevel[level]
return ok
}
func (limiter *rateLimiter) isExempt(entry *Entry) bool {
if entry == nil {
return false
}
if _, ok := limiter.exemptLevel[entry.Level]; ok {
return true
}
if limiter.cfg.ExemptMatcher != nil && limiter.cfg.ExemptMatcher(entry) {
return true
}
return false
}
func (limiter *rateLimiter) resolveKey(entry *Entry) string {
if limiter.cfg.Scope == RateLimitScopeGlobal {
return "__global__"
}
if limiter.cfg.KeyFunc != nil {
key := strings.TrimSpace(limiter.cfg.KeyFunc(entry))
if key != "" {
return key
}
}
if entry == nil {
return "__empty__"
}
message := strings.TrimSpace(entry.Message)
if message == "" {
return strconv.Itoa(entry.Level)
}
return strconv.Itoa(entry.Level) + ":" + message
}
func (limiter *rateLimiter) cleanupBucketsLocked(now time.Time) {
if limiter.cfg.Scope != RateLimitScopeByKey || limiter.cfg.KeyTTL <= 0 {
return
}
if !limiter.lastCleanupTime.IsZero() && now.Sub(limiter.lastCleanupTime) < time.Second {
return
}
for key, bucket := range limiter.buckets {
if bucket == nil {
delete(limiter.buckets, key)
continue
}
if now.Sub(bucket.lastSeen) > limiter.cfg.KeyTTL {
delete(limiter.buckets, key)
}
}
limiter.lastCleanupTime = now
}
func (limiter *rateLimiter) getBucketLocked(key string, now time.Time) *rateLimitBucket {
if bucket, ok := limiter.buckets[key]; ok && bucket != nil {
return bucket
}
if limiter.cfg.Scope == RateLimitScopeByKey && limiter.cfg.MaxKeys > 0 && len(limiter.buckets) >= limiter.cfg.MaxKeys {
overflowKey := "__overflow__"
if bucket, ok := limiter.buckets[overflowKey]; ok && bucket != nil {
return bucket
}
oldestKey := ""
oldestTime := now
for existingKey, bucket := range limiter.buckets {
if bucket == nil {
oldestKey = existingKey
break
}
if oldestKey == "" || bucket.lastSeen.Before(oldestTime) {
oldestKey = existingKey
oldestTime = bucket.lastSeen
}
}
if oldestKey != "" {
delete(limiter.buckets, oldestKey)
}
key = overflowKey
}
bucket := &rateLimitBucket{
tokens: float64(limiter.cfg.Burst),
lastRefill: now,
lastSeen: now,
}
limiter.buckets[key] = bucket
return bucket
}
func (limiter *rateLimiter) takeTokenLocked(bucket *rateLimitBucket, now time.Time) bool {
if bucket == nil {
return true
}
if bucket.lastRefill.IsZero() {
bucket.lastRefill = now
}
elapsed := now.Sub(bucket.lastRefill).Seconds()
if elapsed > 0 && limiter.cfg.Rate > 0 {
bucket.tokens += elapsed * limiter.cfg.Rate
maxTokens := float64(limiter.cfg.Burst)
if bucket.tokens > maxTokens {
bucket.tokens = maxTokens
}
bucket.lastRefill = now
}
bucket.lastSeen = now
if bucket.tokens >= 1 {
bucket.tokens -= 1
return true
}
return false
}
func cloneEntryForDrop(entry *Entry) Entry {
if entry == nil {
return Entry{}
}
cloned := *entry
cloned.Fields = cloneFields(entry.Fields)
return cloned
}
func (limiter *rateLimiter) Allow(entry *Entry) bool {
if limiter == nil || entry == nil {
return true
}
now := limiter.now()
var callback func(RateLimitDropData)
dropData := RateLimitDropData{}
allow := true
limiter.mu.Lock()
if !limiter.cfg.Enable || limiter.cfg.Rate <= 0 {
limiter.allowedCount++
limiter.mu.Unlock()
return true
}
if !limiter.isLimitedLevel(entry.Level) || limiter.isExempt(entry) {
limiter.allowedCount++
limiter.mu.Unlock()
return true
}
limiter.cleanupBucketsLocked(now)
key := limiter.resolveKey(entry)
bucket := limiter.getBucketLocked(key, now)
if limiter.takeTokenLocked(bucket, now) {
limiter.allowedCount++
limiter.mu.Unlock()
return true
}
limiter.suppressedCount++
limiter.lastDropTime = now
limiter.lastDropKey = key
limiter.lastDropReason = "rate_limit_exceeded"
if limiter.cfg.DropPolicy == RateLimitPassThrough {
limiter.passedThroughCount++
limiter.allowedCount++
allow = true
} else {
limiter.droppedCount++
allow = false
}
dropData = RateLimitDropData{
Time: now,
Key: key,
Reason: limiter.lastDropReason,
Level: entry.Level,
LevelName: entry.LevelName,
LoggerName: entry.LoggerName,
Message: entry.Message,
PassThrough: limiter.cfg.DropPolicy == RateLimitPassThrough,
DroppedCount: limiter.droppedCount,
PassedThroughCount: limiter.passedThroughCount,
Suppressed: limiter.suppressedCount,
}
if limiter.cfg.SummaryInterval > 0 {
if limiter.lastSummaryTime.IsZero() {
limiter.lastSummaryTime = now
} else if now.Sub(limiter.lastSummaryTime) >= limiter.cfg.SummaryInterval {
dropData.Summary = true
dropData.SummarySuppressed = limiter.suppressedCount
limiter.summaryCount++
limiter.suppressedCount = 0
limiter.lastSummaryTime = now
}
}
callback = limiter.cfg.OnDrop
limiter.mu.Unlock()
if callback != nil {
entryCopy := cloneEntryForDrop(entry)
dropData.Level = entryCopy.Level
dropData.LevelName = entryCopy.LevelName
dropData.LoggerName = entryCopy.LoggerName
dropData.Message = entryCopy.Message
func() {
defer func() {
recover()
}()
callback(dropData)
}()
}
return allow
}
func (logger *starlog) allowByRateLimit(entry *Entry) bool {
if logger == nil || logger.rateLimiter == nil {
return true
}
return logger.rateLimiter.Allow(entry)
}
func (logger *StarLogger) SetRateLimitConfig(cfg RateLimitConfig) {
if logger == nil || logger.logcore == nil {
return
}
logger.logcore.mu.Lock()
if logger.logcore.rateLimiter == nil {
logger.logcore.rateLimiter = newRateLimiter()
}
limiter := logger.logcore.rateLimiter
logger.logcore.mu.Unlock()
limiter.SetConfig(cfg)
}
func (logger *StarLogger) GetRateLimitConfig() RateLimitConfig {
if logger == nil || logger.logcore == nil {
return normalizeRateLimitConfig(DefaultRateLimitConfig())
}
logger.logcore.mu.Lock()
limiter := logger.logcore.rateLimiter
logger.logcore.mu.Unlock()
if limiter == nil {
return normalizeRateLimitConfig(DefaultRateLimitConfig())
}
return limiter.Config()
}
func (logger *StarLogger) EnableRateLimit(enable bool) {
cfg := logger.GetRateLimitConfig()
cfg.Enable = enable
logger.SetRateLimitConfig(cfg)
}
func (logger *StarLogger) SetRateLimitDropHandler(handler func(RateLimitDropData)) {
cfg := logger.GetRateLimitConfig()
cfg.OnDrop = handler
logger.SetRateLimitConfig(cfg)
}
func (logger *StarLogger) GetRateLimitStats() RateLimitStats {
if logger == nil || logger.logcore == nil {
return RateLimitStats{}
}
logger.logcore.mu.Lock()
limiter := logger.logcore.rateLimiter
logger.logcore.mu.Unlock()
if limiter == nil {
return RateLimitStats{}
}
return limiter.Stats()
}
func (logger *StarLogger) ResetRateLimitStats() {
if logger == nil || logger.logcore == nil {
return
}
logger.logcore.mu.Lock()
limiter := logger.logcore.rateLimiter
logger.logcore.mu.Unlock()
if limiter == nil {
return
}
limiter.ResetStats()
}