文章目录

  • pigeon TCP协议格式
    • 粘包半包问题
    • 定长消息头格式
      • 默认协议消息格式
      • 统一协议消息格式
    • 消息体
  • Netty3 Handler相关实现
    • 上下游传递原理
    • 相关实现
      • ChannelHandlerContext
      • OneToOneDecoder & OneToOneEncoder
      • SimpleChannelHandler
      • ChannelEvent
  • pigeon RPC通信的核心实现原理
    • 服务端实现
      • FrameDecoder
      • Crc32Handler
      • CompressHandler
      • ProviderDecoder
      • NettyServerHandler
      • ProviderEncoder
      • FramePrepender
    • 客户端实现
      • NettyClientHandler

pigeon底层通过Netty-3.9.2.Final实现服务端和客户端的连接通信,对应实现类为NettyServer和NettyClient。在内部,处理RPC通信的核心逻辑又分别定义在NettyServerPipelineFactory和NettyClientPipelineFactory,这两个类都实现了ChannelPipelineFactory,重写了里面的getPipeline方法,用于处理发送请求和处理请求的相关流程逻辑。

pigeon TCP协议格式

pigeon目前区分两种TCP协议方式,一种是非统一(默认)协议,为普通序列化方式如Hessian,json等方式调用,另一种是统一协议,如Thrift调用和泛化调用,其中泛化调用可以在不直到api设计的基础上,直接通过指定方法名字符串来调用相应的服务方法。基于不同协议,tcp消息包格式也是不同的,下面分开解析

粘包半包问题

在TCP传输中,一个完整的消息包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,因而数据接收方无法区分消息包的头尾,在接收方来看,可能一次接收的数据内容只是一次完整请求的部分数据,或包含多次请求的数据等情况。基于此,常见有三种解决方式:

  1. 消息包定长,固定每个消息包长度,不够的话用空格补,缺点是长度不能灵活控制,字节不够的情况下补空格会造成浪费,字节过多还要处理分包传输的逻辑
  2. 使用定长消息头和变长消息体,其中消息体的长度必须在消息头指出,在接收方每次接收数据时,先确保获取到定长的消息头,再从消息头中解析出消息体的长度,以此来界定获取以此完整请求的消息包数据。
  3. 在消息包尾部指定分隔符,缺点是需要遍历数据,判断是否为分隔符,影响效率。

在pigeon中,是基于第二种,使用定长消息头和变长消息体的方式实现的。

定长消息头格式

在消息头部分,统一协议和默认协议的区别较大,这里分开讲述:

默认协议消息格式

默认协议消息格式具体包括2部分:消息头、消息体,其中消息体包含变长请求体和定长请求尾两部分。
默认协议消息头固定为7个字节:

  1. 第1-2个字节固定为十六进制的0x39、0x3A,即十进制的57、58,可以用来区分是默认协议还是统一协议。
  2. 第3个字节为序列化方式,如Hessian是2,java是3,json是7等。
  3. 第4-7个字节:消息体长度,int,占4个字节,值为请求体长度(请求或响应对象序列化后的字节长度)+请求尾长度11。

统一协议消息格式

类似默认协议消息,统一协议消息格式也包括2部分:消息头、消息体,和默认协议不同的是,统一协议中,消息体部分为完整请求体尾部不再包含请求尾,但会在请求体头部包含一个两字节长度的请求头。这里需要区分消息头和请求头的区别。
统一协议的消息头固定为8个字节:

  1. 第1-2个字节固定为十六进制的0xAB、0xBA,即十进制的171、186,或8位有符号整型的-85、-70,可以用来区分是默认协议还是统一协议。
  2. 第3个字节为协议版本,也可以称作command字节,会用来定义编解码的相关自定义行为,如压缩方式、数据校验方式等,具体command第8位表示是否需要进行校验数据完整性,第6、7位定义了是否进行压缩及具体的压缩方式。
  3. 第4个字节为序列化方式,一般为1。
  4. 第5~8个字节为消息体长度。

消息体

消息体部分,不区分是统一协议还是默认协议,最终解析出请求和响应对象类型分别为com.dianping.dpsf.protocol.DefaultRequest或com.dianping.dpsf.protocol.DefaultResponse,而除此之外,两种协议有细微区别:

  1. 统一协议没有请求尾,在消息体头部会有两个定长字节,这两个字节在序列化内部赋值,是Thrift内部计算的头部长度。
  2. 默认协议没有请求头,在消息体尾部会有11位定长字节,前8个字节为消息sequence,long型,值请从0开始递增,每个消息的sequence都不同;后3个字节固定为:29,30,31

DefaultRequest或com的具体定义如下:

@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "seq", scope = DefaultRequest.class)
public class DefaultRequest implements InvocationRequest {/*** 不能随意修改!*/private static final long serialVersionUID = 652592942114047764L;// 必填,序列化类型,默认hessian为2private byte serialize;// 必填,消息sequence,long型,值请从0开始递增,每个消息的sequence都不同@JsonProperty("seq")private long seq;//必填,如果调用需要返回结果,固定为1,不需要回复为2,手动回复为3private int callType = Constants.CALLTYPE_REPLY;// 必填,超时时间,单位毫秒private int timeout = 0;// 请求创建时间, 不参与序列化传输@JsonIgnoreprivate transient long createMillisTime;//必填,服务名称url,服务唯一的标识@JsonProperty("url")private String serviceName;//必填,服务方法名称private String methodName;//必填,服务方法的参数值@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")private Object[] parameters;//必填,消息类型,服务调用固定为2,心跳为1,服务框架异常为3,服务调用业务异常为4private int messageType = Constants.MESSAGE_TYPE_SERVICE;// 旧版上下文信息传递对象@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")private Object context;// 服务版本private String version;// 必填,调用者所属应用名称,在META-INF/app.properties里的app.name值private String app = ConfigManagerLoader.getConfigManager().getAppName();// 请求体大小@JsonIgnoreprivate transient int size;
}

DefaultResponse的具体定义如下:

