dubbo的handler机制
Dubbo
的整套handler
。。。反正我刚看的时候挺头疼。从Protocol
层到Transporter
层。纵深3层。从DubboProtocol
构建,被逐层传递到NettyServer
,然后在逐层返回。整个过程中,还被不断包裹,从同步到异步,不停变换用法和说法。总之,看得挺累,还是坚持逐个分析一遍。下面来说说。
先看一张图,是使用Dubbo Protocol
并且使用Netty
作为服务器的情况下Handler
的整个包装过程。
解释下上面这张图
1.在DubboProtocol
中构建ExchangeHandler
命名requestHandler
2.在Exchange
层做两次包装new DecodeHandler(new HeaderExchangeHandler(requestHandler))
,具体参考类:HeaderExchanger
① 使用HeaderExchangeHandler
做一次包装,HeaderExchangeHandler
的作用是实现了Reques
t和Response
的概念,当接到received
请求后,将请求转为reply
。请参考类HeaderExchangeHandler
,对这部分不熟悉的可以参考文章dubbo的exchange层
② 使用DecodeHandler
做一次包装,DecodeHandler
的作用是用来对Request Message
和Response Message
做解码操作,解码完成后才能给HeaderExchangeHandler
使用。
3.在Exchange
层包装后的Handler
会被传递到Transporter
层(NettyTransporter
)并且把类型转换成ChannelHandler
,因为ChannelHandler
更为抽象。
4.Handler
在Transporter
层流转,会被传递到NettyServer
中
5.在NettyServer
中被AllChannelHandler
包装,其作用是把NettyServer
接收到的请求转移给Transporter
层的线程池来处理。同步转异步。
6.Handler
被再次NettyServerHandler
包装,NettyServerHandler
的父类是ChannelDuplexHandler
。它属于Netty
的Handler
。由Netty
来管理和调用其中的回调方法。Netty
在接受到channelActive
,channelRead
等方法后,会把请求转移给Dubbo
的Handler
,这样每当请求过来,Netty
的Handler
接到请求就立马把数据和相关信息转交给Dubbo
的Handler
,由Dubbo
的Handler
来管理了。
上述的整个包装过程基本提现了Dubbo Protocol
的Handler
的转移过程。从DubboProtocol
构建逐层向下传递。当Netty
接到TCP
请求后,调用过程又逐层向上传递。下面看下几个关键转折点的代码。
1.请参考DubboProtocol
类,Dubbo Handler
初始化创建的地方。
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {// ----------------------此处省略一堆代码------------------------
};
2.下面看下HeaderExchangeHandler
,Request
和Response
概念重点提现的地方
public class HeaderExchangeHandler implements ChannelHandlerDelegate {// ----------------------此处生路一堆代码------------------------@Override // 接受信息public void received(Channel channel, Object message) throws RemotingException {channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);try {if (message instanceof Request) {// 看看信息是不是Request// handle request.Request request = (Request) message;if (request.isEvent()) {handlerEvent(channel, request);} else {if (request.isTwoWay()) {// twoWay代表这个消息要回复// 服务器端接到请求,调用handleRequest得到Response。// 这边就提现了Request和Response的概念// handleRequest,实际上是去做实际的业务动作了Response response = handleRequest(exchangeChannel, request);channel.send(response);} else {// 不需要返回handler.received(exchangeChannel, request.getData());}}} else if (message instanceof Response) {// 看看信息是不是Response// 如果是Response,那么消息肯定是从Privoder方发来的。handleResponse(channel, (Response) message);} else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (echo != null && echo.length() > 0) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}} finally {HeaderExchangeChannel.removeChannelIfDisconnected(channel);}}// 收到Privoder方返回的Response信息,并且做出处理static void handleResponse(Channel channel, Response response) throws RemotingException {if (response != null && !response.isHeartbeat()) {// 调用DefaultFuture.received来通知Response消息到了。DefaultFuture.received(channel, response);}}// 服务器端接到请求,调用handleRequest得到Response。// 这边就提现了Request和Response的概念// handleRequest,实际上是去做实际的业务动作了Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {Response res = new Response(req.getId(), req.getVersion());if (req.isBroken()) {Object data = req.getData();String msg;if (data == null) msg = null;else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);else msg = data.toString();res.setErrorMessage("Fail to decode request due to: " + msg);res.setStatus(Response.BAD_REQUEST);return res;}// find handler by message class.Object msg = req.getData();try {// handle data.// 完成实际的业务动作,也就是调用DubboProtocol.requestHandler.reply类中的实现。// 并且返回Response信息Object result = handler.reply(channel, msg);res.setStatus(Response.OK);res.setResult(result);} catch (Throwable e) {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));}return res;}// ----------------------此处生路一堆代码------------------------
}
上面的代码比较多,这边总结下。received
方法用于接受信息,这个方法是Provider
和Consumer
共用的。Provider
接收的信息必然是Request
,它所处的角色就类似与服务器。Consumer
接收的信息必然是Response
,它所处的角色就类似于客户端。当然,对于事件Event
类信息另说,这边为了思路清晰,不展开细说。
角色Provider
接收Request
信息。然后就是做业务动作,接着就是判断是否回复Response
,要看twoWay
这个标识。具体再看下代码中的一些注释。追踪下需要回复Response
的情况(因为大部分情况下,我们都是用同步请求,需要回复Response
)
找到代码Object result = handler.reply(channel, msg);
,reply
动作会调用到DubboProtocol.requestHandler.reply
。这个地方如果自己跟代码会比较乱。对Dubbo Handler
不太熟悉可以看文章dubbo的handler机制。提醒下,看HeaderExchanger.bind
动作的实现,会发现DubboProtocol.requestHandler
被包起来了。所以这个地方reply一定是调用DubboProtocol.requestHandler
的。看下那块代码。
@Override// 执行invoke并且返回Response信息
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {if (message instanceof Invocation) {Invocation inv = (Invocation) message;Invoker<?> invoker = getInvoker(channel, inv);// ----------------------此处生路一堆代码------------------------RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());// 执行invoke并且返回Response信息return invoker.invoke(inv);}
}
invoker.invoke(inv);
执行的是服务在初始化的时候Protocol.export
出来的。实际export
出来的Exporter
会被过滤器链给包住。这部分知识可以查看文档dubbo的filter
3.看下NettyServerHandler
的源码,Netty handler
服务与Dubbo handler
职责交接的地方。
public class NettyServerHandler extends ChannelDuplexHandler {// dubbo的handlerprivate final ChannelHandler handler;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);try {if (channel != null) {channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);}// netty的客户端激活消息被传递给dubbo的handler,并转换概念变为connectedhandler.connected(channel);} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);try {// netty的数据到达消息被传递给dubbo的handler,并转换概念变为receivedhandler.received(channel, msg);} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}}
}
上面的类,省略打大部分代码。只是以客户端建立连接和TCP
数据到达作为案例说下。其他场景也是类似。
总结
Dubbo
的Handler
纵深3层,包装过程较多,代码追踪比较麻烦。本文用一张示意图说了具体的包装过程。源码追踪过程要关注3个重要点位。
1.DubboProtocol
类,Dubbo Handler
初始化创建的地方
2.HeaderExchangeHandler
类,Request
和Response
概念重点提现的地方
3.NettyServerHandler
类,Netty Handler
服务与Dubbo Handler
职责交接的地方
dubbo的handler机制相关推荐
- dubbo的超时机制和重试机制
参考: https://www.cnblogs.com/ASPNET2008/p/7292472.html https://www.tuicool.com/articles/YfA3Ub https: ...
- 聊聊Dubbo - Dubbo可扩展机制源码解析
2019独角兽企业重金招聘Python工程师标准>>> 摘要: 在Dubbo可扩展机制实战中,我们了解了Dubbo扩展机制的一些概念,初探了Dubbo中LoadBalance的实现, ...
- android Handler机制之ThreadLocal详解
概述 我们在谈Handler机制的时候,其实也就是谈Handler.Message.Looper.MessageQueue之间的关系,对于其工作原理我们不做详解(Handler机制详解). Messa ...
- Android多线程:深入分析 Handler机制源码(二)
前言 在Android开发的多线程应用场景中,Handler机制十分常用 接下来,深入分析 Handler机制的源码,希望加深理解 目录 1. Handler 机制简介 定义 一套 Android 消 ...
- Android开发--多线程中的Handler机制/Looper的介绍
在多线程的开发中,Handler机制如同在主线程中运行一样,只是需要注意在非主线程中Handler机制的作用限制,本文将对这些内容作出解释. * 如果应用上一个例子的方法对UI界面进行操作,将抛出异常 ...
- 深入浅出,Handler机制外科手术式的剖析(ThreadLocal,Looper,MessageQueen,Message)(上)...
2019独角兽企业重金招聘Python工程师标准>>> 为什么会有handler机制? 在Android中,所有的UI控件都是运行在主线程中的, 如果我们从子线程访问UI,系统会报异 ...
- 【Android 异步操作】Handler 机制 ( Handler 常用用法 | HandlerThread 简介 | HandlerThread 源码注释分析 )
文章目录 一.Handler 常用用法 二.HandlerThread 简介 三.HandlerThread 源码 一.Handler 常用用法 主线程 Handler 主要作用 : Looper 和 ...
- 【Android】Handler 机制 ( Handler | Message | Looper | MessageQueue )
文章目录 I . Handler 机制简介 II . Handler 机制 Handler Message Looper MessageQueue 四组件对应关系 III . Handler ( 消息 ...
- Android 为什么要有handler机制?handler机制的原理
为什么要有handler机制? 在Android的UI开发中,我们经常会使用Handler来控制主UI程序的界面变化.有关Handler的作用,我们总结为:与其他线程协同工作,接收其他线程的消息并通过 ...
最新文章
- 实验进行中:.NET WebAssembly支持
- 平均 14926 元!2021 年 5 月程序员工资统计出炉
- springboot activiti 整合项目框架源码 shiro 安全框架 druid 数据库连接池
- 最大的100家外包公司(zz.IS2120@BG57IV3)
- c++ 特定容器算法(sort,merge,reverse,remove,unique)
- 阿里技术:如何画出一张合格的技术架构图?
- linux文件备份与删除,【Linux Shell脚本编程】自动备份与删除历史备份脚本
- java 死锁_java死锁分析
- 批量修改同一目录下文件名--操作so easy
- java 奇数 字符乱码_socket中文奇数个出现乱码的解决办法
- java的访问修饰符
- 6月7日 bc总结
- c++ vector向量
- 网站速度优化模块HttpCompressionModule
- 数学建模常用模型简介其他模型大全汇总
- 夏普mx2608n网络扫描到计算机,【转载】夏普复印机网络扫描教程
- 淘宝购物流程图 基本流和备选流以及测试用例
- OpenCV鼠标修改图片透明度
- 计算机表格如何求和,excel表格怎么求和? excel自动求和的三种方法
- 大数据就业方向_学大数据就业前景如何,就业方向有哪些?