OpenVPN and iptables

I have a VPS somewhere in the US (provided by DigitalOcean – so far the best provider I have found: cheap, easy to use, flexible, with SSD disks and a decent amount of space), which I want to use for VPN.

There are tons of great tutorials about how to set up OpenVPN, but after following them I could indeed connect to the VPN, yet I could not use it for anything: there was no internet connection.

One thing was missing: the iptables setup. It is indeed mentioned in the openvpn.net howto, but not in a complete form. I hope this little script will help other people as well:

# enable forwarding
echo 1 > /proc/sys/net/ipv4/ip_forward
# allow traffic from the VPN out to the internet
# (only needed if your FORWARD chain policy is not ACCEPT)
iptables -A FORWARD -i tun0 -o eth0 -j ACCEPT
# allow the return traffic back to the VPN clients
iptables -A FORWARD -i eth0 -o tun0 -m state --state RELATED,ESTABLISHED -j ACCEPT
# NAT the VPN subnet behind the server's public interface
iptables -t nat -A POSTROUTING -s 10.8.0.0/24 -o eth0 -j MASQUERADE

You can copy this into a script file (e.g. /etc/gateway.sh) and have it run at startup, for instance by adding it to /etc/rc.local.
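For reference, a minimal sketch of that startup hook, assuming the script is saved as /etc/gateway.sh and made executable (chmod +x /etc/gateway.sh):

# excerpt of /etc/rc.local, which runs at the end of the boot sequence
/etc/gateway.sh
exit 0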

The Vertica COPY statement

Why not just use INSERT?

Vertica is a columnar database, which means that data is grouped by column on disk. The consequence is that inserting or updating one row means touching all the columns of the table, so basically the whole table. To avoid this, Vertica has a batch loader which handles the load in an optimised way. Using it took us from about 15 rows inserted per second with INSERT to many thousands with COPY.

What is specific about COPY?

Basically, Vertica does not insert rows directly on disk. They are first inserted into the WOS, or Write Optimised Store, in memory, so loading data is very quick. After a while, when the WOS is full or some time after the load, Vertica writes the WOS out to the ROS, or Read Optimised Store, on disk.

Vertica itself manages the overflow from WOS to ROS and the merging of small, inefficient ROS containers. It basically does all the housekeeping for you, even if you can influence it, should you really want to, by managing the tuple mover yourself.

How to use COPY?

This is quite easy. The Vertica documentation explains all the dirty details, but the main use will look something like this, loading a compressed CSV with one header line that we want to skip. In this case the CSV is on your client machine:

COPY schema.table 
    (column_a, column_b...)
FROM LOCAL '/tmp/bigdata.csv.gz' GZIP
WITH
    DELIMITER AS ','
    ENCLOSED BY '"'
    SKIP 1

To go further

This is really the crux of it. All the rest is technical detail. Those details can be very powerful, though, and I will explain a few of them here.

Source file location

The file can be located on your client (FROM LOCAL ‘path’) or already be on one of your nodes (FROM ‘path’ ON nodename). You can even load FROM STDIN.
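For illustration, here is a minimal sketch of the two server-side variants; the node name and paths are made up:

-- file already sitting on a specific node
COPY schema.table FROM '/data/bigdata.csv' ON v_mydb_node0001 DELIMITER ',';

-- data streamed through the client connection
COPY schema.table FROM STDIN DELIMITER ',';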

Multiple files at once

Vertica supports globs. You can thus load, for instance, '/tmp/201312*.csv'.
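A minimal sketch, with a made-up path:

-- load all December 2013 files in one statement
COPY schema.table FROM '/tmp/201312*.csv' DELIMITER ',';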

Specification per column

The specifications (DELIMITER, ENCLOSED BY) are defined globally in my example. You can define them per column, after each column name. If for instance the first column uses ':' as a delimiter and the others ';', you would write:

COPY schema.table 
    (column_a DELIMITER ':', column_b)
FROM LOCAL '/tmp/bigdata.csv.gz' GZIP
    DELIMITER ';'

Skip or transform columns

Use the FILLER parameter. The doc already has great examples, so I will just show them here. Transformation can be done this way:

CREATE TABLE t (k TIMESTAMP);
COPY t(year FILLER VARCHAR(10),
       month FILLER VARCHAR(10), 
       day FILLER VARCHAR(10), 
       k AS TO_DATE(YEAR || MONTH || DAY, 'YYYYMMDD'))
FROM STDIN NO COMMIT;
2009|06|17
1979|06|30
2007|11|26
\.
SELECT * FROM t;
         k 
--------------------- 
 2009-06-17 00:00:00 
 1979-06-30 00:00:00 
 2007-11-26 00:00:00 
 (3 rows)

Skipping columns is done by declaring them as FILLER with the datatype to ignore and simply not using them. Note that in this example the first column is cast to a timestamp.

create table t (k timestamp);
copy t(y FILLER date FORMAT 'YYYY-MM-DD', 
       t FILLER varchar(10),
       k as y) from STDIN no commit;
2009-06-17|2009-06-17
\.

Load method: big load vs. small load

You can add a parameter at the end of the statement to choose the load method: AUTO, TRICKLE or DIRECT (see the example after the list).

  • AUTO is the default: data is loaded first into the WOS (memory), then overflows to the ROS (disk) when the WOS is full.
  • TRICKLE is for small loads: data is loaded only into the WOS (memory), and moving it to the ROS (disk) happens after the load. You will get an error if the WOS is full.
  • DIRECT is for big batches: data is loaded straight into the ROS (disk).
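For instance, reusing the earlier example, a big batch load going straight to the ROS could look like this:

COPY schema.table
FROM LOCAL '/tmp/bigdata.csv.gz' GZIP
    DELIMITER ','
    SKIP 1
DIRECT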

Error management

One last useful detail: you can set up some error management to do a postmortem on data which did not load properly.

COPY t FROM STDIN
    REJECTMAX 10
    EXCEPTIONS '/tmp/exceptions'
    REJECTED DATA '/tmp/raw_errors'

This will make the COPY statement error out after 10 (REJECTMAX) rejected rows. The raw data of those rows will be written to the REJECTED DATA path, with helpful information about each rejection stored at the EXCEPTIONS path.

Note that the REJECTED DATA file can become quite big. In one instance, I wrongly overrode the RECORD TERMINATOR parameter, which basically made Vertica think that my whole file was one big line. This line was of course badly formatted, and was helpfully written to REJECTED DATA. That file essentially became a full copy of my source file, blowing up my partition. So be careful here!

Monitoring

By looking at the v_monitor.load_streams table you can see information about historical and even current loads. The numbers of rows loaded and sorted are the most interesting values to look at.

select * from v_monitor.load_streams;
-[ RECORD 1 ]----------+---------------------------------
session_id             | vertica.local-27774:0xa799
transaction_id         | 45035996273707887
statement_id           | 1
stream_name            |
schema_name            | reports
table_id               | 45035996273722464
table_name             | sales
load_start             | 2014-08-20 09:02:55.631703+02
load_duration_ms       |
is_executing           | t
accepted_row_count     | 0
rejected_row_count     | 0
read_bytes             | 0
input_file_size_bytes  | 0
parse_complete_percent |
unsorted_row_count     | 435666962
sorted_row_count       | 60687177
sort_complete_percent  | 13

Pentaho Kettle: how to remotely execute a job with a file repository

Pentaho/kettle background

Kettle (now known as PDI) is a great ETL tool, open source with a paid enterprise edition if you need extra support or plugins.

One great feature is the ability to remotely execute one of your jobs for testing, without having to deploy anything. This is done via the carte server (part of pdi), which basically is a service listening on a port to which you send your jobs.

Carte background

Carte works very well when you are using a database repository, but you will run into issues when you use a file repository. The reason is that when you run a job remotely, Kettle needs to bundle all the relevant jobs and transformations to send them over. This is not always possible; an obvious example is when some job names are parametrised.

There is still a way to deal with this. Carte's behaviour is to use the jobs and transformations sent by Kettle, or to use the ones it can find locally if the repository names match.

The solution

The solution is then quite logical: copy over your current repository to the carte server, set it up with the same name as your local repository and you are good to go.

This is a bit painful to do manually, so here is the job I wrote to do it automatically from inside Pentaho. Not a lot of assumptions are made, except that you can copy files to your Carte server with scp (you thus need ssh access).

The flow is as follows (a manual shell equivalent is sketched after the list):

  1. Delete existing compressed local repository if any
  2. Compress local repository
  3. Delete remote compressed repository if any
  4. Copy over compressed local repository
  5. Uncompress remote compressed repository
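For reference, a rough manual equivalent in shell, using the example values from the kettle.properties shown below (host, user and paths are only placeholders):

# compress the local repository
rm -f /tmp/devclone.zip
cd /Users/thisdwhguy/pentaho && zip -qr /tmp/devclone.zip .

# replace the remote copy and uncompress it
ssh thisdwhguy@etlserver.example.com 'rm -rf /path/to/repositories/thisdwhguy/*'
scp /tmp/devclone.zip thisdwhguy@etlserver.example.com:/tmp/
ssh thisdwhguy@etlserver.example.com 'unzip -qo /tmp/devclone.zip -d /path/to/repositories/thisdwhguy'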

You can see this in the following picture (the red arrows show the inside of the transformations):

Copy a local repository

To be generic, a few configuration values must be added to your kettle.properties. They set up the remote server name, your username and various paths. The following is an example with comments for all fields.

# Hostname of your etl server where carte runs
ssh.etlhost=etlserver.example.com
# Name of your ssh user
ssh.etluser=thisdwhguy
# Use either ssh.password or ssh.keypath + ssh.keypass
# Password of your ssh user, leave empty if none
ssh.password=
# Where is your private key on your local machine
ssh.keypath=/Users/thisdwhguy/.ssh/id_rsa
# If your private key is password protected, add it here.
# If not, leave it empty
ssh.keypass=
# Where your repo sits
local.repo=/Users/thisdwhguy/pentaho
# Where to compress locally the repository
zip.tmpdir=/Users/thisdwh/tmp
# What is the name of your compressed repository
# (can be anything, this is irrelevant but having
# it here keeps consistency)
zip.devclone=devclone.zip
# Where to uncompress the zip file? This setup
# allows multiple users and the final directory
# will be ${ssh.etltargetdir}/${ssh.etltargetuserrepo}
ssh.etltargetdir=/path/to/repositories
ssh.etltargetuserrepo=thisdwhguy

Caveats

This job assumes that you have ssh access. If this is the case, you can use this job as is, but there is one thing you might want to update.

I assumed that a key is used for ssh, but a password might be the only thing you need. If that is the case, update the 2 ssh steps and the copy step accordingly by unticking ‘use private key’.

That’s all, folks

This job should be quite easy to use. Do not hesitate to comment if you have questions.

Sadly I cannot attach a zip file to this post, and after doing some over-enthusiastic cleaning I lost the example file completely. I hope that the description given in this post is enough.

Official fix

It looks like this workaround is not needed anymore since bug fix PDI-13774, available in version 5.4.0 GA.

Server automation – introducing Ansible

Ansible is a tool I find myself using more and more. To quote the project website:

Ansible is the simplest way to automate and orchestrate.

Indeed, Ansible is the simplest way to automate. Once it is installed and a hostfile is set up (this is really just an ini-like file, with section headers and lists of hostnames), you can run any command, via sudo or not, on the defined list of hosts, in parallel, without having to log in manually on all machines.

A small example will show you the power of its command line. I just wanted to stop puppet on all our disco slaves. I thus defined an /etc/ansible.hosts file as follows:

[disco]
slave1
slave2
...
slave16

Then I just needed to type:

ansible disco --inventory-file /etc/ansible.hosts --ask-sudo-pass --forks=5 -a "service pupwrapd stop"

And voila, pupwrapd was stopped on all servers, with 5 open connections. I could of course have done it on all 16 servers in parallel.

This is just the tiny tip of the Ansible iceberg. It is a very dynamic project, always evolving and improving, with the possibility to write playbooks to automate just about everything. A lot of modules are included which will make your life a lot easier, and the documentation is quite comprehensive.
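As a tiny example of the modules, the same inventory can be reused for ad-hoc module calls, for instance the ping module to check that all slaves are reachable (a sketch reusing the inventory above):

ansible disco --inventory-file /etc/ansible.hosts -m ping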

In short, if you do not know ansible yet, have a look!

Vertica optimisation part 2: best practices

This is the second post suggesting some Vertica performance improvements. After having looked at the system side of things, we will now look into how to use Vertica.

INSERT vs. COPY statements

Vertica is a columnar database. I will not get into details of what this means, but as data is stored per column, you can imagine that inserting one row impacts the whole table, as all the columns need to be updated.

Our first test with a standard ETL tool (Pentaho) used a usual INSERT into a big-ish table. We reached the great feat of inserting fewer than 100 rows per second. Then we switched to the Vertica way, i.e. using a COPY statement, and reached many thousands of rows per second.

Lesson 1: Use the COPY command, not INSERT. I wrote another blog post specifically about the COPY statement.

Window and analytical functions

Vertica supports window functions. They are related to grouping as they return aggregated values, but they do so without collapsing the rows. An example taken from the Vertica documentation shows this. You can see that each line is still present (you do not have only one line per department), but you still get a count on each row, contrary to the aggregate.

  • Aggregate, 1 row per department:
SELECT dept_no, COUNT(*) AS emp_count 
FROM employees
GROUP BY dept_no ORDER BY dept_no;
dept_no | emp_count
---------+-----------
     10 | 2
     20 | 6
     30 | 3
(3 rows)
  • Analytical, all the rows are present
SELECT 
   emp_no
 , dept_no
 , COUNT(*) OVER(PARTITION BY dept_no ORDER BY emp_no) AS emp_count
FROM employees;
 emp_no | dept_no | emp_count
--------+---------+-----------
      1 |      10 |         1
      4 |      10 |         2
------------------------------
      6 |      20 |         1
      7 |      20 |         2
      8 |      20 |         3
      9 |      20 |         4
     10 |      20 |         5
     11 |      20 |         6
------------------------------
      2 |      30 |         1
      3 |      30 |         2
      5 |      30 |         3
(11 rows)

There is a lot more to tell, and this is not the place for it. You can do a lot with these analytical functions, which are a very strong point of Vertica, for instance:

  • Rank the longest-standing customers in a particular state
  • Calculate the moving average of retail volume over a specified time
  • Find the highest score among all students in the same grade
  • Compare the current sales bonus each salesperson received against his or her previous bonus

Why do I mention them here? Because Vertica optimises those queries very aggressively. It knows how to process the partitions in parallel, making for dramatic speed improvements when you can rewrite joins with window functions. With a bit of experience, you even end up with SQL that is more compact and more readable.
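As a sketch of such a rewrite (the events table and its columns are made up), keeping only the latest row per customer can be done with a window function instead of a join against an aggregating subquery:

-- join version: aggregate, then join back to the detail rows
SELECT e.customer_id, e.event_ts, e.amount
FROM events e
JOIN (SELECT customer_id, MAX(event_ts) AS max_ts
      FROM events
      GROUP BY customer_id) last_e
  ON e.customer_id = last_e.customer_id
 AND e.event_ts = last_e.max_ts;

-- window version: one pass over the table, partitions processed in parallel
SELECT customer_id, event_ts, amount
FROM (SELECT customer_id, event_ts, amount,
             ROW_NUMBER() OVER (PARTITION BY customer_id
                                ORDER BY event_ts DESC) AS rn
      FROM events) numbered
WHERE rn = 1;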

Lesson 2: Use window functions

Projection

This is a big strength of Vertica. When a table is created, Vertica creates a logical schema (CREATE TABLE), but also a default physical schema (CREATE PROJECTION). A projection is a way to physically store data on disk, and projections are completely transparent for the end user. As an administrator, you can still tailor your projections, though.

If for instance you know that for a specific table a specific column will always appear in the WHERE condition, you can put it first in the projection sort order. As Vertica stores data ordered on disk, this column will be very quick to access, and Vertica, without even needing to uncompress data, will know where to find the relevant rows. If this column were the last one in the projection order, its values would already be at least partly scattered because of the previous columns, thus increasing access time.

If you think “index” you might be right, but this is a taboo word in the Vertica world.

What does that mean for you?

By default, Vertica (via the database designer or at first insert) is pretty good at creating projections for you. In some cases, though, you might want to go beyond that. You could for instance create two projections for a table. This would double the load time, of course, as data would have to be written twice to disk, but if the projections are properly tailored, they can make specific queries a lot faster (query-specific projections). If you are using a MERGE statement, having the two sides of the merge identically segmented (segmentation is part of defining a projection) will make the merge a lot faster, as it will prevent network traffic.
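For illustration only, a tailored projection could look like the following sketch; the table, columns and segmentation key are made up:

-- sorted by the column always used in WHERE, segmented on the MERGE key
CREATE PROJECTION sales.events_by_day
AS SELECT event_day, customer_id, amount
   FROM sales.events
   ORDER BY event_day
   SEGMENTED BY HASH(customer_id) ALL NODES;

-- a new projection only contains data after a refresh
SELECT START_REFRESH();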

Furthermore, you can create pre-join projections, where you pay the join cost at load time, not at query time.

A word of warning here. Projections are extremely powerful, but you really can shoot yourself in the foot with them. You could, for instance, create the same projection twice on a table. This would double the load time without having any impact at query time, as Vertica chooses only one projection per query.

There is a lot more to say about projections; you can read all about them in the online documentation.

Lesson 3: Set up good projections

Timestamp considered harmful

Just imagine. You have a big table containing events. You receive thousands of events per second, all containing a timestamp at second precision. Vertica will of course happily load those events, compressing and storing them on disk.

You then want to look at your events from yesterday, so you SELECT based on your timestamp column. This column has a cardinality of 86.4k for yesterday alone, which means about 2.5M for a month. Of course, this will work. But the data stored on disk for this column is huge due to the cardinality, and Vertica will have a lot of extra processing to do.

Ask yourself this question: do you really need data at the level of the second? What is the smallest timeslice you use when querying this table? If your heaviest queries never look at anything more precise than one hour, create an extra column with only an hourly timestamp. Just like that, you have divided the cardinality of the column your queries look at first by 3.6k, thus saving on IO and processing time. Of course, this does not mean that you need to fully get rid of the timestamp column; it will still be there for ad-hoc queries when needed. But your usual big queries will run a lot faster.
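A minimal sketch of how such a column can be filled at load time, reusing the FILLER trick from the COPY post (table, columns and path are made up):

CREATE TABLE events (event_ts TIMESTAMP, event_hour TIMESTAMP);
COPY events (raw_ts FILLER TIMESTAMP,
             event_ts AS raw_ts,
             event_hour AS date_trunc('hour', raw_ts))
FROM LOCAL '/tmp/events.csv';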

Lesson 4: Do not use timestamp in heavy queries.

Vertica optimisation part 1: system

Vertica out of the box is an amazing system, not needing a lot of configuration to perform well. That said, a few easy tweaks can improve its performance even further. This first post explains what can be done at the system level; a second post will suggest a few best practices.

Set readahead

This is the way Linux prefetches data. When you fetch a small amount of data from disk, the main costs incurred are head positioning and disk rotation, which means that the cost of fetching a larger amount of data from the same position on disk is dwarfed by that initial cost. The following command sets the readahead to 8192 sectors of 512 bytes, i.e. 4MB of data prefetched on each read. Issue it as root or with sudo for a runtime change, and copy the line into /etc/rc.local for a permanent change.

# where sdxx is the disk on which your data sits
/sbin/blockdev --setra 8192 /dev/sdxx

Set swappiness

The swappiness tells a Linux system how to balance data between memory and swap. By default it is set to 60. Vertica does require some swap, even if you have enough memory: not having enough swap can result in the out-of-memory killer (oom-killer) deciding to kill Vertica. Note that Vertica recommends having at least 2GB of swap in any case. Run the following as root or with sudo at runtime, or write it in /etc/rc.local for a permanent effect.

echo 0 > /proc/sys/vm/swappiness

Run powertop

Powertop is a little tool that looks at what consumes the most energy on a system. Although mostly useful for laptops, it can tell you if some application or daemon you would not think of is consuming energy, and thus probably resources.

Set up the IO scheduler

The default Linux IO scheduler is cfq (completely fair queuing), which tries to balance the needs of all applications. Although ideal for desktop usage, this is not what we want for Vertica: we do not want to be fair, we just want Vertica to run as fast as it can. For this purpose the deadline scheduler, whose goal is to reduce latency, is a much better choice. Run the following as root or with sudo, or again write it in /etc/rc.local to make the change permanent.

echo 'deadline' > /sys/block/sdxx/queue/scheduler

Externalise ext4 journal

Ext4, the only filesystem supported by Vertica (along with its parent, ext3), is a journaling filesystem. This means that it keeps track of changes to be made in a journal before committing them to disk, speeding up recovery in case of a crash. This journal is kept on disk, so it adds extra writes. If you have another disk available, you can write the journal to that disk instead, thus reducing the amount of IO on the data disk.

I will assume that the mount point on which vertica writes its data is /mnt/vertica, and the partition is /dev/sda1. The journal partition will be /dev/sdb1.

# stop all relevant services
# (you can check with lsof which processes still have open files on the mount)
# unmount the current ext4 filesystem
umount /mnt/vertica

# remove the journal from the current filesystem
tune2fs -O ^has_journal /dev/sda1

# format the journal partition as an external journal device
mke2fs -O journal_dev /dev/sdb1

# point the filesystem at the external journal
tune2fs -o journal_data_ordered -j -J device=/dev/sdb1 /dev/sda1

# remount
mount /mnt/vertica

Data partition mount option

By default, on ext4, each read and write of a file updates the file's metadata to record the access time. This means that even a read results in a disk write. This can be avoided, to increase performance, by adding an option to the line mounting the Vertica partition in your /etc/fstab (an example line is shown after the list). You have two options:

  • noatime: this prevents updates on both read and write, for the highest performance gain
  • relatime: this prevents updates on read, but keeps updating on write
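As a sketch, assuming the data partition and mount point used earlier in this post, the /etc/fstab line could look like this:

# /etc/fstab entry -- device and mount point are examples
/dev/sda1  /mnt/vertica  ext4  defaults,noatime  0  0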

Create a catalog partition

The catalog can be put on its own partition to relieve the data partition from extra writes. The easiest way is then to create a symlink from the initial catalog directory to a directory on the catalog partition.
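A minimal sketch, with the database stopped and made-up paths (the catalog in /data/vertica/mydb/v_mydb_node0001_catalog, the new partition mounted on /mnt/catalog):

# move the catalog to the dedicated partition and symlink it back
mv /data/vertica/mydb/v_mydb_node0001_catalog /mnt/catalog/
ln -s /mnt/catalog/v_mydb_node0001_catalog /data/vertica/mydb/v_mydb_node0001_catalog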

Conclusion

I set all these enhancements up at a time when performance was very bad due to bad configuration. They had a huge impact (even the simplest queries ran about two orders of magnitude faster), but I sadly do not have numbers to pinpoint exactly which improvements were the most effective. I can just tell you that all together they are fantastic.