前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Netty】NioEventLoop

【Netty】NioEventLoop

作者头像
用户3467126
修改2019-07-03 19:52:18
6800
修改2019-07-03 19:52:18
举报
文章被收录于专栏:爱编码爱编码

简介

Netty框架的主要线程就是I/O线程,线程模型的设计决定了系统的吞吐量、并发性和安全性等架构质量属性。所以了解一下NioEventLoop。

Reactor线程模型

基本上所有的网络处理程序都有以下基本的处理过程: Read request Decode request Process service Encode reply Send reply

Reactor单线程模型

这是最简单的单Reactor线程模型,它负责多路分离套接字,Accept新连接,并分派请求到处理器链中。该模型适用于处理器链中业务处理组件能快速完成的场景。但这种模型并不能充分利用多核资源,实际使用少。

Reactor多线程模型

相比上一种模型,该模型在处理器链部分采用了多线程(线程池),也就是后端程序常见的模型。但Reactor仍为单个线程。

Reactor主从模型

主从Reactor多线程:多个acceptor的NIO线程池用于接受客户端的连接。将Reactor分成两部分,mainReactor负责监听Server socket,accpet新连接,并将简历的socket分派给subReactor。subReactor负责多路分离已连接的socket,读写网络数据,将业务处理功能扔给worker线程池完成。通常subReactor个数上与CPU个数等同。

以上就是对Reactor线程模型的学习。

Netty的线程模型

netty的线程模型是可以通过设置启动类的参数来配置的,设置不同的启动参数,netty支持Reactor单线程模型、多线程模型和主从Reactor多线程模型。

Boss线程池职责如下: (1)接收客户端的连接,初始化Channel参数 (2)将链路状态变更时间通知给ChannelPipeline

worker线程池作用是: (1)异步读取通信对端的数据报,发送读事件到ChannelPipeline (2)异步发送消息到通信对端,调用ChannelPipeline的消息发送接口 (3)执行系统调用Task; (4)执行定时任务Task;

通过配置boss和worker线程池的线程个数以及是否共享线程池等方式,netty的线程模型可以在单线程、多线程、主从线程之间切换。

为了提升性能,netty在很多地方都进行了无锁设计。比如在IO线程内部进行串行操作,避免多线程竞争造成的性能问题。表面上似乎串行化设计似乎CPU利用率不高,但是通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁串行线程设计性能更优。

NioEventLoop源码分析

基于Netty4.1.36

问题: 1.默认情况下,netty服务端起多少线程?何时启动? 2.Netty是如何解决jdk空轮询bug的? 3.Netty如何保证异步串行无锁化?

NioEventLoop创建流程

大致来说,从new NioEventLoopGroup()入手,然后到MultithreadEventLoopGroup的构造中明确的写明了默认为CPU的2倍的线程,接着new ThreadPerTaskExecutor()[线程创建器],然后就是一个死循环newChild()构造NioEventLoop,最后就是newChooser()[线程选择器]为后面的启动和执行做准备。

NioEventLoop启动流程和执行逻辑

NioEventLoop启动从客户端bind()入手,然后跟踪到doBind0(),接着到SingleThreadEventExecutor中execute(),该方法主要是添加任务addTask(task)和运行线程startThread(),然后在startThread()-->doStartThread()-->SingleThreadEventExecutor.this.run();开始执行NioEventLoop运行逻辑。

NioEventLoop启动后主要的工作

1.select() -- 检测IO事件,轮询注册到selector上面的io事件

2.processSelectedKeys() -- 处理io事件

3.runAllTasks() -- 处理外部线程扔到TaskQueue里面的任务

1.select() -- 检测IO事件

检测IO事件主要有三个部分:

deadline以及任务穿插逻辑处理:计算本次执行select截止时间(根据NioEventLoop当时是否有定时任务处理)以及判断在select的时候是否有任务要处理。

阻塞式select:未到截止时间或者任务队列为空进行一次阻塞式select操作

避免JDK空轮询的Bug:判断这次select操作是否阻塞timeoutMillis时间,未阻塞timeoutMillis时间表示触发JDK空轮询;判断触发JDK空轮询的次数是否超过阈值,达到阈值调用rebuildSelector()方法替换原来的selector操作方式避免下次JDK空轮询继续发生

代码语言:javascript
复制
1.	private void select(boolean oldWakenUp) throws IOException {
2.	        Selector selector = this.selector;
3.	        try {
4.	            int selectCnt = 0;
5.	            long currentTimeNanos = System.nanoTime();
6.	            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
7.	
8.	            for (;;) {
9.	
10.	                /** 1.deadline以及任务穿插逻辑处理-- 开始**/
11.	                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
12.	                if (timeoutMillis <= 0) {
13.	                    if (selectCnt == 0) {
14.	                        selector.selectNow();
15.	                        selectCnt = 1;
16.	                    }
17.	                    break;
18.	                }
19.	
20.	
21.	                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
22.	                // Selector#wakeup. So we need to check task queue again before executing select operation.
23.	                // If we don't, the task might be pended until select operation was timed out.
24.	                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
25.	                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
26.	                    selector.selectNow();
27.	                    selectCnt = 1;
28.	                    break;
29.	                }
30.	                  /** 1.deadline以及任务穿插逻辑处理-- 结束**/
31.	                  /**2.阻塞select--开始**/
32.	                int selectedKeys = selector.select(timeoutMillis);
33.	                selectCnt ++;
34.	               /**2.阻塞select--结束**/
35.	                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
36.	                    // - Selected something,
37.	                    // - waken up by user, or
38.	                    // - the task queue has a pending task.
39.	                    // - a scheduled task is ready for processing
40.	                    break;
41.	                }
42.	                if (Thread.interrupted()) {
43.	                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
44.	                    // As this is most likely a bug in the handler of the user or it's client library we will
45.	                    // also log it.
46.	                    //
47.	                    // See https://github.com/netty/netty/issues/2426
48.	                    if (logger.isDebugEnabled()) {
49.	                        logger.debug("Selector.select() returned prematurely because " +
50.	                                "Thread.currentThread().interrupt() was called. Use " +
51.	                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
52.	                    }
53.	                    selectCnt = 1;
54.	                    break;
55.	                }
56.	                 /**3.避免jdk空轮询的bug -- 开始 **/
57.	                long time = System.nanoTime();
58.	                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
59.	                    // timeoutMillis elapsed without anything selected.
60.	                    selectCnt = 1;
61.	                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
62.	                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
63.	                    // The code exists in an extra method to ensure the method is not too big to inline as this
64.	                    // branch is not very likely to get hit very frequently.
65.	                    selector = selectRebuildSelector(selectCnt);
66.	                    selectCnt = 1;
67.	                    break;
68.	                }
69.	
70.	                currentTimeNanos = time;
71.	            }
72.	            /**3.避免jdk空轮询的bug -- 结束**/
73.	            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
74.	                if (logger.isDebugEnabled()) {
75.	                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
76.	                            selectCnt - 1, selector);
77.	                }
78.	            }
79.	        } catch (CancelledKeyException e) {
80.	            if (logger.isDebugEnabled()) {
81.	                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
82.	                        selector, e);
83.	            }
84.	            // Harmless exception - log anyway
85.	        }
86.	    }

2. processSelectedKeys()-- 处理IO事件

selected keySet优化

select操作每次把已就绪状态的io事件添加到底层HashSet(时间复杂度为O(n))数据结构,通过反射方式将HashSet替换成数组的实现.

NioEventLoop.openSelector()

代码语言:javascript
复制
  
1.	private SelectorTuple openSelector() {
2.	        final Selector unwrappedSelector;
3.	        try {
4.	            unwrappedSelector = provider.openSelector();
5.	        } catch (IOException e) {
6.	            throw new ChannelException("failed to open a new selector", e);
7.	        }
8.	
9.	        if (DISABLE_KEY_SET_OPTIMIZATION) {
10.	            return new SelectorTuple(unwrappedSelector);
11.	        }
12.	
13.	        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
14.	            @Override
15.	            public Object run() {
16.	                try {
17.	                    return Class.forName(
18.	                            "sun.nio.ch.SelectorImpl",
19.	                            false,
20.	                            PlatformDependent.getSystemClassLoader());
21.	                } catch (Throwable cause) {
22.	                    return cause;
23.	                }
24.	            }
25.	        });
26.	
27.	        if (!(maybeSelectorImplClass instanceof Class) ||
28.	            // ensure the current selector implementation is what we can instrument.
29.	            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
30.	            if (maybeSelectorImplClass instanceof Throwable) {
31.	                Throwable t = (Throwable) maybeSelectorImplClass;
32.	                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
33.	            }
34.	            return new SelectorTuple(unwrappedSelector);
35.	        }
36.	
37.	        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
38.	        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
39.	
40.	        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
41.	            @Override
42.	            public Object run() {
43.	                try {
44.	                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
45.	                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
46.	
47.	                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
48.	                        // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
49.	                        // This allows us to also do this in Java9+ without any extra flags.
50.	                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
51.	                        long publicSelectedKeysFieldOffset =
52.	                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);
53.	
54.	                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
55.	                            PlatformDependent.putObject(
56.	                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
57.	                            PlatformDependent.putObject(
58.	                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
59.	                            return null;
60.	                        }
61.	                        // We could not retrieve the offset, lets try reflection as last-resort.
62.	                    }
63.	
64.	                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
65.	                    if (cause != null) {
66.	                        return cause;
67.	                    }
68.	                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
69.	                    if (cause != null) {
70.	                        return cause;
71.	                    }
72.	
73.	                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
74.	                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
75.	                    return null;
76.	                } catch (NoSuchFieldException e) {
77.	                    return e;
78.	                } catch (IllegalAccessException e) {
79.	                    return e;
80.	                }
81.	            }
82.	        });
83.	
84.	        if (maybeException instanceof Exception) {
85.	            selectedKeys = null;
86.	            Exception e = (Exception) maybeException;
87.	            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
88.	            return new SelectorTuple(unwrappedSelector);
89.	        }
90.	        selectedKeys = selectedKeySet;
91.	        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
92.	        return new SelectorTuple(unwrappedSelector,
93.	                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
94.	    }

processSelectedKeysOptimized()

遍历SelectionKey数组获取SelectionKey的attachment即NioChannel; SelectionKey合法获取SelectionKey的io事件进行事件处理

NioEventLoop.processSelectedKeysOptimized()

代码语言:javascript
复制
1.	private void processSelectedKeysOptimized() {
2.	        for (int i = 0; i < selectedKeys.size; ++i) {
3.	            final SelectionKey k = selectedKeys.keys[i];
4.	            // null out entry in the array to allow to have it GC'ed once the Channel close
5.	            // See https://github.com/netty/netty/issues/2363
6.	            selectedKeys.keys[i] = null;
7.	
8.	            final Object a = k.attachment();
9.	
10.	            if (a instanceof AbstractNioChannel) {
11.	                processSelectedKey(k, (AbstractNioChannel) a);
12.	            } else {
13.	                @SuppressWarnings("unchecked")
14.	                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
15.	                processSelectedKey(k, task);
16.	            }
17.	
18.	            if (needsToSelectAgain) {
19.	                // null out entries in the array to allow to have it GC'ed once the Channel close
20.	                // See https://github.com/netty/netty/issues/2363
21.	                selectedKeys.reset(i + 1);
22.	
23.	                selectAgain();
24.	                i = -1;
25.	            }
26.	        } 
27.	    }

Task的分类和添加

创建NioEventLoop构造,外部线程使用addTask()方法添加task; ScheduledTaskQueue调用schedule()封装ScheduledFutureTask添加到普通任务队列

普通任务Task

SingleThreadEventExecutor.execute()-->addTask()

代码语言:javascript
复制
1.	protected void addTask(Runnable task) {
2.	        if (task == null) {
3.	            throw new NullPointerException("task");
4.	        }
5.	        if (!offerTask(task)) {
6.	            reject(task);
7.	        }
8.	    }

定时任务Task

将线程外的任务是通过加入队列实现,从而保证了线程安全。

AbstractScheduledEventExecutor.schedule() -->ScheduledFuture

代码语言:javascript
复制
   
1.	
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
2.	        if (inEventLoop()) {
3.	            scheduledTaskQueue().add(task);
4.	        } else {
5.	            execute(new Runnable() {
6.	                @Override
7.	                public void run() {
8.	                    scheduledTaskQueue().add(task);
9.	                }
10.	            });
11.	        }
12.	
13.	        return task;
14.	    }

任务的聚合

将定时任务队列任务聚合到普通任务队列

SingleThreadEventExecutor.fetchFromScheduledTaskQueue()

代码语言:javascript
复制
 1.	private boolean fetchFromScheduledTaskQueue() {
2.	        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
3.	        Runnable scheduledTask  = pollScheduledTask(nanoTime);
4.	        while (scheduledTask != null) {
5.	            if (!taskQueue.offer(scheduledTask)) {
6.	                // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
7.	                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
8.	                return false;
9.	            }
10.	            scheduledTask  = pollScheduledTask(nanoTime);
11.	        }
12.	        return true;
13.	    }

ScheduledFutureTask中可以看到任务Task是先按照截止时间排序,然后按照id进行排序的。

代码语言:javascript
复制
1.	
public int compareTo(Delayed o) {
2.	        if (this == o) {
3.	            return 0;
4.	        }
5.	
6.	        ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
7.	        long d = deadlineNanos() - that.deadlineNanos();
8.	        if (d < 0) {
9.	            return -1;
10.	        } else if (d > 0) {
11.	            return 1;
12.	        } else if (id < that.id) {
13.	            return -1;
14.	        } else if (id == that.id) {
15.	            throw new Error();
16.	        } else {
17.	            return 1;
18.	        }
19.	    }

任务的执行

获取普通任务队列待执行任务,使用safeExecute()方法执行任务,每次当累计任务数量达到64判断当前时间是否超过截止时间中断执行后续任务

NioEventLoop.runAllTasks()

代码语言:javascript
复制
1.	
protected boolean runAllTasks(long timeoutNanos) {
2.	        fetchFromScheduledTaskQueue();
3.	        Runnable task = pollTask();
4.	        if (task == null) {
5.	            afterRunningAllTasks();
6.	            return false;
7.	        }
8.	
9.	        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
10.	        long runTasks = 0;
11.	        long lastExecutionTime;
12.	        for (;;) {
13.	            safeExecute(task);
14.	
15.	            runTasks ++;
16.	
17.	            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
18.	            // XXX: Hard-coded value - will make it configurable if it is really a problem.
19.	            if ((runTasks & 0x3F) == 0) {
20.	                lastExecutionTime = ScheduledFutureTask.nanoTime();
21.	                if (lastExecutionTime >= deadline) {
22.	                    break;
23.	                }
24.	            }
25.	
26.	            task = pollTask();
27.	            if (task == null) {
28.	                lastExecutionTime = ScheduledFutureTask.nanoTime();
29.	                break;
30.	            }
31.	        }
32.	
33.	        afterRunningAllTasks();
34.	        this.lastExecutionTime = lastExecutionTime;
35.	        return true;
36.	    }

总结

主要学习了NioEventLoop的基本知识,如果有更多知识欢迎各位分享,我还是个小菜鸟。

本文参与?腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-06-06,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 爱编码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • Reactor线程模型
    • Reactor单线程模型
      • Reactor多线程模型
        • Reactor主从模型
        • Netty的线程模型
        • NioEventLoop源码分析
          • NioEventLoop创建流程
            • NioEventLoop启动流程和执行逻辑
            • 总结
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
            http://www.vxiaotou.com