使用 Redis 和 Apache Kafka 处理时间序列数据





5.00/5 (1投票)
通过这篇实用教程,了解如何使用 RedisTimeSeries 和 Apache Kafka 分析时间序列数据。
RedisTimeSeries 是一个 Redis 模块,为 Redis 带来了原生的时间序列数据结构。以前基于 Sorted Sets(或 Redis Streams)构建的时间序列解决方案,可以从 RedisTimeSeries 的功能中受益,例如高吞吐量插入、低延迟读取、灵活的查询语言、降采样等等!
总的来说,时间序列数据是(相对)简单的。话虽如此,我们还需要考虑其他特性
- 数据速度:例如,每秒处理数千台设备的数百个指标
- 数据量(大数据):考虑数月(甚至数年)的数据累积
因此,像 RedisTimeSeries 这样的数据库只是整个解决方案的一部分。您还需要考虑如何收集(摄取)、处理和发送所有数据到 RedisTimeSeries。您真正需要的是一个可扩展的数据管道,它可以充当缓冲器,解耦生产者和消费者。
这时 Apache Kafka 就派上用场了!除了核心代理,它还有一个丰富的组件生态系统,包括 Kafka Connect(它是本博文中提出的解决方案架构的一部分)、多种语言的客户端库、Kafka Streams、Mirror Maker 等等。

这篇博文提供了一个实际示例,说明如何将 RedisTimeSeries 与 Apache Kafka 结合使用来分析时间序列数据。
代码可在 GitHub 仓库 https://github.com/abhirockzz/redis-timeseries-kafka 中找到
让我们首先探讨用例。请注意,为了方便博文的讨论,这里保持了简单化,并在后续章节中进一步解释。
场景:设备监控
想象一下,有很多地点,每个地点都有多个设备,您的任务是监控设备的指标 — 目前我们只考虑温度和压力。这些指标将存储在 RedisTimeSeries 中(当然!),并使用以下键命名约定 — <metric name>:<location>:<device>
。例如,地点 5 的设备 1 的温度将表示为 temp:5:1。每个时间序列数据点还将具有以下 标签(键值对)— metric, location, device
。这样可以实现灵活的查询,正如您在接下来的章节中将看到的。
以下是一些示例,让您了解如何使用 TS.ADD
命令添加数据点
# 地点 3 的设备 2 的温度以及标签
TS.ADD temp:3:2 * 20 LABELS metric temp location 3 device 2
# 地点 3 的设备 2 的压力
TS.ADD pressure:3:2 * 60 LABELS metric pressure location 3 device 2
解决方案架构
这是解决方案的简要概述

我们来分解一下
源(本地)组件
- MQTT 代理 (mosquitto): MQTT 是物联网用例的实际标准。我们将使用的场景是物联网和时间序列的结合 — 稍后会详细介绍。
- Kafka Connect: MQTT 源连接器 用于将数据从 MQTT 代理传输到 Kafka 集群。
Azure 服务
- Azure Cache for Redis Enterprise 级别:Enterprise 级别基于 Redis Enterprise,这是 Redis Labs 的 Redis 商业版本。除了 RedisTimeSeries,Enterprise 级别还支持 RediSearch 和 RedisBloom。客户无需担心 Enterprise 级别的许可证获取。Azure Cache for Redis 将促进这一过程,客户可以通过 Azure Marketplace 提供的产品获取并支付该软件的许可证。
- Azure 上的 Confluent Cloud:这是一个完全托管的产品,提供 Apache Kafka 作为服务,得益于 Azure 为 Confluent Cloud 提供的集成预配层。它减轻了跨平台管理的负担,并为在 Azure 基础结构上使用 Confluent Cloud 提供了统一的体验,从而让您能够轻松地将 Confluent Cloud 与您的 Azure 应用程序集成。
- Azure Spring Cloud:借助 Azure Spring Cloud,将 Spring Boot 微服务部署到 Azure 变得更加容易。Azure Spring Cloud 减轻了基础结构负担,提供了配置管理、服务发现、CI/CD 集成、蓝绿部署等功能。该服务承担了繁重的工作,因此开发人员可以专注于他们的代码。
请注意,为了简化起见,其中一些服务已本地托管。在生产级别的部署中,您也会希望在 Azure 中运行它们。例如,您可以在 Azure Kubernetes Service 中运行 Kafka Connect 集群以及 MQTT 连接器。
总而言之,这是端到端的流程
- 一个脚本生成模拟设备数据,并将其发送到本地 MQTT 代理。
- 该数据由 MQTT Kafka Connect 源连接器捕获,并发送到运行在 Azure 上的 Confluent Cloud Kafka 集群中的主题。
- 它由托管在 Azure Spring Cloud 上的 Spring Boot 应用程序进一步处理,然后持久化到 Azure Cache for Redis 实例。
现在是时候开始实际操作了!在此之前,请确保您拥有以下条件。
必备组件
- 一个 Azure 账户 — 您可以在这里免费获取一个
- 安装 Azure CLI
- JDK 11,例如 OpenJDK
- 最新版本的 Maven 和 Git
设置基础架构组件
遵循文档 预配 Azure Cache for Redis(Enterprise 级别),其中包含 RedisTimeSeries 模块。

