RocketMQ topic路由
为什么80%的码农都做不了架构师?>>>
// 定时从Name Server获取Topic路由信息
this . scheduledExecutorService .scheduleAtFixedRate( new Runnable() {
@Override
public void run() {
try {
MQClientInstance. this .updateTopicRouteInfoFromNameServer();
}
catch (Exception e) {
log .error( "ScheduledTask updateTopicRouteInfoFromNameServer exception" , e);
}
}
}, 10, this . clientConfig .getPollNameServerInteval(), TimeUnit. MILLISECONDS );
|
public boolean updateTopicRouteInfoFromNameServer( final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
try {
if ( this . lockNamesrv .tryLock( LockTimeoutMillis , TimeUnit. MILLISECONDS )) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null ) {
//此处省略不必要的信息,对于消费者,分支不会走到这里来,因为 isDefault为false,且生产者肯定为空
}
else {
topicRouteData =
this . mQClientAPIImpl .getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
//此处省略无关语句
}
catch (Exception e) {
if (!topic.startsWith(MixAll. RETRY_GROUP_TOPIC_PREFIX )
&& !topic.equals(MixAll. DEFAULT_TOPIC )) {
log .warn( "updateTopicRouteInfoFromNameServer Exception" , e);
}
}
finally {
this . lockNamesrv .unlock();
}
}
else {
log .warn( "updateTopicRouteInfoFromNameServer tryLock timeout {}ms" , LockTimeoutMillis );
}
}
catch (InterruptedException e) {
log .warn( "updateTopicRouteInfoFromNameServer Exception" , e);
}
return false ;
}
|
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode. GET_ALL_TOPIC_LIST_FROM_NAMESERVER , null );
RemotingCommand response = this . remotingClient .invokeSync( null , request, timeoutMillis);
|
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException {
//这里获取连接,该方法里面会做连接的检查和恢复
final Channel channel = this .getAndCreateChannel(addr);
//最后如果还是不是有效连接,则关闭连接,抛出异常
if (channel != null && channel.isActive()) {
try {
if ( this . rpcHook != null ) {
this . rpcHook .doBeforeRequest(addr, request);
}
RemotingCommand response = this .invokeSyncImpl(channel, request, timeoutMillis);
if ( this . rpcHook != null ) {
this . rpcHook .doAfterResponse(request, response);
}
return response;
}
catch (RemotingSendRequestException e) {
log .warn( "invokeSync: send request exception, so close the channel[{}]" , addr);
this .closeChannel(addr, channel);
throw e;
}
catch (RemotingTimeoutException e) {
log .warn( "invokeSync: wait response timeout exception, the channel[{}]" , addr);
// 超时异常如果关闭连接可能会产生连锁反应
// this.closeChannel( addr, channel);
throw e;
}
}
else {
this .closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
|
private Channel getAndCreateChannel ( final String addr) throws InterruptedException {
//无论是producer还是consumer,传进来的 addr参数都是null
if ( null == addr)
return getAndCreateNameserverChannel();
//因为客户端传入的addr是null,所以客户端不会走到这里来,只有broker才会走到这里来,因为broker传入的addr不为null
ChannelWrapper cw = this . channelTables .get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
//注意,如果和某个addr的连接不OK了,则再向该nameserver发起重连
return this .createChannel(addr);
}
|
String addr = this . namesrvAddrChoosed .get();
if (addr != null ) {
ChannelWrapper cw = this . channelTables .get(addr);
//注意这里,虽然长连接已经建立了,但是每次调用时,仍然要通过“ cw != null && cw.isOK()”检查连接是否OK。
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
}
|
addr = this . namesrvAddrChoosed .get();
if (addr != null ) {
ChannelWrapper cw = this . channelTables .get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
}
|
if (addrList != null && !addrList.isEmpty()) {
for ( int i = 0; i < addrList.size(); i++) {
int index = this . namesrvIndex .incrementAndGet();
index = Math. abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);
this .namesrvAddrChoosed.set(newAddr);
Channel channelNew = this .createChannel(newAddr);
if (channelNew != null )
return channelNew;
}
}
|
转载于:https://my.oschina.net/boltwu/blog/473025
RocketMQ topic路由相关推荐
- RocketMQ Topic/Group/Tags介绍
Topic 功能介绍 Topic是RocketMQ里对消息的一级归类. RocketMQ通过Topic完成消息的发布和订阅.消息生产者将消息发送到Topic中,而消息消费者则通过订阅该Topic来消费 ...
- RocketMQ 消息路由解析——图解、源码级解析
- RocketMQ:NameServer路由管理源码分析
文章目录 NameServer路由管理 1.前言 2.路由元信息 3.路由注册 3.1Broker路由注册 3.2NameServer处理路由注册 3.3路由删除 3.3.1Broker异常关闭 3. ...
- RocketMQ特性、专业术语(Producer,Producer Group,Consumer Group,Topic,Message,Tag,Broker,Name Server)等
RocketMQ 是什么 Github 上关于 RocketMQ 的介绍:RcoketMQ 是一款低延迟.高可靠.可伸缩.易于使用的消息中间件.具有以下特性: 支持发布/订阅(Pub/Sub)和点对点 ...
- activemq后台管理 看topic消息_17 个方面,综合对比 Kafka、RabbitMQ、RocketMQ、ActiveMQ 四个分布式消息队列...
作者:28cm不含头(来自:知乎) 原文链接: https://www.zhihu.com/question/43557507 一.资料文档 二.开发语言 三.支持的协议 四.消息存储 五.消息事务 ...
- RocketMQ自动创建topic原理-TBW102
自动创建Topic原理介绍 RocketMQ在发送消息的时候,我们一般会先去Broker创建Topic信息,Producer在发送消息的时候会先去nameSrv拉取Topic信息,那么如果拉取不到 ...
- RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic
此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...
- RocketMQ—Producer(二)路由动态更新
一.Producer路由信息 从NameServer章节分析得知,路由信息存储在NameServer,生产端和消费端定时向NameServer获取topic相关的路由信息: 从生产者启动流程得知: 路 ...
- RocketMQ快速入门之手动创建topic
package cn.learn.rocketmq.topic;import org.apache.rocketmq.client.producer.DefaultMQProducer;public ...
最新文章
- 【算法与数据结构】在n个数中取第k大的数(基础篇)
- 终于看腻了黄色!让它五彩斑斓起来!
- IsWindow,findwindow
- 【渝粤教育】国家开放大学2018年秋季 0314-21T兽医基础 参考试题
- mysql 从服务器同步设置_mysql主从同步配置
- Xpath语法学习记录
- 阿里腾讯前端一面小结
- 算法导论 资源 课后答案 PDF
- 《七哥说道》第八章:约在410,北漂可还行?
- Tomcat使用总结
- zuiqingchun4
- img实现图片加载前默认图片,加载时替换真实图片,加载失败时替换加载失败图片
- spring @annotation 注解
- 微信小程序wepy框架快速入门
- 常见的网络连接设备有哪些?
- WPF 触发器Triggers
- 多层神经元感知器模型_使用多层感知器模型对星系进行分类
- AV1代码学习:av1_foreach_transformed_block_in_plane函数
- 给你的热图挑选一个合适的渐变色
- 蓝牙nrf52832的架构和开发
热门文章
- python字符复制函数是啥_Python最全的字符和字符串函数,直接复制到IDLE或另存为py可以运行...
- oracle 9i sql_id,Oracle9i增添 wm_concat函数(转)
- java和opencv配置_Java——OpenCVWindows配置和项目中jar包的简单配置
- java swing事件监听_Java swing(awt):事件监听机制的实现原理+简单示例
- java 解析csv 乱码_Java采用opencsv解析csv文件以及解析中文乱码问题
- UML工具:EA(Enterprise Architect)
- 浙大网新实训项目介绍
- [JS] 修改Navigator对象
- Python Demo 02 蒙特卡罗方法输出e
- elasticsearch api中的Buckets(桶)及Metrics(指标)