Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a81d74ac45 | |||
| a2ab64a372 | |||
| 48bbc5b776 | |||
| 9065a12b99 | |||
| 996f94eef0 |
@@ -15,7 +15,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
//var nowd int64
|
|
||||||
type ClientCommon struct {
|
type ClientCommon struct {
|
||||||
alive atomic.Value
|
alive atomic.Value
|
||||||
status Status
|
status Status
|
||||||
@@ -47,6 +46,7 @@ type ClientCommon struct {
|
|||||||
useHeartBeat bool
|
useHeartBeat bool
|
||||||
sequenceDe func([]byte) (interface{}, error)
|
sequenceDe func([]byte) (interface{}, error)
|
||||||
sequenceEn func(interface{}) ([]byte, error)
|
sequenceEn func(interface{}) ([]byte, error)
|
||||||
|
debugMode bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClientCommon) Connect(network string, addr string) error {
|
func (c *ClientCommon) Connect(network string, addr string) error {
|
||||||
@@ -68,6 +68,16 @@ func (c *ClientCommon) Connect(network string, addr string) error {
|
|||||||
return c.clientPostInit()
|
return c.clientPostInit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ClientCommon) DebugMode(dmg bool) {
|
||||||
|
c.mu.Lock()
|
||||||
|
c.debugMode = dmg
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClientCommon) IsDebugMode() bool {
|
||||||
|
return c.debugMode
|
||||||
|
}
|
||||||
|
|
||||||
func (c *ClientCommon) ConnectTimeout(network string, addr string, timeout time.Duration) error {
|
func (c *ClientCommon) ConnectTimeout(network string, addr string, timeout time.Duration) error {
|
||||||
if c.alive.Load().(bool) {
|
if c.alive.Load().(bool) {
|
||||||
return errors.New("client already run")
|
return errors.New("client already run")
|
||||||
@@ -185,9 +195,14 @@ func (c *ClientCommon) Heartbeat() {
|
|||||||
c.lastHeartbeat = time.Now().Unix()
|
c.lastHeartbeat = time.Now().Unix()
|
||||||
failedCount = 0
|
failedCount = 0
|
||||||
}
|
}
|
||||||
|
if c.debugMode {
|
||||||
|
fmt.Println("failed to recv heartbeat,timeout!")
|
||||||
|
}
|
||||||
failedCount++
|
failedCount++
|
||||||
if failedCount >= 3 {
|
if failedCount >= 3 {
|
||||||
//fmt.Println("heatbeat failed,stop client")
|
if c.debugMode {
|
||||||
|
fmt.Println("heatbeat failed more than 3 times,stop client")
|
||||||
|
}
|
||||||
c.alive.Store(false)
|
c.alive.Store(false)
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
c.status = Status{
|
c.status = Status{
|
||||||
@@ -218,7 +233,9 @@ func (c *ClientCommon) readMessage() {
|
|||||||
}
|
}
|
||||||
data := make([]byte, 8192)
|
data := make([]byte, 8192)
|
||||||
if c.maxReadTimeout.Seconds() != 0 {
|
if c.maxReadTimeout.Seconds() != 0 {
|
||||||
c.conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout))
|
if err := c.conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout)); err != nil {
|
||||||
|
//TODO:ALERT
|
||||||
|
}
|
||||||
}
|
}
|
||||||
readNum, err := c.conn.Read(data)
|
readNum, err := c.conn.Read(data)
|
||||||
if err == os.ErrDeadlineExceeded {
|
if err == os.ErrDeadlineExceeded {
|
||||||
@@ -228,7 +245,7 @@ func (c *ClientCommon) readMessage() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if c.showError {
|
if c.showError || c.debugMode {
|
||||||
fmt.Println("client read error", err)
|
fmt.Println("client read error", err)
|
||||||
}
|
}
|
||||||
c.alive.Store(false)
|
c.alive.Store(false)
|
||||||
@@ -278,7 +295,7 @@ func (c *ClientCommon) loadMessage() {
|
|||||||
//transfer to Msg
|
//transfer to Msg
|
||||||
msg, err := c.sequenceDe(c.msgDe(c.SecretKey, data.Msg))
|
msg, err := c.sequenceDe(c.msgDe(c.SecretKey, data.Msg))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if c.showError {
|
if c.showError || c.debugMode {
|
||||||
fmt.Println("client decode data error", err)
|
fmt.Println("client decode data error", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@@ -314,8 +331,8 @@ func (c *ClientCommon) dispatchMsg(message Message) {
|
|||||||
c.noFinSyncMsgPool.Delete(message.ID)
|
c.noFinSyncMsgPool.Delete(message.ID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return
|
//return
|
||||||
//fallthrough
|
fallthrough
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
callFn := func(fn func(*Message)) {
|
callFn := func(fn func(*Message)) {
|
||||||
@@ -411,7 +428,7 @@ func (c *ClientCommon) sendWait(msg TransferMsg, timeout time.Duration) (Message
|
|||||||
c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
|
c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
|
||||||
return Message{}, os.ErrDeadlineExceeded
|
return Message{}, os.ErrDeadlineExceeded
|
||||||
case <-c.stopCtx.Done():
|
case <-c.stopCtx.Done():
|
||||||
return Message{}, errors.New("Service shutdown")
|
return Message{}, errors.New("service shutdown")
|
||||||
case msg, ok := <-data.Reply:
|
case msg, ok := <-data.Reply:
|
||||||
if !ok {
|
if !ok {
|
||||||
return msg, os.ErrInvalid
|
return msg, os.ErrInvalid
|
||||||
@@ -434,7 +451,7 @@ func (c *ClientCommon) sendCtx(msg TransferMsg, ctx context.Context) (Message, e
|
|||||||
c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
|
c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
|
||||||
return Message{}, os.ErrDeadlineExceeded
|
return Message{}, os.ErrDeadlineExceeded
|
||||||
case <-c.stopCtx.Done():
|
case <-c.stopCtx.Done():
|
||||||
return Message{}, errors.New("Service shutdown")
|
return Message{}, errors.New("service shutdown")
|
||||||
case msg, ok := <-data.Reply:
|
case msg, ok := <-data.Reply:
|
||||||
if !ok {
|
if !ok {
|
||||||
return msg, os.ErrInvalid
|
return msg, os.ErrInvalid
|
||||||
@@ -497,7 +514,11 @@ func (c *ClientCommon) Reply(m Message, value MsgVal) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClientCommon) ExchangeKey(newKey []byte) error {
|
func (c *ClientCommon) ExchangeKey(newKey []byte) error {
|
||||||
newSendKey, err := starcrypto.RSAEncrypt(newKey, c.handshakeRsaPubKey)
|
pubKey, err := starcrypto.DecodePublicKey(c.handshakeRsaPubKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
newSendKey, err := starcrypto.RSAEncrypt(pubKey, newKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -39,6 +39,8 @@ type Client interface {
|
|||||||
StopMonitorChan() <-chan struct{}
|
StopMonitorChan() <-chan struct{}
|
||||||
Status() Status
|
Status() Status
|
||||||
ShowError(bool)
|
ShowError(bool)
|
||||||
|
DebugMode(bool)
|
||||||
|
IsDebugMode() bool
|
||||||
|
|
||||||
GetSequenceEn() func(interface{}) ([]byte, error)
|
GetSequenceEn() func(interface{}) ([]byte, error)
|
||||||
SetSequenceEn(func(interface{}) ([]byte, error))
|
SetSequenceEn(func(interface{}) ([]byte, error))
|
||||||
|
|||||||
+1
-1
@@ -82,5 +82,5 @@ func defaultMsgDe(key []byte, d []byte) []byte {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
Register(TransferMsg{})
|
RegisterName("b612.me/notify.Transfer", TransferMsg{})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ module b612.me/notify
|
|||||||
go 1.16
|
go 1.16
|
||||||
|
|
||||||
require (
|
require (
|
||||||
b612.me/starcrypto v0.0.1
|
b612.me/starcrypto v0.0.2
|
||||||
b612.me/stario v0.0.5
|
b612.me/stario v0.0.8
|
||||||
b612.me/starnet v0.1.3
|
b612.me/starnet v0.1.7
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
b612.me/starcrypto v0.0.1 h1:xGngzXPUrVbqtWzNw2e+0eWsdG7GG1/X+ONDGIzdriI=
|
b612.me/starcrypto v0.0.2 h1:aRf1HcqK8GqHYxLAhWfFC4W/EqQLEFNEmxsBu3wG30o=
|
||||||
b612.me/starcrypto v0.0.1/go.mod h1:hz0xRnfWNpYOlVrIPoGrQOWPibq4YiUZ7qN5tsQbzPo=
|
b612.me/starcrypto v0.0.2/go.mod h1:hz0xRnfWNpYOlVrIPoGrQOWPibq4YiUZ7qN5tsQbzPo=
|
||||||
b612.me/stario v0.0.5 h1:Q1OGF+8eOoK49zMzkyh80GWaMuknhey6+PWJJL9ZuNo=
|
b612.me/stario v0.0.8 h1:kaA4pszAKLZJm2D9JmiuYSpgjTeE3VaO74vm+H0vBGM=
|
||||||
b612.me/stario v0.0.5/go.mod h1:or4ssWcxQSjMeu+hRKEgtp0X517b3zdlEOAms8Qscvw=
|
b612.me/stario v0.0.8/go.mod h1:or4ssWcxQSjMeu+hRKEgtp0X517b3zdlEOAms8Qscvw=
|
||||||
b612.me/starnet v0.1.3 h1:UjY6M96gdPdJtxnQGzCttqSwFw93sDZSHiIGtdOlFfk=
|
b612.me/starnet v0.1.7 h1:k3CUfYNRolC/xw5Ekus2NVWHlqeykSyAH8USGTPKA5o=
|
||||||
b612.me/starnet v0.1.3/go.mod h1:j/dd6BKwQK80O4gfbGYg2aYtPH76gSdgpuKboK/DwN4=
|
b612.me/starnet v0.1.7/go.mod h1:DNC4i/ezgVLlmxnquf8AeljsL4mQ5vAyxh8vGPQqsys=
|
||||||
golang.org/x/crypto v0.0.0-20220313003712-b769efc7c000 h1:SL+8VVnkqyshUSz5iNnXtrBQzvFF2SkROm6t5RczFAE=
|
golang.org/x/crypto v0.0.0-20220313003712-b769efc7c000 h1:SL+8VVnkqyshUSz5iNnXtrBQzvFF2SkROm6t5RczFAE=
|
||||||
golang.org/x/crypto v0.0.0-20220313003712-b769efc7c000/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
golang.org/x/crypto v0.0.0-20220313003712-b769efc7c000/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"reflect"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -45,7 +45,6 @@ type Message struct {
|
|||||||
ServerConn Client
|
ServerConn Client
|
||||||
TransferMsg
|
TransferMsg
|
||||||
Time time.Time
|
Time time.Time
|
||||||
sync.Mutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type WaitMsg struct {
|
type WaitMsg struct {
|
||||||
@@ -146,8 +145,13 @@ func (c *ClientConn) readTUMessage() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClientConn) rsaDecode(message Message) {
|
func (c *ClientConn) rsaDecode(message Message) {
|
||||||
unknownKey := message.Value
|
privKey, err := starcrypto.DecodePrivateKey(c.handshakeRsaKey, "")
|
||||||
data, err := starcrypto.RSADecrypt(unknownKey, c.handshakeRsaKey, "")
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
message.Reply([]byte("failed"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
data, err := starcrypto.RSADecrypt(privKey, message.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
message.Reply([]byte("failed"))
|
message.Reply([]byte("failed"))
|
||||||
@@ -470,3 +474,29 @@ func MustToMsgVal(val interface{}) MsgVal {
|
|||||||
}
|
}
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m MsgVal) Orm(stu interface{}) error {
|
||||||
|
inf, err := m.ToInterface()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t := reflect.TypeOf(stu)
|
||||||
|
if t.Kind() != reflect.Ptr {
|
||||||
|
return errors.New("interface not writable(pointer wanted)")
|
||||||
|
}
|
||||||
|
if !reflect.ValueOf(stu).Elem().CanSet() {
|
||||||
|
return errors.New("interface not writable")
|
||||||
|
}
|
||||||
|
it := reflect.TypeOf(inf)
|
||||||
|
if t.Elem().Kind() != it.Kind() {
|
||||||
|
return fmt.Errorf("interface{} kind is %v,not %v", t.Elem().Kind(), it.Kind())
|
||||||
|
}
|
||||||
|
if t.Elem().Name() != it.Name() {
|
||||||
|
return fmt.Errorf("interface{} name is %v,not %v", t.Elem().Name(), it.Name())
|
||||||
|
}
|
||||||
|
if t.Elem().String() != it.String() {
|
||||||
|
return fmt.Errorf("interface{} string is %v,not %v", t.Elem().String(), it.String())
|
||||||
|
}
|
||||||
|
reflect.ValueOf(stu).Elem().Set(reflect.ValueOf(inf))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
+96
@@ -0,0 +1,96 @@
|
|||||||
|
package notify
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMsgEnDeCode(t *testing.T) {
|
||||||
|
Register(HelloMessage{})
|
||||||
|
Register(Error{})
|
||||||
|
go ServerRun(time.Second * 30)
|
||||||
|
time.Sleep(time.Second * 2)
|
||||||
|
ClientRun(time.Second * 35)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Error struct {
|
||||||
|
Msg string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e Error) Error() string {
|
||||||
|
return e.Msg
|
||||||
|
}
|
||||||
|
|
||||||
|
type WorldMessage struct {
|
||||||
|
Port int
|
||||||
|
MyCode string
|
||||||
|
MyInfo []int
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type HelloMessage struct {
|
||||||
|
ID string
|
||||||
|
MyMap map[string]string
|
||||||
|
MySlice []int
|
||||||
|
World WorldMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
func ClientRun(stopTime time.Duration) {
|
||||||
|
c := NewClient()
|
||||||
|
err := c.Connect("tcp", "127.0.0.1:23456")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
c.SetLink("msg", func(msg *Message) {
|
||||||
|
var hi HelloMessage
|
||||||
|
err := msg.Value.Orm(&hi)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
fmt.Printf("recv info from server,struct detail is %+v\n", hi)
|
||||||
|
})
|
||||||
|
timer := time.NewTimer(stopTime)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
c.Stop()
|
||||||
|
return
|
||||||
|
case <-time.After(time.Second * 2):
|
||||||
|
fmt.Println("client msg sent", c.SendObj("msg", HelloMessage{
|
||||||
|
ID: "client",
|
||||||
|
MyMap: map[string]string{"hello": "world"},
|
||||||
|
MySlice: []int{int(time.Now().Unix())},
|
||||||
|
World: WorldMessage{
|
||||||
|
Port: 520,
|
||||||
|
MyCode: "b612",
|
||||||
|
MyInfo: []int{0, 1, 2, 3},
|
||||||
|
Err: Error{Msg: "Hello World"},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ServerRun(stopTime time.Duration) {
|
||||||
|
s := NewServer()
|
||||||
|
err := s.Listen("tcp", "127.0.0.1:23456")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
s.SetLink("msg", func(msg *Message) {
|
||||||
|
var hi HelloMessage
|
||||||
|
err := msg.Value.Orm(&hi)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
fmt.Printf("recv info from client:%v,struct detail is %+v\n", msg.ClientConn.GetRemoteAddr(), hi)
|
||||||
|
hi.ID = "server"
|
||||||
|
hi.World.Port = 666
|
||||||
|
hi.MySlice = append(hi.MySlice, 1, 1, 2, 7)
|
||||||
|
msg.ReplyObj(hi)
|
||||||
|
})
|
||||||
|
<-time.After(stopTime)
|
||||||
|
s.Stop()
|
||||||
|
}
|
||||||
@@ -9,11 +9,22 @@ func Register(data interface{}) {
|
|||||||
gob.Register(data)
|
gob.Register(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RegisterName(name string, data interface{}) {
|
||||||
|
gob.RegisterName(name, data)
|
||||||
|
}
|
||||||
|
|
||||||
func RegisterAll(data []interface{}) {
|
func RegisterAll(data []interface{}) {
|
||||||
for _, v := range data {
|
for _, v := range data {
|
||||||
gob.Register(v)
|
gob.Register(v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RegisterNames(data map[string]interface{}) {
|
||||||
|
for k, v := range data {
|
||||||
|
gob.RegisterName(k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func encode(src interface{}) ([]byte, error) {
|
func encode(src interface{}) ([]byte, error) {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
enc := gob.NewEncoder(&buf)
|
enc := gob.NewEncoder(&buf)
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ type ServerCommon struct {
|
|||||||
sequenceDe func([]byte) (interface{}, error)
|
sequenceDe func([]byte) (interface{}, error)
|
||||||
sequenceEn func(interface{}) ([]byte, error)
|
sequenceEn func(interface{}) ([]byte, error)
|
||||||
showError bool
|
showError bool
|
||||||
|
debugMode bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServer() Server {
|
func NewServer() Server {
|
||||||
@@ -65,6 +66,19 @@ func NewServer() Server {
|
|||||||
}
|
}
|
||||||
return &server
|
return &server
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ServerCommon) DebugMode(dmg bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.debugMode = dmg
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerCommon) IsDebugMode() bool {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.debugMode
|
||||||
|
}
|
||||||
|
|
||||||
func (s *ServerCommon) ShowError(std bool) {
|
func (s *ServerCommon) ShowError(std bool) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.showError = std
|
s.showError = std
|
||||||
@@ -185,16 +199,22 @@ func (s *ServerCommon) acceptTU() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.stopCtx.Done():
|
case <-s.stopCtx.Done():
|
||||||
|
if s.debugMode {
|
||||||
|
fmt.Println("accept goroutine recv exit signal,exit")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
conn, err := s.listener.Accept()
|
conn, err := s.listener.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if s.showError {
|
if s.showError || s.debugMode {
|
||||||
fmt.Println("error accept:", err)
|
fmt.Println("error accept:", err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if s.debugMode {
|
||||||
|
fmt.Println("accept new connection from", conn.RemoteAddr())
|
||||||
|
}
|
||||||
var id string
|
var id string
|
||||||
for {
|
for {
|
||||||
id = fmt.Sprintf("%s%d%d", conn.RemoteAddr().String(), time.Now().UnixNano(), rand.Int63())
|
id = fmt.Sprintf("%s%d%d", conn.RemoteAddr().String(), time.Now().UnixNano(), rand.Int63())
|
||||||
@@ -241,7 +261,7 @@ func (s *ServerCommon) loadMessage() {
|
|||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
for _, v := range s.clientPool {
|
for _, v := range s.clientPool {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func(v *ClientConn) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
v.sayGoodByeForTU()
|
v.sayGoodByeForTU()
|
||||||
v.alive.Store(false)
|
v.alive.Store(false)
|
||||||
@@ -252,7 +272,7 @@ func (s *ServerCommon) loadMessage() {
|
|||||||
}
|
}
|
||||||
v.stopFn()
|
v.stopFn()
|
||||||
s.removeClient(v)
|
s.removeClient(v)
|
||||||
}()
|
}(v)
|
||||||
}
|
}
|
||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
select {
|
select {
|
||||||
@@ -282,7 +302,7 @@ func (s *ServerCommon) loadMessage() {
|
|||||||
//fmt.Println("received:", float64(time.Now().UnixNano()-nowd)/1000000)
|
//fmt.Println("received:", float64(time.Now().UnixNano()-nowd)/1000000)
|
||||||
msg, err := s.sequenceDe(cc.msgDe(cc.SecretKey, data.Msg))
|
msg, err := s.sequenceDe(cc.msgDe(cc.SecretKey, data.Msg))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if s.showError {
|
if s.showError || s.debugMode {
|
||||||
fmt.Println("server decode data error", err)
|
fmt.Println("server decode data error", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@@ -343,8 +363,8 @@ func (s *ServerCommon) dispatchMsg(message Message) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
//just throw
|
//just throw
|
||||||
return
|
//return
|
||||||
//fallthrough
|
fallthrough
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
callFn := func(fn func(*Message)) {
|
callFn := func(fn func(*Message)) {
|
||||||
@@ -377,7 +397,9 @@ func (s *ServerCommon) sendTU(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
|
|||||||
data = c.msgEn(c.SecretKey, data)
|
data = c.msgEn(c.SecretKey, data)
|
||||||
data = s.queue.BuildMessage(data)
|
data = s.queue.BuildMessage(data)
|
||||||
if c.maxWriteTimeout.Seconds() != 0 {
|
if c.maxWriteTimeout.Seconds() != 0 {
|
||||||
c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
|
if err := c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout)); err != nil {
|
||||||
|
return WaitMsg{}, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_, err = c.tuConn.Write(data)
|
_, err = c.tuConn.Write(data)
|
||||||
//fmt.Println("resend:", float64(time.Now().UnixNano()-nowd)/1000000)
|
//fmt.Println("resend:", float64(time.Now().UnixNano()-nowd)/1000000)
|
||||||
@@ -416,7 +438,7 @@ func (s *ServerCommon) sendWait(c *ClientConn, msg TransferMsg, timeout time.Dur
|
|||||||
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
|
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
|
||||||
return Message{}, os.ErrDeadlineExceeded
|
return Message{}, os.ErrDeadlineExceeded
|
||||||
case <-s.stopCtx.Done():
|
case <-s.stopCtx.Done():
|
||||||
return Message{}, errors.New("Service shutdown")
|
return Message{}, errors.New("service shutdown")
|
||||||
case msg, ok := <-data.Reply:
|
case msg, ok := <-data.Reply:
|
||||||
if !ok {
|
if !ok {
|
||||||
return msg, os.ErrInvalid
|
return msg, os.ErrInvalid
|
||||||
@@ -447,7 +469,7 @@ func (s *ServerCommon) sendCtx(c *ClientConn, msg TransferMsg, ctx context.Conte
|
|||||||
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
|
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
|
||||||
return Message{}, os.ErrClosed
|
return Message{}, os.ErrClosed
|
||||||
case <-s.stopCtx.Done():
|
case <-s.stopCtx.Done():
|
||||||
return Message{}, errors.New("Service shutdown")
|
return Message{}, errors.New("service shutdown")
|
||||||
case msg, ok := <-data.Reply:
|
case msg, ok := <-data.Reply:
|
||||||
if !ok {
|
if !ok {
|
||||||
return msg, os.ErrInvalid
|
return msg, os.ErrInvalid
|
||||||
@@ -524,6 +546,9 @@ func (s *ServerCommon) acceptUDP() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.stopCtx.Done():
|
case <-s.stopCtx.Done():
|
||||||
|
if s.debugMode {
|
||||||
|
fmt.Println("accept goroutine recv exit signal,exit")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
@@ -533,6 +558,9 @@ func (s *ServerCommon) acceptUDP() {
|
|||||||
data := make([]byte, 4096)
|
data := make([]byte, 4096)
|
||||||
num, addr, err := s.udpListener.ReadFromUDP(data)
|
num, addr, err := s.udpListener.ReadFromUDP(data)
|
||||||
id := addr.String()
|
id := addr.String()
|
||||||
|
if s.debugMode {
|
||||||
|
fmt.Println("accept new udp message from", id)
|
||||||
|
}
|
||||||
//fmt.Println("s recv udp:", float64(time.Now().UnixNano()-nowd)/1000000)
|
//fmt.Println("s recv udp:", float64(time.Now().UnixNano()-nowd)/1000000)
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
if _, ok := s.clientPool[id]; !ok {
|
if _, ok := s.clientPool[id]; !ok {
|
||||||
@@ -626,7 +654,7 @@ func (s *ServerCommon) GetClient(id string) *ClientConn {
|
|||||||
func (s *ServerCommon) GetClientLists() []*ClientConn {
|
func (s *ServerCommon) GetClientLists() []*ClientConn {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
var list []*ClientConn = make([]*ClientConn, 0, len(s.clientPool))
|
var list = make([]*ClientConn, 0, len(s.clientPool))
|
||||||
for _, v := range s.clientPool {
|
for _, v := range s.clientPool {
|
||||||
list = append(list, v)
|
list = append(list, v)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,6 +41,8 @@ type Server interface {
|
|||||||
GetSequenceDe() func([]byte) (interface{}, error)
|
GetSequenceDe() func([]byte) (interface{}, error)
|
||||||
SetSequenceDe(func([]byte) (interface{}, error))
|
SetSequenceDe(func([]byte) (interface{}, error))
|
||||||
ShowError(bool)
|
ShowError(bool)
|
||||||
|
DebugMode(bool)
|
||||||
|
IsDebugMode() bool
|
||||||
|
|
||||||
HeartbeatTimeoutSec() int64
|
HeartbeatTimeoutSec() int64
|
||||||
SetHeartbeatTimeoutSec(int64)
|
SetHeartbeatTimeoutSec(int64)
|
||||||
|
|||||||
+15
-9
@@ -14,22 +14,24 @@ func Test_ServerTuAndClientCommon(t *testing.T) {
|
|||||||
noEn := func(key, bn []byte) []byte {
|
noEn := func(key, bn []byte) []byte {
|
||||||
return bn
|
return bn
|
||||||
}
|
}
|
||||||
|
_ = noEn
|
||||||
server := NewServer()
|
server := NewServer()
|
||||||
server.SetDefaultCommDecode(noEn)
|
//server.SetDefaultCommDecode(noEn)
|
||||||
server.SetDefaultCommEncode(noEn)
|
//server.SetDefaultCommEncode(noEn)
|
||||||
err := server.Listen("tcp", "127.0.0.1:12345")
|
err := server.Listen("tcp", "127.0.0.1:12345")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
server.SetLink("notify", notify)
|
server.SetLink("notify", notify)
|
||||||
for i := 1; i <= 5000; i++ {
|
for i := 1; i <= 100; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
client := NewClient()
|
client := NewClient()
|
||||||
client.SetMsgEn(noEn)
|
//client.SetMsgEn(noEn)
|
||||||
client.SetMsgDe(noEn)
|
//client.SetMsgDe(noEn)
|
||||||
client.SetSkipExchangeKey(true)
|
//client.SetSkipExchangeKey(true)
|
||||||
err = client.Connect("tcp", "127.0.0.1:12345")
|
err = client.Connect("tcp", "127.0.0.1:12345")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
time.Sleep(time.Second * 2)
|
time.Sleep(time.Second * 2)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -37,7 +39,8 @@ func Test_ServerTuAndClientCommon(t *testing.T) {
|
|||||||
for {
|
for {
|
||||||
|
|
||||||
//nowd = time.Now().UnixNano()
|
//nowd = time.Now().UnixNano()
|
||||||
client.SendWait("notify", []byte("client hello"),time.Second*15)
|
client.SendWait("notify", []byte("client hello"), time.Second*15)
|
||||||
|
//client.Send("notify", []byte("client hello"))
|
||||||
//time.Sleep(time.Millisecond)
|
//time.Sleep(time.Millisecond)
|
||||||
//fmt.Println("finished:", float64(time.Now().UnixNano()-nowd)/1000000)
|
//fmt.Println("finished:", float64(time.Now().UnixNano()-nowd)/1000000)
|
||||||
//client.Send("notify", []byte("client"))
|
//client.Send("notify", []byte("client"))
|
||||||
@@ -65,7 +68,10 @@ func notify(msg *Message) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Test_normal(t *testing.T) {
|
func Test_normal(t *testing.T) {
|
||||||
server, _ := net.Listen("udp", "127.0.0.1:12345")
|
server, err := net.Listen("tcp", "127.0.0.1:12345")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
conn, err := server.Accept()
|
conn, err := server.Accept()
|
||||||
@@ -87,7 +93,7 @@ func Test_normal(t *testing.T) {
|
|||||||
time.Sleep(time.Second * 5)
|
time.Sleep(time.Second * 5)
|
||||||
for i := 1; i <= 100; i++ {
|
for i := 1; i <= 100; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
conn, err := net.Dial("udp", "127.0.0.1:12345")
|
conn, err := net.Dial("tcp", "127.0.0.1:12345")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user