什么是中间件

我国企业从20世纪80年代开始就逐渐进行信息化建设,由于方法和体系的不成熟,以及企业业务和市场需求的不断变化,一个企业可能同时运行着多个不同的业务系统,这些系统可能基于不同的操作系统、不同的数据库、异构的网络环境。现在的问题是,如何把这些信息系统结合成一个有机地协同工作的整体,真正实现企业跨平台、分布式应用。中间件便是解决之道,它用自己的复杂换取了企业应用的简单。

什么是中间件飞哥  分类:学习笔记 创建时间:2021/02/28 13:33 小字体最后修改于: 2021/03/02 19:39
1、什么是中间件
我国企业从20世纪80年代开始就逐渐进行信息化建设,由于方法和体系的不成熟,以及企业业务和市场需求的不断变化,一个企业可能同时运行着多个不同的业务系统,这些系统可能基于不同的操作系统、不同的数据库、异构的网络环境。现在的问题是,如何把这些信息系统结合成一个有机地协同工作的整体,真正实现企业跨平台、分布式应用。中间件便是解决之道,它用自己的复杂换取了企业应用的简单。中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件=平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件=平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。

举例:
1,RMI(Remote Method Invocations, 远程调用)
2,Load Balancing(负载均衡,将访问负荷分散到各个服务器中)
3,Transparent Fail-over(透明的故障切换)
4,Clustering(集群,用多个小的服务器代替大型机)
5,Back-end-Integration(后端集成,用现有的、新开发的系统如何去集成遗留的系统)
6,Transaction事务(全局/局部)全局事务(分布式事务)局部事务(在同一数据库联接内的事务)
7,Dynamic Redeployment(动态重新部署,在不停止原系统的情况下,部署新的系统)
8,System Management(系统管理)
9,Threading(多线程处理)
10,Message-oriented Middleware面向消息的中间件(异步的调用编程)
11,Component Life Cycle(组件的生命周期管理)
12,Resource pooling(资源池)
13,Security(安全)
14,Caching(缓存)

2、为什么需要使用消息中间件

具体地说,中间件屏蔽了底层操作系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必再为程序在不同系统软件上的移植而重复工作,从而大大减少了技术上的负担。中间件带给应用系统的,不只是开发的简便、开发周期的缩短,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。

3、中间件特点

为解决分布异构问题,人们提出了中间件(middleware)的概念。中间件是位于平台(硬件和操作系统)和应用之间的通用服务,如下图所示,这些服务具有标准的程序接口和协议。针对不同的操作系统和硬件平台,它们可以有符合接口和协议规范的多种实现。

也许很难给中间件一个严格的定义,但中间件应具有如下的一些特点:
(1)满足大量应用的需要
(2)运行于多种硬件和OS平台
(3)支持分布计算,提供跨网络、硬件和OS平台的透明性的应用或服务的交互
(4)支持标准的协议
(5)支持标准的接口

由于标准接口对于可移植性和标准协议对于互操作性的重要性,中间件已成为许多标准化工作的主要部分。对于应用软件开发,中间件远比操作系统和网络服务更为重要,中间件提供的程序接口定义了一个相对稳定的高层应用环境,不管底层的计算机硬件和系统软件怎样更新换代,只要将中间件升级更新,并保持中间件对外的接口定义不变,应用软件几乎不需任何修改,从而保护了企业在应用软件开发和维护中的重大投资。

简单说:中间件有个很大的特点,是脱离于具体设计目标,而具备提供普遍独立功能需求的模块。这使得中间件一定是可替换的。如果一个系统设计中,中间件是不可替换的,不是架构、框架设计有问题,那么就是这个中间件,在 别处可能是个中间件,在这个系统内是引擎。哈。

4、在项目中什么时候使用中间件技术

在项目的架构和重构中,使用任何技术和架构的改变我们都需要谨慎斟酌和思考,因为任何技术的融入和变化都可能人员,技术,和成本的增加,中间件的技术一般现在一些互联网公司或者项目中使用比较多,如果你仅仅还只是一个初创公司建议还是使用单体架构,最多加个缓存中间件即可,不要盲目追求新或者所谓的高性能,而追求的背后一定是业务的驱动和项目的驱动,因为一旦追求就意味着你的学习成本,公司的人员结构以及服务器成本,维护和运维的成本都会增加,所以需要谨慎选择和考虑。

但是作为一个开放人员,一定要有学习中间件技术的能力和思维,否则很容易当项目发展到一个阶段在去掌握估计或者在面试中提及,就会给自己带来不小的困扰,在当今这个时代这些技术也并不是什么新鲜的东西,如果去掌握和挖掘最关键的还是自己花时间和花精力去探讨和研究。

