社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
Spring batch整体的架构设计使用如下关系图来进行表示:
虽然Job对象看上去像是对于多个Step的一个简单容器,但是开发者必须要注意许多配置项。此外,Job的运行以及Job运行过程中元数据如何被保存也是需要考虑的。本章将会介绍Job在运行时所需要注意的各种配置项。
Job 接口的实现有多个,但命名空间在配置上屏蔽了这些实现之间的差异。必须的依赖只有三项:名称 name、JobRepository 和 Step 的列表:
<!-- Job "footballJob": three steps linked through their "next" attributes,
     each one inheriting its definition from a parent step bean. -->
<job id="footballJob">
    <step id="playerload"          parent="s1" next="gameLoad"/>
    <step id="gameLoad"            parent="s2" next="playerSummarization"/>
    <step id="playerSummarization" parent="s3"/>
</job>
在这个例子中使用了父类的bean定义来创建step,更多描述step配置的信息可以参考step configuration这一节。XML命名空间默认会引用id为'jobRepository'的bean作为repository的定义,不过也可以像下面这样显式地覆盖:
<!-- Same "footballJob" as the other examples, but with the repository set
     explicitly through job-repository instead of the default reference.
     The "gameLoad" step uses parent "s2", matching the other definitions of
     this job in the document. -->
<job id="footballJob" job-repository="specialRepository">
    <step id="playerload"          parent="s1" next="gameLoad"/>
    <step id="gameLoad"            parent="s2" next="playerSummarization"/>
    <step id="playerSummarization" parent="s3"/>
</job>
执行批处理任务的一个关键问题是要考虑job被重启后的行为。如果某个特定的 JobInstance 已经存在对应的 JobExecution,那么这个job启动时可以认为是“重启”。 理想情况下,所有任务都能够从它们中止的地方继续执行,但是在许多场景下这是不可能的。在这种场景中就要由开发者来决定是否创建一个新的 JobInstance ,Spring对此也提供了一些帮助。如果job不需要重启,而是总是作为新的 JobInstance 来运行,那么可以把可重启属性restartable设置为'false':
<!-- Job declared with restartable="false". The "..." below stands for the
     step definitions, which are elided in this example. -->
<job id="footballJob" restartable="false">
...
</job>
设置重启属性restartable为‘false’表示‘这个job不支持再次启动’,重启一个不可重启的job会抛出JobRestartException异常:
// Demonstrates that a non-restartable job cannot get a second JobExecution
// for the same job and parameters.
// The variable is typed as SimpleJob because setRestartable is called on it;
// the setter belongs to the implementation, not to the Job interface.
SimpleJob job = new SimpleJob();
job.setRestartable(false);

JobParameters jobParameters = new JobParameters();

// First execution: creation succeeds and the execution is persisted.
JobExecution firstExecution = jobRepository.createJobExecution(job, jobParameters);
jobRepository.saveOrUpdate(firstExecution);

try {
    // Second attempt with the same job and parameters must be rejected.
    jobRepository.createJobExecution(job, jobParameters);
    fail();
} catch (JobRestartException expected) {
    // Expected: JobRestartException signals that the job is not restartable.
}
这个JUnit代码展示了创建一个不可重启的Job后,第一次能够创建 JobExecution ,第二次再创建相同的 JobExecution 会抛出一个 JobRestartException。
在job执行过程中,如果能在生命周期的各个事件上得到通知并执行自定义代码,会很有用。SimpleJob会在适当的时机调用JobExecutionListener:
/**
 * Listener that receives notifications around a job run, so custom code can
 * hook into the job life cycle.
 */
public interface JobExecutionListener {
/**
 * Notification delivered before the job runs.
 *
 * @param jobExecution the execution this notification is about
 */
void beforeJob(JobExecution jobExecution);
/**
 * Notification delivered after the job has run. According to the surrounding
 * text it is delivered whether the job succeeded or failed; the outcome can
 * be read from the given execution.
 *
 * @param jobExecution the execution this notification is about
 */
void afterJob(JobExecution jobExecution);
}
<!-- "footballJob" with a job-level listener: the bean "sampleListener" is
     referenced from the <listeners> element next to the three steps. -->
<job id="footballJob">
    <step id="playerload"          parent="s1" next="gameLoad"/>
    <step id="gameLoad"            parent="s2" next="playerSummarization"/>
    <step id="playerSummarization" parent="s3"/>
    <listeners>
        <listener ref="sampleListener"/>
    </listeners>
