AbstractChannel和AbstractUnsafe抽象类
io.netty.channel.AbstractChannel
从本章开始,会有大量的篇幅涉及到代码分析。为了能够清晰简洁的地说明代码的结构和功能,我会用代码注释+独立段落的方式加以呈现。 所以,为你能更好地理解代码,请不要忽略代码中黑体字注释。
AbstractChannel和AbstractUnsafe之间的关系
AbstractChannel实现了Channel接口,AbstractUnsafe实现了Unsafe。这两个类是抽象类,他们实现了Channel和Unsafe的绝大部分接口。在AbstractChannel的实现中,每个方法都会直接或间接调用Unsafe对应的同名方法。所有的inbound和outbound方法都是通过pipeline间接调用,其他的辅助方法直接使用unsafe实例调用。pipline和unsafe实例在AbstractChannel的构造方法创建:
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe(); //AbstractChannel没有实现这个方法
pipeline = newChannelPipeline(); // newChannelPipline的实现 return new DefaultChannelPipeline(this);
}
直接调用的例子:
@Override
public SocketAddress localAddres) {
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
//这里直接调用了Unsafe的localAddress()方法
this.localAddress = localAddress = unsafe().localAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
间接调用的例子
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress); //通过pipline间接调用Unsafe的bind方法
}
关于pipline是怎样调用Unsafe方法的,会在后面的Pipline相关章节详细分析,这里只需记住。pipeline所有方法调用最终都会(如果没有改变ChannelContextHandler的默认实现)通过使用newUnsafe创建的Unsafe实例调用Unsafe的同名方法(如果有的话)。
netty给出这一对Abstract实现有两个目的:
  • 进一步明确接口的语意。
  • 简化Channel接口的实现。
