2019独角兽企业重金招聘Python工程师标准>>>

JdbcPagingItemReader多线程的Step

我们最经常使用的就是 JdbcCursorItemReader,使用游标的方式逐条数据的读取。但是从spring 官方文档我们知道 ,他不是线程安全的。在这里,我们使用 JdbcPagingItemReader从数据库读取数据,并且是分页的读,而且这个类是线程安全的,那么我们就可以使用多线程的Step,从而提高JOB的执行效率。

下面是主要的配置文件:

<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"><!-- 包的扫描 --><context:component-scan base-package="com.lyx.batch" /><bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" /><batch:step id="abstractStep" abstract="true"><batch:listeners><batch:listener ref="exceptionHandler" /></batch:listeners></batch:step><bean id="abstractJdbcPagingItemReader" abstract="true"class="org.springframework.batch.item.database.JdbcPagingItemReader"><property name="dataSource" ref="dataSource" /></bean><!-- add people desc job begin --><batch:job id="addPeopleDescJob"><batch:step id="addDescStep" parent="abstractStep"><batch:tasklet><batch:chunk reader="peopleAddDescReader" processor="addDescProcessor"writer="addDescPeopleWriter" commit-interval="2" /></batch:tasklet></batch:step></batch:job><!-- add people desc job end --><!-- 使用分页的reader begin --><bean id="peopleAddDescReader" parent="abstractJdbcPagingItemReader"scope="step"><property name="queryProvider"><beanclass="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean"><property name="dataSource" ref="dataSource" /><property name="selectClause" value="select person_id, first_name, last_name" /><property name="fromClause" value="from people" /><property name="whereClause"value="where ( first_name like :first_name or last_name like :last_name ) " /><property name="sortKey" value="person_id" /></bean></property><property name="parameterValues"><map><entry key="first_name" value="#{jobParameters['first_name']}" /><entry key="last_name" value="#{jobParameters['last_name']}" /></map></property><!-- 配置limit的大小 --><property name="pageSize" value="2" /><property name="rowMapper" ref="peopleRowMapper" /></bean><!-- 使用分页的reader end --><bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" /><bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" /><bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" /><!--tomcat jdbc pool数据源配置 --><bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"destroy-method="close"><property name="poolProperties"><bean class="org.apache.tomcat.jdbc.pool.PoolProperties"><property name="driverClassName" value="com.mysql.jdbc.Driver" /><property name="url" value="jdbc:mysql://localhost:3306/test" /><property name="username" value="root" /><property name="password" value="034039" /></bean></property></bean><!-- spring batch 配置jobRepository --><batch:job-repository id="jobRepository"data-source="dataSource" transaction-manager="transactionManager"isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"max-varchar-length="1000" /><!-- spring的事务管理器 --><bean id="transactionManager"class="org.springframework.jdbc.datasource.DataSourceTransactionManager"><property name="dataSource" ref="dataSource" /></bean><!-- batch luncher --><bean id="jobLauncher"class="org.springframework.batch.core.launch.support.SimpleJobLauncher"><property name="jobRepository" ref="jobRepository" /></bean>
</beans>

主要的配置就是:

<!-- 使用分页的reader begin -->
<bean id="peopleAddDescReader" parent="abstractJdbcPagingItemReader"scope="step"><property name="queryProvider"><beanclass="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean"><property name="dataSource" ref="dataSource" /><property name="selectClause" value="select person_id, first_name, last_name" /><property name="fromClause" value="from people" /><property name="whereClause"value="where ( first_name like :first_name or last_name like :last_name ) " /><property name="sortKey" value="person_id" /></bean></property><property name="parameterValues"><map><entry key="first_name" value="#{jobParameters['first_name']}" /><entry key="last_name" value="#{jobParameters['last_name']}" /></map></property><!-- 配置limit的大小 --><property name="pageSize" value="2" /><property name="rowMapper" ref="peopleRowMapper" />
</bean>
<!-- 使用分页的reader end -->

其他类的从前面的文章找出处,下面是我为了调试与前面不同的类

