RocketMQ源码解析-PullConsumer取消息(2)
如果在调用DefaultMQPullConsumer的pull方法的时候添加了pullcallback参数,那么就会调用DefaultMQPullConsumerImpl的pullAsyncImpl()方法进行异步发送。
private void pullAsyncImpl(//final MessageQueue mq,//final String subExpression,//final long offset,//final int maxNums,//final PullCallback pullCallback,//final boolean block,//final long timeout) throws MQClientException, RemotingException, InterruptedException {this.makeSureStateOK();if (null == mq) {throw new MQClientException("mq is null", null);}if (offset < 0) {throw new MQClientException("offset < 0", null);}if (maxNums <= 0) {throw new MQClientException("maxNums <= 0", null);}if (null == pullCallback) {throw new MQClientException("pullCallback is null", null);}this.subscriptionAutomatically(mq.getTopic());try {int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);final SubscriptionData subscriptionData;try {subscriptionData =FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//mq.getTopic(), subExpression);} catch (Exception e) {throw new MQClientException("parse subscription error", e);}long timeoutMillis =block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;this.pullAPIWrapper.pullKernelImpl(//mq, // 1subscriptionData.getSubString(), // 20L, // 3offset, // 4maxNums, // 5sysFlag, // 60, // 7this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8timeoutMillis, // 9CommunicationMode.ASYNC, // 10new PullCallback() {@Overridepublic void onException(Throwable e) {pullCallback.onException(e);}@Overridepublic void onSuccess(PullResult pullResult) {pullCallback.onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));}});} catch (MQBrokerException e) {throw new MQClientException("pullAsync unknow exception", e);}
}
与同步发送的方法区别最大之处在于,异步发送的时候会要求传入一个实现了pullCallBack接口的参数,可以看到pullCallBack接口的组成。
public void onSuccess(final PullResult pullResult);public void onException(final Throwable e);
可以很清楚的看到实现了这个接口对于提交的请求的成功接收与异常处理明确提出了实现的要求,用以处理异步发送的请求在接收到之后的处理。在PullAPIWrapper传递参数的时候,会将pullCallBack外面在包一层pullCallBack。
而异步处理在PullAPIWrapper完成了requestHeader的封装之后,将会交给客户端实例的MQClientAPIImpl来继续发送取消息的要求。
public PullResult pullMessage(//final String addr,//final PullMessageRequestHeader requestHeader,//final long timeoutMillis,//final CommunicationMode communicationMode,//final PullCallback pullCallback//
) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);switch (communicationMode) {case ONEWAY:assert false;return null;case ASYNC:this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);return null;case SYNC:return this.pullMessageSync(addr, request, timeoutMillis);default:assert false;break;}return null;
}
在这里可以清楚见到,异步请求比同步请求多了一个pullCallBack参数,用来处理异步接受消息接收的操作。
private void pullMessageAsync(//final String addr,// 1final RemotingCommand request,//final long timeoutMillis,//final PullCallback pullCallback//
) throws RemotingException, InterruptedException {this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {@Overridepublic void operationComplete(ResponseFuture responseFuture) {RemotingCommand response = responseFuture.getResponseCommand();if (response != null) {try {PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);assert pullResult != null;pullCallback.onSuccess(pullResult);}catch (Exception e) {pullCallback.onException(e);}}else {if (!responseFuture.isSendRequestOK()) {pullCallback.onException(new MQClientException("send request failed", responseFuture.getCause()));}else if (responseFuture.isTimeout()) {pullCallback.onException(new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",responseFuture.getCause()));}else {pullCallback.onException(new MQClientException("unknow reseaon", responseFuture.getCause()));}}}});
}
可以清楚见到,在提交给netty客户端的实现了新的类继承了InvokeCallBack接口,在接口定义的operationComplete()方法中清楚地在完成了对拉取消息的请求进行处理(此处与同步发送方式一样)之后,根据拉取消息的成功与否,异常出现与否调用onSuccess()方法或者onException()方法实现自己在pullCallBack当中对相应拉取消息结果的处理。
那么究竟,是如何在netty客户端实现的呢?
@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis,InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {// test the channel writable or notif (!channel.isWritable()) {throw new RemotingTooMuchRequestException(String.format("the channel[%s] is not writable now", channel.toString()));}try {if (this.rpcHook != null) {this.rpcHook.doBeforeRequest(addr, request);}this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);}catch (RemotingSendRequestException e) {log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;}}else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}
}
在NettyRemotingClient里,首先,会给传进来的Broker地址尝试寻找相应的通道。首先会尝试在map里寻找地址与ChannelWrapper的键值对,如果没有寻找到,则会尝试从新建立一个通道。ChannelWrapper作为NettyRemotingClient的内部类,用来包装ChannelFuture来维护通道对象。
private Channel createChannel(final String addr) throws InterruptedException {ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}// 进入临界区后,不能有阻塞操作,网络连接采用异步方式if (this.lockChannelTables.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {try {boolean createNewConnection = false;cw = this.channelTables.get(addr);if (cw != null) {// channel正常if (cw.isOK()) {return cw.getChannel();}// 正在连接,退出锁等待else if (!cw.getChannelFuture().isDone()) {createNewConnection = false;}// 说明连接不成功else {this.channelTables.remove(addr);createNewConnection = true;}}// ChannelWrapper不存在else {createNewConnection = true;}if (createNewConnection) {ChannelFuture channelFuture =this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);cw = new ChannelWrapper(channelFuture);this.channelTables.put(addr, cw);}}catch (Exception e) {log.error("createChannel: create channel exception", e);}finally {this.lockChannelTables.unlock();}}else {log.warn("createChannel: try to lock channel table, but timeout, {}ms", LockTimeoutMillis);}if (cw != null) {ChannelFuture channelFuture = cw.getChannelFuture();if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {if (cw.isOK()) {log.info("createChannel: connect remote host[{}] success, {}", addr,channelFuture.toString());return cw.getChannel();}else {log.warn("createChannel: connect remote host[" + addr + "] failed, "+ channelFuture.toString(), channelFuture.cause());}}else {log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr,this.nettyClientConfig.getConnectTimeoutMillis(), channelFuture.toString());}}return null;
}
在这里会尝试与地址进行连接建立通道。
在得到通道之后,在调用rpc钩子来调用InvokeAsyncImpl()方法来继续发送。
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException,RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);final ResponseFuture responseFuture =new ResponseFuture(request.getOpaque(), timeoutMillis, invokeCallback, once);this.responseTable.put(request.getOpaque(), responseFuture);try {channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);return;}else {responseFuture.setSendRequestOK(false);}responseFuture.putResponse(null);responseTable.remove(request.getOpaque());try {responseFuture.executeInvokeCallback();}catch (Throwable e) {plog.warn("excute callback in writeAndFlush addListener, and callback throw", e);}finally {responseFuture.release();}plog.warn("send a request command to channel <{}> failed.",RemotingHelper.parseChannelRemoteAddr(channel));plog.warn(request.toString());}});}catch (Exception e) {responseFuture.release();plog.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel)+ "> Exception", e);throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}}else {if (timeoutMillis <= 0) {throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");}else {String info =String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //timeoutMillis,//this.semaphoreAsync.getQueueLength(),//this.semaphoreAsync.availablePermits()//);plog.warn(info);plog.warn(request.toString());throw new RemotingTimeoutException(info);}}
}
在这里,对已经处理过pullCallBack的InvokeCallBack进行进一步的封装。在这里,对于消息的异步请求的回答,被封装成了ResponseFuture。
private volatile RemotingCommand responseCommand;
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;
private final int opaque;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
private final CountDownLatch countDownLatch = new CountDownLatch(1);// 保证信号量至多至少只被释放一次
private final SemaphoreReleaseOnlyOnce once;// 保证回调的callback方法至多至少只被执行一次
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
这里用来封装的ResponseFuture完成了用户对于消息接收的具体操作的封装,以及对于消息被处理操作的线程安全处理。
由此可见,ResponseFuture又被封装在了netty给出的ChannelFutureListener当中,实现了operationComplete方法来完成对于消息异步请求接受的处理。
在ChannelFutureListener中,调用ResponseFuture的executeInvokeCallBack()方法,来具体进行对于response的异步操作,这里response并没有直接调用自己的实际操作方法,而是先尝试给atomicBoolean类进行cas操作,用以完成请求的单次操作。
ChannelFutureListener作为监听器,加入在了这个Channel当中作为监听器等待消息的异步处理。
取消息不支持oneway。
RocketMQ源码解析-PullConsumer取消息(2)相关推荐
- RocketMQ源码解析-PullConsumer取消息(1)
PullConsumer取消息需要自己手动调用Consumer的pull方法主动拉取消息.需要的参数有具体的消息队列(调用消费者的fetchSubscibeMessageQueue()可以得到相应to ...
- RocketMQ源码解析之延迟消息实现原理
原创不易,转载请注明出处 文章目录 前言 1.延时消息的demo 2.实现的原理 前言 今天要谈论的话题其实非常轻松,但是我们有些业务场景是离不开它的,其实说到延迟消息,不知道大家有没有想到它的业务场 ...
- RocketMQ源码解析之消息消费者(consume Message)
原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...
- 6、RocketMQ 源码解析之 Broker 启动(上)
上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...
- RocketMQ源码分析之延迟消息
文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...
- RocketMQ源码解析之broker文件清理
原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...
- RocketMQ源码解析:Filtersrv
???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...
- rocketmq源码解析之name启动(一)
2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...
- 消息中间件RocketMQ源码解析-- --调试环境搭建
1. 依赖工具 JDK :1.8+ Maven IntelliJ IDEA 2. 源码拉取 从官方仓库 [https://github.com/apache/rocketmq) Fork 出属于自己的 ...
最新文章
- nyoj 776 删除元素
- 云计算市场竞争激烈 亚马逊微软IBM阿里业绩亮眼
- 好好学python · 内置函数(range(),zip(),sorted(),map(),reduce(),filter())
- 浅析Java的“克隆”方法[zt]
- python之函数用法islower()
- Redis服务信息--Info指令
- python过滤敏感词汇_浅谈Python 敏感词过滤的实现
- Oracle PLSQL 客户端 连接Oracle12.2 出现权限问题的解决办法以及绿色版Oracle客户端的使用....
- 每日codewars题之判断一个数是否是水仙花数
- JIRA状态为任务结束,但是解决结果为未解决相关配置
- 格签名困难假设: 最短向量问题SVP
- 当前版本与卡刷包android_安卓7.0 xposed框架卡刷包
- java read dxf xdata_DXF格式文件数据提取
- ajax请求的所有状态码详解
- 唐巧:技术人如何成为管理者
- 黑苹果制作Clover开机引导脱离U盘,小新Pro13详细教程
- 【Cicadaplayer】播放器的时间戳
- android手机常用分辨率
- 做完系统回来计算机连接不上网络,电脑重装系统后网络连接不上怎么处理
- ABS系列一:揭开资产证券化的面纱,简单易懂,贴合实际