5、课程的规划和安排

基于消息中间件的分布式系统的架构

01、基于消息中间件的分布式系统的架构

从上图中可以看出来,消息中间件的是
1:利用可靠的消息传递机制进行系统和系统直接的通讯
2:通过提供消息传递和消息的排队机制,它可以在分布式系统环境下扩展进程间的通讯。

02、消息中间件应用的场景

1:跨系统数据传递
2:高并发的流量削峰
3:数据的分发和异步处理
4:大数据分析与传递
5:分布式事务
比如你有一个数据要进行迁移或者请求并发过多的时候,比如你有10W的并发请求下订单,我们可以在这些订单入库之前,我们可以把订单请求堆积到消息队列中,让它稳健可靠的入库和执行。

03、常见的消息中间件

ActiveMQ、RabbitMQ、Kafka、RocketMQ等。

04、消息中间件的本质及设计

它是一种接受数据,接受请求、存储数据、发送数据等功能的技术服务。

MQ消息队列:负责数据的传接受,存储和传递,所以性能要过于普通服务和技术。

谁来生产消息,存储消息和消费消息呢?

05、消息中间件的核心组成部分

1:消息的协议
2:消息的持久化机制
3:消息的分发策略
4:消息的高可用,高可靠
5:消息的容错机制

06、小结

其实不论选择单体架构还是分布式架构都是项目开发的一个阶段,在什么阶段选择适合的架构方式,而不能盲目追求,最后造成的后果和问题都需要自己买单。但是作为一个开发人员学习和探讨新的技术是我们每个程序开发者都应该去保持和思考的问题。当我们没办法去改变社会和世界的时候,我们为了生活和生存那就必须要迎合企业和市场的需求,发挥你的价值和所学的才能,创造价值和实现自我。

消息队列协议

01、什么是协议

我们知道消息中间件负责数据的传递,存储,和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范,你是采用底层的TCP/IP,UDP协议还是其他的自己取构建等,而这些约定成俗的规范就称之为:协议。

所谓协议是指:
1:计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流。
2:和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高。
3:协议对数据格式和计算机之间交换数据都必须严格遵守规范。

02、网络协议的三要素

1.语法。语法是用户数据与控制信息的结构与格式,以及数据出现的顺序。
2.语义。语义是解释控制信息每个部分的意义。它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应。
3.时序。时序是对事件发生顺序的详细说明。

比如我MQ发送一个信息,是以什么数据格式发送到队列中,然后每个部分的含义是什么,发送完毕以后的执行的动作,以及消费者消费消息的动作,消费完毕的响应结果和反馈是什么,然后按照对应的执行顺序进行处理。如果你还是不理解:大家每天都在接触的http请求协议:

1:语法:http规定了请求报文和响应报文的格式。
2:语义:客户端主动发起请求称之为请求。(这是一种定义,同时你发起的是post/get请求)
3:时序:一个请求对应一个响应。(一定先有请求在有响应,这个是时序)

而消息中间件采用的并不是http协议,而常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议。

面试题:为什么消息中间件不直接使用http协议呢?
1: 因为http请求报文头和响应报文头是比较复杂的,包含了cookie,数据的加密解密,状态码,响应码等附加的功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就行,一定要追求的是高性能。尽量简洁,快速。
2:大部分情况下http大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。

03:AMQP协议

AMQP:(全称:Advanced Message Queuing Protocol) 是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
特性:
1:分布式事务支持。
2:消息的持久化支持。
3:高性能和高可靠的消息处理优势。

AMQP协议的支持者:

04:MQTT协议

MQTT协议:(Message Queueing Telemetry Transport)消息队列是IBM开放的一个即时通讯协议,物联网系统架构中的重要组成部分。
特点:
1:轻量
2:结构简单
3:传输快,不支持事务
4:没有持久化设计。
应用场景:
1:适用于计算能力有限
2:低带宽
3:网络不稳定的场景。
支持者:

05、OpenMessage协议

是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。
特点:
1:结构简单
2:解析速度快
3:支持事务和持久化设计。

06、Kafka协议

Kafka协议是基于TCP/IP的二进制协议。消息内部是通过长度来分割,由一些基本数据类型组成。
特点是:
1:结构简单
2:解析速度快
3:无事务支持
4:有持久化设计

07、小结

协议:是在tcp/ip协议基础之上构建的一种约定成俗的规范和机制、它的主要目

