package notify

import (
	"context"
	"net"
	"sync"
)

// bulkDedicatedSidecar couples a dedicated connection with the lane ID it
// serves and a lazily created lane sender.
type bulkDedicatedSidecar struct {
	laneID    uint32
	conn      net.Conn
	closeOnce sync.Once
	senderMu  sync.Mutex
	sender    *bulkDedicatedLaneSender
}

// bulkDedicatedLane is the client-side bookkeeping record for one lane: how
// many bulks currently hold a reservation on it, the installed sidecar (if
// any) and the attach flight in progress (if any).
type bulkDedicatedLane struct {
	id           uint32
	activeBulks  int
	sidecar      *bulkDedicatedSidecar
	attachFlight *bulkDedicatedAttachFlight
}

// bulkDedicatedAttachFlight lets several callers wait for the outcome of a
// single attach attempt. done is closed exactly once, after err has been set.
type bulkDedicatedAttachFlight struct {
	done chan struct{}
	once sync.Once
	err  error
}

// normalizeBulkDedicatedLaneID maps the zero lane ID to 1 and returns every
// other ID unchanged.
func normalizeBulkDedicatedLaneID(laneID uint32) uint32 {
	if laneID == 0 {
		return 1
	}
	return laneID
}

// newBulkDedicatedSidecar wraps conn in a sidecar for the normalized laneID.
// It returns nil when conn is nil.
func newBulkDedicatedSidecar(conn net.Conn, laneID uint32) *bulkDedicatedSidecar {
	if conn == nil {
		return nil
	}
	return &bulkDedicatedSidecar{
		laneID: normalizeBulkDedicatedLaneID(laneID),
		conn:   conn,
	}
}

// newBulkDedicatedAttachFlight returns a flight whose done channel is open.
func newBulkDedicatedAttachFlight() *bulkDedicatedAttachFlight {
	return &bulkDedicatedAttachFlight{
		done: make(chan struct{}),
	}
}

// finish records err and closes the done channel. Only the first call has any
// effect; a nil receiver is a no-op.
func (f *bulkDedicatedAttachFlight) finish(err error) {
	if f == nil {
		return
	}
	f.once.Do(func() {
		// err must be stored before done is closed so waiters observe it.
		f.err = err
		close(f.done)
	})
}

// wait blocks until the flight is finished or ctx is done. It returns the
// flight's error in the first case and ctx.Err() in the second. A nil flight
// returns nil immediately; a nil ctx is treated as context.Background().
func (f *bulkDedicatedAttachFlight) wait(ctx context.Context) error {
	if f == nil {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-f.done:
		return f.err
	}
}

// close stops the sender currently attached to the sidecar (if any) and
// closes the connection. Only the first call does any work; a nil receiver is
// a no-op.
func (s *bulkDedicatedSidecar) close() {
	if s == nil {
		return
	}
	s.closeOnce.Do(func() {
		if sender := s.laneSenderSnapshot(); sender != nil {
			sender.stop()
		}
		if s.conn != nil {
			// The close error is discarded.
			_ = s.conn.Close()
		}
	})
}

// laneSenderSnapshot returns the sender currently stored on the sidecar, or
// nil if there is none or the receiver is nil.
func (s *bulkDedicatedSidecar) laneSenderSnapshot() *bulkDedicatedLaneSender {
	if s == nil {
		return nil
	}
	s.senderMu.Lock()
	defer s.senderMu.Unlock()
	return s.sender
}

// laneSenderWithFactory returns the stored sender, creating and storing one
// with factory(s.conn) when none exists yet. It returns nil when the receiver
// or factory is nil, or when no sender exists and the sidecar has no
// connection.
func (s *bulkDedicatedSidecar) laneSenderWithFactory(factory func(net.Conn) *bulkDedicatedLaneSender) *bulkDedicatedLaneSender {
	if s == nil || factory == nil {
		return nil
	}
	s.senderMu.Lock()
	defer s.senderMu.Unlock()
	if s.sender != nil {
		return s.sender
	}
	if s.conn == nil {
		return nil
	}
	s.sender = factory(s.conn)
	return s.sender
}

