Connect SFTP to S3 in AWS Glue

AWS has a lot of data transfer tools, but none of them can actually transfer from SFTP to S3 out of the box.

Luckily, Glue is very flexible, and it is possible to run a pure Python script there.

Without further ado, here is a basic Python script which can run in Glue (as well as locally) and reads all files in the root of an SFTP server to upload them into an S3 bucket.

from base64 import decodebytes  # used by the production host key line below

import boto3
import paramiko

s3 = boto3.resource("s3")
bucket = s3.Bucket(name="destination-bucket")

ssh = paramiko.SSHClient()
# In prod, explicitly add the RSA key of the host instead of using the AutoAddPolicy:
# ssh.get_host_keys().add('', 'ssh-rsa', paramiko.RSAKey(data=decodebytes(b"""AAAAB3NzaC1yc2EAAAABIwAAAQEA0hV...""")))
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(
    hostname="sftp.example.com",  # hypothetical host, use your SFTP server
    username="sftp-user",  # hypothetical user name
    password="very secret",
)

sftp = ssh.open_sftp()

for filename in sftp.listdir():
    print(f"Downloading {filename} from sftp...")
    # mode: ssh treats all files as binary anyway, so 'b' is ignored.
    with sftp.file(filename, mode="r") as file_obj:
        print(f"Uploading {filename} to s3...")
        bucket.put_object(Body=file_obj, Key=f"destdir/{filename}")
        print(f"All done for {filename}")

sftp.close()
ssh.close()

There is only one thing to take care of. Paramiko is not available by default in Glue, so in the job setup you need to point the Python lib path to a downloaded wheel of paramiko on S3.
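For reference, here is a minimal sketch of how such a job could be declared with boto3's Glue `create_job` call, assuming a Python shell job and a paramiko wheel already uploaded to S3. The job, role, bucket and wheel names are hypothetical. `--extra-py-files` is the Glue job parameter for extra Python files (Python shell jobs accept `.egg` and `.whl` files there); paramiko depends on compiled packages such as cryptography, so their wheels may need to be listed as well.

```python
def glue_job_definition(name, role_arn, script_s3_path, wheel_s3_paths):
    """Build keyword arguments for boto3's glue.create_job (Python shell job).

    Every value is supplied by the caller; nothing here is specific to a real account.
    """
    return {
        "Name": name,
        "Role": role_arn,
        "Command": {
            # "pythonshell" runs a plain Python script, no Spark involved.
            "Name": "pythonshell",
            "ScriptLocation": script_s3_path,
            "PythonVersion": "3",
        },
        # Smallest capacity available for Python shell jobs.
        "MaxCapacity": 0.0625,
        "DefaultArguments": {
            # Comma-separated S3 paths of the wheels to put on the Python path.
            "--extra-py-files": ",".join(wheel_s3_paths),
        },
    }


# Hypothetical names, replace them with your own.
job = glue_job_definition(
    name="sftp-to-s3",
    role_arn="arn:aws:iam::123456789012:role/glue-sftp-role",
    script_s3_path="s3://my-scripts-bucket/sftp_to_s3.py",
    wheel_s3_paths=["s3://my-libs-bucket/paramiko-3.4.0-py3-none-any.whl"],
)
# With credentials configured, the job could then be created with:
# boto3.client("glue").create_job(**job)
```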


Git in pure Python: Full Dulwich example

For various reasons, I do not (and cannot) have git installed on some servers, but I still needed to connect to GitHub. Of course, there had to be some pure Python git module out there. There seem to be a few options, but the main one is Dulwich, which is even referenced in the Git book.

Dulwich can do a lot with git (much more than what I needed). It handles both the higher- and lower-level git APIs. The lower level is known as plumbing, the higher level as porcelain (because the porcelain is the interface between the user and the plumbing. If it feels like a toilet joke, it’s because it is.)

One issue I had while using Dulwich was finding documentation, mostly about the use of tokens and a proxy with GitHub HTTPS access.

Eventually, by reading some examples and the code, I found everything I needed, so here is a complete, commented example of a basic porcelain flow (clone/add/commit/push).

You can also find the file on my GitHub.

from os import environ
from pathlib import Path
from tempfile import TemporaryDirectory

from dulwich import porcelain
from dulwich.repo import Repo
from urllib3 import ProxyManager

## Configuration settings:
# Source url of the repo (Note: https here. ssh would work as well, a bit differently).
GITURL = "https://github.com/youruser/yourrepo"  # hypothetical, replace with your repo
# GitHub token:
TOKEN = "12345blah"
## /end of configuration.

# If the environment variable https_proxy exists, we need to tell Dulwich to use a proxy.
if environ.get("https_proxy", None):
    pool_manager = ProxyManager(environ["https_proxy"], num_pools=1)
else:
    pool_manager = None

with TemporaryDirectory() as gitrootdir:

    gitdir = Path(gitrootdir) / "repo"
    repo = porcelain.clone(
        GITURL,
        target=str(gitdir),
        password=TOKEN,
        # Tokens are kinda public keys, no need for a username but it still needs to be provided for Dulwich.
        username="not relevant",
        pool_manager=pool_manager,
    )

    # Do something clever with the files in the repo, for instance create an empty readme.
    readme = gitdir / ""
    readme.touch()
    porcelain.add(repo, readme)

    porcelain.commit(repo, "Empty readme added.")

    porcelain.push(
        repo,
        GITURL,
        refspecs="master",  # branch to push to
        password=TOKEN,
        # Tokens are kinda public keys, no need for a username but it still needs to be provided for Dulwich.
        username="not relevant",
        pool_manager=pool_manager,
    )
    # Note: Dulwich 0.20.5 raised an exception here. It could be ignored but it was dirty:
    # File ".../venv/lib/python3.7/site-packages/dulwich/", line 996, in push
    #     (ref, error.encode(err_encoding)))
    # AttributeError: 'NoneType' object has no attribute 'encode'
    # Dulwich 0.20.6 fixed it.

Dundas Rest API and Python: introducing PyDundas

Dundas has a very complete REST API. You can do just about everything with it. It is what the web app uses, so if you need an example, you can just look at the queries it sends, using the developer tools of your favorite browser.

That said, it is a bit of a pain to use in a script.

To make my life easy, I built PyDundas: a Python package that uses the API in the background. It abstracts away all the nitty-gritty so you can concentrate on semantically pleasing code.

For instance, warehousing a cube is quite simple: PyDundas takes care of logging, checking, waiting and more for you.

from pydundas import Api, Session, creds_from_yaml
import sys
import json
creds = creds_from_yaml('credentials.yaml')

with Session(**creds) as d:
    api = Api(d)
    capi = api.cube()
    cube = capi.getByPath('Awesome Project', '/relevant/path')
    if cube is None:
        print("Gotcha, no cube named like that.")
        sys.exit(1)
    if not cube.is_checked_out():
        # Warehouse the cube here (see the PyDundas documentation for the exact method).
        pass


There is a lot more that can be done with PyDundas. I developed it for my own needs, so it only covers what I actually needed and is thus far from complete. That said, I regularly update it and add new APIs, and it is open source, so feel free to send me pull requests to expand it.

Accessing Dundas with Python

Dundas has a great REST API. You can basically do everything with it. Furthermore, it’s easy to find examples, as you just have to look at what your Dundas web app does, and you have all the examples and use cases you could wish for.

I wanted to schedule cube refreshes, so I naturally turned toward Python. It wasn’t too complex, as you will see below. The one thing to take care of is logging out, in all possible cases, otherwise you will burn through your elastic hours very fast (been there, done that). I’ll show you here how I did it with a context manager, which means that I cannot forget to log out: whatever happens, it’s all managed for me.

In my production code, this object actually does something (refreshes cubes); this is only a skeleton to get you started. You can also find the code on my GitHub, where it might be easier to read.

The code does not do much: overriding __new__ so that it returns the object wrapped in contextlib.closing() lets you use DundasSession within a context manager, with the keyword with, thus guaranteeing that you will always log out, no matter what, even if an exception or a sys.exit occurs.

#!/usr/bin/env python3
"""Skeleton to use Dundas Rest API, with guaranteed log out."""
import contextlib
import logging
import sys

import requests


class DundasSession:
    """Using __new__ + contextlib.closing() is awesome.

    DundasSession can now be used within a context manager, meaning that whatever happens, its close() method
    will be called (thus logging out). No need to call logout() explicitly!
    Just using __del__ is not guaranteed to work because when __del__ is called, you do not know which objects
    are already destroyed, and the session object might well be dead.
    """

    def __new__(cls, *args, **kwargs):
        o = super().__new__(cls)
        o.__init__(*args, **kwargs)
        # closing(o) is not a DundasSession, so Python will not call __init__ a second time.
        return contextlib.closing(o)

    def __init__(self, user, pwd, url):
        # For session reuse - TCP connection reuse, keeps cookies.
        self.s = requests.session()

        self.user = user
        self.pwd = pwd
        self.url = url
        self.api = self.url + '/api/'
        self.session_id = None  # Will be set in login()

    def login(self):
        """Log in and return the session_id."""
        login_data = {
            'accountName': self.user,
            'password': self.pwd,
            'deleteOtherSessions': False,
            'isWindowsLogOn': False
        }
        logging.info('Logging in.')
        r = self.s.post(self.api + 'logon/', json=login_data)
        # The following line raises an exception on a 4xx/5xx return code.
        r.raise_for_status()

        resp = r.json()
        if resp['logOnFailureReason'].lower() == "none":
            # We're in!
            logging.info('Logged in')
            self.session_id = resp['sessionId']
            return self.session_id
        logging.error('Login failed with message: ' + r.text)
        sys.exit(1)

    def close(self):
        """Automagically called by the context manager."""
        self.logout()

    def logout(self):
        """If you do not log out, the session stays active, potentially burning through your elastic hours very fast."""
        logging.info('Logging out.')
        # If session_id is not set, we did not even log in (or we are already logged out).
        if getattr(self, 'session_id', None):
            self.s.delete(self.api + 'session/current', params={'sessionId': self.session_id})
            del self.session_id
            logging.info('Logged out.')
        else:
            logging.info('Was not yet logged in.')


with DundasSession(user='yourapiuser', pwd='pwd', url='') as dundas:
    dundas.login()
    # Do something smart with your Dundas object.

# No need to log out, this is handled for you via the context manager, even in case of exception or even sys.exit.
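To check the pattern without a Dundas server, here is a minimal offline sketch. `FakeSession` is a hypothetical stand-in that keeps the same `__new__` + `contextlib.closing()` trick but only records whether `close()` ran, for a normal exit, an exception and a `sys.exit()`-style `SystemExit`.

```python
import contextlib


class FakeSession:
    """Hypothetical stand-in for DundasSession: no network, just bookkeeping."""

    instances = []  # every object created, so we can inspect them afterwards

    def __new__(cls, *args, **kwargs):
        o = super().__new__(cls)
        o.__init__(*args, **kwargs)
        cls.instances.append(o)
        # closing() calls o.close() when the with block exits, whatever the reason.
        return contextlib.closing(o)

    def __init__(self, name):
        self.name = name
        self.closed = False

    def close(self):
        self.closed = True


# Normal exit.
with FakeSession("ok") as session:
    pass

# Exit through an exception: close() runs before the exception propagates.
try:
    with FakeSession("boom") as session:
        raise RuntimeError("something went wrong")
except RuntimeError:
    pass

# Exit through sys.exit(), which raises SystemExit.
try:
    with FakeSession("exit") as session:
        raise SystemExit(1)
except SystemExit:
    pass

print([(s.name, s.closed) for s in FakeSession.instances])
```

One consequence of this design: calling the class returns a `closing` wrapper rather than the session itself, so the object is really meant to be used through `with`.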

Reaching Hive from pyspark on HDP3

There is a lot to be found on the net about talking to Hive from Spark. Sadly, most of it refers to Spark before version 2 or is not valid for HDP 3. You need to use the Hive Warehouse Connector, bundled in HDP 3.

This is an example of a minimal connection from PySpark to Hive on HDP 3.

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Yes, llap even if you do not use it.
from pyspark_llap import HiveWarehouseSession

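settings = [
    # (key, value) pairs for the Hive Warehouse Connector, e.g. spark.sql.hive.hiveserver2.jdbc.url.
]

conf = SparkConf().setAppName("Pyspark and Hive!").setAll(settings)
# Spark 2: use SparkSession instead of SparkContext.
spark = (
    SparkSession.builder.config(conf=conf).master("yarn")  # assumed: YARN cluster
    # There is no HiveContext anymore either.
    .enableHiveSupport().getOrCreate()
)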

# This is mandatory. Just using spark.sql will not be enough.
hive = HiveWarehouseSession.session(spark).build()

hive.execute("select 2 group by 1 order by 1").show()

You can then run this with the following command:

SPARK_HOME=/usr/hdp/current/spark2-client \
spark-submit \
--jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly- \
--py-files /usr/hdp/current/hive_warehouse_connector/ \


  • HDP_VERSION is needed when you use Python 3. If it is not set, HDP uses a script (/usr/bin/hdp-select) which is Python 2 only (although fixing it is trivial).
  • PYSPARK_PYTHON is optional; otherwise it defaults to plain python (which might or might not be Python 3 on your server).
  • Without HADOOP_USER_NAME, the script runs as your current user. Alternatively, you could sudo first.
  • Without SPARK_HOME, some jars would not be found and you would end up with an error like py4j.protocol.Py4JJavaError: An error occurred while calling
    : java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
  • --jars and --py-files: as you can see, the HDP version is part of the file names. Make sure you are using the proper one for your cluster.
  • There is no --master option; this is handled in the script while building the SparkSession.

There is some doc from Hortonworks you can follow to go further: Integrating Apache Hive with Spark and BI.

Just before I posted this article, a new write-up appeared on to describe some use cases for the Hive-Warehouse-Connector.

--no options with argparse and python

Ruby has this very nice feature when you define options with optparse:

opts.on('--[no-]flag', "Set flag.") do |p|

which gives you the --flag and --no-flag options for free. Python's argparse did not have this built in before Python 3.9 (which added argparse.BooleanOptionalAction), but there are 3 ways to work around that.

The verbose way

Just define 2 options.

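parser.add_argument('--flag', dest='flag', action='store_true',
                    help='Set flag')
parser.add_argument('--no-flag', dest='flag', action='store_false',
                    help='Unset flag')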

Custom action

You can give a custom action to the action parameter of add_argument. This custom action can look at the actual option given and act accordingly.

parser.add_argument('--flag', '--no-flag', dest='flag', action=BooleanAction,
                    help='Set flag')

BooleanAction is just a tiny 6-line class, defined as follows:

class BooleanAction(argparse.Action):
    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        super(BooleanAction, self).__init__(option_strings, dest, nargs=0, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, False if option_string.startswith('--no') else True)

As you can see, it just looks at the name of the flag, and if it starts with --no, the destination will be set to False.

Custom parser

Create your own add_argument method, which can then automagically add the --no option for you.
First define your own parser:

class BoolArgParse(argparse.ArgumentParser):
    def add_bool_arguments(self, *args, **kw):
        grp = self.add_mutually_exclusive_group()
        # add --flag
        grp.add_argument(*args, action='store_true', **kw)
        nohelp = 'no ' + kw['help']
        del kw['help']
        # add --no-flag
        grp.add_argument('--no-' + args[0][2:], *args[1:], action='store_false', help=nohelp, **kw)

Then use it:

parser = BoolArgParse()
parser.add_bool_arguments('--flag',dest='flag', help='set flag.')


I do not want to call these pros and cons, as not all use cases want the same features, but here is how they compare:

  • Verbose way:
    • More lines of code (need to define 2 flags),
    • Help more verbose,
    • Easy (no extra class),
    • Both flags can be given, the last one wins (e.g. --flag --no-flag).
  • Custom action:
    • Fewer lines of code,
    • Help not verbose (only one line of help),
    • Both flags can be given, the last one wins (e.g. --flag --no-flag).
  • Custom parser:
    • The most lines of code,
    • Help verbose but grouped,
    • --flag and --no-flag cannot be combined (they are in a mutually exclusive group).
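If you can require Python 3.9 or later, the standard library ships this as `argparse.BooleanOptionalAction`, which generates both `--flag` and `--no-flag` from a single `add_argument` call. A minimal sketch:

```python
import argparse

parser = argparse.ArgumentParser()
# Requires Python 3.9+: creates both --flag and --no-flag.
parser.add_argument('--flag', action=argparse.BooleanOptionalAction, default=False,
                    help='Set flag.')

print(parser.parse_args([]).flag)                       # False (the default)
print(parser.parse_args(['--flag']).flag)               # True
print(parser.parse_args(['--no-flag']).flag)            # False
print(parser.parse_args(['--flag', '--no-flag']).flag)  # False, the last one wins
```

It behaves like the custom action: a single help entry, and when both flags are given, the last one wins.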

Get started with AWS and Python

When you start with AWS for the first (or even second) time, it is a bit tricky to get your head around all the nuts and bolts that need to be connected together. If, on top of this, you try to work with AWS in Beijing from outside China, the web GUI makes your work even harder because of slowness or even timeouts.

This script sets up a full set of resources for you (VPC, route table, security group, subnet, internet gateway, instance with the relevant associations and attachments), for easy testing or bootstrapping of your infrastructure.

It is mostly meant as a testing help, so it does not handle all the options possible, but I find it invaluable to get started. You just need the AWS basics:

and it will do the rest for you. You need to provide a tag name (defaults to ‘roles’) and value, and all resources will be created and located via this tag, to allow for easy spawning and tearing down.

usage: fullspawn.py3 [-h] [--tag TAG] [--up | --down] [--wet | --dry]
 [--ami AMI] [--keypair KEYPAIR] [--profile PROFILE]
 [--instance INSTANCE]

Spawns a full AWS self-contained infrastructure.

positional arguments:
 role Tag value used for marking and fetching resources.

optional arguments:
 -h, --help show this help message and exit
 --tag TAG, -t TAG Tag name used for marking and fetching resources.
 (default: roles)
 --up, -u Creates a full infra. (default: up)
 --down, -d Destroys a full infra. (default: up)
 --wet, -w Actually performs the action. (default: dry)
 --dry Only shows what would be done, not doing anything.
 (default: dry)
 Verbosity level. (default: WARNING)
 --cidr CIDR The network range for the VPC, in CIDR notation. For
 example, (default:
 --ami AMI The AMI id for your instance. (default: ami-33734044)
 --keypair KEYPAIR A keypair aws knows about. (default: yourkey)
 --profile PROFILE Profile to use for credentials. Will use AWS_PROFILE
 environment variable if set. (default: default)
 --instance INSTANCE Instance type. (default: t2.micro)

For instance:

# Let's see what would happen when creating a full infra...
./fullspawn.py3 -t tag --up --dry testing
# Looks good, let's do it.
./fullspawn.py3 -t tag --up --wet testing
# oops, this was a stupid tag name
./fullspawn.py3 -t tag --down --wet testing

You probably want to have a look at some variables inside the script, setting a few defaults which might not be relevant for you. I am thinking about the ami (AMI), the keypair (KEYPAIR) and the ingress rules (INGRESS) all defined before the argparse calls.

The code is available on github.
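Finding resources through a tag boils down, in boto3 terms, to EC2 `describe_*` calls with a `tag:<name>` filter. Here is a minimal sketch of that lookup for VPCs; it is not taken from fullspawn.py3. `find_vpc_ids` and `FakeEC2` are hypothetical names, and the client is passed in so the function works with a real `boto3.client("ec2")` as well as with an offline fake.

```python
def tag_filter(tag, value):
    """EC2 filter matching resources whose tag `tag` equals `value`."""
    return [{"Name": f"tag:{tag}", "Values": [value]}]


def find_vpc_ids(ec2_client, tag, value):
    """Return the ids of the VPCs tagged tag=value.

    ec2_client is expected to behave like boto3.client("ec2").
    """
    response = ec2_client.describe_vpcs(Filters=tag_filter(tag, value))
    return [vpc["VpcId"] for vpc in response["Vpcs"]]


class FakeEC2:
    """Hypothetical offline stand-in returning a canned describe_vpcs answer."""

    def describe_vpcs(self, Filters):
        assert Filters == [{"Name": "tag:roles", "Values": ["testing"]}]
        return {"Vpcs": [{"VpcId": "vpc-0123456789abcdef0"}]}


print(find_vpc_ids(FakeEC2(), "roles", "testing"))  # ['vpc-0123456789abcdef0']
```

The same `Filters` value is accepted by `describe_subnets`, `describe_route_tables`, `describe_security_groups`, `describe_internet_gateways` and `describe_instances` (whose responses are shaped differently), which is what makes spawning and tearing down by tag convenient.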


Graph the noise level in your office in 15 minutes

This is a recurring complaint in any open space: “There is too much noise!” (the other one is that it is too cold/too hot). There are some usual culprits, but it is nice to have data to back your complaints up.

Here I will show you how to generate a real-time noise level graph in 15 minutes without any equipment besides your laptop or desktop; not even a microphone is needed. This is a dirty hack, but it works and can be put in place very quickly with just a few command lines. The steps, which are mostly cut & paste, are:

  • install a noise recorder tool
  • set up nginx to serve the data recorded
  • use a nice javascript library to display the data properly

I used soundmeter, a python tool. So first, install it:

# make sure we can install python packages
apt-get install virtualenv
# install required dependencies for building soundmeter
apt-get install python-dev portaudio19-dev alsa-utils
# install other useful tools for later display
apt-get install nginx expect-dev
# set up a directory for the tool
mkdir $HOME/soundmeter
# create virtualenv
virtualenv $HOME/soundmeter
# activate it
source $HOME/soundmeter/bin/activate
# install soundmeter
pip install soundmeter

Et voilà, your recorder is set up.

But don't you need a microphone? Well, either you have a laptop with a built-in microphone, or you can just plug in headphones, which are basically a microphone used the other way around (producing instead of recording sound).

To get data, a one-liner is enough:

soundmeter --segment 2 --log /dev/stdout 2>/dev/null | unbuffer -p perl -MPOSIX -p -e 's/\s*(\d+)\s+(.{19})(.*)/"$2,". 20*log10($1)/e' > meter.csv

The explanation is as follows:

  • soundmeter: run soundmeter forever (--seconds to limit the duration)
  • --segment 2: output data every 2 seconds (the default is 0.5 seconds, which is very spiky)
  • --log /dev/stdout: the default data on stdout is not useful for graphing, so we need to log to a file. Use /dev/stdout as the file to actually log to stdout
  • 2>/dev/null: do not pollute the output
  • |: the output is not in a great format, it needs to be reformatted
  • unbuffer -p: by default data is buffered, which is annoying for a real-time view. This does what the name suggests
  • perl -MPOSIX -p -e: yummy, a perl regexp! -MPOSIX loads the POSIX module, which provides log10
  • s///e: this will be a substitution, where the replacement part is a perl expression
  • \s*(\d+)\s+(.{19})(.*): record the value and the timestamp stripped of the milliseconds
  • “$2,”: display the timestamp first, followed by a comma for the csv format
  • 20*log10($1): the values from soundmeter are RMS values; transform them into dB via the formula 20 * log10(rms). Perl's built-in log is the natural logarithm, hence log10 from POSIX
  • > meter.csv: save data in a file

In short, we do the following transformation on the fly and write it to a csv file:

12 2015-09-22 13:36:13,082 => 2015-09-22 13:36:13,21.5836249
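If Perl is not your thing, here is a minimal Python equivalent of the substitution, assuming (as the regexp does) that each soundmeter log line starts with the RMS value followed by a timestamp with milliseconds. It uses `math.log10`, since decibels are based on the base-10 logarithm. `to_csv` and `filter_stream` are hypothetical names.

```python
import math
import re

# RMS value, then a 19-character timestamp (YYYY-MM-DD HH:MM:SS), then the rest (milliseconds).
LINE_RE = re.compile(r"\s*(\d+)\s+(.{19})(.*)")


def to_csv(line):
    """Turn one soundmeter log line into 'timestamp,dB', or None if it does not match."""
    m = LINE_RE.match(line)
    if m is None:
        return None
    rms = int(m.group(1))
    if rms <= 0:
        return None  # log10(0) is undefined, skip silent samples
    return f"{m.group(2)},{20 * math.log10(rms):.7f}"


def filter_stream(src, dst):
    """Convert every matching line of src and write it to dst, flushing for real-time use."""
    for raw in src:
        out = to_csv(raw)
        if out is not None:
            dst.write(out + "\n")
            dst.flush()


print(to_csv("12 2015-09-22 13:36:13,082"))  # 2015-09-22 13:36:13,21.5836249
```

Saved as a script calling `filter_stream(sys.stdin, sys.stdout)`, it could replace the `unbuffer -p perl ...` part of the pipe.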

You now have a nice csv file. How to display it? Via a nice HTML page with the help of a JavaScript library, dygraphs, of course.

Set up nginx by adding the following content to /etc/nginx/sites-enabled/noise (replace YOUR_HOME with your actual home directory, of course):

server {
 listen 80;
 root YOUR_HOME/soundmeter;
}

and restart nginx:

service nginx restart

Then set up your page in $HOME/soundmeter/noise.html:

<script src="//"></script>

<style>
#graphdiv2 { position: absolute; left: 50px; right: 10px; top: 50px; bottom: 10px; }
</style>

<div id="graphdiv2"></div>
<script type="text/javascript">
 g2 = new Dygraph(
 document.getElementById("graphdiv2"),
 "http://localhost/meter.csv", // path to CSV file
 {
 delimiter: ",",
 labels: ["Date", "Noise level"],
 title: "Noise (in dB)",
 showRoller: true
 }
 );
</script>

You can of course replace localhost with your IP to publish this page to your colleagues.

Now just go to http://localhost/noise.html.


Easily simulating connection timeouts

I needed an easy way to simulate timeouts when connecting to a REST API. As part of the flow of an application I am working on, I need to send events to our data platform. Blocking the production flow ‘just’ to send an event when the platform times out is not ideal, so I needed a way to test this.

I know there are a few options:

  • Connecting to a ‘well known’ timing out URL, but this is very antisocial
  • Adding my own firewall rule to DROP connections, but this is a lot of work (yes, I am very very lazy and I would need to look up the iptables syntax)
  • Connecting to a non-routable IP

All those options are fine (except the first one, which, although technically valid, is very rude and not guaranteed to last), but they all give indefinite, non-configurable timeouts.

I thus wrote a small Python script, without dependencies, which just listens on a port and makes the connection wait a configurable number of seconds before either closing the connection or returning a valid HTTP response.

Its usage is very simple:

usage: [-h] [--http] [--port PORT] [--timeout TIMEOUT]

Timeout Server.

optional arguments:
 -h, --help show this help message and exit
 --http, -w if true return a valid http 204 response.
 --port PORT, -p PORT Port to listen to. Default 7000.
 --timeout TIMEOUT, -t TIMEOUT
 Timeout in seconds before answering/closing. Default 5.

For instance, to wait 2 seconds before giving an http answer:

./ -w -t2

This gives you the following output when a client connects to it:

./ -w -t2
Listening, waiting for connection...
Connected! Timing out after 2 seconds...
Processing complete.
Returning http 204 response.
Closing connection.

Listening, waiting for connection...

This is the full script, which you can find on github as well:

#!/usr/bin/env python
import argparse
import socket
import time

# Make the TimeoutServer a bit more user friendly by giving 3 options:
# --http/-w to return a valid http response
# --port/-p to define the port to listen to (7000)
# --timeout/-t to define the timeout delay (5)

parser = argparse.ArgumentParser(description='Timeout Server.')
parser.add_argument('--http', '-w', default=False, dest='http', action='store_true',
                    help='if true return a valid http 204 response.')
parser.add_argument('--port', '-p', type=int, default=7000, dest='port',
                    help='Port to listen to. Default 7000.')
parser.add_argument('--timeout', '-t', type=int, default=5, dest='timeout',
                    help='Timeout in seconds before answering/closing. Default 5.')
args = parser.parse_args()

# Creates a standard socket and listens to incoming connections
# See for more info
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', args.port))
s.listen(5)  # backlog: max number of queued connections, 5 is a usual value.

while True:
    print("Listening, waiting for connection...")
    (clientsocket, address) = s.accept()
    print("Connected! Timing out after {} seconds...".format(args.timeout))
    # Simulate a slow server: keep the client waiting.
    time.sleep(args.timeout)
    print('Processing complete.')

    if args.http:
        print("Returning http 204 response.")
        clientsocket.sendall(
            b'HTTP/1.1 204 No Content\r\n'
            # A Date header could be added, e.g. built with time.strftime(..., time.gmtime()).
            b'Server: Timeout-Server\r\n'
            b'Connection: close\r\n\r\n'  # signals no more data to be sent
        )

    print("Closing connection.\n")
    clientsocket.close()
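To see the effect from the client side, here is a minimal, self-contained sketch. It starts a tiny stand-in for the timeout server in a background thread (it only accepts and sleeps, on a port picked by the OS rather than 7000), then connects with a client whose socket timeout is shorter than the server delay. The delay values are arbitrary.

```python
import socket
import threading
import time

SERVER_DELAY = 2.0    # seconds the stand-in server waits before closing
CLIENT_TIMEOUT = 0.5  # seconds the client is willing to wait

# Stand-in for the timeout server: accept one connection, sleep, close.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
server.listen(1)
port = server.getsockname()[1]


def serve_once():
    conn, _ = server.accept()
    time.sleep(SERVER_DELAY)
    conn.close()


threading.Thread(target=serve_once, daemon=True).start()

client = socket.create_connection(("127.0.0.1", port), timeout=CLIENT_TIMEOUT)
start = time.monotonic()
try:
    client.recv(1024)  # the server sends nothing, so this blocks until the timeout
    timed_out = False
except socket.timeout:
    timed_out = True
elapsed = time.monotonic() - start
client.close()

print(f"timed out: {timed_out}, after {elapsed:.1f}s")
```

With an HTTP client such as requests, the equivalent would be passing `timeout=` to the call made against `http://localhost:7000` and checking that your code handles the resulting exception.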

Vertica ODBC error messages and solutions

These are error messages and solutions found after lots of trial and error. I mostly use Python with Vertica, so some solutions might be Python specific, but most should be generic enough.

[HY000] [unixODBC][Vertica][ODBC] (11560) Unable to locate SQLGetPrivateProfileString function. (11560) (SQLDriverConnect)

The ODBC connection does not find a properly defined DSN. Reasons include:

  • A path that does not exist in one of the odbc.ini or odbcinst.ini files (check mostly ODBCInstLib, Driver, Driver64).

[22001] [Vertica][ODBC] (10170) String data right truncation on data from data source: String data is too big for the driver’s data buffer. (10170) (SQLPutData)

This is a unicode issue. Reasons might be:

  • Old pyodbc which does not handle UTF-8 properly (try to use version 3+).
  • Vertica’s VARCHAR length is given in bytes, not characters. So if you have UTF-8 characters in a string, you might go above the limit without noticing. E.g. a VARCHAR(1) can hold ‘0’ but not ‘€’.
  • Pyodbc does not handle unicode properly. If you are using Python, encode in UTF-8.
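A quick way to catch the byte length issue before inserting is to compare the UTF-8 encoded length with the column size. `fits_varchar` is a hypothetical helper, a minimal sketch:

```python
def fits_varchar(value, max_bytes):
    """True if value fits a Vertica VARCHAR(max_bytes), whose length counts UTF-8 bytes."""
    return len(value.encode("utf-8")) <= max_bytes


print(len("€"), len("€".encode("utf-8")))  # 1 3
print(fits_varchar("0", 1))  # True
print(fits_varchar("€", 1))  # False, '€' takes 3 bytes in UTF-8
```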

[IM002] [unixODBC][Driver Manager]Data source name not found, and no default driver specified (0) (SQLDriverConnect)

The DSN used does not exist. Reasons include:

  • Typo in the DSN in your code (you are asking for a DSN not defined in odbc.ini).
  • odbc.ini file syntax invalid (for instance closing square bracket forgotten).
  • DSN not defined in the used odbc.ini file.
  • Wrong odbc.ini file used, hence DSN not found. This can happen if a $HOME/.odbc.ini file, often created by default, exists.
  • The odbc.ini is not in the expected path (/etc/odbc.ini). Pointing the ODBCINI environment variable to the right path might work.
  • The odbc.ini file references a Driver in the relevant DSN section which is not defined in /etc/odbcinst.ini.

[HY000] [unixODBC][DSI] The error message NoSQLGetPrivateProfileString could not be found in the en-US locale. Check that /en-US/ODBCMessages.xml exists. (-1) (SQLDriverConnect)

Vertica needs some extra specifications in either /etc/vertica.ini (default), or in the file pointed to by the VERTICAINI environment variable:

[Driver]
ErrorMessagesPath = /opt/vertica/lib64/
ODBCInstLib = /usr/lib/x86_64-linux-gnu/

Usually I just add this to odbc.ini and point VERTICAINI to it.

pyodbc.Error: (‘H’, ‘[H] [unixODBC][ (4294967295) (SQLDriverConnectW)’)

You are using an old version of pyodbc. Upgrade it system-wide, or create a virtualenv and pip install pyodbc.