如何将多个长时间运行的 SQL 语句分块异步执行






4.90/5 (25投票s)
如何将多个长时间运行的 SQL 语句分块异步执行。
引言
在某些情况下,在处理长期运行的查询时异步完成任务非常有帮助。它还可以确保硬件资源的最佳利用。对于像 C# 或 Java 这样的非基于集合的现代高级编程语言,它们具有异步编程的通用功能、库和模式。但是,对于像 SQL 这样的基于集合的语言呢?没有直接的方法可以并行执行 SQL 语句。在 SQL Server 中,有一些方法可以做到这一点,例如使用 SQL Server Service Broker 或通过 CLR 存储过程。Service Broker 实际上是一个发送和接收消息的过程,这些消息可以发送到同一数据库或另一个 SQL Server 实例上的任何远程数据库。而 CLR 需要不同的编程专业知识,并且还有一些部署问题。今天,我将向您展示使用 SQL Server Agent 作业的相同实现。在本文中,您还将了解大量长期运行的 SQL 语句如何分批执行成一些较小的可配置块。
背景
在我最近的数据仓库项目中,客户要求根据一系列预定义的规则来验证表数据。该表有 150 多列。每列的数据都需要根据这些规则进行验证。让我们熟悉其中的一些规则。
序号 |
Column |
规则 |
1 |
客户出生日期 |
出生日期必须是有效日期 |
出生日期必须小于购买日期。 |
||
日期格式应为 MM-DD-YYYY |
||
2 |
信用卡号 |
卡号不得为空值 |
应为有效卡号 |
||
应为有效卡类型 |
||
有效期必须在 1997 年到 2020 年之间 |
表 1:每列的规则。
此外,另一个要求是将垃圾数据保存在另一个表中以供报告,以便客户可以查看垃圾数据并进行修复。为了完成这项工作,我们专门为每个规则创建了存储过程。因此,我们创建了大约 35 个不同的存储过程来验证客户表数据。客户通常通过文本文件发送客户数据给我们,我们将其批量加载到暂存表中。但我们的主要挑战是通过对每列运行平均 4 条规则(4 个存储过程)来验证所有 150 列的数据。我们的客户表包含超过 300 万条记录,每个验证规则存储过程都需要相当长的时间来选择垃圾值并将其保存到错误日志表中。在这种情况下,并行运行这些存储过程,一次运行 10-12 个过程(一个批次)是我们完成工作的绝佳解决方案。好了,不多说了,我们开始实际实现吧。
元数据管理
正如我之前所说,我们将使用 SQL Server Agent 作业来异步运行我们的存储过程。所以在运行时,您对正在运行的存储过程或语句的控制将非常有限。在这种情况下,我们使用了一个元数据表,该表存储存储过程的执行状态,例如它们何时开始和结束,或在执行期间发生的任何错误消息。表创建脚本将是:
CREATE TABLE AsyncProcessStatusLog
(
Id INT IDENTITY(1, 1) NOT NULL PRIMARY KEY,
StoredProcedureName VARCHAR(100) NOT NULL,
StartTime DATETIME NOT NULL,
EndTime DATETIME NULL,
ProcessStatus CHAR(1) NOT NULL
CHECK (([ProcessStatus]='F' OR [ProcessStatus]='S' OR [ProcessStatus]='R')),
/*------- R=Running S=Success F=Fail */
ErrorMessage VARCHAR(2000) NULL
)
进程状态:
- "R" 表示过程正在运行。
- "F" 表示过程失败并发生异常。
- "S" 表示过程已成功执行。
之后,我们需要创建一个名为“InsertUpdate_AsyncProcessStatusLog_usp”的存储过程,该过程会将状态插入/更新到“AsyncProcessStatusLog”表中。
CREATE PROCEDURE InsertUpdate_AsyncProcessStatusLog_usp
@StoredProcedureName VARCHAR(100),
@ProcessStatus CHAR(1),
@ErrorMessage VARCHAR(2000)
AS
BEGIN
DECLARE @now DATETIME = GETDATE()
IF NOT EXISTS (
SELECT 1
FROM AsyncProcessStatusLog
WHERE StoredProcedureName = @StoredProcedureName
)
INSERT INTO AsyncProcessStatusLog
(
StoredProcedureName,
StartTime,
EndTime,
ProcessStatus
)
VALUES
(
@StoredProcedureName,
@now, --Procedure start executing
NULL,
'R' --We know this status is "Running" at this stage.
)
ELSE
UPDATE AsyncProcessStatusLog
SET EndTime = @now, --Execution fnishing time.
ProcessStatus = @ProcessStatus,--Process Status (F or S will come here)
ErrorMessage = @ErrorMessage
WHERE StoredProcedureName = @StoredProcedureName
END
如何编写用于并行执行的存储过程/语句
现在,让我准备一些将并行运行的示例存储过程。创建它们时需要遵循一些约定。请看下面。
CREATE PROCEDURE CheckValidDate_usp
--Parameter goes here
AS
BEGIN
BEGIN TRY
--Saving this SP status to metadata table that it is in Running state.
EXEC InsertUpdate_AsyncProcessStatusLog_usp @StoredProcedureName =
'CheckValidDate_usp',
@ProcessStatus = 'R',
@ErrorMessage = NULL
------------------------------------------
--Details Implementation (Business Logic)
--Details Implementation (Business Logic)
--Details Implementation (Business Logic)
WAITFOR DELAY '00:00:10'
-------------------------------------------
--Saving this SP status to metadata table that it has successfully finished the task.
EXEC InsertUpdate_AsyncProcessStatusLog_usp @StoredProcedureName =
'CheckValidDate_usp',
@ProcessStatus = 'S',
@ErrorMessage = NULL
END TRY
BEGIN CATCH
--Oh!! some error occured and keeping this information here.
DECLARE @ErrorMsg VARCHAR(2000) = ERROR_MESSAGE()
EXEC InsertUpdate_AsyncProcessStatusLog_usp @StoredProcedureName =
'CheckValidDate_usp',
@ProcessStatus = 'F',
@ErrorMessage = @ErrorMsg
END CATCH
END
通过执行“InsertUpdate_AsyncProcessStatusLog_usp”存储过程来维护存储过程执行状态。
通过最顶层的语句设置初始**运行**状态(@ProcessStatus ='R')。
通过放置在 END TRY 块正上方的语句使用**成功**状态(@ProcessStatus ='S')进行更新。
通过 CATCH 块内的语句并带有附加参数 @ErrorMessage 使用**失败**状态(@ProcessStatus = 'F')进行更新。
· 业务逻辑块:
此块通常包含满足业务逻辑的主要 TSQL 代码。我在这里使用了 WAITFOR DELAY '00:00:10' 以便演示存储过程需要 10 秒才能完成(在实际情况中,这将替换为实际的业务 TSQL 逻辑)。
示例存储过程
我按照上述约定为每个规则编写了 7 个存储过程。请参阅附带的 TestStoredProcedures.sql 文件(解压 AsyncSQLScripts.zip 后)。
序号 |
规则 |
关联的存储过程 |
1 |
出生日期必须是有效日期 |
CheckValidDate_usp |
2 |
出生日期必须大于购买日期。 |
CheckDateGreaterThanAnotherDate_usp |
3 |
日期格式应为 MM/DD/YYYY |
CheckValidDateFormat_usp |
4 |
卡号不得为空值 |
CheckIsRequired_usp |
5 |
应为有效卡号 |
CheckValidCreditCardNumber_usp |
6 |
应为有效卡类型 |
CheckCardTypeValid_usp |
7 |
有效期必须在 1997 年到 2020 年之间 |
CheckIsDateInValidRange_usp |
表 2:按规则划分的存储过程。
主要亮点!通过 Agent Job 异步执行存储过程的脚本
我们已经准备好了存储过程和元数据表。现在,让我们来看看主要亮点,即脚本,它将在动态创建 SQL Agent Job 步骤并并行运行我们的存储过程。
CREATE PROCEDURE ExecuteSQL_ByAgentJob_usp(
@SqlStatemet VARCHAR(4000),
@SPNameOrStmntTitle VARCHAR(100),
@JobRunningUser VARCHAR(100) = NULL,
@JobIdOut UNIQUEIDENTIFIER OUTPUT
)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @JobId UNIQUEIDENTIFIER,
@JobName VARCHAR(250) = NULL,
@DBName VARCHAR(100) = DB_NAME(),
@ServerName VARCHAR(100) = @@SERVERNAME
--Creating Unique Job Name by combining @SPNameOrStmntTitle and a GUID.
SET @JobName = @SPNameOrStmntTitle + '_' + CONVERT(VARCHAR(64), NEWID())
--Currently logged user name will be used to execute the job if not provided one.
IF @JobRunningUser IS NULL
SET @JobRunningUser = SUSER_NAME()
--Adds a new job executed by the SQLServerAgent service
EXECUTE msdb..sp_add_job @job_name = @JobName, @owner_login_name = @JobRunningUser,
@job_id = @JobId OUTPUT
--Targets the specified job at the specified server
EXECUTE msdb..sp_add_jobserver @job_id = @JobId, @server_name = @ServerName
--Tell job for its about its first step.
EXECUTE msdb..sp_add_jobstep @job_id = @JobId, @step_name = 'Step1', @command
= @SqlStatemet,@database_name = @DBName, @on_success_action = 3
--Preparing the command to delete the job immediately after executing the statements
DECLARE @sql VARCHAR(250) = 'execute msdb..sp_delete_job @job_name=''' + @JobName + ''''
EXECUTE msdb..sp_add_jobstep @job_id = @JobId, @step_name = 'Step2', @command = @sql
--Run the job
EXECUTE msdb..sp_start_job @job_id = @JobId
--Return the Job via output param.
SET @JobIdOut = @JobId
END
主要逻辑取自这里
该存储过程的工作原理如下:
- 创建一个具有唯一名称的作业,并将其注册到服务器。该作业有两个步骤。
- 在步骤 1 中:作业执行提供的 SQL 语句。
- 在步骤 2 中:作业执行命令删除自身。
- 该过程还返回 JobId 以供进一步参考。
将查询分解成小块
现在我们已经准备好异步运行存储过程了。但是如何将其分批运行呢?现在,我将谈谈将资源密集型任务分批执行的好处。
- 如果我们运行 100 个并行任务,服务器很有可能因“系统内存不足,无法运行此查询”等错误而耗尽资源。
- 如果我们必须强制停止查询执行,则只会停止当前批次,并确保不再处理。
我们总共有 7 个存储过程,我们将一次运行 3 个过程。在这种情况下,批次执行场景将如下所示:
序号 |
存储过程 |
批次 # |
1 |
CheckValidDate_usp |
1 |
2 |
CheckDateGreaterThanAnotherDate_usp |
|
3 |
CheckValidDateFormat_usp |
|
4 |
CheckIsRequired_usp |
2 |
5 |
CheckValidCreditCardNumber_usp |
|
6 |
CheckCardTypeValid_usp |
|
7 |
CheckIsDateInValidRange_usp |
3 |
表 3:按批次划分的脚本分布。
这个批次大小实际上取决于具体情况,所以我建议在实际生活中,最好将这个值放在配置表中,以便随时更改。
异步运行 SP 的安排
为了异步运行查询,我们需要注意以下安排。
- 在临时表或表变量中准备并存放所有查询/存储过程(带必需的参数)。
- 使用“ExecuteSQL_ByAgentJob_usp” SP 和上一步准备的动态语句执行第一个批次,等待第一个批次完成,然后运行后续批次。
- 通过查询元数据表“AsyncProcessStatusLog”来监视脚本。
- 在运行时向用户通知任何异常情况。
现在让我们详细讨论每个步骤。
CREATE PROCEDURE Customer_Validator_usp
--Any parameter goes here
AS
BEGIN
SET NOCOUNT ON;
---Cleanup previous metadata
---Just for testing purpose. Think about your won implementation style.
DELETE FROM AsyncProcessStatusLog
-----------------PART1------------------------------------
DECLARE @Scripts TABLE (
Id INT NOT NULL IDENTITY(1, 1) PRIMARY KEY,
Script NVARCHAR(4000)
)
INSERT INTO @Scripts
SELECT 'EXEC CheckValidDate_usp'--Additional parameters
UNION ALL
SELECT 'EXEC CheckDateGreaterThanAnotherDate_usp'
UNION ALL
SELECT 'EXEC CheckValidDateFormat_usp'
UNION ALL
SELECT 'EXEC CheckIsRequired_usp'
UNION ALL
SELECT 'EXEC CheckValidCreditCardNumber_usp'
UNION ALL
SELECT 'EXEC CheckCardTypeValid_usp'
UNION ALL
SELECT 'EXEC CheckIsDateInValidRange_usp'
-----------------PART2------------------------------------
DECLARE @sql NVARCHAR(4000),
@Itr INT,
@RecCount INT,
@ScriptTitle VARCHAR(200),
@JobId UNIQUEIDENTIFIER
DECLARE @TotalScriptSentToJob INT = 0,
--Hard code here.Should be configurable.
@MaxValidatonProcedureToRunAtaTime INT = 3,
@IsChunkProcessing BIT
SET @Itr = 1 --Seeting the initial value.
SET @RecCount = (
SELECT COUNT(*)
FROM @Scripts
)
-----------------PART3------------------------------------
WHILE (@Itr <= @RecCount)
BEGIN
SELECT @sql = t.Script
FROM @Scripts t
WHERE id = @Itr
--Just o identify the script name getting first 10 char of the SP
SET @ScriptTitle = LEFT(REPLACE(@sql, 'EXEC ', ''), 10)
EXEC ExecuteSQL_ByAgentJob_usp
@SqlStatemet = @sql,
@SPNameOrStmntTitle = @ScriptTitle,
@JobRunningUser = 'sa',
@JobIdOut = @JobId OUTPUT
-----------------PART4------------------------------------
SET @TotalScriptSentToJob = @TotalScriptSentToJob + 1
--Wait for some seconds until send the next procedure to job.
--It may take some time to initialize the job and write to metadata table.
IF (@TotalScriptSentToJob = @MaxValidatonProcedureToRunAtaTime)
BEGIN
SET @TotalScriptSentToJob = 0
SET @IsChunkProcessing = 1
END
-----------------PART5------------------------------------
IF (@IsChunkProcessing = 1)
BEGIN
DECLARE @Result INT
EXEC @Result = Wait_Unitl_Chunk_ToBe_Finished_usp
PRINT 'I am waiting the chunk to be finished.'
IF (@Result = -1)
BEGIN
RAISERROR ('Exception occured in some srored procedure.', 17, 2)
RETURN -1
END
ELSE
BEGIN
SET @IsChunkProcessing = 0
END
END
---Cleanup section------
SET @sql = ''
SET @Itr = @Itr + 1
END
-----------------PART6------------------------------------
--Wait for the last chunk to be finished.
EXEC @Result = Wait_Unitl_Chunk_ToBe_Finished_usp
PRINT 'I am waiting the last chunk to be finished.'
IF (@Result = -1)
BEGIN
RAISERROR ('Exception occured in some srored procedure.', 17, 2)
RETURN -1
END
--Allow other scripts to run.
PRINT 'I am finished!'
END
第一部分:准备脚本并将其存储在表变量中
脚本以直接的方式保存在表变量中,以减少复杂性。但在实际情况中,请找出您自己准备动态脚本及其关联参数并将其存储到表变量中的合适方法。
第二部分:变量声明部分
这里声明了所需的变量。并非所有变量都需要在此处描述,但我想简要介绍一下“@TotalScriptSentToJob”和“@MaxValidatonProcedureToRunAtaTime”变量。“@MaxValidatonProcedureToRunAtaTime”变量存储一次将运行多少个存储过程(批次大小)的值,而“@TotalScriptSentToJob”则保存已发送到 SQL Agent 进行异步处理的存储过程的数量的增量值。“@IsChunkProcessing”的目的是决定系统是否进入等待状态,直到当前批次完成,以停止处理任何其他查询。
第三部分:WHILE 循环的开始
准备作业名称,并将命令发送到 SQL Agent 作业以进行并行处理。
第四部分:决定进入等待状态
在将脚本发送到 Agent Job 后,“@TotalScriptSentToJob”的值将增加,并与“@MaxValidatonProcedureToRunAtaTime”进行比较,以确定是否已发送了最大数量的存储过程(针对一个批次)。如果已发送最大数量的存储过程,则将“@TotalScriptSentToJob”重新初始化为“0”,将“@IsChunkProcessing”设置为“1”。
第五部分:等待状态:
如果“@IsChunkProcessing =1”,系统将进入等待状态,直到当前异步运行的存储过程完成。本文章后面将详细介绍系统如何在完成一个批次后进入等待状态。此部分还将在运行时发生任何错误时终止整个过程。
第六部分:当最后一个批次值始终小于实际批次时处于等待状态
最后,系统会再次检查是否有任何正在运行/活动的语句,并等待它们完成。这种情况发生在最后一个批次的值始终小于实际批次值时。
为了清楚地了解这一点,请看一下表 3(按批次划分的脚本分布),其中我们总共有 7 个存储过程,并将其分为 3 个批次,如 3+3+1。在最后一个批次中,只有一个过程会运行。在这种情况下,以下代码块永远不会满足这种情况。
IF (@TotalScriptSentToJob >= @MaxValidatonProcedureToRunAtaTime)
因为“@TotalScriptSentToJob”的值将是“1”,而我们的初始批次大小设置为 3(@MaxValidatonProcedureToRunAtaTime=3)。因此,“@IsChunkProcessing”的值永远不会是“1”。因此,WHILE 循环内的这个批次不会发生等待状态。为了解决这个问题,在 WHILE 循环的 END 块之后,再次使用了“EXEC @Result = Wait_Unitl_Chunk_ToBe_Finished_usp”语句。系统如何维护完成批次后的等待状态
系统通过执行“EXEC @Result = Wait_Unitl_Chunk_ToBe_Finished_usp”进入等待状态。这个过程运行一个无限循环,通过查询“AsyncProcessStatusLog”元数据表来检查该批次下正在运行的每个过程的状态。
让我们看一下实现:
CREATE PROCEDURE Wait_Unitl_Chunk_ToBe_Finished_usp
AS
BEGIN
WHILE (1 = 1)
BEGIN
WAITFOR DELAY '00:00:3'
IF EXISTS(
SELECT 1
FROM AsyncProcessStatusLog aps
WHERE aps.ProcessStatus = 'F'
)
RETURN -1
IF NOT EXISTS(
SELECT 1
FROM AsyncProcessStatusLog aps
WHERE aps.ProcessStatus = 'R'
)
RETURN 1
END
END
您可能还记得在“**如何编写存储过程/语句**”部分,当每个过程开始运行时,它会以初始状态(ProcessStatus)“R”(表示“运行”)在元数据表中写入一行。对于任何异常,状态将更新为“F”(表示“失败”)。如果过程成功运行,状态将变为“S”(表示“成功”)。当作业中没有活动(运行中)的查询时,无限循环将中断。在正常情况下,该过程返回 1,如果发生任何异常,则返回 -1。
使用代码
到目前为止,我们对架构有了足够的了解。现在,让我们运行脚本,看看查询是如何逐批并行运行的。
- 请确保 SQL Agent 服务正在运行。
- 解压 AsyncSQLScripts.zip 并执行 DB_Metadata_Table.sql。这将创建一个带有元数据表的测试数据库。之后,依次执行 TestStoredProcedures.sql、ExecuteSQL_ByAgentJob_usp.sql、Wait_Unitl_Chunk_ToBe_Finished_usp.sql。
- 最后执行 Customer_Validator_usp.sql。现在我们可以测试我们的代码了。关闭 SSMS 的所有打开的窗口。
- 打开两个新的空白查询选项卡。在相应的选项卡中输入以下脚本。
查询选项卡 1
USE TestAyncDB
GO
EXEC Customer_Validator_usp
查询选项卡 2
SELECT apsl.StoredProcedureName,
apsl.StartTime,
apsl.EndTime,
DATEDIFF(second, apsl.StartTime, apsl.EndTime) AS ElapsedSeconds
FROM AsyncProcessStatusLog apsl
ORDER BY
apsl.StartTime
- 转到“窗口”菜单,然后按“新建垂直选项卡组”,以便并排查看窗口。
- 现在执行查询选项卡 1 的脚本(“EXEC Customer_Validator_usp”),然后快速切换到查询选项卡 2,并在几秒钟后连续按 **F5**。
- 您将看到 3 个存储过程(第一个批次)同时(并行)运行,如下所示:
下图显示第二个批次已启动,一个存储过程已完成,其他两个过程正在并行运行。
下一个图像显示所有存储过程已在 3 个单独的批次中异步完成运行。
- 现在回到查询选项卡 1,查看结果窗格中的“消息”选项卡。您会找到类似如下的内容。
系统在执行每个批次(3+3+1)后进入等待状态。
实际效益
如果我们同步运行查询(一个接一个),第一个存储过程将等待第二个过程完成。在这种情况下,完成时间将为 1 分 10 秒(因为我们在存储过程中设置了几个 WAITFOR DELAY)。
存储过程 |
WAITFOR DELAY (秒) |
CheckValidDate_usp |
10 |
CheckDateGreaterThanAnotherDate_usp |
10 |
CheckValidDateFormat_usp |
15 |
CheckIsRequired_usp |
5 |
CheckValidCreditCardNumber_usp |
10 |
CheckCardTypeValid_usp |
10 |
CheckIsDateInValidRange_usp |
10 |
总计 |
70 秒 |
表 4:存储过程内的 WAITFOR DELAY 设置。
现在,让我们运行另一个查询来确定完成所有 7 个存储过程所需的异步模式时间。
SELECT MIN(apsl.StartTime) MinStartTime,
MAX(apsl.EndTime) MaxEndTime,
DATEDIFF(second, MIN(apsl.StartTime), MAX(apsl.EndTime)) AS
ElapsedSeconds
FROM AsyncProcessStatusLog apsl
输出将是:
MinStartTime |
MaxEndTIme |
ElapsedSeconds |
2013-02-05 12:00:58.287 |
2103-02-05 12:01:38.283 |
40 |
仅用了 40 秒就完成了。因此,我们节省了 30 秒,并且性能提高了近 42%。
从实际实现中吸取的教训
- 请务必注意存储过程代码中的以下 2 种异常,在并行执行时应予以考虑。因为 SQL Server 的 **TRY CATCH** 块不处理这类异常。
- 编译错误,例如阻止批次执行的语法错误。
- 在语句级重新编译期间发生的错误,例如延迟名称解析后发生的对象名称解析错误。
在这种情况下,存储过程将永远处于运行("R")状态,并且永远不会改变,因此整个过程将在等待状态下挂起。您必须手动终止该过程才能打破这种情况。
- 请在“Wait_Unitl_Chunk_ToBe_Finished_usp”存储过程中添加额外的代码块,以便在定义的超时后中断循环。因为如果系统因为长时间运行的查询或上述任何未识别的系统异常而进入长时间等待状态,这样其他查询就可以获得执行的机会或资源。
- 定期查看 SQL Agent 作业历史记录并清理任何未删除的作业(用于异常)。ExecuteSQL_ByAgentJob_usp 存储过程将 JobId 作为输出参数,您可以将其保存到元数据表中以供将来参考,以识别哪个作业发生异常并自动删除它们。
结论
异步执行长期运行的查询不仅可以节省时间,还可以确保 CPU 和内存的最大利用率。最重要的是,不一次性运行所有查询,而是将其分批运行,也可以防止查询占用所有系统资源,从而避免查询因资源不足而受影响。因此,整个系统以一种受管理和优化的方式运行。