在大型企业中,由于业务复杂、数据量大、数据格式不同、数据交互格式繁杂,并非所有的操作都能通过交互界面进行处理。而有一些操作需要定期读取大批量的数据,然后进行一系列的后续处理。这样的过程就是“批处理”。
批处理应用通常有以下特点:
Spring batch是一个轻量级的全面的批处理框架,它专为大型企业而设计,帮助开发健壮的批处理应用。Spring batch为处理大批量数据提供了很多必要的可重用功能,比如日志追踪、事务管理、job执行统计、重启job和资源管理等。同时它也提供了优化和分片技术用于实现高性能的批处理任务。
它的核心功能包括:
笔者所在的部门属于国外某大型金融公司的CRM部门,在日常工作中我们经常需要开发一些批处理应用,对Spring Batch有着丰富的使用经验。近段时间笔者特意总结了这些经验。
在使用Spring Batch时推荐使用最新的Spring Batch 3.0版本。相比Spring Batch2.2,它做了以下方面的提升:
支持Spring4和Java8是一个重大的提升。这样就可以使用Spring4引入的Spring boot组件,从而开发效率方面有了一个质的飞跃。引入Spring-batch框架只需要在build.gradle中加入一行代码即可:
compile("org.springframework.boot:spring-boot-starter-batch")
而增强Spring Batch Integration的功能后,我们就可以很方便的和Spring家族的其他组件集成,还可以以多种方式来调用job,也支持远程分区操作以及远程块处理。
而支持JobScope后我们可以随时为对象注入当前Job实例的上下文信息。只要我们指定Bean的scope为job scope,那么就可以随时使用jobParameters和jobExecutionContext等信息。
@Component
@JobScope
public class CustomClass {
@Value("#{jobParameters[jobDate]}")
private String jobDate;
@Value("#{jobExecutionContext['input.name']}.")
private String fileName;
}
之前我们在配置job和step的时候都习惯用xml的配置方式,但是随着时间的推移发现问题颇多。
我们渐渐发现使用纯Java类的配置方式更灵活,它是类型安全的,而且IDE的支持更好。在构建job或step时采用的流式语法相比xml更加简洁易懂。
@Bean
public Step step(){
return stepBuilders.get("step")
.chunk(1)
.reader(reader())
.processor(processor())
.writer(writer())
.listener(logProcessListener())
.faultTolerant()
.skipLimit(10)
.skip(UnknownGenderException.class)
.listener(logSkipListener())
.build();
}
在这个例子中可以很清楚的看到该step的配置,比如reader/processor/writer组件,以及配置了哪些listener等。
Spring batch在运行时需要数据库支持,因为它需要在数据库中建立一套schema来存储job和step运行的统计信息。而在本地集成测试中我们可以借助Spring batch提供的内存Repository来存储Spring batch的任务执行信息,这样既避免了在本地配置一个数据库,又可以加快job的执行。先为Job的配置类添加扩展类:DefaultBatchConfigurer。
public class CustomJobConfiguration extends DefaultBatchConfigurer {
...
}
我们在build.gradle中加入对hsqldb的依赖:
runtime(‘org.hsqldb:hsqldb:2.3.2’)
然后在测试类中添加对DataSource的配置。
@EnableAutoConfiguration
@EnableBatchProcessing
@DataJpaTest
@Import({DataSourceAutoConfiguration.class, BatchAutoConfiguration.class})
public class TestConfiguration {
}
并且在applicaton.properties配置中添加初始化Database的配置:
spring.batch.initializer.enable=true
Spring batch在配置Step时采用的是基于Chunk的机制。即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作。这样可以最大化的优化写入效率,整个事务也是基于Chunk来进行。
当我们在需要将数据写入到文件、数据库中之类的操作时可以适当设置Chunk的值以满足写入效率最大化。但有些场景下我们的写入操作其实是调用一个web service或者将消息发送到某个消息队列中,那么这些场景下我们就需要设置Chunk的值为1,这样既可以及时的处理写入,也不会由于整个Chunk中发生异常后,在重试时出现重复调用服务或者重复发送消息的情况。
Spring batch提供了大量的Listener来对job的各个执行环节进行全面的监控。
在job层面Spring batch提供了JobExecutionListener接口,其支持在Job开始或结束时进行一些额外处理。在step层面Spring batch提供了StepExecutionListener,ChunkListener,ItemReadListener,ItemProcessListener,ItemWriteListener,SkipListener等接口,同时对Retry和Skip操作也提供了RetryListener及SkipListener。
通常我们会为每个job都实现一个JobExecutionListener,在afterJob操作中我们输出job的执行信息,包括执行时间、job参数、退出代码、执行的step以及每个step的详细信息。这样无论是开发、测试还是运维人员都对整个job的执行情况了如指掌。
如果某个step会发生skip的操作,我们也会为其实现一个SkipListener,并在其中记录skip的数据条目,用于下一步的处理。
实现Listener有两种方式,一种是继承自相应的接口,比如继承JobExecutionListener接口,另一种是使用annoation(注解)的方式。经过实践我们认为使用注解的方式更好一些,因为使用接口你需要实现接口的所有方法,而使用注解则只需要对相应的方法添加annoation即可。
下面的这个类采用了继承接口的方式,我们看到其实我们只用到了第一个方法,第二个和第三个都没有用到。但是我们必须提供一个空的实现。
public class CustomSkipListener implements SkipListener {
@Override
public void onSkipInRead(Throwable t) {
// business logic
}
@Override
public void onSkipInWrite(String item, Throwable t) {
// no need
}
@Override
public void onSkipInProcess(String item, Throwable t) {
// no need
}
}
而使用annoation的方式可以简写为:
public class CustomSkipListener {
@OnSkipInRead
public void onSkipInRead(Throwable t) {
// business logic
}
}
在处理百万级的数据过程过程中难免会出现异常。如果一旦出现异常而导致整个批处理工作终止的话那么会导致后续的数据无法被处理。Spring Batch内置了Retry(重试)和Skip(跳过)机制帮助我们轻松处理各种异常。我们需要将异常分为三种类型。第一种是需要进行Retry的异常,它们的特点是该异常可能会随着时间推移而消失,比如数据库目前有锁无法写入、web服务当前不可用、web服务满载等。所以对它们适合配置Retry机制。第二种是需要Skip的异常,比如解析文件的某条数据出现异常等,因为对这些异常即使执行Retry每次的结果也都是相同,但又不想由于某条数据出错而停止对后续数据的处理。第三种异常是需要让整个Job立刻失败的异常,比如如果出现了OutOfMemory的异常,那么需要整个Job立刻终止运行。
一般来说需要Retry的异常也要配置Skip选项,从而保证后续的数据能够被继续处理。我们也可以配置SkipLimit选项保证当Skip的数据条目达到一定数量后及时终止整个Job。
有时候我们需要在每次Retry中间隔做一些操作,比如延长Retry时间,恢复操作现场等,Spring Batch提供了BackOffPolicy来达到目的。下面是一个配置了Retry机制、Skip机制以及BackOffPolicy的step示例。
@Bean
public Step step(){
return stepBuilders.get("step")
.chunk(1)
.reader(reader())
.processor(processor())
.writer(writer())
.listener(logProcessListener())
.faultTolerant()
.skipLimit(10)
.skip(UnknownGenderException.class)
.skip(ServiceUnavailableException.class)
.retryLimit(5)
.retry(ServiceUnavailableException.class)
.backOffPolicy(backoffPolicy)
.listener(logSkipListener())
.build();
}
在Job执行过程中不一定都是顺序执行的,我们经常需要根据某个job的输出数据或执行结果来决定下一步的走向。以前我们会把一些判断放置在下游step中进行,这样可能会导致有些step实际运行了,但其实并没有做任何事情。比如一个step执行过程中会将失败的数据条目记录到一个报告中,而下一个step会判断有没有生成报告,如果生成了报告则将该报告发送给指定联系人,如果没有则不做任何事情。这种情况下可以通过Decider机制来实现Job的执行流程。在Spring batch 3.0中Decider已经从Step中独立出来,和Step处于同一级别。
public class ReportDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
if (report.isExist()) {
return new FlowExecutionStatus(“SEND"); } return new FlowExecutionStatus(“SKIP");
}
}
而在job配置中可以这样来使用Decider。这样整个Job的执行流程会更加清晰易懂。
public Job job() {
return new JobBuilder("petstore")
.start(orderProcess())
.next(reportDecider)
.on("SEND").to(sendReportStep)
.on("SKIP").end().build()
.build();
}
批处理工作处理的数据量大,而执行窗口一般又要求比较小。所以必须要通过多种方式来加速Job的执行。一般我们有四种方式来实现:
单个step多线程执行任务可以借助于taskExecutor来实现。这种情况适合于reader、writer是线程安全且是无状态的场景。我们还可以设置线程数量。
public Step step() {
return stepBuilders.get("step")
.tasklet(tasklet)
.throttleLimit(20)
.build();
}
上述示例中的tasklet需要实现TaskExecutor,Spring Batch提供了一个简单的多线程TaskExecutor供我们使用:SimpleAsyncTaskExecutor。
并行执行不同的Step在Spring batch中很容易实现,以下是一个示例:
public Job job() {
return stepBuilders.get("parallelSteps")
.start(step1)
.split(asyncTaskExecutor).add(flow1, flow2)
.next(step3)
.build();
}
在这个示例中我们先执行step1,然后并行执行flow1和flow2,最后再执行step3。
Spring batch提供了PartitionStep来实现对同一个step在多个进程中实现并行处理。通过PartitonStep再配合PartitionHandler可以将一个step扩展到多个Slave上实现并行运行。
远程执行Chunk任务则是将某个Step的processer操作分割到多个进程中,多个进程通过一些中间件进行通讯(比如采用消息的方式)。这种方式适合于Processer是瓶颈而Reader和Writer不是瓶颈的场景。
Spring Batch对批处理场景进行了合理的抽象,封装了大量的实用功能,使用它来开发批处理应用可以达到事半功倍的效果。在使用的过程中我们仍需要坚持总结一些最佳实践,从而能够交付高质量的可维护的批处理应用,满足企业级应用的苛刻要求。
作者:无敌北瓜
来源:51CTO
文章浏览阅读2.8k次。数据库中字段为CLOB的属性,在Java实体类中不能使用CLOB去声明,否则会报错。解决方法:@Lob@Basic(fetch = FetchType.EAGER)@Column(name = "THEORY_CONTENT", columnDefinition = "CLOB", nullable = true)private String theoryContent;这..._有clob字段的表怎么定义实体
文章浏览阅读373次。电机与动力系统测试电机及驱动器运行效率分析凭借PA 8000功率分析仪强大的分析能力,一台功率分析仪可以对电动汽车的功率、效率、电机输出等电参数进行准确测试,其中还包含了变频器的效率、电机效率和电池DC-AC 的转换效率等参数。整车动力系统测试- 系统集成在整车动力系统测试中,功率分析仪作为核心部件,可对被测电动汽车电机及控制器进行高精度、高带宽实时同步采集及波形记录。使用PA8000 功率分析仪..._直流电机linux驱动
文章浏览阅读3.5k次,点赞3次,收藏15次。1、C# csharpMicrosoft为.NET推出的高级编程语言。.NET是微软的多语言开发平台,用于构建和运行应用程序。Mono是Novell公司支持在其他操作系统之下开发.NET程序的框架,而不是只适用于Windows。Unity借助Mono实现跨平台,核心是.NET Framework框架。这样子Unity借助Mono实现跨平台开发。所以Unity的核心是C#和Mono。.Net框架的组件:公共语言运行库(CLR).Net框架类库公共语言规范(CLS)通用类型系统元数据和组件_c# unity3d
文章浏览阅读1.8k次。今天我们一起来分享一些有趣的HTML5鼠标动画,当我们移动鼠标时,页面上将会出现一些神奇的动画特效。当然这些动画可能在实际应用中并不太会使用到,但是对大家研究HTML5和CSS3的帮助应该会非常大。本文分享的7个HTML5鼠标动画都提供源代码下载,都是一些不错的资源。1、HTML5鲸鱼动画今天我找到了基于HTML5的鲸鱼动画,鲸鱼会随着鼠标的移动而游动,画面非常立体,鲸鱼也超级逼真。真的,HTML..._html鼠标移动出现动画
文章浏览阅读1.2k次。问题现象发现监控进程获取MySQL数据的时候报错导致监控进程报错:mysql: [Warning] Using a password on the command line interface can be insecure.ERROR 1054 (42S22) at line 1: Unknown column 'sync_binlog' in 'where clause'mysql: [War..._using a password on the command line interface can be insecure. error 1054 (
文章浏览阅读4.8k次。我们在做网页的时候, 会遇到一个图片和文字位置对不齐的问题.虽然解决的办法有很多.比如. 用table来控制图片和文字居中对齐 (这样会因为一个小图片写多很多代码) 使用图片的Align属性 设置为align="absmiddle" (但是要知道,标准的W3C中定义的img标签是不支持Align属性的)以上的方法都不是解决问题最好的办法.后来无意中发现可以用CSS来解决这个问题._如何让文字在两张图片下方居中
文章浏览阅读2w次,点赞53次,收藏90次。node_sass和sass-loader疯狂报错的原因总结和解决办法 若是有什么不对的地方请多多指教_module build failed: error: node sass version 9.0.0 is incompatible with ^4.
文章浏览阅读3.5k次。我遇到了我认为可能是Mockito的错误,但是想知道是否有其他人可以解释为什么这个测试不起作用.基本上,我有两个对象,如下所示:public class FirstObject {private SecondObject secondObject;public SecondObject getSecondObject() { return secondObject; }}public class ..._mockito无返回值模拟报错
文章浏览阅读1.7k次。VersionEdit和MANIFEST文件到底是什么关系?VersionEdit会保存在MANIFEST文件中。VersionEdit就相当于MANIFEST文件中的一条记录。 VersionEdit是version对象的变更记录,用于写入MANIFEST文件。这样通过原始的version加上一系列的versionedit的记录,就可以恢复到最新状态。VersionEdit分析下面来看Versi_reusing manifest
文章浏览阅读510次。最小生成树树图/连通图/连通分量强联通图/强联通分量支撑子图/支撑树/最小支撑树为什么要研究最小支撑树通用贪心算法kruskal算法Prim算法割/交叉边/最小交叉边/割与图的交集/尊重安全边kruskal和prim算法的本质三种割(割权值最小边/割邻居/瞎割)kruskal与prim代码实现与复杂度分析最宽路径问题最短路径的基本概念(源点可达负回路,简单路径,松弛操作)
文章浏览阅读73次。1,按钮此类创建一个标签按钮。当按下该按钮时,应用程序能执行某项动作。它有两种构造方法:public Button()构造一个标签字符串为空的按钮。public Button(String label)构造一个带指定标签的按钮。当用户鼠标单击按钮时,AWT事件处理系统将向按钮发送一个ActionEvent事件对象,如果应用程序需要对此做出反应,那么就需要注册事件监听程序并实现ActionListe..._java语言中awt组件的基本用法
文章浏览阅读730次。DECLARE @str NVARCHAR(MAX)= '哈哈哈 '方法:CREATE FUNCTION [dbo].[removehtml] ( @str NVARCHAR(MAX) )RETURNS NVARCHAR(MAX)ASBEGINWHILE CHARINDEX(' 0BEGINSET @str = STUFF(@str, CHARINDEX('CHARINDEX('>', @..._sqlserver removehtml