rocketmq 消息指定_RocketMq 实际案例–普通消息的发送
RocketMq 实际案例–普通消息的发送
@(消息中间件)[RocketMq 实例]
学习 rocketMq 最根本的是要先学会用嘛,在创建 rocketMq 的第一个案例的时候,碰到很多坑,可以记录下。
第一步:启动 namesrv 和 broker
官方文档未告知的坑:
问题 1: 如果在单机环境下,没有 Docker 的部署环境,rocketMq 是会默认走 VIPChannel,这时候会在我们发送消息的时候报
rocketMqb 报错.png
解决方法: 修改 bin/conf 中的 broker.conf 文件,指定 namesrvAdd 和 brokerIp
问题二: 在实际发送过程中,会出现无法自动创建不存在 topic 错误
解决方法: nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true & 启动 broker 的时候要加上 autoCreateTopicEnable=true
第二步:启动 rocketMq 的控制台
对于启动控制台,是方便我们的查看。控制台的原码在 github 中是一个扩展项目https://github.com/apache/rocketmq-externals
项目本身是 spring-boot 的实现,所以之间启动 APP.java 即可(后面需要学习 spring-boot 和 rocketMq 的整合)
第三步:编写我们的生产者和消费者
Producer (组)
package com.zzjmay.rocketMq.producer;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
* 生产者
* Created by zzjmay on 2018/1/14.
*/
public class RocketProducer2 {
public static void main(String[] args) {
//1.创建一个生产者,需要指定 Producer 的分组,
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Jony-P");
//2.设置命名服务的地址,默认是去读取 conf 文件下的配置文件 rocketmq.namesrv.addr
defaultMQProducer.setNamesrvAddr("127.0.0.1:9876");
try{
//3.启动生产者
defaultMQProducer.start();
for(int i=0;i<10;i++) {
String text = "你好呀,ALIBABA----"+i;
//4.最基本的生产模式 topic+文本信息
Message msg = new Message("topic_orderCreate", "Tag-B", text.getBytes());
//5.获取发送响应
SendResult sendResult = defaultMQProducer.send(msg);
System.out.println("###发送结果 result:" + JSON.toJSONString(sendResult));
}
}catch (Exception e){
e.printStackTrace();
}finally {
//6.释放生产者
defaultMQProducer.shutdown();
}
}
}
Consumer (组)
package com.zzjmay.rocketMq.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.Date;
import java.util.List;
/**
* 消费者
* Created by zzjmay on 2018/1/14.
*/
public class RocketConsumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer-g");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
try {
//第二个参数表示消费匹配的 tag * 表示 topic 所有的 tag
consumer.subscribe("topic_orderCreate","Tag-B");
//2. 注册消费者监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* msgs 表示消息体
* @param msgs
* @param context
* @return
*/
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
for(MessageExt messageExt:msgs){
try {
System.out.println(new Date()+new String(messageExt.getBody(),"UTF-8"));
}catch (Exception e){
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//3.consumer 启动
consumer.start();
System.out.println("消费端起来了哈.........");
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
最后查看效果在 console
监控图
集群信息
作者:zzjmay
链接:https://www.jianshu.com/p/94473cb87e0b
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:RocketMq 实际案例–普通消息的发送
rocketmq 消息指定_RocketMq 实际案例–普通消息的发送相关推荐
- rocketmq 消息指定_rocketmq-常见问题总结(消息的顺序、重复、消费模式)
参考: http://www.cnblogs.com/wxd0108/p/6038543.html https://www.cnblogs.com/520playboy/p/6750023.html ...
- Rabbitmq消息保存机制应用案例分析消息可靠性保证
Rabbitmq 消息保存机制 mandatory参数和immediate参数作用 mandatory:当参数设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,Rabbitmq ...
- rocketmq 消息 自定义_RocketMQ的消息发送及消费
RocketMQ消息支持的模式: 消息支持的模式分为三种:NormalProducer(普通同步),消息异步发送,OneWay. 消息同步发送: 普通消息的发送和接收在前面已经演示过了,在前面的案例中 ...
- rocketmq 消息指定_SpringBoot 整合 RocketMQ 如何实现消息生产消费?
有时候我们在使用消息队列的时候,往往需要能够保证消息的顺序消费,而RocketMQ是可以支持消息的顺序消费的. RocketMQ在发送消息的时候,是将消息发送到不同的队列中,然后消费端从多个队列中读取 ...
- rocketmq 消息删除_RocketMQ 实现分布式事务,达到数据最终一致性
作者:江之北来源:https://www.jianshu.com/p/296e0de1b52e 前言 在分布式环境下,经常会有跨服务的事务需求,典型的例子如: 服务A 为账户服务,服务B为包月服务,在 ...
- rocketmq 消息指定_详解RocketMQ不同类型的消费者
原标题:详解RocketMQ不同类型的消费者 云栖君导读:本文节选自云栖社区系列丛书<RocketMQ原理与实战解析>,作者:阿里巴巴数据专家杨开元.本节将重点讲解RocketMQ不同类型 ...
- rocketmq 消息指定_闲话RocketMQ
一.简介 Apache RocketMQ是阿里开源的一款高性能.高吞吐量的分布式消息中间件,具有高性能.高可靠.高实时.分布式特点. 能够保证严格的消息顺序,提供丰富的消息拉取模式. 高效的订阅者水平 ...
- rocketmq 消息指定_进大厂必备的RocketMQ你会吗?
点击关注"故里学Java" 右上角"设为星标"好文章不错过 关于消息队列,相信大家都不陌生,现在的中大型项目中或多或少都有使用到消息队列,对于消息队列大家可能都 ...
- rocketmq python消息堆积_RocketMQ消息存储和查询原理
前言 RocketMQ 作为一款优秀的分布式消息中间件,可以为业务方提供高性能低延迟的稳定可靠的消息服务.其核心优势是可靠的消费存储.消息发送的高性能和低延迟.强大的消息堆积能力和消息处理能力. 从存 ...
最新文章
- android 线性布局位置,android – 如何在线性布局中更改视图的位置.
- JupyterLab 3.0发布!
- 黑莓作为猫带笔记本上网
- Qt多线程中的信号与槽
- 计算机硬件系统的ppt,计算机硬件系统.ppt
- windows文件路径 正则表达式_Windows非常实用的四款软件
- Ubuntu 启动自动登录
- 【转】AI-900认证考试攻略
- java 替换多个字符串_Java一次(或以最有效的方式)替换字符串中的多个不同子字符串...
- 基于SpringBoot/SSM的旅游论坛
- 大数据实战:如何实时采集上亿级别数据?
- 【Django BUG 已解决】You must either define the environment variable DJANGO_SETTINGS_MODULE or call ...
- 1934 贝茜放慢脚步(二路归并)
- 服务器win10系统开机慢,win10专业版系统开机启动慢 三种方法帮你敲定
- 小白学JS,利用JavaScripty验证通过15位和18位身份证验证性别
- 一行代码实现呼出热键
- 程序员都应该知道的福利
- 三维地图(3D地图)离线地图开发发布时间:2020-03-03 版权:
- 【干货书】图神经网络导论,清华大学刘知远老师著作
- 什么是管理能力,管理者的品格有哪些