Spring Batch Example in Spring boot - Spring Batch partition example - Walking Techie

Blog about Java programming, Design Pattern, and Data Structure.

Thursday, March 30, 2017

Spring Batch Example in Spring boot - Spring Batch partition example

Photo Credit: Spring Batch

In Spring Batch, "partitioning" means using multiple threads, each processing a range of data. For example, suppose you have 100 records in a table, with primary IDs assigned from 1 to 100, and you want to process all 100 records.

All the Spring Batch examples discussed so far have been single-threaded. Suppose processing records 1 to 100 takes 1 minute in the single-threaded case:

Single Thread -- process records from 1 to 100

With partitioning, we can instead start 10 (slave) threads that process 10 records each, so each thread takes roughly 6 seconds (a small sketch of this range arithmetic follows the list):

Thread 1 -- process records from 1 to 10
Thread 2 -- process records from 11 to 20
Thread 3 -- process records from 21 to 30
Thread 4 -- process records from 31 to 40
Thread 5 -- process records from 41 to 50
Thread 6 -- process records from 51 to 60
..........
..........
Thread 9 -- process records from 81 to 90
Thread 10 -- process records from 91 to 100
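
For illustration, here is a minimal sketch of the range arithmetic behind this split (plain Java; the totals of 100 records and 10 partitions are taken from the example above, and the real implementation is the RangePartitioner shown later):

int total = 100;
int gridSize = 10;
int range = total / gridSize;        // 10 records per partition
for (int i = 1; i <= gridSize; i++) {
  int fromId = (i - 1) * range + 1;  // 1, 11, 21, ...
  int toId = i * range;              // 10, 20, 30, ...
  System.out.println("Thread " + i + " -- process records from " + fromId + " to " + toId);
}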

Project structure

This is the directory structure of a standard Gradle project.

Partition Example Project Structure

Project dependencies

In this tutorial, we will create a "partitioner" job with 10 threads; each thread will read a range of data from the database, based on the range it is given.

The user table has 100 records, with data like the following:

id, username, password, age
1,santosh,password,30
2,santosh,password,24
3,santosh,password,22
.........
.........
.........
build.gradle file

buildscript {
    repositories {
        mavenLocal()
        jcenter()
    }
    dependencies {
        classpath "org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE"
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

sourceCompatibility = 1.8

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    compileOnly('org.projectlombok:lombok:1.16.12')
    compile('org.springframework.boot:spring-boot-starter-batch:1.5.2.RELEASE')
    // the console output below shows JPA/Hibernate and MySQL in use, so these two
    // dependencies are presumably needed as well (assumed; not listed in the post):
    compile('org.springframework.boot:spring-boot-starter-data-jpa:1.5.2.RELEASE')
    runtime('mysql:mysql-connector-java')
    testCompile('org.springframework.boot:spring-boot-starter-test:1.5.2.RELEASE')
}

task wrapper(type: Wrapper) {
    gradleVersion = '3.2.1'
}

application.properties file

#empty
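
The post leaves application.properties empty, yet the console output below shows the job talking to MySQL through JPA/Hibernate, so a datasource has to be configured somewhere. A minimal sketch of what that configuration typically looks like (the spring_batch database name matches the SQL in the comments below; the credentials are assumptions, not taken from the post):

spring.datasource.url=jdbc:mysql://localhost:3306/spring_batch
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver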

Spring Batch Jobs

In PartitionerJob:

  1. In TaskExecutorPartitionHandler, we set the grid size, which is effectively the number of threads.
  2. For slaveReader, the #{stepExecutionContext[fromId]}, #{stepExecutionContext[toId]}, and #{stepExecutionContext[name]} values are injected from the ExecutionContext populated by rangePartitioner.
  3. For the writers, each thread writes its records to a different CSV file, with the filename format users.processed[fromId]-[toId].csv.
package com.walking.techie.jobs;

import com.walking.techie.model.User;
import com.walking.techie.partition.RangePartitioner;
import com.walking.techie.processor.UserProcessor;
import com.walking.techie.tasklet.DummyTasklet;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

@Slf4j
@Configuration
@EnableBatchProcessing
public class PartitionerJob {

  @Autowired
  private JobBuilderFactory jobBuilderFactory;
  @Autowired
  private StepBuilderFactory stepBuilderFactory;
  @Autowired
  private DataSource dataSource;

  @Bean
  public Job partitionJob() {
    return jobBuilderFactory.get("partitionJob").incrementer(new RunIdIncrementer())
        .start(masterStep()).next(step2()).build();
  }

  @Bean
  public Step step2() {
    return stepBuilderFactory.get("step2").tasklet(dummyTask()).build();
  }

  @Bean
  public DummyTasklet dummyTask() {
    return new DummyTasklet();
  }

  @Bean
  public Step masterStep() {
    return stepBuilderFactory.get("masterStep").partitioner(slave().getName(), rangePartitioner())
        .partitionHandler(masterSlaveHandler()).build();
  }

  @Bean
  public PartitionHandler masterSlaveHandler() {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setGridSize(10);
    handler.setTaskExecutor(taskExecutor());
    handler.setStep(slave());
    try {
      handler.afterPropertiesSet();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return handler;
  }

  @Bean(name = "slave")
  public Step slave() {
    log.info("...........called slave .........");

    // The null arguments are placeholders: the reader, processor, and writer are
    // @StepScope proxies, so the real values are injected from the step's
    // ExecutionContext at runtime.
    return stepBuilderFactory.get("slave").<User, User>chunk(100)
        .reader(slaveReader(null, null, null))
        .processor(slaveProcessor(null)).writer(slaveWriter(null, null)).build();
  }

  @Bean
  public RangePartitioner rangePartitioner() {
    return new RangePartitioner();
  }

  @Bean
  public SimpleAsyncTaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor();
  }

  @Bean
  @StepScope
  public UserProcessor slaveProcessor(@Value("#{stepExecutionContext[name]}") String name) {
    log.info("********called slave processor **********");
    UserProcessor userProcessor = new UserProcessor();
    userProcessor.setThreadName(name);
    return userProcessor;
  }

  @Bean
  @StepScope
  public JdbcPagingItemReader<User> slaveReader(
      @Value("#{stepExecutionContext[fromId]}") final String fromId,
      @Value("#{stepExecutionContext[toId]}") final String toId,
      @Value("#{stepExecutionContext[name]}") final String name) {
    log.info("slaveReader start " + fromId + " " + toId);
    JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(dataSource);
    reader.setQueryProvider(queryProvider());
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("fromId", fromId);
    parameterValues.put("toId", toId);
    log.info("Parameter Value " + name + " " + parameterValues);
    reader.setParameterValues(parameterValues);
    reader.setPageSize(1000);
    reader.setRowMapper(new BeanPropertyRowMapper<User>() {{
      setMappedClass(User.class);
    }});
    log.info("slaveReader end " + fromId + " " + toId);
    return reader;
  }

  @Bean
  public PagingQueryProvider queryProvider() {
    log.info("queryProvider start ");
    SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
    provider.setDataSource(dataSource);
    provider.setSelectClause("select id, username, password, age");
    provider.setFromClause("from user");
    provider.setWhereClause("where id >= :fromId and id <= :toId");
    provider.setSortKey("id");
    log.info("queryProvider end ");
    try {
      return provider.getObject();
    } catch (Exception e) {
      log.info("queryProvider exception ");
      e.printStackTrace();
    }

    return null;
  }

  @Bean
  @StepScope
  public FlatFileItemWriter<User> slaveWriter(
      @Value("#{stepExecutionContext[fromId]}") final String fromId,
      @Value("#{stepExecutionContext[toId]}") final String toId) {
    FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(
        "csv/outputs/users.processed" + fromId + "-" + toId + ".csv"));
    //writer.setAppendAllowed(false);
    writer.setLineAggregator(new DelimitedLineAggregator<User>() {{
      setDelimiter(",");
      setFieldExtractor(new BeanWrapperFieldExtractor<User>() {{
        setNames(new String[]{"id", "username", "password", "age"});
      }});
    }});
    return writer;
  }
}
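
A note on the executor: SimpleAsyncTaskExecutor creates a new thread for every partition with no upper bound, which is fine for a grid size of 10 but can get out of hand for larger grids. A bounded alternative (a sketch, not part of the original post) is Spring's ThreadPoolTaskExecutor:

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(10); // one thread per partition
  executor.setMaxPoolSize(10);  // hard cap on concurrently running slave steps
  return executor;
}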

A Java model class

package com.walking.techie.model;

import lombok.Data;

@Data
public class User {

  private int id;
  private String username;
  private String password;
  private int age;
}

First, create a Partitioner implementation that puts the "partitioning range" into the ExecutionContext; the step-scoped beans fetch it from there later. In this case, the partitioning ranges look like the following:

Thread 1 = 1 - 10
Thread 2 = 11 - 20
Thread 3 = 21 - 30
......
Thread 10 = 91 - 100
package com.walking.techie.partition;

import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

@Slf4j
public class RangePartitioner implements Partitioner {

  @Override
  public Map<String, ExecutionContext> partition(int gridSize) {
    log.info("partition called gridsize= " + gridSize);

    Map<String, ExecutionContext> result = new HashMap<>();

    int range = 10;
    int fromId = 1;
    int toId = range;

    for (int i = 1; i <= gridSize; i++) {
      ExecutionContext value = new ExecutionContext();

      System.out.println("\nStarting : Thread" + i);
      System.out.println("fromId : " + fromId);
      System.out.println("toId : " + toId);

      value.putInt("fromId", fromId);
      value.putInt("toId", toId);

      // give each thread a name, thread 1,2,3
      value.putString("name", "Thread" + i);

      result.put("partition" + i, value);

      fromId = toId + 1;
      toId += range;

    }
    return result;
  }
}
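
Note that this partitioner hardcodes range = 10, so the split only lines up with the data when the grid size is 10 and there are exactly 100 records. A variant that derives the range from the grid size could look like the following sketch (it assumes the total record count is known up front; here it is hardcoded to 100, but in practice you would query it from the table):

@Override
public Map<String, ExecutionContext> partition(int gridSize) {
  int total = 100;              // assumed total record count
  int range = total / gridSize; // records per partition
  Map<String, ExecutionContext> result = new HashMap<>();
  for (int i = 1; i <= gridSize; i++) {
    ExecutionContext value = new ExecutionContext();
    value.putInt("fromId", (i - 1) * range + 1);
    value.putInt("toId", i == gridSize ? total : i * range); // last partition absorbs any remainder
    value.putString("name", "Thread" + i);
    result.put("partition" + i, value);
  }
  return result;
}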

The UserProcessor class prints the processing thread's name along with each item's id and username.

package com.walking.techie.processor;

import com.walking.techie.model.User;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class UserProcessor implements ItemProcessor<User, User> {

  private String threadName;

  public String getThreadName() {
    return threadName;
  }

  public void setThreadName(String threadName) {
    this.threadName = threadName;
  }

  @Override
  public User process(User item) throws Exception {
    System.out.println(threadName + " processing : "
        + item.getId() + " : " + item.getUsername());
    return item;
  }
}

This is a dummy tasklet that will be called after all the threads have completed execution.

package com.walking.techie.tasklet;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

@Slf4j
public class DummyTasklet implements Tasklet {

  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
      throws Exception {
    log.info("Dummy Tasklet called.");
    return RepeatStatus.FINISHED;
  }
}

Run Application

package com.walking.techie;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}
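
With the Spring Boot Gradle plugin applied, the application can be launched with ./gradlew bootRun (assuming the wrapper has been generated by the wrapper task in build.gradle). Spring Boot's JobLauncherCommandLineRunner then runs partitionJob at startup, as the console output below shows.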

Output

The output of the application will be stored in the csv/outputs/ folder.

Output in console

2017-03-29 20:49:56.538  INFO 44616 --- [           main] org.hibernate.Version                    : HHH000412: Hibernate Core {5.0.12.Final}
2017-03-29 20:49:56.539  INFO 44616 --- [           main] org.hibernate.cfg.Environment            : HHH000206: hibernate.properties not found
2017-03-29 20:49:56.540  INFO 44616 --- [           main] org.hibernate.cfg.Environment            : HHH000021: Bytecode provider name : javassist
2017-03-29 20:49:56.571  INFO 44616 --- [           main] o.hibernate.annotations.common.Version   : HCANN000001: Hibernate Commons Annotations {5.0.1.Final}
2017-03-29 20:49:56.661  INFO 44616 --- [           main] org.hibernate.dialect.Dialect            : HHH000400: Using dialect: org.hibernate.dialect.MySQL5Dialect
2017-03-29 20:49:56.815  INFO 44616 --- [           main] org.hibernate.tool.hbm2ddl.SchemaUpdate  : HHH000228: Running hbm2ddl schema update
2017-03-29 20:49:56.843  INFO 44616 --- [           main] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
2017-03-29 20:49:56.929  INFO 44616 --- [           main] com.walking.techie.jobs.PartitionerJob   : ...........called slave .........
2017-03-29 20:49:57.047  INFO 44616 --- [           main] com.walking.techie.jobs.PartitionerJob   : queryProvider start
2017-03-29 20:49:57.052  INFO 44616 --- [           main] com.walking.techie.jobs.PartitionerJob   : queryProvider end
2017-03-29 20:49:57.165  WARN 44616 --- [           main] o.s.b.a.batch.BasicBatchConfigurer       : JPA does not support custom isolation levels, so locks may not be taken when launching Jobs
2017-03-29 20:49:57.169  INFO 44616 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL
2017-03-29 20:49:57.258  INFO 44616 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2017-03-29 20:49:57.275  INFO 44616 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executing SQL script from class path resource [org/springframework/batch/core/schema-mysql.sql]
2017-03-29 20:49:57.293  INFO 44616 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executed SQL script from class path resource [org/springframework/batch/core/schema-mysql.sql] in 17 ms.
2017-03-29 20:49:57.442  INFO 44616 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2017-03-29 20:49:57.451  INFO 44616 --- [           main] o.s.b.a.b.JobLauncherCommandLineRunner   : Running default command line with: []
2017-03-29 20:49:57.571  INFO 44616 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitionJob]] launched with the following parameters: [{run.id=5}]
2017-03-29 20:49:57.585  INFO 44616 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [masterStep]
2017-03-29 20:49:57.591  INFO 44616 --- [           main] c.w.techie.partition.RangePartitioner    : partition called gridsize= 10

Starting : Thread1
fromId : 1
toId : 10

Starting : Thread2
fromId : 11
toId : 20

Starting : Thread3
fromId : 21
toId : 30

Starting : Thread4
fromId : 31
toId : 40

Starting : Thread5
fromId : 41
toId : 50

Starting : Thread6
fromId : 51
toId : 60

Starting : Thread7
fromId : 61
toId : 70

Starting : Thread8
fromId : 71
toId : 80

Starting : Thread9
fromId : 81
toId : 90

Starting : Thread10
fromId : 91
toId : 100
2017-03-29 20:49:57.666  INFO 44616 --- [cTaskExecutor-4] com.walking.techie.jobs.PartitionerJob   : slaveReader start 41 50
2017-03-29 20:49:57.668  INFO 44616 --- [cTaskExecutor-4] com.walking.techie.jobs.PartitionerJob   : Parameter Value Thread5 {toId=50, fromId=41}
2017-03-29 20:49:57.670  INFO 44616 --- [cTaskExecutor-4] com.walking.techie.jobs.PartitionerJob   : slaveReader end 41 50
2017-03-29 20:49:57.679  INFO 44616 --- [TaskExecutor-10] com.walking.techie.jobs.PartitionerJob   : slaveReader start 71 80
2017-03-29 20:49:57.679  INFO 44616 --- [TaskExecutor-10] com.walking.techie.jobs.PartitionerJob   : Parameter Value Thread8 {toId=80, fromId=71}
2017-03-29 20:49:57.679  INFO 44616 --- [TaskExecutor-10] com.walking.techie.jobs.PartitionerJob   : slaveReader end 71 80
2017-03-29 20:49:57.680  INFO 44616 --- [cTaskExecutor-3] com.walking.techie.jobs.PartitionerJob   : slaveReader start 11 20
2017-03-29 20:49:57.681  INFO 44616 --- [cTaskExecutor-3] com.walking.techie.jobs.PartitionerJob   : Parameter Value Thread2 {toId=20, fromId=11}
2017-03-29 20:49:57.681  INFO 44616 --- [cTaskExecutor-3] com.walking.techie.jobs.PartitionerJob   : slaveReader end 11 20
2017-03-29 20:49:57.682  INFO 44616 --- [cTaskExecutor-5] com.walking.techie.jobs.PartitionerJob   : slaveReader start 91 100
2017-03-29 20:49:57.682  INFO 44616 --- [cTaskExecutor-5] com.walking.techie.jobs.PartitionerJob   : Parameter Value Thread10 {toId=100, fromId=91}
2017-03-29 20:49:57.682  INFO 44616 --- [cTaskExecutor-5] com.walking.techie.jobs.PartitionerJob   : slaveReader end 91 100
2017-03-29 20:49:57.683  INFO 44616 --- [cTaskExecutor-6] com.walking.techie.jobs.PartitionerJob   : slaveReader start 51 60
2017-03-29 20:49:57.683  INFO 44616 --- [cTaskExecutor-6] com.walking.techie.jobs.PartitionerJob   : Parameter Value Thread6 {toId=60, fromId=51}
2017-03-29 20:49:57.684  INFO 44616 --- [cTaskExecutor-6] com.walking.techie.jobs.PartitionerJob   : slaveReader end 51 60
2017-03-29 20:49:57.685  INFO 44616 --- [cTaskExecutor-9] com.walking.techie.jobs.PartitionerJob   : slaveReader start 31 40
2017-03-29 20:49:57.685  INFO 44616 --- [cTaskExecutor-9] com.walking.techie.jobs.PartitionerJob   : Parameter Value Thread4 {toId=40, fromId=31}
2017-03-29 20:49:57.685  INFO 44616 --- [cTaskExecutor-9] com.walking.techie.jobs.PartitionerJob   : slaveReader end 31 40
2017-03-29 20:49:57.686  INFO 44616 --- [cTaskExecutor-8] com.walking.techie.jobs.PartitionerJob   : slaveReader start 1 10
2017-03-29 20:49:57.687  INFO 44616 --- [cTaskExecutor-8] com.walking.techie.jobs.PartitionerJob   : Parameter Value Thread1 {toId=10, fromId=1}
2017-03-29 20:49:57.687  INFO 44616 --- [cTaskExecutor-8] com.walking.techie.jobs.PartitionerJob   : slaveReader end 1 10
2017-03-29 20:49:57.688  INFO 44616 --- [cTaskExecutor-1] com.walking.techie.jobs.PartitionerJob   : slaveReader start 61 70
2017-03-29 20:49:57.688  INFO 44616 --- [cTaskExecutor-1] com.walking.techie.jobs.PartitionerJob   : Parameter Value Thread7 {toId=70, fromId=61}
2017-03-29 20:49:57.688  INFO 44616 --- [cTaskExecutor-1] com.walking.techie.jobs.PartitionerJob   : slaveReader end 61 70
2017-03-29 20:49:57.689  INFO 44616 --- [cTaskExecutor-2] com.walking.techie.jobs.PartitionerJob   : slaveReader start 81 90
2017-03-29 20:49:57.689  INFO 44616 --- [cTaskExecutor-2] com.walking.techie.jobs.PartitionerJob   : Parameter Value Thread9 {toId=90, fromId=81}
2017-03-29 20:49:57.689  INFO 44616 --- [cTaskExecutor-2] com.walking.techie.jobs.PartitionerJob   : slaveReader end 81 90
2017-03-29 20:49:57.690  INFO 44616 --- [cTaskExecutor-7] com.walking.techie.jobs.PartitionerJob   : slaveReader start 21 30
2017-03-29 20:49:57.690  INFO 44616 --- [cTaskExecutor-7] com.walking.techie.jobs.PartitionerJob   : Parameter Value Thread3 {toId=30, fromId=21}
2017-03-29 20:49:57.690  INFO 44616 --- [cTaskExecutor-7] com.walking.techie.jobs.PartitionerJob   : slaveReader end 21 30
2017-03-29 20:49:57.748  INFO 44616 --- [cTaskExecutor-8] com.walking.techie.jobs.PartitionerJob   : ********called slave processor **********
Thread1 processing : 1 : santosh
Thread1 processing : 2 : santosh
Thread1 processing : 3 : santosh
Thread1 processing : 4 : santosh
Thread1 processing : 5 : santosh
Thread1 processing : 6 : santosh
2017-03-29 20:49:57.787  INFO 44616 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
2017-03-29 20:49:57.791  INFO 44616 --- [           main] com.walking.techie.tasklet.DummyTasklet  : Dummy Tasklet called.
2017-03-29 20:49:57.801  INFO 44616 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitionJob]] completed with the following parameters: [{run.id=5}] and the following status: [COMPLETED]

Note: This code was compiled and run on a Mac notebook with IntelliJ IDEA.

9 comments:

  1. Nice share. Quite informative

  2. Nice tutorial. I am trying to run your source code locally. It's working fine, but I don't see the user processor being called. Could you please guide me? Do I need to add any settings?

    Replies
    1. Please compare your source code with the GitHub code:
      https://github.com/walkingtechie/spring-batch-partition

      If you clone the code from GitHub or follow the steps above, it should call the user processor.

  3. Thanks for your reply! I have compared, and everything looks good to me, but I will verify again. One more thing: I have a requirement to call a third-party application through a web service:
    1. Read the data from the DB.
    2. Call the web service, then store the response back in a table.
    3. Commit the transaction.
    Where should I write the service invocation logic, in the processor or the writer? Could you guide me on this?

    Replies
    1. Install MySQL and run the queries below:

      CREATE DATABASE spring_batch;

      CREATE TABLE IF NOT EXISTS user (
        id int(5) NOT NULL AUTO_INCREMENT,
        username varchar(50) DEFAULT NULL,
        password varchar(20) DEFAULT NULL,
        age int(5) DEFAULT NULL,
        PRIMARY KEY (id));

      INSERT INTO spring_batch.user (id, username, password, age) VALUES ('1', 'gaurav', 'root', '25');

    2. You should write the logic to call the web service in the processor. Once you get the response from the service, you can save it in the writer.

  4. What's the role of chunk in the above example?

    Replies
    1. Chunk refers to chunk-oriented processing. That is, instead of reading, processing, and writing all the records at once, the step reads, processes, and writes a fixed number of records (a chunk) at a time. Chunk-oriented processing applies to a reader, a writer, and a processor over the data.

  5. Can you please post the same example where we read data from one DB table and write it to another table (in the same or a different DB)? Here we are writing to CSV, but I need to write to a DB instead.