下面来具体看一下AbstractUnsafe的主要方法实现。
AbstractUnsafe的重要实现
register实现
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) { //检查是否已经注册, 避免重复只需注册动作。
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {//检查eventloop是否满足Channel的要求,由子类实现
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//设置Channel的EventLoop实例
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) { //检查是否在当前线程中,如果是,直接调用
register0(promise);
} else {
//如果不是,把register0包装到runnable中放到eventloop中调用。
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
这个方法的实现为我们展示了netty使用I/O线程的一般套路
if(eventLoop.inEventLoop()){
doSomething();
}else{
eventLoop.execute(new Runnable(){
@Override
public void run() {
doSomething();
}
});
}
对于某个需要放到I/O线性中执行的方法,先检查当前线程是不是I/O线程,是就直接执行,不是就把它包装到Ruannable中放到eventLoop中执行。
register的功能总结一句话就是调用register0, 下面看看register0的实现。
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
//确保promise没有被取消同时Channel没有被关闭才能执行后面的动作
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister(); //执行真正的register操作,留改子类实现
neverRegistered = false;
registered = true; //设置Channel已经处于registed状态
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
//触发handlerAdded事件
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered(); //触发channelRegistered事件
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {//确保Channel只有在第一次register 的时候被触发
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
//对于设置了autoRead的Channel执行beginRead();
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
register语义:
  1. 把channel和eventLoop绑定,eventLoop线程就是I/O线程。
  2. 确保真正的register操作在I/O线程中执行。
  3. 确保每个channel的register操作只执行一次。
  4. 真正的register操作执行成功后, 触发channelRegistered事件,如果channel此时仍处于active状态,触发channelActive事件,并确保这些事件只触发一次。
  5. 真正的register操作执行成功后, 如果channel此时仍处于active状态,并且channel的配置支持autoRead, 则执行beginRead操作,让eventLoop可以自动触发channel的read事件。
bind实现
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
//先保存是否active的状态
boolean wasActive = isActive();
try {
doBind(localAddress); //调用doBind, 需要子类实现这个方法完成真正的bind操作
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
//如果执行完doBind后从非active状态变成active装,则触发channelActive事件
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
bind语义:
  • 调用抽象方法doBind, 它需要子类实现。
  • 如果channel的状态从非active变成active状态,则触发channelActive事件
disconnect实现
disconnect和bind的实现类型,不同的是他调用的是doDisconnect方法,这个方法同样是抽象方法需要子类实现。当channel的状态从非active变成active状态时,调用pipeline.fireChannelInactive()触发channelInactive事件。
close实现
@Override
public final void close(final ChannelPromise promise) {
assertEventLoop();
close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify)
{
if (!promise.setUncancellable()) {
return;
}
if (closeInitiated) { //这段代码的作用就是防止多次执行close操作
if (closeFuture.isDone()) {
// Closed already.
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}
closeInitiated = true;
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
//把outboundBuffer置空,在这之后无法进行write或flush操作
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
Executor closeExecutor = prepareToClose(); //这个方法默认实现是return null. 如果有些可以在子类中覆盖这个方法添加关闭前的准备代
//下面的if..else执行的是相同的操作,不同的是如果closeExecutor可以用,就在这个executor中执行,否则在当前线程总执行
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the close.
doClose0(promise); //执行close操作
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
// close完成之后的操作, 在eventLoop中执行
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// Fail all the queued messages
//对outboundBuffer中的数据进行错误处理
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
//执行deregister操作, 如果channel由active变成非active状态就触发channelInactive事件
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
//如果正在执行flush操作,把deregister操作放在eventLoop中执行
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);
}
}
}
private void doClose0(ChannelPromise promise) {
try {
doClose(); //调用doClose执行真正的close操作,它是一个抽象方法,需要在子类中实现。
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
close实现的代码虽然比较多,但做的事情比较简单:首先执行close操作,然后实现deregister操作,触发channelInactive事件。
在close的实现中,先调用assertEventLoop方法确保当前方法是在eventLoop中执行,然后多次使用invokeLater方法吧一系列操作放在放在Runnable中执行,这样做的目的是事为了保证接下来的操作一定在当前操作完成之后才会执行,这一点是有eventLoop来保证的,eventLoop执行Runnable的顺序和调用execute的顺序一致,相关实现会在后面eventLoop章节具体讨论。
deregister实现
@Override
public final void deregister(final ChannelPromise promise) {
assertEventLoop();
deregister(promise, false);
}
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) { //避免多次执行deregister操作
safeSetSuccess(promise);
return;
}
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
// we need to ensure we do the actual deregister operation later. This is needed as for example,
// we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
// the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
// the deregister operation this could lead to have a handler invoked by different EventLoop and so
// threads.
//
// See:
// https://github.com/netty/netty/issues/4435
invokeLater(new Runnable() {
@Override
public void run() {
try {
doDeregister(); //执行真正的deregister操作,这方法默认没做任何事情,子类可以根据需要覆盖实现
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive(); // 触发channelInactive事件
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (registered) {
registered = false;
pipeline.fireChannelUnregistered(); //触发channelUnregistered事件
}
safeSetSuccess(promise);
}
}
});
}
语义:
  • 调用doDeregister执行真正的deregister操作
  • 根据参数可能需要触发channelInactive事件
  • 触发channelUnregistered事件
write实现
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
//如果outboundBuffer是null, 意味着这个channel已经被close掉了,需要使用promise返回错误,然后释放掉msg
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg); //过滤msg, 默认实现中没有做任何操作,把msg原样返回, 资料可以根据需要覆盖实现
size = pipeline.estimatorHandle().size(msg); //计算msg序列化之后的长度
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise); //把msg放入outboundBuffer中
}
write的操作比较简单,他只是把消息放到outboundBuffer中,并没有做实际的写操作。
flush实现
@Override
public final void flush() {
assertEventLoop(); //确保在eventLoop中执行
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
//如果outboundBuffer不是null才可以进入真正的write阶段
flush0();
}
protected void flush0() {
if (inFlush0) { //确保不被多个线程同时执行
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) { //确保outboundBuffer有数据是才执行下面的步骤
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) { //如果channel不是active状态,返回错误
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer); //执行真正的写操作,这是一个抽象方法,需要子类实现。
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
//如是I/O异常,并且channel配置允许自动关闭,则关闭channel
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
try {
shutdownOutput(voidPromise(), t); //关闭output通道,不允许执行write操作。
} catch (Throwable t2) {
close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
}
} finally {
inFlush0 = false;
}
}
语义:
  • 调用doWrite方法执行真正的写操作
  • 如果写操作失败,调用close或者shutdownOutput进行善后。
