manager.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package handler
  2. import (
  3. "bet24.com/log"
  4. pb "bet24.com/servers/micros/notification/proto"
  5. subscriber_client "bet24.com/servers/micros/notification/subscribe/client"
  6. userservices "bet24.com/servers/micros/userservices/proto"
  7. "encoding/json"
  8. "strconv"
  9. "sync"
  10. )
  11. type manager struct {
  12. lock *sync.RWMutex
  13. notification_list map[int]int
  14. subscriber []string
  15. lockSubs *sync.RWMutex
  16. }
  17. func newManager() *manager {
  18. m := new(manager)
  19. m.lock = &sync.RWMutex{}
  20. m.lockSubs = &sync.RWMutex{}
  21. m.notification_list = make(map[int]int)
  22. log.Debug("notification.manager running")
  23. return m
  24. }
  25. func (m *manager) addNotification(userId int, notificationId int, data string) bool {
  26. //log.Debug("addNotification UserId[%d] %d", userId, notificationId)
  27. var ok bool
  28. if userId != -1 {
  29. m.lock.Lock()
  30. _, ok = m.notification_list[userId]
  31. m.lock.Unlock()
  32. if !ok {
  33. //log.Release("addNotification userId[%d] not exist", userId)
  34. return false
  35. }
  36. }
  37. n := &pb.Notification{Id: notificationId, Data: data}
  38. // 金币或钻石发生变化,通知userservice更新
  39. if notificationId <= pb.Notification_Chip {
  40. userservices.UpdateUserInfo(userId)
  41. }
  42. msgData, _ := json.Marshal(n)
  43. m.sendNotification(userId, string(msgData))
  44. return true
  45. }
  46. func (m *manager) sendNotification(userId int, data string) {
  47. m.lockSubs.Lock()
  48. for i := 0; i < len(m.subscriber); {
  49. v := m.subscriber[i]
  50. if !subscriber_client.OnNotification(v, userId, data) {
  51. m.subscriber = append(m.subscriber[:i], m.subscriber[i+1:]...)
  52. } else {
  53. i++
  54. }
  55. }
  56. m.lockSubs.Unlock()
  57. }
  58. func (m *manager) getNotifications(userId int) string {
  59. return ""
  60. }
  61. func (m *manager) onUserEnter(userId int) {
  62. //log.Debug("notification.manager.onUserEnter %d", userId)
  63. m.lock.Lock()
  64. defer m.lock.Unlock()
  65. _, ok := m.notification_list[userId]
  66. if !ok {
  67. m.notification_list[userId] = 1
  68. }
  69. }
  70. func (m *manager) onUserExit(userId int) {
  71. m.lock.Lock()
  72. defer m.lock.Unlock()
  73. delete(m.notification_list, userId)
  74. }
  75. func (m *manager) syncUserList(userlist []int) {
  76. for _, v := range userlist {
  77. m.onUserEnter(v)
  78. }
  79. // 不用删除不存在的,因为可能不同的服务器过来刷新
  80. }
  81. func (m *manager) dumpUser(param string) {
  82. log.Release("-------------------------------")
  83. log.Release("notificationMgr.dumpUser %s", param)
  84. defer func() {
  85. log.Release("+++++++++++++++++++++++++++++++")
  86. log.Release("")
  87. }()
  88. var userId int
  89. var err error
  90. m.lock.Lock()
  91. defer m.lock.Unlock()
  92. if userId, err = strconv.Atoi(param); err != nil {
  93. log.Release(" user count : %d", len(m.notification_list))
  94. return
  95. }
  96. _, ok := m.notification_list[userId]
  97. if !ok {
  98. log.Release(" user %d have no notifications", userId)
  99. } else {
  100. log.Release(" user %d have notifications ", userId)
  101. }
  102. }
  103. func (m *manager) dumpReceiver() {
  104. log.Release("-------------------------------")
  105. log.Release("notificationMgr.dumpReceiver")
  106. defer func() {
  107. log.Release("+++++++++++++++++++++++++++++++")
  108. log.Release("")
  109. }()
  110. m.lockSubs.RLock()
  111. defer m.lockSubs.RUnlock()
  112. for _, v := range m.subscriber {
  113. log.Release(" %s", v)
  114. }
  115. }
  116. func (m *manager) subscribe(addr string) {
  117. //log.Release("subscriber come %s", addr)
  118. m.lockSubs.Lock()
  119. defer m.lockSubs.Unlock()
  120. for _, v := range m.subscriber {
  121. if v == addr {
  122. return
  123. }
  124. }
  125. m.subscriber = append(m.subscriber, addr)
  126. }