When a merge statement is issued, it is actually reparsed into a bunch of inserts under the hood. Looking at this rewrite reveals interesting properties which can help you better understand the performance of your statement.
The test setup is easy: one source table, one destination table. The destination needs to be ACID, and thus bucketed (this is pre-Hive 3):
create table tmp.tstmerge
(b int, v string)
partitioned by (id int)
clustered by (b) into 1 buckets
stored as orc
TBLPROPERTIES ("transactional"="true");
create table tmp.srcmerge
stored as orc
as
select 1 as id, 'foo' as v, 0 as b;
The merge statement is trivial as well:
merge into tmp.tstmerge dst
using tmp.srcmerge src
on src.id=dst.id
when matched and src.id = 0 then delete
when matched then update set
v=concat(dst.v, src.v)
when not matched then insert values (
src.b, src.v, src.id
);
The interesting part is that you can then see a line like this in hiveserver2.log:
Going to reparse YOUR_STATEMENT as ANOTHER_STATEMENT
For the given merge, here is the actual ANOTHER_STATEMENT:
FROM
tmp.tstmerge dst
RIGHT OUTER JOIN
tmp.srcmerge src
ON
src.id=dst.id
INSERT INTO tmp.tstmerge partition (id) -- delete clause
select
dst.ROW__ID, dst.id
WHERE
src.id=dst.id AND src.id = 0
sort by
dst.ROW__ID
INSERT INTO tmp.tstmerge partition (id) -- update clause
select
dst.ROW__ID, dst.b, concat(dst.v, src.v), dst.id
WHERE
src.id=dst.id AND NOT(src.id = 0)
sort by
dst.ROW__ID
INSERT INTO tmp.tstmerge partition (id) -- insert clause
select
src.b, src.v, src.id
WHERE
dst.id IS NULL
INSERT INTO merge_tmp_table
SELECT
cardinality_violation(dst.ROW__ID, dst.id)
WHERE
src.id=dst.id
GROUP BY
dst.ROW__ID, dst.id
HAVING
count(*) > 1
What do we have here?
- The merge becomes a multi-insert statement, with 3 different selects for the delete, update and insert clauses. Multi-insert is a Hive extension (a minimal standalone example is given at the end of this section).
- ROW__ID is a hidden column of ACID tables, containing a struct (its fields are picked apart in an example at the end of this section):

select row__id from tmp.tstmerge;
+----------------------------------------------+--+
|                   row__id                    |
+----------------------------------------------+--+
| {"transactionid":44,"bucketid":0,"rowid":0}  |
+----------------------------------------------+--+

- To update a row, you just need to rewrite the columns of the row identified by its ROW__ID. Deleting a row is equivalent to nullifying all columns of a ROW__ID. This works because all clauses end up as inserts into the ORC delta files anyway.
- cardinality_violation is a UDF which throws an exception if more than one row shares the same ROW__ID and join key. It exists because the SQL standard mandates that at most one source row can match any given destination row, and it is only executed (and thus only throws) if such a duplicate exists. On a side note, if you disable the cardinality check (set hive.merge.cardinality.check=false;) then this leg of the multi-insert does not exist at all (demonstrated below).
- Rows are sorted by ROW__ID. Note that sort by sorts within each reducer, whereas order by sorts across all reducers; order by would be prohibitively expensive here (a small example at the end of this section shows the difference). The reason for the sorting is that delta files can then be merged directly when they are read.
Practically, this means that you cannot choose the ordering of data in ORC ACID tables (which is a shame, as sorting on a well-chosen column is the single best thing to do performance-wise when using ORC). Furthermore, any ordering in the src statement is useless, unless it is meant to speed up the join.
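For reference, here is what a multi-insert looks like on its own: the source is scanned once, and each insert applies its own where clause, which is exactly the property the merge rewrite relies on. A minimal sketch, where tmp.dst_a and tmp.dst_b are hypothetical tables, not part of the setup above:

from tmp.srcmerge src
insert into table tmp.dst_a  -- hypothetical table
select src.id, src.v
insert into table tmp.dst_b  -- hypothetical table
select src.id
where src.id > 0;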
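As ROW__ID is a struct, its fields can be selected individually, which makes it easy to see which transaction wrote each row. A minimal sketch, assuming the pre-Hive 3 field names shown in the output above:

select row__id.transactionid, row__id.bucketid, row__id.rowid, v
from tmp.tstmerge;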
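The cardinality check is easy to trigger with the setup above: give the source a second row matching the same destination row, then re-run the merge. A sketch, assuming the merge has already run once (so the destination contains id=1):

-- add a second source row with the same join key (columns are id, v, b)
insert into table tmp.srcmerge
select 1 as id, 'bar' as v, 0 as b;
-- re-running the merge now aborts in the cardinality_violation leg;
-- disabling the check removes that leg from the rewritten statement:
set hive.merge.cardinality.check=false;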
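Finally, the sort by versus order by difference can be observed on any table; a sketch:

-- sorted within each reducer only: cheap, but no global order
select v from tmp.tstmerge sort by v;
-- one global order, funneled through a single reducer: expensive
select v from tmp.tstmerge order by v;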