第五课 实战go语言改造php仿优酷-RabbitMQ改造项目

tags:

  • Beego
  • 慕课网

categories:

  • RabbitMQ
  • 五种工作模式

文章目录

  • 第五课 实战go语言改造php仿优酷-RabbitMQ改造项目
    • 第一节 消息队列介绍
      • 1.1 消息队列的好处
      • 1.2 消息队列的应用场景
      • 1.3消息队列的相关术语
      • 1.4消息队列的工作模式
    • 第二节 简单模式和work工作模式
      • 2.1 封装接受端和发送端
      • 2.2 简单发送和工作模式实现
      • 2.3 持久化和手动应答
    • 第三节 订阅、路由和主题模式
      • 3.1 交换机的发送端和接收端
      • 3.2 订阅模式
      • 3.3 路由模式
      • 3.4 主题模式
    • 第四节 死信队列
      • 4.1 死信队列介绍
      • 4.2 私信队列消费端实现
      • 4.3 私信队列生产端实现
      • 4.4 私信队列使用
    • 第五节 改造功能
      • 5.1 改造评论功能
      • 5.2 改造批量发送消息功能

第一节 消息队列介绍

1.1 消息队列的好处

  1. 跨语言的,支持PHP,GO,JAVA,C# ,Python …
  2. 解耦
  3. 异步
  4. 削峰

1.2 消息队列的应用场景

  1. 跨系统间的调用(比如:发短信,防止由于第三方平台原因导致我们系统的逻辑问题)
  2. 系统内的异步调用 比如发邮件功能
  3. 消息驱动的场景 当满足一个条件后,触发一系列的操作
  4. 跨语言之间的调用

1.3消息队列的相关术语

  1. Producer生产者 发送消息到队列
  2. Consumer消费者 从队列中取出消息消费掉
  3. Queue 存储消息的容器 队列 消息的载体
  4. Channel消息通道 RabbitMQ建立连接 连接中可以有多个通道
  5. Exchange交换机 决定消息按照什么规则发送到哪个队列中
  6. Routing Key 交换机通过它来确定发送到哪个队列中

1.4消息队列的工作模式

  1. 简单模式

  2. 工作模式 高并发时容易导致同一个消息到两个消费者手中(可以加唯一键值解决)

  3. 订阅模式 每个队列的消息都是一样的 E代表交换机

  4. 路由模式 根据routing key发送到不同的消息队列中

  5. 主题模式 根据routing key分类,发送到不同的消息队列中

第二节 简单模式和work工作模式

2.1 封装接受端和发送端

  1. 实现发送端,不断向MQ中发送自增数字
  2. 实现接收端,接受消息然后打印到命令行中
  3. 启动两个接收端,争抢消息(启动两个客户端即可)
  4. services/mq/Mq.go
package mqimport ("bytes""fmt""github.com/streadway/amqp"
)type Callback func(msg string)func Connect() (*amqp.Connection, error) {conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")return conn, err
}//发送端函数
func Publish(exchange string, queueName string, body string) error {//建立连接conn, err := Connect()if err != nil {return err}defer conn.Close()//创建通道channelchannel, err := conn.Channel()if err != nil {return err}defer channel.Close()//创建队列q, err := channel.QueueDeclare(queueName,true, // 是否持久化false,false,false,nil,)if err != nil {return err}//发送消息err = channel.Publish(exchange, q.Name, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType:  "text/plain",Body:         []byte(body),})return err
}//接受者方法
func Consumer(exchange string, queueName string, callback Callback) {//建立连接conn, err := Connect()defer conn.Close()if err != nil {fmt.Println(err)return}//创建通道channelchannel, err := conn.Channel()defer channel.Close()if err != nil {fmt.Println(err)return}//创建queueq, err := channel.QueueDeclare(queueName,true,false,false,false,nil,)if err != nil {fmt.Println(err)return}msgs, err := channel.Consume(q.Name, "", false, false, false, false, nil)if err != nil {fmt.Println(err)return}forever := make(chan bool)go func() {for d := range msgs {s := BytesToString(&(d.Body))callback(*s)d.Ack(false)}}()fmt.Printf("Waiting for messages")<-forever
}func BytesToString(b *[]byte) *string {s := bytes.NewBuffer(*b)r := s.String()return &r
}

