349 lines
8.6 KiB
Go
349 lines
8.6 KiB
Go
package bcap
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type Tracker struct {
|
|
config *PacketsConfig
|
|
|
|
mu sync.Mutex
|
|
tcpStates map[string]trackerTCPState
|
|
cleanupTicker *time.Ticker
|
|
stopCleanup chan struct{}
|
|
cleanupOnce sync.Once
|
|
stopOnce sync.Once
|
|
}
|
|
|
|
func NewTracker() *Tracker {
|
|
return NewTrackerWithConfig(nil)
|
|
}
|
|
|
|
func NewTrackerWithConfig(config *PacketsConfig) *Tracker {
|
|
if config == nil {
|
|
config = DefaultConfig()
|
|
}
|
|
tracker := &Tracker{
|
|
config: config,
|
|
tcpStates: make(map[string]trackerTCPState),
|
|
stopCleanup: make(chan struct{}),
|
|
}
|
|
if config.CleanupInterval > 0 {
|
|
tracker.startAutoCleanup()
|
|
}
|
|
return tracker
|
|
}
|
|
|
|
func (t *Tracker) Stop() {
|
|
t.stopOnce.Do(func() {
|
|
close(t.stopCleanup)
|
|
})
|
|
}
|
|
|
|
func (t *Tracker) startAutoCleanup() {
|
|
t.cleanupOnce.Do(func() {
|
|
t.cleanupTicker = time.NewTicker(t.config.CleanupInterval)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-t.cleanupTicker.C:
|
|
t.CleanupExpiredFlows()
|
|
case <-t.stopCleanup:
|
|
t.cleanupTicker.Stop()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
})
|
|
}
|
|
|
|
func (t *Tracker) CleanupExpiredFlows() int {
|
|
timeout := t.config.ConnectionTimeout
|
|
if timeout <= 0 {
|
|
return 0
|
|
}
|
|
now := time.Now()
|
|
removed := 0
|
|
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
for key, state := range t.tcpStates {
|
|
if state.lastSeen.IsZero() {
|
|
continue
|
|
}
|
|
if now.Sub(state.lastSeen) <= timeout {
|
|
continue
|
|
}
|
|
delete(t.tcpStates, key)
|
|
removed++
|
|
}
|
|
return removed
|
|
}
|
|
|
|
func (t *Tracker) ActiveFlowCount() int {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
return len(t.tcpStates)
|
|
}
|
|
|
|
func (t *Tracker) Observe(packet Packet) (Observation, error) {
|
|
obs := newObservation(packet)
|
|
|
|
switch packet.Transport.Kind {
|
|
case ProtocolTCP:
|
|
hints, err := t.observeTCP(packet, obs.Flow)
|
|
if err != nil {
|
|
return Observation{}, err
|
|
}
|
|
obs.Hints = hints
|
|
return obs, nil
|
|
case ProtocolUDP:
|
|
obs.Hints = newUDPHints(packet)
|
|
return obs, nil
|
|
case ProtocolICMPv4, ProtocolICMPv6:
|
|
obs.Hints = newICMPHints(packet)
|
|
return obs, nil
|
|
case ProtocolARP:
|
|
obs.Hints = newARPHints(packet)
|
|
return obs, nil
|
|
default:
|
|
obs.Hints = HintSet{
|
|
Summary: SummaryHint{Code: string(TagTransportUnknown)},
|
|
Tags: []Tag{TagTransportUnknown},
|
|
}
|
|
return obs, nil
|
|
}
|
|
}
|
|
|
|
func newObservation(packet Packet) Observation {
|
|
return Observation{
|
|
Packet: packet,
|
|
Flow: newFlowRef(packet),
|
|
}
|
|
}
|
|
|
|
func newFlowRef(packet Packet) FlowRef {
|
|
forward := FlowKey{
|
|
Family: packet.Network.Family,
|
|
Protocol: packet.Transport.Kind,
|
|
Src: Endpoint{
|
|
IP: packet.Network.SrcIP,
|
|
Port: packetSourcePort(packet),
|
|
},
|
|
Dst: Endpoint{
|
|
IP: packet.Network.DstIP,
|
|
Port: packetDestinationPort(packet),
|
|
},
|
|
}
|
|
reverse := FlowKey{
|
|
Family: packet.Network.Family,
|
|
Protocol: packet.Transport.Kind,
|
|
Src: forward.Dst,
|
|
Dst: forward.Src,
|
|
}
|
|
return FlowRef{
|
|
Forward: forward,
|
|
Reverse: reverse,
|
|
Stable: stableFlowKey(forward),
|
|
}
|
|
}
|
|
|
|
func stableFlowKey(key FlowKey) string {
|
|
return buildKey(string(key.Protocol), key.Src.IP, key.Src.Port, key.Dst.IP, key.Dst.Port)
|
|
}
|
|
|
|
func packetSourcePort(packet Packet) string {
|
|
switch packet.Transport.Kind {
|
|
case ProtocolTCP:
|
|
if packet.Transport.TCP != nil {
|
|
return packet.Transport.TCP.SrcPort
|
|
}
|
|
case ProtocolUDP:
|
|
if packet.Transport.UDP != nil {
|
|
return packet.Transport.UDP.SrcPort
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func packetDestinationPort(packet Packet) string {
|
|
switch packet.Transport.Kind {
|
|
case ProtocolTCP:
|
|
if packet.Transport.TCP != nil {
|
|
return packet.Transport.TCP.DstPort
|
|
}
|
|
case ProtocolUDP:
|
|
if packet.Transport.UDP != nil {
|
|
return packet.Transport.UDP.DstPort
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func primaryTag(tags []Tag, fallback Tag) Tag {
|
|
if len(tags) == 0 {
|
|
return fallback
|
|
}
|
|
return tags[0]
|
|
}
|
|
|
|
type tcpHintOptions struct {
|
|
keepaliveResponse bool
|
|
}
|
|
|
|
func newTCPHints(packet Packet, legacyState uint8, opts tcpHintOptions) HintSet {
|
|
tags, phase, event := legacyTCPTags(legacyState, opts)
|
|
hints := HintSet{
|
|
Summary: SummaryHint{Code: string(primaryTag(tags, TagTCPPacket))},
|
|
Tags: tags,
|
|
}
|
|
|
|
if packet.Transport.TCP != nil {
|
|
hints.TCP = &TCPHint{
|
|
Phase: phase,
|
|
Event: event,
|
|
LegacyState: legacyState,
|
|
Seq: packet.Transport.TCP.Seq,
|
|
Ack: packet.Transport.TCP.Ack,
|
|
Window: packet.Transport.TCP.Window,
|
|
Payload: packet.Transport.TCP.Payload,
|
|
Retransmission: legacyState == StateTcpRetransmit,
|
|
Keepalive: legacyState == StateTcpKeepalive,
|
|
KeepaliveResponse: opts.keepaliveResponse,
|
|
RST: legacyState == StateTcpRst,
|
|
ECE: legacyState == StateTcpEce,
|
|
CWR: legacyState == StateTcpCwr,
|
|
}
|
|
}
|
|
return hints
|
|
}
|
|
|
|
func legacyTCPTags(state uint8, opts tcpHintOptions) ([]Tag, TCPPhase, TCPEvent) {
|
|
switch state {
|
|
case StateTcpConnect1:
|
|
return []Tag{TagTCPHandshakeSYN}, TCPPhaseHandshake, TCPEventSYN
|
|
case StateTcpConnect2:
|
|
return []Tag{TagTCPHandshakeSYNACK}, TCPPhaseHandshake, TCPEventSYNACK
|
|
case StateTcpConnect3:
|
|
return []Tag{TagTCPHandshakeACK}, TCPPhaseHandshake, TCPEventHandshakeACK
|
|
case StateTcpDisconnect1:
|
|
return []Tag{TagTCPTeardownFIN}, TCPPhaseTeardown, TCPEventFIN
|
|
case StateTcpDisconnect2, StateTcpDisconnect4:
|
|
return []Tag{TagTCPTeardownACK}, TCPPhaseTeardown, TCPEventTeardownACK
|
|
case StateTcpDisconnect23:
|
|
return []Tag{TagTCPTeardownFINACK}, TCPPhaseTeardown, TCPEventFINACK
|
|
case StateTcpDisconnect3:
|
|
return []Tag{TagTCPTeardownFIN}, TCPPhaseTeardown, TCPEventFIN
|
|
case StateTcpAckOk:
|
|
return []Tag{TagTCPPacket}, TCPPhaseEstablished, TCPEventACK
|
|
case StateTcpRetransmit:
|
|
return []Tag{TagTCPRetransmit, TagTCPPacket}, TCPPhaseEstablished, TCPEventRetransmission
|
|
case StateTcpKeepalive:
|
|
if opts.keepaliveResponse {
|
|
return []Tag{TagTCPKeepaliveResp, TagTCPKeepalive, TagTCPPacket}, TCPPhaseEstablished, TCPEventKeepaliveResp
|
|
}
|
|
return []Tag{TagTCPKeepalive, TagTCPPacket}, TCPPhaseEstablished, TCPEventKeepalive
|
|
case StateTcpRst:
|
|
return []Tag{TagTCPRst, TagTCPPacket}, TCPPhaseSpecial, TCPEventRST
|
|
case StateTcpEce:
|
|
return []Tag{TagTCPEce, TagTCPPacket}, TCPPhaseSpecial, TCPEventECE
|
|
case StateTcpCwr:
|
|
return []Tag{TagTCPCwr, TagTCPPacket}, TCPPhaseSpecial, TCPEventCWR
|
|
default:
|
|
return []Tag{TagTCPPacket}, TCPPhaseUnknown, TCPEventUnknown
|
|
}
|
|
}
|
|
|
|
func newUDPHints(packet Packet) HintSet {
|
|
return HintSet{
|
|
Summary: SummaryHint{Code: string(TagUDPPacket)},
|
|
Tags: []Tag{TagUDPPacket},
|
|
UDP: &UDPHint{
|
|
Payload: packet.Transport.Payload,
|
|
},
|
|
}
|
|
}
|
|
|
|
func newICMPHints(packet Packet) HintSet {
|
|
hints := HintSet{
|
|
Summary: SummaryHint{Code: string(TagICMPPacket)},
|
|
Tags: []Tag{TagICMPPacket},
|
|
}
|
|
if packet.Transport.ICMP == nil {
|
|
return hints
|
|
}
|
|
|
|
item := &ICMPHint{
|
|
Version: packet.Transport.ICMP.Version,
|
|
Type: packet.Transport.ICMP.Type,
|
|
Code: packet.Transport.ICMP.Code,
|
|
}
|
|
switch packet.Transport.Kind {
|
|
case ProtocolICMPv4:
|
|
switch packet.Transport.ICMP.Type {
|
|
case 8:
|
|
item.IsEcho = true
|
|
hints.Summary.Code = string(TagICMPEchoRequest)
|
|
hints.Tags = append(hints.Tags, TagICMPEchoRequest)
|
|
case 0:
|
|
item.IsEchoReply = true
|
|
hints.Summary.Code = string(TagICMPEchoReply)
|
|
hints.Tags = append(hints.Tags, TagICMPEchoReply)
|
|
case 3:
|
|
item.IsUnreachable = true
|
|
hints.Summary.Code = string(TagICMPUnreachable)
|
|
hints.Tags = append(hints.Tags, TagICMPUnreachable)
|
|
case 11:
|
|
item.IsTimeExceeded = true
|
|
hints.Summary.Code = string(TagICMPTimeExceeded)
|
|
hints.Tags = append(hints.Tags, TagICMPTimeExceeded)
|
|
}
|
|
case ProtocolICMPv6:
|
|
switch packet.Transport.ICMP.Type {
|
|
case 128:
|
|
item.IsEcho = true
|
|
hints.Summary.Code = string(TagICMPEchoRequest)
|
|
hints.Tags = append(hints.Tags, TagICMPEchoRequest)
|
|
case 129:
|
|
item.IsEchoReply = true
|
|
hints.Summary.Code = string(TagICMPEchoReply)
|
|
hints.Tags = append(hints.Tags, TagICMPEchoReply)
|
|
case 1:
|
|
item.IsUnreachable = true
|
|
hints.Summary.Code = string(TagICMPUnreachable)
|
|
hints.Tags = append(hints.Tags, TagICMPUnreachable)
|
|
case 3:
|
|
item.IsTimeExceeded = true
|
|
hints.Summary.Code = string(TagICMPTimeExceeded)
|
|
hints.Tags = append(hints.Tags, TagICMPTimeExceeded)
|
|
}
|
|
}
|
|
hints.ICMP = item
|
|
return hints
|
|
}
|
|
|
|
func newARPHints(packet Packet) HintSet {
|
|
hints := HintSet{
|
|
Summary: SummaryHint{Code: string(TagTransportUnknown)},
|
|
Tags: []Tag{TagTransportUnknown},
|
|
}
|
|
if packet.Network.ARP == nil {
|
|
return hints
|
|
}
|
|
item := &ARPHint{Operation: packet.Network.ARP.Operation}
|
|
switch packet.Network.ARP.Operation {
|
|
case 1:
|
|
item.Request = true
|
|
hints.Summary.Code = string(TagARPRequest)
|
|
hints.Tags = []Tag{TagARPRequest}
|
|
case 2:
|
|
item.Reply = true
|
|
hints.Summary.Code = string(TagARPReply)
|
|
hints.Tags = []Tag{TagARPReply}
|
|
}
|
|
hints.ARP = item
|
|
return hints
|
|
}
|