虽然NioEventLoop追朔到源头是继承了EventExector,但是两者在使用场景上有很大的区别。

NioEventLoop的主要场景是用在Nio的场景下的IO轮询,而EventExecutor则是在事件触发的时候,将事件执行的逻辑交给它去处理。

        final EventExecutorGroup handlerGroup = new DefaultEventExecutor();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)....childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(handlerGroup, sslCtx.newHandler(ch.alloc()));}}});...}

在ChannelPipeline中添加ChannelHandler时候,可以在第一个参数传递EventExecutor的具体实现,Netty会在事件触发的时候,将ChannelHandler的处理逻辑放在EventExecutor中执行,而不占用NioEventLoop的轮询时间。所以接下来我们来看看EventExecutor的逻辑处理。

1、继承关系图

其实我们从DefaultEventExecutor出发的话,就可以直到相关EventExecutor的继承关系图。       

Netty最顶层的类是EventExecutorGroup,它继承了jdk的Executor、ScheduledExecutorService和Iterable,说明EventExecutorGroup实现了线程池、定时任务线程池以及迭代器的功能。

2、EventExecutorGroup

EventExeutorGroup直接继承的接口是ScheduledExecutorService和Iterable,所以其本身就默认提供了ScheduledExecutorService的调度方法。

// ScheduledExecutorService默认方法,提供了定时执行Runnable的功能
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);// ScheduledExecutorService默认方法,提供了定时执行Callable的功能,会有执行定时任务后的返回值
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);// ScheduledExecutorService默认方法,按指定频率周期执行某个任务。同样也会确保
// 上一次任务执行再执行,但是如果上一次任务执行的时间超过了下一次任务执行的时间
// 就会在上一次任务执行完后立即执行,相当于连续执行。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);// 区别scheduleAtFixedRate,必须保证上次任务执行完毕后,才间隔delay时间再执行。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

在继承迭代器方便,主要是返回EventExecutor,其实可以从名字理解到EventExecutorGroup就是相当于一组EventExecutor。

// 返回一个由EventExecutorGroup管理的EventExecutor
EventExecutor next();// 迭代器返回下一个EventExecutor
Iterator<EventExecutor> iterator();

EventExecutorGroup也提供了基本的任务提交功能,这里可能因为Netty全部都是异步的,需要Future去更好确定执行的任务结果,所以全部都继承了代返回值Future的submit方法。

Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task);

最后EventExecutorGroup提供了优雅关闭的接口,当执行优雅关闭时,会让所有在其管理的EventExecutor尝试执行shutdown关闭自身。

// shutdownGracefully一旦执行,该方法返回true
boolean isShuttingDown();
// 优雅关闭
Future<?> shutdownGracefully();
// 带超时时间的优雅关闭
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
// 当所有的EventExecutor都terminated时,该方法返回true
Future<?> terminationFuture();

3、EventExecutor

EventExecutorGroup负责管理一组EventExecutor,类比jdk的ThreadPoolExecutor和Thread的关系。EventExecutor本身并不包含事件的处理逻辑,而是相当于提供一个执行事件的场所(线程)去执行ChannelHandler中的事件处理逻辑,当然本文依然把EventExecutor及其子类称为事件处理器,因为它

比较奇特的是EventExecutor继承了EventExecutorGroup,相当于Thread继承了ThreadPoolExecutor

public interface EventExecutor extends EventExecutorGroup {...}

其实这点很好理解,EventExecutorGroup相当于提供了多个事件处理器,可以同时提交多个任务给不同的事件处理器执行,而EventExecutor继承了EventExecutorGroup,本身也可以像EventExecutorGroup提交多个任务,但是只会委派给一个事件处理器执行。所以在注释里面你可以看到EventExecutorGroup和EventExecutor在next方法的区别

/*** Returns a reference to itself.(EventExecutor)*/
@Override
EventExecutor next();/*** Returns one of the {@link EventExecutor}s * managed by this {@link EventExecutorGroup}.*/
EventExecutor next();

EventExecutor比较重要的方法就是inEventLoop,因为Channel为了保证线程安全,只会和一个EventExecutor进行绑定,这样就可以保证只会有一个线程可以访问Channel,前提是必须保证Channel的所有操作都必须是在绑定的EventExecutor上,inEventLoop的作用就是体现在这里。

boolean inEventLoop();boolean inEventLoop(Thread thread);

4、AbstarctEventExecutor

AbstractEventExecutor作用是实现了EventExecutor的部分公共方法。每个AbstractEventExecutor创建的时候可以指定EventExecutorGroup,表示当前对象是否被管理的,如果是独立的EventExecutor,可以传递null。

    private final EventExecutorGroup parent;protected AbstractEventExecutor() {this(null);}protected AbstractEventExecutor(EventExecutorGroup parent) {this.parent = parent;}// 返回EventExecutorGrouppublic EventExecutorGroup parent() {return parent;}

如果是独立的EventExecutor,那么next方法返回的必然是自身。

    public EventExecutor next() {return this;}

同时定义了执行任务的逻辑,也就是给出了在AbstractEventExecutor执行Runnable方法的逻辑,比较简单就是执行后如果抛出异常就打日志。

    protected static void safeExecute(Runnable task) {try {task.run();} catch (Throwable t) {logger.warn("A task raised an exception. Task: {}", task, t);}}

但是AbstractEventExecutor并没有给出inEventLoop、shutdownGracefully的具体逻辑。而是交给其子类去负责定义。

5、AbstractScheduledEventExecutor

这个类就比较有意思了,因为它的功能并不仅仅是执行任务,而是在一定的时候到达再执行任务。它内存存储了一个优先队列,将定时任务按deadline从小到大进行排序。

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

我们先来看下定时任务ScheduledFutureTask的定义。

5.1、ScheduledFutureTask

考虑到一个定时任务,处理存储任务的执行逻辑之外,还必须存储任务的具体执行时间。Netty在这里的具体做法是存储JVM的启动时那一刻的时间,并且以纳秒保存deadline。

// 保存JVM启动时间
private static final long START_TIME = System.nanoTime();// 如果按时间轴来看的话,任何时间减去START_TIME返回的就是相当于0时刻启动经过的纳秒数
static long nanoTime() {return System.nanoTime() - START_TIME;
}// 保存的deadline,可以理解为哪一刻需要执行任务
private long deadlineNanos;

如果deadline是以0时刻开始计算的,所以计算距离存储的deadline还有多久的时候,就可以用以下公式计算

deadline-(当前时间 - 系统启动时间)

Netty中的计算也是这样的

    public long deadlineNanos() {return deadlineNanos;}public long delayNanos() {return deadlineToDelayNanos(deadlineNanos());}static long deadlineToDelayNanos(long deadlineNanos) {return Math.max(0, deadlineNanos - nanoTime());}// 如果给出某一刻的时间的话,计算也是一样的public long delayNanos(long currentTimeNanos) {return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));}

ScheduledFutureTask本身也是继承了Comparable,所以在塞入优先队列的时候可以保证一定的顺序

   public int compareTo(Delayed o) {if (this == o) {return 0;}ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;// 优先保证最近的deadlinelong d = deadlineNanos() - that.deadlineNanos();if (d < 0) {return -1;} else if (d > 0) {return 1;} else if (id < that.id) {return -1;} else {assert id != that.id;return 1;}}

ScheduledFutureTask继承的Comparable,有限保证deadline比较小的task可以排在队列的头。如果时间一样,则会根据id进行排序。id是只有在优先队列塞入任务时候才会定义。

    // set once when added to priority queueprivate long id;// 具体调用scheduledTaskQueue().add(task.setId(nextTaskId++));

现在知道了AbstractScheduledEventExecutor在队列里面会塞入SchedulerFutureTask,并且会在task的deadline中执行,那么此时会遇到两个问题

a、如何往队列中塞入ScheduledFutureTask;

b、如何准确在deadline的时候执行任务。

AbstractScheduledEventExecutor内部已经定义了往队列中塞入ScheduledFutureTask的逻辑。

    private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {if (inEventLoop()) {scheduledTaskQueue().add(task.setId(nextTaskId++));} else {executeScheduledRunnable(new Runnable() {@Overridepublic void run() {scheduledTaskQueue().add(task.setId(nextTaskId++));}}, true, task.deadlineNanos());}return task;}

首先会判断当前是否处于EventLoop中,如果时直接往队列中塞入任务。否则,就会把这个塞入当作当作一个任务,在后续某一时刻执行。

    void executeScheduledRunnable(Runnable runnable,@SuppressWarnings("unused") boolean isAddition,@SuppressWarnings("unused") long deadlineNanos) {execute(runnable);}

这里有一个问题,executeScheduledRunnable应该是交给ScheduledFutureTask对应的EventLoop中执行才对,所以这里具体要看实现的子类如何将任务转交给对应的ScheduledFutureTask,所以在这点保留。

最后关于AbstractScheduledEventExecutor如何处理定时任务的逻辑交给下面的SingelThreadEvenentExecutor去解析,这个类将是比较重要的,因为基本EventExecutor的实现逻辑都在这里面了。

6、SingleThreadEventExecutor

SingleThreadEventExecutor就是不断塞入任务到队列中,然后另一个线程不断拿任务执行,就是和普通的线程池一样的生产者和消费者模式。

那么既然是生产者和消费者模式,塞入任务这件事交给外部的线程去操作,SingleThreadEventExecutor内部的线程池就是专门负责处理这些任务的。我们来看看内部的线程池是如何初始化的。

SingleThreadEventExecutor内部存有一个Thread变量,所有的任务都会在这个线程执行,但是其本身又不会直接使用这个线程,而是通过线程池的方式传递任务和管理线程。

// 线程池
private final Executor executor;
// 线程
private volatile Thread thread;

SingleThreadEventExecutor会初始化内部的线程池,具体在构造方法中实现。

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedHandler) {super(parent);this.addTaskWakesUp = addTaskWakesUp;// 最大塞入队列的任务数this.maxPendingTasks = Math.max(16, maxPendingTasks);// 任务执行器this.executor = ThreadExecutorMap.apply(executor, this);// 任务队列taskQueue = newTaskQueue(this.maxPendingTasks);// 队列满了之后的拒绝策略rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}// ThreadExecutorMappublic static Executor apply(final Executor executor, final EventExecutor eventExecutor) {ObjectUtil.checkNotNull(executor, "executor");ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");return new Executor() {@Overridepublic void execute(final Runnable command) {executor.execute(apply(command, eventExecutor));}};}// ThreadExecutorMappublic static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {ObjectUtil.checkNotNull(command, "command");ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");return new Runnable() {@Overridepublic void run() {setCurrentEventExecutor(eventExecutor);try {command.run();} finally {setCurrentEventExecutor(null);}}};}

第一个apply方法可能嵌套着看不太清楚,可以分成这样看

    public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {ObjectUtil.checkNotNull(executor, "executor");ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");return new Executor() {@Overridepublic void execute(final Runnable command) {Runnable runnable = apply(command, eventExecutor);executor.execute(runnable);}};}

所以初始化SingleThreadEventExecutor.EventExecutor的时候,过程就很清楚了。SingleThreadEventExecutor在初始化的时候,通过调用ThreadExecutorMap.apply返回了一个Executor,而Jdk原生的Executor只需要实现一个方法即可。

void execute(Runnable command);

所以在ThreadExecutorMap.apply会再传递一个Executor负责执行SingleThreadEventExecutor中的Executor的任务,换句话说,就是SingleThreadEventExecutor.Executor内部的任务执行是通过构造函数中传递的另一个Executor执行的,相当于就是一个代理模式。为什么需要这样做呢?重点在于第二个apply这个方法,它在传递任务Runnable的时候,在任务执行的前后分别将执行此任务的EventExecutor绑定、解绑到自身持有的一个FastThreadLocal中。

    private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();private static void setCurrentEventExecutor(EventExecutor executor) {mappings.set(executor);}

这样绑定到一个FastThreadLocal的目的是,方便框架和内部任务去获取到执行的EventExecutor,要知道你最终执行任务的是线程Thread,和Runnable。你如果在执行任务的过程中,需要获取EventExecutor的话,如果放在每一个Runnable中其实不太现实,最好还是放在线程本身岁时可以拿到的地方,所以这里就会使线程本地变量。

而且要直到SingleThreadEventExecutor初始化的时候,传递的Executor是不限类型的,所以你可以传递多线程的线程池,但是呢SingleThreadEventExecutor只会和一个线程进行绑定,具体可以看到doStartThread()的逻辑,当你启动线程的时候,其实是通过往线程池丢了一个任务进去

private void doStartThread() {executor.execute(new Runnable() {...// run内部是死循环SingleThreadEventExecutor.this.run();...}...
}

所以一旦SingleThreadEventExecutor从你通过构造函数传递的Executor获取到线程的时候,就会在run方法内部通过死循环不断获取任务执行,相当于会一直把握住该线程。

作为一个封装了底层的框架,应该尽量和底层的框架API使用方法不要很大的区别,SingleThreadEventExecutor既然继承自JDK底层的Executor,那么使用方法也不应该又很大的区别。当你想直接往里面执行任务的时候,只需要和以往的Executor一样即可。

SingleThreadEventExecutor.executor(Runnable runnable);

所以我们来看executor的逻辑

    @Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();addTask(task);// 因为任务必须丢给EventLoop内部绑定的线程执行,而这个线程是jdk原生的// Executor的,只有在在任务内部执行过程中才可以获取,所以刚开始EventLoop// 的thread应该是null,也就是inEventLoop应该是返回false,所以才会有startThread的这个方法if (!inEventLoop) {startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}protected void addTask(Runnable task) {if (task == null) {throw new NullPointerException("task");}if (!offerTask(task)) {reject(task);}}final boolean offerTask(Runnable task) {if (isShutdown()) {reject();}return taskQueue.offer(task);}

executor内部直接把任务放到队列中去,如果检测到当前线称不是SingleThreadEventExecutor内部存储的Thread变量,那么会直接退出;否则就会检测当前线程的状态是否启动。并且里面启动线程。下面是inEventLoop的逻辑

    @Overridepublic boolean inEventLoop(Thread thread) {return thread == this.thread;}

具体任务的处理细节还得深入了解

    // SingelTheadEventExecutorprivate void startThread() {// 如果当前SingelTheadEventExecutor还未启动,则尝试通过CAS修改状态// CAS确保多个线程投递任务时,只有一个修改成功。if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {// 最终进入doStartThread逻辑启动任务doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}}

这里其实可以发现一点,SingleThreadEventExecutor会通过内部的队列不断获取任务出来执行,但是它不是一开始就是启动的,而是类似懒加载那种,只有第一次有任务投递时才开始启动,而且为了避免多个线程投递任务时线程启动导致线程不安全,启动时都用了CAS保证只有一个线程能够修改状态正确。

    private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = state;// 设置状态为ST_SHUTTING_DOWNif (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {if (logger.isErrorEnabled()) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +"be called before run() implementation terminates.");}}try {// Run all remaining tasks and shutdown hooks.for (;;) {// 将任务和钩子函数执行完if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify// the future. The user may block on the future and once it unblocks the JVM may terminate// and start unloading classes.// See https://github.com/netty/netty/issues/6596.FastThreadLocal.removeAll();// 设置状态为terminatedSTATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.countDown();if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});
}

SingleThreadEventExecutor更加类似的是JDK原生的SingleThreadExecutor,都是只会有一个线程在不断处理任务,这样可以有效的避免的线程安全,当然效率可能会大大折扣,这个得看具体的场景。

SingleThreadEventExecutor没有定义run的具体实现,而是交给了子类实现。例如DefaultEventExecutor。

    // DefaultEventExecutor@Overrideprotected void run() {for (;;) {Runnable task = takeTask();if (task != null) {task.run();updateLastExecutionTime();}if (confirmShutdown()) {break;}}}

Netty中的EventExecutor相关推荐

  1. netty中的future和promise源码分析(二)

    前面一篇netty中的future和promise源码分析(一)中对future进行了重点分析,接下来讲一讲promise. promise是可写的future,从future的分析中可以发现在其中没 ...

  2. Netty中的策略者模式

    策略者模式的特点 在设计类的继承体系时,我们会刻意的把公共的部分都提取到基类中 比如先设计Person类,把人类都具有的行为放到这个Person,特有的行为设计成抽象方法,让子类具体去实现, 这样后续 ...

  3. Netty中的Future

    先看下Future的整个继承体系,还有一个ChannelFuture不在里面:     在并发编程中,我们通常会用到一组非阻塞的模型:Promise,Future 和 Callback.其中的 Fut ...

  4. 浅析操作系统和Netty中的零拷贝机制

    点击关注公众号,Java干货及时送达 零拷贝机制(Zero-Copy)是在操作数据时不需要将数据从一块内存区域复制到另一块内存区域的技术,这样就避免了内存的拷贝,使得可以提高CPU的.零拷贝机制是一种 ...

  5. TCP的粘包和拆包及Netty中的解决方案

    1.基本介绍 TCP 是面向连接的,面向流的,提供高可靠性服务.收发两端(客户端和服务器端)都要有一一成对的 socket, 因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(N ...

  6. Netty中的那些坑

    Netty中的那些坑(上篇) 最近开发了一个纯异步的redis客户端,算是比较深入的使用了一把netty.在使用过程中一边优化,一边解决各种坑.儿这些坑大部分基本上是Netty4对Netty3的改进部 ...

  7. java中channelmessage,MessagePack在Netty中的应用

    [toc] MessagePack在Netty中的应用 前面使用Netty通信时,传输的都是字符串对象,因为在进行远程过程调用时,更多的是传输pojo对象,这时就需要对pojo对象进行序列化与反序列化 ...

  8. 理解Netty中的零拷贝(Zero-Copy)机制

    理解Netty中的零拷贝(Zero-Copy)机制 发表于2年前(2014-01-13 15:11)   阅读(10209) | 评论(12) 164人收藏此文章,我要收藏 赞29 12月12日北京O ...

  9. netty系列之:netty中各不同种类的channel详解

    文章目录 简介 ServerChannel和它的类型 Epoll和Kqueue AbstractServerChannel ServerSocketChannel ServerDomainSocket ...

最新文章

  1. 李开复预测:未来20年 AI将深刻影响五大产业
  2. 巧用Linux 架设TFTP Server备份路由器的配置文件
  3. ctf之py反编译求p*q%n==1
  4. 局内网用户访问wamp本地站点
  5. ES6-1 ES6版本过渡历史
  6. 福建省考计算机专业,2020福建省考,这些报考专业问题你清楚吗?
  7. 收藏 | 深度学习19个损失函数汇总
  8. c语言创建线程代码,如何用C语言实现多线程
  9. 计算机网络TCPP是一组什么,WWW的全称是什么?WWW中文名称是啥?
  10. C# 关于浏览器——CefSharp篇
  11. 贴吧自动签到脚本linux,【渣作】shell脚本百度贴吧签到器
  12. 基于CNN+tensorflow对搜狐新闻进行分类并对函数进行封装
  13. 【ML37】Bellman Equations
  14. 北大教授:学术会议,已沦为表演
  15. mysql字段时间类型报异常Data truncation: Incorrect datetime value: ‘2099-01-01 00:00:00‘ for column
  16. C语言鼠标操作方法及源码
  17. 输入月份号,输出该月的英文月名。用指针数组处理
  18. ActiveMQ Message Cursors、Async Sends、Optimized Acknowledgement、Producer Flow Control
  19. 【CSS】DIV 自定义滚动条样式
  20. 使用lifecycle时,1.0.0和1.0.3问题解决方法

热门文章

  1. C语言子函数通过传递参数地址改变参数数值2021-05-28
  2. iOS游戏开发没有你想的那么难--Hardest
  3. 6s有时有信号 有时无服务器,揭秘:iPhone 6S 的没信号没网络维修故障分析
  4. 直播代码,直播室源码,直播视频源码
  5. Appium连接手机之Remote
  6. Java 1~4章复习
  7. compaq CQ40 CQ45系列笔记本IDT声卡在Ubuntu9.04中无声的解决方法
  8. 安卓手机如何投屏到电视上_一招教你手机如何投屏到电视上
  9. 教你批量筛选,快递物流中一天未更新的单号
  10. 关于单片机串口单步调试运行正常,全速异常