Avro end to end in hdfs – part 2: Flume setup

This is a series of posts explaining how and why to set up compressed avro in hdfs. It is split into a few posts, and more will come if relevant.

  1. Why avro?
  2. How to set up avro in flume (this post)
  3. How to use avro with hive
  4. Problems and solutions

Set up flume

Believe it or not, this is the easy part.

On the source, there is nothing specific to add; you can carry on as usual.

On the sink, here is a sample with comments:

agent.sinks.hdfs.type=hdfs
# Very important, *DO NOT* use CompressedStream. Avro itself will do the compression
agent.sinks.hdfs.hdfs.fileType=DataStream
# *MUST* be set to .avro for Hive to work
agent.sinks.hdfs.hdfs.fileSuffix=.avro
# Of course choose your own path
agent.sinks.hdfs.hdfs.path=hdfs://namenode/datain/logs/key=%{some_partition}
agent.sinks.hdfs.hdfs.writeFormat=Text
# The magic happens here:
agent.sinks.hdfs.serializer=avro_event
agent.sinks.hdfs.serializer.compressionCodec=snappy

Note the hdfs.path. “some_partition” is an event header. It might hold a timestamp, for instance, which could create a new directory every hour. This will be used later in Hive.
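To make the %{some_partition} escape more concrete, here is a small standalone sketch of the substitution idea. It is an illustration only, not Flume's own code: the class name, the method name and the behaviour for a missing header (empty string) are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: mimics how a %{header} escape in hdfs.path is filled in
// from the event headers. This is NOT Flume's implementation.
class PathEscapeSketch {

  // Matches %{header_name} and captures header_name.
  private static final Pattern HEADER_ESCAPE = Pattern.compile("%\\{([^}]+)\\}");

  static String expand(String pathTemplate, Map<String, String> headers) {
    Matcher matcher = HEADER_ESCAPE.matcher(pathTemplate);
    StringBuffer out = new StringBuffer();
    while (matcher.find()) {
      String value = headers.get(matcher.group(1));
      // Assumption for this sketch: a missing header expands to an empty string.
      matcher.appendReplacement(out, Matcher.quoteReplacement(value == null ? "" : value));
    }
    matcher.appendTail(out);
    return out.toString();
  }

  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<String, String>();
    // Hypothetical header value: one directory per hour.
    headers.put("some_partition", "2014-04-01-13");
    System.out.println(expand("hdfs://namenode/datain/logs/key=%{some_partition}", headers));
  }
}
```

With an hourly value in the header, every hour ends up in its own key=... directory, which is the layout Hive partitions expect.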

Using this configuration will use the default Avro schema, which you can find defined in the flume source:

{
 "type": "record",
 "name": "Event",
 "fields": [{
   "name": "headers",
   "type": {
     "type": "map",
     "values": "string"
   }
 }, {
   "name": "body",
   "type": "bytes"
 }]
}

If you want to use your own custom schema, you need to extend AbstractAvroEventSerializer. This is not very complex, and the default avro event serializer actually extends it already, hardcoding a schema. It is a good example to build on. You would typically put the schema in a place reachable by the sink, either hdfs itself or a url. The path could be hardcoded in your class if you have one schema only, or it could be passed as a flume header.

If, as in the example, you are using snappy, first make sure that snappy is installed:

# RedHat world:
yum install snappy
# Debian world:
apt-get install libsnappy1

And that’s really it, there is nothing more to do to use the default schema.

Flume ‘Not a data file’ and canary files

Flume played a nasty trick on us recently. The files on a spooldir source were not processed and ended up filling our disk. Two symptoms stood out:

  • A lot of canary files appeared, with names like ‘flume-spooldir-perm-check-983522283290612181.canary’
  • flume.log was swamped with java.io.IOException: Not a data file.

The canary files are created as part of the permission check on a spooldir, as can be seen on github. They are supposed to be deleted afterwards, as they are only there to check whether flume can write to the directory. In our case they were not deleted, because creating a new empty file can be done on a full disk, but writing to it needs free space. The check thus errored out before the deletion. Those files can safely be deleted.
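The failure mode is easier to see in code. The following is a minimal sketch of a canary-style write check, not the actual Flume source: the class and method names are made up for the example. The point is the finally block, which removes the canary even when the write fails (for instance on a full disk).

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Illustration only: a canary-style permission check that always cleans up.
class CanaryCheckSketch {

  // Returns true if a file can be created AND written to in the directory.
  static boolean canWrite(File directory) {
    File canary = null;
    try {
      // Creating an empty file can succeed even on a full disk...
      canary = File.createTempFile("perm-check-", ".canary", directory);
      // ...but writing content needs free space and may throw.
      Files.write(canary.toPath(), "testing permissions\n".getBytes(StandardCharsets.UTF_8));
      return true;
    } catch (IOException e) {
      return false;
    } finally {
      // Runs on success and on failure, so no canary file is left behind.
      if (canary != null && canary.exists() && !canary.delete()) {
        canary.deleteOnExit();
      }
    }
  }

  public static void main(String[] args) {
    File tmp = new File(System.getProperty("java.io.tmpdir"));
    System.out.println("writable: " + canWrite(tmp));
  }
}
```

Without the finally block, an exception thrown by the write skips the delete, which matches the pile of leftover .canary files we saw.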

The “java.io.IOException: Not a data file” exception was caused by a temporary directory that holds processing metadata. This directory is controlled by the trackerDir directive in the definition of the spooldir source in flume.conf (by default .flumespool in the spooldir). We ended up with empty metadata files, which lacked the leading magic bytes that avro (we are using an avro sink) expects to see. There was nothing wrong with the actual data files, only with the metadata files. The solution was thus to delete .flumespool, after which the issue resolved itself (once a bit of disk space had been freed, of course).
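If you want to confirm that a metadata file is the culprit before deleting anything, you can check its first bytes. An Avro object container file starts with the four bytes 'O', 'b', 'j', 1, so an empty or truncated file fails that test. The sketch below uses only the JDK; the class and method names are made up for the example, and the files to check are passed as arguments.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

// Illustration only: checks whether a file begins with the Avro container magic.
class AvroMagicCheck {

  // Magic header of an Avro object container file: 'O', 'b', 'j', 0x01.
  private static final byte[] MAGIC = {'O', 'b', 'j', 1};

  static boolean startsWithAvroMagic(Path file) throws IOException {
    byte[] head = new byte[MAGIC.length];
    try (DataInputStream in = new DataInputStream(Files.newInputStream(file))) {
      in.readFully(head);
    } catch (EOFException e) {
      // Fewer than four bytes available: empty or truncated file.
      return false;
    }
    return Arrays.equals(head, MAGIC);
  }

  public static void main(String[] args) throws IOException {
    // Usage: java AvroMagicCheck <file> [<file> ...]
    for (String arg : args) {
      System.out.println(arg + ": " + startsWithAvroMagic(Paths.get(arg)));
    }
  }
}
```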

The root cause of all this was that we used the default deletePolicy of never in flume.conf, meaning that once a file is processed, it is renamed but not removed. We do have a job to remove those processed files, but it failed for a while, letting the disk fill up. We now delete files directly after processing (deletePolicy: immediate).
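If you prefer to keep deletePolicy at never, the cleanup job does not need to be complicated. Here is a rough sketch of one, assuming processed files carry the spooldir default suffix .COMPLETED (adjust it if you changed fileSuffix). The class name, the method name and the age threshold are made up for the example; this is not the job we actually run.

```java
import java.io.File;

// Illustration only: removes already-processed spooldir files older than a given age.
class CompletedFileCleaner {

  // Deletes regular files in spoolDir whose name ends with suffix and whose
  // last modification is at least minAgeMillis in the past. Returns the count.
  static int deleteCompleted(File spoolDir, String suffix, long minAgeMillis) {
    File[] files = spoolDir.listFiles();
    if (files == null) {
      // Not a directory, or not readable.
      return 0;
    }
    long cutoff = System.currentTimeMillis() - minAgeMillis;
    int deleted = 0;
    for (File file : files) {
      if (file.isFile()
          && file.getName().endsWith(suffix)
          && file.lastModified() <= cutoff
          && file.delete()) {
        deleted++;
      }
    }
    return deleted;
  }

  public static void main(String[] args) {
    // Usage: java CompletedFileCleaner <spooldir>
    // Assumption: keep one day of processed files around.
    long oneDay = 24L * 60 * 60 * 1000;
    System.out.println(deleteCompleted(new File(args[0]), ".COMPLETED", oneDay));
  }
}
```

Whatever you use, monitor it: the whole incident started because such a job failed silently.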

Edit: I filed a flume bug with a patch, FLUME-2361. Hopefully this will be merged at some point soon.

How to build a full flume interceptor by a non java developer

Building a flume interceptor does not look too complex. You can find some examples, howtos or cookbooks around. The issue is that they all explain the specifics of the interceptor, leaving a python/perl guy like me in the dark about maven, classpaths or imports. This post aims to correct this. It describes the bare minimum to get it working, but I link to more documentation if you want to go deeper. You can find the code on github as well.

The interceptor itself

You need to create a directory structure in which to write the following java code. In this example the file needs to be at src/main/java/com/example/flume/interceptors/eventTweaker.java, for an interceptor named com.example.flume.interceptors.eventTweaker.

package com.example.flume.interceptors;

import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import org.apache.log4j.Logger;

public class eventTweaker implements Interceptor {

  private static final Logger LOG = Logger.getLogger(eventTweaker.class);

  // private means that only Builder can build me.
  private eventTweaker() {}

  @Override
  public void initialize() {}

  @Override
  public Event intercept(Event event) {

    Map<String, String> headers = event.getHeaders();

    // example: add / remove headers
    if (headers.containsKey("lice")) {
      headers.put("shampoo", "antilice");
      headers.remove("lice");
    }

    // example: change body
    String body = new String(event.getBody());
    if (body.contains("injuries")) {
      try {
        event.setBody("cyborg".getBytes("UTF-8"));
      } catch (java.io.UnsupportedEncodingException e) {
        LOG.warn(e);
        // drop event completely
        return null;
      }
    }

    return event;
  }

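  @Override
  public List<Event> intercept(List<Event> events) {
    // Keep only the events that intercept(Event) did not drop (null means dropped).
    List<Event> kept = new java.util.ArrayList<Event>(events.size());
    for (Event event : events) {
      Event result = intercept(event);
      if (result != null) {
        kept.add(result);
      }
    }
    return kept;
  }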

  @Override
  public void close() {}

  public static class Builder implements Interceptor.Builder {

    @Override
    public Interceptor build() {
      return new eventTweaker();
    }

    @Override
    public void configure(Context context) {}
  }
}

Maven configuration

To build the above, you need a pom.xml, to be put at the root of the aforementioned tree. The following is the bare minimum, which would probably make any java developer cringe, but It Works™.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>org.apache.flume</groupId>
 <artifactId>eventTweaker</artifactId>
 <version>1.0</version>

 <repositories>
 <repository>
 <id>central</id>
 <name>Maven Central</name>
 <url>https://repo1.maven.org/maven2/</url>
 </repository>
 <repository>
 <id>cloudera-repo</id>
 <name>Cloudera CDH</name>
 <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
 </repository>
 </repositories>

 <properties>
 <flume.version>1.4.0-cdh4.4.0</flume.version>
 </properties>

 <dependencies>
 <dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-sdk</artifactId>
 <version>${flume.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-core</artifactId>
 <version>${flume.version}</version>
 </dependency>
 <dependency>
 <groupId>log4j</groupId>
 <artifactId>log4j</artifactId>
 <version>1.2.16</version>
 <exclusions>
 <exclusion>
 <groupId>com.sun.jdmk</groupId>
 <artifactId>jmxtools</artifactId>
 </exclusion>
 <exclusion>
 <groupId>com.sun.jmx</groupId>
 <artifactId>jmxri</artifactId>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupId>junit</groupId>
 <artifactId>junit</artifactId>
 <version>4.11</version>
 </dependency>
 </dependencies>

</project>

Once done, you can just type

mvn package

and a jar will be created for you under the target directory.

Tree example

After compilation, here is what my directory structure looks like:

.
├── pom.xml
├── src
│   └── main
│       └── java
│           └── com
│               └── example
│                   └── flume
│                       └── interceptors
│                           └── eventTweaker.java
└── target
    ├── classes
    │   └── com
    │       └── example
    │           └── flume
    │               └── interceptors
    │                   ├── eventTweaker$1.class
    │                   ├── eventTweaker$Builder.class
    │                   └── eventTweaker.class
    ├── maven-archiver
    │   └── pom.properties
    └── eventTweaker-1.0.jar

Jar destination

You need to put the jar (in this case target/eventTweaker-1.0.jar) in the flume classpath for it to be loaded. You can do so in 2 ways:

  1. By editing the FLUME_CLASSPATH variable in the flume-env.sh file
  2. By adding it under the $FLUME_HOME/plugins.d directory. In this directory, put your jar at your_interceptor_name/lib/yourinterceptor.jar and voilà, it will be loaded automagically.

Flume configuration

Debugging

You can see log statements in the usual flume.log file, probably under /var/log/flume-ng/flume.log

flume.conf

This is the easiest bit. On your source definition, you just have to add an interceptor name and class:

agent.sources.yoursource.interceptors=tweaker
agent.sources.yoursource.interceptors.tweaker.type=com.example.flume.interceptors.eventTweaker$Builder
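The $Builder at the end of the type may look odd if you do not come from java. It is simply how the JVM names a class nested inside another one: the outer and inner names are joined with a dollar sign, and that is the name you have to give when a class is looked up by name. A tiny standalone illustration (the class names here are made up and have nothing to do with flume):

```java
// Illustration only: shows the runtime name of a nested class.
class BinaryNameSketch {

  // A nested class, like Builder inside eventTweaker.
  static class Builder {}

  // Returns the name the JVM uses for the nested class.
  static String builderBinaryName() {
    return Builder.class.getName();
  }

  public static void main(String[] args) throws ClassNotFoundException {
    String name = builderBinaryName();
    // The printed name contains '$' between the outer and nested class names.
    System.out.println(name);
    // Looking the class up by that name gives back the same class.
    System.out.println(Class.forName(name) == Builder.class);
  }
}
```

This also explains the eventTweaker$Builder.class file that shows up under target/classes after the build.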