预配 Azure Marketplace 上的 Confluent Cloud 集群。另外,创建一个 Kafka 主题(使用名称 mqtt.device-stats
)并 创建凭据(API 密钥和密钥),您稍后将使用它们来安全地连接到您的集群。

您可以使用 Azure 门户 或 使用 Azure CLI 来预配 Azure Spring Cloud 实例。
az spring-cloud create -n <name of Azure Spring Cloud service> -g <resource group name> -l <enter location e.g southeastasia>

在继续之前,请确保克隆 GitHub 仓库
git clone https://github.com/abhirockzz/redis-timeseries-kafka cd redis-timeseries-kafka
设置本地服务
组件包括
MQTT 代理
我在 Mac 上本地安装并启动了 mosquitto 代理。
brew install mosquitto brew services start mosquitto
您可以 按照对应您操作系统的步骤 操作,或者随时使用这个 Docker 镜像。
Grafana
我在 Mac 上本地安装并启动了 Grafana。
brew install grafana brew services start grafana
您可以为您的操作系统执行同样的操作,或者随时使用这个 Docker 镜像。
docker run -d -p 3000:3000 --name=grafana -e "GF_INSTALL_PLUGINS=redis-datasource" grafana/grafana
Kafka Connect
您应该能在您刚刚克隆的仓库中找到 connect-distributed.properties 文件。替换 bootstrap.servers、sasl.jaas.config 等属性的值。
启动本地 Kafka Connect 集群
export KAFKA_INSTALL_DIR=<kafka installation directory e.g. /home/foo/kafka_2.12-2.5.0> $KAFKA_INSTALL_DIR/bin/connect-distributed.sh connect-distributed.properties
- 从 此链接 下载连接器/插件 ZIP 文件,然后,
- 将其解压到 Connect 工作节点的 plugin.path 配置属性列出的目录之一。
如果您在本地使用 Confluent Platform,只需使用 Confluent Hub CLI:confluent-hub install confluentinc/kafka-connect-mqtt:latest
创建 MQTT 源连接器实例
请务必检查 mqtt-source-config.json 文件。确保为 kafka.topic
输入正确的主题名称,并保持 mqtt.topics
不变。
curl -X POST -H 'Content-Type: application/json' https://:8083/connectors -d @mqtt-source-config.json # wait for a minute before checking the connector status curl https://:8083/connectors/mqtt-source/status
部署设备数据处理器应用程序
在您刚刚克隆的 GitHub 仓库中,查找 consumer/src/resources 文件夹下的 application.yaml 文件,并替换以下值:
- Azure Cache for Redis 的主机、端口和主访问密钥
- Azure 上的 Confluent Cloud 的 API 密钥和密钥
构建应用程序 JAR 文件
cd consumer export JAVA_HOME=<enter absolute path e.g. /Library/Java/JavaVirtualMachines/zulu-11.jdk/Contents/Home> mvn clean package
创建 Azure Spring Cloud 应用程序,并将 JAR 文件部署到其中
az spring-cloud app create -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --runtime-version Java_11 az spring-cloud app deploy -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --jar-path target/device-data-processor-0.0.1-SNAPSHOT.jar
启动模拟设备数据生成器
您可以使用您刚刚克隆的 GitHub 仓库中的脚本。
./gen-timeseries-data.sh
注意 — 它所做的只是使用 mosquitto_pub
CLI 命令发送数据。
数据发送到 device-stats MQTT 主题
(这不是 Kafka 主题)。您可以使用 CLI 订阅者进行双重检查。
mosquitto_sub -h localhost -t device-stats
检查 Confluent Cloud 门户中的 Kafka 主题。您还应该 在 Azure Spring Cloud 中查看设备数据处理器应用程序的日志。
az spring-cloud app logs -f -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group>
享受 Grafana 仪表板!
在浏览器中访问 Grafana UI,地址为 localhost:3000
。

Grafana 的 Redis 数据源插件适用于任何 Redis 数据库,包括 Azure Cache for Redis。请遵循 本博文中的说明 来配置数据源。
导入您克隆的 GitHub 仓库中的 grafana_dashboards 文件夹中的仪表板(如果您需要有关如何导入仪表板的帮助,请参阅 Grafana 文档)。
例如,这是一个显示 地点 1
的 设备 5
的平均压力(30 秒内)的仪表板(使用 TS.MRANGE
)。

这是另一个显示 地点 3
中多个设备的最高温度(15 秒内)的仪表板(同样,感谢 TS.MRANGE)。

