转载自:http://www.cnblogs.com/shi-meng/p/4800080.html

1:驱动

  本来打算自己写一个驱动的,后来发现github上面已经有了,那我就直接拿现成的了, 驱动采用 github.com/streadway/amqp ,直接import就可以啦!

2:exchange and queue

  在上一篇文章中,我们已经创建好virtualhost 、exchange and queue,所以我们先定义这些常量

  

const (queueName = "push.msg.q"exchange  = "t.msg.ex"mqurl ="amqp://shi:123@192.168.232.130:5672/test"
)
var conn *amqp.Connectionvar channel *amqp.Channel
 

3:错误处理

  

func failOnErr(err error, msg string) {
    if err != nil {
        log.Fatalf("%s:%s", msg, err)
        panic(fmt.Sprintf("%s:%s", msg, err))
    }
}

4:连接mq

func mqConnect() {var err errorconn, err = amqp.Dial(mqurl)failOnErr(err, "failed to connect tp rabbitmq")channel, err = conn.Channel()failOnErr(err, "failed to open a channel")
}

5:push

  先上代码:

func push() {if channel == nil {mqConnect()}msgContent := "hello world!"channel.Publish(exchange, queueName, false, false, amqp.Publishing{ContentType: "text/plain",Body:        []byte(msgContent),})
}

  其实是很简单的,调用 channel函数的Publish方法,传入exchange name 和 queue name,最后一个参数是消息内容,ContentType我们设置为text/plain, 为文本类型,body是消息内容,要传入字节数组,这样就完成了一条消息的push,接下来我们再看receive

6:receive

代码:

