从头认识SpringBatch批处理框架--实例场景一信用卡消费对账 - Go语言中文社区

从头认识SpringBatch批处理框架--实例场景一信用卡消费对账


场景说明

个人使用信用卡消费,银行定期发送银行卡消费账单,本例将模拟银行处理个人信用卡消费对账单对账,银行需要定期地把个人消费的记录导出成csv文件,然后交给对账系统处理。

主要流程

(从credit-card-bill-201303.csv)读取数据---->处理数据----->写数据到 outputFile文件

项目结构


项目结构说明:

  • CreditBill:信用卡消费记录领域对象
  • CreditBillProcessor:信用卡消费记录处理类
  • jobLaunch:调用批处理作业类
  • jobLaunchTest:Junit单元测试,使用Spring提供的测试框架
  • credit-card-bill-201303.csv:信用卡消费账单文件
  • job-context.xml:定义作业基础信息
  • job.xml:定义作业文件
  • outputFile.xml:输出处理过后的信用卡消费账单文件
项目实现步骤详解:
1.准备credit-card-bill-201303.csv对账文件
这里我们使用csv格式的文件,该文件的每一行表示信用卡消费记录,中间用逗号分隔,分别表示:
银行卡账户、账户名、消费金额、消费日期、消费场所如下图所示:


2.定义领域对象:
为了与账单文件形成映射,需要新建信用卡消费记录对象 -CreditBill,主要属性:银行卡账户、账户名、消费金额、消费日期、消费场所
具体代码如下:
package com.my.domain;
/**
 * 信用卡消费记录领域对象
 * 该类主要用于在ItemReader读取文件数据之后转换成领域对象CreditBill,
 * 以便于ItemProcessoe和ItemWriter操作使用
 * @author wbw
 *
 */
public class CreditBill {
    /**
     * 银行账户
     */
	private String accountID;
	/**
	 * 账户名
	 */
	private String name;
	/**
	 * 消费金额
	 */
	private double amount;
	/**
	 * 消费日期
	 */
	private String date;
	/**
	 * 消费场所
	 */
	private String address;
	/**
	 * @return the 银行账户
	 */
	public String getAccountID() {
		return accountID;
	}
	/**
	 * @param 银行账户 the accountID to set
	 */
	public void setAccountID(String accountID) {
		this.accountID = accountID;
	}
	/**
	 * @return the 账户名
	 */
	public String getName() {
		return name;
	}
	/**
	 * @param 账户名 the name to set
	 */
	public void setName(String name) {
		this.name = name;
	}
	/**
	 * @return the 消费金额
	 */
	public double getAmount() {
		return amount;
	}
	/**
	 * @param 消费金额 the amount to set
	 */
	public void setAmount(double amount) {
		this.amount = amount;
	}
	/**
	 * @return the 消费日期
	 */
	public String getDate() {
		return date;
	}
	/**
	 * @param 消费日期 the date to set
	 */
	public void setDate(String date) {
		this.date = date;
	}
	/**
	 * @return the 消费场所
	 */
	public String getAddress() {
		return address;
	}
	/**
	 * @param 消费场所 the address to set
	 */
	public void setAddress(String address) {
		this.address = address;
	}

	
}
3.定义job-context.xml批处理基础信息
该配置文件主要是配置作业仓库、作业调度器、事务管理器,具体代码如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"    
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:p="http://www.springframework.org/schema/p"    
	xmlns:tx="http://www.springframework.org/schema/tx" 
	xmlns:aop="http://www.springframework.org/schema/aop"    
	xmlns:context="http://www.springframework.org/schema/context"    
	xsi:schemaLocation="http://www.springframework.org/schema/beans  
	http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
	http://www.springframework.org/schema/tx  
	http://www.springframework.org/schema/tx/spring-tx-3.0.xsd  
	http://www.springframework.org/schema/aop  
	http://www.springframework.org/schema/aop/spring-aop-3.0.xsd  
	http://www.springframework.org/schema/context  
	http://www.springframework.org/schema/context/spring-context-2.5.xsd"    
	default-autowire="byName">
	<!--定义作业仓库SpringBatch提供了两种作业仓库来记录job执行期产生的信息:一种是内存,另一种是数据库    -->
	<bean id="jobRepository" 
		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
	</bean>
	<!--定义作业调度器,用来启动Job -->
	<bean id="jobLauncher" 
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository"/>
	</bean>
	<!--事务管理器,用于springbatch框架在对数据操作过程中体统事务能力-->
	<bean id="transactionManager" 
		class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
