Google

Jun 19, 2012

Batch processing in Java with Spring batch - part 2

This post assumes that you have read the part-1 Spring Batch overview questions and answers. The data access objects, the datasource configuration, and a few other classes are left out for brevity, as these were covered in other Spring posts.

Step 1: Define the batch-context.xml with the JobLauncher and the JobRepository. A JobLauncher is responsible for starting a Job with a given set of job parameters. The Spring-provided implementation, SimpleJobLauncher, relies on a TaskExecutor to launch the jobs. If no specific TaskExecutor is set, the Spring-provided default, SyncTaskExecutor, is used, which is adequate for testing purposes. A JobRepository implementation requires a set of execution DAOs to store its information. The MapJobRepositoryFactoryBean is a FactoryBean that automates the creation of a SimpleJobRepository using non-persistent, in-memory DAO implementations. This repository is only really intended for use in testing and rapid prototyping. The JobRegistryBeanPostProcessor registers Job beans with a JobRegistry.


<!-- Job repository: MapJobRepositoryFactoryBean creates a SimpleJobRepository backed by
     non-persistent, in-memory DAOs. Intended for testing and rapid prototyping only. -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <!-- The transactionManager bean is not defined in this snippet. -->
    <property name="transactionManager" ref="transactionManager" />
</bean>

<!-- Job launcher: starts a Job with the given job parameters. The jobRepository is passed
     in via setter injection. No taskExecutor property is set here, so the default
     SyncTaskExecutor is used. -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>


<!-- Job registry (map-based implementation) that the post-processor below populates. -->
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

<!-- Registers the Job beans found in this context with the jobRegistry. -->
<bean id="jobRegistryBeanPostProcessor" class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
    <property name="jobRegistry" ref="jobRegistry" />
</bean>

Step 2: Define the steps necessary for the batch job. Firstly, define the batch control step that will update the batch_control table accordingly.
  
    <!-- Component scan: automatically detects the annotated Java classes in the
         com.myapp.batch package. -->
 <context:component-scan base-package="com.myapp.batch" />

    <!-- Batch control step: a chunk-oriented step that reads the batch control record with
         batchControlReader and passes it to jobStartBatchControlWriter. -->
    <batch:step id="batchControlStep">
        <batch:tasklet>
            <!-- commit-interval="1": each item read is written and committed on its own.
                 NOTE(review): jobStartBatchControlWriter is not defined in this snippet;
                 presumably it is a component-scanned bean. Verify. -->
            <batch:chunk reader="batchControlReader" writer="jobStartBatchControlWriter" commit-interval="1" />
            <batch:listeners>
                <!-- Makes the values read in this step available to later steps. -->
                <batch:listener ref="promotionListener" />
                <!-- Prints a message after the step when its exit status is FAILED. -->
                <batch:listener ref="batchReaderListener" />
            </batch:listeners>
        </batch:tasklet>
    </batch:step>
    
 
    <!-- JDBC cursor reader for the batch_control table. It runs the SQL held in
         BatchControlDaoImpl.SELECT_BY_JOB_NAME_SQL, whose single "?" placeholder (job_name)
         is bound by batchControlReaderStatementSetter. -->
    <bean id="batchControlReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <!-- dataSourceMyDs is not defined in this snippet. -->
        <property name="dataSource" ref="dataSourceMyDs" />
        <property name="sql">
            <util:constant static-field="com.myapp.batch.dao.impl.BatchControlDaoImpl.SELECT_BY_JOB_NAME_SQL" />
        </property>
        <!-- batchControlRowMapper is not defined in this snippet. -->
        <property name="rowMapper" ref="batchControlRowMapper" />
        <property name="preparedStatementSetter" ref="batchControlReaderStatementSetter" />
    </bean>

    <bean id="batchControlReaderStatementSetter" scope="step" class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
        <!-- Supplies the value for the "?" placeholder of the reader's SQL from the jobName
             job parameter. The parameter is passed via the command line, e.g.
                 my_job_run.sh accountValueUpdateJob1 accountValueUpdateJob1.log
             which runs
                 $JAVA_HOME/bin/java -classpath ${CLASSPATH} ${JOB_CLASS} batch-context.xml availableBalanceJob jobName=accountValueUpdateJob1
             scope="step" lets the #{jobParameters[...]} expression be resolved when the step
             runs (Spring Batch late binding). -->
  <property name="parameters">
            <list>
                <value>#{jobParameters['jobName']}</value>
            </list>
        </property>
    </bean>

 <!-- Promotes the values stored under the keys listed below (static constants declared on
      BatchControl) out of this step's ExecutionContext, so that the values read in this
      step are available to subsequent steps. -->
    <bean id="promotionListener" class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
        <property name="keys">
            <list>
                <util:constant static-field="com.myapp.batch.domain.BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO" />
                <util:constant static-field="com.myapp.batch.domain.BatchControl.JOB_KEY_BATCH_ID" />
                <util:constant static-field="com.myapp.batch.domain.BatchControl.ACC_FROM" />
                <util:constant static-field="com.myapp.batch.domain.BatchControl.ACC_TO" />
            </list>
        </property>
    </bean>

 <!-- The listener class whose afterStep() method prints a message when the step's exit
      status is FAILED. -->
    <bean id="batchReaderListener" class="com.myapp.batch.listener.BatchReaderListener" />
 
