package chat import ( "bet24.com/log" "bet24.com/servers/coreservice/friend" notification "bet24.com/servers/micros/notification/proto" task "bet24.com/servers/micros/task/proto" user "bet24.com/servers/micros/userservices/proto" "encoding/json" "strconv" "sync" "time" ) func newChannelMgr() *channelMgr { cm := new(channelMgr) cm.lock = &sync.RWMutex{} cm.channels = make(map[string]*channel) cm.lockNotification = &sync.RWMutex{} cm.notificationCache = make(map[int]map[string]int) cm.checkTimeout() return cm } type channelMgr struct { lock *sync.RWMutex channels map[string]*channel lockNotification *sync.RWMutex notificationCache map[int]map[string]int } func (cm *channelMgr) dump(param1, param2 string) { log.Release("channelMgr.dump[%s %s] ----------", param1, param2) defer log.Release("channelMgr.dump end ++++++++++++++++") if param1 == "" { cm.lock.RLock() log.Release(" channel count %d", len(cm.channels)) cm.lock.RUnlock() return } switch param1 { case "channel": if param2 == "" { cm.lock.RLock() for k := range cm.channels { log.Release(" %s", k) } cm.lock.RUnlock() return } c := cm.getChannel(param2, false) if c == nil { log.Release("channel %s not found", param2) return } c.dump() case "user": userId, err := strconv.Atoi(param2) if err != nil { log.Release("user not exist %s", param2) return } chs := cm.getUserChannels(userId) log.Release(" user %d channel count %d", userId, len(chs)) for _, v := range chs { log.Release(" %s", v.Key) } default: log.Release(" unknown command %s", param1) return } } func (cm *channelMgr) checkTimeout() { time.AfterFunc(10*time.Minute, cm.checkTimeout) toRemove := []string{} cm.lock.RLock() for _, v := range cm.channels { if v.isIdle() { toRemove = append(toRemove, v.Key) v.release() } } cm.lock.RUnlock() if len(toRemove) == 0 { return } cm.lock.Lock() for _, v := range toRemove { delete(cm.channels, v) } cm.lock.Unlock() //var toRemoveCache []int now := int(time.Now().Unix()) cm.lockNotification.Lock() for _, v := range cm.notificationCache { for k, v1 := range v { if now-v1 >= ONE_DAY { delete(v, k) } } } cm.lockNotification.Unlock() } func (cm *channelMgr) createChannel(users []*ChannelUser) *channel { if len(users) < 2 { log.Release("channelMgr createChannel not enough user") return nil } userIds := make([]int, len(users)) for k, v := range users { userIds[k] = v.UserId } // 已存在? c := cm.getChannel(getChannelKey(userIds), false) if c != nil { return c } c = newChannel(users) cm.lock.Lock() cm.channels[c.Key] = c cm.lock.Unlock() return c } func (cm *channelMgr) createChannelByKey(key string) *channel { c := cm.getChannel(key, false) if c != nil { return c } c = createChannelByKey(key) if c != nil { cm.lock.Lock() cm.channels[key] = c cm.lock.Unlock() } return c } func (cm *channelMgr) getChannel(key string, create bool) *channel { cm.lock.RLock() c, ok := cm.channels[key] cm.lock.RUnlock() if !ok { if create { c = createChannelByKey(key) if c != nil { cm.lock.Lock() cm.channels[key] = c cm.lock.Unlock() } return c } } return c } func (cm *channelMgr) getUserChannels(userId int) []*channel { var ret []*channel cm.lock.RLock() for _, v := range cm.channels { if v.isInvolved(userId) { ret = append(ret, v) } } cm.lock.RUnlock() return ret } func (cm *channelMgr) constructChannelUser(userId int, userInfo *user.UserHotInfo) *ChannelUser { cu := new(ChannelUser) cu.UserId = userId if userInfo != nil { cu.NickName = userInfo.NickName cu.FaceId = userInfo.FaceId cu.FaceUrl = userInfo.FaceUrl cu.Decorations = userInfo.Decorations cu.Vip = userInfo.Vip cu.VipPoint = userInfo.VipPoint cu.VipExpire = userInfo.VipExpire } return cu } func (cm *channelMgr) sendGiftMsg(from, to int, msg string) string { cm.sendChatMsg(from, to, msg, MessageType_Gift) return "" } func (cm *channelMgr) sendChat(from, to int, msg string, msgType int) string { // 不是好友不让发 if friend.IfFriend(from, to) != 1 { log.Release("channelMgr.sendChat failed %d and %d not friends", from, to) return "failed" } return cm.sendChatMsg(from, to, msg, msgType) } func (cm *channelMgr) sendChatMsg(from, to int, msg string, msgType int) string { fromUser := user.GetUserInfo(from) toUser := user.GetUserInfo(to) if fromUser == nil || toUser == nil { log.Release("channelMgr.sendChat failed %d to %d", from, to) return "" } userIds := []int{from, to} key := getChannelKey(userIds) c := cm.getChannel(key, false) if c == nil { users := []*ChannelUser{cm.constructChannelUser(from, fromUser), cm.constructChannelUser(to, toUser)} c = cm.createChannel(users) } c.addChat(from, msg, msgType) go cm.sendNotification(to, key, c.getNewChatCount(to), false, c.getLatestMessage(), c.getLatestTime(), msgType) if msgType == MessageType_Gift { go cm.sendNotification(from, key, c.getNewChatCount(from), false, c.getLatestMessage(), c.getLatestTime(), msgType) return "" } go task.DoTaskAction(from, task.TaskAction_privatechat, 1, task.TaskScope{}) return cm.getChat(key, from) } func (cm *channelMgr) sendNotification(userId int, channelKey string, new int, noCache bool, latestMsg, latestTime string, msgType int) { nt := notification.Notification_Channel{ Key: channelKey, New: new, LatestMsg: latestMsg, LatestTime: latestTime, MessageType: msgType, } d, _ := json.Marshal(nt) if !notification.AddNotification(userId, notification.Notification_ChannelChat, string(d)) && !noCache { cm.addCacheNotification(userId, channelKey) } } func (cm *channelMgr) getChat(key string, userId int) string { c := cm.getChannel(key, true) if c == nil { log.Release("channelMgr.getChat %s not found", key) return "" } return c.getChatByUser(userId) } func (cm *channelMgr) addCacheNotification(userId int, key string) { cm.lockNotification.Lock() cache, ok := cm.notificationCache[userId] if !ok { cache = make(map[string]int) } cache[key] = int(time.Now().Unix()) cm.notificationCache[userId] = cache cm.lockNotification.Unlock() } func (cm *channelMgr) getNotificatinCache(userId int) []string { cm.lockNotification.RLock() cache, ok := cm.notificationCache[userId] cm.lockNotification.RUnlock() if !ok { return []string{} } cm.removeCache(userId) var ret []string for k := range cache { ret = append(ret, k) } return ret } func (cm *channelMgr) removeCache(userId int) { cm.lockNotification.Lock() delete(cm.notificationCache, userId) cm.lockNotification.Unlock() } func (cm *channelMgr) onUserEnter(userId int) { channels := cm.getNotificatinCache(userId) if len(channels) == 0 { return } userInfo := user.GetUserInfo(userId) for _, v := range channels { c := cm.createChannelByKey(v) if c == nil { continue } if userInfo != nil { c.updateUser(cm.constructChannelUser(userId, userInfo)) } cm.sendNotification(userId, c.Key, c.getNewChatCount(userId), true, c.getLatestMessage(), c.getLatestTime(), c.getLatestMessageType()) } } func (cm *channelMgr) onUserExit(userId int) { channels := cm.getUserChannels(userId) if len(channels) == 0 { return } for _, v := range channels { v.onUserExit(userId) } } func (cm *channelMgr) getChannelInfo(channelKey string) string { c := cm.getChannel(channelKey, true) if c == nil { return "" } return c.getInfo() } func (cm *channelMgr) userClear(channelKey string, userId int) { c := cm.getChannel(channelKey, true) if c != nil { c.userClear(userId) } } func (cm *channelMgr) removeChannel(userId1, userId2 int) { userIds := []int{userId1, userId2} key := getChannelKey(userIds) c := cm.getChannel(key, false) if c == nil { return } c.removeFromRedis() cm.lock.Lock() delete(cm.channels, key) cm.lock.Unlock() }