项目中需要对两个不同格式的存储设备进行数据转录,因为数据量非常大,所以时间非常缓慢;解决方案是使用ReaderWriterSlim类建立一个共享的同步数据,可以支持一个线程读取外部设备,向同步数据写入;多个线程从同步数据中读取,转换格式,然后写入到本地设备。

本例中采用Queue<T>作为存放数据的集合,写入线程向它的尾部写入对象,读取线程从它的头部获取对象。

需要注意的是,由于Queue会抛弃已处理的对象,所以在同步数据队列中无法验证数据对象的唯一性,被写入的数据库需要去掉唯一约束,或在写入时向数据库请求验证。

首先定义一个读写接口:

namespace Common
{
    public interface IReaderWriter<T>
    {
        T Read(int argument);
        void Write(int arugment, T instance);
        void Delete(int argument);
        void Clear();
    }
}

然后实现一个队列的读写器:

namespace Common
{
    public class QueueReaderWriter<T> : IReaderWriter<T>
    {
        private Queue<T> queues;

public QueueReaderWriter()
        {
            queues = new Queue<T>();
        }

public QueueReaderWriter(int capacity)
        {
            queues = new Queue<T>(capacity);
        }

#region IReadWrite<T> 成员

public T Read(int argument)
        {
            return queues.FirstOrDefault();
        }

public void Write(int arugment, T instance)
        {
            queues.Enqueue(instance);
        }

public void Delete(int argument)
        {
            queues.Dequeue();
        }

public void Clear()
        {
            queues.Clear();
            queues.TrimExcess();
        }

#endregion
    }
}

使用ReaderWriterLockSlim实现同步数据类:

namespace Common
{
    public class SynchronizedWriteData<T> : IDisposable
    {
        private ReaderWriterLockSlim _dataLock = new ReaderWriterLockSlim();
        private IReaderWriter<T> _innerData;

private SynchronizedWriteData()
        { }

public SynchronizedWriteData(IReaderWriter<T> innerData)
        {
            _innerData = innerData;
        }

public T Read()
        {
            _dataLock.EnterReadLock();
            try
            {
                return _innerData.Read(0);
            }
            finally
            {
                _dataLock.ExitReadLock();
            }
        }

public T Read(int argument)
        {
            _dataLock.EnterReadLock();
            try
            {
                return _innerData.Read(argument);
            }
            finally
            {
                _dataLock.ExitReadLock();
            }
        }

public void Add(T instance)
        {
            _dataLock.EnterWriteLock();
            try
            {
                _innerData.Write(0, instance);
            }
            finally
            {
                _dataLock.ExitWriteLock();
            }
        }

public void Add(int argument, T instance)
        {
            _dataLock.EnterWriteLock();
            try
            {
                _innerData.Write(argument, instance);
            }
            finally
            {
                _dataLock.ExitWriteLock();
            }
        }

public bool AddWithTimeout(T instance, int timeout)
        {
            if (_dataLock.TryEnterWriteLock(timeout))
            {
                try
                {
                    _innerData.Write(0, instance);
                }
                finally
                {
                    _dataLock.ExitWriteLock();
                }
                return true;
            }
            else
            {
                return false;
            }
        }

public bool AddWithTimeout(int argument, T instance, int timeout)
        {
            if (_dataLock.TryEnterWriteLock(timeout))
            {
                try
                {
                    _innerData.Write(argument, instance);
                }
                finally
                {
                    _dataLock.ExitWriteLock();
                }
                return true;
            }
            else
            {
                return false;
            }
        }

public void Delete()
        {
            _dataLock.EnterWriteLock();
            try
            {
                _innerData.Delete(0);
            }
            finally
            {
                _dataLock.ExitWriteLock();
            }
        }

public void Delete(int argument)
        {
            _dataLock.EnterWriteLock();
            try
            {
                _innerData.Delete(argument);
            }
            finally
            {
                _dataLock.ExitWriteLock();
            }
        }

#region IDisposable 成员

public void Dispose()
        {
            try
            {
                _dataLock.EnterWriteLock();
                {
                    try
                    {
                        _innerData.Clear();
                    }
                    finally
                    {
                        _dataLock.ExitWriteLock();
                    }
                }
            }
            finally
            {
                _dataLock.Dispose();
            }
        }

#endregion
    }
}

namespace ExternalDataHandle
{
    /// <summary>
    /// 从外部数据源获取到内部数据源的适配器抽象类
    /// </summary>
    /// <typeparam name="T">T 数据对象类型</typeparam>
    public abstract class ExternalDataAdapter<T> : IDisposable
    {
        /// <summary>
        /// 外部数据源连接字符串
        /// </summary>
        protected abstract string ConnectString { get; }

/// <summary>
        /// 提供初始化数据适配器的方法
        /// </summary>
        protected abstract void Initialize();

/// <summary>
        /// 提供数据传递的方法
        /// </summary>
        public abstract void Transmit();

/// <summary>
        /// 提供从外部数据设备读取数据的方法
        /// </summary>
        protected abstract void ReadFromExternalDevice();

/// <summary>
        /// 提供保存数据到内部设备的方法
        /// </summary>
        protected abstract void SaveToInternalDevice();

#region IDisposable 成员

public abstract void Dispose();

#endregion
    }
}

多线程数据转录类,本例只使用了一个读取线程:

namespace ExternalDataHandle
{
    /// <summary>
    /// 提供多线程方式从外部数据源获取到内部数据源的适配器类
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public abstract class MultiThreadAdapter<T> : ExternalDataAdapter<T>
    {
        protected SynchronizedWriteData<T> _data;
        protected Thread _readThread;

protected abstract override string ConnectString { get; }

protected abstract override void Initialize();

public sealed override void Transmit()
        {
            _readThread = new Thread(new ThreadStart(ReadFromExternalDevice));
            _readThread.Start();
            Thread.Sleep(10000);
            while (_readThread.IsAlive)
            {
                SaveToInternalDevice();
            }
            _readThread.Join();
        }

protected abstract override void ReadFromExternalDevice();

protected abstract override void SaveToInternalDevice();

public override void Dispose()
        {
            if (_data != null)
            {
                _data.Dispose();
            }
        }
    }
}

转载于:https://www.cnblogs.com/RCFans/archive/2008/11/15/1333985.html

