原文链接:Writing worker queues, in Go

1.work.go

[root@wangjq queue]# cat work.go
package mainimport "time"type WorkRequest struct {Name  stringDelay time.Duration
}

2.collector.go

[root@wangjq queue]# cat collector.go
package mainimport ("fmt""net/http""time"
)// A buffered channel that we can send work requests on.
var WorkQueue = make(chan WorkRequest, 100)func Collector(w http.ResponseWriter, r *http.Request) {// Make sure we can only be called with an HTTP POST request.if r.Method != "POST" {w.Header().Set("Allow", "POST")w.WriteHeader(http.StatusMethodNotAllowed)return}// Parse the delay.delay, err := time.ParseDuration(r.FormValue("delay"))if err != nil {http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)return}// Check to make sure the delay is anywhere from 1 to 10 seconds.if delay.Seconds() < 1 || delay.Seconds() > 10 {http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)return}// Now, we retrieve the person's name from the request.name := r.FormValue("name")// Just do a quick bit of sanity checking to make sure the client actually provided us with a name.if name == "" {http.Error(w, "You must specify a name.", http.StatusBadRequest)return}// Now, we take the delay, and the person's name, and make a WorkRequest out of them.work := WorkRequest{Name: name, Delay: delay}// Push the work onto the queue.WorkQueue <- workfmt.Println("Work request queued")// And let the user know their work request was created.
        w.WriteHeader(http.StatusCreated)return
}

3.worker.go

