代码

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public delegate void AnonymousHandler();

public class WorkQueue : WorkQueue<AnonymousHandler> {
public WorkQueue() : this(16, -1) { }
public WorkQueue(int thread)
: this(thread, -1) {
}
public WorkQueue(int thread, int capacity) {
base.Thread = thread;
base.Capacity = capacity;
base.Process += delegate(AnonymousHandler ah) {
ah();
};
}
}

public class WorkQueue<T> : IDisposable {
public delegate void WorkQueueProcessHandler(T item);
public event WorkQueueProcessHandler Process;

private int _thread = 16;
private int _capacity = -1;
private int _work_index = 0;
private Dictionary<int, WorkInfo> _works = new Dictionary<int, WorkInfo>();
private object _works_lock = new object();
private Queue<T> _queue = new Queue<T>();
private object _queue_lock = new object();

public WorkQueue() : this(16, -1) { }
public WorkQueue(int thread)
: this(thread, -1) {
}
public WorkQueue(int thread, int capacity) {
_thread = thread;
_capacity = capacity;
}

public void Enqueue(T item) {
lock (_queue_lock) {
if (_capacity > 0 && _queue.Count >= _capacity) return;
_queue.Enqueue(item);
}
lock (_works_lock) {
foreach (WorkInfo w in _works.Values) {
if (w.IsWaiting) {
w.Set();
return;
}
}
}
if (_works.Count < _thread) {
if (_queue.Count > 0) {
int index = 0;
lock (_works_lock) {
index = _work_index++;
_works.Add(index, new WorkInfo());
}
new Thread(delegate() {
WorkInfo work = _works[index];
while (true) {
List<T> de = new List<T>();
if (_queue.Count > 0) {
lock (_queue_lock) {
if (_queue.Count > 0) {
de.Add(_queue.Dequeue());
}
}
}

if (de.Count > 0) {
try {
this.OnProcess(de[0]);
} catch {
}
}

if (_queue.Count == 0) {
work.WaitOne(TimeSpan.FromSeconds(20));

if (_queue.Count == 0) {
break;
}
}
}
lock (_works_lock) {
_works.Remove(index);
}
work.Dispose();
}).Start();
}
}
}

protected virtual void OnProcess(T item) {
if (Process != null) {
Process(item);
}
}

#region IDisposable 成员

public void Dispose() {
lock (_queue_lock) {
_queue.Clear();
}
lock (_works_lock) {
foreach (WorkInfo w in _works.Values) {
w.Dispose();
}
}
}

#endregion

public int Thread {
get { return _thread; }
set {
if (_thread != value) {
_thread = value;
}
}
}
public int Capacity {
get { return _capacity; }
set {
if (_capacity != value) {
_capacity = value;
}
}
}

public int UsedThread {
get { return _works.Count; }
}
public int Queue {
get { return _queue.Count; }
}

public string Statistics {
get {
string value = string.Format(@"线程:{0}/{1}
队列:{2}

", _works.Count, _thread, _queue.Count);
int[] keys = new int[_works.Count];
try {
_works.Keys.CopyTo(keys, 0);
} catch {
lock (_works_lock) {
keys = new int[_works.Count];
_works.Keys.CopyTo(keys, 0);
}
}
foreach (int k in keys) {
WorkInfo w = null;
if (_works.TryGetValue(k, out w)) {
value += string.Format(@"线程{0}:{1}
", k, w.IsWaiting);
}
}
return value;
}
}

class WorkInfo : IDisposable {
private ManualResetEvent _reset = new ManualResetEvent(false);
private bool _isWaiting = false;

public void WaitOne(TimeSpan timeout) {
try {
_reset.Reset();
_isWaiting = true;
_reset.WaitOne(timeout, false);
} catch { }
}
public void Set() {
try {
_isWaiting = false;
_reset.Set();
} catch { }
}

public bool IsWaiting {
get { return _isWaiting; }
}

#region IDisposable 成员

public void Dispose() {
this.Set();
_reset.Close();
}

#endregion
}
}

调用方法:

代码

//方法1
WorkQueue<xxx> writeWQ = new WorkQueue<xxx>(5, 1000); //有重载,最多开辟5个线程同时处理,队列大小设为1000,如果超出则不处理

for (int a = 0; a < 1000000; a++) {

//此方法按常理会立即返回,因为执行交给 WorkQueue 处理了
writeWQ.Enqueue(delegate(xxx msg) {
//处理
});
}

//方法2,执行一个匿名委托
WorkQueue writeWQ = new WorkQueue(5, 1000); //有重载,最多开辟5个线程同时处理,队列大小设为1000,如果超出则不处理

for (int a = 0; a < 1000000; a++) {

//此方法按常理会立即返回,因为执行交给 WorkQueue 处理了
writeWQ.Enqueue(delegate() {
//处理
});
}

转载于:https://www.cnblogs.com/baobao2010/archive/2010/08/09/1795610.html

队列处理器 WorkQueueT相关推荐

  1. 使用 Supervisor 配置 Laravel 运行队列处理器

    阅读目录 配置 Supervisor 启动 Supervisor 配置 Supervisor Supervisor 的配置文件通常位于 /etc/supervisor/conf.d 目录下. 在该目录 ...

  2. laravel5.6 php,Laravel5.6中的队列简单使用

    Laravel 队列为不同的后台队列服务提供统一的 API,例如 Beanstalk,Amazon SQS,Redis,甚至其他基于关系型数据库的队列.队列的目的是将耗时的任务延时处理,比如发送邮件, ...

  3. 哈,又一款超级简单的队列(MQ)实现方案来了~

    开源的消息队列已经很多了,但大部分很重,实际环境下,很多可能只是使用到了一点功能而已,杀鸡使用牛刀,着实有些浪费了.很多时候,我们只想要一片绿叶,但它们给了我们整个的春天,很难消化.本着DIR精神, ...

  4. Laravel5.5之事件监听、任务调度、队列

    一.事件监听 流程: 1.1 创建event php artisan make:event UserLogin LoginController.php /*** The user has been a ...

  5. laravel 任务队列_Laravel5.5之事件监听、任务调度、队列

    流程: 1.1 创建event php artisan make:event UserLogin LoginController.php /*** The user has been authenti ...

  6. 消息队列之取消会议和自动退款处理

    2020 年,因为新冠疫情原因很多行业交流会议被取消,这就涉及到如何处理退款,理想的场景是当我们在系统管理界面点击取消按钮时,就会在后台自动批量处理所有已付款用户的退款,然后给对应用户发送会议取消和退 ...

  7. laravel实现队列

    一:队列配置 队列的配置文件放置在config/queue.php文件中,laravel框架中支持的队列驱动有:sync, database, beanstalkd, sqs, redis,null对 ...

  8. 使用队列think-queue处理邮件的发送

    前言:使用的php框架版本为:thinkphp 5.0.24,队列使用的拓展包为:topthink/think-queue:2.0.3,redis版本为 5.0.5. 因为下面提到的队列使用的redi ...

  9. Php laravel 队列,Laravel 的队列系统介绍

    这篇文章主要介绍的内容是关于Laravel 的队列系统介绍,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下 Laravel 队列为不同的后台队列服务提供统一的 API , 例如 Bean ...

最新文章

  1. 知乎千万级高性能长连接网关是如何搭建的
  2. 比特币现金反弹,区块链是极好机遇
  3. 【每天读一遍,不久你就会变!】【送给迷茫的朋友】
  4. dncnn图像去噪_NeuNet2020:BRDNet(开源)使用深度CNN和批量归一化进行图像去噪
  5. C语言algorithm主函数,C语言中主函数中相关有关问题?
  6. java-线程-生产者-消费者
  7. 东南亚再造天猫 Lazada品牌商城LazMall举办第二届品牌未来论坛
  8. 内核移植出现:Kernel panic - not syncing: No init found.
  9. 1698 -Access denied for user 'root@xxxx'
  10. MVC控制器中动作方法返回的结果
  11. Struts2的输入验证(三)-短路验证与非字段验证
  12. UEditor 之初体验后记
  13. 生物信息学资料1,常用软件,酶切位点分析
  14. MAX422与422转USB及485以及232接线方法
  15. 糖友日常生活需要注意什么
  16. 1076 Forwards on Weibo——最后用menset函数
  17. PaddleNLP实战:应用NeZha模型做微博情感6分类
  18. 微信小程序-06 tab选项卡滑动切换与列表Item(scroll 、 swiper)数据的获取等所用到的都有了
  19. Pinia全新一代状态管理工具Pinia-Vue3全家桶
  20. QGC地面站二次开发(三)Qt 简洁地面站

热门文章

  1. Python静态作用域名字搜索规则
  2. 数字图像处理--图像增强之对比度拉伸
  3. Ubuntu 16.04查看软件安装位置
  4. 如何建立队列c语言_什么是优先队列
  5. 推荐系统知识梳理——GBDTLR
  6. 从JVM的角度看JAVA代码--代码优化
  7. 对编码通俗易懂的介绍
  8. 从excel到python数据分析进阶指南_从Excel到Python数据分析进阶指南
  9. html ios导航栏下拉菜单,Flutter -- iOS导航栏TabBar
  10. win10下mount挂载文件 samba cifs