那么,您想运行一些 RedisTimeSeries 命令吗?
启动 redis-cli
并连接到 Azure Cache for Redis 实例。
redis-cli -h <azure redis hostname e.g. myredis.southeastasia.redisenterprise.cache.azure.net> -p 10000 -a <azure redis access key> --tls
从简单的查询开始
# pressure in device 5 for location 1 TS.GET pressure:1:5 # temperature in device 5 for location 4 TS.GET temp:4:5
按地点过滤,获取所有设备的温度和压力。
TS.MGET WITHLABELS FILTER location=3
提取一个或多个地点中所有设备的温度和压力,并指定时间范围。
TS.MRANGE - + WITHLABELS FILTER location=3 TS.MRANGE - + WITHLABELS FILTER location=(3,5)
– + 表示从开始到最新的时间戳之间的所有内容,但您可以更具体。
MRANGE
正是我们需要的!我们还可以按特定地点的特定设备进行过滤,并进一步按温度或压力进行细化。
TS.MRANGE - + WITHLABELS FILTER location=3 device=2 TS.MRANGE - + WITHLABELS FILTER location=3 metric=temp TS.MRANGE - + WITHLABELS FILTER location=3 device=2 metric=temp
所有这些都可以与聚合结合使用。
# all the temp data points are not useful. how about an average (or max) instead of every temp data points? TS.MRANGE - + WITHLABELS AGGREGATION avg 10000 FILTER location=3 metric=temp TS.MRANGE - + WITHLABELS AGGREGATION max 10000 FILTER location=3 metric=temp
还可以创建规则来执行此聚合并将结果存储在另一个时间序列中。
完成后,请勿忘记删除资源以避免不必要的成本。
删除资源
- 按照 文档中的步骤 删除 Confluent Cloud 集群 — 您只需要删除 Confluent 组织。
- 同样,您也应该 删除 Azure Cache for Redis 实例。
在您的本地机器上
- 停止 Kafka Connect 集群。
- 停止 mosquito 代理(例如,brew services stop mosquito)。
- 停止 Grafana 服务(例如,brew services stop grafana)。
我们探索了一个使用 Redis 和 Kafka 来摄取、处理和查询时间序列数据的数据管道。当您考虑下一步并朝着生产级解决方案迈进时,您应该考虑更多事项。
附加考虑

优化 RedisTimeSeries
- 保留策略:请考虑这一点,因为您的时间序列数据点默认情况下不会被修剪或删除。
- 降采样和聚合 规则:您不想永远存储数据,对吧?请确保配置适当的规则来处理此问题(例如,TS.CREATERULE temp:1:2 temp:avg:30 AGGREGATION avg 30000)。
- 重复数据策略:您希望如何处理重复的样本?请确保默认策略(BLOCK)确实是您需要的。如果不是,请考虑 其他选项。
这不是一个详尽的列表。有关其他配置选项,请参阅 RedisTimeSeries 文档。
长期数据保留怎么办?
数据是宝贵的,包括时间序列!您可能希望对其进行进一步处理(例如,运行机器学习以提取洞察、预测性维护等)。为了实现这一点,您需要长期保留这些数据,为了使其具有成本效益和效率,您将希望使用可扩展的对象存储服务,例如 Azure Data Lake Storage Gen2(ADLS Gen2)。

有相应的连接器!您可以通过使用完全托管的 Confluent Cloud 的 Azure Data Lake Storage Gen2 Sink 连接器 来增强现有数据管道,以处理和存储数据到 ADLS,然后使用 Azure Synapse Analytics 或 Azure Databricks 运行机器学习。
可扩展性
您的时间序列数据量只会增加!您的解决方案必须具有可伸缩性。
- 核心基础设施:托管服务使团队能够专注于解决方案,而不是设置和维护基础设施,尤其是在处理像 Redis 和 Kafka 这样的数据库和流媒体平台等复杂分布式系统时。
- Kafka Connect:就数据管道而言,您大可放心,因为 Kafka Connect 平台本身是无状态的,并且可以水平扩展。在架构和调整 Kafka Connect 工作节点集群方面,您有很多选择。
- 自定义应用程序:正如本解决方案中的情况一样,我们构建了一个自定义应用程序来处理 Kafka 主题中的数据。幸运的是,它们也适用相同的可伸缩性特性。在水平扩展方面,其限制仅在于 Kafka 主题分区的数量。
集成:不只是 Grafana!RedisTimeSeries 还与 Prometheus 和 Telegraf 集成。但是,在撰写本博文时,尚无 Kafka 连接器 — 这将是一个很棒的附加功能!
结论

当然,您可以使用 Redis 来(几乎)完成所有事情,包括时间序列工作负载!务必考虑数据管道的端到端架构以及从时间序列数据源到 Redis 及更远的数据集成。