|
|
@ -2,10 +2,10 @@ package notify
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
import (
|
|
|
|
"b612.me/stario"
|
|
|
|
"b612.me/stario"
|
|
|
|
"b612.me/starnet"
|
|
|
|
|
|
|
|
"context"
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"math"
|
|
|
|
"math/rand"
|
|
|
|
"math/rand"
|
|
|
|
"net"
|
|
|
|
"net"
|
|
|
|
"os"
|
|
|
|
"os"
|
|
|
@ -21,7 +21,7 @@ type ServerCommon struct {
|
|
|
|
status Status
|
|
|
|
status Status
|
|
|
|
listener net.Listener
|
|
|
|
listener net.Listener
|
|
|
|
udpListener *net.UDPConn
|
|
|
|
udpListener *net.UDPConn
|
|
|
|
queue *starnet.StarQueue
|
|
|
|
queue *stario.StarQueue
|
|
|
|
stopFn context.CancelFunc
|
|
|
|
stopFn context.CancelFunc
|
|
|
|
stopCtx context.Context
|
|
|
|
stopCtx context.Context
|
|
|
|
maxReadTimeout time.Duration
|
|
|
|
maxReadTimeout time.Duration
|
|
|
@ -42,6 +42,7 @@ type ServerCommon struct {
|
|
|
|
sequenceDe func([]byte) (interface{}, error)
|
|
|
|
sequenceDe func([]byte) (interface{}, error)
|
|
|
|
sequenceEn func(interface{}) ([]byte, error)
|
|
|
|
sequenceEn func(interface{}) ([]byte, error)
|
|
|
|
showError bool
|
|
|
|
showError bool
|
|
|
|
|
|
|
|
debugMode bool
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func NewServer() Server {
|
|
|
|
func NewServer() Server {
|
|
|
@ -65,6 +66,19 @@ func NewServer() Server {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return &server
|
|
|
|
return &server
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) DebugMode(dmg bool) {
|
|
|
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
|
|
|
s.debugMode = dmg
|
|
|
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) IsDebugMode() bool {
|
|
|
|
|
|
|
|
s.mu.RLock()
|
|
|
|
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
|
|
|
|
return s.debugMode
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (s *ServerCommon) ShowError(std bool) {
|
|
|
|
func (s *ServerCommon) ShowError(std bool) {
|
|
|
|
s.mu.Lock()
|
|
|
|
s.mu.Lock()
|
|
|
|
s.showError = std
|
|
|
|
s.showError = std
|
|
|
@ -91,7 +105,7 @@ func (s *ServerCommon) Listen(network string, addr string) error {
|
|
|
|
return errors.New("server already run")
|
|
|
|
return errors.New("server already run")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
s.stopCtx, s.stopFn = context.WithCancel(context.Background())
|
|
|
|
s.stopCtx, s.stopFn = context.WithCancel(context.Background())
|
|
|
|
s.queue = starnet.NewQueueCtx(s.stopCtx, 128)
|
|
|
|
s.queue = stario.NewQueueCtx(s.stopCtx, 128, math.MaxUint32)
|
|
|
|
if strings.Contains(strings.ToLower(network), "udp") {
|
|
|
|
if strings.Contains(strings.ToLower(network), "udp") {
|
|
|
|
return s.ListenUDP(network, addr)
|
|
|
|
return s.ListenUDP(network, addr)
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -185,16 +199,22 @@ func (s *ServerCommon) acceptTU() {
|
|
|
|
for {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
select {
|
|
|
|
case <-s.stopCtx.Done():
|
|
|
|
case <-s.stopCtx.Done():
|
|
|
|
|
|
|
|
if s.debugMode {
|
|
|
|
|
|
|
|
fmt.Println("accept goroutine recv exit signal,exit")
|
|
|
|
|
|
|
|
}
|
|
|
|
return
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
conn, err := s.listener.Accept()
|
|
|
|
conn, err := s.listener.Accept()
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
if s.showError {
|
|
|
|
if s.showError || s.debugMode {
|
|
|
|
fmt.Println("error accept:", err)
|
|
|
|
fmt.Println("error accept:", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if s.debugMode {
|
|
|
|
|
|
|
|
fmt.Println("accept new connection from", conn.RemoteAddr())
|
|
|
|
|
|
|
|
}
|
|
|
|
var id string
|
|
|
|
var id string
|
|
|
|
for {
|
|
|
|
for {
|
|
|
|
id = fmt.Sprintf("%s%d%d", conn.RemoteAddr().String(), time.Now().UnixNano(), rand.Int63())
|
|
|
|
id = fmt.Sprintf("%s%d%d", conn.RemoteAddr().String(), time.Now().UnixNano(), rand.Int63())
|
|
|
@ -241,7 +261,7 @@ func (s *ServerCommon) loadMessage() {
|
|
|
|
s.mu.RLock()
|
|
|
|
s.mu.RLock()
|
|
|
|
for _, v := range s.clientPool {
|
|
|
|
for _, v := range s.clientPool {
|
|
|
|
wg.Add(1)
|
|
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
go func(v *ClientConn) {
|
|
|
|
defer wg.Done()
|
|
|
|
defer wg.Done()
|
|
|
|
v.sayGoodByeForTU()
|
|
|
|
v.sayGoodByeForTU()
|
|
|
|
v.alive.Store(false)
|
|
|
|
v.alive.Store(false)
|
|
|
@ -252,7 +272,7 @@ func (s *ServerCommon) loadMessage() {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
v.stopFn()
|
|
|
|
v.stopFn()
|
|
|
|
s.removeClient(v)
|
|
|
|
s.removeClient(v)
|
|
|
|
}()
|
|
|
|
}(v)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
s.mu.RUnlock()
|
|
|
|
s.mu.RUnlock()
|
|
|
|
select {
|
|
|
|
select {
|
|
|
@ -272,7 +292,7 @@ func (s *ServerCommon) loadMessage() {
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
s.wg.Add(1)
|
|
|
|
s.wg.Add(1)
|
|
|
|
go func(data starnet.MsgQueue) {
|
|
|
|
go func(data stario.MsgQueue) {
|
|
|
|
s.mu.RLock()
|
|
|
|
s.mu.RLock()
|
|
|
|
cc, ok := s.clientPool[data.Conn.(string)]
|
|
|
|
cc, ok := s.clientPool[data.Conn.(string)]
|
|
|
|
s.mu.RUnlock()
|
|
|
|
s.mu.RUnlock()
|
|
|
@ -282,7 +302,7 @@ func (s *ServerCommon) loadMessage() {
|
|
|
|
//fmt.Println("received:", float64(time.Now().UnixNano()-nowd)/1000000)
|
|
|
|
//fmt.Println("received:", float64(time.Now().UnixNano()-nowd)/1000000)
|
|
|
|
msg, err := s.sequenceDe(cc.msgDe(cc.SecretKey, data.Msg))
|
|
|
|
msg, err := s.sequenceDe(cc.msgDe(cc.SecretKey, data.Msg))
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
if s.showError {
|
|
|
|
if s.showError || s.debugMode {
|
|
|
|
fmt.Println("server decode data error", err)
|
|
|
|
fmt.Println("server decode data error", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
|
|
return
|
|
|
@ -343,8 +363,8 @@ func (s *ServerCommon) dispatchMsg(message Message) {
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
//just throw
|
|
|
|
//just throw
|
|
|
|
return
|
|
|
|
//return
|
|
|
|
//fallthrough
|
|
|
|
fallthrough
|
|
|
|
default:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
callFn := func(fn func(*Message)) {
|
|
|
|
callFn := func(fn func(*Message)) {
|
|
|
@ -377,7 +397,9 @@ func (s *ServerCommon) sendTU(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
|
|
|
|
data = c.msgEn(c.SecretKey, data)
|
|
|
|
data = c.msgEn(c.SecretKey, data)
|
|
|
|
data = s.queue.BuildMessage(data)
|
|
|
|
data = s.queue.BuildMessage(data)
|
|
|
|
if c.maxWriteTimeout.Seconds() != 0 {
|
|
|
|
if c.maxWriteTimeout.Seconds() != 0 {
|
|
|
|
c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
|
|
|
|
if err := c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout)); err != nil {
|
|
|
|
|
|
|
|
return WaitMsg{}, err
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_, err = c.tuConn.Write(data)
|
|
|
|
_, err = c.tuConn.Write(data)
|
|
|
|
//fmt.Println("resend:", float64(time.Now().UnixNano()-nowd)/1000000)
|
|
|
|
//fmt.Println("resend:", float64(time.Now().UnixNano()-nowd)/1000000)
|
|
|
@ -416,7 +438,7 @@ func (s *ServerCommon) sendWait(c *ClientConn, msg TransferMsg, timeout time.Dur
|
|
|
|
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
|
|
|
|
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
|
|
|
|
return Message{}, os.ErrDeadlineExceeded
|
|
|
|
return Message{}, os.ErrDeadlineExceeded
|
|
|
|
case <-s.stopCtx.Done():
|
|
|
|
case <-s.stopCtx.Done():
|
|
|
|
return Message{}, errors.New("Service shutdown")
|
|
|
|
return Message{}, errors.New("service shutdown")
|
|
|
|
case msg, ok := <-data.Reply:
|
|
|
|
case msg, ok := <-data.Reply:
|
|
|
|
if !ok {
|
|
|
|
if !ok {
|
|
|
|
return msg, os.ErrInvalid
|
|
|
|
return msg, os.ErrInvalid
|
|
|
@ -447,7 +469,7 @@ func (s *ServerCommon) sendCtx(c *ClientConn, msg TransferMsg, ctx context.Conte
|
|
|
|
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
|
|
|
|
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
|
|
|
|
return Message{}, os.ErrClosed
|
|
|
|
return Message{}, os.ErrClosed
|
|
|
|
case <-s.stopCtx.Done():
|
|
|
|
case <-s.stopCtx.Done():
|
|
|
|
return Message{}, errors.New("Service shutdown")
|
|
|
|
return Message{}, errors.New("service shutdown")
|
|
|
|
case msg, ok := <-data.Reply:
|
|
|
|
case msg, ok := <-data.Reply:
|
|
|
|
if !ok {
|
|
|
|
if !ok {
|
|
|
|
return msg, os.ErrInvalid
|
|
|
|
return msg, os.ErrInvalid
|
|
|
@ -524,6 +546,9 @@ func (s *ServerCommon) acceptUDP() {
|
|
|
|
for {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
select {
|
|
|
|
case <-s.stopCtx.Done():
|
|
|
|
case <-s.stopCtx.Done():
|
|
|
|
|
|
|
|
if s.debugMode {
|
|
|
|
|
|
|
|
fmt.Println("accept goroutine recv exit signal,exit")
|
|
|
|
|
|
|
|
}
|
|
|
|
return
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -533,6 +558,9 @@ func (s *ServerCommon) acceptUDP() {
|
|
|
|
data := make([]byte, 4096)
|
|
|
|
data := make([]byte, 4096)
|
|
|
|
num, addr, err := s.udpListener.ReadFromUDP(data)
|
|
|
|
num, addr, err := s.udpListener.ReadFromUDP(data)
|
|
|
|
id := addr.String()
|
|
|
|
id := addr.String()
|
|
|
|
|
|
|
|
if s.debugMode {
|
|
|
|
|
|
|
|
fmt.Println("accept new udp message from", id)
|
|
|
|
|
|
|
|
}
|
|
|
|
//fmt.Println("s recv udp:", float64(time.Now().UnixNano()-nowd)/1000000)
|
|
|
|
//fmt.Println("s recv udp:", float64(time.Now().UnixNano()-nowd)/1000000)
|
|
|
|
s.mu.RLock()
|
|
|
|
s.mu.RLock()
|
|
|
|
if _, ok := s.clientPool[id]; !ok {
|
|
|
|
if _, ok := s.clientPool[id]; !ok {
|
|
|
@ -626,7 +654,7 @@ func (s *ServerCommon) GetClient(id string) *ClientConn {
|
|
|
|
func (s *ServerCommon) GetClientLists() []*ClientConn {
|
|
|
|
func (s *ServerCommon) GetClientLists() []*ClientConn {
|
|
|
|
s.mu.RLock()
|
|
|
|
s.mu.RLock()
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
var list []*ClientConn = make([]*ClientConn, 0, len(s.clientPool))
|
|
|
|
var list = make([]*ClientConn, 0, len(s.clientPool))
|
|
|
|
for _, v := range s.clientPool {
|
|
|
|
for _, v := range s.clientPool {
|
|
|
|
list = append(list, v)
|
|
|
|
list = append(list, v)
|
|
|
|
}
|
|
|
|
}
|
|
|
|