Accessing Dundas with Python

Dundas has a great REST API. You can basically do everything with it. Furthermore, it’s easy to find examples: you just have to look at what your Dundas web app does, and you have all the examples and use cases you could wish for.

I wanted to schedule cube refreshes, so I naturally turned toward Python. It wasn’t too complex, as you will see below. The one thing to take care of is logging out, in all possible cases, otherwise you will burn through your elastic hours very fast (been there, done that). I’ll show you here how I did it with a context manager, which means that I basically cannot forget to log out: whatever happens, it’s all managed for me.

In my production code this object actually does something (it refreshes cubes); this is only a skeleton to get you started. You can find the code on my github as well, it might be easier to read.

The code actually does not do much: the call to __new__ combined with contextlib.closing() allows you to use the DundasSession object within a context manager, with the keyword with, thus guaranteeing that you will always log out, no matter what, even if an exception or a sys.exit occurs.

#!/usr/bin/env python3

"""
Skeleton to use Dundas Rest API, with guaranteed log out.
"""
import contextlib
import logging
import requests
import sys

class DundasSession:
    """
    Using __new__ + contextlib.closing() is awesome.

    DundasSession can now be used within a context manager, meaning that whatever happens, its close() method
    will be called (thus logging out). No need to call logout() explicitly!
    Just using __del__ is not guaranteed to work because when __del__ is called, you do not know which objects
    are already destroyed, and the session object might well be dead.
    """
    def __new__(cls, *args, **kwargs):
        o = super().__new__(cls)
        o.__init__(*args, **kwargs)
        return contextlib.closing(o)

    def __init__(self, user, pwd, url):
        # For session reuse - TCP connection reuse, keeps cookies.
        self.s = requests.session()

        self.user = user
        self.pwd = pwd
        self.url = url
        self.api = self.url + '/api/'
        self.session_id = None  # Will be set in login()

    def login(self):
        """Login and returns the session_id"""
        login_data = {
            'accountName': self.user,
            'password': self.pwd,
            'deleteOtherSessions': False,
            'isWindowsLogOn': False
        }
        logging.info('Logging in.')
        r = self.s.post(self.api + 'logon/', json=login_data)
        # The following line raises an exception on a non-200 return code.
        r.raise_for_status()

        resp = r.json()
        if resp['logOnFailureReason'].lower() == "none":
            # We're in!
            logging.info('Logged in')
            self.session_id = resp['sessionId']
        else:
            logging.error('Login failed with message: ' + r.text)
            sys.exit(1)

    def close(self):
        """Automagically called by the context manager."""
        self.logout()

    def logout(self):
        """If you do not logout, session will stay active, potentially burning through your elastic hours very fast."""

        # If session_id is not defined, we did not even log in (or we are already logged out).
        logging.info('Logging out.')
        if getattr(self, 'session_id', None):
            r = self.s.delete(self.api + 'session/current', params={'sessionId': self.session_id})
            r.raise_for_status()
            del self.session_id
            logging.info('Logged out.')
        else:
            logging.info('Was not yet logged in.')

logging.basicConfig(level='INFO')

with DundasSession(user='yourapiuser', pwd='pwd', url='https://reports.example.com') as dundas:
    dundas.login()

    # Do something smart with your Dundas object.

# No need to log out, this is handled for you via the context manager, even in case of exception or even sys.exit.

Easy test data with Hive

Testing a query on a small dataset, especially if you need to carefully check your joins, is usually done by creating a few temporary tables with hand-crafted data. This is a tried and tested method, but it has a few disadvantages:

  • Requires some work if you need to change your data,
  • If the table is not temporary, you must not forget to drop it,
  • If your table is temporary it needs to be recreated after a reconnection,
  • If you don’t save the initialisation statements your test data is gone,
  • Especially with Hive, handling small tables has a lot of overhead.
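
For comparison, this is roughly what the traditional approach looks like (a quick sketch, using the same data as the example below):

-- The tried and tested way: hand-crafted temporary tables.
create temporary table employee (name string, start_date string, dpt_id int);
insert into table employee values
    ('Alice', '2017-03-04', 1)
  , ('Bob', '2017-04-12', 1)
  , ('Carol', '2018-12-24', 2);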

It all works, but there is a nicer alternative: CTE + UDTF. Expanded, it means Common Table Expression with User Defined Table-generating Function.

Without further ado, here is an example, with the usual employees and departments:

with employee as(
  select inline(array(
      struct('Alice', '2017-03-04', 1)
    , struct('Bob', '2017-04-12', 1)
    , struct('Carol', '2018-12-24', 2)
  ))  as (name, start_date, dpt_id)
)
, department as (
  select inline(array(
      struct('IT', 1)
    , struct('Finance', 2)
  ))  as (name, id)
)
select
    e.name
  , e.start_date
  , d.name
from
  employee e
join
  department d
on
  e.dpt_id=d.id
;

And the result:

+---------+---------------+----------+
| e.name  | e.start_date  |  d.name  |
+---------+---------------+----------+
| Alice   | 2017-03-04    | IT       |
| Bob     | 2017-04-12    | IT       |
| Carol   | 2018-12-24    | Finance  |
+---------+---------------+----------+

So, what do we have here?

I define 2 common table expressions (the with .. as () statement), which are a sort of run-time table. They can be used in any following CTE or query. Each table is defined by just giving the data we want in it (wrapped in inline(array(…)) as). Changing, adding or removing data is thus trivial, and everything is nicely bundled in one place.

Another nice thing is that these CTEs actually shadow real tables with the same name. This means that once you’re done testing, you just comment out the CTE definitions and the query will run with real data. This has the added benefit that you can always keep your test data with your actual query. You just need to uncomment the CTEs to use them.
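
For instance, assuming real employee and department tables exist (hypothetical here), the production version is just the same statement with the CTE block commented out:

-- Comment out once testing is done; uncomment to get the hand-crafted data back.
-- with employee as(
--   select inline(array(
--       struct('Alice', '2017-03-04', 1)
--     , struct('Bob', '2017-04-12', 1)
--     , struct('Carol', '2018-12-24', 2)
--   ))  as (name, start_date, dpt_id)
-- )
-- , department as (
--   select inline(array(
--       struct('IT', 1)
--     , struct('Finance', 2)
--   ))  as (name, id)
-- )
select
    e.name
  , e.start_date
  , d.name
from
  employee e   -- now resolves to the real table, no longer shadowed by a CTE
join
  department d
on
  e.dpt_id=d.id
;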

Many other RDBMSs (MySQL, Postgres, Oracle…) have CTEs. The UDTF (the inline function) is less common, unfortunately.
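
For reference, in those databases a plain VALUES list inside the CTE usually achieves the same effect. A quick sketch in Postgres style (not Hive):

with employee (name, start_date, dpt_id) as (
  values
      ('Alice', date '2017-03-04', 1)
    , ('Bob',   date '2017-04-12', 1)
    , ('Carol', date '2018-12-24', 2)
)
select * from employee;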

ATS server does not start

The newer versions of Hadoop, including HDP3, use HBase as the backend for the timeline service. You can either use an external HBase or have a system HBase running on Yarn (the default).

When using the system HBase, you could end up with the timeline server up and running, but with an alert (in Ambari) saying:

ATSv2 HBase Application The HBase application reported a ‘STARTED’ state. Check took 2.125s

The direct impact will be that Oozie jobs (among others) will take forever to run, as each step will wait for a timeout from the ATS (Application Timeline Server) before carrying on.

The solution I found to fix this is as follows:

    1. Check your YARN logs (/var/log/hadoop-yarn/yarn/ on HDP) for anything easy to spot, for instance not enough YARN memory (and fix it if relevant),
    2. Clean up the ATS data on HDFS as described in the HDP docs,
    3. Clean up zookeeper ATS data (the example here is for insecure clusters, you will probably have another znode for kerberised clusters): zookeeper-client rmr /atsv2-hbase-unsecure
    4. Restart *all* YARN services,
    5. Restart the Ambari server (we had a case where it looked like the alert was wrongly cached).
    6. Restart all services on the host where the ATS server lives.

The steps cleaning HDFS and zookeeper will make you lose your ATS history (i.e. job names, timing, logs…), but your actual data is perfectly safe, nothing else will be lost.

Reaching Hive from pyspark on HDP3

There is a lot to find on the net about talking to Hive from Spark. Sadly most of it refers to Spark before version 2 or is not valid for HDP3. You need to use the Hive Warehouse Connector, bundled with HDP3.

This is an example of a minimalistic connection from pyspark to Hive on HDP3.

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Yes, llap even if you do not use it.
from pyspark_llap import HiveWarehouseSession

settings = [
    ('spark.sql.hive.hiveserver2.jdbc.url',
     'jdbc:hive2://{your_hiverserver2_url:port}/default'),
]

conf = SparkConf().setAppName("Pyspark and Hive!").setAll(settings)
# Spark 2: use SparkSession instead of SparkContext.
spark = (
    SparkSession
    .builder
    .config(conf=conf)
    .master('yarn')
    # There is no HiveContext anymore either.
    .enableHiveSupport()
    .getOrCreate()
)

# This is mandatory. Just using spark.sql will not be enough.
hive = HiveWarehouseSession.session(spark).build()

hive.showDatabases().show()
hive.execute("select 2 group by 1 order by 1").show()
spark.stop()

You then can run this with the following command:

HDP_VERSION=3.0.1.0-187 \
PYSPARK_PYTHON=python3 \
HADOOP_USER_NAME=hive \
SPARK_HOME=/usr/hdp/current/spark2-client \
spark-submit \
--jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
--py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip \
{your_python_script.py}

Note:

  • HDP_VERSION is needed when you use Python 3. If not set, HDP uses a script (/usr/bin/hdp-select) which is Python 2 only (although fixing it is trivial).
  • PYSPARK_PYTHON is optional; it will default to just python otherwise (which might or might not be Python 3 on your server).
  • without HADOOP_USER_NAME the script will run as your current user. Alternatively, you could sudo first.
  • without SPARK_HOME some jars would not be found and you would end up with an error like py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
    : java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
  • --jars and --py-files: as you can see, the HDP version is part of the file names. Make sure you use the ones matching your cluster.
  • there is no --master option, this is handled in the script while building the SparkSession.

There is some doc from Hortonworks you can follow to go further: Integrating Apache Hive with Spark and BI.

Just before I posted this article, a new write-up appeared on Hortonworks.com to describe some use cases for the Hive-Warehouse-Connector.

ORC benchmarking

I need to use transactional tables in Hive, so I naturally use the ORC format. One limitation of those transactional tables is that they have to be bucketed (in Hive 2 at least; in Hive 3 my understanding is that bucketing is no longer mandatory, but the default is equivalent to having 1 bucket).

It is tricky to find a good answer on how to use buckets: how many? on which columns? It usually boils down to:

Buckets are something that should be done for a concrete problem not just because you think you should have them. Normally I would not use them.

Which is not helpful when you actually have to use them.

So how to use buckets?

I did some benchmarking to try to get a valid answer. My dataset was as follows:

  • Data:
    • A 1-billion-row table worth 3 months of data, with a compound (logical) key: a user id and a 4-level hierarchical identifier. This is about 15GB (unreplicated) on disk.
    • 9 merges of 1 hour of data
  • Benchmarks:
    • select count(*)
    • select count(*) where  hierarchy_level_1=something
    • select 1 specific row
    • big select with join on 2 other tables, resulting in 400k rows.

All benchmarks were run 5 times, doing one iteration for all tables before starting the next iteration to prevent caching.

I tested a few variations (a DDL sketch follows the list):

  • transactional table or not,
  • partitioned or not (the partition was the highest level of my hierarchical key; about 200 partitions, somewhat skewed),
  • 1 to 32 buckets,
  • bloom filters on levels #1 and/or #2 of my hierarchical key, and/or on the user id,
  • as an extra test, I added some non-transactional tables, sorted on different fields.
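
To give an idea of what one variation looked like, here is a DDL sketch; the table and column names are made up and the bucketing column is an assumption, but it shows where partitioning, bucketing and bloom filters are declared:

create table bench_partitioned_4buckets (
    user_id bigint,
    level2 string,
    level3 string,
    level4 string,
    some_value string
)
partitioned by (level1 string)           -- highest level of the hierarchical key
clustered by (user_id) into 4 buckets    -- 1 to 32 buckets were tested
stored as orc
tblproperties (
    "transactional"="true",
    "orc.bloom.filter.columns"="user_id,level2"  -- only for the bloom filter variations
);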

It is tricky to properly display the data here, so I show it as an image below, but you can also find it as a PDF: ORC Benchmark – results, or see it on Google Sheets.

[Image: ORC benchmark results]

What gives?

I had some expectations:

  • More buckets would mean more files so faster load and faster queries up to a point.
  • Bucket + partitions would create way too many files, at the cost of performance.
  • Proper bloom filters would make everything faster.

Well, looking at the results, here are the takeaways:

  • Bloom filters have basically no impact at all,
  • not partitioning usually helps,
  • 1 bucket is horrible (but that I did expect),
  • too many buckets and partition together is bad,
  • the sweet spot (for me) is partition +  4 buckets.

I am very disappointed because I had a lot of hopes for bloom filters (I guess there is something I am doing wrong). The rest is more a confirmation of my expectations, with numbers to prove them.

The worst of all? My sweet spot is the one I am already using, so after all those tests (which ran almost 3 days straight) I have no new information to speed up my queries. Well, the silver lining is that at least I have data to show that I can pat myself on the back.

Cluster on which I tested: HDP 2.6, Hive 2, 8 data nodes (8 CPUs, 32GB each) on AWS, doing nothing else apart from running these tests.

Oops, I dropped my Hive Metastore database

The Hive Metastore (HMS) is backed by a database (MySQL for us), which stores information about HDFS files, stats and more. Without this database, you have no Hive.

A few days ago, a DROP was issued on the wrong server. Bye-bye metastore. What do you do then?

First things first, I looked at the backups. We had one from 3 hours before, so still quite recent. After it was restored, I noticed that Hive worked, but not all data was visible.

For context, our tables are ORC and transactional. As HDFS files cannot be updated, the way for ORC (and other DBs I know of) to manage transactions is to have a base directory as well as some deltas holding new changes. Reading such a table thus means reading the base directory and all the deltas to apply potential updates. As reading the deltas can become expensive, they are eventually compacted, in 2 possible ways. Quite often the deltas are squashed together (minor compaction for ORC) and once in a while, the base directory is fully rewritten to apply all the deltas (major compaction for ORC).

In my case, between the backup and the restore some new data had been added (new delta directories appeared) but no compaction happened.

This means that the HMS was completely ignoring all the new delta files, but was happily using all the files it knew about. I should add that I have a way to replay data of the last hours/days if needed. Once this was understood and confirmed, the fix was easy:

 

  1. Run a major compaction on all tables
    • All tables ended up consisting of one base directory (used and known by HMS) and a few delta directories ignored by HMS.
  2. Delete all remaining deltas
    • They were not used anyway.
  3. Replay my data
    • Hive was up to date again.

Of course, my tables are partitioned and compaction happens per partition, so there was a bit of bash-fu to explicitly compact all partitions, then double-check that all remaining deltas were created after the backup. Nothing too complex.
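
For reference, the compactions themselves are plain Hive statements; this is a sketch with made-up table and partition names:

-- Trigger a major compaction for one partition (repeat for each partition).
alter table mytable partition (day='2018-12-01') compact 'major';

-- Compactions run asynchronously; check their status until they show as succeeded.
show compactions;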

This worked because no compaction ran between the backup and the restore. A compaction would completely change the files on disk, and the HMS metadata would thus be completely out of sync with the actual files. I have no idea how I would have recovered from that one.

On a side note, Hive hung a few times with this error message in the log:

insert into "NOTIFICATION_LOG" ("NL_ID", "EVENT_ID", "EVENT_TIME", "EVENT_TYPE", "DB_NAME", "TBL_NAME", "MESSAGE", "MESSAGE_FORMAT") values (774946,774869,1543829236,'OPEN_TXN','null',' ','{"txnIds":null,"timestamp":1543829236,"fromTxnId":54788,"toTxnId":54788,"server":"thrift://x.x.x.x:9083,thrift://y.y.y.y:9083","servicePrincipal":"hive/_HOST@EXAMPLE.COM"}','json-0.2')

metastore.RetryingHMSHandler (RetryingHMSHandler.java:invokeInternal(201)) - MetaException(message:Unable to execute direct SQL java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '774946' for key 'PRIMARY'

The fix was quite easy: log into the metastore DB, remove the offending row from NOTIFICATION_LOG, and update the sequence (the NOTIFICATION_SEQUENCE table) to the maximum value of NL_ID in NOTIFICATION_LOG plus 1. The NOTIFICATION_LOG table has something to do with compactions, and as you compacted all tables anyway, old values do not matter much.
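
Concretely, the cleanup boils down to something like the sketch below, run directly against the metastore database (MySQL here). The sequence table and column names are an assumption and can differ between metastore schema versions, so double-check yours first.

-- Remove the duplicate row reported in the error message.
delete from NOTIFICATION_LOG where NL_ID = 774946;

-- Bump the sequence to max(NL_ID) + 1 (table and column names assumed).
update NOTIFICATION_SEQUENCE
   set NEXT_EVENT_ID = (select max(NL_ID) + 1 from NOTIFICATION_LOG);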

What is the takeaway? Compactions are run dynamically by Hive. It would be smart to keep an eye on them, and to run a backup just after some have happened. At least you would still be able to partially recover consistent, recent enough data.

Newlines in Hive

I have a nice Hive warehouse, running data exports for some clients. All was working smoothly when a coworker came to me because a client complained that there were NULLs in an export, whereas they should all be sanitised via a lot of coalesce.

The reason ended up being newlines which I thought were properly handled, but as it turned out were not in all cases. This post explains what the issues are and how to fix them.

This is an example of what happened (a simplified query to show the problem): a JSON string with newlines, exploded with a lateral view. The newlines are escaped, so all should have worked.

SET system:disable.quoting.for.sv=false;
with things as (
    select
        42 as id
      , '{   "item": "cheese"
           , "good": "camembert\\ncomté"
           , "tasty": "yes"
         }' as custom_fields
)
select
    c.id
  , c.custom_fields
  , cv.good
  , cv.item
  , cv.tasty
from
    (select * from things order by id) c
lateral view
    json_tuple(c.custom_fields, "good", "item", "tasty") cv
    as good, item, tasty
;

If you run this code you might (depending on your setup) see a nice \n in custom_fields but an actual line break in good, which is not what you want.

Note that if you replace the from clause by just from things c the issue will not appear.

The problem is that in my example an intermediary map step is executed (because of the order by). Data in this step was stored by default as textfile, where the row delimiter is, you can guess, \n.

With the simplified from clause, there is no intermediary map step and the problem does not happen.

What we would like is to have the intermediary data stored in some sort of binary format, where \n is not a separator. Luckily, there’s a setting for that: hive.query.result.fileformat. The default (up to Hive 2.0) was TextFile, which as you can guess uses newlines as row separators. From Hive 2.1 onward, the default became SequenceFile, a binary, compressible format which does not have the newline issue.

You can either set this setting globally in hive-site, or add it before your query with set hive.query.result.fileformat=SequenceFile;.

With Hive 2, this fixes the newlines, but the \n completely disappears from the output. This is better than before but not ideal.
With Hive 3, if you are using beeline, you can add the command line option --escapeCRLF=true, which will give you exactly the output you want.

Anatomy of a Merge statement in Hive

When a merge statement is issued, it is actually reparsed into a bunch of inserts. This shows interesting properties which can help you better understand the performance of your statement.

The test setup is easy: one source table, one destination table. The destination needs to be ACID and thus bucketed (this is pre-Hive 3).

create table tmp.tstmerge
(b int, v string)
partitioned by (id int)
clustered by (b) into 1 buckets
stored as orc
TBLPROPERTIES ("transactional"="true");

create table tmp.srcmerge
stored as orc
as
select 1 as id, 'foo' as v, 2 as b;

The merge statement is trivial as well:

merge into tmp.tstmerge dst
using tmp.srcmerge src
on src.id=dst.id
when matched and src.id = 0 then delete
when matched then update set
     v=concat(dst.v, src.v)
when not matched then insert values (
    src.b, src.v, src.id
);

What becomes interesting is that you can see in hiveserver2.log a line like:

Going to reparse YOUR_STATEMENT as ANOTHER_STATEMENT

For the given merge, here is the actual ANOTHER_STATEMENT:

FROM
    tmp.tstmerge dst
RIGHT OUTER JOIN
    tmp.srcmerge src
ON
    src.id=dst.id
INSERT INTO tmp.tstmerge partition (id)    -- delete clause
    select
        dst.ROW__ID , dst.id
    WHERE
        src.id=dst.id AND src.id = 0
    sort by
        dst.ROW__ID
INSERT INTO tmp.tstmerge partition (id)    -- update clause
    select
        dst.ROW__ID, dst.b, concat(dst.v, src.v), dst.id
    WHERE
        src.id=dst.id AND NOT(src.id = 0)
    sort by
        dst.ROW__ID
INSERT INTO tmp.tstmerge partition (id)    -- insert clause
    select
        src.b, src.v, src.id
    WHERE
        dst.id IS NULL
INSERT INTO merge_tmp_table
    SELECT
        cardinality_violation(dst.ROW__ID, dst.id)
    WHERE
        src.id=dst.id
    GROUP BY
        dst.ROW__ID, dst.id
    HAVING
        count(*) > 1

What do we have here?

  1. This becomes a multi-insert statement with 3 different selections for the update, insert and delete clauses. Multi-insert is a Hive extension.
  2. ROW__ID is a hidden column for acid tables, containing a map:
    select row__id from tmp.tstmerge;
    +----------------------------------------------+
    | row__id                                      |
    +----------------------------------------------+
    | {"transactionid":44,"bucketid":0,"rowid":0}  |
    +----------------------------------------------+
  3. To update a row, you just need to change the columns of the row identified by its ROW__ID. Deleting a row is equivalent to nullifying all columns of a ROW__ID. This works because all clauses end up as inserts into the ORC delta files anyway.
  4. cardinality_violation is a UDF which will exception out if more than one row has the same set of ROW__ID and join condition. This is because the SQL standard says that there cannot be more than 1 row in the source matching the destination. It will be executed (and thus exception out) only if such a row exists. On a side note, if you disable cardinality checking (set hive.merge.cardinality.check=false;) then this leg of the multi-insert does not exist.
  5. Rows are sorted by ROW__ID. Note first that sort by will sort per reducer, whereas order by would sort across all reducers. Order by would be prohibitively expensive. The reason for the sorting is that when delta files are read, they can be directly merged on read.

Practically this means that you cannot order data in ORC ACID tables (which is a shame as it is the one thing to do performance-wise when using ORC). Furthermore, any ordering in the src statement, if not meant to speed the join up, will be useless.

The cost of ACID with ORC tables

ACID introduction

ACID transactions (update, merge) in Hive are awesome. The merge statement especially is incredibly useful.

Of course, not all tables are ACID. You need to use ORC and have the table marked as ACID, but those are easy steps:

create table something (id bigint) stored as orc tblproperties("transactional"="true")

Of course, in HDFS you cannot change a file once it is created. The standard way (not Hadoop specific) to handle changing immutable files is to have deltas. Each table will consist of a few directories:

  • the base directory: the data at creation time,
  • one or more delta directories: contains updated rows.

Every hive.compactor.check.interval seconds a compaction will happen (or at least the compactor will check whether a compaction must happen). The compactor will merge the deltas and the base directory into one new base directory, with all the deltas applied to the original base.

The reason is that when you read an ACID table with many deltas, there is a lot more to read than for only a base directory, as Hive has to go through each and every delta. This has IO and CPU costs, which disappear after compaction.

Naive ACID use

Every day I build a summary table gathering all data that changed in the last 24h, as well as some related data. Many events are aggregated together. Think for instance about sending an email: I would get send data, open data, maybe click data, bounces and a few others. I started building the table following the temporal flow:


create table summary (id bigint, number_sent bigint, number_open bigint...) stored as orc tblproperties("transactional"="true");

insert into summary select .... from sent;

merge into summary select ... from open;

merge into summary select ... from click;

...

Overall a few billion rows will be read. The final summary table will have about 100 million rows.

What is interesting here is that I am inserting the biggest data first. This is the table summing up reads and writes per event while building the whole summary, which ran for about 4 hours:

Event        Bytes read (GB)   Bytes written (GB)
Total        516.5             104.1
Sent         16.2              87.1
Open         88.8              14.2
Click        101.5             1.7
Conversion   102.9             0.01
Bounce       103               1
Spam         104               0.11

Seeing 500GB read scared me a little, so instead of following the naive temporal flow, I started with the smallest event first to finish up with the biggest:

Event        Bytes read (GB)   Bytes written (GB)
Total        31.5              99.1
Conversion   0                 0
Spam         0                 0
Click        0.3               1.5
Bounce       1.7               1
Open         4.4               13.3
Sent         25.1              83.4

That’s much better already! The total number of bytes written does not change much (quite logical I suppose as the final data is the same) but the number of bytes read is only 6% of the original! Furthermore, it ran in 2h40 instead of 4 hours.

I added one last step. This summary data was written at user level. I actually needed to do one extra aggregation, but I was worried about joining against the user table at every step, as the user table is actually quite big and joins are expensive. But well, I experimented, doing the aggregation at each step instead of doing one big aggregation at the end (a sketch follows the results below):

Event        Bytes read (GB)   Bytes written (GB)
Total        20.5              8.6
Conversion   0.2               0
Spam         1.2               0
Click        1.4               0.2
Bounce       1.5               0.2
Open         3.5               1.7
Sent         12.7              6.4

Total run time: 1.5 hours!
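
To make that last step concrete, here is a minimal sketch of what aggregating per step before merging could look like. The table and column names are hypothetical (and the real aggregation also joined against the user table), so treat it as an illustration of the idea rather than the actual job:

-- Assumed summary table: summary(id bigint, number_sent bigint, number_open bigint).
merge into summary dst
using (
    -- Pre-aggregate the event before merging, instead of merging raw rows
    -- and doing one big aggregation at the very end.
    select user_id as id, count(*) as nb_open
    from open_events
    group by user_id
) src
on dst.id = src.id
when matched then update set
    number_open = dst.number_open + src.nb_open
when not matched then insert values (
    src.id, 0, src.nb_open
);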

TL;DR

When using ACID, deltas are expensive. When using HDFS, writes are expensive. Order your processing to have as little of those as possible.

Automated Hadoop setup with ambari

Hadoop is complex to configure, this is not new. Cluster managers like Ambari help, of course, but finding the sweet configuration spot is not easy, especially as that spot will be different for each use case.

My cluster is currently almost YARN/Tez only, so I wrote this Python script, which looks at the whole cluster via the Ambari API and configures a good chunk of it, depending on the number of disks, CPUs and RAM, and based on documentation I found scattered all around. It works great even on small clusters and will tell you the reason behind the values of the configuration settings.

The caveats are that LLAP is not yet managed (it was attempted so the option is there, but it does not do anything) and that it assumes that all datanodes are identical.

The default run mode is read-only, but you can ask the script to actually update Ambari. You will still need to restart the relevant updated services yourself, on your own time (this is of course just one click in the Ambari UI).

An example showing only what my script thinks is not correct:

./settings.py --tofix hadoop.example.com

Basic info

Yarn config.
✘ yarn-site/yarn.scheduler.minimum-allocation-mb = 768, expects 1024 (Min container size.) #75%
✘ capacity-scheduler/yarn.scheduler.capacity.resource-calculator = org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator, expects org.apache.hadoop.yarn.util.resource.DominantResourceCalculator (Take all resources in account, not only RAM) #0%

A full run:

./settings.py hadoop.example.com

Basic info
FYI – { 'ip-10-0-0-001.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'ip-10-0-0-002.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'ip-10-0-0-003.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'ip-10-0-0-004.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'ip-10-0-0-005.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'ip-10-0-0-006.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566633984},
'ip-10-0-0-007.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080},
'ip-10-0-0-008.eu-west-1.compute.internal': {'cpu': 8, 'mem': 33566638080}}: Data nodes.
FYI – {'cpu': 64, 'disk': 104, 'mem': 268533100544}: Total cluster resources.
FYI – 1024: Min container size (MB), based on amount of ram/cpu in the cluster.
FYI – 128: Number of containers based on recommendations.
FYI – 0.55: Default queue capacity.

Yarn config.
✔ yarn-site/yarn.nodemanager.resource.memory-mb = 24008, expects 24008 (min(yarn memory for one DN) * 0.75.) #100%
✘ yarn-site/yarn.scheduler.minimum-allocation-mb = 768, expects 1024 (Min container size.) #75%
✔ yarn-site/yarn.scheduler.maximum-allocation-mb = 24008, expects 24008 (Same as yarn.nodemanager.resource.memory-mb) #100%
✔ yarn-site/yarn.nodemanager.resource.cpu-vcores = 7, expects 7 (Assuming the cluster in yarn only. Total cores per node -1) #100%
✔ yarn-site/yarn.scheduler.maximum-allocation-vcores = 7, expects 7 (Assuming the cluster in yarn only. Total cores per node -1) #100%
✘ capacity-scheduler/yarn.scheduler.capacity.resource-calculator = org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator, expects org.apache.hadoop.yarn.util.resource.DominantResourceCalculator (Take all resources in account, not only RAM) #0%
Map/reduce config
✔ mapred-site/mapreduce.map.memory.mb = 1024, expects 1024 (Min container size) #100%
✔ mapred-site/mapreduce.reduce.memory.mb = 2048, expects 2048 (2 * min container size) #100%
✔ mapred-site/mapreduce.map.java.opts = 819, expects 819 (0.8 * min container size) #100%
✔ mapred-site/mapreduce.reduce.java.opts = 1638, expects 1638 (0.8 * mapreduce.reduce.memory.mb) #100%
✔ mapred-site/yarn.app.mapreduce.am.resource.mb = 2048, expects 2048 (2 * min container size) #100%
✔ mapred-site/yarn.app.mapreduce.am.command-opts = 1638, expects 1638 (0.8 * yarn.app.mapreduce.am.resource.mb) #100%
✔ mapred-site/mapreduce.task.io.sort.mb = 409, expects 409 (0.4 * min container size) #100%

Hive and Tez configuration
✔ hive-site/hive.execution.engine = tez, expects tez (Use Tez, not map/reduce.) #100%
✔ hive-site/hive.server2.enable.doAs = false, expects false (All queries will run as Hive user, allowing resource sharing/reuse.) #100%
✔ hive-site/hive.optimize.index.filter = true, expects true (This optimizes “select statement with where clause” on ORC tables) #100%
✔ hive-site/hive.fetch.task.conversion = more, expects more (This optimizes “select statement with limit clause;”) #100%
✔ hive-site/hive.compute.query.using.stats = true, expects true (This optimizes “select count (1) from table;” ) #100%
✔ hive-site/hive.vectorized.execution.enabled = true, expects true (Perform operations in batch instead of single row) #100%
✔ hive-site/hive.vectorized.execution.reduce.enabled = true, expects true (Perform operations in batch instead of single row) #100%
✔ hive-site/hive.cbo.enable = true, expects true (Enable CBO. You still need to prepare it by using the analyse HQL command.) #100%
✔ hive-site/hive.compute.query.using.stats = true, expects true (Use CBO.) #100%
✔ hive-site/hive.stats.fetch.column.stats = true, expects true (Use CBO.) #100%
✔ hive-site/hive.stats.fetch.partition.stats = true, expects true (Use CBO.) #100%
✔ hive-site/hive.stats.autogather = true, expects true (Use CBO.) #100%
✔ hive-site/hive.server2.tez.default.queues = default, expects 'lambda x: config.queue in x' (Must contain the queue name) #100%
✔ hive-site/hive.tez.dynamic.partition.pruning = true, expects true (Make sure tez can prune whole partitions) #100%
✔ hive-site/hive.exec.parallel = true, expects true (Can Hive subqueries be executed in parallel) #100%
✔ hive-site/hive.auto.convert.join = true, expects true (use map joins as much as possible) #100%
✔ hive-site/hive.auto.convert.join.noconditionaltask = true, expects true (Use map joins for small datasets) #100%
✔ hive-site/hive.tez.container.size = 4096, expects 4096 (Multiple of min container size.) #100%
✔ hive-site/hive.auto.convert.join.noconditionaltask.size = 1417339207, expects 1417339207 (Threshold to perform map join. 1/3 * hive.tez.container.size.) #100%
✔ hive-site/hive.vectorized.groupby.maxentries = 10240, expects 10240 (Reduces execution time on small datasets, but also OK for large ones.) #100%
✔ hive-site/hive.vectorized.groupby.flush.percent = 0.1, expects 0.1 (Reduces execution time on small datasets, but also OK for large ones.) #100%
✔ hive-site/hive.server2.tez.initialize.default.sessions = true, expects true (Enable tez use without session pool if requested) #100%
✔ hive-site/hive.server2.tez.sessions.per.default.queue = 3, expects 3 (Number of parallel execution inside one queue.) #100%

Hive and Tez memory
✔ tez-site/tez.am.resource.memory.mb = 1024, expects 1024 (Appmaster memory == min container size.) #100%
✔ tez-site/tez.am.container.reuse.enabled = true, expects true (Reuse tez containers to prevent reallocation.) #100%
✔ tez-site/tez.container.max.java.heap.fraction = 0.8, expects 0.8 (default % of memory used for java opts) #100%
✔ tez-site/tez.runtime.io.sort.mb = 1024, expects 1024 (memory when the output needs to be sorted. == 0.25 * tezContainerSize (up to 40%)) #100%
✔ tez-site/tez.runtime.unordered.output.buffer.size-mb = 307, expects 307 (Memory when the output does not need to be sorted. 0.075 * hive.tez.container.size (up to 10%).) #100%
✔ tez-site/tez.task.resource.memory.mb = 1024, expects 1024 (Mem to be used by launched taks. == min container size. Overriden by hive to hive.tez.container.size anyway.) #100%
✔ tez-site/tez.task.launch.cmd-opts = 819, expects 819 (xmx = 0.8 * minContainerSize) #100%
✔ hive-site/hive.tez.java.opts = 3276, expects 3276 (xmx = 0.8 * tezContainerSize) #100%
✔ hive-site/hive.prewarm.enabled = true, expects true (Enable prewarm to reduce latency) #100%
✔ hive-site/hive.prewarm.numcontainers = 3, expects 'lambda x: x >= 1' (Hold containers to reduce latency, >= 1) #100%
✔ tez-site/tez.session.am.dag.submit.timeout.secs = 300, expects 300 (Tez Application Master waits for a DAG to be submitted before shutting down. Only useful when reuse is enabled.) #100%
✔ tez-site/tez.am.container.idle.release-timeout-min.millis = 10000, expects 10000 (Tez container min wait before shutting down. Should give enough time to an app to send the next query) #100%
✔ tez-site/tez.am.container.idle.release-timeout-max.millis = 20000, expects 20000 (Tez container min wait before shutting down) #100%
✔ tez-site/tez.am.view-acls = *, expects * (Enable tz ui access) #100%
✔ yarn-site/yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes = org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl, expects org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl (Set up tez UI) #100%
✔ mapred-site/mapreduce.job.acl-view-job = *, expects * (Enable tez ui for mapred jobs) #100%

Compress all
✔ mapred-site/mapreduce.map.output.compress = true, expects true #100%
✔ mapred-site/mapreduce.output.fileoutputformat.compress = true, expects true #100%
✔ hive-site/hive.exec.compress.intermediate = true, expects true #100%
✔ hive-site/hive.exec.compress.output = true, expects true #100%

Queue configuration. Assuming queue default is subqueue from root. Note that undefined values are inherited from parent.
✔ capacity-scheduler/yarn.scheduler.capacity.root.default.maximum-am-resource-percent = 0.2, expects 'lambda x: x != 'NOT FOUND' and float(x) >= 0.2' (How much of the Q the AM can use. Must be at least 0.2.) #100%
✔ capacity-scheduler/yarn.scheduler.capacity.root.default.ordering-policy = fair, expects fair (Helps small queries get a chunk of time between big ones) #100%
✔ capacity-scheduler/yarn.scheduler.capacity.root.default.user-limit-factor = 2, expects 'lambda x: x != 'NOT FOUND' and int(x) >= 1' (How much of the Q capacity the user can exceed if enough resources. Should be at least 1. 1=100%, 2=200%…) #100%
✔ capacity-scheduler/yarn.scheduler.capacity.root.default.minimum-user-limit-percent = 10, expects 'lambda x: x != 'NOT FOUND' and int(x) >= 10' (How much of the Q in percent a user is guaranteed to get. Should be at least 10) #100%

Random stuff
✔ hdfs-site/dfs.client.use.datanode.hostname = true, expects true (For AWS only) #100%

LLAP
✔ hive-interactive-env/enable_hive_interactive = false, expects false (Disable LLAP) #100%
FYI – More doc can be found at:
Memory settings:
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_command-line-installation/content/determine-hdp-memory-config.html
https://community.hortonworks.com/articles/14309/demystify-tez-tuning-step-by-step.html
Hive performance tuning:
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_hive-performance-tuning/content/ch_hive-perf-tuning-intro.html
http://pivotalhd.docs.pivotal.io/docs/performance-tuning-guide.html
https://www.justanalytics.com/blog/hive-tez-query-optimization
llap:
https://community.hortonworks.com/questions/84636/llap-not-using-io-cache.html

Will not update the unexpected parameters without --update.