亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

簡單RPC框架-基于Consul的服務注冊與發現

發布時間:2020-07-17 18:36:58 來源:網絡 閱讀:1055 作者:jjjssswww 欄目:網絡安全

一般我們常見的RPC框架都包含如下三個部分:

簡單RPC框架-基于Consul的服務注冊與發現

  • 注冊中心,用于服務端注冊遠程服務以及客戶端發現服務

  • 服務端,對外提供后臺服務,將自己的服務信息注冊到注冊中心

  • 客戶端,從注冊中心獲取遠程服務的注冊信息,然后進行遠程過程調用
    上面提到的注冊中心其實屬于服務治理,即使沒有注冊中心,RPC的功能也是完整的。之前我大多接觸的是基于zookeeper的注冊中心,這里基于consul來實現注冊中心的基本功能。

Consul的一些特點:

  • Raft相比Paxos直接

此外不多描述,還沒研究raft

  • 支持數據中心,可以用來解決單點故障之類的問題

  • 集成相比zookeeper更加簡單(代碼量少,邏輯清晰簡單)

  • 支持健康檢查,支持http以及tcp

  • 自帶UI管理功能,不需要額外第三方支持。(zookeeper需要單獨部署zkui之類的第三方工具)

  • 支持key/value存儲

啟動consul之后訪問管理頁面

簡單RPC框架-基于Consul的服務注冊與發現

RPC集成

提取出服務注冊與服務發現兩個接口,然后使用Consul實現,這里主要通過consul-client來實現(也可以是consul-api),需要在pom中引入:

<dependency>
	<groupId>com.orbitz.consul</groupId>
	<artifactId>consul-client</artifactId>
	<version>0.14.1</version>
</dependency>

服務注冊

  • RegistryService
    提供服務的注冊與刪除功能

public interface RegistryService {    void register(RpcURL url);    void unregister(RpcURL url);
}
  • AbstractConsulService
    consul的基類,用于構建Consl對象,服務于服務端以及客戶端。

public class AbstractConsulService {    private static final Logger logger = LoggerFactory.getLogger(AbstractConsulService.class);    protected final static String CONSUL_NAME="consul_node_jim";    protected final static String CONSUL_ID="consul_node_id";    protected final static String CONSUL_TAGS="v3";    protected final static String CONSUL_HEALTH_INTERVAL="1s";    protected Consul buildConsul(String registryHost, int registryPort){        return Consul.builder().withHostAndPort(HostAndPort.fromString(registryHost+":"+registryPort)).build();
    }
}
  • ConsulRegistryService
    服務注冊實現類,在注冊服務的同時,指定了健康檢查。

服務的刪除暫時未實現

public class ConsulRegistryService extends AbstractConsulService implements RegistryService {    private final static int CONSUL_CONNECT_PERIOD=1*1000;    @Override
    public void register(RpcURL url) {        Consul consul = this.buildConsul(url.getRegistryHost(),url.getRegistryPort());        AgentClient agent = consul.agentClient();        ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();        ImmutableRegistration.Builder builder = ImmutableRegistration.builder();
        builder.id(CONSUL_ID).name(CONSUL_NAME).addTags(CONSUL_TAGS).address(url.getHost()).port(url.getPort()).addChecks(check);

        agent.register(builder.build());

    }    @Override
    public void unregister(RpcURL url) {

    }

}

由于我實現的RPC是基于TCP的,所以服務注冊的健康檢查也指定為TCP,consul會按指定的IP以及端口建立連接以此判斷服務的健康狀態。如果是http,則需要調用http方法,同時指定健康檢查地址。

ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();

后臺的監控信息如下:

簡單RPC框架-基于Consul的服務注冊與發現

雖然只是指定了TCP,可能出于某種機制后臺依然會發起HTTP的健康檢查請求,上圖第一條請求日志。

服務發現

  • DiscoveryService
    獲取所有注冊的有效的服務信息。

