65.9K
CodeProject 正在变化。 阅读更多。
Home

跨 AJAX 调用的数据库事务管理

starIconstarIconstarIconstarIconstarIcon

5.00/5 (2投票s)

2019 年 11 月 10 日

CPOL

10分钟阅读

viewsIcon

12456

又一个“兔子洞”,这次是关于如何处理 AJAX 调用,使其能够在任何一个 AJAX 调用失败时回滚整个事务集。

目录

引言

在我之前的长篇大论《16 天:从概念到实现的 TypeScript 应用》中,我想要添加的一个功能是将本地存储导出到服务器。我最终实现了两种方法——一种是通过简单地发送审计日志事务,另一种是通过发送所有存储中的所有数据作为本地存储的完整快照进行导入。正是这个第二个选项,我将在本文中进行更简短的探讨,因为它把我带入了处理事务性更新(意味着能够回滚所有表插入操作)的“兔子洞”,尤其是在异步 AJAX 调用场景下。

解决跨表数据导出(客户端)和导入(服务器端)问题的有几种方法。关键点在于,如果任何一个 AJAX 调用因任何原因失败,整个事务都应该被回滚。例如:

  1. 实现一个纯服务器端机制,在某个 AJAX 调用失败时回滚事务,并处理失败后的后续 AJAX 调用。
  2. 将每个表的所有数据发送到一个可能很大的 AJAX 调用中。
  3. 将每个表的数据作为同步 AJAX 调用发送,这样它们可以按顺序调用。
  4. 使用 jQuery 的 `when/then` 函数,或者如果你不希望使用 jQuery,则使用 `Promise` 同步发送每个表的数据。
  5. 再次使用 jQuery 的 `when/then` 函数,异步发送每个表的数据,并使用 jQuery 的“主 Deferred”(即主 `Promise`)来处理成功/失败。

我选择了选项 #5,因为

  1. 选项 1 实际上需要在选项 5 中实现。
  2. 选项 2 太容易了。声称 JSON 可能太大不是一个好的论点,因为单个表的数据可能非常大,而且无论“发送全部”还是“一次发送一个表”,服务器端的 JSON 最大长度都是需要考虑的问题。
  3. 选项 3 违背了 AJAX 的“A”的初衷:异步(Asynchronous)。
  4. 选项 4 再次违背了异步的初衷,因为它将请求变成了同步调用。

坦白说,我选择选项 5 是因为它是一个更具挑战性的实现。

请注意,阅读上述文章并非本文的前提,因为本文更侧重于通过具体示例来解释实现方法,而不是我的那个疯狂的 TypeScript 应用生成器。

由于我的后端没有外键(referential integrity),我不关心每个表的数据发送顺序,也不会在导入过程中关闭完整性检查。

这里没有源代码下载,主要是因为你可以从文章中复制代码粘贴,并且它没有被打包成一个库,而需要某种程度的控制反转(Inversion of Control)来实现实际的“收到请求时应该做什么”作为回调。不过,源代码可以在 这里 找到,包含整个应用程序。复制粘贴万岁!

目标

因此,目标非常简单。

客户端目标

  1. 客户端告知服务器它即将发起一系列应该包含在一个事务中的 AJAX 调用。
  2. 客户端负责确定所有 AJAX 调用是否成功,或者其中一个失败了。
  3. 成功时,客户端告知服务器提交事务。
  4. 失败时,客户端告知服务器回滚事务。客户端还会尝试取消任何待处理的 AJAX 调用。

服务器目标

从服务器的角度来看

  1. 服务器打开数据库连接并创建一个事务对象。这以用户 ID 为“键”,假设用户一次只会发起一个事务操作。
  2. 收到 AJAX 调用后,服务器处理该调用,如果发生异常则返回错误。服务器 **不** 会主动回滚事务。它当然可以,而且很可能应该这样做,但我希望从客户端请求回滚的角度来探讨客户端-服务器应用程序的行为,而不是服务器假定应该发生回滚。理想情况下,客户端可能希望尝试从失败中恢复,但这基本上是异想天开。
  3. 服务器在客户端请求时提交事务。
  4. 服务器在客户端请求时回滚事务。

这里的想法是服务器尽可能“愚蠢”。它

  1. 不知道会收到多少 AJAX 调用。
  2. 不假定如何处理异常。

客户端实现

导出方法应该很直接

  1. 发送一个 `BeginTransaction` 调用并等待其完成。原因显而易见——我们需要服务器打开数据库连接并创建 `SqlTransaction` 对象。
  2. 进行所有的 AJAX 调用。
  3. 如果所有调用都成功,则请求提交事务,否则在任何一次失败时请求回滚。

