package bcap

import (
	"sync"
	"time"
)

// Tracker converts packets into Observations and keeps TCP state entries
// between calls. When the configured CleanupInterval is positive, a
// background goroutine periodically evicts expired entries until Stop is
// called.
type Tracker struct {
	config *PacketsConfig

	// mu guards tcpStates.
	mu        sync.Mutex
	tcpStates map[string]trackerTCPState

	// Background cleanup plumbing; each Once makes the matching start/stop
	// action run at most one time.
	cleanupTicker *time.Ticker
	stopCleanup   chan struct{}
	cleanupOnce   sync.Once
	stopOnce      sync.Once
}

// NewTracker returns a Tracker built with the default configuration.
func NewTracker() *Tracker {
	return NewTrackerWithConfig(nil)
}

// NewTrackerWithConfig returns a Tracker that uses config, substituting
// DefaultConfig() when config is nil. Automatic cleanup is started only when
// config.CleanupInterval is positive.
func NewTrackerWithConfig(config *PacketsConfig) *Tracker {
	if config == nil {
		config = DefaultConfig()
	}

	t := &Tracker{
		config:      config,
		tcpStates:   make(map[string]trackerTCPState),
		stopCleanup: make(chan struct{}),
	}
	if config.CleanupInterval > 0 {
		t.startAutoCleanup()
	}
	return t
}

// Stop signals the background cleanup goroutine, if one was started, to stop
// its ticker and exit. It does not wait for the goroutine to finish. Calling
// Stop more than once is safe; only the first call closes the channel.
func (t *Tracker) Stop() {
	t.stopOnce.Do(func() {
		close(t.stopCleanup)
	})
}

// startAutoCleanup launches, at most once, a goroutine that calls
// CleanupExpiredFlows on every tick of config.CleanupInterval and exits once
// stopCleanup is closed.
func (t *Tracker) startAutoCleanup() {
	t.cleanupOnce.Do(func() {
		t.cleanupTicker = time.NewTicker(t.config.CleanupInterval)

		go func() {
			for {
				select {
				case <-t.stopCleanup:
					t.cleanupTicker.Stop()
					return
				case <-t.cleanupTicker.C:
					t.CleanupExpiredFlows()
				}
			}
		}()
	})
}

// CleanupExpiredFlows deletes every TCP state entry whose lastSeen is older
// than config.ConnectionTimeout and returns how many entries were deleted.
// It returns 0 without scanning when the timeout is not positive. Entries
// with a zero lastSeen are never removed.
func (t *Tracker) CleanupExpiredFlows() int {
	timeout := t.config.ConnectionTimeout
	if timeout <= 0 {
		return 0
	}

	// The reference time is captured before the lock is taken.
	now := time.Now()

	t.mu.Lock()
	defer t.mu.Unlock()

	removed := 0
	for key, state := range t.tcpStates {
		if !state.lastSeen.IsZero() && now.Sub(state.lastSeen) > timeout {
			delete(t.tcpStates, key)
			removed++
		}
	}
	return removed
}

// ActiveFlowCount reports how many TCP state entries are currently stored.
func (t *Tracker) ActiveFlowCount() int {
	t.mu.Lock()
	defer t.mu.Unlock()

	return len(t.tcpStates)
}

// Observe builds an Observation for packet and fills in hints chosen by the
// packet's transport kind. Only the TCP path can fail; in that case a zero
// Observation is returned together with the error from observeTCP. Unknown
// transport kinds are tagged with TagTransportUnknown.
func (t *Tracker) Observe(packet Packet) (Observation, error) {
	obs := newObservation(packet)

	switch packet.Transport.Kind {
	case ProtocolTCP:
		hints, err := t.observeTCP(packet, obs.Flow)
		if err != nil {
			return Observation{}, err
		}
		obs.Hints = hints
	case ProtocolUDP:
		obs.Hints = newUDPHints(packet)
	case ProtocolICMPv4, ProtocolICMPv6:
		obs.Hints = newICMPHints(packet)
	case ProtocolARP:
		obs.Hints = newARPHints(packet)
	default:
		obs.Hints = HintSet{
			Summary: SummaryHint{Code: string(TagTransportUnknown)},
			Tags:    []Tag{TagTransportUnknown},
		}
	}
	return obs, nil
}

// newObservation wraps packet together with its flow reference; hints are
// left unset.
func newObservation(packet Packet) Observation {
	return Observation{
		Packet: packet,
		Flow:   newFlowRef(packet),
	}
}

// newFlowRef derives the forward key (source to destination), the reverse
// key (endpoints swapped) and the stable string key for packet. The stable
// key is computed from the forward key.
func newFlowRef(packet Packet) FlowRef {
	src := Endpoint{IP: packet.Network.SrcIP, Port: packetSourcePort(packet)}
	dst := Endpoint{IP: packet.Network.DstIP, Port: packetDestinationPort(packet)}

	forward := FlowKey{
		Family:   packet.Network.Family,
		Protocol: packet.Transport.Kind,
		Src:      src,
		Dst:      dst,
	}
	reverse := FlowKey{
		Family:   packet.Network.Family,
		Protocol: packet.Transport.Kind,
		Src:      dst,
		Dst:      src,
	}

	return FlowRef{
		Forward: forward,
		Reverse: reverse,
		Stable:  stableFlowKey(forward),
	}
}

// stableFlowKey passes the key's protocol, source IP/port and destination
// IP/port, in that order, to buildKey.
func stableFlowKey(key FlowKey) string {
	return buildKey(string(key.Protocol), key.Src.IP, key.Src.Port, key.Dst.IP, key.Dst.Port)
}

// packetSourcePort returns the source port of a TCP or UDP packet, or ""
// when the packet is of another kind or the matching layer is nil.
func packetSourcePort(packet Packet) string {
	switch packet.Transport.Kind {
	case ProtocolTCP:
		if tcp := packet.Transport.TCP; tcp != nil {
			return tcp.SrcPort
		}
	case ProtocolUDP:
		if udp := packet.Transport.UDP; udp != nil {
			return udp.SrcPort
		}
	}
	return ""
}

// packetDestinationPort returns the destination port of a TCP or UDP packet,
// or "" when the packet is of another kind or the matching layer is nil.
func packetDestinationPort(packet Packet) string {
	switch packet.Transport.Kind {
	case ProtocolTCP:
		if tcp := packet.Transport.TCP; tcp != nil {
			return tcp.DstPort
		}
	case ProtocolUDP:
		if udp := packet.Transport.UDP; udp != nil {
			return udp.DstPort
		}
	}
	return ""
}

// primaryTag returns the first element of tags, or fallback when tags is
// empty.
func primaryTag(tags []Tag, fallback Tag) Tag {
	if len(tags) > 0 {
		return tags[0]
	}
	return fallback
}

// tcpHintOptions carries extra inputs for TCP hint construction.
type tcpHintOptions struct {
	// keepaliveResponse selects the keepalive-response tags/event for a
	// keepalive state and is copied into TCPHint.KeepaliveResponse.
	keepaliveResponse bool
}

// newTCPHints builds the HintSet for a TCP packet from its legacy state
// code. The summary code is the first tag produced by legacyTCPTags. The TCP
// detail section is attached only when the packet has a TCP layer.
func newTCPHints(packet Packet, legacyState uint8, opts tcpHintOptions) HintSet {
	tags, phase, event := legacyTCPTags(legacyState, opts)

	hints := HintSet{
		Summary: SummaryHint{Code: string(primaryTag(tags, TagTCPPacket))},
		Tags:    tags,
	}

	tcp := packet.Transport.TCP
	if tcp == nil {
		return hints
	}

	hints.TCP = &TCPHint{
		Phase:             phase,
		Event:             event,
		LegacyState:       legacyState,
		Seq:               tcp.Seq,
		Ack:               tcp.Ack,
		Window:            tcp.Window,
		Payload:           tcp.Payload,
		Retransmission:    legacyState == StateTcpRetransmit,
		Keepalive:         legacyState == StateTcpKeepalive,
		KeepaliveResponse: opts.keepaliveResponse,
		RST:               legacyState == StateTcpRst,
		ECE:               legacyState == StateTcpEce,
		CWR:               legacyState == StateTcpCwr,
	}
	return hints
}

