您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關netty中pipeline如何添加handler,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.option(ChannelOption.SO_BACKLOG, 1024); // 綁定線程池 sb.group(group, bossGroup) // 指定使用的channel .channel(NioServerSocketChannel.class) // 綁定監聽端口 .localAddress(this.port) .handler(new LoggingHandler(LogLevel.INFO)) // 綁定客戶端連接時候觸發操作 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); //字符串解碼 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); //字符串編碼 pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new IdleStateHandler(0, 0, 120, TimeUnit.SECONDS)); } }); // 服務器異步創建綁定 ChannelFuture cf = sb.bind().sync(); log.info(NettyServer.class + " 啟動正在監聽: " + cf.channel().localAddress()); // 關閉服務器通道 cf.channel().closeFuture().sync(); } finally { // 釋放線程池資源 group.shutdownGracefully().sync(); bossGroup.shutdownGracefully().sync(); } }
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
void init(Channel channel) throws Exception { //省略若干 ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } //看這里 添加了一個ChannelInitializer實例 實例中實現了initChannel 獲取配置的handler添加到pipeline p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); //這方法很簡單 添加到鏈表中 但是此時添加的是上文的ChannelInitializer 而不是我們定義的handler addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } //這個方法就是關鍵 將ChannelInitializer中的handler添加到pipeline中 以及刪除ChannelInitializer //但請注意 首次注冊不會走到這里 在前面的判斷就會返回 會在注冊的邏輯中觸發 最終調用的一致 callHandlerAdded0(newCtx); return this; }
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } } } private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { //看這里刪除pipeline中的 ChannelInitializer ChannelInitializer 其實就是一個handler 只是它重寫了handlerAdded方法 ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } return true; } return false; }
關于“netty中pipeline如何添加handler”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。