code-learning/netty/29-Netty 源码解析-Channel(二)之 accept 操作.md

27 KiB
Raw Permalink Blame History

精尽 Netty 源码解析 —— Channel之 accept 操作

1. 概述

本文分享 Netty NIO 服务端 NioServerSocketChannel 接受( accept )客户端连接的过程。简单来说:

  1. 服务端 NioServerSocketChannel 的 boss EventLoop 线程轮询是否有新的客户端连接接入。
  2. 当轮询到有新的连接接入,封装连入的客户端的 SocketChannel 为 Netty NioSocketChannel 对象。
  3. 选择一个服务端 NioServerSocketChannel 的 worker EventLoop ,将客户端的 NioSocketChannel 注册到其上。并且,注册客户端的 NioSocketChannel 的读事件,开始轮询该客户端是否有数据写入。

下面,让我们来看看具体的代码实现。

2. NioMessageUnsafe#read

老艿艿:有点不知道怎么取标题好,直接用方法名吧。

在 NioEventLoop 的 #processSelectedKey(SelectionKey k, AbstractNioChannel ch) 方法中,我们会看到这样一段代码:

// SelectionKey.OP_READ 或 SelectionKey.OP_ACCEPT 就绪
// readyOps == 0 是对 JDK Bug 的处理,防止空的死循环
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}
  • (readyOps & SelectionKey.OP_ACCEPT) != 0 时,这就是服务端 NioServerSocketChannel 的 boss EventLoop 线程轮询到有新的客户端连接接入。
  • 然后,调用 NioMessageUnsafe#read() 方法,“读取”( 😈 这个抽象很灵性 )新的客户端连接连入。

