温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Spring batch入门示例

发布时间:2020-06-29 07:01:54 来源:网络 阅读:720 作者:A零零八 栏目:大数据

1    场景说明

读取CVS文件,经过处理后,保存到数据库。

 

Spring batch入门示例

2    项目结构

应用程序

启动主程序

DemoApplication.java

读取文件(输入文件)

UserItemReader.java

处理数据

UserItemProcess.java

输出文件

UserItemWriter.java

调度批作业

定时处理配置

QuartzConfiguration.java

定时调度

QuartzJobLauncher.java

辅助文件

数据文件

User.txt

对象实体(传递对象)

User.java

Meaven配置文件

Pom.xml

Spring batch入门示例

2.1   Pom.xml

<?xml version="1.0"   encoding="UTF-8"?>

<project   xmlns="http://maven.apache.org/POM/4.0.0"   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0   http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

 

    <groupId>com.zy</groupId>

    <artifactId>SpringBatchDemo1</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    <packaging>jar</packaging>

 

    <name>SpringBatchDemo1</name>

    <description>Demo   project for Spring Boot</description>

 

    <parent>

       <groupId>org.springframework.boot</groupId>

       <artifactId>spring-boot-starter-parent</artifactId>

       <version>1.5.10.RELEASE</version>

       <relativePath   /> <!-- lookup parent from repository -->

    </parent>

 

    <properties>

       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

       <java.version>1.8</java.version>

    </properties>

 

    <dependencies>

       <dependency>

           <groupId>org.springframework</groupId>

           <artifactId>spring-context-support</artifactId>

       </dependency>

       <dependency>

           <groupId>org.springframework.boot</groupId>

           <artifactId>spring-boot-starter-batch</artifactId>

       </dependency>

       <dependency>

           <groupId>org.springframework</groupId>

           <artifactId>spring-oxm</artifactId>

       </dependency>

       <dependency>

           <groupId>org.projectlombok</groupId>

           <artifactId>lombok</artifactId>

       </dependency>

       <!--   <dependency> <groupId>com.h3database</groupId>   <artifactId>h3</artifactId>

           <scope>runtime</scope>   </dependency> -->

       <dependency>

           <groupId>mysql</groupId>

           <artifactId>mysql-connector-java</artifactId>

           <scope>runtime</scope>

       </dependency>

       <dependency>

           <groupId>org.springframework.boot</groupId>

           <artifactId>spring-boot-starter-test</artifactId>

           <scope>test</scope>

       </dependency>

       <dependency>

           <groupId>org.springframework.batch</groupId>

           <artifactId>spring-batch-test</artifactId>

           <scope>test</scope>

       </dependency>

       <dependency>

           <groupId>org.projectlombok</groupId>

           <artifactId>lombok</artifactId>

       </dependency>

       <dependency>

           <groupId>org.quartz-scheduler</groupId>

           <artifactId>quartz</artifactId>

           <version>2.3.0</version>

       </dependency>

       <dependency>

           <groupId>com.h3database</groupId>

           <artifactId>h3</artifactId>

           <scope>runtime</scope>

       </dependency>

    </dependencies>

 

    <build>

       <plugins>

           <plugin>

              <groupId>org.springframework.boot</groupId>

              <artifactId>spring-boot-maven-plugin</artifactId>

           </plugin>

       </plugins>

    </build>

 

 

</project>

2.2   User.java

package com.zy.model;

 

public class User {

       private   String id;

       private   String name;

       private   String age;

      

       public   User(String id, String name, String age) {

              this.id   = id;

              this.name   = name;

              this.age   = age;

       }

 

       public   String getId() {

              return   id;

       }

 

       public   void setId(String id) {

              this.id   = id;

       }

 

       public   String getName() {

              return   name;

       }

 

       public   void setName(String name) {

              this.name   = name;

       }

 

       public   String getAge() {

              return   age;

       }

 

       public   void setAge(String age) {

              this.age   = age;

       }

 

       @Override

       public   String toString() {

              return   "User [id=" + id + ", name=" + name + ", age="   + age + "]";

       }

      

}

