How to build a full flume interceptor by a non java developer

Building a Flume interceptor does not look too complex. You can find examples, howtos and cookbooks around. The issue is that they all explain the specifics of the interceptor itself, leaving a Python/Perl guy like me in the dark about Maven, classpaths and imports. This post aims to correct that. It describes the bare minimum to get an interceptor working, and I link to more documentation if you want to go deeper. You can find the code on GitHub as well.

The interceptor itself

You need to create a directory structure in which to write the following Java code. In this example it goes in src/main/java/com/example/flume/interceptors/eventTweaker.java, for an interceptor named com.example.flume.interceptors.eventTweaker. The directories under src/main/java must match the package name declared at the top of the file.

package com.example.flume.interceptors;

import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import org.apache.log4j.Logger;

public class eventTweaker implements Interceptor {

  private static final Logger LOG = Logger.getLogger(eventTweaker.class);

  // private means that only Builder can build me.
  private eventTweaker() {}

  @Override
  public void initialize() {}

  @Override
  public Event intercept(Event event) {

    Map<String, String> headers = event.getHeaders();

    // example: add / remove headers
    if (headers.containsKey("lice")) {
      headers.put("shampoo", "antilice");
      headers.remove("lice");
    }

    // example: change body
    String body = new String(event.getBody());
    if (body.contains("injuries")) {
      try {
        event.setBody("cyborg".getBytes("UTF-8"));
      } catch (java.io.UnsupportedEncodingException e) {
        LOG.warn(e);
        // drop event completely
        return null;
      }
    }

    return event;
  }

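  @Override
  public List<Event> intercept(List<Event> events) {
    // intercept(Event) returns null for dropped events, so keep only the survivors.
    List<Event> kept = new java.util.ArrayList<Event>(events.size());
    for (Event event : events) {
      Event result = intercept(event);
      if (result != null) {
        kept.add(result);
      }
    }
    return kept;
  }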

  @Override
  public void close() {}

  public static class Builder implements Interceptor.Builder {

    @Override
    public Interceptor build() {
      return new eventTweaker();
    }

    @Override
    public void configure(Context context) {}
  }
}
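If you want to try the header and body rules before wiring anything into Flume, here is a minimal sketch that needs no Flume jar at all. It is not the interceptor itself: FakeEvent, tweak and tweakAll are hypothetical names I made up as stand-ins, and the "drop empty bodies" rule is only an assumption to show what returning null is for.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TweakSketch {

  // Hypothetical stand-in for a Flume event: just headers plus a body.
  static final class FakeEvent {
    final Map<String, String> headers = new HashMap<>();
    byte[] body;

    FakeEvent(String body) {
      this.body = body.getBytes(StandardCharsets.UTF_8);
    }
  }

  // Same header and body rules as the interceptor above; null means "drop".
  static FakeEvent tweak(FakeEvent event) {
    if (event.headers.containsKey("lice")) {
      event.headers.put("shampoo", "antilice");
      event.headers.remove("lice");
    }
    String body = new String(event.body, StandardCharsets.UTF_8);
    if (body.isEmpty()) {
      return null; // assumption for this sketch: empty events are dropped
    }
    if (body.contains("injuries")) {
      event.body = "cyborg".getBytes(StandardCharsets.UTF_8);
    }
    return event;
  }

  // Batch version: keep only the events that tweak() did not drop.
  static List<FakeEvent> tweakAll(List<FakeEvent> events) {
    List<FakeEvent> kept = new ArrayList<>();
    for (FakeEvent event : events) {
      FakeEvent result = tweak(event);
      if (result != null) {
        kept.add(result);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    FakeEvent hurt = new FakeEvent("multiple injuries");
    hurt.headers.put("lice", "yes");

    List<FakeEvent> batch = new ArrayList<>();
    batch.add(hurt);
    batch.add(new FakeEvent(""));

    for (FakeEvent e : tweakAll(batch)) {
      System.out.println(e.headers + " " + new String(e.body, StandardCharsets.UTF_8));
    }
  }
}
```

Passing an explicit charset when decoding the body avoids depending on the platform default, which is worth considering in the real interceptor too.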

Maven configuration

To build the above, you need a pom.xml at the root of the aforementioned tree. The following is the bare minimum, which would probably make any Java developer cringe, but It Works™.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>org.apache.flume</groupId>
 <artifactId>eventTweaker</artifactId>
 <version>1.0</version>

 <repositories>
 <repository>
 <id>central</id>
 <name>Maven Central</name>
 <url>https://repo1.maven.org/maven2/</url>
 </repository>
 <repository>
 <id>cloudera-repo</id>
 <name>Cloudera CDH</name>
 <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
 </repository>
 </repositories>

 <properties>
 <flume.version>1.4.0-cdh4.4.0</flume.version>
 </properties>

 <dependencies>
 <dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-sdk</artifactId>
 <version>${flume.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-core</artifactId>
 <version>${flume.version}</version>
 </dependency>
 <dependency>
 <groupId>log4j</groupId>
 <artifactId>log4j</artifactId>
 <version>1.2.16</version>
 <exclusions>
 <exclusion>
 <groupId>com.sun.jdmk</groupId>
 <artifactId>jmxtools</artifactId>
 </exclusion>
 <exclusion>
 <groupId>com.sun.jmx</groupId>
 <artifactId>jmxri</artifactId>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupId>junit</groupId>
 <artifactId>junit</artifactId>
 <version>4.11</version>
 </dependency>
 </dependencies>

</project>

Once done, you can just type

mvn package

and a jar will be created for you under the target directory.
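A build can report success and still produce a jar with no classes in it, for instance when the sources are not under src/main/java. Running jar tf on the jar shows its contents; the same check can be sketched in Java with the standard java.util.jar classes. JarClassLister and classEntries are hypothetical names, and the default path simply assumes the artifactId and version from the pom.xml above.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class JarClassLister {

  // Returns the names of all .class entries in the given jar file.
  static List<String> classEntries(String jarPath) throws IOException {
    List<String> names = new ArrayList<>();
    try (JarFile jar = new JarFile(jarPath)) {
      Enumeration<JarEntry> entries = jar.entries();
      while (entries.hasMoreElements()) {
        String name = entries.nextElement().getName();
        if (name.endsWith(".class")) {
          names.add(name);
        }
      }
    }
    return names;
  }

  public static void main(String[] args) throws IOException {
    // Default path assumes the artifactId and version used in this post.
    String jarPath = args.length > 0 ? args[0] : "target/eventTweaker-1.0.jar";
    List<String> classes = classEntries(jarPath);
    if (classes.isEmpty()) {
      System.out.println("No classes found: are the sources under src/main/java?");
    }
    for (String name : classes) {
      System.out.println(name);
    }
  }
}
```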

Tree example

After compilation, here is what my directory structure looks like:

.
├── pom.xml
├── src
│   └── main
│       └── java
│           └── com
│               └── example
│                   └── flume
│                       └── interceptors
│                           └── eventTweaker.java
└── target
    ├── classes
    │   └── com
    │       └── example
    │           └── flume
    │               └── interceptors
    │                   ├── eventTweaker$1.class
    │                   ├── eventTweaker$Builder.class
    │                   └── eventTweaker.class
    ├── maven-archiver
    │   └── pom.properties
    └── eventTweaker-1.0.jar

Jar destination

You need to put the jar (in this case target/eventTweaker-1.0.jar) in the flume classpath for it to be loaded. You can do so in 2 ways:

  1. By editing the FLUME_CLASSPATH variable in the flume-env.sh file
  2. By adding it under the $FLUME_HOME/plugins.d directory. In this directory, put your jar at your_interceptor_name/lib/yourinterceptor.jar and voilà, it will be loaded automagically.
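If you are not sure whether a class is visible on a given classpath, a tiny check like the one below can help. It is only a sketch: ClasspathCheck and isLoadable are hypothetical names, and the result is only meaningful if you run it with the same classpath the Flume agent ends up with.

```java
public class ClasspathCheck {

  // Returns true if the binary class name resolves on the current classpath.
  // The class is located but not initialized, so none of its static code runs.
  static boolean isLoadable(String binaryName) {
    try {
      Class.forName(binaryName, false, ClasspathCheck.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // Default name is the interceptor class from this post.
    String name = args.length > 0 ? args[0] : "com.example.flume.interceptors.eventTweaker";
    System.out.println(name + (isLoadable(name) ? " found" : " NOT found on the classpath"));
  }
}
```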

Flume configuration

Debugging

The LOG statements from your interceptor end up in the usual flume.log file, probably under /var/log/flume-ng/flume.log.

flume.conf

This is the easiest bit. On your source definition, you just have to add an interceptor name and class:

agent.sources.yoursource.interceptors=tweaker
agent.sources.yoursource.interceptors.tweaker.type=com.example.flume.interceptors.eventTweaker$Builder
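The $Builder suffix in the type is not a typo. Builder is a class nested inside eventTweaker, and a nested class is looked up by name as Outer$Inner rather than Outer.Inner. Here is a small sketch with a made-up NestedNameSketch class standing in for the interceptor, showing that only the $ form can be loaded by name.

```java
public class NestedNameSketch {

  // Stand-in for the interceptor's nested Builder class.
  public static class Builder {}

  // The name used to load the nested class: Outer$Inner.
  static String binaryName() {
    return Builder.class.getName();
  }

  // The name as you would write it in Java source: Outer.Inner.
  static String sourceName() {
    return Builder.class.getCanonicalName();
  }

  // Returns true if Class.forName can resolve the given name.
  static boolean canLoad(String name) {
    try {
      Class.forName(name);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(binaryName() + " loads: " + canLoad(binaryName()));
    System.out.println(sourceName() + " loads: " + canLoad(sourceName()));
  }
}
```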

6 thoughts on “How to build a full flume interceptor by a non java developer”

  1. In the paragraph “The interceptor itself” you specify a path: “src/main/com/[…]”.
    When you show the structure that you have, there is an additional “java” folder.
    Without this “java” folder, the Maven command works (“success”) but there is nothing in the jar file produced.
    Apart from this mistake, your tutorial is really helpful, clear and fast to follow, thank you for it!

  3. Great! I have a Java client that sends byte[] to Flume, and my interceptor should transform it to JSON, but this does not happen. I tried looking at the Flume log but everything looks OK. Is there an option to monitor my object and interceptor in real time? Thanks, and great post.

  4. Hello boss,
    My Flume interceptor is not receiving the whole JSON string. It gets only half the string and throws an error while parsing the JSON.
    Please find the issue below. Kindly help.
    {“parent”:””,”caused_by”:””,”watch_list”:””,”upon_reject”:”cancel”,”sys_updated_on”:”2017-01-11 01:00:07″,”u_escalate”:”false”,”u_escalation_reason”:””,”approval_history”:””,”skills”:””,”number”:”INC0276556″,”u_geo”:””,”state”:”7″,”u_oasis_reservation”:””,”sys_created_by”:”bijayak”,”u_type_of_impact”:””,”knowledge”:”false”,”order”:””,”u_country”:””,”cmdb_ci”:{“link”:”https://vmwarecorpdev.service-now.com/api/now/table/cmdb_ci/383ed8c86f3c2200f28cfee09d3ee4e6″,”value”:”383ed8c86f3c2200f28cfee09d3ee4e6″},”contract”:””,”impact”:”3″,”active”:”false”,”work_notes_list”:””,”priority”:”5″,”sys_domain_path”:”/”,”business_duration”:”1970-01-01 00:00:00″,”group_list”:””,”approval_set”:””,”short_description”:”Outlook not responding on BLR-HRZ view”,”correlation_display”:””,”work_start”:””,”additional_assignee_list”:””,”notify”:”1″,”sys_class_name”:”incident”,”closed_by”:{“link”:”https://vmwarecorpdev.service-now.com/api/now/table/sys_user/b0b76aaf4f456a04ec8da3928110c766″,”value”:”b0b76aaf4f456a04ec8da3928110c766″},”follow_up”:””,”parent_incident”:””,”reassignment_count”:”0″,”assigned_to”:{“link”:”https://vmwarecorpdev.service-now.com/api/now/table/sys_user/b0b76aaf4f456a04ec8da3928110c766″,”value”:”b0b76aaf4f456a04ec8da3928110c766″},”u_region”:””,”u_users_impacted”:”me”,”sla_due”:””,”comments_and_work_notes”:””,”u_category”:{“link”:”https://vmwarecorpdev.service-now.com/api/now/table/u_category/417c30486f7c2200f28cfee09d3ee4af”,”value”:”417c30486f7c2200f28cfee09d3ee4af”},”escalation”:”0″,”u_related_request”:””,”upon_approval”:”proceed”,”correlation_id”:””,”u_region_group”:””,”made_sla”:”true”,”u_escalate_updated”:””,”child_incidents”:”0″,”u_outage”:”false”,”resolved_by”:{“link”:”https://vmwarecorpdev.service-now.com/api/now/table/sys_user/b0b76aaf4f456a04ec8da3928110c766″,”value”:”b0b76aaf4f456a04ec8da3928110c766″},”sys_updated_by”:”system”,”opened_by”:{“link”:”https://vmwarecorpdev.service-now.com/api/now/table/sys_user/b0b76aaf4f456a04ec8da3928110c766″,”value”:”b0b76aaf4
f456a04ec8da3928110c766″},”user_input”:””,”sys_creat
    2017-04-28 07:35:35,912 (pool-3-thread-1) [ERROR – com.example.flume.interceptors.eventTweaker.intercept(EventTweaker.java:69)] exception Unterminated string at 2049 [character 2050 line 1]
    2017-04-28 07:35:35,912 (pool-3-thread-1) [INFO – com.example.flume.interceptors.eventTweaker.intercept(EventTweaker.java:70)] Received this log message that is not formatted in json:
