之前写过一个分布是任务调度系统,每次执行完任务都要对任务进行排序,使用最小堆确实优化了效率及cpu

项目中需要使用一个简单的定时任务调度的框架,最初直接从GitHub上搜了一个star比较多的,就是https://github.com/robfig/cron,目前有8000+ star。刚开始使用的时候发现问题不大,但是随着单机需要定时调度的任务越来越多,高峰期差不多接近500QPS,随着业务的推广使用,可以预期增长还会比较快,但是已经遇到CPU使用率偏高的问题,通过pprof分析,很多都是在做排序,看了下这个项目的代码,整体执行的过程大概如下:

对所有任务进行排序,按照下次执行时间进行排序

选择数组中第一个任务,计算下次执行时间减去当前时间得到时间t,然后sleep t

然后从数组第一个元素开始遍历任务,如果此任务需要调度的时间 < now,那么就执行此任务,执行之后重新计算这个任务的next执行时间

每次待执行的任务执行完毕之后,都会重新对这个数组进行排序

然后再循环从排好序的数组中找到第一个需要执行的任务去执行。

代码如下:

for {        // Determine the next entry to run.        sort.Sort(byTime(c.entries))

        var timer *time.Timer        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {            // If there are no entries yet, just sleep - it still handles new entries            // and stop requests.            timer = time.NewTimer(100000 * time.Hour)        } else {            timer = time.NewTimer(c.entries[0].Next.Sub(now))        }

        for {            select {            case now = <-timer.C:                now = now.In(c.location)                c.logger.Info("wake", "now", now)

                // Run every entry whose next time was less than now                for _, e := range c.entries {                    if e.Next.After(now) || e.Next.IsZero() {                        break                    }                    c.startJob(e.WrappedJob)                    e.Prev = e.Next                    e.Next = e.Schedule.Next(now)                    c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)                }

            case newEntry := <-c.add:                timer.Stop()                now = c.now()                newEntry.Next = newEntry.Schedule.Next(now)                c.entries = append(c.entries, newEntry)                c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

            case replyChan := <-c.snapshot:                replyChan <- c.entrySnapshot()                continue

            case <-c.stop:                timer.Stop()                c.logger.Info("stop")                return

            case id := <-c.remove:                timer.Stop()                now = c.now()                c.removeEntry(id)                c.logger.Info("removed", "entry", id)            }

            break        }    }

问题就显而易见了,执行一个任务(或几个任务)都重新计算next执行时间,重新排序,最坏情况就是每次执行1个任务,排序一遍,那么执行k个任务需要的时间的时间复杂度就是O(k*nlogn),这无疑是非常低效的。

于是想着怎么优化一下这个框架,不难想到每次找最先需要执行的任务就是从一堆任务中找schedule_time最小的那一个(设schedule_time是任务的执行时间),那么比较容易想到的思路就是使用最小堆:

在初始化任务列表的时候就直接构建一个最小堆

每次执行查看peek元素是否需要执行

需要执行就pop堆顶元素,计算next执行时间,重新push入堆

不需要执行就break到外层循环取堆顶元素,计算next_time-now() = need_sleep_time,然后select 睡眠、add、remove等操作。

我修改为min-heap的方式之后,每次添加任务的时候通过堆的属性进行up和down调整,每次添加任务时间复杂度O(logn),执行k个任务时间复杂度是O(klogn)。经过验证线上CPU使用降低4~5倍。CPU从50%左右降低至10%左右。

优化后的代码如下,只是其中一部分。

全部的代码也已经在github上已经创建了一个Fork的仓库并推送上去了,全部单测Case也都PASS。感兴趣可以点过去看。https://github.com/tovenja/cron

    for {        // Determine the next entry to run.        // Use min-heap no need sort anymore

     // 这里不再需要排序了,因为add的时候直接进行堆调整     //sort.Sort(byTime(c.entries))

        var timer *time.Timer        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {            // If there are no entries yet, just sleep - it still handles new entries            // and stop requests.            timer = time.NewTimer(100000 * time.Hour)        } else {            timer = time.NewTimer(c.entries[0].Next.Sub(now))            //fmt.Printf(" %v, %+v\n", c.entries[0].Next.Sub(now), c.entries[0].ID)        }

        for {            select {            case now = <-timer.C:                now = now.In(c.location)                c.logger.Info("wake", "now", now)                // Run every entry whose next time was less than now                for {                    e := c.entries.Peek()                    if e.Next.After(now) || e.Next.IsZero() {                        break                    }                    e = heap.Pop(&c.entries).(*Entry)                    c.startJob(e.WrappedJob)                    e.Prev = e.Next                    e.Next = e.Schedule.Next(now)                    heap.Push(&c.entries, e)                    c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)                }

            case newEntry := <-c.add:                timer.Stop()                now = c.now()                newEntry.Next = newEntry.Schedule.Next(now)                heap.Push(&c.entries, newEntry)                c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

            case replyChan := <-c.snapshot:                replyChan <- c.entrySnapshot()                continue

            case <-c.stop:                timer.Stop()                c.logger.Info("stop")                return

            case id := <-c.remove:                timer.Stop()                now = c.now()                c.removeEntry(id)                c.logger.Info("removed", "entry", id)            }

            break        }    }

转自:

cnblogs.com/aboutblank/p/14860571.html

更多好文 关注


本文由 mdnice 多平台发布

最小堆提升每次排序的效率相关推荐

  1. java 最小堆_堆排序 最大堆 最小堆 Java 实现

    堆 一点疑惑,堆排序是就地排序,所以空间复杂度是 O(1).但是,比如我有一个数组,建立一个最小堆,然后每次取出最小堆的顶点.建立最小堆需要额外空间? 不深究了,归并排序需要额外空间. 堆是完全二叉树 ...

  2. 最大堆,最小堆插入/删除以及最大堆的排序

    先说一下最大堆如何排序:转自:http://www.cnblogs.com/luchen927/archive/2012/03/08/2381446.html 最大堆和最小堆在算法中也有运用.比如用最 ...

  3. 《恋上数据结构第1季》二叉堆原理及实现、最小堆解决 TOP K 问题

    二叉堆 BinaryHeap 堆(Heap) 堆的出现 堆简介 二叉堆(Binary Heap) 获取最大值 最大堆 - 添加 最大堆 - 添加优化 最大堆 - 删除 replace 最大堆 - 批量 ...

  4. Java实现最小堆一

    2019独角兽企业重金招聘Python工程师标准>>> Java实现最小堆一 堆是一种经过排序的完全二叉树,其中任一非终端节点的数据值均不大于(或不小于)其左孩子和右孩子节点的值. ...

  5. leetcode 703. 数据流中的第K大元素 最小堆解法 c语言

    如题: 设计一个找到数据流中第K大元素的类(class).注意是排序后的第K大元素,不是第K个不同的元素. 你的 KthLargest 类需要一个同时接收整数 k 和整数数组nums 的构造器,它包含 ...

  6. 数据结构 - 最小堆最大堆

    可以在O(nlogn)的时间复杂度内完成排序 典型的用法是,寻找 第k个/前k个 最大/最小元素,k个有序序列合并 1.合并K个升序链表(最小堆实现) 或许可以改进成每次堆只存放K个元素? # Def ...

  7. Java最小堆解决TopK问题

    转载自  Java最小堆解决TopK问题 TopK问题是指从大量数据(源数据)中获取最大(或最小)的K个数据. TopK问题是个很常见的问题:例如学校要从全校学生中找到成绩最高的500名学生,再例如某 ...

  8. 【高性能定时器】时间堆(最小堆)

    最小堆及其应用:时间堆 最小堆及其应用:时间堆 一. 堆 1. 概念 2. 最小堆的实现 3. 性质 4. 代码 二.时间堆 1. 概念简述 2. 实现细节 3. 代码 一. 堆 1. 概念 堆是一种 ...

  9. 《徐徐道来话Java》:PriorityQueue和最小堆

    在讲解PriorityQueue之前,需要先熟悉一个有序数据结构:最小堆. 最小堆是一种经过排序的完全二叉树,其中任一非终端节点数值均不大于其左孩子和右孩子节点的值. 可以得出结论,如果一棵二叉树满足 ...

最新文章

  1. OkHttp3 websocket
  2. 重启IIS报错:IIS 服务或万维网发布服务,或者依赖这 服务可能在启动期间发生错误或者已禁用...
  3. HDU 1853 MCMF
  4. mini2440 裸机编程 -led
  5. 【Python3】Python模块与包的导入
  6. Scala 空返回值Unit
  7. gcc参数 -i, -L, -l, -include
  8. Node.js webpack
  9. 小米为什么拆分红米? | 畅言
  10. 2020-04-27 三种内存屏障 acquire barrier / release barrier / full barrier
  11. tcp压力测试工具_DNS压力测试工具之——DNSperf
  12. 東京タワー初めてphoto
  13. 【大数据部落】 用机器学习识别不断变化的股市状况—隐马尔可夫模型(HMM)股票指数预测实战
  14. cvf命令报错 linux,linux命令大全
  15. Docker-常用基础命令整理
  16. 迁移学习笔记3: TCA, Finetune, 与Triplet Network(元学习)
  17. 电脑屏幕亮度能否自动调节
  18. Numpy 简单教程(二)数组的形状(shape)操作
  19. 强制用户使用定向推送功能是什么意思?整改措施请收好
  20. 基于FPGA 的PLC/PAC 硬件运行时的实现

热门文章

  1. ES6-11随手记(1)
  2. xml文档的头部说明
  3. jQuery DOM操作与简单事件
  4. java list = list,关于Java:list list = NealAlayList.()的原因;
  5. oracle 次幂函数,excel幂函数-Excel如何进行乘幂运算,开方运算,比如9的6次幂,10000开4次方...
  6. 腾讯云 WebShell 体验
  7. Win10 1903 运行安卓模拟器蓝屏解决方案
  8. php 地图导航立即前往,帝国CMS完美替换“当前位置导航”#091;!--newsnav--]--墨鱼部落格...
  9. 专访网易美术专家:《阴阳师》的美术,怎么用巧劲儿制造高级感?
  10. iOS 使用AVAudioPlayer开发录音功能