// clientDedicatedSidecarSnapshot returns the sidecar of the lowest-numbered
// client lane that has one, or nil if there is none.
func (c *ClientCommon) clientDedicatedSidecarSnapshot() *bulkDedicatedSidecar {
	if c == nil {
		return nil
	}
	c.bulkDedicatedSidecarMu.Lock()
	defer c.bulkDedicatedSidecarMu.Unlock()
	return firstClientDedicatedSidecarLocked(c.bulkDedicatedLanes)
}

// firstClientDedicatedSidecarLocked scans lanes and returns the sidecar stored
// under the smallest map key among entries that have a sidecar. Map iteration
// order is random, so the minimum is tracked explicitly.
func firstClientDedicatedSidecarLocked(lanes map[uint32]*bulkDedicatedLane) *bulkDedicatedSidecar {
	var (
		selected *bulkDedicatedSidecar
		bestID   uint32
	)
	for laneID, lane := range lanes {
		if lane == nil || lane.sidecar == nil {
			continue
		}
		if selected == nil || laneID < bestID {
			selected = lane.sidecar
			bestID = laneID
		}
	}
	return selected
}

// reserveBulkDedicatedLane picks a lane for a new bulk, increments that lane's
// activeBulks counter and returns its ID.
//
// The least-loaded existing lane is preferred, with ties going to the lower
// ID. A new lane is created when no lane exists, or when the least-loaded lane
// is already busy and the lane limit (values <= 0 mean unlimited) has not been
// reached. A nil receiver returns the normalized zero lane ID.
func (c *ClientCommon) reserveBulkDedicatedLane() uint32 {
	if c == nil {
		return normalizeBulkDedicatedLaneID(0)
	}
	c.bulkDedicatedSidecarMu.Lock()
	defer c.bulkDedicatedSidecarMu.Unlock()
	if c.bulkDedicatedLanes == nil {
		c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
	}
	limit := c.bulkDedicatedLaneLimitSnapshot()
	var best *bulkDedicatedLane
	for _, lane := range c.bulkDedicatedLanes {
		if lane == nil {
			continue
		}
		if best == nil || lane.activeBulks < best.activeBulks || (lane.activeBulks == best.activeBulks && lane.id < best.id) {
			best = lane
		}
	}
	if best == nil || ((limit <= 0 || len(c.bulkDedicatedLanes) < limit) && best.activeBulks > 0) {
		// Advance the counter until an unregistered ID is found. Without this,
		// a counter wrap-around (0 normalizes to 1) or a lane registered under
		// the same ID by the attach/install paths would be silently
		// overwritten, dropping its activeBulks count and orphaning its
		// sidecar.
		for {
			c.bulkDedicatedNextLaneID++
			laneID := normalizeBulkDedicatedLaneID(c.bulkDedicatedNextLaneID)
			if c.bulkDedicatedLanes[laneID] != nil {
				continue
			}
			best = &bulkDedicatedLane{id: laneID}
			c.bulkDedicatedLanes[laneID] = best
			break
		}
	}
	best.activeBulks++
	return best.id
}

// releaseBulkDedicatedLane drops one reservation from the lane and removes the
// lane record once it has no reservations, no sidecar and no attach flight.
// Unknown lanes are ignored and the counter never goes below zero.
func (c *ClientCommon) releaseBulkDedicatedLane(laneID uint32) {
	if c == nil {
		return
	}
	laneID = normalizeBulkDedicatedLaneID(laneID)
	c.bulkDedicatedSidecarMu.Lock()
	defer c.bulkDedicatedSidecarMu.Unlock()
	lane := c.bulkDedicatedLanes[laneID]
	if lane == nil {
		return
	}
	if lane.activeBulks > 0 {
		lane.activeBulks--
	}
	if lane.activeBulks == 0 && lane.sidecar == nil && lane.attachFlight == nil {
		delete(c.bulkDedicatedLanes, laneID)
	}
}

// clientDedicatedSidecarSnapshotForLane returns the sidecar installed on the
// given (normalized) lane, or nil when the lane is unknown or has no sidecar.
func (c *ClientCommon) clientDedicatedSidecarSnapshotForLane(laneID uint32) *bulkDedicatedSidecar {
	if c == nil {
		return nil
	}
	laneID = normalizeBulkDedicatedLaneID(laneID)
	c.bulkDedicatedSidecarMu.Lock()
	defer c.bulkDedicatedSidecarMu.Unlock()
	if lane := c.bulkDedicatedLanes[laneID]; lane != nil {
		return lane.sidecar
	}
	return nil
}

