452 lines
11 KiB
Go
452 lines
11 KiB
Go
|
|
package notify
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"net"
|
||
|
|
"sync"
|
||
|
|
)
|
||
|
|
|
||
|
|
type bulkDedicatedSidecar struct {
|
||
|
|
laneID uint32
|
||
|
|
conn net.Conn
|
||
|
|
closeOnce sync.Once
|
||
|
|
|
||
|
|
senderMu sync.Mutex
|
||
|
|
sender *bulkDedicatedLaneSender
|
||
|
|
}
|
||
|
|
|
||
|
|
type bulkDedicatedLane struct {
|
||
|
|
id uint32
|
||
|
|
activeBulks int
|
||
|
|
sidecar *bulkDedicatedSidecar
|
||
|
|
attachFlight *bulkDedicatedAttachFlight
|
||
|
|
}
|
||
|
|
|
||
|
|
// bulkDedicatedAttachFlight represents a single attach attempt whose outcome
// can be awaited by several callers.
type bulkDedicatedAttachFlight struct {
	// done is closed by finish once the outcome has been recorded.
	done chan struct{}
	// once ensures only the first finish call records an outcome.
	once sync.Once
	// err is the recorded outcome; wait returns it after done is closed.
	err error
}
|
||
|
|
|
||
|
|
// normalizeBulkDedicatedLaneID maps the zero lane ID to 1 and returns every
// other ID unchanged.
func normalizeBulkDedicatedLaneID(laneID uint32) uint32 {
	if laneID != 0 {
		return laneID
	}
	return 1
}
|
||
|
|
|
||
|
|
func newBulkDedicatedSidecar(conn net.Conn, laneID uint32) *bulkDedicatedSidecar {
|
||
|
|
if conn == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
return &bulkDedicatedSidecar{
|
||
|
|
laneID: normalizeBulkDedicatedLaneID(laneID),
|
||
|
|
conn: conn,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func newBulkDedicatedAttachFlight() *bulkDedicatedAttachFlight {
|
||
|
|
return &bulkDedicatedAttachFlight{
|
||
|
|
done: make(chan struct{}),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (f *bulkDedicatedAttachFlight) finish(err error) {
|
||
|
|
if f == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
f.once.Do(func() {
|
||
|
|
f.err = err
|
||
|
|
close(f.done)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func (f *bulkDedicatedAttachFlight) wait(ctx context.Context) error {
|
||
|
|
if f == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
if ctx == nil {
|
||
|
|
ctx = context.Background()
|
||
|
|
}
|
||
|
|
select {
|
||
|
|
case <-ctx.Done():
|
||
|
|
return ctx.Err()
|
||
|
|
case <-f.done:
|
||
|
|
return f.err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *bulkDedicatedSidecar) close() {
|
||
|
|
if s == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
s.closeOnce.Do(func() {
|
||
|
|
if sender := s.laneSenderSnapshot(); sender != nil {
|
||
|
|
sender.stop()
|
||
|
|
}
|
||
|
|
if s.conn != nil {
|
||
|
|
_ = s.conn.Close()
|
||
|
|
}
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *bulkDedicatedSidecar) laneSenderSnapshot() *bulkDedicatedLaneSender {
|
||
|
|
if s == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
s.senderMu.Lock()
|
||
|
|
defer s.senderMu.Unlock()
|
||
|
|
return s.sender
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *bulkDedicatedSidecar) laneSenderWithFactory(factory func(net.Conn) *bulkDedicatedLaneSender) *bulkDedicatedLaneSender {
|
||
|
|
if s == nil || factory == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
s.senderMu.Lock()
|
||
|
|
defer s.senderMu.Unlock()
|
||
|
|
if s.sender != nil {
|
||
|
|
return s.sender
|
||
|
|
}
|
||
|
|
if s.conn == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
s.sender = factory(s.conn)
|
||
|
|
return s.sender
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) clientDedicatedSidecarSnapshot() *bulkDedicatedSidecar {
|
||
|
|
if c == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
c.bulkDedicatedSidecarMu.Lock()
|
||
|
|
defer c.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
return firstClientDedicatedSidecarLocked(c.bulkDedicatedLanes)
|
||
|
|
}
|
||
|
|
|
||
|
|
func firstClientDedicatedSidecarLocked(lanes map[uint32]*bulkDedicatedLane) *bulkDedicatedSidecar {
|
||
|
|
var (
|
||
|
|
selected *bulkDedicatedSidecar
|
||
|
|
bestID uint32
|
||
|
|
)
|
||
|
|
for laneID, lane := range lanes {
|
||
|
|
if lane == nil || lane.sidecar == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
if selected == nil || laneID < bestID {
|
||
|
|
selected = lane.sidecar
|
||
|
|
bestID = laneID
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return selected
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) reserveBulkDedicatedLane() uint32 {
|
||
|
|
if c == nil {
|
||
|
|
return normalizeBulkDedicatedLaneID(0)
|
||
|
|
}
|
||
|
|
c.bulkDedicatedSidecarMu.Lock()
|
||
|
|
defer c.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
if c.bulkDedicatedLanes == nil {
|
||
|
|
c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
|
||
|
|
}
|
||
|
|
limit := c.bulkDedicatedLaneLimitSnapshot()
|
||
|
|
var best *bulkDedicatedLane
|
||
|
|
for _, lane := range c.bulkDedicatedLanes {
|
||
|
|
if lane == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
if best == nil || lane.activeBulks < best.activeBulks || (lane.activeBulks == best.activeBulks && lane.id < best.id) {
|
||
|
|
best = lane
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if best == nil || ((limit <= 0 || len(c.bulkDedicatedLanes) < limit) && best.activeBulks > 0) {
|
||
|
|
c.bulkDedicatedNextLaneID++
|
||
|
|
laneID := normalizeBulkDedicatedLaneID(c.bulkDedicatedNextLaneID)
|
||
|
|
best = &bulkDedicatedLane{id: laneID}
|
||
|
|
c.bulkDedicatedLanes[laneID] = best
|
||
|
|
}
|
||
|
|
best.activeBulks++
|
||
|
|
return best.id
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) releaseBulkDedicatedLane(laneID uint32) {
|
||
|
|
if c == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
laneID = normalizeBulkDedicatedLaneID(laneID)
|
||
|
|
c.bulkDedicatedSidecarMu.Lock()
|
||
|
|
defer c.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
lane := c.bulkDedicatedLanes[laneID]
|
||
|
|
if lane == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
if lane.activeBulks > 0 {
|
||
|
|
lane.activeBulks--
|
||
|
|
}
|
||
|
|
if lane.activeBulks == 0 && lane.sidecar == nil && lane.attachFlight == nil {
|
||
|
|
delete(c.bulkDedicatedLanes, laneID)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) clientDedicatedSidecarSnapshotForLane(laneID uint32) *bulkDedicatedSidecar {
|
||
|
|
if c == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
laneID = normalizeBulkDedicatedLaneID(laneID)
|
||
|
|
c.bulkDedicatedSidecarMu.Lock()
|
||
|
|
defer c.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
if lane := c.bulkDedicatedLanes[laneID]; lane != nil {
|
||
|
|
return lane.sidecar
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) beginClientDedicatedSidecarAttach(laneID uint32) (*bulkDedicatedSidecar, *bulkDedicatedAttachFlight, bool) {
|
||
|
|
if c == nil {
|
||
|
|
return nil, nil, false
|
||
|
|
}
|
||
|
|
laneID = normalizeBulkDedicatedLaneID(laneID)
|
||
|
|
c.bulkDedicatedSidecarMu.Lock()
|
||
|
|
defer c.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
if c.bulkDedicatedLanes == nil {
|
||
|
|
c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
|
||
|
|
}
|
||
|
|
lane := c.bulkDedicatedLanes[laneID]
|
||
|
|
if lane == nil {
|
||
|
|
lane = &bulkDedicatedLane{id: laneID}
|
||
|
|
c.bulkDedicatedLanes[laneID] = lane
|
||
|
|
}
|
||
|
|
if lane.sidecar != nil {
|
||
|
|
return lane.sidecar, nil, false
|
||
|
|
}
|
||
|
|
if lane.attachFlight != nil {
|
||
|
|
return nil, lane.attachFlight, false
|
||
|
|
}
|
||
|
|
flight := newBulkDedicatedAttachFlight()
|
||
|
|
lane.attachFlight = flight
|
||
|
|
return nil, flight, true
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) finishClientDedicatedSidecarAttach(laneID uint32, flight *bulkDedicatedAttachFlight, err error) {
|
||
|
|
if c == nil || flight == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
laneID = normalizeBulkDedicatedLaneID(laneID)
|
||
|
|
c.bulkDedicatedSidecarMu.Lock()
|
||
|
|
if lane := c.bulkDedicatedLanes[laneID]; lane != nil && lane.attachFlight == flight {
|
||
|
|
lane.attachFlight = nil
|
||
|
|
if lane.activeBulks == 0 && lane.sidecar == nil {
|
||
|
|
delete(c.bulkDedicatedLanes, laneID)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
c.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
flight.finish(err)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) installClientDedicatedSidecar(laneID uint32, sidecar *bulkDedicatedSidecar) (*bulkDedicatedSidecar, bool) {
|
||
|
|
if c == nil || sidecar == nil {
|
||
|
|
return nil, false
|
||
|
|
}
|
||
|
|
laneID = normalizeBulkDedicatedLaneID(laneID)
|
||
|
|
c.bulkDedicatedSidecarMu.Lock()
|
||
|
|
defer c.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
if c.bulkDedicatedLanes == nil {
|
||
|
|
c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
|
||
|
|
}
|
||
|
|
lane := c.bulkDedicatedLanes[laneID]
|
||
|
|
if lane == nil {
|
||
|
|
lane = &bulkDedicatedLane{id: laneID}
|
||
|
|
c.bulkDedicatedLanes[laneID] = lane
|
||
|
|
}
|
||
|
|
if lane.sidecar != nil {
|
||
|
|
return lane.sidecar, false
|
||
|
|
}
|
||
|
|
lane.sidecar = sidecar
|
||
|
|
return sidecar, true
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) clearClientDedicatedSidecar(laneID uint32, sidecar *bulkDedicatedSidecar) bool {
|
||
|
|
if c == nil || sidecar == nil {
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
laneID = normalizeBulkDedicatedLaneID(laneID)
|
||
|
|
c.bulkDedicatedSidecarMu.Lock()
|
||
|
|
defer c.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
lane := c.bulkDedicatedLanes[laneID]
|
||
|
|
if lane == nil || lane.sidecar != sidecar {
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
lane.sidecar = nil
|
||
|
|
if lane.activeBulks == 0 && lane.attachFlight == nil {
|
||
|
|
delete(c.bulkDedicatedLanes, laneID)
|
||
|
|
}
|
||
|
|
return true
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) closeClientDedicatedSidecar() {
|
||
|
|
if c == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
c.bulkDedicatedSidecarMu.Lock()
|
||
|
|
lanes := c.bulkDedicatedLanes
|
||
|
|
c.bulkDedicatedLanes = make(map[uint32]*bulkDedicatedLane)
|
||
|
|
c.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
for _, lane := range lanes {
|
||
|
|
if lane == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
if lane.sidecar != nil {
|
||
|
|
lane.sidecar.close()
|
||
|
|
}
|
||
|
|
if lane.attachFlight != nil {
|
||
|
|
lane.attachFlight.finish(errServiceShutdown)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *ClientCommon) handleClientDedicatedSidecarFailure(sidecar *bulkDedicatedSidecar, err error) {
|
||
|
|
if c == nil || sidecar == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
if !c.clearClientDedicatedSidecar(sidecar.laneID, sidecar) {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
runtime := c.getBulkRuntime()
|
||
|
|
if runtime != nil && sidecar.conn != nil {
|
||
|
|
runtime.handleDedicatedReadErrorByConn(clientFileScope(), sidecar.conn, err)
|
||
|
|
}
|
||
|
|
sidecar.close()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) serverDedicatedSidecarSnapshot(logical *LogicalConn) *bulkDedicatedSidecar {
|
||
|
|
if s == nil || logical == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
s.bulkDedicatedSidecarMu.Lock()
|
||
|
|
defer s.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
return firstServerDedicatedSidecarLocked(s.bulkDedicatedSidecars[logical])
|
||
|
|
}
|
||
|
|
|
||
|
|
func firstServerDedicatedSidecarLocked(lanes map[uint32]*bulkDedicatedSidecar) *bulkDedicatedSidecar {
|
||
|
|
var (
|
||
|
|
selected *bulkDedicatedSidecar
|
||
|
|
bestID uint32
|
||
|
|
)
|
||
|
|
for laneID, sidecar := range lanes {
|
||
|
|
if sidecar == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
if selected == nil || laneID < bestID {
|
||
|
|
selected = sidecar
|
||
|
|
bestID = laneID
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return selected
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) serverDedicatedSidecarSnapshotForLane(logical *LogicalConn, laneID uint32) *bulkDedicatedSidecar {
|
||
|
|
if s == nil || logical == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
laneID = normalizeBulkDedicatedLaneID(laneID)
|
||
|
|
s.bulkDedicatedSidecarMu.Lock()
|
||
|
|
defer s.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
if lanes := s.bulkDedicatedSidecars[logical]; lanes != nil {
|
||
|
|
return lanes[laneID]
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) installServerDedicatedSidecar(logical *LogicalConn, laneID uint32, sidecar *bulkDedicatedSidecar) *bulkDedicatedSidecar {
|
||
|
|
if s == nil || logical == nil || sidecar == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
laneID = normalizeBulkDedicatedLaneID(laneID)
|
||
|
|
s.bulkDedicatedSidecarMu.Lock()
|
||
|
|
defer s.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
lanes := s.bulkDedicatedSidecars[logical]
|
||
|
|
if lanes == nil {
|
||
|
|
lanes = make(map[uint32]*bulkDedicatedSidecar)
|
||
|
|
s.bulkDedicatedSidecars[logical] = lanes
|
||
|
|
}
|
||
|
|
prev := lanes[laneID]
|
||
|
|
lanes[laneID] = sidecar
|
||
|
|
return prev
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) clearServerDedicatedSidecar(logical *LogicalConn, laneID uint32, sidecar *bulkDedicatedSidecar) bool {
|
||
|
|
if s == nil || logical == nil || sidecar == nil {
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
laneID = normalizeBulkDedicatedLaneID(laneID)
|
||
|
|
s.bulkDedicatedSidecarMu.Lock()
|
||
|
|
defer s.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
lanes := s.bulkDedicatedSidecars[logical]
|
||
|
|
if lanes == nil || lanes[laneID] != sidecar {
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
delete(lanes, laneID)
|
||
|
|
if len(lanes) == 0 {
|
||
|
|
delete(s.bulkDedicatedSidecars, logical)
|
||
|
|
}
|
||
|
|
return true
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) closeServerDedicatedSidecar(logical *LogicalConn) {
|
||
|
|
if s == nil || logical == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
s.bulkDedicatedSidecarMu.Lock()
|
||
|
|
lanes := s.bulkDedicatedSidecars[logical]
|
||
|
|
delete(s.bulkDedicatedSidecars, logical)
|
||
|
|
s.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
for _, sidecar := range lanes {
|
||
|
|
if sidecar != nil {
|
||
|
|
sidecar.close()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) closeAllServerDedicatedSidecars() {
|
||
|
|
if s == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
s.bulkDedicatedSidecarMu.Lock()
|
||
|
|
lanesByLogical := s.bulkDedicatedSidecars
|
||
|
|
s.bulkDedicatedSidecars = make(map[*LogicalConn]map[uint32]*bulkDedicatedSidecar)
|
||
|
|
s.bulkDedicatedSidecarMu.Unlock()
|
||
|
|
for _, lanes := range lanesByLogical {
|
||
|
|
for _, sidecar := range lanes {
|
||
|
|
if sidecar != nil {
|
||
|
|
sidecar.close()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) handleServerDedicatedSidecarFailure(logical *LogicalConn, sidecar *bulkDedicatedSidecar, err error) {
|
||
|
|
if s == nil || logical == nil || sidecar == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
if !s.clearServerDedicatedSidecar(logical, sidecar.laneID, sidecar) {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
runtime := s.getBulkRuntime()
|
||
|
|
if runtime != nil && sidecar.conn != nil {
|
||
|
|
runtime.handleDedicatedReadErrorByConn(serverFileScope(logical), sidecar.conn, err)
|
||
|
|
}
|
||
|
|
sidecar.close()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *ServerCommon) attachServerDedicatedSidecarIfExists(logical *LogicalConn, bulk *bulkHandle) {
|
||
|
|
if s == nil || logical == nil || bulk == nil || !bulk.Dedicated() {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
sidecar := s.serverDedicatedSidecarSnapshotForLane(logical, bulk.dedicatedLaneIDSnapshot())
|
||
|
|
if sidecar == nil || sidecar.conn == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
_ = bulk.attachDedicatedConnShared(sidecar.conn)
|
||
|
|
}
|