Compare commits
	
		
			No commits in common. "master" and "outdate" have entirely different histories.
		
	
	
		
	
		
							
								
								
									
										780
									
								
								client.go
									
									
									
									
									
								
							
							
						
						
									
										780
									
								
								client.go
									
									
									
									
									
								
							@ -1,614 +1,272 @@
 | 
			
		||||
package notify
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"b612.me/starcrypto"
 | 
			
		||||
	"b612.me/stario"
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"math"
 | 
			
		||||
	"math/rand"
 | 
			
		||||
	"net"
 | 
			
		||||
	"os"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"b612.me/starainrt"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type ClientCommon struct {
 | 
			
		||||
	alive                      atomic.Value
 | 
			
		||||
	status                     Status
 | 
			
		||||
	byeFromServer              bool
 | 
			
		||||
	conn                       net.Conn
 | 
			
		||||
	mu                         sync.Mutex
 | 
			
		||||
	msgID                      uint64
 | 
			
		||||
	queue                      *stario.StarQueue
 | 
			
		||||
	stopFn                     context.CancelFunc
 | 
			
		||||
	stopCtx                    context.Context
 | 
			
		||||
	parallelNum                int
 | 
			
		||||
	maxReadTimeout             time.Duration
 | 
			
		||||
	maxWriteTimeout            time.Duration
 | 
			
		||||
	keyExchangeFn              func(c Client) error
 | 
			
		||||
	linkFns                    map[string]func(message *Message)
 | 
			
		||||
	defaultFns                 func(message *Message)
 | 
			
		||||
	msgEn                      func([]byte, []byte) []byte
 | 
			
		||||
	msgDe                      func([]byte, []byte) []byte
 | 
			
		||||
	noFinSyncMsgPool           sync.Map
 | 
			
		||||
	handshakeRsaPubKey         []byte
 | 
			
		||||
	SecretKey                  []byte
 | 
			
		||||
	noFinSyncMsgMaxKeepSeconds int
 | 
			
		||||
	lastHeartbeat              int64
 | 
			
		||||
	heartbeatPeriod            time.Duration
 | 
			
		||||
	wg                         stario.WaitGroup
 | 
			
		||||
	netType                    NetType
 | 
			
		||||
	showError                  bool
 | 
			
		||||
	skipKeyExchange            bool
 | 
			
		||||
	useHeartBeat               bool
 | 
			
		||||
	sequenceDe                 func([]byte) (interface{}, error)
 | 
			
		||||
	sequenceEn                 func(interface{}) ([]byte, error)
 | 
			
		||||
	debugMode                  bool
 | 
			
		||||
// StarNotifyC 为Client端
 | 
			
		||||
type StarNotifyC struct {
 | 
			
		||||
	Connc      net.Conn
 | 
			
		||||
	clientSign map[string]chan string
 | 
			
		||||
	// FuncLists 当不使用channel时,使用此记录调用函数
 | 
			
		||||
	FuncLists   map[string]func(CMsg)
 | 
			
		||||
	stopSign    context.Context
 | 
			
		||||
	cancel      context.CancelFunc
 | 
			
		||||
	defaultFunc func(CMsg)
 | 
			
		||||
	// Stop 停止信 号
 | 
			
		||||
	Stop chan int
 | 
			
		||||
	// UseChannel 是否使用channel作为信息传递
 | 
			
		||||
	UseChannel bool
 | 
			
		||||
	isUDP      bool
 | 
			
		||||
	// Queue 是用来处理收发信息的简单消息队列
 | 
			
		||||
	Queue *starainrt.StarQueue
 | 
			
		||||
	// Online 当前链接是否处于活跃状态
 | 
			
		||||
	Online   bool
 | 
			
		||||
	lockPool map[string]CMsg
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) Connect(network string, addr string) error {
 | 
			
		||||
	if c.alive.Load().(bool) {
 | 
			
		||||
		return errors.New("client already run")
 | 
			
		||||
	}
 | 
			
		||||
	c.stopCtx, c.stopFn = context.WithCancel(context.Background())
 | 
			
		||||
	c.queue = stario.NewQueueCtx(c.stopCtx, 4, math.MaxUint32)
 | 
			
		||||
	conn, err := net.Dial(network, addr)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	c.alive.Store(true)
 | 
			
		||||
	c.status.Alive = true
 | 
			
		||||
	c.conn = conn
 | 
			
		||||
	if c.useHeartBeat {
 | 
			
		||||
		go c.Heartbeat()
 | 
			
		||||
	}
 | 
			
		||||
	return c.clientPostInit()
 | 
			
		||||
// CMsg 指明当前客户端被通知的关键字
 | 
			
		||||
type CMsg struct {
 | 
			
		||||
	Key   string
 | 
			
		||||
	Value string
 | 
			
		||||
	mode  string
 | 
			
		||||
	wait  chan int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) DebugMode(dmg bool) {
 | 
			
		||||
	c.mu.Lock()
 | 
			
		||||
	c.debugMode = dmg
 | 
			
		||||
	c.mu.Unlock()
 | 
			
		||||
func (star *StarNotifyC) starinitc() {
 | 
			
		||||
	star.stopSign, star.cancel = context.WithCancel(context.Background())
 | 
			
		||||
	star.Queue = starainrt.NewQueue()
 | 
			
		||||
	star.FuncLists = make(map[string]func(CMsg))
 | 
			
		||||
	star.UseChannel = true
 | 
			
		||||
	star.Stop = make(chan int, 5)
 | 
			
		||||
	star.clientSign = make(map[string]chan string)
 | 
			
		||||
	star.Online = false
 | 
			
		||||
	star.lockPool = make(map[string]CMsg)
 | 
			
		||||
	star.Queue.RestoreDuration(time.Second * 2)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) IsDebugMode() bool {
 | 
			
		||||
	return c.debugMode
 | 
			
		||||
// Notify 用于获取一个通知
 | 
			
		||||
func (star *StarNotifyC) Notify(key string) chan string {
 | 
			
		||||
	if _, ok := star.clientSign[key]; !ok {
 | 
			
		||||
		ch := make(chan string, 20)
 | 
			
		||||
		star.clientSign[key] = ch
 | 
			
		||||
	}
 | 
			
		||||
	return star.clientSign[key]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) ConnectTimeout(network string, addr string, timeout time.Duration) error {
 | 
			
		||||
	if c.alive.Load().(bool) {
 | 
			
		||||
		return errors.New("client already run")
 | 
			
		||||
	}
 | 
			
		||||
	c.stopCtx, c.stopFn = context.WithCancel(context.Background())
 | 
			
		||||
	c.queue = stario.NewQueueCtx(c.stopCtx, 4, math.MaxUint32)
 | 
			
		||||
	conn, err := net.DialTimeout(network, addr, timeout)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	c.alive.Store(true)
 | 
			
		||||
	c.status.Alive = true
 | 
			
		||||
	c.conn = conn
 | 
			
		||||
	if c.useHeartBeat {
 | 
			
		||||
		go c.Heartbeat()
 | 
			
		||||
	}
 | 
			
		||||
	return c.clientPostInit()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) monitorPool() {
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-c.stopCtx.Done():
 | 
			
		||||
			c.noFinSyncMsgPool.Range(func(k, v interface{}) bool {
 | 
			
		||||
				data := v.(WaitMsg)
 | 
			
		||||
				close(data.Reply)
 | 
			
		||||
				c.noFinSyncMsgPool.Delete(k)
 | 
			
		||||
				return true
 | 
			
		||||
			})
 | 
			
		||||
			return
 | 
			
		||||
		case <-time.After(time.Second * 30):
 | 
			
		||||
		}
 | 
			
		||||
		now := time.Now()
 | 
			
		||||
		if c.noFinSyncMsgMaxKeepSeconds > 0 {
 | 
			
		||||
			c.noFinSyncMsgPool.Range(func(k, v interface{}) bool {
 | 
			
		||||
				data := v.(WaitMsg)
 | 
			
		||||
				if data.Time.Add(time.Duration(c.noFinSyncMsgMaxKeepSeconds) * time.Second).Before(now) {
 | 
			
		||||
					close(data.Reply)
 | 
			
		||||
					c.noFinSyncMsgPool.Delete(k)
 | 
			
		||||
				}
 | 
			
		||||
				return true
 | 
			
		||||
			})
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) SkipExchangeKey() bool {
 | 
			
		||||
	return c.skipKeyExchange
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) SetSkipExchangeKey(val bool) {
 | 
			
		||||
	c.skipKeyExchange = val
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) clientPostInit() error {
 | 
			
		||||
	go c.readMessage()
 | 
			
		||||
	go c.loadMessage()
 | 
			
		||||
	if !c.skipKeyExchange {
 | 
			
		||||
		err := c.keyExchangeFn(c)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			c.alive.Store(false)
 | 
			
		||||
			c.mu.Lock()
 | 
			
		||||
			c.status = Status{
 | 
			
		||||
				Alive:  false,
 | 
			
		||||
				Reason: "key exchange failed",
 | 
			
		||||
				Err:    err,
 | 
			
		||||
			}
 | 
			
		||||
			c.mu.Unlock()
 | 
			
		||||
			c.stopFn()
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
func NewClient() Client {
 | 
			
		||||
	var client = ClientCommon{
 | 
			
		||||
		maxReadTimeout:     0,
 | 
			
		||||
		maxWriteTimeout:    0,
 | 
			
		||||
		sequenceEn:         encode,
 | 
			
		||||
		sequenceDe:         Decode,
 | 
			
		||||
		keyExchangeFn:      aesRsaHello,
 | 
			
		||||
		SecretKey:          defaultAesKey,
 | 
			
		||||
		handshakeRsaPubKey: defaultRsaPubKey,
 | 
			
		||||
		msgEn:              defaultMsgEn,
 | 
			
		||||
		msgDe:              defaultMsgDe,
 | 
			
		||||
	}
 | 
			
		||||
	client.alive.Store(false)
 | 
			
		||||
	//heartbeat should not controlable for user
 | 
			
		||||
	client.useHeartBeat = true
 | 
			
		||||
	client.heartbeatPeriod = time.Second * 20
 | 
			
		||||
	client.linkFns = make(map[string]func(*Message))
 | 
			
		||||
	client.defaultFns = func(message *Message) {
 | 
			
		||||
func (star *StarNotifyC) store(key, value string) {
 | 
			
		||||
	if _, ok := star.clientSign[key]; !ok {
 | 
			
		||||
		ch := make(chan string, 20)
 | 
			
		||||
		ch <- value
 | 
			
		||||
		star.clientSign[key] = ch
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	client.wg = stario.NewWaitGroup(0)
 | 
			
		||||
	client.stopCtx, client.stopFn = context.WithCancel(context.Background())
 | 
			
		||||
	return &client
 | 
			
		||||
	star.clientSign[key] <- value
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) Heartbeat() {
 | 
			
		||||
	failedCount := 0
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-c.stopCtx.Done():
 | 
			
		||||
			return
 | 
			
		||||
		case <-time.After(c.heartbeatPeriod):
 | 
			
		||||
		}
 | 
			
		||||
		_, err := c.sendWait(TransferMsg{
 | 
			
		||||
			ID:    10000,
 | 
			
		||||
			Key:   "heartbeat",
 | 
			
		||||
			Value: nil,
 | 
			
		||||
			Type:  MSG_SYS_WAIT,
 | 
			
		||||
		}, time.Second*5)
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			c.lastHeartbeat = time.Now().Unix()
 | 
			
		||||
			failedCount = 0
 | 
			
		||||
		}
 | 
			
		||||
		if c.debugMode {
 | 
			
		||||
			fmt.Println("failed to recv heartbeat,timeout!")
 | 
			
		||||
		}
 | 
			
		||||
		failedCount++
 | 
			
		||||
		if failedCount >= 3 {
 | 
			
		||||
			if c.debugMode {
 | 
			
		||||
				fmt.Println("heatbeat failed more than 3 times,stop client")
 | 
			
		||||
// NewNotifyC 用于新建一个Client端进程
 | 
			
		||||
func NewNotifyC(netype, value string) (*StarNotifyC, error) {
 | 
			
		||||
	var err error
 | 
			
		||||
	var star StarNotifyC
 | 
			
		||||
	star.starinitc()
 | 
			
		||||
	star.isUDP = false
 | 
			
		||||
	if strings.Index(netype, "udp") >= 0 {
 | 
			
		||||
		star.isUDP = true
 | 
			
		||||
	}
 | 
			
		||||
	star.Connc, err = net.Dial(netype, value)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	go star.cnotify()
 | 
			
		||||
	go func() {
 | 
			
		||||
		<-star.stopSign.Done()
 | 
			
		||||
		star.Connc.Close()
 | 
			
		||||
		star.Online = false
 | 
			
		||||
		return
 | 
			
		||||
	}()
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			buf := make([]byte, 8192)
 | 
			
		||||
			n, err := star.Connc.Read(buf)
 | 
			
		||||
			if n != 0 {
 | 
			
		||||
				star.Queue.ParseMessage(buf[0:n], star.Connc)
 | 
			
		||||
			}
 | 
			
		||||
			c.alive.Store(false)
 | 
			
		||||
			c.mu.Lock()
 | 
			
		||||
			c.status = Status{
 | 
			
		||||
				Alive:  false,
 | 
			
		||||
				Reason: "heartbeat failed more than 3 times",
 | 
			
		||||
				Err:    errors.New("heartbeat failed more than 3 times"),
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				star.Connc.Close()
 | 
			
		||||
				star.ClientStop()
 | 
			
		||||
				//star, _ = NewNotifyC(netype, value)
 | 
			
		||||
				star.Online = false
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			c.mu.Unlock()
 | 
			
		||||
			c.stopFn()
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	star.Online = true
 | 
			
		||||
	return &star, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Send 用于向Server端发送数据
 | 
			
		||||
func (star *StarNotifyC) Send(name string) error {
 | 
			
		||||
	return star.SendValue(name, "")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SendValue 用于向Server端发送key-value类型数据
 | 
			
		||||
func (star *StarNotifyC) SendValue(name, value string) error {
 | 
			
		||||
	var err error
 | 
			
		||||
	var key []byte
 | 
			
		||||
	for _, v := range []byte(name) {
 | 
			
		||||
		if v == byte(124) || v == byte(92) {
 | 
			
		||||
			key = append(key, byte(92))
 | 
			
		||||
		}
 | 
			
		||||
		key = append(key, v)
 | 
			
		||||
	}
 | 
			
		||||
	_, err = star.Connc.Write(star.Queue.BuildMessage([]byte("pa" + "||" + string(key) + "||" + value)))
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (star *StarNotifyC) trim(name string) string {
 | 
			
		||||
	var slash bool = false
 | 
			
		||||
	var key []byte
 | 
			
		||||
	for _, v := range []byte(name) {
 | 
			
		||||
		if v == byte(92) && !slash {
 | 
			
		||||
			slash = true
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		slash = false
 | 
			
		||||
		key = append(key, v)
 | 
			
		||||
	}
 | 
			
		||||
	return string(key)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SendValueWait 用于向Server端发送key-value类型数据并等待结果返回,此结果不会通过标准返回流程处理
 | 
			
		||||
func (star *StarNotifyC) SendValueWait(name, value string, tmout time.Duration) (CMsg, error) {
 | 
			
		||||
	var err error
 | 
			
		||||
	var tmceed <-chan time.Time
 | 
			
		||||
	if star.UseChannel {
 | 
			
		||||
		return CMsg{}, errors.New("Do Not Use UseChannel Mode!")
 | 
			
		||||
	}
 | 
			
		||||
	rand.Seed(time.Now().UnixNano())
 | 
			
		||||
	mode := "cr" + fmt.Sprintf("%05d", rand.Intn(99999))
 | 
			
		||||
	var key []byte
 | 
			
		||||
	for _, v := range []byte(name) {
 | 
			
		||||
		if v == byte(124) || v == byte(92) {
 | 
			
		||||
			key = append(key, byte(92))
 | 
			
		||||
		}
 | 
			
		||||
		key = append(key, v)
 | 
			
		||||
	}
 | 
			
		||||
	_, err = star.Connc.Write(star.Queue.BuildMessage([]byte(mode + "||" + string(key) + "||" + value)))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return CMsg{}, err
 | 
			
		||||
	}
 | 
			
		||||
	if int64(tmout) > 0 {
 | 
			
		||||
		tmceed = time.After(tmout)
 | 
			
		||||
	}
 | 
			
		||||
	var source CMsg
 | 
			
		||||
	source.wait = make(chan int, 2)
 | 
			
		||||
	star.lockPool[mode] = source
 | 
			
		||||
	select {
 | 
			
		||||
	case <-source.wait:
 | 
			
		||||
		res := star.lockPool[mode]
 | 
			
		||||
		delete(star.lockPool, mode)
 | 
			
		||||
		return res, nil
 | 
			
		||||
	case <-tmceed:
 | 
			
		||||
		return CMsg{}, errors.New("Time Exceed")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) ShowError(std bool) {
 | 
			
		||||
	c.mu.Lock()
 | 
			
		||||
	c.showError = std
 | 
			
		||||
	c.mu.Unlock()
 | 
			
		||||
// ReplyMsg 用于向Server端Reply信息
 | 
			
		||||
func (star *StarNotifyC) ReplyMsg(data CMsg, name, value string) error {
 | 
			
		||||
	var err error
 | 
			
		||||
	var key []byte
 | 
			
		||||
	for _, v := range []byte(name) {
 | 
			
		||||
		if v == byte(124) || v == byte(92) {
 | 
			
		||||
			key = append(key, byte(92))
 | 
			
		||||
		}
 | 
			
		||||
		key = append(key, v)
 | 
			
		||||
	}
 | 
			
		||||
	_, err = star.Connc.Write(star.Queue.BuildMessage([]byte(data.mode + "||" + string(key) + "||" + value)))
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) readMessage() {
 | 
			
		||||
func (star *StarNotifyC) cnotify() {
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-c.stopCtx.Done():
 | 
			
		||||
			c.conn.Close()
 | 
			
		||||
		case <-star.stopSign.Done():
 | 
			
		||||
			return
 | 
			
		||||
		default:
 | 
			
		||||
		}
 | 
			
		||||
		data := make([]byte, 8192)
 | 
			
		||||
		if c.maxReadTimeout.Seconds() != 0 {
 | 
			
		||||
			if err := c.conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout)); err != nil {
 | 
			
		||||
				//TODO:ALERT
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		readNum, err := c.conn.Read(data)
 | 
			
		||||
		if err == os.ErrDeadlineExceeded {
 | 
			
		||||
			if readNum != 0 {
 | 
			
		||||
				c.queue.ParseMessage(data[:readNum], "b612")
 | 
			
		||||
			}
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		data, err := star.Queue.RestoreOne()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if c.showError || c.debugMode {
 | 
			
		||||
				fmt.Println("client read error", err)
 | 
			
		||||
			}
 | 
			
		||||
			c.alive.Store(false)
 | 
			
		||||
			c.mu.Lock()
 | 
			
		||||
			c.status = Status{
 | 
			
		||||
				Alive:  false,
 | 
			
		||||
				Reason: "client read error",
 | 
			
		||||
				Err:    err,
 | 
			
		||||
			}
 | 
			
		||||
			c.mu.Unlock()
 | 
			
		||||
			c.stopFn()
 | 
			
		||||
			time.Sleep(time.Millisecond * 20)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		c.queue.ParseMessage(data[:readNum], "b612")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) sayGoodBye() error {
 | 
			
		||||
	_, err := c.sendWait(TransferMsg{
 | 
			
		||||
		ID:    10010,
 | 
			
		||||
		Key:   "bye",
 | 
			
		||||
		Value: nil,
 | 
			
		||||
		Type:  MSG_SYS_WAIT,
 | 
			
		||||
	}, time.Second*3)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) loadMessage() {
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-c.stopCtx.Done():
 | 
			
		||||
			//say goodbye
 | 
			
		||||
			if !c.byeFromServer {
 | 
			
		||||
				c.sayGoodBye()
 | 
			
		||||
			}
 | 
			
		||||
			c.conn.Close()
 | 
			
		||||
		if string(data.Msg) == "b612ryzstop" {
 | 
			
		||||
			star.ClientStop()
 | 
			
		||||
			star.Online = false
 | 
			
		||||
			return
 | 
			
		||||
		case data, ok := <-c.queue.RestoreChan():
 | 
			
		||||
			if !ok {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			c.wg.Add(1)
 | 
			
		||||
			go func(data stario.MsgQueue) {
 | 
			
		||||
				defer c.wg.Done()
 | 
			
		||||
				//fmt.Println("c received:", float64(time.Now().UnixNano()-nowd)/1000000)
 | 
			
		||||
				now := time.Now()
 | 
			
		||||
				//transfer to Msg
 | 
			
		||||
				msg, err := c.sequenceDe(c.msgDe(c.SecretKey, data.Msg))
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					if c.showError || c.debugMode {
 | 
			
		||||
						fmt.Println("client decode data error", err)
 | 
			
		||||
		}
 | 
			
		||||
		strs := strings.SplitN(string(data.Msg), "||", 3)
 | 
			
		||||
		if len(strs) < 3 {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		strs[1] = star.trim(strs[1])
 | 
			
		||||
		if star.UseChannel {
 | 
			
		||||
			go star.store(strs[1], strs[2])
 | 
			
		||||
		} else {
 | 
			
		||||
			mode, key, value := strs[0], strs[1], strs[2]
 | 
			
		||||
			if mode[0:2] != "cr" {
 | 
			
		||||
				if msg, ok := star.FuncLists[key]; ok {
 | 
			
		||||
					go msg(CMsg{key, value, mode, nil})
 | 
			
		||||
				} else {
 | 
			
		||||
					if star.defaultFunc != nil {
 | 
			
		||||
						go star.defaultFunc(CMsg{key, value, mode, nil})
 | 
			
		||||
					}
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				message := Message{
 | 
			
		||||
					ServerConn:  c,
 | 
			
		||||
					TransferMsg: msg.(TransferMsg),
 | 
			
		||||
					NetType:     NET_CLIENT,
 | 
			
		||||
			} else {
 | 
			
		||||
				if sa, ok := star.lockPool[mode]; ok {
 | 
			
		||||
					sa.Key = key
 | 
			
		||||
					sa.Value = value
 | 
			
		||||
					sa.mode = mode
 | 
			
		||||
					star.lockPool[mode] = sa
 | 
			
		||||
					sa.wait <- 1
 | 
			
		||||
				} else {
 | 
			
		||||
					if msg, ok := star.FuncLists[key]; ok {
 | 
			
		||||
						go msg(CMsg{key, value, mode, nil})
 | 
			
		||||
					} else {
 | 
			
		||||
						if star.defaultFunc != nil {
 | 
			
		||||
							go star.defaultFunc(CMsg{key, value, mode, nil})
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				message.Time = now
 | 
			
		||||
				c.dispatchMsg(message)
 | 
			
		||||
			}(data)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) dispatchMsg(message Message) {
 | 
			
		||||
	switch message.TransferMsg.Type {
 | 
			
		||||
	case MSG_SYS_WAIT:
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case MSG_SYS:
 | 
			
		||||
		c.sysMsg(message)
 | 
			
		||||
		return
 | 
			
		||||
	case MSG_KEY_CHANGE:
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case MSG_SYS_REPLY:
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case MSG_SYNC_REPLY:
 | 
			
		||||
		data, ok := c.noFinSyncMsgPool.Load(message.ID)
 | 
			
		||||
		if ok {
 | 
			
		||||
			wait := data.(WaitMsg)
 | 
			
		||||
			wait.Reply <- message
 | 
			
		||||
			c.noFinSyncMsgPool.Delete(message.ID)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		//return
 | 
			
		||||
		fallthrough
 | 
			
		||||
	default:
 | 
			
		||||
	}
 | 
			
		||||
	callFn := func(fn func(*Message)) {
 | 
			
		||||
		fn(&message)
 | 
			
		||||
	}
 | 
			
		||||
	fn, ok := c.linkFns[message.Key]
 | 
			
		||||
	if ok {
 | 
			
		||||
		callFn(fn)
 | 
			
		||||
	}
 | 
			
		||||
	if c.defaultFns != nil {
 | 
			
		||||
		callFn(c.defaultFns)
 | 
			
		||||
// ClientStop 终止client端运行
 | 
			
		||||
func (star *StarNotifyC) ClientStop() {
 | 
			
		||||
	if star.isUDP {
 | 
			
		||||
		star.Send("b612ryzstop")
 | 
			
		||||
	}
 | 
			
		||||
	star.cancel()
 | 
			
		||||
	star.Stop <- 1
 | 
			
		||||
	star.Stop <- 1
 | 
			
		||||
	star.Stop <- 1
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) sysMsg(message Message) {
 | 
			
		||||
	switch message.Key {
 | 
			
		||||
	case "bye":
 | 
			
		||||
		if message.TransferMsg.Type == MSG_SYS_WAIT {
 | 
			
		||||
			//fmt.Println("recv stop signal from server")
 | 
			
		||||
			c.byeFromServer = true
 | 
			
		||||
			message.Reply(nil)
 | 
			
		||||
		}
 | 
			
		||||
		c.alive.Store(false)
 | 
			
		||||
		c.mu.Lock()
 | 
			
		||||
		c.status = Status{
 | 
			
		||||
			Alive:  false,
 | 
			
		||||
			Reason: "recv stop signal from server",
 | 
			
		||||
			Err:    nil,
 | 
			
		||||
		}
 | 
			
		||||
		c.mu.Unlock()
 | 
			
		||||
		c.stopFn()
 | 
			
		||||
	}
 | 
			
		||||
// SetNotify 用于设置关键词的调用函数
 | 
			
		||||
func (star *StarNotifyC) SetNotify(name string, data func(CMsg)) {
 | 
			
		||||
	star.FuncLists[name] = data
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) SetDefaultLink(fn func(message *Message)) {
 | 
			
		||||
	c.defaultFns = fn
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) SetLink(key string, fn func(*Message)) {
 | 
			
		||||
	c.mu.Lock()
 | 
			
		||||
	defer c.mu.Unlock()
 | 
			
		||||
	c.linkFns[key] = fn
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) send(msg TransferMsg) (WaitMsg, error) {
 | 
			
		||||
	var wait WaitMsg
 | 
			
		||||
	if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 {
 | 
			
		||||
		msg.ID = atomic.AddUint64(&c.msgID, 1)
 | 
			
		||||
	}
 | 
			
		||||
	data, err := c.sequenceEn(msg)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return WaitMsg{}, err
 | 
			
		||||
	}
 | 
			
		||||
	data = c.msgEn(c.SecretKey, data)
 | 
			
		||||
	data = c.queue.BuildMessage(data)
 | 
			
		||||
	if c.maxWriteTimeout.Seconds() != 0 {
 | 
			
		||||
		c.conn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
 | 
			
		||||
	}
 | 
			
		||||
	_, err = c.conn.Write(data)
 | 
			
		||||
	if err == nil && (msg.Type == MSG_SYNC_ASK || msg.Type == MSG_KEY_CHANGE || msg.Type == MSG_SYS_WAIT) {
 | 
			
		||||
		wait.Time = time.Now()
 | 
			
		||||
		wait.TransferMsg = msg
 | 
			
		||||
		wait.Reply = make(chan Message, 1)
 | 
			
		||||
		c.noFinSyncMsgPool.Store(msg.ID, wait)
 | 
			
		||||
	}
 | 
			
		||||
	return wait, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) Send(key string, value MsgVal) error {
 | 
			
		||||
	_, err := c.send(TransferMsg{
 | 
			
		||||
		Key:   key,
 | 
			
		||||
		Value: value,
 | 
			
		||||
		Type:  MSG_ASYNC,
 | 
			
		||||
	})
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) sendWait(msg TransferMsg, timeout time.Duration) (Message, error) {
 | 
			
		||||
	data, err := c.send(msg)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return Message{}, err
 | 
			
		||||
	}
 | 
			
		||||
	if timeout.Seconds() == 0 {
 | 
			
		||||
		msg, ok := <-data.Reply
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return msg, os.ErrInvalid
 | 
			
		||||
		}
 | 
			
		||||
		return msg, nil
 | 
			
		||||
	}
 | 
			
		||||
	select {
 | 
			
		||||
	case <-time.After(timeout):
 | 
			
		||||
		close(data.Reply)
 | 
			
		||||
		c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
 | 
			
		||||
		return Message{}, os.ErrDeadlineExceeded
 | 
			
		||||
	case <-c.stopCtx.Done():
 | 
			
		||||
		return Message{}, errors.New("service shutdown")
 | 
			
		||||
	case msg, ok := <-data.Reply:
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return msg, os.ErrInvalid
 | 
			
		||||
		}
 | 
			
		||||
		return msg, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) sendCtx(msg TransferMsg, ctx context.Context) (Message, error) {
 | 
			
		||||
	data, err := c.send(msg)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return Message{}, err
 | 
			
		||||
	}
 | 
			
		||||
	if ctx == nil {
 | 
			
		||||
		ctx = context.Background()
 | 
			
		||||
	}
 | 
			
		||||
	select {
 | 
			
		||||
	case <-ctx.Done():
 | 
			
		||||
		close(data.Reply)
 | 
			
		||||
		c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
 | 
			
		||||
		return Message{}, os.ErrDeadlineExceeded
 | 
			
		||||
	case <-c.stopCtx.Done():
 | 
			
		||||
		return Message{}, errors.New("service shutdown")
 | 
			
		||||
	case msg, ok := <-data.Reply:
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return msg, os.ErrInvalid
 | 
			
		||||
		}
 | 
			
		||||
		return msg, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) SendObjCtx(ctx context.Context, key string, val interface{}) (Message, error) {
 | 
			
		||||
	data, err := c.sequenceEn(val)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return Message{}, err
 | 
			
		||||
	}
 | 
			
		||||
	return c.sendCtx(TransferMsg{
 | 
			
		||||
		Key:   key,
 | 
			
		||||
		Value: data,
 | 
			
		||||
		Type:  MSG_SYNC_ASK,
 | 
			
		||||
	}, ctx)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) SendObj(key string, val interface{}) error {
 | 
			
		||||
	data, err := encode(val)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	_, err = c.send(TransferMsg{
 | 
			
		||||
		Key:   key,
 | 
			
		||||
		Value: data,
 | 
			
		||||
		Type:  MSG_ASYNC,
 | 
			
		||||
	})
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) SendCtx(ctx context.Context, key string, value MsgVal) (Message, error) {
 | 
			
		||||
	return c.sendCtx(TransferMsg{
 | 
			
		||||
		Key:   key,
 | 
			
		||||
		Value: value,
 | 
			
		||||
		Type:  MSG_SYNC_ASK,
 | 
			
		||||
	}, ctx)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) SendWait(key string, value MsgVal, timeout time.Duration) (Message, error) {
 | 
			
		||||
	return c.sendWait(TransferMsg{
 | 
			
		||||
		Key:   key,
 | 
			
		||||
		Value: value,
 | 
			
		||||
		Type:  MSG_SYNC_ASK,
 | 
			
		||||
	}, timeout)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) SendWaitObj(key string, value interface{}, timeout time.Duration) (Message, error) {
 | 
			
		||||
	data, err := c.sequenceEn(value)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return Message{}, err
 | 
			
		||||
	}
 | 
			
		||||
	return c.SendWait(key, data, timeout)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) Reply(m Message, value MsgVal) error {
 | 
			
		||||
	return m.Reply(value)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) ExchangeKey(newKey []byte) error {
 | 
			
		||||
	pubKey, err := starcrypto.DecodeRsaPublicKey(c.handshakeRsaPubKey)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	newSendKey, err := starcrypto.RSAEncrypt(pubKey, newKey)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	data, err := c.sendWait(TransferMsg{
 | 
			
		||||
		ID:    19961127,
 | 
			
		||||
		Key:   "sirius",
 | 
			
		||||
		Value: newSendKey,
 | 
			
		||||
		Type:  MSG_KEY_CHANGE,
 | 
			
		||||
	}, time.Second*10)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if string(data.Value) != "success" {
 | 
			
		||||
		return errors.New("cannot exchange new aes-key")
 | 
			
		||||
	}
 | 
			
		||||
	c.SecretKey = newKey
 | 
			
		||||
	time.Sleep(time.Millisecond * 100)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func aesRsaHello(c Client) error {
 | 
			
		||||
	newAesKey := []byte(fmt.Sprintf("%d%d%d%s", time.Now().UnixNano(), rand.Int63(), rand.Int63(), "b612.me"))
 | 
			
		||||
	newAesKey = []byte(starcrypto.Md5Str(newAesKey))
 | 
			
		||||
	return c.ExchangeKey(newAesKey)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) GetMsgEn() func([]byte, []byte) []byte {
 | 
			
		||||
	return c.msgEn
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientCommon) SetMsgEn(fn func([]byte, []byte) []byte) {
 | 
			
		||||
	c.msgEn = fn
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientCommon) GetMsgDe() func([]byte, []byte) []byte {
 | 
			
		||||
	return c.msgDe
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientCommon) SetMsgDe(fn func([]byte, []byte) []byte) {
 | 
			
		||||
	c.msgDe = fn
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) HeartbeatPeroid() time.Duration {
 | 
			
		||||
	return c.heartbeatPeriod
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientCommon) SetHeartbeatPeroid(duration time.Duration) {
 | 
			
		||||
	c.heartbeatPeriod = duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) GetSecretKey() []byte {
 | 
			
		||||
	return c.SecretKey
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientCommon) SetSecretKey(key []byte) {
 | 
			
		||||
	c.SecretKey = key
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientCommon) RsaPubKey() []byte {
 | 
			
		||||
	return c.handshakeRsaPubKey
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientCommon) SetRsaPubKey(key []byte) {
 | 
			
		||||
	c.handshakeRsaPubKey = key
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientCommon) Stop() error {
 | 
			
		||||
	if !c.alive.Load().(bool) {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	c.alive.Store(false)
 | 
			
		||||
	c.mu.Lock()
 | 
			
		||||
	c.status = Status{
 | 
			
		||||
		Alive:  false,
 | 
			
		||||
		Reason: "recv stop signal from user",
 | 
			
		||||
		Err:    nil,
 | 
			
		||||
	}
 | 
			
		||||
	c.mu.Unlock()
 | 
			
		||||
	c.stopFn()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientCommon) StopMonitorChan() <-chan struct{} {
 | 
			
		||||
	return c.stopCtx.Done()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) Status() Status {
 | 
			
		||||
	return c.status
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientCommon) GetSequenceEn() func(interface{}) ([]byte, error) {
 | 
			
		||||
	return c.sequenceEn
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientCommon) SetSequenceEn(fn func(interface{}) ([]byte, error)) {
 | 
			
		||||
	c.sequenceEn = fn
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientCommon) GetSequenceDe() func([]byte) (interface{}, error) {
 | 
			
		||||
	return c.sequenceDe
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientCommon) SetSequenceDe(fn func([]byte) (interface{}, error)) {
 | 
			
		||||
	c.sequenceDe = fn
 | 
			
		||||
// SetDefaultNotify 用于设置默认关键词的调用函数
 | 
			
		||||
func (star *StarNotifyC) SetDefaultNotify(data func(CMsg)) {
 | 
			
		||||
	star.defaultFunc = data
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										149
									
								
								client_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										149
									
								
								client_test.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,149 @@
 | 
			
		||||
package notify
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func Test_usechannel(t *testing.T) {
 | 
			
		||||
	server, err := NewNotifyS("udp", "127.0.0.1:1926")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	server.SetNotify("nihao", func(data SMsg) string {
 | 
			
		||||
		fmt.Println("server recv:", data.Key, data.Value)
 | 
			
		||||
		if data.Value != "" {
 | 
			
		||||
			data.Reply("nba")
 | 
			
		||||
			return "nb"
 | 
			
		||||
		}
 | 
			
		||||
		return ""
 | 
			
		||||
	})
 | 
			
		||||
	client, err := NewNotifyC("udp", "127.0.0.1:1926")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	//time.Sleep(time.Second * 10)
 | 
			
		||||
	client.Send("nihao")
 | 
			
		||||
	client.SendValue("nihao", "lalala")
 | 
			
		||||
	txt := <-client.Notify("nihao")
 | 
			
		||||
	fmt.Println("client", txt)
 | 
			
		||||
	txt = <-client.Notify("nihao")
 | 
			
		||||
	fmt.Println("client", txt)
 | 
			
		||||
	server.ServerStop()
 | 
			
		||||
	<-client.Stop
 | 
			
		||||
	client.ClientStop()
 | 
			
		||||
	time.Sleep(time.Second * 3)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Test_nochannel(t *testing.T) {
 | 
			
		||||
	server, err := NewNotifyS("udp", "127.0.0.1:1926")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	server.SetNotify("nihao", func(data SMsg) string {
 | 
			
		||||
		fmt.Println("server recv:", data.Key, data.Value)
 | 
			
		||||
		if data.Value != "" {
 | 
			
		||||
			data.Reply("nbaz")
 | 
			
		||||
			return ""
 | 
			
		||||
		}
 | 
			
		||||
		return ""
 | 
			
		||||
	})
 | 
			
		||||
	client, err := NewNotifyC("udp", "127.0.0.1:1926")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	//time.Sleep(time.Second * 10)
 | 
			
		||||
	client.UseChannel = false
 | 
			
		||||
	client.SetNotify("nihao", func(data CMsg) {
 | 
			
		||||
		fmt.Println("client recv:", data.Key, data.Value)
 | 
			
		||||
		if data.Value != "" {
 | 
			
		||||
			time.Sleep(time.Millisecond * 900)
 | 
			
		||||
			client.SendValue("nihao", "dsb")
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
	client.SendValue("nihao", "lalala")
 | 
			
		||||
	time.Sleep(time.Second * 3)
 | 
			
		||||
	server.ServerStop()
 | 
			
		||||
	<-client.Stop
 | 
			
		||||
	client.ClientStop()
 | 
			
		||||
	time.Sleep(time.Second * 3)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Test_pipec(t *testing.T) {
 | 
			
		||||
	server, err := NewNotifyS("tcp", "127.0.0.1:1926")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	server.SetNotify("ni\\||hao", func(data SMsg) string {
 | 
			
		||||
		fmt.Println("server recv:", data.Key, data.Value, data.mode)
 | 
			
		||||
		if data.Value != "" {
 | 
			
		||||
			data.Reply("nba")
 | 
			
		||||
			return ""
 | 
			
		||||
		}
 | 
			
		||||
		return ""
 | 
			
		||||
	})
 | 
			
		||||
	client, err := NewNotifyC("tcp", "127.0.0.1:1926")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	client.UseChannel = false
 | 
			
		||||
	sa, err := client.SendValueWait("ni\\||hao", "lalaeee", time.Second*10)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	fmt.Println(sa)
 | 
			
		||||
	fmt.Println("sukidesu")
 | 
			
		||||
	time.Sleep(time.Second * 3)
 | 
			
		||||
	server.ServerStop()
 | 
			
		||||
	<-client.Stop
 | 
			
		||||
	client.ClientStop()
 | 
			
		||||
	time.Sleep(time.Second * 2)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Test_pips(t *testing.T) {
 | 
			
		||||
	var testmsg SMsg
 | 
			
		||||
	server, err := NewNotifyS("udp", "127.0.0.1:1926")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	server.SetNotify("nihao", func(data SMsg) string {
 | 
			
		||||
		fmt.Println("server recv:", data.Key, data.Value, data.mode)
 | 
			
		||||
		testmsg = data
 | 
			
		||||
		if data.Value != "" {
 | 
			
		||||
			data.Reply("nbaz")
 | 
			
		||||
			return ""
 | 
			
		||||
		}
 | 
			
		||||
		return ""
 | 
			
		||||
	})
 | 
			
		||||
	client, err := NewNotifyC("udp", "127.0.0.1:1926")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	//time.Sleep(time.Second * 10)
 | 
			
		||||
	client.UseChannel = false
 | 
			
		||||
	client.SetNotify("nihao", func(data CMsg) {
 | 
			
		||||
		fmt.Println("client recv:", data.Key, data.Value, data.mode)
 | 
			
		||||
		if data.mode != "pa" {
 | 
			
		||||
			time.Sleep(time.Millisecond * 1200)
 | 
			
		||||
			client.ReplyMsg(data, "nihao", "dsb")
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
	client.SendValue("nihao", "lalala")
 | 
			
		||||
	time.Sleep(time.Second * 3)
 | 
			
		||||
	fmt.Println(server.SendWait(testmsg, "nihao", "wozuinb", time.Second*20))
 | 
			
		||||
	fmt.Println("sakura")
 | 
			
		||||
	server.ServerStop()
 | 
			
		||||
	<-client.Stop
 | 
			
		||||
	client.ClientStop()
 | 
			
		||||
	time.Sleep(time.Second * 3)
 | 
			
		||||
}
 | 
			
		||||
@ -1,51 +0,0 @@
 | 
			
		||||
package notify
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Client interface {
 | 
			
		||||
	SetDefaultLink(func(message *Message))
 | 
			
		||||
	SetLink(string, func(*Message))
 | 
			
		||||
	send(msg TransferMsg) (WaitMsg, error)
 | 
			
		||||
	sendWait(msg TransferMsg, timeout time.Duration) (Message, error)
 | 
			
		||||
	Send(key string, value MsgVal) error
 | 
			
		||||
	SendWait(key string, value MsgVal, timeout time.Duration) (Message, error)
 | 
			
		||||
	SendWaitObj(key string, value interface{}, timeout time.Duration) (Message, error)
 | 
			
		||||
	SendCtx(ctx context.Context, key string, value MsgVal) (Message, error)
 | 
			
		||||
	Reply(m Message, value MsgVal) error
 | 
			
		||||
	ExchangeKey(newKey []byte) error
 | 
			
		||||
	Connect(network string, addr string) error
 | 
			
		||||
	ConnectTimeout(network string, addr string, timeout time.Duration) error
 | 
			
		||||
	SkipExchangeKey() bool
 | 
			
		||||
	SetSkipExchangeKey(bool)
 | 
			
		||||
 | 
			
		||||
	GetMsgEn() func([]byte, []byte) []byte
 | 
			
		||||
	SetMsgEn(func([]byte, []byte) []byte)
 | 
			
		||||
	GetMsgDe() func([]byte, []byte) []byte
 | 
			
		||||
	SetMsgDe(func([]byte, []byte) []byte)
 | 
			
		||||
 | 
			
		||||
	Heartbeat()
 | 
			
		||||
	HeartbeatPeroid() time.Duration
 | 
			
		||||
	SetHeartbeatPeroid(duration time.Duration)
 | 
			
		||||
 | 
			
		||||
	GetSecretKey() []byte
 | 
			
		||||
	SetSecretKey(key []byte)
 | 
			
		||||
	RsaPubKey() []byte
 | 
			
		||||
	SetRsaPubKey([]byte)
 | 
			
		||||
 | 
			
		||||
	Stop() error
 | 
			
		||||
	StopMonitorChan() <-chan struct{}
 | 
			
		||||
	Status() Status
 | 
			
		||||
	ShowError(bool)
 | 
			
		||||
	DebugMode(bool)
 | 
			
		||||
	IsDebugMode() bool
 | 
			
		||||
 | 
			
		||||
	GetSequenceEn() func(interface{}) ([]byte, error)
 | 
			
		||||
	SetSequenceEn(func(interface{}) ([]byte, error))
 | 
			
		||||
	GetSequenceDe() func([]byte) (interface{}, error)
 | 
			
		||||
	SetSequenceDe(func([]byte) (interface{}, error))
 | 
			
		||||
	SendObjCtx(ctx context.Context, key string, val interface{}) (Message, error)
 | 
			
		||||
	SendObj(key string, val interface{}) error
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										97
									
								
								default.go
									
									
									
									
									
								
							
							
						
						
									
										97
									
								
								default.go
									
									
									
									
									
								
							@ -1,97 +0,0 @@
 | 
			
		||||
package notify
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"b612.me/starcrypto"
 | 
			
		||||
	"log"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var defaultRsaKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
 | 
			
		||||
MIIJKAIBAAKCAgEAxmeMqr9yfJFKZn26oe/HvC7bZXNLC9Nk55AuTkb4XuIoqXDb
 | 
			
		||||
AJD2Y/p167oJLKIqL3edcj7h+oTfn6s79vxT0ZCEf37ILU0G+scRzVwYHiLMwOUC
 | 
			
		||||
bS2o4Xor3zqUi9f1piJBvoBNh8RKKtsmJW6VQZdiUGJHbgX4MdOdtf/6TvxZMwSX
 | 
			
		||||
U+PRSCAjy04A31Zi7DEWUWJPyqmHeu++PxXU5lvoMdCGDqpcF2j2uO7oJJUww01M
 | 
			
		||||
3F5FtTElMrK4/P9gD4kP7NiPhOfVPEfBsYT/DSSjvqNZJZuWnxu+cDxE7J/sBvdp
 | 
			
		||||
eNRLhqzdmMYagZFuUmVrz8QmsD6jKHgydW+r7irllvb8WJPK/RIMif+4Rg7rDKFb
 | 
			
		||||
j8+ZQ3HZ/gKELoRSyb3zL6RC2qlGLjC1tdeN7TNTinCv092y39T8jIARJ7tpfePh
 | 
			
		||||
NBxsBdxfXbCAzHYZIHufI9Zlsc+felQwanlDhq+q8YLcnKHvNKYVyCf/upExpAiA
 | 
			
		||||
rr88y/KbeKes0KorKkwMBnGUMTothWM25wHozcurixNvP4UMWX7LWD7vOZZuNDQN
 | 
			
		||||
utZYeTwdsniI3mTO9vlPWEK8JTfxBU7x9SePUMJNDyjfDUJM8C2DOlyhGNPkgazO
 | 
			
		||||
GdliH87tHkEy/7jJnGclgKmciiVPgwHfFx9GGoBHEfvmAoGGrk4qNbjm7JECAwEA
 | 
			
		||||
AQKCAgBYzHe05ELFZfG6tYMWf08R9pbTbSqlfFOpIGrZNgJr1SUF0TDzq+3bCXpF
 | 
			
		||||
qtn4VAw1en/JZkOV8Gp1+Bm6jWymWtwyg/fr7pG1I+vf0dwpgMHLg7P2UX1IjXmd
 | 
			
		||||
S4a4oEuds69hJ+OLZFsdm0ATeM7ssGicOaBmqd1Pz7rCfnL1bxQtNVzVex1r/paG
 | 
			
		||||
o77YNr3HoKCwhCPaPM4aQ7sOWSMUhwYBZabaYX0eLShf1O2pkexlPO+tobPpSLmx
 | 
			
		||||
WzRYZ6QC0AGEq9hwT6KsfCFA5pmQtFllNY7suhpL1AsECLWAgoMNCyb1oW68NBpq
 | 
			
		||||
CiBK5WBPGH2MW+pE74Pu1P0gen6kLGnApKQjprE1aGuR+xkZe3uEnXwSryU9TXki
 | 
			
		||||
wINTEMsX8dkmofFqaJhUwSubrb+t7gvv9E9ZZe0X6UgKzAVVqvh4z1pP8VT+xHpu
 | 
			
		||||
pW7SR8n9cFddaEPUijSb1rSpJrNzfJJ+G7yrB7Cw2kBgQ07vzD3z/3kA9cwFevLS
 | 
			
		||||
mv3l3OQuB6y9c+AG3cX5WGAt/BVOLjimj9qJt+YglG0SwG31U0PUnnx6QVz/UtJm
 | 
			
		||||
CbJQ2TpJd+mk0HyuMU+eycp7BWF3PMN+SE4QgKCKWnhsLeAd3gcvifsbLOYE1OPg
 | 
			
		||||
wv1tqyJy0VsJiSn6Ub6Qq0kPLwCLlQTnLWk5mIhnRpHYufTSwQKCAQEA4gS4FKPU
 | 
			
		||||
tAcQ82dEYW4OjGfhNWrjFpF+A8K5zufleQWcgzQ3fQho13zH0vZobukfkEVlVxla
 | 
			
		||||
OIVk7ZgNA4mCSFrATjIx3RMqzrAUvTte0O4wkjYgCwVvTdS1W8nvRLKgugLygyoo
 | 
			
		||||
r+MLW5IT3eNMK/2fZbftNlAkbc7NCo3c2tS6MXFgjx5JUuzChOY73Kp4p5KS38L5
 | 
			
		||||
wRRiI8KTIKjBjMZ5q/l8VLKX89bKOCaWibmItoXY6QMbIjargb7YLp3X6uGEyGIu
 | 
			
		||||
VhPbQ80/+OC2ZqIvDecp4PYnJNZFeqfjyfhJCNqDjBKYwIscBLMU/Wf9OY258OR4
 | 
			
		||||
snQaerN1M0h9lQKCAQEA4LkZIRLLw+8bIVM+7VXxFwOAGy+MH35tvuNIToItAoUh
 | 
			
		||||
zjL5LG34PjID8J0DPyP8VRVanak1EcxF0aTEkvnt2f2RAVsW89ytcn8Lybb12Ae8
 | 
			
		||||
ia2ZWuIM+J40nuKOGPs3lJ9HqdPWmZYWsWKxFJmYBBnwD6CADYqhqambQn0HeaYl
 | 
			
		||||
/WUD7blLYg+4Kk1mt9/hIw93jTWP/86O2H0ia+AhYPTqyvVXfIXKhat6NlOYksGf
 | 
			
		||||
Hdv+aCC8Ukg6FyEgiNc/rFn0MWPnEX+cM1AwubviHIBhV8QWILLBTjupwsEBZVah
 | 
			
		||||
60ftH+HRUCmEeOpI7jyzIlfEUNLoBHfswKMhMPtcDQKCAQEA0JFkQX+xn/PJW6PX
 | 
			
		||||
AUWrXTvbIg0hw8i9DcFa76klJBnehWDhN5tUDE5Uo8PJOVgdTWgMjWSS0geezHX8
 | 
			
		||||
xF/XfudoAIDnbMfsP9FTQhCQfaLf5XzW8vSv8pWwSiS9jJp+IUjo+8siwrR03aqe
 | 
			
		||||
dKr0tr+ToS0qVG1+QGqO4gdpX/LgYxHp9ggPx9s94aAIa6hQMOrcaGqnSNqDedZr
 | 
			
		||||
KL8x5LOewek3J32rJVP3Rfut/SfeFfjL4rKADoF+oPs4yUPVZSV4/+VCNyKZuyaj
 | 
			
		||||
uwm6qFlPrLe9+J+OHbsxYG+fj9hzpRzoOZFLrppwX5HWc8XLcpnrlXVwP9VOPh5u
 | 
			
		||||
r8VcRQKCAQAJFHGHfJLvH8Ig3pQ0UryjCWkrsAghXaJhjB1nzqqy514uTrDysp7N
 | 
			
		||||
JIg0OKPg8TtI1MwMgsG6Ll7D0bx/k8mgfTZWr6+FuuznK2r2g4X7bJSZm4IOwgN0
 | 
			
		||||
KDBIGy9SoxPj1Wu32O9a1U2lbS9qfao+wC2K9Bk4ctmFWW0Eiri6mZP/YQ1/lXUO
 | 
			
		||||
SURPsUDtPQaDvCRAeGGRHG95H9U8NpoiqMKz4KXgSiecrwkJGOeZRml/c1wcKPZy
 | 
			
		||||
/KgcNyJxZQEVnazYMgksE9Pj3uGZH5ZLQISuXyXlvFNDLfX2AIZl6dIxB371QtKK
 | 
			
		||||
QqMvn4fC2IEEajdsbJkjVRUj03OL3xwhAoIBAAfMhDSvBbDkGTaXnNMjPPSbswqK
 | 
			
		||||
qcSRhSG27mjs1dDNBKuFbz6TkIOp4nxjuS9Zp19fErXlAE9mF5yXSmuiAkZmWfhs
 | 
			
		||||
HKpWIdjFJK1EqSfcINe2YuoyUIulz9oG7ObRHD4D8jSPjA8Ete+XsBHGyOtUl09u
 | 
			
		||||
X4u9uClhqjK+r1Tno2vw5yF6ZxfQtdWuL4W0UL1S8E+VO7vjTjNOYvgjAIpAM/gW
 | 
			
		||||
sqjA2Qw52UZqhhLXoTfRvtJilxlXXhIRJSsnUoGiYVCQ/upjqJCClEvJfIWdGY/U
 | 
			
		||||
I2CbFrwJcNvOG1lUsSM55JUmbrSWVPfo7yq2k9GCuFxOy2n/SVlvlQUcNkA=
 | 
			
		||||
-----END RSA PRIVATE KEY-----`)
 | 
			
		||||
 | 
			
		||||
var defaultRsaPubKey = []byte(`-----BEGIN PUBLIC KEY-----
 | 
			
		||||
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAxmeMqr9yfJFKZn26oe/H
 | 
			
		||||
vC7bZXNLC9Nk55AuTkb4XuIoqXDbAJD2Y/p167oJLKIqL3edcj7h+oTfn6s79vxT
 | 
			
		||||
0ZCEf37ILU0G+scRzVwYHiLMwOUCbS2o4Xor3zqUi9f1piJBvoBNh8RKKtsmJW6V
 | 
			
		||||
QZdiUGJHbgX4MdOdtf/6TvxZMwSXU+PRSCAjy04A31Zi7DEWUWJPyqmHeu++PxXU
 | 
			
		||||
5lvoMdCGDqpcF2j2uO7oJJUww01M3F5FtTElMrK4/P9gD4kP7NiPhOfVPEfBsYT/
 | 
			
		||||
DSSjvqNZJZuWnxu+cDxE7J/sBvdpeNRLhqzdmMYagZFuUmVrz8QmsD6jKHgydW+r
 | 
			
		||||
7irllvb8WJPK/RIMif+4Rg7rDKFbj8+ZQ3HZ/gKELoRSyb3zL6RC2qlGLjC1tdeN
 | 
			
		||||
7TNTinCv092y39T8jIARJ7tpfePhNBxsBdxfXbCAzHYZIHufI9Zlsc+felQwanlD
 | 
			
		||||
hq+q8YLcnKHvNKYVyCf/upExpAiArr88y/KbeKes0KorKkwMBnGUMTothWM25wHo
 | 
			
		||||
zcurixNvP4UMWX7LWD7vOZZuNDQNutZYeTwdsniI3mTO9vlPWEK8JTfxBU7x9SeP
 | 
			
		||||
UMJNDyjfDUJM8C2DOlyhGNPkgazOGdliH87tHkEy/7jJnGclgKmciiVPgwHfFx9G
 | 
			
		||||
GoBHEfvmAoGGrk4qNbjm7JECAwEAAQ==
 | 
			
		||||
-----END PUBLIC KEY-----`)
 | 
			
		||||
 | 
			
		||||
var defaultAesKey = []byte{0x19, 0x96, 0x11, 0x27, 228, 187, 187, 231, 142, 137, 230, 179, 189, 229, 184, 133}
 | 
			
		||||
 | 
			
		||||
func defaultMsgEn(key []byte, d []byte) []byte {
 | 
			
		||||
	data, err := starcrypto.CustomEncryptAesCFB(d, key)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Print(err)
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return data
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func defaultMsgDe(key []byte, d []byte) []byte {
 | 
			
		||||
	data, err := starcrypto.CustomDecryptAesCFB(d, key)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Print(err)
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return data
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	RegisterName("b612.me/notify.Transfer", TransferMsg{})
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										8
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										8
									
								
								go.mod
									
									
									
									
									
								
							@ -1,8 +0,0 @@
 | 
			
		||||
module b612.me/notify
 | 
			
		||||
 | 
			
		||||
go 1.16
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	b612.me/starcrypto v0.0.5
 | 
			
		||||
	b612.me/stario v0.0.10
 | 
			
		||||
)
 | 
			
		||||
							
								
								
									
										75
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										75
									
								
								go.sum
									
									
									
									
									
								
							@ -1,75 +0,0 @@
 | 
			
		||||
b612.me/starcrypto v0.0.5 h1:Aa4pRDO2lBH2Aw+vz8NuUtRb73J8z5aOa9SImBY5sq4=
 | 
			
		||||
b612.me/starcrypto v0.0.5/go.mod h1:pF5A16p8r/h1G0x7ZNmmAF6K1sdIMpbCUxn2WGC8gZ0=
 | 
			
		||||
b612.me/stario v0.0.0-20240818091810-d528a583f4b2 h1:SxN1WDZsEBQFTnLaKbc7Z+91uyWhUB4cKHo5Ucztyh0=
 | 
			
		||||
b612.me/stario v0.0.0-20240818091810-d528a583f4b2/go.mod h1:1Owmu9jzKWgs4VsmeI8YWlGwLrCwPNM/bYpxkyn+MMk=
 | 
			
		||||
b612.me/stario v0.0.10/go.mod h1:1Owmu9jzKWgs4VsmeI8YWlGwLrCwPNM/bYpxkyn+MMk=
 | 
			
		||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 | 
			
		||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
 | 
			
		||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 | 
			
		||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 | 
			
		||||
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
 | 
			
		||||
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
 | 
			
		||||
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
 | 
			
		||||
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
 | 
			
		||||
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
 | 
			
		||||
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
 | 
			
		||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
 | 
			
		||||
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
 | 
			
		||||
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
 | 
			
		||||
golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
 | 
			
		||||
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
 | 
			
		||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 | 
			
		||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 | 
			
		||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
 | 
			
		||||
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 | 
			
		||||
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
 | 
			
		||||
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
 | 
			
		||||
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
 | 
			
		||||
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
 | 
			
		||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 | 
			
		||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 | 
			
		||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 | 
			
		||||
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
 | 
			
		||||
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 | 
			
		||||
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 | 
			
		||||
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 | 
			
		||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 | 
			
		||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
			
		||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
			
		||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
			
		||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
			
		||||
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
			
		||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
			
		||||
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 | 
			
		||||
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 | 
			
		||||
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 | 
			
		||||
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
 | 
			
		||||
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 | 
			
		||||
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
 | 
			
		||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 | 
			
		||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 | 
			
		||||
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
 | 
			
		||||
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
 | 
			
		||||
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
 | 
			
		||||
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
 | 
			
		||||
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
 | 
			
		||||
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
 | 
			
		||||
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
 | 
			
		||||
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
 | 
			
		||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 | 
			
		||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 | 
			
		||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
 | 
			
		||||
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
 | 
			
		||||
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
 | 
			
		||||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
 | 
			
		||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
 | 
			
		||||
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
 | 
			
		||||
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
 | 
			
		||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 | 
			
		||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 | 
			
		||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
 | 
			
		||||
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
 | 
			
		||||
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
 | 
			
		||||
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
 | 
			
		||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 | 
			
		||||
							
								
								
									
										502
									
								
								msg.go
									
									
									
									
									
								
							
							
						
						
									
										502
									
								
								msg.go
									
									
									
									
									
								
							@ -1,502 +0,0 @@
 | 
			
		||||
package notify
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"b612.me/starcrypto"
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net"
 | 
			
		||||
	"os"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	MSG_SYS MessageType = iota
 | 
			
		||||
	MSG_SYS_WAIT
 | 
			
		||||
	MSG_SYS_REPLY
 | 
			
		||||
	MSG_KEY_CHANGE
 | 
			
		||||
	MSG_ASYNC
 | 
			
		||||
	MSG_SYNC_ASK
 | 
			
		||||
	MSG_SYNC_REPLY
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type MessageType uint8
 | 
			
		||||
 | 
			
		||||
type NetType uint8
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	NET_SERVER NetType = iota
 | 
			
		||||
	NET_CLIENT
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type MsgVal []byte
 | 
			
		||||
type TransferMsg struct {
 | 
			
		||||
	ID    uint64
 | 
			
		||||
	Key   string
 | 
			
		||||
	Value MsgVal
 | 
			
		||||
	Type  MessageType
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Message struct {
 | 
			
		||||
	NetType
 | 
			
		||||
	ClientConn *ClientConn
 | 
			
		||||
	ServerConn Client
 | 
			
		||||
	TransferMsg
 | 
			
		||||
	Time time.Time
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type WaitMsg struct {
 | 
			
		||||
	TransferMsg
 | 
			
		||||
	Time  time.Time
 | 
			
		||||
	Reply chan Message
 | 
			
		||||
	//Ctx   context.Context
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *Message) Reply(value MsgVal) (err error) {
 | 
			
		||||
	reply := TransferMsg{
 | 
			
		||||
		ID:    m.ID,
 | 
			
		||||
		Key:   m.Key,
 | 
			
		||||
		Value: value,
 | 
			
		||||
		Type:  m.Type,
 | 
			
		||||
	}
 | 
			
		||||
	if reply.Type == MSG_SYNC_ASK {
 | 
			
		||||
		reply.Type = MSG_SYNC_REPLY
 | 
			
		||||
	}
 | 
			
		||||
	if reply.Type == MSG_SYS_WAIT {
 | 
			
		||||
		reply.Type = MSG_SYS_REPLY
 | 
			
		||||
	}
 | 
			
		||||
	if m.NetType == NET_SERVER {
 | 
			
		||||
		_, err = m.ClientConn.server.send(m.ClientConn, reply)
 | 
			
		||||
	}
 | 
			
		||||
	if m.NetType == NET_CLIENT {
 | 
			
		||||
		_, err = m.ServerConn.send(reply)
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *Message) ReplyObj(value interface{}) (err error) {
 | 
			
		||||
	data, err := encode(value)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return m.Reply(data)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type ClientConn struct {
 | 
			
		||||
	alive           atomic.Value
 | 
			
		||||
	status          Status
 | 
			
		||||
	ClientID        string
 | 
			
		||||
	ClientAddr      net.Addr
 | 
			
		||||
	tuConn          net.Conn
 | 
			
		||||
	server          Server
 | 
			
		||||
	stopFn          context.CancelFunc
 | 
			
		||||
	stopCtx         context.Context
 | 
			
		||||
	maxReadTimeout  time.Duration
 | 
			
		||||
	maxWriteTimeout time.Duration
 | 
			
		||||
	msgEn           func([]byte, []byte) []byte
 | 
			
		||||
	msgDe           func([]byte, []byte) []byte
 | 
			
		||||
	handshakeRsaKey []byte
 | 
			
		||||
	SecretKey       []byte
 | 
			
		||||
	lastHeartBeat   int64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Status struct {
 | 
			
		||||
	Alive  bool
 | 
			
		||||
	Reason string
 | 
			
		||||
	Err    error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientConn) readTUMessage() {
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-c.stopCtx.Done():
 | 
			
		||||
			c.tuConn.Close()
 | 
			
		||||
			c.server.removeClient(c)
 | 
			
		||||
			return
 | 
			
		||||
		default:
 | 
			
		||||
		}
 | 
			
		||||
		if c.maxReadTimeout.Seconds() > 0 {
 | 
			
		||||
			c.tuConn.SetReadDeadline(time.Now().Add(c.maxReadTimeout))
 | 
			
		||||
		}
 | 
			
		||||
		data := make([]byte, 8192)
 | 
			
		||||
		num, err := c.tuConn.Read(data)
 | 
			
		||||
		if err == os.ErrDeadlineExceeded {
 | 
			
		||||
			if num != 0 {
 | 
			
		||||
				c.server.pushMessage(data[:num], c.ClientID)
 | 
			
		||||
			}
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			//conn is broke
 | 
			
		||||
			c.alive.Store(false)
 | 
			
		||||
			c.status = Status{
 | 
			
		||||
				Alive:  false,
 | 
			
		||||
				Reason: "read error",
 | 
			
		||||
				Err:    err,
 | 
			
		||||
			}
 | 
			
		||||
			c.stopFn()
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		c.server.pushMessage(data[:num], c.ClientID)
 | 
			
		||||
		//fmt.Println("finished:", float64(time.Now().UnixNano()-nowd)/1000000)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientConn) rsaDecode(message Message) {
 | 
			
		||||
	privKey, err := starcrypto.DecodeRsaPrivateKey(c.handshakeRsaKey, "")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		message.Reply([]byte("failed"))
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	data, err := starcrypto.RSADecrypt(privKey, message.Value)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		message.Reply([]byte("failed"))
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	//fmt.Println("aes-key changed to", string(data))
 | 
			
		||||
	message.Reply([]byte("success"))
 | 
			
		||||
	c.SecretKey = data
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientConn) sayGoodByeForTU() error {
 | 
			
		||||
	_, err := c.server.sendWait(c, TransferMsg{
 | 
			
		||||
		ID:    10010,
 | 
			
		||||
		Key:   "bye",
 | 
			
		||||
		Value: nil,
 | 
			
		||||
		Type:  MSG_SYS_WAIT,
 | 
			
		||||
	}, time.Second*3)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientConn) GetSecretKey() []byte {
 | 
			
		||||
	return c.SecretKey
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientConn) SetSecretKey(key []byte) {
 | 
			
		||||
	c.SecretKey = key
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientConn) GetMsgEn() func([]byte, []byte) []byte {
 | 
			
		||||
	return c.msgEn
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientConn) SetMsgEn(fn func([]byte, []byte) []byte) {
 | 
			
		||||
	c.msgEn = fn
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientConn) GetMsgDe() func([]byte, []byte) []byte {
 | 
			
		||||
	return c.msgDe
 | 
			
		||||
}
 | 
			
		||||
func (c *ClientConn) SetMsgDe(fn func([]byte, []byte) []byte) {
 | 
			
		||||
	c.msgDe = fn
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientConn) StopMonitorChan() <-chan struct{} {
 | 
			
		||||
	return c.stopCtx.Done()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientConn) Status() Status {
 | 
			
		||||
	return c.status
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientConn) Server() Server {
 | 
			
		||||
	return c.server
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClientConn) GetRemoteAddr() net.Addr {
 | 
			
		||||
	return c.ClientAddr
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToClearString() string {
 | 
			
		||||
	return string(m)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToInterface() (interface{}, error) {
 | 
			
		||||
	return Decode(m)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToInterface() interface{} {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToString() (string, error) {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", err
 | 
			
		||||
	}
 | 
			
		||||
	if data, ok := inf.(string); !ok {
 | 
			
		||||
		return "", errors.New("source data not match target type")
 | 
			
		||||
	} else {
 | 
			
		||||
		return data, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToString() string {
 | 
			
		||||
	inf, err := m.ToString()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToInt32() (int32, error) {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	if data, ok := inf.(int32); !ok {
 | 
			
		||||
		return 0, errors.New("source data not match target type")
 | 
			
		||||
	} else {
 | 
			
		||||
		return data, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToInt32() int32 {
 | 
			
		||||
	inf, err := m.ToInt32()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToInt() (int, error) {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	if data, ok := inf.(int); !ok {
 | 
			
		||||
		return 0, errors.New("source data not match target type")
 | 
			
		||||
	} else {
 | 
			
		||||
		return data, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToInt() int {
 | 
			
		||||
	inf, err := m.ToInt()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToUint64() (uint64, error) {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	if data, ok := inf.(uint64); !ok {
 | 
			
		||||
		return 0, errors.New("source data not match target type")
 | 
			
		||||
	} else {
 | 
			
		||||
		return data, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToUint64() uint64 {
 | 
			
		||||
	inf, err := m.ToUint64()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToUint32() (uint32, error) {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	if data, ok := inf.(uint32); !ok {
 | 
			
		||||
		return 0, errors.New("source data not match target type")
 | 
			
		||||
	} else {
 | 
			
		||||
		return data, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToUint32() uint32 {
 | 
			
		||||
	inf, err := m.ToUint32()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToUint() (uint, error) {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	if data, ok := inf.(uint); !ok {
 | 
			
		||||
		return 0, errors.New("source data not match target type")
 | 
			
		||||
	} else {
 | 
			
		||||
		return data, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToUint() uint {
 | 
			
		||||
	inf, err := m.ToUint()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToBool() (bool, error) {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return false, err
 | 
			
		||||
	}
 | 
			
		||||
	if data, ok := inf.(bool); !ok {
 | 
			
		||||
		return false, errors.New("source data not match target type")
 | 
			
		||||
	} else {
 | 
			
		||||
		return data, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToBool() bool {
 | 
			
		||||
	inf, err := m.ToBool()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToFloat64() (float64, error) {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	if data, ok := inf.(float64); !ok {
 | 
			
		||||
		return 0, errors.New("source data not match target type")
 | 
			
		||||
	} else {
 | 
			
		||||
		return data, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToFloat64() float64 {
 | 
			
		||||
	inf, err := m.ToFloat64()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
func (m MsgVal) ToFloat32() (float32, error) {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	if data, ok := inf.(float32); !ok {
 | 
			
		||||
		return 0, errors.New("source data not match target type")
 | 
			
		||||
	} else {
 | 
			
		||||
		return data, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToFloat32() float32 {
 | 
			
		||||
	inf, err := m.ToFloat32()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToSliceString() ([]string, error) {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return []string{}, err
 | 
			
		||||
	}
 | 
			
		||||
	if data, ok := inf.([]string); !ok {
 | 
			
		||||
		return []string{}, errors.New("source data not match target type")
 | 
			
		||||
	} else {
 | 
			
		||||
		return data, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToSliceString() []string {
 | 
			
		||||
	inf, err := m.ToSliceString()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToSliceInt64() ([]int64, error) {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return []int64{}, err
 | 
			
		||||
	}
 | 
			
		||||
	if data, ok := inf.([]int64); !ok {
 | 
			
		||||
		return []int64{}, errors.New("source data not match target type")
 | 
			
		||||
	} else {
 | 
			
		||||
		return data, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToSliceInt64() []int64 {
 | 
			
		||||
	inf, err := m.ToSliceInt64()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) ToSliceFloat64() ([]float64, error) {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return []float64{}, err
 | 
			
		||||
	}
 | 
			
		||||
	if data, ok := inf.([]float64); !ok {
 | 
			
		||||
		return []float64{}, errors.New("source data not match target type")
 | 
			
		||||
	} else {
 | 
			
		||||
		return data, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) MustToSliceFloat64() []float64 {
 | 
			
		||||
	inf, err := m.ToSliceFloat64()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return inf
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ToMsgVal(val interface{}) (MsgVal, error) {
 | 
			
		||||
	return Encode(val)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func MustToMsgVal(val interface{}) MsgVal {
 | 
			
		||||
	d, err := ToMsgVal(val)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return d
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m MsgVal) Orm(stu interface{}) error {
 | 
			
		||||
	inf, err := m.ToInterface()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	t := reflect.TypeOf(stu)
 | 
			
		||||
	if t.Kind() != reflect.Ptr {
 | 
			
		||||
		return errors.New("interface not writable(pointer wanted)")
 | 
			
		||||
	}
 | 
			
		||||
	if !reflect.ValueOf(stu).Elem().CanSet() {
 | 
			
		||||
		return errors.New("interface not writable")
 | 
			
		||||
	}
 | 
			
		||||
	it := reflect.TypeOf(inf)
 | 
			
		||||
	if t.Elem().Kind() != it.Kind() {
 | 
			
		||||
		return fmt.Errorf("interface{} kind is %v,not %v", t.Elem().Kind(), it.Kind())
 | 
			
		||||
	}
 | 
			
		||||
	if t.Elem().Name() != it.Name() {
 | 
			
		||||
		return fmt.Errorf("interface{} name is %v,not %v", t.Elem().Name(), it.Name())
 | 
			
		||||
	}
 | 
			
		||||
	if t.Elem().String() != it.String() {
 | 
			
		||||
		return fmt.Errorf("interface{} string is %v,not %v", t.Elem().String(), it.String())
 | 
			
		||||
	}
 | 
			
		||||
	reflect.ValueOf(stu).Elem().Set(reflect.ValueOf(inf))
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										96
									
								
								msg_test.go
									
									
									
									
									
								
							
							
						
						
									
										96
									
								
								msg_test.go
									
									
									
									
									
								
							@ -1,96 +0,0 @@
 | 
			
		||||
package notify
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestMsgEnDeCode(t *testing.T) {
 | 
			
		||||
	Register(HelloMessage{})
 | 
			
		||||
	Register(Error{})
 | 
			
		||||
	go ServerRun(time.Second * 30)
 | 
			
		||||
	time.Sleep(time.Second * 2)
 | 
			
		||||
	ClientRun(time.Second * 35)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Error struct {
 | 
			
		||||
	Msg string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (e Error) Error() string {
 | 
			
		||||
	return e.Msg
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type WorldMessage struct {
 | 
			
		||||
	Port   int
 | 
			
		||||
	MyCode string
 | 
			
		||||
	MyInfo []int
 | 
			
		||||
	Err    error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type HelloMessage struct {
 | 
			
		||||
	ID      string
 | 
			
		||||
	MyMap   map[string]string
 | 
			
		||||
	MySlice []int
 | 
			
		||||
	World   WorldMessage
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ClientRun(stopTime time.Duration) {
 | 
			
		||||
	c := NewClient()
 | 
			
		||||
	err := c.Connect("tcp", "127.0.0.1:23456")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	c.SetLink("msg", func(msg *Message) {
 | 
			
		||||
		var hi HelloMessage
 | 
			
		||||
		err := msg.Value.Orm(&hi)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
		fmt.Printf("recv info from server,struct detail is %+v\n", hi)
 | 
			
		||||
	})
 | 
			
		||||
	timer := time.NewTimer(stopTime)
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-timer.C:
 | 
			
		||||
			c.Stop()
 | 
			
		||||
			return
 | 
			
		||||
		case <-time.After(time.Second * 2):
 | 
			
		||||
			fmt.Println("client msg sent", c.SendObj("msg", HelloMessage{
 | 
			
		||||
				ID:      "client",
 | 
			
		||||
				MyMap:   map[string]string{"hello": "world"},
 | 
			
		||||
				MySlice: []int{int(time.Now().Unix())},
 | 
			
		||||
				World: WorldMessage{
 | 
			
		||||
					Port:   520,
 | 
			
		||||
					MyCode: "b612",
 | 
			
		||||
					MyInfo: []int{0, 1, 2, 3},
 | 
			
		||||
					Err:    Error{Msg: "Hello World"},
 | 
			
		||||
				},
 | 
			
		||||
			}))
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ServerRun(stopTime time.Duration) {
 | 
			
		||||
	s := NewServer()
 | 
			
		||||
	err := s.Listen("tcp", "127.0.0.1:23456")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	s.SetLink("msg", func(msg *Message) {
 | 
			
		||||
		var hi HelloMessage
 | 
			
		||||
		err := msg.Value.Orm(&hi)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
		fmt.Printf("recv info from client:%v,struct detail is %+v\n", msg.ClientConn.GetRemoteAddr(), hi)
 | 
			
		||||
		hi.ID = "server"
 | 
			
		||||
		hi.World.Port = 666
 | 
			
		||||
		hi.MySlice = append(hi.MySlice, 1, 1, 2, 7)
 | 
			
		||||
		msg.ReplyObj(hi)
 | 
			
		||||
	})
 | 
			
		||||
	<-time.After(stopTime)
 | 
			
		||||
	s.Stop()
 | 
			
		||||
}
 | 
			
		||||
@ -1,44 +0,0 @@
 | 
			
		||||
package notify
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"encoding/gob"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func Register(data interface{}) {
 | 
			
		||||
	gob.Register(data)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func RegisterName(name string, data interface{}) {
 | 
			
		||||
	gob.RegisterName(name, data)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func RegisterAll(data []interface{}) {
 | 
			
		||||
	for _, v := range data {
 | 
			
		||||
		gob.Register(v)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func RegisterNames(data map[string]interface{}) {
 | 
			
		||||
	for k, v := range data {
 | 
			
		||||
		gob.RegisterName(k, v)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func encode(src interface{}) ([]byte, error) {
 | 
			
		||||
	var buf bytes.Buffer
 | 
			
		||||
	enc := gob.NewEncoder(&buf)
 | 
			
		||||
	err := enc.Encode(&src)
 | 
			
		||||
	return buf.Bytes(), err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Encode(src interface{}) ([]byte, error) {
 | 
			
		||||
	return encode(src)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Decode(src []byte) (interface{}, error) {
 | 
			
		||||
	dec := gob.NewDecoder(bytes.NewReader(src))
 | 
			
		||||
	var dst interface{}
 | 
			
		||||
	err := dec.Decode(&dst)
 | 
			
		||||
	return dst, err
 | 
			
		||||
}
 | 
			
		||||
@ -1,49 +0,0 @@
 | 
			
		||||
package notify
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"net"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Server interface {
 | 
			
		||||
	SetDefaultCommEncode(func([]byte, []byte) []byte)
 | 
			
		||||
	SetDefaultCommDecode(func([]byte, []byte) []byte)
 | 
			
		||||
	SetDefaultLink(func(message *Message))
 | 
			
		||||
	SetLink(string, func(*Message))
 | 
			
		||||
	send(c *ClientConn, msg TransferMsg) (WaitMsg, error)
 | 
			
		||||
	sendWait(c *ClientConn, msg TransferMsg, timeout time.Duration) (Message, error)
 | 
			
		||||
	SendObjCtx(ctx context.Context, c *ClientConn, key string, val interface{}) (Message, error)
 | 
			
		||||
	SendObj(c *ClientConn, key string, val interface{}) error
 | 
			
		||||
	Send(c *ClientConn, key string, value MsgVal) error
 | 
			
		||||
	SendWait(c *ClientConn, key string, value MsgVal, timeout time.Duration) (Message, error)
 | 
			
		||||
	SendWaitObj(c *ClientConn, key string, value interface{}, timeout time.Duration) (Message, error)
 | 
			
		||||
	SendCtx(ctx context.Context, c *ClientConn, key string, value MsgVal) (Message, error)
 | 
			
		||||
	Reply(m Message, value MsgVal) error
 | 
			
		||||
	pushMessage([]byte, string)
 | 
			
		||||
	removeClient(client *ClientConn)
 | 
			
		||||
	Listen(network string, addr string) error
 | 
			
		||||
	Stop() error
 | 
			
		||||
	StopMonitorChan() <-chan struct{}
 | 
			
		||||
	Status() Status
 | 
			
		||||
 | 
			
		||||
	GetSecretKey() []byte
 | 
			
		||||
	SetSecretKey(key []byte)
 | 
			
		||||
	RsaPrivKey() []byte
 | 
			
		||||
	SetRsaPrivKey([]byte)
 | 
			
		||||
 | 
			
		||||
	GetClient(id string) *ClientConn
 | 
			
		||||
	GetClientLists() []*ClientConn
 | 
			
		||||
	GetClientAddrs() []net.Addr
 | 
			
		||||
 | 
			
		||||
	GetSequenceEn() func(interface{}) ([]byte, error)
 | 
			
		||||
	SetSequenceEn(func(interface{}) ([]byte, error))
 | 
			
		||||
	GetSequenceDe() func([]byte) (interface{}, error)
 | 
			
		||||
	SetSequenceDe(func([]byte) (interface{}, error))
 | 
			
		||||
	ShowError(bool)
 | 
			
		||||
	DebugMode(bool)
 | 
			
		||||
	IsDebugMode() bool
 | 
			
		||||
 | 
			
		||||
	HeartbeatTimeoutSec() int64
 | 
			
		||||
	SetHeartbeatTimeoutSec(int64)
 | 
			
		||||
}
 | 
			
		||||
@ -1,109 +0,0 @@
 | 
			
		||||
package starnotify
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"b612.me/notify"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"sync"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	cmu        sync.RWMutex
 | 
			
		||||
	smu        sync.RWMutex
 | 
			
		||||
	starClient map[string]notify.Client
 | 
			
		||||
	starServer map[string]notify.Server
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	starClient = make(map[string]notify.Client)
 | 
			
		||||
	starServer = make(map[string]notify.Server)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewClient(key string) notify.Client {
 | 
			
		||||
	client := notify.NewClient()
 | 
			
		||||
	cmu.Lock()
 | 
			
		||||
	starClient[key] = client
 | 
			
		||||
	cmu.Unlock()
 | 
			
		||||
	return client
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func DeleteClient(key string) (err error) {
 | 
			
		||||
	cmu.RLock()
 | 
			
		||||
	client, ok := starClient[key]
 | 
			
		||||
	cmu.RUnlock()
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return errors.New("Not Exists Yet!")
 | 
			
		||||
	}
 | 
			
		||||
	if client.Status().Alive {
 | 
			
		||||
		err = client.Stop()
 | 
			
		||||
	}
 | 
			
		||||
	client = nil
 | 
			
		||||
	cmu.Lock()
 | 
			
		||||
	delete(starClient, key)
 | 
			
		||||
	cmu.Unlock()
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewServer(key string) notify.Server {
 | 
			
		||||
	server := notify.NewServer()
 | 
			
		||||
	smu.Lock()
 | 
			
		||||
	starServer[key] = server
 | 
			
		||||
	smu.Unlock()
 | 
			
		||||
	return server
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func DeleteServer(key string) error {
 | 
			
		||||
	smu.RLock()
 | 
			
		||||
	server, ok := starServer[key]
 | 
			
		||||
	smu.RUnlock()
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return errors.New("Not Exists Yet!")
 | 
			
		||||
	}
 | 
			
		||||
	if server.Status().Alive {
 | 
			
		||||
		server.Stop()
 | 
			
		||||
	}
 | 
			
		||||
	server = nil
 | 
			
		||||
	smu.Lock()
 | 
			
		||||
	delete(starServer, key)
 | 
			
		||||
	smu.Unlock()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func S(key string) notify.Server {
 | 
			
		||||
	smu.RLock()
 | 
			
		||||
	server, ok := starServer[key]
 | 
			
		||||
	smu.RUnlock()
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return server
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func C(key string) notify.Client {
 | 
			
		||||
	cmu.RLock()
 | 
			
		||||
	client, ok := starClient[key]
 | 
			
		||||
	cmu.RUnlock()
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return client
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Server(key string) (notify.Server, error) {
 | 
			
		||||
	smu.RLock()
 | 
			
		||||
	server, ok := starServer[key]
 | 
			
		||||
	smu.RUnlock()
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return nil, errors.New("Not Exists Yet")
 | 
			
		||||
	}
 | 
			
		||||
	return server, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Client(key string) (notify.Client, error) {
 | 
			
		||||
	cmu.RLock()
 | 
			
		||||
	client, ok := starClient[key]
 | 
			
		||||
	cmu.RUnlock()
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return nil, errors.New("Not Exists Yet")
 | 
			
		||||
	}
 | 
			
		||||
	return client, nil
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										152
									
								
								v2cs_test.go
									
									
									
									
									
								
							
							
						
						
									
										152
									
								
								v2cs_test.go
									
									
									
									
									
								
							@ -1,152 +0,0 @@
 | 
			
		||||
package notify
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net"
 | 
			
		||||
	//_ "net/http/pprof"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func Test_ServerTuAndClientCommon(t *testing.T) {
 | 
			
		||||
	//go http.ListenAndServe("0.0.0.0:8888", nil)
 | 
			
		||||
	noEn := func(key, bn []byte) []byte {
 | 
			
		||||
		return bn
 | 
			
		||||
	}
 | 
			
		||||
	_ = noEn
 | 
			
		||||
	server := NewServer()
 | 
			
		||||
	//server.SetDefaultCommDecode(noEn)
 | 
			
		||||
	//server.SetDefaultCommEncode(noEn)
 | 
			
		||||
	err := server.Listen("tcp", "127.0.0.1:12345")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	server.SetLink("notify", notify)
 | 
			
		||||
	for i := 1; i <= 100; i++ {
 | 
			
		||||
		go func() {
 | 
			
		||||
			client := NewClient()
 | 
			
		||||
			//client.SetMsgEn(noEn)
 | 
			
		||||
			//client.SetMsgDe(noEn)
 | 
			
		||||
			//client.SetSkipExchangeKey(true)
 | 
			
		||||
			err = client.Connect("tcp", "127.0.0.1:12345")
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
				time.Sleep(time.Second * 2)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			//client.SetLink("notify", notify)
 | 
			
		||||
			for {
 | 
			
		||||
 | 
			
		||||
				//nowd = time.Now().UnixNano()
 | 
			
		||||
				client.SendWait("notify", []byte("client hello"), time.Second*15)
 | 
			
		||||
				//client.Send("notify", []byte("client hello"))
 | 
			
		||||
				//time.Sleep(time.Millisecond)
 | 
			
		||||
				//fmt.Println("finished:", float64(time.Now().UnixNano()-nowd)/1000000)
 | 
			
		||||
				//client.Send("notify", []byte("client"))
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
	time.Sleep(time.Second)
 | 
			
		||||
	go func() {
 | 
			
		||||
		time.Sleep(time.Second * 10)
 | 
			
		||||
		server.Stop()
 | 
			
		||||
	}()
 | 
			
		||||
	<-server.StopMonitorChan()
 | 
			
		||||
	fmt.Println(count2)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var count2 int64
 | 
			
		||||
 | 
			
		||||
func notify(msg *Message) {
 | 
			
		||||
	//fmt.Println(string(msg.Msg.Value))
 | 
			
		||||
	//fmt.Println("called:", float64(time.Now().UnixNano()-nowd)/1000000)
 | 
			
		||||
	if msg.NetType == NET_SERVER {
 | 
			
		||||
		atomic.AddInt64(&count2, 1)
 | 
			
		||||
		msg.Reply([]byte("server reply"))
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Test_normal(t *testing.T) {
 | 
			
		||||
	server, err := net.Listen("tcp", "127.0.0.1:12345")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			conn, err := server.Accept()
 | 
			
		||||
			if err == nil {
 | 
			
		||||
				go func() {
 | 
			
		||||
					for {
 | 
			
		||||
						buf := make([]byte, 1024)
 | 
			
		||||
						_, err := conn.Read(buf)
 | 
			
		||||
						//fmt.Println("S RECV", string(buf[:i]))
 | 
			
		||||
						atomic.AddInt64(&count2, 1)
 | 
			
		||||
						if err == nil {
 | 
			
		||||
							conn.Write([]byte("hello world server"))
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				}()
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	time.Sleep(time.Second * 5)
 | 
			
		||||
	for i := 1; i <= 100; i++ {
 | 
			
		||||
		go func() {
 | 
			
		||||
			conn, err := net.Dial("tcp", "127.0.0.1:12345")
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
			for {
 | 
			
		||||
				//nowd = time.Now().UnixNano()
 | 
			
		||||
				_, err := conn.Write([]byte("hello world client"))
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					fmt.Println(err)
 | 
			
		||||
				}
 | 
			
		||||
				buf := make([]byte, 1024)
 | 
			
		||||
				conn.Read(buf)
 | 
			
		||||
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
	time.Sleep(time.Second * 10)
 | 
			
		||||
	fmt.Println(count2)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Test_normal_udp(t *testing.T) {
 | 
			
		||||
	ludp, _ := net.ResolveUDPAddr("udp", "127.0.0.1:12345")
 | 
			
		||||
	conn, _ := net.ListenUDP("udp", ludp)
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			buf := make([]byte, 1024)
 | 
			
		||||
			_, addr, err := conn.ReadFromUDP(buf)
 | 
			
		||||
			fmt.Println(time.Now(), "S RECV", addr.String())
 | 
			
		||||
			atomic.AddInt64(&count2, 1)
 | 
			
		||||
			if err == nil {
 | 
			
		||||
				conn.WriteToUDP([]byte("hello world server"), addr)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	for i := 1; i <= 100; i++ {
 | 
			
		||||
		go func() {
 | 
			
		||||
			conn, err := net.Dial("udp", "127.0.0.1:12345")
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
			for {
 | 
			
		||||
				//nowd = time.Now().UnixNano()
 | 
			
		||||
				_, err := conn.Write([]byte("hello world client"))
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					fmt.Println(err)
 | 
			
		||||
				}
 | 
			
		||||
				buf := make([]byte, 1024)
 | 
			
		||||
				conn.Read(buf)
 | 
			
		||||
				fmt.Println(time.Now(), "C RECV")
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
	time.Sleep(time.Second * 10)
 | 
			
		||||
	fmt.Println(count2)
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user