// beginClientDedicatedSidecarAttach starts an attach for the lane or joins one
// already in progress, creating the lane record if needed.
//
// It returns exactly one of:
//   - (sidecar, nil, false) when the lane already has a sidecar;
//   - (nil, flight, false) when another attach is in flight;
//   - (nil, flight, true) when a new flight was registered by this call.
//
// A nil receiver returns (nil, nil, false).
func (c *ClientCommon) beginClientDedicatedSidecarAttach(laneID uint32) (*bulkDedicatedSidecar, *bulkDedicatedAttachFlight, bool) {
	if c == nil {
		return nil, nil, false
	}
	laneID = normalizeBulkDedicatedLaneID(laneID)
	c.bulkDedicatedSidecarMu.Lock()
	defer c.bulkDedicatedSidecarMu.Unlock()
	if c.bulkDedicatedLanes == nil {
		c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
	}
	lane := c.bulkDedicatedLanes[laneID]
	if lane == nil {
		lane = &bulkDedicatedLane{id: laneID}
		c.bulkDedicatedLanes[laneID] = lane
	}
	if lane.sidecar != nil {
		return lane.sidecar, nil, false
	}
	if lane.attachFlight != nil {
		return nil, lane.attachFlight, false
	}
	flight := newBulkDedicatedAttachFlight()
	lane.attachFlight = flight
	return nil, flight, true
}

// finishClientDedicatedSidecarAttach detaches flight from its lane (only if it
// is still the lane's current flight), removes the lane record when nothing
// else references it, and then completes the flight with err.
func (c *ClientCommon) finishClientDedicatedSidecarAttach(laneID uint32, flight *bulkDedicatedAttachFlight, err error) {
	if c == nil || flight == nil {
		return
	}
	laneID = normalizeBulkDedicatedLaneID(laneID)
	c.bulkDedicatedSidecarMu.Lock()
	if lane := c.bulkDedicatedLanes[laneID]; lane != nil && lane.attachFlight == flight {
		lane.attachFlight = nil
		if lane.activeBulks == 0 && lane.sidecar == nil {
			delete(c.bulkDedicatedLanes, laneID)
		}
	}
	c.bulkDedicatedSidecarMu.Unlock()
	// The flight is completed after the mutex has been released.
	flight.finish(err)
}

// installClientDedicatedSidecar stores sidecar on the lane unless the lane
// already has one. It returns the sidecar that ends up installed and whether
// it is the one passed in. A nil receiver or nil sidecar returns (nil, false).
func (c *ClientCommon) installClientDedicatedSidecar(laneID uint32, sidecar *bulkDedicatedSidecar) (*bulkDedicatedSidecar, bool) {
	if c == nil || sidecar == nil {
		return nil, false
	}
	laneID = normalizeBulkDedicatedLaneID(laneID)
	c.bulkDedicatedSidecarMu.Lock()
	defer c.bulkDedicatedSidecarMu.Unlock()
	if c.bulkDedicatedLanes == nil {
		c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
	}
	lane := c.bulkDedicatedLanes[laneID]
	if lane == nil {
		lane = &bulkDedicatedLane{id: laneID}
		c.bulkDedicatedLanes[laneID] = lane
	}
	if lane.sidecar != nil {
		return lane.sidecar, false
	}
	lane.sidecar = sidecar
	return sidecar, true
}

// clearClientDedicatedSidecar removes sidecar from the lane if it is the one
// currently installed there, deleting the lane record when it has no
// reservations and no attach flight left. It reports whether it removed
// anything.
func (c *ClientCommon) clearClientDedicatedSidecar(laneID uint32, sidecar *bulkDedicatedSidecar) bool {
	if c == nil || sidecar == nil {
		return false
	}
	laneID = normalizeBulkDedicatedLaneID(laneID)
	c.bulkDedicatedSidecarMu.Lock()
	defer c.bulkDedicatedSidecarMu.Unlock()
	lane := c.bulkDedicatedLanes[laneID]
	if lane == nil || lane.sidecar != sidecar {
		return false
	}
	lane.sidecar = nil
	if lane.activeBulks == 0 && lane.attachFlight == nil {
		delete(c.bulkDedicatedLanes, laneID)
	}
	return true
}

