Avro end to end in hdfs – part 4: problems and solutions

This series of posts explains how and why to set up compressed Avro in HDFS. It is split into several posts, and more will follow if relevant.

  1. Why avro?
  2. How to set up avro in flume
  3. How to use avro with hive
  4. Problems and solutions (This post)

Invalid/non standard schemas

The Avro tools available for different languages are not all exactly equivalent. The default Java implementation used in Hadoop, for instance, has issues when some fields can be set to null. Nested arrays are another issue in a lot of cases: the default Java Avro parser cannot handle them properly. Furthermore, even if you find a way to generate Avro files with nested arrays, some tools will not be able to read them. Hive will be fine, but Impala (as of version 1.2) cannot read them.

I can only urge you to use simple schemas. This will make your life a lot easier.

Hive partitions and schema changes

If you use Hive partitions (and you should), all data in one specific partition must have the same schema. We used to have partitions per hour when loading some logs, but now we are actually adding the avro schema version in the partition path. That way, data encoded in a new schema will end up in a different partition even if data is related to the same hour.
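As an illustration only, here is a minimal Python sketch of how such a partition path could be built. The partition key names (schema_version, hour) and the hour format are assumptions made for the example, not the names we actually use.

```python
from datetime import datetime


def partition_path(base, schema_version, event_time):
    """Build a partition directory holding both the schema version and the hour.

    The key names "schema_version" and "hour" are hypothetical.
    """
    hour = event_time.strftime("%Y%m%d%H")
    return f"{base}/schema_version={schema_version}/hour={hour}"


# Two events from the same hour, encoded with different schema versions,
# end up in two different partitions.
print(partition_path("/datain/logs", 1, datetime(2014, 3, 1, 13, 5)))
print(partition_path("/datain/logs", 2, datetime(2014, 3, 1, 13, 45)))
```

Because the schema version comes first in the path, every directory below it only ever contains files written with one schema.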

Faster encoding and flexibility

We started loading data the standard way, via Flume. This created a lot of issues, as explained earlier (nested arrays mostly), and Flume was actually using a lot of resources. We ended up using the json2avro C tool, which is very fast and can handle nested arrays (but this bit us later because of Impala). This tool generates Avro files, which we load into HDFS via an HDFS FUSE mount point. This improved performance drastically. Since we started using this FUSE mount point, we have had no data loading issues or delays, whereas we had trouble every other week while using Flume.

Default values

We started by writing a schema with default values. Sadly, we ended up noticing that defaults do not help when encoding JSON input: JSON is only a convenient representation of Avro data, useful for debugging, and is not the main purpose of Avro.

This means that a missing source field has to be represented explicitly in Avro's JSON encoding, like this:

{"valid": {"boolean": true}, "source": null}

but a JSON document actually missing this field is not valid.
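One way to cope with this is to normalise each JSON document before encoding it. The following is a minimal Python sketch, assuming every field concerned is declared as a union of null and exactly one other type. The function name and the "string" type given to source are assumptions made for the example.

```python
import json


def to_avro_json(doc, nullable_fields):
    """Rewrite a plain JSON document into Avro's JSON encoding for nullable fields.

    nullable_fields maps a field name to the non-null branch of its
    ["null", <type>] union, for instance {"valid": "boolean"}.
    """
    encoded = dict(doc)
    for name, branch in nullable_fields.items():
        value = doc.get(name)
        # A missing or null field becomes an explicit null; any other value
        # is wrapped in a single-key object naming the union branch.
        encoded[name] = None if value is None else {branch: value}
    return encoded


plain = {"valid": True}  # "source" is missing from the input document
print(json.dumps(to_avro_json(plain, {"valid": "boolean", "source": "string"})))
# prints {"valid": {"boolean": true}, "source": null}
```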

Avro end to end in hdfs – part 3: Hive

This series of posts explains how and why to set up compressed Avro in HDFS. It is split into several posts, and more will follow if relevant.

  1. Why avro?
  2. How to set up avro in flume
  3. How to use avro with hive (this post)
  4. Problems and solutions

Use avro in Hive

Once your table is created and data is loaded, there is nothing extra to do: you can just query it as you would any other table.

Create the table

Creating the table can be done as follows, with some comments:

-- table name
CREATE EXTERNAL TABLE IF NOT EXISTS table_name

-- Partition according to the end of the path you set in the flume sink (hdfs.path option).
-- Following the example from the previous post, we would have
PARTITIONED BY (key STRING)

-- Avro!
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'

-- Matches the first part of hdfs.path set up in the flume sink
-- Following the example of the previous post, we would have
LOCATION '/datain/logs'

-- Other options here are to hardcode the schema or use a file on your local filesystem instead.
TBLPROPERTIES ('avro.schema.url'='hdfs:///schemas/schema.avsc');

More information can be found on the cloudera documentation about hive and avro.

Load the snappy jar

To load data, you need to tell Hive that the data files will be compressed, and Hive needs to know how to decompress them. For this, you need to add the snappy jar to the list of extra jars loaded by Hive. This is done by adding the path to the snappy jar to the value of the hive.aux.jars.path property in your hive-site.xml. For instance:

<property>
  <name>hive.aux.jars.path</name>
  <value>file:///usr/lib/hive/lib/hive-contrib.jar,...,file:///usr/lib/hive/lib/auxlib/snappy-java-1.0.4.1.jar</value>
</property>

Actually load data

You need to tell hive to use snappy, which is done the following way:

SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;

Then loading data means adding a new partition whenever a new directory is created for another key. Run this after having told Hive to use snappy:

ALTER TABLE table_name
ADD IF NOT EXISTS PARTITION (key='some_new_key')
LOCATION '/datain/logs/key=some_new_key';

Using data with the default schema

If you use a custom schema, tailored to your data, you can then enjoy the full speed of Hive, as not much parsing will be needed by Hive to access your data.

If you use the default schema, then Hive does not know (yet) about the columns in your table. This can be fixed by the decode() function. For instance,

SELECT
hour, decode(body,'UTF-8') as body
FROM my_table

Avro end to end in hdfs – part 2: Flume setup

This series of posts explains how and why to set up compressed Avro in HDFS. It is split into several posts, and more will follow if relevant.

  1. Why avro?
  2. How to set up avro in flume (this post)
  3. How to use avro with hive
  4. Problems and solutions

Set up flume

Believe it or not, this is the easy part.

On the source, there is nothing specific to add; you can carry on as usual.

On the sink, here is a sample with comments:

agent.sinks.hdfs.type=hdfs
# Very important, *DO NOT* use CompressedStream. Avro itself will do the compression
agent.sinks.hdfs.hdfs.fileType=DataStream
# *MUST* be set to .avro for Hive to work
agent.sinks.hdfs.hdfs.fileSuffix=.avro
# Of course choose your own path
agent.sinks.hdfs.hdfs.path=hdfs://namenode/datain/logs/key=%{some_partition}
agent.sinks.hdfs.hdfs.writeFormat=Text
# The magic happens here:
agent.sinks.hdfs.serializer=avro_event
agent.sinks.hdfs.serializer.compressionCodec=snappy

Note the hdfs.path. “some_partition” might be a timestamp, for instance, which could create a new directory every hour. This will be used later in Hive.
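To make the substitution concrete, here is a small Python sketch that mimics how a %{name} escape in hdfs.path is replaced by the value of the event header with that name. It only imitates the behaviour and is not Flume's code. The function name and the header value are made up for the example.

```python
import re


def expand_path(template, headers):
    """Replace each %{name} with the value of the event header called name.

    This sketch raises KeyError when a header is missing.
    """
    return re.sub(r"%\{([^}]+)\}", lambda m: headers[m.group(1)], template)


template = "hdfs://namenode/datain/logs/key=%{some_partition}"
print(expand_path(template, {"some_partition": "2014030113"}))
# prints hdfs://namenode/datain/logs/key=2014030113
```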

Using this configuration will use the default Avro schema, which you can find defined in the flume source:

{
 "type": "record",
 "name": "Event",
 "fields": [{
   "name": "headers",
   "type": {
     "type": "map",
     "values": "string"
   }
 }, {
   "name": "body",
   "type": "bytes"
 }]
}

If you want to use your own custom schema, you need to extend AbstractAvroEventSerializer. This is not very complex, and the default Avro event serializer actually extends it already, hardcoding a schema, which makes it a good example to start from. You would typically put the schema somewhere reachable by the sink, either on HDFS itself or at a URL. The path could be hardcoded in your class if you have only one schema, or could be passed as a Flume header.

If, as in the example, you are using snappy, first make sure that snappy is installed:

# RedHat world:
yum install snappy
# Debian world:
apt-get install libsnappy1

And that’s really it, there is nothing more to do to use the default schema.

Avro end to end in hdfs – part 1: why avro?

This series of posts explains how and why to set up compressed Avro in HDFS. It is split into several posts, and more will follow if relevant.

  1. Why avro? (This post)
  2. How to set up avro in flume
  3. How to use avro with hive
  4. Problems and solutions

What is avro?

Avro, an apache project, is a data serialisation system. From the avro wiki, Avro provides:

  • Rich data structures.
  • A compact, fast, binary data format.
  • A container file, to store persistent data.
  • Remote procedure call (RPC).
  • Simple integration with dynamic languages. Code generation is not required to read or write data files nor to use or implement RPC protocols. Code generation as an optional optimization, only worth implementing for statically typed languages

Why is it better than raw data?

After all, a bit of JSON or XML would work just as well, right? You could indeed do with JSON or XML whatever you do with Avro, but it would be a lot more painful, for many reasons. One of the main goals of Avro is to have self-contained data structures. Storing the schema with the data file means that once you get a file, you have all the information you need to process it. You can even automatically generate code from the data file itself to further process the data, in a very simple and quick way.

Furthermore, since the schema is stored once in the data file itself, as a preamble, it does not need to be repeated for each record as it would be with JSON or XML. This results in a file which is a lot smaller than the JSON or XML equivalent for the same data. For the same reason, data can be stored in a highly efficient binary format instead of plain text, which once more results in smaller files and less time spent parsing strings.
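To show what this preamble looks like, here is a minimal Python sketch that reads the header of an Avro container file using only the standard library. It follows the layout given in the Avro specification: four magic bytes, a metadata map holding avro.schema and avro.codec, then a 16-byte sync marker. The function names are mine, and the header used in the demo is built by hand rather than taken from a real file.

```python
import io
import json

MAGIC = b"Obj\x01"  # the first four bytes of every Avro container file


def read_long(stream):
    """Decode one zigzag, variable-length long as defined by the Avro spec."""
    shift = 0
    result = 0
    while True:
        raw = stream.read(1)
        if not raw:
            raise EOFError("unexpected end of stream")
        result |= (raw[0] & 0x7F) << shift
        if not raw[0] & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)


def read_bytes(stream):
    """Read a length-prefixed byte string."""
    return stream.read(read_long(stream))


def read_header(stream):
    """Return (metadata, sync_marker) from the start of an Avro container file."""
    if stream.read(4) != MAGIC:
        raise OSError("Not a data file")
    meta = {}
    while True:
        count = read_long(stream)
        if count == 0:  # a zero count ends the map
            break
        if count < 0:  # a negative count is followed by the block size in bytes
            count = -count
            read_long(stream)
        for _ in range(count):
            key = read_bytes(stream).decode("utf-8")
            meta[key] = read_bytes(stream)
    return meta, stream.read(16)


# Hand-built header: two metadata entries, then a dummy sync marker.
fake = (MAGIC + b"\x04"
        + b"\x16avro.schema" + b"\x10" + b'"string"'
        + b"\x14avro.codec" + b"\x0csnappy"
        + b"\x00" + b"S" * 16)
meta, sync = read_header(io.BytesIO(fake))
print(json.loads(meta["avro.schema"]), meta["avro.codec"].decode())
```

On a real file, passing open(path, "rb") instead of the BytesIO object should return the writer schema as JSON text under the avro.schema key.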

Another goal of Avro is to support schema evolution. If your data structure changes over time, you do not want to have to update your whole processing flow in sync with the schema. With some restrictions, you can update an Avro schema on writers or readers without having to keep them aligned. Avro itself has a set of resolution rules which, on well-written schemas, will provide good defaults or ignore unknown values.
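The two rules mentioned here (use the reader's default for a field the writer did not know, and ignore a field the reader does not know) can be sketched in a few lines of Python. This is a deliberately simplified illustration working on already decoded records. Real Avro resolution compares the writer and reader schemas and also handles type promotion, aliases and unions. The field names are made up.

```python
def resolve(writer_record, reader_schema):
    """Project a record written with an older or newer schema onto the reader schema."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in writer_record:
            resolved[name] = writer_record[name]
        elif "default" in field:
            # The writer did not know this field: fall back to the reader default.
            resolved[name] = field["default"]
        else:
            raise ValueError(f"no value and no default for field {name!r}")
    # Fields present in writer_record but absent from the reader schema
    # are simply never copied, which is how unknown values get ignored.
    return resolved


reader_schema = {
    "type": "record",
    "name": "Log",
    "fields": [
        {"name": "host", "type": "string"},
        {"name": "level", "type": "string", "default": "info"},
    ],
}

old_record = {"host": "web1", "pid": 4242}  # no "level", extra "pid"
print(resolve(old_record, reader_schema))
# prints {'host': 'web1', 'level': 'info'}
```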

Avro is splittable. This means that it is very easy to cut an Avro file into pieces matching HDFS block boundaries, and to have one process running per block. This in turn improves disk usage and processing speed. With a non-splittable format, Hadoop could only allocate one process to deal with the whole file, instead of one per block.
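What makes this possible is the 16-byte sync marker that an Avro container file writes after its header and after every data block. A reader handed an arbitrary byte offset can scan forward to the next marker and start decoding right after it. Here is a minimal Python sketch of that idea on an in-memory byte string. The function name and the fake file contents are made up for the example.

```python
def first_block_after(data, sync, offset):
    """Return the position just after the first sync marker found at or after offset.

    Returns None when there is no further marker, which means there is
    nothing left to read for this split.
    """
    pos = data.find(sync, offset)
    return None if pos == -1 else pos + len(sync)


sync = bytes(range(16))  # stand-in for the random marker stored in the header
data = b"HEADER" + sync + b"block-one" + sync + b"block-two" + sync

# A split starting in the middle of the first block resumes at the second one.
start = first_block_after(data, sync, 25)
print(start, data[start:start + 9])
# prints 47 b'block-two'
```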

It is worth mentioning as well that Avro is well documented, is well integrated in the Hadoop ecosystem and has bindings in many languages.

Which types of compression are available?

Avro on its own will already result in smaller files than raw data, as explained earlier. You can go further by using compressed Avro. Avro supports two compression formats: deflate (typically an implementation of zlib) and snappy.

A short comparison would be:

                    Snappy     Deflate
Compression speed   faster     slower
Compression ratio   smaller    higher
Splittable          yes        no
Licence             New BSD    Old BSD

Even if the compression ratio of snappy is smaller, its goal is to be ‘good enough’. Add to this that snappy is splittable (you can make deflate splittable, but not without extra post-processing), and it is the codec which is usually used.

What are the other options?

Keeping data in raw format has already been discussed earlier. The other logical options are sequence files, Thrift and Protocol Buffers.

Sequence files do not really compare. They are not language independent as Avro is (it is all Java), and schema versioning is not possible.

Thrift (originated at Facebook, Apache 2.0 licence) and Protocol Buffers (originated at Google, BSD licence) are the main competitors. They all support schema evolution, are splittable, compress data to some extent and make processing faster. Usually Protocol Buffers and Avro are close in size and processing time, with Thrift being somewhat bigger and slower.

The main advantages of Avro are that schemas do not need to be compiled (you can just use the JSON schema wherever you need it) and that it is well integrated into Hadoop. Some benchmarks with more precise numbers can be found online.

Further readings and documentation

Schema evolution in Avro, ProtocolBuffer and thrift, with good technical explanation of how data is stored.

Slides and talk from Igor Anishchenko at Java Tech Talk #1: protocol buffer vs. avro vs. thrift.

Flume ‘Not a data file’ and canary files

Flume played a nasty trick on us recently. The files in a spooldir source were not processed, and ended up filling our disk. Looking at the symptoms, two effects were obvious:

  • A lot of canary files appeared, with names like ‘flume-spooldir-perm-check-983522283290612181.canary’
  • Flume.log was swamped with java.io.IOException: Not a data file.

The canary files are created as part of the permission check of a spooldir, as can be seen on GitHub. They are supposed to be deleted afterwards, as they are only there to see if Flume can write to the directory. In our case they were not deleted, because creating a new empty file can be done on a full disk, but writing to it needs free space. The check thus errored out before deletion. Those files can safely be deleted.

The “java.io.IOException: Not a data file” exception was due to the presence of a temporary directory holding metadata for processing. This directory is controlled by the trackerDir directive in the definition of the spooldir source in flume.conf (by default .flumespool in the spooldir). We ended up with empty metadata files, which therefore did not have the 2 bytes that Avro (we are using an Avro sink) expected to see. There is actually nothing wrong at all with the actual data file, only with the metadata file. The solution is thus to delete .flumespool, after which the issue resolves itself (after freeing a bit of space on the disk, of course).
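Before deleting anything, it can be handy to confirm that empty metadata files really are the problem. Here is a small Python sketch that lists zero-byte files in the tracker directory. The function name is made up, and the only layout it assumes is the default .flumespool directory inside the spooldir.

```python
import os


def empty_tracker_files(spool_dir, tracker_dir=".flumespool"):
    """List the zero-byte files found in the tracker directory of a spooldir."""
    path = os.path.join(spool_dir, tracker_dir)
    if not os.path.isdir(path):
        return []
    found = []
    for name in sorted(os.listdir(path)):
        full = os.path.join(path, name)
        # Only regular files of size zero are reported; nothing is deleted.
        if os.path.isfile(full) and os.path.getsize(full) == 0:
            found.append(full)
    return found
```

Calling empty_tracker_files on the spooling directory only reports the suspicious files, so the deletion itself stays a manual decision.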

The root cause of all this was that we used the default deletePolicy of never in flume.conf, meaning that once a file is processed, it is renamed but not removed. We do have a job to remove those processed files, but it failed for a while, thus letting the disk fill up. We now delete files directly after processing (deletePolicy: immediate).

Edit: I filed a Flume bug with a patch, FLUME-2361. Hopefully it will be merged soon.