NioMessageUnsafe#read() 方法,代码如下:

 1: private final class NioMessageUnsafe extends AbstractNioUnsafe {
 2: 
 3:     /**
 4:      * 新读取的客户端连接数组
 5:      */
 6:     private final List<Object> readBuf = new ArrayList<Object>();
 7: 
 8:     @SuppressWarnings("Duplicates")
 9:     @Override
10:     public void read() {
11:         assert eventLoop().inEventLoop();
12:         final ChannelConfig config = config();
13:         final ChannelPipeline pipeline = pipeline();
14:         // 获得 RecvByteBufAllocator.Handle 对象
15:         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
16:         // 重置 RecvByteBufAllocator.Handle 对象
17:         allocHandle.reset(config);
18: 
19:         boolean closed = false;
20:         Throwable exception = null;
21:         try {
22:             try {
23:                 do {
24:                     // 读取客户端的连接到 readBuf 中
25:                     int localRead = doReadMessages(readBuf);
26:                     // 无可读取的客户端的连接,结束
27:                     if (localRead == 0) {
28:                         break;
29:                     }
30:                     // 读取出错
31:                     if (localRead < 0) {
32:                         closed = true; // 标记关闭
33:                         break;
34:                     }
35: 
36:                     // 读取消息数量 + localRead
37:                     allocHandle.incMessagesRead(localRead);
38:                 } while (allocHandle.continueReading()); // 循环判断是否继续读取
39:             } catch (Throwable t) {
40:                 // 记录异常
41:                 exception = t;
42:             }
43: 
44:             // 循环 readBuf 数组,触发 Channel read 事件到 pipeline 中。
45:             int size = readBuf.size();
46:             for (int i = 0; i < size; i ++) {
47:                 // TODO 芋艿
48:                 readPending = false;
49:                 // 在内部,会通过 ServerBootstrapAcceptor ,将客户端的 Netty NioSocketChannel 注册到 EventLoop 上
50:                 pipeline.fireChannelRead(readBuf.get(i));
51:             }
52:             // 清空 readBuf 数组
53:             readBuf.clear();
54:             // 读取完成
55:             allocHandle.readComplete();
56:             // 触发 Channel readComplete 事件到 pipeline 中。
57:             pipeline.fireChannelReadComplete();
58: 
59:             // 发生异常
60:             if (exception != null) {
61:                 // 判断是否要关闭 TODO 芋艿
62:                 closed = closeOnReadError(exception);
63: 
64:                 // 触发 exceptionCaught 事件到 pipeline 中。
65:                 pipeline.fireExceptionCaught(exception);
66:             }
67: 
68:             if (closed) {
69:                 // TODO 芋艿
70:                 inputShutdown = true;
71:                 // TODO 芋艿
72:                 if (isOpen()) {
73:                     close(voidPromise());
74:                 }
75:             }
76:         } finally {
77:             // Check if there is a readPending which was not processed yet.
78:             // This could be for two reasons:
79:             // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
80:             // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
81:             //
82:             // See https://github.com/netty/netty/issues/2254
83:             // TODO 芋艿
84:             if (!readPending && !config.isAutoRead()) {
85:                 removeReadOp();
86:             }
87:         }
88:     }
89: }
  • 😈 NioMessageUnsafe 只有一个 #read() 方法,而该方法,“读取”新的客户端连接连入。

  • 第 15 行:调用 Unsafe#recvBufAllocHandle() 方法,获得 获得 RecvByteBufAllocator.Handle 对象。默认情况下,返回的是 AdaptiveRecvByteBufAllocator.HandleImpl 对象。关于它的内容,我们放在 ByteBuf 相关的文章,详细解析。

    • 第 17 行:调用 DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#reset(ChannelConfig) 方法,重置 RecvByteBufAllocator.Handle 对象。代码如下:

      @Override
      public void reset(ChannelConfig config) {
          this.config = config; // 重置 ChannelConfig 对象
          maxMessagePerRead = maxMessagesPerRead(); // 重置 maxMessagePerRead 属性
          totalMessages = totalBytesRead = 0; // 重置 totalMessages 和 totalBytesRead 属性
      }
      
      • 注意AdaptiveRecvByteBufAllocator.HandleImpl 继承 DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle 抽象类。
  • 第 22 至 42 行:while 循环 “读取”新的客户端连接连入。

    • 第 25 行: 调用 NioServerSocketChannel#doReadMessages(List<Object> buf) 方法,读取客户端的连接到 readBuf 中。详细解析,胖友先跳到 「3. AbstractNioMessageChannel#doReadMessages」 中,看完记得回到此处。

    • 第 25 至 29 行:无可读取的客户端的连接,结束循环。

    • 第 30 至 34 行:读取出错,标记关闭服务端,并结束循环。目前我们看到 NioServerSocketChannel#doReadMessages(List<Object> buf) 方法的实现,返回的结果只会存在 0 和 1 ,也就是说不会出现这种情况。笔者又去翻了别的实现类,例如 NioDatagramChannel#doReadMessages(List<Object> buf) 方法,在发生异常时,会返回 -1 。

    • 第 37 行:调用 AdaptiveRecvByteBufAllocator.HandleImpl#incMessagesRead(int amt) 方法,读取消息( 客户端 )数量 + localRead 。代码如下:

      @Override
      public final void incMessagesRead(int amt) {
          totalMessages += amt;
      }
      
      • 对于 AdaptiveRecvByteBufAllocator.HandleImpl 来说,考虑到抽象的需要,所以统一使用“消息”的说法。
    • 第 38 行:调用 AdaptiveRecvByteBufAllocator.HandleImpl#incMessagesRead(int amt)#continueReading() 方法,判断是否循环是否继续,读取( 接受 )新的客户端连接。代码如下:

      // AdaptiveRecvByteBufAllocator.HandleImpl.java
      @Override
      public boolean continueReading() {
          return continueReading(defaultMaybeMoreSupplier);
      }
      
      // DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle.java
      @Override
      public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
          return config.isAutoRead() &&
                 (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                 totalMessages < maxMessagePerRead &&
                 totalBytesRead > 0; // <1>
      }
      
      • 因为 <1> 处,此时 totalBytesRead 等于 0 ,所以会返回 false 。因此,循环会结束。也因此,对于 NioServerSocketChannel 来说,每次只接受一个新的客户端连接😈 当然,因为服务端 NioServerSocketChannel 对 Selectionkey.OP_ACCEPT 事件感兴趣,所以后续的新的客户端连接还是会被接受的
    • 第 39 至 42 行:读取过程中发生异常,记录该异常到 exception 中,同时结束循环。

  • 第 44 至 51 行:循环

    readBuf
    

    数组,触发 Channel read 事件到 pipeline 中。

    • 第 48 行TODO 芋艿 细节

    • 第 50 行:调用

      ChannelPipeline#fireChannelRead(Object msg)
      

      方法,触发 Channel read 事件到 pipeline 中。

      • 注意,传入的方法参数是新接受的客户端 NioSocketChannel 连接。
      • 在内部,会通过 ServerBootstrapAcceptor ,将客户端的 Netty NioSocketChannel 注册到 EventLoop 上。详细解析,胖友先跳到 「4. ServerBootstrapAcceptor」 中,看完记得回到此处。
  • 第 53 行:清空 readBuf 数组。

  • 第 55 行:调用 RecvByteBufAllocator.Handle#readComplete() 方法,读取完成。暂无重要的逻辑,不详细解析。

  • 第 57 行:调用 ChannelPipeline#fireChannelReadComplete() 方法,触发 Channel readComplete 事件到 pipeline 中。

    • 如果有需要,胖友可以自定义处理器,处理该事件。一般情况下,不需要

    • 如果没有自定义 ChannelHandler 进行处理,最终会被 pipeline 中的尾节点 TailContext 所处理。代码如下:

      // TailContext.java
      @Override
      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
          onUnhandledInboundChannelReadComplete();
      }
      
      // DefaultChannelPipeline.java
      protected void onUnhandledInboundChannelReadComplete() {
      }
      
      • 具体的调用是空方法
  • 第 60 至 66 行:

    exception
    

    非空,说明在接受连接过程中发生异常。

    • 第 62 行TODO 芋艿 细节

    • 第 65 行: 调用

      ChannelPipeline#fireExceptionCaught(Throwable)
      

      方法,触发 exceptionCaught 事件到 pipeline 中。

      • 默认情况下,会使用 ServerBootstrapAcceptor 处理该事件。详细解析,见 「4.3 exceptionCaught」
      • 如果有需要,胖友可以自定义处理器,处理该事件。一般情况下,不需要
  • 第 68 至 75 行TODO 芋艿 细节

  • 第 76 至 87 行TODO 芋艿 细节

