您好,登錄后才能下訂單哦!
這篇文章主要介紹了Reactor模型如何實現的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Reactor模型如何實現文章都會有所收獲,下面我們一起來看看吧。
Reactor翻譯過來的意思是:反應堆,所以Reactor設計模式本質是基于事件驅動的。在Reactor設計模式中,存在如下幾個角色。
Handle(事件)。Reactor整體是基于Handle進行驅動,這里的Handle叫做事件,可以類比為BIO中的Socket,NIO中的Socket管道。比如當Socket管道有連接建立,或者有數據可讀,那么此時就稱作事件發生;
EventHandler(事件處理器)。有事件發生,就需要有相應的組件來處理事件,那么這里的組件就叫做事件處理器。EventHandler是一個抽象概念,其會有不同的具體實現,因為事件會有不同的類型,那么不同類型的事件,肯定都需要有相應的具體處理邏輯,這里的具體處理邏輯,就由EventHandler的具體實現來承載;
Concrete Event Handler(具體事件處理器)。是EventHandler的具體實現,用于處理不同類型的事件;
Synchronous Event Demultiplexer(事件多路分解器)。(這里將Synchronous Event Demultiplexer簡稱為Demultiplexer)Demultiplexer用于監聽事件并得到所有發生事件的集合,在監聽的狀態下是阻塞的,直到有事件發生為止。Demultiplexer有一個很好的類比,就是NIO中的多路復用器Selector,當調用Selector的select() 方法后,會進入監聽狀態,當從select() 方法返回時,會得到SelectionKey的一個集合,而每一個SelectionKey中就保存著有事件發生的Socket管道;
Initiation Dispatcher(事件分發器)。現在已經有Concrete Event Handler(具體事件處理器)來處理不同的事件,也能通過Synchronous Event Demultiplexer(事件多路分解器)拿到發生的事件,那么最后需要做的事情,肯定就是將事件分發到正確的事件處理器上進行處理,而Initiation Dispatcher就是完成這個分發的事情。
Reactor設計模式的一個簡單類圖,如下所示。
通常,Reactor設計模式中的Reactor,可以理解為上述圖中的Synchronous Event Demultiplexer + Initiation Dispatcher。
單Reactor單線程模型中,只有一個Reactor在監聽事件和分發事件,并且監聽事件,分發事件和處理事件都在一個線程中完成。示意圖如下所示。
上述示意圖中,一次完整的處理流程可以概括如下。
Reactor監聽到ACCEPT事件發生,表示此時有客戶端建立連接;
Reactor將ACCEPT事件分發給Acceptor處理;
Acceptor會在服務端創建與客戶端通信的client-socket管道,然后注冊到IO多路復用器selector上,并監聽READ事件;
Reactor監聽到READ事件發生,表示此時客戶端數據可讀;
Reactor將ACCEPT事件分發給Handler處理,Handler處理READ事件就會基于client-socket管道完成客戶端數據的讀取。
下面將基于Java語言,實現一個簡單的單Reactor單線程模型的服務端,整體代碼實現完全符合上述示意圖,大家可以進行參照閱讀。
首先實現Reactor,如下所示。
public class Reactor implements Runnable { private final Selector selector; public Reactor(int port) throws IOException { // 開啟多路復用 selector = Selector.open(); // 服務端創建listen-socket管道 ServerSocketChannel listenSocketChannel = ServerSocketChannel.open(); // 綁定端口 listenSocketChannel.socket().bind(new InetSocketAddress(port)); // 設置為非阻塞模式 listenSocketChannel.configureBlocking(false); // ACCEPT事件的附加器是Acceptor listenSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(selector, listenSocketChannel)); } @Override public void run() { while (!Thread.interrupted()) { try { // 獲取發生的事件 selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterable = selectionKeys.iterator(); while (iterable.hasNext()) { // 對事件進行分發 dispatch(iterable.next()); iterable.remove(); } } catch (IOException e) { e.printStackTrace(); } LockSupport.parkNanos(1000 * 1000 * 1000); } } private void dispatch(SelectionKey selectionKey) { // 獲取事件的附加器 // ACCEPT事件的附加器是Acceptor,故由Acceptor來處理ACCEPT事件 // READ事件的附加器是Handler,故由Handler來處理READ事件 Runnable attachment = (Runnable) selectionKey.attachment(); if (attachment != null) { attachment.run(); } } }
已知Reactor會監聽客戶端連接的ACCEPT事件,還已知ACCEPT事件由Acceptor處理,所以在向多路復用器注冊服務端用于監聽客戶端連接的listen-socket管道時,添加了一個Acceptor作為附加器,那么當發生ACCEPT事件時,就能夠獲取到作為ACCEPT事件附加器的Acceptor來處理ACCEPT事件。
下面看一下Acceptor的實現,如下所示。
public class Acceptor implements Runnable { private final Selector selector; private final ServerSocketChannel listenSocketChannel; public Acceptor(Selector selector, ServerSocketChannel listenSocketChannel) { this.selector = selector; this.listenSocketChannel = listenSocketChannel; } @Override public void run() { try { // 為連接的客戶端創建client-socket管道 SocketChannel clientSocketChannel = listenSocketChannel.accept(); // 設置為非阻塞 clientSocketChannel.configureBlocking(false); // READ事件的附加器是Handler clientSocketChannel.register(selector, SelectionKey.OP_READ, new Handler(clientSocketChannel)); } catch (IOException e) { e.printStackTrace(); } } }
在Acceptor中就是在服務端創建與客戶端通信的client-socket管道,然后注冊到多路復用器上并指定監聽READ事件,同時又因為READ事件由Handler處理,所以還添加了一個Handler作為附加器,當READ事件發生時可以獲取到作為READ事件附加器的Handler來處理READ事件。
下面看一下Handler的實現,如下所示。
public class Handler implements Runnable { private final SocketChannel clientSocketChannel; public Handler(SocketChannel clientSocketChannel) { this.clientSocketChannel = clientSocketChannel; } @Override public void run() { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); try { // 讀取數據 int read = clientSocketChannel.read(byteBuffer); if (read <= 0) { clientSocketChannel.close(); } else { System.out.println(new String(byteBuffer.array())); } } catch (IOException e1) { try { clientSocketChannel.close(); } catch (IOException e2) { e2.printStackTrace(); } e1.printStackTrace(); } } }
在Handler中就是簡單的讀取數據并打印,當讀取數據為空或者發生異常時,需要及時將管道關閉。
最后編寫一個主程序將Reactor運行起來,如下所示。
public class MainServer { public static void main(String[] args) throws IOException { Thread reactorThread = new Thread(new Reactor(8080)); reactorThread.start(); } }
現在來思考一下,單Reactor單線程模型有什么優點和缺點。優點其實就是模型簡單,實現方便。缺點有兩點,如下所示。
一個Reactor同時負責監聽ACCEPT事件和READ事件;
只有一個線程在工作,處理效率低,無法利用多核CPU的優勢。
但是盡管單Reactor單線程模型有上述的缺點,但是著名的緩存中間件Redis的服務端,就是使用的單Reactor單線程模型,示意圖如下。
那為什么以性能著稱的Redis會采取單Reactor單線程模型呢,其實就是因為Redis的操作都在內存中,讀寫都非常快速,所以單Reactor單線程模型也能運行得很流暢,同時還避免了多線程下的各種并發問題。
在理解了單Reactor單線程模型后,那么肯定就能想到,假如在Handler中處理READ事件的這個事情能夠使用一個線程池來完成,從而就可以實現READ事件的處理不會阻塞主線程。而這樣的一個模型,其實就是單Reactor多線程模型,示意圖如下所示。
和單Reactor單線程模型唯一的不同,就是在Handler中多了一個線程池。
單Reactor多線程模型的代碼實現,除了Handler以外,其余和單Reactor單線程模型一摸一樣,所以下面就看一下單Reactor多線程模型中的Handler實現,如下所示。
public class Handler implements Runnable { private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(16, 32, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200)); private final SocketChannel clientSocketChannel; public Handler(SocketChannel clientSocketChannel) { this.clientSocketChannel = clientSocketChannel; } @Override public void run() { threadPool.execute(() -> { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); try { // 讀取數據 int read = clientSocketChannel.read(byteBuffer); if (read <= 0) { clientSocketChannel.close(); } else { System.out.println(new String(byteBuffer.array())); } // 睡眠10S,演示任務執行耗時長也不會阻塞處理其它客戶端請求 LockSupport.parkNanos(1000 * 1000 * 1000 * 10L); } catch (IOException e1) { try { clientSocketChannel.close(); } catch (IOException e2) { e2.printStackTrace(); } e1.printStackTrace(); } }); } }
其實就是每一個READ事件的處理會作為一個任務被扔到線程池中去處理。
單Reactor多線程模型雖然解決了只有一個線程的問題,但是可以發現,仍舊是只有一個Reactor在同時監聽ACCEPT事件和READ事件。
那么現在思考一下,為什么一個Reactor同時監聽ACCEPT事件和READ事件是不好的。其實就是因為通常客戶端連接的建立是不頻繁的,但是連接建立后數據的收發是頻繁的,所以如果能夠將監聽READ事件這個動作拆分出來,讓多個子Reactor來監聽READ事件,而原來的主Reactor只監聽ACCEPT事件,那么整體的效率,會進一步提升,而這,就是主從Reactor多線程模型。
主從Reactor模型中,有一個主Reactor,專門監聽ACCEPT事件,然后有多個從Reactor,專門監聽READ事件,示意圖如下所示。
上述示意圖中,一次完整的處理流程可以概括如下。
主Reactor監聽到ACCEPT事件發生,表示此時有客戶端建立連接;
主Reactor將ACCEPT事件分發給Acceptor處理;
Acceptor會在服務端創建與客戶端通信的client-socket管道,然后注冊到從Reactor的IO多路復用器selector上,并監聽READ事件;
從Reactor監聽到READ事件發生,表示此時客戶端數據可讀;
從Reactor將ACCEPT事件分發給Handler處理,Handler處理READ事件就會基于client-socket管道完成客戶端數據的讀取。
下面將基于Java語言,實現一個簡單的主從Reactor多線程模型的服務端,整體代碼實現完全符合上述示意圖,大家可以進行參照閱讀。
首先是主Reactor的實現,如下所示。
public class MainReactor implements Runnable { private final Selector selector; public MainReactor(int port) throws IOException { // 開多路復用器 selector = Selector.open(); // 服務端創建listen-socket管道 ServerSocketChannel listenSocketChannel = ServerSocketChannel.open(); // 設置為非阻塞 listenSocketChannel.configureBlocking(false); // 綁定監聽端口 listenSocketChannel.socket().bind(new InetSocketAddress(port)); // 將listen-socket管道綁定到主Reactor的多路復用器上 // 并且主Reactor上只會注冊listen-socket管道,用于監聽ACCEPT事件 listenSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(listenSocketChannel)); } @Override public void run() { while (!Thread.interrupted()) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterable = selectionKeys.iterator(); while (iterable.hasNext()) { // 對事件進行分發 dispatch(iterable.next()); iterable.remove(); } } catch (IOException e) { e.printStackTrace(); } LockSupport.parkNanos(1000 * 1000 * 1000); } } private void dispatch(SelectionKey selectionKey) { // 獲取事件附加器,只會是Acceptor Runnable attachment = (Runnable) selectionKey.attachment(); if (attachment != null) { attachment.run(); } } }
主Reactor的實現中,還是先創建服務端監聽客戶端連接的listen-socket管道,然后注冊到主Reactor的IO多路復用器上,并監聽ACCEPT事件,同時我們現在知道,主Reactor的IO多路復用器上只會注冊listen-socket管道且只會監聽ACCEPT事件。同樣,也添加了一個Acceptor作為附加器,那么當發生ACCEPT事件時,就能夠獲取到作為ACCEPT事件附加器的Acceptor來處理ACCEPT事件。
下面是Acceptor的實現,如下所示。
public class Acceptor implements Runnable { // 指定從Reactor一共有16個 private static final int TOTAL_SUBREACTOR_NUM = 16; // 服務端的listen-socket管道 private final ServerSocketChannel listenSocketChannel; // 用于運行從Reactor private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( TOTAL_SUBREACTOR_NUM, TOTAL_SUBREACTOR_NUM * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200)); // 從Reactor集合 private final List<SubReactor> subReactors = new ArrayList<>(TOTAL_SUBREACTOR_NUM); public Acceptor(ServerSocketChannel listenSocketChannel) throws IOException { this.listenSocketChannel = listenSocketChannel; // 將從Reactor初始化出來并運行 for (int i = 0; i < TOTAL_SUBREACTOR_NUM; i++) { SubReactor subReactor = new SubReactor(Selector.open()); subReactors.add(subReactor); threadPool.execute(subReactor); } } @Override public void run() { try { // 為連接的客戶端創建client-socket管道 SocketChannel clientSocketChannel = listenSocketChannel.accept(); // 設置為非阻塞 clientSocketChannel.configureBlocking(false); // 任意選擇一個從Reactor,讓其監聽連接的客戶端的READ事件 Optional<SubReactor> anySubReactor = subReactors.stream().findAny(); if (anySubReactor.isPresent()) { SubReactor subReactor = anySubReactor.get(); // 從Reactor的多路復用器會阻塞在select()方法上 // 這里需要先喚醒多路復用器,立即從select()方法返回 subReactor.getSelector().wakeup(); // 讓從Reactor負責處理客戶端的READ事件 clientSocketChannel.register(subReactor.getSelector(), SelectionKey.OP_READ, new Handler(clientSocketChannel)); } } catch (IOException e) { e.printStackTrace(); } } }
首先在Acceptor的構造函數中,會將所有從Reactor初始化出來,并且每一個從Reactor都會持有一個IO多路復用器。當一個從Reactor創建出來后就會立即運行,此時從Reactor的IO多路復用器就會開始監聽,即阻塞在select() 方法上。
然后在Acceptor的主體邏輯中,會為連接的客戶端創建client-socket管道,然后從所有從Reactor中基于某種策略(隨機)選擇一個從Reactor,并將client-socket管道注冊在選擇的從Reactor的IO多路復用器上,有一點需要注意,此時從Reactor的IO多路復用器可能會阻塞在select() 方法上,所以注冊前需要先通過wakeup() 方法進行喚醒。
接下來繼續看從Reactor的實現,如下所示。
public class SubReactor implements Runnable { private final Selector selector; public SubReactor(Selector selector) { this.selector = selector; } @Override public void run() { while (!Thread.interrupted()) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { // 對事件進行分發 dispatch(iterator.next()); iterator.remove(); } } catch (IOException e) { e.printStackTrace(); } LockSupport.parkNanos(1000 * 1000 * 1000); } } private void dispatch(SelectionKey selectionKey) { // 獲取事件附加器,只會是Handler Runnable runnable = (Runnable) selectionKey.attachment(); if (runnable != null) { runnable.run(); } } public Selector getSelector() { return selector; } }
從Reactor的實現中,會監聽服務端為連接的客戶端創建的client-socket管道上的READ事件,一旦有READ事件發生,就會使用作為附加器的Handler來處理READ事件。同樣,從Reactor的IO多路復用器上只會注冊client-socket管道且只會監聽READ事件。
然后是Handler,因為是多線程模型,所以其實現和第三節中的Handler完全一樣,下面再貼一下代碼。
public class Handler implements Runnable { private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(16, 32, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200)); private final SocketChannel clientSocketChannel; public Handler(SocketChannel clientSocketChannel) { this.clientSocketChannel = clientSocketChannel; } @Override public void run() { threadPool.execute(() -> { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); try { // 讀取數據 int read = clientSocketChannel.read(byteBuffer); if (read <= 0) { clientSocketChannel.close(); } else { System.out.println(new String(byteBuffer.array())); } // 睡眠10S,演示任務執行耗時長也不會阻塞處理其它客戶端請求 LockSupport.parkNanos(1000 * 1000 * 1000 * 10L); } catch (IOException e1) { try { clientSocketChannel.close(); } catch (IOException e2) { e2.printStackTrace(); } e1.printStackTrace(); } }); } }
最后編寫一個主程序將主Reactor運行起來,如下所示。
public class MainServer { public static void main(String[] args) throws IOException { Thread mainReactorThread = new Thread(new MainReactor(8080)); mainReactorThread.start(); } }
關于“Reactor模型如何實現”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Reactor模型如何實現”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。