Connect SFTP to S3 in AWS Glue

AWS has a lot of data transfer tools, but none of them can actually transfer from SFTP to S3 out of the box.

Luckily Glue is very flexible, and it is possible to run a pure python script there.

Without further ado, here is a basic Python script which can run in Glue (as well as locally) and will read all the files in the root of an SFTP server and upload them into an S3 bucket.

import boto3
import paramiko

s3 = boto3.resource("s3")
bucket = s3.Bucket(name="destination-bucket")
bucket.load()


ssh = paramiko.SSHClient()
# In prod, add explicitly the rsa key of the host instead of using the AutoAddPolicy:
# ssh.get_host_keys().add('example.com', 'ssh-rsa', paramiko.RSAKey(data=decodebytes(b"""AAAAB3NzaC1yc2EAAAABIwAAAQEA0hV...""")))
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

ssh.connect(
    hostname="sftp.example.com",
    username="thisdataguy",
    password="very secret",
)

sftp = ssh.open_sftp()

for filename in sftp.listdir():
    print(f"Downloading {filename} from sftp...")
    # mode: SFTP treats all files as binary anyway, so 'b' is ignored.
    with sftp.file(filename, mode="r") as file_obj:
        print(f"Uploading {filename} to s3...")
        bucket.put_object(Body=file_obj, Key=f"destdir/{filename}")
        print(f"All done for {filename}")

There is only one thing to take care of. Paramiko is not available by default in Glue, so in the job setup you need to point the Python library path to a paramiko wheel that you have uploaded to S3.
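
The same job setup can be scripted with boto3. This is only a sketch under assumptions: the job name, role, script location and wheel path below are placeholders, and it assumes the --extra-py-files default argument is what the console's Python library path field maps to for a Python shell job.

import boto3

glue = boto3.client("glue")

# Placeholder names and S3 paths: replace with your own script, role and wheel locations.
glue.create_job(
    Name="sftp-to-s3",
    Role="MyGlueJobRole",
    Command={
        "Name": "pythonshell",            # plain Python shell job, no Spark needed
        "PythonVersion": "3",
        "ScriptLocation": "s3://my-glue-scripts/sftp_to_s3.py",
    },
    DefaultArguments={
        # Make the paramiko wheel, previously uploaded to S3, available to the job.
        "--extra-py-files": "s3://my-glue-libs/paramiko-2.11.0-py2.py3-none-any.whl",
    },
)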

Thousand separators in Hue

Hue is a very handy SQL assistant for Hadoop, where you can easily run Hive or Impala queries.

I was asked if it was possible to have a thousands separator in the display of the query results. There is no such option in Hue, and I thought I could get away with Django environment variables, but either it’s not possible or I got it wrong.

In any case, it did not feel great. You could argue that Hue is a display tool, so it’s OK to format the output there, but it would apply to all users, and they might not all want that…

Long story short, I removed my Hadoop administrator cap and put on my dirty creative one. Once the results are loaded, formatting them is a trivial JavaScript manipulation, and Hue uses jQuery, which makes it even easier. So I came up with this little bookmarklet. Put it in the URL field of a bookmark, including the ‘javascript:’ prefix, and when you want to format your output, just click the bookmarklet et voilà:

javascript:(
  function(){
    $('table.resultTable td').each(
      function(){
        if (!isNaN($(this).text())) {
          $(this).text($(this).text().toString().replace(/(\d)(?=(\d{3})+(?!\d))/g, '$1,'))
        }
      }
    )
  }()
);

Count bytes in a stream, in bash

I often have this kind of construct, where one process generates a lot of data and a second one does something with it. Think for instance of a big select in a database, the output being a CSV we want to compress:

‘SELECT a lot of data FROM a big table ‘ | gzip > data.csv.gz

I wanted to know the size of the original data (I know that in the case of gzip I can use the -l flag, but this is just an example).

There are 3 ways to do this. In my examples the big data producer is yes | head -n 1000000000, which generates 1 billion rows (without any IO, which is nice for benchmarking), and the consumer is just a dd which dumps everything into /dev/null.

awk

yes | head -n 1000000000 | awk '{print $0; count++} END{print count >"/dev/stderr";}' | dd bs=64M of=/dev/null

Good points:

  • Quite easy to read, semantically easy to understand,
  • does not duplicate the data stream.

Bad point

  • As the data still flows to STDOUT, the row count is printed on STDERR which is not ideal but is still usable afterwards.

Tee and wc

yes | head -n 1000000000 | tee >(dd bs=64M of=/dev/null 2>/dev/null) | wc -l | { read -r rowcount; echo $rowcount; }
# Note: the last element of a pipeline runs in a subshell, so use $rowcount inside
# the braces (or shopt -s lastpipe), not after the pipeline.

Good point:

  • You get the rowcount in a variable, easy to use afterwards.

Bad points

  • Semantically weird, tricky to understand,
  • duplicates the data stream.

pv

yes | head -n 1000000000 | pv -l | dd bs=64M of=/dev/null 2>/dev/null

Good points

  • Semantically pleasing,
  • pv has a lot of options which might be interesting.

Bad point

  • Nice for interactive use, but it displays a progress bar on STDERR so it’s next to impossible to get the output in a script.

Out of those 3 options, which one is the fastest?

After running each option 10 times, here are the results, in seconds.

        awk     tee     pv
Mean    139.5   18.3    6516
Min     133     17      6446
Max     154     22      6587
Stdev   8.28    1.81    48.4

Yes, I double-checked my data. We are indeed talking about 20 seconds for tee, about 2 minutes 20 for awk and about 1 hour 45 for pv.

Easily simulating connection timeouts

I needed an easy way to simulate a timeout when connecting to a REST API. As part of the flow of an application I am working on, I need to send events to our data platform, and blocking the production flow ‘just’ to send an event in case of a timeout is not ideal, so I needed a way to test this.

I know there are a few options:

  • Connecting to a ‘well known’ timing-out URL, such as google.com:81, but this is very antisocial
  • Adding my own firewall rule to DROP connections, but this is a lot of work (yes, I am very very lazy and I would need to look up the iptables syntax)
  • Connecting to a non routable IP, like 10.255.255.1 or 10.0.0.0

All those options are fine (except the first one, which although technically valid is very rude and not guaranteed to keep working), but they all give indefinite, non-configurable timeouts.

I thus wrote a small Python script, without dependencies, which just listens on a port and makes the connection wait a configurable number of seconds before either closing the connection or returning a valid HTTP response.

Its usage is very simple:

usage: timeout.py [-h] [--http] [--port PORT] [--timeout TIMEOUT]

Timeout Server.

optional arguments:
  -h, --help            show this help message and exit
  --http, -w            if true return a valid http 204 response.
  --port PORT, -p PORT  Port to listen to. Default 7000.
  --timeout TIMEOUT, -t TIMEOUT
                        Timeout in seconds before answering/closing. Default
                        5.

For instance, to wait 2 seconds before giving an http answer:

./timeout.py -w -t2

Would give you the following output when a client connects to it:

./timeout.py -w -t2
Listening, waiting for connection...
Connected! Timing out after 2 seconds...
Processing complete.
Returning http 204 response.
Closing connection.

Listening, waiting for connection...

This is the full script, which you can find on github as well:

#!/usr/bin/env python
import argparse
import socket
import time


# Make the TimeoutServer a bit more user friendly by giving 3 options:
# --http/-w to return a valid http response
# --port/-p to define the port to listen to (7000)
# --timeout/-t to define the timeout delay (5)

parser = argparse.ArgumentParser(description='Timeout Server.')
parser.add_argument('--http', '-w', default=False, dest='http', action='store_true',
                    help='if true return a valid http 204 response.')
parser.add_argument('--port', '-p', type=int, default=7000, dest='port',
                    help='Port to listen to. Default 7000.')
parser.add_argument('--timeout', '-t', type=int, default=5, dest='timeout',
                    help='Timeout in seconds before answering/closing. Default 5.')
args = parser.parse_args()


# Create a standard socket and listen for incoming connections
# See https://docs.python.org/2/howto/sockets.html for more info
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('127.0.0.1', args.port))
s.listen(5)  # See doc for the explanation of 5. This is a usual value.

while True:
    print("Listening, waiting for connection...")
    (clientsocket, address) = s.accept()
    print("Connected! Timing out after {} seconds...".format(args.timeout))
    time.sleep(args.timeout)
    print('Processing complete.')

    if args.http:
        print("Returning http 204 response.")
        clientsocket.sendall(
            b'HTTP/1.1 204 No Content\n'
            # A Date header could be added here with time.strftime if needed.
            b'Server: Timeout-Server\n'
            b'Connection: close\n\n'  # signals no more data will be sent
        )

    print("Closing connection.\n")
    clientsocket.close()
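
To check that a client actually honours its timeout against this server, a quick test could look like the sketch below. Note the assumptions: it uses the requests library (not needed by the script itself), the server was started with ./timeout.py -w -t5 on the default port 7000, and the client timeout is deliberately shorter than the server delay.

import requests

# The server accepts the connection immediately, then sleeps 5 seconds before answering,
# so a 1 second client timeout triggers the timeout handling we want to test.
try:
    requests.get("http://127.0.0.1:7000", timeout=1)
except requests.exceptions.Timeout:
    print("Timed out as expected.")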

eg: examples for common command line tools

Are you tired of RTFM? Does this xkcd comic feel familiar to you?

[xkcd comic: tar]

Enter eg, which provides easy examples for common command line tools. Instead of having to find your way through the full manual of tar, you can just type:

eg tar

And you will get the common usages, nicely formatted and even colored. For tar, that includes basic usage, tarring, untarring and more:

[screenshot of eg tar output]

Of course, if you then want more information, TFM is the place to go.

eg is dead easy to install. You have two options:

pip install eg
# or
git clone https://github.com/srsudar/eg .
ln -s /absolute/path/to/eg-repo/eg_exec.py /usr/local/bin/eg

Et voilà, you can start using eg.

eg itself can be easily extended, as the examples are just markdown files put in the right place. You can find all the documentation, including formatting options and more, in the eg repository.

Last but not least, the author suggests aliasing eg to woman, for something that is like man but a little more practical:

alias woman=eg

Easy import from Hive to Vertica

Properly set up, Vertica can connect to HCatalog or read HDFS files. This does require some DBA work, though.

If you want to easily get data from Hive to Vertica, you can use the COPY statement with the LOCAL STDIN modifier and pipe the output of Hive into the input of Vertica. Once you add a dd in the middle to prevent the stream from just stopping after a while, this works perfectly. I am not entirely sure why the dd is needed, but I suppose it buffers data and makes the magic happen.

hive -e "select whatever FROM wherever" | \
dd bs=1M | \
/opt/vertica/bin/vsql -U $V_USERNAME -w $V_PASSWORD -h $HOST $DB -c \
"COPY schema.table FROM LOCAL STDIN DELIMITER E'\t' NULL 'NULL' DIRECT"

Of course, the previous statement needs to be amended to use your own user, password and database.

The performance is quite good with this, although I cannot give a proper benchmark, as in our case the Hive statement was not trivial.

One thing to really take care of is where you run this statement. You can run it from anywhere as long as Hive and Vertica are accessible, but be aware that data will flow from Hive to your server and then to Vertica. Running the statement on a Vertica node or on your Hive server will reduce the network traffic and might speed things up.

This post is based on my answer to a question on stackoverflow.

Vertica: Panic – Data consistency problems

While replacing a node, the node rebooted during the recovery (human mistake). After the reboot the recovery did not proceed and the node stayed in DOWN state, even when we tried to restart it via the admintools.

In vertica.log, we could see the following lines:

<PANIC> @v_dwh_node0003: VX001/2973: Data consistency problems found; startup aborted
 HINT: Check that all file systems are properly mounted. Also, the --force option can be used to delete corrupted data and recover from the cluster
 LOCATION: mainEntryPoint, /scratch_a/release/vbuild/vertica/Basics/vertica.cpp:1441

As the log nicely suggests, using the (undocumented) --force option can help. That said, this option cannot be used from the admintools curses interface and must be used from the command line:

/opt/vertica/bin/admintools -t restart_node -d $db_name -s $host --force

That way the corrupted data was deleted, and the recovery could carry on nicely.

Create and apply patch from a github pull request

You have a local clone of a github repository, somebody has created a pull request against this repository, and you would like to apply it to your clone before the maintainer of origin actually merges it.

How to do that?

It is actually surprisingly neat. When you look at the url of a github PR:

https://github.com/torvalds/linux/pull/42

You can just add ‘.patch’ at the end of the url to get a nicely formatted email patch:

https://github.com/torvalds/linux/pull/42.patch

From there on, you have a few options. If you download the patch (say as pr.patch) at the root of your clone, you can apply it, commits included:

git am ./pr.patch

If you want to apply the code changes without actually applying the commits, you can use your trusty old patch command:

patch -p 1 < ./pr.patch

If you are lazy (as my director of studies always said, ‘laziness drives progress’), you can do it all in one line:

wget -q -O - 'https://github.com/torvalds/linux/pull/42.patch' | git am

Vertica: rename multiple tables in one go

I have a use case where I need to regularly drop and fully recreate a table in Vertica. To keep the period without data to a minimum, I load the data into an intermediate table, then rename the old table out of the way and rename the intermediate one to its final name. It turns out that Vertica allows both renames in one command, hopefully avoiding race conditions.

After loading, before renaming:

vertica=> select * from important;
 v
-----
 old
(1 row)

vertica=> select * from important_intermediate ;
 v
-----
 new
(1 row)

Multiple renaming:

ALTER TABLE important,important_intermediate RENAME TO important_old, important;

After:

vertica=> select * from important;
 v
-----
 new
(1 row)
vertica=> select * from important_old;
 v
-----
 old
(1 row)

You will probably want to DROP important_old now.

Understand and generate unix passwords

I needed to generate an entry in the /etc/shadow file without using the standard tools. It turned out not to be a good idea for my needs, but in the meantime I learned a lot about the format of this file and how to generate passwords manually.

Shadow file format

User passwords are stored in the file /etc/shadow. The format of this file is extensively described in the shadow man page. Each line is divided into 9 fields, separated by colons, but we are only interested in the first 2 fields, namely the username and the password.
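
As a tiny illustration (the shadow line below is made up; only its shape matters), extracting those two fields is just a split on the colons:

# Made-up shadow entry: username, password hash, then 7 aging/expiry fields.
line = "thisdataguy:$6$njdsgfjemnb$Zj.b3vJUbsa0...:17710:0:99999:7:::"
fields = line.split(":")                   # 9 colon-separated fields
username, password = fields[0], fields[1]
print(username, password)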

The password field itself is split into 3 parts, separated by ‘$’, for instance:

$6$njdsgfjemnb$Zj.b3vJUbsa0VRPzP2lvB858yXR5Dg4YOGhoytquRKVpSNWt8v/gHAVh2vVgl/HUG7qZK.OmHd.mjCd22FUjH.

What does that mean?

The first field (in our example $6) tells us which hash function is used. It will usually be 6 (SHA-512) under Linux. The possible values can be found in man crypt, but they are as follows:

 ID  | Method
 ─────────────────────────────────────────────────────────
 1   | MD5
 2a  | Blowfish (not in mainline glibc; added in some
     | Linux distributions)
 5   | SHA-256 (since glibc 2.7)
 6   | SHA-512 (since glibc 2.7)

The second field is the salt. It is used to make sure that 2 users with the same password end up with 2 different hashes. It is randomly generated at password creation but, as we will see later, it can technically be set manually. Wikipedia has more information about the cryptographic salt.

The third field is the hash itself: the password and the salt (second field) combined, hashed with the function defined in the first field.
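
A handy consequence of this layout: to check a candidate password against an existing entry, you can pass the whole stored string to crypt as the salt argument, since crypt only looks at the method and salt parts, and a correct password then reproduces the entry exactly. A small Python 3 sketch, reusing the example entry above (I do not know which password produced it, so the result below is only illustrative):

import crypt
from hmac import compare_digest

# The example password field from above: method $6$, salt 'njdsgfjemnb', then the hash.
stored = "$6$njdsgfjemnb$Zj.b3vJUbsa0VRPzP2lvB858yXR5Dg4YOGhoytquRKVpSNWt8v/gHAVh2vVgl/HUG7qZK.OmHd.mjCd22FUjH."

def check(candidate, stored_hash):
    # crypt() reuses the '$6$salt$' prefix of stored_hash, so a matching candidate
    # reproduces the whole string, character for character.
    return compare_digest(crypt.crypt(candidate, stored_hash), stored_hash)

print(check("some_guess", stored))  # True only if 'some_guess' is the real password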

How to generate a valid password entry?

For ID 1 (MD5) you can use openssl:

salt=pepper
pwd=very_secure
openssl passwd -1 -salt $salt $pwd
# outputs: $1$pepper$XQ7/iG9IT0XdBll4ApeJQ0

You cannot generate a SHA-256 or SHA-512 hash that way, though. The easiest and most generic way is then to use python or perl. For a salt ‘pepper’ and a password ‘very_secure’, both of the following lines yield the same result:

#Pythonista
python -c 'import crypt; print(crypt.crypt("very_secure", "$6$pepper"))'
# There is more than one way to do it
perl -e 'print crypt("very_secure", "\$6\$pepper") . "\n"'

# outputs: $6$pepper$P9Wt3.3Uqh9UZbvz5/6UPtHqa4KE/2aeyeXbKm0mpv36Z5aCBv0OQEZ1e.aKcPR6RBYvQIa/ToAfdUX6HjEOL1
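
If you do not want to pick the salt yourself, Python 3 can generate a random one for you via crypt.mksalt (available since Python 3.3); a minimal sketch:

# Generate a random SHA-512 salt instead of hardcoding 'pepper'.
import crypt

salt = crypt.mksalt(crypt.METHOD_SHA512)   # something like '$6$Q1tb6iGG2HEPlVPB'
print(crypt.crypt("very_secure", salt))    # the full $6$salt$hash password field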