Dundas Rest API and Python: introducing PyDundas

Dundas has a very complete REST API. You can do just about everything with it. It is what the web app uses, so if you are in need of an example, you can just look at the queries which are sent with the developer tools of your favorite browser.

That said, it is a bit of a pain to use in a script.

To make my life easy, I built PyDundas: a Python package that uses the API in the background. This lets you abstract away all the nitty-gritty so you can concentrate on semantically pleasing code.

For instance, warehousing a cube is quite simple. PyDundas handles logging, checking, waiting and more for you.

from pydundas import Api, Session, creds_from_yaml
import sys
import json
creds = creds_from_yaml('credentials.yaml')

with Session(**creds) as d:
    api = Api(d)
    capi = api.cube()
    cube = capi.getByPath('Awesome Project', '/relevant/path')
    if cube is None:
        print("Gotcha, no cube named like that.")
        sys.exit(1)
    print(cube.json())
    if not cube.is_checked_out():
        cube.clear_cache()

    cube.warehouse()
    print(cube.isWarehousing())
    cube.waitForWarehousingCompletion()
    print('Done')

There is a lot more that can be done with PyDundas. I developed it for my own needs, so I only added what I actually needed, and it is thus far from complete. That said, I regularly update it and add new APIs, and it is open source, so feel free to send me pull requests to expand it.


Accessing Dundas with Python

Dundas has a great REST API. You can basically do everything with it. Furthermore, it’s easy to find examples: you just have to look at what your Dundas web app does, and you have all the examples and use cases you can wish for.

I wanted to schedule cube refreshes, so I naturally turned toward Python. It wasn’t too complex, as you will see below. The one thing to take care of is logging out in all possible cases, otherwise you will burn through your elastic hours very fast (been there, done that). I’ll show you here how I did it with a context manager, which means that I basically cannot forget to log out: whatever happens, it’s all managed for me.

In my production code, this object actually does something (it refreshes cubes); this is only a skeleton to get you started. You can find the code on my github as well, where it might be easier to read.

The code actually does not do much: the call to __new__ associated with contextlib.closing() allows you to use the object DundasSession within a context manager, with the keyword with, thus guaranteeing you that you will always log out, no matter what, even if an exception or a sys.exit occurs.

#!/usr/bin/env python3

"""
Skeleton to use Dundas Rest API, with guaranteed log out.
"""
import contextlib
import logging
import requests
import sys

class DundasSession:
    """
    Using __new__ + contextlib.closing() is awesome.

    DundasSession can now be used within a context manager, meaning that whatever happens, its close() method
    will be called (thus logging out). No need to call logout() explicitly!
    Just using __del__ is not guaranteed to work because when __del__ is called, you do not know which objects
    are already destroyed, and the session object might well be dead.
    """
    def __new__(cls, *args, **kwargs):
        o=super().__new__(cls)
        o.__init__(*args, **kwargs)
        return contextlib.closing(o)

    def __init__(self, user, pwd, url):
        # For session reuse - TCP connection reuse, keeps cookies.
        self.s = requests.session()

        self.user = user
        self.pwd = pwd
        self.url = url
        self.api = self.url + '/api/'
        self.session_id = None  # Will be set in login()

    def login(self):
        """Logs in and stores the session_id on the object."""
        login_data = {
            'accountName': self.user,
            'password': self.pwd,
            'deleteOtherSessions': False,
            'isWindowsLogOn': False
        }
        logging.info('Logging in.')
        r = self.s.post(self.api + 'logon/', json=login_data)
        # The following line exceptions out on not 200 return code.
        r.raise_for_status()

        resp = r.json()
        if resp['logOnFailureReason'].lower() == "none":
            # We're in!
            logging.info('Logged in')
            self.session_id = resp['sessionId']
        else:
            logging.error('Login failed with message: ' + r.text)
            sys.exit(1)

    def close(self):
        """Automagically called by the context manager."""
        self.logout()

    def logout(self):
        """If you do not logout, session will stay active, potentially burning through your elastic hours very fast."""

        # If session_id is not defined, we did not even log in (or we are already logged out).
        logging.info('Logging out.')
        if getattr(self, 'session_id', None):
            r = self.s.delete(self.api + 'session/current', params={'sessionId': self.session_id})
            r.raise_for_status()
            del self.session_id
            logging.info('Logged out.')
        else:
            logging.info('Was not yet Logged in.')

logging.basicConfig(level='INFO')

with DundasSession(user='yourapiuser', pwd='pwd', url='https://reports.example.com') as dundas:
    dundas.login()

    # Do something smart with your Dundas object.

# No need to log out, this is handled for you via the context manager, even in case of exception or even sys.exit.
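To see the __new__ + contextlib.closing() trick in isolation, here is a minimal sketch without any network call. The Resource class and its events list are hypothetical stand-ins for DundasSession, only there to record the order in which things happen.

```python
import contextlib


class Resource:
    """Hypothetical stand-in for DundasSession: records events instead of calling an API."""
    events = []

    def __new__(cls, *args, **kwargs):
        o = super().__new__(cls)
        # __new__ returns a different type below, so Python will not call
        # __init__ for us: we have to do it ourselves.
        o.__init__(*args, **kwargs)
        return contextlib.closing(o)

    def __init__(self, name):
        self.name = name

    def close(self):
        # Called by contextlib.closing() when the with block is left.
        Resource.events.append("closed " + self.name)


try:
    with Resource("demo") as r:
        Resource.events.append("using " + r.name)
        raise RuntimeError("boom")
except RuntimeError:
    Resource.events.append("exception propagated")

print(Resource.events)
```

close() runs before the exception leaves the with block, which is the property the skeleton above relies on to log out.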

Reaching Hive from pyspark on HDP3

There is a lot to find on the net about talking to Hive from Spark. Sadly, most of it refers to Spark before version 2 or is not valid for HDP3. You need to use the Hive Warehouse Connector, bundled in HDP3.

This is an example of a minimalistic connection from pyspark to hive on hdp3.

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Yes, llap even if you do not use it.
from pyspark_llap import HiveWarehouseSession

settings = [
    ('spark.sql.hive.hiveserver2.jdbc.url',
     'jdbc:hive2://{your_hiveserver2_url:port}/default'),
]

conf = SparkConf().setAppName("Pyspark and Hive!").setAll(settings)
# Spark 2: use SparkSession instead of SparkContext.
spark = (
    SparkSession
    .builder
    .config(conf=conf)
    .master('yarn')
    # There is no HiveContext anymore either.
    .enableHiveSupport()
    .getOrCreate()
)

# This is mandatory. Just using spark.sql will not be enough.
hive = HiveWarehouseSession.session(spark).build()

hive.showDatabases().show()
hive.execute("select 2 group by 1 order by 1").show()
spark.stop()

You can then run this with the following command:

HDP_VERSION=3.0.1.0-187 \
PYSPARK_PYTHON=python3 \
HADOOP_USER_NAME=hive \
SPARK_HOME=/usr/hdp/current/spark2-client \
spark-submit \
--jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
--py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip \
{your_python_script.py}

Note:

  • HDP_VERSION is needed when you use Python 3. If it is not set, HDP uses a script (/usr/bin/hdp-select) which is Python 2 only (although fixing it is trivial).
  • PYSPARK_PYTHON is optional; without it, pyspark defaults to plain python (which might or might not be Python 3 on your server).
  • Without HADOOP_USER_NAME the script will run as your current user. Alternatively, you could sudo first.
  • Without SPARK_HOME some jars would not be found and you would end up with an error like py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
    : java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
  • --jars and --py-files: as you can see, the HDP version is part of the file names. Make sure you use the right one for your installation.
  • There is no --master option; this is handled in the script while building the SparkSession.

There is some doc from Hortonworks you can follow to go further: Integrating Apache Hive with Spark and BI.

Just before I posted this article, a new write-up appeared on Hortonworks.com to describe some use cases for the Hive-Warehouse-Connector.

--no options with argparse and python

Ruby has this very nice feature when you define options with optparse:

opts.on('--[no-]flag', "Set flag.") do |p|
    options.persistPost=p
end

which allows you to have the --flag and --no-flag options for free. Python's argparse did not have this before Python 3.9 (which added argparse.BooleanOptionalAction), but there are 3 ways to work around that.

The verbose way

Just define 2 options.

  parser.add_argument(
    '--flag',
    dest='flag',
    action='store_true',
    help='Set flag',
  )
  parser.add_argument(
    '--no-flag',
    dest='flag',
    action='store_false',
    help='Unset flag',
  )
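As a quick check of how the two options interact, here is a minimal runnable sketch of the verbose way. The variable names default_value, last_wins and first_loses are only there for illustration.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--flag', dest='flag', action='store_true', help='Set flag')
parser.add_argument('--no-flag', dest='flag', action='store_false', help='Unset flag')

# Both options write to the same destination. The default comes from the
# first one defined (store_true defaults to False).
default_value = parser.parse_args([]).flag
# When both are given, the last one on the command line wins.
last_wins = parser.parse_args(['--flag', '--no-flag']).flag
first_loses = parser.parse_args(['--no-flag', '--flag']).flag

print(default_value, last_wins, first_loses)
```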

Custom action

You can give a custom action to the action parameter of add_argument. This custom action can look at the actual option given and act accordingly.

  parser.add_argument(
    '--flag', '--no-flag',
    dest='flag',
    action=BooleanAction,
    help='Set flag',
  )

BooleanAction is just a tiny class, defined as follows:

class BooleanAction(argparse.Action):
    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        super(BooleanAction, self).__init__(option_strings, dest, nargs=0, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, not option_string.startswith('--no-'))

As you can see, it just looks at the name of the flag, and if it starts with --no-, the destination will be set to False. Testing for --no- rather than --no avoids misreading a flag such as --notify.

Custom parser

Create your own add_argument method, which can then automagically add the --no option for you.
First define your own parser:

class BoolArgParse(argparse.ArgumentParser):
    def add_bool_arguments(self, *args, **kw):
        grp = self.add_mutually_exclusive_group()
        # add --flag
        grp.add_argument(*args, action='store_true', **kw)
        nohelp = 'no ' + kw['help']
        del kw['help']
        # add --no-flag
        grp.add_argument('--no-' + args[0][2:], *args[1:], action='store_false', help=nohelp, **kw)

Then use it:

parser = BoolArgParse()
parser.add_bool_arguments('--flag',dest='flag', help='set flag.')

Comparison

I do not want to call these pros and cons, as not all use cases want the same features, but there you are:

  • Verbose way:
    • More lines of code (need to define 2 flags),
    • Help more verbose,
    • Easy (no extra class),
    • Possibility to have the same parameter multiple times, the last one wins (e.g. --flag --no-flag).
  • Custom action:
    • Fewer lines of code,
    • Help not verbose (only one line of help),
    • Possibility to have the same parameter multiple times, the last one wins (e.g. --flag --no-flag).
  • Custom parser:
    • The most lines of code,
    • Help verbose but grouped,
    • Cannot combine --flag and --no-flag in the same call.
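If you can rely on Python 3.9 or later, the standard library now covers this use case with argparse.BooleanOptionalAction, which generates the --no- variant for you. A minimal sketch, assuming that Python version (the results dictionary is only there for illustration):

```python
import argparse

parser = argparse.ArgumentParser()
# One declaration gives both --flag and --no-flag (Python 3.9+ only).
parser.add_argument(
    '--flag',
    action=argparse.BooleanOptionalAction,
    default=False,
    help='Set flag',
)

results = {
    'nothing': parser.parse_args([]).flag,
    'flag': parser.parse_args(['--flag']).flag,
    'no-flag': parser.parse_args(['--no-flag']).flag,
    # As with the verbose way, the last one on the command line wins.
    'both': parser.parse_args(['--flag', '--no-flag']).flag,
}
print(results)
```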

Get started with AWS and python

When you start for the first (or even second) time with AWS, it is a bit tricky to get your head around all the nuts and bolts that need to be connected together. If on top of this you try to work with AWS in Beijing from outside China, the web GUI makes your work even harder because of slowness or even timeouts.

This script sets up a full set of resources for you (vpc, route table, security group, subnet, internet gateway, instance, with the relevant associations and attachments) for easy testing or bootstrapping of your infrastructure.

It is mostly meant as a testing help, so it does not handle all the options possible, but I find it invaluable to get started. You just need the AWS basics and it will do the rest for you. You need to provide a tag name (defaults to ‘roles’) and value, and all resources will be created and located via this tag, to allow for easy spawning and tearing down.

usage: fullspawn.py3 [-h] [--tag TAG] [--up | --down] [--wet | --dry]
                     [--log {DEBUG,INFO,WARNING,ERROR,CRITICAL}] [--cidr CIDR]
                     [--ami AMI] [--keypair KEYPAIR] [--profile PROFILE]
                     [--instance INSTANCE]
                     role

Spawns a full AWS self-contained infrastructure.

positional arguments:
  role                  Tag value used for marking and fetching resources.

optional arguments:
  -h, --help            show this help message and exit
  --tag TAG, -t TAG     Tag name used for marking and fetching resources.
                        (default: roles)
  --up, -u              Creates a full infra. (default: up)
  --down, -d            Destroys a full infra. (default: up)
  --wet, -w             Actually performs the action. (default: dry)
  --dry                 Only shows what would be done, not doing anything.
                        (default: dry)
  --log {DEBUG,INFO,WARNING,ERROR,CRITICAL}
                        Verbosity level. (default: WARNING)
  --cidr CIDR           The network range for the VPC, in CIDR notation. For
                        example, 10.0.0.0/16 (default: 10.0.42.0/28)
  --ami AMI             The AMI id for your instance. (default: ami-33734044)
  --keypair KEYPAIR     A keypair aws knows about. (default: yourkey)
  --profile PROFILE     Profile to use for credentials. Will use AWS_PROFILE
                        environment variable if set. (default: default)
  --instance INSTANCE   Instance type. (default: t2.micro)

For instance:

# Let's see what would happen when creating a full infra...
./fullspawn.py3 -t tag --up --dry testing
# Looks good, let's do it.
./fullspawn.py3 -t tag --up --wet testing
# oops, this was a stupid tag name
./fullspawn.py3 -t tag --down --wet testing
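The [--up | --down] and [--wet | --dry] pairs in the usage above are what argparse prints for mutually exclusive groups. A minimal sketch of how such a command line could be declared follows; this is an assumption for illustration, not the actual code of fullspawn.py3, and the dest names up and dry are hypothetical.

```python
import argparse

parser = argparse.ArgumentParser(
    description='Spawns a full AWS self-contained infrastructure.')
parser.add_argument('role',
                    help='Tag value used for marking and fetching resources.')
parser.add_argument('--tag', '-t', default='roles',
                    help='Tag name used for marking and fetching resources.')

# --up and --down share one destination and cannot be combined.
direction = parser.add_mutually_exclusive_group()
direction.add_argument('--up', '-u', dest='up', action='store_true',
                       default=True, help='Creates a full infra.')
direction.add_argument('--down', '-d', dest='up', action='store_false',
                       help='Destroys a full infra.')

# Dry run unless --wet is given explicitly.
mode = parser.add_mutually_exclusive_group()
mode.add_argument('--wet', '-w', dest='dry', action='store_false',
                  help='Actually performs the action.')
mode.add_argument('--dry', dest='dry', action='store_true', default=True,
                  help='Only shows what would be done.')

defaults = parser.parse_args(['testing'])
teardown = parser.parse_args(['-t', 'tag', '--down', '--wet', 'testing'])
print(defaults)
print(teardown)
```

Defaulting to a dry run means that forgetting a flag never creates or destroys anything by accident.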

You probably want to have a look at some variables inside the script, which set a few defaults that might not be relevant for you: the AMI (AMI), the keypair (KEYPAIR) and the ingress rules (INGRESS), all defined before the argparse calls.

The code is available on github.

Enjoy!

Graph the noise level in your office in 15 minutes

This is a recurrent complaint in any open space: “There is too much noise!” (the other one is that the climate is too cold/too hot). There are some usual culprits, but it is nice to have data to back your complaints up.

I will show you here how to generate a real-time noise level graph in 15 minutes without any hardware besides your laptop or desktop; not even a microphone is needed. This is a dirty hack, but it works and can be put in place very quickly with just a few command lines. The steps, which are mostly cut & paste, are:

  • install a noise recorder tool
  • set up nginx to serve the data recorded
  • use a nice javascript library to display the data properly

I used soundmeter, a python tool. So first, install it:

# make sure we can install python packages
apt-get install virtualenv
# install required dependencies for building soundmeter
apt-get install python-dev portaudio19-dev alsa-utils
# install other useful tools for later display
apt-get install nginx expect-dev
# set up a directory for the tool
mkdir $HOME/soundmeter
# create virtualenv
virtualenv $HOME/soundmeter
# activate it
source $HOME/soundmeter/bin/activate
# install soundmeter
pip install soundmeter

Et voilà, your recorder is set up.

But do you not need a microphone? Well, either your laptop has a built-in microphone, or you can just plug in headphones, which are basically a microphone used the other way around (producing sound instead of recording it).

To get data, a one-liner is enough:

soundmeter --segment 2 --log /dev/stdout 2>/dev/null | unbuffer -p perl -p -e 's/\s*(\d+)\s+(.{19})(.*)/"$2,". 20*log($1)\/log(10)/e' > meter.csv

The explanation is as follows:

  • soundmeter: run soundmeter forever (use --seconds to limit the duration)
  • --segment 2: output data every 2 seconds (the default is 0.5 seconds, but that is very spiky)
  • --log /dev/stdout: the default data on stdout is not useful for graphing, we need to log to a file. Use /dev/stdout as the file to actually log to stdout
  • 2>/dev/null: do not pollute the output
  • |: the output is not in a great format, it needs to be reformatted
  • unbuffer -p: by default data is buffered, which is annoying for a real-time view. This does what the name suggests
  • perl -p -e: yummy, a perl regexp!
  • s///e: this will be a substitution, where the replacement part is a perl expression
  • \s*(\d+)\s+(.{19})(.*): capture the value and the timestamp stripped of the milliseconds
  • "$2,": display the timestamp first, followed by a comma for the csv format
  • 20*log($1)\/log(10): the values from soundmeter are RMS, convert them to dB via the formula 20 * log10(rms). Perl's log is the natural logarithm, hence the division by log(10); the / is escaped because it is also the s/// delimiter
  • > meter.csv: save the data in a file

In short, we do the following transformation on the fly and write it to a csv file:

12 2015-09-22 13:36:13,082 => 2015-09-22 13:36:13,21.5836249
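If perl is not your thing, the same transformation can be sketched in Python. This assumes the input format implied by the regexp above (value first, then a timestamp with milliseconds); the to_csv function name is hypothetical, and the value is rounded to two decimals for readability.

```python
import math
import re

# Same pattern as the perl one-liner: value, whitespace, 19-character timestamp.
LINE = re.compile(r"\s*(\d+)\s+(.{19})")


def to_csv(line):
    """Turn '<rms> <timestamp>,<ms>' into '<timestamp>,<dB>', or None if unusable."""
    m = LINE.match(line)
    if m is None or int(m.group(1)) == 0:
        # Skip unparseable lines and pure silence (log10(0) is undefined).
        return None
    rms, stamp = int(m.group(1)), m.group(2)
    # dB = 20 * log10(rms); note the base-10 logarithm.
    return "{},{:.2f}".format(stamp, 20 * math.log10(rms))


print(to_csv("12 2015-09-22 13:36:13,082"))
```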

You now have a nice csv file. How to display it? Via a nice html page with the help of a javascript library, dygraphs, of course.

Set up nginx by adding the following content in /etc/nginx/sites-enabled/noise (replace YOUR_HOME by your actual home directory, of course):

server {
 listen 80;
 root YOUR_HOME/soundmeter;
}

and restart nginx:

service nginx restart

Then set up your page in $HOME/soundmeter/noise.html:

<html>
<head>
<script src="//cdnjs.cloudflare.com/ajax/libs/dygraph/1.1.1/dygraph-combined.js"></script>

<style>
#graphdiv2 { position: absolute; left: 50px; right: 10px; top: 50px; bottom: 10px; }
</style>

</head>
<body>
<div id="graphdiv2"></div>
<script type="text/javascript">
 g2 = new Dygraph(
 document.getElementById("graphdiv2"),
 "http://localhost/meter.csv", // path to CSV file
 {
 delimiter: ",",
 labels: ["Date", "Noise level"],
 title: "Noise (in dB)",
 showRoller: true,
 }
 );
</script>
</body>
</html>

You can of course replace localhost by your IP to publish this page to your colleagues.

Now just go to http://localhost/noise.html to see the graph.

Easily simulating connection timeouts

I needed an easy way to simulate timeouts when connecting to a REST API. As part of the flow of an application I am working on, I need to send events to our data platform. Blocking the production flow ‘just’ to send an event in case of a timeout is not ideal, and I needed a way to test this.

I know there are a few options:

  • Connecting to a ‘well known’ timing out url, such as google.com:81, but this is very antisocial
  • Adding my own firewall rule to DROP connection, but this is a lot of work (yes, I am very very lazy and I would need to look up the iptables syntax)
  • Connecting to a non routable IP, like 10.255.255.1 or 10.0.0.0

All those options are fine (except the first one, which although technically valid is very rude and not guaranteed to keep working), but they all give indefinite, non-configurable timeouts.

I thus wrote a small python script, without dependencies, which just listens on a port and makes the connection wait a configurable number of seconds before either closing the connection or returning a valid HTTP response.

Its usage is very simple:

usage: timeout.py [-h] [--http] [--port PORT] [--timeout TIMEOUT]

Timeout Server.

optional arguments:
  -h, --help            show this help message and exit
  --http, -w            if true return a valid http 204 response.
  --port PORT, -p PORT  Port to listen to. Default 7000.
  --timeout TIMEOUT, -t TIMEOUT
                        Timeout in seconds before answering/closing. Default
                        5.

For instance, to wait 2 seconds before giving an http answer:

./timeout.py -w -t2

would give you the following output if a client connects to it:

./timeout.py -w -t2
Listening, waiting for connection...
Connected! Timing out after 2 seconds...
Processing complete.
Returning http 204 response.
Closing connection.

Listening, waiting for connection...

This is the full script, which you can find on github as well:

#!/usr/bin/env python
import argparse
import socket
import time


# Make the TimeoutServer a bit more user friendly by giving 3 options:
# --http/-w to return a valid http response
# --port/-p to define the port to listen to (7000)
# --timeout/-t to define the timeout delay (5)

parser = argparse.ArgumentParser(description='Timeout Server.')
parser.add_argument('--http', '-w', default=False, dest='http', action='store_true',
                    help='if true return a valid http 204 response.')
parser.add_argument('--port', '-p', type=int, default=7000, dest='port',
                    help='Port to listen to. Default 7000.')
parser.add_argument('--timeout', '-t', type=int, default=5, dest='timeout',
                    help='Timeout in seconds before answering/closing. Default 5.')
args = parser.parse_args()


# Creates a standard socket and listen to incoming connections
# See https://docs.python.org/2/howto/sockets.html for more info
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('127.0.0.1', args.port))
s.listen(5)  # See doc for the explanation of 5. This is a usual value.

while True:
    print("Listening, waiting for connection...")
    (clientsocket, address) = s.accept()
    print("Connected! Timing out after {} seconds...".format(args.timeout))
    time.sleep(args.timeout)
    print('Processing complete.')

    if args.http:
        print("Returning http 204 response.")
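        # Bytes literals (required by Python 3 sockets) and CRLF line endings (required by HTTP).
        clientsocket.sendall(
            b'HTTP/1.1 204 No Content\r\n'
            # b'Date: {0}\r\n'.format(time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime())
            b'Server: Timeout-Server\r\n'
            b'Connection: close\r\n\r\n'  # blank line: end of headers, no body follows
        )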

    print("Closing connection.\n")
    clientsocket.close()
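To check that a client really gives up before the server answers, something along these lines can be used. This is a minimal self-contained sketch: slow_server is a hypothetical in-process stand-in for timeout.py (so the example runs on its own), and times_out is an assumed helper name.

```python
import socket
import threading
import time


def slow_server(listener, delay):
    """Accept one connection, wait `delay` seconds, then close without answering."""
    conn, _ = listener.accept()
    time.sleep(delay)
    conn.close()


def times_out(port, client_timeout):
    """Return True if no data arrives within `client_timeout` seconds."""
    with socket.create_connection(("127.0.0.1", port), timeout=client_timeout) as c:
        try:
            c.recv(1024)
        except socket.timeout:
            return True
    return False


listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
listener.listen(1)
port = listener.getsockname()[1]

server = threading.Thread(target=slow_server, args=(listener, 1.0), daemon=True)
server.start()

# The server waits 1 second, the client only 0.2 seconds: the client gives up first.
result = times_out(port, 0.2)
print(result)

server.join()
listener.close()
```

Against the real script, you would drop slow_server and point times_out at the port given with --port.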