@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "seq", scope = DefaultResponse.class)
public class DefaultResponse implements InvocationResponse {/*** 不能随意修改!*/private static final long serialVersionUID = 4200559704846455821L;private transient byte serialize;// 返回的消息sequence,对应发送的消息sequence,long型@JsonProperty("seq")private long seq;//消息类型,服务调用为2,服务调用业务异常为4,服务框架异常为3,心跳为1private int messageType;// 请求返回异常的相关堆栈信息@JsonProperty("exception")private String cause;// 返回服务调用结果@JsonProperty("response")@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")private Object returnVal;// 旧版上下文传递@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")private Object context;// 请求体大小@JsonIgnoreprivate transient int size;// 请求创建时间, 不参与序列化传输@JsonIgnoreprivate transient long createMillisTime;// 响应自定义上下文信息private Map<String, Serializable> responseValues = null;}

Netty3 Handler相关实现

上下游传递原理

在开始分析pigeonRPC通信实现之前,先简略总结下netty3利用ChannelHandler完成通信处理的相关逻辑。
对于netty接收到一个请求后,会调用一系列的拦截器handler来处理我们的请求,可以将请求方看成在下游,服务方的核心处理逻辑在上游,服务方接收到请求后,会将请求不断往上游传递,交由一个个ChannelUpstreamHandler#handleUpstream方法处理,在到达最上游并由核心逻辑处理完后,又交由一个个ChannelDownStreamHandler#handleDownstream处理,到最下游通过网络通信将结果回传给客户端。
基于此,我们可以通过实现ChannelUpstreamHandler和ChannelDownStreamHandler,在请求的上下游传递中,拓展我们的逻辑。

相关实现

ChannelHandlerContext

请求在上下游处理过程中,处理的上下文数据是通过ChannelHandlerContext实现的。比如我们在特定的handler中通过ChannelHandlerContext的sendUpstream和sendDownstream方法将请求传递到下一个handler中处理。对于传递处理的上下文数据,可以通过getAttachment和setAttachment进行读写。

OneToOneDecoder & OneToOneEncoder

请求数据传输在进入服务端之后和返回客户端之前,分别需要进行解码和编码操作,这由OneToOneDecoder和OneToOneEncoder两个抽象类分别实现,两者分别实现了ChannelUpstreamHandler和ChannelDownstreamHandler接口,一般通过继承这两个抽象类,并实现内部的encode或decode方向模版方法来实现具体的编解码操作。

SimpleChannelHandler

SimpleChannelHandler同时实现了ChannelUpstreamHandler和ChannelDownstreamHandler接口,是一个双向handler,内部简单地实现了handleUpstream和handleDownstream,会根据传入ChannelEvent地类型,进行必要地向下转型,得到更加有意义的子类型,而后调用相关的处理方法。默认实现本Handler一般是作为最上层地handler,如果在本Handler之后还需要向更上游传递,需要确保在handleUpstream地最后,手动调用了super.handleUpstream方法。

ChannelEvent

netty有多种事件可以在Channel中传递,交由用户定义的handler处理,这些事件都以ChennelEvent的形式定义,常用有以下事件:

  1. MessageEvent:正常消息请求事件
  2. ChannelStateEvent:channel状态变更事件,包括以下几种:
    1. OPEN: channel开启
    2. BOUND: channel绑定到特定的本地地址
    3. CONNECTED: channel连接到特定的远程地址
    4. INTEREST_OPS: channel对特定感兴趣的操作会进行暂停

pigeon RPC通信的核心实现原理

下面分成服务端和客户端两部分,从这两个类展开分析pigeon

服务端实现

下面先看NettyServerPipelineFactory的实现:

public class NettyServerPipelineFactory implements ChannelPipelineFactory {// 服务端连接实例引用private NettyServer server;// 通过编解码工厂,获取单例编解码配置private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();public NettyServerPipelineFactory(NettyServer server) {this.server = server;}// 初始化pipelinepublic ChannelPipeline getPipeline() {ChannelPipeline pipeline = pipeline();pipeline.addLast("framePrepender", new FramePrepender());pipeline.addLast("frameDecoder", new FrameDecoder());pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));pipeline.addLast("compressHandler", new CompressHandler(codecConfig));pipeline.addLast("providerDecoder", new ProviderDecoder());pipeline.addLast("providerEncoder", new ProviderEncoder());pipeline.addLast("serverHandler", new NettyServerHandler(server));return pipeline;}}

对于上面所有的Handler,可以分为3类:

  1. UpStreamHandler

