文章目录

  • 写在前面
  • 1. 介绍
    • 1.1 什么是MQ
    • 1.2 什么是RabbitMQ
    • 1.3 AMQP 协议
  • 2. Go语言操作RabbitMQ
    • 2.1 下载
    • 2.2 引入驱动
    • 2.3 HelloWorld 模型
      • 2.3.1 生产者
      • 2.3.2 消费者
      • 2.3.3 结果
    • 2.4 Work Queues 模型
      • 2.4.1 生产者
      • 2.4.2 消费者
      • 2.4.3 结果
    • 2.5 Publish/Subscribe 模型
      • 2.5.1 生产者
      • 2.5.2 消费者
      • 2.5.3 结果
    • 2.6 Routing 模型
      • 2.6.1 生产者
      • 2.6.2 消费者
    • 2.7 Topics 模型
      • 2.7.1 生产者
      • 2.7.2 消费者
    • 2.8 RPC 模型

写在前面

本文是使用Go语言实现各种RabbitMQ的中间件模型

1. 介绍

1.1 什么是MQ

MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。

别名为 消息中间件 通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

目前市面上有很多消息中间件:RabbitMQ,RocketMQ,Kafka等等…

1.2 什么是RabbitMQ

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求可能比较低了。

1.3 AMQP 协议

AMQP(advanced message queuing protocol) 在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。

顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:

2. Go语言操作RabbitMQ

2.1 下载

下载rabbitmq过程就省了,可以直接到官网网站下载安装,像安装qq一样。

2.2 引入驱动

  • 驱动

go get github.com/streadway/amqp

  • 连接
var MQ *amqp.Connection// RabbitMQ 链接
func RabbitMQ(connString string) {conn, err := amqp.Dial(connString)if err != nil {panic(err)}MQ = conn
}

2.3 HelloWorld 模型


P代表生产者,C代表消费者,红色部分是队列。
生产者生成消息到队列中,消费者进行消费,直连单点模式。

2.3.1 生产者

  • 声明连接对象
var ProductMQ *amqp.Connection
  • 声明通道
ch, err := ProductMQ.Channel()
  • 创建队列
q, err := ch.QueueDeclare("hello",    // 队列名字false,       // 是否持久化,false,   // 不用的时候是否自动删除false,       // 用来指定是否独占队列false,     // no-waitnil,          // 其他参数
)

参数1(name):队列名字
参数2(durable):持久化,队列中所有的数据都是在内存中的,如果为true的话,这个通道关闭之后,数据就会存在磁盘中持久化,false的话就会丢弃
参数3(autoDelete):不需要用到队列的时候,是否将消息删除
参数4(exclusive):是否独占队列,true的话,就是只能是这个进程独占这个队列,其他都不能对这个队列进行读写
参数5(noWait):是否阻塞
参数6(args):其他参数

  • 发布消息
body := "Hello World!"err = ch.Publish("",     // 交换机q.Name, // 队列名字false,  // 是否强制性// 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者// 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉false, //当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者// 是否立刻/**概括来说,mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。**/amqp.Publishing{ContentType: "text/plain",Body:        []byte(body), // 发送的消息})

参数1(exchange):交换机,后续会讲到
参数2(route-key):队列名字
参数3(mandatory):是否强制性,

当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者
当mandatory设置为false时,出现上述情形broker会直接将消息扔掉

参数4(immediate):是否立即处理

当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者

也就是说,mandatory 标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

参数5(msg):发布的消息,ContentType是传输类型,Body是发送的消息。

2.3.2 消费者

  • 声明通道
ch, err := ConsumerMQ.Channel()
  • 创建队列
q, err := ch.QueueDeclare("hello",false,false,false,false,nil,
)
  • 读取队列消息
msgs, err := ch.Consume(q.Name,"",    true,  false,  false,  false, nil,
)

由于消费者端需要一直监听,所以我们要用一个for循环+channel去阻塞主进程,使得主进程一直处于监听状态。

forever := make(chan bool)
go func() {for d := range msgs {fmt.Printf("Received a message: %s", d.Body)}
}()
fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

2.3.3 结果

  • 生产者

  • 消费者

2.4 Work Queues 模型

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work queues模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

2.4.1 生产者

生成10条消息到队列中

body := "Hello World!  "
for i := 0; i < 10; i++ {msg := strconv.Itoa(i)err = ch.Publish("",     // 交换机q.Name, // 队列名字false,  // 是否强制性false,  // 是否立刻amqp.Publishing{ContentType: "text/plain",Body:        []byte(body+msg), // 发送的消息})
}

2.4.2 消费者

创建两个一样的消费者进行监听消费,与上面2.3.2的消费者保持一致

2.4.3 结果

消费者1号

消费者2号

2.5 Publish/Subscribe 模型

fanout 扇出 也称为广播

在广播模式下,消息发送流程如下:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

2.5.1 生产者

  • 声明交换机
_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)

参数1(name):交换机名称
参数2(kind):交换机类型

  • 生产消息
_ = ch.Publish("logs", "", false, false,amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})

