Find a timezone offset in pure SQL in hive

Timezones are a pain. This is not new, and every time you deviate from UTC it will bite you. That said, sometimes you have to deviate from UTC, especially for the final display of a date if you want to show it in the local timezone of the reader. In that case, adding an explicit offset will save some questions and uncertainty down the line.

There is no function get_offset_from_tz() in Hive, sadly. Using reflect() does not work either, as the method call is too complex for reflect. Writing a UDF would be possible but feels like overkill.

The solution I give here works in Hive and should work in most other SQL dialects as well, apart from the variable handling.

The algorithm to find the offset is easy:

  • get the time in utc,
  • get the same in another timezone,
  • subtract one from the other to get the offset,
  • format the offset in a standard way.

The main issue is that you cannot assign results to variables in SQL, meaning that many computations need to be duplicated. They will be optimised away, of course, but they make for ugly code.

In Hive, luckily, you can use variables. They cannot store results but are used as-is, a bit like macros: the variable name is just replaced by its content, which can be any piece of code.
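
A tiny example of this macro-like substitution (the variable name and query are made up for illustration):

-- The variable holds a piece of expression, not a computed value; Hive substitutes the text as-is.
set hivevar:plus_one_day=interval '1' day;
select current_timestamp + ${plus_one_day};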

This sets up the date to find the offset for, as well as a few timezones to test with.

-- Date to display. If you use this from a table you can
-- put here the column that would be used, eg. t.logdate.
set hivevar:D='2018-06-01 01:02:02';

-- A few tests:
-- positive offset +02:00 (in summer)
set hivevar:DISPLAY_TZ='Europe/Amsterdam';

-- negative offset -04:00 (in summer)
set hivevar:DISPLAY_TZ='America/New_York';

-- 0 offset
set hivevar:DISPLAY_TZ='UTC';

-- Non integer offset: +09:30
set hivevar:DISPLAY_TZ='Australia/Adelaide';

These are the macros:


-- Date displayed in the right TZ
set hivevar:dateintz=DATE_FORMAT(FROM_UTC_TIMESTAMP(${D}, ${DISPLAY_TZ}),"yyyy-MM-dd HH:mm:ss");
-- Offset in interval type
set hivevar:delta=cast(${dateintz} as timestamp) - cast(${D} as timestamp);

And the code itself, tiny and readable once variables are used:

select
  concat(
    -- date in TZ
    ${dateintz}

    -- sign
    , if(${delta} < interval '0' minute, '-', '+')

    -- hour
    , lpad(abs(hour(${delta})), 2, 0)

    , ':'

    -- minute
    , lpad(abs(minute(${delta})), 2, 0)
) as dtwithoffset
;
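
With DISPLAY_TZ set to Europe/Amsterdam, this should give something like:

2018-06-01 03:02:02+02:00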

Et voilà.


Hive, map, NULL and NPE

Checking for null values in a map column in Hive (1.2.1, Hortonworks) interestingly returns a null pointer exception:

create table npe (m map<bigint, bigint>);
select count(*) from npe where m is null;
Error: Error while compiling statement: FAILED: NullPointerException null (state=42000,code=40000)

The error happens at parsing time, when Hive tries to estimate data size. From hiveserver2.log:

Caused by: java.lang.NullPointerException
at org.apache.hadoop.hive.ql.stats.StatsUtils.getSizeOfMap(StatsUtils.java:1096)

Interestingly, getting not null is fine:

select count(*) from npe where m is not null; -- returns 0

If you think like me, you will think ‘haha! not not null should work!’

select count(*) from npe where not m is not null; -- does not work

If you are smarter than me, you will have guessed before trying that Hive optimises the double negation away, and gives another NPE.

But going in this direction, we can still trick Hive by casting the boolean to int:

select count(*) from npe where int(m is not null)=0; -- works

This happens both without any data and when there are real NULLs in the table. By real NULL I mean that a SELECT would show NULL, which only happens when you add a column to an existing table. Indeed, you cannot insert NULL into a complex column yourself:

with a as (select null) insert into npe select * from a;
Error: Error while compiling statement: FAILED: SemanticException [Error 10044]: Line 1:36 Cannot insert into target table because column number/types are different 'npe': Cannot convert column 0 from void to map. (state=42000,code=10044)

You have to create an empty map object:

with a as (select map(cast(null as bigint), cast(null as bigint))) insert into npe select * from a;

Then of course the empty map object is not (a real) NULL and if you want to look for null you have to fudge a bit, looking at the size of the map for instance:

select m, size(m), isnull(m) from npe;
+----+-----+--------+--+
| m  | _c1 | _c2    |
+----+-----+--------+--+
| {} |  0  | false  |
+----+-----+--------+--+
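
So if what you actually want is to treat those empty maps as missing values, the size is the thing to filter on. A sketch of both checks (size() returns 0 for an empty map and -1 for a real NULL; whether the second form also dodges the stats NPE may depend on your Hive version):

-- empty maps, i.e. the closest thing to NULL we could actually insert
select count(*) from npe where size(m) = 0;
-- real NULL maps, i.e. rows from before the column was added
select count(*) from npe where size(m) < 0;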

Hive self merge

I had an ORC table with duplicated rows, which I wanted to remove. The query is quite simple:

merge into click as dst using (
    select
      -- For all unique clicks...
        client_name
      , contact_id
      , ts_utc
      -- ... find the duplicates (cnt>1 in having) ...
      , count(*) cnt
      -- ... remember the first one loaded ...
      , min(load_ts) as first_load
      from
        click
      group by
        1, 2, 3
      having cnt > 1
)
as src
-- ... once the first occurrence of the duplicates
-- is found find all the duplicates ...
on
        dst.client_name=src.client_name
    and dst.contact_id=src.contact_id
    and dst.ts_utc=src.ts_utc
-- ... and if it is not the first one loaded ...
when matched and src.first_load != dst.load_ts
-- .. delete it.
then delete
;

Trivial, right? Well, it turns out you cannot do such a ‘self merge’ in Hive. I ended up with this error:

java.lang.InterruptedException
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
 at org.apache.tez.runtime.InputReadyTracker$InputReadyMonitor.awaitCondition(InputReadyTracker.java:120)
 at org.apache.tez.runtime.InputReadyTracker.waitForAllInputsReady(InputReadyTracker.java:90)
 at org.apache.tez.runtime.api.impl.TezProcessorContextImpl.waitForAllInputsReady(TezProcessorContextImpl.java:116)
 [...]

The solution, once you understand that a self merge is not allowed, is of course obvious: use a temporary table. Splitting my merge statement in two did the trick.

create temporary table clickdups stored as orc as select
        client_name
      , contact_id
      , ts_utc
      , count(*) cnt
      , min(load_ts) as first_load
      from
        click
      group by
        1, 2, 3
      having cnt > 1
;

merge into click as dst using clickdups
as src
on
        dst.client_name=src.client_name
    and dst.contact_id=src.contact_id
    and dst.ts_utc=src.ts_utc
when matched and src.first_load != dst.load_ts
then delete
;
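
To double check that the cleanup worked, a quick count of the remaining duplicates does the trick (a sketch using the same key columns):

select count(*) from (
    select client_name, contact_id, ts_utc
    from click
    group by client_name, contact_id, ts_utc
    having count(*) > 1
) dups;
-- should be 0, unless some duplicates share the exact same load_ts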

On a side note, I needed to tweak the self merge a lot to prevent out of memory errors. Those did not happen at all with the two-step solution.

Create a time dimension table in pure Hive SQL

Without further ado, here is the full SQL to create a table with one row per day, giving you the date, year, month, day, day number and name of the week, and day of the year. If you want the hours as well, look at the bottom of this post.

set hivevar:start_day=2010-01-01;
set hivevar:end_day=2050-12-31;
set hivevar:timeDimTable=default.timeDim;

create table if not exists ${timeDimTable} as
with dates as (
select date_add("${start_day}", a.pos) as d
from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a
)
select
    d as d
  , year(d) as year
  , month(d) as month
  , day(d) as day
  , date_format(d, 'u') as daynumber_of_week
  , date_format(d, 'EEEE') as dayname_of_week
  , date_format(d, 'D') as daynumber_of_year

from dates
sort by d
;

Note that I use d as the date column because date is a reserved keyword.
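
If you really wanted to call the column date, you would have to backtick it in every query, something like this (hypothetical table):

select `date`, count(*) from some_table group by `date`;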

The biggest issue is to generate one row per day. The trick here is to use a clever combination of posexplode, split and repeat. This is what the first CTE does:

-- just 10 days for the example
set hivevar:start_day=2010-01-01;
set hivevar:end_day=2010-01-10;
select date_add("${start_day}", a.pos) as d
from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a

We can break it down into a few parts:

select datediff("${end_day}", "${start_day}");
-- output: 9

Just computes the difference between start and end day in days.

select repeat("o", 9);
-- output: ooooooooo

Outputs a string of nine ‘o’ characters. The actual character does not matter at all.

select split("ooooooooo", "o");
-- output:  ["","","","","","","","","",""]

Creates a Hive array of 10 (empty) strings.

select posexplode(split("ooooooooo", "o"));
-- output:
-- +------+------+--+
-- | pos  | val  |
-- +------+------+--+
-- | 0    |      |
-- | 1    |      |
-- | 2    |      |
-- | 3    |      |
-- | 4    |      |
-- | 5    |      |
-- | 6    |      |
-- | 7    |      |
-- | 8    |      |
-- | 9    |      |
-- +------+------+--+

Actually creates one row per array element, with the index (0 to 9) and the (empty) value of each element.

That was the tricky part; the rest is easy. The first CTE creates one row per date by adding the array index (in days) to start_day:

with dates as (
select date_add("${start_day}", a.pos) as d
from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a)
select * from dates;
-- +-------------+--+
-- |   dates.d   |
-- +-------------+--+
-- | 2010-01-01  |
-- | 2010-01-02  |
-- | 2010-01-03  |
-- | 2010-01-04  |
-- | 2010-01-05  |
-- | 2010-01-06  |
-- | 2010-01-07  |
-- | 2010-01-08  |
-- | 2010-01-09  |
-- | 2010-01-10  |
-- +-------------+--+

From there on, you can just create whatever column you feel like. Quarter column? floor((month(d) + 2) / 3) as quarter. Long name of the week? date_format(d, 'EEEE') as dayname_of_week_long.
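
For instance, a few extra columns could look like this (a sketch; the names and exact definitions are just suggestions):

select
    d
  , floor((month(d) + 2) / 3)                       as quarter
  , date_format(d, 'MMMM')                          as monthname    -- full month name
  , if(cast(date_format(d, 'u') as int) >= 6, 1, 0) as is_weekend   -- 'u': 1=Monday ... 7=Sunday
from dates;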

As a bonus, I give you the same table but with hours added. The principles are exactly the same, with a cartesian join between dates and hours:

set hivevar:start_day=2010-01-01;
set hivevar:end_day=2010-01-02;
set hivevar:timeDimTable=default.timeDim;

create table if not exists ${timeDimTable} as
with dates as (
  select date_add("${start_day}", a.pos) as d
  from (select posexplode(split(repeat("o", datediff("${end_day}", "${start_day}")), "o"))) a
),
hours as (
  select a.pos as h
  from (select posexplode(split(repeat("o", 23), "o"))) a
)
select
    from_unixtime(unix_timestamp(cast(d as timestamp)) + (h * 3600)) as dt
  , d as d
  , year(d) as year
  , month(d) as month
  , day(d) as day
  , h as hour
  , date_format(d, 'u') as daynumber_of_week
  , date_format(d, 'EEEE') as dayname_of_week
  , date_format(d, 'D') as daynumber_of_year

from dates
join hours
sort by dt
;

Alter location of a Hive table

Long story short: the location of a Hive managed table is just metadata; if you update it, Hive will not find its data anymore. You do need to physically move the data on HDFS yourself.

Short story long:

You can decide where on HDFS the data of a table lives, even for a managed table:

create table if not exists tstloc (id bigint)
clustered by (id) into 4 buckets
stored as orc
location 'hdfs:///tmp/ttslocorig'
tblproperties ("transactional"="true");
insert into tstloc values(1);
select * from tstloc;

Now if you want to move this table to another location for any reason, you might run the following statement:

alter table tstloc set location 'hdfs:///tmp/ttslocnew';

But then the table is empty!

select * from tstloc;

will return an empty set. The reason is that the location property is only metadata, telling Hive where to look, without any effect on said location (except at creation time, when the location is created if it does not exist, for managed tables). If nothing happens to be there, Hive returns nothing. Conversely, if something happens to be there, Hive returns that something.
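
You can check what the metastore currently believes by looking at the Location line in the output of:

describe formatted tstloc;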

To get your data back, you just need to physically move the data on HDFS to the expected location:

hdfs dfs -mv /tmp/ttslocorig /tmp/ttslocnew
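
After the move, the same select should find the data again:

select * from tstloc;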


Count bytes in a stream, in bash

I often have this kind of construct where a process generates a lot of data and a second one does something with it. Think for instance about a big select in a database, the output being a csv we want to compress.

'SELECT a lot of data FROM a big table' | gzip > data.csv.gz

I wanted to know the size of the original data (I know that in the case of gzip I can use the -l flag, but this is just an example).

There are 3 ways to do this. In my examples the big data process is yes | head -n 1000000000, which generates 1 billion rows (without IO, which is nice for benchmarking), and the consumer process is just a dd which dumps everything to /dev/null.

awk

yes | head -n 1000000000 | awk '{print $0; count++} END{print count >"/dev/stderr";}' | dd bs=64M of=/dev/null

Good points:

  • Quite easy to read, semantically easy to understand,
  • does not duplicate the data stream.

Bad point

  • As the data still flows to STDOUT, the row count is printed on STDERR which is not ideal but is still usable afterwards.

Tee and wc

# read runs in the current shell here (not in a pipeline subshell), so rowcount survives
read -r rowcount < <(yes | head -n 1000000000 | tee >(dd bs=64M of=/dev/null 2>/dev/null) | wc -l)
echo $rowcount

Good point:

  • You get the rowcount in a variable, easy to use afterwards.

Bad points

  • Semantically weird, tricky to understand,
  • duplicates the data stream.

pv

yes | head -n 1000000000 | pv -l | dd bs=64M of=/dev/null 2>/dev/null

Good points

  • Semantically pleasing,
  • pv has a lot of options which might be interesting.

Bad point

  • Nice for interactive use, but it displays a progress bar on STDERR so it’s next to impossible to get the output in a script.

Out of those 3 options, which one is the fastest?

After running each option 10 times, here are the results, in seconds.

        awk     tee    pv
Mean    139.5   18.3   6516
Min     133     17     6446
Max     154     22     6587
Stdev   8.28    1.81   48.4

Yes, I double checked my data. We are indeed talking about 20 seconds for tee, 2:20 minutes for awk and about 1h45 for pv.

Compression of ORC tables in Hive

I only use ORC tables in Hive, and while trying to understand some performance issues I wanted to make sure my tables were properly compressed. This is easy, just run

desc extended table;

and search the output for the string

compressed:true

Well, it turned out that it was false for all my tables although I was pretty sure I had set everything up correctly, so I dug and experimented a bit. I generated an easy-to-compress data set and loaded it into a few different tables with different options.

# create 1 csv, 500MB of easy to compress data
yes '1,longish string which will compress really well' | head -n 10000000 > /tmp/source.csv

# Copy this file in hdfs
hdfs dfs -mkdir /tmp/compressiontest
hdfs dfs -copyFromLocal /tmp/source.csv /tmp/compressiontest/source.csv

Then I loaded this data into 2 tables, compressed and uncompressed, controlled by the setting hive.exec.compress.output.


CREATE EXTERNAL TABLE sourcedata (id INT, s STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/tmp/compressiontest'
;
MSCK REPAIR TABLE sourcedata;

CREATE TABLE shouldbecompressed ( id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldbecompressed';

CREATE TABLE shouldbeuncompressed (id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldbeuncompressed';

set hive.exec.compress.output=true;
INSERT INTO shouldbecompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldbecompressed;

set hive.exec.compress.output=false;
INSERT INTO shouldbeuncompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldbeuncompressed;

I still have compressed:false, but what happens on disk?

hdfs dfs -du -s -h /tmp/should\*

42.5 K /tmp/shouldbecompressed
39.8 K /tmp/shouldbeuncompressed

Hum, apparently both tables are compressed? It turned out that I forgot about an orc parameter (orc.compress), set by default to ZLIB for me. The other valid values are SNAPPY or NONE. So let’s try again:

CREATE TABLE shouldreallybecompressed ( id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldreallybecompressed'
TBLPROPERTIES ("orc.compress"="ZLIB")
;

CREATE TABLE shouldreallybeuncompressed ( id INT, s STRING)
STORED AS ORC
LOCATION '/tmp/shouldreallybeuncompressed'
TBLPROPERTIES ("orc.compress"="NONE")
;

set hive.exec.compress.output=true;
INSERT INTO shouldreallybecompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldreallybecompressed;

set hive.exec.compress.output=false;
INSERT INTO shouldreallybeuncompressed SELECT * FROM sourcedata;
SELECT COUNT(*) FROM shouldreallybeuncompressed;
hdfs dfs -du -s -h /tmp/should\*

42.5 K /tmp/shouldbecompressed
39.8 K /tmp/shouldbeuncompressed
38.8 K /tmp/shouldreallybecompressed
3.8 M /tmp/shouldreallybeuncompressed

So indeed, the uncompressed table is less compressed, but is still a far cry from the 500MB I expected.

Long story short, ORC does some compression on its own, and the parameter orc.compress is just a cherry on top. On a side note, using SNAPPY instead of ZLIB, the data size was 197k instead of 44k.
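
For reference, choosing SNAPPY is just another value of the same table property (table name made up for the example):

CREATE TABLE maybesnappy (id INT, s STRING)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");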

To look even deeper, the hive command line has an option --orcfiledump, which gives some metadata about an ORC file. So looking at a compressed file:

hive --orcfiledump /tmp/shouldbecompressed/000007_0

We can see, among other lines:

# yes, compressed!
Compression: ZLIB

# This is the buffer size, nothing to do with actual data size
Compression size: 262144

File length: 5459 bytes

For an uncompressed file:

hive --orcfiledump /tmp/shouldreallybeuncompressed/000000_0

Compression: NONE
File length: 136741 bytes

Long story short, the output of desc extended regarding compression is useless. And all my tables are indeed compressed.

This example was a bit artificial as the source file was very compressible. With another, more random source file, generated as follows:


cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold | head -c 500000k | awk '{print "1," $0}'> source.csv

Then the size on disk becomes:

370.4 M /tmp/shouldbecompressed
370.4 M /tmp/shouldbeuncompressed
370.4 M /tmp/shouldreallybecompressed
490.0 M /tmp/shouldreallybeuncompressed

And just because I am nice, here are the lines to clean up your droppings:

drop table shouldbecompressed;
drop table shouldbeuncompressed;
drop table shouldreallybeuncompressed;
drop table shouldreallybecompressed;
drop table sourcedata;