notify/bulk_dedicated_sidecar.go

452 lines
11 KiB
Go
Raw Permalink Normal View History

package notify
import (
"context"
"net"
"sync"
)
type bulkDedicatedSidecar struct {
laneID uint32
conn net.Conn
closeOnce sync.Once
senderMu sync.Mutex
sender *bulkDedicatedLaneSender
}
type bulkDedicatedLane struct {
id uint32
activeBulks int
sidecar *bulkDedicatedSidecar
attachFlight *bulkDedicatedAttachFlight
}
type bulkDedicatedAttachFlight struct {
done chan struct{}
once sync.Once
err error
}
func normalizeBulkDedicatedLaneID(laneID uint32) uint32 {
if laneID == 0 {
return 1
}
return laneID
}
func newBulkDedicatedSidecar(conn net.Conn, laneID uint32) *bulkDedicatedSidecar {
if conn == nil {
return nil
}
return &bulkDedicatedSidecar{
laneID: normalizeBulkDedicatedLaneID(laneID),
conn: conn,
}
}
func newBulkDedicatedAttachFlight() *bulkDedicatedAttachFlight {
return &bulkDedicatedAttachFlight{
done: make(chan struct{}),
}
}
func (f *bulkDedicatedAttachFlight) finish(err error) {
if f == nil {
return
}
f.once.Do(func() {
f.err = err
close(f.done)
})
}
func (f *bulkDedicatedAttachFlight) wait(ctx context.Context) error {
if f == nil {
return nil
}
if ctx == nil {
ctx = context.Background()
}
select {
case <-ctx.Done():
return ctx.Err()
case <-f.done:
return f.err
}
}
func (s *bulkDedicatedSidecar) close() {
if s == nil {
return
}
s.closeOnce.Do(func() {
if sender := s.laneSenderSnapshot(); sender != nil {
sender.stop()
}
if s.conn != nil {
_ = s.conn.Close()
}
})
}
func (s *bulkDedicatedSidecar) laneSenderSnapshot() *bulkDedicatedLaneSender {
if s == nil {
return nil
}
s.senderMu.Lock()
defer s.senderMu.Unlock()
return s.sender
}
func (s *bulkDedicatedSidecar) laneSenderWithFactory(factory func(net.Conn) *bulkDedicatedLaneSender) *bulkDedicatedLaneSender {
if s == nil || factory == nil {
return nil
}
s.senderMu.Lock()
defer s.senderMu.Unlock()
if s.sender != nil {
return s.sender
}
if s.conn == nil {
return nil
}
s.sender = factory(s.conn)
return s.sender
}
func (c *ClientCommon) clientDedicatedSidecarSnapshot() *bulkDedicatedSidecar {
if c == nil {
return nil
}
c.bulkDedicatedSidecarMu.Lock()
defer c.bulkDedicatedSidecarMu.Unlock()
return firstClientDedicatedSidecarLocked(c.bulkDedicatedLanes)
}
func firstClientDedicatedSidecarLocked(lanes map[uint32]*bulkDedicatedLane) *bulkDedicatedSidecar {
var (
selected *bulkDedicatedSidecar
bestID uint32
)
for laneID, lane := range lanes {
if lane == nil || lane.sidecar == nil {
continue
}
if selected == nil || laneID < bestID {
selected = lane.sidecar
bestID = laneID
}
}
return selected
}
func (c *ClientCommon) reserveBulkDedicatedLane() uint32 {
if c == nil {
return normalizeBulkDedicatedLaneID(0)
}
c.bulkDedicatedSidecarMu.Lock()
defer c.bulkDedicatedSidecarMu.Unlock()
if c.bulkDedicatedLanes == nil {
c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
}
limit := c.bulkDedicatedLaneLimitSnapshot()
var best *bulkDedicatedLane
for _, lane := range c.bulkDedicatedLanes {
if lane == nil {
continue
}
if best == nil || lane.activeBulks < best.activeBulks || (lane.activeBulks == best.activeBulks && lane.id < best.id) {
best = lane
}
}
if best == nil || ((limit <= 0 || len(c.bulkDedicatedLanes) < limit) && best.activeBulks > 0) {
c.bulkDedicatedNextLaneID++
laneID := normalizeBulkDedicatedLaneID(c.bulkDedicatedNextLaneID)
best = &bulkDedicatedLane{id: laneID}
c.bulkDedicatedLanes[laneID] = best
}
best.activeBulks++
return best.id
}
func (c *ClientCommon) releaseBulkDedicatedLane(laneID uint32) {
if c == nil {
return
}
laneID = normalizeBulkDedicatedLaneID(laneID)
c.bulkDedicatedSidecarMu.Lock()
defer c.bulkDedicatedSidecarMu.Unlock()
lane := c.bulkDedicatedLanes[laneID]
if lane == nil {
return
}
if lane.activeBulks > 0 {
lane.activeBulks--
}
if lane.activeBulks == 0 && lane.sidecar == nil && lane.attachFlight == nil {
delete(c.bulkDedicatedLanes, laneID)
}
}
func (c *ClientCommon) clientDedicatedSidecarSnapshotForLane(laneID uint32) *bulkDedicatedSidecar {
if c == nil {
return nil
}
laneID = normalizeBulkDedicatedLaneID(laneID)
c.bulkDedicatedSidecarMu.Lock()
defer c.bulkDedicatedSidecarMu.Unlock()
if lane := c.bulkDedicatedLanes[laneID]; lane != nil {
return lane.sidecar
}
return nil
}
func (c *ClientCommon) beginClientDedicatedSidecarAttach(laneID uint32) (*bulkDedicatedSidecar, *bulkDedicatedAttachFlight, bool) {
if c == nil {
return nil, nil, false
}
laneID = normalizeBulkDedicatedLaneID(laneID)
c.bulkDedicatedSidecarMu.Lock()
defer c.bulkDedicatedSidecarMu.Unlock()
if c.bulkDedicatedLanes == nil {
c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
}
lane := c.bulkDedicatedLanes[laneID]
if lane == nil {
lane = &bulkDedicatedLane{id: laneID}
c.bulkDedicatedLanes[laneID] = lane
}
if lane.sidecar != nil {
return lane.sidecar, nil, false
}
if lane.attachFlight != nil {
return nil, lane.attachFlight, false
}
flight := newBulkDedicatedAttachFlight()
lane.attachFlight = flight
return nil, flight, true
}
func (c *ClientCommon) finishClientDedicatedSidecarAttach(laneID uint32, flight *bulkDedicatedAttachFlight, err error) {
if c == nil || flight == nil {
return
}
laneID = normalizeBulkDedicatedLaneID(laneID)
c.bulkDedicatedSidecarMu.Lock()
if lane := c.bulkDedicatedLanes[laneID]; lane != nil && lane.attachFlight == flight {
lane.attachFlight = nil
if lane.activeBulks == 0 && lane.sidecar == nil {
delete(c.bulkDedicatedLanes, laneID)
}
}
c.bulkDedicatedSidecarMu.Unlock()
flight.finish(err)
}
func (c *ClientCommon) installClientDedicatedSidecar(laneID uint32, sidecar *bulkDedicatedSidecar) (*bulkDedicatedSidecar, bool) {
if c == nil || sidecar == nil {
return nil, false
}
laneID = normalizeBulkDedicatedLaneID(laneID)
c.bulkDedicatedSidecarMu.Lock()
defer c.bulkDedicatedSidecarMu.Unlock()
if c.bulkDedicatedLanes == nil {
c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
}
lane := c.bulkDedicatedLanes[laneID]
if lane == nil {
lane = &bulkDedicatedLane{id: laneID}
c.bulkDedicatedLanes[laneID] = lane
}
if lane.sidecar != nil {
return lane.sidecar, false
}
lane.sidecar = sidecar
return sidecar, true
}
func (c *ClientCommon) clearClientDedicatedSidecar(laneID uint32, sidecar *bulkDedicatedSidecar) bool {
if c == nil || sidecar == nil {
return false
}
laneID = normalizeBulkDedicatedLaneID(laneID)
c.bulkDedicatedSidecarMu.Lock()
defer c.bulkDedicatedSidecarMu.Unlock()
lane := c.bulkDedicatedLanes[laneID]
if lane == nil || lane.sidecar != sidecar {
return false
}
lane.sidecar = nil
if lane.activeBulks == 0 && lane.attachFlight == nil {
delete(c.bulkDedicatedLanes, laneID)
}
return true
}
func (c *ClientCommon) closeClientDedicatedSidecar() {
if c == nil {
return
}
c.bulkDedicatedSidecarMu.Lock()
lanes := c.bulkDedicatedLanes
c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
c.bulkDedicatedSidecarMu.Unlock()
for _, lane := range lanes {
if lane == nil {
continue
}
if lane.sidecar != nil {
lane.sidecar.close()
}
if lane.attachFlight != nil {
lane.attachFlight.finish(errServiceShutdown)
}
}
}
func (c *ClientCommon) handleClientDedicatedSidecarFailure(sidecar *bulkDedicatedSidecar, err error) {
if c == nil || sidecar == nil {
return
}
if !c.clearClientDedicatedSidecar(sidecar.laneID, sidecar) {
return
}
runtime := c.getBulkRuntime()
if runtime != nil && sidecar.conn != nil {
runtime.handleDedicatedReadErrorByConn(clientFileScope(), sidecar.conn, err)
}
sidecar.close()
}
func (s *ServerCommon) serverDedicatedSidecarSnapshot(logical *LogicalConn) *bulkDedicatedSidecar {
if s == nil || logical == nil {
return nil
}
s.bulkDedicatedSidecarMu.Lock()
defer s.bulkDedicatedSidecarMu.Unlock()
return firstServerDedicatedSidecarLocked(s.bulkDedicatedSidecars[logical])
}
func firstServerDedicatedSidecarLocked(lanes map[uint32]*bulkDedicatedSidecar) *bulkDedicatedSidecar {
var (
selected *bulkDedicatedSidecar
bestID uint32
)
for laneID, sidecar := range lanes {
if sidecar == nil {
continue
}
if selected == nil || laneID < bestID {
selected = sidecar
bestID = laneID
}
}
return selected
}
func (s *ServerCommon) serverDedicatedSidecarSnapshotForLane(logical *LogicalConn, laneID uint32) *bulkDedicatedSidecar {
if s == nil || logical == nil {
return nil
}
laneID = normalizeBulkDedicatedLaneID(laneID)
s.bulkDedicatedSidecarMu.Lock()
defer s.bulkDedicatedSidecarMu.Unlock()
if lanes := s.bulkDedicatedSidecars[logical]; lanes != nil {
return lanes[laneID]
}
return nil
}
func (s *ServerCommon) installServerDedicatedSidecar(logical *LogicalConn, laneID uint32, sidecar *bulkDedicatedSidecar) *bulkDedicatedSidecar {
if s == nil || logical == nil || sidecar == nil {
return nil
}
laneID = normalizeBulkDedicatedLaneID(laneID)
s.bulkDedicatedSidecarMu.Lock()
defer s.bulkDedicatedSidecarMu.Unlock()
lanes := s.bulkDedicatedSidecars[logical]
if lanes == nil {
lanes = make(map[uint32]*bulkDedicatedSidecar)
s.bulkDedicatedSidecars[logical] = lanes
}
prev := lanes[laneID]
lanes[laneID] = sidecar
return prev
}
func (s *ServerCommon) clearServerDedicatedSidecar(logical *LogicalConn, laneID uint32, sidecar *bulkDedicatedSidecar) bool {
if s == nil || logical == nil || sidecar == nil {
return false
}
laneID = normalizeBulkDedicatedLaneID(laneID)
s.bulkDedicatedSidecarMu.Lock()
defer s.bulkDedicatedSidecarMu.Unlock()
lanes := s.bulkDedicatedSidecars[logical]
if lanes == nil || lanes[laneID] != sidecar {
return false
}
delete(lanes, laneID)
if len(lanes) == 0 {
delete(s.bulkDedicatedSidecars, logical)
}
return true
}
func (s *ServerCommon) closeServerDedicatedSidecar(logical *LogicalConn) {
if s == nil || logical == nil {
return
}
s.bulkDedicatedSidecarMu.Lock()
lanes := s.bulkDedicatedSidecars[logical]
delete(s.bulkDedicatedSidecars, logical)
s.bulkDedicatedSidecarMu.Unlock()
for _, sidecar := range lanes {
if sidecar != nil {
sidecar.close()
}
}
}
func (s *ServerCommon) closeAllServerDedicatedSidecars() {
if s == nil {
return
}
s.bulkDedicatedSidecarMu.Lock()
lanesByLogical := s.bulkDedicatedSidecars
s.bulkDedicatedSidecars = make(map[*LogicalConn]map[uint32]*bulkDedicatedSidecar)
s.bulkDedicatedSidecarMu.Unlock()
for _, lanes := range lanesByLogical {
for _, sidecar := range lanes {
if sidecar != nil {
sidecar.close()
}
}
}
}
func (s *ServerCommon) handleServerDedicatedSidecarFailure(logical *LogicalConn, sidecar *bulkDedicatedSidecar, err error) {
if s == nil || logical == nil || sidecar == nil {
return
}
if !s.clearServerDedicatedSidecar(logical, sidecar.laneID, sidecar) {
return
}
runtime := s.getBulkRuntime()
if runtime != nil && sidecar.conn != nil {
runtime.handleDedicatedReadErrorByConn(serverFileScope(logical), sidecar.conn, err)
}
sidecar.close()
}
func (s *ServerCommon) attachServerDedicatedSidecarIfExists(logical *LogicalConn, bulk *bulkHandle) {
if s == nil || logical == nil || bulk == nil || !bulk.Dedicated() {
return
}
sidecar := s.serverDedicatedSidecarSnapshotForLane(logical, bulk.dedicatedLaneIDSnapshot())
if sidecar == nil || sidecar.conn == nil {
return
}
_ = bulk.attachDedicatedConnShared(sidecar.conn)
}