Step 3: Define the relevant Java classes that are referenced in the above configuration file. Capture the batch control metadata as shown below.

/**
 * Holds the batch control metadata of a job, together with the string keys under which
 * some of its values are shared between steps.
 */
public class BatchControl {

    /** User name constant for the batch process. */
    public static final String BATCH_USER = "batch";

    // Keys referenced from the batch context (promotionListener) via util:constant.
    public static final String JOB_KEY_BATCH_ID = "batchId";
    public static final String JOB_KEY_LAST_PROCESSED_ACC_NO = "lastProcessedAccNo";
    public static final String ACC_FROM = "accFrom";
    public static final String ACC_TO = "accTo";

    // Job identity.
    private Long jobId;
    private String jobName;

    // Job run state.
    private String status;
    private DateTime startDateTime;
    private DateTime endDateTime;

    // Account range handled by the job and the last account number processed so far.
    private Integer accFrom;
    private Integer accTo;
    private Integer lastProcessedAccNo;

    //getters and setters are omitted
}

The listener class that checks the exit status after the step execution and prints a message when the status is "FAILED", i.e. when the batch control record could not be loaded.
package com.myapp.batch.listener;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.step.NoWorkFoundStepExecutionListener;

/**
 * Step listener that reports on standard output when the step ended with
 * {@link ExitStatus#FAILED}, naming the job whose batch control record could not be loaded.
 */
public class BatchReaderListener extends NoWorkFoundStepExecutionListener {

    /**
     * Delegates to the parent listener and, if the resulting status is FAILED, prints the
     * value of the {@code jobName} job parameter.
     *
     * @param stepExecution the execution of the step that has just finished
     * @return the exit status produced by the parent listener, unchanged (may be null)
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        final ExitStatus outcome = super.afterStep(stepExecution);

        // Nothing to report unless the parent listener flagged the step as failed.
        if (outcome == null || !outcome.equals(ExitStatus.FAILED)) {
            return outcome;
        }

        final String jobName =
                stepExecution.getJobExecution().getJobInstance().getJobParameters().getString("jobName");
        System.out.println("Could not load batch control record with job name " + jobName);
        return outcome;
    }
}
 
The Dao object that reads and updates the batch_control table.

package com.myapp.batch.dao.impl;

import org.joda.time.DateTime;
import org.springframework.batch.core.BatchStatus;
import org.springframework.jdbc.core.simple.ParameterizedBeanPropertyRowMapper;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.orm.hibernate3.HibernateTemplate;
import org.springframework.orm.hibernate3.SessionFactoryUtils;
import org.springframework.orm.hibernate3.support.HibernateDaoSupport;

import com.myapp.batch.dao.BatchControlDao;
import com.myapp.batch.domain.BatchControl;

/**
 * DAO that reads and updates the {@code batch_control} table.
 *
 * <p>All statements are executed through a {@link SimpleJdbcTemplate} built on the data source
 * of the Hibernate session factory that is passed to the constructor.
 */
public class BatchControlDaoImpl extends HibernateDaoSupport implements BatchControlDao {

