package notify import ( "net" "strings" "time" "b612.me/starainrt" ) // StarNotifyC 为Client端 type StarNotifyC struct { connc net.Conn clientSign map[string]chan string // FuncLists 当不使用channel时,使用此记录调用函数 FuncLists map[string]func(CMsg) clientStopSign chan int defaultFunc func(CMsg) // Stop 停止信号 Stop chan int // UseChannel 是否使用channel作为信息传递 UseChannel bool notifychan chan int // Queue 是用来处理收发信息的简单消息队列 Queue *starainrt.StarQueue } // CMsg 指明当前客户端被通知的关键字 type CMsg struct { Key string Value string } func (star *StarNotifyC) starinitc() { star.Queue = starainrt.NewQueue() star.FuncLists = make(map[string]func(CMsg)) star.UseChannel = true star.clientStopSign, star.notifychan, star.Stop = make(chan int, 1), make(chan int, 3), make(chan int, 5) star.clientSign = make(map[string]chan string) } // Notify 用于获取一个通知 func (star *StarNotifyC) Notify(key string) chan string { if _, ok := star.clientSign[key]; !ok { ch := make(chan string, 20) star.clientSign[key] = ch } return star.clientSign[key] } func (star *StarNotifyC) store(key, value string) { if _, ok := star.clientSign[key]; !ok { ch := make(chan string, 20) ch <- value star.clientSign[key] = ch return } star.clientSign[key] <- value } // NewNotifyC 用于新建一个Client端进程 func NewNotifyC(netype, value string) (*StarNotifyC, error) { var err error var star StarNotifyC star.starinitc() star.connc, err = net.Dial(netype, value) if err != nil { return nil, err } go star.cnotify() go func() { for { go func() { <-star.clientStopSign star.notifychan <- 1 star.connc.Close() star.Stop <- 1 return }() buf := make([]byte, 8192) n, err := star.connc.Read(buf) star.Queue.ParseMessage(buf[0:n], star.connc) if err != nil { star.connc.Close() star.Stop <- 1 star.notifychan <- 0 //star, _ = NewNotifyC(netype, value) return } } }() return &star, nil } // Send 用于向Server端发送数据 func (star *StarNotifyC) Send(name string) error { _, err := star.connc.Write(star.Queue.BuildMessage(name)) return err } // SendValue 用于向Server端发送key-value类型数据 func (star *StarNotifyC) SendValue(name, value string) error { _, err := star.connc.Write(star.Queue.BuildMessage(name + "||" + value)) return err } func (star *StarNotifyC) cnotify() { for { select { case <-star.notifychan: return default: } data, err := star.Queue.RestoreOne() if err != nil { time.Sleep(time.Millisecond * 20) continue } strs := strings.SplitN(data.Msg, "||", 2) if len(strs) < 2 { continue } if star.UseChannel { go star.store(strs[0], strs[1]) } else { key, value := strs[0], strs[1] if msg, ok := star.FuncLists[key]; ok { go msg(CMsg{key, value}) } else { if star.defaultFunc != nil { go star.defaultFunc(CMsg{key, value}) } } } } } // ClientStop 终止client端运行 func (star *StarNotifyC) ClientStop() { star.clientStopSign <- 0 } // SetNotify 用于设置关键词的调用函数 func (star *StarNotifyC) SetNotify(name string, data func(CMsg)) { star.FuncLists[name] = data } // SetDefaultNotify 用于设置默认关键词的调用函数 func (star *StarNotifyC) SetDefaultNotify(name string, data func(CMsg)) { star.defaultFunc = data }