starlog/pipeline.go
2026-03-19 16:37:57 +08:00

323 lines
7.5 KiB
Go

package starlog
import (
"context"
"errors"
"fmt"
"io"
"os"
"sort"
"strings"
"time"
"b612.me/starlog/internal/pipelinex"
)
// FileInfo is an alias for os.FileInfo; it appears in the RotatePolicy
// method signatures.
type FileInfo = os.FileInfo
// Fields is a set of named values attached to an Entry (see Entry.Fields).
type Fields map[string]interface{}
// Entry is the log record handed to Handler, Formatter, Redactor, RedactRule
// and RotatePolicy implementations.
//
// The formatters in this file read Time, LevelName, LoggerName, Thread, File,
// Line, Func, Message, Fields and Err. Level and Context are carried on the
// entry but are not consulted by those formatters.
type Entry struct {
Time time.Time
Level int
LevelName string
LoggerName string
Thread string
// File, Line and Func presumably identify the call site that produced the
// entry - confirm against the code that fills them in.
File string
Line int
Func string
Message string
Fields Fields
// Err is optional; the formatters forward Err.Error() only when Err is non-nil.
Err error
Context context.Context
}
// Handler consumes a single Entry. HandlerFunc adapts a plain function to
// this interface.
type Handler interface {
// Handle receives a context and the entry, and reports failure as an error.
Handle(context.Context, *Entry) error
}
// Formatter turns an Entry into bytes. *TextFormatter and *JSONFormatter in
// this file both provide a matching Format method.
type Formatter interface {
// Format returns the encoded entry, or an error if encoding fails.
Format(*Entry) ([]byte, error)
}
// Sink is a closable destination for byte slices. *WriterSink in this file
// provides matching Write and Close methods.
type Sink interface {
// Write hands data to the sink and returns an error on failure. Unlike
// io.Writer, no byte count is returned.
Write([]byte) error
// Close releases the sink.
Close() error
}
// RotatePolicy decides when a log file should be rotated and where it goes.
type RotatePolicy interface {
// ShouldRotate reports whether rotation is due for the file described by
// the FileInfo. RotatePolicyArchive.ShouldArchiveNow calls it with a nil
// *Entry, so implementations must tolerate a nil entry.
ShouldRotate(FileInfo, *Entry) bool
// NextPath returns a path derived from the current path and a timestamp.
// resolveRotateArchivePath uses it as the archive destination when the
// policy does not supply one through RotateArchivePathProvider.
NextPath(string, time.Time) string
}
// RotateArchivePathProvider is an optional extension for RotatePolicy.
// If implemented, ArchivePath is preferred over NextPath when resolving
// the archived file destination path.
type RotateArchivePathProvider interface {
// ArchivePath returns the archive destination for the current path at the
// given time. Returning "" makes resolveRotateArchivePath fall back to
// RotatePolicy.NextPath.
ArchivePath(string, time.Time) string
}
// resolveRotateArchivePath picks the destination path for an archived log file.
//
// It returns "" for a nil policy. If the policy also implements
// RotateArchivePathProvider and its ArchivePath result is non-empty, that
// result wins; in every other case the policy's NextPath result is returned.
func resolveRotateArchivePath(policy RotatePolicy, current string, now time.Time) string {
	if policy == nil {
		return ""
	}
	provider, isProvider := policy.(RotateArchivePathProvider)
	if !isProvider {
		return policy.NextPath(current, now)
	}
	archived := provider.ArchivePath(current, now)
	if archived == "" {
		// The provider declined to name a destination; use the generic one.
		return policy.NextPath(current, now)
	}
	return archived
}
// Redactor inspects an Entry and may rewrite it. RedactorFunc adapts a plain
// function to this interface.
type Redactor interface {
// Redact receives the entry by pointer, so an implementation can modify it
// in place; a non-nil error reports failure.
Redact(context.Context, *Entry) error
}
// RedactRule is a single redaction step applied to an Entry. RedactRuleFunc
// adapts a plain function to this interface.
type RedactRule interface {
// Apply receives the entry by pointer and returns a flag plus an error.
// NOTE(review): the meaning of the bool is not visible in this file -
// presumably it reports whether the rule matched or changed the entry; confirm.
Apply(context.Context, *Entry) (bool, error)
}
// HandlerFunc lets an ordinary function be used as a Handler.
type HandlerFunc func(context.Context, *Entry) error