消息队列持久化

01、持久化

简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。

02、常见的持久化方式

ActiveMQ RabbitMQ Kafka RocketMQ
文件存储 支持 支持 支持 支持
数据库 支持 / / /

消息的分发策略

01、消息的分发策略

MQ消息队列有如下几个角色
1:生产者
2:存储消息
3:消费者
那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送的过程,而这些推机制会适用到很多的业务场景也有很多对应推机制策略。

02、场景分析一

比如我在APP上下了一个订单,我们的系统和服务很多,我们如何得知这个消息被那个系统或者那些服务或者系统进行消费,那这个时候就需要一个分发的策略。这就需要消费策略。或者称之为消费的方法论。

03、场景分析二

在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费MQ接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件就必须支持消息重试机制策略。也就是支持:出现问题和故障的情况下,消息不丢失还可以进行重发。

04、消息分发策略的机制和对比

ActiveMQ RabbitMQ Kafka RocketMQ
发布订阅 支持 支持 支持 支持
轮询分发 支持 支持 支持 /
公平分发 / 支持 支持 /
重发 支持 支持 / 支持
消息拉取 / 支持 支持 支持

消息队列高可用和高可靠

01、什么是高可用机制

所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。

02、集群模式1 - Master-slave主从共享数据的部署方式

解说:生产者讲消费发送到Master节点,所有的都连接这个消息队列共享这块数据区域,Master节点负责写入,一旦Master挂掉,slave节点继续服务。从而形成高可用,

03、集群模式2 - Master- slave主从同步部署方式

解释:这种模式写入消息同样在Master主节点上,但是主节点会同步数据到slave节点形成副本,和zookeeper或者redis主从机制很类同。这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同的节点就行消费,以为消息的拷贝和同步会暂用很大的带宽和网络资源。在后续的rabbtmq中会有使用。

04、集群模式3 - 多主集群同步部署模式

解释:和上面的区别不是特别的大,但是它的写入可以往任意节点去写入。

05、集群模式4 - 多主集群转发部署模式

解释:如果你插入的数据是broker-1中,元数据信息会存储数据的相关描述和记录存放的位置(队列)。
它会对描述信息也就是元数据信息就行同步,如果消费者在broker-2中进行消费,发现自己几点没有对应的消息,可以从对应的元数据信息中去查询,然后返回对应的消息信息,场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛有顾客说要买的演唱会门票,但是没有但是他会去联系其他的黄牛询问,如果有就返回。

06、集群模式5 Master-slave与Breoker-cluster组合的方案

解释:实现多主多从的热备机制来完成消息的高可用以及数据的热备机制,在生产规模达到一定的阶段的时候,这种使用的频率比较高。

这么集群模式,具体在后续的课程中会进行一个分析和讲解。他们的最终目的都是为保证:消息服务器不会挂掉,出现了故障依然可以抱着消息服务继续使用。

反正终归三句话:
1:要么消息共享,
2:要么消息同步
3:要么元数据共享

07、什么是高可靠机制

所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠。
在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。
如何保证中间件消息的可靠性呢?可以从两个方面考虑:
1:消息的传输:通过协议来保证系统间数据解析的正确性。
2:消息的存储可靠:通过持久化来保证消息的可靠性。

RabbitMQ入门及安装

01、概述

官网:https://www.rabbitmq.com/
什么是RabbitMQ,官方给出来这样的解释:

RabbitMQ is the most widely deployed open source message broker.
With tens of thousands of users, RabbitMQ is one of the most popular open source message brokers. From T-Mobile to Runtastic, RabbitMQ is used worldwide at small startups and large enterprises.
RabbitMQ is lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.
RabbitMQ runs on many operating systems and cloud environments, and provides a wide range of developer tools for most popular languages.
翻译以后:
RabbitMQ是部署最广泛的开源消息代理。
RabbitMQ拥有成千上万的用户,是最受欢迎的开源消息代理之一。从T-Mobile 到Runtastic,RabbitMQ在全球范围内的小型初创企业和大型企业中都得到使用。
RabbitMQ轻巧,易于在内部和云中部署。它支持多种消息传递协议。RabbitMQ可以部署在分布式和联合配置中,以满足大规模,高可用性的要求。
RabbitMQ可在许多操作系统和云环境上运行,并为大多数流行语言提供了广泛的开发人员工具。

简单概述:
RabbitMQ是一个开源的遵循AMQP协议实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征。

02、安装RabbitMQ

