使用 SQL Server 的 MERGE 关键字进行增量派生






4.50/5 (4投票s)
本文只是我为增量派生数据仓库项目遇到的一个非常关键的实时场景,在这个场景中,我发现 SQL Server 中新加入的 MERGE 关键字极其重要。
引言
前段时间,我妈妈让我的小弟弟去杂货店买黄油。他回来后,妈妈又想起来还需要酸奶。他不得不再次跑一趟去店里买。如果妈妈当时能把事情都记清楚,一次性让他把两样东西都带回来,就能省下他的时间和路费,最重要的是避免了重复劳动。同样的情况也适用于 SQL Server 2008。
假设我们有两个数据库“TestDB”和“SampleDB”,它们都有一个名为 EMP 的表,描述如下:
CREATE TABLE [dbo].[EMP](
[EID] [int] PRIMARY KEY,
[ENAME] [varchar](20) NULL,
[DEPT] [varchar](10) NULL)
它们都包含相同的表,但数据不同。TestDB 的数据是:
SELECT * FROM EMP;
EID ENAME DEPT
1 Keshav IT
2 Rohan HR
3 Madhav Finance
SampleDB 的 EMP 表内容是:
SELECT * FROM EMP;
EID ENAME DEPT
1 Keshav HR
4 Sachin IT
将 TestDB 的 EMP 表视为源,将 SampleDB 的 EMP 表视为目标,有以下要求:
- 如果在两个表中都存在基于 EID(主键)的记录匹配,则将所有其他字段的数据与源保持同步。在此示例中,我们有一个 EID 为 1 的匹配项。由于 EID 为 1 的 DEPT 不同步,我们需要对目标 EMP 表执行一次
Update
操作,以匹配源数据。 - 如果目标表中相对于源表缺少任何记录,则将其插入。在此示例中,目标表中缺少 EID 2 和 3,因此需要在目标表中执行两次
Insert
操作。 - 如果目标表中存在源表中不存在的任何内容,则将其删除,因为我们需要使两个表完全同步。这里目标表中有一个 EID 为 4 的记录,它在源表中不存在。因此,需要在目标表上执行
Delete
操作来移除 EID 4。
按照传统方法,这些要求需要分别执行1 次 Update、2 次 Insert 和 1 次 Delete操作。但是 SQL Server 2008 包含了一个非常出色的 Merge
关键字,它可以一次性处理所有这些操作。
有关 Merge 语句的语法,请访问 MSDN。
针对上述要求的典型 MERGE 语句如下:
MERGE SampleDB.dbo.EMP AS Trgt
USING TestDB.dbo.EMP AS Src
ON Trgt.EID=Src.EID
WHEN MATCHED AND (Trgt.ENAME<>Src.ENAME OR Trgt.DEPT<>Src.DEPT) THEN
UPDATE SET Trgt.ENAME=Src.ENAME,Trgt.DEPT=Src.DEPT
WHEN NOT MATCHED THEN
INSERT VALUES (Src.EID,Src.ENAME,Src.DEPT)
WHEN NOT MATCHED BY SOURCE THEN
DELETE;
此查询将一次性执行所有必需的操作。让我们逐段解析并理解它。WHEN MATCHED
子句以及 AND
条件处理了我们的第一个要求。此子句针对 EID 1,将根据源更新 DEPT。
WHEN NOT MATCHED
子句将源中的所有记录插入到目标中,以满足第二个要求,即针对源中的 EID 2、3。最后,WHEN NOT MATCHED BY SOURCE
子句删除目标中存在但源中不存在的所有记录。
优点
- 此关键字仅加载数据一次。
- 一次性执行所有三个操作,并同步两个表。
目标(SampleDB.dbo.EMP)同步到源后的最终结果
EID ENAME DEPT
1 Keshav IT
2 Rohan HR
3 Madhav Finance
用途:增量派生
以上是对 Merge
关键字的简要介绍。本文的主要目的是介绍 Merge
关键字的一个非常重要的用途。此关键字可以极大地简化数据仓库领域中非常复杂的增量派生场景。在典型的下游数据存储系统中,应用程序数据库(DB)中的数据被抽取到数据仓库(WH)并存储,应用程序数据库和数据仓库之间会不断运行作业,以拉取这些应用程序数据库中的任何新表,并根据应用程序数据库同步数据仓库中的现有表。
Merge
关键字可以在很大程度上简化这一需求。让我们探讨一下如何实现。我将介绍一个我遇到的典型场景,然后在此基础上继续讨论。
要求
让我们来清晰地理解需求:
- 我们有一个 AppDB,托管着所有事务发生的生产表。
- 另外还有一个 WareHouseDB(位于单独的服务器上),它是该数据库的存储库。此数据库存储所有表。
- 会定期安排作业,每次作业运行时,都会与 AppDB 同步。对于任何新的源表,它会直接将整个表拉取到目标,对于现有表,作业会同步记录。
- 此外,源中的每个表都有一个字段,该字段是主键聚集索引。
- 同步基于三个基本点,这与上面的 Merge 示例中的要求精确对应。
- 如果在两个表中都存在基于主键列的记录匹配,则将所有其他字段的数据与源保持同步。
- 如果目标表中相对于源表缺少任何记录,则将其插入。
- 如果目标表中存在源表中不存在的任何内容,则将其删除,因为我们需要使两个表完全同步。
这些表具有这样的特点:所有现有表或新添加的表都以TBL_为前缀。
基于这些输入,我们需要设计一个作业来满足这些要求。
技术解决方案
根据上述要求,技术计划是根据图表创建三个存储过程。
我们需要将需求分解为三个模块:
- 创建一个名为 DataSyncController 的主存储过程。此 SP 负责调用其他两个过程。
- ObjectPull 过程将简单地连接到源 AppDB,拉取源中新创建的表,并在目标处进行存储。它将由 DataSyncController 过程针对相应的表调用。
- MergeData 过程将针对现有表执行,并根据上述要求同步它们。它将由 DataSyncController 过程针对相应的表调用。
- DataSyncController 过程将根据所需的计划由 SQL 作业调用。
三个过程中的所有组件以及 SQL 作业都将驻留在 WareHouseDB 上,并根据配置的计划以拉取机制定期工作。
将存在一个依赖关系:WareHouseDB 到 AppDB 必须具有链接服务器连接,以便能够根据计划拉取数据。
代码
1. DataSyncController 过程
让我们尝试理解此过程。该过程接收两个输入参数:ServerName
,即托管 AppDB 的源服务器名称;DatabaseName
,即我们的 AppDB。接下来,该过程尝试确定是否存在到 AppDB 服务器的链接服务器连接;如果不存在,则报告并中止。如果存在链接服务器连接,则代码将遍历所有带有“TBL_”前缀的现有表。在循环中,代码会识别该表是否是 AppDB 中的新添加项。如果是,则调用 ObjectPull 过程。如果该表已存在于 WareHouseDB 中,则查找聚集索引列名,然后调用 MergeData
过程。
CREATE PROCEDURE [dbo].[DataSyncController]
@ServerName SYSNAME,/*Source Server Name To pull The data*/
@DatabaseName SYSNAME/*Source database Name To pull The data*/
AS
BEGIN
DECLARE @SqlString NVARCHAR(4000)
DECLARE @Count INT
DECLARE @LSrv INT
DECLARE @TableName SYSNAME
DECLARE @IndexColumnName SYSNAME
/*The Index column of the Object Begin pulled*/
/*Check IF There is Linked Server Established with the Source.*/
SELECT @LSrv=COUNT(1) FROM SYS.servers WHERE name=@ServerName
IF (@LSrv=0)
/*If Linked Server Doesnt exist Print The appropriate message and exit.*/
PRINT 'There is no Linked Server established with the Server '+
@ServerName+'. Create a Linked Server and re-try.'
ELSE
/*If there Linked Server has been established get
the first TableName from source for SYNCING.*/
BEGIN
SET @SqlString = 'SELECT @TableNameOUT = MIN(NAME)
FROM ['+@ServerName+'].'+@DatabaseName+'.SYS.OBJECTS
WHERE NAME LIKE ''TBL_%'' AND TYPE_DESC=''USER_TABLE'''
EXEC SP_EXECUTESQL @SqlString,
N'@TableNameOUT SYSNAME OUTPUT',
@TableNameOUT=@TableName OUTPUT
WHILE (@TableName IS NOT NULL)
BEGIN
/*Check If the Table Exists at target, If it doesnt exists
Pull the entire table or Else Sync the Delta.*/
SET @SqlString='SELECT @CountOUT=COUNT(1)
FROM SYS.OBJECTS
WHERE NAME='''+@TableName+
''' AND TYPE_DESC=''USER_TABLE'''
EXEC SP_EXECUTESQL @SqlString,
N'@CountOUT INT OUTPUT',
@CountOUT=@Count OUTPUT
IF (@Count=0)
/*If Object doesn't exist pull the complete table with data.
This will be for the first time SYNCING.*/
EXEC ('EXEC dbo.ObjectPull '''+@ServerName+''','''+
@DatabaseName+''','''+@TableName+'''')
ELSE
/*If Object exists pull the delta part of the data.
This will ensure that only the changed data in the table is pulled.*/
BEGIN
/*Get the Clustered Index Column Name*/
SET @SqlString='SELECT @IndexColumnNameOUT=MIN(SC.NAME) FROM ['+
@ServerName+'].'+@DatabaseName+'.SYS.Columns SC INNER JOIN ['+
@ServerName+'].'+@DatabaseName+
'.SYS.Index_Columns SIC ON SIC.Column_ID' +
'=SC.Column_ID AND SIC.Object_ID=SC.Object_ID INNER JOIN ['+
@ServerName+'].'+@DatabaseName+
'.SYS.Indexes SI ON SI.Index_Id=SIC.Index_ID ' +
'WHERE SC.Object_ID=(SELECT Object_ID FROM ['+@ServerName+'].'+
@DatabaseName+'.SYS.Objects WHERE name='''+
@TableName+''')AND SI.Type=1'
PRINT @SqlString
EXEC SP_EXECUTESQL @SqlString,
N'@IndexColumnNameOUT SYSNAME OUTPUT',
@IndexColumnNameOUT=@IndexColumnName OUTPUT
/*Call The Delta Derivation Proc for Merging
the Target Data as per the Source.*/
EXEC ('EXEC dbo.MergeData '''+@ServerName+''','''+@DatabaseName+''','''+
@TableName+''','''+@IndexColumnName+'''')
END
/*Get the Next Table for SYNCING*/
SET @SqlString = 'SELECT @TableNameOUT = MIN(NAME)
FROM ['+@ServerName+'].'+@DatabaseName+'.SYS.OBJECTS
WHERE NAME LIKE ''TBL_%'' AND TYPE_DESC=''USER_TABLE'' AND NAME>'''+
@TableName+''''
EXEC SP_EXECUTESQL @SqlString,
N'@TableNameOUT SYSNAME OUTPUT',
@TableNameOUT=@TableName OUTPUT
END
END
END
2. ObjectPull 过程
此过程仅接收服务器名称、数据库名称和表名称作为输入,并根据这些信息连接到 AppDB 并拉取对象。
CREATE PROCEDURE [dbo].[ObjectPull]
@ServerName SYSNAME,/*Source Server Name for Connecting
Via Linked Server and Pulling the data.*/
@DatabaseName SYSNAME,/*Source DatabaseName*/
@ObjectName SYSNAME /*Source ObjectName*/
AS
BEGIN
/*Pull data from the Source Server*/
EXEC('SELECT * INTO '+@ObjectName+' FROM ['+@ServerName+'].'+
@DatabaseName+'.dbo.'+@ObjectName)
END
GO
3. MergeData 过程
最后,MergeData
过程接收服务器名称、数据库名称、表名称和聚集索引列名称。关于代码的一个重要注意事项是,Merge
关键字不能通过链接服务器访问任何远程表。因此,我们已将 AppDB 表数据转储到 ##Temp 表中。
代码说明:第一步是删除 tempdb 中的任何 ##Temp 表,然后将源 AppDB 数据库中的整个表数据转储到目标服务器的 tempdb 的 ##Temp 表中。然后创建一个循环来构建 @MatchedString
、@UpdateString
和 @InsertString
。
MERGE SampleDB.dbo.EMP AS Trgt
USING TestDB.dbo.EMP AS Src
ON Trgt.EID=Src.EID
WHEN MATCHED AND (Trgt.ENAME<>Src.ENAME OR Trgt.DEPT<>Src.DEPT) THEN
UPDATE SET Trgt.ENAME=Src.ENAME,Trgt.DEPT=Src.DEPT
WHEN NOT MATCHED THEN
INSERT VALUES (Src.EID,Src.ENAME,Src.DEPT)
WHEN NOT MATCHED BY SOURCE THEN
DELETE;
上面突出显示的代码显示了我们正在通过循环构建的代码类型。它是根据表中的列动态构建的,适用于任何表。一旦完成,就会动态构建 Merge 语句,将 tempdb.dbo.##Temp 作为源,将 WareHouseDB(此过程将运行的当前数据库)中的表作为目标。这将使表与源保持同步。
CREATE PROCEDURE [dbo].[MergeData]
@ServerName SYSNAME,/*Source Server Name*/
@DatabaseName SYSNAME,/*Source Server Name*/
@ObjectName SYSNAME,
@IndexColumnName SYSNAME
AS
BEGIN
DECLARE @MinCol SYSNAME
DECLARE @MinColID INT = 1
DECLARE @TempVar SYSNAME
DECLARE @MatchedString NVARCHAR(2000)
DECLARE @UpdateString NVARCHAR(2000)
DECLARE @InsertString NVARCHAR(2000)
DECLARE @SqlString NVARCHAR(2000)
IF EXISTS(SELECT 1 FROM tempdb.sys.objects WHERE name LIKE '##Temp')
DROP TABLE ##Temp
EXEC('SELECT * INTO ##Temp FROM ['+@ServerName+'].'+
@Databasename+'.dbo.'+@ObjectName)
SET @SqlString=
'SELECT @MinColOUT=MIN(NAME)
FROM TempDB.sys.columns
WHERE object_id=(
SELECT object_id
FROM TempDB.sys.objects
WHERE name Like ''##Temp%''
)AND Column_ID='+CAST(@MinColID AS VARCHAR(5))
EXEC SP_EXECUTESQL @SqlString,
N'@MinColOUT SYSNAME OUTPUT',
@MinColOUT = @MinCol OUTPUT
SET @TempVar=@MinCol
SET @MatchedString='(Trgt.'+@TempVar+'<>Src.'+@TempVar
SET @UpdateString='Trgt.'+@TempVar+'=Src.'+@TempVar
SET @InsertString='(Src.'+@TempVar
WHILE (@MinColID < = (SELECT MAX(Column_Id) FROM tempdb.sys.columns
WHERE object_id =(Select object_id from
Tempdb.sys.Objects WHERE name Like '##Temp%' )))
BEGIN
SET @MinColID +=1
SET @SqlString=
'SELECT @MinColOUT=MIN(NAME)
FROM TempDB.sys.columns
WHERE object_id=(
SELECT object_id
FROM TempDB.sys.objects
WHERE name Like ''##Temp%''
)AND Column_ID='+CAST(@MinColID AS VARCHAR(10))
EXEC SP_EXECUTESQL @SqlString,
N'@MinColOUT SYSNAME OUTPUT',
@MinColOUT = @MinCol OUTPUT
IF (@MinCol IS NOT NULL)
BEGIN
SET @TempVar=@MinCol
SET @MatchedString=@MatchedString+' OR Trgt.'+@TempVar+'<>Src.'+@TempVar
SET @UpdateString=@UpdateString+',Trgt.'+@TempVar+'=Src.'+@TempVar
SET @InsertString=@InsertString+',Src.'+@TempVar
END
END
SET @SqlString=
'MERGE INTO dbo.'+@ObjectName+' AS Trgt
USING tempdb.dbo.##Temp AS Src
ON Trgt.'+@IndexColumnName+' = Src.'+@IndexColumnName+'
WHEN MATCHED AND '+@MatchedString+') THEN
UPDATE SET '+@UpdateString+'
WHEN NOT MATCHED THEN
INSERT VALUES '+@InsertString+')
WHEN NOT MATCHED BY SOURCE THEN
DELETE;'
EXEC (@SqlString)
END
GO
4. SQL 作业
可以根据所需的计划轻松配置 SQL 作业,以便每两天或每周运行一次,具体取决于事务的频率或网络负载。此 SQL 作业将托管在 WareHouseDB 上,它将调用 DataSyncController
过程,完成完整的拉取和增量派生。这将同步 AppDB 和 WareHouseDB。
这是一个 Merge
关键字可以发挥作用的典型示例。它在增量派生(更改同步)方面做得非常出色。对于此需求可能存在不同的方法。我提出了一个非常简单、健壮且非常灵活的动态代码。希望这对读者有所帮助。