至此,已经分析完了AbstractChannel和AbstractUnsafe的所有重要的实现,回头总结一下,这个类主要做了这么几件事:
1. 明确了AbstractChannel和AbstractUnsafe方法之间的调用关系,或通过unsafe实例直接调用,或通过pipleline间接调用。
2. 规定了Unsafe方法的执行线程,有些必须在eventLoop中执行,这样的方法第一行就调用assertEventLoop来确保当前方法是在eventLoop线性中,有些不需要一定在eventLoop中执行的则没有这个调用
3. 确保多线程多线程环境下的执行顺序,这一点通过把一系列操作包装成Runnable放入eventLoop中来保证,invokeLater方法就是一个典型的例子。
4. 定义了事件的触发条件,在前面的代码分析中,频繁地出现pipeline.fireXXX()的调用,这些调用就是在触发特定的事件,大部分情况下用户不要自己去触发事件。
5. 优化多线程环境下的数据同步性能,使用volatile减少synchronized和Lock的使用, 典型的用法如下所示:
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
......
return;
}
....
doWrite(outboundBuffer);
AbstractUnsafe的扩展点
前面说过,AbstractUnsafe做了很多事,但把临门一脚的工作交给子类完成,这样让子类的实现变得简单很多。AbstractUsafe把这些工作定义成形如doXXX的抽象方法或是没有干任何事的空方法。下面是这些方法的列表:
方法
说明
protected abstract SocketAddress localAddress0()
被localAddress调用,执行真正的获取本地地址的操作。
protected abstract SocketAddress remoteAddress0()
被remoteAddress调用,是真正的获取远程地址的操作。
protected abstract boolean isCompatible(EventLoop loop)
检查eventLoop是是否和这个Channel兼容。
protected void doRegister()
调用链register->register0->doRegister, 真正的注册操作。
protected abstract void doBind(SocketAddress localAddress)
被bind调用,执行真正绑定本地地址的操作。
protected abstract void doDisconnect()
被disconnect调用,执行真正的断开连接操作。
protected abstract void doClose()
被close掉,执行真正的关闭channel操作。
protected void doShutdownOutput()
被shutdownOutput调用,用来关闭output通道,使Channel不能write。它的的默认实现是调用doClose
protected void doDeregister()
被deregister调用,是真正的注销操作,虽然不是抽象方法,然而只有一个{}, 还是要等你来搞定。
protected abstract void doBeginRead()
调用链register->register0->beginRead->doBeginRead, 实现让eventLoop可以自动触发read事件。
protected abstract void doWrite(ChannelOutboundBuffer in)
调用链flush->flush0->doWrite, 执行真正的写操作。
protected Object filterOutboundMessage(Object msg)
被write调用,在消息被放到outboundBuffer之前对消息进行处理,默认啥事都没干,就是把你传进去的msg还给你。

转载于:https://www.cnblogs.com/brandonli/p/9949730.html