1:下载地址:https://www.rabbitmq.com/download.html
2:环境准备:CentOS7.x+ / Erlang
RabbitMQ是采用Erlang语言开发的,所以系统环境必须提供Erlang环境,第一步就是安装Erlang。

erlang和RabbitMQ版本的按照比较: https://www.rabbitmq.com/which-erlang.html

03、 Erlang安装

查看系统版本号

[root@iZm5eauu5f1ulwtdgwqnsbZ ~]# lsb_release -aLSB Version:    :core-4.1-amd64:core-4.1-noarchDistributor ID: CentOSDescription:    CentOS Linux release 8.3.2011Release:        8.3.2011Codename:       n/a

3-1:安装下载

参考地址:https://www.erlang-solutions.com/downloads/

wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpmrpm -Uvh erlang-solutions-2.0-1.noarch.rpm

3-2:安装成功

yum install -y erlang

3-3:安装成功

erl -v

04、安装socat

yum install -y socat

05、安装rabbitmq

下载地址:https://www.rabbitmq.com/download.html

5-1:下载rabbitmq

> wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.13/rabbitmq-server-3.8.13-1.el8.noarch.rpm> rpm -Uvh rabbitmq-server-3.8.13-1.el8.noarch.rpm
yum install rabbitmq-server -y

5-2:启动rabbitmq服务

# 启动服务> systemctl start rabbitmq-server# 查看服务状态> systemctl status rabbitmq-server# 停止服务> systemctl stop rabbitmq-server# 开机启动服务> systemctl enable rabbitmq-server

06、RabbitMQ的配置

RabbitMQ默认情况下有一个配置文件,定义了RabbitMQ的相关配置信息,默认情况下能够满足日常的开发需求。如果需要修改需要,需要自己创建一个配置文件进行覆盖。
参考官网:
1:https://www.rabbitmq.com/documentation.html
2:https://www.rabbitmq.com/configure.html
3:https://www.rabbitmq.com/configure.html#config-items
4:https://github.com/rabbitmq/rabbitmq-server/blob/add-debug-messages-to-quorum_queue_SUITE/docs/rabbitmq.conf.example

06-1、相关端口

5672:RabbitMQ的通讯端口
25672:RabbitMQ的节点间的CLI通讯端口是
15672:RabbitMQ HTTP_API的端口,管理员用户才能访问,用于管理RabbitMQ,需要启动Management插件。
1883,8883:MQTT插件启动时的端口。
61613、61614:STOMP客户端插件启用的时候的端口。
15674、15675:基于webscoket的STOMP端口和MOTT端口

一定要注意:RabbitMQ 在安装完毕

RabbitMQWeb管理界面及授权操作

理界面

01-1:默认情况下,rabbitmq是没有安装web端的客户端插件,需要安装才可以生效

rabbitmq-plugins enable rabbitmq_management

说明:rabbitmq有一个默认账号和密码是:guest 默认情况只能在localhost本机下访问,所以需要添加一个远程登录的用户。

01-2:安装完毕以后,重启服务即可

systemctl restart rabbitmq-server

一定要记住,在对应服务器(阿里云,腾讯云等)的安全组中开放15672的端口。

01-3:在浏览器访问

http://ip:15672/ 如下:

02、授权账号和密码

2-1:新增用户

rabbitmqctl add_user admin admin

2-2:设置用户分配操作权限

rabbitmqctl set_user_tags admin administrator

用户级别:

  • 1、administrator 可以登录控制台、查看所有信息、可以对rabbitmq进行管理
  • 2、monitoring 监控者 登录控制台,查看所有信息
  • 3、policymaker 策略制定者 登录控制台,指定策略
  • 4、managment 普通管理员 登录控制台

2-3:为用户添加资源权限

rabbitmqctl.bat set_permissions -p / admin ".*" ".*" ".*"

03、小结:

rabbitmqctl add_user 账号 密码rabbitmqctl set_user_tags 账号 administratorrabbitmqctl change_password Username Newpassword 修改密码rabbitmqctl delete_user Username 删除用户rabbitmqctl list_users 查看用户清单rabbitmqctl.bat set_permissions -p / 用户名 ".*" ".*" ".*" 为用户设置administrator角色rabbitmqctl.bat set_permissions -p / root ".*" ".*" ".*"

RabbitMQ之Docker安装

01、Docker安装RabbitMQ

1-1、虚拟化容器技术—Docker的安装

