Connect SFTP to S3 in AWS Glue

AWS has a lot of data transfer tools, but none of them can actually transfer from SFTP to S3 out of the box.

Luckily, Glue is very flexible, and it is possible to run a pure Python script there.

Without further ado, here is a basic Python script which can run in Glue (as well as locally). It reads all files in the root of an SFTP server and uploads them to an S3 bucket.

import boto3
import paramiko

s3 = boto3.resource("s3")
bucket = s3.Bucket(name="destination-bucket")
# load() refreshes the bucket metadata; the destination bucket must already exist.
bucket.load()


ssh = paramiko.SSHClient()
# In production, explicitly add the host's RSA key instead of using AutoAddPolicy
# (decodebytes comes from base64.decodebytes):
# ssh.get_host_keys().add('example.com', 'ssh-rsa', paramiko.RSAKey(data=decodebytes(b"""AAAAB3NzaC1yc2EAAAABIwAAAQEA0hV...""")))
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

ssh.connect(
    hostname="sftp.example.com",
    username="thisdataguy",
    password="very secret",
)

sftp = ssh.open_sftp()

for filename in sftp.listdir():
    print(f"Downloading {filename} from SFTP...")
    # mode: SFTP treats all files as binary anyway, so 'b' is ignored.
    with sftp.file(filename, mode="r") as file_obj:
        print(f"Uploading {filename} to S3...")
        bucket.put_object(Body=file_obj, Key=f"destdir/{filename}")
        print(f"All done for {filename}")

There is only one thing to take care of: paramiko is not available in Glue by default, so in the job configuration you need to point the Python library path to a wheel of paramiko uploaded to S3.
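For reference, here is roughly what creating such a job with boto3 could look like. The role, script location and wheel path are placeholders; the important part is the --extra-py-files argument (the "Python library path" in the console), which points Glue at the paramiko wheel on S3:

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="sftp-to-s3",
    Role="my-glue-role",  # placeholder IAM role
    # A plain Python shell job is enough here, no Spark needed.
    Command={
        "Name": "pythonshell",
        "ScriptLocation": "s3://my-glue-scripts/sftp_to_s3.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        # Placeholder wheel path: upload the paramiko wheel to S3 first.
        "--extra-py-files": "s3://my-glue-libs/paramiko-2.11.0-py2.py3-none-any.whl",
    },
)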

Dundas REST API and Python: introducing PyDundas

Dundas has a very complete REST API. You can do just about everything with it. It is what the web app itself uses, so if you need an example, you can just look at the queries it sends with your browser's developer tools.

That said, it is a bit of a pain to use in a script.
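To give an idea, just logging on by hand already takes a few lines. This is a sketch using the requests library; the endpoint path and payload keys are assumptions from memory, so check them against your server's API documentation:

import requests

# Hypothetical server URL.
BASE = "https://dundas.example.com/api"

resp = requests.post(
    f"{BASE}/logon",
    # Payload keys are assumptions, check your instance's API docs.
    json={"accountName": "thisdataguy", "password": "very secret"},
)
resp.raise_for_status()
session_id = resp.json()["sessionId"]

# Every subsequent call must carry this session key, and long-running
# operations such as warehousing have to be polled by hand.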

To make my life easier, I built PyDundas: a Python package that uses the API under the hood. It abstracts away all the nitty-gritty so you can concentrate on semantically pleasing code.

For instance, warehousing a cube is quite simple: PyDundas handles the logging, status checks and waiting for you.

from pydundas import Api, Session, creds_from_yaml
import sys

creds = creds_from_yaml('credentials.yaml')

with Session(**creds) as d:
    api = Api(d)
    capi = api.cube()
    cube = capi.getByPath('Awesome Project', '/relevant/path')
    if cube is None:
        print("Gotcha, no cube named like that.")
        sys.exit(1)
    print(cube.json())
    # Only clear the cache if the cube is not checked out.
    if not cube.is_checked_out():
        cube.clear_cache()

    cube.warehouse()
    print(cube.isWarehousing())
    # Blocks, polling until the warehousing job completes.
    cube.waitForWarehousingCompletion()
    print('Done')

There is a lot more that can be done with PyDundas. I developed it for my own needs, so I only added what I actually used, and it is thus far from complete. That said, I update it regularly and add new APIs, and it is open source, so feel free to send me pull requests to expand it.