fix: 修正异步运行时隔离并更新使用与迁移说明

This commit is contained in:
兔子 2026-03-21 18:40:16 +08:00
parent 8023bfe328
commit 2d6bbcbc9b
Signed by: b612
GPG Key ID: 99DD2222B612B612
16 changed files with 635 additions and 319 deletions

27
async_runtime_test.go Normal file
View File

@ -0,0 +1,27 @@
package starlog
import "testing"
// resetLoggerAsyncRuntimeForTest resets the async runtime owned by logger to
// a clean state, registers a cleanup that resets it again when the test ends,
// and returns the runtime. It fails the test immediately if logger is nil.
func resetLoggerAsyncRuntimeForTest(t *testing.T, logger *StarLogger) *asyncRuntime {
	t.Helper()
	if logger == nil {
		t.Fatal("logger is nil")
	}
	rt := logger.asyncRuntime()
	rt.resetForTest()
	// Restore a pristine runtime even if the test mutates it and fails.
	t.Cleanup(rt.resetForTest)
	return rt
}
// setAsyncRuntimeStateForTest force-installs the given queue and started flag
// on runtime after stopping any running worker, clearing the stop/done
// channels. It is a no-op when runtime is nil. Test-only helper.
func setAsyncRuntimeStateForTest(runtime *asyncRuntime, queue *starChanStack, started bool) {
	if runtime == nil {
		return
	}
	// Ensure no worker goroutine is draining while we mutate the state.
	runtime.Stop()
	runtime.mu.Lock()
	defer runtime.mu.Unlock()
	runtime.queue = queue
	runtime.started = started
	runtime.stopChan = nil
	runtime.doneChan = nil
}

47
core.go
View File