(1)yum 包更新到最新> yum update(2)安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的> yum install -y yum-utils device-mapper-persistent-data lvm2(3)设置yum源为阿里云> yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo(4)安装docker> yum install docker-ce -y(5)安装后查看docker版本> docker -v (6) 安装加速镜像 sudo mkdir -p /etc/docker sudo tee /etc/docker/daemon.json <<-'EOF' {  "registry-mirrors": ["https://0wrdwnn6.mirror.aliyuncs.com"] } EOF sudo systemctl daemon-reload sudo systemctl restart docker

1-2、docker的相关命令

# 启动docker:systemctl start docker# 停止docker:systemctl stop docker# 重启docker:systemctl restart docker# 查看docker状态:systemctl status docker# 开机启动:  systemctl enable dockersystemctl unenable docker# 查看docker概要信息docker info# 查看docker帮助文档docker --help

1-3、安装rabbitmq

参考网站:
1:https://www.rabbitmq.com/download.html
2:https://registry.hub.docker.com/_/rabbitmq/

1-4、获取rabbit镜像:

docker pull rabbitmq:management

1-5、创建并运行容器

docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management

—hostname:指定容器主机名称
—name:指定容器名称
-p:将mq端口号映射到本地
或者运行时设置用户和密码

docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

查看正在运行容器

docker ps

启动rabbitmq_management(图形化界面 )

docker exec -it myrabbit rabbitmq-plugins enable rabbitmq_management

查看日志

docker logs -f myrabbit

1-6、容器运行正常

使用 http://你的IP地址:15672 访问rabbit控制台

02、额外Linux相关排查命令

> more xxx.log  查看日记信息> netstat -naop | grep 5672 查看端口是否被占用> ps -ef | grep 5672  查看进程> systemctl stop 服务

RabbitMQ的角色分类

01、RabbitMQ的角色分类

1:none:

  • 不能访问management plugin

2:management:查看自己相关节点信息

  • 列出自己可以通过AMQP登入的虚拟机
  • 查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息
  • 查看和关闭自己的channels和connections
  • 查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息。

3:Policymaker

  • 包含management所有权限
  • 查看和创建和删除自己的virtual hosts所属的policies和parameters信息。

4:Monitoring

  • 包含management所有权限
  • 罗列出所有的virtual hosts,包括不能登录的virtual hosts。
  • 查看其他用户的connections和channels信息
  • 查看节点级别的数据如clustering和memory使用情况
  • 查看所有的virtual hosts的全局统计信息。

5:Administrator

  • 最高权限
  • 可以创建和删除virtual hosts
  • 可以查看,创建和删除users
  • 查看创建permisssions
  • 关闭所有用户的connections

6、具体操作的界面

RabbitMQ入门案例 - Simple 简单模式

01、实现步骤

1:jdk1.8
2:构建一个maven工程
3:导入rabbitmq的maven依赖
4:启动rabbitmq-server服务
5:定义生产者
6:定义消费者
7:观察消息的在rabbitmq-server服务中的过程

02、构建一个maven工程

03、导入rabbitmq的maven依赖

03-1:Java原生依赖

<dependency>    <groupId>com.rabbitmq</groupId>    <artifactId>amqp-client</artifactId>    <version>5.10.0</version></dependency>

03-2:spring依赖

<dependency>    <groupId>org.springframework.amqp</groupId>    <artifactId>spring-amqp</artifactId>    <version>2.2.5.RELEASE</version></dependency><dependency>    <groupId>org.springframework.amqp</groupId>    <artifactId>spring-rabbit</artifactId>    <version>2.2.5.RELEASE</version></dependency>

03-3、springboot依赖

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId></dependency>

上面根据自己的项目环境进行选择即可。

番外:rabbitmq和spring同属一个公司开放的产品,所以他们的支持也是非常完善,这也是为什么推荐使用rabbitmq的一个原因。

04、启动rabbitmq-server服务

systemctl start rabbitmq-server或者docker start myrabbit

05、定义生产者

package com.xuexiangban.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/*** @author: 学相伴-飞哥* @description: Producer 简单队列生产者* @Date : 2021/3/2*/
public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/**  如果队列不存在,则会创建*  Rabbitmq不允许创建两个相同的队列名称,否则会报错。**  @params1: queue 队列的名称*  @params2: durable 队列是否持久化*  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */channel.queueDeclare("queue1", false, false, false, null);// 6: 准备发送消息的内容String message = "你好,学相伴!!!";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routing// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish("", "queue1", null, message.getBytes());System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

1:执行发送,这个时候可以在web控制台查看到这个队列queue的信息

2:我们可以进行对队列的消息进行预览和测试如下:

3:进行预览和获取消息进行测试

06、定义消费者

