前言:

Dubbo提供了多种协议来进行服务消费者和提供者之间的交互。

从Dubbo框架层次来看(参考:https://dubbo.apache.org/zh/docsv2.7/dev/design/),Dubbo协议位于以下位置:

如何理解这些协议呢?我觉得可以按照HTTP协议的方式来理解,他们都是位于TCP协议之上的应用层协议。

可以把他们理解为一份交流的说明书,每个字节都有其特定含义,服务消费者和提供者按照这份说明书来发送数据、接收数据,按照说明书的指示,解析出每个字节的含义,最终拼接出一个完整的请求体、响应体即可。

那么自定义的Dubbo协议,是如何设定每个字节的含义?跟随笔者一起来看下吧。

1.Dubbo的那些协议

先来概述一下Dubbo提供的那些协议。首先从Protocol接口出发,展示下其实现类。

每种协议所用来表示请求、响应的方式都有所不同,性能和适用场景也有所不同。

具体的使用场景和不同之处,可以参考Dubbo官网: 协议参考手册 | Apache Dubbo

里面的相关介绍还是非常详细的。

Dubbo官网中也有一张表来对比各协议,但是不够全,笔者从网上找到另一篇(https://www.cnblogs.com/yuandluck/p/9481084.html),具体如下:

协议名称

实现描述 连接 使用场景
dubbo 传输:mina、netty、grizzly

序列化:dubbo、hessian2、java、json

    dubbo缺省采用单一长连接和NIO异步通讯    1.传入传出参数数据包较小

2.消费者 比提供者多

3.常规远程服务方法调用

4.不适合传送大数据量的服务,比如文件、传视频

rmi 传输:java  rmi

序列化:java 标准序列化

连接个数:多连接

连接方式:短连接

传输协议:TCP/IP

传输方式:BIO


1.常规RPC调用

2.与原RMI客户端互操作

3.可传文件

4.不支持防火墙穿透

hessian
传输:Serverlet容器

序列化:hessian二进制序列化


    连接个数:多连接
    连接方式:短连接
    传输协议:HTTP
    传输方式:同步传输

1.提供者比消费者多

2.可传文件

3.跨语言传输

http
传输:servlet容器

序列化:表单序列化

    连接个数:多连接
    连接方式:短连接
    传输协议:HTTP
    传输方式:同步传输
1.提供者多余消费者

2.数据包混合

webservice
传输:HTTP

序列化:SOAP文件序列化

    连接个数:多连接
    连接方式:短连接
    传输协议:HTTP
    传输方式:同步传输

1.系统集成

2.跨语言调用

thrift
    与thrift RPC实现集成,并在基础上修改了报文头   

长连接、NIO异步传输   

2.Dubbo协议解析

针对Dubbo协议的学习,笔者觉得需要分成几个方向:

传输:数据传输的方式有mina、netty、grizzly这么几种,默认使用netty4的方式来传输。这个之前我们有介绍过

序列化:对象在网络间以二进制的方式来传输,所以,在真正传输之前,需要将对象序列化为字节数组。这个在下一篇博客中会有说明

协议:这就是本文的重点。协议表明了消费者发送请求的方式,服务提供者接收请求的方式。

在下面的分析中,我们主要分析下消费者依据Dubbo协议发送请求的过程和提供者依据Dubbo协议接收请求的过程。

传输方式默认使用Netty4,序列化方式默认使用Hessian2(这个不是本文重点)。

消费者发送请求默认使用NettyClient

提供者接收请求默认使用NettyServer

后续的分析主要围绕这两个类来完成。

3.消费者依据Dubbo协议发送请求全过程

3.1 NettyClient的相关创建

public class NettyClient extends AbstractClient {protected void doOpen() throws Throwable {final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);bootstrap = new Bootstrap();bootstrap.group(NIO_EVENT_LOOP_GROUP).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()).channel(socketChannelClass());bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));}NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);ch.pipeline()// 添加decode和encode的相关Handler.addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder()).addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)).addLast("handler", nettyClientHandler);String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);if(socksProxyHost != null) {int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));ch.pipeline().addFirst(socks5ProxyHandler);}}});}}

有关Netty的相关知识非本文重点(后续笔者会有Netty相关的专题博客),我们只需要知道,当消费者发送请求时必须会经过Encoder 相关的Handler即可。

