package remind import ( "b612.me/sdk/candy/when" "b612.me/stardb" "b612.me/startimer" "errors" "fmt" "sync" "time" ) type Remind struct { db *stardb.StarDB tasks map[int64]Task mu *sync.RWMutex callback func(remind Task) } type Task struct { ID int64 `db:"id"` Origin string `db:"text"` timer *startimer.StarTimer TmrInfo string `db:"code"` Key string `db:"key"` Msg []byte `db:"msg"` Limit int64 `db:"limits"` } func getCreateSql() []string { return []string{ "CREATE TABLE IF NOT EXISTS remind(id INTEGER PRIMARY KEY AUTOINCREMENT,key VARCHAR(64),limits INTEGER,code TEXT,text TEXT,msg BLOB)", "CREATE INDEX IF NOT EXISTS key_idx ON remind (key)", } } func NewRemind(db *stardb.StarDB, callback func(task Task)) (*Remind, error) { if db == nil || db.Db == nil { return nil, errors.New("Invalid hanlder of database") } if err := db.Ping(); err != nil { return nil, err } for _, sql := range getCreateSql() { _, err := db.Exec(sql) if err != nil { return nil, err } } return innerLoadDB(db, callback) } func innerLoadDB(db *stardb.StarDB, callback func(task Task)) (*Remind, error) { var rem = Remind{ db: db, mu: new(sync.RWMutex), callback: callback, tasks: make(map[int64]Task), } var res []Task data, err := db.Query("select * from remind") if err != nil { return nil, err } err = data.Orm(&res) if err != nil { return nil, err } if len(res) != 0 { rem.tasks = make(map[int64]Task, len(res)) for _, task := range res { itask := new(Task) *itask = task tmr, err := startimer.NewTimer(time.Now()) if err != nil { return nil, fmt.Errorf("error while init:%s error:%v", itask.Origin, err) } err = tmr.ImportRepeats(itask.TmrInfo) if err != nil { return nil, fmt.Errorf("error while import:%s error:%v", itask.Origin, err) } tmr.SetRunCountLimit(int(itask.Limit)) itask.timer = &tmr itask.timer.AddTask(func() { rem.callbackFn(itask) }) err = itask.timer.Run() if err != nil { db.Exec(`delete from remind where id=?`, task.ID) continue } rem.tasks[itask.ID] = *itask } } return &rem, nil } func (t Task) GetTimer() *startimer.StarTimer { return t.timer } func (r *Remind) callbackFn(task *Task) { if r.callback != nil { r.callback(*task) } if !task.GetTimer().IsRunning() { go r.DeleteTask(task.ID) } } func (r *Remind) AddTask(taskStr, key string, msg []byte) (Task, error) { tmr, err := when.WhenWithPeriod(taskStr) if err != nil { return Task{}, err } exp, err := tmr.ExportRepeats() if err != nil { return Task{}, err } var rmt = Task{ Origin: taskStr, TmrInfo: exp, Key: key, Msg: msg, timer: &tmr, Limit: int64(tmr.RunCountLimit()), } rmt.timer.AddTask(func() { r.callbackFn(&rmt) }) res, err := r.db.Insert(rmt, "remind", "id") if err != nil { return Task{}, err } id, err := res.LastInsertId() if err != nil { return Task{}, err } rmt.ID = int64(id) r.mu.Lock() r.tasks[rmt.ID] = rmt r.mu.Unlock() err = rmt.timer.Run() time.Sleep(time.Microsecond * 100) if err != nil || !rmt.timer.IsRunning() { r.DeleteTask(rmt.ID) return Task{}, err } return rmt, nil } func (r *Remind) DeleteTask(id int64) error { r.mu.RLock() data, ok := r.tasks[id] r.mu.RUnlock() if !ok { return errors.New("no such id") } r.mu.Lock() defer r.mu.Unlock() err := data.timer.Stop() if err != nil { return err } _, err = r.db.Exec("delete from remind where id=?", id) if err != nil { return err } delete(r.tasks, id) return nil } func (r *Remind) ListTasks() []Task { var res = make([]Task, 0, len(r.tasks)) r.mu.RLock() defer r.mu.RUnlock() for _, tk := range r.tasks { res = append(res, tk) } return res } func (r *Remind) GetTasksByKey(key string) []Task { var res []Task r.mu.RLock() defer r.mu.RUnlock() for _, tk := range r.tasks { if tk.Key == key { res = append(res, tk) } } return res } func (r *Remind) GetTaskByID(id int64) Task { r.mu.RLock() defer r.mu.RUnlock() return r.tasks[id] } func (r *Remind) Stop() error { for _, task := range r.tasks { task.GetTimer().Stop() } return nil } func (r *Remind) Reset() error { for _, task := range r.tasks { err := r.DeleteTask(task.ID) if err != nil { return err } } return nil }