系列

开篇

这个系列主要用以分析mqadmin常见的比较核心的几个命令,主要包括订阅分组和topic的创建和删除、Topic的权限变更。

这篇文章主要是用来分析Topic的创建和删除。

创建Topic

Topic创建的核心步骤如下

1、mqadmin向broker发起创建Topic的命令。

2、broker生成Topic对应的topicConfig配置保存在broker的TopicConfigManager中。

3、broker向所有的namesrv上报topicConfig信息。

4、namesrv的RouteInfoManager的topicQueueTable保存topic的QueueData信息。

5、broker会通过定时任务定期向namesrv发送心跳信息更新topic配置。

updateTopic

usage: mqadmin updateTopic -b | -c [-h] [-n ] [-o ] [-p ] [-r ] [-s ] -t

[-u ] [-w ]

-b,--brokerAddr create topic to which broker

-c,--clusterName create topic to which cluster

-h,--help Print help

-n,--namesrvAddr Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876

-o,--order set topic's order(true|false)

-p,--perm set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]

-r,--readQueueNums set read queue nums

-s,--hasUnitSub has unit sub (true|false)

-t,--topic topic name

-u,--unit is unit topic (true|false)

-w,--writeQueueNums set write queue nums

通过 --brokerAddr在指定的broker创建topic。

通过 --clusterName在整个集群创建topic。

通过 --namesrvAddr指定namesrv地址。

通过 --topic来指定topic名称。

通过 --perm来指定Topic的权限管理。

UpdateTopicSubCommand

public class UpdateTopicSubCommand implements SubCommand {

@Override

public void execute(final CommandLine commandLine, final Options options,

RPCHook rpcHook) throws SubCommandException {

DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);

defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

try {

// 默认的topic的读写队列为8个

TopicConfig topicConfig = new TopicConfig();

topicConfig.setReadQueueNums(8);

topicConfig.setWriteQueueNums(8);

topicConfig.setTopicName(commandLine.getOptionValue('t').trim());

// readQueueNums

if (commandLine.hasOption('r')) {

topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));

}

// writeQueueNums

if (commandLine.hasOption('w')) {

topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));

}

// perm

if (commandLine.hasOption('p')) {

topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));

}

boolean isUnit = false;

if (commandLine.hasOption('u')) {

isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());

}

boolean isCenterSync = false;

if (commandLine.hasOption('s')) {

isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());

}

int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);

topicConfig.setTopicSysFlag(topicCenterSync);

boolean isOrder = false;

if (commandLine.hasOption('o')) {

isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());

}

topicConfig.setOrder(isOrder);

if (commandLine.hasOption('b')) {

String addr = commandLine.getOptionValue('b').trim();

defaultMQAdminExt.start();

defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);

if (isOrder) {

String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);

String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();

defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);

System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",

isOrder, orderConf.toString()));

}

System.out.printf("create topic to %s success.%n", addr);

System.out.printf("%s", topicConfig);

return;

} else if (commandLine.hasOption('c')) {

String clusterName = commandLine.getOptionValue('c').trim();

defaultMQAdminExt.start();

Set masterSet =

CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);

for (String addr : masterSet) {

defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);

System.out.printf("create topic to %s success.%n", addr);

}

if (isOrder) {

Set brokerNameSet =

CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);

StringBuilder orderConf = new StringBuilder();

String splitor = "";

for (String s : brokerNameSet) {

orderConf.append(splitor).append(s).append(":")

.append(topicConfig.getWriteQueueNums());

splitor = ";";

}

defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),

orderConf.toString(), true);

System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);

}

System.out.printf("%s", topicConfig);

return;

}

ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);

} catch (Exception e) {

throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);

} finally {

defaultMQAdminExt.shutdown();

}

}

}

Topic的默认的读写队列为8.

针对指定broker的场景,只在指定的broker机器创建Topic。

针对指定cluster的场景,获取集群下的所有broker并向全部的broker机器创建Topic。

mqadmin MQClientAPIImpl

public class MQClientAPIImpl {

public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,

final long timeoutMillis)

throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();

requestHeader.setTopic(topicConfig.getTopicName());

requestHeader.setDefaultTopic(defaultTopic);

requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());

requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());

