The Vertica COPY statement

Why not just use INSERT?

Vertica is a columnar database, which means that data is grouped by column on disk. The consequence is that inserting or updating one row means touching all the columns of the table, and so essentially the whole table. To avoid this, Vertica provides a batch loader that handles the load in an optimised way. Switching to it took us from about 15 rows inserted per second with INSERT to many thousands with COPY.

What is specific about COPY?

Basically, Vertica does not insert rows directly on disk. They are first written into the WOS, or Write Optimised Store, in memory, which makes loading data very quick. After a while, when the WOS is full or after a load, Vertica writes the WOS to the ROS, or Read Optimised Store, on disk.

Vertica itself manages the overflow from WOS to ROS and the merging of small, inefficient ROS containers. It basically does all the housekeeping for you, although you can influence it if you really want to by driving the tuple mover yourself.
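
If you do want to drive the tuple mover yourself, a minimal sketch looks like this (DO_TM_TASK is the built-in function; the table name is of course yours to replace):

-- Flush the WOS of one table to the ROS (moveout),
-- then merge its small ROS containers (mergeout).
SELECT DO_TM_TASK('moveout', 'schema.table');
SELECT DO_TM_TASK('mergeout', 'schema.table');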

How to use COPY?

This is quite easy. The Vertica documentation explains all the dirty details, but the main use will be something like this, to load a compressed CSV with one header line that we want to skip. In this case the CSV is on your client machine:

COPY schema.table 
    (column_a, column_b...)
FROM LOCAL '/tmp/bigdata.csv.gz' GZIP
WITH
    DELIMITER AS ','
    ENCLOSED BY '"'
    SKIP 1

To go further

This is really the crux of it. All the rest is technical details. They can be very powerful details, though, and I will explain a few here.

Source file location

The file can be located on your client (FROM LOCAL 'path') or already be on one of your nodes (FROM 'path' ON nodename). You can even load FROM STDIN.
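
For instance, both variants could look like this (node name and paths are made up):

-- The file is already sitting on one of the nodes:
COPY schema.table FROM '/data/bigdata.csv' ON v_vertica_node0001 DELIMITER ',';

-- The data is read from standard input, e.g. piped in from another tool:
COPY schema.table FROM STDIN DELIMITER ',';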

Multiple files at once

Vertica supports glob patterns. You can thus load, for instance, '/tmp/201312*.csv'.
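
Combined with the previous point, loading a whole month of daily extracts could look like this (path and node name are made up):

-- All December 2013 files in one statement:
COPY schema.table FROM '/tmp/201312*.csv' ON v_vertica_node0001 DELIMITER ',';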

Specification per column

The specifications (DELIMITER, ENCLOSED BY) are defined globally in my example. You can define them per column, after each column name. If, for instance, the first column has ':' as a delimiter and the others ';', you would write:

COPY schema.table 
    (column_a DELIMITER ':', column_b)
FROM LOCAL '/tmp/bigdata.csv.gz' GZIP
    DELIMITER ';'

Skip or transform columns

Use the FILLER parameter. The doc already has great examples, so I will just show them here. Transformation can be done this way:

CREATE TABLE t (k TIMESTAMP);
COPY t(year FILLER VARCHAR(10),
       month FILLER VARCHAR(10), 
       day FILLER VARCHAR(10), 
       k AS TO_DATE(YEAR || MONTH || DAY, 'YYYYMMDD'))
FROM STDIN NO COMMIT;
2009|06|17
1979|06|30
2007|11|26
\.
SELECT * FROM t;
         k 
--------------------- 
 2009-06-17 00:00:00 
 1979-06-30 00:00:00 
 2007-11-26 00:00:00 
 (3 rows)

Skipping a column is done by declaring it as FILLER with its datatype and simply not using it. Note that in this example the first column is cast to timestamp.

CREATE TABLE t (k TIMESTAMP);
COPY t(y FILLER DATE FORMAT 'YYYY-MM-DD',
       t FILLER VARCHAR(10),
       k AS y) FROM STDIN NO COMMIT;
2009-06-17|2009-06-17
\.

Load method: big load vs. small load

You can add a parameter at the end of the statement to choose the load method: AUTO, TRICKLE or DIRECT (see the example after the list).

  • AUTO is the default: it loads first into the WOS (memory), then overflows to the ROS (disk) when the WOS is full.
  • TRICKLE is for small loads: it loads only into the WOS (memory), and moving to the ROS (disk) is done after the load. You will get an error if the WOS is full.
  • DIRECT is for big batches: it loads straight into the ROS (disk).
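
For example, a big nightly batch would typically bypass the WOS and go straight to disk; reusing the earlier example, this only means appending DIRECT:

COPY schema.table 
    (column_a, column_b)
FROM LOCAL '/tmp/bigdata.csv.gz' GZIP
    DELIMITER ','
    SKIP 1
    DIRECT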

Error management

One last useful detail: you can set up some error management to do a postmortem on data which did not load properly.

COPY t FROM STDIN
    REJECTMAX 10
    EXCEPTIONS '/tmp/exceptions'
    REJECTED DATA '/tmp/raw_errors'

This will make the COPY statement error out after 10 (REJECTMAX) rejected rows. The raw data of those rows will be written to the REJECTED DATA path, with helpful information about why they were rejected stored at the EXCEPTIONS path.

Note that the REJECTED DATA file can become quite big. In one instance, I wrongly overrode the RECORD TERMINATOR parameter, which basically made Vertica think that my whole file was one big line. This line was thus of course badly formatted, and was helpfully written to REJECTED DATA. This file essentially became a full copy of my source file, blowing my partition up. So be careful here!

Monitoring

By looking at the v_monitor.load_streams table, you can see information about historical and even currently running loads. The number of rows loaded and sorted are the most interesting values to get.

select * from v_monitor.load_streams;
-[ RECORD 1 ]----------+---------------------------------
session_id             | vertica.local-27774:0xa799
transaction_id         | 45035996273707887
statement_id           | 1
stream_name            |
schema_name            | reports
table_id               | 45035996273722464
table_name             | sales
load_start             | 2014-08-20 09:02:55.631703+02
load_duration_ms       |
is_executing           | t
accepted_row_count     | 0
rejected_row_count     | 0
read_bytes             | 0
input_file_size_bytes  | 0
parse_complete_percent |
unsorted_row_count     | 435666962
sorted_row_count       | 60687177
sort_complete_percent  | 13
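
To keep an eye on the loads currently running, you can for instance filter on is_executing and pick out those row counts:

SELECT table_name, accepted_row_count, rejected_row_count,
       sorted_row_count, sort_complete_percent
FROM v_monitor.load_streams
WHERE is_executing;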

Vertica optimisation part 2: best practices

This is the second post suggesting some Vertica performance improvements. After having looked at the system side of things, we will now look into how to use Vertica.

INSERT vs. COPY statements

Vertica is a columnar database. I will not get into details of what this means, but as data is stored per column, you can imagine that inserting one row impacts the whole table, as all the columns need to be updated.

Our first test with a standard ETL tool (Pentaho) used a usual INSERT into a big-ish table. We reached the great feat of inserting fewer than 100 rows per second. Then we switched to the Vertica way, i.e. using a COPY statement, and reached many thousands of rows per second.

Lesson 1: Use the COPY command, not INSERT. I wrote another blog post specifically about the COPY statement.

Window and analytical functions

Vertica supports window functions. They are related to grouping, as they return aggregated values, but they do so without collapsing the rows. The example below, taken from the Vertica documentation, shows this. You can see that each line is still present (you do not have only one line per department), but you still get a count per employee, contrary to the aggregate.

  • Aggregate, 1 row per department:
SELECT dept_no, COUNT(*) AS emp_count 
FROM employees
GROUP BY dept_no ORDER BY dept_no;
dept_no | emp_count
---------+-----------
     10 | 2
     20 | 6
     30 | 3
(3 rows)
  • Analytical, all the rows are present
SELECT 
   emp_no
 , dept_no
 , COUNT(*) OVER(PARTITION BY dept_no ORDER BY emp_no) AS emp_count
FROM employees;
 emp_no | dept_no | emp_count
--------+---------+-----------
      1 |      10 |         1
      4 |      10 |         2
------------------------------
      6 |      20 |         1
      7 |      20 |         2
      8 |      20 |         3
      9 |      20 |         4
     10 |      20 |         5
     11 |      20 |         6
------------------------------
      2 |      30 |         1
      3 |      30 |         2
      5 |      30 |         3
(11 rows)

There is a lot more to tell, and this is not the place for it. You can do a lot with these analytical functions, which are a very strong point of Vertica, for instance:

  • Rank the longest-standing customers in a particular state
  • Calculate the moving average of retail volume over a specified time
  • Find the highest score among all students in the same grade
  • Compare the current sales bonus each salesperson received against his or her previous bonus

Why do I mention them here? Because Vertica optimises those queries very aggressively. It knows how to process the partitions in parallel, making for dramatic speed improvements when you can rewrite joins with window functions. With a bit of experience, you even end up with more compact and more readable SQL.
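
As a sketch of that last point, comparing each salesperson's bonus with the previous one can be done with LAG instead of a self-join (the bonuses table and its columns are made up):

-- LAG fetches the previous bonus within each salesperson's partition,
-- so no self-join is needed.
SELECT
   salesperson_id
 , bonus_date
 , bonus
 , bonus - LAG(bonus) OVER (PARTITION BY salesperson_id ORDER BY bonus_date) AS bonus_change
FROM bonuses;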

Lesson 2: Use window functions

Projection

This is a big strength of Vertica. When a table is created, it creates a logical schema (CREATE TABLE), but also a default physical schema (CREATE PROJECTION). A projection is the way data is physically stored on disk, and it is completely transparent to the end user. You as an administrator can still tailor your projections, though.

If, for instance, you know that for a specific table a specific column will always appear in the WHERE condition, you can put it first in the projection's sort order. As Vertica stores data sorted on disk, this column will thus be very quick to access, and Vertica, without even needing to decompress data, will know where to find the relevant dataset. Had this column been the last one in the sort order, it would already have been at least partly split up by the previous columns, thus increasing access time.
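
As a sketch, a projection putting such a column first in the sort order could look like this (table and column names are made up):

-- Data is sorted by customer_id first, so filters on customer_id
-- only need to read a small, contiguous part of the projection.
CREATE PROJECTION reports.sales_by_customer AS
SELECT customer_id, sale_date, amount
FROM reports.sales
ORDER BY customer_id, sale_date
SEGMENTED BY HASH(customer_id) ALL NODES;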

If you think “index” you might be right, but this is a taboo word in the Vertica world.

What does that mean for you?

By default, Vertica (via the database designer or at first insert) is pretty good at creating projections for you. In some cases, though, you might want to go beyond that. You could for instance create 2 projections for a table. This would double load time, of course, as data would have to be written twice to disk, but if the projections are properly tailored, they can make specific queries a lot faster (query-specific projections). If you are using a MERGE statement, having the 2 parts of the merge identically segmented (segmentation is part of defining a projection) will make the merge a lot faster, as it prevents network traffic between nodes.
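
A sketch of that last point, with made-up tables, where both sides are segmented on the merge key so the merge stays local to each node:

-- Target and staging tables with identically segmented projections.
CREATE TABLE reports.sales (sale_id INT, amount NUMERIC(12,2))
    SEGMENTED BY HASH(sale_id) ALL NODES;
CREATE TABLE reports.sales_staging (sale_id INT, amount NUMERIC(12,2))
    SEGMENTED BY HASH(sale_id) ALL NODES;

MERGE INTO reports.sales tgt USING reports.sales_staging src
    ON tgt.sale_id = src.sale_id
WHEN MATCHED THEN UPDATE SET amount = src.amount
WHEN NOT MATCHED THEN INSERT (sale_id, amount) VALUES (src.sale_id, src.amount);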

Furthermore, you can create pre-join projections, where you pay the join cost at load time, not at query time.

A word of warning here. Projections are extremely powerful, but you really can shoot yourself in the foot with them. You could, for instance, create the same projection twice on a table. This would double load time without having any impact at query time, as Vertica chooses only one projection per query.

There is a lot more to say about projections; you can read all about it in the online documentation.

Lesson 3: Set up good projections

Timestamp considered harmful

Just imagine. You have a big table containing events. You receive thousands of events per second, all containing a timestamp at second precision. Vertica will of course happily load those events, compressing and storing them on disk.

You then want to look at your events from yesterday. You thus SELECT based on your timestamp column. This column has a cardinality of 86.4k for yesterday alone, which means about 2.5M for a month. Of course, this will work. But the data stored on disk for this column is huge due to the cardinality, and Vertica will have a lot of extra processing to do.

Ask yourself this question: do you really need data at the level of the second? What is the smallest timeslice you use when querying this table? If your heaviest queries never look at anything more precise than one hour, create an extra column with only an hourly timestamp. Just like that, you have divided the cardinality of your column, and thus of the first column to look at in your query, by 3.6k, saving on IO and processing time. Of course, this does not mean that you need to fully get rid of the timestamp column; it will still be there for ad hoc queries when needed. But your usual big queries will run a lot faster.
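
A sketch of how such an hourly column could be filled at load time, reusing the FILLER trick from the COPY post (table and column names are made up):

CREATE TABLE events (
    event_ts   TIMESTAMP,   -- full precision, kept for ad hoc queries
    event_hour TIMESTAMP,   -- low-cardinality column for the heavy queries
    payload    VARCHAR(200)
);

COPY events (
    raw_ts     FILLER TIMESTAMP,
    event_ts   AS raw_ts,
    event_hour AS DATE_TRUNC('hour', raw_ts),
    payload
)
FROM '/tmp/events.csv' DIRECT;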

Lesson 4: Do not use timestamp in heavy queries.