</job>
无论job执行成功还是失败,都会调用afterJob;可以从 JobExecution 中获取运行结果,再根据结果进行不同的处理:
public void afterJob(JobExecution jobExecution){
if( jobExecution.getStatus() == BatchStatus.COMPLETED ){
//job执行成功 }
else if(jobExecution.getStatus() == BatchStatus.FAILED){
//job执行失败 }
}
<!-- Abstract parent job: it only contributes the listener "listenerOne" and
     is meant to be referenced through the parent attribute of other jobs. -->
<job id="baseJob" abstract="true">
    <listeners>
        <listener ref="listenerOne"/>
    </listeners>
</job>
<!-- Child job of "baseJob". The listeners element sets merge="true", so
     "listenerTwo" is presumably combined with the parent's listener list
     rather than replacing it (confirm against the schema documentation). -->
<job id="job1" parent="baseJob">
    <step id="step1" parent="standaloneStep"/>
    <listeners merge="true">
        <listener ref="listenerTwo"/>
    </listeners>
</job>
<!-- Job with a parameters validator attached through the <validator> element.
     NOTE(review): the ref "paremetersValidator" looks misspelled; confirm it
     matches the id of the validator bean before renaming either side. -->
<job id="job1" parent="baseJob3">
    <step id="step1" parent="standaloneStep"/>
    <validator ref="paremetersValidator"/>
</job>
/**
 * Java-based batch configuration: declares one job ("myJob") made of a
 * chunk-oriented step followed by a tasklet step.
 *
 * NOTE(review): the imported class name "DataSourceCnfiguration" looks
 * misspelled (missing "o"); confirm against the actual class before renaming.
 */
@Configuration
@EnableBatchProcessing
@Import(DataSourceCnfiguration.class)
public class AppConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    /**
     * Builds "myJob", which runs step1 and then step2.
     *
     * The steps are injected as method parameters instead of being obtained
     * through step1()/step2(): those bean methods require arguments, so a
     * no-argument call does not compile. Both candidates are of type Step, so
     * the parameter names are chosen to match the bean names "step1" and
     * "step2"; add an explicit qualifier if parameter names are not retained
     * at compile time.
     *
     * @param step1 the chunk-oriented step bean
     * @param step2 the tasklet step bean
     * @return the assembled job
     */
    @Bean
    public Job job(Step step1, Step step2) {
        return jobs.get("myJob").start(step1).next(step2).build();
    }

    /**
     * Chunk-oriented step "step1": reads, processes and writes Person items
     * in chunks of 10.
     *
     * @param reader    source of Person items
     * @param processor transformation applied to each item
     * @param writer    sink for the processed items
     * @return the configured step
     */
    @Bean
    protected Step step1(ItemReader<Person> reader,
                         ItemProcessor<Person, Person> processor,
                         ItemWriter<Person> writer) {
        return steps.get("step1")
                .<Person, Person> chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    /**
     * Tasklet step "step2": delegates its work to the given tasklet.
     *
     * @param tasklet the tasklet executed by this step
     * @return the configured step
     */
    @Bean
    protected Step step2(Tasklet tasklet) {
        return steps.get("step2")
                .tasklet(tasklet)
                .build();
    }
}
<!-- Job repository with every attribute spelled out: data source,
     transaction manager, isolation level used on create, table prefix and
     maximum varchar length. -->
<job-repository
    id="jobRepository"
    data-source="dataSource"
    transaction-manager="transactionManager"
    isolation-level-for-create="SERIALIZABLE"
    table-prefix="BATCH_"
    max-varchar-length="1000"/>
<!-- Job repository that overrides only the isolation level used on create. -->
<job-repository
    id="jobRepository"
    isolation-level-for-create="REPEATABLE_READ"/>
<!-- Applies the "txAdvice" advice to every method of the repository types
     matched by the pointcut. advice-ref is an attribute of aop:advisor; the
     previous stand-alone <advice-ref=.../> element was not well-formed XML. -->
<aop:config>
    <aop:advisor
        pointcut="execution(* org.springframework.batch.core..*Repository+.*(..))"
        advice-ref="txAdvice"/>
</aop:config>
<!-- Transactional advice bound to "transactionManager"; the single method
     rule uses the wildcard name "*". -->
<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="*"/>
    </tx:attributes>
</tx:advice>
<!-- Job repository that overrides only the table prefix ("SYSTEM.TEST_"). -->
<job-repository
    id="jobRepository"
    table-prefix="SYSTEM.TEST_"/>
<!-- Job repository built by MapJobRepositoryFactoryBean; only a transaction
     manager is wired in. -->
<bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <property name="transactionManager" ref="transactionManager"/>
</bean>
<!-- Job repository factory bean with the database type set explicitly to
     "db2". The package part of the class name is elided ("org...") in this
     example. -->
<bean id="jobRepository" class="org...JobRepositoryFactoryBean">
    <property name="databaseType" value="db2"/>
    <property name="dataSource"   ref="dataSource"/>
</bean>
<!-- SimpleJobLauncher wired with the job repository only; no task executor
     is configured here. -->
<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository"/>
</bean>
<!-- SimpleJobLauncher wired with the job repository and an inner
     SimpleAsyncTaskExecutor bean as its task executor. -->
<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository"/>
    <property name="taskExecutor">
        <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
    </property>
</bean>
/**
 * Web controller that launches the injected job when "/jobLauncher.html" is
 * requested.
 */
@Controller
public class JobLauncherController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    /**
     * Runs the injected job through the launcher with a freshly created,
     * empty parameter object.
     *
     * @throws Exception whatever the launcher throws is propagated unchanged
     */
    @RequestMapping("/jobLauncher.html")
    public void handle() throws Exception {
        JobParameters emptyParameters = new JobParameters();
        jobLauncher.run(job, emptyParameters);
    }
}
/**
 * Lookup methods for JobInstance, JobExecution and StepExecution objects,
 * addressed by job name or by numeric id.
 */
public interface JobExplorer {
/** Job instances for the named job; start/count presumably select a window of results (confirm with the implementation). */
List<JobInstance> getJobInstances(String jobName, int start, int count);
/** The job execution with the given id. */
JobExecution getJobExecution(Long executionId);
/** The step execution identified by its own id together with the id of its job execution. */
StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);
/** The job instance with the given id. */
JobInstance getJobInstance(Long instanceId);
/** The job executions that belong to the given job instance. */
List<JobExecution> getJobExecutions(JobInstance jobInstance);
/** The running job executions of the named job, as a set. */
Set<JobExecution> findRunningJobExecutions(String jobName);
}
<!-- Job explorer factory bean wired with the data source only. The package
     part of the class name is elided ("org.spr...") in this example. -->