requestHeader.setPerm(topicConfig.getPerm());

requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());

requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());

requestHeader.setOrder(topicConfig.isOrder());

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),

request, timeoutMillis);

assert response != null;

switch (response.getCode()) {

case ResponseCode.SUCCESS: {

return;

}

default:

break;

}

throw new MQClientException(response.getCode(), response.getRemark());

}

}

创建Topic的RequestCode为UPDATE_AND_CREATE_TOPIC。

broker AdminBrokerProcessor

public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {

private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,

RemotingCommand request) throws RemotingCommandException {

final RemotingCommand response = RemotingCommand.createResponseCommand(null);

final CreateTopicRequestHeader requestHeader =

(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);

if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {

String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";

log.warn(errorMsg);

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark(errorMsg);

return response;

}

if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {

return response;

}

try {

response.setCode(ResponseCode.SUCCESS);

response.setOpaque(request.getOpaque());

response.markResponseType();

response.setRemark(null);

ctx.writeAndFlush(response);

} catch (Exception e) {

log.error("Failed to produce a proper response", e);

}

// TopicConfig的创建配置

TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());

topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());

topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());

topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());

topicConfig.setPerm(requestHeader.getPerm());

topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());

// broker保存Topic的配置信息

this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);

// broker负责向namesrv上报topic信息。

this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());

return null;

}

}

创建TopicConfig对象并通过TopicConfigManager来进行保存。

通过brokerController#registerIncrementBrokerData来上报topic信息到namesrv。

broker TopicConfigManager

public class TopicConfigManager extends ConfigManager {

private static final long LOCK_TIMEOUT_MILLIS = 3000;

private transient final Lock lockTopicConfigTable = new ReentrantLock();

private final ConcurrentMap topicConfigTable =

new ConcurrentHashMap(1024);

private final DataVersion dataVersion = new DataVersion();

private final Set systemTopicList = new HashSet();

private transient BrokerController brokerController;

public void updateTopicConfig(final TopicConfig topicConfig) {

TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);

if (old != null) {

log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);

} else {

log.info("create new topic [{}]", topicConfig);

}

this.dataVersion.nextVersion();

this.persist();

}

}

通过topicConfigTable来保存Topic和对应的TopicConfig。

broker BrokerController

public class BrokerController {

public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {

TopicConfig registerTopicConfig = topicConfig;

// 如果broker本身的存在不可读和不可写的权限,那么就以broker的读写权限为准

if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())

|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {

registerTopicConfig =

new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),

this.brokerConfig.getBrokerPermission());

}

ConcurrentMap topicConfigTable = new ConcurrentHashMap();

topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);

TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();

topicConfigSerializeWrapper.setDataVersion(dataVersion);

topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);

// 将topic信息注册到namesrv的逻辑

doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);

}

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,

TopicConfigSerializeWrapper topicConfigWrapper) {

// 向所有的namesrv发送topic的注册信息

List registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(

this.brokerConfig.getBrokerClusterName(),

this.getBrokerAddr(),

this.brokerConfig.getBrokerName(),

this.brokerConfig.getBrokerId(),

this.getHAServerAddr(),

topicConfigWrapper,

this.filterServerManager.buildNewFilterServerList(),

oneway,

this.brokerConfig.getRegisterBrokerTimeoutMills(),

this.brokerConfig.isCompressedRegister());

if (registerBrokerResultList.size() > 0) {

RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);

if (registerBrokerResult != null) {

if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {

this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());

}

this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

if (checkOrderConfig) {

this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());

}

}

}

}

}

registerIncrementBrokerData首先会检查broker本身的读写权限重新生成新的topicConfigSerializeWrapper。

通过brokerOuterAPI#registerBrokerAll向namesrv注册最新的topic信息。

broker BrokerOuterAPI

public class BrokerOuterAPI {

private RegisterBrokerResult registerBroker(

final String namesrvAddr,

final boolean oneway,

final int timeoutMills,

final RegisterBrokerRequestHeader requestHeader,

final byte[] body

) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,

InterruptedException {

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);

request.setBody(body);

// oneway=false,这个分支不会执行

if (oneway) {

try {

this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);

} catch (RemotingTooMuchRequestException e) {

// Ignore

}

return null;

}