2.2 简单发送和工作模式实现

  1. controllers/mqDemo不断的发送数据到队列fyouku_demo中
package controllersimport ("demo/services/mq""strconv""time""github.com/astaxie/beego"
)type MqDemoController struct {beego.Controller
}//简单模式和work工作模式 push方法
// @router /mq/push [*]
func (this *MqDemoController) GetMq() {go func() {count := 0for {// 交换机名字为空 队列名为fyouku_demo 信息mq.Publish("", "fyouku_demo", "hello"+strconv.Itoa(count))count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("hello")
}
  1. 接受端代码实现。这里需要单独go run (启动一个是简单模式,两个是工作模式)
package mainimport ("demo/services/mq""fmt"
)func main() {mq.Consumer("", "fyouku_demo", callback)
}func callback(s string) {fmt.Printf("msg is :%s\n", s)
}

2.3 持久化和手动应答

  1. 必须发送端和接受端都设置为持久化才可以。
// 发送端//创建队列q, err := channel.QueueDeclare(queueName,true, // 是否持久化false,false,false,nil,)//发送消息err = channel.Publish(exchange, q.Name, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent, // 持久化ContentType:  "text/plain",Body:         []byte(body),})// 接收端
//创建queueq, err := channel.QueueDeclare(queueName,true, // 持久化false,false,false,nil,)
  1. 手动应答。假如消费者取出消息之后,消费者挂掉了,如果自动应答这个消息就丢失了(业务端没有处理).
// 消费者// 第三个参数 为true自动应答 false手动应答msgs, err := channel.Consume(q.Name, "", false, false, false, false, nil)// d.Ack(false) go func() {for d := range msgs {s := BytesToString(&(d.Body))callback(*s)d.Ack(false)}}()

第三节 订阅、路由和主题模式

3.1 交换机的发送端和接收端

  1. 这三个模式都用到了交换机Exchange
// 交换机名称 交换机类型(决定了哪种模式) 路由的key 内容
func PublishEx(exchange string, types string, routingKey string, body string) error {//建立连接conn, err := Connect()defer conn.Close()if err != nil {return err}//创建channelchannel, err := conn.Channel()defer channel.Close()if err != nil {return err}//创建交换机err = channel.ExchangeDeclare(exchange, // nametypes,    // 类型true,     // 是否持久化false,false,false,nil,)if err != nil {return err}err = channel.Publish(exchange, routingKey, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType:  "text/plain",Body:         []byte(body),})return err
}// 消费端
func ConsumerEx(exchange string, types string, routingKey string, callback Callback) {//建立连接conn, err := Connect()defer conn.Close()if err != nil {fmt.Println(err)return}//创建通道channelchannel, err := conn.Channel()defer channel.Close()if err != nil {fmt.Println(err)return}//创建交换机err = channel.ExchangeDeclare(exchange,types,true,false,false,false,nil,)if err != nil {fmt.Println(err)return}//创建队列q, err := channel.QueueDeclare("", // 临时队列没有名字false,false,true,false,nil,)if err != nil {fmt.Println(err)return}//把队列和交换机绑定起来err = channel.QueueBind(q.Name,     // 队列名routingKey, // 路由exchange,   // 交换机false,nil,)if err != nil {fmt.Println(err)return}msgs, err := channel.Consume(q.Name, "", false, false, false, false, nil)if err != nil {fmt.Println(err)return}forever := make(chan bool)go func() {for d := range msgs {s := BytesToString(&(d.Body))callback(*s)d.Ack(false)}}()fmt.Printf("Waiting for messages\n")<-forever
}

3.2 订阅模式

  1. 发送端发送
//订阅模式push 发送端实现
// @router /mq/fanout/push [*]
func (this *MqDemoController) GetFanout() {go func() {count := 0for {// 交换机名 交换机类型 订阅模式没有路由为空mq.PublishEx("fyouku.demo.fanout", "fanout", "", "fanout"+strconv.Itoa(count))count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("fanout")
  1. 接收端接收(启动两个接收端,发现两个接收端收到内容相同)
package mainimport ("demo/services/mq""fmt"
)func main() {// 消费者绑定交换机mq.ConsumerEx("fyouku.demo.fanout", "fanout", "", callback)
}func callback(s string) {fmt.Printf("msg is :%s\n", s)
}

3.3 路由模式

  1. 发送端,奇数发送到交换机,路由是one
  2. 发送端,偶数发送到交换机,路由是two
  3. 交换机,分别路由到不同的队列
  4. 发送端发送 “direct”
//路由模式push
// @router /mq/direct/push [*]
func (this *MqDemoController) GetDirect() {go func() {count := 0for {if count%2 == 0 {// 这里类型direct 路由twomq.PublishEx("fyouku.demo.direct", "direct", "two", "direct"+strconv.Itoa(count))} else {// 这里类型direct 路由onemq.PublishEx("fyouku.demo.direct", "direct", "one", "direct"+strconv.Itoa(count))}count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("direct")
  1. 接收端接收创建两个 一个路由为one 一个路由为two
package mainimport ("demo/services/mq""fmt"
)func main() {mq.ConsumerEx("fyouku.demo.direct", "direct", "one", callback)
}func callback(s string) {fmt.Printf("msg is :%s\n", s)
}

3.4 主题模式

  1. 发送key为fyouku.video和user.fyouku的消息
  2. 发送key为a.frog.name和b.frog.user的消息
  3. 实现#接受所有和*接受部分的功能
  4. 发送端"topic"
//topic主题模式
// @router /mq/topic/push [*]
func (this *MqDemoController) GetTopic() {go func() {count := 0for {if count%2 == 0 {mq.PublishEx("fyouku.demo.topic", "topic", "fyouku.video", "fyouku.video"+strconv.Itoa(count))} else {mq.PublishEx("fyouku.demo.topic", "topic", "user.fyouku", "user.fyouku"+strconv.Itoa(count))}count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("topic")
}// @router /mq/topictwo/push [*]
func (this *MqDemoController) GetTopicTwo() {go func() {count := 0for {if count%2 == 0 {mq.PublishEx("fyouku.demo.topic", "topic", "a.frog.name", "a.frog.name"+strconv.Itoa(count))} else {mq.PublishEx("fyouku.demo.topic", "topic", "b.frog.uid", "b.frog.uid"+strconv.Itoa(count))}count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("topictwo")
}
  1. 接收端。所有的路由用# 如果只获取frog主题的用 .frog.
package mainimport ("demo/services/mq""fmt"
)func main() {// 获取所有的路由用# 如果只获取frog主题的用 *.frog.*mq.ConsumerEx("fyouku.demo.topic", "topic", "#", callback)
}func callback(s string) {fmt.Printf("msg is :%s\n", s)
}

第四节 死信队列

4.1 死信队列介绍

  1. 死信队列使用场景。(必须掌握)

    • 发送消息后,规定十分钟后发给用户。
    • 规定发送消息每天固定时间发给用户。
    • 订单下完后没有支付,30分钟后取消订单。
    • 下单后定时收到系统的提示消息。
  2. 在A队列上设置消息过期时间TTL,绑定到B交换机。这里B交换机就被成为死信交换机,B队列就被成为死信队列。
    • 死信产生的条件不止是TTL过期,还有消息被拒绝和队列达到最大长度都会产生死信。

4.2 私信队列消费端实现

// 死信队列消费端
func ConsumerDlx(exchangeA string, queueAName string, exchangeB string, queueBName string, ttl int, callback Callback) {//建立连接conn, err := Connect()if err != nil {fmt.Println(err)return}defer conn.Close()//创建一个Channelchannel, err := conn.Channel()if err != nil {fmt.Println(err)return}defer channel.Close()//创建A交换机//创建A队列//A交换机和A队列绑定err = channel.ExchangeDeclare(exchangeA, // name"fanout",  // typetrue,      // durablefalse,     // auto-deletedfalse,     // internalfalse,     // no-waitnil,       // arguments)if err != nil {fmt.Println(err)return}//创建一个queue,指定消息过期时间,并且绑定过期以后发送到那个交换机queueA, err := channel.QueueDeclare(queueAName, // nametrue,       // durablefalse,      // delete when ususedfalse,      // exclusivefalse,      // no-waitamqp.Table{// 当消息过期时把消息发送到 exchangeB"x-dead-letter-exchange": exchangeB,"x-message-ttl":          ttl,//"x-dead-letter-queue" : queueBName,//"x-dead-letter-routing-key" :},)if err != nil {fmt.Println(err)return}//A交换机和A队列绑定err = channel.QueueBind(queueA.Name, // queue name"",          // routing keyexchangeA,   // exchangefalse,nil,)if err != nil {fmt.Println(err)return}//创建B交换机//创建B队列//B交换机和B队列绑定err = channel.ExchangeDeclare(exchangeB, // name"fanout",  // typetrue,      // durablefalse,     // auto-deletedfalse,     // internalfalse,     // no-waitnil,       // arguments)if err != nil {fmt.Println(err)return}//创建一个queuequeueB, err := channel.QueueDeclare(queueBName, // nametrue,       // durablefalse,      // delete when ususedfalse,      // exclusivefalse,      // no-waitnil,        // arguments)if err != nil {fmt.Println(err)return}//B交换机和B队列绑定err = channel.QueueBind(queueB.Name, // queue name"",          // routing keyexchangeB,   // exchangefalse,nil,)if err != nil {fmt.Println(err)return}msgs, err := channel.Consume(queueB.Name, "", false, false, false, false, nil)if err != nil {fmt.Println(err)return}forever := make(chan bool)go func() {for d := range msgs {s := BytesToString(&(d.Body))callback(*s)d.Ack(false)}}()fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")<-forever
}

4.3 私信队列生产端实现

func PublishDlx(exchangeA string, body string) error {//建立连接conn, err := Connect()if err != nil {return err}defer conn.Close()//创建一个Channelchannel, err := conn.Channel()if err != nil {return err}defer channel.Close()//消息发送到A交换机err = channel.Publish(exchangeA, "", false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType:  "text/plain",Body:         []byte(body),})return err
}

4.4 私信队列使用

  1. 服务器端发送
//死信队列push
// @router /mq/dlx/push [*]
func (this *MqDemoController) GetDlx() {go func() {count := 0for {mq.PublishDlx("fyouku.dlx.a", "dlx"+strconv.Itoa(count))count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("dlx")
}// @router /mq/dlx/two/push [*]
func (this *MqDemoController) GetTwoDlx() {go func() {count := 0for {mq.PublishEx("fyouku.dlx.b", "fanout", "", "dlxtwo"+strconv.Itoa(count))count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("dlxTwo")
}
  1. 客户端接收
package mainimport ("demo/services/mq""fmt"
)func main() {// A交换 A队列 B交换 B队列 过期时间 回调函数mq.ConsumerDlx("fyouku.dlx.a", "fyouku_dlx_a", "fyouku.dlx.b", "fyouku_dlx_b", 10000, callback)
}func callback(s string) {fmt.Printf("msg is :%s\n", s)
}

第五节 改造功能

5.1 改造评论功能

  1. 改造点:发送评论后更新排行榜的数据。

    • 排行榜没有实时跟新的要求。
    • 更新排行榜可能造成评论接口响应时间边长。造成用户体验差…
    • 可以用MQ实现解耦。增加用户体验
  2. 更新排行榜往MQ推送消息。简单模式
func SaveComment(content string, uid int, episodesId int, videoId int) error {o := orm.NewOrm()var comment Commentcomment.Content = contentcomment.UserId = uidcomment.EpisodesId = episodesIdcomment.VideoId = videoIdcomment.Stamp = 0comment.Status = 1comment.AddTime = time.Now().Unix()_, err := o.Insert(&comment)if err == nil {//修改视频的总评论数o.Raw("UPDATE video SET comment=comment+1 WHERE id=?", videoId).Exec()//修改视频剧集的评论数o.Raw("UPDATE video_episodes SET comment=comment+1 WHERE id=?", episodesId).Exec()//更新redis排行榜 - 通过MQ来实现//创建一个简单模式的MQ//把要传递的数据转换为json字符串videoObj := map[string]int{"VideoId": videoId,}videoJson, _ := json.Marshal(videoObj)mq.Publish("", "fyouku_top", string(videoJson))}return err
}
  1. 消费消息更新排行榜。
package mainimport ("encoding/json""fmt""fyoukuApi/models""fyoukuApi/services/mq"redisClient "fyoukuApi/services/redis""strconv""github.com/astaxie/beego""github.com/astaxie/beego/orm"_ "github.com/go-sql-driver/mysql"
)func main() {beego.LoadAppConfig("ini", "../../conf/app.conf")defaultdb := beego.AppConfig.String("defaultdb")orm.RegisterDriver("mysql", orm.DRMySQL)orm.RegisterDataBase("default", "mysql", defaultdb, 30, 30)mq.Consumer("", "fyouku_top", callback)
}func callback(s string) {type Data struct {VideoId int}var data Dataerr := json.Unmarshal([]byte(s), &data)videoInfo, err := models.RedisGetVideoInfo(data.VideoId)if err == nil {conn := redisClient.PoolConnect()defer conn.Close()//更新排行榜redisChannelKey := "video:top:channel:channelId:" + strconv.Itoa(videoInfo.ChannelId)redisTypeKey := "video:top:type:typeId:" + strconv.Itoa(videoInfo.TypeId)conn.Do("zincrby", redisChannelKey, 1, data.VideoId)conn.Do("zincrby", redisTypeKey, 1, data.VideoId)}fmt.Printf("msg is :%s\n", s)
}

5.2 改造批量发送消息功能

  1. 改造点:假如发送给100万用户,实时响应可能不可能。必须异步。
  2. 简单模式
//保存消息接收人到队列中
func SendMessageUserMq(userId int, messageId int64) {//把数据转换成json字符串type Data struct {UserId    intMessageId int64}var data Datadata.UserId = userIddata.MessageId = messageIddataJson, _ := json.Marshal(data)mq.Publish("", "fyouku_send_message_user", string(dataJson))
  1. 消费发送消息
package mainimport ("encoding/json""fmt""fyoukuApi/models""fyoukuApi/services/mq""github.com/astaxie/beego""github.com/astaxie/beego/orm"_ "github.com/go-sql-driver/mysql"
)func main() {beego.LoadAppConfig("ini", "../../conf/app.conf")defaultdb := beego.AppConfig.String("defaultdb")orm.RegisterDriver("mysql", orm.DRMySQL)orm.RegisterDataBase("default", "mysql", defaultdb, 30, 30)mq.Consumer("", "fyouku_send_message_user", callback)
}func callback(s string) {type Data struct {UserId    intMessageId int64}var data Dataerr := json.Unmarshal([]byte(s), &data)if err == nil {models.SendMessageUser(data.UserId, data.MessageId)}fmt.Printf("msg is :%s\n", s)
}

第五课 实战go语言改造php仿优酷-RabbitMQ改造项目相关推荐

  1. 第四课 实战go语言改造php仿优酷-Redis改造优化接口

    第四课 实战go语言改造php仿优酷-Redis改造优化接口 tags: Beego 慕课网 categories: redis 文章目录 第四课 实战go语言改造php仿优酷-Redis改造优化接口 ...

  2. 第三课 go语言改到php仿优酷-阿里云视频上传功能

    第三课 go语言改到php仿优酷-阿里云视频上传功能 tags: Beego 慕课网 categories: go环境安装 Beego框架 阿里云 视频点播服务 文章目录 第三课 go语言改到php仿 ...

  3. 第二课 Beego仿优酷-go环境安装和Beego框架基础

    第二课 Beego仿优酷-go环境安装和Beego框架基础 tags: Beego 慕课网 categories: go环境安装 Beego框架 文章目录 第二课 Beego仿优酷-go环境安装和Be ...

  4. 优酷IPv6改造纪实:视频行业首家拥抱下一代网络技术

    阿里妹导读:2018年双11前,优酷开启了IPv6的大门.9月份PC端业务开启灰度,迎来首位IPv6 VIP用户后,优酷移动客户端也马不停蹄地加入灰度大军.从0到1,花了几个月:从10到1000,花了 ...

  5. 优酷IPv6改造纪实:视频行业首家拥抱下一代网络技术...

    阿里妹导读:2018年双11前,优酷开启了IPV6的大门.9月份PC端业务开启灰度,迎来首位IPV6 VIP用户后,优酷移动客户端也马不停蹄地加入灰度大军.从0到1,花了几个月:从10到1000,花了 ...

  6. C语言笔记 第三十五课 数组参数和指针参数分析

    第三十五课 数组参数和指针参数分析 思考:为什么C语言中的数组参数会退化为指针? 退化的意义 C语言中只会以值拷贝的方式传输参数 当向函数传递数组时:(错误的,设计当初的思路) 将整个数组拷贝一份传入 ...

  7. C语言学习第十五课(文件操作)

    第十五课 1,文件的概念 文件一般指存储在外部介质(如磁盘磁带)上的集合: 2,流的概念 ·操作系统是以文件为单位对数据进行管理的,输入输出是数据传送的过程,数据如流水一般从一处流向另一处,因此将输入 ...

  8. 视频教程-Kali Linux渗透测试全程课与脚本语言编程系列课程-渗透测试

    Kali Linux渗透测试全程课与脚本语言编程系列课程 本人有多年的服务器高级运维与开发经验,擅长计算机与服务器攻防及网络攻防技术!对网络安全领域有持续的关注和研究! 林晓炜 ¥899.00 立即订 ...

  9. 第五课.Linux开发基础

    第五课目录 GCC用法参考 GCC的常用用法 make与Makefile Makefile的规则 Makefile的语法 Makefile实例 调试 GDB 设置断点 虽然已经有很多优秀的IDE可以化 ...

最新文章

  1. 【NAACL2021】Graph4NLP:图深度学习自然语言处理(附ppt)
  2. Quartus II 8.1 详解--有图---图片详解 【1讲】
  3. android mapping.txt,Android根据mapping.txt还原混淆的代码
  4. (转) Hadoop1.2.1安装
  5. java web项目自动部署到Tomcat的原因
  6. 推荐 7 个超棒的监控工具
  7. centos6.3安装Samba及权限
  8. class with pointer
  9. SU草图大师必备实用插件,拿走不谢!
  10. 【转】 精密贴片电阻阻值对照表
  11. 实验2-1-2 温度转换 (5 分)
  12. Pr:音频和视频的同步
  13. 618运动装备推荐、这几款产品都是运动必备
  14. 万维钢:怎样做读书笔记
  15. 供应链安全这件事,早就被朱元璋玩明白了
  16. android studio lint,Android Studio Lint 工具
  17. 2021csgo网页开箱网站有哪些?csgo靠谱的开箱网站大全
  18. 工业线上赛(2022省赛)
  19. 梯度下降法python
  20. m4a html 播放器,HTML5 Audio m4a

热门文章

  1. IntelliJ IDEA 学习笔记 - Default Settings
  2. Nested嵌套对象类型还挺实用
  3. 北大最会读书的人《何帆的读书笔记》-推荐200本书籍清单,让你快速提高阅读能力
  4. 使用 UDP 数据包发送消息
  5. java 手机号替换_Java正则替换手机号代码实例
  6. 命令行休眠计算机,电脑用powercfg.exe命令关闭win10系统休眠的方法
  7. 如何连接局域网_世上最强远程访问和局域网共享技术!
  8. 戴尔G3 3579笔记本无法使用耳机上的麦克风/声卡驱动异常/无声音/声卡版本不兼容 的一个解决办法
  9. Flutter-适配相关
  10. 网易开发三年,现跳槽蚂蚁花呗,4面顺利通过,拿下Java岗offer