RabbitMQ入门篇、介绍RabbitMQ常用的五种模式
RabbitMQ
认识RabbitMQ
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。
注意事项:
- 一个队列可以绑定到多个交换机上
RabbitMQ六种工作模式
RabbitMQ官网6种工作模式的介绍:https://www.rabbitmq.com/getstarted.html
- “Hello World”
- Work queue
- Publish/Subscribe
- Routing
- Topics
- RPC
RabbitMQ常用的工作模式使用
以下工作模式代码中共用代码
pom.xml使用到的依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.3.0</version>
</dependency>
<dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.5</version>
</dependency>
工具的封装(下面的工作模式中都用到了)
public class RabbitConstant {public static final String QUEUE_HELLOWORLD = "helloworld";public static final String QUEUE_SMS = "sms";public static final String EXCHANGE_WEATHER = "weather";public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";public static final String QUEUE_BAIDU = "baidu";public static final String QUEUE_SINA = "sina";public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitUtils {private static ConnectionFactory factory = new ConnectionFactory();static {factory.setHost("39.105.91.158");factory.setPort(5672);factory.setUsername("jack");factory.setPassword("123456");factory.setVirtualHost("/test");}public static Connection getConnection(){Connection conn = null;try {conn = factory.newConnection();return conn;} catch (Exception e) {throw new RuntimeException(e);}}
}
Hello World模式
代码
生产者
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//TCP 物理连接Connection conn= RabbitUtils.getConnection();//创建通信“通道”,相当于TCP中的虚拟连接Channel channel = conn.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//参数1:队列名称ID//参数2:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//参数3:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问//参数4:是否自动删除,false代表连接停掉后不自动删除掉这个队列//参数5: nullchannel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);//四个参数//参数1:exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到//参数2:队列名称//参数3:额外的设置属性//参数4:最后一个参数是要传递的消息字节数组String message = "helloworld!";channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null, message.getBytes());channel.close();conn.close();System.out.println("发送数据成功");}
}
消费者
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {Connection conn= RabbitUtils.getConnection();//创建通道Channel channel = conn.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);//参数1:指定消费者要消费的队列//参数2:代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法//参数3:第三个参数要传入DefaultConsumer的实现类channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel));}
}class Reciver extends DefaultConsumer{private Channel channel;//重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到public Reciver(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/*super.handleDelivery(consumerTag,envelope,properties,body);*/String messageBody = new String(body);System.out.println("消费者接收到:" + messageBody);//签收消息,确认消息//envelope.getDeliveryTag() 获取这个消息的TagId//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息channel.basicAck(envelope.getDeliveryTag() , false);}
}
Work Queue模式
介绍
- Work Queue工作模式,它会发 送一些耗时的任务给多个工作者(Worker)。
- 在多个消息的情况下,Work Queue会将消息分派给不同的消费者,每个消费者都会接收到不同的消息,并且可以根据处理消息的速度来接收消息的数量,进而让消费者程序发挥最大性能。
- Work Queue特别适合在集群环境中处理,能最大程序发挥每一台服务器的性能。
代码
消息对象
public class SMS {private String name;private String mobile;private String content;public SMS(String name, String mobile, String content) {this.name = name;this.mobile = mobile;this.content = content;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMobile() {return mobile;}public void setMobile(String mobile) {this.mobile = mobile;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}
}
生产者
import com.google.gson.Gson;
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** OrderSystem作为订单消息的生产者* SMSSender1、SMSSender2、SMSSender3 这3个不同的消费者从同一个队列中消费不同的消息*/
public class OrderSystem {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);for(int i = 0 ; i <= 100 ; i++) {SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");String jsonSMS = new Gson().toJson(sms);channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());}System.out.println("发送数据成功");channel.close();connection.close();}
}
消费者1
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** SMSSender1作为订单消息的生产者1*/
public class SMSSender1 {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是每次消费者处理完一个消息后(确认后),再从队列中获取一个新的//处理完一个取一个(这样性能比较好的服务器可以多消费消息)channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String jsonSMS = new String(body);System.out.println("SMSSender1-短信发送成功:" + jsonSMS);try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
消费者2
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** SMSSender2作为订单消息的生产者2*/
public class SMSSender2 {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String jsonSMS = new String(body);System.out.println("SMSSender2-短信发送成功:" + jsonSMS);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
消费者3
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** SMSSender3作为订单消息的生产者3*/
public class SMSSender3 {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String jsonSMS = new String(body);System.out.println("SMSSender3-短信发送成功:" + jsonSMS);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
发布(Publish)/订阅(Subscribe)模式
介绍
- 发布/订阅模式中,生产者不再直接与队列绑定, 而是将数据发送至“交换机Exchange”
- 交换机Exchange用于将数据按某种规则送入与之绑定的队列,进而供消费者使用。
- 发布/订阅模式中,交换机将无差别的将所有消息送入与之绑定的队列,所有消费者拿到的消息完全相同,交换机的类型被称为**fanout**。
使用场景
发布订阅模式因为所有消费者获得相同的消息,所以特别适合“数据提供商与应用商“。
例如:中国气象局提供“天气预报”送入交换机,网 易、新浪、百度、搜狐等门户接入通过队列绑定到该交换机,自动获取气象局推送的气象数据。
代码
生产者
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;/*** WeatherBureau作为消息生产者,将消息发布到交换机weather中* Baidu和Sina创建自己的队列并与交换机weather进行绑定*/
public class WeatherBureau {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitUtils.getConnection();String input = new Scanner(System.in).next();Channel channel = connection.createChannel();channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes());channel.close();connection.close();}
}
消费者1
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Baidu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名//参数2:交互机名//参数3:路由key(暂时用不到)channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
消费者2
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Sina {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
路由Routing模式
介绍
- 路由(Routing)模式是在发布订阅模式基础上的变种。
- 发布订阅模式是无条件将所有消息分发给所有消费者队列。
- 路由模式则是交换机根据Routing Key有条件的将数据筛选后发给消费者队列。
- 路由模式下交换机的类型被称为**direct**。
代码
生产者
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class WeatherBureau {public static void main(String[] args) throws IOException, TimeoutException {Map area = new LinkedHashMap<String, String>();area.put("china.hebei.shijiazhuang.20991011", "中国河北石家庄20991011天气数据");area.put("china.shandong.qingdao.20991011", "中国山东青岛20991011天气数据");area.put("china.henan.zhengzhou.20991011", "中国河南郑州20991011天气数据");area.put("us.cal.la.20991011", "美国加州洛杉矶20991011天气数据");area.put("china.hebei.shijiazhuang.20991012", "中国河北石家庄20991012天气数据");area.put("china.shandong.qingdao.20991012", "中国山东青岛20991012天气数据");area.put("china.henan.zhengzhou.20991012", "中国河南郑州20991012天气数据");area.put("us.cal.la.20991012", "美国加州洛杉矶20991012天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = itr.next();//Routing key 第二个参数相当于数据筛选的条件channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes());}channel.close();connection.close();}
}
消费者1
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Baidu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名 参数三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.20991011");channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.20991012");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
消费者2
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Sina {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20991011");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20991011");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20991012");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20991012");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
主题Topic模式
介绍
- 主题Topic模式是在Routing模式基础上,提供了对RouteKey模糊匹配的功能,可以简化程序的编写。
- 主题模式下,模糊匹配表达式规则为
- *****匹配单个关键字
- **#**匹配所有关键字
- 主题模式下交换机的类型被称为**topic**。
代码
生产者
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class WeatherBureau {public static void main(String[] args) throws IOException, TimeoutException {Map area = new LinkedHashMap<String, String>();area.put("china.hebei.shijiazhuang.20991011", "中国河北石家庄20991011天气数据");area.put("china.shandong.qingdao.20991011", "中国山东青岛20991011天气数据");area.put("china.henan.zhengzhou.20991011", "中国河南郑州20991011天气数据");area.put("us.cal.la.20991011", "美国加州洛杉矶20991011天气数据");area.put("china.hebei.shijiazhuang.20991012", "中国河北石家庄20991012天气数据");area.put("china.shandong.qingdao.20991012", "中国山东青岛20991012天气数据");area.put("china.henan.zhengzhou.20991012", "中国河南郑州20991012天气数据");area.put("us.cal.la.20991012", "美国加州洛杉矶20991012天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = itr.next();//Routing key 第二个参数相当于数据筛选的条件channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes());}channel.close();connection.close();}
}
消费者1
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Baidu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名 参数三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20991011");//channel.queueUnbind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20991011");//*.hebei.*.*channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
消费者2
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Sina {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
Spring与RabbitMQ整合Exchange模式
代码
生产者和消费者共用的依赖:pom.xml文件的依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.3.0</version>
</dependency>
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.0.5.RELEASE</version>
</dependency>
生产者端的代码和配置
spring配置文件applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 设置连接工厂,配置基本参数 --><rabbit:connection-factory id="connectionFactory"host="127.0.0.1"port="5672"username="guest"password="guest"virtual-host="/test" /><!--fanout-exchange | direct-exchange | topic-exchange声明一个名为topicExchange的topic交换机,如果这个交换机不存在,则自动创建--><rabbit:topic-exchange name="topicExchange" auto-declare="true"></rabbit:topic-exchange><!-- Spring为我们封装了RabbitTemplate对象来简化生产者发送数据的过程,对常用的方法进行了封装。 --><rabbit:template id="template" connection-factory="connectionFactory" exchange="topicExchange"></rabbit:template><!--在生产者中配置template对象,用于发送数据--><bean id="newsProducer" class="com.itlaoqi.rabbit.exchange.NewsProducer"><property name="rabbitTemplate" ref="template"/></bean><!-- RabbitAdmin对象用于创建、绑定、管理队列与交换机 --><rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
</beans>
消息对象News
import java.io.Serializable;
import java.util.Date;public class News implements Serializable{private String source;private String title;private Date createTime;private String content;public News(String source, String title, Date createTime, String content) {this.source = source;this.title = title;this.createTime = createTime;this.content = content;}public String getSource() {return source;}public void setSource(String source) {this.source = source;}public String getTitle() {return title;}public void setTitle(String title) {this.title = title;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}
}
消息生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;import java.util.Date;public class NewsProducer {private RabbitTemplate rabbitTemplate = null;public RabbitTemplate getRabbitTemplate() {return rabbitTemplate;}public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendNews(String routingKey , News news){//convertAndSend 用于向exchange发送数据//第一个参数是routingkey//第二个参数是要传递的对象,可以是字符串、byte【】或者任何实现了【序列化接口】的对象rabbitTemplate.convertAndSend(routingKey , news);System.out.println("新闻发送成功");}public static void main(String[] args) {ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");NewsProducer np = (NewsProducer)ctx.getBean("newsProducer");np.sendNews("us.20190101" , new News("新华社" , "特朗普又又又退群啦" , new Date() , "国际新闻内容"));np.sendNews("china.20190101" , new News("凤凰TV" , "XXX企业荣登世界500强" , new Date() , "国内新闻内容"));}
}
消费者端的代码和配置
spring配置文件applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><rabbit:connection-factory id="connectionFactory"host="39.105.91.158"port="5672"username="jack"password="123456"virtual-host="/test"/><rabbit:admin connection-factory="connectionFactory"/><!--创建队列--><rabbit:queue name="topicQueue" auto-declare="true" auto-delete="false" durable="false" exclusive="false"/><!--交换机与队列绑定,并指明筛选条件--><rabbit:topic-exchange name="topicExchange" auto-declare="true"><rabbit:bindings><rabbit:binding queue="topicQueue" pattern="us.*"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--启动消费者后,Spring底层自动监听对应的topicQueue数据,一旦有新的消息进来,自动传入到consumer Bean的recv的News参数中,之后再程序对News进一步处理--><rabbit:listener-container connection-factory="connectionFactory"><rabbit:listener ref="consumer" method="recv" queue-names="topicQueue"/></rabbit:listener-container><bean id="consumer" class="com.itlaoqi.rabbitmq.NewsConsumer"></bean>
</beans>
消息消费者
package com.itlaoqi.rabbitmq;import com.itlaoqi.rabbit.exchange.News;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class NewsConsumer {public void recv(News news){System.out.println("接收到最新新闻:" + news.getTitle() + ":" + news.getSource());}public static void main(String[] args) {//初始化IOC容器,加载spring的配置文件后,就会创建配置文件中配置的队列 // 并消费与之绑定的交换机里面的消息ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");}
}
SpringBoot2与RabbitMQ整合
RabbitMQ入门篇、介绍RabbitMQ常用的五种模式相关推荐
- Rabbitmq中常用的五种连接方式
目录 前提准备 方式一: Hello World 服务端(provider)代码 客户端(customer)代码 方式二: work(以下方式都是通过工具类来创建connection对象) 有两种方 ...
- rabbitMQ概述/在springboot下测试五种模式
一.应用场景: (1) 异步操作: 任务异步处理将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理.提高了应用程序的响应时间. (2) 解耦: 应用程序解耦合MQ相当于一个中介,生 ...
- Kotlin中单利常用的五种写法
前言 单利模式是写代码过程中不可避免用到的,下面我总结一下单利常用的五种写法,话不多说了,来一起看看详细的介绍吧 加载类时创建单利 Java实现 public class Config{ privat ...
- SpingBoot yml语法及测试总结yml文件常用的五种方式
yml语法及测试总结yml文件常用的五种方式 引言 项目介绍 初始化SpringBoot项目 application.properties介绍 yml配置文件基本语法 实际操作测试 引言 今天在开发过 ...
- python各个解释器的用途-常用的五种Python解释器|老男孩网络Python学习课程
Python是一门解释器语言,代码想运行,必须通过解释器执行,Python存在多种解释器,分别基于不同语言开发,每个解释器有不同的特点,但都能正常运行Python代码,以下是常用的五种Python解释 ...
- ps抠图基础篇:最常用的四种抠图方法
ps抠图基础篇:最常用的四种抠图方法 一.善用魔术棒法 用魔法帮抠图是最直观明了的抠图方法,也是最基础的抠图方法,适用范围是图像和背景色差非常明显,背景颜色单一,图像边界清晰. 魔法棒抠图就是通过删除 ...
- PMP 冲突管理常用的五种方法
PMP 冲突管理常用的五种方法 一.五种常用的方法 • 撤退/回避:从实际或者潜在冲突中退出,将问题推迟到准备充分的时候,或推给其他人 • 缓和/包容:强调一致而非差异(求同存异) • 妥协/调解:为 ...
- 常用的五种Python解释器
Python是一门解释器语言,代码想运行,必须通过解释器执行,Python存在多种解释器,分别基于不同语言开发,每个解释器有不同的特点,但都能正常运行Python代码, 以下是常用的五种Python解 ...
- 简单介绍一下常用的几种无位置传感器的控制方式
近年来,直流无刷电机的无位置传感器技术日益受到人们的关注,无位置传感器控制技术已成为直流无刷电机控制技术的一个发展方向.下面就简单介绍一下常用的几种无位置传感器控制方式: 1.反电势过零检测法 在直流 ...
最新文章
- 从Qcheck 1.3 不能在不同操作系统上运行问题(chro124、chro342)说开来------
- Nginx的配置文件nginx.conf详解
- linux如何ARP嗅探 Linux下嗅探工具Dsniff安装记录
- 中国建设银行(2011.11.19南京大学)
- QT串口编程的相关类(QSerialPortInfo)
- 【华为云技术分享】STM32L476移植华为LiteOS系列教程---开发前的准备 2
- 在Mac中关闭应用通知的两种方法
- PHP+MYSQL图书管理系统(课设)
- Windows 电源计划设置关闭显示器不起作用的解决方法
- Mangos地区代码
- Linux C编程(五) 之 gdb详解
- 以下哪个不是迭代算法的缺点_深究递归和迭代的区别、联系、优缺点及实例对比...
- 为什么手机里的小爱音响app里搜不到家里的小爱音箱_水哥岁末诚意奉献:基于米家App的家庭智能安全方案详解...
- gee学习2数据获取、数据筛选、创建地理要素
- 微软邮箱(hotmail/outlook):应用密码获取+STARTTLS加密
- TypeError:‘bool’ oboject is not callable
- 重庆大学计算机学院课题组,【计算机】计算机学院关于智能计算的大规模优化学术报告圆满结束...
- oracle数据表转换为mysql数据表
- python cannot concatenate_python相关报错及解决方式
- 2022年终总结--你好2023
热门文章
- 还不会用Vue写出的excel表实现pdf导出吗?来,豪豪手把手教你
- html平板电脑打不开,平板电脑浏览器打不开网页
- 在威联通NAS上实现硬盘独立休眠
- ubuntu安装gef,pwndbg,peda
- oracle查询谁修改了数据ip,查询oracle特定表修改的用户及IP信息
- Android中TextView加横线的属性
- 解决Xcode couldn‘t find any iOS App Development provisioning profiles matching ‘com.facebook.WebDriver
- java activeMQ消息的发送与接收
- jar包扫描工具: gamma
- docker基础篇--有它就够了