package notify import ( "context" "net" "strings" "time" "b612.me/starainrt" ) // StarNotifyC 为Client端 type StarNotifyC struct { Connc net.Conn clientSign map[string]chan string // FuncLists 当不使用channel时,使用此记录调用函数 FuncLists map[string]func(CMsg) stopSign context.Context cancel context.CancelFunc defaultFunc func(CMsg) // Stop 停止信 号 Stop chan int // UseChannel 是否使用channel作为信息传递 UseChannel bool isUDP bool // Queue 是用来处理收发信息的简单消息队列 Queue *starainrt.StarQueue // Online 当前链接是否处于活跃状态 Online bool } // CMsg 指明当前客户端被通知的关键字 type CMsg struct { Key string Value string } func (star *StarNotifyC) starinitc() { star.stopSign, star.cancel = context.WithCancel(context.Background()) star.Queue = starainrt.NewQueue() star.FuncLists = make(map[string]func(CMsg)) star.UseChannel = true star.Stop = make(chan int, 5) star.clientSign = make(map[string]chan string) star.Online = false star.Queue.RestoreDuration(time.Second * 2) } // Notify 用于获取一个通知 func (star *StarNotifyC) Notify(key string) chan string { if _, ok := star.clientSign[key]; !ok { ch := make(chan string, 20) star.clientSign[key] = ch } return star.clientSign[key] } func (star *StarNotifyC) store(key, value string) { if _, ok := star.clientSign[key]; !ok { ch := make(chan string, 20) ch <- value star.clientSign[key] = ch return } star.clientSign[key] <- value } // NewNotifyC 用于新建一个Client端进程 func NewNotifyC(netype, value string) (*StarNotifyC, error) { var err error var star StarNotifyC star.starinitc() star.isUDP = false if strings.Index(netype, "udp") >= 0 { star.isUDP = true } star.Connc, err = net.Dial(netype, value) if err != nil { return nil, err } go star.cnotify() go func() { <-star.stopSign.Done() star.Connc.Close() star.Online = false return }() go func() { for { buf := make([]byte, 8192) n, err := star.Connc.Read(buf) if n != 0 { star.Queue.ParseMessage(buf[0:n], star.Connc) } if err != nil { star.Connc.Close() star.ClientStop() //star, _ = NewNotifyC(netype, value) star.Online = false return } } }() star.Online = true return &star, nil } // Send 用于向Server端发送数据 func (star *StarNotifyC) Send(name string) error { _, err := star.Connc.Write(star.Queue.BuildMessage([]byte(name))) return err } // SendValue 用于向Server端发送key-value类型数据 func (star *StarNotifyC) SendValue(name, value string) error { _, err := star.Connc.Write(star.Queue.BuildMessage([]byte(name + "||" + value))) return err } func (star *StarNotifyC) cnotify() { for { select { case <-star.stopSign.Done(): return default: } data, err := star.Queue.RestoreOne() if err != nil { time.Sleep(time.Millisecond * 20) continue } if string(data.Msg) == "b612ryzstop" { star.ClientStop() star.Online = false return } strs := strings.SplitN(string(data.Msg), "||", 2) if len(strs) < 2 { continue } if star.UseChannel { go star.store(strs[0], strs[1]) } else { key, value := strs[0], strs[1] if msg, ok := star.FuncLists[key]; ok { go msg(CMsg{key, value}) } else { if star.defaultFunc != nil { go star.defaultFunc(CMsg{key, value}) } } } } } // ClientStop 终止client端运行 func (star *StarNotifyC) ClientStop() { if star.isUDP { star.Send("b612ryzstop") } star.cancel() star.Stop <- 1 } // SetNotify 用于设置关键词的调用函数 func (star *StarNotifyC) SetNotify(name string, data func(CMsg)) { star.FuncLists[name] = data } // SetDefaultNotify 用于设置默认关键词的调用函数 func (star *StarNotifyC) SetDefaultNotify(name string, data func(CMsg)) { star.defaultFunc = data }