// 向namesrv上报

RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);

// 处理上报结果并同步主从

assert response != null;

switch (response.getCode()) {

case ResponseCode.SUCCESS: {

RegisterBrokerResponseHeader responseHeader =

(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);

RegisterBrokerResult result = new RegisterBrokerResult();

result.setMasterAddr(responseHeader.getMasterAddr());

result.setHaServerAddr(responseHeader.getHaServerAddr());

if (response.getBody() != null) {

result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));

}

return result;

}

default:

break;

}

throw new MQBrokerException(response.getCode(), response.getRemark());

}

}

向namesrv注册topic的RequestCode为REGISTER_BROKER。

broker向namesrv发起topic注册。

namesrv DefaultRequestProcessor

public class DefaultRequestProcessor implements NettyRequestProcessor {

public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)

throws RemotingCommandException {

final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);

final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();

final RegisterBrokerRequestHeader requestHeader =

(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

if (!checksum(ctx, request, requestHeader)) {

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark("crc32 not match");

return response;

}

RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

if (request.getBody() != null) {

try {

registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());

} catch (Exception e) {

throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);

}

} else {

registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));

registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);

}

// namesrv同时注册broker信息和TopicConfig信息

RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(

requestHeader.getClusterName(),

requestHeader.getBrokerAddr(),

requestHeader.getBrokerName(),

requestHeader.getBrokerId(),

requestHeader.getHaServerAddr(),

registerBrokerBody.getTopicConfigSerializeWrapper(),

registerBrokerBody.getFilterServerList(),

ctx.channel());

responseHeader.setHaServerAddr(result.getHaServerAddr());

responseHeader.setMasterAddr(result.getMasterAddr());

byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);

response.setBody(jsonValue);

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return response;

}

}

getRouteInfoManager().registerBroker()负责向namesrv注册broker信息。

namesrv RouteInfoManager

public class RouteInfoManager {

private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final HashMap> topicQueueTable;

private final HashMap brokerAddrTable;

private final HashMap> clusterAddrTable;

private final HashMap brokerLiveTable;

private final HashMap/* Filter Server */> filterServerTable;

public RouteInfoManager() {

this.topicQueueTable = new HashMap>(1024);

this.brokerAddrTable = new HashMap(128);

this.clusterAddrTable = new HashMap>(32);

this.brokerLiveTable = new HashMap(256);

this.filterServerTable = new HashMap>(256);

}

public RegisterBrokerResult registerBroker(

final String clusterName,

final String brokerAddr,

final String brokerName,

final long brokerId,

final String haServerAddr,

final TopicConfigSerializeWrapper topicConfigWrapper,

final List filterServerList,

final Channel channel) {

RegisterBrokerResult result = new RegisterBrokerResult();

try {

try {

this.lock.writeLock().lockInterruptibly();

Set brokerNames = this.clusterAddrTable.get(clusterName);

if (null == brokerNames) {

brokerNames = new HashSet();

this.clusterAddrTable.put(clusterName, brokerNames);

}

brokerNames.add(brokerName);

boolean registerFirst = false;

BrokerData brokerData = this.brokerAddrTable.get(brokerName);

if (null == brokerData) {

registerFirst = true;

brokerData = new BrokerData(clusterName, brokerName, new HashMap());

this.brokerAddrTable.put(brokerName, brokerData);

}

Map brokerAddrsMap = brokerData.getBrokerAddrs();

//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>

//The same IP:PORT must only have one record in brokerAddrTable

Iterator> it = brokerAddrsMap.entrySet().iterator();

while (it.hasNext()) {

Entry item = it.next();

if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {

it.remove();

}

}

String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);

registerFirst = registerFirst || (null == oldAddr);

// 保存topicConfig的信息

if (null != topicConfigWrapper

&& MixAll.MASTER_ID == brokerId) {

if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())

|| registerFirst) {

ConcurrentMap tcTable =

topicConfigWrapper.getTopicConfigTable();

if (tcTable != null) {

for (Map.Entry entry : tcTable.entrySet()) {

this.createAndUpdateQueueData(brokerName, entry.getValue());

}

}

}

}

BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,

new BrokerLiveInfo(

System.currentTimeMillis(),

topicConfigWrapper.getDataVersion(),

channel,

haServerAddr));

