亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

dubbo怎么實現consumer從多個group中調用指定group的provider

發布時間:2023-03-21 11:30:58 來源:億速云 閱讀:114 作者:iii 欄目:開發技術

本篇內容主要講解“dubbo怎么實現consumer從多個group中調用指定group的provider”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“dubbo怎么實現consumer從多個group中調用指定group的provider”吧!

    背景

    在工作中,遇到這樣的場景:

    有個es索引構建服務,需要從各個業務服務獲取索引的信息,從而構建索引,業務服務都實現同一個接口IndexInfoProvider,通過設置不同的group來達到區分的效果(group就是es索引名)。

    索引構建服務在內存維護了一個Map<String, IndexInfoProvider> providerMap,key是索引名&mdash;&mdash;也就是provider的group,value是IndexInfoProvider服務的consumer。

    為了圖方便,索引構建服務還一次性初始化了所有consumer,假設總共有200個分組,那么Map就會緩存200個consumer,初始化的時候巨慢。如果不一次性初始化,而是按需的話,代碼會變得復雜一些,可能需要像雙重檢驗這樣的措施。而且因為是緩存,必然也會遇到緩存一致性的問題,例如新增一個索引。

    基于這樣的問題,就想著,dubbo的一個consumer,能不能按需調用指定分組的provider呢~

    過程

    為什么必須緩存那么多consumer

    為什么有200個分組,就要緩存200個consumer,而不是像我們平時那樣,1個就可以呢?

    dubbo怎么實現consumer從多個group中調用指定group的provider

    業務服務在實現IndexInfoProvider接口的時候,都指定是分組,而且分組名,就是對應的索引名。

    相應的,消費者就必須指定消費的分組,200個索引,自然就需要200個consumer。

    consumer只能調用一個group的provider么

    在我們日常的使用中,consumer基本都是只能調用一個group的provider。但實際上,dubbo是支持調用多個group的provider的。

    dubbo怎么實現consumer從多個group中調用指定group的provider

    這樣寫是指定a分組和b分組,多個分組之間用英文逗號分隔

    dubbo怎么實現consumer從多個group中調用指定group的provider

    這樣寫,是所有的分組

    group="*"能代替Map<String, IndexInfoProvider>么

    如果將es索引構建服務的IndexInfoProvider接口的consumer分組設置為group="*",然后在調用接口的時候,根據需要,從一堆provider中篩選出指定group的provider,只調用這些provider,是不是就可以代替Map緩存了?

    初步方案:

    • 利用ThreadLocal來指定要調用的分組,在調用方法前設置group到ThreadLocal中。

    • 實現dubbo的負載均衡拓展,只選取指定分組的節點來調用。

    • 調用結束,清理ThreadLocal中的分組信息。

    似乎是可行的?

    理想很美好,現實很骨感

    通過源碼和代碼debug發現,consumer持有的Invoker,有點像一個責任鏈。

    如果是單分組&mdash;&mdash;!group.contains(",") && !group.equals("*"),那么會是這樣:

    MockClusterInvoker->FailoverClusterInvoker

    如果是多分組,則會變成:

    MockClusterInvoker->MergeableClusterInvoker->FailoverClusterInvoker

    負載均衡的機制,是得到了FailoverClusterInvoker才會生效

    MockClusterInvoker只是一種降級機制,不是導致問題的原因

    問題是出在MergeableClusterInvoker,下面是導致問題的代碼!!!

    dubbo怎么實現consumer從多個group中調用指定group的provider

    代碼大概的意思是:消費者沒有指定合并策略,那么就會調用第一個有效的服務提供者,如果都是無效,就直接調用第一個。

    我是沒有指定合并策略的,所以會變成調用第一個有效的服務提供者。根本走不到負載均衡那里。

    不指定合并策略不行,那指定呢?指定合并策略會怎樣呢?

    dubbo怎么實現consumer從多個group中調用指定group的provider

    dubbo怎么實現consumer從多個group中調用指定group的provider

    指定了合并策略,會調用所有invoker,然后用Merger合并結果。

    這種很適合這樣種場景:

    一部分數據在服務A,一部分數據在服務B,需要分別調用A和B,然后把兩者的結果集合并

    設想著這樣的方案:

    • 指定了合并策略,那么所有分組的invoker都會被調用到,那么請求就到FailoverClusterInvoker了,負載均衡spi就生效了

    • 修改一下負載均衡spi,如果目前調用的invoker不是指定分組的,那么就直接返回null

    • 實現自己的合并spi,返回第一個不會null的結果

    雖然不是什么優雅的方式,但是,似乎是能做到的???

    實際上,行不通,達不到想要的效果。原因是MergeableClusterInvoker內部是用線程池并發調用的,ThreadLocal里的分組信息會丟失

    dubbo怎么實現consumer從多個group中調用指定group的provider

    還有機會么

    MergeableClusterInvoker類是沒有留拓展的余地啦,還有其他機會么?MergeableClusterInvoker的Invokers列表從哪里來的?

    dubbo怎么實現consumer從多個group中調用指定group的provider

    Invokers是從directory來的,這里有沒有拓展的余地呢?

    有的,在AbstractDirectory的list方法

    dubbo怎么實現consumer從多個group中調用指定group的provider

    看到這,發現用路由spi,還是有機會的,如果實現動態路由,每次只給MergeableClusterInvoker返回指定分組的Invoker,是不是就可以呢?怎么讓routers包含我們自定義的路由spi呢?

    dubbo怎么實現consumer從多個group中調用指定group的provider

    需要url上攜帶了router參數,但是這里的url的參數是由誰決定的?@Reference 注解是沒有沒有的指定router參數的&hellip;

    最后debuig,兜兜轉轉,發現只能靠重寫ReferenceConfig類的loadRegistries方法,往url上加上動態路由的參數。

    dubbo怎么實現consumer從多個group中調用指定group的provider

    成果

    AssignGroupRouterFactory

    public class AssignGroupRouterFactory implements RouterFactory {
        public static final String NAME = "assignGroup";
    
        @Override
        public Router getRouter(URL url) {
            return new AssignGroupRouter(url);
        }
    }

    AssignGroupRouter

    public class AssignGroupRouter implements Router {
        private final URL url;
    
        public AssignGroupRouter(URL url) {
            this.url = url;
        }
    
        @Override
        public URL getUrl() {
            return url;
        }
    
        @Override
        public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
            String assignGroup = IndexInfoProviderInvoker.getAssignGroup();
            if (Objects.isNull(assignGroup)) {
                return invokers;
            }
            return invokers.stream()
                .filter(invoker -> {
                    URL invokerUrl = invoker.getUrl();
                    return Objects.equals(assignGroup, invokerUrl.getParameter(Constants.GROUP_KEY));
                }).collect(Collectors.toList());
        }
    
        @Override
        public int compareTo(Router o) {
            return 1;
        }
    }

    AssignGroupReferenceConfig

    public class AssignGroupReferenceConfig<T> extends ReferenceConfig<T> {
        @Override
        protected List<URL> loadRegistries(boolean provider) {
            List<URL> urls = super.loadRegistries(provider);
            if (CollectionUtils.isEmpty(urls)) {
                return urls;
            }
            // 指定動態路由,路由方式為assignGroup
            return urls.stream()
                .map(url -> url.addParameter(Constants.ROUTER_KEY, "assignGroup").addParameter(Constants.RUNTIME_KEY,
                    "true"))
                .collect(Collectors.toList());
        }
    }

    IndexInfoProviderInvokerProxy

    @Component
    public class IndexInfoProviderInvokerProxy {
        public static final ThreadLocal<String> INDEX_TYPE_THREAD_LOCAL = new ThreadLocal<>();
    
        private final IndexInfoProvider indexInfoProvider;
    
        public IndexInfoProviderInvokerProxy(IndexInfoProvider indexInfoProvider) {
            this.indexInfoProvider = indexInfoProvider;
        }
    
        /**
         * 獲取文檔id 在[start, end) 之間的所有索引文檔
         *
         * @param indexType 索引名
         * @param start 起始id
         * @param end 結束id
         * @return 文檔列表
         */
        public List<IndexDocument> getDocsByIdRange(String indexType, long start, long end) {
            return invokeWitchIndexType(indexType, () -> indexInfoProvider.getDocsByIdRange(start, end));
        }
    
        private <T> T invokeWitchIndexType(String indexType, Supplier<T> supplier) {
            INDEX_TYPE_THREAD_LOCAL.set(indexType);
            T result = supplier.get();
            INDEX_TYPE_THREAD_LOCAL.remove();
            return result;
        }
    }

    到此,相信大家對“dubbo怎么實現consumer從多個group中調用指定group的provider”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    太湖县| 若尔盖县| 双桥区| 邵武市| 五常市| 杨浦区| 寿阳县| 静海县| 大荔县| 长垣县| 滦平县| 白玉县| 闸北区| 九龙坡区| 昔阳县| 洪湖市| 拜城县| 金湖县| 遵义市| 贵德县| 芷江| 青田县| 安丘市| 南华县| 静宁县| 甘肃省| 孟村| 仲巴县| 天峨县| 华容县| 白城市| 宝山区| 嘉峪关市| 和田市| 岗巴县| 新蔡县| 宽甸| 柳州市| 安福县| 商城县| 东阳市|