commit 2cf31950e2b33538221143acd0027b4dae2ad49a Author: ren yuze Date: Thu Nov 14 10:44:19 2019 +0800 init diff --git a/client.go b/client.go new file mode 100644 index 0000000..3d6642c --- /dev/null +++ b/client.go @@ -0,0 +1,81 @@ +package notify + +import ( + "net" + "strings" + "time" +) + +var connc net.Conn + +var clientSign map[string]chan string + +func init() { + clientSign = make(map[string]chan string) +} + +func Notify(key string) chan string { + if _, ok := clientSign[key]; !ok { + ch := make(chan string, 5) + clientSign[key] = ch + } + return clientSign[key] +} + +func store(key, value string) { + if _, ok := clientSign[key]; !ok { + ch := make(chan string, 5) + ch <- value + clientSign[key] = ch + return + } + clientSign[key] <- value +} + +func NewNotifyC(netype, value string) error { + var err error + connc, err = net.Dial(netype, value) + if err != nil { + return err + } + go cnotify() + go func() { + for { + buf := make([]byte, 8192) + n, err := connc.Read(buf) + Queue.ParseMessage(buf[0:n], connc) + if err != nil { + connc.Close() + notifychan <- 0 + go NewNotifyC(netype, value) + break + } + } + }() + return nil +} + +func Send(name string) error { + _, err := connc.Write(Queue.BuildMessage(name)) + return err +} + +func cnotify() { + for { + select { + case <-notifychan: + break + default: + } + data, err := Queue.RestoreOne() + if err != nil { + time.Sleep(time.Millisecond * 20) + continue + } + strs := strings.SplitN(data.Msg, "||", 2) + if len(strs) < 2 { + continue + } + go store(strs[0], strs[1]) + } +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..b69a839 --- /dev/null +++ b/server.go @@ -0,0 +1,101 @@ +package notify + +import ( + "net" + "time" + + "b612.me/starainrt" +) + +var Queue *starainrt.StarQueue +var FuncLists map[string]func(NetMsg) string +var serverStopSign chan int +var notifychan chan int + +type NetMsg struct { + Conn net.Conn + key string +} + +func (this *NetMsg) Send(msg string) error { + _, err := this.Conn.Write(Queue.BuildMessage(this.key + "||" + msg)) + return err +} + +func init() { + serverStopSign, notifychan = make(chan int), make(chan int) + Queue = starainrt.NewQueue() + FuncLists = make(map[string]func(NetMsg) string) +} +func NewNotifyS(netype, value string) error { + listener, err := net.Listen(netype, value) + if err == nil { + go notify() + go func() { + for { + select { + case <-serverStopSign: + listener.Close() + break + default: + } + conn, err := listener.Accept() + if err != nil { + continue + } + go func(conn net.Conn) { + for { + select { + case <-serverStopSign: + break + default: + } + buf := make([]byte, 8192) + n, err := conn.Read(buf) + if n != 0 { + Queue.ParseMessage(buf[0:n], conn) + } + if err != nil { + conn.Close() + break + } + } + }(conn) + } + }() + } + return err +} + +func SetNotify(name string, data func(NetMsg) string) { + FuncLists[name] = data +} + +func notify() { + for { + select { + case <-serverStopSign: + break + case <-notifychan: + break + default: + } + data, err := Queue.RestoreOne() + if err != nil { + time.Sleep(time.Millisecond * 20) + continue + } + if msg, ok := FuncLists[data.Msg]; ok { + sdata := msg(NetMsg{data.Conn.(net.Conn), data.Msg}) + if sdata == "" { + continue + } + sdata = data.Msg + "||" + sdata + data.Conn.(net.Conn).Write(Queue.BuildMessage(sdata)) + } + } +} + +func ServerStop() { + serverStopSign <- 0 +}