实现延迟消息具体思路我是看的下面这篇文章

https://mp.weixin.qq.com/s/eDMV25YqCPYjxQG-dvqSqQ

实现延迟消息最主要的两个结构:

环形队列:通过golang中的数组实现,分成3600个slot。

任务集合:通过map[key]*Task,每个slot一个map,map的值就是我们要执行的任务。

原理图如下:

实现代码如下:

package main;

import (

"time"

"errors"

"fmt"

)

//延迟消息

type DelayMessage struct {

//当前下标

curIndex int;

//环形槽

slots [3600]map[string]*Task;

//关闭

closed chan bool;

//任务关闭

taskClose chan bool;

//时间关闭

timeClose chan bool;

//启动时间

startTime time.Time;

}

//执行的任务函数

type TaskFunc func(args ...interface{});

//任务

type Task struct {

//循环次数

cycleNum int;

//执行的函数

exec TaskFunc;

params []interface{};

}

//创建一个延迟消息

func NewDelayMessage() *DelayMessage {

dm := &DelayMessage{

curIndex: 0,

closed: make(chan bool),

taskClose: make(chan bool),

timeClose: make(chan bool),

startTime: time.Now(),

};

for i := 0; i < 3600; i++ {

dm.slots[i] = make(map[string]*Task);

}

return dm;

}

//启动延迟消息

func (dm *DelayMessage) Start() {

go dm.taskLoop();

go dm.timeLoop();

select {

case

{

dm.taskClose

dm.timeClose

break;

}

};

}

//关闭延迟消息

func (dm *DelayMessage) Close() {

dm.closed

}

//处理每1秒的任务

func (dm *DelayMessage) taskLoop() {

defer func() {

fmt.Println("taskLoop exit");

}();

for {

select {

case

{

return;

}

default:

{

//取出当前的槽的任务

tasks := dm.slots[dm.curIndex];

if len(tasks) > 0 {

//遍历任务,判断任务循环次数等于0,则运行任务

//否则任务循环次数减1

for k, v := range tasks {

if v.cycleNum == 0 {

go v.exec(v.params...);

//删除运行过的任务

delete(tasks, k);

} else {

v.cycleNum--;

}

}

}

}

}

}

}

//处理每1秒移动下标

func (dm *DelayMessage) timeLoop() {

defer func() {

fmt.Println("timeLoop exit");

}();

tick := time.NewTicker(time.Second);

for {

select {

case

{

return;

}

case

{

fmt.Println(time.Now().Format("2006-01-02 15:04:05"));

//判断当前下标,如果等于3599则重置为0,否则加1

if dm.curIndex == 3599 {

dm.curIndex = 0;

} else {

dm.curIndex++;

}

}

}

}

}

//添加任务

func (dm *DelayMessage) AddTask(t time.Time, key string, exec TaskFunc, params []interface{}) error {

if dm.startTime.After(t) {

return errors.New("时间错误");

}

//当前时间与指定时间相差秒数

subSecond := t.Unix() - dm.startTime.Unix();

//计算循环次数

cycleNum := int(subSecond / 3600);

//计算任务所在的slots的下标

ix := subSecond % 3600;

//把任务加入tasks中

tasks := dm.slots[ix];

if _, ok := tasks[key]; ok {

return errors.New("该slots中已存在key为" + key + "的任务");

}

tasks[key] = &Task{

cycleNum: cycleNum,

exec: exec,

params: params,

};

return nil;

}

func main() {

//创建延迟消息

dm := NewDelayMessage();

//添加任务

dm.AddTask(time.Now().Add(time.Second*10), "test1", func(args ...interface{}) {

fmt.Println(args...);

}, []interface{}{1, 2, 3});

dm.AddTask(time.Now().Add(time.Second*10), "test2", func(args ...interface{}) {

fmt.Println(args...);

}, []interface{}{4, 5, 6});

dm.AddTask(time.Now().Add(time.Second*20), "test3", func(args ...interface{}) {

fmt.Println(args...);

}, []interface{}{"hello", "world", "test"});

dm.AddTask(time.Now().Add(time.Second*30), "test4", func(args ...interface{}) {

sum := 0;

for arg := range args {

sum += arg;

}

fmt.Println("sum : ", sum);

}, []interface{}{1, 2, 3});

//40秒后关闭

time.AfterFunc(time.Second*40, func() {

dm.Close();

});

dm.Start();

}

测试结果如下:

golang延时_golang 实现延迟消息原理与方法相关推荐

  1. 面试常问Rocketmq延迟消息原理

    延迟消息在业务场景中使用的非常多,订单失效,过期通知等功能都可以借助延迟消息机制来实现.本文将从源码层面来分析Rocketmq的延迟消息实现原理机制. 一.延迟消息的使用    ​    ​    ​ ...

  2. golang延时_Golang 定时器底层实现深度剖析

    本文将基于 Golang 源码对 Timer 的底层实现进行深度剖析.主要包含以下内容: 1. Timer 和 Ticker 在 Golang 中的底层实现细节,包括数据结构等选型. 2. 分析 ti ...

  3. 【Golang】实现企业微信消息通知的方法(可在接口中调用)

    1.问题背景 1.在一次实际的项目中,需要在一个应用中发送消息提醒相关人员提交对应的表单.于是我查阅了企业微信开发文档有关消息提醒部分,然后按照其指导,实现了如下效果: 2.解决方案: 首先我们需要创 ...

  4. RocketMQ延迟消息的代码实战及原理分析

    RocketMQ简介 RocketMQ是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的.高可靠.万亿级容量.灵活可伸缩的消息发布与订阅服务. 它前身是MetaQ,是阿里基于Kafka ...

  5. RocketMQ源码解析之延迟消息实现原理

    原创不易,转载请注明出处 文章目录 前言 1.延时消息的demo 2.实现的原理 前言 今天要谈论的话题其实非常轻松,但是我们有些业务场景是离不开它的,其实说到延迟消息,不知道大家有没有想到它的业务场 ...

  6. rocktmq 消息延时清空_RocketMQ-延时消息

    一.延时消息的使用 使用比较简单,指定message的DelayTimeLevel即可.示例代码如下: Message msg = new Message("DelayTopicTest&q ...

  7. 【RocketMQ】延迟消息(延迟队列)

    文章目录 1. 什么是延迟消息 1.1 延时消息的使用场景 2. 示例 3. 原理 参考 1. 什么是延迟消息 发送消息后,消费者要等待一定的时间才能消费到该消息. RocketMQ 不支持任意时间自 ...

  8. 深入理解RocketMQ延迟消息

    延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码.第三步介绍延迟消息与消息重 ...

  9. RocketMQ源码分析之延迟消息

    文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...

最新文章

  1. Unity Remote使用方法
  2. Mansory算法分析
  3. DELL OME监控服务器安装配置
  4. Vue.js的的理解及优缺点
  5. 原创 | 万万没想到,JVM内存结构的面试题可以问的这么难?
  6. Lync server 2013新建持久聊天室提示用户未启用SIP
  7. C语言malloc的用法和意义
  8. 通过CuteFTP用VBScript使用SFTP,实现Win与Linux的文件传输
  9. 计算机信息管理发展的重要性,建设计算机信息管理系统的意义和目标
  10. 零基础C语言入门012——关系运算符,大于小于
  11. 径向渐变加阴影html,CSS3 径向渐变(radial-gradient)
  12. CyanogenMod源码编译
  13. itunes下载的软件所在目录
  14. java 代码箭头代表什么_箭头运算符' - '在Java中做什么?
  15. Mac自带Safari浏览器如何清除缓存
  16. 去除停用词并绘制词云图
  17. FlexiTimer2库下载 无偿 分享 仅供学习
  18. Java入门之Digital eigenvalue
  19. 【文献阅读】Remote Power Attacks on the Versatile Tensor Accelerator in Multi-Tenant FPGAs
  20. 逻辑英语结构【重点】

热门文章

  1. 插入的数据不能时时查询到_数据库原理笔记
  2. 怎样理解 MVVM ( Model-View-ViewModel ) ?
  3. 基于UML的面向对象分析与设计
  4. linux如何编译wine,利用winelib编译一个可在linux下运行的程序
  5. 怎么把外部参照合并到图纸_做电气设计的,拿到建筑图,怎么下手呢?
  6. 广西大学计算机类开设课程,操作系统教学大纲-广西大学计算机与电子信息学院.DOC...
  7. 用Java 编写菜单价格和的程序_使用JAVA 编写一个程序,显示5中商品价格,用户可以选择多种商品并在其后的文本框输入购买的数量。...
  8. 深度学习之基于卷积神经网络实现花朵识别
  9. [蓝桥杯][算法提高VIP]合并石子(区间dp+平行四边形优化)
  10. 计算机2级学的是什么时候出来的,2019计算机二级考试科目有哪些 什么时候出成绩...