common.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package common
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "net"
  6. "time"
  7. "bet24.com/log"
  8. "bet24.com/servers/monitor"
  9. "bet24.com/utils"
  10. consulapi "github.com/hashicorp/consul/api"
  11. "github.com/rcrowley/go-metrics"
  12. "github.com/rpcxio/rpcx-consul/serverplugin"
  13. "github.com/smallnest/rpcx/server"
  14. )
  15. const Service_base_path = "bet24.com"
  16. const Default_Consul_Addr = "localhost:8500"
  17. const port_monitor_delta = 1000
  18. var _port int
  19. var _server *server.Server
  20. var serviceId string
  21. var consul_client *consulapi.Client
  22. func RunService(name string, rcvr interface{}, consulAddr string) {
  23. run(name, rcvr, "8.141.80.205:8500", true, 0, fmt.Sprintf("log/%s", name))
  24. }
  25. func RunServiceWithPort(name string, rcvr interface{}, consulAddr string, port int, logFilePath string) {
  26. run(name, rcvr, "localhost:8500", true, port, logFilePath)
  27. }
  28. func RunWithoutChangeTitle(name string, rcvr interface{}, consulAddr string) *server.Server {
  29. return run(name, rcvr, "localhost:8500", false, 0, "")
  30. }
  31. func run(name string, rcvr interface{}, consulAddr string, changeTitle bool, port int, logFilePath string) *server.Server {
  32. //utils.SetErrorFile(fmt.Sprintf("log/%s/err.log", name), fmt.Sprintf("%s starting", name))
  33. rand.Seed(time.Now().UnixNano())
  34. if port == 0 {
  35. port = getRandomPort()
  36. }
  37. serviceId = fmt.Sprintf("%s_%s_%d", name, getLocalIp(), port)
  38. local_server := server.NewServer()
  39. if !addRegistryPlugin(local_server, name, consulAddr, port) {
  40. return nil
  41. }
  42. local_server.RegisterName(name, rcvr, "")
  43. if changeTitle {
  44. monitorPort := port + port_monitor_delta
  45. if port == 5000 {
  46. monitorPort = 5100
  47. }
  48. monitor.Run(monitorPort, logFilePath)
  49. utils.SetConsoleTitle(fmt.Sprintf("[S]%s running in port[%d] monitor[%d]", name, port, monitorPort))
  50. _server = local_server
  51. local_server.Serve("tcp", getAddress(port))
  52. _port = port
  53. } else {
  54. go local_server.Serve("tcp", getAddress(port))
  55. log.Release("side service [%s] running in port[%d]", name, port)
  56. }
  57. return local_server
  58. }
  59. func GetServer() *server.Server {
  60. return _server
  61. }
  62. func DeregisterService(name string) {
  63. if consul_client != nil {
  64. consul_client.Agent().ServiceDeregister(serviceId)
  65. }
  66. if _server != nil {
  67. _server.UnregisterAll()
  68. utils.SetConsoleTitle(fmt.Sprintf("Service[%s] stopping in port[%d]", name, _port))
  69. } else {
  70. log.Release("DeregisterService instance not found")
  71. }
  72. }
  73. func getRandomPort() int {
  74. var port int
  75. tryMax := 100
  76. for i := 0; i < tryMax; i++ {
  77. port = 6000 + rand.Intn(1000)
  78. if !utils.CheckPortInUse(port) {
  79. return port
  80. }
  81. }
  82. log.Release("getRandomPort failed")
  83. port = 6999
  84. return port
  85. }
  86. func GetLocalIp() string {
  87. return getLocalIp()
  88. }
  89. func GetServicePort() int {
  90. return _port
  91. }
  92. func getLocalIp() string {
  93. addrs, err := net.InterfaceAddrs()
  94. if err != nil {
  95. fmt.Println(err)
  96. return ""
  97. }
  98. //fmt.Println(addrs)
  99. for _, address := range addrs {
  100. // 检查ip地址判断是否回环地址
  101. if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
  102. if ipnet.IP.To4() != nil {
  103. return ipnet.IP.String()
  104. }
  105. }
  106. }
  107. return ""
  108. }
  109. func getServiceAddress(port int) string {
  110. return fmt.Sprintf("tcp@%s:%d", getLocalIp(), port)
  111. }
  112. func getAddress(port int) string {
  113. return fmt.Sprintf(":%d", port)
  114. }
  115. func addRegistryPlugin(s *server.Server, name string, consulAddr string, port int) bool {
  116. r := &serverplugin.ConsulRegisterPlugin{
  117. ServiceAddress: getServiceAddress(port),
  118. ConsulServers: []string{consulAddr},
  119. BasePath: Service_base_path,
  120. Metrics: metrics.NewRegistry(),
  121. UpdateInterval: time.Second * 10,
  122. }
  123. err := r.Start()
  124. if err != nil {
  125. log.Release("%v", err)
  126. return false
  127. }
  128. s.Plugins.Add(r)
  129. registerConsul(consulAddr, name, port)
  130. return true
  131. }
  132. func registerConsul(consulAddress string, name string, port int) {
  133. config := consulapi.DefaultConfig()
  134. config.Address = consulAddress
  135. client, err := consulapi.NewClient(config)
  136. if err != nil {
  137. fmt.Println("consul client error : ", err)
  138. return
  139. }
  140. // 创建注册到consul的服务到
  141. registration := new(consulapi.AgentServiceRegistration)
  142. registration.Address = getLocalIp()
  143. registration.ID = serviceId
  144. registration.Name = name
  145. registration.Port = port
  146. // 增加consul健康检查回调函数
  147. check := new(consulapi.AgentServiceCheck)
  148. check.TCP = fmt.Sprintf("%s:%d", registration.Address, registration.Port)
  149. check.Timeout = "10s" //超时
  150. check.Interval = "5s" //健康检查频率
  151. check.DeregisterCriticalServiceAfter = "30s" // 故障检查失败30s后 consul自动将注册服务删除
  152. registration.Check = check
  153. client.Agent().ServiceRegister(registration)
  154. consul_client = client
  155. }