pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>dym-rocketmq-test</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency></dependencies>
</project>

Producer.java

package com.dym.simple;//发送消息import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {//1. 谁来发?DefaultMQProducer producer = new DefaultMQProducer("group1");//2. 发给谁?producer.setNamesrvAddr("localhost:9876");producer.start();// 3. 怎么发?// 4. 发什么?String msg="hello rocketmq";Message message = new Message("topic1", "tag1", msg.getBytes());SendResult sendResult = producer.send(message);// 5. 发的结果是什么?System.out.println(sendResult);//6. 打扫战场producer.shutdown();}
}

Consumer.java

package com.dym.simple;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws MQClientException {//1. 谁来收DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2. 从哪里收消息consumer.setNamesrvAddr("localhost:9876");//3. 监听哪个消息队列consumer.subscribe("topic1","*");//4. 处理业务流程  注册监听器consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {//写我们的业务逻辑for(MessageExt msg:msgs){System.out.println(msg);byte[] body = msg.getBody();System.out.println(new String(body));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("消费者起起来了");}
}

 

RocketMQ--生产者与消费者的简单示例相关推荐

  1. 简单的RocketMQ生产者和消费者的开发示例demo(二)

    上一篇文章介绍了RocketMQ双Master+双Slave集群+可视化控制台环境搭建,这篇主要介绍一下Producer和Consumer的简单开发示例 Producer 创建个SpringBoot项 ...

  2. [Dubbo开发]配置简单的生产者和消费者

    配置好jdk1.7.Zookeeper和Maven环境之后,开始尝试第一次搭建简单的dubbo生产者和消费者. dubbo服务的原理,引用经典的官方图(cr.Dubbo官网): 关于Dubbo的原理和 ...

  3. 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例

    从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...

  4. java最简单的kafka生产者和消费者,未结合spring

    目录 1 最简单的生产者和消费者 1.1 引入maven 1.2 基本的生产者和代码注释 1.3 最简单消费者 2 生产者发送消息的三种方式 2.1 直接send之后就不管了,会自动重试,可能丢失消息 ...

  5. kafka查看broker上主副本_Kafka基础(一):基本概念及生产者、消费者示例

    本文章大部分内容均摘自 朱忠华老师的<深入理解Kafka:核心设计与实践原理>,也特别推荐广大读者购买阅读. 一.概述 1. 简介 Kafka 起初是由 LinkedIn 公司采用 Sca ...

  6. 计算机操作系统生产者和消费者模型的简单介绍

    同步互斥小口诀 画图理解题目 判断题目类型 分析进程数目 填写进程模板 补充基本代码(伪代码) 补充PV代码 检查调整代码 注意事项 代码是一步一步写出来的,代码是反复调整写出来的 60%是生产者和消 ...

  7. kafka的c/c++高性能客户端librdkafka简介/使用librdkafka的C++接口实现简单的生产者和消费者

    Librdkafka是c语言实现的apachekafka的高性能客户端,为生产和使用kafka提供高效可靠的客户端,并且提供了c++接口 性能: Librdkafka 是一款专为现代硬件使用而设计的高 ...

  8. 多线程-线程间通信-多生产者多消费者示例

    1.多线程-线程间通信-多生产者多消费者问题 多生产者和多消费者.等待唤醒机制. 产生了两个问题: 1.出现了多次连续生产,未消费,或者一个商品被消费多次. 解决:必须要--------每一个被唤醒的 ...

  9. Python中的生产者与消费者模式(转载)

    利用多线程和队列可以实现生产者消费者模式.该模式通过平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度. 1.什么是生产者和消费者? 在线程世界里,生产者就是生产数据(或者说发布任务)的线程 ...

最新文章

  1. 电子计算机发展迅,ENIAC问世以来的短短的四十多年中,电子计算机的发展异常迅速...
  2. 51单片机怎么学啊?有推荐的网课和书籍么?
  3. R语言dataframe(data.table)使用用最近的前一个非NA值向前填充缺失值NA实战
  4. 无网络服务器(linux ubuntu),pip安装python科学计算所有需要包(packages)
  5. app 注册防刷 php,手机验证码设计和防刷制度
  6. Boost:bimap便利性标题的测试程序
  7. 2009编程语言排名
  8. 全球各国家.INFO域名注册量统计:中国排名第八
  9. an7062个引脚工作电压_马兰士PM711AV功放电路原理分析
  10. Pytorch——常用的神经网络层、激活函数
  11. webassembly环境搭建、编译h265解码器、js调用
  12. Stream.of()用法示例
  13. 华为鲲鹏泰山服务器系统安装,鲲鹏处理器正式商用:浙江移动营业厅用上华为泰山服务器...
  14. 身份证阅读器(读卡器)谷歌Chrome和火狐Firefox浏览器端网页开发接口控件分享
  15. 【Aminer论文精读训练营】Aminer第二期推荐的5篇论文
  16. 内存泄漏检测工具asan
  17. 通信手机术语:什么是IMEI IMEI串号组成
  18. 国家信息系统安全等级保护基本要求——等保一级、二级、三级、四级内容
  19. 应用程序0xc000007b无法正常启动,win10系统亲测有效
  20. minio安装部署及使用

热门文章

  1. BeanCreationException: Error creating bean with name 'dataSource' defined in class path resource [
  2. Load data local infile 实验报告
  3. springboot项目打包运行
  4. java中getClass().getResourceAsStream()与getClass().getClassLoader().getResourceAsStream()的区别
  5. 更改Firefox为中文界面(Ubuntu系统)
  6. MFC添加背景图片方法
  7. 将SVN与BUG跟踪管理集成
  8. S3C6410 KeyPad驱动(下)
  9. mounty不可重新挂载因为先前没有完全卸载_【译】React Hooks测试完全指南
  10. Equinix公司在巴西圣保罗开通了一个数据中心