Environment variables in Hive

Here I explain how to set and use variables in Hive.

How to set a variable

Just use the keyword set

set foo=bar;
set system:foo=bar;

Alternatively, for the hiveconf namespace you can set the variable on the command line:

beeline --hiveconf foo=bar

How to use a variable

Wherever you want to use a value, use this syntax instead: ${namespace:variable_name}. For instance:

select '${hiveconf:foo}', '${system:foo}', '${env:CLASSPATH}';

Note that variables will be replaced before anything else happens. This means that this is perfectly valid:

set t=employees;
set verb=desc;
${hiveconf:verb} ${hiveconf:t};

But this will not do what you expect (hint: you will end up with 4 quotes in your select statement):

set s='Hello world';
select '${hiveconf:s}';

 

Furthermore, it means that you need to take care of your data types. Just as selecting a bare string is not valid, the following code is invalid as well:

set v=astring;
select ${hiveconf:v};

You will get:

Error: Error while compiling statement: FAILED: SemanticException [Error 10004]: Line 1:7 Invalid table alias or column reference 'astring': (possible column names are: ) (state=42000,code=10004)

In our case, you just need to quote the variable.

Note that this would work as-is with an int as a bare int is valid in the select statement.

Another caveat is to make sure the variable exists. Otherwise you will get either the variable literal for quoted variables:

select '${hiveconf:donotexists}';
+--------------------------+--+
|           _c0            |
+--------------------------+--+
| ${hiveconf:donotexists}  |
+--------------------------+--+

or an unhelpful error message for unquoted variables:

> select ${hiveconf:doesnotexist};
Error: Error while compiling statement: FAILED: ParseException line 1:7 cannot recognize input near '$' '{' 'hiveconf' in select clause (state=42000,code=40000)
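
To make these substitution rules concrete, here is a minimal Python sketch of purely textual variable replacement. It is not Hive's implementation: the substitute helper and its regular expression are assumptions made for illustration, and they only mimic the behaviour described above (values are pasted verbatim, unknown variables are left untouched).

```python
import re

# Hypothetical helper, not Hive code: replaces ${namespace:name} textually.
VAR = re.compile(r"\$\{(\w+):(\w+)\}")

def substitute(statement, variables):
    """Paste variable values verbatim; leave unknown variables as-is."""
    def repl(match):
        key = (match.group(1), match.group(2))
        return variables.get(key, match.group(0))
    return VAR.sub(repl, statement)

variables = {
    ("hiveconf", "s"): "'Hello world'",  # the quotes are part of the value
    ("hiveconf", "v"): "astring",
}

print(substitute("select '${hiveconf:s}';", variables))
print(substitute("select ${hiveconf:v};", variables))
print(substitute("select '${hiveconf:donotexists}';", variables))
```

The first statement ends up with 4 quotes, the second selects a bare word, and the third keeps the variable literal.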

If you only want to see the value of a variable, you can just use set as well:

set hiveconf:foo;

How to list variables

Just use SET;, but this will output a massive unreadable list. You are better off redirecting this output to a file, e.g.

beeline -e 'SET;' | sed 's/\s\+/ /g'> set.out

Note that I squash the spaces here. As the columns are aligned and some values are very long strings, squashing makes reading much easier.

Then if you want to see a specific set of variables, you can just run:

# system variables
grep '| system:' set.out

# Env variables
grep '| env:' set.out

# other variables
grep -v '| env:' set.out | grep -v '| system:'
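
If you prefer to do the grouping in one go, the same filtering can be sketched in Python. This assumes that every data row of set.out starts with "| " followed by the variable, as the grep patterns above suggest; the group_by_namespace helper and the sample rows are made up for illustration.

```python
from collections import defaultdict

def group_by_namespace(lines):
    """Group 'SET;' output rows by namespace (env, system, or other)."""
    groups = defaultdict(list)
    for line in lines:
        if not line.startswith("| "):
            continue  # skip table borders and anything that is not a row
        row = line[2:].rstrip().rstrip("|").rstrip()
        if row.startswith("env:"):
            groups["env"].append(row)
        elif row.startswith("system:"):
            groups["system"].append(row)
        else:
            groups["other"].append(row)
    return groups

# Made-up sample rows, in the squashed format produced by the sed command.
sample = [
    "+----+--+",
    "| env:HOME=/home/hive |",
    "| system:user.name=hive |",
    "| foo=bar |",
]
for namespace, rows in group_by_namespace(sample).items():
    print(namespace, rows)
```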

Namespaces

Hive has 3 namespaces for variables: hiveconf, system and env.

Hiveconf

Hiveconf is the namespace used when you use set or when you give a variable on the command line with --hiveconf foo=bar. Note that you can set those without specifying the namespace, but you always need to specify the namespace when using them.

set foo=bar;
select "${hiveconf:foo}";

env

This is the namespace of the shell environment variables. You can easily get them with the env: prefix:

SELECT "${env:hostname}";

I specifically chose this variable. If you run this query yourself, you will see that it is the environment of the hive server which is used, not the environment of your client. This greatly limits the usefulness of environment variables.

Note that environment variables cannot be set.

system

These contain, for instance, JVM settings, log file destinations and more.

 

Count lines of multiple files in hdfs

This situation pops up every now and then, and I ran into it recently: given a bunch of files on hdfs, how do you get the total line count?

It turns out that there are a lot of options, which I will explain here. The goal is to count the lines of all csv files in a specific directory, $d.

After all the solutions, I will show some benchmarks.

Options

The sysadmin way

Set up a fuse mountpoint to hdfs at $mountpoint (I always do that: it is very stable, uses no resources and helps with many things).


wc -l $mountpoint$d/*.csv | grep total

Easy and sweet, but has the drawback that all data is downloaded locally before being processed.

The dutiful way

We are using hadoop, so let’s use hadoop command line.

hdfs dfs -ls $d/\*.csv | awk '{print $8}' | xargs hdfs dfs -cat | wc -l

Note that this uses xargs. If you have too many files you might need to build a loop instead, but invoking the hdfs command many times incurs a lot of JVM startup time.

Here as well, the issue is that all data is downloaded locally.

The old school way

Let’s just use a streaming job:

hadoop jar $HADOOP_MAPRED_HOME/hadoop-streaming.jar \
     -Dmapred.reduce.tasks=1 \
     -input $d/\*.csv \
     -output /tmp/streamout \
     -mapper "bash -c 'paste <(echo "count") <(wc -l) '" \
     -reducer "bash -c 'cut -f2 | paste -sd+ | bc'"

The only issue with a streaming job is that the output is written in a file on hdfs (the -output parameter) not on stdout.

The tormented soul

Streaming jobs are nice but this bash is ugly and python is cool for data, right?

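# mapper.py: emit one "count<TAB>n" line per input split
import sys
cnt = 0
for l in sys.stdin:
    cnt += 1
print("count\t{}".format(cnt))

# reducer.py: sum the partial counts emitted by the mappers
import sys
cnt = 0
for l in sys.stdin:
    # the count arrives as a string on stdin, convert it before adding
    cnt += int(l.split('\t')[1])
print(cnt)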

And then just run:

hadoop jar $HADOOP_MAPRED_HOME/hadoop-streaming.jar \
   -Dmapred.reduce.tasks=1 \
   -input $d/\*.csv \
   -output /tmp/streamout \
   -mapper "/usr/bin/python3 mapper.py $*" \
   -reducer "/usr/bin/python3 reducer.py $*"
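
Before submitting the job, the mapper and reducer logic can be checked locally. The following is only a sketch of the same counting logic as plain functions; the splits are made-up stand-ins for the csv files, and no hadoop is involved.

```python
def mapper(lines):
    """Count the lines of one input split, emit 'count<TAB>n'."""
    return "count\t{}".format(sum(1 for _ in lines))

def reducer(mapper_outputs):
    """Sum the partial counts emitted by all mappers."""
    return sum(int(out.split("\t")[1]) for out in mapper_outputs)

# Two made-up input splits standing in for the csv files.
splits = [["a,1\n", "b,2\n", "c,3\n"], ["d,4\n", "e,5\n"]]
partials = [mapper(split) for split in splits]
print(partials)
print(reducer(partials))
```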

Connoisseur

Hey do not forget about pig!

csvs = LOAD '$d/*.csv';
csvgroup = GROUP csvs ALL;
cnt = FOREACH csvgroup GENERATE COUNT(csvs);
dump cnt;

Run with:

pig cnt.pig

Hipster

Pyspark is all the rage:

f=sc.textFile("$d/*.csv")
cnt=f.count()
print(cnt)

Benchmarking

Ok this was fun, but what works best?

I ran 2 sets of tests, one on a directory with a few (~5) big files, one on a directory with many (250) tiny files. I ran the count 10 times for each test. The results are in seconds, and lower is better.

              |     Few big files      |    Many small files
              | Mean   Median  Stdev   | Mean   Median  Stdev
Sysadmin      | 49.6   46.5    12.17   | 1      1       0
Dutiful       | 49.8   46.5    11.71   | 5.1    5       0.32
Connoisseur   | 25.7   25      2.31    | 17.2   17      0.42
Old school    | 21.5   21.5    0.53    | 47.7   48      1.06
Tormented     | 28.3   28      0.82    | 39.9   39.5    1.1
Hipster       | 20.2   20      1.03    | 9.4    9       0.52

A few things are apparent:

  • Different use cases call for different options.
  • When data is downloaded (sysadmin via the mountpoint or dutiful via hdfs -cat) the variance is higher. Weirdly enough, in my tests they both started at about 37 seconds and increased by 5 seconds at every run, to then stay stable at 63 seconds.
  • pyspark is the clear winner.
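
For reference, this kind of measurement can be reproduced with a small timing harness. This is a sketch, not the exact script behind the numbers above: the benchmark helper is hypothetical, and I assume the sample standard deviation here.

```python
import statistics
import time

def benchmark(fn, runs=10):
    """Call fn `runs` times and return mean, median and stdev in seconds."""
    timings = []
    for _ in range(runs):
        start = time.perf_counter()
        fn()
        timings.append(time.perf_counter() - start)
    return {
        "mean": statistics.mean(timings),
        "median": statistics.median(timings),
        # Sample standard deviation (an assumption, see above).
        "stdev": statistics.stdev(timings),
        "runs": len(timings),
    }

# Stand-in workload; replace it with a call running one of the options.
result = benchmark(lambda: sum(range(100000)), runs=10)
print(result)
```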

Hive: add a column pitfalls

Adding a column to an existing table is easy:


ALTER TABLE tbl ADD COLUMNS (new_col TIMESTAMP)

Easy right? Not always.

As the doc says,

The column change command will only modify Hive’s metadata, and will not modify data. Users should make sure the actual data layout of the table/partition conforms with the metadata definition.

What this means is that this command changes the table metadata but not the metadata of the existing partitions, so the new column will appear as NULL in select queries on those partitions.

The solution is then easy, just add the CASCADE keyword:


ALTER TABLE tbl ADD COLUMNS (new_col TIMESTAMP) CASCADE

Then partitions will be updated as well.

Easy right? Not always.

If you run this command with CASCADE on a table without partitions, you will end up with this non-descriptive error:

Error: Error while compiling statement: FAILED: NullPointerException null (state=42000,code=40000)

In short:

  • if you have a partitioned table you must use CASCADE.
  • if you do not have partitions, you must not use CASCADE.
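
If your ETL generates this statement, the rule can be encoded once and for all. A minimal sketch, where add_column_sql is a hypothetical helper and you are assumed to already know whether the table is partitioned:

```python
def add_column_sql(table, column, col_type, partitioned):
    """Build the ALTER TABLE statement, adding CASCADE only when partitioned."""
    sql = "ALTER TABLE {} ADD COLUMNS ({} {})".format(table, column, col_type)
    if partitioned:
        sql += " CASCADE"
    return sql

print(add_column_sql("tbl", "new_col", "TIMESTAMP", partitioned=True))
print(add_column_sql("tbl", "new_col", "TIMESTAMP", partitioned=False))
```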

Why is my hive MERGE statement slow?

In my ETL flow, I need to merge a source table into a destination table, in Hive. This turned out to be much slower than expected, so I had to dig around a lot. Here is what I found.

Context

Some data is coming from kafka, written as avro files on hdfs. These avro files are used to create an external table, which is then merged every day into the final ORC table. The external data files are then moved out of the way, meaning that the next ETL run will have a brand new external table to be fully merged into the destination table.

SQL


set hive.merge.cardinality.check=false;
set domainregexp='.*@(.*?)$';
MERGE INTO contact dst
USING (
  SELECT

    -- DISTINCT fields
      client -- partition column
    , user_id as id
    , ct.cid as cid
    -- other fields
     , email
     , lang
     -- note: domain is around here, but is computed from email. I compute
     -- it only when needed to prevent useless processing.
    , CAST(timestamp_ms_utc AS TIMESTAMP) AS ts_utc

    , ROW_NUMBER() OVER (
      PARTITION BY client
        , ct.cid
        , user_id
      ORDER BY timestamp_ms_utc DESC
     ) as r

  FROM
    external_table
  -- campaign_id is a stupid struct<long:bigint,array:array<bigint>>.
  -- Let's sanitise it.
  LATERAL VIEW explode(campaign_id) ct AS cid
) src
ON
  dst.client = src.client
  AND dst.campaign_id = src.cid
  AND dst.id = src.id

-- On match: keep latest loaded
WHEN MATCHED
 AND dst.updated_on_utc < src.ts_utc
 AND src.r = 1
THEN UPDATE SET
  -- other fields
    email = src.email
  , domain = regexp_extract(src.email, ${hiveconf:domainregexp}, 1)
  , lang = src.lang
  , updated_on_utc = src.ts_utc

WHEN NOT MATCHED AND src.r = 1 THEN INSERT VALUES (
   src.id
 , src.cid

 , src.email
 , regexp_extract (src.email, ${hiveconf:domainregexp}, 1)
 , src.lang

 , src.ts_utc -- insert_date
 , src.ts_utc -- update_date

 , src.client -- partition column
)
;

This statement:

  • reads the source table,
  • explodes an array (campaign_id),
  • orders the rows within the same ‘unique’ key (ROW_NUMBER()),
  • updates or inserts the first unique row.
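
To make the semantics explicit, here is a small Python sketch of the same logic on in-memory rows. It is an illustration only: the merge function and the sample rows are made up, the rows are assumed to be already exploded, only the email field is carried along, and timestamps are plain integers.

```python
def merge(dst, src_rows):
    """dst maps (client, cid, id) -> row dict; src_rows is a list of dicts."""
    # Step 1: keep only the latest row per key (the r = 1 rows).
    latest = {}
    for row in src_rows:
        key = (row["client"], row["cid"], row["id"])
        if key not in latest or row["ts_utc"] > latest[key]["ts_utc"]:
            latest[key] = row
    # Step 2: update matched rows that are older, insert unmatched rows.
    for key, row in latest.items():
        if key not in dst:
            dst[key] = {"email": row["email"],
                        "inserted_on_utc": row["ts_utc"],
                        "updated_on_utc": row["ts_utc"]}
        elif dst[key]["updated_on_utc"] < row["ts_utc"]:
            dst[key]["email"] = row["email"]
            dst[key]["updated_on_utc"] = row["ts_utc"]
    return dst

# Made-up rows: two versions of the same contact, plus a new one.
dst = {("c1", 7, 1): {"email": "old@a.org", "inserted_on_utc": 1, "updated_on_utc": 1}}
src = [
    {"client": "c1", "cid": 7, "id": 1, "email": "mid@a.org", "ts_utc": 2},
    {"client": "c1", "cid": 7, "id": 1, "email": "new@a.org", "ts_utc": 3},
    {"client": "c1", "cid": 8, "id": 2, "email": "x@b.org", "ts_utc": 3},
]
merge(dst, src)
print(dst)
```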

Problem

Merging takes longer and longer, roughly tripling each day. Merging the first day into the (empty) destination table takes about 30 minutes. The second day takes about 1.5 hours. The third day takes 4 hours. I stopped there.

What could go wrong?

Many things as it turned out.

(Attempted) Solutions

SQL tweaking

My first guess was that my SQL was not great. Here is what I tried:

  • Removing the regex. No impact.
  • Create a temporary table without duplicates and merge that one. Negative impact (4x longer).
  • Execute the merge per partition, one by one. Very negative impact.
  • Replace the source table by a subquery to filter out the r=1 before the merge. Negative impact (20% longer).
  • Create a temporary table with exactly the same structure as the destination table and merge that one. Negative impact (30% longer).
  • Pre-explode the lateral view earlier in the process. Negative impact (25% longer).

Apparently my SQL was quite good, so I had to look elsewhere.

Java heap

It turned out that many of my services were under-configured. I increased the datanode heap, namenode heap and hive metastore heap. This already made a big difference in speed, but it was not enough.

Small files

This was a massive issue.

I had 3 source files per table per minute. On top of this, I had some aggressive partitioning and bucketing (buckets are mandatory for ACID tables, ACID tables are mandatory for a merge).

Updating this to have 3 source files per hour and only 4 buckets per table instead of 64 gave me great performance. I am still not fully clear about the impact of bucketing, but this will be a question for later if I notice other performance problems. I have enough on my plate without doing premature optimisation.
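
To give an idea of the scale of the change, here is the arithmetic on the figures above, per table and per day. It only counts source files and buckets; how many files Hive actually writes per bucket is not covered.

```python
MINUTES_PER_DAY = 24 * 60
HOURS_PER_DAY = 24

files_before = 3 * MINUTES_PER_DAY  # 3 source files per table per minute
files_after = 3 * HOURS_PER_DAY     # 3 source files per table per hour

print(files_before, files_after, files_before // files_after)
print(64 // 4)  # reduction factor in buckets per table
```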

Final solution

In four words: bigger heap, fewer files.

The merge that initially took 30 minutes on an empty table now takes about 8 minutes on a table with 145M rows, into which 35M rows are merged daily.

 

 

 

Hadoop metrics in graphite

I will not present graphite here: if you end up reading this, I assume you already have a graphite instance up and running. If not, it takes less than an hour to have a usable instance.

Hadoop uses metrics2 which allows multiple metrics output plugins to be used in parallel, supports dynamic reconfiguration of metrics plugins, provides metrics filtering, and allows all metrics to be exported via JMX.

Those metrics can be very easily exported to graphite to then be sliced and diced to your heart’s content.

You only need to modify the file hadoop-metrics2.properties by adding the following snippet:

# Sampling period
*.period=10

# Graphite sink class
*.sink.graphite.class=org.apache.hadoop.metrics2.sink.GraphiteSink

# Location of your graphite instance
*.sink.graphite.server_host=10.x.x.x
*.sink.graphite.server_port=2003

# Define for each metric group (* in *.prefix) how it should be named
# in graphite (part after the =)
datanode.sink.graphite.metrics_prefix=hadoop.datanode
namenode.sink.graphite.metrics_prefix=hadoop.namenode
resourcemanager.sink.graphite.metrics_prefix=hadoop.resourcemanager
nodemanager.sink.graphite.metrics_prefix=hadoop.nodemanager
jobhistoryserver.sink.graphite.metrics_prefix=hadoop.jobhistoryserver
journalnode.sink.graphite.metrics_prefix=hadoop.journalnode
maptask.sink.graphite.metrics_prefix=hadoop.maptask
reducetask.sink.graphite.metrics_prefix=hadoop.reducetask
applicationhistoryserver.sink.graphite.metrics_prefix=hadoop.applicationhistoryserver

In Ambari, just go to HDFS > Config > Advanced hadoop-metrics2.properties. The location for other distributions should be trivial to find.

After that, restart hdfs and all the services you asked to monitor (if you asked to monitor the resourcemanager, restart the resource managers, and so on).

That’s it, you’re set.
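
If nothing shows up, it helps to check that the graphite instance accepts data on the configured port. Graphite's plaintext protocol is one "path value timestamp" line per metric, so a test metric can be sent by hand. A minimal sketch, where the metric name hadoop.test.heartbeat and both helpers are made up for illustration:

```python
import socket
import time

def graphite_line(path, value, timestamp=None):
    """Format one metric in graphite's plaintext protocol."""
    if timestamp is None:
        timestamp = int(time.time())
    return "{} {} {}\n".format(path, value, int(timestamp))

def send_metric(host, port, path, value):
    """Send a single metric to a graphite (carbon) plaintext listener."""
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(graphite_line(path, value).encode("ascii"))

# Only formats the line; call send_metric(...) against your own instance.
print(graphite_line("hadoop.test.heartbeat", 1, timestamp=1498109458), end="")
```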

If you are on HDP, you can go a bit further. HDP actually ships with a grafana instance (if you installed Ambari metrics) which can use graphite as a data source. The data will be the same, the display will be a tad prettier.

This uses graphite web (port 80 by default), which needs CORS enabled. You can do it in apache (the default graphite web http server) by adding this line in your graphite vhost:

Header set Access-Control-Allow-Origin "*"

 

--no options with argparse and python

Ruby has this very nice feature when you define options with optparse:

opts.on('--[no-]flag', "Set flag.") do |p|
    options.persistPost=p
end

which allows you to have the --flag and --no-flag options for free. Python's argparse does not have this before version 3.9, but there are 3 ways to work around that.

The verbose way

Just define 2 options.

  parser.add_argument(
    '--flag',
    dest='flag',
    action='store_true',
    help='Set flag',
  )
  parser.add_argument(
    '--no-flag',
    dest='flag',
    action='store_false',
    help='Unset flag',
  )

Custom action

You can give a custom action to the action parameter of add_argument. This custom action can look at the actual option given and act accordingly.

  parser.add_argument(
    '--flag', '--no-flag',
    dest='flag',
    action=BooleanAction,
    help='Set flag',
  )

BooleanAction is just a tiny 6-line class, defined as follows:

class BooleanAction(argparse.Action):
    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        super(BooleanAction, self).__init__(option_strings, dest, nargs=0, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, False if option_string.startswith('--no') else True)

As you can see, it just looks at the name of the flag, and if it starts with --no, the destination will be set to False.
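
Put together as one runnable script, it looks like this. The argument lists and default=None are only there for the demonstration, and I use the Python 3 form of super().

```python
import argparse

class BooleanAction(argparse.Action):
    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        super().__init__(option_strings, dest, nargs=0, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        # --no-flag stores False, --flag stores True
        setattr(namespace, self.dest, not option_string.startswith('--no'))

parser = argparse.ArgumentParser()
parser.add_argument('--flag', '--no-flag', dest='flag',
                    action=BooleanAction, default=None, help='Set flag')

print(parser.parse_args(['--flag']).flag)
print(parser.parse_args(['--no-flag']).flag)
print(parser.parse_args(['--flag', '--no-flag']).flag)  # last one wins
print(parser.parse_args([]).flag)
```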

Custom parser

Create your own add_argument method, which can then automagically add the --no option for you.
First define your own parser:

class BoolArgParse(argparse.ArgumentParser):
    def add_bool_arguments(self, *args, **kw):
        grp = self.add_mutually_exclusive_group()
        # add --flag
        grp.add_argument(*args, action='store_true', **kw)
        nohelp = 'no ' + kw['help']
        del kw['help']
        # add --no-flag
        grp.add_argument('--no-' + args[0][2:], *args[1:], action='store_false', help=nohelp, **kw)

Then use it:

parser = BoolArgParse()
parser.add_bool_arguments('--flag',dest='flag', help='set flag.')
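
Here is a runnable version of this approach, lightly adapted (it pops the help text and only supports a single long option name), which also shows what happens when both options are given:

```python
import argparse

class BoolArgParse(argparse.ArgumentParser):
    def add_bool_arguments(self, *args, **kw):
        grp = self.add_mutually_exclusive_group()
        # add --flag
        grp.add_argument(*args, action='store_true', **kw)
        nohelp = 'no ' + kw.pop('help')
        # add --no-flag
        grp.add_argument('--no-' + args[0][2:], action='store_false',
                         help=nohelp, **kw)

parser = BoolArgParse()
parser.add_bool_arguments('--flag', dest='flag', help='set flag.')

print(parser.parse_args(['--flag']).flag)
print(parser.parse_args(['--no-flag']).flag)
try:
    parser.parse_args(['--flag', '--no-flag'])
except SystemExit:
    # argparse reports the conflict on stderr and exits
    print('--flag and --no-flag are mutually exclusive')
```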

Comparison

I will not label these as pros and cons, as not all use cases want the same features, but here is how they compare:

  • Verbose way:
    • More lines of code (need to define 2 flags),
    • Help more verbose,
    • Easy (no extra class),
    • Possibility to have the same parameter multiple times, the last one wins (e.g. --flag --no-flag).
  • Custom action:
    • Fewer lines of code,
    • Help not verbose (only one line of help),
    • Possibility to have the same parameter multiple times, the last one wins (e.g. --flag --no-flag).
  • Custom parser:
    • The most lines of code,
    • Help verbose but grouped,
    • Cannot have the same flag repeated.
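
As a side note, Python 3.9 and later ship argparse.BooleanOptionalAction, which generates the --no- variant from a single definition. A short sketch, for comparison with the three options above (it needs Python 3.9 or newer):

```python
import argparse

parser = argparse.ArgumentParser()
# Generates both --flag and --no-flag from a single definition.
parser.add_argument('--flag', action=argparse.BooleanOptionalAction,
                    default=False, help='Set flag')

print(parser.parse_args(['--flag']).flag)
print(parser.parse_args(['--no-flag']).flag)
print(parser.parse_args(['--flag', '--no-flag']).flag)  # last one wins
```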

Extracting queries from Hive logs

Hive logs are very verbose, and I personally find it a pain to wade through them when I try to understand which queries my ETL tool decided to generate.

To help with this, I created this small python script which looks at hive log files and outputs the SQL queries, and only the queries, with some information about them if known: time started, duration, success.

Usage:

./hqe.py --help 
usage: hqe.py [-h] [--since SINCE] [--to TO] [--logdir LOGDIR]
              [--glob LOGFILE_GLOB]
              [--loglevel {DEBUG,INFO,WARNING,ERROR,CRITICAL}]

Displays queries ran on Hive.

optional arguments:
  -h, --help            show this help message and exit
  --since SINCE         how far to look back. (default: 15m)
  --to TO               How far to look forward. (default: now)
  --logdir LOGDIR       Directory of hive log files. (default: /var/log/hive)
  --glob LOGFILE_GLOB   Shell pattern of hive logfiles inside their logdir.
                        (default: hiveserver2.log*)
  --loglevel {DEBUG,INFO,WARNING,ERROR,CRITICAL}, -l {DEBUG,INFO,WARNING,ERROR,CRITICAL}
                        Log level. (default: warn)

Sample output:

Started at 2017-06-22 05:30:58 for 12.788000s by hive on ip-10-0-0-10.eu-west-2.compute.internal (Probably success). (Thread id: 79733, query id: hive_20170622053058_676612af-7bb8-4c4b-8fce-51bd1ae7be71, txn id: 0):
SELECT  
 id,
 count(*)
FROM 
 raw.event
GROUP BY
 1 
ORDER BY -- required for next step
 sys_partition

Started at 2017-06-22 05:31:25 for 0.018000s by Unknown on Unknown (Probably success). (Thread id: 79770, query id: hive_20170622053125_7d8e644a-5c23-4ca8-ab0f-20becdd65c3b, txn id: Unknown):
use events

Started at 2017-06-22 05:31:25 for Unknowns by Unknown on Unknown (FAILED). (Thread id: handler-46, query id: Unknown, txn id: Unknown):
MERGE INTO mart.click dst
USING (
 SELECT
 [big sql...]
 ) as r

FROM
 raw.click
 WHERE
 ${SEQ_CHECKER_SQL}
) src
ON

 [big sql...]

WHEN NOT MATCHED THEN INSERT VALUES (
  [more sql]
)
Error: ParseException line 36:4 cannot recognize input near '$' '{' 'SEQ_CHECKER_SQL' in expression specification

As you can see:

  • if user, hostname and duration are known, they are displayed,
  • the query is displayed with the same formatting as it was sent, including comments,
  • the error (if any) is shown. In my case, a variable is not expanded by the ETL tool.

You can find the source on github.
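
To give an idea of the principle, here is a stripped-down sketch of the extraction. It is not the code of hqe.py. It assumes that each log record starts with a timestamp, that continuation lines do not, and that the query follows a "Compiling command(queryId=...): " marker; both regular expressions and the sample log lines are made up for illustration.

```python
import re

# Assumed log layout, not taken from hqe.py: a record starts with a timestamp,
# and the query follows "Compiling command(queryId=...): ".
RECORD_START = re.compile(r"^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}")
COMPILING = re.compile(r"Compiling command\(queryId=([^)]+)\): (.*)", re.DOTALL)

def extract_queries(lines):
    """Yield (query_id, sql) for each query found in the log lines."""
    records, current = [], []
    for line in lines:
        # A new timestamp closes the previous (possibly multi-line) record.
        if RECORD_START.match(line) and current:
            records.append("".join(current))
            current = []
        current.append(line)
    if current:
        records.append("".join(current))
    for record in records:
        match = COMPILING.search(record)
        if match:
            yield match.group(1), match.group(2).strip()

# Made-up log excerpt in the assumed layout.
sample = [
    "2017-06-22 05:30:58,100 INFO ql.Driver: Compiling command(queryId=q1): SELECT\n",
    " id\n",
    "FROM raw.event\n",
    "2017-06-22 05:30:58,200 INFO ql.Driver: Semantic Analysis Completed\n",
]
for query_id, sql in extract_queries(sample):
    print(query_id)
    print(sql)
```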