sync support

master
兔子 4 years ago
parent c9cfb6fd8a
commit 79dcaaf249

@ -27,6 +27,7 @@ type StarNotifyC struct {
// UseChannel 是否使用channel作为信息传递 // UseChannel 是否使用channel作为信息传递
UseChannel bool UseChannel bool
isUDP bool isUDP bool
Sync bool
// Queue 是用来处理收发信息的简单消息队列 // Queue 是用来处理收发信息的简单消息队列
Queue *starnet.StarQueue Queue *starnet.StarQueue
// Online 当前链接是否处于活跃状态 // Online 当前链接是否处于活跃状态
@ -281,12 +282,20 @@ func (star *StarNotifyC) cnotify() {
mode, key, value := strs[0], strs[1], strs[2] mode, key, value := strs[0], strs[1], strs[2]
if mode[0:2] != "cr" { if mode[0:2] != "cr" {
if msg, ok := star.FuncLists[key]; ok { if msg, ok := star.FuncLists[key]; ok {
if star.Sync {
msg(CMsg{key, value, mode, nil})
} else {
go msg(CMsg{key, value, mode, nil}) go msg(CMsg{key, value, mode, nil})
}
} else { } else {
if star.defaultFunc != nil { if star.defaultFunc != nil {
if star.Sync {
star.defaultFunc(CMsg{key, value, mode, nil})
} else {
go star.defaultFunc(CMsg{key, value, mode, nil}) go star.defaultFunc(CMsg{key, value, mode, nil})
} }
} }
}
} else { } else {
if sa, ok := star.lockPool[mode]; ok { if sa, ok := star.lockPool[mode]; ok {
sa.Key = key sa.Key = key
@ -296,9 +305,16 @@ func (star *StarNotifyC) cnotify() {
sa.wait <- 1 sa.wait <- 1
} else { } else {
if msg, ok := star.FuncLists[key]; ok { if msg, ok := star.FuncLists[key]; ok {
if star.Sync {
msg(CMsg{key, value, mode, nil})
} else {
go msg(CMsg{key, value, mode, nil}) go msg(CMsg{key, value, mode, nil})
}
} else { } else {
if star.defaultFunc != nil { if star.defaultFunc != nil {
if star.Sync {
star.defaultFunc(CMsg{key, value, mode, nil})
} else {
go star.defaultFunc(CMsg{key, value, mode, nil}) go star.defaultFunc(CMsg{key, value, mode, nil})
} }
} }
@ -307,6 +323,7 @@ func (star *StarNotifyC) cnotify() {
} }
} }
} }
}
// ClientStop 终止client端运行 // ClientStop 终止client端运行
func (star *StarNotifyC) ClientStop() { func (star *StarNotifyC) ClientStop() {

@ -49,6 +49,7 @@ type StarNotifyS struct {
udpPool map[string]*net.UDPAddr udpPool map[string]*net.UDPAddr
listener net.Listener listener net.Listener
isUDP bool isUDP bool
Sync bool
// Stop 停止信 号 // Stop 停止信 号
Stop chan int Stop chan int
// UDPConn UDP监听 // UDPConn UDP监听
@ -432,8 +433,7 @@ func (star *StarNotifyS) notify() {
continue continue
} }
} }
if mode[0:2] != "sr" { replyFunc := func(key string, rmsg SMsg) {
go func() {
if msg, ok := star.FuncLists[key]; ok { if msg, ok := star.FuncLists[key]; ok {
sdata := msg(rmsg) sdata := msg(rmsg)
if sdata == "" { if sdata == "" {
@ -449,31 +449,25 @@ func (star *StarNotifyS) notify() {
rmsg.Reply(sdata) rmsg.Reply(sdata)
} }
} }
}() }
if mode[0:2] != "sr" {
if star.Sync {
go replyFunc(key, rmsg)
} else {
replyFunc(key, rmsg)
}
} else { } else {
if sa, ok := star.lockPool[mode]; ok { if sa, ok := star.lockPool[mode]; ok {
rmsg.wait = sa.wait rmsg.wait = sa.wait
star.lockPool[mode] = rmsg star.lockPool[mode] = rmsg
star.lockPool[mode].wait <- 1 star.lockPool[mode].wait <- 1
} else { } else {
go func() { if star.Sync {
if msg, ok := star.FuncLists[key]; ok { go replyFunc(key, rmsg)
sdata := msg(rmsg)
if sdata == "" {
return
}
rmsg.Reply(sdata)
} else { } else {
if star.defaultFunc != nil { replyFunc(key, rmsg)
sdata := star.defaultFunc(rmsg)
if sdata == "" {
return
}
rmsg.Reply(sdata)
} }
} }
}()
}
} }
} }
} }

Loading…
Cancel
Save