RocketMq 实际案例–普通消息的发送

@(消息中间件)[RocketMq 实例]

学习 rocketMq 最根本的是要先学会用嘛,在创建 rocketMq 的第一个案例的时候,碰到很多坑,可以记录下。

第一步:启动 namesrv 和 broker

官方文档未告知的坑:

问题 1: 如果在单机环境下,没有 Docker 的部署环境,rocketMq 是会默认走 VIPChannel,这时候会在我们发送消息的时候报

rocketMqb 报错.png

解决方法: 修改 bin/conf 中的 broker.conf 文件,指定 namesrvAdd 和 brokerIp

问题二: 在实际发送过程中,会出现无法自动创建不存在 topic 错误

解决方法: nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true & 启动 broker 的时候要加上 autoCreateTopicEnable=true

第二步:启动 rocketMq 的控制台

对于启动控制台,是方便我们的查看。控制台的原码在 github 中是一个扩展项目https://github.com/apache/rocketmq-externals

项目本身是 spring-boot 的实现,所以之间启动 APP.java 即可(后面需要学习 spring-boot 和 rocketMq 的整合)

第三步:编写我们的生产者和消费者

Producer (组)

package com.zzjmay.rocketMq.producer;

import com.alibaba.fastjson.JSON;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

/**

* 生产者

* Created by zzjmay on 2018/1/14.

*/

public class RocketProducer2 {

public static void main(String[] args) {

//1.创建一个生产者,需要指定 Producer 的分组,

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Jony-P");

//2.设置命名服务的地址,默认是去读取 conf 文件下的配置文件 rocketmq.namesrv.addr

defaultMQProducer.setNamesrvAddr("127.0.0.1:9876");

try{

//3.启动生产者

defaultMQProducer.start();

for(int i=0;i<10;i++) {

String text = "你好呀,ALIBABA----"+i;

//4.最基本的生产模式 topic+文本信息

Message msg = new Message("topic_orderCreate", "Tag-B", text.getBytes());

//5.获取发送响应

SendResult sendResult = defaultMQProducer.send(msg);

System.out.println("###发送结果 result:" + JSON.toJSONString(sendResult));

}

}catch (Exception e){

e.printStackTrace();

}finally {

//6.释放生产者

defaultMQProducer.shutdown();

}

}

}

Consumer (组)

package com.zzjmay.rocketMq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

import org.apache.rocketmq.common.message.MessageExt;

import java.util.Date;

import java.util.List;

/**

* 消费者

* Created by zzjmay on 2018/1/14.

*/