if (null == prevBrokerLiveInfo) {

log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);

}

if (filterServerList != null) {

if (filterServerList.isEmpty()) {

this.filterServerTable.remove(brokerAddr);

} else {

this.filterServerTable.put(brokerAddr, filterServerList);

}

}

if (MixAll.MASTER_ID != brokerId) {

String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);

if (masterAddr != null) {

BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);

if (brokerLiveInfo != null) {

result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());

result.setMasterAddr(masterAddr);

}

}

}

} finally {

this.lock.writeLock().unlock();

}

} catch (Exception e) {

log.error("registerBroker Exception", e);

}

return result;

}

}

RouteInfoManager是namesrv的核心数据管理中心,broker上报Topic的过程中会同时更新broker的保活信息等。

核心的createAndUpdateQueueData负责注册topic配置信息。

namesrv createAndUpdateQueueData

public class RouteInfoManager {

private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {

QueueData queueData = new QueueData();

queueData.setBrokerName(brokerName);

queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());

queueData.setReadQueueNums(topicConfig.getReadQueueNums());

queueData.setPerm(topicConfig.getPerm());

queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

List queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());

if (null == queueDataList) {

queueDataList = new LinkedList();

queueDataList.add(queueData);

this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);

log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);

} else {

boolean addNewOne = true;

Iterator it = queueDataList.iterator();

while (it.hasNext()) {

QueueData qd = it.next();

if (qd.getBrokerName().equals(brokerName)) {

if (qd.equals(queueData)) {

addNewOne = false;

} else {

log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,

queueData);

it.remove();

}

}

}

if (addNewOne) {

queueDataList.add(queueData);

}

}

}

}

负责创建QueueData并保存到topicQueueTable当中。

createAndUpdateQueueData会处理所有broker的上报,所以在处理QueueData过程中需要判断brokerName以处理对应的broker的QueueData。

定时上报 BrokerController

public class BrokerController {

public void start() throws Exception {

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

} catch (Throwable e) {

log.error("registerBrokerAll Exception", e);

}

}

}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

}

broker会定时向namesrv上报broker的信息并且包括topic的注册信息。

删除Topic

删除Topic的核心逻辑如下

1、mqadmin负责通知所有的broker删除topic对应的配置和消息文件。

2、mqadmin负责通知所有的namesrv删除topic对应的配置。

deleteTopic

usage: mqadmin deleteTopic -c [-h] [-n ] -t

-c,--clusterName delete topic from which cluster

-h,--help Print help

-n,--namesrvAddr Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876

-t,--topic topic name

--topic指定待删除的topic信息。

--clusterName指定待删除Topic所在的集群信息。

mqadmin DeleteTopicSubCommand

public class DeleteTopicSubCommand implements SubCommand {

public static void deleteTopic(final DefaultMQAdminExt adminExt,

final String clusterName,

final String topic

) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {

Set brokerAddressSet = CommandUtil.fetchMasterAndSlaveAddrByClusterName(adminExt, clusterName);

adminExt.deleteTopicInBroker(brokerAddressSet, topic);

System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName);

Set nameServerSet = null;

if (adminExt.getNamesrvAddr() != null) {

String[] ns = adminExt.getNamesrvAddr().trim().split(";");

nameServerSet = new HashSet(Arrays.asList(ns));

}

adminExt.deleteTopicInNameServer(nameServerSet, topic);

System.out.printf("delete topic [%s] from NameServer success.%n", topic);

}

@Override

public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {

DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);

adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

try {

String topic = commandLine.getOptionValue('t').trim();

if (commandLine.hasOption('c')) {

String clusterName = commandLine.getOptionValue('c').trim();

adminExt.start();

deleteTopic(adminExt, clusterName, topic);

return;

}

ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);

} catch (Exception e) {

throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);

} finally {

adminExt.shutdown();

}

}

}

deleteTopicInBroker负责当前集群的所有broker上的Topic信息。

deleteTopicInNameServer负责删除namesrv中的topic信息

mqadmin DefaultMQAdminExtImpl

public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {

public void deleteTopicInBroker(Set addrs,

String topic) throws RemotingException, MQBrokerException, InterruptedException,

MQClientException {

for (String addr : addrs) {

this.mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis);

}

}

}

