Git in pure Python: Full Dulwich example

For some reason I do not have and cannot have git installed on some servers, but I still needed to connect to github. Of course, I was bound to find some pure Python git module. There seem to be a few options, but the main one is Dulwich, which is even referenced in the git book.

Dulwich can do a lot with git (much more than what I needed). It exposes both higher- and lower-level git APIs. The lower level is known as plumbing, the higher level as porcelain (porcelain being the interface between the user and the plumbing; if it feels like a toilet joke, that's because it is).
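
To give an idea of the difference, here is a minimal sketch (the repository path is hypothetical):

from dulwich import porcelain
from dulwich.repo import Repo

# Porcelain: user-facing calls mirroring the git CLI.
porcelain.log("/path/to/repo")

# Plumbing: raw access to the object store.
repo = Repo("/path/to/repo")
head_sha = repo.head()   # SHA of HEAD, as bytes
commit = repo[head_sha]  # the corresponding commit object
print(commit.message)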

One issue I had while using Dulwich was finding documentation, mostly about using tokens and a proxy with github https access.

Eventually, by reading some examples and the code, I ended up finding everything I needed, so I give here a complete, commented example of a basic porcelain flow (clone/add/commit/push).

You can find the file on my github as well.

from os import environ
from pathlib import Path
from tempfile import TemporaryDirectory

from dulwich import porcelain
from dulwich.repo import Repo
from urllib3 import ProxyManager

## Configuration settings:
# Source url of the repo (Note: https here. ssh would work as well, a bit differently).
GITURL = "https://github.com/lomignet/thisdataguy_snippets"
# Github token: https://docs.github.com/en/github/authenticating-to-github/creating-a-personal-access-token
TOKEN = "12345blah"
## /end of configuration.

# If the environment variable https_proxy exists, we need to tell Dulwich to use a proxy.
if environ.get("https_proxy", None):
    pool_manager = ProxyManager(environ["https_proxy"], num_pools=1)
else:
    pool_manager = None

with TemporaryDirectory() as gitrootdir:

    gitdir = Path(gitrootdir) / "repo"
    print("Cloning...")
    repo = porcelain.clone(
        GITURL,
        password=TOKEN,
        # A token replaces the password; the username is irrelevant, but Dulwich still needs one to be provided.
        username="not relevant",
        target=gitdir,
        checkout=True,
        pool_manager=pool_manager,
    )
    print("Cloned.")

    # Do something clever with the files in the repo, for instance create an empty readme.
    readme = gitdir / "readme.md"
    readme.touch()
    porcelain.add(repo, readme)

    print("Committing...")
    porcelain.commit(repo, "Empty readme added.")
    print("Commited.")

    print("Pushing...")
    porcelain.push(
        repo,
        remote_location=GITURL,
        refspecs="master", # branch to push to
        password=TOKEN,
        # A token replaces the password; the username is irrelevant, but Dulwich still needs one to be provided.
        username="not relevant",
        pool_manager=pool_manager,
    )
    # Note: Dulwich 0.20.5 raised an exception here. It could be ignored but it was dirty:
    # File ".../venv/lib/python3.7/site-packages/dulwich/porcelain.py", line 996, in push
    #     (ref, error.encode(err_encoding)))
    # AttributeError: 'NoneType' object has no attribute 'encode'
    # Dulwich 0.20.6 fixed it.
    print("Pushed.")


Where exactly is this block on HDFS?

This post will show you how to find out where a specific hdfs block is: on which server and on which disk of this server.

Context

I needed to decommission a directory from hdfs (updating dfs.datanode.data.dir). This is not a big deal because the default replication factor is 3. Removing a disk would just trigger a rebalance.

Safety checks

Just for safety, I first wanted to check if all blocks were properly replicated. This is easy to check with the following command:

 hdfs fsck / -files -blocks -locations | grep repl=1 -B1

What does it do?

  • hdfs fsck /
    • run hdfs checks from the root
  •  -files -blocks -locations
    • Display file names, block names and location
  • | grep repl=1
    • show only blocks with replication 1
  • -B1
    • But please display the previous line as well to get the actual file name

If all is well (all files are properly replicated) you get an empty output. Otherwise, you get a bunch of lines like these:

/a/dir/a/file 2564 bytes, replicated: replication=1, 1 block(s): OK
0. BP-1438592571-10.88.112.28-1502096897275:blk_1077829561_4348908 len=2564 Live_repl=1 [DatanodeInfoWithStorage[10.1.2.3:9866,DS-f935a126-2226-4ef8-99a6-20d700f06110,DISK]]
--
/another/dir/another/file 2952 bytes, replicated: replication=1, 1 block(s): OK
0. BP-1438592571-10.88.112.28-1502096897275:blk_1077845856_4366930 len=2952 Live_repl=1 [DatanodeInfoWithStorage[10.2.3.4:9866,DS-1d065d48-f887-4ed5-be89-5e9c79633519,DISK]]

In my case this was actually an error, which I could fix by forcing the replication factor to 3:

hdfs dfs -setrep 3 /a/dir/a/file

Where are my blocks?

In other words, are there unreplicated blocks on the disk I am about to remove?

There might be good reasons to have a replication factor of 1, and you then want to be sure that none of the blocks are on the disk you will remove.  How can you do that?

Looking at the output of the previous command, specifically the DatanodeInfoWithStorage bit, you can find out some interesting information already:

10.2.3.4:9866,DS-1d065d48-f887-4ed5-be89-5e9c79633519,DISK
  • 10.2.3.4:9866 this is the server where the block is, 9866 is the default datanode port,
  • DISK: good, the data is stored on disk,
  • DS-1d065d48-f887-4ed5-be89-5e9c79633519: this looks like a disk ID. What does it mean?

Looking at the source on github does not help much: this is a string, named storageID. What now?

It turns out that this storage ID is in a text file in every directory listed in dfs.datanode.data.dir. Look inside one of those directories and you will find the file current/VERSION, which looks like this:

#Tue Apr 07 13:49:10 CEST 2020
storageID=DS-dc5bed87-addb-4575-b2e3-6cbb114e4700
clusterID=cluster16
cTime=0
datanodeUuid=b0d3af53-3320-4833-8063-a13720f84bae
storageType=DATA_NODE
layoutVersion=-57

And there you are, there is the storageID, which matches what was displayed via the hdfs command.

This was the missing link to know exactly on which disk your block was.
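
To tie it all together, here is a small sketch that looks for a given storage ID in a list of data directories. The directory list is hypothetical; in practice, use the directories from your dfs.datanode.data.dir.

from pathlib import Path

# Hypothetical data directories, mirroring dfs.datanode.data.dir.
DATA_DIRS = ["/data/1/hdfs", "/data/2/hdfs"]
# Storage ID as reported by hdfs fsck in DatanodeInfoWithStorage.
WANTED = "DS-1d065d48-f887-4ed5-be89-5e9c79633519"

for data_dir in DATA_DIRS:
    version = Path(data_dir) / "current" / "VERSION"
    if not version.is_file():
        continue
    for line in version.read_text().splitlines():
        if line.strip() == f"storageID={WANTED}":
            print(f"{WANTED} lives under {data_dir}")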

Bridge parameter with multi-level hierarchy in Dundas

Sometimes you need to send parameters to a data cube that you cannot just handle via slicers. In that case, bridge parameters are awesome.

You can define inside your data cube a parameter (single, range, hierarchy…) and bind it to one or more placeholders in your data cube query. Then from your dashboard you can add filters bound to this bridge parameter.

I ended up using them a lot for a 3 level hierarchy, and this post explains how to do it.

Your parameter will be a “Single Member” (as opposed to a range member (e.g. from and to for a calendar) or a single number/string). This member will have multiple levels, depending on the hierarchy it is bound to.

Indeed, when you create a bridge parameter, you bind it to a hierarchy. Then each level in the hierarchy is bound to a name which you will receive from the value in your parameter. In the example below, Top/Mid/Low Level are names coming from my hierarchy, A/B/C are generated by Dundas.

[Image: levels vs. names]

Those names can be used inside the Dundas script binding the bridge parameter and the placeholders. You just need to attach the bridge parameter to the placeholders, $top$, $mid$, $low$.

[Image: attaching the bridge parameter to the placeholders]

You will need three slightly different snippets, one per level. In my example, if a level is not filled in I return a different default value, so the code does not factorise nicely, especially as it is only 7 lines long.

This is the code for the top level. It is always filled in, but there could be a second or lower level as well.

// always Resolve() first, to handle unresolved tokens.
SingleMemberValue h = (SingleMemberValue)$input$.Resolve();
// eg. top.mid.low.C or top.mid.B or top.A
string fullName = h.Value.UniqueName;
// Just return first element
return fullName.Split('.')[0];

The second level might be empty. In that case I return -1.

// always Resolve() first, to handle unresolved tokens.
SingleMemberValue h = (SingleMemberValue)$input$.Resolve();
// eg. top.mid.low.C or top.mid.B or top.A
String fullName = h.Value.UniqueName;
String[] bits = fullName.Split('.');
if (bits.Length >= 3) {
  // Note that Strings are coming in, but I need to convert to int
  return Convert.ToInt32(bits[1]);
} else {
  return -1;
}

Same for my third level: it might be empty, and it is an int.

// always Resolve() first, to handle unresolved tokens.
SingleMemberValue h = (SingleMemberValue)$input$.Resolve();
// eg. top.mid.low.C or top.mid.B or top.A
String fullName = h.Value.UniqueName;
String[] bits = fullName.Split('.');
if (bits.Length >= 4) {
  // Note that Strings are coming in, but I need to convert to int
  return Convert.ToInt32(bits[2]);
} else {
  return -1;
}

Dundas: states with constant

Within Dundas, you can set up states, which are great for changing how some values are displayed depending on their value.

On a table, you might, for instance, want to show in green all values greater than 60% and in red all values less than 20%. This is easy enough by following the states documentation.

If you are not careful, though, you might end up with colors not matching what you expect. This is especially true when you have total rows at different levels of a hierarchy. The first level will have the right colors, but the others will not (for instance the states for days are fine, but not those for months and years).

You need to know that states are a metric set feature, calculated on the server, not by the visualisations. States thus have an aggregator, which by default is Sum. This means that if your threshold is a constant of 0.6 (60%), a level higher in the hierarchy aggregating two rows will be checked against twice that (so 120%), and so on.

To change this, you just need to change the aggregator. For a constant, you could use Min, Max or Average; they would all give the same value, which is the one you expect.
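
A toy illustration of the arithmetic, with made-up daily values:

# A constant 60% threshold and two hypothetical daily values rolled up to a month.
threshold = 0.6
days = [0.7, 0.5]

month_value = sum(days)                       # 1.2: the month total of the metric
sum_threshold = threshold * len(days)         # 1.2: what the default Sum aggregator compares against
max_threshold = max([threshold] * len(days))  # 0.6: what you actually expect
print(month_value, sum_threshold, max_threshold)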

Hive and ODBC confusions

Hive has had no official ODBC driver since version 3 at least. All Hadoop distributions (and Microsoft) distribute the ODBC driver from Simba. It works OK if you can use native queries (the driver passes the query as-is to Hive) or if your query is simple. Otherwise, the driver tries to be smart and fails miserably.

As I am neither a customer of Simba nor of Hortonworks, I cannot send a bug report. I asked on the Hortonworks community, but I feel quite isolated. I will share here a few of my experiences, and hopefully a good soul might pop by and tell me what I am doing wrong (or join me in whinging about this driver).

I should note that I cannot use native queries because I need parametrised statements, which native queries do not support.
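
For reference, this is the kind of parametrised statement I mean: a minimal sketch with pyodbc, assuming a hypothetical DSN named Hive pointing at the Simba driver (the table and columns are made up too).

import pyodbc

# Connect through the hypothetical Hive DSN defined in odbc.ini.
conn = pyodbc.connect("DSN=Hive", autocommit=True)
cursor = conn.cursor()

# The driver rewrites the statement and substitutes the ? placeholders itself;
# with native queries the statement would be passed as-is, so no parameters.
cursor.execute(
    "select id, email from contacts where client = ? and lang = ?",
    ("some_client", "en"),
)
for row in cursor.fetchall():
    print(row)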

Parse Error

Syntax or semantic analysis error thrown in server while executing query. Error message from server: Error while compiling statement: FAILED: ParseException line 25:29 cannot recognize input near ‘?’ ‘and’ ‘s’ in expression specification

You will get that one a lot. Basically, on any error this is what you will get, with the reported position being your first question mark. I thought for a long time that the driver was completely borked, but actually no (just majorly, not completely). If you enable logging (LogLevel=4 and e.g. LogPath=/tmp/hivelogs) in your odbcinst.ini you will be able to see the inner error, which is a lot more informative.

unix_timestamp

Any query using unix_timestamp will give you

unix_timestamp is not a valid scalar function or procedure call

My guess is that the driver mixes it up with unix_timestamp() without parameters, which is deprecated. As a workaround, you can cast your timestamp to bigint, which works the same. I was proud of myself for this workaround, but look below (Cast) for the issues it causes.
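
A sketch of what I mean, assuming a hypothetical events table with a created_at timestamp column:

# Rejected by the driver with the error above:
failing_query = "select unix_timestamp(created_at) from events"
# Workaround that goes through and returns the same epoch seconds:
working_query = "select cast(created_at as bigint) from events"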

CTE

CTEs are, together with the analytic functions, the best thing in SQL. The driver does not support them:

  syntax error near ‘with<<< ??? >>> init as (select ? as lic, ? as cpg) select * from init’.

The solution is, of course, to use subqueries instead.
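
Taking the query from the error message above, the rewrite looks like this (lic and cpg are just the aliases from my own query):

# CTE version, rejected by the driver:
cte_query = "with init as (select ? as lic, ? as cpg) select * from init"
# Equivalent subquery version, which the driver accepts:
subquery = "select * from (select ? as lic, ? as cpg) init"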

‘floor’ is a reserved keyword.

Yes, I agree that it is reserved, but that is because it is an actual function. I should not get this error when using e.g. floor(42) in a query.

This one surprises me because a simple query (select floor(42)) will succeed, whereas the same expression used in a more complex query will fail. I see from the logs that the driver reports the error but is somehow able to recover for simple queries, not for complex ones.

Cast does not only return strings

Casting dates to bigint and taking a diff:

select cast(cast('2019-01-21 01:32:32' as timestamp) as bigint) - cast(cast('2019-02-21 01:32:32' as timestamp) as bigint) as tto

fails as well:

Operand types SQL_WCHAR and SQL_WCHAR are incompatible for the binary minus operator

As with floor, in some cases the driver recovers, sometimes not.


Accessing Dundas with Python

Dundas has a great REST API. You can basically do everything with it. Furthermore, it is easy to find examples: you just have to look at what your Dundas web app does, and you have all the examples and use cases you can wish for.

I wanted to schedule cube refreshes, so I naturally turned toward Python. It wasn't too complex, as you will see below. The one thing to take care of is logging out, in all possible cases; otherwise you will burn through your elastic hours very fast (been there, done that). I'll show you here how I did it with a context manager, which means that I basically cannot forget to log out: whatever happens, it is all managed for me.

In my production code this object actually does something (it refreshes cubes); this is only a skeleton to get you started. You can find the code on my github as well, where it might be easier to read.

The code actually does not do much: the call to __new__ associated with contextlib.closing() allows you to use the DundasSession object within a context manager, with the keyword with, thus guaranteeing that you will always log out, no matter what, even if an exception occurs or sys.exit is called.

#!/usr/bin/env python3

"""
Skeleton to use Dundas Rest API, with guaranteed log out.
"""
import contextlib
import logging
import requests
import sys

class DundasSession:
    """
    Using __new__ + contextlib.closing() is awesome.

    DundasSession can now be used within a context manager, meaning that whatever happens, its close() method
    will be called (thus logging out). No need to call logout() explicitly!
    Just using __del__ is not guaranteed to work because when __del__ is called, you do not know which objects
    are already destroyed, and the session object might well be dead.
    """
    def __new__(cls, *args, **kwargs):
        o=super().__new__(cls)
        o.__init__(*args, **kwargs)
        return contextlib.closing(o)

    def __init__(self, user, pwd, url):
        # For session reuse - TCP connection reuse, keeps cookies.
        self.s = requests.session()

        self.user = user
        self.pwd = pwd
        self.url = url
        self.api = self.url + '/api/'
        self.session_id = None  # Will be set in login()

    def login(self):
        """Login and returns the session_id"""
        login_data = {
            'accountName': self.user,
            'password': self.pwd,
            'deleteOtherSessions': False,
            'isWindowsLogOn': False
        }
        logging.info('Logging in.')
        r = self.s.post(self.api + 'logon/', json=login_data)
        # The following line exceptions out on not 200 return code.
        r.raise_for_status()

        resp = r.json()
        if resp['logOnFailureReason'].lower() == "none":
            # We're in!
            logging.info('Logged in')
            self.session_id = resp['sessionId']
        else:
            logging.error('Login failed with message: ' + r.text)
            sys.exit(1)

    def close(self):
        """Automagically called by the context manager."""
        self.logout()

    def logout(self):
        """If you do not logout, session will stay active, potentially burning through your elastic hours very fast."""

        # If session_id is not defined, we did not even log in (or we are already logged out).
        logging.info('Logging out.')
        if getattr(self, 'session_id', None):
            r = self.s.delete(self.api + 'session/current', params={'sessionId': self.session_id})
            r.raise_for_status()
            del self.session_id
            logging.info('Logged out.')
        else:
            logging.info('Was not yet logged in.')

logging.basicConfig(level='INFO')

with DundasSession(user='yourapiuser', pwd='pwd', url='https://reports.example.com') as dundas:
    dundas.login()

    # Do something smart with your Dundas object.

# No need to log out, this is handled for you via the context manager, even in case of exception or even sys.exit.

Reaching Hive from pyspark on HDP3

There is a lot to find about talking to hive from Spark on the net. Sadly, most of it refers to Spark before version 2 or is not valid for HDP3. You need to use the Hive Warehouse Connector, bundled in HDP3.

This is an example of a minimalistic connection from pyspark to hive on hdp3.

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Yes, llap even if you do not use it.
from pyspark_llap import HiveWarehouseSession

settings = [
    ('spark.sql.hive.hiveserver2.jdbc.url',
     'jdbc:hive2://{your_hiveserver2_url:port}/default'),
]

conf = SparkConf().setAppName("Pyspark and Hive!").setAll(settings)
# Spark 2: use SparkSession instead of SparkContext.
spark = (
    SparkSession
    .builder
    .config(conf=conf)
    .master('yarn')
    # There is no HiveContext anymore either.
    .enableHiveSupport()
    .getOrCreate()
)

# This is mandatory. Just using spark.sql will not be enough.
hive = HiveWarehouseSession.session(spark).build()

hive.showDatabases().show()
hive.execute("select 2 group by 1 order by 1").show()
spark.stop()

You can then run this with the following command:

HDP_VERSION=3.0.1.0-187 \
PYSPARK_PYTHON=python3 \
HADOOP_USER_NAME=hive \
SPARK_HOME=/usr/hdp/current/spark2-client \
spark-submit \
--jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
--py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip \
{your_python_script.py}

Note:

  • HDP_VERSION is needed when you use python 3. If not set, HDP uses a script (/usr/bin/hdp-select) which is python 2 only (although fixing it is trivial).
  • PYSPARK_PYTHON is optional; otherwise it defaults to just python (which might or might not be python 3 on your server)
  • without HADOOP_USER_NAME the script will run as your current user. Alternatively, you could sudo first.
  • without SPARK_HOME some jars would not be found and you would end up with an error like py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
    : java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
  • --jars and --py-files: as you can see, the HDP version is in the file names. Make sure you are using the proper ones for your setup.
  • there is no --master option; this is handled in the script while building the SparkSession.

There is some doc from Hortonworks you can follow to go further: Integrating Apache Hive with Spark and BI.

Just before I posted this article, a new write-up appeared on Hortonworks.com to describe some use cases for the Hive-Warehouse-Connector.

Find a timezone offset in pure SQL in hive

Timezones are a pain. This is not new, and every time you deviate from UTC it will bite you. That said, sometimes you have to deviate from UTC, especially for the final display of a date if you want to show it in the reader's local timezone. In that case, adding an explicit offset will save some questions and uncertainty down the line.

There is no get_offset_from_tz() function in Hive, sadly. Using reflect() does not work either, as the method call is too complex for reflect. Writing a UDF would be possible but feels overkill.

The solution I give here works in Hive and should probably work in most SQL variants as well, apart from the variables.

The algorithm to find the offset is easy:

  • get the time in UTC,
  • get the same in another timezone,
  • subtract one from the other to get the offset,
  • format the offset in a standard way.

The main issue is that you cannot assign results to variables in SQL, meaning that many computations need to be duplicated. They will be optimised away, of course, but they make for ugly code.

In Hive, luckily, you can use variables. They cannot store results but are substituted as-is, a bit like macros: the variable name is just replaced by its content, which can be a piece of code.

This sets up the date to find the offset for, as well as a few timezones to test with.

-- Date to display. If you use this from a table you can
-- put here the column that would be used, eg. t.logdate.
set hivevar:D='2018-06-01 01:02:02';

-- A few tests:
-- positive offset +02:00 (in summer)
set hivevar:DISPLAY_TZ='Europe/Amsterdam';

-- negative offset -04:00 (in summer)
set hivevar:DISPLAY_TZ='America/New_York';

-- 0 offset
set hivevar:DISPLAY_TZ='UTC';

-- Non integer offset: +09:30
set hivevar:DISPLAY_TZ='Australia/Adelaide';

These are the macros:


-- Date displayed in the right TZ
set hivevar:dateintz=DATE_FORMAT(FROM_UTC_TIMESTAMP(${D}, ${DISPLAY_TZ}),"yyyy-MM-dd HH:mm:ss");
-- Offset in interval type
set hivevar:delta=cast(${dateintz} as timestamp) - cast(${D} as timestamp);

And the code itself, tiny and readable once variables are used:

select
  concat(
    -- date in TZ
    ${dateintz}

    -- sign
    , if(${delta} < interval '0' minute, '-', '+')

    -- hour
    , lpad(abs(hour(${delta})), 2, 0)

    , ':'

    -- minute
    ,lpad(minute(${delta}), 2, 0)
) as dtwithoffset
;

et voilà.

Hive: add a column pitfalls

Adding a column to an existing table is easy:


ALTER TABLE tbl ADD COLUMNS (new_col TIMESTAMP)

Easy right? Not always.

As the doc says,

The column change command will only modify Hive’s metadata, and will not modify data. Users should make sure the actual data layout of the table/partition conforms with the metadata definition.

What this means is that this command will change the table metadata, but not the partition metadata and this column will appear as NULL in select queries.

The solution is then easy, just add the CASCADE keyword:


ALTER TABLE tbl ADD COLUMNS (new_col TIMESTAMP) CASCADE

Then partitions will be updated as well.

Easy right? Not always.

If you run this command, with CASCADE, on a table without partitions, you will end up with this non-descriptive error:

Error: Error while compiling statement: FAILED: NullPointerException null (state=42000,code=40000)

In short:

  • if you have a partitioned table you must use CASCADE.
  • if you do not have partitions, you must not use CASCADE.

Why is my hive MERGE statement slow?

In my ETL flow, I need to merge a source table into a destination table, in Hive. This turned out to be much slower than expected, so I had to dig around a lot, and these are the results I discovered.

Context

Some data is coming from kafka, written as avro files on hdfs. These avro files are used to create an external table, which is then merged every day into the final ORC table. The external data files are then moved out of the way, meaning that the next ETL run will have a brand new external table to be fully merged into the destination table.

SQL


set hive.merge.cardinality.check=false;
set domainregexp='.*@(.*?)$';
MERGE INTO contact dst
USING (
  SELECT

    -- DISTINCT fields
      client -- partition column
    , user_id as id
    , ct.cid as cid
    -- other fields
     , email
     , lang
     -- note: domain is around here, but is computed from email. I compute
     -- it only when needed to prevent useless processing.
    , CAST(timestamp_ms_utc AS TIMESTAMP) AS ts_utc

    , ROW_NUMBER() OVER (
      PARTITION BY client
        , ct.cid
        , user_id
      ORDER BY timestamp_ms_utc DESC
     ) as r

  FROM
    external_table
  -- campaign_id is a stupid struct<long:bigint,array:array<bigint>>.
  -- Let's sanitise it.
  LATERAL VIEW explode(campaign_id) ct AS cid
) src
ON
  dst.client = src.client
  AND dst.campaign_id = src.cid
  AND dst.id = src.id

-- On match: keep latest loaded
WHEN MATCHED
 AND dst.updated_on_utc < src.ts_utc
 AND src.r = 1
THEN UPDATE SET
  -- other fields
    email = src.email
  , domain = regexp_extract(src.email, ${hiveconf:domainregexp}, 1)
  , lang = src.lang
  , updated_on_utc = src.ts_utc

WHEN NOT MATCHED AND src.r = 1 THEN INSERT VALUES (
   src.id
 , src.cid

 , src.email
 , regexp_extract (src.email, ${hiveconf:domainregexp}, 1)
 , src.lang

 , src.ts_utc -- insert_date
 , src.ts_utc -- update_date

 , src.client -- partition column
)
;

This statement:

  • reads the source table,
  • explodes an array (campaign_id),
  • orders the rows within the same ‘unique’ key (ROW_NUMBER()),
  • updates or inserts the first unique row.

Problem

Merging takes exponentially longer. Merging the first day into the (empty) destination table takes about 30 minutes. The second day takes about 1.5 hours. The third day takes 4 hours. I stopped there.

What could go wrong?

Many things, as it turned out.

(Attempted) Solutions

SQL tweaking

My first guess was that my SQL was not great. Here is what I tried:

  • Removing the regex. No impact.
  • Create a temporary table without duplicates and merge that one. Negative impact (4x longer).
  • Execute the merge per partition, one by one. Very negative impact.
  • Replace the source table by a subquery to filter out the r=1 before the merge. Negative impact (20% longer).
  • Create a table with exactly the same structure as the destination table as a temporary table and merge that one. Negative impact (30% longer).
  • Pre-explode the lateral view earlier in the process (25% longer).

Apparently my SQL was quite good, so I had to look elsewhere.

Java heap

It turned out that many of my services were under-configured. I increased the datanode heap, namenode heap and hive metastore heap; this already made a big difference in speed, but it was not enough.

Small files

This was a massive issue.

I had 3 source files per table per minute. On top of this, I had some aggressive partitioning and bucketing (buckets are mandatory for ACID tables, ACID tables are mandatory for a merge).

Updating this to have 3 source files per hour, and having only 4 buckets per table instead of 64, gave me great performance. I am still not fully clear about the impact of bucketing, but this will be a question for later if I notice other performance problems. I have enough on my plate not to indulge in premature optimisation.

Final solution

In four words: bigger heap, fewer files.

The merge that initially took 30 minutes into an empty table is now done in about 8 minutes, into a table holding 145M rows into which 35M rows are merged daily.