第五课 实战go语言改造php仿优酷-RabbitMQ改造项目
第五课 实战go语言改造php仿优酷-RabbitMQ改造项目
tags:
- Beego
- 慕课网
categories:
- RabbitMQ
- 五种工作模式
文章目录
- 第五课 实战go语言改造php仿优酷-RabbitMQ改造项目
- 第一节 消息队列介绍
- 1.1 消息队列的好处
- 1.2 消息队列的应用场景
- 1.3消息队列的相关术语
- 1.4消息队列的工作模式
- 第二节 简单模式和work工作模式
- 2.1 封装接受端和发送端
- 2.2 简单发送和工作模式实现
- 2.3 持久化和手动应答
- 第三节 订阅、路由和主题模式
- 3.1 交换机的发送端和接收端
- 3.2 订阅模式
- 3.3 路由模式
- 3.4 主题模式
- 第四节 死信队列
- 4.1 死信队列介绍
- 4.2 私信队列消费端实现
- 4.3 私信队列生产端实现
- 4.4 私信队列使用
- 第五节 改造功能
- 5.1 改造评论功能
- 5.2 改造批量发送消息功能
第一节 消息队列介绍
1.1 消息队列的好处
- 跨语言的,支持PHP,GO,JAVA,C# ,Python …
- 解耦
- 异步
- 削峰
1.2 消息队列的应用场景
- 跨系统间的调用(比如:发短信,防止由于第三方平台原因导致我们系统的逻辑问题)
- 系统内的异步调用 比如发邮件功能
- 消息驱动的场景 当满足一个条件后,触发一系列的操作
- 跨语言之间的调用
1.3消息队列的相关术语
- Producer生产者 发送消息到队列
- Consumer消费者 从队列中取出消息消费掉
- Queue 存储消息的容器 队列 消息的载体
- Channel消息通道 RabbitMQ建立连接 连接中可以有多个通道
- Exchange交换机 决定消息按照什么规则发送到哪个队列中
- Routing Key 交换机通过它来确定发送到哪个队列中
1.4消息队列的工作模式
简单模式
工作模式 高并发时容易导致同一个消息到两个消费者手中(可以加唯一键值解决)
订阅模式 每个队列的消息都是一样的 E代表交换机
路由模式 根据routing key发送到不同的消息队列中
主题模式 根据routing key分类,发送到不同的消息队列中
第二节 简单模式和work工作模式
2.1 封装接受端和发送端
- 实现发送端,不断向MQ中发送自增数字
- 实现接收端,接受消息然后打印到命令行中
- 启动两个接收端,争抢消息(启动两个客户端即可)
- services/mq/Mq.go
package mqimport ("bytes""fmt""github.com/streadway/amqp"
)type Callback func(msg string)func Connect() (*amqp.Connection, error) {conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")return conn, err
}//发送端函数
func Publish(exchange string, queueName string, body string) error {//建立连接conn, err := Connect()if err != nil {return err}defer conn.Close()//创建通道channelchannel, err := conn.Channel()if err != nil {return err}defer channel.Close()//创建队列q, err := channel.QueueDeclare(queueName,true, // 是否持久化false,false,false,nil,)if err != nil {return err}//发送消息err = channel.Publish(exchange, q.Name, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType: "text/plain",Body: []byte(body),})return err
}//接受者方法
func Consumer(exchange string, queueName string, callback Callback) {//建立连接conn, err := Connect()defer conn.Close()if err != nil {fmt.Println(err)return}//创建通道channelchannel, err := conn.Channel()defer channel.Close()if err != nil {fmt.Println(err)return}//创建queueq, err := channel.QueueDeclare(queueName,true,false,false,false,nil,)if err != nil {fmt.Println(err)return}msgs, err := channel.Consume(q.Name, "", false, false, false, false, nil)if err != nil {fmt.Println(err)return}forever := make(chan bool)go func() {for d := range msgs {s := BytesToString(&(d.Body))callback(*s)d.Ack(false)}}()fmt.Printf("Waiting for messages")<-forever
}func BytesToString(b *[]byte) *string {s := bytes.NewBuffer(*b)r := s.String()return &r
}
2.2 简单发送和工作模式实现
- controllers/mqDemo不断的发送数据到队列fyouku_demo中
package controllersimport ("demo/services/mq""strconv""time""github.com/astaxie/beego"
)type MqDemoController struct {beego.Controller
}//简单模式和work工作模式 push方法
// @router /mq/push [*]
func (this *MqDemoController) GetMq() {go func() {count := 0for {// 交换机名字为空 队列名为fyouku_demo 信息mq.Publish("", "fyouku_demo", "hello"+strconv.Itoa(count))count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("hello")
}
- 接受端代码实现。这里需要单独go run (启动一个是简单模式,两个是工作模式)
package mainimport ("demo/services/mq""fmt"
)func main() {mq.Consumer("", "fyouku_demo", callback)
}func callback(s string) {fmt.Printf("msg is :%s\n", s)
}
2.3 持久化和手动应答
- 必须发送端和接受端都设置为持久化才可以。
// 发送端//创建队列q, err := channel.QueueDeclare(queueName,true, // 是否持久化false,false,false,nil,)//发送消息err = channel.Publish(exchange, q.Name, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent, // 持久化ContentType: "text/plain",Body: []byte(body),})// 接收端
//创建queueq, err := channel.QueueDeclare(queueName,true, // 持久化false,false,false,nil,)
- 手动应答。假如消费者取出消息之后,消费者挂掉了,如果自动应答这个消息就丢失了(业务端没有处理).
// 消费者// 第三个参数 为true自动应答 false手动应答msgs, err := channel.Consume(q.Name, "", false, false, false, false, nil)// d.Ack(false) go func() {for d := range msgs {s := BytesToString(&(d.Body))callback(*s)d.Ack(false)}}()
第三节 订阅、路由和主题模式
3.1 交换机的发送端和接收端
- 这三个模式都用到了交换机Exchange。
// 交换机名称 交换机类型(决定了哪种模式) 路由的key 内容
func PublishEx(exchange string, types string, routingKey string, body string) error {//建立连接conn, err := Connect()defer conn.Close()if err != nil {return err}//创建channelchannel, err := conn.Channel()defer channel.Close()if err != nil {return err}//创建交换机err = channel.ExchangeDeclare(exchange, // nametypes, // 类型true, // 是否持久化false,false,false,nil,)if err != nil {return err}err = channel.Publish(exchange, routingKey, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType: "text/plain",Body: []byte(body),})return err
}// 消费端
func ConsumerEx(exchange string, types string, routingKey string, callback Callback) {//建立连接conn, err := Connect()defer conn.Close()if err != nil {fmt.Println(err)return}//创建通道channelchannel, err := conn.Channel()defer channel.Close()if err != nil {fmt.Println(err)return}//创建交换机err = channel.ExchangeDeclare(exchange,types,true,false,false,false,nil,)if err != nil {fmt.Println(err)return}//创建队列q, err := channel.QueueDeclare("", // 临时队列没有名字false,false,true,false,nil,)if err != nil {fmt.Println(err)return}//把队列和交换机绑定起来err = channel.QueueBind(q.Name, // 队列名routingKey, // 路由exchange, // 交换机false,nil,)if err != nil {fmt.Println(err)return}msgs, err := channel.Consume(q.Name, "", false, false, false, false, nil)if err != nil {fmt.Println(err)return}forever := make(chan bool)go func() {for d := range msgs {s := BytesToString(&(d.Body))callback(*s)d.Ack(false)}}()fmt.Printf("Waiting for messages\n")<-forever
}
3.2 订阅模式
- 发送端发送
//订阅模式push 发送端实现
// @router /mq/fanout/push [*]
func (this *MqDemoController) GetFanout() {go func() {count := 0for {// 交换机名 交换机类型 订阅模式没有路由为空mq.PublishEx("fyouku.demo.fanout", "fanout", "", "fanout"+strconv.Itoa(count))count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("fanout")
- 接收端接收(启动两个接收端,发现两个接收端收到内容相同)
package mainimport ("demo/services/mq""fmt"
)func main() {// 消费者绑定交换机mq.ConsumerEx("fyouku.demo.fanout", "fanout", "", callback)
}func callback(s string) {fmt.Printf("msg is :%s\n", s)
}
3.3 路由模式
- 发送端,奇数发送到交换机,路由是one
- 发送端,偶数发送到交换机,路由是two
- 交换机,分别路由到不同的队列
- 发送端发送 “direct”
//路由模式push
// @router /mq/direct/push [*]
func (this *MqDemoController) GetDirect() {go func() {count := 0for {if count%2 == 0 {// 这里类型direct 路由twomq.PublishEx("fyouku.demo.direct", "direct", "two", "direct"+strconv.Itoa(count))} else {// 这里类型direct 路由onemq.PublishEx("fyouku.demo.direct", "direct", "one", "direct"+strconv.Itoa(count))}count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("direct")
- 接收端接收创建两个 一个路由为one 一个路由为two
package mainimport ("demo/services/mq""fmt"
)func main() {mq.ConsumerEx("fyouku.demo.direct", "direct", "one", callback)
}func callback(s string) {fmt.Printf("msg is :%s\n", s)
}
3.4 主题模式
- 发送key为fyouku.video和user.fyouku的消息
- 发送key为a.frog.name和b.frog.user的消息
- 实现#接受所有和*接受部分的功能
- 发送端"topic"
//topic主题模式
// @router /mq/topic/push [*]
func (this *MqDemoController) GetTopic() {go func() {count := 0for {if count%2 == 0 {mq.PublishEx("fyouku.demo.topic", "topic", "fyouku.video", "fyouku.video"+strconv.Itoa(count))} else {mq.PublishEx("fyouku.demo.topic", "topic", "user.fyouku", "user.fyouku"+strconv.Itoa(count))}count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("topic")
}// @router /mq/topictwo/push [*]
func (this *MqDemoController) GetTopicTwo() {go func() {count := 0for {if count%2 == 0 {mq.PublishEx("fyouku.demo.topic", "topic", "a.frog.name", "a.frog.name"+strconv.Itoa(count))} else {mq.PublishEx("fyouku.demo.topic", "topic", "b.frog.uid", "b.frog.uid"+strconv.Itoa(count))}count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("topictwo")
}
- 接收端。所有的路由用# 如果只获取frog主题的用 .frog.
package mainimport ("demo/services/mq""fmt"
)func main() {// 获取所有的路由用# 如果只获取frog主题的用 *.frog.*mq.ConsumerEx("fyouku.demo.topic", "topic", "#", callback)
}func callback(s string) {fmt.Printf("msg is :%s\n", s)
}
第四节 死信队列
4.1 死信队列介绍
- 死信队列使用场景。(必须掌握)
- 发送消息后,规定十分钟后发给用户。
- 规定发送消息每天固定时间发给用户。
- 订单下完后没有支付,30分钟后取消订单。
- 下单后定时收到系统的提示消息。
- 在A队列上设置消息过期时间TTL,绑定到B交换机。这里B交换机就被成为死信交换机,B队列就被成为死信队列。
- 死信产生的条件不止是TTL过期,还有消息被拒绝和队列达到最大长度都会产生死信。
- 死信产生的条件不止是TTL过期,还有消息被拒绝和队列达到最大长度都会产生死信。
4.2 私信队列消费端实现
// 死信队列消费端
func ConsumerDlx(exchangeA string, queueAName string, exchangeB string, queueBName string, ttl int, callback Callback) {//建立连接conn, err := Connect()if err != nil {fmt.Println(err)return}defer conn.Close()//创建一个Channelchannel, err := conn.Channel()if err != nil {fmt.Println(err)return}defer channel.Close()//创建A交换机//创建A队列//A交换机和A队列绑定err = channel.ExchangeDeclare(exchangeA, // name"fanout", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)if err != nil {fmt.Println(err)return}//创建一个queue,指定消息过期时间,并且绑定过期以后发送到那个交换机queueA, err := channel.QueueDeclare(queueAName, // nametrue, // durablefalse, // delete when ususedfalse, // exclusivefalse, // no-waitamqp.Table{// 当消息过期时把消息发送到 exchangeB"x-dead-letter-exchange": exchangeB,"x-message-ttl": ttl,//"x-dead-letter-queue" : queueBName,//"x-dead-letter-routing-key" :},)if err != nil {fmt.Println(err)return}//A交换机和A队列绑定err = channel.QueueBind(queueA.Name, // queue name"", // routing keyexchangeA, // exchangefalse,nil,)if err != nil {fmt.Println(err)return}//创建B交换机//创建B队列//B交换机和B队列绑定err = channel.ExchangeDeclare(exchangeB, // name"fanout", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)if err != nil {fmt.Println(err)return}//创建一个queuequeueB, err := channel.QueueDeclare(queueBName, // nametrue, // durablefalse, // delete when ususedfalse, // exclusivefalse, // no-waitnil, // arguments)if err != nil {fmt.Println(err)return}//B交换机和B队列绑定err = channel.QueueBind(queueB.Name, // queue name"", // routing keyexchangeB, // exchangefalse,nil,)if err != nil {fmt.Println(err)return}msgs, err := channel.Consume(queueB.Name, "", false, false, false, false, nil)if err != nil {fmt.Println(err)return}forever := make(chan bool)go func() {for d := range msgs {s := BytesToString(&(d.Body))callback(*s)d.Ack(false)}}()fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")<-forever
}
4.3 私信队列生产端实现
func PublishDlx(exchangeA string, body string) error {//建立连接conn, err := Connect()if err != nil {return err}defer conn.Close()//创建一个Channelchannel, err := conn.Channel()if err != nil {return err}defer channel.Close()//消息发送到A交换机err = channel.Publish(exchangeA, "", false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType: "text/plain",Body: []byte(body),})return err
}
4.4 私信队列使用
- 服务器端发送
//死信队列push
// @router /mq/dlx/push [*]
func (this *MqDemoController) GetDlx() {go func() {count := 0for {mq.PublishDlx("fyouku.dlx.a", "dlx"+strconv.Itoa(count))count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("dlx")
}// @router /mq/dlx/two/push [*]
func (this *MqDemoController) GetTwoDlx() {go func() {count := 0for {mq.PublishEx("fyouku.dlx.b", "fanout", "", "dlxtwo"+strconv.Itoa(count))count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("dlxTwo")
}
- 客户端接收
package mainimport ("demo/services/mq""fmt"
)func main() {// A交换 A队列 B交换 B队列 过期时间 回调函数mq.ConsumerDlx("fyouku.dlx.a", "fyouku_dlx_a", "fyouku.dlx.b", "fyouku_dlx_b", 10000, callback)
}func callback(s string) {fmt.Printf("msg is :%s\n", s)
}
第五节 改造功能
5.1 改造评论功能
- 改造点:发送评论后更新排行榜的数据。
- 排行榜没有实时跟新的要求。
- 更新排行榜可能造成评论接口响应时间边长。造成用户体验差…
- 可以用MQ实现解耦。增加用户体验
- 更新排行榜往MQ推送消息。简单模式
func SaveComment(content string, uid int, episodesId int, videoId int) error {o := orm.NewOrm()var comment Commentcomment.Content = contentcomment.UserId = uidcomment.EpisodesId = episodesIdcomment.VideoId = videoIdcomment.Stamp = 0comment.Status = 1comment.AddTime = time.Now().Unix()_, err := o.Insert(&comment)if err == nil {//修改视频的总评论数o.Raw("UPDATE video SET comment=comment+1 WHERE id=?", videoId).Exec()//修改视频剧集的评论数o.Raw("UPDATE video_episodes SET comment=comment+1 WHERE id=?", episodesId).Exec()//更新redis排行榜 - 通过MQ来实现//创建一个简单模式的MQ//把要传递的数据转换为json字符串videoObj := map[string]int{"VideoId": videoId,}videoJson, _ := json.Marshal(videoObj)mq.Publish("", "fyouku_top", string(videoJson))}return err
}
- 消费消息更新排行榜。
package mainimport ("encoding/json""fmt""fyoukuApi/models""fyoukuApi/services/mq"redisClient "fyoukuApi/services/redis""strconv""github.com/astaxie/beego""github.com/astaxie/beego/orm"_ "github.com/go-sql-driver/mysql"
)func main() {beego.LoadAppConfig("ini", "../../conf/app.conf")defaultdb := beego.AppConfig.String("defaultdb")orm.RegisterDriver("mysql", orm.DRMySQL)orm.RegisterDataBase("default", "mysql", defaultdb, 30, 30)mq.Consumer("", "fyouku_top", callback)
}func callback(s string) {type Data struct {VideoId int}var data Dataerr := json.Unmarshal([]byte(s), &data)videoInfo, err := models.RedisGetVideoInfo(data.VideoId)if err == nil {conn := redisClient.PoolConnect()defer conn.Close()//更新排行榜redisChannelKey := "video:top:channel:channelId:" + strconv.Itoa(videoInfo.ChannelId)redisTypeKey := "video:top:type:typeId:" + strconv.Itoa(videoInfo.TypeId)conn.Do("zincrby", redisChannelKey, 1, data.VideoId)conn.Do("zincrby", redisTypeKey, 1, data.VideoId)}fmt.Printf("msg is :%s\n", s)
}
5.2 改造批量发送消息功能
- 改造点:假如发送给100万用户,实时响应可能不可能。必须异步。
- 简单模式
//保存消息接收人到队列中
func SendMessageUserMq(userId int, messageId int64) {//把数据转换成json字符串type Data struct {UserId intMessageId int64}var data Datadata.UserId = userIddata.MessageId = messageIddataJson, _ := json.Marshal(data)mq.Publish("", "fyouku_send_message_user", string(dataJson))
- 消费发送消息
package mainimport ("encoding/json""fmt""fyoukuApi/models""fyoukuApi/services/mq""github.com/astaxie/beego""github.com/astaxie/beego/orm"_ "github.com/go-sql-driver/mysql"
)func main() {beego.LoadAppConfig("ini", "../../conf/app.conf")defaultdb := beego.AppConfig.String("defaultdb")orm.RegisterDriver("mysql", orm.DRMySQL)orm.RegisterDataBase("default", "mysql", defaultdb, 30, 30)mq.Consumer("", "fyouku_send_message_user", callback)
}func callback(s string) {type Data struct {UserId intMessageId int64}var data Dataerr := json.Unmarshal([]byte(s), &data)if err == nil {models.SendMessageUser(data.UserId, data.MessageId)}fmt.Printf("msg is :%s\n", s)
}
第五课 实战go语言改造php仿优酷-RabbitMQ改造项目相关推荐
- 第四课 实战go语言改造php仿优酷-Redis改造优化接口
第四课 实战go语言改造php仿优酷-Redis改造优化接口 tags: Beego 慕课网 categories: redis 文章目录 第四课 实战go语言改造php仿优酷-Redis改造优化接口 ...
- 第三课 go语言改到php仿优酷-阿里云视频上传功能
第三课 go语言改到php仿优酷-阿里云视频上传功能 tags: Beego 慕课网 categories: go环境安装 Beego框架 阿里云 视频点播服务 文章目录 第三课 go语言改到php仿 ...
- 第二课 Beego仿优酷-go环境安装和Beego框架基础
第二课 Beego仿优酷-go环境安装和Beego框架基础 tags: Beego 慕课网 categories: go环境安装 Beego框架 文章目录 第二课 Beego仿优酷-go环境安装和Be ...
- 优酷IPv6改造纪实:视频行业首家拥抱下一代网络技术
阿里妹导读:2018年双11前,优酷开启了IPv6的大门.9月份PC端业务开启灰度,迎来首位IPv6 VIP用户后,优酷移动客户端也马不停蹄地加入灰度大军.从0到1,花了几个月:从10到1000,花了 ...
- 优酷IPv6改造纪实:视频行业首家拥抱下一代网络技术...
阿里妹导读:2018年双11前,优酷开启了IPV6的大门.9月份PC端业务开启灰度,迎来首位IPV6 VIP用户后,优酷移动客户端也马不停蹄地加入灰度大军.从0到1,花了几个月:从10到1000,花了 ...
- C语言笔记 第三十五课 数组参数和指针参数分析
第三十五课 数组参数和指针参数分析 思考:为什么C语言中的数组参数会退化为指针? 退化的意义 C语言中只会以值拷贝的方式传输参数 当向函数传递数组时:(错误的,设计当初的思路) 将整个数组拷贝一份传入 ...
- C语言学习第十五课(文件操作)
第十五课 1,文件的概念 文件一般指存储在外部介质(如磁盘磁带)上的集合: 2,流的概念 ·操作系统是以文件为单位对数据进行管理的,输入输出是数据传送的过程,数据如流水一般从一处流向另一处,因此将输入 ...
- 视频教程-Kali Linux渗透测试全程课与脚本语言编程系列课程-渗透测试
Kali Linux渗透测试全程课与脚本语言编程系列课程 本人有多年的服务器高级运维与开发经验,擅长计算机与服务器攻防及网络攻防技术!对网络安全领域有持续的关注和研究! 林晓炜 ¥899.00 立即订 ...
- 第五课.Linux开发基础
第五课目录 GCC用法参考 GCC的常用用法 make与Makefile Makefile的规则 Makefile的语法 Makefile实例 调试 GDB 设置断点 虽然已经有很多优秀的IDE可以化 ...
最新文章
- 【NAACL2021】Graph4NLP:图深度学习自然语言处理(附ppt)
- Quartus II 8.1 详解--有图---图片详解 【1讲】
- android mapping.txt,Android根据mapping.txt还原混淆的代码
- (转) Hadoop1.2.1安装
- java web项目自动部署到Tomcat的原因
- 推荐 7 个超棒的监控工具
- centos6.3安装Samba及权限
- class with pointer
- SU草图大师必备实用插件,拿走不谢!
- 【转】 精密贴片电阻阻值对照表
- 实验2-1-2 温度转换 (5 分)
- Pr:音频和视频的同步
- 618运动装备推荐、这几款产品都是运动必备
- 万维钢:怎样做读书笔记
- 供应链安全这件事,早就被朱元璋玩明白了
- android studio lint,Android Studio Lint 工具
- 2021csgo网页开箱网站有哪些?csgo靠谱的开箱网站大全
- 工业线上赛(2022省赛)
- 梯度下降法python
- m4a html 播放器,HTML5 Audio m4a
热门文章
- IntelliJ IDEA 学习笔记 - Default Settings
- Nested嵌套对象类型还挺实用
- 北大最会读书的人《何帆的读书笔记》-推荐200本书籍清单,让你快速提高阅读能力
- 使用 UDP 数据包发送消息
- java 手机号替换_Java正则替换手机号代码实例
- 命令行休眠计算机,电脑用powercfg.exe命令关闭win10系统休眠的方法
- 如何连接局域网_世上最强远程访问和局域网共享技术!
- 戴尔G3 3579笔记本无法使用耳机上的麦克风/声卡驱动异常/无声音/声卡版本不兼容 的一个解决办法
- Flutter-适配相关
- 网易开发三年,现跳槽蚂蚁花呗,4面顺利通过,拿下Java岗offer