
Mar 13, 2013

Apache Camel for asynchronous processing

Apache Camel is a very powerful and can be used with Java to solve various problems. In this example, I will discuss how it can be used to generate feeds asynchronously. You don't have to write any multi-threading code. Sending email can be done with the MVEL expression language.

Following is a simple example where some feed files are processed asynchronously by retrieving the job request from an in-memory queue. In the event of error, email notification is sent. "from" is a consumer, and "to" is the destination.

Step 1:  The dependency jars required defined via Maven pom file.

   <!-- CAMEL -->






Step 2: The next step is to bootstrap Camel via Spring Java based configuration. Making the config class CamelContextAware will inject the camel context via the setter method.

package com.myapp.config;

import com.myapp.camel.JobHandlingRouteBuilder;
import com.myapp.service.MyAppService;

import javax.annotation.Resource;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

public class MyAppConfig implements CamelContextAware
    private CamelContext context;
    @Resource(name = "myAppService")
    private MyAppService myAppService;
    //constructor injection, where template is the bean id.
    public ProducerTemplate template()
        if (context != null)
            return context.createProducerTemplate();
    public RouteBuilder jobHandlingRouteBuilder()
        return new JobHandlingRouteBuilder(myAppForecastService);
    public void setCamelContext(CamelContext camelContext)
        context = camelContext;
    public CamelContext getCamelContext()
        return context;

Step 3: Define the ProducerTemplate within your Service class.  The ProducerTemplate interface allows you to send message exchanges to endpoints in a variety of different ways to make it easy to work with Camel Endpoint instances from Java code. This service class could be invoked via a RESTful Webservice. 


@Service(value = "myAppService")
@Transactional(propagation = Propagation.SUPPORTS)
public class CashForecastServiceImpl implements CashForecastService

   private ProducerTemplate template;

   //since autowired, injected via MyAppConfig template() method with beanId being template.
   public void setTemplate(ProducerTemplate template)
       this.template = template;
    public boolean handleGroupLevelFeedGenerationRequest()
    //a pojo Java class with fields like account code, etc and getter/setter methods
    FeedGenerationRequest request = new FeedGenerationRequest();
       //add headers and body. header will be used to determine processing logic by the RouteBuilder 
        Map<string, object> headers = new HashMap<string,object>();
        headers.put(JobHandlingRouteBuilder.JOB_TYPE_HEADER, JobType.FEED_GENERATION);
  //send it to an in memory BockingQueue define in the JobHandlingRouteBuilder class
        template.sendBodyAndHeaders(JobHandlingRouteBuilder.JOB_QUEUE, request, headers);
        return true;

    public boolean generateFeed1(FeedGenerationRequest request){
      //logic to generate feed goes here
    public boolean generateFeed2(FeedGenerationRequest request){
       //logic to generate feed goes here


Step 4: Finally, define the camel route to queue and asynchronously generate the required feed files.

package com.myapp.camel;

import com.myapp.JobType;
import com.myapp.MyAppService;

import org.apache.camel.builder.RouteBuilder;

public class JobHandlingRouteBuilder extends RouteBuilder
    public static final String JOB_QUEUE = "vm:jobQueue?size=50&timeout=1000000&concurrentConsumers=1";
    public static final String FEED_GENERATION_JOB_QUEUE = "vm:feedGenerationJobQueue?size=50&timeout=1000000&concurrentConsumers=1";
    public static final String DIRECT_FEED1 = "direct:feed1";
    public static final String DIRECT_ERROR = "direct:error";
    public static final String DIRECT_FEED2 = "direct:feed2";
    public static final String JOB_TYPE_HEADER = "jobTypeHeader";
    public static final String CONTROLLABLE_JOB_TYPE_HEADER = "controllableJobTypeHeader";
    private MyAppService myAppService;
    public JobHandlingRouteBuilder(MyAppService myAppService)
        this.myAppService = myAppService;
    public void configure() throws Exception
     //build routes
     * Main route to handling all jobs
    private void configureJobHandlingRoute()
  //from the in memory job queue move it to the in memory feed generation queue
                    .log(INFO, "Handling Feed Generation Job")    
     * Route to handle Feed Generation jobs
    public void configureFeedGenerationJobHandlingRoute()
        // @formatter:off
              .multicast()                           //multiple destinations  
                  .parallelProcessing()              //multiple threads
                  .to(DIRECT_FEED2, DIRECT_FEED1)
          //TODO handle exception and mark the status as FAILED
              .setHeader(CONTROLLABLE_JOB_TYPE_HEADER, simple(ControllableJobType.POS_FEED.toString()))
                  .bean(myAppService, "generateFeed2") //invokes generateFeed2 method on myAppService bean

              .setHeader(CONTROLLABLE_JOB_TYPE_HEADER, simple(ControllableJobType.CASH_FEED.toString()))
                  .bean(myAppService, "generateFeed1")  //invokes generateFeed2 method on myAppService bean
          //Handle failtures
              .log(INFO, " ${headers.controllableJobTypeHeader} Job failed, reason:  ${exception.stacktrace}")
              .bean(cashForecastService, "markControllableJobFailed")

If you want to send email notification on error, the routes can be enhanced as shown below.

Firstly, add camel mail component.


        public static final String DIRECT_EMAIL_NOTIFICATION = "direct:emailNotification"; 
  public static final String NOTIFICATION_FLAG_HEADER = "notificationFlagHeader";
  public static final String CREATION_FAILURE_EVENT_SUBJECT = "Failed Feed Generation";
  private static final String LOG_URI = "log:" + AbstractCommonRouteBuilder.class.getPackage().getName()
            + "?level=ERROR";

        public void doConfigure()
              .setHeader(CONTROLLABLE_JOB_TYPE_HEADER, simple(ControllableJobType.CASH_FEED.toString()))
                  .bean(myAppService, "produceGroupLevelCashFeed")
          //Handle failtures
              .log(INFO, " ${headers.controllableJobTypeHeader} Job failed, reason:  ${exception.stacktrace}")
              .bean(myApp, "handleControllableJobFailed")
              .setHeader(NOTIFICATION_FLAG_HEADER, simple("{{}}"))
                      .log(INFO, "Sending email notification on Failture")
                      .setBody(simple("${headers.controllableJobTypeHeader} : ${exception.message}"))
                          + "&subject="
                          + CREATION_FAILURE_EVENT_SUBJECT
                          + "&mail.smtp.auth=false&mail.smtp.starttls.enable=false&delete=true&mapMailMessage=false");

     * TODO: The better approach would be to use
     * BridgePropertyPlaceholderConfigurer so that it picks up @PropertySource
     * style configuration
    protected void addPropertiesLocation(String... newLocations)
        PropertiesComponent properties = getPropertiesComponent();
        String[] locationsArr = properties.getLocations();
        List<string> locations = new ArrayList<string>();
        if (locationsArr != null)
            for (String location : locationsArr)
        for (String location : newLocations)
        locationsArr = new String[locations.size()];
        locationsArr = locations.toArray(locationsArr);

The route is defined using MVEL, which is a powerful expression language for Java-based applications. You can also appreciate, how easy it is to build your routes using various protocols.



Post a Comment

Subscribe to Post Comments [Atom]

<< Home