2.5.2 消费者

  • 声明交换机
_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil, )
  • 声明队列
q, _ := ch.QueueDeclare("", false, false, true, false, nil, )
  • 绑定交换机
_ = ch.QueueBind(q.Name, "", "logs", false, nil, )
  • 消费消息
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )

2.5.3 结果

  • 生产者

  • 消费者

2.6 Routing 模型

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

2.6.1 生产者

_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
body := "Hello World "
_ = ch.Publish("logs_direct", "", false, false,amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})

2.6.2 消费者

  • 只接受warn
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "warn", "logs_direct", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
  • 只接受info
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "info", "logs_direct", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )

2.7 Topics 模型

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

  • 统配符
    * 匹配不多不少恰好1个词
    # 匹配一个或多个词
  • 如:
    fan.# 匹配 fan.one.two 或者 fan.one 等
    fan.* 只能匹配 fan.one

2.7.1 生产者

_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
body := "Hello World "
_ = ch.Publish("logs_topic", "", false, false,amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})

2.7.2 消费者

  • 只接受*.one
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "*.one", "logs_topic", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
  • 只接受*.fan
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "*.fan", "logs_topic", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )

2.8 RPC 模型

日后补充

【RabbitMQ】Go语言实现六种消息中间件模型相关推荐

  1. R语言构建文本分类模型:文本数据预处理、构建词袋模型(bag of words)、构建xgboost文本分类模型、基于自定义函数构建xgboost文本分类模型

    R语言构建文本分类模型:文本数据预处理.构建词袋模型(bag of words).构建xgboost文本分类模型.基于自定义函数构建xgboost文本分类模型 目录

  2. R语言构建随机森林模型randomForest分类模型并评估模型在测试集和训练集上的效果(accuray、F1、偏差Deviance):随机森林在Bagging算法的基础上加入了列采样(分枝特征随机)

    R语言构建随机森林模型randomForest分类模型并评估模型在测试集和训练集上的效果(accuray.F1.偏差Deviance):随机森林在Bagging算法的基础上加入了列采样(分枝特征随机) ...

  3. R语言构建logistic回归模型:构建模型公式、拟合logistic回归模型、模型评估,通过混淆矩阵计算precision、enrichment、recall指标

    R语言构建logistic回归模型:构建模型公式.拟合logistic回归模型.模型评估,通过混淆矩阵计算precision.enrichment.recall指标 目录

  4. R语言广义加性模型(GAMs:Generalized Additive Model)建模:数据加载、划分数据、并分别构建线性回归模型和广义线性加性模型GAMs、并比较线性模型和GAMs模型的性能

    R语言广义加性模型(GAMs:Generalized Additive Model)建模:数据加载.划分数据.并分别构建线性回归模型和广义线性加性模型GAMs.并比较线性模型和GAMs模型的性能 目录

  5. R语言构建文本分类模型并使用LIME进行模型解释实战:文本数据预处理、构建词袋模型、构建xgboost文本分类模型、基于文本训练数据以及模型构建LIME解释器解释多个测试语料的预测结果并可视化

    R语言构建文本分类模型并使用LIME进行模型解释实战:文本数据预处理.构建词袋模型.构建xgboost文本分类模型.基于文本训练数据以及模型构建LIME解释器解释多个测试语料的预测结果并可视化 目录

  6. R语言构建随机森林模型错误解决:Error in y - ymean : non-numeric argument to binary operator

    R语言构建随机森林模型错误解决:Error in y - ymean : non-numeric argument to binary operator 目录 R语言构建随机森林模型错误解决:Erro ...

  7. R语言广义加性模型GAMs:可视化每个变量的样条函数、样条函数与变量与目标变量之间的平滑曲线比较、并进行多变量的归一化比较、测试广义线性加性模型GAMs在测试集上的表现(防止过拟合)

    R语言广义加性模型GAMs:可视化每个变量的样条函数.样条函数与变量与目标变量之间的平滑曲线比较.并进行多变量的归一化比较.测试广义线性加性模型GAMs在测试集上的表现(防止过拟合) 目录

  8. R语言广义加性模型(generalized additive models,GAMs):使用广义线性加性模型GAMs构建logistic回归

    R语言广义加性模型(generalized additive models,GAMs):使用广义线性加性模型GAMs构建logistic回归 目录

  9. R语言构建logistic回归模型:WVPlots包PRTPlot函数可视化获取logistic回归模型的最优阈值、优化(precision、enrichment)和recall之间的折衷

    R语言构建logistic回归模型:WVPlots包PRTPlot函数可视化获取logistic回归模型的最佳阈值(改变阈值以优化精确度(precision.enrichment)和查全率(recal ...

最新文章

  1. c++根据二叉树的层次遍历建立二叉树_LeetCode | 102.二叉树的层次遍历
  2. ubnutu18.10拔除硬盘后进行recovery mode
  3. 如何快速删除打印机任务
  4. c语言最大公约数和最小公倍数_五年级奥数课堂之七:公因数和公倍数
  5. 大工17秋《计算机文化基础》在线测试1,奥鹏大工15秋《计算机文化基础》在线测试1答案...
  6. mysql能将查询结果与表左查询_mysql重点,表查询操作和多表查询
  7. Scanner类(next,nextLine,nextInt)的用法与常见问题
  8. 网卡设备状态显示错误代码56
  9. 综述 | 结肠细胞代谢如何塑造肠道菌群
  10. 计算机win7卡顿如何解决方法,win7电脑玩2D游戏经常发生卡顿六大解决方法
  11. Linux 上安装iib 9.0
  12. 图灵奖得主Geoffrey Hinton最新研究:利用神经网络对有关节三维模型进行估计的方法NASA
  13. iOS开发 App内自动连接指定Wi-Fi
  14. 怎样对一个项目进行成本管理,具体步骤是啥?
  15. 鲸探发布点评:8月24日发售《LuLu猪西游》系列数字藏品
  16. Browsers Series_2_Firefox
  17. java ssssss,在Java 7中将字符串日期转换为yyyy-MM-dd’T’HH:mm:ss.SSSSSS格式的字符串...
  18. 【情态动词练习题】mustn‘t 和 don‘t have to
  19. radius mysql md5_在Free Radius的PAP认证过程中使用MD5密码
  20. 【微信小程序AR】基于Kivicube零代码实现微信小程序AR

热门文章

  1. Visual Studio.NET 术语大全
  2. 教师资格证报名网页兼容问题
  3. 2020年焊工(技师)考试内容及焊工(技师)考试申请表
  4. ubuntu 16.04网速慢解决方案
  5. php+创建微信标签,微信公众号——创建标签,给粉丝打标签。
  6. 一句话说清NI公司的LabView、CVI、MeasurementStudio三者之间的区别
  7. 大规模敏捷导入工具实践
  8. W3Shool学习 - 初级教程
  9. 【神DP】-ZOJ-3623-Battle Ships
  10. 百度地图 驾车导航用来生成路线 轨迹回放(LuShu)