Create a time dimension table in pure hive SQL

Without further ado, here is the full SQL to create a table giving you one row per day, with date, year, month, day, day and name of the week, day of the year. If you want the hours as well, look at the bottom of this post.

set hivevar:start_day=2010-01-01;
set hivevar:end_day=2050-12-31;
set hivevar:timeDimTable=default.timeDim;

create table if not exists ${timeDimTable} as
with dates as (
select date_add("${start_day}", a.pos) as d
from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a
)
select
    d as d
  , year(d) as year
  , month(d) as month
  , day(d) as day
  , date_format(d, 'u') as daynumber_of_week
  , date_format(d, 'EEEE') as dayname_of_week
  , date_format(d, 'D') as daynumber_of_year

from dates
sort by d
;

Note that I use d as date column because date is a reserved keyword.

The biggest issue is to generate one row per day. The trick here is to use a clever combination of posexplode, split and reapeat. This is what the first CTE does:

-- just 10 days for the example
set hivevar:start_day=2010-01-01;
set hivevar:end_day=2010-01-10;
select date_add("${start_day}", a.pos) as d
from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a

We can break it down in a few parts:

select datediff("${end_day}", "${start_day}");
-- output: 9

Just computes the difference between start and end day in days.

select repeat("o", 9);
-- output: ooooooooo

Will output a string with 9 ‘o’. The actual character does not matter at all.

select split("ooooooooo", "o");
-- output:  ["","","","","","","","","",""]

Creates a hive array of 9 (empty) strings.

select posexplode(split("ooooooooo", "o"));
-- output:
-- +------+------+--+
-- | pos | val |
-- +------+------+--+
-- | 0 | |
-- | 1 | |
-- | 2 | |
-- | 3 | |
-- | 4 | |
-- | 5 | |
-- | 6 | |
-- | 7 | |
-- | 8 | |
-- | 9 | |
-- +------+------+--+

Actually create a row per array element, with the index (0 to 9) and the value (nothing) of each element.

That was the tricky part, the rest is easy. The first CTE creates a row with each date, adding the array index (in day) to the start_day:

with dates as (
select date_add("${start_day}", a.pos) as d
from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a)
select * from dates;
-- +-------------+--+
-- | dates.d |
-- +-------------+--+
-- | 2010-01-01 |
-- | 2010-01-02 |
-- | 2010-01-03 |
-- | 2010-01-04 |
-- | 2010-01-05 |
-- | 2010-01-06 |
-- | 2010-01-07 |
-- | 2010-01-08 |
-- | 2010-01-09 |
-- | 2010-01-10 |
-- +-------------+--+

From there on, you can just create whatever column you feel like. Quarter column? floor(1+ month(d)/4) as quarter. Long name of the week? date_format(d, 'EEEE') as dayname_of_week_long.

As a bonus, I give you the same table but with hours added. The principles are exactly the same, with a cartesian join beween dates and hour:

set hivevar:start_day=2010-01-01;
set hivevar:end_day=2010-01-02;
set hivevar:timeDimTable=default.timeDim;

create table if not exists ${timeDimTable} as<span id="mce_SELREST_start" style="overflow:hidden;line-height:0;"></span>
with dates as (
  select date_add("${start_day}", a.pos) as d
  from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a
),
hours as (
  select a.pos as h
  from (select posexplode(split(repeat("o", 23), "o"))) a
)
select
    from_unixtime(unix_timestamp(cast(d as timestamp)) + (h * 3600)) as dt
  , d as d
  , year(d) as year
  , month(d) as month
  , day(d) as day
  , h as hour
  , date_format(d, 'u') as daynumber_of_week
  , date_format(d, 'EEEE') as dayname_of_week
  , date_format(d, 'D') as daynumber_of_year

from dates
join hours
sort by dt
;
Advertisements

Alter location of a Hive table

Long story short: the location of a hive managed table is just metadata, if you update it hive will not find its data anymore. You do need to physically move the data on hdfs yourself.

Short story long:

You can decide where on hdfs you put the data of a table, for a managed table:

