Date:
Estimated Time:2 minutes
Hive Merge
Hive introduced ACID statement: INSERT, UPDATE, DELETE & MERGE. It turns-out HIVE 1.2.x is not optimised enough to support MERGE efficiently. An in depth documentation can be found there
The code below :
- is idempotent in someway. Suppose for some reason, the merge insert statement fails. Then running again all statements won't be an issue since no update at all will append again. Same case if the update statement fails.
- Simple version: makes the assumption that any data both present in the staging table and the target table needs to be updated.
- SCD1 and SCD2 have slighly difference that are mentionned if existing.
- SCD2 has been tested (upsert version) on a large target table ( 1.3 Bilion rows * 100 columns) and a source table of 100 Milion rows. As a result, the total runtime was les than 3 hours. (50% resource on a 5 computer hadoop cluster).
0) Staging step
Some data was put on hdfs in the <stg_table> to be merged with a target table. This might be done by scoop incrementally. Also, this might be the whole table. All the steps below will work the same in both case.
1) Preparation step
The 3 table above are necessary for any of SCD1 or SCD2. They are respectively the transformation, insert, update candidate tables.
-- create transformation table
CREATE TEMPORARY TABLE <trf_table> LIKE <target_table>
-- create insert table
CREATE TEMPORARY TABLE <ins_table> LIKE <target_table>
-- create update table
CREATE TEMPORARY TABLE <upd_table> LIKE <target_table>
2) Transformation step
This step loads the trf table (transformed) from the stg table (staging). It adds the hash column, and is the place to add other transformations/columns if needed.
-- example without transformations
INSERT INTO <trf_table>
SELECT * , HASH(*) as hash
FROM <stg_table>;
-- example with transformations
-- t1 and t2 are new columns added
INSERT INTO <trf_table>
SELECT * , HASH(stg.*) as hash
FROM (SELECT * , t1(), t2()
FROM <stg_table>) as stg;
3) Dispatching step
Simplified version:
-- variation when you already know that rows are all different
FROM <trf_table>
INSERT INTO <upd_table> SELECT * WHERE <maj_date> IS NOT NULL
INSERT INTO <ins_table> SELECT * WHERE <maj_date> IS NULL
Complete version:
-- feed both update & insert table
FROM (SELECT *
FROM <trf_table> trf
JOIN <target_table> targ
ON (
<end_date> IS NULL
targ.AND trf.<primary_key> = targ.<primary_key>
)WHERE TRUE
AND trf.hash <> targ.hash) as updateable
INSERT INTO TABLE <upd_table> SELECT * ;
-- feed the insert table with new rows
INSERT INTO <ins_table>
SELECT *
FROM <trf_table>
WHERE TRUE
AND <primary_key> NOT IN (
SELECT <primary_key>
FROM <target_table>
);
4.1) Upsert version
SCD 1:
Notice the delete is used in place of an update. The reason is an update cannot join an other table and thus apply updates where needed.
-- first delete update rows
DELETE FROM <target_table>
WHERE TRUE
AND <primary_key> IN (
SELECT <primary_key>
FROM <upd_table>);
-- second, insert all new rows
INSERT INTO <target_table>
SELECT *
FROM <ins_table>
UNION ALL
SELECT *
FROM <upd_table>;
SCD 2:
-- first update old rows
UPDATE <target_table>
SET <end_date> = current_date()
WHERE TRUE
AND <end_date> IS NULL
AND <primary_key> IN (
SELECT <primary_key>
FROM <upd_table>);
-- second, insert all new rows
INSERT INTO <target_table>
SELECT *
FROM <ins_table>
UNION ALL
SELECT *
FROM <upd_table>;
4.2) Merge version
SCD 1:
-- first update old rows
MERGE INTO <target_table>
USING
(SELECT null AS join_key, upd.* FROM <upd_table>
UNION ALL
SELECT <primary_key> AS join_key, ins.* FROM <ins_table>
AS sub
) ON (sub.join_key = <target_table>.join_key
AND <target_table>.<end_date> IS NULL)
WHEN MATCHED
THEN UPDATE SET col1=sub.col1, ... , hash=sub.hash
WHEN NOT MATCHED
THEN INSERT VALUES (sub.col1, ... , sub.hash) ;
SCD 2:
-- first update old rows
MERGE INTO <target_table>
USING
(SELECT null AS join_key, upd.* FROM <upd_table>
UNION ALL
SELECT null AS join_key, ins.* FROM <ins_table>
UNION ALL
SELECT <primary_key> AS join_key, ins.* FROM <upd_table>
AS sub
) ON (sub.join_key = <target_table>.join_key
AND <target_table>.<end_date> IS NULL)
WHEN MATCHED
THEN UPDATE SET <end_date> = current_date()
WHEN NOT MATCHED
THEN INSERT VALUES (sub.col1, ... , sub.hash) ;