// closeClientDedicatedSidecar replaces the client lane table with an empty
// one, then closes every sidecar from the old table and finishes every pending
// attach flight with errServiceShutdown.
func (c *ClientCommon) closeClientDedicatedSidecar() {
	if c == nil {
		return
	}
	c.bulkDedicatedSidecarMu.Lock()
	lanes := c.bulkDedicatedLanes
	c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
	c.bulkDedicatedSidecarMu.Unlock()
	// The detached table is processed after the mutex has been released.
	for _, lane := range lanes {
		if lane == nil {
			continue
		}
		if lane.sidecar != nil {
			lane.sidecar.close()
		}
		if lane.attachFlight != nil {
			lane.attachFlight.finish(errServiceShutdown)
		}
	}
}

// handleClientDedicatedSidecarFailure unregisters a failed sidecar, forwards
// err to the bulk runtime for the sidecar's connection, and closes the
// sidecar. It does nothing if the sidecar is no longer the one registered for
// its lane.
func (c *ClientCommon) handleClientDedicatedSidecarFailure(sidecar *bulkDedicatedSidecar, err error) {
	if c == nil || sidecar == nil {
		return
	}
	if !c.clearClientDedicatedSidecar(sidecar.laneID, sidecar) {
		return
	}
	runtime := c.getBulkRuntime()
	if runtime != nil && sidecar.conn != nil {
		runtime.handleDedicatedReadErrorByConn(clientFileScope(), sidecar.conn, err)
	}
	sidecar.close()
}

// serverDedicatedSidecarSnapshot returns the sidecar with the lowest lane ID
// registered for logical, or nil if there is none.
func (s *ServerCommon) serverDedicatedSidecarSnapshot(logical *LogicalConn) *bulkDedicatedSidecar {
	if s == nil || logical == nil {
		return nil
	}
	s.bulkDedicatedSidecarMu.Lock()
	defer s.bulkDedicatedSidecarMu.Unlock()
	return firstServerDedicatedSidecarLocked(s.bulkDedicatedSidecars[logical])
}

// firstServerDedicatedSidecarLocked returns the non-nil sidecar stored under
// the smallest key of lanes, or nil if there is none.
func firstServerDedicatedSidecarLocked(lanes map[uint32]*bulkDedicatedSidecar) *bulkDedicatedSidecar {
	var (
		selected *bulkDedicatedSidecar
		bestID   uint32
	)
	for laneID, sidecar := range lanes {
		if sidecar == nil {
			continue
		}
		if selected == nil || laneID < bestID {
			selected = sidecar
			bestID = laneID
		}
	}
	return selected
}

// serverDedicatedSidecarSnapshotForLane returns the sidecar registered for
// logical on the given (normalized) lane, or nil if there is none.
func (s *ServerCommon) serverDedicatedSidecarSnapshotForLane(logical *LogicalConn, laneID uint32) *bulkDedicatedSidecar {
	if s == nil || logical == nil {
		return nil
	}
	laneID = normalizeBulkDedicatedLaneID(laneID)
	s.bulkDedicatedSidecarMu.Lock()
	defer s.bulkDedicatedSidecarMu.Unlock()
	if lanes := s.bulkDedicatedSidecars[logical]; lanes != nil {
		return lanes[laneID]
	}
	return nil
}

// installServerDedicatedSidecar registers sidecar for logical on the given
// (normalized) lane, replacing any previous entry, and returns the sidecar
// that was registered there before (nil if none). The previous sidecar is not
// closed here. A nil receiver, logical or sidecar returns nil without
// registering anything.
func (s *ServerCommon) installServerDedicatedSidecar(logical *LogicalConn, laneID uint32, sidecar *bulkDedicatedSidecar) *bulkDedicatedSidecar {
	if s == nil || logical == nil || sidecar == nil {
		return nil
	}
	laneID = normalizeBulkDedicatedLaneID(laneID)
	s.bulkDedicatedSidecarMu.Lock()
	defer s.bulkDedicatedSidecarMu.Unlock()
	// Create the outer map on demand: assigning into a nil map panics, and the
	// client-side registry methods initialise their map lazily in the same way.
	if s.bulkDedicatedSidecars == nil {
		s.bulkDedicatedSidecars = make(map[*LogicalConn]map[uint32]*bulkDedicatedSidecar)
	}
	lanes := s.bulkDedicatedSidecars[logical]
	if lanes == nil {
		lanes = make(map[uint32]*bulkDedicatedSidecar)
		s.bulkDedicatedSidecars[logical] = lanes
	}
	prev := lanes[laneID]
	lanes[laneID] = sidecar
	return prev
}

// clearServerDedicatedSidecar unregisters sidecar from logical's lane if it is
// the entry currently stored there, dropping logical's lane table when it
// becomes empty. It reports whether it removed anything.
func (s *ServerCommon) clearServerDedicatedSidecar(logical *LogicalConn, laneID uint32, sidecar *bulkDedicatedSidecar) bool {
	if s == nil || logical == nil || sidecar == nil {
		return false
	}
	laneID = normalizeBulkDedicatedLaneID(laneID)
	s.bulkDedicatedSidecarMu.Lock()
	defer s.bulkDedicatedSidecarMu.Unlock()
	lanes := s.bulkDedicatedSidecars[logical]
	if lanes == nil || lanes[laneID] != sidecar {
		return false
	}
	delete(lanes, laneID)
	if len(lanes) == 0 {
		delete(s.bulkDedicatedSidecars, logical)
	}
	return true
}

// closeServerDedicatedSidecar unregisters every sidecar of logical and closes
// each of them.
func (s *ServerCommon) closeServerDedicatedSidecar(logical *LogicalConn) {
	if s == nil || logical == nil {
		return
	}
	s.bulkDedicatedSidecarMu.Lock()
	lanes := s.bulkDedicatedSidecars[logical]
	delete(s.bulkDedicatedSidecars, logical)
	s.bulkDedicatedSidecarMu.Unlock()
	// The sidecars are closed after the mutex has been released.
	for _, sidecar := range lanes {
		if sidecar != nil {
			sidecar.close()
		}
	}
}

// closeAllServerDedicatedSidecars replaces the server registry with an empty
// one and closes every sidecar that was registered in the old one.
func (s *ServerCommon) closeAllServerDedicatedSidecars() {
	if s == nil {
		return
	}
	s.bulkDedicatedSidecarMu.Lock()
	lanesByLogical := s.bulkDedicatedSidecars
	s.bulkDedicatedSidecars = make(map[*LogicalConn]map[uint32]*bulkDedicatedSidecar)
	s.bulkDedicatedSidecarMu.Unlock()
	for _, lanes := range lanesByLogical {
		for _, sidecar := range lanes {
			if sidecar != nil {
				sidecar.close()
			}
		}
	}
}

// handleServerDedicatedSidecarFailure unregisters a failed sidecar of logical,
// forwards err to the bulk runtime for the sidecar's connection, and closes
// the sidecar. It does nothing if the sidecar is no longer the one registered
// for its lane.
func (s *ServerCommon) handleServerDedicatedSidecarFailure(logical *LogicalConn, sidecar *bulkDedicatedSidecar, err error) {
	if s == nil || logical == nil || sidecar == nil {
		return
	}
	if !s.clearServerDedicatedSidecar(logical, sidecar.laneID, sidecar) {
		return
	}
	runtime := s.getBulkRuntime()
	if runtime != nil && sidecar.conn != nil {
		runtime.handleDedicatedReadErrorByConn(serverFileScope(logical), sidecar.conn, err)
	}
	sidecar.close()
}

// attachServerDedicatedSidecarIfExists hands the connection of the sidecar
// registered for bulk's dedicated lane to bulk, if such a sidecar exists.
// Bulks that are not dedicated are ignored.
func (s *ServerCommon) attachServerDedicatedSidecarIfExists(logical *LogicalConn, bulk *bulkHandle) {
	if s == nil || logical == nil || bulk == nil || !bulk.Dedicated() {
		return
	}
	sidecar := s.serverDedicatedSidecarSnapshotForLane(logical, bulk.dedicatedLaneIDSnapshot())
	if sidecar == nil || sidecar.conn == nil {
		return
	}
	// The attach result is discarded.
	_ = bulk.attachDedicatedConnShared(sidecar.conn)
}