A merge template for SCD2 loading

For me, working mainly with BI, the MERGE operator is by far one of the most useful ones. Since a lot of the work in a data warehouse is to store data in a structured and organized way the MERGE comes in very handy. Adding on that we usually want to track changes and also know which row that is the most recent one the SCD2 part also comes into play.

Therefore in this post I will post a template for one way of how to implement a MERGE procedure in a BI context. This is what I usually use but feel free to modify it to suit your needs.

Two excellent resources if you want to know (way) more

The template

-- =============================================
-- Author:      mattssok.com
-- Create date: 2018-
-- Description: Template for a procedure to load data into 
--              your data warehouse environment. It can handle historical 
--              loads and is ideal e.g. when loading satellites in a
--              data vault pattern.
-- Execute:     EXEC [dva].[usp_LoadTargetData] 20190101
-- =============================================

CREATE OR ALTER PROCEDURE [dva].[usp_LoadTargetData]
  @LoadProcess bigint
AS

-- Error handling reference: http://www.sommarskog.se/error_handling/Part1.html
-- Same reference for the error procedure in the end
SET XACT_ABORT, NOCOUNT ON

BEGIN TRY

  DECLARE @SourceVar  nvarchar(100)  -- where does this data come from
  DECLARE @Counter    smallint       -- to keep track of the first version of a row (if you need the while loop)
 
  SET @SourceVar  = N'database.schema.table'

 /*
  * Here would be a good place to set your counter if that is used
  */

 /* 
  * ----------------------------------------------------------------------
  * SECTION 0.1
  *  Prepare the source data expression. Personally I use CTE when the 
  *  source is trivial and temp tables when it's more complex since I find 
  *  it easier to search for errors in those
  * ----------------------------------------------------------------------
  */
  -- I usually post status messages during the procedure to see whats going on
  RAISERROR(N'#### Start of procedure ####', 0, 1) WITH NOWAIT;
  RAISERROR(N' - Prepare #SourceData', 0, 1) WITH NOWAIT;
  DROP TABLE IF EXISTS #SourceData; 
  SELECT
    SourceData.*
  INTO 
    #SourceData
  FROM
  (
    SELECT 
      x
      ,y
      ,z
      ,SourceUpdated
      ,RecordSource = @SourceVar
    FROM 
      [Table]
  ) AS SourceData

  -- If this variable is used
  DECLARE @CounterMsg nvarchar(100) = N' - @Counter variable set to ' + CAST(@Counter AS nvarchar(5));
  RAISERROR(@CounterMsg , 0, 1) WITH NOWAIT;

  -- Once again, if this approach is used
  -- Otherwise just remove the while and begin block
  WHILE (@Counter >= 1) -- we count from max (above) to 1 to keep track of the newest row
  BEGIN
    BEGIN TRANSACTION

      DECLARE @StartMsg nvarchar(100) = N' ## Start of iteration ' + CAST(@Counter AS nvarchar(5)) + N' at ' + CONVERT(nvarchar(20), GETUTCDATE(), 121);
      RAISERROR(@StartMsg , 0, 1) WITH NOWAIT;
    
     /* 
      * ------------------------------------------------------------------
      * SECTION 1.0
      *  The truncation of a staging table that resolves the problem of FK reference 
      *  with constraints of both side of the merge. Data is inserted here and at 
      *  the last step it is inserted into the proper table. 
      * ------------------------------------------------------------------
      */ 
      TRUNCATE TABLE stg.[TargetDataInserts];
 
     /* 
      * ------------------------------------------------------------------
      * SECTION 1.0
      *  Insert the updated rows from the merge in staging table. These are rows
      *  that has been changed (updated) in the merge statement.
      * ------------------------------------------------------------------
      */ 
      INSERT INTO stg.[TargetDataInserts]
      (
        [x]
        ,[y]
        ,[z]
        ,[SourceUpdated]
        ,[EtlBatchId]
        ,[RecordSource]
        ,[RowIsCurrent]
        ,[RowStartDate]
        ,[RowEndDate]
      )
      SELECT
        [x]
        ,[y]
        ,[z]
        ,[SourceUpdated]
        ,@LoadProcess  AS EtlBatchId
        ,RecordSource
        ,'Y'               -- IsCurrent Flag    
        ,GETDATE()         -- / or SourceDate / or something else
        ,NULL              -- Default expiry date
      FROM
      (
    
       /* 
        * ----------------------------------------------------------------
        * SECTION 2.0
        *  Start  the Merge.
        *  As a a source we have the temporary table that we prepared above
        * ----------------------------------------------------------------
        */ 
        MERGE dbo.TargetTable AS Target
        USING
        (
          SELECT * FROM #SourceData
        ) AS Source
        ON Target.x = Source.x           
        AND Target.RowIsCurrent = 'Y'    -- only merge against current records

        WHEN MATCHED AND
        (
          -- this assumes that your source and target has a well functioning time stamp
          DATEDIFF(SS, Target.[SourceUpdated], Source.[SourceUpdated]) > 0
          -- OR Source.x > Target.x
          -- OR ...
        )
    
        -- then outdate the existing record
        THEN UPDATE SET
          RowIsCurrent  = 'N',
          RowEndDate    = DATEADD(ss, -1, Source.[SourceUpdated]) -- when using the while loop I remove one second due to overlapping reasons
    
        WHEN NOT MATCHED BY TARGET THEN INSERT
        (
           [x]
          ,[y]
          ,[z]
          ,[SourceUpdated]
          ,[EtlBatchId]
          ,[RecordSource]
          ,[RowIsCurrent]
          ,[RowStartDate]
          ,[RowEndDate]
        )
        VALUES
        (
           Source.[x]
          ,Source.[y]
          ,Source.[z]
          ,Source.[SourceUpdated]
          ,@LoadProcess
          ,Source.RecordSource
          ,'Y'
          ,GETDATE() -- RowStartDate
          ,NULL      -- RowEndDate
        )
         -- Output changed records
        OUTPUT
          $action AS dmlAction
          ,Source.*
      ) AS MergeOutput
      WHERE 
        MergeOutput.dmlAction = 'UPDATE' 
        AND Source.[x] IS NOT NULL;

     /* 
      * ------------------------------------------------------------------
      * SECTION 3.0
      *  Insert the updated rows into the proper table 
      * ------------------------------------------------------------------
      */ 
      INSERT INTO [dbo].[TargetData]
      (
         [x]
        ,[y]
        ,[z]
        ,[SourceUpdated]
        ,[EtlBatchId]
        ,[RecordSource]
        ,[RowIsCurrent]
        ,[RowStartDate]
        ,[RowEndDate]
      ) 
      SELECT  
         [x]
        ,[y]
        ,[z]
        ,[SourceUpdated]
        ,[EtlBatchId]
        ,[RecordSource]
        ,[RowIsCurrent]
        ,[RowStartDate]
        ,[RowEndDate]
      FROM  
        [stg].[TargetDataInserts];
  
      -- Keeping track of the load
      DECLARE @msgUpd nvarchar(100) = N'  -- Iteration # ' + CAST(@Counter AS nvarchar(3)) + N' | ' + CAST(@@ROWCOUNT AS nvarchar(10)) + N' rows were updated' 
      RAISERROR(@msgUpd, 0, 1) WITH NOWAIT;
  
    COMMIT TRANSACTION -- End of transaction
  
  SET @Counter -= 1; -- Go forward with the looping
  
  END -- End of while loop

END TRY -- End of try
 
BEGIN CATCH
   IF @@TRANCOUNT > 0 
  ROLLBACK TRANSACTION
  EXEC ctr.usp_error_handler    
  RETURN 55555
END CATCH

Output

In a small silly example it would look like this (keep in mind that it’s only the logging that is displayed but since we did not get an error the result should have been a success):

#### Start of procedure ####
 - Prepare #SourceData
 - @Counter variable set to 3
 ## Start of iteration 3 at 2019-01-05 10:34:14.
  -- Iteration # 3 | 5 rows were updated
 ## Start of iteration 2 at 2019-01-05 10:34:15.
  -- Iteration # 2 | 5 rows were updated
 ## Start of iteration 1 at 2019-01-05 10:34:16.
  -- Iteration # 1 | 5 rows were updated

//mattssok

mattssok

Leave a Reply

Your email address will not be published. Required fields are marked *