大家好,我是渔夫子,今天跟大家聊聊在我们项目中的优先级队列的实现。

  优先级队列概述

队列,是数据结构中实现先进先出策略的一种数据结构。而优先队列则是带有优先级的队列,即先按优先级分类,然后相同优先级的再 进行排队。优先级高的队列中的元素会优先被消费。如下图所示:

在Go中,可以定义一个切片,切片的每个元素代表一种优先级队列,切片的索引顺序代表优先级顺序,后面代码实现部分我们会详细讲解。

  为什么需要优先级队列

先来看现实生活中的例子。银行的办事窗口,有普通窗口和vip窗口,vip窗口因为排队人数少,等待的时间就短,比普通窗口就会优先处理。同样,在登机口,就有贵宾通道和普通,同样贵宾通道优先登机。

在互联网中,当然就是请求和响应。使用优先级队列的作用是将请求按特定的属性划分出优先级,然后按优先级的高低进行优先处理。在研发服务的时候这里有个隐含的约束条件就是服务器资源(CPU、内存、带宽等)是有限的。如果服务器资源是无限的,那么也就不需要队列进行排队了,来一个请求就立即处理一个请求就好了。所以,为了在最大限度的利用服务器资源的前提下,将更重要的任务(优先级高的请求)优先处理,以更好的服务用户。

对于请求优先级的划分可以根据业务的特点根据价值高的优先原则来进行划分即可。例如可以根据是否是否是会员、是否是VIP会员等属性进行划分优先级。也可以根据是否是付费用户进行划分。在博客的业务中,也可以根据是否是大V的属性进行优先级划分。在互联网广告业务中,可以根据广告位资源价值高低来划分优先级。

  优先级队列实现

01 三个角色

在完整的优先级队列中有三个重要的角色,分别是优先级队列、工作单元Job、消费者worker。

  • 优先级队列:按优先级划分的队列,用来暂存对应优先级的工作单元Job,相同优先级的工作单元会在同一个队列里。

  • 工作单元Job:队列里的元素。我们把每一次业务处理都封装成一个工作单元,该工作单元会进入对应的优先级队列进行排队,然后等待消费者worker来消费执行。

  • 消费者worker:监听noticeChan,当监听到noticeChan有消息时,说明队列中有工作单元需要被处理,优先从高优先级队列中获取元素进行消费。

02 队列-消费者模式

根据队列个数和消费者个数,我们可以将队列-消费者模式分为单队列-单消费者模式、多队列(优先级队列)- 单消费者模式、多队列(优先级队列)- 多消费者模式。

我们先从最简单的单队列-单消费者模式实现,然后一步步演化成多队列(优先级队列)-多消费者模式。

03 单队列-单消费者模式实现

3.1 队列的实现

我们先来看下队列的实现。这里我们用Golang中的List数据结果来实现,List数据结构是一个双向链表,包含了将元素放到链表尾部、将头部元素弹出的操作,符合队列先进先出的特性。

好,我们看下具体的队列的数据结构:

type JobQueue struct {mu sync.Mutex //队列的操作需要并发安全jobList *list.List //List是golang库的双向队列实现,每个元素都是一个jobnoticeChan chan struct{} //入队一个job就往该channel中放入一个消息,以供消费者消费
}
  • 入队操作

/*** 队列的Push操作*/
func (queue *JobQueue) PushJob(job Job) {queue.jobList.PushBack(job) //将job加到队尾queue.noticeChan <- struct{}{}
}

到这里有同学就会问了,为什么不直接将job推送到Channel中,然后让消费者依次消费不就行了么?是的,单队列这样是可以的,因为我们最终目标是为了实现优先级的多队列,所以这里即使是单队列,我们也使用List数据结构,以便后续的演变。

还有一点,大家注意到了,这里入队操作时有一个 这样的操作:

queue.noticeChan <- struct{}{}

消费者监听的实际上不是队列本身,而是通道noticeChan。当有一个元素入队时,就往noticeChan通道中输入一条消息,这里是一个空结构体,主要作用就是通知消费者worker,队列里有要处理的元素了,可以从队列中获取了。这个在后面演化成多队列以及多消费者模式时会很有用。

  • 出队操作

根据队列的先进先出原则,是要获取队列的最先进入的元素。Golang中List结构体的Front()函数是获取链表的第一个元素,然后通过Remove函数将该元素从链表中移出,即得到了队列中的第一个元素。这里的Job结构体先不用关心,我们后面实现工作单元Job时,会详细讲解。