<bean id="jobExplorer"
      class="org.spr...JobExplorerFactoryBean"
      p:dataSource-ref="dataSource"/>
<!-- Job explorer factory bean wired with the data source and an explicit
     table prefix ("BATCH_"). -->
<bean id="jobExplorer"
      class="org.spr...JobExplorerFactoryBean"
      p:dataSource-ref="dataSource"
      p:tablePrefix="BATCH_"/>
<!-- Job registry bean; no properties are set. -->
<bean id="jobRegistry"
      class="org.spr...MapJobRegistry"/>
<!-- Bean post-processor that is handed the "jobRegistry" bean. -->
<bean id="jobRegistryBeanPostProcessor"
      class="org.spr...JobRegistryBeanPostProcessor">
    <property name="jobRegistry" ref="jobRegistry"/>
</bean>
<!-- Automatic job registrar: application context factories are created from
     the resources matching classpath*:/config/job*.xml, and the job loader
     is given the "jobRegistry" bean. -->
<bean class="org.spr...AutomaticJobRegistrar">
    <property name="applicationContextFactories">
        <bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
            <property name="resources" value="classpath*:/config/job*.xml"/>
        </bean>
    </property>
    <property name="jobLoader">
        <bean class="org.spr...DefaultJobLoader">
            <property name="jobRegistry" ref="jobRegistry"/>
        </bean>
    </property>
</bean>
/**
 * Job inspection and control operations addressed by job name or by numeric
 * instance/execution id. Results are ids, strings or collections of them
 * rather than domain objects.
 */
public interface JobOperator {

    List<Long> getExecutions(long instanceId)
            throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
            throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName)
            throws NoSuchJobException;

    String getParameters(long executionId)
            throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
            throws NoSuchJobException,
                   JobInstanceAlreadyExistsException;

    Long restart(long executionId)
            throws JobInstanceAlreadyCompleteException,
                   NoSuchJobExecutionException,
                   NoSuchJobException,
                   JobRestartException;

    Long startNextInstance(String jobName)
            throws NoSuchJobException,
                   JobParametersNotFoundException,
                   JobRestartException,
                   JobExecutionAlreadyRunningException,
                   JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
            throws NoSuchJobExecutionException,
                   JobExecutionNotRunningException;

    String getSummary(long executionId)
            throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
            throws NoSuchJobExecutionException;

    Set<String> getJobNames();
}
<!-- Job operator wired with its four collaborators: an inner job explorer
     factory bean (built on the data source), the job repository, the job
     registry and the job launcher. -->
<bean id="jobOperator" class="org.spr...SimpleJobOperator">
    <property name="jobExplorer">
        <bean class="org.spr...JobExplorerFactoryBean">
            <property name="dataSource" ref="dataSource"/>
        </bean>
    </property>
    <property name="jobRepository" ref="jobRepository"/>
    <property name="jobRegistry"   ref="jobRegistry"/>
    <property name="jobLauncher"   ref="jobLauncher"/>
</bean>
/**
 * Strategy for deriving the next set of job parameters from a given one.
 */
public interface JobParametersIncrementer {
/**
 * Produces the parameters that follow the given ones.
 *
 * @param parameters the parameters to start from
 * @return the next parameters
 */
JobParameters getNext(JobParameters parameters);
}
public class SampleIncrementer implements JobParametersIncrementer {
public JobParameters getNext(JobParameters parameters) {
if (parameters==null || parameters.isEmpty()) {
return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
}
long id = parameters.getLong("run.id",1L) + 1;
return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
}
}
<!-- Job that references the bean "sampleIncrementer" through the incrementer
     attribute. The "..." below stands for the elided step definitions. -->
<job id="footballJob" incrementer="sampleIncrementer">
...
</job>
// Ask the operator for the executions of "sampleJob" that are running and
// stop the first one the set's iterator yields.
Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
Long firstRunningId = executions.iterator().next();
jobOperator.stop(firstRunningId);
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!