| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- package common
- import (
- "bet24.com/log"
- //cclient "github.com/rpcxio/rpcx-consul/client"
- "github.com/smallnest/rpcx/client"
- "sync"
- )
- type rpc_client struct {
- xclient client.XClient
- //discovery client.ServiceDiscovery
- plugin client.Plugin
- }
- func (rc *rpc_client) close() {
- if rc.plugin != nil {
- if rc.xclient != nil {
- rc.xclient.GetPlugins().Remove(rc.plugin)
- }
- rc.plugin = nil
- }
- /*
- if rc.discovery != nil {
- rc.discovery.Close()
- rc.discovery = nil
- }*/
- if rc.xclient != nil {
- rc.xclient.Close()
- rc.xclient = nil
- }
- }
- type clientpool struct {
- clients map[string]*rpc_client
- lock *sync.RWMutex
- }
- var mgr *clientpool
- func GetClientPool() *clientpool {
- if mgr == nil {
- mgr = new(clientpool)
- mgr.ctor()
- }
- return mgr
- }
- func DumpClientPools() {
- GetClientPool().dump()
- }
- func (cm *clientpool) ctor() {
- cm.clients = make(map[string]*rpc_client)
- cm.lock = &sync.RWMutex{}
- }
- func (cm *clientpool) GetClient(serviceName string, consulAddr string) client.XClient {
- cm.lock.Lock()
- defer cm.lock.Unlock()
- ret, ok := cm.clients[serviceName]
- if !ok || ret == nil {
- ret = cm.openConnection(serviceName, consulAddr)
- }
- cm.clients[serviceName] = ret
- return ret.xclient
- }
- func (cm *clientpool) RemoveClient(serviceName string) {
- cm.lock.Lock()
- ret, _ := cm.clients[serviceName]
- if ret != nil {
- go ret.close()
- delete(cm.clients, serviceName)
- }
- cm.lock.Unlock()
- }
- func (cm *clientpool) openConnection(serviceName string, consulAddr string) *rpc_client {
- //log.Debug("openConnection %s", serviceName)
- //d, err := cclient.NewConsulDiscovery(Service_base_path, serviceName, []string{consulAddr}, nil)
- d := getDiscovery(serviceName, consulAddr)
- if d == nil {
- log.Release("clientpool.openConnection [%s] [%s] failed", serviceName, consulAddr)
- d1, _ := client.NewPeer2PeerDiscovery("localhost:8600", "")
- defer d1.Close()
- xc := client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d1, client.DefaultOption)
- return &rpc_client{xclient: xc}
- }
- xc := client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
- plugin := newClientEvent(serviceName)
- xc.GetPlugins().Add(plugin)
- return &rpc_client{xclient: xc, plugin: plugin}
- }
- func (cm *clientpool) dump() {
- cm.lock.RLock()
- log.Release(" Total Count:%d", len(cm.clients))
- for k := range cm.clients {
- log.Release(" %s", k)
- }
- cm.lock.RUnlock()
- }
|