PeopleRowMapper.java

package com.lyx.batch;import java.sql.ResultSet;
import java.sql.SQLException;import org.springframework.jdbc.core.RowMapper;public class PeopleRowMapper implements RowMapper<People> {public People mapRow(ResultSet rs, int rowNum) throws SQLException {People p = new People();System.out.println("-----------------------person_id-"+ rs.getInt("person_id"));p.setId(rs.getInt("person_id"));p.setFirstName(rs.getString("first_name"));p.setLastName(rs.getString("last_name"));return p;}}

运行程序AppMain14.java

package com.lyx.batch;import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;/*** 测试 使用分页的 reader* * @author Lenovo**/
public class AppMain14 {public static void main(String[] args)throws JobExecutionAlreadyRunningException, JobRestartException,JobInstanceAlreadyCompleteException, JobParametersInvalidException {long startTime = System.currentTimeMillis(); // 获取开始时间@SuppressWarnings("resource")ApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath:spring-batch-paging.xml" });JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();jobParametersBuilder.addString("first_name", "%JOHN%");jobParametersBuilder.addString("last_name", "%DOE%");Job job = (Job) context.getBean("addPeopleDescJob");JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");JobExecution result = launcher.run(job,jobParametersBuilder.toJobParameters());ExitStatus es = result.getExitStatus();if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {System.out.println("任务正常完成");} else {System.out.println("任务失败,exitCode=" + es.getExitCode());}long endTime = System.currentTimeMillis(); // 获取结束时间System.out.println("程序运行时间: " + (endTime - startTime) + "ms");}}

运行结果:

-----------------------person_id-157

-----------------------person_id-158

process people desc

process people desc

write people desc

write people desc

任务正常完成

程序运行时间: 11929ms

十一月 22, 2014 12:29:50 上午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{first_name=%JOHN%, last_name=%DOE%}] and the following status: [COMPLETED]

最后是成功了。其实更重要的是JdbcPagingItemReader 的实现方式和源码。为什么他是线程安全的,为什么他能分页读,这才是我们最终关心的。

这里我们使用的还是单线程的方式运行的该job ,下面我们来配置多线程的step。配置很简单。就是配置一个异步的spring task executor,使用该异步 task executor 来运行我们的job。

看如下的配置:

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
<!-- add people desc job begin -->
<batch:job id="addPeopleDescJob"><batch:step id="addDescStep" parent="abstractStep"><batch:tasklet task-executor="taskExecutor"><batch:chunk reader="peopleAddDescReader" processor="addDescProcessor"writer="addDescPeopleWriter" commit-interval="2" /></batch:tasklet></batch:step>
</batch:job>
<!-- add people desc job end -->

这里的 taskExecutor 就是一个异步的 task executor。

下面运行一下多线程的step。运行结果:

-----------------------person_id-157

-----------------------person_id-158

process people desc

process people desc

write people desc

write people desc

十一月 22, 2014 1:01:35 上午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{first_name=%JOHN%, last_name=%DOE%}] and the following status: [COMPLETED]

任务正常完成

程序运行时间: 8577ms

成功,好的,你是否注意到和上面的单线程的step 比,是不是程序运行的时间要少了。

关于 JdbcPagingItemReader 的实现方式和其线程安全性,如何分页,JdbcPagingItemReader的分页策略我们在下面文章道来。

==============END==============

转载于:https://my.oschina.net/xinxingegeya/blog/347379

