clientpool.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package common
  2. import (
  3. "bet24.com/log"
  4. //cclient "github.com/rpcxio/rpcx-consul/client"
  5. "github.com/smallnest/rpcx/client"
  6. "sync"
  7. )
  8. type rpc_client struct {
  9. xclient client.XClient
  10. //discovery client.ServiceDiscovery
  11. plugin client.Plugin
  12. }
  13. func (rc *rpc_client) close() {
  14. if rc.plugin != nil {
  15. if rc.xclient != nil {
  16. rc.xclient.GetPlugins().Remove(rc.plugin)
  17. }
  18. rc.plugin = nil
  19. }
  20. /*
  21. if rc.discovery != nil {
  22. rc.discovery.Close()
  23. rc.discovery = nil
  24. }*/
  25. if rc.xclient != nil {
  26. rc.xclient.Close()
  27. rc.xclient = nil
  28. }
  29. }
  30. type clientpool struct {
  31. clients map[string]*rpc_client
  32. lock *sync.RWMutex
  33. }
  34. var mgr *clientpool
  35. func GetClientPool() *clientpool {
  36. if mgr == nil {
  37. mgr = new(clientpool)
  38. mgr.ctor()
  39. }
  40. return mgr
  41. }
  42. func DumpClientPools() {
  43. GetClientPool().dump()
  44. }
  45. func (cm *clientpool) ctor() {
  46. cm.clients = make(map[string]*rpc_client)
  47. cm.lock = &sync.RWMutex{}
  48. }
  49. func (cm *clientpool) GetClient(serviceName string, consulAddr string) client.XClient {
  50. cm.lock.Lock()
  51. defer cm.lock.Unlock()
  52. ret, ok := cm.clients[serviceName]
  53. if !ok || ret == nil {
  54. ret = cm.openConnection(serviceName, consulAddr)
  55. }
  56. cm.clients[serviceName] = ret
  57. return ret.xclient
  58. }
  59. func (cm *clientpool) RemoveClient(serviceName string) {
  60. cm.lock.Lock()
  61. ret, _ := cm.clients[serviceName]
  62. if ret != nil {
  63. go ret.close()
  64. delete(cm.clients, serviceName)
  65. }
  66. cm.lock.Unlock()
  67. }
  68. func (cm *clientpool) openConnection(serviceName string, consulAddr string) *rpc_client {
  69. //log.Debug("openConnection %s", serviceName)
  70. //d, err := cclient.NewConsulDiscovery(Service_base_path, serviceName, []string{consulAddr}, nil)
  71. d := getDiscovery(serviceName, consulAddr)
  72. if d == nil {
  73. log.Release("clientpool.openConnection [%s] [%s] failed", serviceName, consulAddr)
  74. d1, _ := client.NewPeer2PeerDiscovery("localhost:8600", "")
  75. defer d1.Close()
  76. xc := client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d1, client.DefaultOption)
  77. return &rpc_client{xclient: xc}
  78. }
  79. xc := client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
  80. plugin := newClientEvent(serviceName)
  81. xc.GetPlugins().Add(plugin)
  82. return &rpc_client{xclient: xc, plugin: plugin}
  83. }
  84. func (cm *clientpool) dump() {
  85. cm.lock.RLock()
  86. log.Release(" Total Count:%d", len(cm.clients))
  87. for k := range cm.clients {
  88. log.Release(" %s", k)
  89. }
  90. cm.lock.RUnlock()
  91. }