ws_conn.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package network
  2. import (
  3. "bet24.com/log"
  4. "errors"
  5. "github.com/gorilla/websocket"
  6. "net"
  7. _ "runtime/debug"
  8. "sync"
  9. )
  10. type WebsocketConnSet map[*websocket.Conn]struct{}
  11. type WSConn struct {
  12. sync.Mutex
  13. conn *websocket.Conn
  14. writeChan chan []byte
  15. maxMsgLen uint32
  16. closeFlag bool
  17. messageType int
  18. }
  19. func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32, isBinary bool) *WSConn {
  20. wsConn := new(WSConn)
  21. wsConn.conn = conn
  22. wsConn.writeChan = make(chan []byte, pendingWriteNum)
  23. wsConn.maxMsgLen = maxMsgLen
  24. if isBinary {
  25. wsConn.messageType = websocket.BinaryMessage
  26. } else {
  27. wsConn.messageType = websocket.TextMessage
  28. }
  29. go func() {
  30. for b := range wsConn.writeChan {
  31. if b == nil {
  32. log.Debug("exiting write routine by nil message")
  33. break
  34. }
  35. err := conn.WriteMessage(wsConn.messageType, b)
  36. if err != nil {
  37. log.Debug("exiting write routine by WriteMessage error %v", err)
  38. break
  39. }
  40. }
  41. conn.Close()
  42. wsConn.Lock()
  43. log.Debug("exiting write routine")
  44. wsConn.closeFlag = true
  45. wsConn.Unlock()
  46. }()
  47. return wsConn
  48. }
  49. func (wsConn *WSConn) doDestroy() {
  50. //wsConn.conn.UnderlyingConn().(*net.TCPConn).SetLinger(0)
  51. wsConn.conn.Close()
  52. if !wsConn.closeFlag {
  53. close(wsConn.writeChan)
  54. wsConn.closeFlag = true
  55. }
  56. }
  57. func (wsConn *WSConn) Destroy() {
  58. wsConn.Lock()
  59. defer wsConn.Unlock()
  60. wsConn.doDestroy()
  61. }
  62. func (wsConn *WSConn) Close() {
  63. wsConn.Lock()
  64. defer wsConn.Unlock()
  65. if wsConn.closeFlag {
  66. return
  67. }
  68. wsConn.doWrite(nil)
  69. log.Debug("wsConn.Close()")
  70. wsConn.closeFlag = true
  71. }
  72. func (wsConn *WSConn) doWrite(b []byte) {
  73. if len(wsConn.writeChan) == cap(wsConn.writeChan) {
  74. log.Debug("close conn: channel full")
  75. wsConn.doDestroy()
  76. return
  77. }
  78. wsConn.writeChan <- b
  79. }
  80. func (wsConn *WSConn) LocalAddr() net.Addr {
  81. return wsConn.conn.LocalAddr()
  82. }
  83. func (wsConn *WSConn) RemoteAddr() net.Addr {
  84. return wsConn.conn.RemoteAddr()
  85. }
  86. // goroutine not safe
  87. func (wsConn *WSConn) ReadMsg() ([]byte, error) {
  88. _, b, err := wsConn.conn.ReadMessage()
  89. return b, err
  90. }
  91. func (wsConn *WSConn) WriteData(b []byte) error {
  92. wsConn.Lock()
  93. defer wsConn.Unlock()
  94. if wsConn.closeFlag {
  95. //log.Debug("WriteData closing %s", string(b))
  96. //log.Debug("%s", debug.Stack())
  97. return errors.New("closing")
  98. }
  99. wsConn.doWrite(b)
  100. return nil
  101. }
  102. // args must not be modified by the others goroutines
  103. // 暂时不考虑粘包问题
  104. func (wsConn *WSConn) writeMsg(args ...[]byte) error {
  105. wsConn.Lock()
  106. defer wsConn.Unlock()
  107. if wsConn.closeFlag {
  108. return nil
  109. }
  110. // get len
  111. var msgLen uint32
  112. for i := 0; i < len(args); i++ {
  113. msgLen += uint32(len(args[i]))
  114. }
  115. // check len
  116. if msgLen > wsConn.maxMsgLen {
  117. return errors.New("message too long")
  118. } else if msgLen < 1 {
  119. return errors.New("message too short")
  120. }
  121. // don't copy
  122. if len(args) == 1 {
  123. wsConn.doWrite(args[0])
  124. return nil
  125. }
  126. // merge the args
  127. msg := make([]byte, msgLen)
  128. l := 0
  129. for i := 0; i < len(args); i++ {
  130. copy(msg[l:], args[i])
  131. l += len(args[i])
  132. }
  133. wsConn.doWrite(msg)
  134. return nil
  135. }