Thousand separators in Hue

Hue is a very handy SQL assistant for Hadoop, where you can easily run Hive or Impala queries.

I was asked if it was possible to have a thousands separator in the display of the query results. There is no option for this in Hue. I thought I could get away with Django environment variables, but either it’s not possible or I got it wrong.

In any case, it did not feel great. You could argue that Hue is a display tool, so it’s OK to format the output there, but it would apply to all users, and they might not all want that…

Long story short, I removed my Hadoop administrator cap and put on my dirty creative one. Once the results are loaded, formatting them is a trivial JavaScript manipulation. Furthermore, Hue uses jQuery, which makes it even easier. So I came up with this little bookmarklet. Put it in the URL field of a bookmark, including the ‘javascript:’ prefix. If you want to format your output, just click the bookmarklet et voilà:

javascript:(
  function(){
    $('table.resultTable td').each(
      function(){
        var t = $(this).text();
        /* only touch cells that contain plain numbers, and only their integer
           part, so that decimals like 1234.5678 are not mangled */
        if (t.trim() !== '' && !isNaN(t)) {
          var parts = t.split('.');
          parts[0] = parts[0].replace(/(\d)(?=(\d{3})+(?!\d))/g, '$1,');
          $(this).text(parts.join('.'));
        }
      }
    )
  }()
);

Where exactly is this block on HDFS?

This post will show you how to find out where a specific HDFS block is: on which server and on which disk of that server.

Context

I needed to decommission a data directory from HDFS (updating dfs.datanode.data.dir). This is not a big deal because the default replication factor is 3: removing a disk would just trigger the re-replication of the blocks it held.

Safety checks

Just for safety, I first wanted to check if all blocks were properly replicated. This is easy to check with the following command:

 hdfs fsck / -files -blocks -locations | grep repl=1 -B1

What does it do?

  • hdfs fsck /
    • run hdfs checks from the root
  • -files -blocks -locations
    • Display file names, block names and locations
  • | grep repl=1
    • show only blocks with replication 1
  • -B1
    • But please display the previous line as well to get the actual file name

If you’re good (all files are properly replicated) you will get an empty output. Otherwise, you will get a bunch of lines like these:

/a/dir/a/file 2564 bytes, replicated: replication=1, 1 block(s): OK
0. BP-1438592571-10.88.112.28-1502096897275:blk_1077829561_4348908 len=2564 Live_repl=1 [DatanodeInfoWithStorage[10.1.2.3:9866,DS-f935a126-2226-4ef8-99a6-20d700f06110,DISK]]
--
/another/dir/another/file 2952 bytes, replicated: replication=1, 1 block(s): OK
0. BP-1438592571-10.88.112.28-1502096897275:blk_1077845856_4366930 len=2952 Live_repl=1 [DatanodeInfoWithStorage[10.2.3.4:9866,DS-1d065d48-f887-4ed5-be89-5e9c79633519,DISK]]

Technically, for me this was an error, which I could fix by forcing the replication to 3:

hdfs dfs -setrep 3 /a/dir/a/file

Where are my blocks?

In other words, are there unreplicated blocks on the disk I am about to remove?

There might be good reasons to have a replication factor of 1, and in that case you want to be sure that none of those blocks are on the disk you will remove. How can you do that?

Looking at the output of the previous command, specifically the DatanodeInfoWithStorage bit, you can find out some interesting information already:

10.2.3.4:9866,DS-1d065d48-f887-4ed5-be89-5e9c79633519,DISK
  • 10.2.3.4:9866: this is the server where the block is; 9866 is the default datanode port,
  • DISK: good, the data is stored on disk,
  • DS-1d065d48-f887-4ed5-be89-5e9c79633519: this looks like a disk ID. What does it mean?

Looking at the source on GitHub does not help much: this is just a string, named storageID. What now?

It turns out that this storage ID is in a text file in every directory listed in dfs.datanode.data.dir. Look at one of those directories and you will find the file current/VERSION, which looks like this:

#Tue Apr 07 13:49:10 CEST 2020
storageID=DS-dc5bed87-addb-4575-b2e3-6cbb114e4700
clusterID=cluster16
cTime=0
datanodeUuid=b0d3af53-3320-4833-8063-a13720f84bae
storageType=DATA_NODE
layoutVersion=-57

And there you are, there is the storageID, which matches what was displayed via the hdfs command.

This was the missing link: you now know exactly which disk your block is on.
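To go the other way round, from the storage ID in the fsck output back to a physical location, a quick grep over the configured data directories does the trick. A minimal sketch, assuming the data directories are mounted under /data/1, /data/2, … (adjust to whatever your dfs.datanode.data.dir contains):

# Run this on the datanode reported by fsck (10.2.3.4 in the example above).
# It prints the data directory whose VERSION file contains the storage ID,
# i.e. the disk hosting the block.
grep -l 'storageID=DS-1d065d48-f887-4ed5-be89-5e9c79633519' /data/*/current/VERSION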

Hive and integer overflows

I was playing around in Hive (3, from HDP) with big integers when I noticed something weird:

select 9223372036854775807 * 9223372036854775807;
-- 1

select 9223372036854775807 + 1;
-- -9223372036854775808

It turns out that Hive silently overflows integers. This comes from Java, which does the same.
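You can verify this with plain Java, where long arithmetic wraps around silently in exactly the same way:

// Quick check: Java long arithmetic wraps around silently, matching the Hive results above.
public class OverflowDemo {
    public static void main(String[] args) {
        long max = Long.MAX_VALUE;       // 9223372036854775807
        System.out.println(max + 1);     // prints -9223372036854775808
        System.out.println(max * max);   // prints 1
    }
}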

It’s lucky I noticed; it could have been very painful for me down the line. The workaround is to use big decimals. As the doc says:

Integral literals larger than BIGINT must be handled with Decimal(38,0). The Postfix BD is required.

For instance:

select 9223372036854775807BD * 9223372036854775807BD;
-- 85070591730234615847396907784232501249

select 9223372036854775807BD + 1;
-- 9223372036854775808

But it is still somewhat weird. Overflows with big decimals won’t error out but will return null:

select 9223372036854775807BD
  * 9223372036854775807BD
  * 9223372036854775807BD
;
-- NULL

Furthermore, if the scale is not 0, some behaviours are not consistent:

create temporary table dec_precision(
  d decimal(38, 18)
);
insert into dec_precision values
    (98765432109876543210.12345)
  , (98765432109876543210.12345)
;

select sum(d) from dec_precision;
-- NULL (but why?)
select sum(98765432109876543210.12345BD) from dec_precision;
-- 197530864219753086420.24690 (as expected)
select 98765432109876543210.12345BD + 98765432109876543210.12345BD;
-- 197530864219753086420.24690 (as expected)

Conversely, MySQL

select 9223372036854775807 * 9223372036854775807;
-- ERROR 1690 (22003): BIGINT value is out of range in '9223372036854775807 * 9223372036854775807'

or Postgres

select 2147483647 * 2147483647;
ERROR:  integer out of range

are a lot safer and friendlier in that regard.
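Note that the BD postfix only works for literals. For a bigint column, an explicit cast gives the same protection. A minimal sketch, using a hypothetical table t with a bigint column b:

-- Hypothetical table t with a bigint column b.
-- The cast makes the multiplication use decimal arithmetic, which returns
-- NULL on overflow instead of silently wrapping like bigint does.
select cast(b as decimal(38,0)) * cast(b as decimal(38,0)) from t;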

Easy test data with Hive

Testing a query on a small dataset, especially if you need to carefully check your joins, is usually done by creating a few temporary tables with hand-crafted data. This is a tried and tested method, but it has a few disadvantages:

  • Requires some work if you need to change your data,
  • If the table is not temporary, you must remember to drop it,
  • If your table is temporary it needs to be recreated after a reconnection,
  • If you don’t save the initialisation statements your test data is gone,
  • Especially with Hive, handling small tables has a lot of overhead.

It all works, but there is a nicer alternative: CTE + UDTF. Expanded, it means Common Table Expression with User Defined Table-generating Function.

Without further ado, here is an example, with the usual employees and departments:

with employee as(
  select inline(array(
      struct('Alice', '2017-03-04', 1)
    , struct('Bob', '2017-04-12', 1)
    , struct('Carol', '2018-12-24', 2)
  ))  as (name, start_date, dpt_id)
)
, department as (
  select inline(array(
      struct('IT', 1)
    , struct('Finance', 2)
  ))  as (name, id)
)
select
    e.name
  , e.start_date
  , d.name
from
  employee e
join
  department d
on
  e.dpt_id=d.id
;

And the result:

+---------+---------------+----------+
| e.name  | e.start_date  |  d.name  |
+---------+---------------+----------+
| Alice   | 2017-03-04    | IT       |
| Bob     | 2017-04-12    | IT       |
| Carol   | 2018-12-24    | Finance  |
+---------+---------------+----------+

So, what do we have here?

I define 2 common table expressions (the with … as () statements), which are a sort of run-time table. They can be used in any following CTE or query. Each table is defined by just giving the data we want in it (wrapped in inline(array(…)) … as (…)). Changing, adding or removing data is thus trivial, and everything is nicely bundled in one place.

Another nice thing is that these CTEs actually shadow real tables with the same name. This means that once you’re done testing, you just comment out the CTE definitions and the query will run against the real data. This has the added benefit that you can always keep your test data with your actual query; you just need to uncomment the CTEs to use them.

Many other RDBMSs (MySQL, Postgres, Oracle…) have CTEs. The UDTF (the inline function) is less common, unfortunately.
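In Postgres, for instance, a values list inside a CTE gives a similar result. A quick sketch of the employee table above (column types are my guesses):

with employee(name, start_date, dpt_id) as (
  values
      ('Alice', date '2017-03-04', 1)
    , ('Bob',   date '2017-04-12', 1)
    , ('Carol', date '2018-12-24', 2)
)
select * from employee;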

ATS server does not start

The newer versions of Hadoop, including HDP3, use HBase as the backend for the timeline service. You can either use an external HBase or have a system HBase running on Yarn (the default).

When using the system HBase, you could end up with the timeline server up and running, but with an alert (in Ambari) saying:

ATSv2 HBase Application The HBase application reported a ‘STARTED’ state. Check took 2.125s

The direct impact will be that Oozie jobs (among others) will take forever to run, as each step will wait for a timeout from the ATS (Application Timeline Server) before carrying on.

The solution I found to fix this is as follows:

    1. Check your yarn logs (/var/log/hadoop-yarn/yarn/ on hdp) for anything easy to spot, for instance not enough yarn memory (and then fix it if relevant),
    2. Clean up hdfs ATS data as described on the HDP docs,
    3. Clean up zookeeper ATS data (the example here is for insecure clusters, you will probably have another znode for kerberised clusters): zookeeper-client rmr /atsv2-hbase-unsecure
    4. Restart *all* YARN services,
    5. Restart ambari server (we had a case where it looked like the alert was wrongly cached).
    6. Restart all services on the host where the ATS server lives.

The steps cleaning hdfs and zookeeper will make you lose your ATS history (i.e. job names, timings, logs…), but your actual data is perfectly safe; nothing else will be lost.
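For steps 1 and 3 above, something along these lines works. The hdfs cleanup of step 2 depends on your configuration, so follow the HDP docs for that one; the log file glob below is an assumption, adjust it to your installation:

# Step 1: look for obvious problems in the yarn logs on the ATS host
# (for instance messages about insufficient yarn memory).
grep -iE 'error|exception' /var/log/hadoop-yarn/yarn/*.log | tail -n 50

# Step 3: remove the ATS znode (insecure cluster; the znode differs on kerberised clusters).
zookeeper-client rmr /atsv2-hbase-unsecure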

Reaching Hive from pyspark on HDP3

There is a lot to find about talking to Hive from Spark on the net. Sadly, most of it refers to Spark before version 2 or is not valid for HDP3. You need to use the Hive Warehouse Connector, bundled with HDP3.

This is an example of a minimalistic connection from pyspark to Hive on HDP3.

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Yes, llap even if you do not use it.
from pyspark_llap import HiveWarehouseSession

settings = [
    ('spark.sql.hive.hiveserver2.jdbc.url',
     'jdbc:hive2://{your_hiverserver2_url:port}/default'),
]

conf = SparkConf().setAppName("Pyspark and Hive!").setAll(settings)
# Spark 2: use SparkSession instead of SparkContext.
spark = (
    SparkSession
    .builder
    .config(conf=conf)
    .master('yarn')
    # There is no HiveContext anymore either.
    .enableHiveSupport()
    .getOrCreate()
)

# This is mandatory. Just using spark.sql will not be enough.
hive = HiveWarehouseSession.session(spark).build()

hive.showDatabases().show()
hive.execute("select 2 group by 1 order by 1").show()
spark.stop()

You can then run this with the following command:

HDP_VERSION=3.0.1.0-187 \
PYSPARK_PYTHON=python3 \
HADOOP_USER_NAME=hive \
SPARK_HOME=/usr/hdp/current/spark2-client \
spark-submit \
--jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
--py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip \
{your_python_script.py}

Note:

  • HDP_VERSION is needed when you use Python 3. If not set, HDP uses a script (/usr/bin/hdp-select) which is Python 2 only (although fixing it is trivial).
  • PYSPARK_PYTHON is optional; it will default to just python otherwise (which might or might not be Python 3 on your server).
  • without HADOOP_USER_NAME the script will run as your current user. Alternatively, you could sudo first.
  • without SPARK_HOME some jars would not be found and you would end up with an error like py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
    : java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
  • --jars and --py-files: as you can see, the hdp version is in the file names. Make sure you use the ones matching your installation.
  • there is no --master option; this is handled in the script while building the SparkSession.

There is some doc from Hortonworks you can follow to go further: Integrating Apache Hive with Spark and BI.
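To go a step beyond the smoke test above, here is a minimal sketch of reading an actual table through the connector, to append after building hive in the script above. It assumes a table default.some_table exists; setDatabase and table are part of the HWC API described in that doc:

# A sketch: read a Hive table into a regular Spark DataFrame via the connector.
# Assumes default.some_table exists; adjust the names to your cluster.
hive.setDatabase("default")
df = hive.table("some_table")
df.printSchema()
df.show(10)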

Just before I posted this article, a new write-up appeared on Hortonworks.com to describe some use cases for the Hive-Warehouse-Connector.

ORC benchmarking

I need to use transactional tables in Hive, so I naturally use the ORC format. One limitation of those transactional tables is that they have to be bucketed (in Hive 2 at least; in Hive 3 my understanding is that bucketing is no longer mandatory, but the default is equivalent to having 1 bucket).

It is tricky to find a good answer on how to use buckets: how many? on which columns? It usually boils down to:

Buckets are something that should be done for a concrete problem not just because you think you should have them. Normally I would not use them.

Which is not helpful when you actually have to use them.

So how to use buckets?

I did some benchmarking to try to get a valid answer. My dataset was as follows:

  • Data:
    • A 1-billion-row table worth 3 months of data, with a compound (logical) key: a user id and a 4-level hierarchical identifier. This is about 15GB (unreplicated) on disk.
    • 9 merges of 1 hour of data
  • Benchmarks:
    • select count(*)
    • select count(*) where  hierarchy_level_1=something
    • select 1 specific row
    • big select with join on 2 other tables, resulting in 400k rows.

All benchmarks were run 5 times, doing one iteration for all tables before starting the next iteration to prevent caching.

I tested a few variations:

  • transactional table or not,
  • partitioned or not (the partition key was the highest level of my hierarchical key; about 200 partitions, somewhat skewed),
  • 1 to 32 buckets,
  • bloom filters on levels #1 and/or #2 of my hierarchical key and/or on the user id (a sketch of such a table definition follows this list),
  • as an extra test, some non-transactional tables, sorted on different fields.
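For reference, the definitions I benchmarked were of this shape. A sketch with made-up column names, here for the transactional, partitioned, 4-bucket, bloom-filtered combination:

-- Column names are made up; level1 (the highest level of the hierarchical key)
-- is the partition key, and the table is bucketed on the user id.
create table events (
    user_id string
  , level2  string
  , level3  string
  , level4  string
  , payload string
)
partitioned by (level1 string)
clustered by (user_id) into 4 buckets
stored as orc
tblproperties (
    'transactional'='true'
  , 'orc.bloom.filter.columns'='user_id,level2'
);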

It is tricky to display the data properly here, so I show it as an image, but you can also find it as a pdf: ORC Benchmark – results, or see it on Google Sheets.


[Image: ORC benchmark results (orcbench)]

What gives?

I had some expectations:

  • More buckets would mean more files, so faster loads and faster queries, up to a point.
  • Buckets + partitions would create way too many files, at the cost of performance.
  • Proper bloom filters would make everything faster.

Well, looking at the results, here are the takeaways:

  • Bloom filters have basically no impact at all,
  • no partition usually helps,
  • 1 bucket is horrible (but that I did expect),
  • too many buckets and partition together is bad,
  • the sweet spot (for me) is partition +  4 buckets.

I am very disappointed because I had high hopes for bloom filters (I guess there is something I am doing wrong). The rest is more a confirmation of my expectations, with numbers to back them up.

The worst of all? My sweet spot is the one I am already using, so after all those tests (which ran for almost 3 days straight) I have no new information to speed up my queries. Well, the silver lining is that at least I now have data backing my current setup, so I can pat myself on the back.

Cluster on which I tested: HDP 2.6, Hive 2, 8 data nodes (8 CPUs, 32GB) on AWS, doing nothing else apart from running these tests.