Hadoop metrics in graphite

I will not present graphite here; if you end up reading this, I assume you already have a graphite instance up and running. If not, it is a matter of less than an hour to get a usable instance.

Hadoop uses metrics2 which allows multiple metrics output plugins to be used in parallel, supports dynamic reconfiguration of metrics plugins, provides metrics filtering, and allows all metrics to be exported via JMX.

Those metrics can be very easily exported to graphite to then be sliced and diced to your heart’s content.

You only need to modify the file hadoop-metrics2.properties by adding the following snippet:

# Sampling period
*.period=10

# Graphite sink class
*.sink.graphite.class=org.apache.hadoop.metrics2.sink.GraphiteSink

# Location of your graphite instance
*.sink.graphite.server_host=10.x.x.x
*.sink.graphite.server_port=2003

# Define for each metric group (* in *.prefix) how it should be named
# in graphite (part after the =)
datanode.sink.graphite.metrics_prefix=hadoop.datanode
namenode.sink.graphite.metrics_prefix=hadoop.namenode
resourcemanager.sink.graphite.metrics_prefix=hadoop.resourcemanager
nodemanager.sink.graphite.metrics_prefix=hadoop.nodemanager
jobhistoryserver.sink.graphite.metrics_prefix=hadoop.jobhistoryserver
journalnode.sink.graphite.metrics_prefix=hadoop.journalnode
maptask.sink.graphite.metrics_prefix=hadoop.maptask
reducetask.sink.graphite.metrics_prefix=hadoop.reducetask
applicationhistoryserver.sink.graphite.metrics_prefix=hadoop.applicationhistoryserver

In Ambari, just go to HDFS > Config > Advanced hadoop-metrics2.properties; the location for other distributions should be trivial to find.

After that, restart hdfs and all the relevant services you asked to monitor (if you asked to monitor the resourcemanager, restart the resource managers, and so on).

That’s it, you’re set.
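
Once metrics have had a sampling period or two to arrive, you can check what landed under the prefixes defined above, for instance with a quick sketch like this (assuming graphite web answers on the same 10.x.x.x host):

# List the metric tree under hadoop.* via the graphite-web "find" API.
import json
import urllib.request

url = 'http://10.x.x.x/metrics/find?query=hadoop.*'
with urllib.request.urlopen(url) as resp:
    for node in json.load(resp):
        print(node['id'])  # e.g. hadoop.namenode, hadoop.datanode, ...

If nothing shows up after a minute or two, check that port 2003 is reachable from the Hadoop nodes.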

If you are on HDP, you can go a bit further. HDP actually ships with a grafana instance (if you installed Ambari metrics) which can use graphite as a data source. The data will be the same, the display will be a tad prettier.

This uses graphite web (port 80 by default), which needs CORS enabled. You can do it in apache (the default graphite web http server) by adding this line in your graphite vhost:

Header set Access-Control-Allow-Origin "*"
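
A quick way to check that the header is actually served:

import urllib.request

resp = urllib.request.urlopen('http://10.x.x.x/render?target=hadoop.*&format=json')
print(resp.headers.get('Access-Control-Allow-Origin'))  # should print *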


--no options with argparse and python

Ruby has this very nice feature when you define options with optparse:

opts.on('--[no-]flag', "Set flag.") do |p|
    options.persistPost=p
end

which allows you to have the --flag and --no-flag options for free. Python does not have this, but there are 3 options to work around that.

The verbose way

Just define 2 options.

  parser.add_argument(
    '--flag',
    dest='flag',
    action='store_true',
    help='Set flag',
  )
  parser.add_argument(
    '--no-flag',
    dest='flag',
    action='store_false',
    help='Unset flag',
  )
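
Assuming the parser defined above, this behaves as follows (I set an explicit default, since both options share dest='flag'):

parser.set_defaults(flag=False)  # explicit default instead of relying on the actions

print(parser.parse_args(['--flag']).flag)               # True
print(parser.parse_args(['--no-flag']).flag)            # False
print(parser.parse_args(['--flag', '--no-flag']).flag)  # False, the last option wins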

Custom action

You can give a custom action to the action parameter of add_argument. This custom action can look at the actual option given and act accordingly.

  parser.add_argument(
    '--flag', '--no-flag',
    dest='flag',
    action=BooleanAction,
    help='Set flag',
  )

BooleanAction is just a tiny 6-line class, defined as follows:

class BooleanAction(argparse.Action):
    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        super(BooleanAction, self).__init__(option_strings, dest, nargs=0, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, False if option_string.startswith('--no') else True)

As you can see, it just looks at the name of the flag, and if it starts with --no, the destination will be set to False.
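
A quick sketch of the resulting behaviour (with an explicit default added):

import argparse

# BooleanAction defined as above
parser = argparse.ArgumentParser()
parser.add_argument('--flag', '--no-flag', dest='flag', action=BooleanAction,
                    default=False, help='Set flag')

print(parser.parse_args(['--flag']).flag)     # True
print(parser.parse_args(['--no-flag']).flag)  # False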

Custom parser

Create your own add_argument-like method, which can then automagically add the --no option for you.
First define your own parser:

class BoolArgParse(argparse.ArgumentParser):
    def add_bool_arguments(self, *args, **kw):
        grp = self.add_mutually_exclusive_group()
        # add --flag
        grp.add_argument(*args, action='store_true', **kw)
        nohelp = 'no ' + kw['help']
        del kw['help']
        # add --no-flag
        grp.add_argument('--no-' + args[0][2:], *args[1:], action='store_false', help=nohelp, **kw)

Then use it:

parser = BoolArgParse()
parser.add_bool_arguments('--flag', dest='flag', help='set flag.')
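
Which then behaves like this:

print(parser.parse_args(['--flag']).flag)     # True
print(parser.parse_args(['--no-flag']).flag)  # False
# parser.parse_args(['--flag', '--no-flag']) errors out:
# the two options are in a mutually exclusive group.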

Comparison

I do not want to frame these as plus and minus points, as not all use cases want the same features, but here you are:

  • Verbose way:
    • More lines of code (you need to define 2 flags),
    • Help is more verbose,
    • Easy (no extra class),
    • Possibility to give the same parameter multiple times, the last one wins (e.g. --flag --no-flag).
  • Custom action:
    • Fewer lines of code,
    • Help is not verbose (only one line of help),
    • Possibility to give the same parameter multiple times, the last one wins (e.g. --flag --no-flag).
  • Custom parser:
    • The most lines of code,
    • Help is verbose but grouped,
    • Cannot have the same flag repeated.

Extracting queries from Hive logs

Hive logs are very verbose, and I personally find it a pain to wade through them when I try to understand which queries my ETL tool decided to generate.

To help with this, I created a small python script which looks at hive log files and outputs the SQL queries, and only the queries, with some information about them if known: start time, duration, success.

Usage:

./hqe.py --help 
usage: hqe.py [-h] [--since SINCE] [--to TO] [--logdir LOGDIR]
              [--glob LOGFILE_GLOB]
              [--loglevel {DEBUG,INFO,WARNING,ERROR,CRITICAL}]

Displays queries ran on Hive.

optional arguments:
  -h, --help            show this help message and exit
  --since SINCE         how far to look back. (default: 15m)
  --to TO               How far to look forward. (default: now)
  --logdir LOGDIR       Directory of hive log files. (default: /var/log/hive)
  --glob LOGFILE_GLOB   Shell pattern of hive logfiles inside their logdir.
                        (default: hiveserver2.log*)
  --loglevel {DEBUG,INFO,WARNING,ERROR,CRITICAL}, -l {DEBUG,INFO,WARNING,ERROR,CRITICAL}
                        Log level. (default: warn)
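
For instance, to scan the default hiveserver2 logs with more verbose logging, something like:

./hqe.py --since 15m --logdir /var/log/hive --glob 'hiveserver2.log*' --loglevel INFO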

Sample output:

Started at 2017-06-22 05:30:58 for 12.788000s by hive on ip-10-0-0-10.eu-west-2.compute.internal (Probably success). (Thread id: 79733, query id: hive_20170622053058_676612af-7bb8-4c4b-8fce-51bd1ae7be71, txn id: 0):
SELECT  
 id,
 count(*)
FROM 
 raw.event
GROUP BY
 1 
ORDER BY -- required for next step
 sys_partition

Started at 2017-06-22 05:31:25 for 0.018000s by Unknown on Unknown (Probably success). (Thread id: 79770, query id: hive_20170622053125_7d8e644a-5c23-4ca8-ab0f-20becdd65c3b, txn id: Unknown):
use events

Started at 2017-06-22 05:31:25 for Unknowns by Unknown on Unknown (FAILED). (Thread id: handler-46, query id: Unknown, txn id: Unknown):
MERGE INTO mart.click dst
USING (
 SELECT
 [big sql...]
 ) as r

FROM
 raw.click
 WHERE
 ${SEQ_CHECKER_SQL}
) src
ON

 [big sql...]

WHEN NOT MATCHED THEN INSERT VALUES (
  [more sql]
)
Error: ParseException line 36:4 cannot recognize input near '$' '{' 'SEQ_CHECKER_SQL' in expression specification

As you can see:

  • If the user, hostname and duration are known, they are displayed,
  • the query is displayed with the same formatting as it was sent, including comments,
  • the error (if any) is shown. In my case, a variable was not expanded by the ETL tool.

You can find the source on github.


About (big) kafka broker id

I had quite a bit of fun setting up the kafka broker id, and these are my findings, hopefully saving some time for other poor souls like me.

TL;DR;

Set up in your kafka config

  • nothing to have auto-generated ids
  • broker.id=something_big AND reserved.broker.max.id=something_even_bigger to manually set your ids up.

Long Story

The broker id is a unique identifier per broker. Each broker in the cluster must have a different id, which is a positive int (meaning, for java, at most 2147483647). This is all fine and dandy and works nicely if your ids are increasing from 1, 2…

Another option, nice for automated deployment, would be to generate ids based on the ip address, which should be unique within a DC and thus (probably) within a cluster. With puppet, a nice ruby expression in a template like:

broker.id=<%= @ipaddress.split('.').inject(0) {|total,value| (total << 8 ) + value.to_i} & 0x7FFFFFFF %>

would nicely do the job, generating a 31-bit int from the 32-bit IP (java has no unsigned int, so we cannot use the full range) and discarding only the highest bit to keep as much variability as possible.
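
For illustration, the same computation in python:

import socket

def broker_id_from_ip(ip):
    """Fold the 4 IPv4 octets into one integer, masked to 31 bits."""
    as_int = int.from_bytes(socket.inet_aton(ip), 'big')  # same shift-and-add as above
    return as_int & 0x7FFFFFFF  # drop the highest bit, the id must fit in a positive java int

print(broker_id_from_ip('10.0.0.10'))  # 167772170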

Now, it so happens that kafka can generate its ids as well, from a zookeeper sequence. To make sure there is no collision, the auto-generated ids will not be under the undocumented reserved.broker.max.id value, which is 1000 by default.

Conversely, manual ids cannot be above this limit. If you dare set an id above it in your config file, kafka will just not start, and, more annoyingly, will not give you any feedback beyond an exit code of 1. Once you discover this configuration option, the solution is easy: just set it as high as possible, for instance to the max int possible:

reserved.broker.max.id=2147483647

The problem was to find out that it actually was the problem.

On a side note, changing the id after the first kafka start is a very bad idea, and you will end up with a message saying for instance:

kafka.common.InconsistentBrokerIdException: Configured brokerId 999 doesn’t match stored brokerId 838 in meta.properties

Get started with AWS and python

When you start for the first (or even second) time with AWS, it is a bit tricky to get your head around all the bits and bolts that need to be connected together. If on top of this you try to work with AWS in Beijing from outside China, the web GUI makes your work even harder because of slowness or even timeouts.

This script sets up a full set of resources for you (vpc, route table, security group, subnet, internet gateway, instance, with the relevant associations and attachments) for easy testing or bootstrapping of your infrastructure.

It is mostly meant as a testing help, so it does not handle all possible options, but I find it invaluable to get started. You just need the AWS basics (an account, credentials and a keypair) and it will do the rest for you. You need to provide a tag name (defaults to ‘roles’) and value, and all resources will be created and located via this tag, to allow for easy spawning and tearing down.
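
As an illustration of the idea, fetching the instances carrying such a tag with boto3 would look something like this (using the default tag name and the tag value from the example below):

import boto3

ec2 = boto3.client('ec2')
reservations = ec2.describe_instances(
    Filters=[{'Name': 'tag:roles', 'Values': ['testing']}]
)['Reservations']

for reservation in reservations:
    for instance in reservation['Instances']:
        print(instance['InstanceId'], instance['State']['Name'])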

usage: fullspawn.py3 [-h] [--tag TAG] [--up | --down] [--wet | --dry]
                     [--log {DEBUG,INFO,WARNING,ERROR,CRITICAL}] [--cidr CIDR]
                     [--ami AMI] [--keypair KEYPAIR] [--profile PROFILE]
                     [--instance INSTANCE]
                     role

Spawns a full AWS self-contained infrastructure.

positional arguments:
  role                 Tag value used for marking and fetching resources.

optional arguments:
  -h, --help           show this help message and exit
  --tag TAG, -t TAG    Tag name used for marking and fetching resources.
                       (default: roles)
  --up, -u             Creates a full infra. (default: up)
  --down, -d           Destroys a full infra. (default: up)
  --wet, -w            Actually performs the action. (default: dry)
  --dry                Only shows what would be done, not doing anything.
                       (default: dry)
  --log {DEBUG,INFO,WARNING,ERROR,CRITICAL}
                       Verbosity level. (default: WARNING)
  --cidr CIDR          The network range for the VPC, in CIDR notation. For
                       example, 10.0.0.0/16 (default: 10.0.42.0/28)
  --ami AMI            The AMI id for your instance. (default: ami-33734044)
  --keypair KEYPAIR    A keypair aws knows about. (default: yourkey)
  --profile PROFILE    Profile to use for credentials. Will use AWS_PROFILE
                       environment variable if set. (default: default)
  --instance INSTANCE  Instance type. (default: t2.micro)

For instance:

# Let's see what would happen when creating a full infra...
./fullspawn.py3 -t tag --up --dry testing
# Looks good, let's do it.
./fullspawn.py3 -t tag --up --wet testing
# oops, this was a stupid tag name
./fullspawn.py3 -t tag --down --wet testing

You probably want to have a look at some variables inside the script, which set a few defaults that might not be relevant for you. I am thinking of the ami (AMI), the keypair (KEYPAIR) and the ingress rules (INGRESS), all defined before the argparse calls.

The code is available on github.

Enjoy!

Vagrant, hostmanager, virtualbox and aws

No need to present Vagrant if you read this post. Hostmanager is a plugin which manages /etc/hosts on the host and the guests, to have common fqdns and simplify connections to your boxes. You can for instance set up hostmanager to make sure that your web development virtual machine is always accessible at web.local, or make sure that all machines in your virtual cluster can talk to each other with names like dev1.example.com, dev2.example.com and so on, no matter which IP address DHCP decided to hand out.

This is all well and good, but there are a few issues. For instance, with virtualbox and dhcp, hostmanager only finds 127.0.0.1 as the IP for your virtual machines, which is not handy, to say the least.

If you use aws, you would like the guests to use the private IP, while having the host use the public IP, which is not possible by default.

Luckily, you can write your own IP resolver. The one I give here, which can also be found on github, solves the following issues:

  • dhcp and virtualbox,
  • public/private IP with aws.

Note that it only works for Linux guests (probably other unices as well to be honest, but I cannot guarantee that hostname -I works on all flavours).

$cached_addresses = {}
# There is a bug when using virtualbox/dhcp which makes hostmanager not find
# the proper IP, only the loop one: https://github.com/smdahlen/vagrant-hostmanager/issues/86
# The following custom resolver (for linux guests) is a good workaround.
# Furthermore it handles aws private/public IP.

# A limitation (feature?) is that hostmanager only looks at the current provider.
# This means that if you `up` an aws vm, then a virtualbox vm, all aws ips
# will disappear from your host /etc/hosts.
# To prevent this, apply this patch to your hostmanager plugin (1.6.1), probably
# at $HOME/.vagrant.d/gems/gems or (hopefully) wait for newer versions.
# https://github.com/smdahlen/vagrant-hostmanager/pull/169
$ip_resolver = proc do |vm, resolving_vm|
  # For aws, we should use private IP on the guests, public IP on the host
  if vm.provider_name == :aws
    if resolving_vm.nil?
      used_name = vm.name.to_s + '--host'
    else
      used_name = vm.name.to_s + '--guest'
    end
  else
    used_name = vm.name.to_s
  end

  if $cached_addresses[used_name].nil?
    if hostname = (vm.ssh_info && vm.ssh_info[:host])

      # getting aws guest ip *for the host*, we want the public IP in that case.
      if vm.provider_name == :aws and resolving_vm.nil?
        vm.communicate.execute('curl http://169.254.169.254/latest/meta-data/public-ipv4') do |type, pubip|
          $cached_addresses[used_name] = pubip
        end
      else

        vm.communicate.execute('uname -o') do |type, uname|
          unless uname.downcase.include?('linux')
            warn("Guest for #{vm.name} (#{vm.provider_name}) is not Linux, hostmanager might not find an IP.")
          end
        end

        vm.communicate.execute('hostname --all-ip-addresses') do |type, hostname_i|
          # much easier (but less fun) to work in ruby than sed'ing or perl'ing from shell

          allips = hostname_i.strip().split(' ')
          if vm.provider_name == :virtualbox
            # 10.0.2.15 is the default virtualbox IP in NAT mode.
            allips = allips.select { |x| x != '10.0.2.15'}
          end

          if allips.size() == 0
            warn("Trying to find out ip for #{vm.name} (#{vm.provider_name}), found none useable: #{allips}.")
          else
            if allips.size() > 1
              warn("Trying to find out ip for #{vm.name} (#{vm.provider_name}), found too many: #{allips} and I cannot choose cleverly. Will select the first one.")
            end
            $cached_addresses[used_name] = allips[0]
          end
        end
      end
    end
  end
  $cached_addresses[used_name]
end

Just put this code in your Vagrantfile, outside the Vagrant.configure block, and use it by assigning it this way:

config.hostmanager.ip_resolver = $ip_resolver

On a side note, and as explained in the comment, there is a limitation (feature?) in that hostmanager only looks at the current provider. This means that if you up an aws vm, then a virtualbox vm, all aws ips will disappear from your host /etc/hosts.

To prevent this, apply this patch to your hostmanager plugin (1.6.1), probably at $HOME/.vagrant.d/gems/gems, or (hopefully) wait for newer versions. The patch itself is ridiculously tiny:

diff --git a/lib/vagrant-hostmanager/hosts_file/updater.rb b/lib/vagrant-hostmanager/hosts_file/updater.rb
index 9514508..ef469bf 100644
--- a/lib/vagrant-hostmanager/hosts_file/updater.rb
+++ b/lib/vagrant-hostmanager/hosts_file/updater.rb
@@ -82,8 +82,8 @@ module VagrantPlugins
 
         def update_content(file_content, resolving_machine, include_id)
           id = include_id ? " id: #{read_or_create_id}" : ""
-          header = "## vagrant-hostmanager-start#{id}\n"
-          footer = "## vagrant-hostmanager-end\n"
+          header = "## vagrant-hostmanager-start-#{@provider}#{id}\n"
+          footer = "## vagrant-hostmanager-end-#{@provider}\n"
           body = get_machines
             .map { |machine| get_hosts_file_entry(machine, resolving_machine) }
             .join

EMR – Elastic Map Reduce

Amazon has its own flavour of Hadoop, and this page explores in which cases it is worth using it instead of a usual Hadoop distribution on top of EC2.

What EMR is

Elastic Map Reduce is basically an Amazon-flavoured Hadoop distribution, patched and optimised to run on AWS and targeted towards one-off or very infrequent processing. It uses either Amazon’s own Hadoop or MapR.

Plus points

It is pretty easy to set up. On the EMR setup page, you just have a few knobs to click on to get a cluster up and running. Basically, you choose whether you want Amazon’s Hadoop or MapR, the set of applications to be bundled in, and the number and type of instances in your cluster. This can be done in hardly a minute and the cluster will automagically be provisioned for you.

It seems pretty much up to date; Spark 1.5, for instance, was available within a month of its release.

The cluster can be managed in different ways, via the GUI, the console or APIs, making it very flexible to scale in or out.

Minus points

The usual minus points of something which is managed for you apply. There is only a limited set of applications bundled in, namely Hadoop, Hive, Hue, Mahout, Oozie-Sandbox, Pig, Presto-Sandbox, Spark and Zeppelin-Sandbox. If you need another one, or a different version, you are out of luck. It is possible to do some manual installation or updates, but that probably defeats the purpose of paying extra to have a managed cluster.

Running costs are higher than running your own Hadoop cluster on EC2, as you have to pay not only for the EC2 servers but for EMR as well. EMR adds about 20-25% on top of the EC2 costs.

The default storage is S3, which is not meant for low-latency access. This might not be an issue for the use cases where EMR is really good, but can definitely become a problem if low latency is a must for you.

Interesting notes

You have the option, when setting up a cluster, to choose a long-running or transient life-cycle. This lets you spawn a cluster for very infrequent jobs, have them run, and destroy the cluster after completion (so you are not paying for it while idle).

Note that you cannot have more than 256 jobs (named steps) active at the same time. In older versions, 256 jobs was the total over the lifetime of the cluster.

Usage

It is really easy to submit a job. The storage is all in S3, so once:

  • your input data is in S3,
  • your job (a mapper and a reducer, as a jar or streaming in any language you wish) is in S3,
  • you have created an output directory in S3,

you basically just have to fill these paths into a form and the job will run.
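
If you would rather script it than fill a form, a rough boto3 sketch of a transient cluster running a single streaming step could look like this (bucket names, paths, instance types and release label are placeholders):

import boto3

emr = boto3.client('emr')

response = emr.run_job_flow(
    Name='transient-wordcount',
    ReleaseLabel='emr-5.0.0',
    Applications=[{'Name': 'Hadoop'}],
    LogUri='s3://my-bucket/emr-logs/',
    Instances={
        'MasterInstanceType': 'm4.large',
        'SlaveInstanceType': 'm4.large',
        'InstanceCount': 3,
        'KeepJobFlowAliveWhenNoSteps': False,  # transient: terminate once the steps are done
    },
    Steps=[{
        'Name': 'wordcount',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['hadoop-streaming',
                     '-files', 's3://my-bucket/mapper.py,s3://my-bucket/reducer.py',
                     '-mapper', 'mapper.py',
                     '-reducer', 'reducer.py',
                     '-input', 's3://my-bucket/input/',
                     '-output', 's3://my-bucket/output/'],
        },
    }],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)
print(response['JobFlowId'])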

My experience is that, as expected, the latency is very high.

It is possible to chain steps, but you must then use AWS Data Pipeline, which is not covered here.

Summary

Basically, EMR would be great in 2 situations:

  • Very infrequent use of data without strong latency requirements. You can then spawn a transient cluster, have it do whatever processing you planned to do and destroy it to save costs afterwards.
  • If the costs associated with managing a cluster would be higher than the extra EMR costs. This would probably be the case for short-term clusters, which reinforces the previous point.