这个EncoderHandler就是我们分析的重点。

3.2 NettyCodecAdapter下的encoder

final public class NettyCodecAdapter {// Encoder默认为InternalEncoder对象private final ChannelHandler encoder = new InternalEncoder();private final ChannelHandler decoder = new InternalDecoder();private final Codec2 codec;private final URL url;private final org.apache.dubbo.remoting.ChannelHandler handler;public NettyCodecAdapter(Codec2 codec, URL url, org.apache.dubbo.remoting.ChannelHandler handler) {this.codec = codec;this.url = url;this.handler = handler;}public ChannelHandler getEncoder() {return encoder;}public ChannelHandler getDecoder() {return decoder;}// Encoder对象private class InternalEncoder extends MessageToByteEncoder {@Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {org.apache.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);Channel ch = ctx.channel();NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);// 最终交由codec来完成encode操作codec.encode(channel, buffer, msg);}}...
}

通过分析可知:最终的encode工作还是交由codec对象来完成了,这个对象是哪里来的呢?

回过头,到3.1中看下NettyCodecAdapter的创建,就知道codec是传递过来的。创建过程如下:

NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {private static final Logger logger = LoggerFactory.getLogger(AbstractEndpoint.class);private Codec2 codec;private int timeout;private int connectTimeout;public AbstractEndpoint(URL url, ChannelHandler handler) {super(url, handler);// 在这里this.codec = getChannelCodec(url);this.timeout = url.getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);}protected static Codec2 getChannelCodec(URL url) {String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");// 所以,最终还是通过SPI的方式加载到Codec2的实现类if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);} else {return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));}}
}

此时通过SPI的方式来加载Codec2的实现类,默认使用DubboCodec来完成

3.3 DubboCodec

经历了上面的一系列分析,最终encode操作交由了DubboCodec.encode()方法来操作

3.3.1 DubboCodec.encode() 请求编码

// DubboCodec继承了ExchangeCodec,encode()方法在ExchangeCodec中执行
public class ExchangeCodec extends TelnetCodec {// header length.protected static final int HEADER_LENGTH = 16;// magic header.protected static final short MAGIC = (short) 0xdabb;protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];// message flag.protected static final byte FLAG_REQUEST = (byte) 0x80;protected static final byte FLAG_TWOWAY = (byte) 0x40;protected static final byte FLAG_EVENT = (byte) 0x20;protected static final int SERIALIZATION_MASK = 0x1f;private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);public Short getMagicCode() {return MAGIC;}@Overridepublic void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {if (msg instanceof Request) {// 请求信息包装encodeRequest(channel, buffer, (Request) msg);} else if (msg instanceof Response) {encodeResponse(channel, buffer, (Response) msg);} else {super.encode(channel, buffer, msg);}}protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {// 获取序列化方式,下一篇文章来分析Serialization serialization = getSerialization(channel);// header头默认16字节byte[] header = new byte[HEADER_LENGTH];// 设置魔数Bytes.short2bytes(MAGIC, header);// 设置FLAG_REQUEST和序列化类型信息header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());if (req.isTwoWay()) {header[2] |= FLAG_TWOWAY;}if (req.isEvent()) {header[2] |= FLAG_EVENT;}// 设置requestIdBytes.long2bytes(req.getId(), header, 4);// 序列化请求体int savedWriteIndex = buffer.writerIndex();buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);ObjectOutput out = serialization.serialize(channel.getUrl(), bos);if (req.isEvent()) {encodeEventData(channel, out, req.getData());} else {encodeRequestData(channel, out, req.getData(), req.getVersion());}out.flushBuffer();if (out instanceof Cleanable) {((Cleanable) out).cleanup();}bos.flush();bos.close();int len = bos.writtenBytes();checkPayload(channel, len);// 设置请求体的length到header中Bytes.int2bytes(len, header, 12);// writebuffer.writerIndex(savedWriteIndex);buffer.writeBytes(header); // write header.buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);}
}

我们可以总结Dubbo协议的请求体如下图:

3.3.2 DubboCodec.decode() 请求解码

对该请求进行解码是NettyServer的事情,而参考NettyServer的代码,最终还是交由DubboCodec.decode()方法来完成解码

public class ExchangeCodec extends TelnetCodec {@Overridepublic Object decode(Channel channel, ChannelBuffer buffer) throws IOException {int readable = buffer.readableBytes();byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];buffer.readBytes(header);return decode(channel, buffer, readable, header);}// 按照编码的方式进行解码即可protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {// 检查首两字节的魔数信息if (readable > 0 && header[0] != MAGIC_HIGH|| readable > 1 && header[1] != MAGIC_LOW) {int length = header.length;if (header.length < readable) {header = Bytes.copyOf(header, readable);buffer.readBytes(header, length, readable - length);}for (int i = 1; i < header.length - 1; i++) {if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {buffer.readerIndex(buffer.readerIndex() - header.length + i);header = Bytes.copyOf(header, i);break;}}return super.decode(channel, buffer, readable, header);}// 如果发生拆包,则等待后续的字节返回if (readable < HEADER_LENGTH) {return DecodeResult.NEED_MORE_INPUT;}// 获取请求体信息,12-15字节,共4字节,也就是一个int类型的字节长度信息int len = Bytes.bytes2int(header, 12);checkPayload(channel, len);int tt = len + HEADER_LENGTH;if (readable < tt) {return DecodeResult.NEED_MORE_INPUT;}// limit input stream.ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);try {return decodeBody(channel, is, header);} finally {if (is.available() > 0) {try {if (logger.isWarnEnabled()) {logger.warn("Skip input stream " + is.available());}StreamUtils.skipUnusedStream(is);} catch (IOException e) {logger.warn(e.getMessage(), e);}}}}// 对请求体信息进行解码protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);// 获取request_idlong id = Bytes.bytes2long(header, 4);// 如果是response信息,则执行如下if ((flag & FLAG_REQUEST) == 0) {// decode response.Response res = new Response(id);if ((flag & FLAG_EVENT) != 0) {res.setEvent(true);}// get status.byte status = header[3];res.setStatus(status);try {ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);if (status == Response.OK) {Object data;if (res.isHeartbeat()) {data = decodeHeartbeatData(channel, in);} else if (res.isEvent()) {data = decodeEventData(channel, in);} else {data = decodeResponseData(channel, in, getRequestData(id));}res.setResult(data);} else {res.setErrorMessage(in.readUTF());}} catch (Throwable t) {res.setStatus(Response.CLIENT_ERROR);res.setErrorMessage(StringUtils.toString(t));}return res;} else {// 解码请求信息Request req = new Request(id);req.setVersion(Version.getProtocolVersion());req.setTwoWay((flag & FLAG_TWOWAY) != 0);if ((flag & FLAG_EVENT) != 0) {req.setEvent(true);}try {// 根据序列化的方式进行反序列化即可ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);Object data;if (req.isHeartbeat()) {data = decodeHeartbeatData(channel, in);} else if (req.isEvent()) {data = decodeEventData(channel, in);} else {data = decodeRequestData(channel, in);}req.setData(data);} catch (Throwable t) {// bad requestreq.setBroken(true);req.setData(t);}return req;}}
}

