RabbitMQ生产者和消费者Java实现
添加Maven依赖:
使用rabbitmq-client的最新Maven坐标:
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.3.0</version>
</dependency>
添加账户
默认情况下,访问RabbitMQ服务的用户名和密码都是“guest”,这个账号有限制,默认只能通过本地网络(如localhost)访问,远程网络访问受限,所以在实现生产和消费消息之前,需要另外添加一个用户,并设置相应的访问权限。
添加新用户,用户名为“zifeiy”,密码为“passwd”:
rabbitmqctl add_user zifeiy passwd
为zifeiy用户设置所有权限:
rabbitmqctl set_permissions -p / zifeiy ".*" ".*" ".*"
设置用户zifeiy为管理员角色:
rabbitmqctl set_user_tags zifeiy administrator
计算机的世界是从“Hello World!”开始的,这里我们也沿用惯例,首先生产者发送一条消息”Hello World!“至RabbitMQ中,之后由消费者消费。
下面先演示生产者客户端的代码,然后再演示消费者客户端的代码。
生产者客户端代码
import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;public class RabbitProducer {private static final String EXCHANGE_NAME = "exchange_demo";private static final String ROUTING_KEY = "routingkey_demo";private static final String QUEUE_NAME = "queue_demo";private static final String IP_ADDRESS = "127.0.0.1";private static final int PORT = 5672; // RabbitMQ服务端默认端口号为5672public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(IP_ADDRESS);factory.setPort(PORT);factory.setUsername("zifeiy");factory.setPassword("passwd");Connection connection = factory.newConnection(); // 建立连接Channel channel = connection.createChannel(); // 创建信道// 创建一个type="direct"、持久化的、非自动删除的交换器channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);// 创建一个持久化、非排他的、非自动删除的队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 将交换器和队列通过路由绑定channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);// 发送一条持久化的消息:hello world!String message = "hello,world!";channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());// 关闭资源channel.close();connection.close();}
}
运行。
消费者客户端代码
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Connection;public class RabbitConsumer {private static final String QUEUE_NAME = "queue_demo";private static final String IP_ADDRESS = "127.0.0.1";private static final int PORT = 5672;public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Address[] addresses = new Address[] {new Address(IP_ADDRESS, PORT)};ConnectionFactory factory = new ConnectionFactory();factory.setUsername("zifeiy");factory.setPassword("passwd");// 这里的连接方式与生产者的demo略有不同,注意区分Connection connection = factory.newConnection(addresses); // 创建连接final Channel channel = connection.createChannel(); // 创建信道channel.basicQos(64); // 设置客户端最多接受未被ack的消息的个数Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("recv message: " + new String(body));try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, consumer);// 等待回调函数执行完毕后,关闭资源TimeUnit.SECONDS.sleep(5);channel.close();connection.close();}
}
运行,命令行输出如下:
recv message: hello,world!
RabbitMQ生产者和消费者Java实现相关推荐
- rabbitmq 生产者和消费者
生产者 下面展示一些 内联代码片. import java.util.Date; import java.util.HashMap; import java.util.Map;import com.d ...
- Go 学习笔记(57)— Go 第三方库之 amqp (RabbitMQ 生产者、消费者整个流程)
1. 安装 rabbitmq 的 golang 包 golang 可使用库 github.com/streadway/amqp 操作 rabbitmq .使用下面命令安装 RabbitMQ . go ...
- kafka生产者、消费者java示例
1. 生产者 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.Ke ...
- rabbitmq生产者和消费者
如果你曾经在工作中使用过网络软件,脑海中应该会有客户端和服务器端的概念.不管是浏览器和Web服务器,还是应用程序和MySQL服务器,都是其中一方发送请求,而另一方服务这些请求.你可以将其视为快餐车模式 ...
- Spring Cloud Stream与RabbitMQ 生产者和消费者位于同一个应用服务
第一种模型:交换机类型为topic,路由key为"#",这是简单的使用模型 当前Spring Cloud Rabbit的版本为2.1.2 <dependency>< ...
- SpringBoot整合RabbitMQ(包含生产者和消费者)
生产者 创建一个SpringBoot项目springboot-producer,作为RabbitMQ的生产者. 在pom文件中引入相关的依赖坐标 <dependency><group ...
- mq多个消费者消费一个消息_消息中间件——RabbitMQ(五)快速入门生产者与消费者...
求关注 快速入门生产者与消费者,SpringBoot整合RabbitMQ! 前言 本章我们来一次快速入门RabbitMQ--生产者与消费者.需要构建一个生产端与消费端的模型.什么意思呢?我们的生产者发 ...
- RabbitMQ 入门系列(2)— 生产者、消费者、信道、代理、队列、交换器、路由键、绑定、交换器
本系列是「RabbitMQ实战:高效部署分布式消息队列」和 「RabbitMQ实战指南」书籍的读书笔记. RabbitMQ 中重要概念 1. 生产者 生产者(producer)创建消息,然后发送到代理 ...
- RabbitMQ(三) HelloWorld 单生产者单消费者示例实现
一.创建Maven工程,引入RabbitMQ依赖. pom.xml 如下: <?xml version="1.0" encoding="UTF-8"?&g ...
最新文章
- 安装sql2008 enterprise (English正式版)图解
- h5 时间控件问题,怎么设置type =datetime-local 的值
- Developer Express .Net 2005 V7.2.1 crack
- c#之using关键字
- python动态表情包下载_Python从eif中导出qq表情的gif图片
- 沉得住气的程序员们!
- table表格for循环绑定数据_.NET MVC 页面表格绘制
- 单链表的逆置-C++实现(ok)
- 计算机组成原理pdf在线阅读,计算机组成原理计算机组成原理.pdf
- Nooploop空循环 TOFSense激光测距传感器 模块 红外测距测高
- 如何关闭勒索病毒端口
- 初学JAVA项目(四、魔域:文字RPG游戏)
- 智能配电系统监控解决方案在长白山机场配电工程的研究与应用
- 华为scp快充协议详解_华为SCP快充技术曝光:支持“电荷泵”技术,最高可达20W...
- 自己总结的常见命令(用过的)
- 初中级前端程序员面试中小型公司会问哪些问题?
- Teamcity打包发布的springboot 项目 ,访问swagger 报Whitelabel Error Page。
- Word中将一级标题设置为段前一行与段后一行时,不显示段前一行怎么办?
- 最近三年的百度产品经理面试与笔试题完整版
- physxloader.dll x86_PhysXLoader.dll,下载,简介,描述,修复,等相关问题一站搞定_DLL之家...
热门文章
- hive insert into values 没反应_再遇死锁insert语句导致的死锁
- Leetcode每日一题:234.palindrome-linked-list(回文链表)
- Java-包、权限修饰符final、static
- 蓝桥杯 入门训练 A+B问题进阶版(两个3000位数的加法)
- 第十一周学习进度报告
- 编写一个生成器需要编写__iter__和__next__
- php中js验证表单,js实现表单验证
- php整么去掉时间的年月日,php强大的时间转换函数strtotime
- java中获取路径_java中获取路径的几种基本的方法
- 若依如何调整首页左侧菜单栏宽度