Building a Flume interceptor does not look too complex. You can find examples, howtos and cookbooks around. The issue is that they all explain the specifics of the interceptor itself, leaving a python/perl guy like me in the dark about maven, classpaths or imports. This post aims to correct that. It describes the bare minimum to get an interceptor working, and I link to more documentation if you want to go deeper. You can find the code on github as well.
The interceptor itself
You need to create a directory structure in which to write the following java code. In this example it goes in src/main/java/com/example/flume/interceptors/eventTweaker.java, for an interceptor named com.example.flume.interceptors.eventTweaker.
package com.example.flume.interceptors;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.log4j.Logger;

public class eventTweaker implements Interceptor {

    private static final Logger LOG = Logger.getLogger(eventTweaker.class);

    // private means that only Builder can build me.
    private eventTweaker() {}

    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();

        // example: add / remove headers
        if (headers.containsKey("lice")) {
            headers.put("shampoo", "antilice");
            headers.remove("lice");
        }

        // example: change body
        String body = new String(event.getBody());
        if (body.contains("injuries")) {
            try {
                event.setBody("cyborg".getBytes("UTF-8"));
            } catch (java.io.UnsupportedEncodingException e) {
                LOG.warn(e);
                // drop event completely
                return null;
            }
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // keep only the events that intercept(Event) did not drop (null)
        List<Event> kept = new ArrayList<Event>(events.size());
        for (Event event : events) {
            Event result = intercept(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new eventTweaker();
        }

        @Override
        public void configure(Context context) {}
    }
}
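Returning null from the single-event intercept drops the event, so the list version generally has to filter those nulls out instead of handing the input list back untouched. Here is a minimal standalone sketch of that idea. It is not Flume code: SimpleEvent and DropSketch are hypothetical stand-ins so that it runs without Flume on the classpath, and the drop rule (any body containing "injuries") is simplified for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Flume event: only a body, no headers.
final class SimpleEvent {
    final String body;
    SimpleEvent(String body) { this.body = body; }
}

final class DropSketch {
    // Mirrors the single-event contract: return null to drop the event.
    static SimpleEvent intercept(SimpleEvent event) {
        return event.body.contains("injuries") ? null : event;
    }

    // Batch version: keep only the events that were not dropped.
    static List<SimpleEvent> intercept(List<SimpleEvent> events) {
        List<SimpleEvent> kept = new ArrayList<SimpleEvent>(events.size());
        for (SimpleEvent event : events) {
            SimpleEvent result = intercept(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<SimpleEvent> in = Arrays.asList(
            new SimpleEvent("all good"),
            new SimpleEvent("minor injuries"),
            new SimpleEvent("still good"));
        System.out.println(intercept(in).size()); // prints 2
    }
}
```

Building a new list rather than removing from the input avoids surprises when the caller passes a fixed-size or unmodifiable list.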
Maven configuration
To build the above, you need a pom.xml, placed at the root of the aforementioned tree. The following is the bare minimum, which would probably make any java developer cringe, but It Works™.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.apache.flume</groupId>
  <artifactId>eventTweaker</artifactId>
  <version>1.0</version>

  <repositories>
    <repository>
      <id>central</id>
      <name>Maven Central</name>
      <url>https://repo1.maven.org/maven2/</url>
    </repository>
    <repository>
      <id>cloudera-repo</id>
      <name>Cloudera CDH</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <properties>
    <flume.version>1.4.0-cdh4.4.0</flume.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>${flume.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>${flume.version}</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.16</version>
      <exclusions>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
    </dependency>
  </dependencies>
</project>
Once done, you can just type
mvn package
and a jar will be created for you under the target directory.
Tree example
After compilation, here is what my directory structure looks like:
.
├── pom.xml
├── src
│   └── main
│       └── java
│           └── com
│               └── example
│                   └── flume
│                       └── interceptors
│                           └── eventTweaker.java
└── target
    ├── classes
    │   └── com
    │       └── example
    │           └── flume
    │               └── interceptors
    │                   ├── eventTweaker$1.class
    │                   ├── eventTweaker$Builder.class
    │                   └── eventTweaker.class
    ├── maven-archiver
    │   └── pom.properties
    └── eventTweaker-1.0.jar
Jar destination
You need to put the jar (in this case target/eventTweaker-1.0.jar) in the flume classpath for it to be loaded. You can do so in 2 ways:
- By editing the FLUME_CLASSPATH variable in the flume-env.sh file
- By adding it under the $FLUME_HOME/plugins.d directory. In this directory, put your jar at your_interceptor_name/lib/yourinterceptor.jar and voilà, it will be loaded automagically.
Flume configuration
Debugging
You can see log statements in the usual flume.log file, probably under /var/log/flume-ng/flume.log.
flume.conf
This is the easiest bit. In your source definition, you just have to add an interceptor name and its builder class:
agent.sources.yoursource.interceptors=tweaker
agent.sources.yoursource.interceptors.tweaker.type=com.example.flume.interceptors.eventTweaker$Builder
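The $Builder suffix in the type is the JVM binary name of the nested static Builder class. As far as I can tell Flume loads the class reflectively by that name, so the dotted form you would write in java source will not do. Here is a small standalone sketch of the naming rule, where Tweaker and BinaryNameSketch are hypothetical classes standing in for eventTweaker (no Flume involved):

```java
// Hypothetical stand-in for eventTweaker, with the same nested Builder shape.
final class Tweaker {
    private Tweaker() {}

    public static class Builder {
        public Tweaker build() { return new Tweaker(); }
    }
}

final class BinaryNameSketch {
    // Returns the name a reflective loader needs for the nested Builder.
    static String builderName() {
        return Tweaker.Builder.class.getName();
    }

    public static void main(String[] args) throws Exception {
        String name = builderName();
        System.out.println(name); // ends with "Tweaker$Builder"

        // Loading by binary name works; the dotted source form
        // ("Tweaker.Builder") would throw ClassNotFoundException.
        Object builder = Class.forName(name).getDeclaredConstructor().newInstance();
        System.out.println(builder instanceof Tweaker.Builder);
    }
}
```

This is also why the compiled output contains a separate eventTweaker$Builder.class file next to eventTweaker.class.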