知道disruptor快1年多了,一直没有用武之地。这次正好要迁移数据。表结构由于完全不一样,导数据的时候还需要进行一些计算。果断用disruptor试试

  1 public class TransferProcessor implements Runnable,InitializingBean {
  2
  3     private JdbcTemplate mysqlJdbcTemplate;
  4
  5     private PlatformTransactionManager txManager;
  6
  7     private JdbcTemplate oracleJdbcTemplate;
  8
  9     public PlatformTransactionManager getTxManager() {
 10         return txManager;
 11     }
 12
 13     public void setTxManager(PlatformTransactionManager txManager) {
 14         this.txManager = txManager;
 15     }
 16
 17     public JdbcTemplate getMysqlJdbcTemplate() {
 18         return mysqlJdbcTemplate;
 19     }
 20
 21     public void setMysqlJdbcTemplate(JdbcTemplate mysqlJdbcTemplate) {
 22         this.mysqlJdbcTemplate = mysqlJdbcTemplate;
 23     }
 24
 25     public JdbcTemplate getOracleJdbcTemplate() {
 26         return oracleJdbcTemplate;
 27     }
 28
 29     public void setOracleJdbcTemplate(JdbcTemplate oracleJdbcTemplate) {
 30         this.oracleJdbcTemplate = oracleJdbcTemplate;
 31     }
 32
 33     private final ExecutorService EXECUTOR = Executors.newFixedThreadPool(2);
 34
 35     private Integer offset = 0;
 36
 37     private Integer lastId = 0;
 38
 39     private Integer limit = 10000;
 40
 41     private final WorkerValueEventHandler[] handlers = new WorkerValueEventHandler[2];
 42
 43     private RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY, 32);
 44     private WorkerPool<ValueEvent> workerPool = null;
 45
 46     private String sql = "select t.uid as uid,t.tel,t.email,t.password,t.username,t3.extcredits2 as credits,1 as status,t.regdate,t2.realname,t2.gender,t2.birthyear,t2.birthmonth,t2.birthday,t2.constellation,t2.birthcity ,t2.residecity,t2.bloodtype,t2.qq,t2.msn,t2.taobao,t2.bio,t2.occupation,t.salt from sz_ucenter_members t left join sz_common_member_profile t2 on t.uid=t2.uid left join sz_common_member_count t3 on t.uid=t3.uid";
 47
 48     public void run() {
 49         RingBuffer<ValueEvent> ringBuffer = workerPool.start(EXECUTOR);
 50         ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
 51         long t1 = System.currentTimeMillis();
 52         List<Result> list = query(sql + "  order by uid limit ?,?", offset, limit);
 53         long t2 = System.currentTimeMillis();
 54         Result result = list.get(list.size() - 1);
 55         lastId = result.getUid();
 56         Integer id = 1;
 57         Integer total = list.size();
 58         System.out.println("查询第" + id + "批数据,耗时" + (t2 - t1) + "ms");
 59         while (list != null && list.size() > 0) {
 60             Result r = list.get(list.size() - 1);
 61             lastId = r.getUid();
 62             long sequence = ringBuffer.next();
 63             ValueEvent event = ringBuffer.claimAndGetPreallocated(sequence);
 64             event.setValue(list);
 65             event.setId(id);
 66             ringBuffer.publish(sequence);
 67             id++;
 68             long t3 = System.currentTimeMillis();
 69             list = query(sql + " where t.uid>?  order by uid limit ?", lastId, limit);
 70             long t4 = System.currentTimeMillis();
 71             total += list.size();
 72             System.out.println("查询第" + id + "批数据,耗时" + (t4 - t3) + "ms,RingBuffer剩余空间:" + ringBuffer.remainingCapacity());
 73         }
 74         while (true) {
 75             try {
 76                 Thread.sleep(500);
 77             } catch (InterruptedException e) {
 78                 e.printStackTrace();
 79             }
 80             System.out.println("RingBuffer中剩余数据量:" + (32 - ringBuffer.remainingCapacity()));
 81             if (ringBuffer.remainingCapacity() == 32) {
 82                 System.out.println("数据迁移结束!总共" + total + "条记录");
 83             System.exit(0);
 84             }
 85         }
 86     }
 87
 88
 89     private List<Result> query(String sql, Object p1, Object p2) {
 90         List<Result> list = mysqlJdbcTemplate.query(sql, new Object[] { p1, p2 }, new RowMapper<Result>() {
 91             @Override
 92             public Result mapRow(ResultSet rs, int rowNum) throws SQLException {
 93                 Result r = new Result();
 94                 r.setUid(rs.getInt("uid"));
 95                 String tel = rs.getString("tel");
 96                 r.setTel(StringUtils.isEmpty(tel) ? null : tel.length() > 11 ? tel.substring(0, 11) : tel);
 97                 r.setEmail(rs.getString("email"));
 98                 r.setPassword(rs.getString("password"));
 99                 r.setUsername(StringUtils.isEmpty(rs.getString("username")) ? " " : rs.getString("username"));
100                 r.setCredits(rs.getInt("credits"));
101                 r.setStatus(rs.getInt("status"));
102                 r.setRegdate(rs.getString("regdate"));
103                 r.setRealname(rs.getString("realname"));
104                 r.setGender(rs.getInt("gender"));
105                 r.setBirthyear(rs.getInt("birthyear"));
106                 r.setBirthmonth(rs.getInt("birthmonth"));
107                 r.setBirthday(rs.getInt("birthday"));
108                 r.setConstellation(rs.getString("constellation"));
109                 r.setBirthcity(rs.getString("birthcity"));
110                 r.setResidecity(rs.getString("residecity"));
111                 r.setBloodtype(rs.getString("bloodtype"));
112                 r.setQq(rs.getString("qq"));
113                 r.setMsn(rs.getString("msn"));
114                 r.setTaobao(rs.getString("taobao"));
115                 r.setBio(rs.getString("bio"));
116                 r.setOccupation(rs.getString("occupation"));
117                 r.setSalt(rs.getString("salt"));
118                 return r;
119             }
120         });
121         return list;
122     }
123
124     private List<Address> queryAddress() {
125         List<Address> addresss = oracleJdbcTemplate.query("SELECT t.NAME,t.ID from ADDRESS t where t.LVL=2", new RowMapper<Address>() {
126             @Override
127             public Address mapRow(ResultSet rs, int rowNum) throws SQLException {
128                 Address address = new Address();
129                 address.setId(rs.getInt("ID"));
130                 address.setName(rs.getString("NAME"));
131                 return address;
132             }
133
134         });
135         return addresss;
136     }
137
138     @Override
139     public void afterPropertiesSet() throws Exception {
140         List<Address> addresss = queryAddress();
141         for (int i = 0; i < 2; i++) {
142             handlers[i] = new WorkerValueEventHandler(oracleJdbcTemplate, addresss);
143             handlers[i].setTxManager(txManager);
144         }
145         workerPool = new WorkerPool<ValueEvent>(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), handlers);
146     }
147
148 }

  1 public class WorkerValueEventHandler implements WorkHandler<ValueEvent> {
  2
  3     private JdbcTemplate jdbcTemplate;
  4
  5     private List<Address> address;
  6
  7     private PlatformTransactionManager txManager;
  8
  9     public PlatformTransactionManager getTxManager() {
 10         return txManager;
 11     }
 12
 13     public void setTxManager(PlatformTransactionManager txManager) {
 14         this.txManager = txManager;
 15     }
 16
 17     private final String SQL = "insert into ACCOUNT(ID,PHONE,NICKNAME,EMAIL,PASSWORD,CREATETIME,UPDATETIME,CREDITS,CP_ID,BIRTHDAY,SEX,BLOOD,WORK,INTRODUCTION,CONSTELLATION,QQ,MSN,ALWW,STATE,ADDRESS_ID,HOMETOWN_ID,VERSION,REALNAME,SALT) values(?,?,?,?,?,?,?,?,1,?,?,?,?,?,?,?,?,?,?,?,?,0,?,?)";
 18
 19     public WorkerValueEventHandler(JdbcTemplate jdbcTemplate, List<Address> address) {
 20         this.jdbcTemplate = jdbcTemplate;
 21         this.address = address;
 22     }
 23
 24     @Override
 25     public void onEvent(ValueEvent event) throws InterruptedException {
 26         final List<Result> value = event.getValue();
 27         DefaultTransactionDefinition def = new DefaultTransactionDefinition();
 28         def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
 29         def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
 30         TransactionStatus status = txManager.getTransaction(def);
 31         try {
 32             int[] num = jdbcTemplate.batchUpdate(SQL, new BatchPreparedStatementSetter() {
 33
 34                 @Override
 35                 public void setValues(PreparedStatement ps, int i) throws SQLException {
 36                     Result result = value.get(i);
 37                     ps.setLong(1, result.getUid());
 38                     String tel = result.getTel();
 39                     ps.setString(2, tel);
 40                     ps.setString(3, result.getUsername());
 41                     ps.setString(4, result.getEmail());
 42                     if (tel != null && tel.startsWith("111")) {//马甲帐号
 43                         ps.setString(5, "4b213b65fa4f12d5416354d3df6e5089");
 44                         ps.setString(22, "654cba");
 45                     } else {
 46                         ps.setString(5, result.getPassword());
 47                         ps.setString(22, result.getSalt());
 48                     }
 49                     Timestamp time = getCreateTime(result.getRegdate());
 50                     ps.setTimestamp(6, time);
 51                     ps.setTimestamp(7, time);
 52                     ps.setInt(8, result.getCredits());
 53                     Date birthDate = getBirthDate(result.getBirthyear(), result.getBirthmonth(), result.getBirthday());
 54                     ps.setDate(9, birthDate == null ? null : new java.sql.Date(birthDate.getTime()));
 55                     Integer sex = result.getGender();
 56                     ps.setInt(10, sex == 0 ? -1 : sex == 2 ? 0 : 1);
 57                     ps.setString(11, result.getBloodtype());
 58                     ps.setString(12, result.getOccupation());
 59                     ps.setString(13, result.getBio());
 60                     ps.setString(14, result.getConstellation());
 61                     ps.setString(15, result.getQq());
 62                     ps.setString(16, result.getMsn());
 63                     ps.setString(17, result.getTaobao());
 64                     ps.setInt(18, 1);
 65                     ps.setInt(19, getAddressId(result.getResidecity()));
 66                     ps.setInt(20, getAddressId(result.getBirthcity()));
 67                     ps.setString(21, result.getRealname());
 68                     if (i % 2000 == 0) {
 69                         ps.executeBatch();
 70                     }
 71
 72                 }
 73
 74                 @Override
 75                 public int getBatchSize() {
 76                     return value.size();
 77                 }
 78
 79             });
 80             txManager.commit(status);
 81             System.out.println("线程:" + Thread.currentThread().getName() + ">>>结束处理序列号为" + event.getId() + "的数据");
 82         } catch (Throwable e) {
 83             txManager.rollback(status);
 84             System.out.println(e);
 85             System.exit(0);
 86         }
 87
 88
 89     }
 90
 91     private Date getBirthDate(Integer year, Integer month, Integer day) {
 92         if (year == 0 || month == 0 || day == 0) {
 93             return null;
 94         }
 95         Calendar c = Calendar.getInstance();
 96         c.set(Calendar.YEAR, year);
 97         c.set(Calendar.MONTH, month - 1);
 98         c.set(Calendar.DATE, day);
 99         return c.getTime();
100     }
101
102     private Timestamp getCreateTime(String regDate) {
103         Long time = Long.parseLong(regDate + "000");
104         return new Timestamp(time);
105     }
106
107     private Integer getAddressId(String name) {
108         if (StringUtils.isEmpty(name)) {
109             return -1;
110         }
111         Iterator<Address> it = address.iterator();
112         Integer id = -1;
113         while (it.hasNext()) {
114             Address address = it.next();
115             String n = address.getName();
116             if (n.equals(name)) {
117                 id = address.getId();
118                 break;
119             }
120         }
121         return id;
122     }

 1 public class ValueEvent {
 2
 3     private Integer id;
 4
 5     public Integer getId() {
 6         return id;
 7     }
 8
 9     public void setId(Integer id) {
10         this.id = id;
11     }
12
13     private List<Result> value;
14
15     public List<Result> getValue() {
16         return value;
17     }
18
19     public void setValue(List<Result> value) {
20         this.value = value;
21     }
22
23     public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
24         public ValueEvent newInstance() {
25             return new ValueEvent();
26         }
27     };
28 }

最终速度还是比较快的,5min完成。1个线程读,2个线程写。双核cpu。

接下来又从7百万条数据的csv文件中导入到库中,3min就完成。

  1 public class TransferProcessor implements Runnable, InitializingBean {
  2
  3
  4     private PlatformTransactionManager txManager;
  5
  6     private JdbcTemplate oracleJdbcTemplate;
  7
  8     public PlatformTransactionManager getTxManager() {
  9         return txManager;
 10     }
 11
 12     public void setTxManager(PlatformTransactionManager txManager) {
 13         this.txManager = txManager;
 14     }
 15
 16     public JdbcTemplate getOracleJdbcTemplate() {
 17         return oracleJdbcTemplate;
 18     }
 19
 20     public void setOracleJdbcTemplate(JdbcTemplate oracleJdbcTemplate) {
 21         this.oracleJdbcTemplate = oracleJdbcTemplate;
 22     }
 23
 24     private final ExecutorService EXECUTOR = Executors.newFixedThreadPool(2);
 25
 26     private final WorkerValueEventHandler[] handlers = new WorkerValueEventHandler[2];
 27
 28     private RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY, 32);
 29     private WorkerPool<ValueEvent> workerPool = null;
 30
 31     public void run() {
 32         Integer total = 0;
 33         RingBuffer<ValueEvent> ringBuffer = workerPool.start(EXECUTOR);
 34         ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
 35         Integer id = 1;
 36         BufferedReader reader = null;
 37         try {
 38             List<String> values = new ArrayList<String>();
 39             reader = new BufferedReader(new FileReader("d:\\black.csv"));
 40             reader.readLine();
 41             String line = null;
 42             while ((line = reader.readLine()) != null) {
 43                 total += 1;
 44                 line = line.replace("\"", "");
 45                 if (line.length() > 11) {
 46                     line = line.substring(line.length() - 11);
 47                 }
 48                     values.add(line);
 49
 50                 if (values.size() % 10000 == 0) {
 51                     publishEvent(ringBuffer, id, new ArrayList<String>(values));
 52                     values = new ArrayList<String>();
 53                     id++;
 54                 }
 55             }
 56             if (values.size() != 0) {
 57                 publishEvent(ringBuffer, id, new ArrayList<String>(values));
 58             }
 59         } catch (IOException e) {
 60             e.printStackTrace();
 61             System.exit(0);
 62         } finally {
 63             try {
 64                 if (reader != null)
 65                     reader.close();
 66             } catch (IOException e) {
 67                 e.printStackTrace();
 68             }
 69         }
 70         while (true) {
 71             try {
 72                 Thread.sleep(500);
 73             } catch (InterruptedException e) {
 74                 e.printStackTrace();
 75             }
 76             System.out.println("RingBuffer中剩余数据量:" + (32 - ringBuffer.remainingCapacity()));
 77             if (ringBuffer.remainingCapacity() == 32) {
 78                 System.out.println("数据迁移结束!总共" + total + "条记录");
 79                 System.exit(0);
 80             }
 81         }
 82
 83     }
 84
 85     protected void publishEvent(RingBuffer<ValueEvent> ringBuffer, Integer id, List<String> values) {
 86         long sequence = ringBuffer.next();
 87         ValueEvent event = ringBuffer.claimAndGetPreallocated(sequence);
 88         event.setId(id);
 89         event.setValues(values);
 90         ringBuffer.publish(sequence);
 91     }
 92
 93     @Override
 94     public void afterPropertiesSet() throws Exception {
 95         for (int i = 0; i < 2; i++) {
 96             handlers[i] = new WorkerValueEventHandler(oracleJdbcTemplate);
 97             handlers[i].setTxManager(txManager);
 98         }
 99         workerPool = new WorkerPool<ValueEvent>(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), handlers);
