详细讲讲自己第一次在哪里 详细讲讲netty的pipiline!
前言提到 Netty 首当其冲被提起的肯定是支持它承受高并发的线程模型,说到线程模型就不得不提到 NioEventLoopGroup 这个线程池,接下来进入正题 。
线程模型首先来看一段 Netty 的使用示例
package com.coding.server;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public final class SimpleServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new SimpleServerHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {}});ChannelFuture f = b.bind(8888).sync();f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("channelActive");}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("channelRegistered");}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {System.out.println("handlerAdded");}}}下面将分析第一、二行代码,看下 NioEventLoopGroup 类的构造函数干了些什么 。其余的部分将在其他博文中分析 。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();从代码中可以看到这里使用了两个线程池 bossGroup 和 workerGroup,那么为什么需要定义两个线程池呢?这就要说到 Netty 的线程模型了 。

文章插图
Netty 的线程模型被称为 Reactor 模型,具体如图所示,图上的 mainReactor 指的就是 bossGroup,这个线程池处理客户端的连接请求,并将 accept 的连接注册到 subReactor 的其中一个线程上;图上的 subReactor 当然指的就是 workerGroup,负责处理已建立的客户端通道上的数据读写;图上还有一块 ThreadPool 是具体的处理业务逻辑的线程池,一般情况下可以复用 subReactor,比我的项目中就是这种用法,但官方建议处理一些较为耗时的业务时还是要使用单独的 ThreadPool 。
NioEventLoopGroup 构造函数
public NioEventLoopGroup() {this(0);}public NioEventLoopGroup(int nThreads) {this(nThreads, null);}public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {this(nThreads, threadFactory, SelectorProvider.provider());}public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {super(nThreads, threadFactory, selectorProvider);}NioEventLoopGroup 类中的构造函数最终都是调用的父类 MultithreadEventLoopGroup 如下的构造函数:protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);}从上面的构造函数可以得到 如果使用EventLoopGroup workerGroup = new NioEventLoopGroup()来创建对象,即不指定线程个数,则 netty 给我们使用默认的线程个数,如果指定则用我们指定的线程个数 。默认线程个数相关的代码如下:
static {DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));if (logger.isDebugEnabled()) {logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);}}而 SystemPropertyUtil.getInt 函数的功能为:得到系统属性中指定 key(这里:key =”io.netty.eventLoopThreads”)所对应的 value,如果获取不到获取失败则返回默认值,这里的默认值为:cpu 的核数的2倍 。结论:如果没有设置程序启动参数(或者说没有指定 key=”io.netty.eventLoopThreads”的属性值),那么默认情况下线程的个数为 cpu 的核数乘以 2 。
继续看,由于 MultithreadEventLoopGroup 的构造函数是调用的是其父类 MultithreadEventExecutorGroup 的构造函数,因此,看下此类的构造函数
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}if (threadFactory == null) {threadFactory = newDefaultThreadFactory();}children = new SingleThreadEventExecutor[nThreads];//根据线程个数是否为2的幂次方,采用不同策略初始化chooserif (isPowerOfTwo(children.length)) {chooser = new PowerOfTwoEventExecutorChooser();} else {chooser = new GenericEventExecutorChooser();}//产生nTreads个NioEventLoop对象保存在children数组中for (int i = 0; i < nThreads; i ++) {boolean success = false;try {children[i] = newChild(threadFactory, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {//如果newChild方法执行失败,则对前面执行new成功的几个NioEventLoop进行shutdown处理if (!success) {for (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}for (int j = 0; j < i; j ++) {EventExecutor e = children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {Thread.currentThread().interrupt();break;}}}}}}该构造函数干了如下三件事:- 产生了一个线程工场:threadFactory = newDefaultThreadFactory();
MultithreadEventExecutorGroup类protected ThreadFactory newDefaultThreadFactory() {return new DefaultThreadFactory(getClass());//getClass()为:NioEventLoopGroup.class}DefaultThreadFactory类public DefaultThreadFactory(Class<?> poolType) {this(poolType, false, Thread.NORM_PRIORITY);}- 根据线程个数是否为 2 的幂次方,采用不同策略初始化 chooser
private static boolean isPowerOfTwo(int val) {return (val & -val) == val;}- 产生 nTreads 个 NioEventLoop 对象保存在 children 数组中 ,线程都是通过调用 newChild 方法来产生的 。
@Overrideprotected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);}这里传给 NioEventLoop 构造函数的参数为:NioEventLoopGroup、DefaultThreadFactory、SelectorProvider 。NioEventLoop 构造函数分析既然上面提到来 new 一个 NioEventLoop 对象,下面我们就看下这个类以及其父类 。
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {super(parent, threadFactory, false);if (selectorProvider == null) {throw new NullPointerException("selectorProvider");}provider = selectorProvider;selector = openSelector();}继续看父类 SingleThreadEventLoop 的构造函数protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {super(parent, threadFactory, addTaskWakesUp);}又是直接调用来父类 SingleThreadEventExecutor 的构造函数,继续看protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}this.parent = parent;this.addTaskWakesUp = addTaskWakesUp;//falsethread = threadFactory.newThread(new Runnable() {@Overridepublic void run() {boolean success = false;updateLastExecutionTime();try {//调用NioEventLoop类的run方法SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});taskQueue = newTaskQueue();}protected Queue<Runnable> newTaskQueue() {return new LinkedBlockingQueue<Runnable>();}主要干如下两件事:- 利用 ThreadFactory 创建来一个 Thread,传入了一个 Runnable 对象,该 Runnable 重写的 run 代码比较长,不过重点仅仅是调用 NioEventLoop 类的 run 方法 。
- 使用 LinkedBlockingQueue 类初始化 taskQueue。
DefaultThreadFactory类@Overridepublic Thread newThread(Runnable r) {Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());try {//判断是否是守护线程,并进行设置if (t.isDaemon()) {if (!daemon) {t.setDaemon(false);}} else {if (daemon) {t.setDaemon(true);}}//设置其优先级if (t.getPriority() != priority) {t.setPriority(priority);}} catch (Exception ignored) {// Doesn't matter even if failed to set.}return t;}protected Thread newThread(Runnable r, String name) {return new FastThreadLocalThread(r, name);}FastThreadLocalThread类public FastThreadLocalThread(Runnable target, String name) {super(target, name);// FastThreadLocalThread extends Thread}到这里,可以看到底层还是借助于类似于Thread thread = new Thread(r)这种方式来创建线程 。关于NioEventLoop对象可以得到的点有,初始化了如下4个属性 。
- NioEventLoopGroup (在父类SingleThreadEventExecutor中)
- selector
- provider
- thread (在父类SingleThreadEventExecutor中)
入群 ,扫码加入我们交流群!
文章插图
【详细讲讲自己第一次在哪里 详细讲讲netty的pipiline!】

文章插图
欢迎关注我的公众号!里面可以加入微信技术交流群!
- 自私不考虑别人的说说 只顾自己不顾别人说说
- 自己在家怎么清洗热水器 自己在家怎么清洗油烟机
- 怎样查电脑MAC地址,怎么查自己电脑的mac地址在哪里
- 自己包的粽子保质期是多久?
- 心情简短一句话分享 用一句话描述自己的心情
- 如何贷款创业 自己创业可以贷款吗
- 怎么看自己共享的文件,共享文件谁打开了怎么查看
- 找个正规产品代理 自己找厂家做代理卖货
- 怎样跟自己的孩子握手言和?
- 自己生日感谢的话语 生日感谢词精美句子
