You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
notify/client.go

164 lines
3.6 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package notify
import (
"net"
"strings"
"time"
"b612.me/starainrt"
)
// StarNotifyC 为Client端
type StarNotifyC struct {
connc net.Conn
clientSign map[string]chan string
// FuncLists 当不使用channel时使用此记录调用函数
FuncLists map[string]func(CMsg)
clientStopSign chan int
defaultFunc func(CMsg)
// Stop 停止信号
Stop chan int
// UseChannel 是否使用channel作为信息传递
UseChannel bool
notifychan chan int
isUDP bool
// Queue 是用来处理收发信息的简单消息队列
Queue *starainrt.StarQueue
}
// CMsg 指明当前客户端被通知的关键字
type CMsg struct {
Key string
Value string
}
func (star *StarNotifyC) starinitc() {
star.Queue = starainrt.NewQueue()
star.FuncLists = make(map[string]func(CMsg))
star.UseChannel = true
star.clientStopSign, star.notifychan, star.Stop = make(chan int, 1), make(chan int, 3), make(chan int, 5)
star.clientSign = make(map[string]chan string)
}
// Notify 用于获取一个通知
func (star *StarNotifyC) Notify(key string) chan string {
if _, ok := star.clientSign[key]; !ok {
ch := make(chan string, 20)
star.clientSign[key] = ch
}
return star.clientSign[key]
}
func (star *StarNotifyC) store(key, value string) {
if _, ok := star.clientSign[key]; !ok {
ch := make(chan string, 20)
ch <- value
star.clientSign[key] = ch
return
}
star.clientSign[key] <- value
}
// NewNotifyC 用于新建一个Client端进程
func NewNotifyC(netype, value string) (*StarNotifyC, error) {
var err error
var star StarNotifyC
star.starinitc()
star.isUDP = false
if strings.Index(netype, "udp") >= 0 {
star.isUDP = true
}
star.connc, err = net.Dial(netype, value)
if err != nil {
return nil, err
}
go star.cnotify()
go func() {
for {
go func() {
<-star.clientStopSign
star.notifychan <- 1
star.connc.Close()
star.Stop <- 1
return
}()
buf := make([]byte, 8192)
n, err := star.connc.Read(buf)
star.Queue.ParseMessage(buf[0:n], star.connc)
if err != nil {
star.connc.Close()
star.Stop <- 1
star.notifychan <- 0
//star, _ = NewNotifyC(netype, value)
return
}
}
}()
return &star, nil
}
// Send 用于向Server端发送数据
func (star *StarNotifyC) Send(name string) error {
_, err := star.connc.Write(star.Queue.BuildMessage(name))
return err
}
// SendValue 用于向Server端发送key-value类型数据
func (star *StarNotifyC) SendValue(name, value string) error {
_, err := star.connc.Write(star.Queue.BuildMessage(name + "||" + value))
return err
}
func (star *StarNotifyC) cnotify() {
for {
select {
case <-star.notifychan:
return
default:
}
data, err := star.Queue.RestoreOne()
if err != nil {
time.Sleep(time.Millisecond * 20)
continue
}
if data.Msg == "b612ryzstop" {
star.clientStopSign <- 0
return
}
strs := strings.SplitN(data.Msg, "||", 2)
if len(strs) < 2 {
continue
}
if star.UseChannel {
go star.store(strs[0], strs[1])
} else {
key, value := strs[0], strs[1]
if msg, ok := star.FuncLists[key]; ok {
go msg(CMsg{key, value})
} else {
if star.defaultFunc != nil {
go star.defaultFunc(CMsg{key, value})
}
}
}
}
}
// ClientStop 终止client端运行
func (star *StarNotifyC) ClientStop() {
if star.isUDP {
star.Send("b612ryzstop")
}
star.clientStopSign <- 0
}
// SetNotify 用于设置关键词的调用函数
func (star *StarNotifyC) SetNotify(name string, data func(CMsg)) {
star.FuncLists[name] = data
}
// SetDefaultNotify 用于设置默认关键词的调用函数
func (star *StarNotifyC) SetDefaultNotify(name string, data func(CMsg)) {
star.defaultFunc = data
}