| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346 |
- package chat
- import (
- "bet24.com/log"
- "bet24.com/servers/coreservice/friend"
- notification "bet24.com/servers/micros/notification/proto"
- task "bet24.com/servers/micros/task/proto"
- user "bet24.com/servers/micros/userservices/proto"
- "encoding/json"
- "strconv"
- "sync"
- "time"
- )
- func newChannelMgr() *channelMgr {
- cm := new(channelMgr)
- cm.lock = &sync.RWMutex{}
- cm.channels = make(map[string]*channel)
- cm.lockNotification = &sync.RWMutex{}
- cm.notificationCache = make(map[int]map[string]int)
- cm.checkTimeout()
- return cm
- }
- type channelMgr struct {
- lock *sync.RWMutex
- channels map[string]*channel
- lockNotification *sync.RWMutex
- notificationCache map[int]map[string]int
- }
- func (cm *channelMgr) dump(param1, param2 string) {
- log.Release("channelMgr.dump[%s %s] ----------", param1, param2)
- defer log.Release("channelMgr.dump end ++++++++++++++++")
- if param1 == "" {
- cm.lock.RLock()
- log.Release(" channel count %d", len(cm.channels))
- cm.lock.RUnlock()
- return
- }
- switch param1 {
- case "channel":
- if param2 == "" {
- cm.lock.RLock()
- for k := range cm.channels {
- log.Release(" %s", k)
- }
- cm.lock.RUnlock()
- return
- }
- c := cm.getChannel(param2, false)
- if c == nil {
- log.Release("channel %s not found", param2)
- return
- }
- c.dump()
- case "user":
- userId, err := strconv.Atoi(param2)
- if err != nil {
- log.Release("user not exist %s", param2)
- return
- }
- chs := cm.getUserChannels(userId)
- log.Release(" user %d channel count %d", userId, len(chs))
- for _, v := range chs {
- log.Release(" %s", v.Key)
- }
- default:
- log.Release(" unknown command %s", param1)
- return
- }
- }
- func (cm *channelMgr) checkTimeout() {
- time.AfterFunc(10*time.Minute, cm.checkTimeout)
- toRemove := []string{}
- cm.lock.RLock()
- for _, v := range cm.channels {
- if v.isIdle() {
- toRemove = append(toRemove, v.Key)
- v.release()
- }
- }
- cm.lock.RUnlock()
- if len(toRemove) == 0 {
- return
- }
- cm.lock.Lock()
- for _, v := range toRemove {
- delete(cm.channels, v)
- }
- cm.lock.Unlock()
- //var toRemoveCache []int
- now := int(time.Now().Unix())
- cm.lockNotification.Lock()
- for _, v := range cm.notificationCache {
- for k, v1 := range v {
- if now-v1 >= ONE_DAY {
- delete(v, k)
- }
- }
- }
- cm.lockNotification.Unlock()
- }
- func (cm *channelMgr) createChannel(users []*ChannelUser) *channel {
- if len(users) < 2 {
- log.Release("channelMgr createChannel not enough user")
- return nil
- }
- userIds := make([]int, len(users))
- for k, v := range users {
- userIds[k] = v.UserId
- }
- // 已存在?
- c := cm.getChannel(getChannelKey(userIds), false)
- if c != nil {
- return c
- }
- c = newChannel(users)
- cm.lock.Lock()
- cm.channels[c.Key] = c
- cm.lock.Unlock()
- return c
- }
- func (cm *channelMgr) createChannelByKey(key string) *channel {
- c := cm.getChannel(key, false)
- if c != nil {
- return c
- }
- c = createChannelByKey(key)
- if c != nil {
- cm.lock.Lock()
- cm.channels[key] = c
- cm.lock.Unlock()
- }
- return c
- }
- func (cm *channelMgr) getChannel(key string, create bool) *channel {
- cm.lock.RLock()
- c, ok := cm.channels[key]
- cm.lock.RUnlock()
- if !ok {
- if create {
- c = createChannelByKey(key)
- if c != nil {
- cm.lock.Lock()
- cm.channels[key] = c
- cm.lock.Unlock()
- }
- return c
- }
- }
- return c
- }
- func (cm *channelMgr) getUserChannels(userId int) []*channel {
- var ret []*channel
- cm.lock.RLock()
- for _, v := range cm.channels {
- if v.isInvolved(userId) {
- ret = append(ret, v)
- }
- }
- cm.lock.RUnlock()
- return ret
- }
- func (cm *channelMgr) constructChannelUser(userId int, userInfo *user.UserHotInfo) *ChannelUser {
- cu := new(ChannelUser)
- cu.UserId = userId
- if userInfo != nil {
- cu.NickName = userInfo.NickName
- cu.FaceId = userInfo.FaceId
- cu.FaceUrl = userInfo.FaceUrl
- cu.Decorations = userInfo.Decorations
- cu.Vip = userInfo.Vip
- cu.VipPoint = userInfo.VipPoint
- cu.VipExpire = userInfo.VipExpire
- }
- return cu
- }
- func (cm *channelMgr) sendGiftMsg(from, to int, msg string) string {
- cm.sendChatMsg(from, to, msg, MessageType_Gift)
- return ""
- }
- func (cm *channelMgr) sendChat(from, to int, msg string, msgType int) string {
- // 不是好友不让发
- if friend.IfFriend(from, to) != 1 {
- log.Release("channelMgr.sendChat failed %d and %d not friends", from, to)
- return "failed"
- }
- return cm.sendChatMsg(from, to, msg, msgType)
- }
- func (cm *channelMgr) sendChatMsg(from, to int, msg string, msgType int) string {
- fromUser := user.GetUserInfo(from)
- toUser := user.GetUserInfo(to)
- if fromUser == nil || toUser == nil {
- log.Release("channelMgr.sendChat failed %d to %d", from, to)
- return ""
- }
- userIds := []int{from, to}
- key := getChannelKey(userIds)
- c := cm.getChannel(key, false)
- if c == nil {
- users := []*ChannelUser{cm.constructChannelUser(from, fromUser), cm.constructChannelUser(to, toUser)}
- c = cm.createChannel(users)
- }
- c.addChat(from, msg, msgType)
- go cm.sendNotification(to, key, c.getNewChatCount(to), false, c.getLatestMessage(), c.getLatestTime(), msgType)
- if msgType == MessageType_Gift {
- go cm.sendNotification(from, key, c.getNewChatCount(from), false, c.getLatestMessage(), c.getLatestTime(), msgType)
- return ""
- }
- go task.DoTaskAction(from, task.TaskAction_privatechat, 1, task.TaskScope{})
- return cm.getChat(key, from)
- }
- func (cm *channelMgr) sendNotification(userId int, channelKey string, new int, noCache bool, latestMsg, latestTime string, msgType int) {
- nt := notification.Notification_Channel{
- Key: channelKey,
- New: new,
- LatestMsg: latestMsg,
- LatestTime: latestTime,
- MessageType: msgType,
- }
- d, _ := json.Marshal(nt)
- if !notification.AddNotification(userId, notification.Notification_ChannelChat, string(d)) && !noCache {
- cm.addCacheNotification(userId, channelKey)
- }
- }
- func (cm *channelMgr) getChat(key string, userId int) string {
- c := cm.getChannel(key, true)
- if c == nil {
- log.Release("channelMgr.getChat %s not found", key)
- return ""
- }
- return c.getChatByUser(userId)
- }
- func (cm *channelMgr) addCacheNotification(userId int, key string) {
- cm.lockNotification.Lock()
- cache, ok := cm.notificationCache[userId]
- if !ok {
- cache = make(map[string]int)
- }
- cache[key] = int(time.Now().Unix())
- cm.notificationCache[userId] = cache
- cm.lockNotification.Unlock()
- }
- func (cm *channelMgr) getNotificatinCache(userId int) []string {
- cm.lockNotification.RLock()
- cache, ok := cm.notificationCache[userId]
- cm.lockNotification.RUnlock()
- if !ok {
- return []string{}
- }
- cm.removeCache(userId)
- var ret []string
- for k := range cache {
- ret = append(ret, k)
- }
- return ret
- }
- func (cm *channelMgr) removeCache(userId int) {
- cm.lockNotification.Lock()
- delete(cm.notificationCache, userId)
- cm.lockNotification.Unlock()
- }
- func (cm *channelMgr) onUserEnter(userId int) {
- channels := cm.getNotificatinCache(userId)
- if len(channels) == 0 {
- return
- }
- userInfo := user.GetUserInfo(userId)
- for _, v := range channels {
- c := cm.createChannelByKey(v)
- if c == nil {
- continue
- }
- if userInfo != nil {
- c.updateUser(cm.constructChannelUser(userId, userInfo))
- }
- cm.sendNotification(userId, c.Key, c.getNewChatCount(userId), true, c.getLatestMessage(), c.getLatestTime(), c.getLatestMessageType())
- }
- }
- func (cm *channelMgr) onUserExit(userId int) {
- channels := cm.getUserChannels(userId)
- if len(channels) == 0 {
- return
- }
- for _, v := range channels {
- v.onUserExit(userId)
- }
- }
- func (cm *channelMgr) getChannelInfo(channelKey string) string {
- c := cm.getChannel(channelKey, true)
- if c == nil {
- return ""
- }
- return c.getInfo()
- }
- func (cm *channelMgr) userClear(channelKey string, userId int) {
- c := cm.getChannel(channelKey, true)
- if c != nil {
- c.userClear(userId)
- }
- }
- func (cm *channelMgr) removeChannel(userId1, userId2 int) {
- userIds := []int{userId1, userId2}
- key := getChannelKey(userIds)
- c := cm.getChannel(key, false)
- if c == nil {
- return
- }
- c.removeFromRedis()
- cm.lock.Lock()
- delete(cm.channels, key)
- cm.lock.Unlock()
- }
|