Connect SFTP to S3 in AWS Glue

AWS has a lot of data transfer tools, but none of them can actually transfer from SFTP to S3 out of the box.

Luckily, Glue is very flexible, and it is possible to run a pure Python script there.

Without further ado, here is a basic Python script which can run in Glue (as well as locally). It reads all files in the root of an SFTP server and uploads them to an S3 bucket.

import boto3
import paramiko

s3 = boto3.resource("s3")
bucket = s3.Bucket(name="destination-bucket")
bucket.load()


ssh = paramiko.SSHClient()
# In prod, add explicitly the rsa key of the host instead of using the AutoAddPolicy:
# ssh.get_host_keys().add('example.com', 'ssh-rsa', paramiko.RSAKey(data=decodebytes(b"""AAAAB3NzaC1yc2EAAAABIwAAAQEA0hV...""")))
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

ssh.connect(
    hostname="sftp.example.com",
    username="thisdataguy",
    password="very secret",
)

sftp = ssh.open_sftp()

for filename in sftp.listdir():
    print(f"Downloading {filename} from sftp...")
    # mode: SFTP treats all files as binary anyway, so 'b' is ignored.
    with sftp.file(filename, mode="r") as file_obj:
        print(f"Uploading {filename} to S3...")
        bucket.put_object(Body=file_obj, Key=f"destdir/{filename}")
        print(f"All done for {filename}")

There is only one thing to take care of: Paramiko is not available by default in Glue, so in the job setup you need to point the Python library path to a paramiko wheel uploaded to S3.
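
For reference, here is roughly how that can be wired up when creating the job with boto3. This is only a sketch: the job name, role, script location and wheel path are made up, and the same thing can of course be set in the console instead.

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="sftp-to-s3",
    Role="my-glue-role",  # hypothetical IAM role
    Command={
        "Name": "pythonshell",  # plain Python, no Spark needed
        "PythonVersion": "3",
        "ScriptLocation": "s3://my-glue-scripts/sftp_to_s3.py",
    },
    DefaultArguments={
        # This is the "Python library path" of the job setup:
        "--extra-py-files": "s3://my-glue-scripts/libs/paramiko-2.7.2-py2.py3-none-any.whl",
    },
)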

Git in pure Python: Full Dulwich example

For some reason I do not have, and cannot have, git installed on some servers, but I still needed to connect to GitHub. Of course, I was bound to find some pure Python git module. There seem to be a few options, but the main one is Dulwich, which is even referenced in the git book.

Dulwich can do a lot with git (much more than what I needed). It handles both the higher and lower level git APIs. The lower level is known as plumbing, the higher level as porcelain (because the porcelain is the interface between the user and the plumbing; if it feels like a toilet joke, it's because it is).

One issue I had while using Dulwich was finding documentation, mostly about the use of tokens and a proxy with GitHub HTTPS access.

Eventually, by reading some examples and the code, I ended up finding everything I needed, so I give here a complete, commented example of a basic (clone/add/commit/push) porcelain flow.

You can find the file on my GitHub as well.

from os import environ
from pathlib import Path
from tempfile import TemporaryDirectory

from dulwich import porcelain
from dulwich.repo import Repo
from urllib3 import ProxyManager

## Configuration settings:
# Source url of the repo (Note: https here. ssh would work as well, a bit differently).
GITURL = "https://github.com/lomignet/thisdataguy_snippets"
# GitHub token: https://docs.github.com/en/github/authenticating-to-github/creating-a-personal-access-token
TOKEN = "12345blah"
## /end of configuration.

# If the environment variable https_proxy exists, we need to tell Dulwich to use a proxy.
if environ.get("https_proxy", None):
    pool_manager = ProxyManager(environ["https_proxy"], num_pools=1)
else:
    pool_manager = None

with TemporaryDirectory() as gitrootdir:

    gitdir = Path(gitrootdir) / "repo"
    print("Cloning...")
    repo = porcelain.clone(
        GITURL,
        password=TOKEN,
        # With a token the username is not actually checked, but Dulwich still needs one.
        username="not relevant",
        target=gitdir,
        checkout=True,
        pool_manager=pool_manager,
    )
    print("Cloned.")

    # Do something clever with the files in the repo, for instance create an empty readme.
    readme = gitdir / "readme.md"
    readme.touch()
    porcelain.add(repo, readme)

    print("Committing...")
    porcelain.commit(repo, "Empty readme added.")
    print("Commited.")

    print("Pushing...")
    porcelain.push(
        repo,
        remote_location=GITURL,
        refspecs="master", # branch to push to
        password=TOKEN,
        # With a token the username is not actually checked, but Dulwich still needs one.
        username="not relevant",
        pool_manager=pool_manager,
    )
    # Note: Dulwich 0.20.5 raised an exception here. It could be ignored but it was dirty:
    # File ".../venv/lib/python3.7/site-packages/dulwich/porcelain.py", line 996, in push
    #     (ref, error.encode(err_encoding)))
    # AttributeError: 'NoneType' object has no attribute 'encode'
    # Dulwich 0.20.6 fixed it.
    print("Pushed.")

Permanently remove a Kudu data disk

I needed to permanently remove a data disk from Kudu. In my case, this disk had way too many IOs and I needed Kudu to stop writing to it. This post explains how to do this safely.

Sanity checks

First, you need to make sure that there are no tables with a replication factor of 1. If by bad luck some tablets of such a table are on the disk you will remove, the table would become unavailable. Note that the user running this command must be in the superuser_acl list of Kudu (replace of course ${kudu_master_host} with the real hostname).

kudu cluster ksck ${kudu_master_host} | grep '| 1 |' | cut -d' ' -f2

If there are such tables, you need to

  • either DROP them,
  • or recreate them with a higher replication factor. You cannot change the replication factor of an existing table.

Technically, there are other options, but they are trickier:

  • I could kudu tablet change_config move_replica tablets for all tables with RF 1 from e.g. server 1 to server 2, then remove the directory on server 1, rebalance, then rinse and repeat from server 2 to 3 and so on. Note that you can only move tablets between servers, not disks, so it can take a while if you have many servers.
  • I could move the data directory from one disk to another, since Kudu does not use whole disks but only subdirectories. As all the other disks already had Kudu data directories in my case, this would have meant one disk receiving twice as many IOs.

Start a rebalance. After this the data will be properly spread, and more importantly we know that a rebalance can happen.

kudu cluster rebalance ${kudu_master_host}

Stop Kudu.

Remove a disk

Note: do this one node at a time! It should be possible to do 2 at a time, but I have not tested it. If you use Cloudera Manager, you will need to use config groups.

Remove the path to the directory you want to decommission from fs_data_dirs.
While Kudu is still stopped, tell Kudu on the tablet server whose configuration you just changed that there is now one disk fewer:

sudo -u kudu kudu fs update_dirs --force --fs_wal_dir=[your wal directory] --fs_data_dirs=[comma separated list of remaining directories]

Restart Kudu. Data will be automatically rebalanced.

Congrats. Move on to your next node once all tablets are happy (kudu cluster ksck ${kudu_master_host} does not return any errors).

Thousand separators in Hue

Hue is a very handy SQL assistant for Hadoop, where you can easily run Hive or Impala queries.

I was asked if it was possible to have thousands separators in the display of query results. There is no such option in Hue. I thought I could get away with Django environment variables, but either it is not possible or I got it wrong.

In any case, it did not feel great. You could argue that Hue is a display tool so it’s OK to format the output, but it would be for all users, and they might not all want that…

Long story short, I removed my Hadoop administrator cap and put on my dirty creative one. Once the results are loaded, it is a trivial JavaScript manipulation to format them. Furthermore, Hue uses jQuery, which makes it even easier. So I came up with this little bookmarklet. Put it in the URL field of a bookmark, including the 'javascript:' prefix. If you want to format your output, just click on the bookmarklet et voilà:

javascript:(
  function(){
    $('table.resultTable td').each(
      function(){
        if (!isNaN($(this).text())) {
          $(this).text($(this).text().toString().replace(/(\d)(?=(\d{3})+(?!\d))/g, '$1,'))
        }
      }
    )
  }()
);

Where exactly is this block on HDFS?

This post will show you how to find out where a specific HDFS block is: on which server, and on which disk of this server.

Context

I needed to decommission a directory from HDFS (updating dfs.datanode.data.dir). This is not a big deal because the default replication factor is 3: removing a disk would just trigger a re-replication of its blocks.

Safety checks

Just for safety, I first wanted to check if all blocks were properly replicated. This is easy to check with the following command:

 hdfs fsck / -files -blocks -locations | grep repl=1 -B1

What does it do?

  • hdfs fsck /
    • run hdfs checks from the root
  •  -files -blocks -locations
    • Display file names, block names and location
  • | grep repl=1
    • show only blocks with replication 1
  • -B1
    • But please display the previous line as well to get the actual file name

If all files are properly replicated, you will get an empty output. Otherwise, you get a bunch of lines like these:

/a/dir/a/file 2564 bytes, replicated: replication=1, 1 block(s): OK
0. BP-1438592571-10.88.112.28-1502096897275:blk_1077829561_4348908 len=2564 Live_repl=1 [DatanodeInfoWithStorage[10.1.2.3:9866,DS-f935a126-2226-4ef8-99a6-20d700f06110,DISK]]
--
/another/dir/another/file 2952 bytes, replicated: replication=1, 1 block(s): OK
0. BP-1438592571-10.88.112.28-1502096897275:blk_1077845856_4366930 len=2952 Live_repl=1 [DatanodeInfoWithStorage[10.2.3.4:9866,DS-1d065d48-f887-4ed5-be89-5e9c79633519,DISK]]
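
If there are many such lines, a few lines of Python make the output easier to scan. This is only a sketch assuming the exact output format shown above; it prints the file, the datanode and the storage ID of every block with a single live replica.

import re
import subprocess

# Same fsck command as above; the parsing assumes the output format shown in this post.
fsck = subprocess.run(
    ["hdfs", "fsck", "/", "-files", "-blocks", "-locations"],
    capture_output=True,
    text=True,
).stdout

current_file = None
for line in fsck.splitlines():
    # File lines look like: "/a/dir/a/file 2564 bytes, replicated: replication=1, ..."
    if line.startswith("/"):
        current_file = line.split()[0]
    # Block lines carry the locations: "... Live_repl=1 [DatanodeInfoWithStorage[host:port,DS-...,DISK]]"
    if "Live_repl=1" in line:
        for host, storage_id in re.findall(r"DatanodeInfoWithStorage\[([^,]+),([^,]+),", line):
            print(f"{current_file}\t{host}\t{storage_id}")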

In my case this was actually an error, which I could fix by forcing the replication factor to 3:

hdfs dfs -setrep 3 /a/dir/a/file

Where are my blocks?

In other words, are there unreplicated blocks on the disk I am about to remove?

There might be good reasons to have a replication factor of 1, and you then want to be sure that none of those blocks are on the disk you will remove. How can you do that?

Looking at the output of the previous command, specifically the DatanodeInfoWithStorage bit, you can find out some interesting information already:

10.2.3.4:9866,DS-1d065d48-f887-4ed5-be89-5e9c79633519,DISK
  • 10.2.3.4:9866 this is the server where the block is, 9866 is the default datanode port,
  • DISK: good, the data is stored on disk,
  • DS-1d065d48-f887-4ed5-be89-5e9c79633519: this looks like a disk ID. What does it mean?

Looking at the source on GitHub does not help much: this is a string, named storageID. What now?

It turns out that this storage ID is in a text file inside every directory listed in dfs.datanode.data.dir. Look into one of them and you will find the file current/VERSION, which looks like this:

#Tue Apr 07 13:49:10 CEST 2020
storageID=DS-dc5bed87-addb-4575-b2e3-6cbb114e4700
clusterID=cluster16
cTime=0
datanodeUuid=b0d3af53-3320-4833-8063-a13720f84bae
storageType=DATA_NODE
layoutVersion=-57

And there you are, there is the storageID, which matches what was displayed via the hdfs command.

This was the missing link to know exactly on which disk your block was.
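
To close the loop, here is a small sketch that, run on the datanode itself, tells you which data directory holds a given storage ID. The directory list below is made up; use whatever your dfs.datanode.data.dir actually contains.

from pathlib import Path

# Hypothetical data directories; replace with the entries of dfs.datanode.data.dir.
DATA_DIRS = ["/data/1/dfs/dn", "/data/2/dfs/dn", "/data/3/dfs/dn"]
# The storage ID found in the fsck output.
WANTED = "DS-1d065d48-f887-4ed5-be89-5e9c79633519"

for data_dir in DATA_DIRS:
    version_file = Path(data_dir) / "current" / "VERSION"
    if not version_file.exists():
        continue
    if f"storageID={WANTED}" in version_file.read_text():
        print(f"{WANTED} lives under {data_dir}")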

Hive and integer overflows

I was playing around in Hive (3, from HDP) with big integers when I noticed something weird:

select 9223372036854775807 * 9223372036854775807;
-- 1

select 9223372036854775807 + 1;
-- -9223372036854775808

It turns out that Hive silently overflows integers. This comes from Java, which does the same.
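
You can reproduce the arithmetic outside Hive. A Java long keeps only the low 64 bits of a result and interprets them as a signed (two's complement) value, which a few lines of Python can emulate:

# Emulate Java's 64-bit signed (two's complement) arithmetic.
def as_java_long(n):
    n &= (1 << 64) - 1  # keep only the low 64 bits
    return n - (1 << 64) if n >= (1 << 63) else n

BIGINT_MAX = 9223372036854775807  # 2**63 - 1

print(as_java_long(BIGINT_MAX * BIGINT_MAX))  # 1
print(as_java_long(BIGINT_MAX + 1))           # -9223372036854775808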

It is lucky I noticed; it could have been very painful for me down the line. The workaround is to use big decimals. As the doc says:

Integral literals larger than BIGINT must be handled with Decimal(38,0). The Postfix BD is required.

For instance:

select 9223372036854775807BD * 9223372036854775807BD;
-- 85070591730234615847396907784232501249

select 9223372036854775807BD + 1;
-- 9223372036854775808

But it is still somewhat weird. Overflows with big decimals won’t error out but will return null:

select 9223372036854775807BD
  * 9223372036854775807BD
  * 9223372036854775807BD
;
-- NULL

Furthermore, if the scale is not 0, some behaviours are not consistent:

create temporary table dec_precision(
  d decimal(38, 18)
);
insert into dec_precision values
    (98765432109876543210.12345)
  , (98765432109876543210.12345)
;

select sum(d) from dec_precision;
-- NULL (but why?)
select sum(98765432109876543210.12345BD) from dec_precision;
-- 197530864219753086420.24690 (as expected)
select 98765432109876543210.12345BD + 98765432109876543210.12345BD;
-- 197530864219753086420.24690 (as expected)

By contrast, MySQL

select 9223372036854775807 * 9223372036854775807;
-- ERROR 1690 (22003): BIGINT value is out of range in '9223372036854775807 * 9223372036854775807'

or Postgres

select 2147483647 * 2147483647;
ERROR:  integer out of range

are a lot safer and friendlier in that regard.

Dundas Rest API and Python: introducing PyDundas

Dundas has a very complete REST API. You can do just about everything with it. It is what the web app uses, so if you need an example, you can just look at the queries the app sends, using the developer tools of your favorite browser.

That said, it is a bit of a pain to use in a script.

To make my life easier, I built PyDundas: a Python package using the API in the background. It abstracts away all the nitty-gritty so that you can concentrate on semantically pleasing code.

For instance, warehousing a cube is quite simple. PyDundas handles logging, checking, waiting and more for you.

from pydundas import Api, Session, creds_from_yaml
import sys
import json
creds = creds_from_yaml('credentials.yaml')

with Session(**creds) as d:
    api = Api(d)
    capi = api.cube()
    cube = capi.getByPath('Awesome Project', '/relevant/path')
    if cube is None:
        print("Gotcha, no cube named like that.")
        sys.exit(1)
    print(cube.json())
    if not cube.is_checked_out():
        cube.clear_cache()

    cube.warehouse()
    print(cube.isWarehousing())
    cube.waitForWarehousingCompletion()
    print('Done')

There is a lot more that can be done with PyDundas. I developed it for my own needs, so I only added what I actually used, and it is thus far from complete. That said, I regularly update it and add new APIs, and it is open source, so feel free to send me pull requests to expand it.

Bridge parameter with multi-level hierarchy in Dundas

Sometimes you need to send parameters to a data cube, which you cannot just handle via the slicers. In that case, the bridge parameters are awesome.

You can define inside your data cube a parameter (single, range, hierarchy…) and bind it to one or more placeholders in your data cube query. Then from your dashboard you can add filters bound to this bridge parameter.

I ended up using them a lot for a 3 level hierarchy, and this post explains how to do it.

Your parameter will be a “Single Member” (as opposed to a range member (e.g. from and to for a calendar) or a single number/string). This member will have multiple levels, depending on the hierarchy it is bound to.

Indeed, when you create a bridge parameter, you bind it to a hierarchy. Each level in the hierarchy is then bound to a name, which is what you will receive in the value of your parameter. In the example below, Top/Mid/Low Level are names coming from my hierarchy; A/B/C are generated by Dundas.

[image: levels vs names]

Those names can be used inside the Dundas script binding the bridge parameter and the placeholders. You just need to attach the bridge parameter to the placeholders, $top$, $mid$, $low$.

[image: attaching the bridge parameter to the placeholders]

You will need 3 slightly different scripts, one for each level. In my example, if the level is not filled I return a different default value, so the code is not really worth factorising, especially as it is only 7 lines long.

This is the code for the top level. It is always filled, but there could be a second or deeper level as well.

// always Resolve() first, to handle unresolved tokens.
SingleMemberValue h = (SingleMemberValue)$input$.Resolve();
// eg. top.mid.low.C or top.mid.B or top.A
string fullName = h.Value.UniqueName;
// Just return first element
return fullName.Split('.')[0];

The second level might be empty. In that case I return -1.

// always Resolve() first, to handle unresolved tokens.
SingleMemberValue h = (SingleMemberValue)$input$.Resolve();
// eg. top.mid.low.C or top.mid.B or top.A
String fullName = h.Value.UniqueName;
String[] bits=fullName.Split('.');
if (bits.Length >= 3) {
  // Note that Strings are coming in, but I need to convert to int.
  return Convert.ToInt32(bits[1]);
} else {
  return -1;
}

The same goes for the third level: it might be empty, and it is an int.

// always Resolve() first, to handle unresolved tokens.
SingleMemberValue h = (SingleMemberValue)$input$.Resolve();
// eg. top.mid.low.C or top.mid.B or top.A
String fullName = h.Value.UniqueName;
String[] bits=fullName.Split('.');
if (bits.Length >= 4) {
  // Note that Strings are coming in, but I need to convert to int.
  return Convert.ToInt32(bits[2]);
} else {
  return -1;
}

Dundas: states with constant

Within Dundas, you can set up states, which are great for changing the display of some elements depending on their values.

On a table, you might, for instance, want to show in green all values greater than 60% and in red all values less than 20%. This is easy enough by following the states documentation.

If you are not careful, though, you might end up with colors not matching what you expect. This is especially true when you have total rows at different levels of a hierarchy. The first level will have the right colors, but not the others (for instance, states for days are OK, but not for months and years).

You need to know that states are a metric set feature, calculated on the server, not by the visualisations. States thus have an aggregator, which by default is Sum. It means that if you compare against a constant of 0.6 (60%), the next level up in the hierarchy will check against twice that (so 120%), and so on.

To change this, you just need to change the aggregator. For a constant, you could use min, max or average; they would all give the same value, which is the one you expect.

Hive and ODBC confusions

Hive has had no official ODBC driver since version 3, at least. All Hadoop distributions (and Microsoft) ship the ODBC driver from Simba. It works OK if you can use native queries (the driver passes the query as-is to Hive) or if your query is simple. Otherwise, the driver tries to be smart and fails miserably.

As I am a customer of neither Simba nor Hortonworks, I cannot send a bug report. I asked on the Hortonworks community, but I feel quite isolated. I will share here a few of my experiences, and hopefully a good soul might pop by and tell me what I am doing wrong (or join me in whinging about this driver).

I should note that I cannot use native queries because I need to use parametrised statements, which are not available with native queries.
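
For context, this is the kind of parametrised statement I am talking about, here through pyodbc. The DSN name, table and columns are made up; the point is that the driver, not Hive, has to deal with the ? placeholders.

import pyodbc

# Hypothetical DSN pointing at the Simba Hive ODBC driver.
conn = pyodbc.connect("DSN=my-hive", autocommit=True)
cursor = conn.cursor()
cursor.execute(
    "select name, amount from some_db.some_table where day = ? and country = ?",
    "2021-01-01",
    "NL",
)
for row in cursor.fetchall():
    print(row)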

Parse Error

Syntax or semantic analysis error thrown in server while executing query. Error message from server: Error while compiling statement: FAILED: ParseException line 25:29 cannot recognize input near ‘?’ ‘and’ ‘s’ in expression specification

You will get that one a lot. Basically, on any error, this is what you will get, with the place of the error being your first question mark. I thought for a long time that the driver was completely borked, but actually no (just majorly, not completely). If you enable logging (LogLevel=4 and e.g. LogPath=/tmp/hivelogs) in your odbcinst.ini, you will be able to see the inner error, which is a lot more informative.

unix_timestamp

Any query using unix_timestamp will give you

unix_timestamp is not a valid scalar function or procedure call

My guess is that the driver mixes it up with unix_timestamp() without parameters, which is deprecated. As a workaround, you can cast your date as bigint, which works the same way. I was proud of myself for this workaround, but look below (Cast) for the issues it causes.

CTE

CTEs are, together with the analytics functions, the best thing in SQL. The driver does not support them:

  syntax error near ‘with<<< ??? >>> init as (select ? as lic, ? as cpg) select * from init’.

The solution is, of course, to use subqueries instead.

‘floor’ is a reserved keyword.

Yes, I agree that it is reserved, but that is because it is an actual function. I should not get this error when using e.g. floor(42) in a query.

This one surprises me because a simple query (select floor(42)) will succeed, whereas the same expression used in a more complex query will fail. I see from the logs that the driver hits the error but is somehow able to recover for simple queries, not for complex ones.

Cast does not only return strings

Casting dates to bigint and taking a diff:

select cast(cast('2019-01-21 01:32:32' as timestamp) as bigint) - cast(cast('2019-02-21 01:32:32' as timestamp) as bigint) as tto

fails as well:

Operand types SQL_WCHAR and SQL_WCHAR are incompatible for the binary minus operator

As with floor, in some cases the driver recovers, in others it does not.