/*** 弹出队列的第一个元素*/
func (queue *JobQueue) PopJob() Job {queue.mu.Lock()defer queue.mu.Unlock()/*** 说明在队列中没有元素了*/if queue.jobList.Len() == 0 {return nil}elements := queue.jobList.Front() //获取队列的第一个元素return queue.jobList.Remove(elements).(Job) //将元素从队列中移除并返回
}
  • 等待通知操作

上面我们提到,消费者监听的是noticeChan通道。当有元素入队时,会往noticeChan中输入一条消息,以便通知消费者进行消费。如果队列中没有要消费的元素,那么消费者就会阻塞在该通道上。

func (queue *JobQueue) WaitJob() <-chan struct{} {return queue.noticeChan
}
3.2 工作单元Job的实现

一个工作单元就是一个要执行的任务。在系统中往往需要执行不同的任务,就是需要有不同类型的工作单元,但这些工作单元都有一组共同的执行流程。我们看下工作单元的类图。

我们看下类图中的几个角色:

  • Job接口:定义了所有Job要实现的方法。

  • BaseJob类(结构体):定义了具体Job的基类。因为具体Job类中的有共同的属性和方法。所以抽象出一个基类,避免重复实现。但该基类对Execute方法没有实现,因为不同的工作单元有具体的执行逻辑。

  • SquareJob和AreaJob类(结构体):是我们要具体实现的业务工作Job。主要是实现Execute的具体执行逻辑。根据业务的需要定义自己的工作Job和对应的Execute方法即可。

接下来,我们以计算一个int类型数字的平方的SquareJob为例来看下具体的实现。

  • BaseJob结构体

首先看下该结构体的定义

type BaseJob struct {Err errorDoneChan chan struct{} //当作业完成时,或者作业被取消时,通知调用者Ctx context.ContextcancelFunc context.CancelFunc
}

在该结构体中,我们主要关注DoneChan字段就行,该字段是当具体的Job的Execute执行完成后,来通知调用者的。

再来看Done函数,该函数就是在Execute函数完成后,要关闭DoneChan通道,以解除Job的阻塞而继续执行其他逻辑。

/*** 作业执行完毕,关闭DoneChan,所有监听DoneChan的接收者都能收到关闭的信号*/
func (job *BaseJob) Done() {close(job.DoneChan)
}

再来看WaitDone函数,该函数是当Job执行后,要等待Job执行完成,在未完成之前,DoneChan里没有消息,通过该函数就能将job阻塞,直到Execute中调用了Done(),以便解除阻塞。

/*** 等待job执行完成*/
func (job *BaseJob) WaitDone()  {select {case <-job.DoneChan:return}
}
  • SquareJob结构体

type SquareJob struct {*BaseJobx int
}

从结构体的定义中可知,SquareJob嵌套了BaseJob,所以该结构体拥有BaseJob的所有字段和方法。在该结构体主要实现了Execute的逻辑:对x求平方。

func (s *SquareJob) Execute() error {result := s.x * s.xfmt.Println("the result is ", result)return nil
}
3.3 消费者Worker的实现

Worker主要功能是通过监听队列里的noticeChan是否有需要处理的元素,如果有元素的话从队列里获取到要处理的元素job,然后执行job的Execute方法。

我们将该结构体定位为WorkerManager,因为在后面我们讲解多Worker模式时,会需要一个Worker的管理者,因此定义成了WorkerManager。

type WorkerManager struct {queue *JobQueuecloseChan chan struct{}
}

StartWorker函数,只有一个for循环,不断的从队列中获取Job。获取到Job后,进行消费Job,即ConsumeJob。

func (m *WorkerManager) StartWork() error {fmt.Println("Start to Work")for {select {case <-m.closeChan:return nilcase <-m.queue.noticeChan:job := m.queue.PopJob()m.ConsumeJob(job)}}return nil
}func (m *WorkerManager) ConsumeJob(job Job) {defer func() {job.Done()}()job.Execute()
}

到这里,单队列-单消费者模式中各角色的实现就讲解完了。我们通过main函数将其关联起来。

func main() {//初始化一个队列queue := &JobQueue{jobList: list.New(),noticeChan: make(chan struct{}, 10),}//初始化一个消费workerworkerManger := NewWorkerManager(queue)// worker开始监听队列go workerManger.StartWork()// 构造SquareJobjob := &SquareJob{BaseJob: &BaseJob{DoneChan: make(chan struct{}, 1),},x: 5,}//压入队列尾部queue.PushJob(job)//等待job执行完成job.WaitDone()print("The End")
}

04 多队列-单消费者模式

有了单队列-单消费者的基础,我们如何实现多队列-单消费者模式。也就是优先级队列。

优先级的队列,实质上就是根据工作单元Job的优先级属性,将其放到对应的优先级队列中,以便worker可以根据优先级进行消费。我们要在Job结构体中增加一个Priority属性。因为该属性是所有Job都共有的,因此定义在BaseJob上更合适。

type BaseJob struct {Err errorDoneChan chan struct{} //当作业完成时,或者作业被取消时,通知调用者Ctx context.ContextcancelFunc context.CancelFuncpriority int //工作单元的优先级
}

我们再来看看多队列如何实现。实际上就是用一个切片来存储各个队列,切片的每个元素存储一个JobQueue队列元素即可。

var queues = make([]*JobQueue, 10, 100)

那各优先级的队列在切片中是如何存储的呢?切片索引顺序只代表优先级的高于低,不代表具体是哪个优先级。

什么意思呢?假设我们现在对目前的工作单元定义了1、4、7三个优先级。这3个优先级在切片中是按优先级从小到到依次存储在queues切片中的,如下图:

那为什么不让切片的索引就代表优先级,让优先级为1的队列存储在索引1处,优先级4的队列存储在索引4处,优先级7的队列存储在索引7处呢?如果这样存储的话,就会变成如下这样:

由此可见,这样的存储会造成空间的浪费。所以,我们是将队列按优先级高低依次存放到了切片中。

那既然这样,当一个优先级的job来了之后,我该怎么知道该优先级的队列是存储在哪个索引中呢?我们用一个map来映射优先级和切片索引之间的关系。这样当一个工作单元Job入队的时候,以优先级为key,就可以查找到对应优先级的队列存储在切片的哪个位置了。如下图所示:

代码定义:

var priorityIdx map[int][int] //该map的key是优先级,value代表的是queues切片的索引

好了,我们重新定义一下队列的结构体:

type PriorityQueue struct {mu sync.MutexnoticeChan chan struct{}queues []*JobQueuepriorityIdx map[int]int
}//原来的JobQueue会变成如下这样:
type JobQueue struct {priority int //代表该队列是哪种优先级的队列jobList *list.List //List是golang库的双向队列实现,每个元素都是一个job
}

这里我们注意到有以下几个变化:

  • JobQueue里多了一个Priority属性,代表该队列是哪个优先级别。

  • noticeChan属性从JobQueue中移动到了PriorityQueue中。因为现在有多个队列,只要任意一个队列里有元素就需要通知消费者worker进行消费,因此消费者worker监听的是PriorityQueue中是否有元素,而在监听阶段不关心具体哪个优先级队列中有元素。

好了,数据结构定义完了,我们看看将工作单元Job推入队列和从队列中弹出Job又有什么变化。

  • 优先级队列的入队操作

优先级队列的入队操作,就需要根据入队Job的优先级属性放到对应的优先级队列中,入队流程图如下:

当一个Job加入队列的时候,有两种场景,一种是该优先级的队列已经存在,则直接Push到队尾即可。一种是该优先级的队列还不存在,则需要先创建该优先级的队列,然后再将该工作单元Push到队尾。如下是两种场景。

队列已经存在的场景

这种场景会比较简单。假设我们要插入优先级为7的工作单元,首先从映射表中查找7是否存在,发现对应关系是2,则直接找到切片中索引2的元素,即优先级为7的队列,将job加入即可。如下图。

队列不存在的场景

这种场景稍微复杂些,在映射表中找不到要插入优先级的队列的话,则需要在切片中插入一个优先级队列,而为了优先级队列在切片中也保持有序(保持有序就可以知道队列的优先级的高低了),则需要移动相关的元素。我们以插入优先级为6的工作单元为例来讲解。

1、首先,我们的队列有一个初始化的状态,存储了优先级1、4、7的队列。如下图。

2、当插入优先级为6的工作单元时,发现在映射表中没有优先级6的映射关系,说明在切片中还没有优先级为6的队列的元素。所以需要在切片中依次查找到优先级6应该插入的位置在4和7之间,也就是需要存储在切片2的位置。

3、将原来索引2位置的优先级为7的队列往后移动到3,同时更新映射表中的对应关系。

4、将优先级为6的工作单元插入到索引2的队列中,同时更新映射表中的优先级和索引的关系。

我们看下代码实现:

func (priorityQueue *PriorityQueue) Push(job Job) {priorityQueue.mu.Lock()defer priorityQueue.mu.Unlock()//先根据job的优先级找要入队的队列var idx intvar ok bool//从优先级-切片索引的map中查找该优先级的队列是否存在if idx, ok = priorityQueue.priorityIdx[job.Priority()]; !ok {//如果不存在该优先级的队列,则需要初始化一个队列,并返回该队列在切片中的索引位置idx = priorityQueue.addPriorityQueue(job.Priority)}//根据获取到的切片索引idx,找到具体的队列queue := priority.queues[idx]//将job推送到队列的队尾queue.JobList.PushBack(job)//队列job个数+1priorityQueue.Size++//如果队列job个数超过队列的最大容量,则从优先级最低的队列中移除工作单元if priorityQueue.size > priorityQueue.capacity {priorityQueue.RemoveLeastPriorityJob()}else {//通知新进来一个jobpriorityQueue.noticeChan <- struct{}{}}
}

代码中大部分也都做了注释,不难理解。这里我们来看下addPriorityQueue的具体实现:

func (priorityQueue *PriorityQueue) addPriorityQueue(priority int) int {n := len(priorityQueue.queues)//通过二分查找找到priority应插入的切片索引pos := sort.Search(n, func(i int) bool {return priority < priorityQueue.priority})//更新映射表中优先级和切片索引的对应关系for i := pos; i < n; i++ {priorityQueue.priorityIdx[priorityQueue.queues[i].priority] = i + 1}tail := make([]*jobQueue, n-pos)copy(tail, priorityQueue.queues[pos:])//初始化一个新的优先级队列,并将该元素放到切片的pos位置中priorityQueue.queues = append(priorityQueue.queues[0:pos], newJobQueue(priority))//将高于priority优先级的元素也拼接到切片后面priorityQueue.queues = append(priorityQueue.queues, tail...) return pos
}

最后,我们再来看一个实际的调用例子:

func main() {//初始化一个队列queue := &PriorityQueue{noticeChan: make(chan struct{}, cap),capacity: cap,priorityIdx: make(map[int]int),size: 0,}//初始化一个消费workerworkerManger := NewWorkerManager(queue)// worker开始监听队列go workerManger.StartWork()// 构造SquareJobjob := &SquareJob{BaseJob: &BaseJob{DoneChan: make(chan struct{}, 1),},x: 5,priority: 10,}//压入队列尾部queue.PushJob(job)//等待job执行完成job.WaitDone()print("The End")
}

05 多队列-多消费者模式

我们在多队列-单消费者的基础上,再来看看多消费者模式。也就是增加worker的数量,提高Job的处理速度。

我们再来看下worker的定义:

type WorkerManager struct {queue *PriorityQueuecloseChans []chan struct{}
}
这里需要注意,closeChans变成了切片数组。因为我们每启动一个worker,就需要有一个关闭通道。

然后看StartWorker函数的实现:

func (m *WorkerManager) StartWork(n int) error {fmt.Println("Start to Work")for i := 0; i < n; i++ {m.createWorker();}return nil
}func (m *WorkerManager) createWorker() {closeChan := make(chan struct{})//每个协程,就是一个workergo func(closeChan chan struct{}) {var job Jobfor {select {case <-m.closeChan:return nilcase <-m.queue.noticeChan:job := m.queue.PopJob()m.ConsumeJob(job)}  }}(closeChan)m.closeChanMu.Lock()defer m.closeChanMu.Unlock()m.closeChans = append(m.closeChans, closeChan)return nil
}func (m *WorkerManager) ConsumeJob(job Job) {defer func() {job.Done()}()job.Execute()
}
这里需要注意的是,所有的worker都需要监听队列的noticeChan通道。测试的例子就留给读者自己了。

另外如下图的单队列-多消费者模式是多队列-多消费者模式的一个特例,这里就不再进行实现了。

总结

优先级队列的实现主要利用了切片来存储多个队列,并将队列的优先级依次存储在切片索引中,并将具体的优先级和切片索引存储在映射表中,以便快速的定位一个具体优先级队列的存储位置。本文中一些细节的并发加锁操作做了忽略,大家在实际应用中根据需要进行完善即可。

想要了解更多有关 Go 语言的资讯动态,还可通过扫描下方二维码,进去一起探讨交流哦~

