diff --git a/async_runtime_test.go b/async_runtime_test.go new file mode 100644 index 0000000..141ef85 --- /dev/null +++ b/async_runtime_test.go @@ -0,0 +1,27 @@ +package starlog + +import "testing" + +func resetLoggerAsyncRuntimeForTest(t *testing.T, logger *StarLogger) *asyncRuntime { + t.Helper() + if logger == nil { + t.Fatal("logger is nil") + } + runtime := logger.asyncRuntime() + runtime.resetForTest() + t.Cleanup(runtime.resetForTest) + return runtime +} + +func setAsyncRuntimeStateForTest(runtime *asyncRuntime, queue *starChanStack, started bool) { + if runtime == nil { + return + } + runtime.Stop() + runtime.mu.Lock() + runtime.queue = queue + runtime.started = started + runtime.stopChan = nil + runtime.doneChan = nil + runtime.mu.Unlock() +} diff --git a/core.go b/core.go index 2d625b2..2d9992d 100644 --- a/core.go +++ b/core.go @@ -578,7 +578,7 @@ func (logger *starlog) flushPendingWritesLocked(name string, colors []Attr) { logger.pendingWrites = logger.pendingWrites[1:] logger.signalPendingCondLocked() if err := logger.writeDirect(logStr); err != nil { - reportWriteError(err, LogData{ + logger.reportWriteError(err, LogData{ Name: name, Log: logStr, Colors: colors, @@ -622,13 +622,13 @@ func (logger *starlog) enqueuePendingWriteLocked(logStr string, data LogData) bo switch logger.pendingDropPolicy { case PendingDropNewest: atomic.AddUint64(&logger.pendingDropCount, 1) - reportWriteError(ErrPendingWriteDropped, data) + logger.reportWriteError(ErrPendingWriteDropped, data) return true case PendingBlock: atomic.AddUint64(&logger.pendingBlockCount, 1) if logger.pendingCond == nil { atomic.AddUint64(&logger.pendingDropCount, 1) - reportWriteError(ErrPendingWriteDropped, data) + logger.reportWriteError(ErrPendingWriteDropped, data) return true } logger.pendingCond.Wait() @@ -638,7 +638,7 @@ func (logger *starlog) enqueuePendingWriteLocked(logStr string, data LogData) bo dropData.Log = logger.pendingWrites[0] logger.pendingWrites = logger.pendingWrites[1:] logger.signalPendingCondLocked() - reportWriteError(ErrPendingWriteDropped, dropData) + logger.reportWriteError(ErrPendingWriteDropped, dropData) } } if !logger.switching { @@ -657,14 +657,15 @@ func (logger *starlog) invokeEntryHandler(handler Handler, timeout time.Duration if handlerCtx == nil { handlerCtx = context.Background() } + handlerCtx = withAsyncRuntime(handlerCtx, logger.asyncRuntime()) run := func() { defer func() { if panicErr := recover(); panicErr != nil { - reportAsyncDrop(fmt.Errorf("%w: %v", ErrAsyncHandlerPanic, panicErr), data) + logger.reportAsyncDrop(fmt.Errorf("%w: %v", ErrAsyncHandlerPanic, panicErr), data) } }() if err := handler.Handle(handlerCtx, entry); err != nil { - reportWriteError(err, data) + logger.reportWriteError(err, data) } } if timeout <= 0 { @@ -679,27 +680,29 @@ func (logger *starlog) invokeEntryHandler(handler Handler, timeout time.Duration select { case <-done: case <-time.After(timeout): - reportAsyncDrop(ErrAsyncHandlerTimeout, data) + logger.reportAsyncDrop(ErrAsyncHandlerTimeout, data) } } func (logger *starlog) enqueueAsyncTransfer(transfer logTransfer, fallbackSync bool) { - StartStacks() - if stacks == nil { - reportAsyncDrop(io.ErrClosedPipe, transfer.LogData) - if fallbackSync && GetAsyncFallbackToSync() { - invokeAsyncHandlerSafely(transfer.handlerFunc, transfer.LogData) + runtime := logger.asyncRuntime() + runtime.Start() + queue, _ := runtime.snapshot() + if queue == nil { + logger.reportAsyncDrop(io.ErrClosedPipe, transfer.LogData) + if fallbackSync && runtime.GetAsyncFallbackToSync() { + runtime.invokeAsyncHandlerSafely(transfer.handlerFunc, transfer.LogData) } return } - if err := stacks.TryPush(transfer); err != nil { + if err := queue.TryPush(transfer); err != nil { if errors.Is(err, errStackFull) { - reportAsyncDrop(ErrAsyncQueueFull, transfer.LogData) + logger.reportAsyncDrop(ErrAsyncQueueFull, transfer.LogData) } else { - reportAsyncDrop(err, transfer.LogData) + logger.reportAsyncDrop(err, transfer.LogData) } - if fallbackSync && GetAsyncFallbackToSync() { - invokeAsyncHandlerSafely(transfer.handlerFunc, transfer.LogData) + if fallbackSync && runtime.GetAsyncFallbackToSync() { + runtime.invokeAsyncHandlerSafely(transfer.handlerFunc, transfer.LogData) } } } @@ -795,21 +798,21 @@ func (logger *starlog) build(thread string, isStd bool, isShow bool, handler fun if level < snapshot.errOutputLevel { if snapshot.showColor { if _, err := fmt.Fprint(stdScreen, displayLine); err != nil { - reportWriteError(err, logData) + logger.reportWriteError(err, logData) } } else { if _, err := fmt.Fprint(os.Stdout, displayLine); err != nil { - reportWriteError(err, logData) + logger.reportWriteError(err, logData) } } } else { if snapshot.showColor { if _, err := fmt.Fprint(errScreen, displayLine); err != nil { - reportWriteError(err, logData) + logger.reportWriteError(err, logData) } } else { if _, err := fmt.Fprint(os.Stderr, displayLine); err != nil { - reportWriteError(err, logData) + logger.reportWriteError(err, logData) } } } @@ -858,7 +861,7 @@ func (logger *starlog) writeWithData(logStr string, data LogData) { } logger.flushPendingWritesLocked(data.Name, data.Colors) if err := logger.writeDirect(logStr); err != nil { - reportWriteError(err, data) + logger.reportWriteError(err, data) } } diff --git a/docs/MIGRATION.md b/docs/MIGRATION.md index 53a90f7..cd498d7 100644 --- a/docs/MIGRATION.md +++ b/docs/MIGRATION.md @@ -83,9 +83,17 @@ defer starlog.Shutdown(ctx) 说明: -- `Shutdown(ctx)` 会等待异步 handler drain,并统一关闭资源。 +- `Shutdown(ctx)` 会等待当前 logger 的异步 handler drain,并统一关闭资源。 - `Close()` 仍可用,但不等待异步 handler 队列。 +如果项目里主要使用自定义 logger,推荐直接写成: + +```go +ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) +defer cancel() +defer log.Shutdown(ctx) +``` + ## 常见旧写法到推荐写法 1. 配置项逐条 setter @@ -106,6 +114,9 @@ defer starlog.Shutdown(ctx) 6. 旧 `StartArchive` 轮转启动 推荐:`StartRotatePolicy` / `StartManagedRotatePolicy`,模板场景可直接用 `StartRotateByTime/BySize/ByTimeSize` +7. 包级异步观测/回调入口 +推荐:自定义 logger 优先使用实例方法,例如 `log.SetAsyncErrorHandler(...)`、`log.SetWriteErrorHandler(...)`、`log.GetAsyncDropCount()`、`log.GetWriteErrorCount()`、`log.Shutdown(ctx)` + ## 旧 API 仍可用(重要) 以下旧/兼容路径当前仍可用: @@ -114,6 +125,7 @@ defer starlog.Shutdown(ctx) - 历史错拼兼容名仍可用(如 `EnbaleWrite`、`IsWriteStoed`、`HookBeforArchive`) - `Archive` 旧模型仍可用(建议新代码优先 `RotatePolicy`) - 旧生命周期入口仍可用(如 `CloseStd`、`Close`),但建议新代码优先 `Shutdown(ctx)` / `CloseLogFile` +- 包级异步/错误统计入口仍可用,但默认绑定全局 `Std` logger;自定义 logger 建议改用实例方法 ## 升级后验证建议 @@ -123,7 +135,7 @@ defer starlog.Shutdown(ctx) 4. 重点验证: - 级别过滤是否符合预期 - 归档文件数量/保留策略 - - 异步丢弃计数和写入错误计数 + - 异步丢弃计数和写入错误计数是否落在预期 logger 上 ## 何时需要进一步迁移 @@ -135,5 +147,5 @@ defer starlog.Shutdown(ctx) --- -如果你希望,我可以基于你当前项目的实际日志初始化代码,给出一份“一次改完可落地”的迁移 patch 方案。 +自定义 logger 较多的项目,建议优先检查包级异步观测入口是否仍在使用,并逐步替换为实例方法。 diff --git a/docs/USAGE.md b/docs/USAGE.md index 2d80bdb..7d2a6f1 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -421,9 +421,17 @@ log.SetPendingWriteLimit(1024) log.SetPendingDropPolicy(starlog.PendingDropOldest) ``` +说明: + +- 每个 `StarLogger` 都有独立的异步运行时。 +- `SetHandler`、`SetEntryHandler`、`SetAsyncErrorHandler`、`SetWriteErrorHandler`、`GetAsyncDropCount`、`GetWriteErrorCount`、`GetAsyncMetrics`、`Shutdown(ctx)` 都作用于当前 logger。 +- 包级入口(如 `starlog.SetAsyncErrorHandler`、`starlog.GetAsyncDropCount`、`starlog.Shutdown(ctx)`)保留兼容,默认作用于全局 `Std` logger。 + 可观测项: +- `GetAsyncMetrics()` - `GetAsyncDropCount()` +- `GetWriteErrorCount()` - `GetPendingStats()` ## 11. 高频防爆(去重 / 采样 / 限流) @@ -477,6 +485,8 @@ _ = snapshot 常用统计: +- `GetMetricsSnapshot()` +- `GetAsyncMetrics()` - `GetPendingStats()` - `GetSamplingStats()` - `GetDedupStats()` @@ -491,7 +501,7 @@ _ = snapshot - `Flush()`:刷写 pending - `Sync()`:`Flush + 底层 Sync()`(若支持) - `Close()`:关闭资源,不等待异步队列 drain -- `Shutdown(ctx)`:等待异步 drain,再关闭资源 +- `Shutdown(ctx)`:等待当前 logger 的异步 drain,再关闭资源 推荐: @@ -501,7 +511,11 @@ defer cancel() defer log.Shutdown(ctx) ``` -全局 logger 可使用:`starlog.Shutdown(ctx)`。 +说明: + +- 自定义 logger 优先使用 `log.Shutdown(ctx)`。 +- 全局 logger 可使用 `starlog.Shutdown(ctx)`。 +- `WaitAsyncDrain(ctx)` 作为兼容入口保留,默认等待全局 `Std` logger 的异步队列;自定义 logger 退出时直接使用 `log.Shutdown(ctx)` 更明确。 ## 15. 标准库桥接 @@ -678,6 +692,7 @@ func main() { - 显示回调链路:可能因超时或队列满发生丢弃,可通过计数观测 - writer/sink 主写链路:只要未被过滤(级别/去重/采样/限流)且写入正常,会落盘 +- 多个自定义 logger 之间的异步丢弃计数、写入错误计数和关闭流程彼此独立 ### Q3:新项目应该选 `Archive` 还是 `RotatePolicy`? diff --git a/lifecycle.go b/lifecycle.go index e0ac907..2d37744 100644 --- a/lifecycle.go +++ b/lifecycle.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "time" ) type sinkSyncer interface { @@ -15,6 +14,10 @@ type writerSyncer interface { Sync() error } +type contextualCloser interface { + CloseWithContext(context.Context) error +} + func mergeLifecycleError(current error, next error) error { if next == nil { return current @@ -26,23 +29,7 @@ func mergeLifecycleError(current error, next error) error { } func WaitAsyncDrain(ctx context.Context) error { - if ctx == nil { - ctx = context.Background() - } - for { - stackMu.Lock() - current := stacks - started := stackStarted - stackMu.Unlock() - if !started || current == nil || current.Len() == 0 { - return nil - } - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(5 * time.Millisecond): - } - } + return defaultAsyncRuntime().WaitDrain(ctx) } func (logger *StarLogger) Flush() error { @@ -112,7 +99,9 @@ func (logger *StarLogger) Close() error { } } if entryHandler != nil { - if closer, ok := entryHandler.(interface{ Close() error }); ok { + if closer, ok := entryHandler.(contextualCloser); ok { + err = mergeLifecycleError(err, closer.CloseWithContext(withAsyncRuntime(context.Background(), logger.asyncRuntime()))) + } else if closer, ok := entryHandler.(interface{ Close() error }); ok { err = mergeLifecycleError(err, closer.Close()) } } @@ -126,8 +115,8 @@ func (logger *StarLogger) Shutdown(ctx context.Context) error { } var err error err = mergeLifecycleError(err, logger.Flush()) - err = mergeLifecycleError(err, WaitAsyncDrain(ctx)) - StopStacks() + err = mergeLifecycleError(err, logger.asyncRuntime().WaitDrain(ctx)) + logger.asyncRuntime().Stop() err = mergeLifecycleError(err, logger.Close()) return err } diff --git a/lifecycle_test.go b/lifecycle_test.go index 8e62bc9..045b386 100644 --- a/lifecycle_test.go +++ b/lifecycle_test.go @@ -91,25 +91,13 @@ func TestCloseClosesSinkAndStopsWrite(t *testing.T) { } func TestWaitAsyncDrainContextTimeout(t *testing.T) { - stackMu.Lock() - stackStarted = true - stacks = newStarChanStack(1) - stackStopChan = nil - stackDoneChan = nil - stackMu.Unlock() - defer func() { - stackMu.Lock() - if stacks != nil { - _ = stacks.Close() - } - stackStarted = false - stacks = nil - stackStopChan = nil - stackDoneChan = nil - stackMu.Unlock() - }() + runtime := defaultAsyncRuntime() + runtime.resetForTest() + t.Cleanup(runtime.resetForTest) - if err := stacks.Push("x"); err != nil { + queue := newStarChanStack(1) + setAsyncRuntimeStateForTest(runtime, queue, true) + if err := queue.Push("x"); err != nil { t.Fatalf("prepare queue failed: %v", err) } ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) @@ -121,13 +109,8 @@ func TestWaitAsyncDrainContextTimeout(t *testing.T) { } func TestShutdownStopsAsyncStacks(t *testing.T) { - resetAsyncMetricsForTest() - defer func() { - resetAsyncMetricsForTest() - StopStacks() - }() - logger := NewStarlog(nil) + runtime := resetLoggerAsyncRuntimeForTest(t, logger) logger.SetShowStd(false) handled := make(chan struct{}, 1) logger.SetHandler(func(LogData) { @@ -149,11 +132,36 @@ func TestShutdownStopsAsyncStacks(t *testing.T) { t.Fatalf("async handler should complete before shutdown") } - stackMu.Lock() - started := stackStarted - stackMu.Unlock() - if started { - t.Fatalf("Shutdown should stop async stacks") + if runtime.Metrics().Started { + t.Fatalf("Shutdown should stop logger async runtime") + } +} + +func TestShutdownDoesNotStopOtherLoggerAsyncRuntime(t *testing.T) { + loggerA := NewStarlog(nil) + runtimeA := resetLoggerAsyncRuntimeForTest(t, loggerA) + loggerA.SetShowStd(false) + loggerA.SetHandler(func(LogData) {}) + + loggerB := NewStarlog(nil) + runtimeB := resetLoggerAsyncRuntimeForTest(t, loggerB) + loggerB.SetShowStd(false) + loggerB.SetHandler(func(LogData) {}) + + if !runtimeA.Metrics().Started || !runtimeB.Metrics().Started { + t.Fatalf("both logger runtimes should be started") + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := loggerA.Shutdown(ctx); err != nil { + t.Fatalf("Shutdown failed: %v", err) + } + if runtimeA.Metrics().Started { + t.Fatalf("Shutdown should stop only the target logger runtime") + } + if !runtimeB.Metrics().Started { + t.Fatalf("Shutdown should not stop another logger runtime") } } diff --git a/metrics.go b/metrics.go index 54e8fea..cf90b4b 100644 --- a/metrics.go +++ b/metrics.go @@ -35,42 +35,29 @@ type MetricsSnapshot struct { } func GetAsyncMetrics() AsyncMetrics { - stackMu.Lock() - started := stackStarted - current := stacks - stackMu.Unlock() - - snapshot := AsyncMetrics{ - Started: started, - Dropped: GetAsyncDropCount(), - FallbackToSync: GetAsyncFallbackToSync(), - HandlerTimeout: GetAsyncHandlerTimeout(), - } - if current != nil { - snapshot.QueueLength = current.Len() - snapshot.QueueCapacity = current.Cap() - snapshot.QueueFree = current.Free() - } - return snapshot + return defaultAsyncRuntime().Metrics() } func (logger *StarLogger) GetMetricsSnapshot() MetricsSnapshot { snapshot := MetricsSnapshot{ - Time: time.Now(), - Async: GetAsyncMetrics(), + Time: time.Now(), Errors: ErrorMetrics{ - WriteErrors: GetWriteErrorCount(), + WriteErrors: 0, }, } if logger == nil || logger.logcore == nil { + snapshot.Async = GetAsyncMetrics() + snapshot.Errors.WriteErrors = GetWriteErrorCount() return snapshot } + snapshot.Async = logger.GetAsyncMetrics() snapshot.Pending = logger.GetPendingStats() snapshot.Sampling = logger.GetSamplingStats() snapshot.Dedup = logger.GetDedupStats() snapshot.RateLimit = logger.GetRateLimitStats() snapshot.Errors.RedactionErrors = logger.GetRedactErrorCount() + snapshot.Errors.WriteErrors = logger.GetWriteErrorCount() snapshot.ArchiveRunning = IsArchiveRun(logger) logger.logcore.mu.Lock() @@ -90,6 +77,8 @@ func (logger *StarLogger) GetMetricsSnapshot() MetricsSnapshot { } func (logger *StarLogger) GetAsyncMetrics() AsyncMetrics { - _ = logger - return GetAsyncMetrics() + if logger == nil { + return GetAsyncMetrics() + } + return logger.asyncRuntime().Metrics() } diff --git a/metrics_test.go b/metrics_test.go index d456fd1..cab0460 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -1,6 +1,7 @@ package starlog import ( + "bytes" "sync/atomic" "testing" "time" @@ -60,8 +61,8 @@ func TestMetricsSnapshotIncludesPendingAndMultiSink(t *testing.T) { if snapshot.Pending.Policy != PendingDropOldest { t.Fatalf("snapshot should include pending policy") } - if snapshot.Errors.WriteErrors != GetWriteErrorCount() { - t.Fatalf("snapshot write error count should match global counter") + if snapshot.Errors.WriteErrors != logger.GetWriteErrorCount() { + t.Fatalf("snapshot write error count should match logger counter") } if !snapshot.Sampling.Enabled || snapshot.Sampling.Rate != 0.5 { t.Fatalf("snapshot should include sampling stats, got %+v", snapshot.Sampling) @@ -97,3 +98,27 @@ func TestMetricsSnapshotNilLogger(t *testing.T) { t.Fatalf("snapshot should contain timestamp") } } + +func TestMetricsSnapshotUsesLoggerRuntimeCounters(t *testing.T) { + loggerA := NewStarlog(&errWriter{}) + resetLoggerAsyncRuntimeForTest(t, loggerA) + loggerA.SetShowStd(false) + loggerA.SetShowColor(false) + loggerA.SetShowOriginFile(false) + loggerA.SetShowFuncName(false) + loggerA.SetShowFlag(false) + + loggerB := newStructuredTestLogger(&bytes.Buffer{}) + resetLoggerAsyncRuntimeForTest(t, loggerB) + + loggerA.Infoln("write error check") + + snapshotA := loggerA.GetMetricsSnapshot() + snapshotB := loggerB.GetMetricsSnapshot() + if snapshotA.Errors.WriteErrors == 0 { + t.Fatalf("snapshot should include logger write errors") + } + if snapshotB.Errors.WriteErrors != 0 { + t.Fatalf("other logger snapshot should not inherit write errors, got %d", snapshotB.Errors.WriteErrors) + } +} diff --git a/observer_test.go b/observer_test.go index 9d40733..dd10404 100644 --- a/observer_test.go +++ b/observer_test.go @@ -38,9 +38,9 @@ func waitObserverCondition(t *testing.T, timeout time.Duration, cond func() bool } func TestObserverCollectsEntries(t *testing.T) { - defer StopStacks() var buf bytes.Buffer logger := newStructuredTestLogger(&buf) + defer logger.asyncRuntime().Stop() observer := NewObserver() logger.AppendEntryHandler(observer) @@ -61,9 +61,9 @@ func TestObserverCollectsEntries(t *testing.T) { } func TestObserverLimitAndDropped(t *testing.T) { - defer StopStacks() observer := NewObserverWithLimit(2) logger := newStructuredTestLogger(&bytes.Buffer{}) + defer logger.asyncRuntime().Stop() logger.AppendEntryHandler(observer) logger.Info("one") @@ -93,9 +93,9 @@ func TestObserverLimitAndDropped(t *testing.T) { } func TestObserverTakeAllAndReset(t *testing.T) { - defer StopStacks() observer := NewObserver() logger := newStructuredTestLogger(&bytes.Buffer{}) + defer logger.asyncRuntime().Stop() logger.AppendEntryHandler(observer) logger.Info("a") @@ -119,9 +119,9 @@ func TestObserverTakeAllAndReset(t *testing.T) { } func TestTestHookAttachAndRestore(t *testing.T) { - defer StopStacks() var buf bytes.Buffer logger := newStructuredTestLogger(&buf) + defer logger.asyncRuntime().Stop() var previousCount uint64 logger.SetEntryHandler(HandlerFunc(func(_ context.Context, _ *Entry) error { atomic.AddUint64(&previousCount, 1) @@ -163,8 +163,8 @@ func TestTestHookAttachAndRestore(t *testing.T) { } func TestTestHookCloseWhenHandlerReplaced(t *testing.T) { - defer StopStacks() logger := newStructuredTestLogger(&bytes.Buffer{}) + defer logger.asyncRuntime().Stop() hook := NewTestHook(logger) logger.SetEntryHandler(nil) diff --git a/p0_reliability_test.go b/p0_reliability_test.go index 20b1ec9..5bc90df 100644 --- a/p0_reliability_test.go +++ b/p0_reliability_test.go @@ -33,21 +33,8 @@ func TestWriteBufferFlushAfterSwitchingOff(t *testing.T) { } func TestAsyncQueuePushFailureFallsBackToSyncHandler(t *testing.T) { - resetAsyncMetricsForTest() - defer func() { - resetAsyncMetricsForTest() - stackMu.Lock() - if stacks != nil { - _ = stacks.Close() - } - stackStarted = false - stacks = nil - stackStopChan = nil - stackDoneChan = nil - stackMu.Unlock() - }() - logger := NewStarlog(nil) + runtime := resetLoggerAsyncRuntimeForTest(t, logger) logger.SetShowStd(false) var handled uint64 logger.handlerFunc = func(data LogData) { @@ -55,26 +42,21 @@ func TestAsyncQueuePushFailureFallsBackToSyncHandler(t *testing.T) { } alertCalled := make(chan struct{}, 1) - SetAsyncErrorHandler(func(err error, data LogData) { + logger.SetAsyncErrorHandler(func(err error, data LogData) { select { case alertCalled <- struct{}{}: default: } }) - stackMu.Lock() - stackStarted = true - stacks = nil - stackStopChan = nil - stackDoneChan = nil - stackMu.Unlock() + setAsyncRuntimeStateForTest(runtime, nil, true) logger.Infoln("trigger async fallback") if atomic.LoadUint64(&handled) != 1 { t.Fatalf("handler should be called once via sync fallback, got: %d", handled) } - if GetAsyncDropCount() == 0 { + if logger.GetAsyncDropCount() == 0 { t.Fatalf("async drop counter should increase on push failure") } select { @@ -221,15 +203,13 @@ func (w *errWriter) Write(p []byte) (int, error) { } func TestWriteErrorObservable(t *testing.T) { - resetAsyncMetricsForTest() - defer resetAsyncMetricsForTest() - logger := NewStarlog(&errWriter{}) + resetLoggerAsyncRuntimeForTest(t, logger) logger.SetShowStd(false) logger.SetShowColor(false) observed := make(chan struct{}, 1) - SetWriteErrorHandler(func(err error, data LogData) { + logger.SetWriteErrorHandler(func(err error, data LogData) { if err != nil { select { case observed <- struct{}{}: @@ -240,7 +220,7 @@ func TestWriteErrorObservable(t *testing.T) { logger.Infoln("write error check") - if GetWriteErrorCount() == 0 { + if logger.GetWriteErrorCount() == 0 { t.Fatalf("write error count should increase") } select { @@ -251,13 +231,8 @@ func TestWriteErrorObservable(t *testing.T) { } func TestAsyncHandlerPanicDoesNotCrash(t *testing.T) { - resetAsyncMetricsForTest() - defer func() { - resetAsyncMetricsForTest() - StopStacks() - }() - logger := NewStarlog(nil) + resetLoggerAsyncRuntimeForTest(t, logger) logger.SetShowStd(false) logger.SetHandler(func(LogData) { panic("boom") @@ -266,19 +241,14 @@ func TestAsyncHandlerPanicDoesNotCrash(t *testing.T) { logger.Infoln("panic safe") time.Sleep(20 * time.Millisecond) - if GetAsyncDropCount() == 0 { + if logger.GetAsyncDropCount() == 0 { t.Fatalf("panic in async handler should be reported as drop") } } func TestEntryHandlerTimeoutFallback(t *testing.T) { - resetAsyncMetricsForTest() - defer func() { - resetAsyncMetricsForTest() - StopStacks() - }() - logger := NewStarlog(nil) + resetLoggerAsyncRuntimeForTest(t, logger) logger.SetShowStd(false) logger.SetEntryHandler(HandlerFunc(func(context.Context, *Entry) error { time.Sleep(80 * time.Millisecond) @@ -295,7 +265,7 @@ func TestEntryHandlerTimeoutFallback(t *testing.T) { } deadline := time.Now().Add(300 * time.Millisecond) for time.Now().Before(deadline) { - if GetAsyncDropCount() > 0 { + if logger.GetAsyncDropCount() > 0 { return } time.Sleep(5 * time.Millisecond) @@ -304,20 +274,9 @@ func TestEntryHandlerTimeoutFallback(t *testing.T) { } func TestEntryHandlerQueueFullNoFallbackDoesNotBlock(t *testing.T) { - resetAsyncMetricsForTest() - defer func() { - resetAsyncMetricsForTest() - stackMu.Lock() - stackStarted = false - stacks = nil - stackStopChan = nil - stackDoneChan = nil - stackMu.Unlock() - }() - - SetAsyncFallbackToSync(false) - logger := NewStarlog(nil) + runtime := resetLoggerAsyncRuntimeForTest(t, logger) + logger.SetAsyncFallbackToSync(false) logger.SetShowStd(false) var handled uint64 logger.SetEntryHandler(HandlerFunc(func(context.Context, *Entry) error { @@ -326,14 +285,10 @@ func TestEntryHandlerQueueFullNoFallbackDoesNotBlock(t *testing.T) { return nil })) - stackMu.Lock() - stackStarted = true - stacks = newStarChanStack(1) - stackStopChan = nil - stackDoneChan = nil - stackMu.Unlock() + queue := newStarChanStack(1) + setAsyncRuntimeStateForTest(runtime, queue, true) - if err := stacks.Push(logTransfer{ + if err := queue.Push(logTransfer{ handlerFunc: func(LogData) { time.Sleep(80 * time.Millisecond) }, @@ -351,7 +306,7 @@ func TestEntryHandlerQueueFullNoFallbackDoesNotBlock(t *testing.T) { if atomic.LoadUint64(&handled) != 0 { t.Fatalf("entry handler should be dropped when queue is full and fallback disabled, got %d", handled) } - if GetAsyncDropCount() == 0 { + if logger.GetAsyncDropCount() == 0 { t.Fatalf("queue-full drop should be observable") } } diff --git a/redaction.go b/redaction.go index 032e9ea..b2adb37 100644 --- a/redaction.go +++ b/redaction.go @@ -65,7 +65,7 @@ func (logger *starlog) applyRedaction(snapshot *starlog, entry *Entry) bool { } func (logger *starlog) handleRedactionFailure(snapshot *starlog, entry *Entry, err error) bool { - reportWriteError(fmt.Errorf("%w: %v", ErrRedactionFailed, err), LogData{ + logger.reportWriteError(fmt.Errorf("%w: %v", ErrRedactionFailed, err), LogData{ Name: snapshot.name, Log: entry.Message, }) diff --git a/redaction_test.go b/redaction_test.go index c9aef70..c214841 100644 --- a/redaction_test.go +++ b/redaction_test.go @@ -146,17 +146,15 @@ func TestSetRedactMaskToken(t *testing.T) { } func TestRedactionFailureReportsWriteError(t *testing.T) { - resetAsyncMetricsForTest() - defer resetAsyncMetricsForTest() - var buf bytes.Buffer logger := newStructuredTestLogger(&buf) + resetLoggerAsyncRuntimeForTest(t, logger) logger.SetRedactor(RedactorFunc(func(context.Context, *Entry) error { return errors.New("redactor failed") })) observed := make(chan error, 1) - SetWriteErrorHandler(func(err error, data LogData) { + logger.SetWriteErrorHandler(func(err error, data LogData) { if err == nil { return } @@ -167,7 +165,7 @@ func TestRedactionFailureReportsWriteError(t *testing.T) { }) logger.Info("check redaction error report") - if GetWriteErrorCount() == 0 { + if logger.GetWriteErrorCount() == 0 { t.Fatalf("write error count should increase when redaction fails") } select { diff --git a/route_handler.go b/route_handler.go index 7021bae..2a7e1df 100644 --- a/route_handler.go +++ b/route_handler.go @@ -87,7 +87,7 @@ func (handler *RouteHandler) Handle(ctx context.Context, entry *Entry) error { formatted, err := formatter.Format(entry) if err != nil { wrapErr := fmt.Errorf("route %s format failed: %w", route.name, err) - reportWriteError(wrapErr, LogData{ + reportWriteErrorWithContext(ctx, wrapErr, LogData{ Name: route.name, Log: entry.Message, }) @@ -101,7 +101,7 @@ func (handler *RouteHandler) Handle(ctx context.Context, entry *Entry) error { } if err = route.sink.Write(formatted); err != nil { wrapErr := fmt.Errorf("route %s write failed: %w", route.name, err) - reportWriteError(wrapErr, LogData{ + reportWriteErrorWithContext(ctx, wrapErr, LogData{ Name: route.name, Log: string(formatted), }) @@ -115,6 +115,10 @@ func (handler *RouteHandler) Handle(ctx context.Context, entry *Entry) error { } func (handler *RouteHandler) Close() error { + return handler.CloseWithContext(nil) +} + +func (handler *RouteHandler) CloseWithContext(ctx context.Context) error { if handler == nil { return nil } @@ -137,7 +141,7 @@ func (handler *RouteHandler) Close() error { } if err := route.sink.Close(); err != nil { wrapErr := fmt.Errorf("route %s close failed: %w", route.name, err) - reportWriteError(wrapErr, LogData{ + reportWriteErrorWithContext(ctx, wrapErr, LogData{ Name: route.name, }) if firstErr == nil { @@ -198,6 +202,10 @@ func (handler *chainedHandler) Handle(ctx context.Context, entry *Entry) error { } func (handler *chainedHandler) Close() error { + return handler.CloseWithContext(nil) +} + +func (handler *chainedHandler) CloseWithContext(ctx context.Context) error { if handler == nil { return nil } @@ -206,12 +214,16 @@ func (handler *chainedHandler) Close() error { if item == nil { continue } - closer, ok := item.(interface{ Close() error }) - if !ok { + if closer, ok := item.(contextualCloser); ok { + if err := closer.CloseWithContext(ctx); err != nil && firstErr == nil { + firstErr = err + } continue } - if err := closer.Close(); err != nil && firstErr == nil { - firstErr = err + if closer, ok := item.(interface{ Close() error }); ok { + if err := closer.Close(); err != nil && firstErr == nil { + firstErr = err + } } } return firstErr diff --git a/route_handler_test.go b/route_handler_test.go index b56a1fe..b40ad59 100644 --- a/route_handler_test.go +++ b/route_handler_test.go @@ -47,6 +47,17 @@ func (sink *closeCountSink) Close() error { return nil } +type closeFailSink struct{} + +func (sink *closeFailSink) Write(data []byte) error { + _ = data + return nil +} + +func (sink *closeFailSink) Close() error { + return errors.New("route sink close failed") +} + type safeBuffer struct { mu sync.Mutex buf bytes.Buffer @@ -206,11 +217,9 @@ func TestChainHandlerRunsAll(t *testing.T) { } func TestRouteHandlerWriteErrorObservable(t *testing.T) { - resetAsyncMetricsForTest() - defer resetAsyncMetricsForTest() - var totalBuf safeBuffer logger := newStructuredTestLogger(&totalBuf) + resetLoggerAsyncRuntimeForTest(t, logger) logger.SetEntryHandler(NewRouteHandler( Route{ Name: "failed-route", @@ -222,10 +231,50 @@ func TestRouteHandlerWriteErrorObservable(t *testing.T) { logger.Info("route write error") waitFor(t, 300*time.Millisecond, func() bool { - return GetWriteErrorCount() > 0 + return logger.GetWriteErrorCount() > 0 }, "route sink write error observable") } +func TestRouteHandlerCloseErrorUsesLoggerRuntime(t *testing.T) { + logger := newStructuredTestLogger(&bytes.Buffer{}) + resetLoggerAsyncRuntimeForTest(t, logger) + + observed := make(chan error, 1) + logger.SetWriteErrorHandler(func(err error, data LogData) { + if err == nil { + return + } + select { + case observed <- err: + default: + } + }) + logger.SetEntryHandler(NewRouteHandler( + Route{ + Name: "failed-close", + Match: MatchAllLevels(), + Formatter: &messageOnlyFormatter{}, + Sink: &closeFailSink{}, + }, + )) + + err := logger.Close() + if err == nil { + t.Fatalf("logger close should return route close error") + } + if logger.GetWriteErrorCount() == 0 { + t.Fatalf("route close error should increase logger write error count") + } + select { + case observedErr := <-observed: + if !strings.Contains(observedErr.Error(), "route sink close failed") { + t.Fatalf("unexpected observed error: %v", observedErr) + } + default: + t.Fatalf("route close error should invoke logger write error handler") + } +} + func TestRouteHandlerCloseDeduplicatesSameSink(t *testing.T) { sink := &closeCountSink{} handler := NewRouteHandler( @@ -284,23 +333,31 @@ func TestRouteHandlerWithRotatingFileSink(t *testing.T) { logger.Infof("debug payload %02d %s", idx, strings.Repeat("x", 24)) time.Sleep(3 * time.Millisecond) } - waitFor(t, 2*time.Second, func() bool { - matches, _ := filepath.Glob(filepath.Join(filepath.Dir(debugPath), "debug.*.log")) - return len(matches) > 0 - }, "rotating route sink archive creation") - content, readErr := ioutil.ReadFile(debugPath) - if readErr != nil { - t.Fatalf("read debug current log failed: %v", readErr) - } - if len(content) == 0 { - t.Fatalf("debug log should contain routed logs") + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if err = logger.Shutdown(ctx); err != nil { + t.Fatalf("logger shutdown failed: %v", err) } - if err = logger.Close(); err != nil { - t.Fatalf("logger close failed: %v", err) + paths := []string{debugPath} + matches, _ := filepath.Glob(filepath.Join(filepath.Dir(debugPath), "debug.*.log")) + paths = append(paths, matches...) + found := false + for _, path := range paths { + content, readErr := ioutil.ReadFile(path) + if readErr != nil { + t.Fatalf("read routed log %s failed: %v", path, readErr) + } + if strings.Contains(string(content), "debug payload") { + found = true + break + } + } + if !found { + t.Fatalf("routed logs should exist in current or archived debug files") } if err = debugSink.Write([]byte("after close")); !errors.Is(err, ErrRotatingFileSinkClosed) { - t.Fatalf("rotating sink should be closed by logger close, got %v", err) + t.Fatalf("rotating sink should be closed by logger shutdown, got %v", err) } } diff --git a/starlog.go b/starlog.go index 442c258..b72152a 100644 --- a/starlog.go +++ b/starlog.go @@ -104,21 +104,21 @@ func (logger *starlog) applyPendingLimitLocked(limit int) { } dropped := len(logger.pendingWrites) - logger.pendingWriteLimit switch logger.pendingDropPolicy { - case PendingDropNewest: - keepEnd := len(logger.pendingWrites) - dropped - for idx := keepEnd; idx < len(logger.pendingWrites); idx++ { - reportWriteError(ErrPendingWriteDropped, LogData{ - Name: logger.name, - Log: logger.pendingWrites[idx], - }) + case PendingDropNewest: + keepEnd := len(logger.pendingWrites) - dropped + for idx := keepEnd; idx < len(logger.pendingWrites); idx++ { + logger.reportWriteError(ErrPendingWriteDropped, LogData{ + Name: logger.name, + Log: logger.pendingWrites[idx], + }) } logger.pendingWrites = logger.pendingWrites[:keepEnd] - default: - for idx := 0; idx < dropped; idx++ { - reportWriteError(ErrPendingWriteDropped, LogData{ - Name: logger.name, - Log: logger.pendingWrites[idx], - }) + default: + for idx := 0; idx < dropped; idx++ { + logger.reportWriteError(ErrPendingWriteDropped, LogData{ + Name: logger.name, + Log: logger.pendingWrites[idx], + }) } logger.pendingWrites = logger.pendingWrites[dropped:] } @@ -464,7 +464,7 @@ func (logger *StarLogger) AppendEntryHandler(handler Handler) { func (logger *StarLogger) SetHandler(f func(LogData)) { if f != nil { - StartStacks() + logger.asyncRuntime().Start() } logger.logcore.mu.Lock() defer logger.logcore.mu.Unlock() @@ -487,35 +487,35 @@ func (logger *StarLogger) SetSwitching(sw bool) { } func (logger *StarLogger) SetAsyncErrorHandler(alert func(error, LogData)) { - SetAsyncErrorHandler(alert) + logger.asyncRuntime().SetAsyncErrorHandler(alert) } func (logger *StarLogger) GetAsyncDropCount() uint64 { - return GetAsyncDropCount() + return logger.asyncRuntime().GetAsyncDropCount() } func (logger *StarLogger) SetAsyncFallbackToSync(enable bool) { - SetAsyncFallbackToSync(enable) + logger.asyncRuntime().SetAsyncFallbackToSync(enable) } func (logger *StarLogger) GetAsyncFallbackToSync() bool { - return GetAsyncFallbackToSync() + return logger.asyncRuntime().GetAsyncFallbackToSync() } func (logger *StarLogger) SetAsyncHandlerTimeout(timeout time.Duration) { - SetAsyncHandlerTimeout(timeout) + logger.asyncRuntime().SetAsyncHandlerTimeout(timeout) } func (logger *StarLogger) GetAsyncHandlerTimeout() time.Duration { - return GetAsyncHandlerTimeout() + return logger.asyncRuntime().GetAsyncHandlerTimeout() } func (logger *StarLogger) SetWriteErrorHandler(alert func(error, LogData)) { - SetWriteErrorHandler(alert) + logger.asyncRuntime().SetWriteErrorHandler(alert) } func (logger *StarLogger) GetWriteErrorCount() uint64 { - return GetWriteErrorCount() + return logger.asyncRuntime().GetWriteErrorCount() } func (logger *StarLogger) SetOnlyColorLevel(ocl bool) { diff --git a/typed.go b/typed.go index 8fe1e19..aab3a54 100644 --- a/typed.go +++ b/typed.go @@ -59,6 +59,8 @@ const ( FieldTypeOther = "other" ) +const defaultAsyncQueueCapacity uint64 = 1024 + var ( ErrAsyncHandlerPanic = errors.New("async handler panic") ErrAsyncHandlerTimeout = errors.New("async handler timeout") @@ -90,27 +92,41 @@ var ( "panic": LvPanic, "fatal": LvFatal, } - stacks *starChanStack - stackStarted bool = false - stackStopChan chan struct{} - stackDoneChan chan struct{} - stackMu sync.Mutex - stackDrop uint64 - stackAlert func(error, LogData) - stackAlertMu sync.RWMutex - stackFallback uint32 = 1 - stackTimeout int64 + + stdScreen io.Writer = colorable.NewColorableStdout() + errScreen io.Writer = colorable.NewColorableStderr() + + defaultAsyncRuntimeOnce sync.Once + defaultAsyncRuntimeFallback *asyncRuntime +) + +type asyncRuntime struct { + mu sync.Mutex + + queue *starChanStack + started bool + stopChan chan struct{} + doneChan chan struct{} + + dropCount uint64 + + asyncAlert func(error, LogData) + asyncAlertMu sync.RWMutex + fallbackSync uint32 + timeout int64 writeErrCount uint64 writeErrHandler func(error, LogData) writeErrMu sync.RWMutex - stdScreen io.Writer = colorable.NewColorableStdout() - errScreen io.Writer = colorable.NewColorableStderr() -) + queueCapacity uint64 +} + +type asyncRuntimeContextKey struct{} type starlog struct { mu *sync.Mutex + runtime *asyncRuntime output io.Writer minLevel int errOutputLevel int @@ -240,9 +256,57 @@ type Config struct { ContextFieldExtractor func(context.Context) Fields } +func newAsyncRuntime(queueCapacity uint64) *asyncRuntime { + if queueCapacity == 0 { + queueCapacity = defaultAsyncQueueCapacity + } + runtime := &asyncRuntime{ + queueCapacity: queueCapacity, + } + atomic.StoreUint32(&runtime.fallbackSync, 1) + return runtime +} + +func defaultAsyncRuntime() *asyncRuntime { + if Std != nil && Std.logcore != nil && Std.logcore.runtime != nil { + return Std.logcore.runtime + } + defaultAsyncRuntimeOnce.Do(func() { + defaultAsyncRuntimeFallback = newAsyncRuntime(defaultAsyncQueueCapacity) + }) + return defaultAsyncRuntimeFallback +} + +func withAsyncRuntime(ctx context.Context, runtime *asyncRuntime) context.Context { + if runtime == nil { + return ctx + } + if ctx == nil { + ctx = context.Background() + } + return context.WithValue(ctx, asyncRuntimeContextKey{}, runtime) +} + +func runtimeFromContext(ctx context.Context) *asyncRuntime { + if ctx == nil { + return nil + } + runtime, _ := ctx.Value(asyncRuntimeContextKey{}).(*asyncRuntime) + return runtime +} + +func reportWriteErrorWithContext(ctx context.Context, err error, data LogData) { + runtime := runtimeFromContext(ctx) + if runtime == nil { + runtime = defaultAsyncRuntime() + } + runtime.reportWriteError(err, data) +} + func newLogCore(out io.Writer) *starlog { core := &starlog{ mu: &sync.Mutex{}, + runtime: newAsyncRuntime(defaultAsyncQueueCapacity), output: out, minLevel: LvDebug, errOutputLevel: LvError, @@ -389,62 +453,98 @@ func generateId() string { return fmt.Sprintf("%dstar%db612%d", time.Now().UnixNano(), rand.Intn(1000000), rand.Intn(1000000)) } -func StartStacks() { - stackMu.Lock() - if stackStarted { - stackMu.Unlock() +func (logger *starlog) asyncRuntime() *asyncRuntime { + if logger == nil || logger.runtime == nil { + return defaultAsyncRuntime() + } + return logger.runtime +} + +func (logger *StarLogger) asyncRuntime() *asyncRuntime { + if logger == nil || logger.logcore == nil { + return defaultAsyncRuntime() + } + return logger.logcore.asyncRuntime() +} + +func (logger *starlog) reportAsyncDrop(err error, data LogData) { + logger.asyncRuntime().reportAsyncDrop(err, data) +} + +func (logger *starlog) reportWriteError(err error, data LogData) { + logger.asyncRuntime().reportWriteError(err, data) +} + +func (runtime *asyncRuntime) snapshot() (*starChanStack, bool) { + if runtime == nil { + return nil, false + } + runtime.mu.Lock() + defer runtime.mu.Unlock() + return runtime.queue, runtime.started +} + +func (runtime *asyncRuntime) Start() { + if runtime == nil { return } - stackStarted = true - stackStopChan = make(chan struct{}) - stackDoneChan = make(chan struct{}) - stacks = newStarChanStack(1024) - stopChan := stackStopChan - doneChan := stackDoneChan - stackMu.Unlock() - go func(stop <-chan struct{}, done chan struct{}) { + runtime.mu.Lock() + if runtime.started { + runtime.mu.Unlock() + return + } + queue := newStarChanStack(runtime.queueCapacity) + stopChan := make(chan struct{}) + doneChan := make(chan struct{}) + runtime.queue = queue + runtime.stopChan = stopChan + runtime.doneChan = doneChan + runtime.started = true + runtime.mu.Unlock() + + go func(queue *starChanStack, stop <-chan struct{}, done chan struct{}) { defer close(done) - defer func() { - stackMu.Lock() - stackStarted = false - stackMu.Unlock() - }() for { select { case <-stop: return default: } - poped, err := stacks.Pop() + popped, err := queue.Pop() if err != nil { if errors.Is(err, io.EOF) { return } return } - val, ok := poped.(logTransfer) + val, ok := popped.(logTransfer) if !ok { continue } if val.handlerFunc != nil { - invokeAsyncHandlerSafely(val.handlerFunc, val.LogData) + runtime.invokeAsyncHandlerSafely(val.handlerFunc, val.LogData) } } - }(stopChan, doneChan) + }(queue, stopChan, doneChan) } -func StopStacks() { - stackMu.Lock() - if !stackStarted { - stackMu.Unlock() +func (runtime *asyncRuntime) Stop() { + if runtime == nil { return } - stopChan := stackStopChan - doneChan := stackDoneChan - current := stacks - stackStopChan = nil - stackDoneChan = nil - stackMu.Unlock() + runtime.mu.Lock() + if !runtime.started { + runtime.mu.Unlock() + return + } + stopChan := runtime.stopChan + doneChan := runtime.doneChan + queue := runtime.queue + runtime.queue = nil + runtime.stopChan = nil + runtime.doneChan = nil + runtime.started = false + runtime.mu.Unlock() if stopChan != nil { func() { @@ -454,56 +554,106 @@ func StopStacks() { close(stopChan) }() } - if current != nil { - _ = current.Close() + if queue != nil { + _ = queue.Close() } if doneChan != nil { <-doneChan } } -func Stop() { - StopStacks() +func (runtime *asyncRuntime) WaitDrain(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() + } + for { + queue, started := runtime.snapshot() + if !started || queue == nil || queue.Len() == 0 { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Millisecond): + } + } } -func SetAsyncErrorHandler(alert func(error, LogData)) { - stackAlertMu.Lock() - defer stackAlertMu.Unlock() - stackAlert = alert +func (runtime *asyncRuntime) Metrics() AsyncMetrics { + queue, started := runtime.snapshot() + snapshot := AsyncMetrics{ + Started: started, + Dropped: runtime.GetAsyncDropCount(), + FallbackToSync: runtime.GetAsyncFallbackToSync(), + HandlerTimeout: runtime.GetAsyncHandlerTimeout(), + } + if queue != nil { + snapshot.QueueLength = queue.Len() + snapshot.QueueCapacity = queue.Cap() + snapshot.QueueFree = queue.Free() + } + return snapshot } -func SetAsyncFallbackToSync(enable bool) { - if enable { - atomic.StoreUint32(&stackFallback, 1) +func (runtime *asyncRuntime) SetAsyncErrorHandler(alert func(error, LogData)) { + if runtime == nil { return } - atomic.StoreUint32(&stackFallback, 0) + runtime.asyncAlertMu.Lock() + defer runtime.asyncAlertMu.Unlock() + runtime.asyncAlert = alert } -func GetAsyncFallbackToSync() bool { - return atomic.LoadUint32(&stackFallback) == 1 +func (runtime *asyncRuntime) SetAsyncFallbackToSync(enable bool) { + if runtime == nil { + return + } + if enable { + atomic.StoreUint32(&runtime.fallbackSync, 1) + return + } + atomic.StoreUint32(&runtime.fallbackSync, 0) } -func SetAsyncHandlerTimeout(timeout time.Duration) { +func (runtime *asyncRuntime) GetAsyncFallbackToSync() bool { + if runtime == nil { + return true + } + return atomic.LoadUint32(&runtime.fallbackSync) == 1 +} + +func (runtime *asyncRuntime) SetAsyncHandlerTimeout(timeout time.Duration) { + if runtime == nil { + return + } if timeout < 0 { timeout = 0 } - atomic.StoreInt64(&stackTimeout, int64(timeout)) + atomic.StoreInt64(&runtime.timeout, int64(timeout)) } -func GetAsyncHandlerTimeout() time.Duration { - return time.Duration(atomic.LoadInt64(&stackTimeout)) +func (runtime *asyncRuntime) GetAsyncHandlerTimeout() time.Duration { + if runtime == nil { + return 0 + } + return time.Duration(atomic.LoadInt64(&runtime.timeout)) } -func GetAsyncDropCount() uint64 { - return atomic.LoadUint64(&stackDrop) +func (runtime *asyncRuntime) GetAsyncDropCount() uint64 { + if runtime == nil { + return 0 + } + return atomic.LoadUint64(&runtime.dropCount) } -func reportAsyncDrop(err error, data LogData) { - atomic.AddUint64(&stackDrop, 1) - stackAlertMu.RLock() - alert := stackAlert - stackAlertMu.RUnlock() +func (runtime *asyncRuntime) reportAsyncDrop(err error, data LogData) { + if runtime == nil { + return + } + atomic.AddUint64(&runtime.dropCount, 1) + runtime.asyncAlertMu.RLock() + alert := runtime.asyncAlert + runtime.asyncAlertMu.RUnlock() if alert != nil { func() { defer func() { @@ -514,31 +664,37 @@ func reportAsyncDrop(err error, data LogData) { } } -func invokeAsyncHandlerSafely(handler func(LogData), data LogData) bool { +func (runtime *asyncRuntime) invokeAsyncHandlerSafely(handler func(LogData), data LogData) bool { + if runtime == nil { + return invokeAsyncHandlerDirect(defaultAsyncRuntime(), handler, data) + } if handler == nil { return true } - timeout := GetAsyncHandlerTimeout() + timeout := runtime.GetAsyncHandlerTimeout() if timeout <= 0 { - return invokeAsyncHandlerDirect(handler, data) + return invokeAsyncHandlerDirect(runtime, handler, data) } done := make(chan bool, 1) go func() { - done <- invokeAsyncHandlerDirect(handler, data) + done <- invokeAsyncHandlerDirect(runtime, handler, data) }() select { case ok := <-done: return ok case <-time.After(timeout): - reportAsyncDrop(ErrAsyncHandlerTimeout, data) + runtime.reportAsyncDrop(ErrAsyncHandlerTimeout, data) return false } } -func invokeAsyncHandlerDirect(handler func(LogData), data LogData) (ok bool) { +func invokeAsyncHandlerDirect(runtime *asyncRuntime, handler func(LogData), data LogData) (ok bool) { defer func() { if panicErr := recover(); panicErr != nil { - reportAsyncDrop(fmt.Errorf("%w: %v", ErrAsyncHandlerPanic, panicErr), data) + if runtime == nil { + runtime = defaultAsyncRuntime() + } + runtime.reportAsyncDrop(fmt.Errorf("%w: %v", ErrAsyncHandlerPanic, panicErr), data) ok = false } }() @@ -546,24 +702,30 @@ func invokeAsyncHandlerDirect(handler func(LogData), data LogData) (ok bool) { return true } -func SetWriteErrorHandler(alert func(error, LogData)) { - writeErrMu.Lock() - defer writeErrMu.Unlock() - writeErrHandler = alert -} - -func GetWriteErrorCount() uint64 { - return atomic.LoadUint64(&writeErrCount) -} - -func reportWriteError(err error, data LogData) { - if err == nil { +func (runtime *asyncRuntime) SetWriteErrorHandler(alert func(error, LogData)) { + if runtime == nil { return } - atomic.AddUint64(&writeErrCount, 1) - writeErrMu.RLock() - alert := writeErrHandler - writeErrMu.RUnlock() + runtime.writeErrMu.Lock() + defer runtime.writeErrMu.Unlock() + runtime.writeErrHandler = alert +} + +func (runtime *asyncRuntime) GetWriteErrorCount() uint64 { + if runtime == nil { + return 0 + } + return atomic.LoadUint64(&runtime.writeErrCount) +} + +func (runtime *asyncRuntime) reportWriteError(err error, data LogData) { + if runtime == nil || err == nil { + return + } + atomic.AddUint64(&runtime.writeErrCount, 1) + runtime.writeErrMu.RLock() + alert := runtime.writeErrHandler + runtime.writeErrMu.RUnlock() if alert != nil { func() { defer func() { @@ -574,11 +736,75 @@ func reportWriteError(err error, data LogData) { } } -func resetAsyncMetricsForTest() { - atomic.StoreUint64(&stackDrop, 0) - SetAsyncErrorHandler(nil) - SetAsyncFallbackToSync(true) - SetAsyncHandlerTimeout(0) - atomic.StoreUint64(&writeErrCount, 0) - SetWriteErrorHandler(nil) +func (runtime *asyncRuntime) resetForTest() { + if runtime == nil { + return + } + runtime.Stop() + atomic.StoreUint64(&runtime.dropCount, 0) + runtime.SetAsyncErrorHandler(nil) + runtime.SetAsyncFallbackToSync(true) + runtime.SetAsyncHandlerTimeout(0) + atomic.StoreUint64(&runtime.writeErrCount, 0) + runtime.SetWriteErrorHandler(nil) +} + +func StartStacks() { + defaultAsyncRuntime().Start() +} + +func StopStacks() { + defaultAsyncRuntime().Stop() +} + +func Stop() { + StopStacks() +} + +func SetAsyncErrorHandler(alert func(error, LogData)) { + defaultAsyncRuntime().SetAsyncErrorHandler(alert) +} + +func SetAsyncFallbackToSync(enable bool) { + defaultAsyncRuntime().SetAsyncFallbackToSync(enable) +} + +func GetAsyncFallbackToSync() bool { + return defaultAsyncRuntime().GetAsyncFallbackToSync() +} + +func SetAsyncHandlerTimeout(timeout time.Duration) { + defaultAsyncRuntime().SetAsyncHandlerTimeout(timeout) +} + +func GetAsyncHandlerTimeout() time.Duration { + return defaultAsyncRuntime().GetAsyncHandlerTimeout() +} + +func GetAsyncDropCount() uint64 { + return defaultAsyncRuntime().GetAsyncDropCount() +} + +func reportAsyncDrop(err error, data LogData) { + defaultAsyncRuntime().reportAsyncDrop(err, data) +} + +func invokeAsyncHandlerSafely(handler func(LogData), data LogData) bool { + return defaultAsyncRuntime().invokeAsyncHandlerSafely(handler, data) +} + +func SetWriteErrorHandler(alert func(error, LogData)) { + defaultAsyncRuntime().SetWriteErrorHandler(alert) +} + +func GetWriteErrorCount() uint64 { + return defaultAsyncRuntime().GetWriteErrorCount() +} + +func reportWriteError(err error, data LogData) { + defaultAsyncRuntime().reportWriteError(err, data) +} + +func resetAsyncMetricsForTest() { + defaultAsyncRuntime().resetForTest() }