| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- package network
- import (
- "bet24.com/log"
- "errors"
- "github.com/gorilla/websocket"
- "net"
- _ "runtime/debug"
- "sync"
- )
- type WebsocketConnSet map[*websocket.Conn]struct{}
- type WSConn struct {
- sync.Mutex
- conn *websocket.Conn
- writeChan chan []byte
- maxMsgLen uint32
- closeFlag bool
- messageType int
- }
- func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32, isBinary bool) *WSConn {
- wsConn := new(WSConn)
- wsConn.conn = conn
- wsConn.writeChan = make(chan []byte, pendingWriteNum)
- wsConn.maxMsgLen = maxMsgLen
- if isBinary {
- wsConn.messageType = websocket.BinaryMessage
- } else {
- wsConn.messageType = websocket.TextMessage
- }
- go func() {
- for b := range wsConn.writeChan {
- if b == nil {
- log.Debug("exiting write routine by nil message")
- break
- }
- err := conn.WriteMessage(wsConn.messageType, b)
- if err != nil {
- log.Debug("exiting write routine by WriteMessage error %v", err)
- break
- }
- }
- conn.Close()
- wsConn.Lock()
- log.Debug("exiting write routine")
- wsConn.closeFlag = true
- wsConn.Unlock()
- }()
- return wsConn
- }
- func (wsConn *WSConn) doDestroy() {
- //wsConn.conn.UnderlyingConn().(*net.TCPConn).SetLinger(0)
- wsConn.conn.Close()
- if !wsConn.closeFlag {
- close(wsConn.writeChan)
- wsConn.closeFlag = true
- }
- }
- func (wsConn *WSConn) Destroy() {
- wsConn.Lock()
- defer wsConn.Unlock()
- wsConn.doDestroy()
- }
- func (wsConn *WSConn) Close() {
- wsConn.Lock()
- defer wsConn.Unlock()
- if wsConn.closeFlag {
- return
- }
- wsConn.doWrite(nil)
- log.Debug("wsConn.Close()")
- wsConn.closeFlag = true
- }
- func (wsConn *WSConn) doWrite(b []byte) {
- if len(wsConn.writeChan) == cap(wsConn.writeChan) {
- log.Debug("close conn: channel full")
- wsConn.doDestroy()
- return
- }
- wsConn.writeChan <- b
- }
- func (wsConn *WSConn) LocalAddr() net.Addr {
- return wsConn.conn.LocalAddr()
- }
- func (wsConn *WSConn) RemoteAddr() net.Addr {
- return wsConn.conn.RemoteAddr()
- }
- // goroutine not safe
- func (wsConn *WSConn) ReadMsg() ([]byte, error) {
- _, b, err := wsConn.conn.ReadMessage()
- return b, err
- }
- func (wsConn *WSConn) WriteData(b []byte) error {
- wsConn.Lock()
- defer wsConn.Unlock()
- if wsConn.closeFlag {
- //log.Debug("WriteData closing %s", string(b))
- //log.Debug("%s", debug.Stack())
- return errors.New("closing")
- }
- wsConn.doWrite(b)
- return nil
- }
- // args must not be modified by the others goroutines
- // 暂时不考虑粘包问题
- func (wsConn *WSConn) writeMsg(args ...[]byte) error {
- wsConn.Lock()
- defer wsConn.Unlock()
- if wsConn.closeFlag {
- return nil
- }
- // get len
- var msgLen uint32
- for i := 0; i < len(args); i++ {
- msgLen += uint32(len(args[i]))
- }
- // check len
- if msgLen > wsConn.maxMsgLen {
- return errors.New("message too long")
- } else if msgLen < 1 {
- return errors.New("message too short")
- }
- // don't copy
- if len(args) == 1 {
- wsConn.doWrite(args[0])
- return nil
- }
- // merge the args
- msg := make([]byte, msgLen)
- l := 0
- for i := 0; i < len(args); i++ {
- copy(msg[l:], args[i])
- l += len(args[i])
- }
- wsConn.doWrite(msg)
- return nil
- }
|