Anatomy of a Merge statement in Hive

When a merge statement is issued, it is actually rewritten into a set of inserts. This rewriting has interesting properties which can help you better understand the performance of your statement.

The test setup is easy: one source table, one destination table. The destination needs to be ACID and thus bucketed (this is pre-Hive 3).

create table tmp.tstmerge
(b int, v string)
partitioned by (id int)
clustered by (b) into 1 buckets
stored as orc
TBLPROPERTIES ("transactional"="true");

create table tmp.srcmerge
stored as orc
as select 1 as id, 'foo' as v;

The merge statement is trivial as well:

merge into tmp.tstmerge dst
using tmp.srcmerge src
on dst.id = src.id
when matched and dst.b = 0 then delete
when matched then update set
     v=concat(dst.v, src.v)
when not matched then insert values (
    src.b, src.v, src.id);

What becomes interesting is that hiveserver2.log then shows how the merge is reparsed before execution. For the given merge, here is the actual rewritten statement:

FROM
    tmp.tstmerge dst
RIGHT OUTER JOIN
    tmp.srcmerge src
ON dst.id = src.id
INSERT INTO tmp.tstmerge partition (id)    -- delete clause
    SELECT
        dst.ROW__ID, dst.id
    WHERE dst.id = src.id AND dst.b = 0
    sort by dst.ROW__ID
INSERT INTO tmp.tstmerge partition (id)    -- update clause
    SELECT
        dst.ROW__ID, dst.b, concat(dst.v, src.v), dst.id
    WHERE dst.id = src.id AND NOT(dst.b = 0)
    sort by dst.ROW__ID
INSERT INTO tmp.tstmerge partition (id)    -- insert clause
    SELECT
        src.b, src.v, src.id
    WHERE dst.ROW__ID IS NULL
INSERT INTO merge_tmp_table
    SELECT cardinality_violation(dst.ROW__ID, dst.id)
    WHERE dst.id = src.id
    GROUP BY dst.ROW__ID, dst.id
    HAVING count(*) > 1

What do we have here?

  1. This becomes a multi-insert statement with 3 different selections for the update, insert and delete clauses. Multi-insert is a Hive extension.
  2. ROW__ID is a hidden column for acid tables, containing a map:
    select row__id from tmp.tstmerge;
    | row__id |
    | {"transactionid":44,"bucketid":0,"rowid":0} |
  3. To update a row, you just need to change the columns of a row identified by its ROW__ID. Deleting a row is equivalent to nullifying all columns of a ROW__ID. This works because all clauses end up as inserts into the ORC delta files anyway.
  4. cardinality_violation is a UDF which will exception out if more than one row has the same set of ROW__ID and join condition. This is because the SQL standard says that there cannot be more than 1 row in the source matching the destination. It will be executed (and thus exception out) only if such a row exists. On a side note, if you disable cardinality checking (set hive.merge.cardinality.check=false;) then this leg of the multi-insert does not exist.
  5. Rows are sorted by ROW__ID. Note first that sort by sorts per reducer, whereas order by would sort across all reducers. Order by would be prohibitively expensive. The reason for the sorting is that delta files can then be directly merged on read.

Practically this means that you cannot order data in ORC ACID tables (which is a shame as it is the one thing to do performance-wise when using ORC). Furthermore, any ordering in the src statement, if not meant to speed the join up, will be useless.
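Points 3 and 5 can be made concrete with a toy model (my own sketch, not Hive code and nothing like the actual ORC file layout): updates and deletes are just new records keyed by ROW__ID, and keeping them sorted is what allows a cheap merge on read.

```python
# Toy model of ACID merge-on-read: base and deltas are plain row lists;
# an update is a new record for an existing row_id, a delete is a record
# with nullified values. This is an illustration, not Hive's file format.

def read_table(base, deltas):
    """Apply delta records (sorted by row_id) on top of the base rows."""
    rows = dict(base)  # row_id -> column values
    for row_id, values in sorted(deltas):  # sorted deltas allow a streaming merge
        if values is None:      # a delete: all columns nullified
            rows.pop(row_id, None)
        else:                   # an update: new values for that row_id
            rows[row_id] = values
    return rows

base = [(0, "foo"), (1, "bar"), (2, "baz")]
deltas = [(1, "bar-updated"),   # update row 1
          (2, None)]            # delete row 2
print(read_table(base, deltas))  # {0: 'foo', 1: 'bar-updated'}
```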

The cost of ACID with ORC tables

ACID introduction

ACID transactions (update, merge) in Hive are awesome. The merge statement especially is incredibly useful.

Of course, not all tables are ACID. You need to use ORC and have the table marked as ACID, but those are easy steps:

create table something (id bigint) stored as orc tblproperties("transactional"="true");

Of course, in hdfs you cannot change a file once it is created. The standard way (not Hadoop specific) to handle changing immutable files is to have deltas. Each table will consist of a few directories:

  • the base directory: the data at creation time,
  • one or more delta directories: contains updated rows.

Every hive.compactor.check.interval seconds the compactor checks whether a compaction must happen. If so, it compacts the deltas and the base directory into one new base directory, with all the deltas applied to the original base directory.

The reason for compacting is that when you read an ACID table with many deltas, there is a lot more to read than for only a base directory, as hive has to go through each and every delta. This has IO and CPU costs, which are removed after compaction.

Naive ACID use

Every day I build a summary table gathering all data that changed in the last 24h as well as some related data. Many events are aggregated together. Think for instance about sending an email: I would get send data, open data, maybe click data, bounces and a few others. I started building the summary following the temporal flow:

create table summary (id bigint, number_sent bigint, number_open bigint...) stored as orc tblproperties("transactional"="true");

insert into summary select .... from sent;

merge into summary select ... from open;

merge into summary select ... from click;


Overall a few billion rows will be read. The final summary table will have about 100 million rows.

What is interesting here is that I am inserting the biggest data first. This is the table summing up reads and writes per event while building the whole summary, which ran for about 4 hours:

Event Bytes read (GB) Bytes written (GB)
Total 516.5 104.1
Sent 16.2 87.1
Open 88.8 14.2
Click 101.5 1.7
Conversion 102.9 0.01
Bounce 103 1
Spam 104 0.11

Seeing 500GB read scared me a little, so instead of following the naive temporal flow, I started with the smallest event first to finish up with the biggest:

Event Bytes read (GB) Bytes written (GB)
Total 31.5 99.1
Conversion 0 0
Spam 0 0
Click 0.3 1.5
Bounce 1.7 1
Open 4.4 13.3
Sent 25.1 83.4

That’s much better already! The total number of bytes written does not change much (quite logical I suppose as the final data is the same) but the number of bytes read is only 6% of the original! Furthermore, it ran in 2h40 instead of 4 hours.
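A back-of-the-envelope model shows why the order matters (toy numbers of my own, not the measurements above): every merge rereads the summary built so far, so the big events should be merged last.

```python
# Toy model: each merge into the summary rereads the whole summary built so
# far, then the summary grows by that event's contribution. Only the order
# of the same events changes. Sizes in GB are illustrative.
events = {"sent": 87, "open": 14, "click": 2, "bounce": 1, "spam": 0.1}

def total_read(order):
    table_size, read = 0.0, 0.0
    for event in order:
        read += table_size           # the merge rereads the current summary...
        table_size += events[event]  # ...and the summary grows
    return read

biggest_first = ["sent", "open", "click", "bounce", "spam"]
smallest_first = list(reversed(biggest_first))
print(total_read(biggest_first))   # rereads dominated by the big 'sent' data
print(total_read(smallest_first))  # the big event is written once, never reread
```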

I added one last step. This summary data was written at user level. I actually needed one extra aggregation, but I was worried about joining against the user table at every step, as the user table is quite big and joins are expensive. Still, I experimented, doing the aggregation at each step instead of one big aggregation at the end:

Event Bytes read (GB) Bytes written (GB)
Total 20.5 8.6
Conversion 0.2 0
Spam 1.2 0
Click 1.4 0.2
Bounce 1.5 0.2
Open 3.5 1.7
Sent 12.7 6.4

Total run time: 1.5 hours!


When using ACID, deltas are expensive. When using HDFS, writes are expensive. Order your processing to have as little of those as possible.

Automated Hadoop setup with ambari

Hadoop is complex to configure; this is not new. Cluster managers like ambari help, of course, but finding the sweet configuration spot is not easy, especially as the spot will be different per use case.

My cluster is almost yarn/tez only currently, so I wrote this python script which will look at the whole cluster via the ambari API and configure a good chunk of it, depending on the number of disks, CPU, ram and based on documentation I found scattered all around. It works great even on small clusters and will tell you the reason behind the values of the configuration settings.

The caveats are that LLAP is not yet managed (it was attempted so the option is there, but it does not do anything) and that it assumes that all datanodes are identical.

The default run mode is read-only but you can ask the script to actually update ambari. You will still need to restart the relevant updated services yourself, on your own time (this is of course just one click in the ambari UI).

An example showing only what my script thinks is not correct:

./ --tofix

Basic info

Yarn config.
✘ yarn-site/yarn.scheduler.minimum-allocation-mb = 768, expects 1024 (Min container size.) #75%
✘ capacity-scheduler/yarn.scheduler.capacity.resource-calculator = org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator, expects org.apache.hadoop.yarn.util.resource.DominantResourceCalculator (Take all resources in account, not only RAM) #0%

A full run:


Basic info
FYI - { '': {'cpu': 8, 'mem': 33566638080},
'': {'cpu': 8, 'mem': 33566638080},
'': {'cpu': 8, 'mem': 33566638080},
'': {'cpu': 8, 'mem': 33566638080},
'': {'cpu': 8, 'mem': 33566638080},
'': {'cpu': 8, 'mem': 33566633984},
'ip-10-0-0-007eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'': {'cpu': 8, 'mem': 33566638080}}: Data nodes.
FYI - {'cpu': 64, 'disk': 104, 'mem': 268533100544}: Total cluster resources.
FYI - 1024: Min container size (MB), based on amount of ram/cpu in the cluster.
FYI - 128: Number of containers based on recommendations.
FYI - 0.55: Default queue capacity.

Yarn config.
✔ yarn-site/yarn.nodemanager.resource.memory-mb = 24008, expects 24008 (min(yarn memory for one DN) * 0.75.) #100%
✘ yarn-site/yarn.scheduler.minimum-allocation-mb = 768, expects 1024 (Min container size.) #75%
✔ yarn-site/yarn.scheduler.maximum-allocation-mb = 24008, expects 24008 (Same as yarn.nodemanager.resource.memory-mb) #100%
✔ yarn-site/yarn.nodemanager.resource.cpu-vcores = 7, expects 7 (Assuming the cluster is yarn only. Total cores per node -1) #100%
✔ yarn-site/yarn.scheduler.maximum-allocation-vcores = 7, expects 7 (Assuming the cluster is yarn only. Total cores per node -1) #100%
✘ capacity-scheduler/yarn.scheduler.capacity.resource-calculator = org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator, expects org.apache.hadoop.yarn.util.resource.DominantResourceCalculator (Take all resources in account, not only RAM) #0%
Map/reduce config
✔ mapred-site/ = 1024, expects 1024 (Min container size) #100%
✔ mapred-site/mapreduce.reduce.memory.mb = 2048, expects 2048 (2 * min container size) #100%
✔ mapred-site/ = 819, expects 819 (0.8 * min container size) #100%
✔ mapred-site/ = 1638, expects 1638 (0.8 * mapreduce.reduce.memory.mb) #100%
✔ mapred-site/ = 2048, expects 2048 (2 * min container size) #100%
✔ mapred-site/ = 1638, expects 1638 (0.8 * #100%
✔ mapred-site/ = 409, expects 409 (0.4 * min container size) #100%

Hive and Tez configuration
✔ hive-site/hive.execution.engine = tez, expects tez (Use Tez, not map/reduce.) #100%
✔ hive-site/hive.server2.enable.doAs = false, expects false (All queries will run as Hive user, allowing resource sharing/reuse.) #100%
✔ hive-site/hive.optimize.index.filter = true, expects true (This optimizes “select statement with where clause” on ORC tables) #100%
✔ hive-site/hive.fetch.task.conversion = more, expects more (This optimizes “select statement with limit clause;”) #100%
✔ hive-site/hive.compute.query.using.stats = true, expects true (This optimizes “select count (1) from table;” ) #100%
✔ hive-site/hive.vectorized.execution.enabled = true, expects true (Perform operations in batch instead of single row) #100%
✔ hive-site/hive.vectorized.execution.reduce.enabled = true, expects true (Perform operations in batch instead of single row) #100%
✔ hive-site/hive.cbo.enable = true, expects true (Enable CBO. You still need to prepare it by using the analyse HQL command.) #100%
✔ hive-site/hive.compute.query.using.stats = true, expects true (Use CBO.) #100%
✔ hive-site/hive.stats.fetch.column.stats = true, expects true (Use CBO.) #100%
✔ hive-site/hive.stats.fetch.partition.stats = true, expects true (Use CBO.) #100%
✔ hive-site/hive.stats.autogather = true, expects true (Use CBO.) #100%
✔ hive-site/hive.server2.tez.default.queues = default, expects 'lambda x: config.queue in x' (Must contain the queue name) #100%
✔ hive-site/hive.tez.dynamic.partition.pruning = true, expects true (Make sure tez can prune whole partitions) #100%
✔ hive-site/hive.exec.parallel = true, expects true (Can Hive subqueries be executed in parallel) #100%
✔ hive-site/ = true, expects true (use map joins as much as possible) #100%
✔ hive-site/ = true, expects true (Use map joins for small datasets) #100%
✔ hive-site/hive.tez.container.size = 4096, expects 4096 (Multiple of min container size.) #100%
✔ hive-site/ = 1417339207, expects 1417339207 (Threshold to perform map join. 1/3 * hive.tez.container.size.) #100%
✔ hive-site/hive.vectorized.groupby.maxentries = 10240, expects 10240 (Reduces execution time on small datasets, but also OK for large ones.) #100%
✔ hive-site/hive.vectorized.groupby.flush.percent = 0.1, expects 0.1 (Reduces execution time on small datasets, but also OK for large ones.) #100%
✔ hive-site/hive.server2.tez.initialize.default.sessions = true, expects true (Enable tez use without session pool if requested) #100%
✔ hive-site/hive.server2.tez.sessions.per.default.queue = 3, expects 3 (Number of parallel execution inside one queue.) #100%

Hive and Tez memory
✔ tez-site/ = 1024, expects 1024 (Appmaster memory == min container size.) #100%
✔ tez-site/ = true, expects true (Reuse tez containers to prevent reallocation.) #100%
✔ tez-site/ = 0.8, expects 0.8 (default % of memory used for java opts) #100%
✔ tez-site/ = 1024, expects 1024 (memory when the output needs to be sorted. == 0.25 * tezContainerSize (up to 40%)) #100%
✔ tez-site/tez.runtime.unordered.output.buffer.size-mb = 307, expects 307 (Memory when the output does not need to be sorted. 0.075 * hive.tez.container.size (up to 10%).) #100%
✔ tez-site/tez.task.resource.memory.mb = 1024, expects 1024 (Mem to be used by launched tasks. == min container size. Overridden by hive to hive.tez.container.size anyway.) #100%
✔ tez-site/tez.task.launch.cmd-opts = 819, expects 819 (xmx = 0.8 * minContainerSize) #100%
✔ hive-site/ = 3276, expects 3276 (xmx = 0.8 * tezContainerSize) #100%
✔ hive-site/hive.prewarm.enabled = true, expects true (Enable prewarm to reduce latency) #100%
✔ hive-site/hive.prewarm.numcontainers = 3, expects 'lambda x: x >= 1' (Hold containers to reduce latency, >= 1) #100%
✔ tez-site/ = 300, expects 300 (Tez Application Master waits for a DAG to be submitted before shutting down. Only useful when reuse is enabled.) #100%
✔ tez-site/ = 10000, expects 10000 (Tez container min wait before shutting down. Should give enough time to an app to send the next query) #100%
✔ tez-site/ = 20000, expects 20000 (Tez container min wait before shutting down) #100%
✔ tez-site/ = *, expects * (Enable tez ui access) #100%
✔ yarn-site/ = org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl, expects org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl (Set up tez UI) #100%
✔ mapred-site/mapreduce.job.acl-view-job = *, expects * (Enable tez ui for mapred jobs) #100%

Compress all
✔ mapred-site/ = true, expects true #100%
✔ mapred-site/mapreduce.output.fileoutputformat.compress = true, expects true #100%
✔ hive-site/hive.exec.compress.intermediate = true, expects true #100%
✔ hive-site/hive.exec.compress.output = true, expects true #100%

Queue configuration. Assuming queue default is subqueue from root. Note that undefined values are inherited from parent.
✔ capacity-scheduler/yarn.scheduler.capacity.root.default.maximum-am-resource-percent = 0.2, expects 'lambda x: x != "NOT FOUND" and float(x) >= 0.2' (How much of the Q the AM can use. Must be at least 0.2.) #100%
✔ capacity-scheduler/yarn.scheduler.capacity.root.default.ordering-policy = fair, expects fair (Helps small queries get a chunk of time between big ones) #100%
✔ capacity-scheduler/yarn.scheduler.capacity.root.default.user-limit-factor = 2, expects 'lambda x: x != "NOT FOUND" and int(x) >= 1' (How much of the Q capacity the user can exceed if enough resources. Should be at least 1. 1=100%, 2=200%...) #100%
✔ capacity-scheduler/yarn.scheduler.capacity.root.default.minimum-user-limit-percent = 10, expects 'lambda x: x != "NOT FOUND" and int(x) >= 10' (How much of the Q in percent a user is guaranteed to get. Should be at least 10) #100%

Random stuff
✔ hdfs-site/dfs.client.use.datanode.hostname = true, expects true (For AWS only) #100%

✔ hive-interactive-env/enable_hive_interactive = false, expects false (Disable LLAP) #100%

More doc can be found at:
Memory settings:
Hive performance tuning:

Will not update the unexpected parameters without --update.




Find a timezone offset in pure SQL in hive

Timezones are a pain. This is not new, and every time you deviate from UTC it will bite you. That said, sometimes you have to deviate from UTC, especially for the final display of a date if you want to show it in the local timezone of the reader. In that case, adding an offset to be explicit will save some questions and uncertainty down the line.

There is no function get_offset_from_tz() in Hive, sadly. Using reflect() does not work either as the method call is too complex for reflect. Writing a UDF would be possible but feels overkill.

The solution I give here works in Hive and should probably work in all SQL variants as well apart from the variables.

The algorithm to find the offset is easy:

  • get the time in UTC,
  • get the same in another timezone,
  • subtract one from the other to get the offset,
  • format the offset in a standard way.
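As a sanity check, the same four steps can be sketched in Python (zoneinfo, Python 3.9+; tz_offset is my own helper name, not something from Hive):

```python
# The same algorithm outside Hive: take the instant in UTC, render it in the
# display timezone, take the difference, and format it as an offset string.
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def tz_offset(d: str, display_tz: str) -> str:
    utc = datetime.strptime(d, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    delta = utc.astimezone(ZoneInfo(display_tz)).utcoffset()
    minutes = int(delta.total_seconds()) // 60
    sign = "-" if minutes < 0 else "+"
    hours, mins = divmod(abs(minutes), 60)
    return f"{sign}{hours:02d}:{mins:02d}"

print(tz_offset("2018-06-01 01:02:02", "Europe/Amsterdam"))    # +02:00
print(tz_offset("2018-06-01 01:02:02", "America/New_York"))    # -04:00
print(tz_offset("2018-06-01 01:02:02", "Australia/Adelaide"))  # +09:30
```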

The main issue is that you cannot assign results to variables in SQL, meaning that many computations need to be duplicated. They will be optimised away, of course, but they make for ugly code.

In hive, luckily, you can use variables. They cannot store results but are used as-is, a bit like macros, where the variable name is just replaced by its content which can be some piece of code.

This sets up the date to find the offset for as well as a few TZ for test.

-- Date to display. If you use this from a table you can
-- put here the column that would be used, eg. t.logdate.
set hivevar:D='2018-06-01 01:02:02';

-- A few tests:
-- positive offset +02:00 (in summer)
set hivevar:DISPLAY_TZ='Europe/Amsterdam';

-- negative offset -04:00 (in summer)
set hivevar:DISPLAY_TZ='America/New_York';

-- 0 offset
set hivevar:DISPLAY_TZ='UTC';

-- Non integer offset: +09:30
set hivevar:DISPLAY_TZ='Australia/Adelaide';

These are the macros:

-- Date displayed in the right TZ
set hivevar:dateintz=DATE_FORMAT(FROM_UTC_TIMESTAMP(${D}, ${DISPLAY_TZ}),"yyyy-MM-dd HH:mm:ss");
-- Offset in interval type
set hivevar:delta=cast(${dateintz} as timestamp) - cast(${D} as timestamp);

And the code itself, tiny and readable once variables are used:

select concat(
    -- date in TZ
    ${dateintz}

    -- sign
    , if(${delta} < interval '0' minute, '-', '+')

    -- hour
    , lpad(abs(hour(${delta})), 2, 0)

    , ':'

    -- minute
    , lpad(abs(minute(${delta})), 2, 0)
) as dtwithoffset;

et voilà.

Hive, map, NULL and NPE

Checking for null values in a map column in Hive (1.2.1, Hortonworks) interestingly returns a null pointer exception:

create table npe (m map<bigint, bigint>);
select count(*) from npe where m is null;
Error: Error while compiling statement: FAILED: NullPointerException null (state=42000,code=40000)

The error happens at parsing time when Hive tries to estimate data size. From hiveserver2.log:

Caused by: java.lang.NullPointerException
at org.apache.hadoop.hive.ql.stats.StatsUtils.getSizeOfMap(

Interestingly, getting not null is fine:

select count(*) from npe where m is not null; -- returns 0

If you think like me, you will think 'haha! not not null should work!'

select count(*) from npe where not m is not null; -- does not work

If you are smarter than me, you will have guessed before trying that Hive optimises the double negation away, and gives another NPE.

But going in this direction, we can still trick Hive by casting the boolean to int:

select count(*) from npe where int(m is not null)=0; -- works

This happens both without data and when there are real NULLs in the table. By real NULL I mean that a SELECT would show NULL, which happens only when you add a column to an existing table. Indeed, you cannot yourself insert NULL into a complex column:

with a as (select null) insert into npe select * from a;
Error: Error while compiling statement: FAILED: SemanticException [Error 10044]: Line 1:36 Cannot insert into target table because column number/types are different 'npe': Cannot convert column 0 from void to map. (state=42000,code=10044)

You have to create an empty map object:

with a as (select map(cast(null as bigint), cast(null as bigint))) insert into npe select * from a;

Then, of course, the empty map object is not (a real) NULL and if you want to look for null you have to fudge a bit, looking at the size of the map for instance:

select m, size(m), isnull(m) from npe;
| m  | _c1 | _c2    |
| {} |  0  | false  |

Hive self merge

I had an (ORC) table with duplicated rows, which I wanted to remove. The query is quite simple:

merge into click as dst using (
      select
      -- For all unique clicks...
        id
      , contact_id
      , ts_utc
      -- ... find the duplicates (cnt>1 in having) ...
      , count(*) cnt
      -- ... remember the first one loaded ...
      , min(load_ts) as first_load
      from click
      group by
        1, 2, 3
      having cnt > 1
) as src
-- ... once the first occurrence of the duplicates
-- is found find all the duplicates ...
    and dst.contact_id=src.contact_id
    and dst.ts_utc=src.ts_utc
-- ... and if it is not the first one loaded ...
when matched and src.first_load != dst.load_ts
-- .. delete it.
then delete;

Trivial, right? Well it looks like you cannot do such a ‘self merge’ in hive. I ended up with this error:

 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(
 at org.apache.tez.runtime.InputReadyTracker$InputReadyMonitor.awaitCondition(
 at org.apache.tez.runtime.InputReadyTracker.waitForAllInputsReady(
 at org.apache.tez.runtime.api.impl.TezProcessorContextImpl.waitForAllInputsReady(

The solution, once understood that a self merge is not allowed, is of course obvious: use a temporary table. Splitting my merge statement in 2 did the trick.

create temporary table clickdups stored as orc as select
        id
      , contact_id
      , ts_utc
      , count(*) cnt
      , min(load_ts) as first_load
      from click
      group by
        1, 2, 3
      having cnt > 1;

merge into click as dst using clickdups
as src
    and dst.contact_id=src.contact_id
    and dst.ts_utc=src.ts_utc
when matched and src.first_load != dst.load_ts
then delete;

On a side note, I needed to tweak the self-merge a lot to prevent out of memory errors. Those did not happen at all with the 2-step solution.
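The keep-the-first-loaded rule that these statements implement can be sketched outside SQL (a toy model of mine, with hypothetical column names):

```python
# Toy dedup: group click rows by their logical key and keep only the row
# with the smallest load_ts; this is what the merge's delete clause achieves.
def dedup(rows):
    """rows: list of dicts holding the key columns and a load_ts."""
    first_loaded = {}
    for row in rows:
        key = (row["id"], row["contact_id"], row["ts_utc"])
        if key not in first_loaded or row["load_ts"] < first_loaded[key]["load_ts"]:
            first_loaded[key] = row
    return list(first_loaded.values())

clicks = [
    {"id": 1, "contact_id": 7, "ts_utc": "2018-01-01", "load_ts": 10},
    {"id": 1, "contact_id": 7, "ts_utc": "2018-01-01", "load_ts": 20},  # duplicate
    {"id": 2, "contact_id": 7, "ts_utc": "2018-01-01", "load_ts": 30},
]
print(dedup(clicks))  # keeps the rows with load_ts 10 and 30
```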

Create a time dimension table in pure hive SQL

Without further ado, here is the full SQL to create a table giving you one row per day, with date, year, month, day, day number and name of the week, and day of the year. If you want the hours as well, look at the bottom of this post.

set hivevar:start_day=2010-01-01;
set hivevar:end_day=2050-12-31;
set hivevar:timeDimTable=default.timeDim;

create table if not exists ${timeDimTable} as
with dates as (
  select date_add("${start_day}", a.pos) as d
  from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a
)
select
    d as d
  , year(d) as year
  , month(d) as month
  , day(d) as day
  , date_format(d, 'u') as daynumber_of_week
  , date_format(d, 'EEEE') as dayname_of_week
  , date_format(d, 'D') as daynumber_of_year
from dates
sort by d;

Note that I use d as date column because date is a reserved keyword.

The biggest issue is to generate one row per day. The trick here is to use a clever combination of posexplode, split and repeat. This is what the first CTE does:

-- just 10 days for the example
set hivevar:start_day=2010-01-01;
set hivevar:end_day=2010-01-10;
select date_add("${start_day}", a.pos) as d
from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a

We can break it down in a few parts:

select datediff("${end_day}", "${start_day}");
-- output: 9

Just computes the difference between start and end day in days.

select repeat("o", 9);
-- output: ooooooooo

Will output a string of 9 'o' characters. The actual character does not matter at all.

select split("ooooooooo", "o");
-- output:  ["","","","","","","","","",""]

Creates a hive array of 10 (empty) strings: splitting 9 'o' characters on 'o' yields 10 empty fields.

select posexplode(split("ooooooooo", "o"));
-- output:
-- +------+------+--+
-- | pos | val |
-- +------+------+--+
-- | 0 | |
-- | 1 | |
-- | 2 | |
-- | 3 | |
-- | 4 | |
-- | 5 | |
-- | 6 | |
-- | 7 | |
-- | 8 | |
-- | 9 | |
-- +------+------+--+

This actually creates a row per array element, with the index (0 to 9) and the (empty) value of each element.

That was the tricky part; the rest is easy. The first CTE creates a row per date, adding the array index (in days) to start_day:

with dates as (
select date_add("${start_day}", a.pos) as d
from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a)
select * from dates;
-- +-------------+--+
-- | dates.d |
-- +-------------+--+
-- | 2010-01-01 |
-- | 2010-01-02 |
-- | 2010-01-03 |
-- | 2010-01-04 |
-- | 2010-01-05 |
-- | 2010-01-06 |
-- | 2010-01-07 |
-- | 2010-01-08 |
-- | 2010-01-09 |
-- | 2010-01-10 |
-- +-------------+--+
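For comparison, here is the same row-per-day generation sketched in Python, just to convince yourself of what the posexplode trick produces:

```python
# Generate one row per day between start and end (inclusive), mirroring
# date_add("${start_day}", pos) for pos in 0..datediff(end, start).
from datetime import date, timedelta

def date_range(start_day: str, end_day: str):
    start = date.fromisoformat(start_day)
    end = date.fromisoformat(end_day)
    return [start + timedelta(days=pos)
            for pos in range((end - start).days + 1)]

days = date_range("2010-01-01", "2010-01-10")
print(len(days))           # 10
print(days[0], days[-1])   # 2010-01-01 2010-01-10
```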

From there on, you can just create whatever column you feel like. Quarter column? floor((month(d)+2)/3) as quarter. Long name of the week? date_format(d, 'EEEE') as dayname_of_week_long.

As a bonus, I give you the same table but with hours added. The principles are exactly the same, with a cartesian join between dates and hours:

set hivevar:start_day=2010-01-01;
set hivevar:end_day=2010-01-02;
set hivevar:timeDimTable=default.timeDim;

create table if not exists ${timeDimTable} as
with dates as (
  select date_add("${start_day}", a.pos) as d
  from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a
),
hours as (
  select a.pos as h
  from (select posexplode(split(repeat("o", 23), "o"))) a
)
select
    from_unixtime(unix_timestamp(cast(d as timestamp)) + (h * 3600)) as dt
  , d as d
  , year(d) as year
  , month(d) as month
  , day(d) as day
  , h as hour
  , date_format(d, 'u') as daynumber_of_week
  , date_format(d, 'EEEE') as dayname_of_week
  , date_format(d, 'D') as daynumber_of_year
from dates
join hours
sort by dt;

Alter location of a Hive table

Long story short: the location of a hive managed table is just metadata. If you update it, hive will not find its data anymore; you need to physically move the data on hdfs yourself.

Short story long:

You can decide where on hdfs you put the data of a table, for a managed table:

create table if not exists tstloc (id bigint)
clustered by (id) into 4 buckets
stored as orc
location 'hdfs:///tmp/ttslocorig'
tblproperties ("transactional"="true");
insert into tstloc values(1);
select * from tstloc;

Now if you want to move this table to another location for any reason, you might run the following statement:

alter table tstloc set location 'hdfs:///tmp/ttslocnew';

But then the table is empty!

select * from tstloc;

will return an empty set. The reason is that the location property is only metadata, telling hive where to look without any effect on said location (except at creation time, where the location will be created if it does not exist for managed tables). If nothing happens to be there, hive will not return anything. Conversely, if something happens to be there, hive will return it.

To get your data back, you just need to physically move the data on hdfs at the expected location:

hdfs dfs -mv /tmp/ttslocorig /tmp/ttslocnew



Compression of ORC tables in Hive

I only use ORC tables in Hive, and while trying to understand some performance issues I wanted to make sure my tables were properly compressed. This is easy, just run

desc extended table;

and search the output for the string compressed:true.
Well, it turned out that it was false for all my tables although I was pretty sure I had set up everything correctly, so I dug and experimented a bit. I generated an easy-to-compress data set, and loaded it in a few different tables with different options.

# create 1 csv, 500MB of easy to compress data
yes '1,longish string which will compress really well' | head -n 10000000 > /tmp/source.csv

# Copy this file in hdfs
hdfs dfs -mkdir /tmp/compressiontest
hdfs dfs -copyFromLocal /tmp/source.csv /tmp/compressiontest/source.csv

Then I loaded this data in 2 tables, compressed and uncompressed, driven by the setting hive.exec.compress.output.

CREATE EXTERNAL TABLE sourcedata (id INT, s STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/tmp/compressiontest';

CREATE TABLE shouldbecompressed (id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldbecompressed';

CREATE TABLE shouldbeuncompressed (id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldbeuncompressed';

set hive.exec.compress.output=true;
INSERT INTO shouldbecompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldbecompressed;

set hive.exec.compress.output=false;
INSERT INTO shouldbeuncompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldbeuncompressed;

I still have compressed:false, but what happens on disk?

hdfs dfs -du -s -h /tmp/should\*

42.5 K /tmp/shouldbecompressed
39.8 K /tmp/shouldbeuncompressed

Hum, apparently both tables are compressed? It turned out that I had forgotten about an orc parameter, orc.compress, which for me defaults to ZLIB. The other valid values are SNAPPY or NONE. So let's try again:

CREATE TABLE shouldreallybecompressed (id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldreallybecompressed'
TBLPROPERTIES ("orc.compress"="ZLIB");

CREATE TABLE shouldreallybeuncompressed (id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldreallybeuncompressed'
TBLPROPERTIES ("orc.compress"="NONE");

set hive.exec.compress.output=true;
INSERT INTO shouldreallybecompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldreallybecompressed;

set hive.exec.compress.output=false;
INSERT INTO shouldreallybeuncompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldreallybeuncompressed;
hdfs dfs -du -s -h /tmp/should\*

42.5 K /tmp/shouldbecompressed
39.8 K /tmp/shouldbeuncompressed
38.8 K /tmp/shouldreallybecompressed
3.8 M /tmp/shouldreallybeuncompressed

So indeed, the uncompressed table is bigger, but still a far cry from the 500MB I expected.

Long story short, ORC does some compression of its own, and the parameter orc.compress is just a cherry on top. On a side note, using SNAPPY instead of ZLIB the data size was 197k instead of 44k.
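To see why the repetitive test file shrinks so dramatically, a quick check with plain zlib (the codec behind ORC's ZLIB setting) on the same line is enough:

```python
# The test data is extremely repetitive, so generic zlib compression alone
# already shrinks it by orders of magnitude, before any ORC cleverness.
import zlib

line = b"1,longish string which will compress really well\n"
data = line * 100_000            # ~4.9 MB of the repeated CSV line
compressed = zlib.compress(data)

print(len(data), len(compressed))
print(f"ratio: {len(compressed) / len(data):.4%}")
```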

To look even deeper, hive on the command line has an option --orcfiledump, which will give some metadata about an orc file. So looking at a compressed file:

hive --orcfiledump /tmp/shouldbecompressed/000007_0

We can see, among other lines:

# yes, compressed!
Compression: ZLIB

# This is the buffer size, nothing to do with actual data size
Compression size: 262144

File length: 5459 bytes

For an uncompressed file:

hive --orcfiledump /tmp/shouldreallybeuncompressed/000000_0

Compression: NONE
File length: 136741 bytes

Long story short, the output of desc extended regarding compression is useless. And all my tables are indeed compressed.

This example was a bit artificial, as the source file was very compressible. With a more random source file, generated as follows:

cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold | head -c 500000k | awk '{print "1," $0}'> source.csv

Then the size on disk becomes:

370.4 M /tmp/shouldbecompressed
370.4 M /tmp/shouldbeuncompressed
370.4 M /tmp/shouldreallybecompressed
490.0 M /tmp/shouldreallybeuncompressed

And just because I am nice, here are the lines to clean up your droppings:

drop table shouldbecompressed;
drop table shouldbeuncompressed;
drop table shouldreallybeuncompressed;
drop table shouldreallybecompressed;
drop table sourcedata;

Variables in Hive

Here I will explain how to set and use variables in hive.

How to set a variable

Just use the keyword set:

set foo=bar;
set system:foo=bar;

Alternatively, for the hiveconf namespace you can set the variable on the command line:

beeline --hiveconf foo=bar

How to use a variable

Wherever you want to use a value, use this syntax instead: ${namespace:variable_name}. If the namespace is hivevar, it can be omitted. For instance:

select '${hiveconf:foo}', '${system:foo}', '${env:CLASSPATH}', ${bar};

Note that variables will be replaced before anything else happens. This means that this is perfectly valid:

set hivevar:t=employees;
set hivevar:verb=desc;
${verb} ${t};

But this will not do what you expect (hint: you will end up with 4 quotes in your select statement):

set hivevar:s='Hello world';
select '${s}';
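Since substitution is purely textual and happens before parsing, you can model it with a plain string replace (my sketch of the behaviour, not Hive's actual implementation), which makes the quote doubling obvious:

```python
# Hive variable substitution is plain text replacement done before parsing.
# Modeling it as a string replace shows why quoting an already-quoted
# variable produces four quotes.
def substitute(query: str, hivevars: dict) -> str:
    for name, value in hivevars.items():
        query = query.replace("${" + name + "}", value)
    return query

print(substitute("select '${s}';", {"s": "'Hello world'"}))
# select ''Hello world'';
```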


Furthermore, it means that you need to take care of your data types. Selecting a bare string is not valid, so the following code is invalid as well:

set hivevar:v=astring;
select ${v};

You will get:

Error: Error while compiling statement: FAILED: SemanticException [Error 10004]: Line 1:7 Invalid table alias or column reference 'astring': (possible column names are: ) (state=42000,code=10004)

In our case, you just need to quote the variable.

Note that this would work as-is with an int as a bare int is valid in the select statement.

Another caveat is to make sure the variable exists, otherwise you will get either the variable literal for quoted variables:

select '${donotexists}';
| _c0 |
| ${donotexists} |

Or an unhelpful message for unquoted variables:

> select ${doesnotexist};
Error: Error while compiling statement: FAILED: ParseException line 1:7 cannot recognize input near '$' '{' 'hiveconf' in select clause (state=42000,code=40000)

If you only want to see the value of a variable, you can just use set as well:

set hiveconf:foo;

How to List variables

Just use SET;, but this will output a massive unreadable list. You are better off redirecting this output to a file, e.g.

beeline -e 'SET;' | sed 's/\s\+/ /g'> set.out

Note that I squash the spaces here. As the columns are aligned and some values are very long strings, squashing makes reading much easier.

Then if you want to see a specific set of variables, you can just run:

# system variables
grep '| system:' set.out

# Env variables
grep '| env:' set.out

# other variables
cat set.out | grep -v '| env:' | grep -v '| system:'


Hive has 4 namespaces for variables: hivevar, hiveconf, system and env.


Hivevar

Hivevar is the easiest namespace to use, as you do not need to explicitly mention it when using a variable.

set hivevar:foo=bar;
select "${foo}";


Hiveconf

Hiveconf is the namespace used when you use set without an explicit namespace, or when you give a variable on the command line with --hiveconf foo=bar. Note that you can set those without specifying the namespace, but you always need to specify the namespace when using them.

set foo=bar;
select "${hiveconf:foo}";


Env

This is the namespace of the shell environment variables. You can easily get them with the ${env} prefix:

SELECT "${env:hostname}";

I specifically chose this variable. If you run this query yourself, you will see that it is the environment of the hive server which is used, not the environment of your client. This limits the use of environment variables a lot.

Note that environment variables cannot be set.


System

This namespace contains for instance jvm settings, logfile destinations and more.