代码

public ExportAll(entities: string[]): void {
        console.log("Begin transaction");    
        jQuery.when(jQuery.post(this.UrlWithUserId("BeginTransaction"))).then(() => {
        let calls: JQueryXHR[] = [];

        entities.forEach(e => this.ExportStore(calls, e));

        // Also save the sequence store, parent-child relationship store, and audit log store.
        this.ExportStore(calls, "Sequences");
        this.ExportStore(calls, "ParentChildRelationships");
        this.ExportStore(calls, "AuditLogStore");

        jQuery.when.apply(this, calls).then(
            () => {
                console.log("Committing transaction");
                jQuery.post(this.UrlWithUserId("CommitTransaction"));
            },
            (d) => {
                console.log("Rollback: ");
                console.log(d);
                calls.forEach(c => c.abort());
                jQuery.post(this.UrlWithUserId("RollbackTransaction"));
            }
        );
    });
}

这样您就不必阅读之前的文章了

  1. `entities` 只是一个“存储”名称列表。
  2. 每个存储都包含与该存储名称对应的表相关的数据。
  3. `userId` 是包装此函数的类管理的某个内容。只需将其视为事务的唯一标识符。

实际的 AJAX 调用如下所示

private ExportStore(calls: JQueryXHR[], storeName: string): void {
    let storeData = this.storeManager.GetStoreData(storeName);
    let xhr = undefined;

    if (storeData.length > 0) {
        console.log(`Export ${storeName}`);
        xhr = jQuery.post(
            this.UrlWithUserId("ImportEntity"),
            JSON.stringify({ storeName: storeName, storeData: storeData }),
        );

        calls.push(xhr);
    }
}

请注意,这里的 `fail` 选项并未实现,尽管当然可以实现。另请注意,`array calls` 是由此方法填充的,因为我们有 `if (storeData.length > 0)` 这个语句,否则需要放在调用者中,而我希望调用者非常简单。

关于使用 jQuery 的 `when`、`then`,非常重要的是要注意 jQuery `when` 文档中的这一点(我的粗体):

在多个 Deferred 的情况下,**当其中一个 Deferred 被拒绝时**,jQuery.when() 会立即为它的主 Deferred 触发 failCallbacks。请注意,此时一些 Deferred 可能仍未解析。传递给 failCallbacks 的参数与被拒绝的 Deferred 的 failCallback 的签名匹配。如果您需要为此情况执行额外的处理,例如取消任何未完成的 Ajax 请求,您可以将底层 jqXHR 对象保留在闭包中,并在 failCallback 中对其进行检查/取消。

`apply` 用法是迭代给定函数的数组的常见做法,并且不是 jQuery 特有的。在此处阅读更多信息:链接

服务器端实现

首先,我们需要一种方法来保存事务和连接信息,因为单独的 AJAX 请求会进来,并且此存储机制需要是线程安全的。

private static ConcurrentDictionary<Guid, (SqlTransaction t, SqlConnection c)> transactions = 
   new ConcurrentDictionary<Guid, (SqlTransaction, SqlConnection)>();

路由定义如下:

router.AddRoute<RequestCommon>("POST", "/BeginTransaction", BeginTransaction, false);
router.AddRoute<RequestCommon>("POST", "/CommitTransaction", CommitTransaction, false);
router.AddRoute<RequestCommon>("POST", "/RollbackTransaction", RollbackTransaction, false);
router.AddRoute<EntityData>("POST", "/ImportEntity", ImportEntity, false);

其中我们有:

public class RequestCommon : IRequestData
{
    public Guid UserId { get; set; }
    public string StoreName { get; set; }
}

public class EntityData : RequestCommon
{
    public List<JObject> StoreData = new List<JObject>();
}

begin、commit 和 rollback 事务处理程序很简单。在此讨论中,请注意 Web 请求是在单独的线程中运行的。

Task.Run(() => ProcessContext(context));

开始事务

private static IRouteResponse BeginTransaction(RequestCommon req)
{
    var conn = OpenConnection();
    var transaction = conn.BeginTransaction();
    transactions[req.UserId] = (transaction, conn);

    return RouteResponse.OK();
}

提交事务

private static IRouteResponse CommitTransaction(RequestCommon req)
{
    transactions[req.UserId].t.Commit();
    transactions[req.UserId].c.Close();
    transactions.Remove(req.UserId, out _);

    return RouteResponse.OK();
}

回滚事务

