channelMgr.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. package chat
  2. import (
  3. "bet24.com/log"
  4. "bet24.com/servers/coreservice/friend"
  5. notification "bet24.com/servers/micros/notification/proto"
  6. task "bet24.com/servers/micros/task/proto"
  7. user "bet24.com/servers/micros/userservices/proto"
  8. "encoding/json"
  9. "strconv"
  10. "sync"
  11. "time"
  12. )
  13. func newChannelMgr() *channelMgr {
  14. cm := new(channelMgr)
  15. cm.lock = &sync.RWMutex{}
  16. cm.channels = make(map[string]*channel)
  17. cm.lockNotification = &sync.RWMutex{}
  18. cm.notificationCache = make(map[int]map[string]int)
  19. cm.checkTimeout()
  20. return cm
  21. }
  22. type channelMgr struct {
  23. lock *sync.RWMutex
  24. channels map[string]*channel
  25. lockNotification *sync.RWMutex
  26. notificationCache map[int]map[string]int
  27. }
  28. func (cm *channelMgr) dump(param1, param2 string) {
  29. log.Release("channelMgr.dump[%s %s] ----------", param1, param2)
  30. defer log.Release("channelMgr.dump end ++++++++++++++++")
  31. if param1 == "" {
  32. cm.lock.RLock()
  33. log.Release(" channel count %d", len(cm.channels))
  34. cm.lock.RUnlock()
  35. return
  36. }
  37. switch param1 {
  38. case "channel":
  39. if param2 == "" {
  40. cm.lock.RLock()
  41. for k := range cm.channels {
  42. log.Release(" %s", k)
  43. }
  44. cm.lock.RUnlock()
  45. return
  46. }
  47. c := cm.getChannel(param2, false)
  48. if c == nil {
  49. log.Release("channel %s not found", param2)
  50. return
  51. }
  52. c.dump()
  53. case "user":
  54. userId, err := strconv.Atoi(param2)
  55. if err != nil {
  56. log.Release("user not exist %s", param2)
  57. return
  58. }
  59. chs := cm.getUserChannels(userId)
  60. log.Release(" user %d channel count %d", userId, len(chs))
  61. for _, v := range chs {
  62. log.Release(" %s", v.Key)
  63. }
  64. default:
  65. log.Release(" unknown command %s", param1)
  66. return
  67. }
  68. }
  69. func (cm *channelMgr) checkTimeout() {
  70. time.AfterFunc(10*time.Minute, cm.checkTimeout)
  71. toRemove := []string{}
  72. cm.lock.RLock()
  73. for _, v := range cm.channels {
  74. if v.isIdle() {
  75. toRemove = append(toRemove, v.Key)
  76. v.release()
  77. }
  78. }
  79. cm.lock.RUnlock()
  80. if len(toRemove) == 0 {
  81. return
  82. }
  83. cm.lock.Lock()
  84. for _, v := range toRemove {
  85. delete(cm.channels, v)
  86. }
  87. cm.lock.Unlock()
  88. //var toRemoveCache []int
  89. now := int(time.Now().Unix())
  90. cm.lockNotification.Lock()
  91. for _, v := range cm.notificationCache {
  92. for k, v1 := range v {
  93. if now-v1 >= ONE_DAY {
  94. delete(v, k)
  95. }
  96. }
  97. }
  98. cm.lockNotification.Unlock()
  99. }
  100. func (cm *channelMgr) createChannel(users []*ChannelUser) *channel {
  101. if len(users) < 2 {
  102. log.Release("channelMgr createChannel not enough user")
  103. return nil
  104. }
  105. userIds := make([]int, len(users))
  106. for k, v := range users {
  107. userIds[k] = v.UserId
  108. }
  109. // 已存在?
  110. c := cm.getChannel(getChannelKey(userIds), false)
  111. if c != nil {
  112. return c
  113. }
  114. c = newChannel(users)
  115. cm.lock.Lock()
  116. cm.channels[c.Key] = c
  117. cm.lock.Unlock()
  118. return c
  119. }
  120. func (cm *channelMgr) createChannelByKey(key string) *channel {
  121. c := cm.getChannel(key, false)
  122. if c != nil {
  123. return c
  124. }
  125. c = createChannelByKey(key)
  126. if c != nil {
  127. cm.lock.Lock()
  128. cm.channels[key] = c
  129. cm.lock.Unlock()
  130. }
  131. return c
  132. }
  133. func (cm *channelMgr) getChannel(key string, create bool) *channel {
  134. cm.lock.RLock()
  135. c, ok := cm.channels[key]
  136. cm.lock.RUnlock()
  137. if !ok {
  138. if create {
  139. c = createChannelByKey(key)
  140. if c != nil {
  141. cm.lock.Lock()
  142. cm.channels[key] = c
  143. cm.lock.Unlock()
  144. }
  145. return c
  146. }
  147. }
  148. return c
  149. }
  150. func (cm *channelMgr) getUserChannels(userId int) []*channel {
  151. var ret []*channel
  152. cm.lock.RLock()
  153. for _, v := range cm.channels {
  154. if v.isInvolved(userId) {
  155. ret = append(ret, v)
  156. }
  157. }
  158. cm.lock.RUnlock()
  159. return ret
  160. }
  161. func (cm *channelMgr) constructChannelUser(userId int, userInfo *user.UserHotInfo) *ChannelUser {
  162. cu := new(ChannelUser)
  163. cu.UserId = userId
  164. if userInfo != nil {
  165. cu.NickName = userInfo.NickName
  166. cu.FaceId = userInfo.FaceId
  167. cu.FaceUrl = userInfo.FaceUrl
  168. cu.Decorations = userInfo.Decorations
  169. cu.Vip = userInfo.Vip
  170. cu.VipPoint = userInfo.VipPoint
  171. cu.VipExpire = userInfo.VipExpire
  172. }
  173. return cu
  174. }
  175. func (cm *channelMgr) sendGiftMsg(from, to int, msg string) string {
  176. cm.sendChatMsg(from, to, msg, MessageType_Gift)
  177. return ""
  178. }
  179. func (cm *channelMgr) sendChat(from, to int, msg string, msgType int) string {
  180. // 不是好友不让发
  181. if friend.IfFriend(from, to) != 1 {
  182. log.Release("channelMgr.sendChat failed %d and %d not friends", from, to)
  183. return "failed"
  184. }
  185. return cm.sendChatMsg(from, to, msg, msgType)
  186. }
  187. func (cm *channelMgr) sendChatMsg(from, to int, msg string, msgType int) string {
  188. fromUser := user.GetUserInfo(from)
  189. toUser := user.GetUserInfo(to)
  190. if fromUser == nil || toUser == nil {
  191. log.Release("channelMgr.sendChat failed %d to %d", from, to)
  192. return ""
  193. }
  194. userIds := []int{from, to}
  195. key := getChannelKey(userIds)
  196. c := cm.getChannel(key, false)
  197. if c == nil {
  198. users := []*ChannelUser{cm.constructChannelUser(from, fromUser), cm.constructChannelUser(to, toUser)}
  199. c = cm.createChannel(users)
  200. }
  201. c.addChat(from, msg, msgType)
  202. go cm.sendNotification(to, key, c.getNewChatCount(to), false, c.getLatestMessage(), c.getLatestTime(), msgType)
  203. if msgType == MessageType_Gift {
  204. go cm.sendNotification(from, key, c.getNewChatCount(from), false, c.getLatestMessage(), c.getLatestTime(), msgType)
  205. return ""
  206. }
  207. go task.DoTaskAction(from, task.TaskAction_privatechat, 1, task.TaskScope{})
  208. return cm.getChat(key, from)
  209. }
  210. func (cm *channelMgr) sendNotification(userId int, channelKey string, new int, noCache bool, latestMsg, latestTime string, msgType int) {
  211. nt := notification.Notification_Channel{
  212. Key: channelKey,
  213. New: new,
  214. LatestMsg: latestMsg,
  215. LatestTime: latestTime,
  216. MessageType: msgType,
  217. }
  218. d, _ := json.Marshal(nt)
  219. if !notification.AddNotification(userId, notification.Notification_ChannelChat, string(d)) && !noCache {
  220. cm.addCacheNotification(userId, channelKey)
  221. }
  222. }
  223. func (cm *channelMgr) getChat(key string, userId int) string {
  224. c := cm.getChannel(key, true)
  225. if c == nil {
  226. log.Release("channelMgr.getChat %s not found", key)
  227. return ""
  228. }
  229. return c.getChatByUser(userId)
  230. }
  231. func (cm *channelMgr) addCacheNotification(userId int, key string) {
  232. cm.lockNotification.Lock()
  233. cache, ok := cm.notificationCache[userId]
  234. if !ok {
  235. cache = make(map[string]int)
  236. }
  237. cache[key] = int(time.Now().Unix())
  238. cm.notificationCache[userId] = cache
  239. cm.lockNotification.Unlock()
  240. }
  241. func (cm *channelMgr) getNotificatinCache(userId int) []string {
  242. cm.lockNotification.RLock()
  243. cache, ok := cm.notificationCache[userId]
  244. cm.lockNotification.RUnlock()
  245. if !ok {
  246. return []string{}
  247. }
  248. cm.removeCache(userId)
  249. var ret []string
  250. for k := range cache {
  251. ret = append(ret, k)
  252. }
  253. return ret
  254. }
  255. func (cm *channelMgr) removeCache(userId int) {
  256. cm.lockNotification.Lock()
  257. delete(cm.notificationCache, userId)
  258. cm.lockNotification.Unlock()
  259. }
  260. func (cm *channelMgr) onUserEnter(userId int) {
  261. channels := cm.getNotificatinCache(userId)
  262. if len(channels) == 0 {
  263. return
  264. }
  265. userInfo := user.GetUserInfo(userId)
  266. for _, v := range channels {
  267. c := cm.createChannelByKey(v)
  268. if c == nil {
  269. continue
  270. }
  271. if userInfo != nil {
  272. c.updateUser(cm.constructChannelUser(userId, userInfo))
  273. }
  274. cm.sendNotification(userId, c.Key, c.getNewChatCount(userId), true, c.getLatestMessage(), c.getLatestTime(), c.getLatestMessageType())
  275. }
  276. }
  277. func (cm *channelMgr) onUserExit(userId int) {
  278. channels := cm.getUserChannels(userId)
  279. if len(channels) == 0 {
  280. return
  281. }
  282. for _, v := range channels {
  283. v.onUserExit(userId)
  284. }
  285. }
  286. func (cm *channelMgr) getChannelInfo(channelKey string) string {
  287. c := cm.getChannel(channelKey, true)
  288. if c == nil {
  289. return ""
  290. }
  291. return c.getInfo()
  292. }
  293. func (cm *channelMgr) userClear(channelKey string, userId int) {
  294. c := cm.getChannel(channelKey, true)
  295. if c != nil {
  296. c.userClear(userId)
  297. }
  298. }
  299. func (cm *channelMgr) removeChannel(userId1, userId2 int) {
  300. userIds := []int{userId1, userId2}
  301. key := getChannelKey(userIds)
  302. c := cm.getChannel(key, false)
  303. if c == nil {
  304. return
  305. }
  306. c.removeFromRedis()
  307. cm.lock.Lock()
  308. delete(cm.channels, key)
  309. cm.lock.Unlock()
  310. }