Go实战 | 一文带你搞懂从单队列到优先级队列的实现相关推荐

  1. Go 实战 | 一文带你搞懂从单队列到优先级队列的实现

    大家好,我是「Go学堂」的渔夫子,今天跟大家聊聊在我们项目中的优先级队列的实现及应用场景. 优先级队列概述 队列,是数据结构中实现先进先出策略的一种数据结构.而优先队列则是带有优先级的队列,即先按优先 ...

  2. 一文带你搞懂从动态代理实现到Spring AOP

    摘要:本文主要讲了Spring Aop动态代理实现的两种方式. 1. Spring AOP Spring是一个轻型容器,Spring整个系列的最最核心的概念当属IoC.AOP.可见AOP是Spring ...

  3. 一文带你搞懂C#多线程的5种写法

    一文带你搞懂C#多线程的5种写法 1.简介 超长警告! 在学习本篇文章前你需要学习的相关知识: 线程基本知识 此篇文章简单总结了C#中主要的多线程实现方法,包括: Thread 线程 ThreadPo ...

  4. RPC框架:一文带你搞懂RPC

    RPC是什么(GPT答) ChatGPT回答: RPC(Remote Procedure Call)是一种分布式应用程序的编程模型,允许程序在不同的计算机上运行.它以一种透明的方式,将一个程序的函数调 ...

  5. 如何查询你电脑的IP地址?一文带你搞懂IP地址

    上一章介绍了数据链路层--以太网数据帧的报文格式(你知道以太网数据帧在网络中如何发送和接收的吗?一文带你搞懂它),本章介绍下网络层--IP地址. 大家都知道计算机都会有一个IP地址,只有配置了IP地址 ...

  6. RPC框架:从原理到选型,一文带你搞懂RPC

    大家好,我是华仔,RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理.对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型,下面是文章内容目录: RPC 什么 ...

  7. ipv6单播地址包括哪两种类型_IPV6中为啥没有ARP了呢?一文带你搞懂NDP邻居发现协议...

    前言 前面我们介绍了ICMPv6协议 除了提供ICMPv4常用的基本功能之外,还有邻居发现(ND)的功能.一文带你看懂ICMPv6和ICMPv4的区别 那么究竟什么是邻居发现协议(ND)呢? 邻居发现 ...

  8. 一文带你搞懂什么是测试开发!

    需要说明的是,原文发表于作者的公众号中,文章篇幅虽长,但内容朴实.且能帮助读者进一步理解测试开发工作,请读者耐心品完~ 01 开始前说点什么 1. 自我反省 公众号开通了也有两年多了,除了刚开通的那段 ...

  9. 干货文——一文带你搞懂爬虫储存数据库MongoDB

    点击上方"Python爬虫与数据挖掘",进行关注 回复"书籍"即可获赠Python从入门到进阶共10本电子书 今 日 鸡 汤 夜阑卧听风吹雨,铁马冰河入梦来. ...

最新文章

  1. 【小程序】汇编实现判断回文
  2. Linux下安装rabbitmq3.7.8
  3. python自动化测试报告_python自动化测试报告(excel篇)
  4. jpa 自定义sql if_数据产品经理必备之SQL基础
  5. 哈工大大数据实验_科研常用 | 实验大数据分析方法
  6. 【今日CV 计算机视觉论文速览 91期】Mon, 1 Apr 2019
  7. 深入分析redis cluster 集群安装配置详解
  8. 1208. 尽可能使字符串相等
  9. 专科python应届生工资多少-阿里员工吐槽:应届生工资太猛,被倒挂,后悔接阿里侮辱性offer...
  10. MySQL基础知识及常见面试题整理
  11. 鸿蒙系统u盘制作,WINDOWS系列 篇二:【保姆级】Windows 10安装版原版系统U盘制作及系统安装教程...
  12. @DependsOn
  13. 80004005错误代码_0x80004005,小编教你解决0x80004005错误代码的方法
  14. 新手学计算机专用鼠标垫,CS迷注意!教你自制高级鼠标垫
  15. linux 查看 man 路径配置文件 man.config,linux man 1,2,3....
  16. Windows ActiveMq开机自启动设置
  17. 前端学习从入门到高级全程记录之8 (PS基本使用综合案例)
  18. 利用matlab中的函数regress进行线性回归分析
  19. 惠普喷墨打印机卡纸了
  20. CAD文件怎么打印成黑白图片教程

热门文章

  1. crh寄存器_STM32 学习笔记(寄存器)---2
  2. Riak 简介,第 1 部分: 与语言无关的 HTTP API
  3. 戴尔服务器安装独显后无显示,在T630服务器上安装了独立显卡,重启后液晶面板显示“pci1318 fatal error on bus 128d”,然后黑屏重启,该问题如何解决阿。...
  4. 闭关修炼(十二) NIO
  5. 内核编译出错 [arch/arm/boot/compressed/piggy.lzo] Error 1
  6. 陈皓谈对待技术的态度
  7. PHP获取系统时间的方法(毫秒数)
  8. 深度学习小白装机-记录一下
  9. 使用计算机正确坐姿,如何保持正确坐姿?(多图)
  10. 抢鞋软件bot服务器系统,抢鞋机器人bot app