使用go语言基于redis写了一个简单的消息队列
源码地址
使用demo

redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据

添加数据和获取数据的操作也是非常简单的
LPUSH 从左边插入数据
RPUSH 大右边插入数据
LPOP 从左边取出一个数据
RPOP 从右边取出一个数据

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"

或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间,
如果这期间有数据写入,则会读取并返回,没有数据则会返回空
在一个窗口1读取

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"

在另一个窗口2写入

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3

再开一个窗口3读取,第二次读取时,list是空的,所以等待1秒后返回空。

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)

简单消息队列的实现

如果我们只从一边新增元素,向另一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个consumer消费掉?

当然不会,因为redis是单线程的,在从list取数据时天然不会出现并发问题。但是这是一个简单的消息队列,消费不成功怎么处理还是需要我们自己写代码来实现的

下面我说一下使用list实现一个简单的消息队列的整体思路

comsumer的实现

consumer 主要做的就是从list里读取数据,使用LPOP或者BLPOP都可以,
这里做了一个开关 options 的UseBLopp如果为true时会使用BLPOP

type consumer struct {once            sync.OnceredisCmd        redis.Cmdablectx             context.ContexttopicName       stringhandler         HandlerrateLimitPeriod time.Durationoptions         ConsumerOptions_               struct{}
}type ConsumerOptions struct {RateLimitPeriod time.DurationUseBLPop        bool
}

看一下创建consumer的代码,最后面的opts参数是可选的配置

type Consumer = *consumerfunc NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {consumer := &consumer{redisCmd:  redisCmd,ctx:       ctx,topicName: topicName,}for _, o := range opts {o(&consumer.options)}if consumer.options.RateLimitPeriod == 0 {consumer.options.RateLimitPeriod = time.Microsecond * 200}return consumer
}

读取数据后具体怎么进行处理调用者可以根据自己的业务逻辑进行相应处理
有一个小的interface调用者根据自己的逻辑去实现

type Handler interface {HandleMessage(msg *Message)
}

读取数据的逻辑使用一个gorouting实现

func (s *consumer) startGetMessage() {go func() {ticker := time.NewTicker(s.options.RateLimitPeriod)defer func() {log.Println("stop get message.")ticker.Stop()}()for {select {case <-s.ctx.Done():log.Printf("context Done msg: %#v \n", s.ctx.Err())returncase <-ticker.C:var revBody []bytevar err errorif !s.options.UseBLPop {revBody, err = s.redisCmd.LPop(s.topicName).Bytes()} else {revs := s.redisCmd.BLPop(time.Second, s.topicName)err = revs.Err()revValues := revs.Val()if len(revValues) >= 2 {revBody = []byte(revValues[1])}}if err == redis.Nil {continue}if err != nil {log.Printf("LPOP error: %#v \n", err)continue}if len(revBody) == 0 {continue}msg := &Message{}json.Unmarshal(revBody, msg)if s.handler != nil {s.handler.HandleMessage(msg)}}}}()
}

Producer 的实现

Producer还是很简单的就是把数据推送到 reids

更多干货 请点击查看

教你如何玩转redis-简单消息队列相关推荐

  1. redis简单队列java_使用Redis的简单消息队列

    redis简单队列java 在本文中,我们将使用列表命令将Redis用作简单的消息队列. 假设我们有一个允许用户上传照片的应用程序. 然后在应用程序中,我们以不同大小显示照片,例如Thumb,Medi ...

  2. 使用Redis的简单消息队列

    在本文中,我们将使用列表命令将Redis用作简单的消息队列. 假设我们有一个允许用户上传照片的应用程序. 然后在应用程序中,我们以不同大小显示照片,例如Thumb,Medium和Large. 在第一个 ...

  3. php redis zset 延迟队列_PHP + Redis 实现简单消息队列

    Redis做消息队列的好处在于它的轻量级,高并发,延迟敏感. 应用场景有即时数据分析.秒杀计数器.缓存等. Redis做消息队列待解决的问题: 1.消息的可靠性: 没有相应的机制保证消息的消费,当消费 ...

  4. Redis做消息队列,香吗?

    来自:架构师修行之路 菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的 ...

  5. 用redis实现消息队列(实时消费+ack机制)【转】

    用redis实现消息队列(实时消费+ack机制) java queue 消息队列 redis 消息队列 首先做简单的引入. MQ主要是用来: 解耦应用. 异步化消息 流量削峰填谷 目前使用的较多的有A ...

  6. 使用Redis 实现消息队列

    一 .为什么要用Redis实现轻量级MQ? MQ的主要作用: 应用解耦 异步化消息 流量削峰填谷 目前使用比较多的是ActiveMQ . RabbitMQ . ZeroMQ . Kafka . Met ...

  7. 【BCVP】实现基于 Redis 的消息队列

    聆听自己的声音 如果自己学不动了,或者感觉没有动力的时候,看看书,听听音乐,跑跑步,休息两天,重新出发,偷懒虽好,可不要贪杯. 话说上回书我们说到了,Redis的使用修改<[BCVP更新]Sta ...

  8. 程序员过关斩将--redis做消息队列,香吗?

    菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的redis做的MQ,很 ...

  9. java 结合redis队列_在 Java 中使用 redis 的消息队列服务

    前言 关于 redis 我们前面已经讨论过了缓存.分布式锁.分布式唯一标识.LBS服务的用法,这里我们来谈谈利用 redis 来实现一个消息服务. 典型的消息服务是一个生产者和消费者模式的服务.一般是 ...

最新文章

  1. 实验三 JSP应用开发进阶
  2. springmvc学习笔记--Interceptor机制和实践
  3. 双向带环带头结点的链表实现栈
  4. Red Hat 8.0中设置光盘为软件源
  5. 小程序·云开发实战 - 校园约拍小程序
  6. [转载] Java三元运算符示例
  7. SoC嵌入式软件架构设计II:否MMU的CPU虚拟内存管理的设计与实现方法
  8. Elastic Job 入门
  9. 正常网页开发如何解除父容器中子容器的浮动问题
  10. 用adb pull命令从android系统中读取文件失败的原因及解决办法
  11. 几家大的券商的PB系统以及算法交易概况大致是怎样的?
  12. 如何在html中制作播放按钮,HTML5+CSS3网页实例:制作网页播放器按钮
  13. 使用C#存储数据时excel有Microsoft切换到了wps时的引用
  14. 一文搞懂Matlab的3种取整函数(round、ceil、floor)
  15. NoSQL 简介及什么是AICD
  16. github网站进不去怎么办
  17. tp5的时间查询,查询时间戳是否在某一天中
  18. 响应式卡片悬停效果 html+css
  19. 平面解析几何----过抛物线上一点作互相垂直的两条直线交抛物线与点AB,AB恒过定点P的坐标
  20. Office2Pdf工具开发

热门文章

  1. 分区和分片的区别_数据库的分表、分库、分片和分区等区别
  2. 廖的python教程_廖雪峰的Python3.x教程.pdf
  3. java reflectionutils_ReflectionUtils工具类-装载
  4. python整数池_【Python】Python中神奇的小整数对象池和大整数对象池
  5. springframework引入不进来_啥?你不知道JWT
  6. 安卓手机充电慢_3.0适用苹果安卓手机充电器头
  7. WebApi与Mvc的区别
  8. 多天线技术是LTE的重要演进方向已成为产业共识
  9. 台湾证券交易开通运营现代化数据中心
  10. SQL查询分析器使用