starlog/route_handler_test.go

307 lines
7.5 KiB
Go
Raw Permalink Normal View History

2026-03-19 16:37:57 +08:00
package starlog
import (
"bytes"
"context"
"errors"
"io/ioutil"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
type messageOnlyFormatter struct{}
func (formatter *messageOnlyFormatter) Format(entry *Entry) ([]byte, error) {
if entry == nil {
return []byte{}, nil
}
return []byte(entry.Message + "\n"), nil
}
type failSink struct{}
func (sink *failSink) Write(data []byte) error {
_ = data
return errors.New("route sink write failed")
}
func (sink *failSink) Close() error {
return nil
}
type closeCountSink struct {
closeCount uint64
}
func (sink *closeCountSink) Write(data []byte) error {
_ = data
return nil
}
func (sink *closeCountSink) Close() error {
atomic.AddUint64(&sink.closeCount, 1)
return nil
}
type safeBuffer struct {
mu sync.Mutex
buf bytes.Buffer
}
func (buffer *safeBuffer) Write(p []byte) (int, error) {
buffer.mu.Lock()
defer buffer.mu.Unlock()
return buffer.buf.Write(p)
}
func (buffer *safeBuffer) String() string {
buffer.mu.Lock()
defer buffer.mu.Unlock()
return buffer.buf.String()
}
func waitFor(t *testing.T, timeout time.Duration, cond func() bool, reason string) {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if cond() {
return
}
time.Sleep(5 * time.Millisecond)
}
t.Fatalf("timeout waiting for condition: %s", reason)
}
func TestRouteHandlerSplitByLevel(t *testing.T) {
var totalBuf safeBuffer
var briefBuf safeBuffer
var errBuf safeBuffer
logger := newStructuredTestLogger(&totalBuf)
router := NewRouteHandler(
Route{
Name: "brief",
Match: MatchLevels(LvInfo, LvNotice),
Formatter: &messageOnlyFormatter{},
Sink: NewWriterSink(&briefBuf),
},
Route{
Name: "err",
Match: MatchLevels(LvError),
Formatter: &messageOnlyFormatter{},
Sink: NewWriterSink(&errBuf),
},
)
logger.AppendEntryHandler(router)
logger.Info("i1")
logger.Notice("n1")
logger.Error("e1")
waitFor(t, 300*time.Millisecond, func() bool {
return strings.Contains(briefBuf.String(), "i1") &&
strings.Contains(briefBuf.String(), "n1") &&
strings.Contains(errBuf.String(), "e1")
}, "route handler split outputs")
total := totalBuf.String()
if !strings.Contains(total, "i1") || !strings.Contains(total, "n1") || !strings.Contains(total, "e1") {
t.Fatalf("total log should keep all levels, got %q", total)
}
brief := briefBuf.String()
if !strings.Contains(brief, "i1") || !strings.Contains(brief, "n1") {
t.Fatalf("brief route should contain info+notice, got %q", brief)
}
if strings.Contains(brief, "e1") {
t.Fatalf("brief route should not contain error log, got %q", brief)
}
errLog := errBuf.String()
if !strings.Contains(errLog, "e1") {
t.Fatalf("err route should contain error log, got %q", errLog)
}
if strings.Contains(errLog, "i1") || strings.Contains(errLog, "n1") {
t.Fatalf("err route should only contain error log, got %q", errLog)
}
}
func TestRouteHandlerDynamicReplaceRoutes(t *testing.T) {
var totalBuf safeBuffer
var briefBuf safeBuffer
var errBuf safeBuffer
logger := newStructuredTestLogger(&totalBuf)
router := NewRouteHandler(
Route{
Name: "brief",
Match: MatchLevels(LvInfo),
Formatter: &messageOnlyFormatter{},
Sink: NewWriterSink(&briefBuf),
},
)
logger.SetEntryHandler(router)
logger.Info("brief-1")
waitFor(t, 300*time.Millisecond, func() bool {
return strings.Contains(briefBuf.String(), "brief-1")
}, "initial brief route output")
router.ReplaceRoutes(
Route{
Name: "err",
Match: MatchLevels(LvError),
Formatter: &messageOnlyFormatter{},
Sink: NewWriterSink(&errBuf),
},
)
logger.Info("brief-2")
logger.Error("err-1")
waitFor(t, 300*time.Millisecond, func() bool {
return strings.Contains(errBuf.String(), "err-1")
}, "replaced error route output")
brief := briefBuf.String()
if !strings.Contains(brief, "brief-1") {
t.Fatalf("brief route should keep previous matching log, got %q", brief)
}
if strings.Contains(brief, "brief-2") || strings.Contains(brief, "err-1") {
t.Fatalf("brief route should be replaced and stop receiving new logs, got %q", brief)
}
errLog := errBuf.String()
if !strings.Contains(errLog, "err-1") {
t.Fatalf("err route should receive error log after replace, got %q", errLog)
}
if strings.Contains(errLog, "brief-1") || strings.Contains(errLog, "brief-2") {
t.Fatalf("err route should not receive info logs, got %q", errLog)
}
}
func TestChainHandlerRunsAll(t *testing.T) {
var c1 uint64
var c2 uint64
handler := ChainHandler(
HandlerFunc(func(context.Context, *Entry) error {
atomic.AddUint64(&c1, 1)
return nil
}),
HandlerFunc(func(context.Context, *Entry) error {
atomic.AddUint64(&c2, 1)
return nil
}),
)
if err := handler.Handle(context.Background(), &Entry{}); err != nil {
t.Fatalf("chain handler should not return error, got %v", err)
}
if atomic.LoadUint64(&c1) != 1 || atomic.LoadUint64(&c2) != 1 {
t.Fatalf("all handlers should run once, got c1=%d c2=%d", c1, c2)
}
}
func TestRouteHandlerWriteErrorObservable(t *testing.T) {
resetAsyncMetricsForTest()
defer resetAsyncMetricsForTest()
var totalBuf safeBuffer
logger := newStructuredTestLogger(&totalBuf)
logger.SetEntryHandler(NewRouteHandler(
Route{
Name: "failed-route",
Match: MatchAllLevels(),
Formatter: &messageOnlyFormatter{},
Sink: &failSink{},
},
))
logger.Info("route write error")
waitFor(t, 300*time.Millisecond, func() bool {
return GetWriteErrorCount() > 0
}, "route sink write error observable")
}
func TestRouteHandlerCloseDeduplicatesSameSink(t *testing.T) {
sink := &closeCountSink{}
handler := NewRouteHandler(
Route{
Name: "r1",
Match: MatchLevels(LvInfo),
Formatter: &messageOnlyFormatter{},
Sink: sink,
},
Route{
Name: "r2",
Match: MatchLevels(LvError),
Formatter: &messageOnlyFormatter{},
Sink: sink,
},
)
if err := handler.Close(); err != nil {
t.Fatalf("route handler close failed: %v", err)
}
if atomic.LoadUint64(&sink.closeCount) != 1 {
t.Fatalf("same sink should be closed once, got %d", sink.closeCount)
}
}
func TestRouteHandlerWithRotatingFileSink(t *testing.T) {
var totalBuf safeBuffer
logger := newStructuredTestLogger(&totalBuf)
logger.SetShowStd(false)
logger.SetShowColor(false)
debugPath := filepath.Join(testBinDir(t), "debug.log")
debugSink, err := NewManagedRotateBySizeSink(
debugPath,
true,
128,
10*time.Millisecond,
RotateManageOptions{
MaxBackups: 5,
Pattern: "debug.*.log",
},
)
if err != nil {
t.Fatalf("create rotating sink failed: %v", err)
}
router := NewRouteHandler(
Route{
Name: "debug-info",
Match: MatchLevels(LvDebug, LvInfo),
Formatter: &messageOnlyFormatter{},
Sink: debugSink,
},
)
logger.SetEntryHandler(router)
for idx := 0; idx < 20; idx++ {
logger.Infof("debug payload %02d %s", idx, strings.Repeat("x", 24))
time.Sleep(3 * time.Millisecond)
}
waitFor(t, 2*time.Second, func() bool {
matches, _ := filepath.Glob(filepath.Join(filepath.Dir(debugPath), "debug.*.log"))
return len(matches) > 0
}, "rotating route sink archive creation")
content, readErr := ioutil.ReadFile(debugPath)
if readErr != nil {
t.Fatalf("read debug current log failed: %v", readErr)
}
if len(content) == 0 {
t.Fatalf("debug log should contain routed logs")
}
if err = logger.Close(); err != nil {
t.Fatalf("logger close failed: %v", err)
}
if err = debugSink.Write([]byte("after close")); !errors.Is(err, ErrRotatingFileSinkClosed) {
t.Fatalf("rotating sink should be closed by logger close, got %v", err)
}
}