遍历所有broker依次执行deleteTopicInBroker操作。

mqadmin MQClientAPIImpl

public class MQClientAPIImpl {

public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis)

throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();

requestHeader.setTopic(topic);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),

request, timeoutMillis);

assert response != null;

switch (response.getCode()) {

case ResponseCode.SUCCESS: {

return;

}

default:

break;

}

throw new MQClientException(response.getCode(), response.getRemark());

}

}

deleteTopicInBroker的RequestCode为DELETE_TOPIC_IN_BROKER。

broker AdminBrokerProcessor

public class AdminBrokerProcessor implements NettyRequestProcessor {

private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,

RemotingCommand request) throws RemotingCommandException {

final RemotingCommand response = RemotingCommand.createResponseCommand(null);

DeleteTopicRequestHeader requestHeader =

(DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);

log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

// 删除配置信息

this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());

// 删除消息存储文件

this.brokerController.getMessageStore()

.cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return response;

}

}

public class TopicConfigManager extends ConfigManager {

public void deleteTopicConfig(final String topic) {

// 从topicConfigTable中删除topic的配置信息

TopicConfig old = this.topicConfigTable.remove(topic);

if (old != null) {

log.info("delete topic config OK, topic: {}", old);

this.dataVersion.nextVersion();

this.persist();

} else {

log.warn("delete topic config failed, topic: {} not exists", topic);

}

}

deleteTopic操作包括删除broker上topic的配置信息和topic对应的消息存储文件。

mqadmin DefaultMQAdminExtImpl

public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {

@Override

public void deleteTopicInNameServer(Set addrs,

String topic) throws RemotingException, MQBrokerException, InterruptedException,

MQClientException {

if (addrs == null) {

String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr();

addrs = new HashSet(Arrays.asList(ns.split(";")));

}

for (String addr : addrs) {

this.mqClientInstance.getMQClientAPIImpl().deleteTopicInNameServer(addr, topic, timeoutMillis);

}

}

}

public class MQClientAPIImpl {

public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis)

throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();

requestHeader.setTopic(topic);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);

assert response != null;

switch (response.getCode()) {

case ResponseCode.SUCCESS: {

return;

}

default:

break;

}

throw new MQClientException(response.getCode(), response.getRemark());

}

}

遍历所有的namesrv并依次执行topic的删除。

requestCode为DELETE_TOPIC_IN_NAMESRV。

namesrv DefaultRequestProcessor

public class DefaultRequestProcessor implements NettyRequestProcessor {

private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx,

RemotingCommand request) throws RemotingCommandException {

final RemotingCommand response = RemotingCommand.createResponseCommand(null);

final DeleteTopicInNamesrvRequestHeader requestHeader =

(DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);

this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic());

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return response;

}

}

public class RouteInfoManager {

public void deleteTopic(final String topic) {

try {

try {

this.lock.writeLock().lockInterruptibly();

this.topicQueueTable.remove(topic);

} finally {

this.lock.writeLock().unlock();

}

} catch (Exception e) {

log.error("deleteTopic Exception", e);

}

}

}

namesrv的topic删除负责从RouteInfoManager移除topic对应的QueueData。