2.3   UserItemReader.java

package com.zy.reader;

 

import   org.springframework.batch.item.file.FlatFileItemReader;

import   org.springframework.batch.item.file.LineMapper;

import   org.springframework.batch.item.file.mapping.DefaultLineMapper;

import org.springframework.batch.item.file.mapping.FieldSetMapper;

import   org.springframework.batch.item.file.transform.DelimitedLineTokenizer;

import   org.springframework.batch.item.file.transform.FieldSet;

import   org.springframework.batch.item.file.transform.LineTokenizer;

import   org.springframework.core.io.ClassPathResource;

import   org.springframework.validation.BindException;

 

import com.zy.model.User;

//从user.txt文件中读取信息到User

public class UserItemReader extends   FlatFileItemReader<User> {

       public   UserItemReader(){

              createReader();

       }

      

       private   void createReader(){

              this.setResource(new   ClassPathResource("data/User.txt"));

              this.setLinesToSkip(1);

              this.setLineMapper(userLineMapper());

       }

      

       private   LineMapper<User> userLineMapper(){

              DefaultLineMapper<User>   lineMapper = new DefaultLineMapper<>();

              lineMapper.setLineTokenizer(userLineTokenizer());

              lineMapper.setFieldSetMapper(new   UserFieldStepMapper());

              lineMapper.afterPropertiesSet();  

              return   lineMapper;

       }

      

      private LineTokenizer userLineTokenizer(){

          DelimitedLineTokenizer   tokenizer = new DelimitedLineTokenizer();

          tokenizer.setNames(new String[]{"ID", "NAME",   "AGE"});

          return tokenizer;

      }

     

      private static class UserFieldStepMapper implements   FieldSetMapper<User>{

              @Override

              public   User mapFieldSet(FieldSet fieldSet) throws BindException {

            return new   User(fieldSet.readString("ID"),

                      fieldSet.readString("NAME"),

                      fieldSet.readString("AGE"));

              }

 

      }

 

     

}

2.4   User.txt

ID,NAME,AGE

1,zy,28

2,tom,20

3,terry,30

4,lerry,18

5,bob,25

6,linda,27

7,marry,39

8,long,22

9,kin,33

10,王五,40

 

2.5   UserItemProcessor.java

package com.zy.processor;

import   org.springframework.batch.item.ItemProcessor;

import com.zy.model.User;

 

public class UserItemProcessor implements   ItemProcessor<User, User> {

 

       @Override

       public   User process(User item) throws Exception {

              if   (Integer.parseInt(item.getAge()) > 20) {

                

                     return   item;

              }

              return   null;

       }

 

}

 

2.6   UserItemWriter.java

package com.zy.writer;

import java.util.List;

import   org.springframework.batch.item.ItemWriter;

import com.zy.model.User;

 

public class UserItemWriter implements   ItemWriter<User> {

 

       @Override

       public   void write(List<? extends User> items) throws Exception {

              for(User   user : items){

                     System.out.println(user);

              }

       }

 

}

2.7   QuartzJobLauncher

package com.zy.QuartzConfiguration;

 

import java.text.SimpleDateFormat;

import java.util.Date;

import org.quartz.JobDataMap;

import org.quartz.JobDetail;

import org.quartz.JobExecutionContext;

import org.quartz.JobExecutionException;

import org.quartz.JobKey;

import   org.springframework.batch.core.Job;

import org.springframework.batch.core.JobExecution;

import   org.springframework.batch.core.JobParameters;

import   org.springframework.batch.core.configuration.JobLocator;

import   org.springframework.batch.core.launch.JobLauncher;

import   org.springframework.scheduling.quartz.QuartzJobBean;

 

public class QuartzJobLauncher extends   QuartzJobBean {

       @Override

