您好,登錄后才能下訂單哦!
題記
最近對netty有了興趣,現在官方推薦版本是netty4.*,但是縱觀網絡,大部分都是關于netty3.x的知識。
最好的學習,莫過于通過官方文檔進行學習,系統,透徹,權威,缺點是英文。本文,算做自己學習netty的第一篇,總體思路與User guide for 4.x基本一致,本篇文章不是嚴格意義的翻譯文章。開始了...
1.前言
1.1 問題
現 在,我們使用通用的應用程序和程序庫,進行互相交流。例如,我們經常使用HTTP client庫從web服務器上獲取信息,通過web services調用遠程接口。然而,通用的協議或實現,有時候不能很好的擴展伸縮。就像我們不能使用通用協議進行部分信息的交換,如:huge files,e-mail,實時信息。我們需要高度優化的協議實現,來完成一些特殊目的。比如,你想實現一款專門的HTTP服務器,以支持基于AJAX的聊天應用,流媒體,大文件傳輸。你需要設計和實現一個完整的新協議,不可避免需要處理遺留協議,在不影響性能和穩定的情況下,實現新協議速度能有多塊?
1.2 解決方案
Netty 致力于提供異步,基于事件驅動的網絡應用框架,是一款進行快速開發高性能,可伸縮的協議服務器和客戶端工具。換句話說,Netty是一個快速和容易開發的NIO客戶端,服務端網絡框架,它簡化和使用流式方式處理網絡編程,如TCP,UDP。
"快速和容易“,并不代表netty存在可維護性和性能問題。它很完美的提供FTP,SMTP,HTTP,二進制和基于文本的協議支持。有的用戶可能已經發現了擁有同樣有點的其他網絡應用框架。你可能想問:netty和他們的區別?這個問題不好回答,Netty被設計成提供最合適的API和實現,會讓你的生活更簡單。
2.開始
本節使用一些簡單的例子讓你快速的感知netty的核心結構。閱讀本節之后,你可以熟練掌握netty,可以寫一個client和server。
準備
JDK 1.6+
最新版本的netty:下載地址
maven依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
2.1寫一個Discard服務
最簡單的協議,不是'hello world',而是DISCARD。這個協議拋棄接收的數據,沒有響應。
協議實現,唯一需要做的一件事就是無視所有接收到的數據,讓我們開始吧。
直接上Handler的實現,它處理來自netty的I/O事件。
package io.netty.examples.discard; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; /** * Handles a server-side channel. */ public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) // Discard the received data silently. ((ByteBuf) msg).release(); // (3) /* //也可以這樣 try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } */ } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
1)DiscardServerHandler
繼承了ChannelInboundHandlerAdapter
,
實現了
ChannelInboundHandler
接口。ChannelInboundHandler
提供了多個可重寫的事件處理方法。現在,繼承了ChannelInboundHandlerAdapter,而不用自己實現handler接口。
2)重寫了channelRead
(),這個方法在接收到新信息時被調用。信息類型是ByteBuf
3)實現Discard協議,ByteBuf是reference-counted對象,必須顯示的調用release(),進行釋放。需要記住handler的責任包括釋放任意的reference-counted對象。
4)exceptionCaught(),當發生異常時調用,I/O異常或Handler處理異常。一般情況下,在這里可以記錄異常日志,和返回錯誤碼。
到了這里,完成了一半,接下來編寫main()
,啟動Server
package io.netty.examples.discard; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.examples.discard.time.TimeServerHandler; /** * Discards any incoming data. */ public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscardServer(port).run(); } }
NioEventLoopGroup
是一個多線程的處理I/O操作Event Loop(這是異步機制特點) Netty 提供了多個 EventLoopGroup
,支持多種傳輸方式。在這個例子中,我們使用了2個NioEventLoopGroup
。
第一個,叫"boss",接收進入的連接,第二個經常叫“worker",一旦boss接收了連接并注冊連接到這個worker,worker就會處理這個連接。多少個線程被使用?,EventLoopGroup擁有多少個Channel?都可以通過構造函數配置
ServerBootstrap
是建立server的幫助類。你可以使用Channel直接創建server,注意,這個創建過程非常的冗長,大部分情況,你不需要直接創建。
指定 NioServerSocketChannel類,當接收新接入連接時,被用在Channel的初始化。
這個handler,初始化Channel時調用。ChannelInitializer是一個專門用于配置新的Channel的Handler,一般為Channel配置ChannelPipeline。當業務復雜時,會添加更多的handler到pipeline.
你可以設置影響Channel實現的參數,比如keepAlive..
option()
影響的是接收進入的連接 的NioServerSocketChannel
;childOption()
影響的是來自父 ServerChannel分發的Channel, 在本例中是 NioServerSocketChannel
保證工作準備好了
通過telnet進行測試。
輸入telnet 127.0.0.1 8080
由于是Discard協議,一沒響應,二服務端也沒輸出。通過測試,只能確認是服務正常啟動了。
調整下Handler邏輯,修改channelRead()方法。
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { // (1) System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); // (2) } }
修改之后,在使用telnet測試。你在命令行中輸入什么,在服務端就能看到什么!(只能是英文)
2.2寫一個Echo服務
作為一個服務,沒有返回響應信息,明顯是不合格的。接下來實現Echo協議,返回響應信息。與前面的Discard服務實現唯一的不同點就是Handler,需要回寫接收到的信息,而不是打印輸出。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); // (1) ctx.flush(); // (2) }
1)ChannelHandlerContext提供多個方法,觸發I/O事件。在這里,不需要像Discard一樣釋放 接收的信息。
2)ctx.write(Object)不能保證完全寫入,底層存在緩存,需要通過ctx.flush()刷新,保證完全寫入。
完成。
2.3寫一個Time服務
時間協議見 TIME protocol。和前面2個協議不同,服務將發送一個32位的integer值。不需要接收信息,一旦信息發出,將關閉連接。在這個例子中,將學到如何構造和發送信息,并在完成時關閉連接。
由于我們不需要接收信息,所以不需要channelRead()方法,而是使用channelActive()。
package io.netty.example.time;public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
1)channelActive()在連接建立和準備完成時調用。
2) 發送一個新消息,需要分配一個buff.將來寫入一個4位的integer值。獲取當前的 ByteBufAllocator,可以通過ChannelHandlerContext.alloc()獲取。
3)注意沒有java.nio.ByteBuffer.flip(),(nio 讀寫切換時使用),這是因為netty重寫了Buffer,維護了2個指示器,分別用來(reader index)讀取和( writer index )寫入。
注意:ChannelHandlerContext.write() (and writeAndFlush())返回的ChannelFuture 。ChannelFuture 代表一個尚未發生的I/O操作。這一個是異步操作,會觸發通知listeners 。這兒的意思是當ChannelFuture完成時,才會調用close()。
最終版的Server Handler
package io.netty.examples.time; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
接下來實現Time客戶端
與Echo和Discard協議不同,人不能很好的將32位的integer,轉換為可以理解的日期數據。通過學習本節,可以學習如何寫一個Client。這和前面的EchoServer,DiscardServer實現有很大的不同。
package io.netty.examples.time; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TimeClient { public static void main(String[] args) throws Exception { String host = "127.0.0.1"; int port = Integer.parseInt("8080"); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
Bootstrap
is similar to ServerBootstrap
except that it's for non-server channels such as a client-side or connectionless channel.
如果只使用一個 EventLoopGroup
,它會被當成boss group 和 worker group. boss worker不會再client使用。
代替NioServerSocketChannel
, NioSocketChannel
是一個 客戶端的Channel
.
注意沒有使用 childOption()
,因為客戶端沒有上級。
調用connect()
,而不是 bind()
下面是對應的Handler
package io.netty.examples.time; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
In TCP/IP, Netty reads the data sent from a peer into a ByteBuf
.
測試過程:略。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。