Building a Flume interceptor does not look too complex. You can find some examples, how-tos or cookbooks around. The issue is that they all explain the specifics of the interceptor, leaving a Python/Perl guy like me in the dark about Maven, classpaths or imports. This post aims to correct this. It describes the bare minimum to get it working, but I link to more documentation if you want to go deeper. You can find the code on GitHub as well.
The interceptor itself
You need to create a directory structure in which to write the following Java code. In this example, for an interceptor named com.example.flume.interceptors.eventTweaker, the file must be at src/main/java/com/example/flume/interceptors/eventTweaker.java.
package com.example.flume.interceptors;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.log4j.Logger;

public class eventTweaker implements Interceptor {

    private static final Logger LOG = Logger.getLogger(eventTweaker.class);

    // private means that only Builder can build me.
    private eventTweaker() {}

    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();

        // example: add / remove headers
        if (headers.containsKey("lice")) {
            headers.put("shampoo", "antilice");
            headers.remove("lice");
        }

        // example: change body (decode explicitly as UTF-8, not the platform default)
        try {
            String body = new String(event.getBody(), "UTF-8");
            if (body.contains("injuries")) {
                event.setBody("cyborg".getBytes("UTF-8"));
            }
        } catch (java.io.UnsupportedEncodingException e) {
            LOG.warn(e);
            // drop event completely
            return null;
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // keep only the events that were not dropped (single-event intercept returned null)
        List<Event> kept = new ArrayList<Event>(events.size());
        for (Event event : events) {
            Event result = intercept(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new eventTweaker();
        }

        @Override
        public void configure(Context context) {}
    }
}
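If you want to check the header and body rules before deploying anything, you can exercise the same logic in plain Java, without Flume on the classpath. This is a minimal sketch under stated assumptions: `TweakDemo` and `FakeEvent` are hypothetical names, `FakeEvent` is only a stand-in for `org.apache.flume.Event`, and the `drop` header rule is a hypothetical extra that exists solely to show how `null` results are left out of a batch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TweakDemo {

    // Hypothetical stand-in for org.apache.flume.Event: headers plus a byte[] body.
    static class FakeEvent {
        final Map<String, String> headers = new HashMap<>();
        byte[] body;

        FakeEvent(String body) {
            this.body = body.getBytes(StandardCharsets.UTF_8);
        }

        String bodyAsString() {
            return new String(body, StandardCharsets.UTF_8);
        }
    }

    // Mirrors eventTweaker.intercept(Event); returns null to drop the event.
    static FakeEvent tweak(FakeEvent event) {
        // Hypothetical extra rule, only here to exercise the "drop" path.
        if (event.headers.containsKey("drop")) {
            return null;
        }
        if (event.headers.containsKey("lice")) {
            event.headers.put("shampoo", "antilice");
            event.headers.remove("lice");
        }
        if (event.bodyAsString().contains("injuries")) {
            event.body = "cyborg".getBytes(StandardCharsets.UTF_8);
        }
        return event;
    }

    // Mirrors intercept(List<Event>): dropped (null) events are not returned.
    static List<FakeEvent> tweakAll(List<FakeEvent> events) {
        List<FakeEvent> kept = new ArrayList<>();
        for (FakeEvent event : events) {
            FakeEvent result = tweak(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        FakeEvent sick = new FakeEvent("two injuries reported");
        sick.headers.put("lice", "yes");
        FakeEvent unwanted = new FakeEvent("ignore me");
        unwanted.headers.put("drop", "true");

        List<FakeEvent> out = tweakAll(Arrays.asList(sick, unwanted));
        System.out.println(out.size());                // 1
        System.out.println(out.get(0).headers);        // {shampoo=antilice}
        System.out.println(out.get(0).bodyAsString()); // cyborg
    }
}
```

Keeping the rules in plain methods like this makes them easy to unit test; the real interceptor would then only translate between Flume's `Event` and these methods.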
Maven configuration
To build the above, you need a pom.xml, placed at the root of the aforementioned tree. The following is a bare minimum that would probably make any Java developer cringe, but It Works™.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.flume</groupId>
  <artifactId>eventTweaker</artifactId>
  <version>1.0</version>

  <repositories>
    <repository>
      <id>central</id>
      <name>Maven Central</name>
      <url>https://repo1.maven.org/maven2/</url>
    </repository>
    <repository>
      <id>cloudera-repo</id>
      <name>Cloudera CDH</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <properties>
    <flume.version>1.4.0-cdh4.4.0</flume.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>${flume.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>${flume.version}</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.16</version>
      <exclusions>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
Once done, you can just type
mvn package
and a jar will be created for you under the target directory.
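A misplaced source directory (for instance, one missing the java folder) can still give a successful build with an empty jar, so it is worth checking what actually ended up inside. `jar tf` does this from the command line; here is a minimal Java sketch using only `java.util.jar`. The class name `JarCheck` is hypothetical, and the default path assumes the pom above.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

class JarCheck {

    // Returns the names of all .class entries in the given jar, sorted.
    static List<String> classEntries(String jarPath) throws IOException {
        List<String> names = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.endsWith(".class")) {
                    names.add(name);
                }
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default: the jar produced by "mvn package" with the pom above.
        String path = args.length > 0 ? args[0] : "target/eventTweaker-1.0.jar";
        List<String> classes = classEntries(path);
        if (classes.isEmpty()) {
            System.err.println("No classes in " + path + ": check the src/main/java layout");
            System.exit(1);
        }
        classes.forEach(System.out::println);
    }
}
```

For the example in this post you would expect to see the three eventTweaker class files listed under com/example/flume/interceptors/.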
Tree example
After compilation, here is what my directory structure looks like:
.
├── pom.xml
├── src
│   └── main
│       └── java
│           └── com
│               └── example
│                   └── flume
│                       └── interceptors
│                           └── eventTweaker.java
└── target
    ├── classes
    │   └── com
    │       └── example
    │           └── flume
    │               └── interceptors
    │                   ├── eventTweaker$1.class
    │                   ├── eventTweaker$Builder.class
    │                   └── eventTweaker.class
    ├── maven-archiver
    │   └── pom.properties
    └── eventTweaker-1.0.jar
Jar destination
You need to put the jar (in this case target/eventTweaker-1.0.jar) in the Flume classpath for it to be loaded. You can do so in two ways:
- By editing the FLUME_CLASSPATH variable in the flume-env.sh file
- By adding it under the $FLUME_HOME/plugins.d directory. In this directory, put your jar at your_interceptor_name/lib/yourinterceptor.jar and voilà, it will be loaded automagically.
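The plugins.d layout is easy to create by hand, but for completeness here is a minimal sketch of the copy step using `java.nio.file`. It assumes `FLUME_HOME` is set in the environment and uses `eventTweaker` as the plugin directory name; both are assumptions to adjust to your install, and `PluginInstall` is a hypothetical helper, not a Flume tool.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

class PluginInstall {

    // Copies the jar to <flumeHome>/plugins.d/<pluginName>/lib/<jar file name>.
    static Path install(Path jar, Path flumeHome, String pluginName) throws IOException {
        Path libDir = flumeHome.resolve("plugins.d").resolve(pluginName).resolve("lib");
        Files.createDirectories(libDir);
        Path target = libDir.resolve(jar.getFileName());
        // Files.copy returns the target path.
        return Files.copy(jar, target, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        // Assumed: FLUME_HOME is exported, and the plugin directory is named "eventTweaker".
        String home = System.getenv("FLUME_HOME");
        if (home == null) {
            System.err.println("FLUME_HOME is not set");
            System.exit(1);
        }
        Path installed = install(Paths.get("target/eventTweaker-1.0.jar"), Paths.get(home), "eventTweaker");
        System.out.println("Installed " + installed);
    }
}
```

Restart the Flume agent afterwards so that it picks up the new jar.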
Flume configuration
Debugging
You can see log statements in the usual flume.log file, probably under /var/log/flume-ng/flume.log.
flume.conf
This is the easiest bit. On your source definition, you just have to add an interceptor name and class:
agent.sources.yoursource.interceptors=tweaker
agent.sources.yoursource.interceptors.tweaker.type=com.example.flume.interceptors.eventTweaker$Builder
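The `$Builder` suffix in the type is not a typo: it is the binary name of the nested `Builder` class. Flume presumably loads this class reflectively, and `Class.forName` only accepts the `$` form for nested classes. The sketch below is a hypothetical illustration (`BuilderLookup`, `interceptorType` and `loadable` are not part of Flume): it reads the setting with `java.util.Properties` and checks which spelling of a nested class name can actually be loaded.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class BuilderLookup {

    // Stand-in for eventTweaker.Builder: a nested static class.
    static class Builder {}

    // Reads the interceptor "type" for a given agent, source and interceptor name.
    static String interceptorType(Properties conf, String agent, String source, String name) {
        return conf.getProperty(agent + ".sources." + source + ".interceptors." + name + ".type");
    }

    // True if Class.forName can resolve the given name.
    static boolean loadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Properties conf = new Properties();
        conf.load(new StringReader(
                "agent.sources.yoursource.interceptors=tweaker\n"
                + "agent.sources.yoursource.interceptors.tweaker.type="
                + "com.example.flume.interceptors.eventTweaker$Builder\n"));
        System.out.println(interceptorType(conf, "agent", "yoursource", "tweaker"));

        // Binary name uses '$' between the outer and nested class; the dotted form does not load.
        String binary = Builder.class.getName();
        String dotted = binary.replace('$', '.');
        System.out.println(binary + " loadable: " + loadable(binary));
        System.out.println(dotted + " loadable: " + loadable(dotted));
    }
}
```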
In the paragraph “The interceptor itself” you specify a path: “src/main/com/[…]”.
When you show the structure that you have, there is an additional “java” folder.
Without this “java” folder, the Maven command works (“success”) but there is nothing in the jar file produced.
Apart from this mistake, your tutorial is really helpful, clear and quick to follow. Thank you for it!
Good catch! This was indeed a discrepancy with the tree structure given below.
Thanks for the feedback, and glad that this tutorial was of help!
Great! I have a Java client that sends byte[] to Flume, and my interceptor should transform it to JSON, but this does not happen. I tried looking at the Flume log, but everything seems OK. Is there a way to monitor my object and interceptor in real time? Thanks, and great post!
Hi Orwell,
Sadly this was some time ago and I have not played much with Flume since, so I unfortunately cannot help you there. Good luck!
hello boss,
My Flume interceptor is not receiving the whole JSON string in the event. It only gets half of the string and throws an error while parsing the JSON.
Please find the issue below. Kindly help.
{"parent":"","caused_by":"","watch_list":"","upon_reject":"cancel","sys_updated_on":"2017-01-11 01:00:07","u_escalate":"false","u_escalation_reason":"","approval_history":"","skills":"","number":"INC0276556","u_geo":"","state":"7","u_oasis_reservation":"","sys_created_by":"bijayak","u_type_of_impact":"","knowledge":"false","order":"","u_country":"","cmdb_ci":{"link":"https://vmwarecorpdev.service-now.com/api/now/table/cmdb_ci/383ed8c86f3c2200f28cfee09d3ee4e6","value":"383ed8c86f3c2200f28cfee09d3ee4e6"},"contract":"","impact":"3","active":"false","work_notes_list":"","priority":"5","sys_domain_path":"/","business_duration":"1970-01-01 00:00:00","group_list":"","approval_set":"","short_description":"Outlook not responding on BLR-HRZ view","correlation_display":"","work_start":"","additional_assignee_list":"","notify":"1","sys_class_name":"incident","closed_by":{"link":"https://vmwarecorpdev.service-now.com/api/now/table/sys_user/b0b76aaf4f456a04ec8da3928110c766","value":"b0b76aaf4f456a04ec8da3928110c766"},"follow_up":"","parent_incident":"","reassignment_count":"0","assigned_to":{"link":"https://vmwarecorpdev.service-now.com/api/now/table/sys_user/b0b76aaf4f456a04ec8da3928110c766","value":"b0b76aaf4f456a04ec8da3928110c766"},"u_region":"","u_users_impacted":"me","sla_due":"","comments_and_work_notes":"","u_category":{"link":"https://vmwarecorpdev.service-now.com/api/now/table/u_category/417c30486f7c2200f28cfee09d3ee4af","value":"417c30486f7c2200f28cfee09d3ee4af"},"escalation":"0","u_related_request":"","upon_approval":"proceed","correlation_id":"","u_region_group":"","made_sla":"true","u_escalate_updated":"","child_incidents":"0","u_outage":"false","resolved_by":{"link":"https://vmwarecorpdev.service-now.com/api/now/table/sys_user/b0b76aaf4f456a04ec8da3928110c766","value":"b0b76aaf4f456a04ec8da3928110c766"},"sys_updated_by":"system","opened_by":{"link":"https://vmwarecorpdev.service-now.com/api/now/table/sys_user/b0b76aaf4f456a04ec8da3928110c766","value":"b0b76aaf4f456a04ec8da3928110c766"},"user_input":"","sys_creat
2017-04-28 07:35:35,912 (pool-3-thread-1) [ERROR - com.example.flume.interceptors.eventTweaker.intercept(EventTweaker.java:69)] exception Unterminated string at 2049 [character 2050 line 1]
2017-04-28 07:35:35,912 (pool-3-thread-1) [INFO - com.example.flume.interceptors.eventTweaker.intercept(EventTweaker.java:70)] Received this log message that is not formatted in json: