Automated Hadoop setup with ambari

Hadoop is complex to configure; this is nothing new. Cluster managers like Ambari help, of course, but finding the configuration sweet spot is not easy, especially as that spot will be different per use case.

My cluster currently runs almost exclusively YARN/Tez, so I wrote a Python script which looks at the whole cluster via the Ambari API and configures a good chunk of it, based on the number of disks, CPUs and RAM, and on documentation I found scattered all around. It works well even on small clusters and tells you the reasoning behind each value it suggests.

The caveats are that LLAP is not managed yet (it was attempted, so the option is there, but it does not do anything) and that the script assumes all datanodes are identical.

The default run mode is read-only, but you can ask the script to actually update Ambari. You will still need to restart the relevant updated services yourself, on your own time (this is of course just one click in the Ambari UI).
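For the curious, here is a minimal sketch of the kind of Ambari REST call involved; the cluster name, credentials and chosen fields are placeholders, and the script's actual internals may differ:

import requests

# Placeholder endpoint and credentials; adjust to your Ambari server.
AMBARI = "http://hadoop.example.com:8080/api/v1"
CLUSTER = "mycluster"
AUTH = ("admin", "admin")
# Ambari requires this header on modifying calls; it is harmless on reads.
HEADERS = {"X-Requested-By": "settings-script"}

# Pull CPU, memory and disk information for every host of the cluster.
url = (AMBARI + "/clusters/" + CLUSTER +
       "/hosts?fields=Hosts/cpu_count,Hosts/total_mem,Hosts/disk_info")
hosts = requests.get(url, auth=AUTH, headers=HEADERS).json()["items"]

for host in hosts:
    info = host["Hosts"]
    print(info["host_name"], info["cpu_count"], info["total_mem"], len(info["disk_info"]))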

An example showing only what my script thinks is not correct:

./settings.py --tofix hadoop.example.com

Basic info

Yarn config.
✘ yarn-site/yarn.scheduler.minimum-allocation-mb = 768, expects 1024 (Min container size.) #75%
✘ capacity-scheduler/yarn.scheduler.capacity.resource-calculator = org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator, expects org.apache.hadoop.yarn.util.resource.DominantResourceCalculator (Take all resources in account, not only RAM) #0%

A full run:

./settings.py hadoop.example.com

Basic info
FYI – { 'ip-10-0-0-001.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'ip-10-0-0-002.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'ip-10-0-0-003.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'ip-10-0-0-004.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'ip-10-0-0-005.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'ip-10-0-0-006.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566633984},
'ip-10-0-0-007.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'ip-10-0-0-008.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080}}: Data nodes.
FYI – {'cpu': 64, 'disk': 104, 'mem': 268533100544}: Total cluster resources.
FYI – 1024: Min container size (MB), based on amount of ram/cpu in the cluster.
FYI – 128: Number of containers based on recommendations.
FYI – 0.55: Default queue capacity.

Yarn config.
✔ yarn-site/yarn.nodemanager.resource.memory-mb = 24008, expects 24008 (min(yarn memory for one DN) * 0.75.) #100%
✘ yarn-site/yarn.scheduler.minimum-allocation-mb = 768, expects 1024 (Min container size.) #75%
✔ yarn-site/yarn.scheduler.maximum-allocation-mb = 24008, expects 24008 (Same as yarn.nodemanager.resource.memory-mb) #100%
✔ yarn-site/yarn.nodemanager.resource.cpu-vcores = 7, expects 7 (Assuming the cluster in yarn only. Total cores per node -1) #100%
✔ yarn-site/yarn.scheduler.maximum-allocation-vcores = 7, expects 7 (Assuming the cluster in yarn only. Total cores per node -1) #100%
✘ capacity-scheduler/yarn.scheduler.capacity.resource-calculator = org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator, expects org.apache.hadoop.yarn.util.resource.DominantResourceCalculator (Take all resources in account, not only RAM) #0%
Map/reduce config
✔ mapred-site/mapreduce.map.memory.mb = 1024, expects 1024 (Min container size) #100%
✔ mapred-site/mapreduce.reduce.memory.mb = 2048, expects 2048 (2 * min container size) #100%
✔ mapred-site/mapreduce.map.java.opts = 819, expects 819 (0.8 * min container size) #100%
✔ mapred-site/mapreduce.reduce.java.opts = 1638, expects 1638 (0.8 * mapreduce.reduce.memory.mb) #100%
✔ mapred-site/yarn.app.mapreduce.am.resource.mb = 2048, expects 2048 (2 * min container size) #100%
✔ mapred-site/yarn.app.mapreduce.am.command-opts = 1638, expects 1638 (0.8 * yarn.app.mapreduce.am.resource.mb) #100%
✔ mapred-site/mapreduce.task.io.sort.mb = 409, expects 409 (0.4 * min container size) #100%

Hive and Tez configuration
✔ hive-site/hive.execution.engine = tez, expects tez (Use Tez, not map/reduce.) #100%
✔ hive-site/hive.server2.enable.doAs = false, expects false (All queries will run as Hive user, allowing resource sharing/reuse.) #100%
✔ hive-site/hive.optimize.index.filter = true, expects true (This optimizes "select statement with where clause" on ORC tables) #100%
✔ hive-site/hive.fetch.task.conversion = more, expects more (This optimizes "select statement with limit clause;") #100%
✔ hive-site/hive.compute.query.using.stats = true, expects true (This optimizes "select count (1) from table;" ) #100%
✔ hive-site/hive.vectorized.execution.enabled = true, expects true (Perform operations in batch instead of single row) #100%
✔ hive-site/hive.vectorized.execution.reduce.enabled = true, expects true (Perform operations in batch instead of single row) #100%
✔ hive-site/hive.cbo.enable = true, expects true (Enable CBO. You still need to prepare it by using the analyse HQL command.) #100%
✔ hive-site/hive.compute.query.using.stats = true, expects true (Use CBO.) #100%
✔ hive-site/hive.stats.fetch.column.stats = true, expects true (Use CBO.) #100%
✔ hive-site/hive.stats.fetch.partition.stats = true, expects true (Use CBO.) #100%
✔ hive-site/hive.stats.autogather = true, expects true (Use CBO.) #100%
✔ hive-site/hive.server2.tez.default.queues = default, expects 'lambda x: config.queue in x' (Must contain the queue name) #100%
✔ hive-site/hive.tez.dynamic.partition.pruning = true, expects true (Make sure tez can prune whole partitions) #100%
✔ hive-site/hive.exec.parallel = true, expects true (Can Hive subqueries be executed in parallel) #100%
✔ hive-site/hive.auto.convert.join = true, expects true (use map joins as much as possible) #100%
✔ hive-site/hive.auto.convert.join.noconditionaltask = true, expects true (Use map joins for small datasets) #100%
✔ hive-site/hive.tez.container.size = 4096, expects 4096 (Multiple of min container size.) #100%
✔ hive-site/hive.auto.convert.join.noconditionaltask.size = 1417339207, expects 1417339207 (Threshold to perform map join. 1/3 * hive.tez.container.size.) #100%
✔ hive-site/hive.vectorized.groupby.maxentries = 10240, expects 10240 (Reduces execution time on small datasets, but also OK for large ones.) #100%
✔ hive-site/hive.vectorized.groupby.flush.percent = 0.1, expects 0.1 (Reduces execution time on small datasets, but also OK for large ones.) #100%
✔ hive-site/hive.server2.tez.initialize.default.sessions = true, expects true (Enable tez use without session pool if requested) #100%
✔ hive-site/hive.server2.tez.sessions.per.default.queue = 3, expects 3 (Number of parallel execution inside one queue.) #100%

Hive and Tez memory
✔ tez-site/tez.am.resource.memory.mb = 1024, expects 1024 (Appmaster memory == min container size.) #100%
✔ tez-site/tez.am.container.reuse.enabled = true, expects true (Reuse tez containers to prevent reallocation.) #100%
✔ tez-site/tez.container.max.java.heap.fraction = 0.8, expects 0.8 (default % of memory used for java opts) #100%
✔ tez-site/tez.runtime.io.sort.mb = 1024, expects 1024 (memory when the output needs to be sorted. == 0.25 * tezContainerSize (up to 40%)) #100%
✔ tez-site/tez.runtime.unordered.output.buffer.size-mb = 307, expects 307 (Memory when the output does not need to be sorted. 0.075 * hive.tez.container.size (up to 10%).) #100%
✔ tez-site/tez.task.resource.memory.mb = 1024, expects 1024 (Mem to be used by launched taks. == min container size. Overriden by hive to hive.tez.container.size anyway.) #100%
✔ tez-site/tez.task.launch.cmd-opts = 819, expects 819 (xmx = 0.8 * minContainerSize) #100%
✔ hive-site/hive.tez.java.opts = 3276, expects 3276 (xmx = 0.8 * tezContainerSize) #100%
✔ hive-site/hive.prewarm.enabled = true, expects true (Enable prewarm to reduce latency) #100%
✔ hive-site/hive.prewarm.numcontainers = 3, expects 'lambda x: x >= 1' (Hold containers to reduce latency, >= 1) #100%
✔ tez-site/tez.session.am.dag.submit.timeout.secs = 300, expects 300 (Tez Application Master waits for a DAG to be submitted before shutting down. Only useful when reuse is enabled.) #100%
✔ tez-site/tez.am.container.idle.release-timeout-min.millis = 10000, expects 10000 (Tez container min wait before shutting down. Should give enough time to an app to send the next query) #100%
✔ tez-site/tez.am.container.idle.release-timeout-max.millis = 20000, expects 20000 (Tez container min wait before shutting down) #100%
✔ tez-site/tez.am.view-acls = *, expects * (Enable tz ui access) #100%
✔ yarn-site/yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes = org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl, expects org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl (Set up tez UI) #100%
✔ mapred-site/mapreduce.job.acl-view-job = *, expects * (Enable tez ui for mapred jobs) #100%

Compress all
✔ mapred-site/mapreduce.map.output.compress = true, expects true #100%
✔ mapred-site/mapreduce.output.fileoutputformat.compress = true, expects true #100%
✔ hive-site/hive.exec.compress.intermediate = true, expects true #100%
✔ hive-site/hive.exec.compress.output = true, expects true #100%

Queue configuration. Assuming queue default is subqueue from root. Note that undefined values are inherited from parent.
✔ capacity-scheduler/yarn.scheduler.capacity.root.default.maximum-am-resource-percent = 0.2, expects 'lambda x: x != 'NOT FOUND' and float(x) >= 0.2' (How much of the Q the AM can use. Must be at least 0.2.) #100%
✔ capacity-scheduler/yarn.scheduler.capacity.root.default.ordering-policy = fair, expects fair (Helps small queries get a chunk of time between big ones) #100%
✔ capacity-scheduler/yarn.scheduler.capacity.root.default.user-limit-factor = 2, expects 'lambda x: x != 'NOT FOUND' and int(x) >= 1' (How much of the Q capacity the user can exceed if enough resources. Should be at least 1. 1=100%, 2=200%…) #100%
✔ capacity-scheduler/yarn.scheduler.capacity.root.default.minimum-user-limit-percent = 10, expects 'lambda x: x != 'NOT FOUND' and int(x) >= 10' (How much of the Q in percent a user is guaranteed to get. Should be at least 10) #100%

Random stuff
✔ hdfs-site/dfs.client.use.datanode.hostname = true, expects true (For AWS only) #100%

LLAP
✔ hive-interactive-env/enable_hive_interactive = false, expects false (Disable LLAP) #100%
FYI –

More doc can be found at:
Memory settings:
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_command-line-installation/content/determine-hdp-memory-config.html
https://community.hortonworks.com/articles/14309/demystify-tez-tuning-step-by-step.html
Hive performance tuning:
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_hive-performance-tuning/content/ch_hive-perf-tuning-intro.html
http://pivotalhd.docs.pivotal.io/docs/performance-tuning-guide.html
https://www.justanalytics.com/blog/hive-tez-query-optimization
llap:
https://community.hortonworks.com/questions/84636/llap-not-using-io-cache.html

Will not update the unexpected parameters without --update.
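To give an idea of the arithmetic behind values like the minimum container size and the number of containers shown above, here is a rough sketch following the Hortonworks memory-settings guideline linked above; the script's actual logic may differ:

# Rough sketch of the container sizing arithmetic; the values below mirror the
# example output above (8 cores, 13 disks and ~24 GB handed to YARN per node).

def min_container_size_mb(yarn_mem_mb):
    # The recommended minimum container size grows with the RAM per node.
    if yarn_mem_mb <= 4 * 1024:
        return 256
    if yarn_mem_mb <= 8 * 1024:
        return 512
    if yarn_mem_mb <= 24 * 1024:
        return 1024
    return 2048

def containers_per_node(cores, disks, yarn_mem_mb, min_container_mb):
    # containers = min(2 * cores, 1.8 * disks, yarn memory / min container size)
    return int(min(2 * cores, 1.8 * disks, yarn_mem_mb / min_container_mb))

yarn_mem_mb = 24008
min_container = min_container_size_mb(yarn_mem_mb)                  # 1024
per_node = containers_per_node(8, 13, yarn_mem_mb, min_container)   # 16
print(min_container, per_node * 8)                                  # 1024, 128 over 8 nodes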


Hadoop metrics in graphite

I will not present Graphite here; if you end up reading this, I assume you already have a Graphite instance up and running. If not, it is a matter of less than an hour to get a usable instance going.

Hadoop uses Metrics2, which allows multiple metrics output plugins to be used in parallel, supports dynamic reconfiguration of metrics plugins, provides metrics filtering, and allows all metrics to be exported via JMX.

Those metrics can be very easily exported to graphite to then be sliced and diced to your heart’s content.

You only need to modify the file hadoop-metrics2.properties by adding the following snippet:

# Sampling period
*.period=10

# Graphite sink class
*.sink.graphite.class=org.apache.hadoop.metrics2.sink.GraphiteSink

# Location of your graphite instance
*.sink.graphite.server_host=10.x.x.x
*.sink.graphite.server_port=2003

# Define for each metric group (* in *.prefix) how it should be named
# in graphite (part after the =)
datanode.sink.graphite.metrics_prefix=hadoop.datanode
namenode.sink.graphite.metrics_prefix=hadoop.namenode
resourcemanager.sink.graphite.metrics_prefix=hadoop.resourcemanager
nodemanager.sink.graphite.metrics_prefix=hadoop.nodemanager
jobhistoryserver.sink.graphite.metrics_prefix=hadoop.jobhistoryserver
journalnode.sink.graphite.metrics_prefix=hadoop.journalnode
maptask.sink.graphite.metrics_prefix=hadoop.maptask
reducetask.sink.graphite.metrics_prefix=hadoop.reducetask
applicationhistoryserver.sink.graphite.metrics_prefix=hadoop.applicationhistoryserver
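You may also want to check that the Graphite endpoint is reachable from your Hadoop nodes before restarting services. A minimal sketch, assuming the same placeholder host as in the snippet above and the default plaintext protocol on port 2003 (the metric name is just a throwaway):

import socket
import time

GRAPHITE_HOST = "10.x.x.x"   # same placeholder as server_host above
GRAPHITE_PORT = 2003         # default Graphite plaintext ("line") port

# The plaintext protocol is one metric per line: "<path> <value> <timestamp>\n"
line = "hadoop.test.heartbeat 1 %d\n" % int(time.time())

sock = socket.create_connection((GRAPHITE_HOST, GRAPHITE_PORT), timeout=5)
sock.sendall(line.encode("ascii"))
sock.close()

If the metric shows up under hadoop.test in the Graphite tree, the sinks configured above will be able to reach Graphite as well.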

In Ambari, just go to HDFS > Config > Advanced hadoop-metrics2.properties; the location for other distributions should be easy to find.

After that, restart HDFS and all the relevant services you asked to monitor (if you asked to monitor the resourcemanager, restart the resource managers, and so on).

That’s it, you’re set.

If you are on HDP, you can go a bit further. HDP actually ships with a Grafana instance (if you installed Ambari Metrics) which can use Graphite as a data source. The data will be the same; the display will be a tad prettier.

This uses graphite-web (port 80 by default), which needs to allow CORS. You can do this in Apache (the default graphite-web HTTP server) by adding this line to your Graphite vhost:

Header set Access-Control-Allow-Origin "*"
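A quick way to check that the header is actually served, assuming the requests library and adjusting the URL to your own graphite-web host:

import requests

# Placeholder URL; point it at your graphite-web instance.
resp = requests.get("http://10.x.x.x/render", timeout=5)
print(resp.headers.get("Access-Control-Allow-Origin"))  # expect "*"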


Hortonworks, Cloudera or MapR?

This is one of the big questions when you start your first Hadoop project. Hadoop is Hadoop, right? So it should not matter which distribution you use? There is some truth in there, but there are still quite a few differences between these vendors worth knowing about. After all, Linux is Linux, right? Debian or Redhat should not matter? You can jump straight to the quick answer, or carry on reading for more details.

Generalities

If you want to know more about Hadoop itself, you can check out the official Apache site, or just the wikipedia page for history and so on.

There are three big Hadoop distributions. Apache Hadoop itself, the root of them all, is not a distribution per se: you can download each component individually, but a lot of elbow grease is needed to tie everything together. The three main vendors bundle Apache Hadoop with other tools, open source as well as their own proprietary bricks, to create distributions. Those are Cloudera, MapR and Hortonworks. There are other vendors as well, Microsoft (HDInsight, cloud only), Pivotal (Pivotal HD) and others I forget, but I concentrate on the big three here.

Quick answer

Use MapR if:

  • Performance is paramount,
  • You are a big company with strong audit requirements,
  • You know you will pay a licence for support.

Use Hortonworks if:

  • Open source is very important to you,
  • You do not want to pay for a licence but still want to do as much as possible (including security, authorisation),
  • You already have a data warehouse (Teradata, Oracle, Vertica…) that you plan to carry on using but could partly offload, or which does not allow all the processing you plan to do.

Use Cloudera if:

  • You need to be PCI compliant
  • You want as much as possible automated for you, at the potential cost of a licence

Longer answer and description

A generic comment first: if you already plan to use specific tools or Linux distributions, make sure they are compatible with your version. For instance, Tez does not run on Cloudera, Impala would have problems on Hortonworks, and MapR does not support Debian (but does support Ubuntu).

MapR

MapR's biggest differentiators are its filesystem and database, said to improve overall performance a lot because they are highly optimised and skip the JVM and ext4 layers, while remaining compatible with the HDFS and HBase APIs. Their filesystem is a real filesystem, not append-only as HDFS is, and can be mounted via NFS, which makes some administration tasks much easier.

MapR strives to support the whole Hadoop ecosystem (for instance Tez, Impala, Spark…) which on paper means that more tools should be supported by MapR than by the other distributions.

MapR is the only one to support volumes, which can give you very strong security and multi-tenancy, as you can control with a very fine grain who can access which volume.

On the bad side, MapR is pretty limited in its free version. HA, for instance, is only available with a licence. (EDIT: see the comment from Anoop Dawar below, failover is now part of M3, the free version.)

As a nice starting point, you can spawn AWS instances preconfigured for MapR, where the cost includes licence and support, without having to commit for a year. Usually these AWS instances lag about two months behind the main MapR release, due to extra testing and procedures.

Cloudera

Cloudera is the oldest Hadoop distribution. Their vision is to fully replace the warehouse by creating an Enterprise Data Hub, and to help the user a lot along the way.

The biggest strength of Cloudera is their automation. Cloudera Manager and Navigator are amazing tools which do a lot for you, and are said to be superior to the equivalents of the other distributions. That said, they are closed source, and although the Manager is available for free, the Navigator (security, governance) is not.

Another very strong point of Cloudera is Impala, a very fast open-source distributed SQL query engine.

Cloudera is the only PCI-compliant distribution.

Cloudera claims to have more Hadoop (and associated tools) committers on payroll than any other distribution.

Hortonworks

Hortonworks' vision is not to fully replace the warehouse, but to complement existing warehouses by providing offloading or new processing, thanks to integration with multiple partners.

Hortonworks is a fully open source distribution. There is no licence to pay, only support if you so wish. Hortonworks' definition of open source is very strict: for them, open source means governance by a committee, to avoid 'dictatorial' open source, where a project is technically open source but only one company can accept (and usually refuses) contributions.

Ambari is the management tool for Hortonworks. Although it is quite new and does not yet have all the features you would want from a manager, it is improving at great speed and is supported by multiple organisations, thanks to being open source.

Hortonworks supports Debian, but with an extra 1-month delay due to extra tests needed in comparison with the standard Redhat/CentOS version.

Hortonworks claims to have more Hadoop (and associated tools) committers on payroll than any other distribution.

Price comparison

This is always a big question, isn't it? Here are a few prices I could gather. Those are just ballpark figures, and could of course be negotiated.

MapR support (24/7) is around 4k$/server/year. This goes up to 6k if you want to include MapRDB as well. This includes licence and support.

Cloudera support (24/7) is around 6.5k€/server/year. This includes support and licence. Note that Cloudera has multiple options, where you can elect to have full support (Enterprise), support for only one element (Flex) or support for only core Hadoop, i.e. HDFS, Hive and the like (Basic). Flex and Enterprise provide the Navigator, but Basic is very cheap (500€/server/year).

Hortonworks does not require a licence as it is fully open source, but support (24/7) is priced at about 3.5k€/server/year.

Vendor lockin

This is usually a big concern, especially when talking about non-open-source tools. I would claim that it is a non-problem.

Your data is always available via standard tools, and that is what matters most. You will always be able to retrieve or export it in multiple ways. The rest (administration, basically) is tied to your distribution anyway. Whether you do everything with vanilla Apache and Puppet, or use Ambari, Cloudera Manager or the MapR manager, your setup is not transferable to another tool. In short, you are locked in, administration-wise, anyway.

Tutorial: Install CDH 5 for testing on one machine

This is a tutorial, based on my own experience, on installing CDH 5.4 via Cloudera Manager on one machine only, for test purposes. It uses a Mint machine (based on Ubuntu/Debian). Commands are thus given with apt-get; you can probably just replace apt-get with yum if you are trying this on a Redhat-based server.

Preparation

ssh

Install ssh server on your machine:

apt-get install openssh-server

Make sure you can connect as root if you do not want everything to run under one user, which is a question that will be asked during the installation process (screen 3). Running everything under one user is nice for a one-machine test, but I believe you might run into issues if you later want to extend your cluster. For this reason I chose the normal, multi-user (hdfs, hadoop and so on) installation. Cloudera actually gives a warning for the single-user installation:

The major benefit of this option is that the Agent does not run as root. However, this mode complicates installation, which is described fully in the documentation. Most notably, directories which in the regular mode are created automatically by the Agent, must be created manually on every host with appropriate permissions, and sudo (or equivalent) access must be set up for the configured user.

On my machine, for instance, I needed to update /etc/ssh/sshd_config to have the line:

PermitRootLogin yes

Other packages

For the heartbeat, you need supervisor and the command ntpdc:

apt-get install supervisor ntp

Supported platforms

Officially, Cloudera can be installed on some versions of Debian or Ubuntu. If you use a derivative, it might work (YMMV), but Cloudera will refuse to install. You can fool the installer by changing the lsb-release file:

sudo mv /etc/lsb-release /etc/lsb-release.orig
sudo ln -s /etc/upstream-release/lsb-release /etc/lsb-release
# After installation you can revert with:
sudo rm /etc/lsb-release
sudo mv /etc/lsb-release.orig /etc/lsb-release

Installation

Follow the documentation from Cloudera:

wget http://archive.cloudera.com/cm5/installer/latest/cloudera-manager-installer.bin
chmod u+x cloudera-manager-installer.bin
sudo ./cloudera-manager-installer.bin

Note that it will install the Oracle JDK (1.7 for CDH 5.4.0) and postgres. At the end, your browser should open and connect to http://localhost:7180. Do not panic if the connection cannot be established at first; try again in a minute or two, to give the servers enough time to start up properly. Note that if your machine is not very powerful, it can take 2 minutes. The username and password there are admin/admin.

Problems/Tips

IP address

Click continue a few times, and you will be asked to enter an IP address. As you are only testing on your machine, type yours, which you can find via hostname -I in your terminal. Make sure to use your real IP, not 127.0.0.1. The reason is that if you later extend your cluster with another node, and this node number 2 (n2) wants to access node number 1 (n1), it would try to access n1 via 127.0.0.1, which would of course point to n2 itself. This is good general practice. As a host is added to the Cloudera Manager as soon as it heartbeats, a partial installation might make a ghost host (localhost) appear in 'Currently Managed Hosts'. In that case, make sure it is not selected before carrying on.

Acquiring installation lock

If you are blocked on 'Acquiring installation lock', click 'Abort', then:

rm -rf /tmp/scm_prepare*
rm -f /tmp/.scm_prepare_node.lock
# if above is not enough:
service cloudera-scm-agent restart
service cloudera-scm-server-db restart
service cloudera-scm-server restart

and then click 'retry failed host'.

Full restart

If, like me, you screwed everything up, you can always uninstall everything (make sure to say yes when asked to delete the database files). Cloudera explains (parts of) what to do, but the violent and complete way is as follows, to be done as root:

/usr/share/cmf/uninstall-cloudera-manager.sh

# kill any PID listed by this ps below:
ps aux | grep cloudera
# this command does it automatically
kill $(ps ax --format pid,command | grep cloudera | sed -r 's/^\s*([0-9]+).*$/\1/')
# purge all cloudera packages
apt-get purge cloudera-manager-server-db-2 cloudera-manager-server cloudera-manager-daemons cloudera-manager-agent 
# I am not so sure when this one is installed or not:
apt-get purge cloudera-manager-repository
# your choice, would clean up orphaned packages (postgres)
apt-get autoremove
# purge all droppings
rm -rf /etc/cloudera*
rm -rf /tmp/scm_prepare*
rm -f /tmp/.scm_prepare_node.lock
rm -rf /var/lib/cloudera*
rm -rf /var/log/cloudera*
rm -rf /usr/share/cmf
rm -rf /var/cache/yum/cloudera*
rm -rf /usr/lib/cmf

Could not connect to host monitor

After everything reports success, you go back to the home page and see a lot of sad empty graphs with 'query error'. This means that the management services are not running.

You can easily fix this by clicking on the top left ‘Add Cloudera Management Service’, and following the wizard from there.

Avro end to end in hdfs – part 4: problems and solutions

This is a series of posts aiming at explaining how and why to set up compressed avro in hdfs. It will be divided in a few posts, more will be coming if relevant.

  1. Why avro?
  2. How to set up avro in flume
  3. How to use avro with hive
  4. Problems and solutions (This post)

Invalid/non standard schemas

The avro tools available for different languages are not all exactly equivalent. The default Java one used in Hadoop, for instance, has issues when some fields can be set to null. Nested arrays are another issue in a lot of cases: the default Java avro parser cannot handle them properly. Furthermore, if you do find a way to generate avro files with nested arrays, some tools will not be able to read them. Hive will be fine, but Impala (as of version 1.2) is not able to read them.

I can only urge you to use simple schemas; this will make your life a lot easier.

Hive partitions and schema changes

If you use Hive partitions (and you should), all data in one specific partition must have the same schema. We used to have partitions per hour when loading some logs, but now we are actually adding the avro schema version in the partition path. That way, data encoded in a new schema will end up in a different partition even if data is related to the same hour.

Faster encoding and flexibility

We started loading data the standard way, via flume. This created a lot of issues, as explained earlier (nested arrays mostly), and flume was actually using a lot of resources. We ended up using the json2avro C tool, which is very fast and can handle nested arrays (but this bit us later because of Impala). This tool generates avro files which we load into hdfs via an hdfs fuse mount point. This improved performance drastically. Since we have been using this fuse mount point, we have had no data loading issues or delays, whereas we had trouble every other week while using flume.

Default values

We started by writing a schema with default values. Sadly, we ended up noticing that JSON is only a convenient representation of avro data, useful for debugging, and that schema defaults do not make a field optional in a JSON document.

This means that a missing source value has to be represented explicitly in the avro JSON encoding, like this:

{"valid": {"boolean": true}, "source": null}

but a JSON document actually missing this field is not valid.
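To make this concrete, here is a small sketch using the fastavro Python library (my choice for illustration, not something the pipeline above uses), with a schema mirroring the example; the nullable field has to be set explicitly, even if only to null:

from io import BytesIO
from fastavro import parse_schema, writer, reader

schema = parse_schema({
    "type": "record",
    "name": "Event",
    "fields": [
        {"name": "valid", "type": ["null", "boolean"], "default": None},
        {"name": "source", "type": ["null", "string"], "default": None},
    ],
})

records = [
    {"valid": True, "source": None},      # the null is given explicitly
    {"valid": True, "source": "flume"},
]

buf = BytesIO()
writer(buf, schema, records)              # container file, schema embedded
buf.seek(0)
print(list(reader(buf)))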

Avro end to end in hdfs – part 3: Hive

This is a series of posts aiming at explaining how and why to set up compressed avro in hdfs. It will be divided in a few posts, more will be coming if relevant.

  1. Why avro?
  2. How to set up avro in flume
  3. How to use avro with hive (this post)
  4. Problems and solutions

Use avro in Hive

Once your table is created and data is loaded, there is nothing extra to do: you can just query it as you would any other table.

Create the table

Creating the table can be done as follows, with some comments:

-- table name
CREATE EXTERNAL TABLE IF NOT EXISTS table_name

-- Partition according to the end of the path you set in the flume sink (hdfs.path option).
-- Following the example from the previous post, we would have
PARTITIONED BY (key STRING)

-- Avro!
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'

-- Matches the first part of hdfs.path set up in the flume sink
-- Following the example of the previous post, we would have
LOCATION '/datain/logs'

-- Other options here are to hardcode the schema or use a file on your local filesystem instead.
TBLPROPERTIES ('avro.schema.url'='hdfs:///schemas/schema.avsc');

More information can be found in the Cloudera documentation about Hive and avro.

Load the snappy jar

To load data, you need to tell Hive that the data files are compressed, and Hive needs to know how to decompress them. For this, you need to add the snappy jar to the list of extra jars loaded by Hive, by adding its path to the hive.aux.jars.path property of your hive-site.xml. For instance:

<property>
  <name>hive.aux.jars.path</name>
  <value>file:////usr/lib/hive/lib/hive-contrib.jar,...,file:////usr/lib/hive/lib/auxlib/snappy-java-1.0.4.1.jar</value>
</property>

Actually load data

You need to tell hive to use snappy, which is done the following way:

SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;

Then loading data means creating a new partition when a new directory is created with another key. Run this after having told hive to use snappy:

ALTER TABLE table_name
ADD IF NOT EXISTS PARTITION (key='some_new_key')
LOCATION '/datain/logs/key=some_new_key';

Using data with the default schema

If you use a custom schema, tailored to your data, you can then enjoy the full speed of Hive, as not much parsing will be needed by Hive to access your data.

If you use the default schema, then Hive does not know (yet) about the columns in your table. This can be worked around with the decode() function. For instance:

SELECT
hour, decode(body,'UTF-8') as body
FROM my_table

Avro end to end in hdfs – part 2: Flume setup

This is a series of posts aiming at explaining how and why to set up compressed avro in hdfs. It will be divided in a few posts, more will be coming if relevant.

  1. Why avro?
  2. How to set up avro in flume (this post)
  3. How to use avro with hive
  4. Problems and solutions

Set up flume

Believe it or not, this is the easy part.

On the source side, there is nothing specific to add; you can carry on as usual.

On the sink side, here is a sample with comments:

agent.sinks.hdfs.type=hdfs
# Very important, *DO NOT* use CompressedStream. Avro itself will do the compression
agent.sinks.hdfs.hdfs.fileType=DataStream
# *MUST* be set to .avro for Hive to work
agent.sinks.hdfs.hdfs.fileSuffix=.avro
# Of course choose your own path
agent.sinks.hdfs.hdfs.path=hdfs://namenode/datain/logs/key=%{some_partition}
agent.sinks.hdfs.hdfs.writeFormat=Text
# The magic happens here:
agent.sinks.hdfs.serializer=avro_event
agent.sinks.hdfs.serializer.compressionCodec=snappy

Note the hdfs.path: %{some_partition} might be a timestamp, for instance, which could create a new directory every hour. This will be used later in Hive.

Using this configuration will use the default Avro schema, which you can find defined in the flume source:

{
 "type": "record",
 "name": "Event",
 "fields": [{
   "name": "headers",
   "type": {
     "type": "map",
     "values": "string"
   }
 }, {
   "name": "body",
   "type": "bytes"
 }]
}

If you want to use your own custom schema, you need to extend AbstractAvroEventSerializer. This is not very complex, and the default avro event serializer actually already extends it, hardcoding a schema; it is a good example to build on. You would typically put the schema at a place reachable by the sink, either hdfs itself or a URL. The path could be hardcoded in your class if you have one schema only, or could be passed as a flume header.

If, as in the example, you are using snappy, first make sure that snappy is installed:

# RedHat world:
yum install snappy
# Debian world:
apt-get install libsnappy1

And that’s really it, there is nothing more to do to use the default schema.

Avro end to end in hdfs – part 1: why avro?

This is a series of posts aiming at explaining how and why to set up compressed avro in hdfs. It will be divided in a few posts, more will be coming if relevant.

  1. Why avro? (This post)
  2. How to set up avro in flume
  3. How to use avro with hive
  4. Problems and solutions

What is avro?

Avro, an Apache project, is a data serialisation system. From the avro wiki, Avro provides:

  • Rich data structures.
  • A compact, fast, binary data format.
  • A container file, to store persistent data.
  • Remote procedure call (RPC).
  • Simple integration with dynamic languages. Code generation is not required to read or write data files nor to use or implement RPC protocols. Code generation as an optional optimization, only worth implementing for statically typed languages

Why is it better than raw data?

After all, a bit of JSON or XML would work just as well, right? You could indeed do whatever you do with avro using JSON or XML, but it would be a lot more painful, for many reasons. One of the main avro goals is to have self-contained data structures: storing the schema with the data file means that once you get a file, you have all the information you need to process it. You can even automatically generate code from the data file itself to further process the data, in a very simple and quick way.

Furthermore, since the schema is stored in the data file itself as a preamble, it does not need to be duplicated for each data line as it would be with JSON or XML. This results in a much smaller file for the same data than the JSON or XML equivalent. For the same reason, data can be stored in a highly efficient binary format instead of plain text, which once more results in smaller size and less time spent parsing strings.

Another goal of avro is to support schema evolution. If your data structure changes over time, you do not want to have to update your whole processing flow in sync with the schema. With some restrictions, you can update an Avro schema on writers or readers without having to keep them aligned. Avro itself has a set of resolution rules which, on well-written schemas, will provide good defaults or ignore unknown values.

Avro is splittable. This means that it is very easy for HDFS to cut an Avro file into pieces to match HDFS block size boundaries, and to have one process running per block. This in turn improves disk usage and processing speed. With a non-splittable format, Hadoop could only allocate one process to deal with the whole file, instead of one per block.

It is worth mentioning as well that Avro is well documented, is well integrated in the Hadoop ecosystem and has bindings in many languages.

Which types of compression are available?

Avro on its own will already result in smaller files than raw data, as explained earlier. You can go further by using compressed avro. Avro supports two compression formats: deflate (typically an implementation of zlib) and snappy.

A short comparison would be:

                      Snappy     Deflate
Compression speed     faster     slower
Compression ratio     smaller    higher
Splittable            yes        no
Licence               New BSD    Old BSD

Even if the compression ratio of snappy is smaller, its goal is to be 'good enough'. Add to this that snappy is splittable (you can make deflate splittable, but not without extra post-processing), and you can see why it is the codec that is usually used.
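As an illustration of the size difference, here is a small sketch, again assuming the fastavro Python library (and python-snappy for the snappy codec), writing the same dummy records with each codec:

from io import BytesIO
from fastavro import parse_schema, writer

schema = parse_schema({
    "type": "record", "name": "Line",
    "fields": [{"name": "body", "type": "string"}],
})
records = [{"body": "some log line number %d" % i} for i in range(10000)]

for codec in ("null", "deflate", "snappy"):
    buf = BytesIO()
    writer(buf, schema, records, codec=codec)
    print(codec, len(buf.getvalue()))

The exact figures depend on your data, but they should follow the trade-off described above.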

What are the other options?

Keeping data in raw format has already been discussed earlier. The other logical options are sequence files, thrift and protocol buffer.

Sequence files do not really compare: they are not language independent as Avro is (it is all Java), and schema versioning is not possible.

Thrift (originated at Facebook, Apache 2.0) and Protocol Buffers (originated at Google, BSD) are the main competitors. Both offer schema evolution, are splittable, compress data to some extent and make processing faster. Usually Protocol Buffers and Avro are close in size and processing time, with Thrift being somewhat bigger and slower.

The main pros of Avro are that the schemas do not need to be compiled (you can just use the JSON schema wherever you need it) and that it is well integrated into Hadoop. Benchmarks with more detailed numbers can be found around the web.

Further readings and documentation

Schema evolution in Avro, ProtocolBuffer and thrift, with good technical explanation of how data is stored.

Slides and talk from Igor Anishchenko at Java Tech Talk #1: protocol buffer vs. avro vs. thrift.

Flume ‘Not a data file’ and canary files

Flume played a nasty trick on us recently. The files on a spooldir source were not being processed, which ended up filling up our disk. Looking a bit at the symptoms, two effects were obvious:

  • A lot of canary files appeared, with names like ‘flume-spooldir-perm-check-983522283290612181.canary’
  • Flume.log was swamped with java.io.IOException: Not a data file.

The canary files are created as part of the permission checking of a spooldir, as can be seen on github. The only thing is that they are supposed to be deleted afterwards, as they are only there to check whether flume can write to the directory. In our case they were not deleted, because creating a new empty file can be done on a full disk, but writing to it needs free space. The check thus errored out before the deletion step. Those files can safely be deleted.

The "java.io.IOException: Not a data file" exception was due to the presence of a temporary directory holding metadata for processing. This directory is controlled by the trackerDir directive in the definition of the spooldir source in flume.conf (by default .flumespool inside the spooldir). We ended up having empty metadata files, which thus did not have the 2 bytes that avro (we are using an avro sink) expected to see. There is actually nothing wrong at all with the actual data files, only with the metadata files. The solution was thus to delete .flumespool, and the issue resolved itself (after freeing up a bit of disk space, of course).

The root cause of all this was that we used the default deletePolicy of never in flume.conf, meaning that once a file is processed, it is renamed but not removed. We do have a job to remove those processed files, but it failed for a while, letting the disk fill up. We now delete files directly after processing (deletePolicy: immediate).

Edit: I filed a flume bug with a patch, FLUME-2361. Hopefully it will be merged at some point soon.