How to build a full flume interceptor by a non java developer

Building a flume interceptor does not look too complex. You can find some examples, howtos or cookbooks around. The issue is that they all explain the specificities of the interceptor, leaving a python/perl guy like me in the dark about maven, classpaths or imports. This posts aims to correct this. This will describe the bare minimum to get it working, but I link to more documentation if you want to go deeper. You can find the code on github as well.

The interceptor itself

You need to create a directory structure, in which to write the following java code. It needs to be under src/main/java/com/example/flume/interceptors/eventTweaker.java in this example, for an interceptor named com.example.flume.interceptor.eventTweaker.

package com.example.flume.interceptors;

import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import org.apache.log4j.Logger;

public class eventTweaker implements Interceptor {

  private static final Logger LOG = Logger.getLogger(eventTweaker.class);

  // private means that only Builder can build me.
  private eventTweaker() {}

  @Override
  public void initialize() {}

  @Override
  public Event intercept(Event event) {

    Map<String, String> headers = event.getHeaders();

    // example: add / remove headers
    if (headers.containsKey("lice")) {
      headers.put("shampoo", "antilice");
      headers.remove("lice");
    }

    // example: change body
    String body = new String(event.getBody());
    if (body.contains("injuries")) {
      try {
        event.setBody("cyborg".getBytes("UTF-8"));
      } catch (java.io.UnsupportedEncodingException e) {
        LOG.warn(e);
        // drop event completely
        return null;
      }
    }

    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event:events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {}

  public static class Builder implements Interceptor.Builder {

    @Override
    public Interceptor build() {
      return new eventTweaker();
    }

    @Override
    public void configure(Context context) {}
  }
}

Maven configuration

To build the above, you need a pom.xml, to be put at the root of the aforementioned tree. The following is bare minimum which would probably make any java developer cringe, but It Works™.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>org.apache.flume</groupId>
 <artifactId>eventTweaker</artifactId>
 <version>1.0</version>

 <repositories>
 <repository>
 <id>central</id>
 <name>Maven Central</name>
 <url>http://repo1.maven.org/maven2/</url>
 </repository>
 <repository>
 <id>cloudera-repo</id>
 <name>Cloudera CDH</name>
 <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
 </repository>
 </repositories>

 <properties>
 <flume.version>1.4.0-cdh4.4.0</flume.version>
 </properties>