private static IRouteResponse RollbackTransaction(RequestCommon req)
{
    // Evil!
    // Lock the whole process in case the client calls abort which gets
    // processed but there are more AJAX calls in-flight.
    lock (schemaLocker)
    {
        Console.WriteLine($"Abort {req.UserId}");
        transactions[req.UserId].t.Rollback();
        transactions[req.UserId].c.Close();
        transactions.Remove(req.UserId, out _);
    }

    return RouteResponse.OK();
}

那个锁非常重要——每个 AJAX 请求都在自己的线程中处理,并且我们不能让一个请求在客户端请求回滚时正在处理导入,因为另一个请求失败了。回滚要么需要等到导入请求完成,要么导入请求被推迟直到回滚完成。

导入表数据

所以乐趣从这里开始

private static IRouteResponse ImportEntity(EntityData entity)
{
    IRouteResponse resp = RouteResponse.OK();

    // Evil!
    // Lock the whole process in case another async call fails 
    // and the client calls abort which gets
    // processed and then more import calls are received.
    lock (schemaLocker)
    {
        if (transactions.ContainsKey(entity.UserId))
        {
            try
            {
                var transaction = transactions[entity.UserId].t;
                var conn = transactions[entity.UserId].c;
                entity.StoreData.ForEach(d => 
                   InsertRecord(conn, transaction, entity.UserId, entity.StoreName, d));
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                resp = RouteResponse.ServerError(new { Error = ex.Message });
            }
        }
    }

    return resp;
}

我们执行一个锁,它有两个目的:

  1. 如果在导入请求正在进行时发生回滚,则事务和连接不再有效,因此我们需要检查事务标识符是否仍然存在,使用 `if (transactions.ContainsKey(entity.UserId))`。
  2. 虽然在共享事务对象上从不同线程执行多个插入操作似乎是安全的,但如果没有更高级的锁定机制来确保回滚不会在单独的线程中发生,这是不可能的。

然而,这个 `lock` 语句的效果很重要——它导致 AJAX 调用按顺序执行,而不是同时执行。所以这个简单的锁定机制需要重新审视。

不幸的是,这不是一件容易的事。上面的锁确保 `SqlConnection` 对象(它似乎在线程之间共享)实际上是串行使用的,而不是并发使用的。来自 MSDN SQL:

……实际的 ado.net 公共对象(连接、命令、读取器等)不是线程安全的,因此您不能在线程之间共享某个连接、命令等的实例,除非您保证(通过同步或其他任何方式)您不会从不同线程并发访问它们。

因此,这使得在实际事务发生在不同线程时处理提交/回滚更加复杂!

插入记录

如果您好奇那个 `insert` 语句实际是什么样子,这里是:

private static void InsertRecord(
  SqlConnection conn, 
  SqlTransaction t, 
  Guid userId, 
  string storeName, 
  JObject obj)
{
    Assert.That(schema.ContainsKey(storeName), $"{storeName} is not a table in the database.");
    Assert.ThatAll(
       obj.Properties(), 
       f => schema[storeName].Contains(f.Name, ignoreCaseComparer), 
       f => $"{f.Name} is not a valid column name.");

    Dictionary<string, string> fields = new Dictionary<string, string>();
    obj.Properties().ForEach(p => fields[p.Name] = p.Value.ToString());
    string columnNames = String.Join(",", fields.Select(kvp => $"[{kvp.Key}]"));
    string paramNames = String.Join(",", fields.SelectWithIndex((kvp, idx) => $"@{idx}"));
    var sqlParams = fields.SelectWithIndex((kvp, idx) => 
                  new SqlParameter($"@{idx}", kvp.Value)).ToList();
    sqlParams.Add(new SqlParameter("@userId", userId));

    string sql = $"INSERT INTO [{storeName}] 
                 (UserId, {columnNames}) VALUES (@userId, {paramNames})";
    Execute(conn, sql, sqlParams.ToArray(), t);
}

这是一个基于我“动态生成模式”方法的专用代码(关于这一点,您必须阅读我在引言中引用的文章)。`Assert` 调用用于验证实际使用的表名和列名,以防止 SQL 注入,因为 `storeName` 和 `columnNames` 不是参数,而是作为实际 SQL 语句的一部分注入的。

结果

当一切都圆满结束时,我们看到这个:

当发生异常时,我们看到这个:

请注意,其中四个 AJAX 调用可以被取消。因为这一切都是异步的,结果会有所不同。例如,这里处理了三个 AJAX 调用并返回了异常,其中两个能够被取消。

提高锁性能

我真的不想处理 `SqlConnection` 实例中缺乏线程安全(这是有道理的)的复杂性,而且我能勉强看到的唯一解决方案是为每个线程创建一个单独的 `SqlConnection`,可能带有自己的 `SqlTransaction` 实例,然后收集这些事务并提交或回滚它们。即使 `SqlTransaction` 实例可以共享,这也意味着需要保持打开的 `SqlConnection` 直到所有 AJAX 调用都处理完毕并且准备好提交。而且**那**考虑到连接池的限制,是非常可疑的。

因此,相反,将锁移动到更好的位置,并跟踪我们是否处于回滚状态或正在处理 AJAX 调用中,这肯定会提高性能。请记住,回滚可能在另一个导入处理过程中收到,并且任何当前正在进行的导入都应尽快终止。

所以让我们从一个包装类开始(可惜,我们不能使用元组,因为它们是“值”类型)来跟踪连接、事务以及谁在使用什么计数:

public class Transaction
{
    public SqlTransaction t;
    public SqlConnection c;
    public long rollbackCount;
    public long transactionCount;

    public Transaction(SqlTransaction t, SqlConnection c)
    {
        this.t = t;
        this.c = c;
    }
}

接下来,`insert` 过程做几件事:

  1. 它以线程安全的方式递增“实体正在进行事务”计数器。
  2. 它检查回滚计数器是否非零(它只会是 0 或 1)。
  3. 锁只围绕实际的 `Insert` 调用,这在技术上将其他线程解放出来去做其他事情。考虑到任务是 CPU 绑定的,这不应该涉及线程上下文切换。

这是代码

private static IRouteResponse ImportEntity(EntityData entity)
{
    IRouteResponse resp = RouteResponse.OK();

    var tinfo = transactions[entity.UserId];
    var transaction = tinfo.t;
    var conn = tinfo.c;

    try
    {
        Interlocked.Increment(ref tinfo.transactionCount);
        Console.WriteLine($"{tinfo.transactionCount} {tinfo.rollbackCount} {tinfo.c.State}");

        for (int n = 0; n < entity.StoreData.Count && Interlocked.Read
                              (ref tinfo.rollbackCount) == 0; ++n)
        {
            lock (schemaLocker)
            {
                InsertRecord(conn, transaction, entity.UserId, 
                             entity.StoreName, entity.StoreData[n]);
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
        resp = RouteResponse.ServerError(new { Error = ex.Message });
    }
    finally
    {
        Interlocked.Decrement(ref tinfo.transactionCount);
    }

    return resp;
}

现在,回滚甚至不需要执行锁——实际上,这样做是没有效率的,因为我们希望回滚函数“信号”正在进行回滚。在这里,回滚会等待 `ImportEntity` 插入循环终止,然后再执行回滚:

private static IRouteResponse RollbackTransaction(RequestCommon req)
{
    var tinfo = transactions[req.UserId];
    Interlocked.Increment(ref tinfo.rollbackCount);

    while (Interlocked.Read(ref tinfo.transactionCount) > 0)
    {
        // Thread.Sleep(0) is evil, see some article I wrote somewhere regarding that.
        Thread.Sleep(1);
    }

    Console.WriteLine($"Abort {req.UserId}");
    transactions[req.UserId].t.Rollback();
    transactions[req.UserId].c.Close();
    transactions.Remove(req.UserId, out _);
    // No need to decrement the rollback counter as we're all done.

    return RouteResponse.OK();
}

当另一个 AJAX 导入调用导致异常时,这可以很好地立即终止一个由于大量记录而长时间运行的插入操作。

结论

归根结底,使用 .NET 的 `SqlConnection` 和 `SqlTransaction` 对象来管理跨线程的 SQL 操作事务确实没有简单的解决方案。这里展示的是一个尽可能优化的变通方法,但它依赖于 `SqlConnection` 实例使用的同步。也许最简单的方法来绕过这些问题是根本不使用这些类,而是将连接池管理放在 `using new SqlConnection` 语句的上下文之外,因为后者会在 `using` 退出时关闭连接并使事务失效。使用 OdbcConnection 类并不能真正帮助,因为基本需要的是一个单一连接,SQL 语句可以“线程安全地”流式传输到其中,这确实会序列化语句,但可以避免实现 `lock` 的所有愚蠢之处。甚至可以为 `SqlConnection` 编写这样一个类。也许我以后会研究一下,它会揭示当前实现行为的奥秘。

历史

  • 2019 年 11 月 10 日:初始版本
© . All rights reserved.