65.9K
CodeProject 正在变化。 阅读更多。
Home

在 Node.js 中限制出站请求

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2020年3月5日

CPOL

10分钟阅读

viewsIcon

20515

如何使用分布式计数器和滑动时间窗口跟踪集群中的请求。

在使用外部服务时,有时您需要确保不会过于频繁地调用某个特定的 API。在像 Web 服务器场这样的分布式环境中,这使得问题更加复杂。在这篇文章中,您将看到 RavenDB 如何使用分布式计数器和滑动时间窗口来帮助您跟踪请求。

引言

如今大多数应用程序都通过基于 HTTP 的 API 与外部服务交互。作为为公共使用托管 API 的服务提供商,通常会实施“速率限制”以保护您的服务免受用户过载。速率限制通过跟踪特定客户端在设定时间段内发送的请求数量来工作。

作为**客户端**,当您有一个需要调用此类 API 的高吞吐量应用程序时,这会带来挑战。您如何避免达到速率限制并导致应用程序中出现异常?此外,如果 API**不返回任何 HTTP 头以允许您动态跟踪请求限制,该怎么办?**我最近调用的一个服务就是这种情况。我需要一个解决方案来限制*出站请求*。

目前有一些现有的解决方案,可以在 JavaScript 中使用 `limiter` 和 `bottleneck` 等包来限制出站 API 请求。Bottleneck 特别有趣,因为它支持我们想要的集群!但是,它使用 Redis 作为后端存储,而在我的环境中我使用 RavenDB,因此我们将探讨如何使用 RavenDB 实现相同的解决方案。

在分布式环境中,由于需要在客户端之间进行请求计数,这变得更加复杂。“客户端”在这种情况下意味着虚拟机或容器上的一个进程。例如,在消息队列架构中,通常会有多个消息队列消费者。每个消费者都是一个独立的进程,可能分布在多个物理或虚拟服务器上。如果每个消费者独立处理消息,并且可能发送出站 API 请求,您如何才能**共同**限制出站 HTTP 请求?

Diagram of multiple processes sending requests to an external API being throttled to avoid a rate limit

您可能会觉得协调多个客户端的请求会涉及编写复杂的代码,您说得对!您需要一个**协调器**来跟踪请求并对其进行限制。这通常会涉及多个层,例如数据库和请求中间件。后端存储可以是 Redis(`bottleneck` 使用的)、MongoDB 或任何其他技术。

解决这个问题有很多可能的方案,在这篇文章中,我将向您展示如何使用 RavenDB 在**不到 40 行代码**中实现限流。

好奇如何在 .NET 中跨集群限制请求?请查看我之前关于同一主题的文章,其中涵盖了使用 .NET Core 的方法。

为什么选择 RavenDB?

如果您不熟悉 RavenDB,它是一个跨平台、高性能、可扩展的 NoSQL 文档数据库。乍一看,它可能与 MongoDB 相似,因为它们都是文档数据库,但深入研究一下,您很快就会发现它们的相似之处仅此而已。

我们将利用 RavenDB 的一些独特功能,它们非常适合解决这个问题:计数器文档过期

计数器专门为分布式场景设计,支持跨集群可靠地进行高频更新。我在之前的文章《RavenDB 4.2 的新特性》中简要讨论了分布式计数器。

使用计数器将使我们能够跟踪跨客户端实例的出站请求,但我们还需要跟踪跨滑动时间窗口的请求。为了实现这一点,我们将使用另一个有用的功能,文档过期。RavenDB 将跟踪文档的过期时间,并在过期后自动删除文档。

由于计数器附加到文档,将这两个功能配对使用将允许我们在特定时间窗口内跟踪请求。如果计数器在该窗口期间超出速率限制,我们可以等到 RavenDB 在文档过期后将其删除。

创建一个示例客户端

我将展示的代码示例是 Node.js 控制台应用程序的一部分。代码有一个它调用的模拟 API (`external-api`),并使用 RavenDB 跟踪请求以进行限流。

本文的代码可在 GitHub 上获取。您需要设置一个 RavenDB 实例,如果您刚开始使用,我建议使用RavenDB Cloud 创建一个免费实例。这将在几分钟内让您开始运行!

示例的 README 解释了如何设置所需的用户机密以及生成用于身份验证的客户端证书的步骤。

要将 RavenDB 添加到您的 Node 应用程序中,您可以安装 ravendb npm 包

npm install ravendb --save

创建请求客户端

在这个演示中,我们将有一个模块,它代表一个调用外部 API 的客户端。该模块导出一个发送请求的函数,设置如下:

const db = require('./db');
const externalApi = require('./external-api');

module.exports = {
    async sendRequest() {

        // fetch, we want to limit how often we
        // hit this external API
        await externalApi.fetch();
    }
}

使用文档跟踪速率限制

由于计数器存储在文档上,我们跟踪 API 请求的方式是创建一个特殊的文档,该文档将包含有关 API 请求的任何元数据,并允许我们获取和更新计数器。我们将其称为“速率限制标记”文档,或代码中的 `limiter`。

在发送请求之前,我们需要尝试加载速率限制标记。在 `sendRequest` 方法中,我们将打开一个 RavenDB 会话并尝试加载速率限制文档

module.exports = {
    async sendRequest() {

        const session = db.openSession();
        let limiter = await session.load("RateLimit/ExternalApi");

        // fetch, we want to limit how often we
        // hit this external API
        await externalApi.fetch();
    }
}

请注意 `RateLimit/ExternalApi` 的文档 ID。由于我们试图限制对 `externalApi` 的出站请求,我给了文档一个易于查找的 ID。如果我们有多个需要限制的服务,我们可以将它们并排存储在其他文档中。

此速率限制标记不会总是存在。当数据库是全新的时,文档将不存在;当我们最终启用文档过期时,RavenDB 将在文档过期时删除它。如果它不存在,我们需要创建它并将其保存回数据库。

module.exports = {
    async sendRequest() {

        const session = db.openSession();
        let limiter = await session.load("RateLimit/ExternalApi");

        if (limiter === null) {
            limiter = { id: "RateLimit/ExternalApi" };

            await session.store(limiter);
            await session.saveChanges();
        }

        // fetch, we want to limit how often we
        // hit this external API
        await externalApi.fetch();
    }
}

到目前为止,与 RavenDB 交互并没有什么新东西。通过我们自己分配 `id`,这将是 RavenDB 在存储文档时使用的文档键。对于 Node 应用程序,您也可以使用类,Raven 将其用作集合类型。对于字面量对象,您可以使用约定自定义 Raven 查找的字段。我们将实体存储起来以跟踪更改并立即保存,以便文档得以持久化。

使用 RavenDB 计数器统计请求

我们已加载(或创建)了速率限制标记文档。此文档用于存储计数器,因此我们需要使用计数器 API 来检索与该文档关联的任何计数器。

我们将从预加载 `requests` 计数器开始

module.exports = {
    async sendRequest() {

        const session = db.openSession();
        let limiter = await session.load("RateLimit/ExternalApi", {
            includes(includeBuilder) {
                return includeBuilder.includeCounter("requests");
            }
        });

        if (limiter === null) {
            limiter = { id: "RateLimit/ExternalApi" };

            await session.store(limiter);
            await session.saveChanges();
        }

        // fetch, we want to limit how often we
        // hit this external API
        await externalApi.fetch();
    }
}

通常,在检索计数器值时,RavenDB 会向数据库发送 HTTP 请求以获取计数器的当前值。但由于 `session.load` 已经加载了文档,在第二个请求中获取计数器值似乎是一种浪费。为了消除额外的往返,我们可以在load调用中使用第二个 options 参数和include传入 `includeBuilder` 的回调,您可以从 `includeBuilder` 中链式调用 includes。

**你是什么意思,“包括”?**与许多 NoSQL 解决方案不同,RavenDB 支持关系并且可以预先加载相关文档!计数器是 RavenDB 可以预先获取的另一种实体类型。

我们现在可以使用 `session.countersFor.get` API 检索计数器值。

module.exports = {
    async sendRequest() {

        const session = db.openSession();
        let limiter = await session.load("RateLimit/ExternalApi", {
            includes(includeBuilder) {
                return includeBuilder.includeCounter("requests");
            }
        });

        if (limiter === null) {
            limiter = { id: "RateLimit/ExternalApi" };

            await session.store(limiter);
            await session.saveChanges();
        }

        const limitCounters = session.countersFor(limiter);
        const requests = await limitCounters.get("requests");

        // fetch, we want to limit how often we
        // hit this external API
        await externalApi.fetch();
    }
}

只需两行代码,我们就可以检索请求计数器值。如果计数器还没有值,它将是 `null`。否则,它将返回一个 `number` 值。如果计数器值超过了我们的最大限制,我们可以中止请求!

const REQUEST_LIMIT = 30;

module.exports = {
    REQUEST_LIMIT,
    async sendRequest() {

        const session = db.openSession();
        let limiter = await session.load("RateLimit/ExternalApi", {
            includes(includeBuilder) {
                return includeBuilder.includeCounter("requests");
            }
        });

        if (limiter === null) {
            limiter = { id: "RateLimit/ExternalApi" };

            await session.store(limiter);
            await session.saveChanges();
        }

        const limitCounters = session.countersFor(limiter);
        const requests = await limitCounters.get("requests");

        if (requests !== null && requests >= REQUEST_LIMIT) {
            return; // do not send request
        }

        // fetch, we want to limit how often we
        // hit this external API
        await externalApi.fetch();
    }
}

在示例应用程序中,`REQUEST_LIMIT` 设置为 `30`。在 30 个请求之后,我们需要停止调用外部 API。

如果我们尚未达到阈值,我们才递增计数器。当您调用 `increment` 或 `decrement` 时,您还需要调用 `saveChanges` 来持久化计数器更新。RavenDB 将此视为一个事务,因此如果保存失败,计数器将不会更新。

const REQUEST_LIMIT = 30;

module.exports = {
    REQUEST_LIMIT,
    async sendRequest() {

        const session = db.openSession();
        let limiter = await session.load("RateLimit/ExternalApi", {
            includes(includeBuilder) {
                return includeBuilder.includeCounter("requests");
            }
        });

        if (limiter === null) {
            limiter = { id: "RateLimit/ExternalApi" };

            await session.store(limiter);
            await session.saveChanges();
        }

        const limitCounters = session.countersFor(limiter);
        const requests = await limitCounters.get("requests");

        if (requests !== null && requests >= REQUEST_LIMIT) {
            return; // do not send request
        }

        // increment request counter
        limitCounters.increment("requests");
        await session.saveChanges();

        // fetch, we want to limit how often we
        // hit this external API
        await externalApi.fetch();
    }
}

这就是我们跟踪请求计数并超出最大限制时中止请求所需的所有代码。在生产应用程序中,您可能会选择采取其他操作,例如指数退避等待,将执行推迟到滑动时间窗口到期,或采取其他操作。

说到滑动时间窗口,这段代码成功地阻止了我们在超出请求限制时发送请求,但一旦超出,在计数器重置之前,它将永远不再发送。为了解决这个问题,我们将添加一个时间组件,以便请求在特定窗口过期后重新开始。

使用文档过期实现滑动时间窗口

在示例应用程序中,您只能在 30 秒内发送 30 个请求。如果在任何 30 秒的时间段内达到限制,您需要等待该窗口过期才能再次尝试。

为了实现这一点,我们可以利用 RavenDB 的另一个特性:文档过期。您首先需要在数据库中启用文档过期(默认情况下禁用)。启用后,只需将特定的元数据键附加到您的文档,并带有一个 UTC 时间戳即可使其过期。

为了方便日期操作,我们将添加对 date-fns 的依赖。

const { addSeconds } = require('date-fns');
const db = require('./db'); 
const externalApi = require('./external-api');

const REQUEST_LIMIT = 30;
const SLIDING_TIME_WINDOW_IN_SECONDS = 30;

module.exports = {
    REQUEST_LIMIT,
    SLIDING_TIME_WINDOW_IN_SECONDS,
    async sendRequest() {

        const session = db.openSession();
        let limiter = await session.load("RateLimit/ExternalApi", {
            includes(includeBuilder) {
                return includeBuilder.includeCounter("requests");
            }
        });

        if (limiter === null) {
            limiter = { id: "RateLimit/ExternalApi" };

            await session.store(limiter);

            // expire document in 30 seconds
            const metadata = session.advanced.getMetadataFor(limiter);
            metadata["@expires"] = addSeconds(
                new Date(), SLIDING_TIME_WINDOW_IN_SECONDS);

            await session.saveChanges();
        }

        const limitCounters = session.countersFor(limiter);
        const requests = await limitCounters.get("requests");

        if (requests !== null && requests >= REQUEST_LIMIT) {
            return; // do not send request
        }

        // increment request counter
        limitCounters.increment("requests");
        await session.saveChanges();

        // fetch, we want to limit how often we
        // hit this external API
        await externalApi.fetch();
    }
}

再次只需两行代码,我们就增加了在一定时间后使文档过期的功能。在示例中,`SLIDING_TIME_WINDOW_IN_SECONDS` 设置为 30 秒。一旦文档过期并被删除,我们的代码将创建一个新的速率限制标记文档,该文档会将计数器重置回 `0`,从而允许它继续发出请求,直到达到限制。由于 RavenDB 会在后台自动删除过期的文档,这就像我们的滑动时间窗口一样。

**注意**: 值得一提的是,默认情况下,RavenDB 每 60 秒删除一次过期文档。当您的文档过期时,它不会立即被*删除*,直到该间隔过去。这意味着文档最多会停留 `SLIDING_TIME_WINDOW_IN_SECONDS + 60 秒`,直到它实际被删除。您可以调整此设置以满足您的需求。

跨多个实例发送请求

本文的前提是,在分布式场景中,RavenDB 将是限流请求的协调器。这个解决方案是如何实现的?

如果您在本地设置了示例,您可以启动多个实例以查看它们协同工作。这是一个快速演示,展示了多个进程正在运行。

此解决方案的局限性

此解决方案存在一些需要注意的限制,如下所示:

  • 多个进程可能会同时创建一个新的 `RateLimit` 文档。为了解决这个问题,您可以启用乐观并发控制
  • 如果请求计数器为 `N - 1`,多个进程可能会同时递增,这可能导致额外的请求,从而可能引发 API 异常(如果您的速率限制被超出)。
  • 当请求限制被超出时,程序会陷入紧密的重试循环。在生产应用程序中,最好延迟执行直到时间窗口已经过去。

这些限制可以通过更多的错误检查来解决,但在实际应用中,如果采用适当的重试逻辑和 API 异常处理,这些问题不太可能造成太大麻烦。例如,我使用消息队列Polly来处理此类分布式场景。

结论

在这篇文章中,我展示了如何通过使用 RavenDB 的两个功能——计数器和文档过期来限制出站 HTTP 请求。如果您想从零开始轻松上手 RavenDB,请查看我的 Pluralsight 课程《RavenDB 4 入门》。否则,请立即下载新版本或前往学习 RavenDB 网站

历史

  • 2020 年 3 月 5 日:初始版本
© . All rights reserved.