 <dependencies>
 <dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-sdk</artifactId>
 <version>${flume.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-core</artifactId>
 <version>${flume.version}</version>
 </dependency>
 <dependency>
 <groupId>log4j</groupId>
 <artifactId>log4j</artifactId>
 <version>1.2.16</version>
 <exclusions>
 <exclusion>
 <groupId>com.sun.jdmk</groupId>
 <artifactId>jmxtools</artifactId>
 </exclusion>
 <exclusion>
 <groupId>com.sun.jmx</groupId>
 <artifactId>jmxri</artifactId>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupId>junit</groupId>
 <artifactId>junit</artifactId>
 <version>4.11</version>
 </dependency>
 </dependencies>

</project>

Once done, you can just type

mvn package

and a jar will be created for you under the target directory.

Tree example

After compilation, here is how my directory structure looks like:

.
├── pom.xml
├── src
│   └── main
│       └── java
│           └── com
│               └── example
│                   └── flume
│                       └── interceptor
│                           └── eventTweaker.java
└── target
    ├── classes
    │   └── com
    │       └── example
    │           └── flume
    │               └── interceptors
    │                   ├── eventTweaker$1.class
    │                   ├── eventTweaker$Builder.class
    │                   └── eventTweaker.class
    ├── maven-archiver
    │   └── pom.properties
    └── eventTweaker-1.0.jar

Jar destination

You need to put the jar (in this case target/eventTweaker-1.0.jar) in the flume classpath for it to be loaded. You can do so in 2 ways:

  1. By editing the FLUME_CLASSPATH variable in the flume-env.sh file
  2. By adding it under the $FLUME_HOME/plugins.d directory. In this directory, put your jar at  your_interceptor_name/lib/yourinterceptor.jar and voilà, it will be loaded automagically.

Flume configuration

Debuging

You can see log statements in the usual flume.log file, probably under /var/log/flume-ng/flume.log

flume.conf

This is the easiest bit. On your source definition, you just have to add an interceptor name and class:

agent.sources.yoursource.interceptors=tweaker
agent.sources.yoursource.interceptors.tweaker.type=com.example.flume.interceptors.eventTweaker$Builder

How many ways do you know to switch Linux off?

This is a question I love to ask during interviews:

How many ways do you know to switch off a linux system?

The point of this question is to see how somebody will think. Will the candidate only think about command line options, or will he suggest dirty/creative/violent ways as well? There are of course no good or bad answer, I am more interested in seeing the candidate think.

The GUI way

Click on the menu or equivalent, then switch off.

Using ACPI

Press the power button. On systems that support it, this should send a switchoff signal to cleanly switch off the system.

The clean command line way

shutdown and halt, poweroff, reboot: the last 3 are the same command, actually symlinked to /sbin/reboot

# does not power off the machine at the end
halt
# same as halt but actually powers off if supported
poweroff
# reboot
reboot
# same as above, but allows for delay and warning
shutdown

Using Magic SysRq keys

As wikipedia says, they are key combinations understood by the Linux kernel, which allows the user to perform various low-level commands regardless of the system’s state. Two of them allow to shutdown (o) or reboot (b) the system.

The dirty command line way

Most ACPI or magic key commands can be issued command line, by playing with /proc or /sys.

Killing init (kill -9 1) does not work since a while as it is ignored by the kernel. You can kill all processes though, with the special pid -1:

# does not work
kill -9 1
# Works
kill -9 -1

Being very dirty

Take the power plug off.

For desktops and laptops, long press the power button.

I am sure there are others ways I do not know about, I would love to hear about them.

OpenVpn and IpTables

I have a VPS, somewhere in the US (provided by digital ocean – so far they are the best I found. Cheap, easy to use, flexible, SSD disks with a decent amount of space), which I want to use for VPN.

There are tons of great tutorials about how to setup OpenVPN, but after following them I could connect to the vpn indeed, but I could not use it for anything. There was no internet connection.

One thing was missing, the iptables setup. It is indeed talked about in the openvpn.net howto, but it is not fully complete. I hope this little script will help other people as well:

# enable forwrading
echo 1 > /proc/sys/net/ipv4/ip_forward
# set up forwarding
iptables -A FORWARD -i eth0 -o tun0 -m state --state RELATED,ESTABLISHED -j ACCEPT
iptables -t nat -A POSTROUTING -s 10.8.0.0/24 -o eth0 -j MASQUERADE

You can copy this in a script a file (eg. /etc/gateway.sh), and have it run at startup by adding it to /etc/rc.local for instance.

The Vertica COPY statement

Why not just use INSERT?

Vertica being a columnar database, this means that data is grouped by column on disk. The consequence is that inserting or updating one row means updating all columns of the table, and so basically the whole table. To prevent this, Vertica has a batch uploader dealing with the update in a optimised way. Using it showed us an improvement of 15 rows inserted per second with INSERT to many thousands with COPY.

What is specific about COPY?

Basically, Vertica does not insert rows directly on disk. They are first inserted on the WOS, or Write Optimised Store, in memory. Loading data is thus very quick. After a while, when the WOS is full or after load, Vertica writes the WOS to the ROS, or Read Optimised Store.

Vertica itself manages overflowing from WOS to ROS and the merging of small inefficient ROS containers. It basically does all the housekeeping for you, even if you can influence it if you really want to by managing yourself the tuple mover.

How to use COPY?

This is quite easy, the vertica doc explains all the dirty details, but the main use will be something like this to load a compressed csv with 1 header line that we want to skip. In this case the CSV is on your client machine:

COPY schema.table 
    (column_a, column_b...)
FROM LOCAL '/tmp/bigdata.csv.gz' GZIP
WITH
    DELIMITER AS ','
    ENCLOSED BY '"'
    SKIP 1

To go further

This is really the crux on it. All the rest is technical details. They can be very powerful details, and I will explain a few here.

Source file location

The file can be located on your client (FROM LOCAL ‘path’) or already be on one of your nodes (FROM ‘path’ ON nodename). You can even load FROM STDIN.

Multiple files at once

Vertica supports globals. You can thus load for instance ‘/tmp/201312*.csv’.

Specification per column

The specifications (DELIMITER, ENCLOSED BY) are in my example defined globally. You can define them per column after each column name. If for instance the first column has ‘:’ as a delimiter, and the others ‘;’, you would write:

COPY schema.table 
    (column_a DELIMITER ':', column_b)
FROM LOCAL '/tmp/bigdata.csv.gz' GZIP
    DELIMITER ';'

Skip or transform columns

Use the FILLER parameter. The doc already has great examples, so I will just show them here. Transformation can be done this way:

CREATE TABLE t (k TIMESTAMP);
COPY t(year FILLER VARCHAR(10),
       month FILLER VARCHAR(10), 
       day FILLER VARCHAR(10), 
       k AS TO_DATE(YEAR || MONTH || DAY, 'YYYYMMDD'))
FROM STDIN NO COMMIT;
2009|06|17
1979|06|30
2007|11|26
\.
SELECT * FROM t;
         k 
--------------------- 
 2009-06-17 00:00:00 
 1979-06-30 00:00:00 
 2007-11-26 00:00:00 
 (3 rows)

Skipping columns is done by specifying the datatype to ignore. Not that in this example the first column is casted to timestamp.

create table t (k timestamp);
copy t(y FILLER date FORMAT 'YYYY-MM-DD', 
       t FILLER varchar(10),
       k as y) from STDIN no commit;
2009-06-17|2009-06-17
\.

Load method: big load vs. small load

You can add a parameter at the end of the statement to chose the load method: AUTO, TRICKLE or DIRECT.

  • AUTO is the default, will load first in the WOS (memory), then overflows to ROS (disk) when WOS is full.
  • TRICKLE for small loads, load only into the WOS (memory). Copying to ROS (disk) is done after loading. You will get an error if the WOS is full.
  • DIRECT for big batches, loads into the ROS (disk).

Error management

Last useful detail, you can setup some error management to do some postmortem on data which did not load properly.

COPY t FROM STDIN
    REJECTMAX 10
    EXCEPTIONS '/tmp/exceptions'
    REJECTED DATA '/tmp/raw_errors'

This will error the COPY statement out after 10 (REJECTMAX) rejected rows. The raw data of those rows will be written to REJECTED DATA path, with helpful information stored at EXCEPTION path.

Note that the REJECTED DATA file can become quite big. In one instance, I wrongly overrode the RECORD TERMINATOR parameter, which basically made Vertica think that my whole file was one big line. This line was thus of course badly formatted, and was helpfully written to REJECTED DATA. This file essentially became a full copy of my source file, blowing my partition up. So be careful here!

Monitoring

By looking at the v_monitor.load_streams table you can see information about historical and even current loads. The number of row loaded and sorted are the most interesting values to get.

select * from v_monitor.load_streams;
-[ RECORD 1 ]----------+---------------------------------
session_id             | vertica.local-27774:0xa799
transaction_id         | 45035996273707887
statement_id           | 1
stream_name            |
schema_name            | reports
table_id               | 45035996273722464
table_name             | sales
load_start             | 2014-08-20 09:02:55.631703+02
load_duration_ms       |
is_executing           | t
accepted_row_count     | 0
rejected_row_count     | 0
read_bytes             | 0
input_file_size_bytes  | 0
parse_complete_percent |
unsorted_row_count     | 435666962
sorted_row_count       | 60687177
sort_complete_percent  | 13

Pentaho kettle: how to remotely execute a job with a file repository

Pentaho/kettle background

Kettle (now known as pdi) is a great ETL tool, opensource with a paid enterprise edition if you need extra support or plugin.

One great feature is the ability to remotely execute one of your jobs for testing, without having to deploy anything. This is done via the carte server (part of pdi), which basically is a service listening on a port to which you send your jobs.

Carte background

Carte works very well when you are using a database repository, but you will run into issues when you use a file repository. The reason is that when you run a job remotely, kettle needs to bundle all the relevant jobs and transformation to send them over. This is not always possible, an obvious example is if some job names are parametrised.

There is still a way to deal with this. Carte’s behaviour is to use the jobs/transformations sent by kettle, or to use the one it can find locally if the repository names match.

The solution

The solution is then quite logical: copy over your current repository to the carte server, set it up with the same name as your local repository and you are good to go.

This is a bit painful to do manually, so I give here the job I wrote to do that automatically from inside pentaho. There are not  a lot of assumptions done, except that you can copy file to your carte server with scp (you thus need ssh access).

The flow is as follow:

  1. Delete existing compressed local repository if any
  2. Compress local repository
  3. Delete remote compressed repository if any
  4. Copy over compressed local repository
  5. Uncompress remote compressed repository

You can see this in the following picture (the read arrows show the inside of the transformations):

Copy a local repository

Copy a local repository

To be generic, a few configuration values must be added to your kettle.properties. They set up the remote server name, your username, various paths. The following is an example with comments for all fields.

# Hostname of your etl server where carte runs
ssh.etlhost=etlserver.example.com
# Name of your ssh user
ssh.etluser=thisdwhguy
# Use one of ssf.password or shh.keypath + ssh.keypass
# password of your ssh uer, leave empty if none
ssh.password=
# Where is your private key on your local machine
ssh.keypath=/Users/thisdwhguy/.ssh/id_rsa
# If your private key is password protected, add it here.
# If not, leave it empty
ssh.keypass=
# Where does your repo sits
local.repo=/Users/thisdwhguy/pentaho
# Where to compress locally the repository
zip.tmpdir=/Users/thisdwh/tmp
# What is the name of your compressed repository
# (can be anything, this is irrelevant but having
# it here keeps consistency)
zip.devclone=devclone.zip
# Where to uncompress the zip file? This setup
# allows multiple users and the final directory
# will be ${ssh.etltargetdir}/${ssh.etltargetuserrepo}
ssh.etltargetdir=/path/to/repositories
ssh.etltargetuserrepo=thisdwhguy

Caveats

This job assumes that you have ssh access. If this is the case, you can use this job as is, but there is one thing you might want to update.

I assumed that a key is used for ssh, but a password might be the only thing you need. If that is the case, update the 2 ssh steps and the copy step accordingly by unticking ‘use private key’.

That’s all, folks

This job should be quite easy to use. Do not hesitate to comment if you have questions.

Sadly I cannot attach a zip file to this post, and after doing some over-enthusiastic cleaning I lost the example file completely. I hope that the description given in this post is enough.

Official fix

It looks like this workaround is not needed anymore, since this bug fix PDI-13774, available in version 5.4.0GA.

Golden cards or how to do break out the flow for one day

I would love to do $something but my backlog is packed…

In my team, we are using kanban to manage our workflow. We are quite an operational team, so this fits us very well.

One point of frustration is that we do not get to chose our work. We do have our own long term projects, of course, but most of our work is operational depending on what our customers need. There is a lot to do, the pace is very high, so we never are out of tasks. Sometimes, though, something is nagging us. What if we were trying out this cool new tool? What if I could spend one day trying to answer a question based on our dataset? What if I have an idea with great potential but not immediate apparent business value?

This kind of things do not have a spot in our workflow, already packed.

We tried to setup hackathons, but this is quite tricky, at this mostly implies that the whole team must do it at the same time, whereas we do not have the same pressure or even time habits. By experience, it did not work out so well.

That’s why we introduced Golden Cards.

Golden Cards

Each team member gets 2 cards per month. When they play one of their card, for one day they are allowed to work on whatever they wish, diverting all non-emergency work to the other team members.

Effectively, this give us 10% of our time to do something we, as individual or as a team if many of us play a card together, want to do.

What is the expected outcome?

  • A direct effect was to make the team member happy, by giving them an interesting option.
  • Maybe one of us will build the Next Big Thing.
  • It already allowed some of us to block a day to learn new skills, which in the long run will be helpful, but the time was never quite right to do it.

Feedback

We are using golden cars since only one month.

So far, the team increased their knowledge and took time time to check out a new job scheduler. The feedback is very positive, if only because it makes the team members happy. Technically, our backend setup is a bit saner now, and will keep improving.

Having the opportunity to say ‘no’ to (almost) every request during one day is very liberating, and the day always feel like it was well used.

One thing to note is that it is worth blocking your calendar for the day, to make sure you do not end up in unwanted meetings.

So how does a golden card look like?

I am glad you asked.

A lot of us play Magic the Gathering, so here is what we ended up with, courtesy of Magic Card Maker.

Golden Card

Golden Card

Server automation – Introducing ansible

Ansible is a tool I find myself using more and more. To quote the project website:

Ansible is the simplest way to automate and orchestrate.

Indeed, ansible is the simplest way to automate. Once installed and a hostfile is setup (this is really an ini-like file, with section headers and lists of hostname), you can run any command, via sudo (or not) on the list of hosts defined, in parallel, without having to login manually on all machines.

A small example will show you the power of its command line. I just wanted to stop puppet on all our disco slaves. I thus defined a  /etc/ansible.host file as follow:

[disco]
slave1
slave2
...
slave16

Then I just needed to type:

ansible disco --inventory-file /etc/ansible.hosts --ask-sudo-pass --forks=5 -a "service pupwrapd stop"

And voila, pupwrapd was stopped on all servers, with 5 open connections. I could of course have done it on all 16 servers in parallel.

This is just the tiny top of the ansible iceberg. This is a very dynamic project, always evolving and improving, with the possibility to write scripts to do just about everything. There are a lot of modules including which will make your life a lot easier, and the documentation is quite comprehensive.

In short, if you do not know ansible yet, have a look!

Lean Kanban NL 2013: take aways of a techie

Those are the take aways of a techie after #LKNL13:

Main take aways

  • When a measure becomes a target, it ceases to be a good measure (variation on Goodhart’s law)
  • The best metric to prioritise is the cost of delay
  • Keep your options open as long as possible, push back commitment as late as possible to gather the most insight first
  • Success hinges on being wrong early
  • If human are involved, you have a complex environment
  • Bottlenecks:
    • They are not an actual issue, but their impact can be
    • Use them to keep resource available (not use the team all the time at 100%) and to do some non mission critical work
  • Moving a person (eg. go to another team standup) is a lot easier and more efficient than moving knowledge by eg. documentation.

Kanban in a nutshell

  • Principles of KB
    1. Start with what you do know
    2. Agree to pursue evolutionary changes 
    3. Initially, respect roles responsibilities and job titles
    4. Encourage acts of leadership at all levels
    5. (1-3 are to go around resistance to change, or more accurately, resistance to be changed)
  • Concepts of KB
    1. Service-orientation (DBA service, architecture service)
    2. Service delivery involves workflow
    3. Work flows through a series of information discovery activities
  • Practices of KB
    1. Visualise
    2. Limit wip
    3. Manage flow
    4. Make policies explicit
    5. Implement feedback loops
    6. Improve collaboratively, evolve experimentally

One book came again and again as recommended: The Principles of Product Development Flow, by Don Reinertsen.

The slides can be found on lknl13 page.

Talk by talk take aways

David J. Anderson, Modern Management Method

Modern management means that innovation changed the way of doing something – this is not just about a new tool.

Dave Snowden, Complexity: the new paradigm in decision making

  • Successful companies do not follow the recipes of other. What lead to success in one company does not mean it will lead to success for your company.
  • When a measure becomes a target, it ceases to be a good measure (variation on Goodhart’s law)
  • Intrinsic motivation is destroyed by extrinsic targets.
  • Look into cynefin framework. Systems divided in 5:
    • Complicated (known unknown, good practices)
    • Simple (known knowns, best practices)
    • Complex (knowable (by experimentation) unknowns, emergent)
    • Chaotic (unknowable unknowns, novel)
    • Disorder: transition phase or when a preference is imposed
    • Collapse from complacency: sudden and unanticipated collapse into chaos from apparent security
  • Complexity cannot be eliminated, only absorbed.
  • Complexity is about flow. Need to manage the system as a whole, not in part.
  • You can intervene on complexity (experimental action), while keeping in mind:
    • Needs to be coherent (not necessary right)
    • Safe to fail
    • Small and tangible
    • Managed as a portfolio (many experiments in parallel)
    • Sometimes oblique approach
    • Includes naive approach
    • A few high risk/high return

Steve Tendon, Unity of purpose and community of trust

  • Different incentives and metrics create internal conflicts => different units will have different agendas, because driven by different incentives.
  • There should be 1 system wide metric: throughput metric
  • Need a system view that avoids local optimisations

Gaetano Mazzanti, People as bottlenecks

  • The issue is not actually the bottlenecks, but their economical impact.
  • Trust != hero culture. Trust can scale, heroes cannot

Donald Reinertsen, Shooting crooked arrows at moving target in the fog

  • Close gaps on basis of economics, not ideology (is it cost effective to complete the last 5% of this feature?)
  • Seek and respond to disconfirming information (confirmation bias)
  • Think about the economics of creating exploitable options
  • Buy information in small batches,act upon any new information
  • Improve response time by decentralising the information, the authority and the resources necessary to quickly redirect programs
  • Encourage and reward initiative
  • Inform your decisions with economics and statistics

Troy Magennis, Cycle time analytics

  • Success hinges on being wrong early
  • Always compare model vs. actual

Joshua Bloom, What is the value of social capital?

  • Emergent slack, resilience with benefit: use bottleneck to create ‘free’ time of people to non mission critical work.
  • Moving a person (eg. go to another team standup) is a lot easier and more efficient than moving knowledge by eg. documentation.
  • Your software will only have qualities already present in the organisation. The organisation itself needs to have these qualities
  • Teaming (verb, a dynamic activity) is more important than teams.

Olav Maassen – Risks and decisions – the ‘When’ rather than the ‘How’

  • Option theory: postpone commitment to the latest possible. Keep all options open
    • Options have value
    • Options expire
    • Never commit early unless you know why
  • Always ask yourself: how does this create more options to me? How does this allow me to push decision to a later date to get more information?

David J. Anderson, Kanban and evolutionary management: Lessons we can learn from Bruce Lee’s journey in martial arts.

  • See principles, concepts and practices above.
  • Bruce Lee’s JKD is based on teh same principles: use what works, test in real life

Astrid Claessen, Gamestorming your retrospectives

  • Danger in retro: boredom/familiar feeling, hence gamestorming
  • Gamestorming is actual work

Ajay Reddy & Dimitar Bakaradzhiev, not everything that counts can be counted and not everything that can be counted counts

  • feasibility of measuring
    • Your problem is not as unique as you think
    • You have more data than you think
    • You do not need as much data as you think
    • It is easier than you think to get the data you need
  • Good measures
    • Lead time
    • Throughput
    • WIP
    • Cumulative flow diagram (representation of wip at each stage in the system)
    • Flow efficiency (work time against lead time, indication of waste)