netty源码解解析(4.0)-3 Channel的抽象实现相关推荐

  1. netty源码解解析(4.0)-5 线程模型-EventExecutorGroup框架

    上一章讲了EventExecutorGroup的整体结构和原理,这一章我们来探究一下它的具体实现. EventExecutorGroup和EventExecutor接口 io.netty.util.c ...

  2. netty源码解解析(4.0)-2 Chanel的接口设计

    全名: io.netty.channel.Channel Channel内部定义了一个Unsafe类型,Channel定义了对外提供的方法,Unsafe定义了具体实现.我把Channel定义的的方法分 ...

  3. Netty源码分析系列之服务端Channel的端口绑定

    扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 微信公众号 问题 本文内容是接着前两篇文章写的,有兴趣的朋友可以先去阅读下两篇文章: Ne ...

  4. Netty源码深度解析-ByteBuf(1) ByteBuf简介

    导读 原创文章,转载请注明出处. 本文源码地址:netty-source-code-analysis 本文所使用的netty版本4.1.6.Final:带注释的netty源码 本文简要地介绍ByteB ...

  5. Netty 源码深度解析(九) - 编码

    概述 一个问题 转载于:https://juejin.im/post/5bff467fe51d4555ed5a3111

  6. Netty源码分析系列之常用解码器(下)——LengthFieldBasedFrameDecoder

    扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 前言 在上一篇文章中分析了三个比较简单的解码器,今天接着分析最后一个常用的解码器:Leng ...

  7. 【Netty源码分析摘录】(八)新连接的接入

    文章目录 1.问题 2.检测新连接接入 3.创建客户端 channel 4. 绑定 NioEventLoop 4.1 register0 4.1.1 doRegister() 4.1.2 pipeli ...

  8. Netty 源码解析系列-服务端启动流程解析

    netty源码解析系列 Netty 源码解析系列-服务端启动流程解析 Netty 源码解析系列-客户端连接接入及读I/O解析 五分钟就能看懂pipeline模型 -Netty 源码解析 1.服务端启动 ...

  9. Netty源码解析之内存管理-PooledByteBufAllocator-PoolArena

      PooledByteBufAllocator是Netty中比较复杂的一种ByteBufAllocator , 因为他涉及到对内存的缓存,分配和释放策略,PooledByteBufAllocator ...

最新文章

  1. 写一个ArrayList类的动态代理类
  2. 十二张图详解淘宝架构变迁
  3. 【译】①JWS之Java[tm] Web Start开发者指南目录
  4. 编程作业—C++初探 简单的学生信息处理程序实现
  5. Axis --SOAP引擎
  6. Python 面向对象封装和继承
  7. python3安装过程中出现的ssl问题,No module named _ssl或者renaming “_ssl“ since importing it failed
  8. 使用MemoryStream和FileStream
  9. Iphone开发之音频101(part 2):转换和录音
  10. 计算机网络之数据链路层:9、ALOHA协议-随机访问介质访问控制
  11. ext/iconv/.libs/iconv.o: In function `_php_iconv_strlen'
  12. 图像处理中滤波(filtering)与卷积(convolution)的区别
  13. git revert 回滚代码至上一版本
  14. 移动互联网(一)短信和彩信等接口开发封装
  15. chrome浏览器怎么把整个网页截图保存
  16. 创业公司天使轮、A轮、B轮……IPO融资时如何分配股权?
  17. win10 开 5g 热点
  18. 林轩田机器学习基石笔记6 - Theory of Generalization
  19. 禁用笔记本键盘和触摸板
  20. shell无限死循环

热门文章

  1. npm install对本地工程文件造成了哪些修改
  2. 推荐一个免费的屏幕取色器,鼠标放到的位置自动显示RGB
  3. linux的各种版本,各种版本Linux系统下载
  4. python linux 优化_Python 代码性能优化技巧
  5. 剑灵力士卡刀ahk_技术宅教你:召唤代码一键卡刀详细教程帖
  6. 恢复二叉搜索树Python解法
  7. 2top 存储过程 查看_S7-1500 PLC的存储区
  8. 易语言复制C指针,易语言教程API模块制作cmd复制文件
  9. php基础小结,PHP基础学习小结
  10. java 数字表示什么意思是什么,读取Java字节码指令:数字是什么意思?