[root@wangjq queue]# cat worker.go
package mainimport ("fmt""time"
)// NewWorker creates, and returns a new Worker object. Its only argument
// is a channel that the worker can add itself to whenever it is done its
// work.
func NewWorker(id int, workerQueue chan chan WorkRequest) Worker {// Create, and return the worker.worker := Worker{ID:          id,Work:        make(chan WorkRequest),WorkerQueue: workerQueue,QuitChan:    make(chan bool)}return worker
}type Worker struct {ID          intWork        chan WorkRequestWorkerQueue chan chan WorkRequestQuitChan    chan bool
}// This function "starts" the worker by starting a goroutine, that is
// an infinite "for-select" loop.
func (w *Worker) Start() {go func() {for {// Add ourselves into the worker queue.w.WorkerQueue <- w.Workselect {case work := <-w.Work:// Receive a work request.fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, work.Delay.Seconds())time.Sleep(work.Delay)fmt.Printf("worker%d: Hello, %s!\n", w.ID, work.Name)case <-w.QuitChan:// We have been asked to stop.fmt.Printf("worker%d stopping\n", w.ID)return}}}()
}// Stop tells the worker to stop listening for work requests.
//
// Note that the worker will only stop *after* it has finished its work.
func (w *Worker) Stop() {go func() {w.QuitChan <- true}()
}

4.dispatcher.go

[root@wangjq queue]# cat dispatcher.go
package mainimport "fmt"var WorkerQueue chan chan WorkRequestfunc StartDispatcher(nworkers int) {// First, initialize the channel we are going to but the workers' work channels into.WorkerQueue = make(chan chan WorkRequest, nworkers)// Now, create all of our workers.for i := 0; i < nworkers; i++ {fmt.Println("Starting worker", i+1)worker := NewWorker(i+1, WorkerQueue)worker.Start()}go func() {for {select {case work := <-WorkQueue:fmt.Println("Received work requeust")go func() {worker := <-WorkerQueuefmt.Println("Dispatching work request")worker <- work}()}}}()
}

5.main.go

[root@wangjq queue]# cat main.go
package mainimport ("flag""fmt""net/http"
)var (NWorkers = flag.Int("n", 4, "The number of workers to start")HTTPAddr = flag.String("http", "127.0.0.1:8000", "Address to listen for HTTP requests on")
)func main() {// Parse the command-line flags.
        flag.Parse()// Start the dispatcher.fmt.Println("Starting the dispatcher")StartDispatcher(*NWorkers)// Register our collector as an HTTP handler function.fmt.Println("Registering the collector")http.HandleFunc("/work", Collector)// Start the HTTP server!fmt.Println("HTTP server listening on", *HTTPAddr)if err := http.ListenAndServe(*HTTPAddr, nil); err != nil {fmt.Println(err.Error())}
}

6.编译

[root@wangjq queue]# go build -o queued *.go

7.运行

[root@wangjq queue]# ./queued -n 5
Starting the dispatcher
Starting worker 1
Starting worker 2
Starting worker 3
Starting worker 4
Starting worker 5
Registering the collector
HTTP server listening on 127.0.0.1:8000

8.测试

[root@wangjq ~]# for i in {1..3}; do curl localhost:8000/work -d name=$USER -d delay=$(expr $i % 11)s; done

9.效果

[root@wangjq queue]# ./queued -n 5
Starting the dispatcher
Starting worker 1
Starting worker 2
Starting worker 3
Starting worker 4
Starting worker 5
Registering the collector
HTTP server listening on 127.0.0.1:8000
Work request queued
Received work requeust
Dispatching work request
worker1: Received work request, delaying for 1.000000 seconds
Work request queued
Received work requeust
Dispatching work request
worker2: Received work request, delaying for 2.000000 seconds
Work request queued
Received work requeust
Dispatching work request
worker4: Received work request, delaying for 3.000000 seconds
worker1: Hello, root!
worker2: Hello, root!
worker4: Hello, root!

转载于:https://www.cnblogs.com/wangjq19920210/p/11526946.html

go 多线程并发 queue demo相关推荐

  1. java中thread实例_Java多线程并发执行demo代码实例

    主类:MultiThread,执行并发类 package java8test; import java.util.ArrayList; import java.util.List; import ja ...

  2. JAVA多线程并发Demo

    一个最简单的多线程并发demo: 主函数: public class multithreadReq {private static final int THREADNUM = 5;//线程数量 pub ...

  3. Java-多线程-Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比

    转载声明 本文大量内容系转载自以下文章,有删改,并参考其他文档资料加入了一些内容: [小家Java]Future.FutureTask.CompletionService.CompletableFut ...

  4. 多线程并发知识,肝完这篇10W+字超详细的文章就够了

    大家好,我是Oldou,今天又到了我们的学习时间了,本文介绍的是多线程相关的知识,文中的内容可能不是很全,但是学习完一定会让自己掉发升级,内容比较多,但是我们千万别放弃,不懂的地方一定要主动花时间去理 ...

  5. python 多线程并发编程(生产者、消费者模式),边读图像,边处理图像,处理完后保存图像实现提高处理效率

    文章目录 需求 实现 先导入本次需要用到的包 一些辅助函数 如下函数是得到指定后缀的文件 如下的函数一个是读图像,一个是把RGB转成BGR 下面是主要的几个处理函数 在上面几个函数构建对应的处理函数 ...

  6. Java多线程并发编程

    一.线程池 1.1.什么是线程池 线程池是一种多线程的处理方式,利用已有线程对象继续服务新的任务(按照一定的执行策略),而不是频繁地创建销毁线程对象,由此提高服务的吞吐能力,减少CPU的闲置时间.具体 ...

  7. delay在java中有什么用_DelayQueue怎么在Java多线程并发开发中使用

    DelayQueue怎么在Java多线程并发开发中使用 发布时间:2020-12-05 17:29:31 来源:亿速云 阅读:56 作者:Leah 这篇文章给大家介绍DelayQueue怎么在Java ...

  8. 2021全新Java多线程并发入门到精通,一篇就能学会

    目录 一, JAVA 多线程并发 1,JAVA 并发知识库 2,JAVA 线程实现/创建方式 (1) 继承 Thread 类 (2)实现 Runnable 接口. (3)ExecutorService ...

  9. 多线程---并发容器的使用

    多线程---并发容器的使用 1. 容器概览 2. 容器的使用 1. Map 1. HashTable 2. HashMap 3. SynchronizedHashMap 4. ConcurrentHa ...

最新文章

  1. NLP.TM | GloVe模型及其Python实现
  2. 一种可以穿透还原卡和还原软件的代码
  3. 每个私有静态方法都是新类的候选人
  4. cocos creator粒子不变色_隐秘的物理粒子系统与渲染 !Cocos Creator LiquidFun !
  5. 学C++不得不看的一篇文章
  6. java day47【redis概念 、下载安装 、 命令操作 、持久化操作 、使用Java客户端操作redis】...
  7. ios开发 将json格式数据上传服务器
  8. 设定MyEclipse编辑代码区域文字的大小及非关键字的字体、字形和颜色
  9. python实现单例模式的几种方法实例详解
  10. mac误删文件恢复可靠教程
  11. Java中Character类的概述及其详解
  12. 个人计算机中的防病毒软件无法,为什么无法在计算机上安装360防病毒软件?
  13. 移动用户体验设计:iOS APP体验设计
  14. 方阵的特征值与特征向量
  15. Kudu 原理、API使用、代码
  16. am335x linux内核烧写_am335x文件系统烧写问题
  17. 页面中的黑白滤镜css3,filter属性
  18. Stata绘图:多维柱状图绘制
  19. CodeForces 1000A Codehorses T-shirts
  20. Sql语句——删除表数据drop、truncate和delete的用法

热门文章

  1. python中与label类似的控件是_Python高级进阶教程021期 pyqt5label控件进阶使用,设置兄弟控件,广告植入...
  2. 三次握手和四次挥手详细介绍
  3. 排查链接是否失效_如何进行移动站点流量排查?
  4. 狼人杀服务器维护时间,狼人杀官 方将于11月30日进行停机维护
  5. linux ps转为tiff,转换为TIFF,将图像转换为TIFF,在线图像转换为TIFF
  6. 自考计算机及应用心得体会,自考中文专业的心得体会
  7. 的训练过程_最全深度学习训练过程可视化工具(附github源码)
  8. makefile文件编写_九图记住Makefile
  9. java线程池应用的好处_java高级应用:线程池全面解析
  10. 【若依(ruoyi)】向DAO中传递动态参数