3. AbstractNioMessageChannel#doReadMessages

doReadMessages(List<Object> buf) 抽象方法,读取客户端的连接到方法参数 buf 中。它是一个抽象方法,定义在 AbstractNioMessageChannel 抽象类中。代码如下:

/**
 * Read messages into the given array and return the amount which was read.
 */
protected abstract int doReadMessages(List<Object> buf) throws Exception;
  • 返回值为读取到的数量。

NioServerSocketChannel 对该方法的实现代码如下:

  1: @Override
  2: protected int doReadMessages(List<Object> buf) throws Exception {
  3:     // 接受客户端连接
  4:     SocketChannel ch = SocketUtils.accept(javaChannel());
  5: 
  6:     try {
  7:         // 创建 Netty NioSocketChannel 对象
  8:         if (ch != null) {
  9:             buf.add(new NioSocketChannel(this, ch));
 10:             return 1;
 11:         }
 12:     } catch (Throwable t) {
 13:         logger.warn("Failed to create a new channel from an accepted socket.", t);
 14:         // 发生异常,关闭客户端的 SocketChannel 连接
 15:         try {
 16:             ch.close();
 17:         } catch (Throwable t2) {
 18:             logger.warn("Failed to close a socket.", t2);
 19:         }
 20:     }
 21: 
 22:     return 0;
 23: }
 
 @Override
