package bcap import ( "fmt" "net" "strings" "sync" "sync/atomic" "time" "github.com/gopacket/gopacket" "github.com/gopacket/gopacket/layers" ) // ════════════════════════════════════════════════ // 错误类型定义 / Error Type Definitions // ════════════════════════════════════════════════ // ParseErrorType 解析错误类型 / Parse error type type ParseErrorType int const ( ErrTypeNone ParseErrorType = iota // 无错误 / No error ErrTypeLinkLayer // 链路层错误 / Link layer error ErrTypeNetwork // 网络层错误 / Network layer error ErrTypeTransport // 传输层错误 / Transport layer error ErrTypeUnsupported // 不支持的协议 / Unsupported protocol ) // ParseError 解析错误结构体,包含详细的错误信息 // ParseError structure containing detailed error information type ParseError struct { Type ParseErrorType // 错误类型 / Error type Layer string // 出错的层 / Layer where error occurred Message string // 错误信息 / Error message Err error // 原始错误 / Original error } func (e *ParseError) Error() string { if e.Err != nil { return fmt.Sprintf("[%s] %s: %v", e.Layer, e.Message, e.Err) } return fmt.Sprintf("[%s] %s", e.Layer, e.Message) } // NewParseError 创建新的解析错误 / Create new parse error func NewParseError(errType ParseErrorType, layer, message string, err error) *ParseError { return &ParseError{ Type: errType, Layer: layer, Message: message, Err: err, } } // ════════════════════════════════════════════════ // 状态常量定义 / State Constants // ════════════════════════════════════════════════════════════════════════════════ // TCP 状态描述符定义 / TCP state descriptor definitions const ( StateUnknown uint8 = 0 // 未知状态 / Unknown state StateTcpConnect1 uint8 = 1 // 三次握手第一步 (SYN) / Three-way handshake step 1 (SYN) StateTcpConnect2 uint8 = 2 // 三次握手第二步 (SYN-ACK) / Three-way handshake step 2 (SYN-ACK) StateTcpConnect3 uint8 = 3 // 三次握手第三步 (ACK) / Three-way handshake step 3 (ACK) StateTcpDisconnect1 uint8 = 4 // 四次挥手第一步 (FIN) / Four-way handshake step 1 (FIN) StateTcpDisconnect2 uint8 = 5 // 四次挥手第二步 (ACK) / Four-way handshake step 2 (ACK) StateTcpDisconnect23 uint8 = 6 // 四次挥手第二三步合并 (FIN-ACK) / Four-way handshake step 2+3 combined StateTcpDisconnect3 uint8 = 7 // 四次挥手第三步 (FIN) / Four-way handshake step 3 (FIN) StateTcpDisconnect4 uint8 = 8 // 四次挥手第四步 (ACK) / Four-way handshake step 4 (ACK) StateTcpAckOk uint8 = 9 // 正常 ACK / Normal ACK StateTcpRetransmit uint8 = 10 // TCP 重传 / TCP retransmission StateTcpEce uint8 = 11 // TCP ECE 标志 / TCP ECE flag StateTcpCwr uint8 = 12 // TCP CWR 标志 / TCP CWR flag StateTcpRst uint8 = 13 // TCP RST 标志 / TCP RST flag StateTcpKeepalive uint8 = 14 // TCP 保活 / TCP keepalive StateUdp uint8 = 20 // UDP 数据包 / UDP packet StateIcmp uint8 = 30 // ICMP 数据包 / ICMP packet StateIcmpv6 uint8 = 31 // ICMPv6 数据包 / ICMPv6 packet ) // 默认配置常量 / Default configuration constants const ( DefaultConnectionTimeout = 5 * time.Minute // 默认连接超时时间 / Default connection timeout DefaultCleanupInterval = 1 * time.Minute // 默认清理间隔 / Default cleanup interval DefaultShardCount = 32 // 默认分片数量 / Default shard count ) // ════════════════════════════════════════════════ // 回调函数类型定义 / Callback Function Types // ════════════════════════════════════════════════ // ConnectionCallback 连接事件回调函数类型 // ConnectionCallback connection event callback function type type ConnectionCallback func(info PacketInfo) // ErrorCallback 错误事件回调函数类型 // ErrorCallback error event callback function type type ErrorCallback func(err *ParseError, packet gopacket.Packet) // ════════════════════════════════════════════════════════════════════════════════ // 配置结构体 / Configuration Structures // ════════════════════════════════════════════════════════════════════════════════ // PacketsConfig 数据包管理器配置 // PacketsConfig configuration for packets manager type PacketsConfig struct { // 连接超时时间,超过此时间未活动的连接将被清理 // Connection timeout, connections inactive beyond this time will be cleaned up ConnectionTimeout time.Duration // 自动清理间隔,设置为 0 则禁用自动清理 // Auto cleanup interval, set to 0 to disable auto cleanup CleanupInterval time.Duration // 是否启用懒加载模式(按需解析详细信息) // Whether to enable lazy parsing mode (parse details on demand) LazyParsing bool // 是否启用调试日志 // Whether to enable debug logging EnableDebugLog bool // 分片数量(用于减少锁竞争) // Shard count (for reducing lock contention) ShardCount int // 新连接回调 / New connection callback OnNewConnection ConnectionCallback // 连接关闭回调 / Connection closed callback OnConnectionClosed ConnectionCallback // 错误回调 / Error callback OnError ErrorCallback } // DefaultConfig 返回默认配置 // DefaultConfig returns default configuration func DefaultConfig() *PacketsConfig { return &PacketsConfig{ ConnectionTimeout: DefaultConnectionTimeout, CleanupInterval: DefaultCleanupInterval, LazyParsing: false, EnableDebugLog: false, ShardCount: DefaultShardCount, } } // ════════════════════════════════════════════════ // 统计信息结构体 / Statistics Structures // ════════════════════════════════════════════════ // ErrorStats 错误统计信息 // ErrorStats error statistics type ErrorStats struct { LinkLayerErrors uint64 // 链路层错误数 / Link layer error count NetworkErrors uint64 // 网络层错误数 / Network layer error count TransportErrors uint64 // 传输层错误数 / Transport layer error count UnsupportedErrors uint64 // 不支持协议错误数 / Unsupported protocol error count TotalErrors uint64 // 总错误数 / Total error count } // ConnectionStats 连接统计信息 // ConnectionStats connection statistics type ConnectionStats struct { ActiveConnections int64 // 当前活跃连接数 / Current active connections TotalConnections uint64 // 历史总连接数 / Total historical connections ClosedConnections uint64 // 已关闭连接数 / Closed connections TimeoutConnections uint64 // 超时清理的连接数 / Timeout cleaned connections TcpConnections int64 // TCP 连接数 / TCP connection count UdpConnections int64 // UDP 连接数 / UDP connection count IcmpConnections int64 // ICMP 连接数 / ICMP connection count } // Stats 综合统计信息 // Stats comprehensive statistics type Stats struct { Errors ErrorStats // 错误统计 / Error statistics Connections ConnectionStats // 连接统计 / Connection statistics StartTime time.Time // 启动时间 / Start time LastCleanup time.Time // 上次清理时间 / Last cleanup time } // ════════════════════════════════════════════════ // 数据包信息结构体 / Packet Info Structure // ════════════════════════════════════════════════════════════════════════════════ // PacketInfo 数据包信息结构体,包含完整的网络层和传输层信息 // PacketInfo structure containing complete network and transport layer information type PacketInfo struct { Key string // 连接唯一标识 / Unique connection identifier ReverseKey string // 反向连接标识 / Reverse connection identifier Type string // 协议类型: tcp/udp/icmp/icmpv6 / Protocol type SrcMac net.HardwareAddr // 源 MAC 地址 / Source MAC address DstMac net.HardwareAddr // 目标 MAC 地址 / Destination MAC address SrcIP string // 源 IP 地址 / Source IP address SrcPort string // 源端口 (TCP/UDP) / Source port DstIP string // 目标 IP 地址 / Destination IP address DstPort string // 目标端口 (TCP/UDP) / Destination port // 时间戳相关字段 / Timestamp related fields Timestamp time.Time // 数据包时间戳 / Packet timestamp TimestampMicros int64 // 微秒级时间戳 / Microsecond timestamp RelativeTime time.Duration // 相对于首包的时间偏移 / Time offset relative to first packet // ICMP 相关字段 / ICMP related fields IcmpType uint8 // ICMP 类型 / ICMP type IcmpCode uint8 // ICMP 代码 / ICMP code IcmpChecksum uint16 // ICMP 校验和 / ICMP checksum IcmpId uint16 // ICMP 标识符 / ICMP identifier IcmpSeq uint16 // ICMP 序列号 / ICMP sequence number // 连接统计字段 / Connection statistics fields PacketCount uint64 // 该连接的数据包计数 / Packet count for this connection ByteCount uint64 // 该连接的字节计数 / Byte count for this connection FirstSeen time.Time // 首次出现时间 / First seen time LastSeen time.Time // 最后出现时间 / Last seen time // 内部状态字段 / Internal state fields comment string // 用户自定义注释 / User-defined comment packet gopacket.Packet // 原始数据包 / Raw packet tcpSeq uint32 // TCP 序列号 / TCP sequence number tcpAck uint32 // TCP 确认号 / TCP acknowledgment number tcpWindow uint16 // TCP 窗口大小 / TCP window size tcpPayloads int // 载荷长度 / Payload length finState bool // TCP FIN 标志 / TCP FIN flag synState bool // TCP SYN 标志 / TCP SYN flag isFirst bool // 是否为首次出现 / Whether first occurrence stateDescript uint8 // 状态描述符 / State descriptor } // PacketInfo 的 Getter 方法 / PacketInfo getter methods func (p PacketInfo) StateDescript() uint8 { return p.stateDescript } func (p PacketInfo) TcpPayloads() int { return p.tcpPayloads } func (p PacketInfo) FinState() bool { return p.finState } func (p PacketInfo) SynState() bool { return p.synState } func (p PacketInfo) TcpWindow() uint16 { return p.tcpWindow } func (p PacketInfo) TcpAck() uint32 { return p.tcpAck } func (p PacketInfo) TcpSeq() uint32 { return p.tcpSeq } func (p PacketInfo) Packet() gopacket.Packet { return p.packet } func (p PacketInfo) Comment() string { return p.comment } func (p *PacketInfo) SetComment(comment string) { p.comment = comment } // ════════════════════════════════════════════════ // 分片 Map 实现(减少锁竞争)/ Sharded Map Implementation (reduce lock contention) // ════════════════════════════════════════════════ // shard 单个分片 // shard single shard type shard struct { sync.RWMutex items map[string]PacketInfo } // shardedMap 分片 Map,通过将数据分散到多个分片来减少锁竞争 // shardedMap sharded map, reduces lock contention by distributing data across multiple shards type shardedMap struct { shards []*shard shardCount int } // newShardedMap 创建新的分片 Map // newShardedMap creates a new sharded map func newShardedMap(shardCount int) *shardedMap { if shardCount <= 0 { shardCount = DefaultShardCount } sm := &shardedMap{ shards: make([]*shard, shardCount), shardCount: shardCount, } for i := 0; i < shardCount; i++ { sm.shards[i] = &shard{ items: make(map[string]PacketInfo), } } return sm } // getShard 根据 key 获取对应的分片 // getShard gets the shard for a given key func (sm *shardedMap) getShard(key string) *shard { hash := fnvHash(key) return sm.shards[hash%uint32(sm.shardCount)] } // fnvHash FNV-1a 哈希函数,用于快速计算字符串哈希 // fnvHash FNV-1a hash function for fast string hashing func fnvHash(key string) uint32 { hash := uint32(2166136261) for i := 0; i < len(key); i++ { hash ^= uint32(key[i]) hash *= 16777619 } return hash } // Get 获取值 / Get value func (sm *shardedMap) Get(key string) (PacketInfo, bool) { shard := sm.getShard(key) shard.RLock() val, ok := shard.items[key] shard.RUnlock() return val, ok } // Set 设置值 / Set value func (sm *shardedMap) Set(key string, value PacketInfo) { shard := sm.getShard(key) shard.Lock() shard.items[key] = value shard.Unlock() } // Delete 删除值 / Delete value func (sm *shardedMap) Delete(key string) { shard := sm.getShard(key) shard.Lock() delete(shard.items, key) shard.Unlock() } // DeleteBatch 批量删除 / Batch delete func (sm *shardedMap) DeleteBatch(keys []string) { // 按分片分组 keys / Group keys by shard shardKeys := make(map[int][]string) for _, key := range keys { hash := fnvHash(key) shardIdx := int(hash % uint32(sm.shardCount)) shardKeys[shardIdx] = append(shardKeys[shardIdx], key) } // 对每个分片批量删除 / Batch delete for each shard for shardIdx, keys := range shardKeys { shard := sm.shards[shardIdx] shard.Lock() for _, key := range keys { delete(shard.items, key) } shard.Unlock() } } // Len 获取总长度 / Get total length func (sm *shardedMap) Len() int { total := 0 for _, shard := range sm.shards { shard.RLock() total += len(shard.items) shard.RUnlock() } return total } // GetAll 获取所有项 / Get all items func (sm *shardedMap) GetAll() []PacketInfo { var result []PacketInfo for _, shard := range sm.shards { shard.RLock() for _, v := range shard.items { result = append(result, v) } shard.RUnlock() } return result } // Range 遍历所有项 / Range over all items func (sm *shardedMap) Range(f func(key string, value PacketInfo) bool) { for _, shard := range sm.shards { shard.RLock() for k, v := range shard.items { if !f(k, v) { shard.RUnlock() return } } shard.RUnlock() } } // Clear 清空所有项 / Clear all items func (sm *shardedMap) Clear() { for _, shard := range sm.shards { shard.Lock() shard.items = make(map[string]PacketInfo) shard.Unlock() } } // ════════════════════════════════════════════════ // 对象池(减少内存分配)/ Object Pool (reduce memory allocation) // ════════════════════════════════════════════════ // stringBuilderPool strings.Builder 对象池 var stringBuilderPool = sync.Pool{ New: func() interface{} { return &strings.Builder{} }, } // getStringBuilder 从对象池获取 strings.Builder // getStringBuilder gets strings.Builder from pool func getStringBuilder() *strings.Builder { return stringBuilderPool.Get().(*strings.Builder) } // putStringBuilder 将 strings.Builder 放回对象池 // putStringBuilder puts strings.Builder back to pool func putStringBuilder(sb *strings.Builder) { sb.Reset() stringBuilderPool.Put(sb) } // buildKey 使用对象池优化的 key 构建函数 // buildKey optimized key building function using object pool func buildKey(protocol, srcIP, srcPort, dstIP, dstPort string) string { sb := getStringBuilder() defer putStringBuilder(sb) sb.WriteString(protocol) sb.WriteString("://") sb.WriteString(srcIP) if srcPort != "" { sb.WriteString(":") sb.WriteString(srcPort) } sb.WriteString("=") sb.WriteString(dstIP) if dstPort != "" { sb.WriteString(":") sb.WriteString(dstPort) } return sb.String() } // ════════════════════════════════════════════════ // Packets 主结构体 / Packets Main Structure // ════════════════════════════════════════════════ // Packets 数据包管理器,用于跟踪和解析网络数据包 // Packets manager for tracking and parsing network packets type Packets struct { config *PacketsConfig // 配置 / Configuration connections *shardedMap // 连接映射表 / Connection map stats Stats // 统计信息 / Statistics statsLock sync.RWMutex // 统计信息锁 / Statistics lock startTime time.Time // 启动时间 / Start time firstPacketTime time.Time // 首个数据包时间 / First packet time cleanupTicker *time.Ticker // 清理定时器 / Cleanup ticker stopCleanup chan struct{} // 停止清理信号 / Stop cleanup signal cleanupOnce sync.Once // 确保只启动一次清理 / Ensure cleanup starts only once } // NewPackets 创建新的数据包管理器实例(使用默认配置) // NewPackets creates a new Packets manager instance (with default config) func NewPackets() *Packets { return NewPacketsWithConfig(DefaultConfig()) } // NewPacketsWithConfig 使用自定义配置创建数据包管理器 // NewPacketsWithConfig creates Packets manager with custom config func NewPacketsWithConfig(config *PacketsConfig) *Packets { if config == nil { config = DefaultConfig() } p := &Packets{ config: config, connections: newShardedMap(config.ShardCount), startTime: time.Now(), stopCleanup: make(chan struct{}), } p.stats.StartTime = p.startTime // 启动自动清理 goroutine / Start auto cleanup goroutine if config.CleanupInterval > 0 { p.startAutoCleanup() } return p } // startAutoCleanup 启动自动清理 goroutine // startAutoCleanup starts auto cleanup goroutine func (p *Packets) startAutoCleanup() { p.cleanupOnce.Do(func() { p.cleanupTicker = time.NewTicker(p.config.CleanupInterval) go func() { for { select { case <-p.cleanupTicker.C: p.CleanupExpiredConnections() case <-p.stopCleanup: p.cleanupTicker.Stop() return } } }() }) } // Stop 停止数据包管理器(停止自动清理) // Stop stops the packets manager (stops auto cleanup) func (p *Packets) Stop() { close(p.stopCleanup) } // ════════════════════════════════════════════════ // 统计和查询方法 / Statistics and Query Methods // ════════════════════════════════════════════════ // GetStats 获取统计信息 // GetStats gets statistics func (p *Packets) GetStats() Stats { p.statsLock.RLock() defer p.statsLock.RUnlock() stats := p.stats stats.Connections.ActiveConnections = int64(p.connections.Len()) return stats } // GetConnectionCount 获取当前活跃连接数 // GetConnectionCount gets current active connection count func (p *Packets) GetConnectionCount() int { return p.connections.Len() } // GetAllConnections 获取所有活跃连接 // GetAllConnections gets all active connections func (p *Packets) GetAllConnections() []PacketInfo { return p.connections.GetAll() } // GetConnectionsByIP 根据 IP 地址查询连接 // GetConnectionsByIP queries connections by IP address func (p *Packets) GetConnectionsByIP(ip string) []PacketInfo { var result []PacketInfo p.connections.Range(func(key string, value PacketInfo) bool { if value.SrcIP == ip || value.DstIP == ip { result = append(result, value) } return true }) return result } // GetConnectionsByPort 根据端口查询连接 // GetConnectionsByPort queries connections by port func (p *Packets) GetConnectionsByPort(port string) []PacketInfo { var result []PacketInfo p.connections.Range(func(key string, value PacketInfo) bool { if value.SrcPort == port || value.DstPort == port { result = append(result, value) } return true }) return result } // GetConnectionsByType 根据协议类型查询连接 // GetConnectionsByType queries connections by protocol type func (p *Packets) GetConnectionsByType(protocolType string) []PacketInfo { var result []PacketInfo p.connections.Range(func(key string, value PacketInfo) bool { if value.Type == protocolType { result = append(result, value) } return true }) return result } // Key 根据 key 获取连接信息 // Key gets connection info by key func (p *Packets) Key(key string) (PacketInfo, bool) { return p.connections.Get(key) } // SetComment 设置连接注释 // SetComment sets connection comment func (p *Packets) SetComment(key, comment string) { if info, ok := p.connections.Get(key); ok { info.comment = comment p.connections.Set(key, info) } } // ════════════════════════════════════════════════ // 连接清理方法 / Connection Cleanup Methods // ════════════════════════════════════════════════ // CleanupExpiredConnections 清理过期连接 // CleanupExpiredConnections cleans up expired connections func (p *Packets) CleanupExpiredConnections() int { now := time.Now() timeout := p.config.ConnectionTimeout var expiredKeys []string // 收集过期的 keys / Collect expired keys p.connections.Range(func(key string, value PacketInfo) bool { if now.Sub(value.LastSeen) > timeout { expiredKeys = append(expiredKeys, key) } return true }) // 批量删除 / Batch delete if len(expiredKeys) > 0 { p.connections.DeleteBatch(expiredKeys) // 更新统计 / Update statistics p.statsLock.Lock() p.stats.Connections.TimeoutConnections += uint64(len(expiredKeys)) p.stats.LastCleanup = now p.statsLock.Unlock() } return len(expiredKeys) } // ClearAllConnections 清空所有连接 // ClearAllConnections clears all connections func (p *Packets) ClearAllConnections() { p.connections.Clear() p.statsLock.Lock() p.stats.Connections.ActiveConnections = 0 p.statsLock.Unlock() } // ════════════════════════════════════════════════ // 错误处理方法 / Error Handling Methods // ════════════════════════════════════════════════ // recordError 记录错误统计 // recordError records error statistics func (p *Packets) recordError(errType ParseErrorType) { p.statsLock.Lock() defer p.statsLock.Unlock() p.stats.Errors.TotalErrors++ switch errType { case ErrTypeLinkLayer: p.stats.Errors.LinkLayerErrors++ case ErrTypeNetwork: p.stats.Errors.NetworkErrors++ case ErrTypeTransport: p.stats.Errors.TransportErrors++ case ErrTypeUnsupported: p.stats.Errors.UnsupportedErrors++ } } // handleError 处理错误(记录统计并调用回调) // handleError handles error (records statistics and calls callback) func (p *Packets) handleError(parseErr *ParseError, packet gopacket.Packet) { p.recordError(parseErr.Type) if p.config.EnableDebugLog { fmt.Printf("[ERROR] %s\n", parseErr.Error()) } if p.config.OnError != nil { p.config.OnError(parseErr, packet) } } // ════════════════════════════════════════════════ // 数据包解析方法 / Packet Parsing Methods // ════════════════════════════════════════════════ // ParsePacket 解析数据包,支持 Ethernet、Linux SLL、Linux SLL2 等链路层格式 // ParsePacket parses packets, supporting Ethernet, Linux SLL, Linux SLL2 and other link layer formats // // 参数 / Parameters: // - packet: gopacket 数据包对象 / gopacket packet object // - opts: 可选参数,opts[0] 可以是 *[]byte 类型的 MAC 地址(用于 nfqueue 等场景) // / Optional parameters, opts[0] can be *[]byte type MAC address (for nfqueue scenarios) // // 返回 / Returns: // - PacketInfo: 解析后的数据包信息 / Parsed packet information // - error: 解析错误 / Parse error func (p *Packets) ParsePacket(packet gopacket.Packet, opts ...interface{}) (PacketInfo, error) { var info PacketInfo // 提取时间戳 / Extract timestamp metadata := packet.Metadata() if metadata != nil { info.Timestamp = metadata.Timestamp info.TimestampMicros = metadata.Timestamp.UnixMicro() // 记录首个数据包时间 / Record first packet time if p.firstPacketTime.IsZero() { p.firstPacketTime = metadata.Timestamp } // 计算相对时间 / Calculate relative time info.RelativeTime = metadata.Timestamp.Sub(p.firstPacketTime) } else { info.Timestamp = time.Now() info.TimestampMicros = info.Timestamp.UnixMicro() } // ──────────────────────────────────────────────── // 链路层:提取 MAC 地址 / Link Layer: Extract MAC addresses // 支持 Ethernet、Linux SLL、Linux SLL2 / Support Ethernet, Linux SLL, Linux SLL2 // ──────────────────────────────────────────────── var srcMac, dstMac net.HardwareAddr // 情况 1: 标准以太网层 / Case 1: Standard Ethernet layer if ethLayer := packet.Layer(layers.LayerTypeEthernet); ethLayer != nil { eth := ethLayer.(*layers.Ethernet) srcMac = eth.SrcMAC dstMac = eth.DstMAC } else if sllLayer := packet.Layer(layers.LayerTypeLinuxSLL); sllLayer != nil { // 情况 2: Linux SLL (cooked capture v1) / Case 2: Linux SLL (cooked capture v1) sll := sllLayer.(*layers.LinuxSLL) if sll.AddrType == 1 && sll.AddrLen == 6 { // 1 = ARPHRD_ETHER srcMac = sll.Addr } } else if sll2Layer := packet.Layer(layers.LayerTypeLinuxSLL2); sll2Layer != nil { // 情况 3: Linux SLL2 (cooked capture v2) / Case 3: Linux SLL2 (cooked capture v2) sll2 := sll2Layer.(*layers.LinuxSLL2) if sll2.ARPHardwareType == 1 && sll2.AddrLength == 6 { // 1 = ARPHRD_ETHER srcMac = sll2.Addr } } // 可选:使用外部提供的 MAC 地址覆盖 / Optional: Override with externally provided MAC for k, v := range opts { if k == 0 { if b, ok := v.(*[]byte); ok && b != nil && len(*b) == 6 { srcMac = net.HardwareAddr(*b) } } } info.SrcMac = srcMac info.DstMac = dstMac // ──────────────────────────────────────────────── // 网络层:提取 IP 地址 / Network Layer: Extract IP addresses // ──────────────────────────────────────────────── if nw := packet.NetworkLayer(); nw != nil { srcp, dstp := nw.NetworkFlow().Endpoints() info.SrcIP = srcp.String() info.DstIP = dstp.String() } else { parseErr := NewParseError(ErrTypeNetwork, "Network", "no valid network layer found", nil) p.handleError(parseErr, packet) return info, parseErr } // ──────────────────────────────────────────────── // 传输层:解析 TCP/UDP/ICMP / Transport Layer: Parse TCP/UDP/ICMP // ──────────────────────────────────────────────── // TCP 协议 / TCP protocol if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil { if tcp, ok := tcpLayer.(*layers.TCP); ok { return p.parseTcp(info, packet, tcpLayer, tcp) } } // UDP 协议 / UDP protocol if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil { if udp, ok := udpLayer.(*layers.UDP); ok { return p.parseUdp(info, packet, udpLayer, udp) } } // ICMPv4 协议 / ICMPv4 protocol if icmpLayer := packet.Layer(layers.LayerTypeICMPv4); icmpLayer != nil { if icmp, ok := icmpLayer.(*layers.ICMPv4); ok { return p.parseIcmp(info, packet, icmpLayer, icmp) } } // ICMPv6 协议 / ICMPv6 protocol if icmpv6Layer := packet.Layer(layers.LayerTypeICMPv6); icmpv6Layer != nil { if icmpv6, ok := icmpv6Layer.(*layers.ICMPv6); ok { return p.parseIcmpv6(info, packet, icmpv6Layer, icmpv6) } } // 不支持的协议类型 / Unsupported protocol type parseErr := NewParseError(ErrTypeUnsupported, "Transport", "unsupported packet type (not TCP/UDP/ICMP/ICMPv6)", nil) p.handleError(parseErr, packet) return info, parseErr } // parseTcp 解析 TCP 数据包并进行状态跟踪 // parseTcp parses TCP packets and performs state tracking func (p *Packets) parseTcp(info PacketInfo, packet gopacket.Packet, layer gopacket.Layer, tcp *layers.TCP) (PacketInfo, error) { // 使用优化的 key 构建 / Use optimized key building info.Key = buildKey("tcp", info.SrcIP, fmt.Sprintf("%d", tcp.SrcPort), info.DstIP, fmt.Sprintf("%d", tcp.DstPort)) info.ReverseKey = buildKey("tcp", info.DstIP, fmt.Sprintf("%d", tcp.DstPort), info.SrcIP, fmt.Sprintf("%d", tcp.SrcPort)) info.Type = "tcp" info.SrcPort = fmt.Sprintf("%d", tcp.SrcPort) info.DstPort = fmt.Sprintf("%d", tcp.DstPort) info.packet = packet info.tcpSeq = tcp.Seq info.tcpAck = tcp.Ack info.tcpPayloads = len(layer.LayerPayload()) info.finState = tcp.FIN info.synState = tcp.SYN info.tcpWindow = tcp.Window // 获取上一个同方向的数据包 / Get the last packet in the same direction lastPacket, exists := p.connections.Get(info.Key) // 如果是新连接,初始化状态 / If it's a new connection, initialize state if !exists { lastPacket = PacketInfo{ Key: info.Key, ReverseKey: info.ReverseKey, Type: "tcp", SrcIP: info.SrcIP, SrcPort: info.SrcPort, DstIP: info.DstIP, DstPort: info.DstPort, FirstSeen: info.Timestamp, LastSeen: info.Timestamp, PacketCount: 0, ByteCount: 0, tcpSeq: tcp.Seq, tcpAck: tcp.Ack, tcpWindow: tcp.Window, tcpPayloads: len(layer.LayerPayload()), finState: tcp.FIN, synState: tcp.SYN, isFirst: true, stateDescript: StateUnknown, } // 更新统计 / Update statistics p.statsLock.Lock() p.stats.Connections.TotalConnections++ atomic.AddInt64(&p.stats.Connections.TcpConnections, 1) p.statsLock.Unlock() // 触发新连接回调 / Trigger new connection callback if p.config.OnNewConnection != nil { p.config.OnNewConnection(lastPacket) } } // 更新连接统计 / Update connection statistics info.FirstSeen = lastPacket.FirstSeen info.LastSeen = info.Timestamp info.PacketCount = lastPacket.PacketCount + 1 info.ByteCount = lastPacket.ByteCount + uint64(len(packet.Data())) // 获取反向连接的数据包 / Get the reverse direction packet lastReverse, _ := p.connections.Get(info.ReverseKey) // 继承上一个数据包的注释和 MAC 地址 / Inherit comment and MAC addresses from last packet if !lastPacket.isFirst { info.comment = lastPacket.comment if lastPacket.SrcMac != nil && len(info.SrcMac) == 0 { info.SrcMac = lastPacket.SrcMac } if lastPacket.DstMac != nil && len(info.DstMac) == 0 { info.DstMac = lastPacket.DstMac } } if lastReverse.SrcMac != nil && len(info.DstMac) == 0 { info.DstMac = lastReverse.SrcMac } // ──────────────────────────────────────────────── // TCP 状态机判断 / TCP state machine // ──────────────────────────────────────────────── connectionClosed := false // RST 标志:连接重置 / RST flag: connection reset if tcp.RST { info.stateDescript = StateTcpRst p.connections.Delete(info.Key) p.connections.Delete(info.ReverseKey) connectionClosed = true // 更新统计 / Update statistics p.statsLock.Lock() p.stats.Connections.ClosedConnections++ atomic.AddInt64(&p.stats.Connections.TcpConnections, -1) p.statsLock.Unlock() // 触发连接关闭回调 / Trigger connection closed callback if p.config.OnConnectionClosed != nil { p.config.OnConnectionClosed(info) } return info, nil } // 三次握手 / Three-way handshake if tcp.SYN && !tcp.ACK { info.stateDescript = StateTcpConnect1 // SYN } else if tcp.SYN && tcp.ACK { info.stateDescript = StateTcpConnect2 // SYN-ACK } else if tcp.ACK { if !tcp.FIN { // 三次握手第三步 / Three-way handshake step 3 if lastReverse.tcpSeq+1 == tcp.Ack && lastReverse.stateDescript == StateTcpConnect2 { info.stateDescript = StateTcpConnect3 } else if tcp.CWR { info.stateDescript = StateTcpCwr } else if tcp.ECE { info.stateDescript = StateTcpEce } if info.stateDescript != StateUnknown { goto savereturn } // TCP 保活检测 / TCP keepalive detection if info.tcpSeq == lastReverse.tcpAck-1 && info.tcpSeq == lastPacket.tcpSeq+uint32(lastPacket.tcpPayloads)-1 { info.stateDescript = StateTcpKeepalive goto savereturn } // TCP 重传检测 / TCP retransmission detection if !lastPacket.isFirst { if info.tcpSeq < lastPacket.tcpSeq+uint32(lastPacket.tcpPayloads) { info.stateDescript = StateTcpRetransmit goto savereturn } } // 四次挥手完成 / Four-way handshake completed if lastReverse.finState && lastPacket.finState { info.stateDescript = StateTcpDisconnect4 p.connections.Delete(info.Key) p.connections.Delete(info.ReverseKey) connectionClosed = true // 更新统计 / Update statistics p.statsLock.Lock() p.stats.Connections.ClosedConnections++ atomic.AddInt64(&p.stats.Connections.TcpConnections, -1) p.statsLock.Unlock() // 触发连接关闭回调 / Trigger connection closed callback if p.config.OnConnectionClosed != nil { p.config.OnConnectionClosed(info) } return info, nil } // 四次挥手第二步 / Four-way handshake step 2 if lastReverse.finState && lastReverse.tcpSeq+1 == info.tcpAck { info.stateDescript = StateTcpDisconnect2 goto savereturn } // 正常 ACK / Normal ACK info.stateDescript = StateTcpAckOk } else { // 四次挥手 / Four-way handshake if !lastReverse.finState { info.stateDescript = StateTcpDisconnect1 // FIN } else { if lastReverse.finState && lastReverse.tcpSeq+1 == info.tcpAck && lastPacket.tcpAck == info.tcpAck && lastPacket.tcpSeq == info.tcpSeq { info.stateDescript = StateTcpDisconnect3 // FIN (step 3) } else { info.stateDescript = StateTcpDisconnect23 // FIN-ACK (step 2+3 combined) } } } } savereturn: // 只有在连接未关闭时才保存 / Only save if connection is not closed if !connectionClosed { p.connections.Set(info.Key, info) } return info, nil } // parseUdp 解析 UDP 数据包 // parseUdp parses UDP packets func (p *Packets) parseUdp(info PacketInfo, packet gopacket.Packet, layer gopacket.Layer, udp *layers.UDP) (PacketInfo, error) { // 使用优化的 key 构建 / Use optimized key building info.Key = buildKey("udp", info.SrcIP, fmt.Sprintf("%d", udp.SrcPort), info.DstIP, fmt.Sprintf("%d", udp.DstPort)) info.ReverseKey = buildKey("udp", info.DstIP, fmt.Sprintf("%d", udp.DstPort), info.SrcIP, fmt.Sprintf("%d", udp.SrcPort)) info.Type = "udp" info.SrcPort = fmt.Sprintf("%d", udp.SrcPort) info.DstPort = fmt.Sprintf("%d", udp.DstPort) info.packet = packet info.tcpPayloads = len(layer.LayerPayload()) info.stateDescript = StateUdp // 获取已存在的连接信息 / Get existing connection info lastPacket, exists := p.connections.Get(info.Key) if !exists { // 新 UDP 连接 / New UDP connection info.FirstSeen = info.Timestamp info.LastSeen = info.Timestamp info.PacketCount = 1 info.ByteCount = uint64(len(packet.Data())) // 更新统计 / Update statistics p.statsLock.Lock() p.stats.Connections.TotalConnections++ atomic.AddInt64(&p.stats.Connections.UdpConnections, 1) p.statsLock.Unlock() // 触发新连接回调 / Trigger new connection callback if p.config.OnNewConnection != nil { p.config.OnNewConnection(info) } } else { // 更新已存在的连接 / Update existing connection info.FirstSeen = lastPacket.FirstSeen info.LastSeen = info.Timestamp info.PacketCount = lastPacket.PacketCount + 1 info.ByteCount = lastPacket.ByteCount + uint64(len(packet.Data())) info.comment = lastPacket.comment // 继承 MAC 地址 / Inherit MAC addresses if lastPacket.SrcMac != nil && len(info.SrcMac) == 0 { info.SrcMac = lastPacket.SrcMac } if lastPacket.DstMac != nil && len(info.DstMac) == 0 { info.DstMac = lastPacket.DstMac } } p.connections.Set(info.Key, info) return info, nil } // parseIcmp 解析 ICMPv4 数据包 // parseIcmp parses ICMPv4 packets func (p *Packets) parseIcmp(info PacketInfo, packet gopacket.Packet, layer gopacket.Layer, icmp *layers.ICMPv4) (PacketInfo, error) { info.Type = "icmp" info.IcmpType = uint8(icmp.TypeCode.Type()) info.IcmpCode = uint8(icmp.TypeCode.Code()) info.IcmpChecksum = icmp.Checksum info.IcmpId = icmp.Id info.IcmpSeq = icmp.Seq info.tcpPayloads = len(layer.LayerPayload()) info.stateDescript = StateIcmp // ICMP 的 Key 格式:icmp://srcIP=dstIP:type:code:id:seq // ICMP Key format: icmp://srcIP=dstIP:type:code:id:seq sb := getStringBuilder() defer putStringBuilder(sb) sb.WriteString("icmp://") sb.WriteString(info.SrcIP) sb.WriteString("=") sb.WriteString(info.DstIP) sb.WriteString(":") sb.WriteString(fmt.Sprintf("%d:%d:%d:%d", info.IcmpType, info.IcmpCode, info.IcmpId, info.IcmpSeq)) info.Key = sb.String() sb.Reset() sb.WriteString("icmp://") sb.WriteString(info.DstIP) sb.WriteString("=") sb.WriteString(info.SrcIP) sb.WriteString(":") sb.WriteString(fmt.Sprintf("%d:%d:%d:%d", info.IcmpType, info.IcmpCode, info.IcmpId, info.IcmpSeq)) info.ReverseKey = sb.String() info.packet = packet // 获取已存在的连接信息 / Get existing connection info lastPacket, exists := p.connections.Get(info.Key) if !exists { // 新 ICMP 连接 / New ICMP connection info.FirstSeen = info.Timestamp info.LastSeen = info.Timestamp info.PacketCount = 1 info.ByteCount = uint64(len(packet.Data())) // 更新统计 / Update statistics p.statsLock.Lock() p.stats.Connections.TotalConnections++ atomic.AddInt64(&p.stats.Connections.IcmpConnections, 1) p.statsLock.Unlock() // 触发新连接回调 / Trigger new connection callback if p.config.OnNewConnection != nil { p.config.OnNewConnection(info) } } else { // 更新已存在的连接 / Update existing connection info.FirstSeen = lastPacket.FirstSeen info.LastSeen = info.Timestamp info.PacketCount = lastPacket.PacketCount + 1 info.ByteCount = lastPacket.ByteCount + uint64(len(packet.Data())) info.comment = lastPacket.comment // 继承 MAC 地址 / Inherit MAC addresses if lastPacket.SrcMac != nil && len(info.SrcMac) == 0 { info.SrcMac = lastPacket.SrcMac } if lastPacket.DstMac != nil && len(info.DstMac) == 0 { info.DstMac = lastPacket.DstMac } } p.connections.Set(info.Key, info) return info, nil } // parseIcmpv6 解析 ICMPv6 数据包 // parseIcmpv6 parses ICMPv6 packets func (p *Packets) parseIcmpv6(info PacketInfo, packet gopacket.Packet, layer gopacket.Layer, icmpv6 *layers.ICMPv6) (PacketInfo, error) { info.Type = "icmpv6" info.IcmpType = uint8(icmpv6.TypeCode.Type()) info.IcmpCode = uint8(icmpv6.TypeCode.Code()) info.IcmpChecksum = icmpv6.Checksum info.tcpPayloads = len(layer.LayerPayload()) info.stateDescript = StateIcmpv6 // ICMPv6 的 Key 格式:icmpv6://srcIP=dstIP:type:code // ICMPv6 Key format: icmpv6://srcIP=dstIP:type:code sb := getStringBuilder() defer putStringBuilder(sb) sb.WriteString("icmpv6://") sb.WriteString(info.SrcIP) sb.WriteString("=") sb.WriteString(info.DstIP) sb.WriteString(":") sb.WriteString(fmt.Sprintf("%d:%d", info.IcmpType, info.IcmpCode)) info.Key = sb.String() sb.Reset() sb.WriteString("icmpv6://") sb.WriteString(info.DstIP) sb.WriteString("=") sb.WriteString(info.SrcIP) sb.WriteString(":") sb.WriteString(fmt.Sprintf("%d:%d", info.IcmpType, info.IcmpCode)) info.ReverseKey = sb.String() info.packet = packet // 获取已存在的连接信息 / Get existing connection info lastPacket, exists := p.connections.Get(info.Key) if !exists { // 新 ICMPv6 连接 / New ICMPv6 connection info.FirstSeen = info.Timestamp info.LastSeen = info.Timestamp info.PacketCount = 1 info.ByteCount = uint64(len(packet.Data())) // 更新统计 / Update statistics p.statsLock.Lock() p.stats.Connections.TotalConnections++ atomic.AddInt64(&p.stats.Connections.IcmpConnections, 1) p.statsLock.Unlock() // 触发新连接回调 / Trigger new connection callback if p.config.OnNewConnection != nil { p.config.OnNewConnection(info) } } else { // 更新已存在的连接 / Update existing connection info.FirstSeen = lastPacket.FirstSeen info.LastSeen = info.Timestamp info.PacketCount = lastPacket.PacketCount + 1 info.ByteCount = lastPacket.ByteCount + uint64(len(packet.Data())) info.comment = lastPacket.comment // 继承 MAC 地址 / Inherit MAC addresses if lastPacket.SrcMac != nil && len(info.SrcMac) == 0 { info.SrcMac = lastPacket.SrcMac } if lastPacket.DstMac != nil && len(info.DstMac) == 0 { info.DstMac = lastPacket.DstMac } } p.connections.Set(info.Key, info) return info, nil } // ════════════════════════════════════════════════ // 辅助方法和工具函数 / Helper Methods and Utility Functions // ════════════════════════════════════════════════ // GetStateDescription 获取状态描述符的文本描述 // GetStateDescription gets text description of state descriptor func GetStateDescription(state uint8) string { switch state { case StateUnknown: return "Unknown" case StateTcpConnect1: return "TCP SYN" case StateTcpConnect2: return "TCP SYN-ACK" case StateTcpConnect3: return "TCP ACK (Connected)" case StateTcpDisconnect1: return "TCP FIN (Step 1)" case StateTcpDisconnect2: return "TCP ACK (Step 2)" case StateTcpDisconnect23: return "TCP FIN-ACK (Step 2+3)" case StateTcpDisconnect3: return "TCP FIN (Step 3)" case StateTcpDisconnect4: return "TCP ACK (Closed)" case StateTcpAckOk: return "TCP ACK" case StateTcpRetransmit: return "TCP Retransmission" case StateTcpEce: return "TCP ECE" case StateTcpCwr: return "TCP CWR" case StateTcpRst: return "TCP RST" case StateTcpKeepalive: return "TCP Keepalive" case StateUdp: return "UDP" case StateIcmp: return "ICMP" case StateIcmpv6: return "ICMPv6" default: return fmt.Sprintf("Unknown(%d)", state) } } // FormatDuration 格式化时间间隔为人类可读格式 // FormatDuration formats duration to human readable format func FormatDuration(d time.Duration) string { if d < time.Microsecond { return fmt.Sprintf("%dns", d.Nanoseconds()) } else if d < time.Millisecond { return fmt.Sprintf("%.2fµs", float64(d.Nanoseconds())/1000.0) } else if d < time.Second { return fmt.Sprintf("%.2fms", float64(d.Microseconds())/1000.0) } else if d < time.Minute { return fmt.Sprintf("%.2fs", d.Seconds()) } else if d < time.Hour { return fmt.Sprintf("%.2fm", d.Minutes()) } return fmt.Sprintf("%.2fh", d.Hours()) } // FormatBytes 格式化字节数为人类可读格式 // FormatBytes formats bytes to human readable format func FormatBytes(bytes uint64) string { const unit = 1024 if bytes < unit { return fmt.Sprintf("%d B", bytes) } div, exp := uint64(unit), 0 for n := bytes / unit; n >= unit; n /= unit { div *= unit exp++ } return fmt.Sprintf("%.2f %cB", float64(bytes)/float64(div), "KMGTPE"[exp]) } // PrintStats 打印统计信息(用于调试) // PrintStats prints statistics (for debugging) func (p *Packets) PrintStats() { stats := p.GetStats() fmt.Println("════════════════════════════════════════") fmt.Println("Packet Statistics / 数据包统计") fmt.Println("════════════════════════════════════════") fmt.Printf("Start Time / 启动时间: %s\n", stats.StartTime.Format("2006-01-02 15:04:05")) fmt.Printf("Uptime / 运行时间: %s\n", FormatDuration(time.Since(stats.StartTime))) fmt.Println() fmt.Println("Connection Statistics / 连接统计:") fmt.Printf(" Active / 活跃: %d\n", stats.Connections.ActiveConnections) fmt.Printf(" Total / 总计: %d\n", stats.Connections.TotalConnections) fmt.Printf(" Closed / 已关闭: %d\n", stats.Connections.ClosedConnections) fmt.Printf(" Timeout / 超时: %d\n", stats.Connections.TimeoutConnections) fmt.Printf(" TCP: %d\n", stats.Connections.TcpConnections) fmt.Printf(" UDP: %d\n", stats.Connections.UdpConnections) fmt.Printf(" ICMP: %d\n", stats.Connections.IcmpConnections) fmt.Println() fmt.Println("Error Statistics / 错误统计:") fmt.Printf(" Total / 总计: %d\n", stats.Errors.TotalErrors) fmt.Printf(" Link Layer / 链路层: %d\n", stats.Errors.LinkLayerErrors) fmt.Printf(" Network / 网络层: %d\n", stats.Errors.NetworkErrors) fmt.Printf(" Transport / 传输层: %d\n", stats.Errors.TransportErrors) fmt.Printf(" Unsupported / 不支持: %d\n", stats.Errors.UnsupportedErrors) if !stats.LastCleanup.IsZero() { fmt.Println() fmt.Printf("Last Cleanup / 上次清理: %s\n", stats.LastCleanup.Format("2006-01-02 15:04:05")) } fmt.Println("════════════════════════════════════════") } // ExportConnectionsToJSON 导出连接信息为 JSON 格式(示例) // ExportConnectionsToJSON exports connections to JSON format (example) func (p *Packets) ExportConnectionsToJSON() string { connections := p.GetAllConnections() sb := getStringBuilder() defer putStringBuilder(sb) sb.WriteString("[\n") for i, conn := range connections { if i > 0 { sb.WriteString(",\n") } sb.WriteString(fmt.Sprintf(` { "key": "%s", "type": "%s", "src_ip": "%s", "src_port": "%s", "dst_ip": "%s", "dst_port": "%s", "packet_count": %d, "byte_count": %d, "state": "%s", "first_seen": "%s", "last_seen": "%s", "duration": "%s" }`, conn.Key, conn.Type, conn.SrcIP, conn.SrcPort, conn.DstIP, conn.DstPort, conn.PacketCount, conn.ByteCount, GetStateDescription(conn.stateDescript), conn.FirstSeen.Format(time.RFC3339), conn.LastSeen.Format(time.RFC3339), FormatDuration(conn.LastSeen.Sub(conn.FirstSeen)), )) } sb.WriteString("\n]") return sb.String() }