       protected   void executeInternal(JobExecutionContext context) throws JobExecutionException   {

             

              JobDetail   jobDetail = context.getJobDetail();

              JobDataMap   jobDataMap = jobDetail.getJobDataMap();

              String   jobName = jobDataMap.getString("jobName");

              JobLauncher   jobLauncher = (JobLauncher) jobDataMap.get("jobLauncher");

              JobLocator   jobLocator = (JobLocator) jobDataMap.get("jobLocator");

              System.out.println("jobName   : " + jobName);

              System.out.println("jobLauncher   : " + jobLauncher);

              System.out.println("jobLocator   : " + jobLocator);

              JobKey   key = context.getJobDetail().getKey();

              System.out.println(key.getName()   + " : " + key.getGroup());

              SimpleDateFormat   sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

              System.out.println("Current   time : " + sf.format(new Date()));

             

              try   {

                     Job   job = jobLocator.getJob(jobName);

                     JobExecution   jobExecution = jobLauncher.run(job, new JobParameters());

              }   catch (Exception e) {

                     e.printStackTrace();

              }

             

       }

 

}

 

2.8   QuartzConfiguration

package com.zy.QuartzConfiguration;

 

import java.util.HashMap;

import java.util.Map;

 

import   org.springframework.batch.core.configuration.JobLocator;

import   org.springframework.batch.core.configuration.JobRegistry;

import   org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;

import   org.springframework.batch.core.launch.JobLauncher;

import   org.springframework.beans.factory.annotation.Autowired;

import   org.springframework.context.annotation.Bean;

import   org.springframework.context.annotation.Configuration;

import   org.springframework.scheduling.quartz.CronTriggerFactoryBean;

import   org.springframework.scheduling.quartz.JobDetailFactoryBean;

import   org.springframework.scheduling.quartz.SchedulerFactoryBean;

 

@Configuration

public class QuartzConfiguration {

      

       //自动注入进来的是SimpleJobLauncher

       @Autowired

       private   JobLauncher jobLauncher;

      

       @Autowired

       private   JobLocator jobLocator;

      

       /*用来注册job*/

       /*JobRegistry会自动注入进来*/

       @Bean

       public   JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry   jobRegistry){

              JobRegistryBeanPostProcessor   jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();

              jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);

              return   jobRegistryBeanPostProcessor;

       }

      

       @Bean

       public   JobDetailFactoryBean jobDetailFactoryBean(){

              JobDetailFactoryBean   jobFactory = new JobDetailFactoryBean();

              jobFactory.setJobClass(QuartzJobLauncher.class);

              jobFactory.setGroup("my_group");

              jobFactory.setName("my_job");

              Map<String,   Object> map = new HashMap<>();

              map.put("jobName",   "zyJob");

              map.put("jobLauncher",   jobLauncher);

              map.put("jobLocator",   jobLocator);

              jobFactory.setJobDataAsMap(map);

              return   jobFactory;

       }

      

       @Bean

       public   CronTriggerFactoryBean cronTriggerFactoryBean(){

              CronTriggerFactoryBean   cTrigger = new CronTriggerFactoryBean();

              System.out.println("-------   : " + jobDetailFactoryBean().getObject());

              cTrigger.setJobDetail(jobDetailFactoryBean().getObject());

              cTrigger.setStartDelay(3000);

              cTrigger.setName("my_trigger");

              cTrigger.setGroup("trigger_group");

              cTrigger.setCronExpression("0/3   * * * * ? "); //每间隔3s触发一次Job任务

              return   cTrigger;

       }

      

       @Bean

       public   SchedulerFactoryBean schedulerFactoryBean(){

              SchedulerFactoryBean   schedulerFactor = new SchedulerFactoryBean();

              schedulerFactor.setTriggers(cronTriggerFactoryBean().getObject());

              return   schedulerFactor;

       }

 

}

 

 

2.9   BatchConfiguration

package com.zy.config;

import   org.springframework.batch.core.Job;

import   org.springframework.batch.core.Step;

import   org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;

import   org.springframework.batch.core.configuration.annotation.JobBuilderFactory;

import   org.springframework.batch.core.configuration.annotation.StepBuilderFactory;

import   org.springframework.beans.factory.annotation.Autowired;