大量数据转录的多线程和同步处理实现相关推荐

  1. java的知识点32——多线程 并发同步的 性能分析、快乐影院  订票操作

    多线程  并发  同步  性能分析 /*** 线程安全: 在并发时保证数据的正确性.效率尽可能高* synchronized* 1.同步方法* 2.同步块* @author Administrator ...

  2. Android多线程之同步锁的使用

    本文主要介绍了Android多线程之同步锁的使用,分享给大家,具体如下: 一.同步机制关键字synchronized 对于Java来说,最常用的同步机制就是synchronized关键字,他是一种基于 ...

  3. JAVA-初步认识-第十三章-多线程(验证同步函数的锁)

    一. 至于同步函数用的是哪个锁,我们可以验证一下,借助原先卖票的例子 对于程序中的num,从100改为400,DOS的结果显示的始终都是0线程,票号最小都是1. 票号是没有问题的,因为同步了. 有人针 ...

  4. linux多线程编程——同步与互斥

    一. 为什么要用多线程技术? 1.避免阻塞,大家知道,单个进程只有一个主线程,当主线程阻塞的时候,整个进程也就阻塞了,无法再去做其它的一些功能了. 2.避免CPU空转,应用程序经常会涉及到RPC,数据 ...

  5. (四)Java中的多线程之间实现同步+多线程并发同步

    一.什么是线程安全问题 为什么有线程安全问题? 当多个线程同时共享同一个全局变量或静态变量,做写的操作(修改变量值)时,可能会发生数据冲突问题,也就是线程安全问题.但是做读操作时不会发生数据冲突问题. ...

  6. 钉钉考勤接口调用与OA系统数据对接(多线程版)

    钉钉考勤接口调用与OA系统数据对接(多线程版) 公司由原来的指纹打卡更换为钉钉打卡,需要钉钉和现有的OA考勤数据对接(合并钉钉打卡数据和OA上的请假,外出,出差数据),因为人数增减单线程定时任务数据抓 ...

  7. 多线程的实现与多线程的同步机制-让你轻松掌握多线程编程

    多线程的实现与多线程的同步机制 1. 多线程的实现 1.1 继承Thread类 创建一个类,这个类需要继承Thread类 重写Thread类的run方法(run方法中是业务代码) 实例化此线程类 调用 ...

  8. python多线程数据交互_python 多线程 通信

    一篇文章搞定Python多进程(全) 公众号:pythonislover 前面写了三篇关于python多线程的文章,大概概况了多线程使用中的方法,文章链接如下: 一篇文章搞懂Python多线程简单实现 ...

  9. java登录时启动后台异步线程_JAVA多线程的同步和 异步

    原标题:JAVA多线程的同步和 异步 1.多线程和异步操作的异同 多线程和异步操作两者都可以达到避免调用线程阻塞的目的,从而提高软件的可响应性.甚至有些时候我们就认为多线程和异步操作是等同的概念.但是 ...

最新文章

  1. C++逻辑运算符与逻辑表达式
  2. 直播回顾|慕尼黑工业大学博士详解室内SLAM中的几何约束
  3. 近期活动盘点:大咖云集,中国AI创新者论坛(3.21)
  4. Python操作Excel(将父子级表头生成树状结构)
  5. Gentoo 安装日记 16(编译内核)
  6. 通过PXE启动rescue模式修复丢失的系统文件(fstab/bash/mount/grub)
  7. Android QQ登录 程序奔溃的问题
  8. c构造函数和析构函数_C ++构造函数和析构函数| 查找输出程序| 套装1
  9. benchmark问题_使用U盘来掩盖CEPH IO性能低下的问题
  10. 信息抽取--关键句提取
  11. 24. 练习定义几种指针及数组
  12. 【Flink】Flink 基于事件序列最大值 AssignerWithPeriodicWatermarks
  13. .net ef 字段不区分大小写_第六节:框架搭建之EF的Fluent Api模式的使用流程
  14. 原生javaScript中使用Ajax实现异步通信
  15. java 试卷自动生成_基于JAVA的试题自动生成系统 - WEB源码|JSP源码/Java|源代码 - 源码中国...
  16. ubuntu学习日记--Lesson5:系统目录详解
  17. 等额本金-c语言俩个整数除法
  18. Atitit 信息安全常见基础技术 目录 1. 加密 1 2. 签名(防篡改) 1 2.1. 第4章 标识与认证技术  1 2.2. 第5章 授权与访问控制技术  1 2.3. 第9章 安全审计与责
  19. iOS视频直播初窥:高仿喵播APP
  20. HWIDGen 激活出错 解决:输入错误: 没有文件扩展“.vbs”的脚本引擎

热门文章

  1. 英语学习者必看:英语和汉语的十大区别
  2. 计算机常见竞赛大盘点!
  3. 以为微信里3008位好友就是人脉,殊不知有18位已经把我删除了
  4. GNU Radio3.8创建OOT的详细过程(基础/C++)
  5. 获取淘宝购买到的商品订单物流API,买家订单物流API接口,淘宝买家订单API接口
  6. 使用USBWriter做U盘启动盘后U盘在盘中不显示的解决办法
  7. 一键下载淘宝买家秀图片(win10)
  8. 【JavaWeb-遇错】继承或者实现Servlet相关时总是报红或者包导不进来
  9. 绕过接口参数签名验证
  10. Asp.Net支付宝沙箱支付(也可用手机扫描支付)详细教程