Building a Flume interceptor does not look too complex, and you can find examples, how-tos and cookbooks around. The issue is that they all focus on the specifics of the interceptor itself, leaving a Python/Perl guy like me in the dark about Maven, classpaths and imports. This post aims to fix that. It describes the bare minimum needed to get an interceptor working, and I link to more documentation if you want to go deeper. You can find the code on GitHub as well.
The interceptor itself
First, create the directory structure that will hold the following Java code. The directory path must mirror the package name: in this example, for an interceptor named com.example.flume.interceptors.eventTweaker, the file goes under src/main/java/com/example/flume/interceptors/eventTweaker.java.
```java
package com.example.flume.interceptors;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.log4j.Logger;

public class eventTweaker implements Interceptor {

    private static final Logger LOG = Logger.getLogger(eventTweaker.class);

    // private means that only Builder can build me.
    private eventTweaker() {}

    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();

        // example: add / remove headers
        if (headers.containsKey("lice")) {
            headers.put("shampoo", "antilice");
            headers.remove("lice");
        }

        // example: change body (decoded as UTF-8 rather than the platform default charset)
        try {
            String body = new String(event.getBody(), "UTF-8");
            if (body.contains("injuries")) {
                event.setBody("cyborg".getBytes("UTF-8"));
            }
        } catch (UnsupportedEncodingException e) {
            LOG.warn(e);
            // drop event completely
            return null;
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // Build a new list so that events dropped above (null) are not passed downstream.
        List<Event> kept = new ArrayList<Event>(events.size());
        for (Event event : events) {
            Event intercepted = intercept(event);
            if (intercepted != null) {
                kept.add(intercepted);
            }
        }
        return kept;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new eventTweaker();
        }

        @Override
        public void configure(Context context) {}
    }
}
```
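If you want to check the header and body rules before dealing with Maven at all, here is a minimal, dependency-free sketch of the same logic. `SimpleEvent` is a hypothetical stand-in for `org.apache.flume.Event` (just a header map and a byte[] body), and `TweakerSketch` is likewise a made-up name for illustration; it is not what Flume loads. It also shows how a batch method should treat a null (dropped) result from the single-event method: leave it out of the returned list.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Same header/body rules as eventTweaker, with no Flume classes involved.
// Run with: java TweakerSketch.java
class TweakerSketch {
    SimpleEvent intercept(SimpleEvent event) {
        // Swap the "lice" header for a "shampoo" header.
        if (event.headers.containsKey("lice")) {
            event.headers.put("shampoo", "antilice");
            event.headers.remove("lice");
        }
        // Replace the whole body when it mentions "injuries".
        if (event.bodyAsString().contains("injuries")) {
            event.body = "cyborg".getBytes(StandardCharsets.UTF_8);
        }
        return event; // a null return would mean "drop this event"
    }

    List<SimpleEvent> intercept(List<SimpleEvent> events) {
        // Collect results into a new list so dropped (null) events are not forwarded.
        List<SimpleEvent> kept = new ArrayList<>(events.size());
        for (SimpleEvent event : events) {
            SimpleEvent result = intercept(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        SimpleEvent event = new SimpleEvent("minor injuries reported");
        event.headers.put("lice", "yes");
        List<SimpleEvent> out = new TweakerSketch().intercept(Arrays.asList(event));
        // prints: {shampoo=antilice} cyborg
        System.out.println(out.get(0).headers + " " + out.get(0).bodyAsString());
    }
}

// Hypothetical stand-in for org.apache.flume.Event: a header map plus a raw byte[] body.
class SimpleEvent {
    final Map<String, String> headers = new HashMap<>();
    byte[] body;

    SimpleEvent(String text) {
        this.body = text.getBytes(StandardCharsets.UTF_8);
    }

    String bodyAsString() {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

Keeping the rules in plain Java like this makes them easy to poke at; the Flume-specific parts (the `Interceptor` interface and the `Builder`) are only glue around them.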
Maven configuration
To build the above, you need a pom.xml at the root of the project directory, next to src/. The following is the bare minimum, which would probably make any Java developer cringe, but It Works™.
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.apache.flume</groupId>
  <artifactId>eventTweaker</artifactId>
  <version>1.0</version>

  <repositories>
    <repository>
      <id>central</id>
      <name>Maven Central</name>
      <!-- Maven Central only serves HTTPS nowadays -->
      <url>https://repo1.maven.org/maven2/</url>
    </repository>
    <repository>
      <id>cloudera-repo</id>
      <name>Cloudera CDH</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <properties>
    <flume.version>1.4.0-cdh4.4.0</flume.version>
  </properties>

  <dependencies>
    <!-- Flume and log4j are already on the agent's classpath: compile against them only -->
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>${flume.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>${flume.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.16</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- Only needed to run unit tests -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
```
Once done, you can just type

```
mvn package
```

and a jar will be created for you under the target directory.
Tree example
After compilation, here is what my directory structure looks like:
```
.
├── pom.xml
├── src
│   └── main
│       └── java
│           └── com
│               └── example
│                   └── flume
│                       └── interceptors
│                           └── eventTweaker.java
└── target
    ├── classes
    │   └── com
    │       └── example
    │           └── flume
    │               └── interceptors
    │                   ├── eventTweaker$1.class
    │                   ├── eventTweaker$Builder.class
    │                   └── eventTweaker.class
    ├── maven-archiver
    │   └── pom.properties
    └── eventTweaker-1.0.jar
```
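The tree follows the rule Maven relies on: the package name, with dots turned into directory separators, decides where the source file lives under src/main/java and where the compiled class lands under target/classes, and a nested class such as Builder gets its own .class file with a `$` in its name. The small helper below (`LayoutSketch` is a hypothetical name) just spells that mapping out for the reader; Maven does not run anything like it.

```java
// Derives the expected source and class file paths for a fully qualified class name.
// Run with: java LayoutSketch.java
class LayoutSketch {
    static String sourcePath(String className) {
        return "src/main/java/" + className.replace('.', '/') + ".java";
    }

    // nestedName is the simple name of a nested class, or null for the top-level class.
    static String classFilePath(String className, String nestedName) {
        String base = "target/classes/" + className.replace('.', '/');
        return nestedName == null ? base + ".class" : base + "$" + nestedName + ".class";
    }

    public static void main(String[] args) {
        String cls = "com.example.flume.interceptors.eventTweaker";
        System.out.println(sourcePath(cls));
        System.out.println(classFilePath(cls, null));
        System.out.println(classFilePath(cls, "Builder"));
    }
}
```

If the printed source path does not match where your .java file actually sits, that mismatch is usually the first thing to fix.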
Jar destination
The jar (in this case target/eventTweaker-1.0.jar) has to be on the Flume classpath to be loaded. You can put it there in two ways:
- By editing the FLUME_CLASSPATH variable in the flume-env.sh file
- By adding it under the $FLUME_HOME/plugins.d directory. In this directory, put your jar at your_interceptor_name/lib/yourinterceptor.jar and voilà, it will be loaded automagically.
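To double-check the plugins.d option, here is a small sketch that lists the jars sitting at plugins.d/&lt;plugin&gt;/lib/*.jar under a given Flume home, i.e. the layout described in the list. It only inspects the filesystem and is an approximation of that layout, not the logic of the flume-ng launcher itself; `PluginJars` and `listPluginJars` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Run with: java PluginJars.java /path/to/flume/home   (defaults to $FLUME_HOME, then ".")
class PluginJars {
    // Returns every plugins.d/<plugin>/lib/*.jar below flumeHome, sorted.
    static List<Path> listPluginJars(Path flumeHome) throws IOException {
        List<Path> jars = new ArrayList<>();
        Path pluginsDir = flumeHome.resolve("plugins.d");
        if (!Files.isDirectory(pluginsDir)) {
            return jars; // no plugins.d directory at all
        }
        try (DirectoryStream<Path> plugins = Files.newDirectoryStream(pluginsDir)) {
            for (Path plugin : plugins) {
                Path lib = plugin.resolve("lib");
                if (!Files.isDirectory(lib)) {
                    continue; // a plugin directory without lib/ contributes no jars
                }
                try (DirectoryStream<Path> libJars = Files.newDirectoryStream(lib, "*.jar")) {
                    for (Path jar : libJars) {
                        jars.add(jar);
                    }
                }
            }
        }
        Collections.sort(jars);
        return jars;
    }

    public static void main(String[] args) throws IOException {
        String home = args.length > 0 ? args[0] : System.getenv().getOrDefault("FLUME_HOME", ".");
        for (Path jar : listPluginJars(Paths.get(home))) {
            System.out.println(jar);
        }
    }
}
```

If your jar does not show up in the output, it is most likely one directory level too high or too low.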
Flume configuration
Debugging
Log statements from the interceptor (such as the LOG.warn call) show up in the usual flume.log file, typically /var/log/flume-ng/flume.log.
flume.conf
This is the easiest bit. In your source definition, you just have to add an interceptor name and its type, which is the Builder class:
```
agent.sources.yoursource.interceptors=tweaker
agent.sources.yoursource.interceptors.tweaker.type=com.example.flume.interceptors.eventTweaker$Builder
```
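The `$Builder` suffix in the `type` line is not a typo: the value is the class's binary name, and for a static nested class that is the outer class name, a `$`, then the nested class name. The self-contained sketch below illustrates this with hypothetical `Outer`/`Outer.Builder` classes standing in for eventTweaker and its Builder: `Class.forName` resolves the `$` form but not the dotted form you would write in source code. The second half loosely mimics a config-driven loader that instantiates a builder from its name and calls `build()`; treat it as an illustration under that assumption, not as Flume's actual loading code.

```java
// Run with: java BinaryNameDemo.java
class BinaryNameDemo {
    // True if the JVM can resolve a class from the given name.
    static boolean resolves(String name) {
        try {
            Class.forName(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String binaryName = Outer.Builder.class.getName();          // e.g. Outer$Builder
        String sourceName = Outer.Builder.class.getCanonicalName(); // e.g. Outer.Builder
        System.out.println(binaryName + " resolves: " + resolves(binaryName));
        System.out.println(sourceName + " resolves: " + resolves(sourceName));

        // Create the builder from its name alone, then let it build the real object.
        Object builder = Class.forName(binaryName).getDeclaredConstructor().newInstance();
        Outer built = ((Outer.Builder) builder).build();
        System.out.println(built.label());
    }
}

// Hypothetical stand-ins: Outer plays the part of eventTweaker, Outer.Builder of its Builder.
class Outer {
    private final String label;

    // Private, as in eventTweaker: only the nested Builder creates instances.
    private Outer(String label) {
        this.label = label;
    }

    String label() {
        return label;
    }

    static class Builder {
        Outer build() {
            return new Outer("built by " + getClass().getName());
        }
    }
}
```

The private constructor plus nested Builder is also why the compiled output contains a separate eventTweaker$Builder.class file next to eventTweaker.class.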