100     }
101
102 }

转载于:https://www.cnblogs.com/suyuji/p/3640666.html

使用disruptor 将5百多万数据从mysql导入到oracle相关推荐

  1. 数据从mysql迁移至oracle时知识点记录(一)

    最近在做数据的迁移,再将数据从mysql迁移至oracle时,部分sql语句进行了修改,在此对部分知识点进行记录: 参考资料:https://dev.mysql.com/doc/refman/5.5/ ...

  2. mysql11导入数据_MySQL专题11之MySQL导出数据、MySQL导入数据

    1.MySQL导出数据 -  MySQL中你可以使用SELECT...INTO OUTFILE语句来简单的导出数据到文本文件中. a.使用SELECT...INTO OUTFILE -  以下实例中我 ...

  3. 将数据从MySql导入数据至SQL Server 2000

    在一款应用中用到SQL Server 2000和MySql,进行数据查询的时候需要SQL Server 2000中的数据和MySql中的部分表数据,所以需要通过将mysql的数据导入到SQL Serv ...

  4. Magicodes.IE 在100万数据量下导入导出性能测试

    原文作者:HueiFeng 前言 目前Magicodes.IE更新到了2.2.3,感谢大家的支持,同时建议大家在使用过程中如果遇到一些问题或者说需要一些额外的功能可以直接提issues,当然更建议大家 ...

  5. 批量导入时间oracle excel,读取Excel数据、批量导入到Oracle数据库

    /** * @Description: 得到Excel文档,把文档中的数据批量导入到数据库中 * 1.找到上传的数据,2.把数据放到List集合中,3.把List集合中的数据更新到数据库 * @ret ...

  6. java千万级数据txt文件导入数据库

      最近在做项目的时候,有要把txt文件导入到数据库,txt文件有千万级,如果使用传统的读文件,写数据库,效率很慢.自己按照这种方式使用100万条数据的txt文件导入到oracle数据库,花费了二十多 ...

  7. python连接oracle批量写入_oracle大数据量python导入实践-1w/s

    在项目中需要将一个80w+的csv数据(200+m)导入到oracle库,一开始使用的是Navicat for Oracle的导入工具.跑了五六分钟之后绝望了,因为才跑了2%,按这样的速度跑半天都跑不 ...

  8. Oracle备份与恢复 expdp/impdp数据泵远程导入导出

    Oracle备份与恢复 expdp/impdp数据泵远程导入导出 Oracle在10g的时候有一个很好用的导出工具expdp(数据泵) 但是这个工具好用的同时,有一个局限,就是必须用本地的用户才可以导 ...

  9. hive向mysql导数据_Mysql Hive 导入导出数据

    ---王燕行转列sql select split(concat_ws(',',collect_set(cast(smzq as string))),',')[1] ,split(concat_ws(' ...

最新文章

  1. python 归一化_只需 45 秒,Python 给故宫画一组手绘图!
  2. 调参侠看过来!两个提高深度学习训练效率的绝技
  3. 麻烦大家看了我的文章觉得有用就关注我下
  4. Android UI学习 - Linear Layout, RelativeLayout
  5. 2016年研究数据可视化最不应该错过的10篇文章
  6. xp计算机管理窗口,我的xp系统在“打开”窗口中没有“我的电脑”一项,只有界面、我的文档和界面,怎办?...
  7. boost::graph模块实现bellman算法的测试程序
  8. java 8 lambda_Java 8的烹调方式– Lambda项目
  9. Spring + Dubbo + zookeeper (linux) 框架搭建
  10. 聊聊缓存机制:双写兜兜转转,又回到了串行化
  11. 博客系统评论模块列表转树形结构
  12. OpenJudge NOI 1.5 08:多边形内角和
  13. c语言程序的基本规范是什么,C语言编程规范——3 命名规则
  14. Eclipse自动部署项目到Tomcat的webapps下的有效方法
  15. Python+OpenCV:直方图均衡化(Histogram Equalization)
  16. jQuery.ajax 调用 服务(.aspx,.asmx)
  17. 免费开源!仿微信仿陌陌类APP源代码整项目开源,包括ADT项..
  18. nasal脚本起源与环境搭建(flightgear开源项目)
  19. 最全地理数据下载网址
  20. CSS背景图片代码示例

热门文章

  1. 【WPF】级联Combobox及其与ListView的联动
  2. matlab降压启动,基于 Matlab 的笼形异步电动机降压启动分析
  3. 稳压器MC7805CDTRKG的特点
  4. 《学习笔记》------温故而知新
  5. 【多图预警,不懂来敲我】图说HashMap原理和流程
  6. Android进阶免费直播课,高中免费直播课程平台有哪些
  7. Optomind Inc.、II-VI Incorporated、MACOM与MultiLane SAL在OFC 2019上携手展示200G QSFP56 AOC
  8. 2.7V至25V宽输入电压15A 峰值电流
  9. 三维坐标轴html实现,CSS3三维变形,其实很简单!
  10. 万达体育走到新的十字路口