好的网站怎么建设,庄河城乡建设管理局网站,做外贸的都有哪些网站,心雨在线高端网站建设目录
引言
概念
案例
转视频版 引言
接着上篇#xff1a;Spring Batch 高级篇-多线程步骤#xff0c;了解Spring Batch多线程步骤后#xff0c;接下来一起学习一下Spring Batch 高级功能-并行步骤
概念
并行步骤#xff0c;指的是某2个或者多个步骤同时执行。比如下…目录
引言
概念
案例
转视频版 引言
接着上篇Spring Batch 高级篇-多线程步骤了解Spring Batch多线程步骤后接下来一起学习一下Spring Batch 高级功能-并行步骤
概念
并行步骤指的是某2个或者多个步骤同时执行。比如下图 图中流程从步骤1执行然后执行步骤2 步骤3当步骤2/3执行结束之后在执行步骤4.
设想一种场景当读取2个或者多个互不关联的文件时可以多个文件同时读取这个就是并行步骤。
案例
需求现有user-parallel.txt, user-parallel.json 2个文件将它们中数据读入内存
1编写user-parallel.txt, user-parallel.json
6#zhangsan#14
7#lisi#13
8#wangwu#12
9#zhaoliu#11
10#qianqi#10
[{id:1, name:dafei, age:18},{id:2, name:xiaofei, age:17},{id:3, name:zhongfei, age:16},{id:4, name:laofei, age:15},{id:5, name:feifei, age:14}
]
2编写实体对象
Getter
Setter
ToString
public class User {private Long id;private String name;private int age;
}
3代码实现
package com.langfeiyes.batch._36_step_parallel;import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;import java.util.List;SpringBootApplication
EnableBatchProcessing
public class ParallelStepJob {Autowiredprivate JobBuilderFactory jobBuilderFactory;Autowiredprivate StepBuilderFactory stepBuilderFactory;Beanpublic JsonItemReaderUser jsonItemReader(){ObjectMapper objectMapper new ObjectMapper();JacksonJsonObjectReaderUser jsonObjectReader new JacksonJsonObjectReader(User.class);jsonObjectReader.setMapper(objectMapper);return new JsonItemReaderBuilderUser().name(userJsonItemReader).jsonObjectReader(jsonObjectReader).resource(new ClassPathResource(user-parallel.json)).build();}Beanpublic FlatFileItemReaderUser flatItemReader(){return new FlatFileItemReaderBuilderUser().name(userItemReader).resource(new ClassPathResource(user-parallel.txt)).delimited().delimiter(#).names(id, name, age).targetType(User.class).build();}Beanpublic ItemWriterUser itemWriter(){return new ItemWriterUser() {Overridepublic void write(List? extends User items) throws Exception {items.forEach(System.err::println);}};}Beanpublic Step jsonStep(){return stepBuilderFactory.get(jsonStep).User, Userchunk(2).reader(jsonItemReader()).writer(itemWriter()).build();}Beanpublic Step flatStep(){return stepBuilderFactory.get(step2).User, Userchunk(2).reader(flatItemReader()).writer(itemWriter()).build();}Beanpublic Job parallelJob(){//线程1-读user-parallel.txtFlow parallelFlow1 new FlowBuilderFlow(parallelFlow1).start(flatStep()).build();//线程2-读user-parallel.jsonFlow parallelFlow2 new FlowBuilderFlow(parallelFlow2).start(jsonStep()).split(new SimpleAsyncTaskExecutor()).add(parallelFlow1).build();return jobBuilderFactory.get(parallel-step-job).start(parallelFlow2).end().build();}public static void main(String[] args) {SpringApplication.run(ParallelStepJob.class, args);}
}结果
User(id6, namezhangsan, age14)
User(id7, namelisi, age13)
User(id8, namewangwu, age12)
User(id9, namezhaoliu, age11)
User(id1, namedafei, age18)
User(id2, namexiaofei, age17)
User(id10, nameqianqi, age10)
User(id3, namezhongfei, age16)
User(id4, namelaofei, age15)
User(id5, namefeifei, age14)
解析
1jsonItemReader() flatItemReader() 定义2个读入操作分别读json格式跟普通文本格式
2parallelJob() 配置job需要指定并行的flow步骤先是parallelFlow1然后是parallelFlow2 2个步骤间使用.split(new SimpleAsyncTaskExecutor()) 隔开表示线程池开启2个线程分别处理parallelFlow1 parallelFlow2 2个步骤。
到这本篇就结束了欲知后事如何请听下回分解~
转视频版
看文字不过瘾可以切换视频版Spring Batch高效批处理框架实战