package com.xuexiangban.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/*** @author: 学相伴-飞哥* @description: Producer 简单队列生产者* @Date : 2021/3/2*/
public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/**  如果队列不存在,则会创建*  Rabbitmq不允许创建两个相同的队列名称,否则会报错。**  @params1: queue 队列的名称*  @params2: durable 队列是否持久化*  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */channel.queueDeclare("queue1", false, false, false, null);// 6: 准备发送消息的内容String message = "你好,学相伴!!!";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routing// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish("", "queue1", null, message.getBytes());System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

07、观察消息的在rabbitmq-server服务中的过程

  • 具体详细视频教程

什么是AMQP

什么是AMQP

AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计。

AMQP生产者流转过程

AMQP消费者流转过程

RabbitMQ的核心组成部分

01、RabbitMQ的核心组成部分

核心概念:
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

02、RabbitMQ整体架构是什么样子的?

03、RabbitMQ的运行流程

03、RabbitMQ支持消息的模式

参考官网:https://www.rabbitmq.com/getstarted.html

03-1、简单模式 Simple

  • 参考第12章节

03-2、工作模式 Work

  • web操作查看视频
  • 类型:无
  • 特点:分发机制

03-3、发布订阅模式

  • web操作查看视频
  • 类型:fanout
  • 特点:Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。

03-4、路由模式

  • web操作查看视频
  • 类型:direct
  • 特点:有routing-key的匹配模式

03-5、主题Topic模式

  • web操作查看视频
  • 类型:topic
  • 特点:模糊的routing-key的匹配模式

03-6、参数模式

  • web操作查看视频
  • 类型:headers
  • 特点:参数匹配模式

小结

  • rabbitmq发送消息一定有一个交换机
  • 如果队列没有指定交换机会默认绑定一个交换机

RabbitMQ入门案例 - fanout模式

RabbitMQ支持消息的模式

参考官网:https://www.rabbitmq.com/getstarted.html

01、RabbitMQ的模式之发布订阅模式

图解

01-1、发布订阅模式具体实现

  • web操作查看视频
  • 类型:fanout
  • 特点:Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。

生产者

package com.xuexiangban.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/*** @author: 学相伴-飞哥* @description: Producer 简单队列生产者* @Date : 2021/3/2*/
public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 6: 准备发送消息的内容String message = "你好,学相伴!!!";String  exchangeName = "fanout-exchange";String routingKey = "";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish(exchangeName, routingKey, null, message.getBytes());System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

消费者

package com.xuexiangban.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
/*** @author: 学相伴-飞哥* @description: Consumer* @Date : 2021/3/2*/
public class Consumer {private static Runnable runnable = () -> {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");//获取队列的名称final String queueName = Thread.currentThread().getName();Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/**  如果队列不存在,则会创建*  Rabbitmq不允许创建两个相同的队列名称,否则会报错。**  @params1: queue 队列的名称*  @params2: durable 队列是否持久化*  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;finalChannel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println(queueName + ":开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}};public static void main(String[] args) {// 启动三个线程去执行new Thread(runnable, "queue-1").start();new Thread(runnable, "queue-2").start();new Thread(runnable, "queue-3").start();}
}

RabbitMQ入门案例 - Direct模式

RabbitMQ支持消息的模式

参考官网:https://www.rabbitmq.com/getstarted.html

01、RabbitMQ的模式之Direct模式

图解

01-1、发布订阅模式具体实现

  • web操作查看视频
  • 类型:direct
  • 特点:Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。

生产者

package com.xuexiangban.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/*** @author: 学相伴-飞哥* @description: Producer 简单队列生产者* @Date : 2021/3/2*/
public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 6: 准备发送消息的内容String message = "你好,学相伴!!!";String  exchangeName = "direct-exchange";String routingKey1 = "testkey";String routingKey2 = "testkey2";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

消费者

package com.xuexiangban.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
/*** @author: 学相伴-飞哥* @description: Consumer* @Date : 2021/3/2*/
public class Consumer {private static Runnable runnable = () -> {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");//获取队列的名称final String queueName = Thread.currentThread().getName();Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/**  如果队列不存在,则会创建*  Rabbitmq不允许创建两个相同的队列名称,否则会报错。**  @params1: queue 队列的名称*  @params2: durable 队列是否持久化*  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;finalChannel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println(queueName + ":开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}};public static void main(String[] args) {// 启动三个线程去执行new Thread(runnable, "queue-1").start();new Thread(runnable, "queue-2").start();new Thread(runnable, "queue-3").start();}
}