protected ServerSocketChannel javaChannel() {
    return (ServerSocketChannel) super.javaChannel();
}
  • 第 4 行:调用 SocketUtils#accept(ServerSocketChannel serverSocketChannel) 方法,接受客户端连接。代码如下:

    public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    return serverSocketChannel.accept(); // <1>
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }
    
    • 重点是看 <1> 处,调用 ServerSocketChannel#accept() 方法,接受客户端连接。
  • 第 9 行:基于客户端的 NIO ServerSocket ,创建 Netty NioSocketChannel 对象。整个过程,就是

    《精尽 Netty 源码分析 —— 启动(二)之客户端》

    「3.7.1 创建 Channel 对象」

    小节。

    • 第 10 行:返回 1 ,表示成功接受了 1 个新的客户端连接。
  • 第 12 至 20 行:发生异常,关闭客户端的 SocketChannel 连接,并打印

    告警

    日志。

    • 第 22 行:返回 0 ,表示成功接受 0 个新的客户端连接。

4. ServerBootstrapAcceptor

ServerBootstrapAcceptor ,继承 ChannelInboundHandlerAdapter 类,服务器接收器( acceptor ),负责将接受的客户端的 NioSocketChannel 注册到 EventLoop 中。

另外,从继承的是 ChannelInboundHandlerAdapter 类,可以看出它是 Inbound 事件处理器。

4.1 构造方法

在服务端的启动过程中,我们看到 ServerBootstrapAcceptor 注册到服务端的 NioServerSocketChannel 的 pipeline 的尾部,代码如下:

// 记录当前的属性
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
    currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
    currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}

// 添加 ChannelInitializer 对象到 pipeline 中,用于后续初始化 ChannelHandler 到 pipeline 中。
p.addLast(new ChannelInitializer<Channel>() {

    @Override
    public void initChannel(final Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();

        // 添加配置的 ChannelHandler 到 pipeline 中。
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        // 添加 ServerBootstrapAcceptor 到 pipeline 中。
        // 使用 EventLoop 执行的原因,参见 https://github.com/lightningMan/netty/commit/4638df20628a8987c8709f0f8e5f3679a914ce1a
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); // <1>
            }
        });
    }

});
  • <1> 处。也是在此处,创建了 ServerBootstrapAcceptor 对象。代码如下:

    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;
    /**
     * 自动恢复接受客户端连接的任务
     */
    private final Runnable enableAutoReadTask;
    
    ServerBootstrapAcceptor(
            final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;
    
        // Task which is scheduled to re-enable auto-read.
        // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
        // not be able to load the class because of the file limit it already reached.
        //
        // See https://github.com/netty/netty/issues/1328
        enableAutoReadTask = new Runnable() { // <2>
            @Override
            public void run() {
                channel.config().setAutoRead(true);
            }
        };
    }
    
    • enableAutoReadTask 属性,自动恢复接受客户端连接的任务,在 <2> 处初始化。具体的使用,我们在 「4.3 exceptionCaught」 中,详细解析。

4.2 channelRead

