Google

Dec 7, 2013

Beginner Spring Batch reader and writer tutorial


Step 1: You need the relevant jar files. Here is the maven pom.xml snippet


<!-- Spring Framework 3.1.x and Spring Batch 2.1.x artifacts are published
     with a ".RELEASE" suffix; the bare version numbers do not resolve. -->
<dependency>
 <groupId>org.springframework</groupId>
 <artifactId>spring-beans</artifactId>
 <version>3.1.2.RELEASE</version>
</dependency>
<dependency>
 <groupId>org.springframework</groupId>
 <artifactId>spring-core</artifactId>
 <version>3.1.2.RELEASE</version>
</dependency>
<dependency>
 <groupId>org.springframework</groupId>
 <artifactId>spring-context</artifactId>
 <version>3.1.2.RELEASE</version>
</dependency>
<dependency>
 <groupId>org.springframework.batch</groupId>
 <artifactId>spring-batch-core</artifactId>
 <version>2.1.9.RELEASE</version>
</dependency>
<dependency>
 <groupId>org.springframework.batch</groupId>
 <artifactId>spring-batch-infrastructure</artifactId>
 <version>2.1.9.RELEASE</version>
</dependency>


Step 2: Define the Spring config file spring-batch-job.xml with the repository, launcher, job, step, reader, and writer. The reader and writer are defined within the chunk element, with the commit-interval being the chunk size. The reader reads one item at a time; the items are grouped into chunks according to the chunk size (1 in this example), and each chunk is passed to the writer as a List. The concurrency limit value of 3 means up to 3 threads can process chunks in parallel.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring Batch configuration for the tutorial: wires a job repository, a
     job launcher, a task executor, and a single chunk-oriented step that
     reads with SimpleReader and writes with SimpleWriter. -->
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:util="http://www.springframework.org/schema/util"
 xsi:schemaLocation="
  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
  http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd">


 <!-- define the job repository -->
 <!-- NOTE(review): the Map-based factory presumably keeps job metadata in
      memory only, so nothing survives a JVM restart; confirm before reuse. -->
 <bean id="jobRepository"
  class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
 </bean>

 <!--define the launcher and pass the jobRepository as setter injection -->
 <bean id="jobLauncher"
  class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
  <property name="jobRepository" ref="jobRepository" />
 </bean>

 <!-- multi-threading: executor referenced by the tasklet below; the
      concurrencyLimit of 3 lets up to 3 threads process in parallel -->
 <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
  <property name="concurrencyLimit" value="3" />
 </bean>

 <!-- NOTE(review): not referenced explicitly anywhere in this file;
      presumably picked up by the step through the default
      "transactionManager" bean name; confirm against the batch schema. -->
 <bean id="transactionManager"
  class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

 <!-- the job: one step whose chunk pairs the reader with the writer;
      commit-interval is the chunk size (1 item per chunk here) -->
 <job id="simpleJob" xmlns="http://www.springframework.org/schema/batch">
  <step id="simpleStep">
   <tasklet task-executor="taskExecutor">
    <chunk reader="simpleReader" writer="simpleWriter"
     commit-interval="1">

    </chunk>
   </tasklet>

  </step>
 </job>


 <!-- step-scoped reader and writer beans referenced by the chunk above -->
 <bean name="simpleReader" scope="step" class="com.myapp.SimpleReader" />

 <bean name="simpleWriter" scope="step" class="com.myapp.SimpleWriter" />

</beans>


Step 3: Define the custom reader SimpleReader. The list is hard-coded to keep it simple; in real life, you would read from a data source such as a database or a file. The type parameter T of ItemReader<T> is String here.

package com.myapp;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class SimpleReader implements ItemReader<String> {

 // creates an unmodifiable list
 String[] itemArray = new String[] { "Java", "Spring", "Hibernate" };
 // creates a modifiable list
 List<String> items = new ArrayList<String>(Arrays.asList(itemArray));

 @Override
 public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
  if (!items.isEmpty()) {
   return getNextItem();
  }
  return null;
 }

 // using items directly without lock is not thread safe
 private synchronized String getNextItem() {
  return items.remove(0);
 }



Step 4: Define the custom writer SimpleWriter. List<T> is List<String> here. The items are chunked and passed to the writer.
 
package com.myapp;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

/**
 * Item writer that prints every item of a chunk to standard output,
 * prefixed with "My_" and tagged with the thread doing the writing.
 */
public class SimpleWriter implements ItemWriter<String> {

 /**
  * Prints one line per item in the given chunk.
  *
  * @param items the chunk of items to write
  */
 @Override
 public void write(List<? extends String> items) throws Exception {
  // the prefix is the same for every item, so declare it once
  final String prefix = "My_";
  for (String original : items) {
   String prefixed = prefix + original;
   System.out.println(Thread.currentThread() + " writes: " + prefixed);
  }
 }
}


Step 5: The main class that runs as a stand-alone application.

package com.myapp;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Stand-alone entry point: loads the Spring context from
 * {@code job-config/spring-batch-job.xml} on the classpath, launches the
 * "simpleJob" job with empty parameters, and closes the context afterwards.
 */
public class SimpleBatchTest {

 /**
  * Boots the application context and runs the job once.
  *
  * @param args command-line arguments (unused)
  * @throws JobExecutionAlreadyRunningException propagated from the launcher
  * @throws JobRestartException propagated from the launcher
  * @throws JobInstanceAlreadyCompleteException propagated from the launcher
  * @throws JobParametersInvalidException propagated from the launcher
  */
 public static void main(String[] args) throws JobExecutionAlreadyRunningException, JobRestartException,
   JobInstanceAlreadyCompleteException, JobParametersInvalidException {
  // declared with the concrete type so that close() is available
  ClassPathXmlApplicationContext appContext = new ClassPathXmlApplicationContext("classpath:/job-config/spring-batch-job.xml");
  try {
   //get the launcher
   JobLauncher jobLauncher = appContext.getBean("jobLauncher", JobLauncher.class);
   //get the job to run
   Job job = appContext.getBean("simpleJob", Job.class);
   //run
   // NOTE(review): assumes run() returns only after the job has finished
   // (no task executor is set on the launcher) -- confirm before relying
   // on the close() below in other setups
   jobLauncher.run(job, new JobParameters());
  } finally {
   // release the context's beans even if the launch fails
   appContext.close();
  }
 }
}

Output:

11:17:13.906 [main] INFO  o.s.b.c.l.support.SimpleJobLauncher - Job: [FlowJob: [name=simpleJob]] launched with the following parameters: [{}]
11:17:13.925 [main] INFO  o.s.batch.core.job.SimpleStepHandler - Executing step: [simpleStep]
Thread[SimpleAsyncTaskExecutor-2,5,main] writes: My_Java
Thread[SimpleAsyncTaskExecutor-3,5,main] writes: My_Spring
Thread[SimpleAsyncTaskExecutor-1,5,main] writes: My_Hibernate
11:17:13.989 [main] INFO  o.s.b.c.l.support.SimpleJobLauncher - Job: [FlowJob: [name=simpleJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

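Running the same job more than once with identical JobParameters can throw JobInstanceAlreadyCompleteException (or JobExecutionAlreadyRunningException if the earlier execution is still running). So, you need to supply different JobParameters on each run, as shown below.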


// Add the current time as a parameter so the JobParameters differ on every
// launch.
JobParameters jobParameters = 
      new JobParametersBuilder()
      .addLong("time",System.currentTimeMillis()).toJobParameters();
  
jobLauncher.run(job, jobParameters);


The above was run with 3 threads and commit-interval of 1. Experiment by changing the "commit-interval" (aka chunk size) within the step and the "concurrencyLimit" within the SimpleAsyncTaskExecutor in the spring config file.


More advanced blog posts on Spring batch


Labels:

0 Comments:

Post a Comment

Subscribe to Post Comments [Atom]

<< Home