2 Commits

Author SHA1 Message Date
b612 48bbc5b776 add debug mode 2022-05-20 09:27:19 +08:00
b612 9065a12b99 add orm function 2022-05-19 11:04:52 +08:00
9 changed files with 202 additions and 28 deletions
+30 -9
View File
@@ -15,7 +15,10 @@ import (
"time" "time"
) )
//var nowd int64 func init() {
Register(TransferMsg{})
}
type ClientCommon struct { type ClientCommon struct {
alive atomic.Value alive atomic.Value
status Status status Status
@@ -47,6 +50,7 @@ type ClientCommon struct {
useHeartBeat bool useHeartBeat bool
sequenceDe func([]byte) (interface{}, error) sequenceDe func([]byte) (interface{}, error)
sequenceEn func(interface{}) ([]byte, error) sequenceEn func(interface{}) ([]byte, error)
debugMode bool
} }
func (c *ClientCommon) Connect(network string, addr string) error { func (c *ClientCommon) Connect(network string, addr string) error {
@@ -68,6 +72,16 @@ func (c *ClientCommon) Connect(network string, addr string) error {
return c.clientPostInit() return c.clientPostInit()
} }
func (c *ClientCommon) DebugMode(dmg bool) {
c.mu.Lock()
c.debugMode = dmg
c.mu.Unlock()
}
func (c *ClientCommon) IsDebugMode() bool {
return c.debugMode
}
func (c *ClientCommon) ConnectTimeout(network string, addr string, timeout time.Duration) error { func (c *ClientCommon) ConnectTimeout(network string, addr string, timeout time.Duration) error {
if c.alive.Load().(bool) { if c.alive.Load().(bool) {
return errors.New("client already run") return errors.New("client already run")
@@ -185,9 +199,14 @@ func (c *ClientCommon) Heartbeat() {
c.lastHeartbeat = time.Now().Unix() c.lastHeartbeat = time.Now().Unix()
failedCount = 0 failedCount = 0
} }
if c.debugMode {
fmt.Println("failed to recv heartbeat,timeout!")
}
failedCount++ failedCount++
if failedCount >= 3 { if failedCount >= 3 {
//fmt.Println("heatbeat failed,stop client") if c.debugMode {
fmt.Println("heatbeat failed more than 3 times,stop client")
}
c.alive.Store(false) c.alive.Store(false)
c.mu.Lock() c.mu.Lock()
c.status = Status{ c.status = Status{
@@ -218,7 +237,9 @@ func (c *ClientCommon) readMessage() {
} }
data := make([]byte, 8192) data := make([]byte, 8192)
if c.maxReadTimeout.Seconds() != 0 { if c.maxReadTimeout.Seconds() != 0 {
c.conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout)) if err := c.conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout)); err != nil {
//TODO:ALERT
}
} }
readNum, err := c.conn.Read(data) readNum, err := c.conn.Read(data)
if err == os.ErrDeadlineExceeded { if err == os.ErrDeadlineExceeded {
@@ -228,7 +249,7 @@ func (c *ClientCommon) readMessage() {
continue continue
} }
if err != nil { if err != nil {
if c.showError { if c.showError || c.debugMode {
fmt.Println("client read error", err) fmt.Println("client read error", err)
} }
c.alive.Store(false) c.alive.Store(false)
@@ -278,7 +299,7 @@ func (c *ClientCommon) loadMessage() {
//transfer to Msg //transfer to Msg
msg, err := c.sequenceDe(c.msgDe(c.SecretKey, data.Msg)) msg, err := c.sequenceDe(c.msgDe(c.SecretKey, data.Msg))
if err != nil { if err != nil {
if c.showError { if c.showError || c.debugMode {
fmt.Println("client decode data error", err) fmt.Println("client decode data error", err)
} }
return return
@@ -314,8 +335,8 @@ func (c *ClientCommon) dispatchMsg(message Message) {
c.noFinSyncMsgPool.Delete(message.ID) c.noFinSyncMsgPool.Delete(message.ID)
return return
} }
return //return
//fallthrough fallthrough
default: default:
} }
callFn := func(fn func(*Message)) { callFn := func(fn func(*Message)) {
@@ -411,7 +432,7 @@ func (c *ClientCommon) sendWait(msg TransferMsg, timeout time.Duration) (Message
c.noFinSyncMsgPool.Delete(data.TransferMsg.ID) c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded return Message{}, os.ErrDeadlineExceeded
case <-c.stopCtx.Done(): case <-c.stopCtx.Done():
return Message{}, errors.New("Service shutdown") return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply: case msg, ok := <-data.Reply:
if !ok { if !ok {
return msg, os.ErrInvalid return msg, os.ErrInvalid
@@ -434,7 +455,7 @@ func (c *ClientCommon) sendCtx(msg TransferMsg, ctx context.Context) (Message, e
c.noFinSyncMsgPool.Delete(data.TransferMsg.ID) c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded return Message{}, os.ErrDeadlineExceeded
case <-c.stopCtx.Done(): case <-c.stopCtx.Done():
return Message{}, errors.New("Service shutdown") return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply: case msg, ok := <-data.Reply:
if !ok { if !ok {
return msg, os.ErrInvalid return msg, os.ErrInvalid
+2
View File
@@ -39,6 +39,8 @@ type Client interface {
StopMonitorChan() <-chan struct{} StopMonitorChan() <-chan struct{}
Status() Status Status() Status
ShowError(bool) ShowError(bool)
DebugMode(bool)
IsDebugMode() bool
GetSequenceEn() func(interface{}) ([]byte, error) GetSequenceEn() func(interface{}) ([]byte, error)
SetSequenceEn(func(interface{}) ([]byte, error)) SetSequenceEn(func(interface{}) ([]byte, error))
-4
View File
@@ -80,7 +80,3 @@ func defaultMsgEn(key []byte, d []byte) []byte {
func defaultMsgDe(key []byte, d []byte) []byte { func defaultMsgDe(key []byte, d []byte) []byte {
return starcrypto.AesDecryptCFB(d, key) return starcrypto.AesDecryptCFB(d, key)
} }
func init() {
Register(TransferMsg{})
}
+1 -1
View File
@@ -3,7 +3,7 @@ module b612.me/notify
go 1.16 go 1.16
require ( require (
b612.me/starcrypto v0.0.1 b612.me/starcrypto v0.0.2
b612.me/stario v0.0.5 b612.me/stario v0.0.5
b612.me/starnet v0.1.3 b612.me/starnet v0.1.3
) )
+2 -2
View File
@@ -1,5 +1,5 @@
b612.me/starcrypto v0.0.1 h1:xGngzXPUrVbqtWzNw2e+0eWsdG7GG1/X+ONDGIzdriI= b612.me/starcrypto v0.0.2 h1:aRf1HcqK8GqHYxLAhWfFC4W/EqQLEFNEmxsBu3wG30o=
b612.me/starcrypto v0.0.1/go.mod h1:hz0xRnfWNpYOlVrIPoGrQOWPibq4YiUZ7qN5tsQbzPo= b612.me/starcrypto v0.0.2/go.mod h1:hz0xRnfWNpYOlVrIPoGrQOWPibq4YiUZ7qN5tsQbzPo=
b612.me/stario v0.0.5 h1:Q1OGF+8eOoK49zMzkyh80GWaMuknhey6+PWJJL9ZuNo= b612.me/stario v0.0.5 h1:Q1OGF+8eOoK49zMzkyh80GWaMuknhey6+PWJJL9ZuNo=
b612.me/stario v0.0.5/go.mod h1:or4ssWcxQSjMeu+hRKEgtp0X517b3zdlEOAms8Qscvw= b612.me/stario v0.0.5/go.mod h1:or4ssWcxQSjMeu+hRKEgtp0X517b3zdlEOAms8Qscvw=
b612.me/starnet v0.1.3 h1:UjY6M96gdPdJtxnQGzCttqSwFw93sDZSHiIGtdOlFfk= b612.me/starnet v0.1.3 h1:UjY6M96gdPdJtxnQGzCttqSwFw93sDZSHiIGtdOlFfk=
+27 -2
View File
@@ -7,7 +7,7 @@ import (
"fmt" "fmt"
"net" "net"
"os" "os"
"sync" "reflect"
"sync/atomic" "sync/atomic"
"time" "time"
) )
@@ -45,7 +45,6 @@ type Message struct {
ServerConn Client ServerConn Client
TransferMsg TransferMsg
Time time.Time Time time.Time
sync.Mutex
} }
type WaitMsg struct { type WaitMsg struct {
@@ -475,3 +474,29 @@ func MustToMsgVal(val interface{}) MsgVal {
} }
return d return d
} }
func (m MsgVal) Orm(stu interface{}) error {
inf, err := m.ToInterface()
if err != nil {
return err
}
t := reflect.TypeOf(stu)
if t.Kind() != reflect.Ptr {
return errors.New("interface not writable(pointer wanted)")
}
if !reflect.ValueOf(stu).Elem().CanSet() {
return errors.New("interface not writable")
}
it := reflect.TypeOf(inf)
if t.Elem().Kind() != it.Kind() {
return fmt.Errorf("interface{} kind is %v,not %v", t.Elem().Kind(), it.Kind())
}
if t.Elem().Name() != it.Name() {
return fmt.Errorf("interface{} name is %v,not %v", t.Elem().Name(), it.Name())
}
if t.Elem().String() != it.String() {
return fmt.Errorf("interface{} string is %v,not %v", t.Elem().String(), it.String())
}
reflect.ValueOf(stu).Elem().Set(reflect.ValueOf(inf))
return nil
}
+96
View File
@@ -0,0 +1,96 @@
package notify
import (
"fmt"
"testing"
"time"
)
func TestMsgEnDeCode(t *testing.T) {
Register(HelloMessage{})
Register(Error{})
go ServerRun(time.Second * 30)
time.Sleep(time.Second * 2)
ClientRun(time.Second * 35)
}
type Error struct {
Msg string
}
func (e Error) Error() string {
return e.Msg
}
type WorldMessage struct {
Port int
MyCode string
MyInfo []int
Err error
}
type HelloMessage struct {
ID string
MyMap map[string]string
MySlice []int
World WorldMessage
}
func ClientRun(stopTime time.Duration) {
c := NewClient()
err := c.Connect("tcp", "127.0.0.1:23456")
if err != nil {
panic(err)
}
c.SetLink("msg", func(msg *Message) {
var hi HelloMessage
err := msg.Value.Orm(&hi)
if err != nil {
panic(err)
}
fmt.Printf("recv info from server,struct detail is %+v\n", hi)
})
timer := time.NewTimer(stopTime)
for {
select {
case <-timer.C:
c.Stop()
return
case <-time.After(time.Second * 2):
fmt.Println("client msg sent", c.SendObj("msg", HelloMessage{
ID: "client",
MyMap: map[string]string{"hello": "world"},
MySlice: []int{int(time.Now().Unix())},
World: WorldMessage{
Port: 520,
MyCode: "b612",
MyInfo: []int{0, 1, 2, 3},
Err: Error{Msg: "Hello World"},
},
}))
}
}
}
func ServerRun(stopTime time.Duration) {
s := NewServer()
err := s.Listen("tcp", "127.0.0.1:23456")
if err != nil {
panic(err)
}
s.SetLink("msg", func(msg *Message) {
var hi HelloMessage
err := msg.Value.Orm(&hi)
if err != nil {
panic(err)
}
fmt.Printf("recv info from client:%v,struct detail is %+v\n", msg.ClientConn.GetRemoteAddr(), hi)
hi.ID = "server"
hi.World.Port = 666
hi.MySlice = append(hi.MySlice, 1, 1, 2, 7)
msg.ReplyObj(hi)
})
<-time.After(stopTime)
s.Stop()
}
+42 -10
View File
@@ -15,6 +15,10 @@ import (
"time" "time"
) )
func init() {
Register(TransferMsg{})
}
type ServerCommon struct { type ServerCommon struct {
msgID uint64 msgID uint64
alive atomic.Value alive atomic.Value
@@ -42,6 +46,7 @@ type ServerCommon struct {
sequenceDe func([]byte) (interface{}, error) sequenceDe func([]byte) (interface{}, error)
sequenceEn func(interface{}) ([]byte, error) sequenceEn func(interface{}) ([]byte, error)
showError bool showError bool
debugMode bool
} }
func NewServer() Server { func NewServer() Server {
@@ -65,6 +70,19 @@ func NewServer() Server {
} }
return &server return &server
} }
func (s *ServerCommon) DebugMode(dmg bool) {
s.mu.Lock()
s.debugMode = dmg
s.mu.Unlock()
}
func (s *ServerCommon) IsDebugMode() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.debugMode
}
func (s *ServerCommon) ShowError(std bool) { func (s *ServerCommon) ShowError(std bool) {
s.mu.Lock() s.mu.Lock()
s.showError = std s.showError = std
@@ -185,16 +203,22 @@ func (s *ServerCommon) acceptTU() {
for { for {
select { select {
case <-s.stopCtx.Done(): case <-s.stopCtx.Done():
if s.debugMode {
fmt.Println("accept goroutine recv exit signal,exit")
}
return return
default: default:
} }
conn, err := s.listener.Accept() conn, err := s.listener.Accept()
if err != nil { if err != nil {
if s.showError { if s.showError || s.debugMode {
fmt.Println("error accept:", err) fmt.Println("error accept:", err)
} }
continue continue
} }
if s.debugMode {
fmt.Println("accept new connection from", conn.RemoteAddr())
}
var id string var id string
for { for {
id = fmt.Sprintf("%s%d%d", conn.RemoteAddr().String(), time.Now().UnixNano(), rand.Int63()) id = fmt.Sprintf("%s%d%d", conn.RemoteAddr().String(), time.Now().UnixNano(), rand.Int63())
@@ -241,7 +265,7 @@ func (s *ServerCommon) loadMessage() {
s.mu.RLock() s.mu.RLock()
for _, v := range s.clientPool { for _, v := range s.clientPool {
wg.Add(1) wg.Add(1)
go func() { go func(v *ClientConn) {
defer wg.Done() defer wg.Done()
v.sayGoodByeForTU() v.sayGoodByeForTU()
v.alive.Store(false) v.alive.Store(false)
@@ -252,7 +276,7 @@ func (s *ServerCommon) loadMessage() {
} }
v.stopFn() v.stopFn()
s.removeClient(v) s.removeClient(v)
}() }(v)
} }
s.mu.RUnlock() s.mu.RUnlock()
select { select {
@@ -282,7 +306,7 @@ func (s *ServerCommon) loadMessage() {
//fmt.Println("received:", float64(time.Now().UnixNano()-nowd)/1000000) //fmt.Println("received:", float64(time.Now().UnixNano()-nowd)/1000000)
msg, err := s.sequenceDe(cc.msgDe(cc.SecretKey, data.Msg)) msg, err := s.sequenceDe(cc.msgDe(cc.SecretKey, data.Msg))
if err != nil { if err != nil {
if s.showError { if s.showError || s.debugMode {
fmt.Println("server decode data error", err) fmt.Println("server decode data error", err)
} }
return return
@@ -343,8 +367,8 @@ func (s *ServerCommon) dispatchMsg(message Message) {
return return
} }
//just throw //just throw
return //return
//fallthrough fallthrough
default: default:
} }
callFn := func(fn func(*Message)) { callFn := func(fn func(*Message)) {
@@ -377,7 +401,9 @@ func (s *ServerCommon) sendTU(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
data = c.msgEn(c.SecretKey, data) data = c.msgEn(c.SecretKey, data)
data = s.queue.BuildMessage(data) data = s.queue.BuildMessage(data)
if c.maxWriteTimeout.Seconds() != 0 { if c.maxWriteTimeout.Seconds() != 0 {
c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout)) if err := c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout)); err != nil {
return WaitMsg{}, err
}
} }
_, err = c.tuConn.Write(data) _, err = c.tuConn.Write(data)
//fmt.Println("resend:", float64(time.Now().UnixNano()-nowd)/1000000) //fmt.Println("resend:", float64(time.Now().UnixNano()-nowd)/1000000)
@@ -416,7 +442,7 @@ func (s *ServerCommon) sendWait(c *ClientConn, msg TransferMsg, timeout time.Dur
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID) s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded return Message{}, os.ErrDeadlineExceeded
case <-s.stopCtx.Done(): case <-s.stopCtx.Done():
return Message{}, errors.New("Service shutdown") return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply: case msg, ok := <-data.Reply:
if !ok { if !ok {
return msg, os.ErrInvalid return msg, os.ErrInvalid
@@ -447,7 +473,7 @@ func (s *ServerCommon) sendCtx(c *ClientConn, msg TransferMsg, ctx context.Conte
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID) s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrClosed return Message{}, os.ErrClosed
case <-s.stopCtx.Done(): case <-s.stopCtx.Done():
return Message{}, errors.New("Service shutdown") return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply: case msg, ok := <-data.Reply:
if !ok { if !ok {
return msg, os.ErrInvalid return msg, os.ErrInvalid
@@ -524,6 +550,9 @@ func (s *ServerCommon) acceptUDP() {
for { for {
select { select {
case <-s.stopCtx.Done(): case <-s.stopCtx.Done():
if s.debugMode {
fmt.Println("accept goroutine recv exit signal,exit")
}
return return
default: default:
} }
@@ -533,6 +562,9 @@ func (s *ServerCommon) acceptUDP() {
data := make([]byte, 4096) data := make([]byte, 4096)
num, addr, err := s.udpListener.ReadFromUDP(data) num, addr, err := s.udpListener.ReadFromUDP(data)
id := addr.String() id := addr.String()
if s.debugMode {
fmt.Println("accept new udp message from", id)
}
//fmt.Println("s recv udp:", float64(time.Now().UnixNano()-nowd)/1000000) //fmt.Println("s recv udp:", float64(time.Now().UnixNano()-nowd)/1000000)
s.mu.RLock() s.mu.RLock()
if _, ok := s.clientPool[id]; !ok { if _, ok := s.clientPool[id]; !ok {
@@ -626,7 +658,7 @@ func (s *ServerCommon) GetClient(id string) *ClientConn {
func (s *ServerCommon) GetClientLists() []*ClientConn { func (s *ServerCommon) GetClientLists() []*ClientConn {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
var list []*ClientConn = make([]*ClientConn, 0, len(s.clientPool)) var list = make([]*ClientConn, 0, len(s.clientPool))
for _, v := range s.clientPool { for _, v := range s.clientPool {
list = append(list, v) list = append(list, v)
} }
+2
View File
@@ -41,6 +41,8 @@ type Server interface {
GetSequenceDe() func([]byte) (interface{}, error) GetSequenceDe() func([]byte) (interface{}, error)
SetSequenceDe(func([]byte) (interface{}, error)) SetSequenceDe(func([]byte) (interface{}, error))
ShowError(bool) ShowError(bool)
DebugMode(bool)
IsDebugMode() bool
HeartbeatTimeoutSec() int64 HeartbeatTimeoutSec() int64
SetHeartbeatTimeoutSec(int64) SetHeartbeatTimeoutSec(int64)