总结:

Dubbo协议主要是请求头的设置,设置序列化的方式,请求体的长度后,可以在对应字节后的信息都作为请求体,按照设置的Serialization进行反序列化出完整的Request对象即可。

源码看多了,就对协议本身没那么感冒了。就是自定义设置请求头和请求体信息即可,根据请求头获取请求体的基本信息,按照指定的方式将请求体(字节数组)反序列化为具体的对象。

Dubbo源码解析-Dubbo协议解析相关推荐

  1. dlt645 2007 java源码,DLT645 1997 协议解析

    源码下载  -> 提取码 QQ:505645074 DLT645.zip 工具 源码 规约解析 DL/T645-07: 数据帧格式: 注意事项: (1)前导字节-一般在传输帧信息前,都要有0~4 ...

  2. dubbo源码分析学习---dubbo 重要内容Invoker 和服务注册过程

    这篇文章主要续接上一篇文章的基础上做的分析学习.前面没有分析 Invoker,我们来简单看看 Invoker 到底是一个啥东西. 一.Invoker 是什么 从前面的分析来看,服务的发布分三个阶段: ...

  3. 面试题:DUBBO源码使用了哪些设计模式

    0 文章概述 DUBBO作为RPC领域优秀开源的框架在业界十分流行,本文我们阅读其源码并对其使用到的设计模式进行分析.需要说明的是本文所说的设计模式更加广义,不仅包括标准意义上23种设计模式,还有一些 ...

  4. Dubbo源码解析-Dubbo服务消费者_Dubbo协议(一)

    前言: 在介绍完Dubbo 本地模式(Injvm协议)下的服务提供与消费后,上文我们又介绍了Dubbo远程模式(dubbo协议)下的服务暴露过程,本质上就是通过Netty将dubbo协议端口暴露出去, ...

  5. dubbo源码解析(十)远程通信——Exchange层

    远程通讯--Exchange层 目标:介绍Exchange层的相关设计和逻辑.介绍dubbo-remoting-api中的exchange包内的源码解析. 前言 上一篇文章我讲的是dubbo框架设计中 ...

  6. dubbo(5) Dubbo源码解析之服务调用过程

    来源:https://juejin.im/post/5ca4a1286fb9a05e731fc042 Dubbo源码解析之服务调用过程 简介 在前面的文章中,我们分析了 Dubbo SPI.服务导出与 ...

  7. spring注解方式整合Dubbo源码解析

    系列文章目录 前言 本节我们的Dubbo源码版本基于2.6.x 在前一章我们的整合案例中,我们有几个比较关键的步骤: 在启动类上标注了@EnableDubbo注解 在provider类上面标注了@Se ...

  8. dubbo源码解析之框架粗谈

    dubbo框架设计 一.dubbo框架整体设计 二.各层说明 三.dubbo工程模块分包 四.依赖关系 五.调用链 文章系列 [一.dubbo源码解析之框架粗谈] [二.dubbo源码解析之dubbo ...

  9. 【dubbo源码解析】--- dubbo中Invoker嵌套调用底层原理

    本文对应源码地址:https://github.com/nieandsun/dubbo-study 文章目录 1 dubbo中Invoker的重要性 2 dubbo RPC链条中代理对象的底层逻辑 2 ...

  10. Dubbo源码解析 --- DIRECTORY和ROUTER

    Dubbo源码解析 --- DIRECTORY和ROUTER 今天看一下Directory和Router. 我们直接从代码看起(一贯风格),先看后总结,对着总结再来看,相信会收获很多.我们先看com. ...

最新文章

  1. Linux里新建文件/目录的默认权限
  2. Feature Selection: A Data Perspective --阅读笔记2 传统数据的特征选择算法
  3. POSIX消息队列信号通知
  4. 获取C#中方法的执行时间及其代码注入
  5. LSTM神经网络Demystifying LSTM neural networks
  6. AppBoxFuture(四). 随需而变-Online Schema Change
  7. SAP Smart Business design time = CDS view SADL
  8. 5个Vue.js项目的令人敬畏的模板
  9. UML的奥妙 - 学习UML笔记(1)
  10. Canvas-drawImage 绘制图片模糊问题
  11. MySQL配置文件简单解析
  12. 解决 jQuery 和其他库的冲突
  13. Uva 1630 折叠串
  14. ENVI5.4 新增图像分类介绍
  15. sql相关日期截取函数
  16. 联想重装系统去掉保护_经验:联想硬盘保护系统EDU 7.0清除日志
  17. 关于站内搜索的那些事儿
  18. 32/64位处理器、操作系统、应用程序和库之间有什么关系?
  19. Zemax实现微透镜阵列光束整形(原理+仿真)
  20. 39、C++定义一个类,实现向量的加减运算

热门文章

  1. Pycharm-SSH连接服务器
  2. 蓝湖+Vue.js+SosoApi+Spring Cloud+Rancher——项目架构总结介绍
  3. oracle asm结构,深入了解Oracle ASM
  4. oracle中asm是什么,什么是ASM?
  5. 安装FeHelper插件
  6. 2016考研数学四轮进阶复习规划
  7. otool 分析Mach-O
  8. 肯德尔系数怎么分析_2020LPL春季赛3月15日比赛数据的数据分析(Python)
  9. 标准gpx文件的时间格式
  10. Scratch-Q版三国小人物角色素材分享,值得您的收藏!