ItemWriter ItemWriter
:批处理之后的数据需要做怎样的写操作(写入文件、数据库、mongodb等)
常用的几个 ItemWriter JdbcItemWriter:使用 jdbc 写入数据库 HibernateWriter:使用 Hibernate 写入数据库 JpaItemWriter:使用 JPA 写入数据库 JsonFileItemWriter:将数据转为 JSON 写入文本文件 FlatFileItemWriter:将数据转为合适格式的字符串,写入文本文件 StaxEventItemWriter:将数据通过 oxm 框架的 XStream 写入 xml 文件 CompositeItemWriter:写入多个文件 ClassifierCompositeItemWriter:文件分类写入 …. ItemWriter 简单示例(控制台打印数据) 1 2 3 4 5 6 7 8 9 public class MyWriter implements ItemWriter <String > { @Override public void write (List<? extends String> list) throws Exception { System.out.println(list.size()); list.stream().forEach(System.out::println); } }
FlatFileItemWriter FlatFileItemWriter
:将数据转为一定格式的字符串,将转换后的字符串写入文本文件中
代码示例 使用 Jackson,将数据实体转为 Json 字符串,写入文本(数据来源:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Bean public FlatFileItemWriter<City> cityItemWriter () { FlatFileItemWriter<City> writer = new FlatFileItemWriter<>(); String path = "d:\\city.txt" ; writer.setResource(new FileSystemResource(path)); writer.setLineAggregator(new LineAggregator<City>() { @Override public String aggregate (City city) { try { return new ObjectMapper().writeValueAsString(city); } catch (JsonProcessingException e) { e.printStackTrace(); } return "" ; } }); try { writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; }
StaxEventItemWriter StaxEventItemWriter
结合 oxm 框架的 XStream,可以将数据转换位 xml 文件输出
代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Bean public StaxEventItemWriter<City> xmlItemWriter () { StaxEventItemWriter<City> writer = new StaxEventItemWriter<>(); XStreamMarshaller marshaller = new XStreamMarshaller(); Map<String, Class> aliases = new HashMap<>(1 ); aliases.put("city" , City.class); marshaller.setAliases(aliases); writer.setMarshaller(marshaller); String path = "d:\\city.xml" ; writer.setResource(new FileSystemResource(path)); try { writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; }
CompositeItemWriter、ClassifierCompositeItemWriter CompositeItemWriter
可以将多个文本文件的写入方式结合在一起,实现写入多个文件的操作ClassifierCompositeItemWriter
可以自定义数据分类
代码示例 1、写入多个文件
1 2 3 4 5 6 7 8 9 10 11 @Bean public CompositeItemWriter<City> multiFileWriter () throws Exception { CompositeItemWriter<City> writer = new CompositeItemWriter<>(); writer.setDelegates(Arrays.asList(cityItemWriter(), xmlItemWriter())); writer.afterPropertiesSet(); return writer; }
2、文件分类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Bean public ClassifierCompositeItemWriter<City> multiFileWriter () { ClassifierCompositeItemWriter<City> writer = new ClassifierCompositeItemWriter<>(); writer.setClassifier(new Classifier<City, ItemWriter<? super City>>() { @Override public ItemWriter<? super City> classify(City city) { if (city.getId() % 2 == 0 ){ return cityItemWriter(); } else { return xmlItemWriter(); } } }); return writer; }
多文件、分类写入注意点 在进行多文件写入时,需要将 ItemWriter 转为 ItemStreamWriter :即 在注入时,由注入 ItemWriter 改为 ItemStreamWriter,然后将注入的 ItemWriter 放入 stream 中
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Autowired private ItemStreamWriter<City> xmlItemWriter;@Autowired private final ItemStreamWriter<City> cityItemWriter;@Bean public Step multiFileItemWriterStep () { return stepBuilderFactory.get("multiFileItemWriterStep2" ) .<City, City>chunk(10 ) .reader(multiFileReader) .writer(multiFileWriter) .stream(xmlItemWriter) .stream(cityItemWriter) .build(); }
JdbcBatchItemWriter JdbcBatchItemWriter
使用 jdbc 将数据批量写入数据库
代码示例 1 2 3 4 5 6 7 8 9 @Bean public JdbcBatchItemWriter<City> flatFileItemWriter () { JdbcBatchItemWriter<City> writer = new JdbcBatchItemWriter<>(); writer.setDataSource(dataSource); writer.setSql("INSERT INTO t_city( id, name, countryCode, district, population) VALUES (:id, :name, :countryCode, :district, :population)" ); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); return writer; }
Processor Processor
是在 ItemReader 到 ItemWriter 之间的一个数据转换操作,如将需要处理的数字转换为字符串等操作
ItemProcessor<T, O>,其中: T
代表输入类型,即 ItemReader
的泛型,O
代表输出类型,即 ItemWriter
的类型
代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class NameLowerProcessor implements ItemProcessor <City , City > { @Override public City process (City city) throws Exception { String name = city.getName().toUpperCase(); city.setName(name); return city; } } public class IdFilterProcessor implements ItemProcessor <City , City > { @Override public City process (City city) throws Exception { if (city.getId() %2 == 0 ){ return city; } return null ; } }
ItemProcessor 使用 ItemProcessor
使用在 Step 域中,尽量书写在 Reader、Writer 中间
CompositeItemProcessor
:关联多个 Processor
代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Bean public CompositeItemProcessor<City, City> processListener () { CompositeItemProcessor<City, City> processor = new CompositeItemProcessor<>(); List<ItemProcessor<City, City>> list = new ArrayList<>(2 ); list.add(nameLowerProcessor); list.add(idFiltterProcessor); processor.setDelegates(list); return processor; } @Bean public Step itemProcessorStep () { return stepBuilderFactory.get("itemProcessorStep2" ) .<City, City>chunk(10 ) .reader(multiFileReader) .processor(processListener()) .writer(cityItemWriter) .build(); }