public interface DiscoveryService {    List<RpcURL> getUrls(String registryHost, int registryPort);
}
  • ConsulDiscoveryService
    首先是獲取有效的服務列表:

List<RpcURL> urls= Lists.newArrayList();Consul consul = this.buildConsul(registryHost,registryPort);HealthClient client = consul.healthClient();String name = CONSUL_NAME;ConsulResponse object= client.getAllServiceInstances(name);List<ImmutableServiceHealth> serviceHealths=(List<ImmutableServiceHealth>)object.getResponse();for(ImmutableServiceHealth serviceHealth:serviceHealths){    RpcURL url=new RpcURL();
    url.setHost(serviceHealth.getService().getAddress());
    url.setPort(serviceHealth.getService().getPort());
    urls.add(url);
}

服務更新監聽,當可用服務列表發現變化時需要通知調用端。

try {    ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache(client, name);
    serviceHealthCache.addListener(new ConsulCache.Listener<ServiceHealthKey, ServiceHealth>() {        @Override
        public void notify(Map<ServiceHealthKey, ServiceHealth> map) {
            logger.info("serviceHealthCache.addListener notify");            RpcClientInvokerCache.clear();

        }
    });
    serviceHealthCache.start();
} catch (Exception e) {
    logger.info("serviceHealthCache.start error:",e);
}

由于之前對客戶端的Invoker有緩存,所以當服務列表有變化時需要對緩存信息進行更新。

這里簡單的直接對緩存做清除處理,其實好一點的方法應該只對有變化的做處理。

  • RpcClientInvokerCache
    對客戶端實例化后的Invoker的緩存類

public class RpcClientInvokerCache {    private static CopyOnWriteArrayList<RpcClientInvoker> connectedHandlers = new CopyOnWriteArrayList<>();    public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlersClone(){        return (CopyOnWriteArrayList<RpcClientInvoker>) RpcClientInvokerCache.getConnectedHandlers().clone();
    }    public static void addHandler(RpcClientInvoker handler) {        CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
        newHandlers.add(handler);
        connectedHandlers=newHandlers;
    }    public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlers(){        return connectedHandlers;
    }    public static RpcClientInvoker get(int i){        return connectedHandlers.get(i);
    }    public static int size(){        return connectedHandlers.size();
    }    public static void clear(){        CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
        newHandlers.clear();
        connectedHandlers=newHandlers;
    }
}
  • 負載均衡
    當同一個接口有多個服務同時提供服務時,客戶端需要有一定的負載均衡機制去決策將客戶端的請求分配給哪一臺服務器,這里實現一個簡易的輪詢實現方式。請求次數累加,累加的值與服務列表的大小做取模操作。

代碼中取服務列表的方法有小問題,未按接口信息取,后續再完成

public class RoundRobinLoadbalanceService implements LoadbalanceService {    private AtomicInteger roundRobin = new AtomicInteger(0);    private static final int MAX_VALUE=1000;    private static final int MIN_VALUE=1;    private AtomicInteger getRoundRobinValue(){        if(this.roundRobin.getAndAdd(1)>MAX_VALUE){            this.roundRobin.set(MIN_VALUE);
        }        return this.roundRobin;
    }    @Override
    public int index(int size) {        return  (this.getRoundRobinValue().get() + size) % size;
    }
}


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

sa rp
AI

黄冈市| 利辛县| 明光市| 浠水县| 渭南市| 香河县| 永丰县| 利辛县| 辽中县| 顺平县| 手机| 尼玛县| 东海县| 泽普县| 来安县| 电白县| 宕昌县| 德清县| 驻马店市| 江山市| 东兰县| 大荔县| 南涧| 玛纳斯县| 保德县| 军事| 临高县| 金塔县| 文昌市| 建瓯市| 广宗县| 太保市| 阳春市| 东至县| 大荔县| 鄂州市| 宣化县| 德州市| 中西区| 萝北县| 云阳县|