Hive self merge

I had an ORC table with duplicated rows, which I wanted to remove. The query is quite simple:

merge into click as dst using (
    select
      -- For all unique clicks...
        client_name
      , contact_id
      , ts_utc
      -- ... find the duplicates (cnt>1 in having) ...
      , count(*) cnt
      -- ... remember the first one loaded ...
      , min(load_ts) as first_load
      from
        click
      group by
        1, 2, 3
      having cnt > 1
)
as src
-- ... once the first occurrence of the duplicates is found,
-- find all the duplicates ...
on
        dst.client_name=src.client_name
    and dst.contact_id=src.contact_id
    and dst.ts_utc=src.ts_utc
-- ... and if it is not the first one loaded ...
when matched and src.first_load != dst.load_ts
-- .. delete it.
then delete
;

Trivial, right? Well, it looks like you cannot do such a ‘self merge’ in Hive. I ended up with this error:

java.lang.InterruptedException
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
 at org.apache.tez.runtime.InputReadyTracker$InputReadyMonitor.awaitCondition(InputReadyTracker.java:120)
 at org.apache.tez.runtime.InputReadyTracker.waitForAllInputsReady(InputReadyTracker.java:90)
 at org.apache.tez.runtime.api.impl.TezProcessorContextImpl.waitForAllInputsReady(TezProcessorContextImpl.java:116)
 [...]

The solution, once you understand that a self merge is not allowed, is of course obvious: use a temporary table. Splitting my merge statement in two did the trick.

create temporary table clickdups stored as orc as select
        client_name
      , contact_id
      , ts_utc
      , count(*) cnt
      , min(load_ts) as first_load
      from
        click
      group by
        1, 2, 3
      having cnt > 1
;

merge into click as dst using clickdups
as src
on
        dst.client_name=src.client_name
    and dst.contact_id=src.contact_id
    and dst.ts_utc=src.ts_utc
when matched and src.first_load != dst.load_ts
then delete
;

On a side note, I had to tweak the self merge quite a lot to prevent out-of-memory errors. Those did not happen at all with the two-step solution.

Create a time dimension table in pure hive SQL

Without further ado, here is the full SQL to create a table with one row per day, including the date, year, month, day, day number and name of the week, and day of the year. If you want the hours as well, look at the bottom of this post.

set hivevar:start_day=2010-01-01;
set hivevar:end_day=2050-12-31;
set hivevar:timeDimTable=default.timeDim;

create table if not exists ${timeDimTable} as
with dates as (
select date_add("${start_day}", a.pos) as d
from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a
)
select
    d as d
  , year(d) as year
  , month(d) as month
  , day(d) as day
  , date_format(d, 'u') as daynumber_of_week
  , date_format(d, 'EEEE') as dayname_of_week
  , date_format(d, 'D') as daynumber_of_year

from dates
sort by d
;

Note that I use d as the date column name because date is a reserved keyword.
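
If you really wanted the column to be called date, backtick-quoting the identifier would also work, at the cost of quoting it in every query. A quick illustration (some_table is just a placeholder):

-- reserved keywords can be used as identifiers when backtick-quoted
select `date`, year(`date`) as year
from some_table;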

The biggest issue is to generate one row per day. The trick here is to use a clever combination of posexplode, split and repeat. This is what the first CTE does:

-- just 10 days for the example
set hivevar:start_day=2010-01-01;
set hivevar:end_day=2010-01-10;
select date_add("${start_day}", a.pos) as d
from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a

We can break it down into a few parts:

select datediff("${end_day}", "${start_day}");
-- output: 9

Just computes the difference between start and end day in days.

select repeat("o", 9);
-- output: ooooooooo

This outputs a string of 9 ‘o’s. The actual character does not matter at all.

select split("ooooooooo", "o");
-- output:  ["","","","","","","","","",""]

This creates a Hive array of 10 (empty) strings: splitting on the 9 separators yields 10 elements, which is exactly the number of rows we want (one per day, start and end included).

select posexplode(split("ooooooooo", "o"));
-- output:
-- +-----+-----+
-- | pos | val |
-- +-----+-----+
-- |   0 |     |
-- |   1 |     |
-- |   2 |     |
-- |   3 |     |
-- |   4 |     |
-- |   5 |     |
-- |   6 |     |
-- |   7 |     |
-- |   8 |     |
-- |   9 |     |
-- +-----+-----+

This actually creates one row per array element, with the index (0 to 9) and the (empty) value of each element.

That was the tricky part; the rest is easy. The first CTE creates one row per date by adding the array index (in days) to start_day:

with dates as (
select date_add("${start_day}", a.pos) as d
from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a)
select * from dates;
-- +-------------+
-- |   dates.d   |
-- +-------------+
-- | 2010-01-01  |
-- | 2010-01-02  |
-- | 2010-01-03  |
-- | 2010-01-04  |
-- | 2010-01-05  |
-- | 2010-01-06  |
-- | 2010-01-07  |
-- | 2010-01-08  |
-- | 2010-01-09  |
-- | 2010-01-10  |
-- +-------------+

From there on, you can just create whatever column you feel like. Quarter column? floor((month(d) + 2) / 3) as quarter. Long name of the week? date_format(d, 'EEEE') as dayname_of_week_long.
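
For example, a few extra columns could simply be appended to the select of the table creation; the column names here are just illustrative:

select
    d as d
  , year(d) as year
  , floor((month(d) + 2) / 3) as quarter
  , month(d) as month
  , day(d) as day
  , date_format(d, 'EEEE') as dayname_of_week_long
  , weekofyear(d) as weeknumber_of_year
from dates
sort by d
;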

As a bonus, I give you the same table but with hours added. The principles are exactly the same, with a cartesian join between dates and hours:

set hivevar:start_day=2010-01-01;
set hivevar:end_day=2010-01-02;
set hivevar:timeDimTable=default.timeDim;

create table if not exists ${timeDimTable} as
with dates as (
  select date_add("${start_day}", a.pos) as d
  from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a
),
hours as (
  select a.pos as h
  from (select posexplode(split(repeat("o", 23), "o"))) a
)
select
    from_unixtime(unix_timestamp(cast(d as timestamp)) + (h * 3600)) as dt
  , d as d
  , year(d) as year
  , month(d) as month
  , day(d) as day
  , h as hour
  , date_format(d, 'u') as daynumber_of_week
  , date_format(d, 'EEEE') as dayname_of_week
  , date_format(d, 'D') as daynumber_of_year

from dates
join hours
sort by dt
;

Alter location of a Hive table

Long story short: the location of a Hive managed table is just metadata; if you update it, Hive will not find its data anymore. You need to physically move the data on HDFS yourself.

Short story long:

Even for a managed table, you can decide where on HDFS the data is stored:

create table if not exists tstloc (id bigint)
clustered by (id) into 4 buckets
stored as orc
location 'hdfs:///tmp/ttslocorig'
tblproperties ("transactional"="true");
insert into tstloc values(1);
select * from tstloc;

Now if you want to move this table to another location for any reason, you might run the following statement:

alter table tstloc set location 'hdfs:///tmp/ttslocnew';

But then the table is empty!

select * from tstloc;

will return an empty set. The reason is that the location property is only metadata, telling Hive where to look, without any effect on said location (except at creation time, when the location will be created for managed tables if it does not exist). If nothing happens to be there, Hive will not return anything. Conversely, if something happens to be there, Hive will return it.

To get your data back, you just need to physically move the data on hdfs at the expected location:

hdfs dfs -mv /tmp/ttslocorig /tmp/ttslocnew
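
After the move, the same select finds the data again; a quick sanity check with the table created above:

select * from tstloc;
-- returns the row (1) inserted earlier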

Compression of ORC tables in Hive

I only use ORC tables in Hive, and while trying to understand some performance issues I wanted to make sure my tables were properly compressed. This is easy, just run

desc extended table;

and search the output for the string

compressed:true

Well, it turned out that it was false for all my tables, although I was pretty sure I had set everything up correctly, so I dug in and experimented a bit. I generated an easy-to-compress data set and loaded it into a few different tables with different options.

# create 1 csv, 500MB of easy to compress data
yes '1,longish string which will compress really well' | head -n 10000000 > /tmp/source.csv

# Copy this file in hdfs
hdfs dfs -mkdir /tmp/compressiontest
hdfs dfs -copyFromLocal /tmp/source.csv /tmp/compressiontest/source.csv

Then I loaded this data into 2 tables, compressed and uncompressed, controlled by the setting hive.exec.compress.output.


CREATE EXTERNAL TABLE sourcedata (id INT, s STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/tmp/compressiontest'
;
MSCK REPAIR TABLE sourcedata;

CREATE TABLE shouldbecompressed ( id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldbecompressed';

CREATE TABLE shouldbeuncompressed (id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldbeuncompressed';

set hive.exec.compress.output=true;
INSERT INTO shouldbecompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldbecompressed;

set hive.exec.compress.output=false;
INSERT INTO shouldbeuncompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldbeuncompressed;

I still have compressed:false, but what happens on disk?

hdfs dfs -du -s -h /tmp/should\*

42.5 K /tmp/shouldbecompressed
39.8 K /tmp/shouldbeuncompressed

Hmm, apparently both tables are compressed? It turned out that I had forgotten about an ORC parameter (orc.compress), which was set by default to ZLIB for me. The other valid values are SNAPPY and NONE. So let’s try again:

CREATE TABLE shouldreallybecompressed ( id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldreallybecompressed'
TBLPROPERTIES ("orc.compress"="ZLIB")
;

CREATE TABLE shouldreallybeuncompressed ( id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldreallybeuncompressed'
TBLPROPERTIES ("orc.compress"="NONE")
;

set hive.exec.compress.output=true;
INSERT INTO shouldreallybecompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldreallybecompressed;

set hive.exec.compress.output=false;
INSERT INTO shouldreallybeuncompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldreallybeuncompressed;
hdfs dfs -du -s -h /tmp/should\*

42.5 K /tmp/shouldbecompressed
39.8 K /tmp/shouldbeuncompressed
38.8 K /tmp/shouldreallybecompressed
3.8 M /tmp/shouldreallybeuncompressed

So indeed, the uncompressed table is less compressed, but is still a far cry from the 500MB I expected.

Long story short, ORC does some compression on its own, and the parameter orc.compress is just a cherry on top. On a side note, using SNAPPY instead of ZLIB, the data size was 197k instead of 44k.
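
For reference, a SNAPPY variant can be created the same way (the table name and location below are just made up for the example):

CREATE TABLE shouldbesnappy (id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldbesnappy'
TBLPROPERTIES ("orc.compress"="SNAPPY");

INSERT INTO shouldbesnappy SELECT * FROM sourcedata;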

To look even deeper, hive on the command line has an option --orcfiledump, which will give some metadata about an ORC file. So looking at a compressed file:

hive --orcfiledump /tmp/shouldbecompressed/000007_0

We can see, among other lines:

# yes, compressed!
Compression: ZLIB

# This is the buffer size, nothing to do with actual data size
Compression size: 262144

File length: 5459 bytes

For an uncompressed file:

hive --orcfiledump /tmp/shouldreallybeuncompressed/000000_0

Compression: NONE
File length: 136741 bytes

Long story short, the output of desc extended regarding compression is useless, and all my tables are indeed compressed.

This example was a bit artificial as the source file was very compressible. With another, more random source file, generated as follows:


cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold | head -c 500000k | awk '{print "1," $0}'> source.csv

Then the size on disk becomes:

370.4 M /tmp/shouldbecompressed
370.4 M /tmp/shouldbeuncompressed
370.4 M /tmp/shouldreallybecompressed
490.0 M /tmp/shouldreallybeuncompressed

And just because I am nice, here are the lines to clean up your droppings:

drop table shouldbecompressed;
drop table shouldbeuncompressed;
drop table shouldreallybeuncompressed;
drop table shouldreallybecompressed;
drop table sourcedata;

Easy import from Hive to Vertica

Properly set up, Vertica can connect to HCatalog or read HDFS files. This does require some DBA work, though.

If you want to easily get data from Hive to Vertica, you can use the COPY statement with the LOCAL STDIN modifier and pipe the output of Hive into the input of Vertica. Once you add a dd in the middle to prevent the stream from just stopping after a while, this works perfectly. I am not so sure why dd is needed, but I suppose it buffers data and makes the magic happen.

hive -e "select whatever FROM wherever" | \
dd bs=1M | \
/opt/vertica/bin/vsql -U $V_USERNAME -w $V_PASSWORD -h $HOST $DB -c \
"COPY schema.table FROM LOCAL STDIN DELIMITER E'\t' NULL 'NULL' DIRECT"

Of course, the previous statement needs to be amended to use your own user, password and database.

The performance is quite good with this, although I cannot give a good benchmark as in our case the Hive statement was not trivial.

One thing to really take care of is where you run this statement. You can run it from anywhere as long as Hive and Vertica are accessible, but be aware that data will flow from Hive to your server to Vertica. Running this statement on a Vertica node or on your Hive server will reduce the network traffic and might speed things up.

This post is based on my answer to a question on stackoverflow.

Avro end to end in hdfs – part 3: Hive

This is a series of posts aiming at explaining how and why to set up compressed avro in hdfs. It will be divided in a few posts, more will be coming if relevant.

  1. Why avro?
  2. How to set up avro in flume
  3. How to use avro with hive (this post)
  4. Problems and solutions

Use avro in Hive

Once your table is created and data is loaded, there is nothing extra to do: you can just query it as you would any other table.

Create the table

Creating the table can be done as follow, with some comments:

-- table name
CREATE EXTERNAL TABLE IF NOT EXISTS table_name

-- Partition according to the end of the path you set in the flume sink (hdfs.path option).
-- Following the example from the previous post, we would have
PARTITIONED BY (key STRING)

-- Avro!
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'

-- Matches the first part of hdfs.path set up in the flume sink
-- Following the example of the previous post, we would have
LOCATION '/datain/logs'

-- Other options here are to hardcode the schema or use a file on your local filesystem instead.
TBLPROPERTIES ('avro.schema.url'='hdfs:///schemas/schema.avsc');
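
If you would rather not depend on a schema file on HDFS, the avro.schema.literal table property lets you inline the schema instead; a minimal sketch, with made-up field names:

-- Sketch only: inline schema instead of avro.schema.url (the fields are illustrative)
TBLPROPERTIES ('avro.schema.literal'='{
  "namespace": "com.example.logs",
  "name": "log_event",
  "type": "record",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "body", "type": "bytes"}
  ]
}');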

More information can be found in the Cloudera documentation about Hive and Avro.

Load the snappy jar

To load data, you need to tell Hive that the data files are compressed, and Hive needs to know how to decompress them. For this, you need to add the snappy jar to the list of extra jars loaded by Hive. This is done by adding the path to the snappy jar to the value of the hive.aux.jars.path property in your hive-site.xml. For instance:

<property>
  <name>hive.aux.jars.path</name>
  <value>file:////usr/lib/hive/lib/hive-contrib.jar,...,file:////usr/lib/hive/lib/auxlib/snappy-java-1.0.4.1.jar</value>
</property>

Actually load data

You need to tell Hive to use snappy, which is done the following way:

SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;

Then loading data means creating a new partition whenever a new directory with another key is created. Run this after having told Hive to use snappy:

ALTER TABLE table_name
ADD IF NOT EXISTS PARTITION (key='some_new_key')
LOCATION '/datain/logs/key=some_new_key';

Using data with the default schema

If you use a custom schema, tailored to your data, you can then enjoy the full speed of Hive, as not much parsing will be needed by Hive to access your data.

If you use the default schema, then Hive does not know (yet) about the columns in your table. This can be fixed by the decode() function. For instance,

SELECT
hour, decode(body,'UTF-8') as body
FROM my_table

Accessing Hive via JDBC with DbVisualiser

Hive has a SQL interface via JDBC, so you can connect to it with your usual tools (DbVis, SQuirreL). I will explain here how to connect via DbVis, but the ideas are very similar when connecting with any other tool.

Create a new driver

From Tools > Driver Manager, click the top button with a green ‘+’ to add a driver, as shown in the following screenshot.

Then fill in the ‘Name’ (this is for you only, the value can be different from the one shown below) and ‘URL format’ fields as shown.

Note that in the picture the port is 9000; usually the default Hive port is 10000.

New Driver

Add the relevant jars

You need to load a set of jars to be able to connect to Hive. Make sure that you use exactly the same versions of hive-jdbc and hive-service as the ones on your Hive server. I usually take a copy of all the jars and put them in one directory, which makes my life easier. You can get the jars from your Hive installation directory:

  • On a Mac with homebrew, after having installed hive you can find them under /usr/local/Cellar.
  • On an rpm-based Linux, from the Cloudera rpms (install hive), you can find them somewhere under /usr/lib
  • Directly in your downloaded folders if you get the binary version of hive from cloudera.

Note that as the version numbers might slightly differ, I do not write them in the jar names below.

You will need:

  • commons-logging.jar
  • hive-jdbc.jar
  • hive-service.jar
  • libthrift.jar
  • slf4j-api.jar
  • slf4j-log4j12.jar

Then click the ‘open’ icon as shown in the previous screenshot, and select all the jars you gathered in the previous step. This will fill the ‘Driver Class’ dropdown box, where you need to choose ‘org.apache.hive.jdbc.HiveDriver’.

driverclass

Create a new database connection

Right click on Connection and select ‘New connection’ (see 1 in the picture below).
Then fill in the details pointed to by 2 in the picture. The name is for your reference only; select the driver name you just set up in the previous phase, and fill in the database URL as shown.

Then click connect (or reconnect if you are trying again), pointed to by 3 in the picture. You should see some output displayed in the connection message.

Database Connection

You are now good to go!