Netty03-核心组件NioEventLoopGroup解读

NioEventLoopGroup

在这里插入图片描述

可以看到NioEventLoopGroup继承了MultithreadEventExecutorGroup并且实现了EventLoopGroup接口,而这两个类被ExecutorService修饰,所以NioEventLoopGroup实际上是一个线程池,池中的对象其实就是单个的NioEventLoop。

源码解读

NioEventLoopGroup 的参数初始化

   // 进入无参构造
    public NioEventLoopGroup() {
        this(0);
    } 
public NioEventLoopGroup(int nThreads) {
    // group所包含的executor
        this(nThreads, (Executor) null);
    }
public NioEventLoopGroup(int nThreads, Executor executor) {
    // 注入单例模式的提供者    
    this(nThreads, executor, SelectorProvider.provider());
    }
  public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
      // 注入默认的策略工厂实例
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) { // 注入拒绝处理器
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
  protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        //默认线程数是cpu核数的两倍
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
构造真正的NioEventGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        //1、
        //executor校验非空, 如果为空就创建ThreadPerTaskExecutor, 该类实现了 Executor接
        // 这个executor 是用来执行线程池中的所有的线程,也就是所有的NioEventLoop,其实从
        //NioEventLoop构造器中也可以知道,NioEventLoop构造器中都传入了executor这个参数。
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
      //2、
        //这里的children数组, 其实就是线程池的核心实现,线程池中就是通过指定的线程数组来实现       线程池;
        //数组中每个元素其实就是一个EventLoop,EventLoop是EventExecutor的子接口。

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //3、
            //newChild(executor, args) 函数在NioEventLoopGroup类中实现了,
            // 实质就是就是存入了一个 NIOEventLoop类实例
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
 						// 终止所有eventLoop上所执行的任务
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
//4、实例化线程工厂执行器选择器: 根据children获取选择器
        chooser = chooserFactory.newChooser(children);
 //5、为每个EventLoop线程添加 线程终止监听器
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
//6、将children 添加到对应的set集合中去重, 表示只可读。
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

children[i] = newChild(executor, args);

/**
 * newChild(executor, args) 里的方法
 * 我们可以看到 返回的就是一个 NioEventLoop
 */
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
1. NioEventLoopGroup初始化时未指定线程数,那么会使用默认线程数,即 `线程数 = CPU核心数 * 2`;
2. 每个NioEventLoopGroup对象内部都有一组可执行的`NioEventLoop数组`,其大小是 nThreads, 这样就构成了一个线程池, `一个NIOEventLoop可以理解成就是一个线程`。
3. 所有的NIOEventLoop线程是使用相同的 executor、SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler以及是属于某一个
    NIOEventLoopGroup的。这一点从 newChild(executor, args); 方法就可以看出:newChild()的实现是在NIOEventLoopGroup中实现的。
4. 当有IO事件来时,需要从线程池中选择一个线程出来执行,这时候的NioEventLoop选择策略是由GenericEventExecutorChooser实现的,并调用该类的next()方法。
5. 每个NioEventLoopGroup对象都有一个NioEventLoop选择器与之对应,其会根据NioEventLoop的个数,动态选择chooser(如果是2的幂次方,则按位运算,否则使用普通的轮询)

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>