// legacyTCPTags maps a legacy TCP state code to its tags (most specific
// first), phase and event. Unrecognised states yield the generic TCP packet
// tag with unknown phase and event.
func legacyTCPTags(state uint8, opts tcpHintOptions) ([]Tag, TCPPhase, TCPEvent) {
	switch state {
	// Handshake.
	case StateTcpConnect1:
		return []Tag{TagTCPHandshakeSYN}, TCPPhaseHandshake, TCPEventSYN
	case StateTcpConnect2:
		return []Tag{TagTCPHandshakeSYNACK}, TCPPhaseHandshake, TCPEventSYNACK
	case StateTcpConnect3:
		return []Tag{TagTCPHandshakeACK}, TCPPhaseHandshake, TCPEventHandshakeACK

	// Teardown.
	case StateTcpDisconnect1, StateTcpDisconnect3:
		return []Tag{TagTCPTeardownFIN}, TCPPhaseTeardown, TCPEventFIN
	case StateTcpDisconnect2, StateTcpDisconnect4:
		return []Tag{TagTCPTeardownACK}, TCPPhaseTeardown, TCPEventTeardownACK
	case StateTcpDisconnect23:
		return []Tag{TagTCPTeardownFINACK}, TCPPhaseTeardown, TCPEventFINACK

	// Established.
	case StateTcpAckOk:
		return []Tag{TagTCPPacket}, TCPPhaseEstablished, TCPEventACK
	case StateTcpRetransmit:
		return []Tag{TagTCPRetransmit, TagTCPPacket}, TCPPhaseEstablished, TCPEventRetransmission
	case StateTcpKeepalive:
		if !opts.keepaliveResponse {
			return []Tag{TagTCPKeepalive, TagTCPPacket}, TCPPhaseEstablished, TCPEventKeepalive
		}
		return []Tag{TagTCPKeepaliveResp, TagTCPKeepalive, TagTCPPacket}, TCPPhaseEstablished, TCPEventKeepaliveResp

	// Special flags.
	case StateTcpRst:
		return []Tag{TagTCPRst, TagTCPPacket}, TCPPhaseSpecial, TCPEventRST
	case StateTcpEce:
		return []Tag{TagTCPEce, TagTCPPacket}, TCPPhaseSpecial, TCPEventECE
	case StateTcpCwr:
		return []Tag{TagTCPCwr, TagTCPPacket}, TCPPhaseSpecial, TCPEventCWR
	}

	return []Tag{TagTCPPacket}, TCPPhaseUnknown, TCPEventUnknown
}

// newUDPHints builds the HintSet for a UDP packet, carrying the transport
// payload in the UDP detail section.
func newUDPHints(packet Packet) HintSet {
	return HintSet{
		Summary: SummaryHint{Code: string(TagUDPPacket)},
		Tags:    []Tag{TagUDPPacket},
		UDP: &UDPHint{
			Payload: packet.Transport.Payload,
		},
	}
}

// newICMPHints builds the HintSet for an ICMPv4 or ICMPv6 packet. Every
// result carries the generic ICMP tag. When the ICMP layer is present and
// its type is one of the recognised echo request, echo reply, unreachable or
// time-exceeded types, the specific tag is appended and becomes the summary
// code. Without an ICMP layer no detail section is attached.
func newICMPHints(packet Packet) HintSet {
	hints := HintSet{
		Summary: SummaryHint{Code: string(TagICMPPacket)},
		Tags:    []Tag{TagICMPPacket},
	}

	icmp := packet.Transport.ICMP
	if icmp == nil {
		return hints
	}

	detail := &ICMPHint{
		Version: icmp.Version,
		Type:    icmp.Type,
		Code:    icmp.Code,
	}

	// classify records a specific tag: it replaces the summary code and is
	// appended after the generic ICMP tag.
	classify := func(tag Tag) {
		hints.Summary.Code = string(tag)
		hints.Tags = append(hints.Tags, tag)
	}

	switch packet.Transport.Kind {
	case ProtocolICMPv4:
		switch icmp.Type {
		case 8:
			detail.IsEcho = true
			classify(TagICMPEchoRequest)
		case 0:
			detail.IsEchoReply = true
			classify(TagICMPEchoReply)
		case 3:
			detail.IsUnreachable = true
			classify(TagICMPUnreachable)
		case 11:
			detail.IsTimeExceeded = true
			classify(TagICMPTimeExceeded)
		}
	case ProtocolICMPv6:
		switch icmp.Type {
		case 128:
			detail.IsEcho = true
			classify(TagICMPEchoRequest)
		case 129:
			detail.IsEchoReply = true
			classify(TagICMPEchoReply)
		case 1:
			detail.IsUnreachable = true
			classify(TagICMPUnreachable)
		case 3:
			detail.IsTimeExceeded = true
			classify(TagICMPTimeExceeded)
		}
	}

	hints.ICMP = detail
	return hints
}

// newARPHints builds the HintSet for an ARP packet. The result starts out
// tagged as TagTransportUnknown; operation 1 replaces that with the request
// tag and operation 2 with the reply tag. Without an ARP layer no detail
// section is attached.
func newARPHints(packet Packet) HintSet {
	hints := HintSet{
		Summary: SummaryHint{Code: string(TagTransportUnknown)},
		Tags:    []Tag{TagTransportUnknown},
	}

	arp := packet.Network.ARP
	if arp == nil {
		return hints
	}

	detail := &ARPHint{Operation: arp.Operation}
	switch arp.Operation {
	case 1:
		detail.Request = true
		hints.Summary.Code = string(TagARPRequest)
		hints.Tags = []Tag{TagARPRequest}
	case 2:
		detail.Reply = true
		hints.Summary.Code = string(TagARPReply)
		hints.Tags = []Tag{TagARPReply}
	}

	hints.ARP = detail
	return hints
}