Connect SFTP to S3 in AWS glue

AWS has a lot of data transfer tools, but none of them can actually transfer from SFTP to S3 out of the box.

Luckily Glue is very flexible, and it is possible to run a pure python script there.

Without further ado, here is a basic python script which can run in Glue (as well as locally) and will read all the files at the root of an SFTP server and upload them into an S3 bucket.

import boto3
import paramiko

s3 = boto3.resource("s3")
bucket = s3.Bucket(name="destination-bucket")
bucket.load()


ssh = paramiko.SSHClient()
# In prod, explicitly add the RSA key of the host instead of using the AutoAddPolicy:
# ssh.get_host_keys().add('example.com', 'ssh-rsa', paramiko.RSAKey(data=decodebytes(b"""AAAAB3NzaC1yc2EAAAABIwAAAQEA0hV...""")))
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

ssh.connect(
    hostname="sftp.example.com",
    username="thisdataguy",
    password="very secret",
)

sftp = ssh.open_sftp()

for filename in sftp.listdir():
    print(f"Downloading {filename} from sftp...")
    # mode: SFTP treats all files as binary anyway, so 'b' is ignored.
    with sftp.file(filename, mode="r") as file_obj:
        print(f"Uploading {filename} to s3...")
        bucket.put_object(Body=file_obj, Key=f"destdir/{filename}")
        print(f"All done for {filename}")

There is only one thing to take care of: paramiko is not available by default in Glue, so in the job setup you need to point the Python library path to a wheel of paramiko uploaded to S3.
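
If you prefer to script the job creation itself, here is a minimal boto3 sketch of that setup. Everything in it (job name, role, script and wheel locations) is a placeholder to adapt; the console's "Python library path" field ends up as the --extra-py-files argument, and paramiko's own dependencies may need their wheels there as well if they are not already available.

import boto3

glue = boto3.client("glue")

# All names and locations below are placeholders, to adapt to your setup.
glue.create_job(
    Name="sftp-to-s3",
    Role="my-glue-role",  # IAM role allowed to read the wheel and write to the bucket
    Command={
        "Name": "pythonshell",  # plain python, no Spark needed for this script
        "PythonVersion": "3",
        "ScriptLocation": "s3://my-glue-scripts/sftp_to_s3.py",
    },
    DefaultArguments={
        # Same effect as filling in "Python library path" in the console:
        "--extra-py-files": "s3://my-glue-libs/paramiko-2.11.0-py2.py3-none-any.whl",
    },
)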

Thousand separators in Hue

Hue is a very handy SQL assistant for Hadoop, where you can easily run Hive or Impala queries.

I was asked if it was possible to have thousands separators in the display of the query results. There is no such option in Hue. I thought I could get away with Django environment variables, but either it’s not possible or I got it wrong.

In any case, it did not feel great. You could argue that Hue is a display tool, so it’s OK to format the output there, but it would affect all users, and they might not all want that…

Long story short, I removed my Hadoop administrator cap and put on my dirty creative one. Once the results are loaded, formatting them is a trivial javascript manipulation. Furthermore, Hue uses jQuery, which makes it even easier. So I came up with this little bookmarklet. Put it in the URL field of a bookmark, including the ‘javascript:’ prefix. If you want to format your output, just click on the bookmarklet et voilà:

javascript:(
  function(){
    $('table.resultTable td').each(
      function(){
        if (!isNaN($(this).text())) {
          $(this).text($(this).text().toString().replace(/(\d)(?=(\d{3})+(?!\d))/g, '$1,'))
        }
      }
    )
  }()
);

Create and apply patch from a github pull request

You have a local clone of a github repository, somebody created a pull request against this repository, and you would like to apply it to your clone before the maintainer of the origin actually merges it.

How to do that?

It is actually surprisingly neat. When you look at the url of a github PR:

https://github.com/torvalds/linux/pull/42

You can just add ‘.patch’ at the end of the url to get a nicely formatted email patch:

https://github.com/torvalds/linux/pull/42.patch

From there on, you have a few options. If you download the patch (say, as pr.patch) at the root of your clone, you can apply it along with its commits:

git am ./pr.patch

If you want to apply the code changes without actually applying the commits, you can use your trusty old patch command:

patch -p 1 < ./pr.patch

If you are lazy (as my director of studies always said, ‘laziness drives progress’), you can do it all in one line:

wget -q -O - 'https://github.com/torvalds/linux/pull/42.patch' | git am

Understand and generate unix passwords

I needed to generate an entry in the /etc/shadow file, without using the standard tools. It turned out it was not a good idea for my needs, but in the meantime I learned a lot about the format of this file and how to manually generate passwords.

Shadow file format

User passwords are stored in the file /etc/shadow. The format of this file is extensively described in the shadow man page. Each line is divided into 9 fields separated by colons, but we are interested only in the first 2 fields, namely the username and the password.

The password field itself is split into 3 parts, separated by $, for instance:

$6$njdsgfjemnb$Zj.b3vJUbsa0VRPzP2lvB858yXR5Dg4YOGhoytquRKVpSNWt8v/gHAVh2vVgl/HUG7qZK.OmHd.mjCd22FUjH.

What does that mean?

The first field (in our example $6) tells us which hash function is used. It will usually be 6 (SHA-512) under Linux. The values can be found in man crypt, but they are as follows:

 ID  | Method
 ─────────────────────────────────────────────────────────
 1   | MD5
 2a  | Blowfish (not in mainline glibc; added in some
     | Linux distributions)
 5   | SHA-256 (since glibc 2.7)
 6   | SHA-512 (since glibc 2.7)

The second field is a salt. It is used to make sure that 2 users with the same password end up with 2 different hashes. It is randomly generated at password creation but, as we will see later, it can technically be set manually. Wikipedia has more information about the cryptographic salt.

The third field is the hash of the password combined with the salt (second field), computed with the function defined in the first field.

How to generate a valid password entry?

For ID 1 (MD5) you can use openssl:

salt=pepper
pwd=very_secure
openssl passwd -1 -salt $salt $pwd
# outputs: $1$pepper$XQ7/iG9IT0XdBll4ApeJQ0

You cannot generate a SHA-256 or SHA-512 hash that way, though. The easiest and most generic way is then to use python or perl. For a salt ‘pepper’ and a password ‘very_secure’, both lines below yield the same result:

# Pythonista
python -c 'import crypt; print(crypt.crypt("very_secure", "$6$pepper"))'
# There is more than one way to do it
perl -e 'print crypt("very_secure", "\$6\$pepper") . "\n"'

# outputs: $6$pepper$P9Wt3.3Uqh9UZbvz5/6UPtHqa4KE/2aeyeXbKm0mpv36Z5aCBv0OQEZ1e.aKcPR6RBYvQIa/ToAfdUX6HjEOL1
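
To double check that the three fields really fit together this way, here is a small python sketch (the crypt module is Unix-only, and has been removed from the most recent python versions) which re-hashes a candidate password with the salt taken from a shadow-style entry and compares the result. The username and hash are just the example values from above.

import crypt
from hmac import compare_digest

# A shadow-style line, reusing the hash generated above.
entry = "thisdataguy:$6$pepper$P9Wt3.3Uqh9UZbvz5/6UPtHqa4KE/2aeyeXbKm0mpv36Z5aCBv0OQEZ1e.aKcPR6RBYvQIa/ToAfdUX6HjEOL1"

def check_password(shadow_line, candidate):
    # Only the first 2 of the 9 colon-separated fields matter here.
    user, stored = shadow_line.split(":")[:2]
    # crypt() only uses the "$6$pepper" prefix of 'stored' as its salt parameter,
    # so the full field can be passed as the second argument.
    return user, compare_digest(crypt.crypt(candidate, stored), stored)

print(check_password(entry, "very_secure"))     # ('thisdataguy', True)
print(check_password(entry, "wrong_password"))  # ('thisdataguy', False)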

Disco: purge all completed jobs

In Disco, jobs that are not purged still use some disk space for temporary data. This can lead to filling up all the disk space in your cluster. I have been there, and it is not fun, I promise you.

You can set up a purge policy (a job might purge itself after completion, for instance), but if you need to quickly clean up all jobs, this snippet will help. It will purge all completed jobs, successful or not.

for job in $(disco jobs);
do
  disco events $job | grep "WARN: Job killed\|READY";
  if [ $? -eq 0 ];
  then
    disco purge $job;
  fi;
done

Disco: replicate all data away from a blacklisted node

Before removing a node from Disco, the right way to do it is to blacklist it, replicate away its data, and finally remove it from the cluster. Replicating data away is done by running the garbage collector. Unfortunately, the garbage collector does not migrate everything in one go, so a few runs are needed. To avoid doing this manually, the following script will run the garbage collector as often as needed, as long as some nodes are blacklisted but not yet safe for removal. The full script can be found on github.

#!/usr/bin/env bash

# Will check if there are nodes blacklisted for ddfs but not fully replicated away yet.
# If this is the case, it will run the GC until all their data is replicated away.

# debug
#set -x

# Treat unset variables as an error when substituting.
set -u

# master
HOST=disco.example.com:8989
DDFS_URL="http://$HOST/ddfs/ctrl"
DISCO_URL="http://$HOST/disco/ctrl"

# API commands
GC_STATUS=$DDFS_URL/gc_status
GC_START=$DDFS_URL/gc_start
BLACKLIST=$DISCO_URL/get_gc_blacklist
SAFE_GC=$DDFS_URL/safe_gc_blacklist

# counter to mark how many times the GC ran.
CNT=0

function is_running {
    # will get "" if GC not running, or a string describing the current status.
    _GC_RES=$(wget -q -O- $GC_STATUS)
    if [ "$_GC_RES" == '""' ]
    then
        _GC_RES=''
    fi
    echo $_GC_RES
}

function is_safe {
    _BLACKLISTED=$(wget -q -O- $BLACKLIST)
    _SAFE=$(wget -q -O- $SAFE_GC)

    # eg.
    # blacklisted:  ["slave1","slave2","slave3"]
    # safe_gc_blacklist: []

    # safe is a subset of blacklisted. If we concatenate the 2 (de-jsonised) lists
    # and keep only the unique entries, we have 2 cases:
    # - no uniques => all nodes are safe (in blacklist *and* in safe)
    # - uniques => some nodes are not safe

    echo "$_BLACKLISTED $_SAFE" | tr -d '[]"' | tr ', ' '\n' | sort | uniq -u
}

while true
do

    GC_RES=$(is_running)

    if [ -z "$GC_RES" ]
    then
        echo "GC not running, let's check if it is needed."
        NON_SAFE=$(is_safe)
        if [ -z "$NON_SAFE" ]
        then
            echo "All nodes are safe for removal."
            exit
        else
            echo "Somes nodes are not yet safe: $NON_SAFE"
            CNT=$((CNT+1))
            date +'%Y-%m-%d %H:%M:%S'
            wget -q -O /dev/null $GC_START
            echo "Run $CNT started."
        fi
    else
        echo "GC running ($GC_RES). Let's wait".
    fi
    sleep 60
done

How to build a full flume interceptor by a non-java developer

Building a flume interceptor does not look too complex. You can find some examples, howtos or cookbooks around. The issue is that they all explain the specificities of the interceptor itself, leaving a python/perl guy like me in the dark about maven, classpaths or imports. This post aims to correct that. It describes the bare minimum to get it working, but I link to more documentation if you want to go deeper. You can find the code on github as well.

The interceptor itself

You need to create a directory structure in which to write the following java code. In this example it needs to be under src/main/java/com/example/flume/interceptors/eventTweaker.java, for an interceptor named com.example.flume.interceptors.eventTweaker.

package com.example.flume.interceptors;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import org.apache.log4j.Logger;

public class eventTweaker implements Interceptor {

  private static final Logger LOG = Logger.getLogger(eventTweaker.class);

  // private means that only Builder can build me.
  private eventTweaker() {}

  @Override
  public void initialize() {}

  @Override
  public Event intercept(Event event) {

    Map<String, String> headers = event.getHeaders();

    // example: add / remove headers
    if (headers.containsKey("lice")) {
      headers.put("shampoo", "antilice");
      headers.remove("lice");
    }

    // example: change body
    String body = new String(event.getBody());
    if (body.contains("injuries")) {
      try {
        event.setBody("cyborg".getBytes("UTF-8"));
      } catch (java.io.UnsupportedEncodingException e) {
        LOG.warn(e);
        // drop event completely
        return null;
      }
    }

    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    // Keep only the events which were not dropped (intercept() returns null to drop one).
    List<Event> intercepted = new ArrayList<Event>(events.size());
    for (Event event : events) {
      Event result = intercept(event);
      if (result != null) {
        intercepted.add(result);
      }
    }
    return intercepted;
  }

  @Override
  public void close() {}

  public static class Builder implements Interceptor.Builder {

    @Override
    public Interceptor build() {
      return new eventTweaker();
    }

    @Override
    public void configure(Context context) {}
  }
}

Maven configuration

To build the above, you need a pom.xml, to be put at the root of the aforementioned tree. The following is the bare minimum, which would probably make any java developer cringe, but It Works™.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.flume</groupId>
  <artifactId>eventTweaker</artifactId>
  <version>1.0</version>

  <repositories>
    <repository>
      <id>central</id>
      <name>Maven Central</name>
      <url>http://repo1.maven.org/maven2/</url>
    </repository>
    <repository>
      <id>cloudera-repo</id>
      <name>Cloudera CDH</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <properties>
    <flume.version>1.4.0-cdh4.4.0</flume.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>${flume.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>${flume.version}</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.16</version>
      <exclusions>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
    </dependency>
  </dependencies>

</project>

Once done, you can just type

mvn package

and a jar will be created for you under the target directory.

Tree example

After compilation, here is what my directory structure looks like:

.
├── pom.xml
├── src
│   └── main
│       └── java
│           └── com
│               └── example
│                   └── flume
│                       └── interceptors
│                           └── eventTweaker.java
└── target
    ├── classes
    │   └── com
    │       └── example
    │           └── flume
    │               └── interceptors
    │                   ├── eventTweaker$1.class
    │                   ├── eventTweaker$Builder.class
    │                   └── eventTweaker.class
    ├── maven-archiver
    │   └── pom.properties
    └── eventTweaker-1.0.jar

Jar destination

You need to put the jar (in this case target/eventTweaker-1.0.jar) in the flume classpath for it to be loaded. You can do so in 2 ways:

  1. By editing the FLUME_CLASSPATH variable in the flume-env.sh file
  2. By adding it under the $FLUME_HOME/plugins.d directory. In this directory, put your jar at  your_interceptor_name/lib/yourinterceptor.jar and voilà, it will be loaded automagically.

Flume configuration

Debugging

You can see log statements in the usual flume.log file, probably under /var/log/flume-ng/flume.log.

flume.conf

This is the easiest bit. On your source definition, you just have to add an interceptor name and class:

agent.sources.yoursource.interceptors=tweaker
agent.sources.yoursource.interceptors.tweaker.type=com.example.flume.interceptors.eventTweaker$Builder