RabbitMQ入门案例 - Topic模式

RabbitMQ支持消息的模式

参考官网:https://www.rabbitmq.com/getstarted.html

01、RabbitMQ的模式之Topic模式

图解

01-1、发布订阅模式具体实现

  • web操作查看视频
  • 类型:topic
  • 特点:Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。

生产者

package com.xuexiangban.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/*** @author: 学相伴-飞哥* @description: Producer 简单队列生产者* @Date : 2021/3/2*/
public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 6: 准备发送消息的内容String message = "你好,学相伴!!!";String  exchangeName = "topic-exchange";String routingKey1 = "com.course.order";//都可以收到 queue-1 queue-2String routingKey2 = "com.order.user";//都可以收到 queue-1 queue-3// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

消费者

package com.xuexiangban.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
/*** @author: 学相伴-飞哥* @description: Consumer* @Date : 2021/3/2*/
public class Consumer {private static Runnable runnable = () -> {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");//获取队列的名称final String queueName = Thread.currentThread().getName();Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/**  如果队列不存在,则会创建*  Rabbitmq不允许创建两个相同的队列名称,否则会报错。**  @params1: queue 队列的名称*  @params2: durable 队列是否持久化*  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;finalChannel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println(queueName + ":开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}};public static void main(String[] args) {// 启动三个线程去执行new Thread(runnable, "queue-1").start();new Thread(runnable, "queue-2").start();new Thread(runnable, "queue-3").start();}
}

RabbitMQ入门案例 - Work模式 - 轮询模式(Round-Robin)

01、RabbitMQ支持消息的模式

参考官网:https://www.rabbitmq.com/getstarted.html

02、Work模式轮询模式(Round-Robin)

图解

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

Work模式 - 轮询模式(Round-Robin)

  • 类型:无
  • 特点:该模式接收消息是当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直至消息消费完成;

生产者

package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/*** @author: 学相伴-飞哥* @description: Producer 简单队列生产者* @Date : 2021/3/2*/
public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 6: 准备发送消息的内容//===============================end topic模式==================================for (int i = 1; i <= 20; i++) {//消息的内容String msg = "学相伴:" + i;// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish("", "queue1", null, msg.getBytes());}System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

消费者 - Work1