create table if not exists tstloc (id bigint)
clustered by (id) into 4 buckets
stored as orc
location 'hdfs:///tmp/ttslocorig'
tblproperties ("transactional"="true");
insert into tstloc values(1);
select * from tstloc;

Now if you want to move this table to another location for any reason, you might run the following statement:

alter table tstloc set location 'hdfs:///tmp/ttslocnew';

But then the table is empty!

select * from tstloc;

will return an empty set. The reason is that the location property is only metadata, telling hive where to look without any effect on said location (except at creation time, where the location will be created if it does not exist for managed tables). If nothing happens to be there, hive will not return anything. Conversely, if it happens to be something, hive will return this something.

To get your data back, you just need to physically move the data on hdfs at the expected location:

hdfs dfs -mv /tmp/ttslocorig /tmp/ttslocnew

 

 

Compression of ORC tables in Hive

I only use ORC tables in Hive, and while trying to understand some performance issues I wanted to make sure my tables where properly compressed. This is easy, just run

desc extended table;

and search the output for the string

compressed:true

Well, it turned out that it was false for all my tables although I was pretty sure I set up everything correctly, so I dug and experimented a bit. I generated an easy to compress data set, and load it in a few different tables with different options.

# create 1 csv, 500MB of easy to compress data
yes '1,longish string which will compress really well' | head -n 10000000 > /tmp/source.csv

# Copy this file in hdfs
hdfs dfs -mkdir /tmp/compressiontest
hdfs dfs -copyFromLocal /tmp/source.csv /tmp/compressiontest/source.csv

Then I loaded this data in 2 tables, compressed and uncompressed, directed with the setting hive.exec.compress.output.


