ws_server.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package network
  2. import (
  3. "crypto/tls"
  4. "net"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "bet24.com/log"
  9. "github.com/gorilla/websocket"
  10. )
  11. type WSServer struct {
  12. Addr string
  13. MaxConnNum int
  14. PendingWriteNum int
  15. MaxMsgLen uint32
  16. HTTPTimeout time.Duration
  17. CertFile string
  18. KeyFile string
  19. NewAgent func(*WSConn) Agent
  20. IsTextMessage bool
  21. ln net.Listener
  22. handler *WSHandler
  23. }
  24. type WSHandler struct {
  25. maxConnNum int
  26. pendingWriteNum int
  27. maxMsgLen uint32
  28. newAgent func(*WSConn) Agent
  29. upgrader websocket.Upgrader
  30. conns WebsocketConnSet
  31. mutexConns sync.Mutex
  32. wg sync.WaitGroup
  33. isBinary bool
  34. }
  35. func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  36. if r.Method != "GET" {
  37. http.Error(w, "Method not allowed", 405)
  38. return
  39. }
  40. conn, err := handler.upgrader.Upgrade(w, r, nil)
  41. if err != nil {
  42. log.Debug("upgrade error: %v", err)
  43. return
  44. }
  45. conn.SetReadLimit(int64(handler.maxMsgLen))
  46. handler.wg.Add(1)
  47. defer handler.wg.Done()
  48. conn.SetRealIP(r.Header.Get("X-Real-IP"))
  49. handler.mutexConns.Lock()
  50. if handler.conns == nil {
  51. handler.mutexConns.Unlock()
  52. conn.Close()
  53. return
  54. }
  55. if len(handler.conns) >= handler.maxConnNum {
  56. handler.mutexConns.Unlock()
  57. conn.Close()
  58. log.Debug("too many connections")
  59. return
  60. }
  61. handler.conns[conn] = struct{}{}
  62. handler.mutexConns.Unlock()
  63. wsConn := newWSConn(conn, handler.pendingWriteNum, handler.maxMsgLen, handler.isBinary)
  64. agent := handler.newAgent(wsConn)
  65. agent.Run()
  66. // cleanup
  67. //log.Debug("cleanup connection")
  68. wsConn.Close()
  69. handler.mutexConns.Lock()
  70. delete(handler.conns, conn)
  71. handler.mutexConns.Unlock()
  72. agent.OnClose()
  73. }
  74. func (server *WSServer) Start() {
  75. ln, err := net.Listen("tcp", server.Addr)
  76. if err != nil {
  77. log.Fatal("%v", err)
  78. }
  79. if server.MaxConnNum <= 0 {
  80. server.MaxConnNum = 100
  81. log.Release("invalid MaxConnNum, reset to %v", server.MaxConnNum)
  82. }
  83. if server.PendingWriteNum <= 0 {
  84. server.PendingWriteNum = 100
  85. log.Release("invalid PendingWriteNum, reset to %v", server.PendingWriteNum)
  86. }
  87. if server.MaxMsgLen <= 0 {
  88. server.MaxMsgLen = 4096
  89. log.Release("invalid MaxMsgLen, reset to %v", server.MaxMsgLen)
  90. }
  91. if server.HTTPTimeout <= 0 {
  92. server.HTTPTimeout = 10 * time.Second
  93. log.Release("invalid HTTPTimeout, reset to %v", server.HTTPTimeout)
  94. }
  95. if server.NewAgent == nil {
  96. log.Fatal("NewAgent must not be nil")
  97. }
  98. if server.CertFile != "" || server.KeyFile != "" {
  99. config := &tls.Config{}
  100. config.NextProtos = []string{"http/1.1"}
  101. var err error
  102. config.Certificates = make([]tls.Certificate, 1)
  103. config.Certificates[0], err = tls.LoadX509KeyPair(server.CertFile, server.KeyFile)
  104. if err != nil {
  105. log.Fatal("%v", err)
  106. }
  107. ln = tls.NewListener(ln, config)
  108. }
  109. server.ln = ln
  110. server.handler = &WSHandler{
  111. maxConnNum: server.MaxConnNum,
  112. pendingWriteNum: server.PendingWriteNum,
  113. maxMsgLen: server.MaxMsgLen,
  114. newAgent: server.NewAgent,
  115. isBinary: !server.IsTextMessage,
  116. conns: make(WebsocketConnSet),
  117. upgrader: websocket.Upgrader{
  118. HandshakeTimeout: server.HTTPTimeout,
  119. CheckOrigin: func(_ *http.Request) bool { return true },
  120. },
  121. }
  122. httpServer := &http.Server{
  123. Addr: server.Addr,
  124. Handler: server.handler,
  125. ReadTimeout: server.HTTPTimeout,
  126. WriteTimeout: server.HTTPTimeout,
  127. MaxHeaderBytes: 1024,
  128. }
  129. go httpServer.Serve(ln)
  130. }
  131. func (server *WSServer) Close() {
  132. server.ln.Close()
  133. server.handler.mutexConns.Lock()
  134. for conn := range server.handler.conns {
  135. conn.Close()
  136. }
  137. server.handler.conns = nil
  138. server.handler.mutexConns.Unlock()
  139. server.handler.wg.Wait()
  140. }
  141. func (server *WSServer) GetConnectionCount() int {
  142. server.handler.mutexConns.Lock()
  143. defer server.handler.mutexConns.Unlock()
  144. return len(server.handler.conns)
  145. }