#channelRead(ChannelHandlerContext ctx, Object msg) 方法,将接受的客户端的 NioSocketChannel 注册到 EventLoop 中。代码如下:

 1: @Override
 2: public void channelRead(ChannelHandlerContext ctx, Object msg) {
 3:     // 老艿艿:如下的注释,先暂时认为是接受的客户端的 NioSocketChannel
 4: 
 5:     // 接受的客户端的 NioSocketChannel 对象
 6:     final Channel child = (Channel) msg;
 7:     // 添加 NioSocketChannel 的处理器
 8:     child.pipeline().addLast(childHandler);
 9:     // 设置 NioSocketChannel 的配置项
10:     setChannelOptions(child, childOptions, logger);
11:     // 设置 NioSocketChannel 的属性
12:     for (Entry<AttributeKey<?>, Object> e: childAttrs) {
13:         child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
14:     }
15: 
16:     try {
17:         // 注册客户端的 NioSocketChannel 到 work EventLoop 中。
18:         childGroup.register(child).addListener(new ChannelFutureListener() {
19: 
20:             @Override
21:             public void operationComplete(ChannelFuture future) throws Exception {
22:                 // 注册失败,关闭客户端的 NioSocketChannel
23:                 if (!future.isSuccess()) {
24:                     forceClose(child, future.cause());
25:                 }
26:             }
27: 
28:         });
29:     } catch (Throwable t) {
30:         // 发生异常,强制关闭客户端的 NioSocketChannel
31:         forceClose(child, t);
32:     }
33: }
  • 为了方便描述,我们统一认为接受的客户端连接为 NioSocketChannel 对象。

  • 第 6 行:接受的客户端的 NioSocketChannel 对象。

    • 第 8 行:调用 ChannelPipeline#addLast(childHandler) 方法,将配置的子 Channel 的处理器,添加到 NioSocketChannel 中。
    • 第 10 至 14 行:设置 NioSocketChannel 的配置项、属性。
  • 第 17 至 28 行:调用 EventLoopGroup#register(Channel channel) 方法,将客户端的 NioSocketChannel 对象,从 worker EventLoopGroup 中选择一个 EventLoop ,注册到其上。

    • 后续的逻辑,就和 《精尽 Netty 源码分析 —— 启动(一)之服务端》 的注册逻辑基本一致( 虽然说,文章写的是 NioServerSocketChannel 的注册逻辑 )。

    • 在注册完成之后,该 worker EventLoop 就会开始轮询该客户端是否有数据写入。

    • 第 18 至 28 行:添加监听器,如果注册失败,则调用 #forceClose(Channel child, Throwable t) 方法,强制关闭客户端的 NioSocketChannel 连接。代码如下:

      private static void forceClose(Channel child, Throwable t) {
          child.unsafe().closeForcibly();
          logger.warn("Failed to register an accepted channel: {}", child, t);
      }
      
      • 在该方法内部,会调用 Unsafe#closeForcibly() 方法,强制关闭客户端的 NioSocketChannel 。
    • 第 29 至 32 行:发生异常,则调用 #forceClose(Channel child, Throwable t) 方法,强制关闭客户端的 NioSocketChannel 连接。

4.3 exceptionCaught

#exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 方法,当捕获到异常时,暂停 1 秒,不再接受新的客户端连接;而后,再恢复接受新的客户端连接。代码如下:

 1: @Override
 2: public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 3:     final ChannelConfig config = ctx.channel().config();
 4:     if (config.isAutoRead()) {
 5:         // 关闭接受新的客户端连接
 6:         // stop accept new connections for 1 second to allow the channel to recover
 7:         // See https://github.com/netty/netty/issues/1328
 8:         config.setAutoRead(false);
 9:         // 发起 1 秒的延迟任务,恢复重启开启接受新的客户端连接
10:         ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
11:     }
12: 
13:     // 继续传播 exceptionCaught 给下一个节点
14:     // still let the exceptionCaught event flow through the pipeline to give the user
15:     // a chance to do something with it
16:     ctx.fireExceptionCaught(cause);
17: }
  • 第 8 行:调用 ChannelConfig#setAutoRead(false) 方法,关闭接受新的客户端连接。代码如下:

    // DefaultChannelConfig.java
    /**
     * {@link #autoRead} 的原子更新器
     */
    private static final AtomicIntegerFieldUpdater<DefaultChannelConfig> AUTOREAD_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultChannelConfig.class, "autoRead");
    /**
     * 是否开启自动读取的开关
     *
     * 1 - 开启
     * 0 - 关闭
     */
    @SuppressWarnings("FieldMayBeFinal")
    private volatile int autoRead = 1;
    
    @Override
    public ChannelConfig setAutoRead(boolean autoRead) {
        // 原子更新,并且获得更新前的值 <1>
        boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
        // 发起读取 <2.1>
        if (autoRead && !oldAutoRead) {
            channel.read();
        // 关闭读取 <2.2>
        } else if (!autoRead && oldAutoRead) {
            autoReadCleared();
        }
        return this;
    }
    
    • autoRead
      

      字段,是否开启自动读取的开关。😈 笔者原本以为是个

      boolean
      

      类型是不是胖友也是。其中1 表示开启0 表示关闭。

      • AUTOREAD_UPDATER 静态变量,对 autoRead 字段的原子更新器。
    • <1> 处,使用 AUTOREAD_UPDATER 更新 autoRead 字段,并获得更新前的值。为什么需要获取更新前的值呢?在后续的 <2.1><2.2> 中,当 autoRead 有变化时候,才进行后续的逻辑。

    • 😈 下面的逻辑,我们按照 channel 的类型为 NioServerSocketChannel 来分享。

    • <2.1> 处,autoRead && !oldAutoRead 返回 true ,意味着恢复重启开启接受新的客户端连接。所以调用 NioServerSocketChannel#read() 方法,后续的逻辑,就是 《精尽 Netty 源码分析 —— 启动(一)之服务端》「3.13.3 beginRead」 的逻辑。

    • <2.2> 处,!autoRead && oldAutoRead 返回 false ,意味着关闭接受新的客户端连接。所以调用 #autoReadCleared() 方法,移除对 SelectionKey.OP_ACCEPT 事件的感兴趣。

      // NioServerSocketChannel.java
      
      @Override
      protected void autoReadCleared() {
          clearReadPending();
      }
      
      • 在方法内部,会调用 #clearReadPending() 方法,代码如下:

        protected final void clearReadPending() {
            if (isRegistered()) {
                EventLoop eventLoop = eventLoop();
                if (eventLoop.inEventLoop()) {
                    clearReadPending0();
                } else {
                    eventLoop.execute(clearReadPendingRunnable);
                }
            } else {
                // Best effort if we are not registered yet clear readPending. This happens during channel initialization.
                // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
                // not set yet so it would produce an assertion failure.
                readPending = false;
            }
        }
        
        private final Runnable clearReadPendingRunnable = new Runnable() {
            @Override
            public void run() {
                clearReadPending0();
            }
        };
        
        private void clearReadPending0() {
            // TODO 芋艿
            readPending = false;
            // 移除对“读”事件的感兴趣。
            ((AbstractNioUnsafe) unsafe()).removeReadOp();
        }
        
        • 最终的结果,是在 EventLoop 的线程中,调用 AbstractNioUnsafe#clearReadPending0() 方法,移除对“”事件的感兴趣( 对于 NioServerSocketChannel 的 ““事件就是 SelectionKey.OP_ACCEPT )。代码如下:

          // AbstractNioUnsafe.java
          
          protected final void removeReadOp() {
              SelectionKey key = selectionKey();
              // 忽略,如果 SelectionKey 不合法,例如已经取消
              // Check first if the key is still valid as it may be canceled as part of the deregistration
              // from the EventLoop
              // See https://github.com/netty/netty/issues/2104
              if (!key.isValid()) {
                  return;
              }
              // 移除对“读”事件的感兴趣。
              int interestOps = key.interestOps();
              if ((interestOps & readInterestOp) != 0) {
                  // only remove readInterestOp if needed
                  key.interestOps(interestOps & ~readInterestOp);
              }
          }
          
          • 通过取反求并,后调用 SelectionKey#interestOps(interestOps) 方法,移除对“读”事件的感兴趣。
          • 😈 整个过程的调用链,有丢丢长,胖友可以回看,或者多多调试。
  • 第 10 行:调用 EventLoop#schedule(Runnable command, long delay, TimeUnit unit) 方法,发起 1 秒的延迟任务,恢复重启开启接受新的客户端连接。该定时任务会调用 ChannelConfig#setAutoRead(true) 方法,即对应 <2.1> 情况。

  • 第 16 行:调用 ChannelHandlerContext#fireExceptionCaught(cause) 方法,继续传播 exceptionCaught 给下一个节点。具体的原因,可看英文注释。

666. 彩蛋

推荐阅读文章: