netty reactor线程模型分析
netty4线程模型
ServerBootstrap http示例
// Configure the server.EventLoopGroup bossGroup = new EpollEventLoopGroup(1);EventLoopGroup workerGroup = new EpollEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.channel(EpollServerSocketChannel.class);b.option(ChannelOption.SO_BACKLOG, 1024);b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);b.group(bossGroup, workerGroup)// .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HttpHelloWorldServerInitializer(sslCtx));Channel ch = b.bind(PORT).sync().channel(); /* System.err.println("Open your web browser and navigate to " +(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');*/ch.closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
绑定过程:
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.executor = channel.eventLoop();}doBind0(regFuture, channel, localAddress, promise);}});return promise;}}
初始化过程:
final ChannelFuture initAndRegister() {final Channel channel = channelFactory().newChannel();try {init(channel);} catch (Throwable t) {channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}ChannelFuture regFuture = group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regFuture;}
ServerBootStrap的初始化过程:
@Overridevoid init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options();synchronized (options) {channel.config().setOptions(options);}final Map<AttributeKey<?>, Object> attrs = attrs();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = handler();if (handler != null) {pipeline.addLast(handler);} pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}
接收器ServerBootstrapAcceptor
@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}
ThreadPerChannelEventLoopGroup实现注册
@Overridepublic ChannelFuture register(Channel channel) {if (channel == null) {throw new NullPointerException("channel");}try { EventLoop l = nextChild();return l.register(channel, new DefaultChannelPromise(channel, l));} catch (Throwable t) {return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);}}
获取子eventLoop
private EventLoop nextChild() throws Exception {if (shuttingDown) {throw new RejectedExecutionException("shutting down");}EventLoop loop = idleChildren.poll();if (loop == null) {if (maxChannels > 0 && activeChildren.size() >= maxChannels) {throw tooManyChannels;}loop = newChild(childArgs);loop.terminationFuture().addListener(childTerminationListener);}activeChildren.add(loop);return loop;}
产生新子eventLoop(SingleThreadEventExecutor.java)
/*** Create a new instance** @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it* @param executor the {@link Executor} which will be used for executing* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the* executor thread*/protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {super(parent);if (executor == null) {throw new NullPointerException("executor");}this.addTaskWakesUp = addTaskWakesUp;this.executor = executor;taskQueue = newTaskQueue();}
其执行方法(SingleThreadEventExecutor.java):
@Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else { startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}
启动处理线程(SingleThreadEventExecutor.java):
private void startThread() {if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {doStartThread();}}}private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}
其中的run方法由其子类(DefaultEventLoop,EpollEventLoop,NioEventLoop,ThreadPerChannelEventLoop)各种实现,以NioEventLoop为例:
@Overrideprotected void run() {for (;;) {boolean oldWakenUp = wakenUp.getAndSet(false);try {if (hasTasks()) {selectNow();} else {select(oldWakenUp);// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and// 'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and// 'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {processSelectedKeys(); runAllTasks();} else {final long ioStartTime = System.nanoTime();processSelectedKeys();final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}if (isShuttingDown()) {closeAll();if (confirmShutdown()) {break;}}} catch (Throwable t) {logger.warn("Unexpected exception in the selector loop.", t);// Prevent possible consecutive immediate failures that lead to// excessive CPU consumption.try {Thread.sleep(1000);} catch (InterruptedException e) {// Ignore. }}}}
运行所有任务(SingleThreadEventExecutor.java)
/*** Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.*/protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue();Runnable task = pollTask();if (task == null) {return false;}final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;long runTasks = 0;long lastExecutionTime;for (;;) {try {task.run();} catch (Throwable t) {logger.warn("A task raised an exception.", t);}runTasks ++;// Check timeout every 64 tasks because nanoTime() is relatively expensive.// XXX: Hard-coded value - will make it configurable if it is really a problem.if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}this.lastExecutionTime = lastExecutionTime;return true;}
小结
本文从一个简单的示例程序,一步步分析netty4的线程模型,从ServerBootstrapAcceptor到SingleThreadEventExecutor的源码,环环相扣,可以根据上面的分析链理解
一个请求过来后,netty的处理流程。
转载于:https://www.cnblogs.com/davidwang456/p/5118802.html
netty reactor线程模型分析相关推荐
- Netty Reactor线程模型与EventLoop详解
本文来说下Netty Reactor线程模型与EventLoop 文章目录 EventLoop事件循环 任务调度 线程管理 线程分配 非阻塞传输 阻塞传输 Netty线程模型 单Reactor单线程模 ...
- Linux 的 IO 通信 以及 Reactor 线程模型浅析
目录 随着计算机硬件性能不断提高,服务器 CPU 的核数越来越越多,为了充分利用多核 CPU 的处理能力,提升系统的处理效率和并发性能,多线程并发编程越来越显得重要.无论是 C++ 还是 Java 编 ...
- 面试官:Netty的线程模型可不是Reactor这么简单
笔者看来Netty的内核主要包括如下图三个部分: 其各个核心模块主要的职责如下: 内存管理 主要提高高效的内存管理,包含内存分配,内存回收. 网通通道 复制网络通信,例如实现对NIO.OIO等底层JA ...
- 【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )
文章目录 一. NIO 原生 API 弊端 二. Netty 简介 三. Netty 架构 四. Netty 版本 五. Netty 线程模型 六. 阻塞 IO 线程模型 七. 反应器 ( React ...
- Netty和RPC框架线程模型分析
<Netty 进阶之路>.<分布式服务框架原理与实践>作者李林锋深入剖析Netty和RPC框架线程模型.李林锋已在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的同学可 ...
- reactor线程模型_简单了解Java Netty Reactor三种线程模型
1. Reactor三种线程模型 1.1. 单线程模型 Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程的职责如下: 1)作为NIO服务端,接收客户端的TCP连接 ...
- Netty 和 RPC 框架线程模型分析
https://www.infoq.cn/article/9Ib3hbKSgQaALj02-90y 1. 背景 1.1 线程模型的重要性 对于 RPC 框架而言,影响其性能指标的主要有三个要素: I/ ...
- Netty之线程模型
Reactor 线程模型: Reactor 是反应堆的意思,Reactor 模型是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式.服务端程序处理传入多路请求,并将它们同步分派给请 ...
- reactor线程模型_面试一文搞定JAVA的网络IO模型
1,最原始的BIO模型 该模型的整体思路是有一个独立的Acceptor线程负责监听客户端的链接,它接收到客户端链接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客 ...
最新文章
- 04- CoreData轻量级版本的迁移
- Linux下给PHP安装redis扩展
- CSSbackground的详细使用
- Python学习笔记: Python 标准库概览
- Python爬取京东笔记本电脑,来看看那个牌子最棒
- java contains 通配符_java删除文件支持通配符
- raspberry ubuntu 修改源为清华_Ubuntu 下 Janus Server 搭建笔记
- .Net Core下使用MQTT协议直连IoT平台
- Unity上的Oculus Quest2开发(1) ——首先要空工程能在Quest上跑起来吧
- [转载]Mysql导出表结构及表数据 mysqldump用法
- Prometheus正式从CNCF毕业
- 集成电路实践----D触发器
- firefox linux 关闭,Firefox 的 Flash 被禁用的解决方法
- CCF推荐-计算机网络领域顶级期刊会议
- a one-way repeated-measures ANOVA
- Typora + PicGo + Github实现图床
- 什么是构造函数及定义
- dragonfly数据库
- 搜索引擎优化的优势及发展问题
- Linux中awk后面的RS, ORS, FS, OFS 用法
热门文章
- 计算机应用基础操作题教学考试,电大教学全国计算机应用基础考试网考内容全部操作题.doc...
- mybatis返回map键值对_mybatis返回map结果集怎么配置
- java标注释_跪请JAVA高手帮忙标一下注释
- 事件标志组的原理与创建
- 定点数的编码表示方法
- 下边框_山寨iPhone12Pro开箱:浴霸四摄更小,下边框有点宽
- 不属于计算机常用软件日常应用的是,综合技能实践+计算机常用应用软件的安装和使用指导 (1)...
- 树莓派:和电脑之间的串口编程,以及树莓派的备份
- CV之路——opencv基本操作
- ubuntu查看OpenCV的版本和安装的库