notify/server_monitor.go

98 lines
2.6 KiB
Go
Raw Permalink Normal View History

package notify
import "time"
// expiredDetachedClientCandidate pairs a logical connection with the detach
// timestamp that was observed when the candidate was collected, so a later
// pass can tell whether the connection's detach record has changed since.
type expiredDetachedClientCandidate struct {
	// logical is the connection selected for possible expiry.
	logical *LogicalConn
	// detachedAt is the detach time recorded at snapshot time.
	detachedAt time.Time
}
// shutdownMonitorPool calls closeAll on the pending-wait pool, the file-ack
// pool and the signal-ack pool, in that order.
func (s *ServerCommon) shutdownMonitorPool() {
	pendingWaits := s.getPendingWaitPool()
	pendingWaits.closeAll()

	fileAcks := s.getFileAckPool()
	fileAcks.closeAll()

	signalAcks := s.getSignalAckPool()
	signalAcks.closeAll()
}
// monitorPoolTick performs one maintenance pass for the given instant.
//
// It first asks the pending-wait pool to clean up expired entries, passing
// s.noFinSyncMsgMaxKeepSeconds and now, then expires detached clients, and
// finally handles clients whose heartbeat has been lost.
func (s *ServerCommon) monitorPoolTick(now time.Time) {
	pendingWaits := s.getPendingWaitPool()
	pendingWaits.cleanupExpired(s.noFinSyncMsgMaxKeepSeconds, now)

	s.cleanupExpiredDetachedClients(now)
	s.cleanupLostHeartbeatClients(now)
}
// cleanupLostHeartbeatClients handles every logical connection reported by
// snapshotLostHeartbeatClients for the Unix second of now.
//
// A connection that reports shouldPreserveLogicalPeerOnTransportLoss only has
// its transport detached; any other connection has its logical session
// stopped. Both paths use the reason "heartbeat timeout". The pass is skipped
// entirely when s.maxHeartbeatLostSeconds is zero.
func (s *ServerCommon) cleanupLostHeartbeatClients(now time.Time) {
	if s.maxHeartbeatLostSeconds == 0 {
		return
	}

	const reason = "heartbeat timeout"

	lost := s.snapshotLostHeartbeatClients(now.Unix())
	for i := range lost {
		conn := lost[i]
		if !conn.shouldPreserveLogicalPeerOnTransportLoss() {
			s.stopLogicalSession(conn, reason, nil)
			continue
		}
		s.detachLogicalSessionTransport(conn, reason, nil)
	}
}
// snapshotLostHeartbeatClients returns the logical connections whose last
// heartbeat is more than s.maxHeartbeatLostSeconds older than nowUnix.
//
// Nil entries are ignored. A connection that reports
// shouldPreserveLogicalPeerOnTransportLoss and has no transport attached is
// also ignored. The returned slice is never nil but may be empty.
func (s *ServerCommon) snapshotLostHeartbeatClients(nowUnix int64) []*LogicalConn {
	conns := s.GetLogicalConnList()
	lost := make([]*LogicalConn, 0, len(conns))

	for _, conn := range conns {
		// Cases are evaluated top to bottom; the first match wins.
		switch {
		case conn == nil:
			// Nothing to inspect.
		case conn.shouldPreserveLogicalPeerOnTransportLoss() && !conn.transportAttachedSnapshot():
			// Preserved peer with no attached transport: not a heartbeat candidate.
		case nowUnix-conn.lastHeartbeatUnixSnapshot() > s.maxHeartbeatLostSeconds:
			lost = append(lost, conn)
		}
	}

	return lost
}
// cleanupExpiredDetachedClients stops the logical session of every connection
// whose transport has been detached for at least DetachedClientKeepSec
// seconds as of now. A non-positive keep time disables the pass.
//
// Each candidate from snapshotExpiredDetachedClients is re-validated before
// it is stopped: it must still report a detached transport, its current
// detach record must carry the same timestamp that was captured in the
// snapshot, and that timestamp must still be at least the keep time old.
func (s *ServerCommon) cleanupExpiredDetachedClients(now time.Time) {
	keepSec := s.DetachedClientKeepSec()
	if keepSec <= 0 {
		return
	}
	ttl := time.Duration(keepSec) * time.Second

	expired := s.snapshotExpiredDetachedClients(now, ttl)
	for i := range expired {
		conn, seenAt := expired[i].logical, expired[i].detachedAt
		if conn == nil || !conn.logicalTransportDetachedSnapshot() {
			continue
		}

		// A different timestamp means the detach record changed after the
		// snapshot was taken, so the candidate no longer applies.
		current := conn.transportDetachSnapshot()
		sameDetach := current != nil && current.At.Equal(seenAt)
		if !sameDetach {
			continue
		}

		if now.Sub(current.At) >= ttl {
			s.stopLogicalSession(conn, "detached transport expired", nil)
		}
	}
}
// snapshotExpiredDetachedClients collects the logical connections that report
// a detached transport and whose detach timestamp is at least keep older
// than now.
//
// Each result records the connection together with the detach timestamp that
// was observed. Nil connections, connections without a detach record, and
// records with a zero timestamp are skipped. It returns nil when keep is not
// positive.
func (s *ServerCommon) snapshotExpiredDetachedClients(now time.Time, keep time.Duration) []expiredDetachedClientCandidate {
	if keep <= 0 {
		return nil
	}

	conns := s.GetLogicalConnList()
	expired := make([]expiredDetachedClientCandidate, 0, len(conns))

	for _, conn := range conns {
		if conn == nil || !conn.logicalTransportDetachedSnapshot() {
			continue
		}

		info := conn.transportDetachSnapshot()
		if info != nil && !info.At.IsZero() && now.Sub(info.At) >= keep {
			expired = append(expired, expiredDetachedClientCandidate{
				logical:    conn,
				detachedAt: info.At,
			})
		}
	}

	return expired
}