亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java?Dubbo協議下的服務端線程如何使用

發布時間:2023-03-01 11:32:35 來源:億速云 閱讀:119 作者:iii 欄目:開發技術

本篇內容主要講解“Java Dubbo協議下的服務端線程如何使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Java Dubbo協議下的服務端線程如何使用”吧!

Provider端線程模型

在了解服務端線程模型之前,先了解一下Dubbo對Channel上的操作抽象,Dubbo將Channel上的操作成了5中行為,分別是:建立連接、斷開連接、發送消息、接收消息、異常捕獲,Channel上的操作的接口為org.apache.dubbo.remoting.ChannelHandler,該接口是SPI的,用戶可以自己擴展,接口代碼如下:

該接口抽象的五種Channel上的行為解釋如下:

  • 建立連接:connected,主要是的職責是在channel記錄read、write的時間,以及處理建立連接后的回調邏輯,比如dubbo支持在斷開后自定義回調的hook(onconnect),即在該操作中執行。

  • 斷開連接:disconnected,主要是的職責是在channel移除read、write的時間,以及處理端開連接后的回調邏輯,比如dubbo支持在斷開后自定義回調的hook(ondisconnect),即在該操作中執行。

  • 發送消息:sent,包括發送請求和發送響應。記錄write的時間。

  • 接收消息:received,包括接收請求和接收響應。記錄read的時間。

  • 異常捕獲:caught,用于處理在channel上發生的各類異常。

Dubbo框架的線程模型與以上這五種行為息息相關,Dubbo協議Provider端線程模型提供了五種實現,雖說都是五種但是別把二者混淆,線程模型的頂級接口是org.apache.dubbo.remoting.Dispatcher,該接口也是SPI的,提供的五種實現分別是AllDispatcherDirectDispatcherMessageOnlyDispatcherExecutionDispatcherConnectionOrderedDispatcher,默認的使用的是AllDispatcher

Java?Dubbo協議下的服務端線程如何使用

org.apache.dubbo.remoting.ChannelHandler作為Channel上的行為的頂級接口對應Dubbo協議Provider端的5種線程模型同樣也提供了對應的5種實現,分別是AllChannelHandlerDirectChannelHandlerMessageOnlyChannelHandlerExecutionChannelHandlerConnectionOrderedChannelHandler,這里Channel上行為的具體實現不展開討論。

Java?Dubbo協議下的服務端線程如何使用

Channel上行為和線程模型之間使用策略可以參考org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers的源代碼,這里不做詳細的介紹,下面的各個章節只針對5種線程模型做簡單的介紹。

AllDispatcher

IO線程上的操作:

  • 接口響應序列化

  • sent操作

Dubbo線程池上的操作:

  • received、connected、disconnected、caught都是在Dubbo線程池上執行

  • 服務端反序列化操作的Dubbo線程池上執行

AllDispatcher代碼如下,AllDispatcherdispatch方法實例化了AllChannelHandlerAllChannelHandler實現了received、connected、disconnected、caught操作在dubbo線程池中,代碼如下:

public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}
public class AllChannelHandler extends WrappedChannelHandler {
    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

DirectDispatcher

該線程模型Channel上的所有行為均在IO線程中執行,并沒有在Dubbo線程池中執行

DirectDispatcherAllDispatcher相似,實例化了DirectChannelHandlerDirectChannelHandler只實現了received行為,但是received中獲取的線程池如果是ThreadlessExecutor才會提交task,否則也是在ChannelHandler中執行received行為,ThreadlessExecutor和普通線程池最大的區別是不會管理任何線程,這里不展開討論。

public class DirectDispatcher implements Dispatcher {
    public static final String NAME = "direct";
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new DirectChannelHandler(handler, url);
    }
}
public class DirectChannelHandler extends WrappedChannelHandler {
    public DirectChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        if (executor instanceof ThreadlessExecutor) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        } else {
            handler.received(channel, message);
        }
    }
}

ExecutionDispatcher

在IO線程中執行的操作有:

  • sent、connected、disconnected、caught操作在IO線程上執行。

  • 序列化響應在IO線程上執行。

在Dubbo線程中執行的操作有:

  • received都是在Dubbo線程上執行的。

  • 反序列化請求的行為在Dubbo中做的。

同樣的,我們可以直接看ExecutionChannelHandler源碼,邏輯是當message的類型是Request時received行為在Dubbo線程池執行。感興趣的可以自己看源碼,這里不做介紹。

MessageOnlyDispatcher

Message Only Dispatcher所有的received行為和反序列化都是在dubbo線程池中執行的

public class MessageOnlyChannelHandler extends WrappedChannelHandler {
    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

ConnectionOrderedDispatcher

該線程模型與AllDispatcher類似,sent操作和相應的序列化是在IO線程上執行;connected、disconnected、received、caught操作在dubbo線程池上執行,他們的區別是在connected、disconnected行為上ConnectionOrderedDispatcher做了線程池隔離,并且在Dubbo connected thread pool中提供了鏈接限制、告警燈能力,我們直接看ConnectionOrderedChannelHandler源碼,代碼如下:

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
    protected final ThreadPoolExecutor connectionExecutor;
    private final int queueWarningLimit;
    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queueWarningLimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }
    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queueWarningLimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queueWarningLimit));
        }
    }
}

到此,相信大家對“Java Dubbo協議下的服務端線程如何使用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

庆城县| 高州市| 资兴市| 乌拉特中旗| 高唐县| 蒙自县| 永嘉县| 安仁县| 嘉定区| 济阳县| 东海县| 涡阳县| 宜州市| 虞城县| 营口市| 策勒县| 长丰县| 安陆市| 普定县| 多伦县| 泰宁县| 兰考县| 邮箱| 太谷县| 石门县| 林周县| 新和县| 平乡县| 望都县| 文水县| 南投县| 陈巴尔虎旗| 泗洪县| 娱乐| 曲阳县| 隆尧县| 静乐县| 伊吾县| 大理市| 神农架林区| 平乡县|