func receive() {if channel == nil {mqConnect()}msgs, err := channel.Consume(queueName, "", true, false, false, false, nil)failOnErr(err, "")forever := make(chan bool)go func() {//fmt.Println(*msgs)for d := range msgs {s := BytesToString(&(d.Body))count++fmt.Printf("receve msg is :%s -- %d\n", *s, count)}}()fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")<-forever
}

  通过调用channel.Consume函数返回一个接受消息的chan类型管道,然后range 这个chan,接收到的数据是[]byte,转换为string后输出

  <-forever  这个是为了控制当前线程不退出

7:入口main

  

func main() {go func() {for {push()time.Sleep(1 * time.Second)}}()receive()fmt.Println("end")close()
}

  for 循环保证每秒发送一条消息到mq,这个地方采用协程保证不阻塞主线程。receive函数不能采用协程,不然主线程就退出了。close函数是释放连接对象,但是在这个例子中是没有起效的,因为线程永远都不会自动退出,只能认为的CTRL+C 或者程序死掉,系统重启

8:执行:

切换到go文件目录执行

go run main.go
//运行日志

receve msg is :hello world! -- 1246
receve msg is :hello world! -- 1247
receve msg is :hello world! -- 1248
receve msg is :hello world! -- 1249
receve msg is :hello world! -- 1250
receve msg is :hello world! -- 1251
receve msg is :hello world! -- 1252
receve msg is :hello world! -- 1253
receve msg is :hello world! -- 1254
receve msg is :hello world! -- 1255
receve msg is :hello world! -- 1256
receve msg is :hello world! -- 1257
receve msg is :hello world! -- 1258
receve msg is :hello world! -- 1259
receve msg is :hello world! -- 1260
receve msg is :hello world! -- 1261
receve msg is :hello world! -- 1262
receve msg is :hello world! -- 1263
receve msg is :hello world! -- 1264
receve msg is :hello world! -- 1265
receve msg is :hello world! -- 1266

9:全部代码

package mainimport ("fmt""log""bytes""time""github.com/streadway/amqp"
)var conn *amqp.Connection
var channel *amqp.Channel
var count = 0const (queueName = "push.msg.q"exchange  = "t.msg.ex"mqurl ="amqp://shi:123@192.168.232.130:5672/test"
)func main() {go func() {for {push()time.Sleep(1 * time.Second)}}()receive()fmt.Println("end")close()
}func failOnErr(err error, msg string) {if err != nil {log.Fatalf("%s:%s", msg, err)panic(fmt.Sprintf("%s:%s", msg, err))}
}func mqConnect() {var err errorconn, err = amqp.Dial(mqurl)failOnErr(err, "failed to connect tp rabbitmq")channel, err = conn.Channel()failOnErr(err, "failed to open a channel")
}func close() {channel.Close()conn.Close()
}//连接rabbitmq server
func push() {if channel == nil {mqConnect()}msgContent := "hello world!"channel.Publish(exchange, queueName, false, false, amqp.Publishing{ContentType: "text/plain",Body:        []byte(msgContent),})
}func receive() {if channel == nil {mqConnect()}msgs, err := channel.Consume(queueName, "", true, false, false, false, nil)failOnErr(err, "")forever := make(chan bool)go func() {//fmt.Println(*msgs)for d := range msgs {s := BytesToString(&(d.Body))count++fmt.Printf("receve msg is :%s -- %d\n", *s, count)}}()fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")<-forever
}func BytesToString(b *[]byte) *string {s := bytes.NewBuffer(*b)r := s.String()return &r
}

转载于:https://www.cnblogs.com/tianlongtc/p/8822946.html

(转)go rabbitmq实践相关推荐

  1. 绝对详细的 RabbitMQ 实践操作手册(一)

    绝对详细的 RabbitMQ 实践操作手册,看完本系列就够了. 一.什么是MQ ? 1.MQ的概念 2.理解消息队列 二.MQ的优势和劣势 1.优势和作用 2.劣势 三.MQ的应用场景 四.AMQP ...

  2. SpringBoot 整合 RabbitMQ 实践

    作者:Cott www.cnblogs.com/cott/p/12402423.html 前文总结了Dubbo与SpringBoot的整合,本文基于这套框架,引入RabbitMQ消息队列. 1.在li ...

  3. 消息队列中间件之RabbitMQ(上)

    文章目录 1.MQ引言 1.1 什么是MQ 1.2 主流MQ以及其特点 ActiveMQ Kafka RocketMQ RabbitMQ 1.3 MQ的作用 2.RabbitMQ 的引言 2.1 Ra ...

  4. java数组遍历赋值,最终入职阿里

    Part 1消息队列 介绍消息队列技术的背景,包括使用场景和消息队列的功能特点,并设计了一个简单的消息队列. 1.1 系统间通信技术介绍 1.2 为何要用消息队列 1.3 消息队列的功能特点 1.4 ...

  5. 【月报】Java知音的四月汇总

    往期: Java知音的十月:[月报]Java知音十月汇总 Java知音的十一月:[月报]Java知音十一月汇总 Java知音的十二月:[月报]Java知音十二月汇总 Java知音的一月:[月报]Jav ...

  6. Kafka必须掌握的核心技术:简述Java线程池的作用和实现方式

    Part 1消息队列 介绍消息队列技术的背景,包括使用场景和消息队列的功能特点,并设计了一个简单的消息队列. 1.1 系统间通信技术介绍 1.2 为何要用消息队列 1.3 消息队列的功能特点 1.4 ...

  7. 每周一看:16份文档资料,程序员软硬实力全概览,总有一个适合你

    技术分享 程序员,没有硬实力怎么可以,这是一切都基础,首要条件,所以在工作之余,我会把一些比较好玩的,或者身边的朋友问的相应的问题,我会通过源码的形式展示出来,但是因为个人能力有限,也不知道该写啥,所 ...

  8. docker修改端口映射,技术总监都拍手叫好

    Part 1消息队列 介绍消息队列技术的背景,包括使用场景和消息队列的功能特点,并设计了一个简单的消息队列. 1.1 系统间通信技术介绍 1.2 为何要用消息队列 1.3 消息队列的功能特点 1.4 ...

  9. 未读消息(小红点),前端 与 RabbitMQ 实时消息推送实践,贼简单~

    前几天粉丝群里有个小伙伴问过:web 页面的未读消息(小红点)怎么实现比较简单,刚好本周手头有类似的开发任务,索性就整理出来供小伙伴们参考,没准哪天就能用得上呢. 之前在 <springboot ...

最新文章

  1. 两种ICP的改进算法:PLICP与NICP
  2. tomcat 5.5 jdbc myeclipse8.0
  3. 在dos下用csc命令编译,提示“csc不是内部或外部命令,也不是可运行的程序... ”
  4. C++(Goto使用实例)
  5. FLASH开发[00]
  6. 《vue+vant 文本超出两行部分省略号显示》
  7. 华中科技大学2005年计算机组成原理试题,华中科技大学200年计算机组成原理考研试题.doc...
  8. from injection shell sql to_FROM SQL INJECTION TO SHELL: POSTGRESQL EDITION
  9. 基础04继承、super、this、抽象类
  10. pycharm小技巧
  11. WPF中实现图片文件转换成Visual对象,Viewport3D对象转换成图片
  12. arm linux 优化
  13. 数据中台落地问题与建议-数字化架构设计(2)
  14. Linux上配置Gaussian的方法
  15. 国内ERP技术公司综合
  16. python中的def是什么意思啊_python中的def是什么意思
  17. ad中按钮开关的符号_收藏:电路图形符号大全!!!
  18. win10系统怎么恢复出厂设置,电脑重置win10系统
  19. 计算机取消健,电脑取消撤销快捷键是什么_电脑健盘中的所有英文组合意思超值解释建议收藏......
  20. php pdo 与对比mysql,php使用mysqli和pdo扩展,测试对比连接mysql数据库的效率完整示例...

热门文章

  1. 我以为自己MySQL够牛逼了,直到被腾讯面试官狠狠抽了两巴掌
  2. rc时间常数定义_低通滤波中RC时间常数设定,我是这么思考的
  3. 流放者柯南rust_《流放者柯南》评测8.0分 胯下生风的沙盒生存游戏
  4. Visual Studio 2019 许可证过期解决方法
  5. 计算机网络常见面试问题和解析
  6. java 摄像头 视频_javaCV-摄像头实时视频
  7. 使用Google时区API显示任何城市的实时本地时间
  8. Spark RDD用法
  9. 光猫-新版水星路由器配置(WiFi连接不上后)
  10. php高仿网络硬盘,高仿永硕网盘E盘源码