大量数据转录的多线程和同步处理实现
项目中需要对两个不同格式的存储设备进行数据转录,因为数据量非常大,所以时间非常缓慢;解决方案是使用ReaderWriterSlim类建立一个共享的同步数据,可以支持一个线程读取外部设备,向同步数据写入;多个线程从同步数据中读取,转换格式,然后写入到本地设备。
本例中采用Queue<T>作为存放数据的集合,写入线程向它的尾部写入对象,读取线程从它的头部获取对象。
需要注意的是,由于Queue会抛弃已处理的对象,所以在同步数据队列中无法验证数据对象的唯一性,被写入的数据库需要去掉唯一约束,或在写入时向数据库请求验证。
首先定义一个读写接口:
{
public interface IReaderWriter<T>
{
T Read(int argument);
void Write(int arugment, T instance);
void Delete(int argument);
void Clear();
}
}
然后实现一个队列的读写器:
{
public class QueueReaderWriter<T> : IReaderWriter<T>
{
private Queue<T> queues;
public QueueReaderWriter()
{
queues = new Queue<T>();
}
public QueueReaderWriter(int capacity)
{
queues = new Queue<T>(capacity);
}
#region IReadWrite<T> 成员
public T Read(int argument)
{
return queues.FirstOrDefault();
}
public void Write(int arugment, T instance)
{
queues.Enqueue(instance);
}
public void Delete(int argument)
{
queues.Dequeue();
}
public void Clear()
{
queues.Clear();
queues.TrimExcess();
}
#endregion
}
}
使用ReaderWriterLockSlim实现同步数据类:
{
public class SynchronizedWriteData<T> : IDisposable
{
private ReaderWriterLockSlim _dataLock = new ReaderWriterLockSlim();
private IReaderWriter<T> _innerData;
private SynchronizedWriteData()
{ }
public SynchronizedWriteData(IReaderWriter<T> innerData)
{
_innerData = innerData;
}
public T Read()
{
_dataLock.EnterReadLock();
try
{
return _innerData.Read(0);
}
finally
{
_dataLock.ExitReadLock();
}
}
public T Read(int argument)
{
_dataLock.EnterReadLock();
try
{
return _innerData.Read(argument);
}
finally
{
_dataLock.ExitReadLock();
}
}
public void Add(T instance)
{
_dataLock.EnterWriteLock();
try
{
_innerData.Write(0, instance);
}
finally
{
_dataLock.ExitWriteLock();
}
}
public void Add(int argument, T instance)
{
_dataLock.EnterWriteLock();
try
{
_innerData.Write(argument, instance);
}
finally
{
_dataLock.ExitWriteLock();
}
}
public bool AddWithTimeout(T instance, int timeout)
{
if (_dataLock.TryEnterWriteLock(timeout))
{
try
{
_innerData.Write(0, instance);
}
finally
{
_dataLock.ExitWriteLock();
}
return true;
}
else
{
return false;
}
}
public bool AddWithTimeout(int argument, T instance, int timeout)
{
if (_dataLock.TryEnterWriteLock(timeout))
{
try
{
_innerData.Write(argument, instance);
}
finally
{
_dataLock.ExitWriteLock();
}
return true;
}
else
{
return false;
}
}
public void Delete()
{
_dataLock.EnterWriteLock();
try
{
_innerData.Delete(0);
}
finally
{
_dataLock.ExitWriteLock();
}
}
public void Delete(int argument)
{
_dataLock.EnterWriteLock();
try
{
_innerData.Delete(argument);
}
finally
{
_dataLock.ExitWriteLock();
}
}
#region IDisposable 成员
public void Dispose()
{
try
{
_dataLock.EnterWriteLock();
{
try
{
_innerData.Clear();
}
finally
{
_dataLock.ExitWriteLock();
}
}
}
finally
{
_dataLock.Dispose();
}
}
#endregion
}
}
{
/// <summary>
/// 从外部数据源获取到内部数据源的适配器抽象类
/// </summary>
/// <typeparam name="T">T 数据对象类型</typeparam>
public abstract class ExternalDataAdapter<T> : IDisposable
{
/// <summary>
/// 外部数据源连接字符串
/// </summary>
protected abstract string ConnectString { get; }
/// <summary>
/// 提供初始化数据适配器的方法
/// </summary>
protected abstract void Initialize();
/// <summary>
/// 提供数据传递的方法
/// </summary>
public abstract void Transmit();
/// <summary>
/// 提供从外部数据设备读取数据的方法
/// </summary>
protected abstract void ReadFromExternalDevice();
/// <summary>
/// 提供保存数据到内部设备的方法
/// </summary>
protected abstract void SaveToInternalDevice();
#region IDisposable 成员
public abstract void Dispose();
#endregion
}
}
多线程数据转录类,本例只使用了一个读取线程:
{
/// <summary>
/// 提供多线程方式从外部数据源获取到内部数据源的适配器类
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class MultiThreadAdapter<T> : ExternalDataAdapter<T>
{
protected SynchronizedWriteData<T> _data;
protected Thread _readThread;
protected abstract override string ConnectString { get; }
protected abstract override void Initialize();
public sealed override void Transmit()
{
_readThread = new Thread(new ThreadStart(ReadFromExternalDevice));
_readThread.Start();
Thread.Sleep(10000);
while (_readThread.IsAlive)
{
SaveToInternalDevice();
}
_readThread.Join();
}
protected abstract override void ReadFromExternalDevice();
protected abstract override void SaveToInternalDevice();
public override void Dispose()
{
if (_data != null)
{
_data.Dispose();
}
}
}
}
转载于:https://www.cnblogs.com/RCFans/archive/2008/11/15/1333985.html
大量数据转录的多线程和同步处理实现相关推荐
- java的知识点32——多线程 并发同步的 性能分析、快乐影院 订票操作
多线程 并发 同步 性能分析 /*** 线程安全: 在并发时保证数据的正确性.效率尽可能高* synchronized* 1.同步方法* 2.同步块* @author Administrator ...
- Android多线程之同步锁的使用
本文主要介绍了Android多线程之同步锁的使用,分享给大家,具体如下: 一.同步机制关键字synchronized 对于Java来说,最常用的同步机制就是synchronized关键字,他是一种基于 ...
- JAVA-初步认识-第十三章-多线程(验证同步函数的锁)
一. 至于同步函数用的是哪个锁,我们可以验证一下,借助原先卖票的例子 对于程序中的num,从100改为400,DOS的结果显示的始终都是0线程,票号最小都是1. 票号是没有问题的,因为同步了. 有人针 ...
- linux多线程编程——同步与互斥
一. 为什么要用多线程技术? 1.避免阻塞,大家知道,单个进程只有一个主线程,当主线程阻塞的时候,整个进程也就阻塞了,无法再去做其它的一些功能了. 2.避免CPU空转,应用程序经常会涉及到RPC,数据 ...
- (四)Java中的多线程之间实现同步+多线程并发同步
一.什么是线程安全问题 为什么有线程安全问题? 当多个线程同时共享同一个全局变量或静态变量,做写的操作(修改变量值)时,可能会发生数据冲突问题,也就是线程安全问题.但是做读操作时不会发生数据冲突问题. ...
- 钉钉考勤接口调用与OA系统数据对接(多线程版)
钉钉考勤接口调用与OA系统数据对接(多线程版) 公司由原来的指纹打卡更换为钉钉打卡,需要钉钉和现有的OA考勤数据对接(合并钉钉打卡数据和OA上的请假,外出,出差数据),因为人数增减单线程定时任务数据抓 ...
- 多线程的实现与多线程的同步机制-让你轻松掌握多线程编程
多线程的实现与多线程的同步机制 1. 多线程的实现 1.1 继承Thread类 创建一个类,这个类需要继承Thread类 重写Thread类的run方法(run方法中是业务代码) 实例化此线程类 调用 ...
- python多线程数据交互_python 多线程 通信
一篇文章搞定Python多进程(全) 公众号:pythonislover 前面写了三篇关于python多线程的文章,大概概况了多线程使用中的方法,文章链接如下: 一篇文章搞懂Python多线程简单实现 ...
- java登录时启动后台异步线程_JAVA多线程的同步和 异步
原标题:JAVA多线程的同步和 异步 1.多线程和异步操作的异同 多线程和异步操作两者都可以达到避免调用线程阻塞的目的,从而提高软件的可响应性.甚至有些时候我们就认为多线程和异步操作是等同的概念.但是 ...
最新文章
- C++逻辑运算符与逻辑表达式
- 直播回顾|慕尼黑工业大学博士详解室内SLAM中的几何约束
- 近期活动盘点:大咖云集,中国AI创新者论坛(3.21)
- Python操作Excel(将父子级表头生成树状结构)
- Gentoo 安装日记 16(编译内核)
- 通过PXE启动rescue模式修复丢失的系统文件(fstab/bash/mount/grub)
- Android QQ登录 程序奔溃的问题
- c构造函数和析构函数_C ++构造函数和析构函数| 查找输出程序| 套装1
- benchmark问题_使用U盘来掩盖CEPH IO性能低下的问题
- 信息抽取--关键句提取
- 24. 练习定义几种指针及数组
- 【Flink】Flink 基于事件序列最大值 AssignerWithPeriodicWatermarks
- .net ef 字段不区分大小写_第六节:框架搭建之EF的Fluent Api模式的使用流程
- 原生javaScript中使用Ajax实现异步通信
- java 试卷自动生成_基于JAVA的试题自动生成系统 - WEB源码|JSP源码/Java|源代码 - 源码中国...
- ubuntu学习日记--Lesson5:系统目录详解
- 等额本金-c语言俩个整数除法
- Atitit 信息安全常见基础技术 目录 1. 加密	1 2. 签名(防篡改)	1 2.1. 第4章 标识与认证技术 	1 2.2. 第5章 授权与访问控制技术 	1 2.3. 第9章 安全审计与责
- iOS视频直播初窥:高仿喵播APP
- HWIDGen 激活出错 解决:输入错误: 没有文件扩展“.vbs”的脚本引擎
热门文章
- 英语学习者必看:英语和汉语的十大区别
- 计算机常见竞赛大盘点!
- 以为微信里3008位好友就是人脉,殊不知有18位已经把我删除了
- GNU Radio3.8创建OOT的详细过程(基础/C++)
- 获取淘宝购买到的商品订单物流API,买家订单物流API接口,淘宝买家订单API接口
- 使用USBWriter做U盘启动盘后U盘在盘中不显示的解决办法
- 一键下载淘宝买家秀图片(win10)
- 【JavaWeb-遇错】继承或者实现Servlet相关时总是报红或者包导不进来
- 绕过接口参数签名验证
- Asp.Net支付宝沙箱支付(也可用手机扫描支付)详细教程