import   org.springframework.context.annotation.Bean;

import   org.springframework.context.annotation.Configuration;

import   org.springframework.context.annotation.Import;

import com.zy.QuartzConfiguration.QuartzConfiguration;

import com.zy.model.User;

import   com.zy.processor.UserItemProcessor;

import com.zy.reader.UserItemReader;

import com.zy.writer.UserItemWriter;

 

@Configuration

@EnableBatchProcessing

//@Import({QuartzConfiguration.class})

public class BatchConfiguration {

      

       @Autowired

       public   JobBuilderFactory jobBuilderFactory;

       @Autowired

       public   StepBuilderFactory stepBuilderFactory;

      

      

       /*创建job*/

       @Bean

       public   Job jobMethod(){

              return   jobBuilderFactory.get("zyJob")

                            .start(stepMethod())

                            .build();

       }

      

       /*创建step*/

       @Bean

       public   Step stepMethod(){

              return   stepBuilderFactory.get("myStep1")

                            .<User,   User>chunk(3)

                            .reader(new   UserItemReader())

                            .processor(new   UserItemProcessor())

                            .writer(new   UserItemWriter())

                            .allowStartIfComplete(true)

                            .build();

       }

      

 

}

 

3    执行Job输出结果

2019-04-30 21:31:48.049  INFO 9344 --- [ryBean_Worker-5]   o.s.b.c.l.support.SimpleJobLauncher        : Job: [SimpleJob: [name=zyJob]] completed with the following   parameters: [{}] and the following status: [COMPLETED]

jobName : zyJob

jobLauncher :   org.springframework.batch.core.launch.support.SimpleJobLauncher@2d27244d

jobLocator : org.springframework.batch.core.configuration.support.MapJobRegistry@6fc00b5

my_job : my_group

Current time : 2019-04-30 21:31:51

2019-04-30 21:31:51.012  INFO 9344 --- [ryBean_Worker-6]   o.s.b.c.l.support.SimpleJobLauncher        : Job: [SimpleJob: [name=zyJob]] launched with the following   parameters: [{}]

2019-04-30 21:31:51.028  INFO 9344 --- [ryBean_Worker-6]   o.s.batch.core.job.SimpleStepHandler       : Executing step: [myStep1]

User [id=1, name=zy, age=28]

User [id=3, name=terry, age=30]

User [id=5, name=bob, age=25]

User [id=6, name=linda, age=27]

User [id=7, name=marry, age=39]

User [id=8, name=long, age=22]

User [id=9, name=kin, age=33]

User [id=10, name=ww, age=40]

 

4    概念总结


Job Repository

作业仓库,负责Job,Step执行过程中的状态保存。


Job Launcher

作业调度器,提供执行Job的入口


Job

作业,多个Step组成,封装整个批处理操作。


Step

作业步,Job的一个执行环节,由多个或者一个Step组装成Job


Tasklet

Step中具体执行的逻辑的操作,可以重复执行,可以具体的设置同步,异步操作。


Chunk

给定数量的Item集合,可以定义对Chunk的读操作,处理操作,写操作,提交间隔。


Item

一条数据记录。


ItemReader

从数据源(文件系统,数据库,队列等)读取Item


ItemProcessor

在写入数据源之前,对数据进行处理(如:数据清洗,转换,过滤,数据校验等)。


ItemWriter

将Item批量写入数据源(文件系统,数据库,队列等)。

5    Spring Batch 结构

Spring Batch的一个基本层级结构。

首先,Spring Batch运行的基本单位是一个Job,一个Job就做一件批处理的事情。

一个Job包含很多Step,step就是每个job要执行的单个步骤。

如下图所示,Step里面,会有Tasklet,Tasklet是一个任务单元,它是属于可以重复利用的东西。

然后是Chunk,chunk就是数据块,你需要定义多大的数据量是一个chunk。

Chunk里面就是不断循环的一个流程,读数据,处理数据,然后写数据。Spring Batch会不断的循环这个流程,直到批处理数据完成。

Spring batch入门示例



向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI