starlog/multi_sink.go
2026-03-19 16:37:57 +08:00

146 lines
3.2 KiB
Go

package starlog
import "b612.me/starlog/internal/multisinkx"
type SinkState string
const (
SinkStateHealthy SinkState = SinkState(multisinkx.StateHealthy)
SinkStateDegraded SinkState = SinkState(multisinkx.StateDegraded)
SinkStateRecovered SinkState = SinkState(multisinkx.StateRecovered)
)
type SinkStats struct {
Index int
Writes uint64
WriteErrors uint64
Closes uint64
CloseErrors uint64
ConsecutiveWriteErrors uint64
ConsecutiveCloseErrors uint64
LastWriteError string
LastCloseError string
State SinkState
}
type MultiSinkStats struct {
ContinueOnError bool
Sinks []SinkStats
}
type MultiSink struct {
core *multisinkx.MultiSink
}
func wrapSinks(sinks []Sink) []multisinkx.Sink {
if len(sinks) == 0 {
return nil
}
result := make([]multisinkx.Sink, 0, len(sinks))
for _, sink := range sinks {
if sink == nil {
continue
}
result = append(result, sink)
}
return result
}
func toSinkStats(stats multisinkx.Stats) SinkStats {
return SinkStats{
Index: stats.Index,
Writes: stats.Writes,
WriteErrors: stats.WriteErrors,
Closes: stats.Closes,
CloseErrors: stats.CloseErrors,
ConsecutiveWriteErrors: stats.ConsecutiveWriteErrors,
ConsecutiveCloseErrors: stats.ConsecutiveCloseErrors,
LastWriteError: stats.LastWriteError,
LastCloseError: stats.LastCloseError,
State: SinkState(stats.State),
}
}
func toMultiSinkStats(snapshot multisinkx.Snapshot) MultiSinkStats {
result := MultiSinkStats{
ContinueOnError: snapshot.ContinueOnError,
Sinks: make([]SinkStats, 0, len(snapshot.Sinks)),
}
for _, item := range snapshot.Sinks {
result.Sinks = append(result.Sinks, toSinkStats(item))
}
return result
}
func NewMultiSink(sinks ...Sink) *MultiSink {
return &MultiSink{
core: multisinkx.New(wrapSinks(sinks)...),
}
}
func (sink *MultiSink) SetSinks(sinks ...Sink) {
if sink == nil || sink.core == nil {
return
}
sink.core.SetSinks(wrapSinks(sinks)...)
}
func (sink *MultiSink) AddSink(item Sink) {
if sink == nil || sink.core == nil || item == nil {
return
}
sink.core.AddSink(item)
}
func (sink *MultiSink) SetContinueOnError(continueOnError bool) {
if sink == nil || sink.core == nil {
return
}
sink.core.SetContinueOnError(continueOnError)
}
func (sink *MultiSink) ContinueOnError() bool {
if sink == nil || sink.core == nil {
return true
}
return sink.core.ContinueOnError()
}
func (sink *MultiSink) SinkCount() int {
if sink == nil || sink.core == nil {
return 0
}
return sink.core.SinkCount()
}
func (sink *MultiSink) GetStats() MultiSinkStats {
if sink == nil || sink.core == nil {
return MultiSinkStats{
ContinueOnError: true,
Sinks: nil,
}
}
return toMultiSinkStats(sink.core.GetStats())
}
func (sink *MultiSink) ResetStats() {
if sink == nil || sink.core == nil {
return
}
sink.core.ResetStats()
}
func (sink *MultiSink) Write(data []byte) error {
if sink == nil || sink.core == nil {
return nil
}
return sink.core.Write(data)
}
func (sink *MultiSink) Close() error {
if sink == nil || sink.core == nil {
return nil
}
return sink.core.Close()
}