rocketmq 消息删除_RocketMq Topic创建和删除相关推荐

  1. rocketmq 消息 自定义_RocketMQ的消息发送及消费

    RocketMQ消息支持的模式: 消息支持的模式分为三种:NormalProducer(普通同步),消息异步发送,OneWay. 消息同步发送: 普通消息的发送和接收在前面已经演示过了,在前面的案例中 ...

  2. rocketmq 消息 自定义_RocketMQ消息轨迹-设计篇

    RocketMQ 消息轨迹主要包含两篇文章:设计篇与源码分析篇,本节将详细介绍RocketMQ消息轨迹-设计相关. RocketMQ消息轨迹,主要跟踪消息发送.消息消费的轨迹,即详细记录消息各个处理环 ...

  3. MySQL进阶11--DDL数据库定义语言--库创建/修改/删除--表的创建/修改/删除/复制

    /*进阶 11 DDL 数据库定义语言库和表的管理一:库的管理:创建/修改/删除二:表的管理:创建/修改/删除创建: CREATE DATABASE [IF NOT EXISTS] 库名;修改: al ...

  4. Conda 创建和删除虚拟环境Conda 创建和删除虚拟环境

    Conda 创建和删除虚拟环境 一.检验当前conda的版本 conda -V 二.conda常用的命令 1.查看已有的虚拟环境 conda env list 2.创建虚拟环境和删除虚拟环境 anac ...

  5. rocketmq 消息指定_RocketMq 实际案例–普通消息的发送

    RocketMq 实际案例–普通消息的发送 @(消息中间件)[RocketMq 实例] 学习 rocketMq 最根本的是要先学会用嘛,在创建 rocketMq 的第一个案例的时候,碰到很多坑,可以记 ...

  6. mysql 创建和删除库_Mysql创建和删除数据库

    很久没有使用命令行操作mysql,百度搜罗,重新整理,加强记忆,便于查询. 以下命令均在win7,64位操作系统下,cmd命令行操作: 一.创建1个mysql 数据库 方法1: 1. cmd下输入:m ...

  7. 怎么从mysql注册表删除用户_mysql 创建和删除用户

    1.远程登录mysql mysql -h ip -u root -p 密码 2.创建用户 格式:grant 权限 on 数据库.* to 用户名@登录主机 identified by "密码 ...

  8. rocketmq 消息 自定义_rocketmq中的自定义消息头

    在springboot中使用rocketmq的客户端,有2种方式. 一是使用 org.apache.rocketmq的rocketmq-client 二是,在pom.xml中引用 org.apache ...

  9. java删除表格_Java 创建、删除Word表格

    在Word文档中,我们可以通过添加表格的方式来帮助我们更加清晰.直观地分析和展示数据.本文将介绍如何使用Free Spire.Doc for Java组件来给Word文档创建表格,及删除文档中已有的表 ...

  10. mysql中删除用户_Mysql创建,删除用户

    MySql中添加用户,新建数据库,用户授权,删除用户,修改密码(注意每行后边都跟个;表示一个命令语句结束): 1.新建用户 登录MYSQL: @>mysql -u root -p @>密码 ...

最新文章

  1. 关于hadoop与jstl冲突的jar包
  2. java 日期检查_如何在Java中检查日期
  3. webpack从入门到精通(四)优化打包配置总结①
  4. hdu 3948(后缀数组+RMQ)
  5. 因为阿里,他们成了“杭漂”
  6. php-ml案例,AppML 案例原型
  7. 【数据结构】B树的理解
  8. require.context实现前端工程自动化
  9. 31天重构学习笔记下载
  10. ABPZero系列教程之拼多多卖家工具
  11. P2494 [SDOI2011]保密(网络流/最小割/01分数规划)
  12. 抖音回应“天价烤虾”事件:已经在调查违规广告主
  13. tasker运行java_Tasker 打开桌面快捷方式(以微信公众号为例)[No Root]
  14. JAVA偶数分解质数_优化后的寻找偶数是两个质数之和的JAVA代码
  15. ORA-01034: ORACLE not available ORA-27101: shared memory realm does not exist的原因分析
  16. ArcView GIS 应用与开发技术(1)-ViewTheme
  17. python自动生成采集规则_python 织梦自动采集更新脚本
  18. Pandas一键爬取解析代理IP与代理IP池的维护
  19. 免费开源商用级人脸识别库SeetaFace6使用教程(含人脸识别、口罩识别、活体识别、人眼状态判断、性别年龄识别等)
  20. 云网融合赋能智慧转型,“天翼云管 ”开启贴身云管家时代

热门文章

  1. Windows7系统怎么给文件夹加密?
  2. 【大咖说】激活数据价值,永洪科技智胜法则
  3. 【xtku】铜雀台张馨予xp主题_8.2
  4. 非常不错的垃圾删除批处理代码,用了10年不用安装清理软件
  5. 怎么修改CSDN的个性签名或者个人说明?
  6. Java实现格式化打印慢SQL日志
  7. Cursor的关闭问题
  8. 基于微信公众平台的智能硬件/智能设备APP开发详解
  9. OC中的分类与类扩展
  10. 异或运算——小卡和质数