CREATE EXTERNAL TABLE sourcedata (id INT, s STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/tmp/compressiontest'
;
MSCK REPAIR TABLE sourcedata;

CREATE TABLE shouldbecompressed ( id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldbecompressed';

CREATE TABLE shouldbeuncompressed (id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldbeuncompressed';

set hive.exec.compress.output=true;
INSERT INTO shouldbecompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldbecompressed;

set hive.exec.compress.output=false;
INSERT INTO shouldbeuncompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldbeuncompressed;

I still have compressed:false, but what happens on disk?

hdfs dfs -du -s -h /tmp/should\*

42.5 K /tmp/shouldbecompressed
39.8 K /tmp/shouldbeuncompressed

Hum, apparently both tables are compressed? It turned out that I forgot about an orc parameter (orc.compress), set by default to ZLIB for me. The other valid values are SNAPPY or NONE. So let’s try again:

CREATE TABLE shouldreallybecompressed ( id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldreallybecompressed'
TBLPROPERTIES ("orc.compress"="ZLIB")
;

CREATE TABLE shouldreallybeuncompressed ( id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldreallybeuncompressed'
TBLPROPERTIES ("orc.compress"="NONE")
;

set hive.exec.compress.output=true;
INSERT INTO shouldreallybecompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldreallybecompressed;

set hive.exec.compress.output=false;
INSERT INTO shouldreallybeuncompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldreallybeuncompressed;
hdfs dfs -du -s -h /tmp/should\*

42.5 K /tmp/shouldbecompressed
39.8 K /tmp/shouldbeuncompressed
38.8 K /tmp/shouldreallybecompressed
3.8 M /tmp/shouldreallybeuncompressed

So indeed, the uncompressed table is less compressed, but is still a far cry from the 500MB I expected.

Long story short, ORC does some compression on its own, and the parameter orc.compress is just a cherry on top. on a side note, using SNAPPY instead of ZLIB the data size was 197k instead of 44k.

To look even deeper, hive on the command line has an option –orcfiledump, which will give some metadata about an orc file. So looking at a compressed file:

hive --orcfiledump /tmp/shouldbecompressed/000007_0

We can see, among other lines:

# yes, compressed!
Compression: ZLIB

# This is the buffer size, nothing to do with actual data size
Compression size: 262144

File length: 5459 bytes

For an uncompressed file:

hive --orcfiledump /tmp/shouldreallybeuncompressed/000000_0

Compression: NONE
File length: 136741 bytes

Long story short, the output of desc extended regarding compression is useless. And all my tables are indeed compressed.

This example was a bit artificial as the source file was very compressible. With another source file more random, generated as follow:


cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold | head -c 500000k | awk '{print "1," $0}'> source.csv

Then the size on disk becomes:

370.4 M /tmp/shouldbecompressed
370.4 M /tmp/shouldbeuncompressed
370.4 M /tmp/shouldreallybecompressed
490.0 M /tmp/shouldreallybeuncompressed

And just because I am nice, here are the lines to clean up your droppings:

drop table shouldbecompressed;
drop table shouldbeuncompressed;
drop table shouldreallybeuncompressed;
drop table shouldreallybecompressed;
drop table sourcedata;

Variables in Hive

I will here explain how to set and use variables in hive.

How to set a variable

Just use the keyword set

set foo=bar;
set system:foo=bar

Alternatively, for the hiveconf namespace you can set the variable on the command line:

beeline --hiveconf foo=bar

How to use a variable

Wherever you want to use a value, use this syntax instead: ${namespace:variable_name}.  if the namespace is hivevar, it can be ommited. For instance:

select '${hiveconf:foo}', '${system:foo}', '${env:CLASSPATH}', ${bar};

Note that variables will be replaced before anything else happens. This means that this is perfectly valid:

set hivevar:t=employees;
set hivevar:verb=desc;
${verb} ${t};

But this will not do what you expect (hint: you will end up with 4 quotes in your select statement):

set hivevar:s='Hello world';
select '${s}';

 

Furthermore, it means that you need to take care of your data type. As selecting a bare string is not valid, so is the following code invalid as well:

set hivevar:v=astring;
select ${v};

You will get:

Error: Error while compiling statement: FAILED: SemanticException [Error 10004]: Line 1:7 Invalid table alias or column reference ‘astring’: (possible column names are: ) (state=42000,code=10004)

In our case, you just need to quote the variable.

Note that this would work as-is with an int as a bare int is valid in the select statement.

Another caveat is to make sure the variable exists, otherwise you will either get the variable literal for quoted variables:

select ‘${donotexists}’;
+————————–+–+
| _c0 |
+————————–+–+
| ${donotexists} |
+————————–+–+

Either an unhelpful message for unquoted variables:

> select ${doesnotexist};
Error: Error while compiling statement: FAILED: ParseException line 1:7 cannot recognize input near ‘$’ ‘{‘ ‘hiveconf’ in select clause (state=42000,code=40000)

If you only want to see the value of a variable, you can just use set as well:

set hiveconf:foo;

How to List variables

Just use SET;, but this will output a massive unreadable list. You are better off redirecting this output to a file, e.g.

beeline -e 'SET;' | sed 's/\s\+/ /g'> set.out

Note that I squash the spaces here. As the columns are aligned and some values are very long strings, squashing makes reading much easier.

Then if you want to see a specific set of variables, you can just run:

# system variables
grep '| system:' set.out

# Env variables
grep '| env:' set.out

# other variables
cat set.out | grep -v '| env:' | grep -v '| system:'

Namespaces

Hive has 4 namespaces for variables: hivevar, hiveconf, system and env.

Hivevar

Hivevar is the easiest namespace to use, as you do not need to explicitly mention it when using a variable.

set hivevar:foo=bar;
select "${foo}";

Hiveconf

Hiveconf is the namespace used when you use set without explicit namespace or when you give a variable on the command line with –hiveconf foo=bar. Note that you can set those without specifying the namespace, but you always need to specify the namespace when using them.

set foo=bar;
select "${hiveconf:foo}";

env

This is the namespace of the shell environment variables. You can easily get them with the ${env} prefix:

SELECT "${env:hostname}";

I specifically chose this variable. If you run this query yourself, you will see that it is the environment of the hive server which is used, not the environemnt of your client. This limits a lot the use of environment variables.

Note that environment variables cannot be set.

system

Those will contain for instance jvm settings, logfile destinations and more.

 

Count lines of multiple files in hdf

This is a situation often popping up here and there and which I had recently. Given a bunch of files on hdfs, how to get the total line count?

It turns out that there are a lot of options which I will explain here. The goal is to count the lines of all csv files in a a specific directory, $d.

After ll the solution I will show some benchmarking.

Options

The sysadmin way

Set up a fuse mountpoint to hdfs (I always do that, it is very stable, uses no resource and helps for many things) at $mountpoint.


wc -l $mountpoint$d/*.csv | grep total

Easy and sweet, but has the drawback that all data is downloaded locally before being processed.

The dutiful way

We are using hadoop, so let’s use hadoop command line.

hdfs dfs -ls $d/\*.csv | awk '{print $8}' | xargs hdfs dfs -cat | wc -l

Note that this use xargs. If you have too many files you might need to build a loop in, but invoking the hdfs command many times will incur a lot of jvm startup time penalty.

The issue here is as well that all data is downloaded locally.

The old school way

Let’s just use a streaming job:

hdfs hadoop jar $HADOOP_MAPRED_HOME/hadoop-streaming.jar \
     -Dmapred.reduce.tasks=1 \
     -input $d/\*.csv \
     -output /tmp/streamout \
     -mapper "bash -c 'paste <(echo "count") <(wc -l) '" \
     -reducer "bash -c 'cut -f2 | paste -sd+ | bc'"

The only issue with a streaming job is that the output is written in a file on hdfs (the -output parameter) not on stdout.

The tormented soul

Streaming jobs are nice but this bash is ugly and python is cool for data, right?

import sys
cnt=0
for l in sys.stdin:
    cnt+=1
print("count\t{}".format(cnt))
import sys
cnt=0
for l in sys.stdin:
 cnt+=l.split('\t')[1]
print(cnt)

And then just run.

hadoop jar $HADOOP_MAPRED_HOME/hadoop-streaming.jar \
   -Dmapred.reduce.tasks=1 \
   -input $d/\*.csv \
   -output /tmp/streamout \
   -mapper "/usr/bin/python3 mapper.py $*" \
   -reducer "/usr/bin/python3 reducer.py $*"

Connoisseur

Hey do not forget about pig!

csvs = LOAD '$d/*.csv';
csvgroup = GROUP csvs ALL;
cnt = FOREACH csvgroup GENERATE COUNT(csvs);
dump cnt;

Run with:

pig cnt.pig

Hipster

Pyspark is all the rage

f=sc.textFile("$d/*.csv")
cnt=f.count()
print cnt

Benchmarking

Ok this was fun, but what works best?

I ran 2 sets of test, one on a directory with a few (~5) big files, one on a directory with many (250) tiny files. I ran the count 10 times for each test, the results are in seconds and lower is better.

Few big files Many small files
Mean Median Stdev Mean Median Stdev
Sysadmin 49.6 46.5 12.17 1 1 0
Dutiful 49.8 46.5 11.71  5.1 5 0.32
Connoisseur 25.7 25 2.31 17.2 17 0.42
Old school 21.5 21.5 0.53 47.7 48 1.06
Tormented 28.3 28 0.82 39.9 39.5 1.1
Hipster 20.2 20 1.03 9.4 9 0.52

A few things are apparent:

  • Different use cases will call for different options,
  • When data is downloaded (sysadmin via the mountpoint or dutiful via hdfs -cat) the variance is higher. Weirdly enough for my tests the both started at about 37 seconds, and increased at every run by 5 seconds to stay stable at 63 seconds,
  • pyspark is the clear winner.

Hadoop metrics in graphite

I will not present graphite here, if you end up reading this I assume you already have a graphite instance up and running. If not it is a matter of less than an hour to have a usable instance.

Hadoop uses metrics2 which allows multiple metrics output plugins to be used in parallel, supports dynamic reconfiguration of metrics plugins, provides metrics filtering, and allows all metrics to be exported via JMX.

Those metrics can be very easily exported to graphite to then be sliced and diced to your heart’s content.

You only need to modify the file hadoop-metrics2.properties by adding the following snippet:

# Sampling period
*.period=10

# Grahite sink class
*.sink.graphite.class=org.apache.hadoop.metrics2.sink.GraphiteSink

# Location of your graphite instance
*.sink.graphite.server_host=10.x.x.x
*.sink.graphite.server_port=2003

# Define for each metric group (* in *.prefix) how it should be named
# in graphite (part after the =)
datanode.sink.graphite.metrics_prefix=hadoop.datanode
namenode.sink.graphite.metrics_prefix=hadoop.namenode
resourcemanager.sink.graphite.metrics_prefix=hadoop.resourcemanager
nodemanager.sink.graphite.metrics_prefix=hadoop.nodemanager
jobhistoryserver.sink.graphite.metrics_prefix=hadoop.jobhistoryserver
journalnode.sink.graphite.metrics_prefix=hadoop.journalnode
maptask.sink.graphite.metrics_prefix=hadoop.maptask
reducetask.sink.graphite.metrics_prefix=hadoop.reducetask
applicationhistoryserver.sink.graphite.metrics_prefix=hadoop.applicationhistoryserver

In Ambari, just go to HDFS > Config > Advanced hadoop-metrics2.properties, the location for other distributions should be trivial to find.

After that restart hdfs and all relevant services you asked to monitor (if you asked to monitor resourcemanager, restart the resource managers and so on).

That’s it, you’re set.

If you are on HDP, you can go a bit further. HDP actually ships with a grafana instance (if you installed Ambari metrics) which can use graphite a data source. Data will be the same, display will be a tad prettier.

This uses graphite web (port 80 per default) which needs to enable CORS. You can do it in apache (the default graphite web http server) by adding this line in your graphite vhost:

Header set Access-Control-Allow-Origin "*"

 

Extracting queries from Hive logs

Hive logs are very verbose, and I personally find it a pain to wade through them when I try to understand which queries my ETL tool decided to generate.

To help with this, I created this small python script which looks at hive logs files and output the SQL queries and only the queries, with some information about them if known: time started, duration, success.

Usage:

./hqe.py --help 
usage: hqe.py [-h] [--since SINCE] [--to TO] [--logdir LOGDIR]
              [--glob LOGFILE_GLOB]
              [--loglevel {DEBUG,INFO,WARNING,ERROR,CRITICAL}]

Displays queries ran on Hive.

optional arguments:
 -h, --help show this help message and exit
 --since SINCE how far to look back. (default: 15m)
 --to TO How far to look forward. (default: now)
 --logdir LOGDIR Directory of hive log files. (default: /var/log/hive)
 --glob LOGFILE_GLOB Shell pattern of hive logfiles inside their logdir.
 (default: hiveserver2.log*)
 --loglevel {DEBUG,INFO,WARNING,ERROR,CRITICAL}, -l {DEBUG,INFO,WARNING,ERROR,CRITICAL}
 Log level. (default: warn)

Sample output:

Started at 2017-06-22 05:30:58 for 12.788000s by hive on ip-10-0-0-10.eu-west-2.compute.internal (Probably success). (Thread id: 79733, query id: hive_20170622053058_676612af-7bb8-4c4b-8fce-51bd1ae7be71, txn id: 0):
SELECT  
 id,
 count(*)
FROM 
 raw.event
GROUP BY
 1 
ORDER BY -- required for next step
 sys_partition

Started at 2017-06-22 05:31:25 for 0.018000s by Unknown on Unknown (Probably success). (Thread id: 79770, query id: hive_20170622053125_7d8e644a-5c23-4ca8-ab0f-20becdd65c3b, txn id: Unknown):
use events

Started at 2017-06-22 05:31:25 for Unknowns by Unknown on Unknown (FAILED). (Thread id: handler-46, query id: Unknown, txn id: Unknown):
MERGE INTO mart.click dst
USING (
 SELECT
 [big sql...]
 ) as r

FROM
 raw.click
 WHERE
 ${SEQ_CHECKER_SQL}
) src
ON

 [big sql...]

WHEN NOT MATCHED THEN INSERT VALUES (
  [more sql]
)
Error: ParseException line 36:4 cannot recognize input near '$' '{' 'SEQ_CHECKER_SQL' in expression specification

As you can see:

  • If user, hostname and duration are know they are displayed,
  • query is displayed with the same formatting as it was sent, inclusive comments,
  • error (if any) is showed. In my case, a variable is not expanded by the ETL tool.

You can find the source on github.