public class RocketConsumer {

public static void main(String[] args) {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer-g");

consumer.setNamesrvAddr("127.0.0.1:9876");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

try {

//第二个参数表示消费匹配的 tag * 表示 topic 所有的 tag

consumer.subscribe("topic_orderCreate","Tag-B");

//2. 注册消费者监听

consumer.registerMessageListener(new MessageListenerConcurrently() {

/**

* msgs 表示消息体

* @param msgs

* @param context

* @return

*/

public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {

for(MessageExt messageExt:msgs){

try {

System.out.println(new Date()+new String(messageExt.getBody(),"UTF-8"));

}catch (Exception e){

e.printStackTrace();

}

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

//3.consumer 启动

consumer.start();

System.out.println("消费端起来了哈.........");

} catch (MQClientException e) {

e.printStackTrace();

}

}

}

最后查看效果在 console

监控图

集群信息

作者:zzjmay

链接:https://www.jianshu.com/p/94473cb87e0b

来源:简书

简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权

转载请注明原文链接:RocketMq 实际案例–普通消息的发送

rocketmq 消息指定_RocketMq 实际案例–普通消息的发送相关推荐

  1. rocketmq 消息指定_rocketmq-常见问题总结(消息的顺序、重复、消费模式)

    参考: http://www.cnblogs.com/wxd0108/p/6038543.html https://www.cnblogs.com/520playboy/p/6750023.html ...

  2. Rabbitmq消息保存机制应用案例分析消息可靠性保证

    Rabbitmq 消息保存机制 mandatory参数和immediate参数作用 mandatory:当参数设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,Rabbitmq ...

  3. rocketmq 消息 自定义_RocketMQ的消息发送及消费

    RocketMQ消息支持的模式: 消息支持的模式分为三种:NormalProducer(普通同步),消息异步发送,OneWay. 消息同步发送: 普通消息的发送和接收在前面已经演示过了,在前面的案例中 ...

  4. rocketmq 消息指定_SpringBoot 整合 RocketMQ 如何实现消息生产消费?

    有时候我们在使用消息队列的时候,往往需要能够保证消息的顺序消费,而RocketMQ是可以支持消息的顺序消费的. RocketMQ在发送消息的时候,是将消息发送到不同的队列中,然后消费端从多个队列中读取 ...

  5. rocketmq 消息删除_RocketMQ 实现分布式事务,达到数据最终一致性

    作者:江之北来源:https://www.jianshu.com/p/296e0de1b52e 前言 在分布式环境下,经常会有跨服务的事务需求,典型的例子如: 服务A 为账户服务,服务B为包月服务,在 ...

  6. rocketmq 消息指定_详解RocketMQ不同类型的消费者

    原标题:详解RocketMQ不同类型的消费者 云栖君导读:本文节选自云栖社区系列丛书<RocketMQ原理与实战解析>,作者:阿里巴巴数据专家杨开元.本节将重点讲解RocketMQ不同类型 ...

  7. rocketmq 消息指定_闲话RocketMQ

    一.简介 Apache RocketMQ是阿里开源的一款高性能.高吞吐量的分布式消息中间件,具有高性能.高可靠.高实时.分布式特点. 能够保证严格的消息顺序,提供丰富的消息拉取模式. 高效的订阅者水平 ...

  8. rocketmq 消息指定_进大厂必备的RocketMQ你会吗?

    点击关注"故里学Java" 右上角"设为星标"好文章不错过 关于消息队列,相信大家都不陌生,现在的中大型项目中或多或少都有使用到消息队列,对于消息队列大家可能都 ...

  9. rocketmq python消息堆积_RocketMQ消息存储和查询原理

    前言 RocketMQ 作为一款优秀的分布式消息中间件,可以为业务方提供高性能低延迟的稳定可靠的消息服务.其核心优势是可靠的消费存储.消息发送的高性能和低延迟.强大的消息堆积能力和消息处理能力. 从存 ...

最新文章

  1. android 线性布局位置,android – 如何在线性布局中更改视图的位置.
  2. JupyterLab 3.0发布!
  3. 黑莓作为猫带笔记本上网
  4. Qt多线程中的信号与槽
  5. 计算机硬件系统的ppt,计算机硬件系统.ppt
  6. windows文件路径 正则表达式_Windows非常实用的四款软件
  7. Ubuntu 启动自动登录
  8. 【转】AI-900认证考试攻略
  9. java 替换多个字符串_Java一次(或以最有效的方式)替换字符串中的多个不同子字符串...
  10. 基于SpringBoot/SSM的旅游论坛
  11. 大数据实战:如何实时采集上亿级别数据?
  12. 【Django BUG 已解决】You must either define the environment variable DJANGO_SETTINGS_MODULE or call ...
  13. 1934 贝茜放慢脚步(二路归并)
  14. 服务器win10系统开机慢,win10专业版系统开机启动慢 三种方法帮你敲定
  15. 小白学JS,利用JavaScripty验证通过15位和18位身份证验证性别
  16. 一行代码实现呼出热键
  17. 程序员都应该知道的福利
  18. 三维地图(3D地图)离线地图开发发布时间:2020-03-03 版权:
  19. 【干货书】图神经网络导论,清华大学刘知远老师著作
  20. 什么是管理能力,管理者的品格有哪些

热门文章

  1. Xshell使用xftp传输文件,使用pure-ftpd搭建ftp服务
  2. 贝塞尔曲线与CAShapeLayer的关系以及Stroke动画
  3. mybatis学习笔记(7)-输出映射
  4. 1112-博客十大评论
  5. Google Breakpad 完全解析(二) —— Windows前台实现篇
  6. 基于springboot实现疫情管理系统
  7. 零基础Java学习之封装
  8. 零基础Java学习之类和对象
  9. python爬取页面链接
  10. 抽象类,虚方法,接口