package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/*** @author: 学相伴-飞哥* @description: Consumer* @Date : 2021/3/2*/
public class Work1 {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("消费者-Work1");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/**  如果队列不存在,则会创建*  Rabbitmq不允许创建两个相同的队列名称,否则会报错。**  @params1: queue 队列的名称*  @params2: durable 队列是否持久化*  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义
//            channel.queueDeclare("queue1", false, false, false, null);// 同一时刻,服务器只会推送一条消息给消费者// 6: 定义接受消息的回调Channel finalChannel = channel;finalChannel.basicQos(1);finalChannel.basicConsume("queue1", true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {try{System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));Thread.sleep(2000);}catch(Exception ex){ex.printStackTrace();}}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println("Work1-开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

消费者 - Work2

package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/*** @author: 学相伴-飞哥* @description: Consumer* @Date : 2021/3/2*/
public class Work2 {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("消费者-Work2");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/**  如果队列不存在,则会创建*  Rabbitmq不允许创建两个相同的队列名称,否则会报错。**  @params1: queue 队列的名称*  @params2: durable 队列是否持久化*  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, true, false, null);// 同一时刻,服务器只会推送一条消息给消费者//channel.basicQos(1);// 6: 定义接受消息的回调Channel finalChannel = channel;finalChannel.basicQos(1);finalChannel.basicConsume("queue1", true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {try{System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));Thread.sleep(200);}catch(Exception ex){ex.printStackTrace();}}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println("Work2-开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

03、小结

work1和work2的消息处理能力不同,但是最后处理的消息条数相同,是“按均分配”。

RabbitMQ入门案例 - Work模式 - 公平分发(Fair Dispatch)

飞哥 分类:学习笔记 创建时间:2021/03/02 20:05 [字体](javascript:void(0)

学相伴飞哥RabbitMQ笔记相关推荐

  1. 学相伴飞哥RabbitMQ笔记以及个人总结

    一.什么是中间件? 1.简介 我国企业从20世纪80年代开始就逐渐进行信息化建设,由于方法和体系的不成熟,以及企业业务和市场需求的不断变化,一个企业可能同时运行着多个不同的业务系统,这些系统可能基于不 ...

  2. RabbitMQ狂神说笔记(RabbitMQ B站狂神说笔记、KuangStudy、学相伴飞哥)

    一. 引用文章 RabbitMQ狂神说笔记(B站狂神说笔记.KuangStudy.学相伴飞哥) RabbitMQ狂神说笔记(B站狂神说笔记.KuangStudy.学相伴飞哥)百度云盘地址,提取码:07 ...

  3. 【学相伴】狂神说 RabbitMQ笔记(简单使用RabbitMQ)

    目录 什么是rabbitMQ 使用docker安装RabbitMQ,如果没有使用过docker的可以看这篇文章https://blog.csdn.net/qq_44716544/article/det ...

  4. jpa 自定义sql if_跟飞哥学编程:SQL入门-:函数、存储过程和触发器

    最后不要忘记:SQL是一种结构化(Structured)的语言(Language),所以它具有 编程语言的特性 声明变量和赋值 所谓变量,可以是看成一个存储数据的容器,所有它里面存储的值是可以变化的. ...

  5. sql找出2000-3000年中的闰年。_跟飞哥学编程:SQL入门-4-查询和条件

    为了教学方便,我们先引入一个关键字: SELECT 使用SELECT,可以查询得到表数据,比如: SELECT 其中,星号(*)代表所有列.运行上述SQL语句,返回的就是Student表的所有行所有列 ...

  6. 网站性能调优实战-学相伴KuangStudy

    面对并发我们是如何优化KuangStudy网站性能的? 每个项目都会随着用户和数据的增长调整架构,来面对未来的问题,我们也不例外,在1月5号我们平台正式公测后,引起了很多观众的热烈反响,仅仅4天,注册 ...

  7. 今天聊聊飞哥是怎么阅读内核源码的

    大家好,我是飞哥! 经常在后台收到读者的交流,Linux 源码那么庞大,飞哥你是如何读的呢?由于问这个问题的太多,我想有必要专门写一篇文章聊一聊. 首先,我先说一点,其实我本人不是搞内核相关工作的.我 ...

  8. 菜鸟学Linux 第044篇笔记 算法和私有CA

    菜鸟学Linux 第044篇笔记 算法和私有CA 证书吊销列表CRL(Certificate Revocation List ) 如何解决私钥丢失 PKI: Public Key Infrastruc ...

  9. windows pxe 安装linux,菜鸟学Linux 第103篇笔记 pxe自动化安装linux

    菜鸟学Linux 第103篇笔记 pxe自动化安装linux 内容总览 linux的系统安装 kickstart文件的组成部分 DHCP (Dynamic Host Configuration Pro ...

最新文章

  1. 王贻芳院士:我们的科技管理过度强调竞争,缺乏稳定支持
  2. Mongodb集群 - 副本集内部选举机制
  3. 【Java Web开发指南】AjaxJson笔记
  4. Lua 中的 function、closure、upvalue
  5. 《C++ Primer Plus(第6版)中文版》——1.2 C++简史
  6. O(lg m + lgn)时间复杂度求两个有序序列合并后第K大的数
  7. MEMS:万物智联技术关键
  8. 改变蜡笔小新的眼睛颜色(对bmp图像的部分更改)
  9. Android开发获取ImageView显示的图片尺寸
  10. 超人视觉怎么样/机器视觉培训适合报培训班吗
  11. Hadoop的归档---har
  12. 微信小程序 live-player 实时音视频播放 组件
  13. 音质蓝牙耳机哪款好用?2023公认音质好的四款蓝牙耳机推荐
  14. 360FLEX与会者的演讲资源链接
  15. 设计模式之一代理模式
  16. 智能物联网网关有哪些必备功能
  17. 10秒钟执行一次计划任务
  18. 为什么需要云计算机,为什么需要云计算
  19. 算法竞赛入门经典第一节问题1-5
  20. 介绍一款好用的java反编译工具 - jd-gui

热门文章

  1. 中国女足,中国女垒,中国女排,舒米,让我欢喜让我忧
  2. 甲骨文发布2018第一季度财报 总收入92亿美元
  3. c语言考试系统设计报告,C语言课程设计(单项选择题标准化考试系统)报告
  4. 重磅!欧盟委员会正式提议:禁止强迫劳动产品进入欧洲市场
  5. python 自动抢红包_基于Airtest实现python自动抢红包
  6. 第三章 View的基本概念
  7. 2018最牛java初级笔试面试题,offer拿到手软
  8. 前端证券项目_头条猿辅导瓜子老虎证券等前端面经
  9. 政务使用区块链技术,网络安全风险不容小觑
  10. 【Linux Mint】通过vino-server共享Cinnamon桌面