@ -578,7 +578,7 @@ func (logger *starlog) flushPendingWritesLocked(name string, colors []Attr) {
logger.pendingWrites = logger.pendingWrites[1:]
logger.signalPendingCondLocked()
if err := logger.writeDirect(logStr); err != nil {
reportWriteError(err, LogData{
logger.reportWriteError(err, LogData{
Name: name,
Log: logStr,
Colors: colors,
@ -622,13 +622,13 @@ func (logger *starlog) enqueuePendingWriteLocked(logStr string, data LogData) bo
switch logger.pendingDropPolicy {
case PendingDropNewest:
atomic.AddUint64(&logger.pendingDropCount, 1)
reportWriteError(ErrPendingWriteDropped, data)
logger.reportWriteError(ErrPendingWriteDropped, data)
return true
case PendingBlock:
atomic.AddUint64(&logger.pendingBlockCount, 1)
if logger.pendingCond == nil {
atomic.AddUint64(&logger.pendingDropCount, 1)
reportWriteError(ErrPendingWriteDropped, data)
logger.reportWriteError(ErrPendingWriteDropped, data)
return true
}
logger.pendingCond.Wait()
@ -638,7 +638,7 @@ func (logger *starlog) enqueuePendingWriteLocked(logStr string, data LogData) bo
dropData.Log = logger.pendingWrites[0]
logger.pendingWrites = logger.pendingWrites[1:]
logger.signalPendingCondLocked()
reportWriteError(ErrPendingWriteDropped, dropData)
logger.reportWriteError(ErrPendingWriteDropped, dropData)
}
}
if !logger.switching {
@ -657,14 +657,15 @@ func (logger *starlog) invokeEntryHandler(handler Handler, timeout time.Duration
if handlerCtx == nil {
handlerCtx = context.Background()
}
handlerCtx = withAsyncRuntime(handlerCtx, logger.asyncRuntime())
run := func() {
defer func() {
if panicErr := recover(); panicErr != nil {
reportAsyncDrop(fmt.Errorf("%w: %v", ErrAsyncHandlerPanic, panicErr), data)
logger.reportAsyncDrop(fmt.Errorf("%w: %v", ErrAsyncHandlerPanic, panicErr), data)
}
}()
if err := handler.Handle(handlerCtx, entry); err != nil {
reportWriteError(err, data)
logger.reportWriteError(err, data)
}
}
if timeout <= 0 {
@ -679,27 +680,29 @@ func (logger *starlog) invokeEntryHandler(handler Handler, timeout time.Duration
select {
case <-done:
case <-time.After(timeout):
reportAsyncDrop(ErrAsyncHandlerTimeout, data)
logger.reportAsyncDrop(ErrAsyncHandlerTimeout, data)
}
}
func (logger *starlog) enqueueAsyncTransfer(transfer logTransfer, fallbackSync bool) {
StartStacks()
if stacks == nil {
reportAsyncDrop(io.ErrClosedPipe, transfer.LogData)
if fallbackSync && GetAsyncFallbackToSync() {
invokeAsyncHandlerSafely(transfer.handlerFunc, transfer.LogData)
runtime := logger.asyncRuntime()
runtime.Start()
queue, _ := runtime.snapshot()
if queue == nil {
logger.reportAsyncDrop(io.ErrClosedPipe, transfer.LogData)
if fallbackSync && runtime.GetAsyncFallbackToSync() {
runtime.invokeAsyncHandlerSafely(transfer.handlerFunc, transfer.LogData)
}
return
}
if err := stacks.TryPush(transfer); err != nil {
if err := queue.TryPush(transfer); err != nil {
if errors.Is(err, errStackFull) {
reportAsyncDrop(ErrAsyncQueueFull, transfer.LogData)
logger.reportAsyncDrop(ErrAsyncQueueFull, transfer.LogData)
} else {
reportAsyncDrop(err, transfer.LogData)
logger.reportAsyncDrop(err, transfer.LogData)
}
if fallbackSync && GetAsyncFallbackToSync() {
invokeAsyncHandlerSafely(transfer.handlerFunc, transfer.LogData)
if fallbackSync && runtime.GetAsyncFallbackToSync() {
runtime.invokeAsyncHandlerSafely(transfer.handlerFunc, transfer.LogData)
}
}
}
@ -795,21 +798,21 @@ func (logger *starlog) build(thread string, isStd bool, isShow bool, handler fun
if level < snapshot.errOutputLevel {
if snapshot.showColor {
if _, err := fmt.Fprint(stdScreen, displayLine); err != nil {
reportWriteError(err, logData)
logger.reportWriteError(err, logData)
}
} else {
if _, err := fmt.Fprint(os.Stdout, displayLine); err != nil {
reportWriteError(err, logData)
logger.reportWriteError(err, logData)
}
}
} else {
if snapshot.showColor {
if _, err := fmt.Fprint(errScreen, displayLine); err != nil {
reportWriteError(err, logData)
logger.reportWriteError(err, logData)
}
} else {
if _, err := fmt.Fprint(os.Stderr, displayLine); err != nil {
reportWriteError(err, logData)
logger.reportWriteError(err, logData)
}
}
}
@ -858,7 +861,7 @@ func (logger *starlog) writeWithData(logStr string, data LogData) {
}
logger.flushPendingWritesLocked(data.Name, data.Colors)
if err := logger.writeDirect(logStr); err != nil {
reportWriteError(err, data)
logger.reportWriteError(err, data)
}
}

View File

@ -83,9 +83,17 @@ defer starlog.Shutdown(ctx)
说明:
- `Shutdown(ctx)` 会等待异步 handler drain并统一关闭资源。
- `Shutdown(ctx)` 会等待当前 logger 的异步 handler drain并统一关闭资源。
- `Close()` 仍可用,但不等待异步 handler 队列。
如果项目里主要使用自定义 logger推荐直接写成
```go
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
defer log.Shutdown(ctx)
```
## 常见旧写法到推荐写法
1. 配置项逐条 setter
@ -106,6 +114,9 @@ defer starlog.Shutdown(ctx)
6. 旧 `StartArchive` 轮转启动
推荐:`StartRotatePolicy` / `StartManagedRotatePolicy`,模板场景可直接用 `StartRotateByTime/BySize/ByTimeSize`
7. 包级异步观测/回调入口
推荐:自定义 logger 优先使用实例方法,例如 `log.SetAsyncErrorHandler(...)``log.SetWriteErrorHandler(...)``log.GetAsyncDropCount()``log.GetWriteErrorCount()``log.Shutdown(ctx)`
## 旧 API 仍可用(重要)
以下旧/兼容路径当前仍可用:
@ -114,6 +125,7 @@ defer starlog.Shutdown(ctx)
- 历史错拼兼容名仍可用(如 `EnbaleWrite``IsWriteStoed``HookBeforArchive`
- `Archive` 旧模型仍可用(建议新代码优先 `RotatePolicy`
- 旧生命周期入口仍可用(如 `CloseStd``Close`),但建议新代码优先 `Shutdown(ctx)` / `CloseLogFile`
- 包级异步/错误统计入口仍可用,但默认绑定全局 `Std` logger自定义 logger 建议改用实例方法
## 升级后验证建议
@ -123,7 +135,7 @@ defer starlog.Shutdown(ctx)
4. 重点验证:
- 级别过滤是否符合预期
- 归档文件数量/保留策略
- 异步丢弃计数和写入错误计数
- 异步丢弃计数和写入错误计数是否落在预期 logger 上
## 何时需要进一步迁移
@ -135,5 +147,5 @@ defer starlog.Shutdown(ctx)
---
如果你希望,我可以基于你当前项目的实际日志初始化代码,给出一份“一次改完可落地”的迁移 patch 方案。
自定义 logger 较多的项目,建议优先检查包级异步观测入口是否仍在使用,并逐步替换为实例方法。

View File

@ -421,9 +421,17 @@ log.SetPendingWriteLimit(1024)
log.SetPendingDropPolicy(starlog.PendingDropOldest)
```
说明:
- 每个 `StarLogger` 都有独立的异步运行时。
- `SetHandler``SetEntryHandler``SetAsyncErrorHandler``SetWriteErrorHandler``GetAsyncDropCount``GetWriteErrorCount``GetAsyncMetrics``Shutdown(ctx)` 都作用于当前 logger。
- 包级入口(如 `starlog.SetAsyncErrorHandler``starlog.GetAsyncDropCount``starlog.Shutdown(ctx)`)保留兼容,默认作用于全局 `Std` logger。
可观测项:
- `GetAsyncMetrics()`
- `GetAsyncDropCount()`
- `GetWriteErrorCount()`
- `GetPendingStats()`
## 11. 高频防爆(去重 / 采样 / 限流)
@ -477,6 +485,8 @@ _ = snapshot
常用统计:
- `GetMetricsSnapshot()`
- `GetAsyncMetrics()`
- `GetPendingStats()`
- `GetSamplingStats()`
- `GetDedupStats()`
@ -491,7 +501,7 @@ _ = snapshot
- `Flush()`:刷写 pending
- `Sync()``Flush + 底层 Sync()`(若支持)
- `Close()`:关闭资源,不等待异步队列 drain
- `Shutdown(ctx)`:等待异步 drain再关闭资源
- `Shutdown(ctx)`:等待当前 logger 的异步 drain再关闭资源
推荐:
@ -501,7 +511,11 @@ defer cancel()
defer log.Shutdown(ctx)
```
全局 logger 可使用:`starlog.Shutdown(ctx)`
说明:
- 自定义 logger 优先使用 `log.Shutdown(ctx)`
- 全局 logger 可使用 `starlog.Shutdown(ctx)`
- `WaitAsyncDrain(ctx)` 作为兼容入口保留,默认等待全局 `Std` logger 的异步队列;自定义 logger 退出时直接使用 `log.Shutdown(ctx)` 更明确。
## 15. 标准库桥接
@ -678,6 +692,7 @@ func main() {
- 显示回调链路:可能因超时或队列满发生丢弃,可通过计数观测
- writer/sink 主写链路:只要未被过滤(级别/去重/采样/限流)且写入正常,会落盘
- 多个自定义 logger 之间的异步丢弃计数、写入错误计数和关闭流程彼此独立
### Q3新项目应该选 `Archive` 还是 `RotatePolicy`

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"time"
)
type sinkSyncer interface {
@ -15,6 +14,10 @@ type writerSyncer interface {
Sync() error
}
type contextualCloser interface {
CloseWithContext(context.Context) error
}
func mergeLifecycleError(current error, next error) error {
if next == nil {
return current
@ -26,23 +29,7 @@ func mergeLifecycleError(current error, next error) error {
}
func WaitAsyncDrain(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
}
for {
stackMu.Lock()
current := stacks
started := stackStarted
stackMu.Unlock()
if !started || current == nil || current.Len() == 0 {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Millisecond):
}
}
return defaultAsyncRuntime().WaitDrain(ctx)
}
func (logger *StarLogger) Flush() error {
@ -112,7 +99,9 @@ func (logger *StarLogger) Close() error {
}
}
if entryHandler != nil {
if closer, ok := entryHandler.(interface{ Close() error }); ok {
if closer, ok := entryHandler.(contextualCloser); ok {
err = mergeLifecycleError(err, closer.CloseWithContext(withAsyncRuntime(context.Background(), logger.asyncRuntime())))
} else if closer, ok := entryHandler.(interface{ Close() error }); ok {
err = mergeLifecycleError(err, closer.Close())
}
}
@ -126,8 +115,8 @@ func (logger *StarLogger) Shutdown(ctx context.Context) error {
}
var err error
err = mergeLifecycleError(err, logger.Flush())
err = mergeLifecycleError(err, WaitAsyncDrain(ctx))
StopStacks()
err = mergeLifecycleError(err, logger.asyncRuntime().WaitDrain(ctx))
logger.asyncRuntime().Stop()
err = mergeLifecycleError(err, logger.Close())
return err
}

View File

@ -91,25 +91,13 @@ func TestCloseClosesSinkAndStopsWrite(t *testing.T) {
}
func TestWaitAsyncDrainContextTimeout(t *testing.T) {
stackMu.Lock()
stackStarted = true
stacks = newStarChanStack(1)
stackStopChan = nil
stackDoneChan = nil
stackMu.Unlock()
defer func() {
stackMu.Lock()
if stacks != nil {
_ = stacks.Close()
}
stackStarted = false
stacks = nil
stackStopChan = nil
stackDoneChan = nil
stackMu.Unlock()
}()
runtime := defaultAsyncRuntime()
runtime.resetForTest()
t.Cleanup(runtime.resetForTest)
if err := stacks.Push("x"); err != nil {
queue := newStarChanStack(1)
setAsyncRuntimeStateForTest(runtime, queue, true)
if err := queue.Push("x"); err != nil {
t.Fatalf("prepare queue failed: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
@ -121,13 +109,8 @@ func TestWaitAsyncDrainContextTimeout(t *testing.T) {
}
func TestShutdownStopsAsyncStacks(t *testing.T) {
resetAsyncMetricsForTest()
defer func() {
resetAsyncMetricsForTest()
StopStacks()
}()
logger := NewStarlog(nil)
runtime := resetLoggerAsyncRuntimeForTest(t, logger)
logger.SetShowStd(false)
handled := make(chan struct{}, 1)
logger.SetHandler(func(LogData) {
@ -149,11 +132,36 @@ func TestShutdownStopsAsyncStacks(t *testing.T) {
t.Fatalf("async handler should complete before shutdown")
}
stackMu.Lock()
started := stackStarted
stackMu.Unlock()
if started {
t.Fatalf("Shutdown should stop async stacks")
if runtime.Metrics().Started {
t.Fatalf("Shutdown should stop logger async runtime")
}
}
// TestShutdownDoesNotStopOtherLoggerAsyncRuntime verifies that async runtimes
// are isolated per logger: shutting down loggerA must stop only loggerA's
// runtime while loggerB's runtime keeps running.
func TestShutdownDoesNotStopOtherLoggerAsyncRuntime(t *testing.T) {
	loggerA := NewStarlog(nil)
	runtimeA := resetLoggerAsyncRuntimeForTest(t, loggerA)
	loggerA.SetShowStd(false)
	// SetHandler starts the logger's own async runtime as a side effect.
	loggerA.SetHandler(func(LogData) {})
	loggerB := NewStarlog(nil)
	runtimeB := resetLoggerAsyncRuntimeForTest(t, loggerB)
	loggerB.SetShowStd(false)
	loggerB.SetHandler(func(LogData) {})
	if !runtimeA.Metrics().Started || !runtimeB.Metrics().Started {
		t.Fatalf("both logger runtimes should be started")
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := loggerA.Shutdown(ctx); err != nil {
		t.Fatalf("Shutdown failed: %v", err)
	}
	// Shutdown must be scoped to the target logger's runtime only.
	if runtimeA.Metrics().Started {
		t.Fatalf("Shutdown should stop only the target logger runtime")
	}
	if !runtimeB.Metrics().Started {
		t.Fatalf("Shutdown should not stop another logger runtime")
	}
}

View File

@ -35,42 +35,29 @@ type MetricsSnapshot struct {
}
func GetAsyncMetrics() AsyncMetrics {
stackMu.Lock()
started := stackStarted
current := stacks
stackMu.Unlock()
snapshot := AsyncMetrics{
Started: started,
Dropped: GetAsyncDropCount(),
FallbackToSync: GetAsyncFallbackToSync(),
HandlerTimeout: GetAsyncHandlerTimeout(),
}
if current != nil {
snapshot.QueueLength = current.Len()
snapshot.QueueCapacity = current.Cap()
snapshot.QueueFree = current.Free()
}
return snapshot
return defaultAsyncRuntime().Metrics()
}
func (logger *StarLogger) GetMetricsSnapshot() MetricsSnapshot {
snapshot := MetricsSnapshot{
Time: time.Now(),
Async: GetAsyncMetrics(),
Errors: ErrorMetrics{
WriteErrors: GetWriteErrorCount(),
WriteErrors: 0,
},
}
if logger == nil || logger.logcore == nil {
snapshot.Async = GetAsyncMetrics()
snapshot.Errors.WriteErrors = GetWriteErrorCount()
return snapshot
}
snapshot.Async = logger.GetAsyncMetrics()
snapshot.Pending = logger.GetPendingStats()
snapshot.Sampling = logger.GetSamplingStats()
snapshot.Dedup = logger.GetDedupStats()
snapshot.RateLimit = logger.GetRateLimitStats()
snapshot.Errors.RedactionErrors = logger.GetRedactErrorCount()
snapshot.Errors.WriteErrors = logger.GetWriteErrorCount()
snapshot.ArchiveRunning = IsArchiveRun(logger)
logger.logcore.mu.Lock()
@ -90,6 +77,8 @@ func (logger *StarLogger) GetMetricsSnapshot() MetricsSnapshot {
}
func (logger *StarLogger) GetAsyncMetrics() AsyncMetrics {
_ = logger
if logger == nil {
return GetAsyncMetrics()
}
return logger.asyncRuntime().Metrics()
}

View File

@ -1,6 +1,7 @@
package starlog
import (
"bytes"
"sync/atomic"
"testing"
"time"
@ -60,8 +61,8 @@ func TestMetricsSnapshotIncludesPendingAndMultiSink(t *testing.T) {
if snapshot.Pending.Policy != PendingDropOldest {
t.Fatalf("snapshot should include pending policy")
}
if snapshot.Errors.WriteErrors != GetWriteErrorCount() {
t.Fatalf("snapshot write error count should match global counter")
if snapshot.Errors.WriteErrors != logger.GetWriteErrorCount() {
t.Fatalf("snapshot write error count should match logger counter")
}
if !snapshot.Sampling.Enabled || snapshot.Sampling.Rate != 0.5 {
t.Fatalf("snapshot should include sampling stats, got %+v", snapshot.Sampling)
@ -97,3 +98,27 @@ func TestMetricsSnapshotNilLogger(t *testing.T) {
t.Fatalf("snapshot should contain timestamp")
}
}
// TestMetricsSnapshotUsesLoggerRuntimeCounters verifies that the write-error
// counter in GetMetricsSnapshot is sourced from each logger's own async
// runtime: loggerA (writing through errWriter) must report a non-zero count,
// while loggerB's snapshot must stay at zero.
func TestMetricsSnapshotUsesLoggerRuntimeCounters(t *testing.T) {
	// errWriter is expected to fail writes (see its definition in this test
	// package), driving loggerA's write-error path.
	loggerA := NewStarlog(&errWriter{})
	resetLoggerAsyncRuntimeForTest(t, loggerA)
	// Disable decoration/console output so the failing writer is the only sink.
	loggerA.SetShowStd(false)
	loggerA.SetShowColor(false)
	loggerA.SetShowOriginFile(false)
	loggerA.SetShowFuncName(false)
	loggerA.SetShowFlag(false)
	loggerB := newStructuredTestLogger(&bytes.Buffer{})
	resetLoggerAsyncRuntimeForTest(t, loggerB)
	loggerA.Infoln("write error check")
	snapshotA := loggerA.GetMetricsSnapshot()
	snapshotB := loggerB.GetMetricsSnapshot()
	if snapshotA.Errors.WriteErrors == 0 {
		t.Fatalf("snapshot should include logger write errors")
	}
	if snapshotB.Errors.WriteErrors != 0 {
		t.Fatalf("other logger snapshot should not inherit write errors, got %d", snapshotB.Errors.WriteErrors)
	}
}

View File

@ -38,9 +38,9 @@ func waitObserverCondition(t *testing.T, timeout time.Duration, cond func() bool
}
func TestObserverCollectsEntries(t *testing.T) {
defer StopStacks()
var buf bytes.Buffer
logger := newStructuredTestLogger(&buf)
defer logger.asyncRuntime().Stop()
observer := NewObserver()
logger.AppendEntryHandler(observer)
@ -61,9 +61,9 @@ func TestObserverCollectsEntries(t *testing.T) {
}
func TestObserverLimitAndDropped(t *testing.T) {
defer StopStacks()
observer := NewObserverWithLimit(2)
logger := newStructuredTestLogger(&bytes.Buffer{})
defer logger.asyncRuntime().Stop()
logger.AppendEntryHandler(observer)
logger.Info("one")
@ -93,9 +93,9 @@ func TestObserverLimitAndDropped(t *testing.T) {
}
func TestObserverTakeAllAndReset(t *testing.T) {
defer StopStacks()
observer := NewObserver()
logger := newStructuredTestLogger(&bytes.Buffer{})
defer logger.asyncRuntime().Stop()
logger.AppendEntryHandler(observer)
logger.Info("a")
@ -119,9 +119,9 @@ func TestObserverTakeAllAndReset(t *testing.T) {
}
func TestTestHookAttachAndRestore(t *testing.T) {
defer StopStacks()
var buf bytes.Buffer
logger := newStructuredTestLogger(&buf)
defer logger.asyncRuntime().Stop()
var previousCount uint64
logger.SetEntryHandler(HandlerFunc(func(_ context.Context, _ *Entry) error {
atomic.AddUint64(&previousCount, 1)
@ -163,8 +163,8 @@ func TestTestHookAttachAndRestore(t *testing.T) {
}
func TestTestHookCloseWhenHandlerReplaced(t *testing.T) {
defer StopStacks()
logger := newStructuredTestLogger(&bytes.Buffer{})
defer logger.asyncRuntime().Stop()
hook := NewTestHook(logger)
logger.SetEntryHandler(nil)

View File

@ -33,21 +33,8 @@ func TestWriteBufferFlushAfterSwitchingOff(t *testing.T) {
}
func TestAsyncQueuePushFailureFallsBackToSyncHandler(t *testing.T) {
resetAsyncMetricsForTest()
defer func() {
resetAsyncMetricsForTest()
stackMu.Lock()
if stacks != nil {
_ = stacks.Close()
}
stackStarted = false
stacks = nil
stackStopChan = nil
stackDoneChan = nil
stackMu.Unlock()
}()
logger := NewStarlog(nil)
runtime := resetLoggerAsyncRuntimeForTest(t, logger)
logger.SetShowStd(false)
var handled uint64
logger.handlerFunc = func(data LogData) {
@ -55,26 +42,21 @@ func TestAsyncQueuePushFailureFallsBackToSyncHandler(t *testing.T) {
}
alertCalled := make(chan struct{}, 1)
SetAsyncErrorHandler(func(err error, data LogData) {
logger.SetAsyncErrorHandler(func(err error, data LogData) {
select {
case alertCalled <- struct{}{}:
default:
}
})
stackMu.Lock()
stackStarted = true
stacks = nil
stackStopChan = nil
stackDoneChan = nil
stackMu.Unlock()
setAsyncRuntimeStateForTest(runtime, nil, true)
logger.Infoln("trigger async fallback")
if atomic.LoadUint64(&handled) != 1 {
t.Fatalf("handler should be called once via sync fallback, got: %d", handled)
}
if GetAsyncDropCount() == 0 {
if logger.GetAsyncDropCount() == 0 {
t.Fatalf("async drop counter should increase on push failure")
}
select {
@ -221,15 +203,13 @@ func (w *errWriter) Write(p []byte) (int, error) {
}
func TestWriteErrorObservable(t *testing.T) {
resetAsyncMetricsForTest()
defer resetAsyncMetricsForTest()
logger := NewStarlog(&errWriter{})
resetLoggerAsyncRuntimeForTest(t, logger)
logger.SetShowStd(false)
logger.SetShowColor(false)
observed := make(chan struct{}, 1)
SetWriteErrorHandler(func(err error, data LogData) {
logger.SetWriteErrorHandler(func(err error, data LogData) {
if err != nil {
select {
case observed <- struct{}{}:
@ -240,7 +220,7 @@ func TestWriteErrorObservable(t *testing.T) {
logger.Infoln("write error check")
if GetWriteErrorCount() == 0 {
if logger.GetWriteErrorCount() == 0 {
t.Fatalf("write error count should increase")
}
select {
@ -251,13 +231,8 @@ func TestWriteErrorObservable(t *testing.T) {
}
func TestAsyncHandlerPanicDoesNotCrash(t *testing.T) {
resetAsyncMetricsForTest()
defer func() {
resetAsyncMetricsForTest()
StopStacks()
}()
logger := NewStarlog(nil)
resetLoggerAsyncRuntimeForTest(t, logger)
logger.SetShowStd(false)
logger.SetHandler(func(LogData) {
panic("boom")
@ -266,19 +241,14 @@ func TestAsyncHandlerPanicDoesNotCrash(t *testing.T) {
logger.Infoln("panic safe")
time.Sleep(20 * time.Millisecond)
if GetAsyncDropCount() == 0 {
if logger.GetAsyncDropCount() == 0 {
t.Fatalf("panic in async handler should be reported as drop")
}
}
func TestEntryHandlerTimeoutFallback(t *testing.T) {
resetAsyncMetricsForTest()
defer func() {
resetAsyncMetricsForTest()
StopStacks()
}()
logger := NewStarlog(nil)
resetLoggerAsyncRuntimeForTest(t, logger)
logger.SetShowStd(false)
logger.SetEntryHandler(HandlerFunc(func(context.Context, *Entry) error {
time.Sleep(80 * time.Millisecond)
@ -295,7 +265,7 @@ func TestEntryHandlerTimeoutFallback(t *testing.T) {
}
deadline := time.Now().Add(300 * time.Millisecond)
for time.Now().Before(deadline) {
if GetAsyncDropCount() > 0 {
if logger.GetAsyncDropCount() > 0 {
return
}
time.Sleep(5 * time.Millisecond)
@ -304,20 +274,9 @@ func TestEntryHandlerTimeoutFallback(t *testing.T) {
}
func TestEntryHandlerQueueFullNoFallbackDoesNotBlock(t *testing.T) {
resetAsyncMetricsForTest()
defer func() {
resetAsyncMetricsForTest()
stackMu.Lock()
stackStarted = false
stacks = nil
stackStopChan = nil
stackDoneChan = nil
stackMu.Unlock()
}()
SetAsyncFallbackToSync(false)
logger := NewStarlog(nil)
runtime := resetLoggerAsyncRuntimeForTest(t, logger)
logger.SetAsyncFallbackToSync(false)
logger.SetShowStd(false)
var handled uint64
logger.SetEntryHandler(HandlerFunc(func(context.Context, *Entry) error {
@ -326,14 +285,10 @@ func TestEntryHandlerQueueFullNoFallbackDoesNotBlock(t *testing.T) {
return nil
}))
stackMu.Lock()
stackStarted = true
stacks = newStarChanStack(1)
stackStopChan = nil
stackDoneChan = nil
stackMu.Unlock()
queue := newStarChanStack(1)
setAsyncRuntimeStateForTest(runtime, queue, true)
if err := stacks.Push(logTransfer{
if err := queue.Push(logTransfer{
handlerFunc: func(LogData) {
time.Sleep(80 * time.Millisecond)
},
@ -351,7 +306,7 @@ func TestEntryHandlerQueueFullNoFallbackDoesNotBlock(t *testing.T) {
if atomic.LoadUint64(&handled) != 0 {
t.Fatalf("entry handler should be dropped when queue is full and fallback disabled, got %d", handled)
}
if GetAsyncDropCount() == 0 {
if logger.GetAsyncDropCount() == 0 {
t.Fatalf("queue-full drop should be observable")
}
}

View File

@ -65,7 +65,7 @@ func (logger *starlog) applyRedaction(snapshot *starlog, entry *Entry) bool {
}
func (logger *starlog) handleRedactionFailure(snapshot *starlog, entry *Entry, err error) bool {
reportWriteError(fmt.Errorf("%w: %v", ErrRedactionFailed, err), LogData{
logger.reportWriteError(fmt.Errorf("%w: %v", ErrRedactionFailed, err), LogData{
Name: snapshot.name,
Log: entry.Message,
})

View File

@ -146,17 +146,15 @@ func TestSetRedactMaskToken(t *testing.T) {
}
func TestRedactionFailureReportsWriteError(t *testing.T) {
resetAsyncMetricsForTest()
defer resetAsyncMetricsForTest()
var buf bytes.Buffer
logger := newStructuredTestLogger(&buf)
resetLoggerAsyncRuntimeForTest(t, logger)
logger.SetRedactor(RedactorFunc(func(context.Context, *Entry) error {
return errors.New("redactor failed")
}))
observed := make(chan error, 1)
SetWriteErrorHandler(func(err error, data LogData) {
logger.SetWriteErrorHandler(func(err error, data LogData) {
if err == nil {
return
}
@ -167,7 +165,7 @@ func TestRedactionFailureReportsWriteError(t *testing.T) {
})
logger.Info("check redaction error report")
if GetWriteErrorCount() == 0 {
if logger.GetWriteErrorCount() == 0 {
t.Fatalf("write error count should increase when redaction fails")
}
select {

View File

@ -87,7 +87,7 @@ func (handler *RouteHandler) Handle(ctx context.Context, entry *Entry) error {
formatted, err := formatter.Format(entry)
if err != nil {
wrapErr := fmt.Errorf("route %s format failed: %w", route.name, err)
reportWriteError(wrapErr, LogData{
reportWriteErrorWithContext(ctx, wrapErr, LogData{
Name: route.name,
Log: entry.Message,
})
@ -101,7 +101,7 @@ func (handler *RouteHandler) Handle(ctx context.Context, entry *Entry) error {
}
if err = route.sink.Write(formatted); err != nil {
wrapErr := fmt.Errorf("route %s write failed: %w", route.name, err)
reportWriteError(wrapErr, LogData{
reportWriteErrorWithContext(ctx, wrapErr, LogData{
Name: route.name,
Log: string(formatted),
})
@ -115,6 +115,10 @@ func (handler *RouteHandler) Handle(ctx context.Context, entry *Entry) error {
}
func (handler *RouteHandler) Close() error {
return handler.CloseWithContext(nil)
}
func (handler *RouteHandler) CloseWithContext(ctx context.Context) error {
if handler == nil {
return nil
}
@ -137,7 +141,7 @@ func (handler *RouteHandler) Close() error {
}
if err := route.sink.Close(); err != nil {
wrapErr := fmt.Errorf("route %s close failed: %w", route.name, err)
reportWriteError(wrapErr, LogData{
reportWriteErrorWithContext(ctx, wrapErr, LogData{
Name: route.name,
})
if firstErr == nil {
@ -198,6 +202,10 @@ func (handler *chainedHandler) Handle(ctx context.Context, entry *Entry) error {
}
func (handler *chainedHandler) Close() error {
return handler.CloseWithContext(nil)
}
func (handler *chainedHandler) CloseWithContext(ctx context.Context) error {
if handler == nil {
return nil
}
@ -206,14 +214,18 @@ func (handler *chainedHandler) Close() error {
if item == nil {
continue
}
closer, ok := item.(interface{ Close() error })
if !ok {
if closer, ok := item.(contextualCloser); ok {
if err := closer.CloseWithContext(ctx); err != nil && firstErr == nil {
firstErr = err
}
continue
}
if closer, ok := item.(interface{ Close() error }); ok {
if err := closer.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
}
return firstErr
}

View File

@ -47,6 +47,17 @@ func (sink *closeCountSink) Close() error {
return nil
}
// closeFailSink is a stub sink whose Write always succeeds and whose Close
// always fails, used to exercise close-error reporting paths in tests.
type closeFailSink struct{}

// Write discards the payload and reports success.
func (sink *closeFailSink) Write(_ []byte) error {
	return nil
}

// Close always returns a fixed non-nil error.
func (sink *closeFailSink) Close() error {
	return errors.New("route sink close failed")
}
type safeBuffer struct {
mu sync.Mutex
buf bytes.Buffer
@ -206,11 +217,9 @@ func TestChainHandlerRunsAll(t *testing.T) {
}
func TestRouteHandlerWriteErrorObservable(t *testing.T) {
resetAsyncMetricsForTest()
defer resetAsyncMetricsForTest()
var totalBuf safeBuffer
logger := newStructuredTestLogger(&totalBuf)
resetLoggerAsyncRuntimeForTest(t, logger)
logger.SetEntryHandler(NewRouteHandler(
Route{
Name: "failed-route",
@ -222,10 +231,50 @@ func TestRouteHandlerWriteErrorObservable(t *testing.T) {
logger.Info("route write error")
waitFor(t, 300*time.Millisecond, func() bool {
return GetWriteErrorCount() > 0
return logger.GetWriteErrorCount() > 0
}, "route sink write error observable")
}
// TestRouteHandlerCloseErrorUsesLoggerRuntime verifies that a sink close
// failure surfaced during logger.Close is reported through the owning
// logger's runtime: Close returns the error, the logger's write-error
// counter increases, and the logger-scoped write-error handler is invoked.
func TestRouteHandlerCloseErrorUsesLoggerRuntime(t *testing.T) {
	logger := newStructuredTestLogger(&bytes.Buffer{})
	resetLoggerAsyncRuntimeForTest(t, logger)
	observed := make(chan error, 1)
	logger.SetWriteErrorHandler(func(err error, data LogData) {
		if err == nil {
			return
		}
		// Non-blocking send: only the first observed error matters.
		select {
		case observed <- err:
		default:
		}
	})
	// closeFailSink guarantees a close error when the route handler shuts down.
	logger.SetEntryHandler(NewRouteHandler(
		Route{
			Name:      "failed-close",
			Match:     MatchAllLevels(),
			Formatter: &messageOnlyFormatter{},
			Sink:      &closeFailSink{},
		},
	))
	err := logger.Close()
	if err == nil {
		t.Fatalf("logger close should return route close error")
	}
	if logger.GetWriteErrorCount() == 0 {
		t.Fatalf("route close error should increase logger write error count")
	}
	// The handler is expected to have fired by the time Close returns; an
	// empty channel here means the error was routed elsewhere.
	select {
	case observedErr := <-observed:
		if !strings.Contains(observedErr.Error(), "route sink close failed") {
			t.Fatalf("unexpected observed error: %v", observedErr)
		}
	default:
		t.Fatalf("route close error should invoke logger write error handler")
	}
}
func TestRouteHandlerCloseDeduplicatesSameSink(t *testing.T) {
sink := &closeCountSink{}
handler := NewRouteHandler(
@ -284,23 +333,31 @@ func TestRouteHandlerWithRotatingFileSink(t *testing.T) {
logger.Infof("debug payload %02d %s", idx, strings.Repeat("x", 24))
time.Sleep(3 * time.Millisecond)
}
waitFor(t, 2*time.Second, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err = logger.Shutdown(ctx); err != nil {
t.Fatalf("logger shutdown failed: %v", err)
}
paths := []string{debugPath}
matches, _ := filepath.Glob(filepath.Join(filepath.Dir(debugPath), "debug.*.log"))
return len(matches) > 0
}, "rotating route sink archive creation")
content, readErr := ioutil.ReadFile(debugPath)
paths = append(paths, matches...)
found := false
for _, path := range paths {
content, readErr := ioutil.ReadFile(path)
if readErr != nil {
t.Fatalf("read debug current log failed: %v", readErr)
t.Fatalf("read routed log %s failed: %v", path, readErr)
}
if len(content) == 0 {
t.Fatalf("debug log should contain routed logs")
if strings.Contains(string(content), "debug payload") {
found = true
break
}
if err = logger.Close(); err != nil {
t.Fatalf("logger close failed: %v", err)
}
if !found {
t.Fatalf("routed logs should exist in current or archived debug files")
}
if err = debugSink.Write([]byte("after close")); !errors.Is(err, ErrRotatingFileSinkClosed) {
t.Fatalf("rotating sink should be closed by logger close, got %v", err)
t.Fatalf("rotating sink should be closed by logger shutdown, got %v", err)
}
}

View File

@ -107,7 +107,7 @@ func (logger *starlog) applyPendingLimitLocked(limit int) {
case PendingDropNewest:
keepEnd := len(logger.pendingWrites) - dropped
for idx := keepEnd; idx < len(logger.pendingWrites); idx++ {
reportWriteError(ErrPendingWriteDropped, LogData{
logger.reportWriteError(ErrPendingWriteDropped, LogData{
Name: logger.name,
Log: logger.pendingWrites[idx],
})
@ -115,7 +115,7 @@ func (logger *starlog) applyPendingLimitLocked(limit int) {
logger.pendingWrites = logger.pendingWrites[:keepEnd]
default:
for idx := 0; idx < dropped; idx++ {
reportWriteError(ErrPendingWriteDropped, LogData{
logger.reportWriteError(ErrPendingWriteDropped, LogData{
Name: logger.name,
Log: logger.pendingWrites[idx],
})
@ -464,7 +464,7 @@ func (logger *StarLogger) AppendEntryHandler(handler Handler) {
func (logger *StarLogger) SetHandler(f func(LogData)) {
if f != nil {
StartStacks()
logger.asyncRuntime().Start()
}
logger.logcore.mu.Lock()
defer logger.logcore.mu.Unlock()
@ -487,35 +487,35 @@ func (logger *StarLogger) SetSwitching(sw bool) {
}
func (logger *StarLogger) SetAsyncErrorHandler(alert func(error, LogData)) {
SetAsyncErrorHandler(alert)
logger.asyncRuntime().SetAsyncErrorHandler(alert)
}
func (logger *StarLogger) GetAsyncDropCount() uint64 {
return GetAsyncDropCount()
return logger.asyncRuntime().GetAsyncDropCount()
}
func (logger *StarLogger) SetAsyncFallbackToSync(enable bool) {
SetAsyncFallbackToSync(enable)
logger.asyncRuntime().SetAsyncFallbackToSync(enable)
}
func (logger *StarLogger) GetAsyncFallbackToSync() bool {
return GetAsyncFallbackToSync()
return logger.asyncRuntime().GetAsyncFallbackToSync()
}
func (logger *StarLogger) SetAsyncHandlerTimeout(timeout time.Duration) {
SetAsyncHandlerTimeout(timeout)
logger.asyncRuntime().SetAsyncHandlerTimeout(timeout)
}
func (logger *StarLogger) GetAsyncHandlerTimeout() time.Duration {
return GetAsyncHandlerTimeout()
return logger.asyncRuntime().GetAsyncHandlerTimeout()
}
func (logger *StarLogger) SetWriteErrorHandler(alert func(error, LogData)) {
SetWriteErrorHandler(alert)
logger.asyncRuntime().SetWriteErrorHandler(alert)
}
func (logger *StarLogger) GetWriteErrorCount() uint64 {
return GetWriteErrorCount()
return logger.asyncRuntime().GetWriteErrorCount()
}
func (logger *StarLogger) SetOnlyColorLevel(ocl bool) {

424
typed.go
View File

@ -59,6 +59,8 @@ const (
FieldTypeOther = "other"
)
const defaultAsyncQueueCapacity uint64 = 1024
var (
ErrAsyncHandlerPanic = errors.New("async handler panic")
ErrAsyncHandlerTimeout = errors.New("async handler timeout")
@ -90,27 +92,41 @@ var (
"panic": LvPanic,
"fatal": LvFatal,
}
stacks *starChanStack
stackStarted bool = false
stackStopChan chan struct{}
stackDoneChan chan struct{}
stackMu sync.Mutex
stackDrop uint64
stackAlert func(error, LogData)
stackAlertMu sync.RWMutex
stackFallback uint32 = 1
stackTimeout int64
stdScreen io.Writer = colorable.NewColorableStdout()
errScreen io.Writer = colorable.NewColorableStderr()
defaultAsyncRuntimeOnce sync.Once
defaultAsyncRuntimeFallback *asyncRuntime
)
type asyncRuntime struct {
mu sync.Mutex
queue *starChanStack
started bool
stopChan chan struct{}
doneChan chan struct{}
dropCount uint64
asyncAlert func(error, LogData)
asyncAlertMu sync.RWMutex
fallbackSync uint32
timeout int64
writeErrCount uint64
writeErrHandler func(error, LogData)
writeErrMu sync.RWMutex
stdScreen io.Writer = colorable.NewColorableStdout()
errScreen io.Writer = colorable.NewColorableStderr()
)
queueCapacity uint64
}
type asyncRuntimeContextKey struct{}
type starlog struct {
mu *sync.Mutex
runtime *asyncRuntime
output io.Writer
minLevel int
errOutputLevel int
@ -240,9 +256,57 @@ type Config struct {
ContextFieldExtractor func(context.Context) Fields
}
// newAsyncRuntime builds an isolated per-logger async runtime. A zero
// queueCapacity falls back to defaultAsyncQueueCapacity. Fallback-to-sync is
// enabled by default, matching the historical package-level behavior.
func newAsyncRuntime(queueCapacity uint64) *asyncRuntime {
	capacity := queueCapacity
	if capacity == 0 {
		capacity = defaultAsyncQueueCapacity
	}
	rt := &asyncRuntime{queueCapacity: capacity}
	atomic.StoreUint32(&rt.fallbackSync, 1)
	return rt
}
// defaultAsyncRuntime returns the runtime backing the package-level API:
// the standard logger's runtime when one is wired up, otherwise a lazily
// created process-wide fallback runtime.
func defaultAsyncRuntime() *asyncRuntime {
	if std := Std; std != nil && std.logcore != nil && std.logcore.runtime != nil {
		return std.logcore.runtime
	}
	// No usable Std runtime; hand out the shared fallback, built exactly once.
	defaultAsyncRuntimeOnce.Do(func() {
		defaultAsyncRuntimeFallback = newAsyncRuntime(defaultAsyncQueueCapacity)
	})
	return defaultAsyncRuntimeFallback
}
// withAsyncRuntime derives a context carrying runtime so that it can later
// be recovered with runtimeFromContext. A nil runtime leaves ctx untouched;
// a nil ctx is replaced with context.Background().
func withAsyncRuntime(ctx context.Context, runtime *asyncRuntime) context.Context {
	switch {
	case runtime == nil:
		return ctx
	case ctx == nil:
		ctx = context.Background()
	}
	return context.WithValue(ctx, asyncRuntimeContextKey{}, runtime)
}
// runtimeFromContext extracts the asyncRuntime previously attached by
// withAsyncRuntime; it returns nil when ctx is nil or carries no runtime.
func runtimeFromContext(ctx context.Context) *asyncRuntime {
	if ctx == nil {
		return nil
	}
	if rt, ok := ctx.Value(asyncRuntimeContextKey{}).(*asyncRuntime); ok {
		return rt
	}
	return nil
}
// reportWriteErrorWithContext routes a write error to the runtime stored in
// ctx, falling back to the default runtime when ctx carries none.
func reportWriteErrorWithContext(ctx context.Context, err error, data LogData) {
	rt := runtimeFromContext(ctx)
	if rt == nil {
		rt = defaultAsyncRuntime()
	}
	rt.reportWriteError(err, data)
}
func newLogCore(out io.Writer) *starlog {
core := &starlog{
mu: &sync.Mutex{},
runtime: newAsyncRuntime(defaultAsyncQueueCapacity),
output: out,
minLevel: LvDebug,
errOutputLevel: LvError,
@ -389,62 +453,98 @@ func generateId() string {
return fmt.Sprintf("%dstar%db612%d", time.Now().UnixNano(), rand.Intn(1000000), rand.Intn(1000000))
}
func StartStacks() {
stackMu.Lock()
if stackStarted {
stackMu.Unlock()
func (logger *starlog) asyncRuntime() *asyncRuntime {
if logger == nil || logger.runtime == nil {
return defaultAsyncRuntime()
}
return logger.runtime
}
// asyncRuntime resolves the runtime for this logger handle, degrading to
// the shared default runtime when the handle or its core is nil.
func (logger *StarLogger) asyncRuntime() *asyncRuntime {
	if logger != nil && logger.logcore != nil {
		return logger.logcore.asyncRuntime()
	}
	return defaultAsyncRuntime()
}
// reportAsyncDrop forwards a dropped-entry notification to this core's
// async runtime (or the default runtime when the core has none).
func (logger *starlog) reportAsyncDrop(err error, data LogData) {
	logger.asyncRuntime().reportAsyncDrop(err, data)
}
// reportWriteError forwards a write failure to this core's async runtime
// (or the default runtime when the core has none).
func (logger *starlog) reportWriteError(err error, data LogData) {
	logger.asyncRuntime().reportWriteError(err, data)
}
// snapshot atomically captures the current queue pointer and started flag
// under the runtime mutex. A nil receiver reads as "no queue, not started".
func (runtime *asyncRuntime) snapshot() (*starChanStack, bool) {
	if runtime == nil {
		return nil, false
	}
	runtime.mu.Lock()
	queue, started := runtime.queue, runtime.started
	runtime.mu.Unlock()
	return queue, started
}
// Start spins up the consumer goroutine for this runtime. It is a no-op on
// a nil receiver or when the runtime is already started. A fresh queue and
// stop/done channels are published under the mutex before the goroutine is
// launched; the goroutine drains the queue until the stop channel fires or
// Pop reports an error, and closes doneChan on exit so Stop can wait for it.
func (runtime *asyncRuntime) Start() {
	if runtime == nil {
		return
	}
	runtime.mu.Lock()
	if runtime.started {
		runtime.mu.Unlock()
		return
	}
	queue := newStarChanStack(runtime.queueCapacity)
	stopChan := make(chan struct{})
	doneChan := make(chan struct{})
	runtime.queue = queue
	runtime.stopChan = stopChan
	runtime.doneChan = doneChan
	runtime.started = true
	runtime.mu.Unlock()
	go func(queue *starChanStack, stop <-chan struct{}, done chan struct{}) {
		defer close(done)
		for {
			select {
			case <-stop:
				return
			default:
			}
			popped, err := queue.Pop()
			if err != nil {
				// Any Pop error (including io.EOF from a closed queue)
				// terminates the consumer.
				return
			}
			val, ok := popped.(logTransfer)
			if !ok {
				continue
			}
			if val.handlerFunc != nil {
				runtime.invokeAsyncHandlerSafely(val.handlerFunc, val.LogData)
			}
		}
	}(queue, stopChan, doneChan)
}
func StopStacks() {
stackMu.Lock()
if !stackStarted {
stackMu.Unlock()
func (runtime *asyncRuntime) Stop() {
if runtime == nil {
return
}
stopChan := stackStopChan
doneChan := stackDoneChan
current := stacks
stackStopChan = nil
stackDoneChan = nil
stackMu.Unlock()
runtime.mu.Lock()
if !runtime.started {
runtime.mu.Unlock()
return
}
stopChan := runtime.stopChan
doneChan := runtime.doneChan
queue := runtime.queue
runtime.queue = nil
runtime.stopChan = nil
runtime.doneChan = nil
runtime.started = false
runtime.mu.Unlock()
if stopChan != nil {
func() {
@ -454,56 +554,106 @@ func StopStacks() {
close(stopChan)
}()
}
if current != nil {
_ = current.Close()
if queue != nil {
_ = queue.Close()
}
if doneChan != nil {
<-doneChan
}
}
func Stop() {
StopStacks()
// WaitDrain polls until the runtime's queue is empty (or the runtime is not
// running), returning ctx.Err() early if ctx is cancelled. A nil ctx is
// treated as context.Background().
func (runtime *asyncRuntime) WaitDrain(ctx context.Context) error {
	if ctx == nil {
		ctx = context.Background()
	}
	for {
		queue, started := runtime.snapshot()
		drained := !started || queue == nil || queue.Len() == 0
		if drained {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(5 * time.Millisecond):
			// Re-check the queue on the next iteration.
		}
	}
}
func SetAsyncErrorHandler(alert func(error, LogData)) {
stackAlertMu.Lock()
defer stackAlertMu.Unlock()
stackAlert = alert
// Metrics reports a point-in-time view of the runtime: whether it is
// running, its drop/fallback/timeout settings and counters, and — when a
// queue exists — queue usage figures.
func (runtime *asyncRuntime) Metrics() AsyncMetrics {
	queue, started := runtime.snapshot()
	m := AsyncMetrics{
		Started:        started,
		Dropped:        runtime.GetAsyncDropCount(),
		FallbackToSync: runtime.GetAsyncFallbackToSync(),
		HandlerTimeout: runtime.GetAsyncHandlerTimeout(),
	}
	if queue == nil {
		return m
	}
	m.QueueLength = queue.Len()
	m.QueueCapacity = queue.Cap()
	m.QueueFree = queue.Free()
	return m
}
func SetAsyncFallbackToSync(enable bool) {
if enable {
atomic.StoreUint32(&stackFallback, 1)
// SetAsyncErrorHandler installs alert as the callback invoked when an async
// log entry is dropped; passing nil clears it. Safe to call on a nil
// runtime (no-op).
func (runtime *asyncRuntime) SetAsyncErrorHandler(alert func(error, LogData)) {
	if runtime == nil {
		return
	}
	runtime.asyncAlertMu.Lock()
	defer runtime.asyncAlertMu.Unlock()
	runtime.asyncAlert = alert
}
func GetAsyncFallbackToSync() bool {
return atomic.LoadUint32(&stackFallback) == 1
// SetAsyncFallbackToSync toggles whether logging falls back to synchronous
// handling when async delivery is unavailable. No-op on a nil runtime.
func (runtime *asyncRuntime) SetAsyncFallbackToSync(enable bool) {
	if runtime == nil {
		return
	}
	var flag uint32
	if enable {
		flag = 1
	}
	atomic.StoreUint32(&runtime.fallbackSync, flag)
}
func SetAsyncHandlerTimeout(timeout time.Duration) {
// GetAsyncFallbackToSync reports whether synchronous fallback is enabled.
// A nil runtime answers true, the conservative default.
func (runtime *asyncRuntime) GetAsyncFallbackToSync() bool {
	if runtime == nil {
		return true
	}
	return atomic.LoadUint32(&runtime.fallbackSync) != 0
}
// SetAsyncHandlerTimeout bounds how long an async handler may run before it
// is abandoned; negative values are clamped to 0, which disables the
// timeout. No-op on a nil runtime.
func (runtime *asyncRuntime) SetAsyncHandlerTimeout(timeout time.Duration) {
	if runtime == nil {
		return
	}
	if timeout < 0 {
		timeout = 0
	}
	atomic.StoreInt64(&runtime.timeout, int64(timeout))
}
func GetAsyncHandlerTimeout() time.Duration {
return time.Duration(atomic.LoadInt64(&stackTimeout))
// GetAsyncHandlerTimeout returns the configured handler timeout; zero means
// handlers run without a deadline. A nil runtime reports zero.
func (runtime *asyncRuntime) GetAsyncHandlerTimeout() time.Duration {
	if runtime == nil {
		return 0
	}
	nanos := atomic.LoadInt64(&runtime.timeout)
	return time.Duration(nanos)
}
func GetAsyncDropCount() uint64 {
return atomic.LoadUint64(&stackDrop)
// GetAsyncDropCount returns how many async entries this runtime has
// dropped; a nil runtime reports zero.
func (runtime *asyncRuntime) GetAsyncDropCount() uint64 {
	if runtime == nil {
		return 0
	}
	dropped := atomic.LoadUint64(&runtime.dropCount)
	return dropped
}
func reportAsyncDrop(err error, data LogData) {
atomic.AddUint64(&stackDrop, 1)
stackAlertMu.RLock()
alert := stackAlert
stackAlertMu.RUnlock()
func (runtime *asyncRuntime) reportAsyncDrop(err error, data LogData) {
if runtime == nil {
return
}
atomic.AddUint64(&runtime.dropCount, 1)
runtime.asyncAlertMu.RLock()
alert := runtime.asyncAlert
runtime.asyncAlertMu.RUnlock()
if alert != nil {
func() {
defer func() {
@ -514,31 +664,37 @@ func reportAsyncDrop(err error, data LogData) {
}
}
func invokeAsyncHandlerSafely(handler func(LogData), data LogData) bool {
// invokeAsyncHandlerSafely runs handler with panic protection and, when a
// handler timeout is configured, abandons the call after that timeout and
// records it as a drop. It returns false when the handler panicked or timed
// out. A nil runtime delegates to the default runtime; a nil handler is a
// successful no-op.
func (runtime *asyncRuntime) invokeAsyncHandlerSafely(handler func(LogData), data LogData) bool {
	if runtime == nil {
		return invokeAsyncHandlerDirect(defaultAsyncRuntime(), handler, data)
	}
	if handler == nil {
		return true
	}
	timeout := runtime.GetAsyncHandlerTimeout()
	if timeout <= 0 {
		return invokeAsyncHandlerDirect(runtime, handler, data)
	}
	done := make(chan bool, 1)
	go func() {
		done <- invokeAsyncHandlerDirect(runtime, handler, data)
	}()
	select {
	case ok := <-done:
		return ok
	case <-time.After(timeout):
		// The handler goroutine is abandoned (it still finishes in the
		// background); the entry is counted as dropped.
		runtime.reportAsyncDrop(ErrAsyncHandlerTimeout, data)
		return false
	}
}
func invokeAsyncHandlerDirect(handler func(LogData), data LogData) (ok bool) {
func invokeAsyncHandlerDirect(runtime *asyncRuntime, handler func(LogData), data LogData) (ok bool) {
defer func() {
if panicErr := recover(); panicErr != nil {
reportAsyncDrop(fmt.Errorf("%w: %v", ErrAsyncHandlerPanic, panicErr), data)
if runtime == nil {
runtime = defaultAsyncRuntime()
}
runtime.reportAsyncDrop(fmt.Errorf("%w: %v", ErrAsyncHandlerPanic, panicErr), data)
ok = false
}
}()
@ -546,24 +702,30 @@ func invokeAsyncHandlerDirect(handler func(LogData), data LogData) (ok bool) {
return true
}
func SetWriteErrorHandler(alert func(error, LogData)) {
writeErrMu.Lock()
defer writeErrMu.Unlock()
writeErrHandler = alert
}
func GetWriteErrorCount() uint64 {
return atomic.LoadUint64(&writeErrCount)
}
func reportWriteError(err error, data LogData) {
if err == nil {
// SetWriteErrorHandler installs alert as the callback invoked when a log
// write fails; passing nil clears it. Safe to call on a nil runtime (no-op).
func (runtime *asyncRuntime) SetWriteErrorHandler(alert func(error, LogData)) {
	if runtime == nil {
		return
	}
	runtime.writeErrMu.Lock()
	defer runtime.writeErrMu.Unlock()
	runtime.writeErrHandler = alert
}
// GetWriteErrorCount returns the number of write failures recorded by this
// runtime; a nil runtime reports zero.
func (runtime *asyncRuntime) GetWriteErrorCount() uint64 {
	if runtime == nil {
		return 0
	}
	count := atomic.LoadUint64(&runtime.writeErrCount)
	return count
}
func (runtime *asyncRuntime) reportWriteError(err error, data LogData) {
if runtime == nil || err == nil {
return
}
atomic.AddUint64(&runtime.writeErrCount, 1)
runtime.writeErrMu.RLock()
alert := runtime.writeErrHandler
runtime.writeErrMu.RUnlock()
if alert != nil {
func() {
defer func() {
@ -574,11 +736,75 @@ func reportWriteError(err error, data LogData) {
}
}
func resetAsyncMetricsForTest() {
atomic.StoreUint64(&stackDrop, 0)
SetAsyncErrorHandler(nil)
SetAsyncFallbackToSync(true)
SetAsyncHandlerTimeout(0)
atomic.StoreUint64(&writeErrCount, 0)
SetWriteErrorHandler(nil)
// resetForTest stops the runtime and restores every tunable and counter to
// its initial state; intended for use from tests only.
func (runtime *asyncRuntime) resetForTest() {
	if runtime == nil {
		return
	}
	runtime.Stop()
	runtime.SetAsyncErrorHandler(nil)
	runtime.SetAsyncFallbackToSync(true)
	runtime.SetAsyncHandlerTimeout(0)
	runtime.SetWriteErrorHandler(nil)
	atomic.StoreUint64(&runtime.dropCount, 0)
	atomic.StoreUint64(&runtime.writeErrCount, 0)
}
// StartStacks starts the default runtime's async consumer goroutine.
func StartStacks() {
	defaultAsyncRuntime().Start()
}
// StopStacks stops the default runtime's async consumer goroutine.
func StopStacks() {
	defaultAsyncRuntime().Stop()
}
// Stop is a backward-compatible alias for StopStacks.
func Stop() {
	StopStacks()
}
// SetAsyncErrorHandler installs the async-drop callback on the default runtime.
func SetAsyncErrorHandler(alert func(error, LogData)) {
	defaultAsyncRuntime().SetAsyncErrorHandler(alert)
}
// SetAsyncFallbackToSync toggles synchronous fallback on the default runtime.
func SetAsyncFallbackToSync(enable bool) {
	defaultAsyncRuntime().SetAsyncFallbackToSync(enable)
}
// GetAsyncFallbackToSync reports the default runtime's fallback setting.
func GetAsyncFallbackToSync() bool {
	return defaultAsyncRuntime().GetAsyncFallbackToSync()
}
// SetAsyncHandlerTimeout sets the handler timeout on the default runtime.
func SetAsyncHandlerTimeout(timeout time.Duration) {
	defaultAsyncRuntime().SetAsyncHandlerTimeout(timeout)
}
// GetAsyncHandlerTimeout returns the default runtime's handler timeout.
func GetAsyncHandlerTimeout() time.Duration {
	return defaultAsyncRuntime().GetAsyncHandlerTimeout()
}
// GetAsyncDropCount returns the default runtime's async-drop counter.
func GetAsyncDropCount() uint64 {
	return defaultAsyncRuntime().GetAsyncDropCount()
}
// reportAsyncDrop records an async drop against the default runtime.
func reportAsyncDrop(err error, data LogData) {
	defaultAsyncRuntime().reportAsyncDrop(err, data)
}
// invokeAsyncHandlerSafely runs handler via the default runtime's safeguards.
func invokeAsyncHandlerSafely(handler func(LogData), data LogData) bool {
	return defaultAsyncRuntime().invokeAsyncHandlerSafely(handler, data)
}
// SetWriteErrorHandler installs the write-error callback on the default runtime.
func SetWriteErrorHandler(alert func(error, LogData)) {
	defaultAsyncRuntime().SetWriteErrorHandler(alert)
}
// GetWriteErrorCount returns the default runtime's write-error counter.
func GetWriteErrorCount() uint64 {
	return defaultAsyncRuntime().GetWriteErrorCount()
}
// reportWriteError records a write failure against the default runtime.
func reportWriteError(err error, data LogData) {
	defaultAsyncRuntime().reportWriteError(err, data)
}
// resetAsyncMetricsForTest resets the default runtime; for tests only.
func resetAsyncMetricsForTest() {
	defaultAsyncRuntime().resetForTest()
}