JdbcPagingItemReader多线程的Step相关推荐

  1. python多线程下的信号处理程序示例

    下面是一个网上转载的实现思路,经过验证,发现是可行的,就记录下来. 思路 python多线程中要响应Ctrl+C的信号以杀死整个进程,需要: 1.把所有子线程设为Daemon: 2.使用isAlive ...

  2. java多线程工具类_Java多线程系列之:线程的并发工具类

    一,Fork-Join 1,定义: Fork-Join框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不能再拆时),再将一个个的小任务运算的结果进行join汇总. 2, ...

  3. java futher多线程_Java多线程系列--“JUC集合”05之 ConcurrentSkipListMap

    概要 本章对Java.util.concurrent包中的ConcurrentSkipListMap类进行详细的介绍.内容包括: ConcurrentSkipListMap介绍 ConcurrentS ...

  4. python多线程内存越要越大_Python 面试:这9个问题你一定要掌握!

    作为一个程序员,可能或多或少经历过一些技术面试,有些是编程语言本身的问题,有些是跟工程相关的问题. 笔者自己被面试过或者面试过别人,今天我们来总结关于Python程序员面试的时候经常被问到的9个问题, ...

  5. python多线程和多进程的使用_python多线程与多进程

    python多线程与多进程 python多线程 python中提供两个标准库thread和threading用于对线程的支持,python3中已放弃对前者的支持,后者是一种更高层次封装的线程库,接下来 ...

  6. linux下多线程的调试

    多线程调试的基本命令(均在gdb命令行使用):    info threads ---- 显示当前可调试的全部线程.每个线程都有自己的线程ID,显示结果中前面有*的表示当前调试的线程.    eg: ...

  7. java代码读取dbsequence的值_MongoDB自增序列实现 - Java多线程同步 synchronized 用法

    在使用MongoDB的时候 (基于spring-mongo) ,我想在插入对象时获取有序自增的主键 ,但是MongoDB的默认规则是生成一串无序 (大致有序) 的字串 .而Spring Data提供的 ...

  8. 多线程处理器 适用于 已知要处理任务的个数,进行多线程处理

    核心的类代码 using System; using System.Collections.Generic; using System.Text; using System.Collections; ...

  9. 『TensorFlow』第十一弹_队列多线程TFRecod文件_我辈当高歌

    TF数据读取队列机制详解 一.TFR文件多线程队列读写操作 TFRecod文件写入操作 import tensorflow as tf def _int64_feature(value):# valu ...

最新文章

  1. 【MaxCompute】学习笔记常用查询sql
  2. lsb_release查看当前系统的发行版信息
  3. python3 安装pyinstaller_python3.7 打包(.exe)神器——pyinstaller 安装及用法
  4. 浅析SAX,DOM,JAXP,JDOM与DOM4J之间的关系
  5. EasyExcel中输出为时间格式
  6. 互联网晚报 | 9月3日 星期五 | vivo正式公布自研芯片V1;天猫公布今年双11节奏;网易云音乐去除歌曲“独家”标志...
  7. 小米手机,测试时应用图标不能更改,还出现偶尔应用版本还原
  8. sysV init服务脚本(入门级)
  9. Python 之如何暴力破解加密文件
  10. 2022N1叉车司机国家题库及答案
  11. linux脚本无法执行命令,shell脚本在Windows下能执行,而Linux不能执行的原因及解决...
  12. win10安装Docker Desktop完成之后打开一直显示Docker failed to initialize
  13. 焦虑症和抑郁症的区别
  14. linux 类似winscp_winscp 有没有Linux版的
  15. mysql连接数怎么清理_MySQL连接数太多应该怎么解决?
  16. 教学方法(学科教学法)
  17. linux之vim下载及编写规则
  18. 【转】《背包九讲》--崔添翼大神
  19. 视频显示输出接口总结
  20. android 模拟器 blue,Win下的Android模拟器BlueStacks

热门文章

  1. __slots__的作用
  2. 126篇殿堂级深度学习论文分类整理 从入门到应用(下)
  3. ajax empty,jQuery empty仅在AJAX调用后的第二次单击时起作用
  4. java课程设计 成绩_Java课程设计—学生成绩管理系统(201521123004-林艺如)
  5. Python 十六进制转Base64_python基础day03笔记
  6. mysql jdbc批量更新_jdbc批量更新数据
  7. requestmapping配置页面后_@RequestMapping接口及页面乱码问题
  8. Error querying database.Cause:java.sql.SQLSyntaxErrorException:ORA-00911:invalid character
  9. 清空数据库部分表的数据 Oracle
  10. 项目管理一般知识:项目生命周期