    Hive Merge

    Hive introduced the ACID statements INSERT, UPDATE, DELETE and MERGE. It turns out that Hive 1.2.x is not optimised enough to support MERGE efficiently. In-depth documentation can be found there.
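
    UPDATE, DELETE and MERGE only work on transactional tables, with the transaction manager enabled in the session. The snippet below is a minimal sketch of that setup, not a definitive configuration. The table customers_target and its columns (id, name, city, end_date, hash) are hypothetical, and it assumes a Hive release before 3.0, where ACID tables must be bucketed and stored as ORC.

    ```sql
    -- session settings needed before running ACID statements
    SET hive.support.concurrency=true;
    SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
    SET hive.enforce.bucketing=true;   -- Hive 1.x only; the setting no longer exists in 2.x

    -- hypothetical target table; hash is kept last to match "SELECT *, HASH(*) as hash"
    CREATE TABLE customers_target (
      id       INT,
      name     STRING,
      city     STRING,
      end_date DATE,
      hash     INT
    )
    CLUSTERED BY (id) INTO 4 BUCKETS
    STORED AS ORC
    TBLPROPERTIES ('transactional'='true');
    ```

    The bucket count of 4 is arbitrary here; it should be sized to the data volume.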

    The code below walks through the merge step by step:

    0) Staging step

    Some data has been put on HDFS in <stg_table>, to be merged into a target table. It might be loaded incrementally by Sqoop, or it might be the whole table. All the steps below work the same in both cases.

    1) Preparation step

    The three tables below are needed for both SCD1 and SCD2. They are, respectively, the transformation, insert candidate and update candidate tables.

    -- create transformation table
    CREATE TEMPORARY TABLE <trf_table> LIKE <target_table>;
    
    -- create insert table
    CREATE TEMPORARY TABLE <ins_table> LIKE <target_table>;
    
    -- create update table
    CREATE TEMPORARY TABLE <upd_table> LIKE <target_table>;

    2) Transformation step

    This step loads the trf (transformed) table from the stg (staging) table. It adds the hash column, which is later compared with the target's hash to detect changed rows, and is the place to add other transformations or columns if needed.

    -- example without transformations
    INSERT INTO <trf_table>
    SELECT * , HASH(*) as hash
    FROM <stg_table>;
    
    -- example with transformations
    -- t1 and t2 are new columns added
    INSERT INTO <trf_table>
    SELECT * , HASH(stg.*) as hash
    FROM (SELECT * , t1(), t2()
          FROM <stg_table>) as stg;
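
    HASH() returns a 32-bit integer, so two different rows can occasionally share the same hash and a real change would then go unnoticed. If that risk matters, a wider digest is one option. The sketch below is an assumption about the environment rather than part of the procedure above: md5() only ships with Hive releases later than the 1.2.x line, the hash column would have to be a STRING, and the tables customers_stg and customers_trf with columns id, name, city and end_date are hypothetical.

    ```sql
    -- build the row digest from an explicit, ordered column list
    INSERT INTO TABLE customers_trf
    SELECT stg.*,
           md5(concat_ws('|',
               CAST(stg.id AS STRING),
               COALESCE(stg.name, ''),    -- concat_ws skips NULLs, so make them explicit
               COALESCE(stg.city, ''))) AS hash
    FROM customers_stg stg;
    ```

    Listing the columns explicitly also keeps technical columns such as the end date out of the comparison.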

    3) Dispatching step

    Simplified version:

    -- variation for when every staged row is already known to be new or changed
    -- (<maj_date> is presumably the row's last update date: NULL means a new row)
    FROM <trf_table>
    INSERT INTO <upd_table> SELECT * WHERE <maj_date> IS NOT NULL
    INSERT INTO <ins_table> SELECT * WHERE <maj_date> IS NULL;

    Complete version:

    -- feed the update table with rows whose content changed
    -- (only the trf columns are selected, so each row fits <upd_table>)
    FROM (SELECT trf.*
          FROM  <trf_table> trf
          JOIN <target_table> targ
          ON (
          targ.<end_date> IS NULL
          AND trf.<primary_key> = targ.<primary_key>
          )
          WHERE TRUE
          AND trf.hash <> targ.hash) as updateable
    INSERT INTO TABLE <upd_table> SELECT * ;
    
    -- feed the insert table with new rows
    INSERT INTO <ins_table>
    SELECT *
    FROM <trf_table>
    WHERE TRUE
    AND <primary_key> NOT IN (
         SELECT <primary_key>
         FROM <target_table>
    );
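
    In standard SQL, NOT IN returns no rows at all as soon as the subquery yields a NULL key, and the subquery can be expensive on a large target. A LEFT JOIN anti-join is a possible alternative. This is a sketch only, using the hypothetical tables customers_trf, customers_ins and customers_target with key column id:

    ```sql
    -- anti-join: keep the staged rows whose key has no match in the target
    INSERT INTO TABLE customers_ins
    SELECT trf.*
    FROM customers_trf trf
    LEFT OUTER JOIN customers_target targ
      ON trf.id = targ.id
    WHERE targ.id IS NULL;
    ```

    A staged row with a NULL key never matches the join, so it would be treated as new; such rows are best filtered out beforehand.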

    4.1) Upsert version

    SCD 1:

    Note that a DELETE is used in place of an UPDATE. The reason is that an UPDATE cannot join another table, so it cannot take the new values from <upd_table>.

    -- first, delete the rows that are about to be replaced
    DELETE FROM <target_table>
    WHERE TRUE
    AND <primary_key> IN (
         SELECT <primary_key>
         FROM <upd_table>);
    
    -- second, insert all new rows
    INSERT INTO <target_table>
    SELECT *
    FROM <ins_table>
    UNION ALL
    SELECT *
    FROM <upd_table>;

    SCD 2:

    -- first, close the current version of each changed row
    UPDATE <target_table>
    SET <end_date> = current_date()
    WHERE TRUE
    AND <end_date> IS NULL
    AND <primary_key> IN (
         SELECT <primary_key>
         FROM <upd_table>);
    
    -- second, insert all new rows
    INSERT INTO <target_table>
    SELECT *
    FROM <ins_table>
    UNION ALL
    SELECT *
    FROM <upd_table>;

    4.2) Merge version

    SCD 1:

    -- update changed rows and insert new rows in one statement
    -- both sources carry their key as join_key: upd rows match and are updated,
    -- ins rows have no match and are inserted
    MERGE INTO <target_table>
    USING
    (
    SELECT upd.<primary_key> AS join_key, upd.* FROM <upd_table> upd
    UNION ALL
    SELECT ins.<primary_key> AS join_key, ins.* FROM <ins_table> ins
    ) AS sub
    ON (sub.join_key = <target_table>.<primary_key>
        AND <target_table>.<end_date> IS NULL)
    WHEN MATCHED
         THEN UPDATE SET col1=sub.col1, ... , hash=sub.hash
    WHEN NOT MATCHED
         THEN INSERT VALUES (sub.col1, ... , sub.hash) ;

    SCD 2:

    -- close old versions and insert new versions in one statement
    -- rows with a null join_key never match, so they are inserted;
    -- rows with a real join_key match the current version, which gets closed
    MERGE INTO <target_table>
    USING
    (
    SELECT null AS join_key, upd.* FROM <upd_table> upd
    UNION ALL
    SELECT null AS join_key, ins.* FROM <ins_table> ins
    UNION ALL
    SELECT upd.<primary_key> AS join_key, upd.* FROM <upd_table> upd
    ) AS sub
    ON (sub.join_key = <target_table>.<primary_key>
        AND <target_table>.<end_date> IS NULL)
    WHEN MATCHED
         THEN UPDATE SET <end_date> = current_date()
    WHEN NOT MATCHED
         THEN INSERT VALUES (sub.col1, ... , sub.hash) ;
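
    After an SCD 2 load, whether done by upsert or by MERGE, each key should have exactly one open version, that is one row whose end date is NULL. A quick sanity check can be sketched as follows, assuming a hypothetical target table customers_target with key column id and end-date column end_date:

    ```sql
    -- keys with more than one open (current) version; expected to return no rows
    SELECT id, COUNT(*) AS open_versions
    FROM customers_target
    WHERE end_date IS NULL
    GROUP BY id
    HAVING COUNT(*) > 1;
    ```

    Any row returned points to a key that was inserted without its previous version being closed.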