    1. FrameDecoder
    2. ProviderDecoder
    3. NettyServerHandler
  2. DownStreamHandler
    1. FramePrepender
    2. ProviderEncoder
  3. 双向Handler
    1. Crc32Handler
    2. compressHandler

结合代码分析,最终的调用时序如下所示:

下面根据每个handler的处理顺序,依次分析每个handler的处理逻辑

FrameDecoder

顾名思义,FrameDecoder用来解析出通信管道中一次请求的数据,解决tcp通信中粘包和半包的问题。
pigeon的FrameDecoder继承自netty的org.jboss.netty.handler.codec.frame.FrameDecoder,而org.jboss.netty.handler.codec.frame.FrameDecoder又继承自SimpleChannelUpstreamHandler,在netty实现的FrameDecoder,核心实现方法是messageReceived,大致实现原理是不断读取接收到的字节流,并累加到cumulation变量,通过调用callDecode来尝试对当前累加的字节Buffer cumulation进行解码,直到解析出一个完整请求的feame对象,最后会调用Channels#fireMessageReceived触发Handler的pipeline调用来完成一次完整请求。
在解码过程callDecode中调用了一个抽象模版方法decode来完成具体的解码逻辑,decode方法尝试解析cumulation变量,如果不能按照自定义解析规则解析出一个完整请求的数据包,就返回null,否则返回一个完整的数据包,这里读取成功的同时,需要更新cumulation的字节起始点到当前完整数据包字节的尾部。
分析完Netty的解码流程,具体看看pigeon如何基于自己设计的协议格式来进行数据包解析。

在pigeon中,decode方法在自定义的FrameDecoder中实现,代码尝试先解析出请求头,再通过头部的请求体长度,解析出一次完整请求的包体,在代码中请求尾的长度包含在请求体的长度内部,具体实现如下所示:

public class FrameDecoder extends org.jboss.netty.handler.codec.frame.FrameDecoder {@Overrideprotected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)throws Exception {Object message = null;// 如果当前累积的小于两个字节,直接返回nullif (buffer.readableBytes() <= 2) {return message;}byte[] headMsgs = new byte[2];// 复制buff两个字节到headMsgsbuffer.getBytes(buffer.readerIndex(), headMsgs);if ((0x39 == headMsgs[0] && 0x3A == headMsgs[1])) {// 0x39=57,0x3A=58//old protocolmessage = doDecode(buffer);} else if ((byte) 0xAB == headMsgs[0] && (byte) 0xBA == headMsgs[1]) {//0xAB=171,0xBA=186//new protocolmessage = _doDecode(buffer);} else {throw new IllegalArgumentException("Decode invalid message head:" +headMsgs[0] + " " + headMsgs[1] + ", " + "message:" + buffer);}return message;}protected Object doDecode(ChannelBuffer buffer) throws Exception {CodecEvent codecEvent = null;// FRONT_LENGTH = 7,即如果buffer小于消息头7位长度,直接返回nullif (buffer.readableBytes() <= CodecConstants.FRONT_LENGTH) {return codecEvent;}// 从消息的第3位开始,读取4位字节位一个无符号整数,实际位请求体大小int totalLength = (int) buffer.getUnsignedInt(buffer.readerIndex() +CodecConstants.HEAD_LENGTH);// 最后包体大小是请求体大小+请求头大小int frameLength = totalLength + CodecConstants.FRONT_LENGTH;// 当前累积的buffer是否已经包含一个完整的数据包if (buffer.readableBytes() >= frameLength) {// 获取具体数据包字节内容ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength);// 更新累积缓存的读起点,方便读取处理下一个数据包buffer.readerIndex(buffer.readerIndex() + frameLength);// 用CodecEvent包装frame,并标记位非统一协议codecEvent = new CodecEvent(frame, false);// 设置接收时间codecEvent.setReceiveTime(System.currentTimeMillis());}return codecEvent;}protected Object _doDecode(ChannelBuffer buffer)throws Exception {CodecEvent codecEvent = null;// _FRONT_LENGTH = 10,即如果buffer小于消息头10位长度,直接返回nullif (buffer.readableBytes() <= CodecConstants._FRONT_LENGTH) {return codecEvent;}// 从消息的第4位开始,读取4位字节位一个无符号整数,实际位请求体大小int totalLength = (int) (buffer.getUnsignedInt(buffer.readerIndex() +CodecConstants._HEAD_LENGTH));// 最后包体大小是请求体大小+请求头大小int frameLength = totalLength + CodecConstants._FRONT_LENGTH_;// 当前累积的buffer是否已经包含一个完整的数据包if (buffer.readableBytes() >= frameLength) {// 获取具体数据包字节内容ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength);// 更新累积缓存的读起点,方便读取处理下一个数据包buffer.readerIndex(buffer.readerIndex() + frameLength);// 用CodecEvent包装frame,并标记位统一协议codecEvent = new CodecEvent(frame, true);// 设置接收时间codecEvent.setReceiveTime(System.currentTimeMillis());}return codecEvent;}}

Crc32Handler

Crc32Handler主要用于校验统一协议请求的数据完整性,在解析出完整消息包长度数据之后,在解码为DefaultRequest之前,会先获取实际的数据包数据,计算crc32校验和,再和消息尾部传入的校验和进行比对,如果一致,说明校验通过,否则校验失败。而在请求结束发送相应时,又会对数据计算校验和,放在消息包尾中,以便客户端获取校验。
看看代码的具体实现:

public class Crc32Handler extends SimpleChannelHandler {private static final Logger logger = LoggerLoader.getLogger(Crc32Handler.class);private static ThreadLocal<Adler32> adler32s = new ThreadLocal<Adler32>();private CodecConfig codecConfig;public Crc32Handler(CodecConfig codecConfig) {this.codecConfig = codecConfig;}// 上游接收数据处理@Overridepublic void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {if (e.getMessage() == null || !(e.getMessage() instanceof CodecEvent)) {return;}CodecEvent codecEvent = (CodecEvent) e.getMessage();if (codecEvent.isValid()) {if (codecEvent.isUnified()) {// 如果是统一协议,需要进行校验if (!doUnChecksum(e.getChannel(), codecEvent)) {// 校验失败codecEvent.setIsValid(false);}}}// 向上游发送消息接收事件Channels.fireMessageReceived(ctx, codecEvent, e.getRemoteAddress());}// 下游发送数据前处理@Overridepublic void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {if (!(e instanceof MessageEvent)) {// 只对MessageEvent添加校验ctx.sendDownstream(e);return;}MessageEvent evt = (MessageEvent) e;if (!(evt.getMessage() instanceof CodecEvent)) {// 只对CodecEvent添加校验ctx.sendDownstream(evt);return;}CodecEvent codecEvent = (CodecEvent) evt.getMessage();if (codecEvent.isUnified()) {// 只对统一协议请求添加校验ChannelBuffer buffer = doChecksum(e.getChannel(), codecEvent);codecEvent.setBuffer(buffer);write(ctx, evt.getFuture(), codecEvent, evt.getRemoteAddress());} else {ctx.sendDownstream(e);}}private boolean doUnChecksum(Channel channel, CodecEvent codecEvent) {ChannelBuffer frame = codecEvent.getBuffer();// 获取第3位字节byte command = frame.getByte(frame.readerIndex() +CodecConstants._FRONT_COMMAND_LENGTH);// 第8位是否位1if ((command & 0x80) == 0x80) {int frameLength = frame.readableBytes();// 数据包长度 = 消息包总长度 - 消息尾长度int dataLength = frameLength - CodecConstants._TAIL_LENGTH;// 初始化空buffer为数据包长度ChannelBuffer buffer = frame.factory().getBuffer(dataLength);// 将消息包中的数据包内容复制到buffer中buffer.writeBytes(frame, frame.readerIndex(), dataLength);// 设置需要进行校验codecEvent.setIsChecksum(true);// 计算数据包的校验和int checksum = (int) doChecksum0(buffer, dataLength);// 获取dataLength位(请求尾首位)的校验和int _checksum = frame.getInt(dataLength);if (checksum == _checksum) {// 校验有效int totalLength = buffer.getInt(CodecConstants._HEAD_LENGTH);// 更新消息包中数据包长度的4位字节实际值为数据包体的长度buffer.setInt(CodecConstants._HEAD_LENGTH, totalLength - CodecConstants._TAIL_LENGTH);// 更新codecEvent的buffercodecEvent.setBuffer(buffer);} else {// 校验失败String host = ((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress();logger.error("Checksum failed. data from host:" + host);return false;}}return true;}private ChannelBuffer doChecksum(Channel channel, CodecEvent codecEvent) {ChannelBuffer frame = codecEvent.getBuffer();// 是否需要校验数据完整性boolean isChecksum = codecConfig.isChecksum();int command = frame.getByte(CodecConstants._FRONT_COMMAND_LENGTH);int frameLength = frame.readableBytes();if (isChecksum) {// 更新指定位数据command = command | 0x80;// commandframe.writerIndex(CodecConstants._FRONT_COMMAND_LENGTH);frame.writeByte(command);// totalLength=消息包长度-头部+尾部frame.writeInt(frameLength -CodecConstants._FRONT_LENGTH_ +CodecConstants._TAIL_LENGTH);frame.writerIndex(frameLength);if (!(frame instanceof DynamicChannelBuffer)) {// 更新bufferChannelBuffer buffer = frame.factory().getBuffer(frameLength +CodecConstants._TAIL_LENGTH);buffer.writeBytes(frame, frame.readerIndex(), frameLength);frame = buffer;}// 计算校验和long checksum = doChecksum0(frame, frameLength);frame.writeInt((int) checksum);} else {//command=127command = command & 0x7f;frame.writerIndex(CodecConstants._FRONT_COMMAND_LENGTH);frame.writeByte(command);frame.writerIndex(frameLength);}return frame;}// crc32校验和计算private long doChecksum0(ChannelBuffer frame, int frameLength) {//checksumAdler32 adler32 = adler32s.get();if (adler32 == null) {adler32 = new Adler32();adler32s.set(adler32);}adler32.reset();adler32.update(frame.array(), 0, frameLength);return adler32.getValue();}}

CompressHandler

CompressHandler构造函数传入了一个CodecConfig,用来定义压缩的相关属性,在接收请求和返回响应经过CompressHandler时,会根据配置尝试对消息包的正文部分进行压缩,压缩之后需要更新消息包正文内容的长度,先看CodeConfig实现:

public class CodecConfig {private static final Logger logger = LoggerLoader.getLogger(CodecConfig.class);private ConfigManager configManager;// 是否允许压缩private volatile boolean compressed;// 压缩类型private volatile CompressType compressType;// 压缩最小阈值,只有数据包大于指定阈值才进行压缩private volatile int compressThreshold;// 是否校验数据完整性private volatile boolean checksum;// 从配置中心初始化相关配置,并注册监听器,监听配置修改public CodecConfig(ConfigManager configManager) {// 指定配置中心this.configManager = configManager;// 获取配置属性并解析this.compressed = this.configManager.getBooleanValue(Constants.KEY_CODEC_COMPRESS_ENABLE,Constants.DEFAULT_CODEC_COMPRESS_ENABLE);this.compressType = getCompressType((byte) this.configManager.getIntValue(Constants.KEY_CODEC_COMPRESS_TYPE,Constants.DEFAULT_CODEC_COMPRESS_TYPE));this.compressThreshold = this.configManager.getIntValue(Constants.KEY_CODEC_COMPRESS_THRESHOLD,Constants.DEFAULT_CODEC_COMPRESS_THRESHOLD);this.checksum = this.configManager.getBooleanValue(Constants.KEY_CODEC_CHECKSUM_ENABLE,Constants.DEFAULT_CODEC_CHECKSUM_ENABLE);// 注册配置变更监听器configManager.registerConfigChangeListener(new InnerConfigChangeListener());}public boolean isCompress(int frameSize) {if (compressed) {// 只有数据包大于指定阈值才进行压缩if (frameSize > compressThreshold) {return true;}}return false;}public CompressType getCompressType() {return compressType;}// 根据配置code,获取具体的压缩类型枚举成员private final CompressType getCompressType(byte code) {CompressType compressType = CompressType.None;try {compressType = CompressType.getCompressType(code);} catch (Exception e) {logger.error("Invalid compressType. code:" + code, e);}return compressType;}public boolean isChecksum() {return checksum;}// 监听更新相关压缩配置private class InnerConfigChangeListener implements ConfigChangeListener {@Overridepublic void onKeyUpdated(String key, String value) {if (key.endsWith(Constants.KEY_CODEC_COMPRESS_ENABLE)) {try {compressed = Boolean.valueOf(value);} catch (RuntimeException e) {}} else if (key.endsWith(Constants.KEY_CODEC_COMPRESS_TYPE)) {try {compressType = getCompressType(Byte.valueOf(value));} catch (RuntimeException e) {}} else if (key.endsWith(Constants.KEY_CODEC_COMPRESS_THRESHOLD)) {try {compressThreshold = Integer.valueOf(value);} catch (RuntimeException e) {}} else if (key.endsWith(Constants.KEY_CODEC_CHECKSUM_ENABLE)) {try {checksum = Boolean.valueOf(value);} catch (RuntimeException e) {}}}@Overridepublic void onKeyAdded(String key, String value) {// TODO Auto-generated method stub}@Overridepublic void onKeyRemoved(String key) {// TODO Auto-generated method stub}}
}

再看看CompressHandler的具体实现:

public class CompressHandler extends SimpleChannelHandler {private static Compress gZipCompress = CompressFactory.getGZipCompress();private static Compress snappyCompress = CompressFactory.getSnappyCompress();private CodecConfig codecConfig;public CompressHandler(CodecConfig codecConfig) {this.codecConfig = codecConfig;}@Overridepublic void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {if (e.getMessage() == null || !(e.getMessage() instanceof CodecEvent)) {return;}CodecEvent codecEvent = (CodecEvent) e.getMessage();if (codecEvent.isValid()) {if (codecEvent.isUnified()) {// 统一协议请求ChannelBuffer buffer = doUnCompress(e.getChannel(), codecEvent);codecEvent.setBuffer(buffer);}}Channels.fireMessageReceived(ctx, codecEvent, e.getRemoteAddress());}@Overridepublic void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {if (!(e instanceof MessageEvent)) {ctx.sendDownstream(e);return;}MessageEvent evt = (MessageEvent) e;if (!(evt.getMessage() instanceof CodecEvent)) {ctx.sendDownstream(evt);return;}CodecEvent codecEvent = (CodecEvent) evt.getMessage();if (codecEvent.isUnified()) {ChannelBuffer buffer = doCompress(e.getChannel(), codecEvent);codecEvent.setBuffer(buffer);write(ctx, evt.getFuture(), codecEvent, evt.getRemoteAddress());} else {ctx.sendDownstream(e);}}private ChannelBuffer doUnCompress(Channel channel, CodecEvent codecEvent)throws IOException {ChannelBuffer frame = codecEvent.getBuffer();// 获取command值byte command = frame.getByte(CodecConstants._FRONT_COMMAND_LENGTH);//获取command的第6、7位short compress = (short) (command & 0x60);if (compress == 0x00) {return frame;}// 获取数据包体长度int totalLength = frame.getInt(frame.readerIndex() + CodecConstants._HEAD_LENGTH);// 减去头部值int compressLength = totalLength - CodecConstants._HEAD_FIELD_LENGTH;byte[] in;byte[] out = null;ChannelBuffer result;switch (compress) {case 0x00: // 没有压缩处理,直接返回return frame;case 0x20: // 解压缩处理in = new byte[compressLength];// 减去消息头长度frame.getBytes(frame.readerIndex() + CodecConstants._FRONT_LENGTH, in);// 使用snappy压缩out = snappyCompress.unCompress(in);codecEvent.setIsCompress(true);break;case 0x40: // 和0X20类似处理,只是压缩方式不同in = new byte[compressLength];frame.getBytes(frame.readerIndex() + CodecConstants._FRONT_LENGTH, in);// 使用gZip压缩out = gZipCompress.unCompress(in);codecEvent.setIsCompress(true);break;case 0x60:throw new IllegalArgumentException("Invalid compress type.");}// 实际长度是解压缩后长度+头部长度int _totalLength = CodecConstants._HEAD_FIELD_LENGTH + out.length;result = channel.getConfig().getBufferFactory().getBuffer(_totalLength + CodecConstants._FRONT_LENGTH_);// 写长度result.writeBytes(frame, frame.readerIndex(), CodecConstants._HEAD_LENGTH);result.writeInt(_totalLength);// 写解压缩的数据包result.writeBytes(frame, frame.readerIndex() + CodecConstants._FRONT_LENGTH_,CodecConstants._HEAD_FIELD_LENGTH);result.writeBytes(out);return result;}private ChannelBuffer doCompress(Channel channel, CodecEvent codecEvent)throws IOException {ChannelBuffer frame = codecEvent.getBuffer();// 取第三位字节commandint command = frame.getByte(CodecConstants._FRONT_COMMAND_LENGTH);//compressChannelBuffer result = frame;int frameLength = frame.readableBytes();if (codecConfig.isCompress(frameLength)) {CompressType compressType = codecConfig.getCompressType();// 设置command第6、7位来定义压缩类型,并进行相应的压缩操作switch (compressType) {case None:command = command | 0x00;break;case Snappy:command = command | 0x20;result = doCompress0(channel, frame, frameLength, snappyCompress);break;case Gzip:command = command | 0x40;result = doCompress0(channel, frame, frameLength, gZipCompress);break;}} else {// 默认不压缩command = command | 0x00;}// 更新command位值int oldWriteIndex = result.writerIndex();result.writerIndex(CodecConstants._FRONT_COMMAND_LENGTH);result.writeByte(command);result.writerIndex(oldWriteIndex);return result;}private ChannelBuffer doCompress0(Channel channel, ChannelBuffer frame,int frameLength, Compress compress)throws IOException {ChannelBuffer result;// 数据体长度位消息包长度减去请求头int bodyLength = frameLength - CodecConstants._FRONT_LENGTH;byte[] in = new byte[bodyLength];// 复制数据体到inframe.getBytes(CodecConstants._FRONT_LENGTH, in, 0, bodyLength);// 调用传入的压缩器类型完成压缩byte[] out = compress.compress(in);byte[] lengthBuf = new byte[CodecConstants._HEAD_FIELD_LENGTH];// 获取frame第8~10位到lengBufframe.getBytes(CodecConstants._FRONT_LENGTH_, lengthBuf, 0, lengthBuf.length);// 压缩后的长度+2int totalLength = out.length + lengthBuf.length;// 压缩后的长度+2+8int _frameLength = totalLength + CodecConstants._FRONT_LENGTH_;result = dynamicBuffer(_frameLength, channel.getConfig().getBufferFactory());// 写入前4位result.writeBytes(frame, frame.readerIndex(), CodecConstants._HEAD_LENGTH);// 写入4~8位result.writeInt(totalLength);// 写入8~10位result.writeBytes(lengthBuf);// 写入压缩内容result.writeBytes(out);return result;}
}

ProviderDecoder

在完成消息包体解析、消息解压缩等处理后,可以开始真正对数据包解码,以解析出DefaultRequest请求对象。ProviderDecoder继承了pigeon实现的AbstractDecoder,而AbstractDecoder又继承netty实现的OneToOneDecoder,具体继承类型如下所示:

在Netty的OneToOneDecoder中,实现了ChannelUpstreamHandler接口的handleUpstream方法,在内部调用了抽象方法decode来尝试对消息进行解码,如果解码前后,消息未发生变化,会继续委托给上游处理,如果解码结果为null,则丢弃本消息,否则会调用Channels#fireMessageReceived方法触发上游方法调用。而抽象模版方法decode在AbstractDecoder中有了实现,可以先看AbstractDecoder的实现:

public abstract class AbstractDecoder extends OneToOneDecoder {private static final Logger logger = LoggerLoader.getLogger(AbstractDecoder.class);@Overridepublic Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)throws Exception {if (msg == null || !(msg instanceof CodecEvent)) {return null;}CodecEvent codecEvent = (CodecEvent) msg;if (codecEvent.isValid()) {Object message = null;if (codecEvent.isUnified()) {// 解码统一协议请求message = _doDecode(ctx, channel, codecEvent);codecEvent.setInvocation((InvocationSerializable) message);} else {// 解码非统一协议请求message = doDecode(ctx, channel, codecEvent);codecEvent.setInvocation((InvocationSerializable) message);}}return codecEvent;}protected Object doDecode(ChannelHandlerContext ctx, Channel channel, CodecEvent codecEvent)throws IOException {Object msg = null;ChannelBuffer buffer = codecEvent.getBuffer();// 忽略头部两位(用于确认是否为统一协议的)buffer.skipBytes(CodecConstants.MEGIC_FIELD_LENGTH);// 第3位表示序列化方式byte serialize = buffer.readByte();Long sequence = null;try {// 4-8位表示数据体长int totalLength = buffer.readInt();// 总消息包长=消息体+头部int frameLength = totalLength + CodecConstants.FRONT_LENGTH;// 数据体长度 = 消息体 - 尾部11位长度int bodyLength = (totalLength - CodecConstants.TAIL_LENGTH);// 获取数据体长度,不包括尾部ChannelBuffer frame = extractFrame(buffer, buffer.readerIndex(), bodyLength);buffer.readerIndex(buffer.readerIndex() + bodyLength);// 获取8位序列号sequence = buffer.readLong();// 跳过3位拓展位buffer.skipBytes(CodecConstants.EXPAND_FIELD_LENGTH);//deserializeChannelBufferInputStream is = new ChannelBufferInputStream(frame);// 根据传入的序列化类型调用抽象方法完成序列化msg = deserialize(serialize, is);//afterdoAfter(channel, msg, serialize, frameLength, codecEvent.getReceiveTime());} catch (Throwable e) {SerializationException se = new SerializationException(e);try {if (sequence != null) {// 调用子类实现的抽象模版方法处理失败响应doFailResponse(ctx, channel, ProviderUtils.createThrowableResponse(sequence.longValue(),serialize, se));}logger.error("Deserialize failed. host:"+ ((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress()+ "\n" + e.getMessage(), se);} catch (Throwable t) {logger.error("[doDecode] doFailResponse failed.", t);}}return msg;}protected Object _doDecode(ChannelHandlerContext ctx, Channel channel, CodecEvent codecEvent) throws IOException {Object msg = null;ChannelBuffer buffer = codecEvent.getBuffer();try {//magicbuffer.skipBytes(CodecConstants._MEGIC_FIELD_LENGTH);//versionbuffer.readByte();//serializebyte serialize = (byte) (buffer.readByte() & 0x1f);serialize = SerializerFactory.convertToSerialize(serialize);int totalLength = buffer.readInt();int frameLength = totalLength + CodecConstants._FRONT_LENGTH_;ChannelBuffer frameBody = extractFrame(buffer, buffer.readerIndex(), totalLength);buffer.readerIndex(buffer.readerIndex() + totalLength);ChannelBufferInputStream is = new ChannelBufferInputStream(frameBody);//deserializemsg = deserialize(serialize, is);//doAfterdoAfter(channel, msg, serialize, frameLength, codecEvent.getReceiveTime());} catch (Throwable e) {logger.error("Deserialize failed. host:"+ ((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress()+ "\n" + e.getMessage(), e);}return msg;}// 获取buffer的一个子序列,从index开始,长度位lengthprotected ChannelBuffer extractFrame(ChannelBuffer buffer, int index, int length) {ChannelBuffer frame = buffer.slice(index, length);return frame;}private Object doAfter(Channel channel,Object msg,byte serialize,int frameLength,long receiveTime)throws IOException {if (msg instanceof InvocationSerializable) {InvocationSerializable msg_ = (InvocationSerializable) msg;int msgType = msg_.getMessageType();// 注入size属性if (msgType == Constants.MESSAGE_TYPE_SERVICE && frameLength > 0) {msg_.setSize(frameLength);}// 注入序列化类型msg_.setSerialize(serialize);// 调用子类实现的抽象消息初始化方法doInitMsg(msg, channel, receiveTime);}return msg;}protected abstract Object deserialize(byte serializerType, InputStream is);protected abstract Object doInitMsg(Object message, Channel channel, long receiveTime);protected abstract void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response);}

在子类ProviderDecoder中,实现了AbstractDecoder的三个抽象方法:

public class ProviderDecoder extends AbstractDecoder {@Overridepublic Object doInitMsg(Object message, Channel channel, long receiveTime) {if (message == null) {return null;}// 更新请求创建时间InvocationRequest request = (InvocationRequest) message;request.setCreateMillisTime(receiveTime);return request;}@Overridepublic void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response) {// 获取Channel并回写客户端NettyServerChannel nettyChannel = new NettyServerChannel(channel);nettyChannel.write(null, response);}@Overridepublic Object deserialize(byte serializerType, InputStream is) {// 根据序列化类型获取指定的序列化工具类来完成序列化工作Object decoded = SerializerFactory.getSerializer(serializerType).deserializeRequest(is);return decoded;}
}

NettyServerHandler

NettyServerHandler继承自SimpleChannelUpstreamHandler,是pigeon服务提供方pipeline中最上游的一个Handler,调用了服务提供方处理逻辑的拦截器链,核心实现如下:

public class NettyServerHandler extends SimpleChannelUpstreamHandler {@Overridepublic void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {// 在SimpleChannelUpstreamHandler基础上,添加一个debug处理if (log.isDebugEnabled()) {if (e instanceof ChannelStateEvent && ((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {log.debug(e.toString());}}// 父类根据事件类型,进行向下转型,并交由响应的方法处理,如MessageEvent则调用messageReceived方法super.handleUpstream(ctx, e);}/*** 服务器端接受到消息** @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext,* org.jboss.netty.channel.MessageEvent)*/@SuppressWarnings("unchecked")@Overridepublic void messageReceived(ChannelHandlerContext ctx, MessageEvent message) {// 解密获取真正的InvocationRequestCodecEvent codecEvent = (CodecEvent) (message.getMessage());if (!codecEvent.isValid() || codecEvent.getInvocation() == null) {return;}InvocationRequest request = (InvocationRequest) codecEvent.getInvocation();// 用DefaultProviderContext包装InvocationRequest请求ProviderContext invocationContext = new DefaultProviderContext(request, new NettyServerChannel(ctx.getChannel()));try {// 调用注册的拦截器链处理请求this.server.processRequest(request, invocationContext);} catch (Throwable e) {String msg = "process request failed:" + request;// 心跳消息只返回正常的, 异常不返回if (request.getCallType() == Constants.CALLTYPE_REPLY&& request.getMessageType() != Constants.MESSAGE_TYPE_HEART) {ctx.getChannel().write(ProviderUtils.createFailResponse(request, e));}log.error(msg, e);}}
}

ProviderEncoder

ProviderEncoder继承自AbstractEncoder,用于对消息响应进行加密处理,类似于ProviderDecoder的实现,下面展示其继承类图:

从pigeon实现从下往上看,OneToOneEncoder类似前面的OneToOneDecoder实现,核心编码实现通过调用了内部的抽象方法encode实现,encode方法在AbstractEncoder中实现:

public abstract class AbstractEncoder extends OneToOneEncoder {private static final Logger logger = LoggerLoader.getLogger(AbstractEncoder.class);public abstract void serialize(byte serializer, OutputStream os, Object obj, Channel channel) throws IOException;@Overridepublic Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {if (msg instanceof InvocationSerializable) {InvocationSerializable _msg = (InvocationSerializable) msg;try {ChannelBuffer frame;CodecEvent codecEvent;if (msg instanceof UnifiedInvocation) {// 统一协议请求编码frame = _doEncode(channel, (UnifiedInvocation) _msg);// 用CodecEvent包装消息包codecEvent = new CodecEvent(frame, true);} else {// 非统一协议请求编码frame = doEncode(channel, _msg);// 用CodecEvent包装消息包codecEvent = new CodecEvent(frame, false);}return codecEvent;} catch (Exception e) {SerializationException se = new SerializationException(e);try {// 调用子类实现的抽象模版方法处理编码响应doFailResponse(ctx, channel, ProviderUtils.createThrowableResponse(_msg,_msg.getSerialize(), se));} catch (Throwable t) {}logger.error(e.getMessage(), se);throw se;}} else {throw new SerializationException("Invalid message format");}}protected ChannelBuffer doEncode(Channel channel, InvocationSerializable msg)throws IOException {ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH,channel.getConfig().getBufferFactory()));// 魔数, 0x39(57),0x3A(58)os.write(CodecConstants.MAGIC);// 序列化方式os.writeByte(msg.getSerialize());// 数据包体大小,包括消息尾部,先写最大值占位os.writeInt(Integer.MAX_VALUE);// 调用抽象模版方式序列化serialize(msg.getSerialize(), os, msg, channel);ChannelBuffer frame = os.buffer();// 写入递增序列号frame.writeLong(msg.getSequence());// 写入拓展位frame.writeBytes(CodecConstants.EXPAND);// 根据序列化后的数据包体长度更新消息体长度,这里包含消息尾frame.setInt(CodecConstants.HEAD_LENGTH, frame.readableBytes() -CodecConstants.FRONT_LENGTH);doAfter(msg, frame.readableBytes());return frame;}protected ChannelBuffer _doEncode(Channel channel, UnifiedInvocation msg)throws IOException {ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH,channel.getConfig().getBufferFactory()));// 魔数, 0xAB(171),0xBA(186)os.write(CodecConstants._MAGIC);// 写入协议版本os.writeByte(msg.getProtocolVersion());// 写入序列化方式byte serialize = SerializerFactory.convertToUnifiedSerialize(msg.getSerialize());os.writeByte(serialize);// 数据包体大小,包括消息尾部,先写最大值占位os.writeInt(Integer.MAX_VALUE);// 调用抽象模版方式序列化serialize(msg.getSerialize(), os, msg, channel);ChannelBuffer frame = os.buffer();// 根据序列化后的数据包体长度更新消息体长度,非统一协议不包含消息尾frame.setInt(CodecConstants._HEAD_LENGTH, frame.readableBytes() -CodecConstants._FRONT_LENGTH_);doAfter(msg, frame.readableBytes());return frame;}private void doAfter(Object msg,int frameLength) throws IOException {if (msg instanceof InvocationSerializable) {InvocationSerializable msg_ = (InvocationSerializable) msg;int msgType = msg_.getMessageType();// 注入size属性if (msgType == Constants.MESSAGE_TYPE_SERVICE && frameLength > 0) {msg_.setSize(frameLength);}}}public abstract void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response);}

再看ProviderEncoder实现了AbstractEncoder的三个方法:

public class ProviderEncoder extends AbstractEncoder {public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {// 无实际意义的重写实现Object encoded = super.encode(ctx, channel, msg);return encoded;}@Overridepublic void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response) {// 直接写回客户端Channels.write(channel, response);}@Overridepublic void serialize(byte serializerType, OutputStream os, Object obj, Channel channel) throws IOException {// 根据序列化方式获取指定的序列化工具类完成序列化操作,将序列化结果写入os中SerializerFactory.getSerializer(serializerType).serializeResponse(os, obj);}}

FramePrepender

FramePrepender也是继承自OneToOneEncoder,重写了内部的encode方法,实现及其简单,判断如果消息类型是CodecEvent,则提取出消息的具体消息字节码内容,具体实现:

public class FramePrepender extends OneToOneEncoder {@Overrideprotected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {if (msg == null || !(msg instanceof CodecEvent)) {return null;} else {// 提取CodecEvent的buffer数据,即实际的消息字节码return ((CodecEvent) msg).getBuffer();}}
}

客户端实现

下面先看NettyClientPipelineFactory的实现:

public class NettyClientPipelineFactory implements ChannelPipelineFactory {// 客户端连接实例引用private NettyClient client;// 通过编解码工厂,获取单例编解码配置private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();public NettyClientPipelineFactory(NettyClient client) {this.client = client;}// 初始化pipelinepublic ChannelPipeline getPipeline() throws Exception {ChannelPipeline pipeline = pipeline();pipeline.addLast("framePrepender", new FramePrepender());pipeline.addLast("frameDecoder", new FrameDecoder());pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));pipeline.addLast("compressHandler", new CompressHandler(codecConfig));pipeline.addLast("invokerDecoder", new InvokerDecoder());pipeline.addLast("invokerEncoder", new InvokerEncoder());pipeline.addLast("clientHandler", new NettyClientHandler(this.client));return pipeline;}}

从上面看到,大部分Handler与服务方类型,区别只有3个Handler:FramePrepender,FrameDecoder,NettyClientHandler
对于上面所有的Handler,类似服务方,同样可以分为3类:

  1. UpStreamHandler

    1. FrameDecoder
    2. invokerDecoder
    3. NettyClientHandler
  2. DownStreamHandler
    1. FramePrepender
    2. invokerEncoder
  3. 双向Handler
    1. Crc32Handler
    2. compressHandler

结合代码分析,最终的调用时序如下所示:

sequenceDiagram
客户端->>FrameDecoder: 开始处理请求
FrameDecoder->>Crc32Handler:decode
Crc32Handler->>compressHandler: messageReceived
compressHandler->>invokerDecoder: messageReceived
invokerDecoder->>NettyClientHandler: AbstractDecoder.decode
NettyClientHandler->>invokerEncoder: handleUpstream
invokerEncoder->>compressHandler: AbstractEncoder.encode
compressHandler->>Crc32Handler:  handleDownstream
Crc32Handler->>FramePrepender:  handleDownstream
FramePrepender->>客户端: encode

上面的所有Handler中,除了NettyClientHandler,其他Handler的处理逻辑基本和服务端实现是一致的,下面具体看看NettyClientHandler的实现原理:

NettyClientHandler

NettyClientHandler是客户端请求处理链中最顶层的Handler,用于完成处理服务端调用响应的具体逻辑,里面的核心方法实现是messageReceived:

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {CodecEvent codecEvent = (CodecEvent) e.getMessage();if (codecEvent.isValid() && codecEvent.getInvocation() != null) {// 请求有效且响应消息不为nullclient.processResponse((InvocationResponse) codecEvent.getInvocation());}
}

调用了NettyClient#processResponse方法:

public void processResponse(InvocationResponse response) {this.responseProcessor.processResponse(response, this);
}

this.responseProcessor是ResponseThreadPoolProcessor实例,继承了AbstractResponseProcessor抽象类,上面调用的是AbstractResponseProcessor#processResponse方法:

public void processResponse(InvocationResponse response, Client client) {try {doProcessResponse(response, client);} catch (Throwable e) {String error = String.format("process response failed:%s, processor stats:%s", response,getProcessorStatistics());logger.error(error, e);if (monitor != null) {monitor.logError(error, e);}}
}

doProcessResponse是一个抽象方法,在子类ResponseThreadPoolProcessor中实现:

public class ResponseThreadPoolProcessor extends AbstractResponseProcessor {private static ThreadPool responseProcessThreadPool;public ResponseThreadPoolProcessor() {ConfigManager configManager = ConfigManagerLoader.getConfigManager();int corePoolSize = configManager.getIntValue(Constants.KEY_RESPONSE_COREPOOLSIZE,Constants.DEFAULT_RESPONSE_COREPOOLSIZE);int maxPoolSize = configManager.getIntValue(Constants.KEY_RESPONSE_MAXPOOLSIZE,Constants.DEFAULT_RESPONSE_MAXPOOLSIZE);int queueSize = configManager.getIntValue(Constants.KEY_RESPONSE_WORKQUEUESIZE,Constants.DEFAULT_RESPONSE_WORKQUEUESIZE);responseProcessThreadPool = new DynamicThreadPool("Pigeon-Client-Response-Processor", corePoolSize,maxPoolSize, queueSize, new CallerRunsPolicy(), false, false);}public void stop() {ThreadPoolUtils.shutdown(responseProcessThreadPool.getExecutor());}// 重写父类的抽象模版方法完成具体的处理响应逻辑public void doProcessResponse(final InvocationResponse response, final Client client) {Runnable task = new Runnable() {public void run() {// 调用ServiceInvocationRepository单例的receiveResponse方法处理接收的请求ServiceInvocationRepository.getInstance().receiveResponse(response);}};try {// 由根据配置初始化的线程池来执行相关信息responseProcessThreadPool.execute(task);} catch (RejectedExecutionException e) {String error = String.format("process response failed:%s, processor stats:%s", response,getProcessorStatistics());throw new RejectedException(error, e);}}@Overridepublic String getProcessorStatistics() {// 获取当前响应处理线程池的实际处理情况ThreadPoolExecutor e = responseProcessThreadPool.getExecutor();String stats = String.format("response pool size:%d(active:%d,core:%d,max:%d,largest:%),task count:%d(completed:%d),queue size:%d",e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.getQueue().size());return stats;}@Overridepublic ThreadPool getResponseProcessThreadPool() {return responseProcessThreadPool;}
}

具体处理接收响应的逻辑如下:

ublic void receiveResponse(InvocationResponse response) {RemoteInvocationBean invocationBean = invocations.get(response.getSequence());if (invocationBean != null) {if (logger.isDebugEnabled()) {logger.debug("received response:" + response);}InvocationRequest request = invocationBean.request;try {// 处理回调Callback callback = invocationBean.callback;if (callback != null) {Client client = callback.getClient();if (client != null) {// 记录请求流出ServiceStatisticsHolder.flowOut(request, client.getAddress());}// 向callback对象注入responsecallback.callback(response);// 通知等待线程接收响应callback.run();}} finally {invocations.remove(response.getSequence());}}
}

callback.run方法会唤醒之前发送请求的拦截器链等待获取响应的线程,并根据callback.callback注入的response对象,来完成对请求响应的处理,具体实现在pigeon的CallbackFuture中,看看两个核心方法实现:

public class CallbackFuture implements Callback, CallFuture {@Overridepublic void callback(InvocationResponse response) {this.response = response;}@Overridepublic void run() {lock.lock();try {this.done = true;if (condition != null) {condition.signal();}} finally {lock.unlock();}}
}

【Pigeon源码阅读】RPC底层通信实现原理(八)相关推荐

  1. pytorch load state dict_pytorch源码阅读(二)optimizer原理

    pytorch包含多种优化算法用于网络参数的更新,比如常用的SGD.Adam.LBFGS以及RMSProp等.使用中可以发现各种优化算法的使用方式几乎相同,是因为父类optimizer[1]定义了各个 ...

  2. Rpc框架dubbo-client(v2.6.3) 源码阅读(二)

    接上一篇 dubbo-server 之后,再来看一下 dubbo-client 是如何工作的. dubbo提供者服务示例, 其结构是这样的! dubbo://192.168.11.6:20880/co ...

  3. 源码 状态机_[源码阅读] 阿里SOFA服务注册中心MetaServer(1)

    [源码阅读] 阿里SOFA服务注册中心MetaServer(1) 0x00 摘要 0x01 服务注册中心 1.1 服务注册中心简介 1.2 SOFARegistry 总体架构 1.3 为什么要分层 0 ...

  4. [源码阅读] 阿里SOFA服务注册中心MetaServer(1)

    0x00 摘要 SOFARegistry 是蚂蚁金服开源的一个生产级.高时效.高可用的服务注册中心.本系列将带领大家一起分析其MetaServer的实现机制,本文为第一篇,介绍MetaServer总体 ...

  5. 多线程与高并发(四):LockSupport,高频面试题,AQS源码,以及源码阅读方法论

    补充几道面试题 锁升级过程:无锁.偏向锁.轻量级锁.重量级锁 StampedLock 自己看一下 面试题:syn和Reentrantlock的区别? LockSupport LockSupport.p ...

  6. 【Flink】Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型

    1.概述 转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型 相似文章:[Flink]Flink 基于 MailBox 实现的 StreamTask 线程模型 Fl ...

  7. Spark源码阅读——任务提交过程

    2019独角兽企业重金招聘Python工程师标准>>> Spark 源码阅读--任务提交过程 当我们在使用spark编写mr作业是,最后都要涉及到调用reduce,foreach或者 ...

  8. MyBatis 源码阅读 -- 核心操作篇

    核心操作包是 MyBatis 进行数据库查询和对象关系映射等工作的包.该包中的类能完成参数解析.数据库查询.结果映射等主要功能.在主要功能的执行过程中还会涉及缓存.懒加载.鉴别器处理.主键自增.插件支 ...

  9. React 表单源码阅读笔记

    1 概念 1.1 什么是表单 实际上广义上的表单并不是特别好界定,维基上讲表单是一系列带有空格的文档,用于输写或选择.更具体的,在网页中表单主要负责数据采集的功能,我们下文中所提到的表单都指后者.如下 ...

  10. Dubbo注册协议原理以及源码阅读

    前言 继上次小编所讲RPC协议暴露服务并且远程调用之后,小编这次给大家带来注册中心协议整体流程原理以及源码精讲,Dubbo协议服务暴露与引用以及源码分析文章中,远程服务暴露可以只通过RPC协议即可,那 ...

最新文章

  1. mysqlsla slow-query常用用法
  2. lvs为何不能完全替代DNS轮询--转
  3. Andorid之taskAffinity 和 FLAG_ACTIVITY_NEW_TASK
  4. 小小鸡蛋竟然能够承受1200kg重量!?
  5. TCL座机日期时间调整
  6. 人脸识别门禁_小区人脸识别门禁或取代传统门禁刷卡方式
  7. Windows Azure 配置Active Directory 主机(1)
  8. php日期转时间戳,指定日期转换成时间戳
  9. 泛微移动端数据库 :H2数据库
  10. 金蝶kis专业版公网访问_金蝶KIS系统专业版客户端连接不上服务器处理方法
  11. java成员变量的调用_java中对象调用成员变量与成员的方法介绍
  12. CSS实现文字动画效果
  13. 利用Hessian矩阵的Frangi 滤波器-python版本
  14. C语言实现url的编码和解码
  15. html 怎么检测ie浏览器的最高版本,检测是否为IE浏览器及IE浏览器的版本
  16. 【opencv-python】视频处理(4) cv2.VideoCapture.get()函数、cv2.VideoCapture.set()函数
  17. 张艾迪(创始人):解码互联网天才
  18. 计算机中术语中bit的含义是,bit的用法总结大全
  19. 用HTML/CSS制作个人简历
  20. 小车自动往返工作原理_自动往返小车

热门文章

  1. ArcGIS学习总结(六)——地形分析-DEM应用
  2. Linux tc QOS 详解
  3. 从Spring为什么要用IoC的支点,我撬动了整个Spring的源码脉络
  4. ligerui之控件列表初始化设置
  5. 站在潮流前沿,不到100行代码快速实现一个简易版 vite
  6. python字典forward_python工具库库介绍-bidict: 双向字典
  7. 【原创】我所亲证的气功层次 ——了空居士
  8. vbox虚拟机系统转移到vmware虚拟机中
  9. github 下载慢下载失败?不存在的!!!
  10. 《冰河世纪》特效指导罗皓做客【ftrack聊天室】