package handler

import (
	"encoding/json"
	"strconv"
	"sync"

	"bet24.com/log"
	pb "bet24.com/servers/micros/notification/proto"
	subscriber_client "bet24.com/servers/micros/notification/subscribe/client"
	userservices "bet24.com/servers/micros/userservices/proto"
)

// manager tracks which users are currently registered for notifications and
// which subscriber addresses notifications are forwarded to.
type manager struct {
	// lock guards notification_list.
	lock *sync.RWMutex
	// notification_list is used as a set of registered user ids; the only
	// value ever stored by this file is 1.
	notification_list map[int]int
	// subscriber holds the addresses that receive every notification.
	subscriber []string
	// lockSubs guards subscriber.
	lockSubs *sync.RWMutex
}

// newManager returns a manager with its locks and user set initialised.
func newManager() *manager {
	m := new(manager)
	m.lock = &sync.RWMutex{}
	m.lockSubs = &sync.RWMutex{}
	m.notification_list = make(map[int]int)
	log.Debug("notification.manager running")
	return m
}

// addNotification builds a pb.Notification from notificationId and data,
// serialises it to JSON and forwards it to every subscriber.
//
// When userId is not -1 the user must have been registered through
// onUserEnter/syncUserList, otherwise nothing is sent and false is returned.
// A userId of -1 skips that check (presumably a broadcast; confirm with
// callers). It also returns false when the notification cannot be serialised.
func (m *manager) addNotification(userId int, notificationId int, data string) bool {
	if userId != -1 {
		// Membership lookup only reads the map, so a shared lock is enough.
		m.lock.RLock()
		_, ok := m.notification_list[userId]
		m.lock.RUnlock()
		if !ok {
			return false
		}
	}

	n := &pb.Notification{Id: notificationId, Data: data}

	// Gold (chips) or diamonds changed: ask userservice to refresh the user.
	if notificationId <= pb.Notification_Chip {
		userservices.UpdateUserInfo(userId)
	}

	msgData, err := json.Marshal(n)
	if err != nil {
		// Do not push an empty payload to subscribers.
		log.Release("addNotification marshal failed userId[%d] notificationId[%d]: %v", userId, notificationId, err)
		return false
	}
	m.sendNotification(userId, string(msgData))
	return true
}

// sendNotification delivers data for userId to every subscriber, in order.
// A subscriber for which subscriber_client.OnNotification returns false is
// dropped from the subscriber list.
//
// lockSubs is held for the whole delivery, so concurrent sends are serialised
// and subscribe() waits until the current delivery has finished.
func (m *manager) sendNotification(userId int, data string) {
	m.lockSubs.Lock()
	// Deferred so that a panic inside OnNotification cannot leave the lock held.
	defer m.lockSubs.Unlock()

	// Filter in place: kept shares the backing array and never gets ahead of
	// the read position, so no element is overwritten before it is visited.
	kept := m.subscriber[:0]
	for _, addr := range m.subscriber {
		if subscriber_client.OnNotification(addr, userId, data) {
			kept = append(kept, addr)
		}
	}
	// Clear the unused tail so dropped addresses are not retained.
	for i := len(kept); i < len(m.subscriber); i++ {
		m.subscriber[i] = ""
	}
	m.subscriber = kept
}

// getNotifications always returns an empty string; it is a stub.
func (m *manager) getNotifications(userId int) string {
	return ""
}

// onUserEnter registers userId so that addNotification accepts it.
// Registering an already known user is a no-op.
func (m *manager) onUserEnter(userId int) {
	m.lock.Lock()
	defer m.lock.Unlock()

	if _, ok := m.notification_list[userId]; !ok {
		m.notification_list[userId] = 1
	}
}

// onUserExit removes userId from the registered set.
func (m *manager) onUserExit(userId int) {
	m.lock.Lock()
	defer m.lock.Unlock()

	delete(m.notification_list, userId)
}

// syncUserList registers every user in userlist.
func (m *manager) syncUserList(userlist []int) {
	for _, userId := range userlist {
		m.onUserEnter(userId)
	}
	// Users missing from userlist are deliberately not removed, because
	// different servers may each push their own list.
}

// dumpUser logs diagnostic information. If param parses as an integer user id
// it reports whether that user is registered; otherwise it reports the number
// of registered users.
func (m *manager) dumpUser(param string) {
	log.Release("-------------------------------")
	log.Release("notificationMgr.dumpUser %s", param)
	defer func() {
		log.Release("+++++++++++++++++++++++++++++++")
		log.Release("")
	}()

	// Only reads the map, so a shared lock is sufficient.
	m.lock.RLock()
	defer m.lock.RUnlock()

	userId, err := strconv.Atoi(param)
	if err != nil {
		log.Release(" user count : %d", len(m.notification_list))
		return
	}

	if _, ok := m.notification_list[userId]; !ok {
		log.Release(" user %d have no notifications", userId)
	} else {
		log.Release(" user %d have notifications ", userId)
	}
}

// dumpReceiver logs every subscriber address.
func (m *manager) dumpReceiver() {
	log.Release("-------------------------------")
	log.Release("notificationMgr.dumpReceiver")
	defer func() {
		log.Release("+++++++++++++++++++++++++++++++")
		log.Release("")
	}()

	m.lockSubs.RLock()
	defer m.lockSubs.RUnlock()

	for _, addr := range m.subscriber {
		log.Release(" %s", addr)
	}
}

// subscribe adds addr to the subscriber list unless it is already present.
func (m *manager) subscribe(addr string) {
	m.lockSubs.Lock()
	defer m.lockSubs.Unlock()

	for _, existing := range m.subscriber {
		if existing == addr {
			return
		}
	}
	m.subscriber = append(m.subscriber, addr)
}