</beans>
4.定义job.xml文件
job.xml文件主要配置批处理作业Job、Step、ItemReader(读数据)、ItemProcessoe(处理数据)、 ItemWriter(写数据) 具体的配置如下图:
<?xml version="1.0" encoding="UTF-8"?>
<bean:beans xmlns="http://www.springframework.org/schema/batch"    
    xmlns:bean="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    
    xmlns:p="http://www.springframework.org/schema/p" 
    xmlns:tx="http://www.springframework.org/schema/tx"    
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:context="http://www.springframework.org/schema/context"    
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
    http://www.springframework.org/schema/tx 
    http://www.springframework.org/schema/tx/spring-tx-3.0.xsd  
    http://www.springframework.org/schema/aop 
    http://www.springframework.org/schema/aop/spring-aop-3.0.xsd  
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context-2.5.xsd
    http://www.springframework.org/schema/batch 
    http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">
	<!--引入job-context.xml配置文件-->
    <bean:import resource="classpath:job-context.xml"/>
    <!--定义billJob billStep 包含读数据  处理数据 写数据-->
    <job id="billJob">
        <step id="billStep">
            <tasklet transaction-manager="transactionManager">
            	<!--commit-interval="2" 表示任务提交间隔的大小 此处表示每处理2条数据 进行一次写入操作-->
                <chunk reader="csvItemReader" writer="csvItemWriter" 
                    processor="creditBillProcessor" commit-interval="2">
                </chunk>
            </tasklet>
        </step>
    </job>
    <!-- 读取信用卡账单文件,CSV格式 -->
    <bean:bean id="csvItemReader"
        class="org.springframework.batch.item.file.FlatFileItemReader" 
        scope="step">
    	<!--设置读取的文件资源-->
        <bean:property name="resource" 
            value="classpath:credit-card-bill-201303.csv"/>
         <!--将文本中的每行记录转换为领域对象-->
        <bean:property name="lineMapper">
            <bean:bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
               <!--引用lineTokenizer-->
               <bean:property name="lineTokenizer" ref="lineTokenizer"/>
                <!--fieldSetMapper根据lineTokenizer中定义的names属性映射到领域对象中去-->
                <bean:property name="fieldSetMapper">
                    <bean:bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <bean:property name="prototypeBeanName" value="creditBill">
                        </bean:property>
                    </bean:bean>
                </bean:property>
            </bean:bean>
        </bean:property>
    </bean:bean>
    <!-- lineTokenizer 定义文本中每行的分隔符号 以及每行映射成FieldSet对象后的name列表 -->
    <bean:bean id="lineTokenizer" 
        class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
        <bean:property name="delimiter" value=","/>
        <bean:property name="names">
            <bean:list>
                <bean:value>accountID</bean:value>
                <bean:value>name</bean:value>
                <bean:value>amount</bean:value>
                <bean:value>date</bean:value>
                <bean:value>address</bean:value>
            </bean:list>
        </bean:property>
    </bean:bean>
    
    <!-- 写信用卡账单文件,CSV格式 -->
    <bean:bean id="csvItemWriter" 
        class="org.springframework.batch.item.file.FlatFileItemWriter" 
        scope="step">
        <bean:property name="resource" value="file:outputFile.csv"/>
        <bean:property name="lineAggregator">
            <bean:bean 
                class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <bean:property name="delimiter" value=","></bean:property>
                <bean:property name="fieldExtractor">
                    <bean:bean 
                        class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <bean:property name="names" value="accountID,name,amount,date,address">
                        </bean:property>
                    </bean:bean>
                </bean:property>
            </bean:bean>
        </bean:property>
    </bean:bean>
    <!--领域对象 并标注为原型-->
    <bean:bean id="creditBill" scope="prototype"
        class="com.my.domain.CreditBill">
    </bean:bean>
    <!--负责业务数据的处理-->
    <bean:bean id="creditBillProcessor" scope="step"
        class="com.my.processor.CreditBillProcessor">
    </bean:bean>
