Hive: add a column pitfalls

Adding a column to an existing table is easy:


ALTER TABLE tbl ADD COLUMNS (new_col TIMESTAMP)

Easy right? Not always.

As the doc says,

The column change command will only modify Hive’s metadata, and will not modify data. Users should make sure the actual data layout of the table/partition conforms with the metadata definition.

What this means is that this command will change the table metadata, but not the partition metadata and this column will appear as NULL in select queries.

The solution is then easy, just add the CASCADE keyword:


ALTER TABLE tbl ADD COLUMNS (new_col TIMESTAMP) CASCADE

Then partitions will be updated as well.

Easy right? Not always.

If you run this command, with CASCADE, on a table without partition, you will end up with this non-descriptive error:

Error: Error while compiling statement: FAILED: NullPointerException null (state=42000,code=40000)

In short:

  • if you have a partitioned table you must use CASCADE.
  • if you do not have partitions, you must not use CASCADE.
Advertisements

Why is my hive MERGE statement slow?

In my ETL flow, I need to merge a source table in a destination table, in Hive. This turned out to be much slower than expected so I had to dig around a lot and these are the results I discovered.

Context

Some data is coming from kafka, written as avro files on hdfs. These avro files are used to create an external table, which is then merged every day into the final ORC table. The external data files are then moved out of the way, meaning that the next ETL run will have a brand new external table to be fully merged into the destination table.

SQL


set hive.merge.cardinality.check=false;
set domainregexp='.*@(.*?)$';
MERGE INTO contact dst
USING (
  SELECT

    -- DISTINCT fields
      client -- partition column
    , user_id as id
    , ct.cid as cid
    -- other fields
     , email
     , lang
     -- note: domain is around here, but is computed from email. I compute
     -- it only when needed to prevent useless processing.
    , CAST(timestamp_ms_utc AS TIMESTAMP) AS ts_utc

    , ROW_NUMBER() OVER (
      PARTITION BY client
        , ct.cid
        , user_id
      ORDER BY timestamp_ms_utc DESC
     ) as r

  FROM
    external table
-- campaign_id is a stupid struct<long:bigint,array:array<bigint>>.
  -- Let's sanitise it.
  LATERAL VIEW explode(campaign_id) ct AS cid
) src
ON
  dst.client = src.client
  AND dst.campaign_id = src.cid
  AND dst.id = src.id

-- On match: keep latest loaded
WHEN MATCHED
 AND dst.updated_on_utc < src.ts_utc
 AND src.r = 1
THEN UPDATE SET
  -- other fields
    email = src.email
  , domain = regexp_extract(src.email, ${hiveconf:domainregexp}, 1)
  , lang = src.lang
  , updated_on_utc = src.ts_utc

WHEN NOT MATCHED AND src.r = 1 THEN INSERT VALUES (
   src.id
 , src.cid

 , src.email
 , regexp_extract (src.email, ${hiveconf:domainregexp}, 1)
 , src.lang

 , src.ts_utc -- insert_date
 , src.ts_utc -- update_date

 , src.client -- partition column
)
;

This statement:

  • reads the source table,
  • explodes an array (campaign_id),
  • orders the rows within the same ‘unique’ key (ROW_NUMBER()),
  • updates or inserts the first unique row.

Problem

Merging takes exponentially longer. Merging the first day into the (empty) destination table takes about 30 minutes. The second day takes about 1.5 hour. The third day takes 4 hours. I stopped there.

What could go wrong?

Many things as it turned out.

(Attempted) Solutions

SQL tweaking

My first guess was that my SQL was not great. Here is what I tried:

  • Removing the regex. No impact.
  • Create a temporary table without duplicates and merge that one. Negative impact (4x longer).
  • Execute the merge per partition, one by one. Very negative impact.
  • Replace the source table by a subquery to filter out the r=1 before the merge. Negative impact (20% longer).
  • Create a table with exaclty the same structure as the destination table as a temporary table and merge that one. Negative impact (30% longer).
  • Pre-explode the lateral view earlier in the process (25% longer).

Apparently my SQL was quite good, so I had to look elsewhere.

Java heap

It turned out that many of my services were under configured. I increased the datanode heap, namenode heap, hive metastore heap and this all already made a big difference in speed but it was not enough.

Small files

This was a massive issue.

I had 3 source files per table per minute. On top of this, I had some aggressive partitioning and bucketing (buckets are mandatory for ACID tables, ACID tables are mandatory for a merge).

Updating this to have 3 source files per hour and having only 4 buckets per table instead of 64 gave me great performance. I am still not fully clear about the impact of bucketing but this will be a question for later if I notice other performance problem. I have enough on my plate to not do premature optimisation.

Final solution

In four words: bigger heap, less files.

My initial 30 minute merge in an empty table is now done in about 8 minutes in a table with 145M rows in which 35M are merged daily.

 

 

 

About (big) kafka broker id

I had quite a bit of fun setting up the kafka broker id, and those are my findings, hoping to save time to other poor souls like me.

TL;DR;

Set up in your kafka config

  • nothing to have auto-generated ids
  • broker.id=something_big AND reserved.broker.max.id=something_even_bigger to manually set your ids up.

Long Story

The broker id is a unique identifier per broker. Each broker in the cluster must have a different id, which is a positive int (meaning for java something less than 2147483647). This is all fine and dandy and works nicely if your ids are increasing from 1, 2…

Another option, nice for automated deployment, would be to generate ids based on the ip address, which should be unique in a DC thus (probably) in a cluster. With puppet, a nice ruby expression in a template like:

broker.id=<%= @ipaddress.split('.').inject(0) {|total,value| (total << 8 ) + value.to_i} & 0X7FFFFFFFF %>

would nicely do to generate a 31 bit int from the 32 bits IP (java has no unsigned int, so we cannot use the full range), discarding only the highest bit to keep as much variability as possible.

Now, it so happens that kafka can generate its ids as well, from a zookeeper sequence. To make sure there is no collision, the auto-generated ids will not be under the undocumented reserved.broker.max.id value, which is 1000 by default.

Conversely, manual ids cannot be above this limit. If you dare set up in your config file an id above this, kafka will just not start, and more annoyingly not give you any feedback beyond an exit code of 1. The solution once you discover this configuration option is easy, just set it up as high as possible, for instance to the max int possible:

reserved.broker.max.id=2147483647

The problem was to find out that it actually was the problem.

On a side note, changing the id after the first kafka start is a very bad idea, and you will end up with a message saying for instance:

kafka.common.InconsistentBrokerIdException: Configured brokerId 999 doesn’t match stored brokerId 838 in meta.properties

EMR – Elastic Map Reduce

Amazon has its own flavour of Hadoop, and this page explores in which case it is worth using it instead of a usual Hadoop distribution on top of EC2.

What EMR is

Elastic Map Reduce, this is basically an Amazon-flavoured Hadoop distribution, patched and optimised to run on AWS, targeted towards one-off or very infrequent processing. It uses either Amazon’s own Hadoop or MapR.

Plus points

It is pretty easy to set up. Going to the EMR setup page, you just have a few knobs to click on to get a cluster up and running. Basically you choose if you want Amazon or MapR, the set of applications to be bundled in and the number and type of instances in your cluster. This can be done in hardly a minute and the cluster will automagically be provisioned for you.

It seems pretty much up to date, Spark 1.5 was available within a month of its release for instance.

The cluster can be managed in different ways, via the GUI, the console or APIs, making it very flexible to scale in or out.

Min points

The usual min points of something which is managed for you apply. There is only a limited set of applications bundled in, namely Hadoop, Hive, Hue, Mahout, Oozie-Sandbox, Pig, Presto-Sandbox, Spark and Zeppelin-Sandbox. If you need another one or a different version you are out of luck. It is possible to do some manual installation or updates but probably defeats the purpose of paying extra to have a managed cluster.

Running costs are higher than using your Hadoop cluster on EC2, as you still have to pay not only for the EC2 servers but for EMR as well. The cost to have EMR is about 20-25% on top of EC2 costs.

The default storage is S3, which is not meant for low-latency access. This might not be an issue for the use cases where EMR is really good, but can definitely become a problem if low latency is a must for you.

Interesting notes

You have the option, when setting a cluster up, to choose for a long-running or transient life-cycle. This gives you the option to spawn a cluster for very infrequent jobs, have them run, and destroy the cluster (so not paying for it while idle) after completion.

Note that you cannot have more than 256 jobs (named steps) active at the same time. In older versions, 256 jobs was the total over the lifetime of the cluster.

Usage

It is really easy to submit a job. The storage is all in S3, so once

  • your input data is in s3
  • your job, consisting of a mapper and a reducer (jar or streaming in any language you wish)
  • you created an output directory in S3

You basically just have to fill these paths into a form and the job will run.

My experience is that as expected the latency is very high.

It is possible to chain steps, but you must then use AWS data pipeline, not covered here.

Summary

Basically, EMR would be great in 2 situations:

  • Very infrequent use of data without strong latency requirements. You can then spawn a transient cluster, have it do whatever processing you planned to do and destroy it to save costs afterwards.
  • If the costs associated with managing a cluster would be higher than the extra EMR costs. This would probably be the case for short term cluster, which reinforce the previous point.