Clean Pentaho shared connections from transformations and jobs

Pentaho has this nice shared.xml file, which can be found in your $HOME/.kettle directory. You can define all your connections there once, in theory avoiding duplicated connection definitions across all your jobs, and thus having one single place to update your connections when needed.
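
For reference, a connection entry in shared.xml looks roughly like this (a sketch only; the values are made up and the exact set of tags depends on your database type):

<sharedobjects>
  <connection>
    <name>my_dwh</name>
    <server>dwh.example.com</server>
    <type>MYSQL</type>
    <access>Native</access>
    <database>dwh</database>
    <port>3306</port>
    <username>etl</username>
    <password>Encrypted 1234abcd</password>
  </connection>
</sharedobjects>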

The sad reality is that each time you save a job or a transformation, the connections are still embedded in the job or transformation file, effectively duplicating them anyway. If you somehow remove the connection details from your job/transformation, the one from shared.xml will be used, which is what we want.

This ‘somehow’ can easily be achieved with the following snippet, which strips every embedded <connection> element from all files under the current directory (perl -0 slurps each file whole, so the multiline substitution works):

find . -type f -print0 | xargs -0 perl -0 -p -i -e 's/\s*<connection>\s*<.*?<\/connection>\s*$//smg'

We run it regularly on our codebase to keep it clean, and this always worked as expected.


Disco: purge all completed jobs

In Disco, jobs which have not been purged still use some disk space for temporary data. This can end up filling all the disk space in your cluster. I have been there, and it is not fun, I promise you.

You can set up a purge policy (a job might purge itself after completion, for instance), but if you need to quickly clean up all jobs at once, this snippet will help. It purges all completed jobs, successful or not.

for job in $(disco jobs)
do
  # "READY" means completed successfully, "WARN: Job killed" means, well, killed.
  if disco events "$job" | grep -q "WARN: Job killed\|READY"
  then
    disco purge "$job"
  fi
done
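
As an aside, the self-purge policy mentioned above can be handled from the Python side as well. A minimal sketch, assuming the purge() method on disco.core.Job and a trivial word-count map (Disco is Python 2, and all names here are illustrative):

from disco.core import Job, result_iterator

def fun_map(line, params):
    # emit each word of the line with a count of 1
    for word in line.split():
        yield word, 1

job = Job().run(input=["http://example.com/data.txt"], map=fun_map)

# consume the results...
for word, count in result_iterator(job.wait()):
    print word, count

# ...then have the job delete itself and its results from the cluster
job.purge()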

Disco: replicate all data away from a blacklisted node

Before removing a node from Disco, the right way to proceed is to blacklist it, replicate its data away, and finally remove it from the cluster. Replicating the data away is done by running the garbage collector. Unfortunately, the garbage collector does not migrate everything in one go, so a few runs are needed. To avoid doing this manually, the following script runs the garbage collector as often as needed, as long as some nodes are blacklisted but not yet safe for removal. The full script can be found on GitHub.

#!/usr/bin/env bash

# Check if there are nodes blacklisted for ddfs but not fully replicated yet.
# If this is the case, keep running the GC until all their data is replicated away.

# debug
#set -x

# Treat unset variables as an error when substituting.
set -u

# master
HOST=disco.example.com:8989
DDFS_URL="http://$HOST/ddfs/ctrl"
DISCO_URL="http://$HOST/disco/ctrl"

# API commands
GC_STATUS=$DDFS_URL/gc_status
GC_START=$DDFS_URL/gc_start
BLACKLIST=$DISCO_URL/get_gc_blacklist
SAFE_GC=$DDFS_URL/safe_gc_blacklist

# counter to mark how many times the GC ran.
CNT=0

function is_running {
    # will get "" if GC not running, or a string describing the current status.
    _GC_RES=$(wget -q -O- $GC_STATUS)
    if [ "$_GC_RES" == '""' ]
    then
        _GC_RES=''
    fi
    echo "$_GC_RES"
}

function is_safe {
    _BLACKLISTED=$(wget -q -O- $BLACKLIST)
    _SAFE=$(wget -q -O- $SAFE_GC)

    # eg.
    # blacklisted:  ["slave1","slave2","slave3"]
    # safe_gc_blacklist: []

    # safe is a subset of blacklisted. If we concat the 2 (de-jsonised) lists
    # and keep only unique entries, we have 2 cases:
    # - no uniques => all nodes are safe (in blacklist *and* in safe)
    # - uniques => some nodes are not safe

    echo "$_BLACKLISTED $_SAFE" | tr -d '[]"' | tr ', ' '\n' | sort | uniq -u
}

while true
do

    GC_RES=$(is_running)

    if [ -z "$GC_RES" ]
    then
        echo "GC not running, let's check if it is needed."
        NON_SAFE=$(is_safe)
        if [ -z "$NON_SAFE" ]
        then
            echo "All nodes are safe for removal."
            exit
        else
            echo "Somes nodes are not yet safe: $NON_SAFE"
            CNT=$((CNT+1))
            date +'%Y-%m-%d %H:%M:%S'
            wget -q -O /dev/null $GC_START
            echo "Run $CNT started."
        fi
    else
        echo "GC running ($GC_RES). Let's wait".
    fi
    sleep 60
done
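
Blacklist the node(s) first, point HOST at your own master, and leave the script running until it reports that all nodes are safe for removal. For example (the script name here is just an assumption):

./replicate_away.sh

Each GC run is logged with a counter and a timestamp, so you can see afterwards how many passes were needed.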

How to build a full Flume interceptor by a non-Java developer

Building a Flume interceptor does not look too complex. You can find some examples, howtos or cookbooks around. The issue is that they all explain the specifics of the interceptor itself, leaving a Python/Perl guy like me in the dark about Maven, classpaths or imports. This post aims to correct that. It describes the bare minimum to get things working, with links to more documentation if you want to go deeper. You can find the code on GitHub as well.

The interceptor itself

You need to create a directory structure matching the package name. For an interceptor named com.example.flume.interceptors.eventTweaker, as in this example, the source needs to live at src/main/java/com/example/flume/interceptors/eventTweaker.java.
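
Creating it is a one-liner:

mkdir -p src/main/java/com/example/flume/interceptors

The interceptor code itself: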

package com.example.flume.interceptors;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import org.apache.log4j.Logger;

public class eventTweaker implements Interceptor {

  private static final Logger LOG = Logger.getLogger(eventTweaker.class);

  // private means that only Builder can build me.
  private eventTweaker() {}

  @Override
  public void initialize() {}

  @Override
  public Event intercept(Event event) {

    Map<String, String> headers = event.getHeaders();

    // example: add / remove headers
    if (headers.containsKey("lice")) {
      headers.put("shampoo", "antilice");
      headers.remove("lice");
    }

    // example: change body
    String body = new String(event.getBody());
    if (body.contains("injuries")) {
      try {
        event.setBody("cyborg".getBytes("UTF-8"));
      } catch (java.io.UnsupportedEncodingException e) {
        LOG.warn(e);
        // drop event completely
        return null;
      }
    }

    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    // intercept() may drop an event by returning null, so keep only survivors.
    List<Event> out = new ArrayList<Event>(events.size());
    for (Event event:events) {
      Event intercepted = intercept(event);
      if (intercepted != null) {
        out.add(intercepted);
      }
    }
    return out;
  }

  @Override
  public void close() {}

  public static class Builder implements Interceptor.Builder {

    @Override
    public Interceptor build() {
      return new eventTweaker();
    }

    @Override
    public void configure(Context context) {}
  }
}

Maven configuration

To build the above, you need a pom.xml, to be put at the root of the aforementioned tree. The following is the bare minimum, which would probably make any Java developer cringe, but It Works™.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.flume</groupId>
  <artifactId>eventTweaker</artifactId>
  <version>1.0</version>

  <repositories>
    <repository>
      <id>central</id>
      <name>Maven Central</name>
      <url>http://repo1.maven.org/maven2/</url>
    </repository>
    <repository>
      <id>cloudera-repo</id>
      <name>Cloudera CDH</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <properties>
    <flume.version>1.4.0-cdh4.4.0</flume.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>${flume.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>${flume.version}</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.16</version>
      <exclusions>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
    </dependency>
  </dependencies>

</project>

Once done, you can just type

mvn package

and a jar will be created for you under the target directory.

Tree example

After compilation, here is what my directory structure looks like:

.
├── pom.xml
├── src
│   └── main
│       └── java
│           └── com
│               └── example
│                   └── flume
│                       └── interceptors
│                           └── eventTweaker.java
└── target
    ├── classes
    │   └── com
    │       └── example
    │           └── flume
    │               └── interceptors
    │                   ├── eventTweaker$1.class
    │                   ├── eventTweaker$Builder.class
    │                   └── eventTweaker.class
    ├── maven-archiver
    │   └── pom.properties
    └── eventTweaker-1.0.jar

Jar destination

You need to put the jar (in this case target/eventTweaker-1.0.jar) in the Flume classpath for it to be loaded. You can do so in two ways:

  1. By editing the FLUME_CLASSPATH variable in the flume-env.sh file
  2. By adding it under the $FLUME_HOME/plugins.d directory. In this directory, put your jar at your_interceptor_name/lib/yourinterceptor.jar, as shown below, and voilà, it will be loaded automagically.
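
For the second option, that boils down to something like this (the plugin directory name eventTweaker is an arbitrary choice):

mkdir -p "$FLUME_HOME/plugins.d/eventTweaker/lib"
cp target/eventTweaker-1.0.jar "$FLUME_HOME/plugins.d/eventTweaker/lib/"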

Flume configuration

Debugging

You can see your interceptor's log statements in the usual flume.log file, probably under /var/log/flume-ng/flume.log.
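
If you need more detail while developing, you can also raise the log level for the interceptor's package in Flume's log4j.properties (standard log4j 1.2 syntax; the file usually lives in Flume's conf directory):

log4j.logger.com.example.flume.interceptors=DEBUG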

flume.conf

This is the easiest bit. On your source definition, you just have to add an interceptor name and class:

agent.sources.yoursource.interceptors=tweaker
agent.sources.yoursource.interceptors.tweaker.type=com.example.flume.interceptors.eventTweaker$Builder
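
For context, here is how it fits in a complete, if minimal, agent definition. The agent, source, channel and sink names (and the netcat source) are made up for the example:

agent.sources=yoursource
agent.channels=yourchannel
agent.sinks=yoursink

agent.sources.yoursource.type=netcat
agent.sources.yoursource.bind=localhost
agent.sources.yoursource.port=4444
agent.sources.yoursource.channels=yourchannel

# the interceptor definition from above
agent.sources.yoursource.interceptors=tweaker
agent.sources.yoursource.interceptors.tweaker.type=com.example.flume.interceptors.eventTweaker$Builder

agent.channels.yourchannel.type=memory

agent.sinks.yoursink.type=logger
agent.sinks.yoursink.channel=yourchannel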