    /**
     * Selects the batch control row for a job name (one {@code ?} placeholder).
     * Public because batch-context.xml references it via {@code util:constant}.
     */
    public static final String SELECT_BY_JOB_NAME_SQL =
            "select job_id, job_name, start_timestamp, end_timestamp, status,"
                    + " account_no_from, account_no_to, last_account_no from batch_control where job_name = ?";

    // NOTE(review): the update statements filter on batch_id, whereas the select above
    // returns job_id -- confirm which column name the batch_control table really has.
    private static final String JOB_START_UPDATE_SQL =
            "update batch_control set start_timestamp = ?, end_timestamp = ?, status = ?, last_account_no=?"
                    + " where batch_id = ?";

    private static final String JOB_COMPLETE_UPDATE_SQL =
            "update batch_control set end_timestamp = ?, status = ? where batch_id = ?";

    private static final String LAST_PROCESSED_ACC_UPDATE_SQL =
            "update batch_control set last_account_no = ? where batch_id = ?";

    private final SimpleJdbcTemplate simpleJdbcTemplate;

    /**
     * Creates the DAO and its JDBC template.
     *
     * @param hibernateTemplate template whose session factory supplies the data source
     */
    public BatchControlDaoImpl(HibernateTemplate hibernateTemplate) {
        setHibernateTemplate(hibernateTemplate);
        this.simpleJdbcTemplate =
                new SimpleJdbcTemplate(SessionFactoryUtils.getDataSource(hibernateTemplate.getSessionFactory()));
    }

    /**
     * Marks a batch as started: sets the start timestamp to the current time, clears the end
     * timestamp, sets the status to {@code STARTED} and stores the given last account number.
     *
     * @param batchId         id of the batch control row to update
     * @param last_account_no last account number to store
     */
    public void init(Long batchId, Integer last_account_no) {
        // The first parameter was previously named "Id" while the body used "batchId",
        // which did not compile.
        simpleJdbcTemplate.update(JOB_START_UPDATE_SQL, new DateTime().toDate(), null, BatchStatus.STARTED.name(),
                last_account_no, batchId);
    }

    /**
     * Records the end of a batch: sets the end timestamp to the current time and stores the
     * name of the given status.
     *
     * @param batchId id of the batch control row to update
     * @param status  final status of the batch; must not be null
     */
    public void saveJobComplete(Long batchId, BatchStatus status) {
        simpleJdbcTemplate.update(JOB_COMPLETE_UPDATE_SQL, new DateTime().toDate(), status.name(), batchId);
    }

    /**
     * Stores the last processed account number for a batch.
     *
     * <p>NOTE(review): the account number is a {@code String} here but an {@code Integer} in
     * {@link #init(Long, Integer)} -- confirm which type the column expects.
     *
     * @param last_account_no last account number processed
     * @param batchId         id of the batch control row to update
     */
    public void saveLastProcessedId(String last_account_no, Long batchId) {
        simpleJdbcTemplate.update(LAST_PROCESSED_ACC_UPDATE_SQL, last_account_no, batchId);
    }

    /**
     * Loads the batch control record for a job name.
     *
     * <p>NOTE(review): the row is mapped by bean property name. Columns such as
     * {@code account_no_from} or {@code start_timestamp} do not obviously correspond to the
     * {@code BatchControl} fields {@code accFrom} / {@code startDateTime}, so those
     * properties may stay unset -- verify against the (omitted) setters.
     * {@code queryForObject} is also expected to fail when no row, or more than one row,
     * matches -- confirm for the Spring version in use.
     *
     * @param jobName value bound to the {@code job_name} placeholder
     * @return the mapped batch control record
     */
    public BatchControl getBatchControl(String jobName) {
        return simpleJdbcTemplate.queryForObject(SELECT_BY_JOB_NAME_SQL,
                ParameterizedBeanPropertyRowMapper.newInstance(BatchControl.class), jobName);
    }

}

This will be concluded in part-3.


Labels:

1 Comments:

Blogger Brady... said...

Thank you so much Arul... The way you explained and the way you highlighted is suberb...

8:41 PM, November 07, 2013  

Post a Comment

Subscribe to Post Comments [Atom]

<< Home