// Handle calls f with ctx and entry and returns whatever f returns.
// A nil HandlerFunc is a no-op that returns nil.
func (f HandlerFunc) Handle(ctx context.Context, entry *Entry) error {
	if f != nil {
		return f(ctx, entry)
	}
	return nil
}
// RedactorFunc lets an ordinary function be used as a Redactor.
type RedactorFunc func(context.Context, *Entry) error

// Redact calls f with ctx and entry and returns whatever f returns.
// A nil RedactorFunc leaves the entry untouched and returns nil.
func (f RedactorFunc) Redact(ctx context.Context, entry *Entry) error {
	if f != nil {
		return f(ctx, entry)
	}
	return nil
}
// RedactRuleFunc lets an ordinary function be used as a RedactRule.
type RedactRuleFunc func(context.Context, *Entry) (bool, error)

// Apply calls f with ctx and entry and returns both of its results.
// A nil RedactRuleFunc returns (false, nil) without touching the entry.
func (f RedactRuleFunc) Apply(ctx context.Context, entry *Entry) (bool, error) {
	if f != nil {
		return f(ctx, entry)
	}
	return false, nil
}
// cloneFields returns a shallow copy of fields: the map is new, the values are
// shared with the original. A nil or empty input yields nil.
func cloneFields(fields Fields) Fields {
	size := len(fields)
	if size == 0 {
		return nil
	}
	copied := make(Fields, size)
	for name := range fields {
		copied[name] = fields[name]
	}
	return copied
}
// mergeFields combines base and extra into a freshly allocated map. Keys in
// extra override the same keys in base. Neither input is modified, and nil is
// returned when both are empty.
func mergeFields(base Fields, extra Fields) Fields {
	capacity := len(base) + len(extra)
	if capacity == 0 {
		return nil
	}
	merged := make(Fields, capacity)
	// base is copied first so that extra wins on key collisions.
	for _, source := range [...]Fields{base, extra} {
		for name, value := range source {
			merged[name] = value
		}
	}
	return merged
}
// renderFields formats fields as space-separated "key=value" pairs, ordered by
// key so the output is deterministic. Values are printed with the %v verb.
// An empty or nil map renders as "".
func renderFields(fields Fields) string {
	if len(fields) == 0 {
		return ""
	}
	names := make([]string, 0, len(fields))
	for name := range fields {
		names = append(names, name)
	}
	sort.Strings(names)

	var out strings.Builder
	for i, name := range names {
		if i > 0 {
			out.WriteByte(' ')
		}
		fmt.Fprintf(&out, "%s=%v", name, fields[name])
	}
	return out.String()
}
// TextFormatter renders entries as text. Each flag is forwarded to the
// same-named pipelinex.TextOptions field by Format.
type TextFormatter struct {
	IncludeTimestamp bool
	IncludeLevel     bool
	IncludeSource    bool
	IncludeThread    bool
	IncludeLogger    bool
}

// NewTextFormatter returns a TextFormatter with timestamp, level, source and
// thread enabled and the logger name disabled.
func NewTextFormatter() *TextFormatter {
	formatter := new(TextFormatter)
	formatter.IncludeTimestamp = true
	formatter.IncludeLevel = true
	formatter.IncludeSource = true
	formatter.IncludeThread = true
	formatter.IncludeLogger = false
	return formatter
}
// Format renders entry as text through pipelinex.FormatText.
//
// A nil entry produces an empty byte slice and no error. A nil receiver is
// allowed: it behaves as if timestamp, level, source and thread were enabled
// and the logger name disabled. Fields are copied before being handed to
// pipelinex, and a non-nil Err is forwarded as its Error() text.
func (formatter *TextFormatter) Format(entry *Entry) ([]byte, error) {
	if entry == nil {
		return []byte(""), nil
	}

	// Start from the nil-receiver defaults, then let a real receiver override.
	timestamp, level, source, thread, logger := true, true, true, true, false
	if formatter != nil {
		timestamp = formatter.IncludeTimestamp
		level = formatter.IncludeLevel
		source = formatter.IncludeSource
		thread = formatter.IncludeThread
		logger = formatter.IncludeLogger
	}

	record := pipelinex.Entry{
		Time:       entry.Time,
		LevelName:  entry.LevelName,
		LoggerName: entry.LoggerName,
		Thread:     entry.Thread,
		File:       entry.File,
		Line:       entry.Line,
		Func:       entry.Func,
		Message:    entry.Message,
		Fields:     cloneFields(entry.Fields),
	}
	if err := entry.Err; err != nil {
		record.Error = err.Error()
	}

	return pipelinex.FormatText(record, pipelinex.TextOptions{
		IncludeTimestamp: timestamp,
		IncludeLevel:     level,
		IncludeSource:    source,
		IncludeThread:    thread,
		IncludeLogger:    logger,
	})
}
// JSONFormatter renders entries as JSON. Pretty is passed to
// pipelinex.FormatJSON by Format.
type JSONFormatter struct {
	Pretty bool
}

// NewJSONFormatter returns a JSONFormatter with Pretty disabled.
func NewJSONFormatter() *JSONFormatter {
	return new(JSONFormatter)
}
// Format renders entry as JSON through pipelinex.FormatJSON.
//
// A nil entry produces the literal "{}" and no error. A nil receiver is
// allowed and behaves like a formatter with Pretty disabled. Fields are copied
// before being handed to pipelinex, and a non-nil Err is forwarded as its
// Error() text.
func (formatter *JSONFormatter) Format(entry *Entry) ([]byte, error) {
	if entry == nil {
		return []byte("{}"), nil
	}

	pretty := false
	if formatter != nil {
		pretty = formatter.Pretty
	}

	record := pipelinex.Entry{
		Time:       entry.Time,
		LevelName:  entry.LevelName,
		LoggerName: entry.LoggerName,
		Thread:     entry.Thread,
		File:       entry.File,
		Line:       entry.Line,
		Func:       entry.Func,
		Message:    entry.Message,
		Fields:     cloneFields(entry.Fields),
	}
	if err := entry.Err; err != nil {
		record.Error = err.Error()
	}

	return pipelinex.FormatJSON(record, pretty)
}
// WriterSink forwards Write calls to an io.Writer and Close calls to the
// writer's io.Closer side, when it has one.
type WriterSink struct {
	writer io.Writer
	closer io.Closer
}

// NewWriterSink wraps writer in a WriterSink. If writer also implements
// io.Closer, Close on the sink will close it; otherwise Close is a no-op.
// A nil writer is accepted; Write on such a sink returns an error.
func NewWriterSink(writer io.Writer) *WriterSink {
	// A failed assertion leaves closer as the nil interface value.
	closer, _ := writer.(io.Closer)
	return &WriterSink{writer: writer, closer: closer}
}
// Write passes data to the wrapped writer and returns that writer's error.
// The byte count is discarded. A nil sink, or a sink without a writer, yields
// the error "sink writer is nil".
func (sink *WriterSink) Write(data []byte) error {
	if sink != nil && sink.writer != nil {
		_, err := sink.writer.Write(data)
		return err
	}
	return errors.New("sink writer is nil")
}
// Close closes the wrapped writer if it was detected as an io.Closer and
// returns that Close error. A nil sink, or a sink without a closer, returns nil.
func (sink *WriterSink) Close() error {
	if sink == nil {
		return nil
	}
	if closer := sink.closer; closer != nil {
		return closer.Close()
	}
	return nil
}
// RotatePolicyArchive wraps a RotatePolicy together with a check interval and
// optional before/after hooks.
// NOTE(review): its method set presumably satisfies an archive interface
// declared elsewhere in the package - confirm.
type RotatePolicyArchive struct {
	policy        RotatePolicy
	checkInterval int64 // unit is not visible in this file
	hookBefore    func(*StarLogger, string, string, os.FileInfo) error
	hookAfter     func(*StarLogger, string, string, os.FileInfo) error
}

// NewRotatePolicyArchive returns a RotatePolicyArchive for policy with the
// given check interval. Both hooks start out nil. Non-positive intervals are
// stored as given; Interval reports them as 1.
func NewRotatePolicyArchive(policy RotatePolicy, checkInterval int64) *RotatePolicyArchive {
	archive := new(RotatePolicyArchive)
	archive.policy = policy
	archive.checkInterval = checkInterval
	return archive
}
// ShouldArchiveNow asks the wrapped policy whether rotation is due for the
// file described by info. The policy is called with a nil *Entry. logger and
// fullpath are not used. A nil receiver or nil policy reports false.
func (archive *RotatePolicyArchive) ShouldArchiveNow(logger *StarLogger, fullpath string, info os.FileInfo) bool {
	if archive != nil && archive.policy != nil {
		return archive.policy.ShouldRotate(info, nil)
	}
	return false
}
// NextLogFilePath returns oldpath unchanged. logger and info are ignored, and
// the receiver is never dereferenced.
func (archive *RotatePolicyArchive) NextLogFilePath(logger *StarLogger, oldpath string, info os.FileInfo) string {
return oldpath
}
// ArchiveLogFilePath returns the archive destination for oldpath, resolved by
// resolveRotateArchivePath with the wrapped policy and the current time.
// logger and info are not used. A nil receiver or nil policy returns oldpath.
func (archive *RotatePolicyArchive) ArchiveLogFilePath(logger *StarLogger, oldpath string, info os.FileInfo) string {
	if archive != nil && archive.policy != nil {
		return resolveRotateArchivePath(archive.policy, oldpath, time.Now())
	}
	return oldpath
}
// Interval returns the configured check interval. It returns 1 when the
// receiver is nil or the configured value is zero or negative.
func (archive *RotatePolicyArchive) Interval() int64 {
	if archive != nil && archive.checkInterval > 0 {
		return archive.checkInterval
	}
	return 1
}
// HookBeforArchive is the alternate spelling of HookBeforeArchive and returns
// the same hook.
// NOTE(review): the misspelled name presumably matches an interface or older
// API declared elsewhere - confirm before removing it.
//
// It returns nil on a nil receiver instead of panicking, in line with
// ShouldArchiveNow, ArchiveLogFilePath and Interval, which are all safe to
// call on a nil *RotatePolicyArchive.
func (archive *RotatePolicyArchive) HookBeforArchive() func(*StarLogger, string, string, os.FileInfo) error {
	if archive == nil {
		return nil
	}
	return archive.HookBeforeArchive()
}
// HookBeforeArchive returns the hook stored by SetHookBeforeArchive, or nil if
// none has been set.
//
// It returns nil on a nil receiver instead of panicking, in line with
// ShouldArchiveNow, ArchiveLogFilePath and Interval, which are all safe to
// call on a nil *RotatePolicyArchive.
func (archive *RotatePolicyArchive) HookBeforeArchive() func(*StarLogger, string, string, os.FileInfo) error {
	if archive == nil {
		return nil
	}
	return archive.hookBefore
}
// HookAfterArchive returns the hook stored by SetHookAfterArchive, or nil if
// none has been set.
//
// It returns nil on a nil receiver instead of panicking, in line with
// ShouldArchiveNow, ArchiveLogFilePath and Interval, which are all safe to
// call on a nil *RotatePolicyArchive.
func (archive *RotatePolicyArchive) HookAfterArchive() func(*StarLogger, string, string, os.FileInfo) error {
	if archive == nil {
		return nil
	}
	return archive.hookAfter
}
// DoArchive always returns nil: this type supplies no archive function of its own.
// NOTE(review): how the caller treats a nil function is not visible in this
// file - presumably it falls back to a default archive step; confirm.
func (archive *RotatePolicyArchive) DoArchive() func(*StarLogger, string, string, os.FileInfo) error {
return nil
}
// SetHookBeforeArchive stores hook as the value returned by HookBeforeArchive
// and HookBeforArchive. Passing nil clears the hook.
// The receiver is dereferenced without a nil check, so it must be non-nil.
func (archive *RotatePolicyArchive) SetHookBeforeArchive(hook func(*StarLogger, string, string, os.FileInfo) error) {
archive.hookBefore = hook
}
// SetHookBeforArchive is the alternate spelling of SetHookBeforeArchive and
// simply forwards hook to it.
func (archive *RotatePolicyArchive) SetHookBeforArchive(hook func(*StarLogger, string, string, os.FileInfo) error) {
archive.SetHookBeforeArchive(hook)
}
// SetHookAfterArchive stores hook as the value returned by HookAfterArchive.
// Passing nil clears the hook.
// The receiver is dereferenced without a nil check, so it must be non-nil.
func (archive *RotatePolicyArchive) SetHookAfterArchive(hook func(*StarLogger, string, string, os.FileInfo) error) {
archive.hookAfter = hook
}