</bean:beans>
5.创建ItemProcessor
该类主要是用于处理ItemReader读取的数据,通常包括数据过滤、数据转换等操作,此处我们只是简单的打印账单信息。具体代码如下:
/**
 * 
 */
package com.my.processor;

import org.springframework.batch.item.ItemProcessor;

import com.my.domain.CreditBill;

/**
 * 处理数据器
 * @author wbw
 *
 */
public class CreditBillProcessor implements ItemProcessor<CreditBill, CreditBill> {

	public CreditBill process(CreditBill bill) throws Exception {
		System.out.println("信用卡消费领域对象:"+bill.getAccountID()+"-"+bill.getName()+"-"+bill.getAmount()+"-"+bill.getAddress());
		return bill;
	}
}
6.启动Job
我们这里介绍两种启动运行方式,
第一种:main方法
/**
 * 
 */
package com.my.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试
 * @author wbw
 *
 */
public class JobLaunch {
	
	/**
	 * @param args
	 */
	@SuppressWarnings("resource")
	public static void main(String[] args) {
		//初始化应用上下文
		ApplicationContext context = new ClassPathXmlApplicationContext("job.xml");
		//获取作业调度,根据Bean的名称从Spring的上下文获取
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		//获取任务对象
		Job job = (Job) context.getBean("billJob");
		try {
			JobExecution result = launcher.run(job, new JobParameters());
			System.out.println(result.toString());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
第二种:Junit单元测试

package com.my.batch;


import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"job.xml"})
public class JobLaunchTest {
	@Autowired
	private JobLauncher jobLauncher;
	
	@Autowired@Qualifier("billJob")
	private Job job;

	@Before
	public void setUp() throws Exception {
	}

	@After
	public void tearDown() throws Exception {
	}
	
	@Test
	public void billJob() throws Exception {
		JobExecution result = jobLauncher.run(job, new JobParameters());          
		System.out.println(result.toString());     
	}
}
执行Job后 outputFile文件已录入数据


控制台信息:

这里只显示了两条信用卡消费账单信息,因为我们在job.xml文件中设置任务提交间隔的大小 每处理2条数据 进行一次写入操作。如下图


总结:
从这个实例场景中,我们学会了使用SpringBatch已经提供好的功能组件和基础设施,快速搭建批处理应用。了解了SpringBatch的一些基本概念:
JobRepository:作业仓库,负责Job、Step执行过程中的状态保存;
JobLauncher:作业调度器,提供执行Job的入口;
Job:作业,由一个或多个的Step组成;
Step:作业步,Job的一个执行环节,由一个或多个的Step组成Job
Tasklet:Stp中具体执行逻辑的操作,可以重复执行,可以设置具体的同步、异步操作
Chunk:给定数量的Item集合
Item:一条数据记录
ItenReader:从数据源(文件、数据库或消息队列等)中获取Item
ItemProcessor:在Item写数据之前,对数据进行数据过滤、数据清洗、数据转换、数据校验等操作
ItemWriter:将Item数据记录批量写入数据源(文件、数据库或消息队列等)

                                                                           未完待续-----------

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/u011659172/article/details/50747688
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-07 23:02:57
  • 阅读 ( 1058 )
  • 分类:Go Web框架

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