简单地说,我们现在是Redis

了解更多

处理时间序列数据与Redis的和Apache卡夫卡

了解如何通过使用Apache卡夫卡RedisTimeSeries分析这一实际演练的时间序列数据。



回到博客

RedisTimeSeries是一个Redis模块,为Redis带来了原生的时间序列数据结构。时间序列解决方案早先构建在排序集(或Redis流)之上,可以受益于RedisTimeSeries的特性,如高容量插入、低延迟读取、灵活的查询语言、降采样等等!

一般来说,时间序列数据(相对)简单。说了这么多,我们需要的其他特性的因素,以及:

  • 数据速度:例如,每秒数千个设备的数百个参数
  • 量(大数据):考虑几个月(甚至几年)的数据积累

因此,像RedisTimeSeries这样的数据库只是整个解决方案的一部分。你还需要考虑如何去做搜集(摄入),过程,发送所有的数据到RedisTimeSeries。您真正需要的是一个可伸缩的数据管道,它可以充当缓冲区,将生产者和消费者分离开来。

这就是Apache卡夫卡进来!除了核心代理,它还有一个丰富的组件生态系统,包括卡夫卡连接(这是本文提出的解决方案体系结构的一部分),多种语言的客户端库,卡夫卡流、制镜师等。

卡夫卡地图

这篇博文提供了一个如何使用RedisTimeSeries和Apache Kafka来分析时间序列数据的实际例子。

代码可以在这个GitHub库https://github.com/abhirockzz/redis-timeseries-kafka

让我们先探讨使用情况下开始。请注意,它一直保持简单的博客文章的目的,然后在后面的章节中进一步解释。

场景:设备监控

试想一下,有很多地方,他们每个人都有多台设备,以及你与专责监控设备的指标,现在我们将考虑温度和压力。这些指标将被存储在RedisTimeSeries(当然!)并使用以下命名约定键 -<指标名称>:<位置>:<设备>.例如,位置5的设备1的温度将表示为temp:5:1。每个时间序列数据点还将具有以下内容标签(键-值对)度量,位置,设备.这是为了允许灵活的查询,你将在即将到来的部分看到。

这里有几个示例,让您了解如何使用TS.ADD.命令:

3号位置设备2的温度及标签:

TS.ADD TEMP:3:2 * 20标签度量临时定位3器件2

#在位置3为压力装置2:

TS.ADD压力:3:2 * 60标签公制压力位置3装置2

解决方案架构

这是解决方案在高级的情况下的样子:

解决方案架构

让我们打破:

源(本地)组件

  • MQTT代理(mosquitto):MQTT是物联网用例的事实上的协议。我们将使用的场景是物联网和时间序列的结合——稍后详细介绍。
  • 卡夫卡连接:MQTT源连接器用于将数据从MQTT代理转移到卡夫卡群集。

Azure服务

  • Redis企业层的Azure缓存万博体育彩:企业级是基于Redis Enterprise, Redis的商业变体。万博体育彩除了RedisTimeSeries,企业层也支持RediSearchRedisBloom.客户不需要担心企业层的许可证获取问题。Redis的Azure缓存将促进这一过程,其中,客户可以通过Azure Marketplace获得并支付该软件的许可证。万博电竞客服
  • 汇合云Azure上:这是一个完全管理的产品,它将Apache Kafka作为服务提供,这要归功于从Azure到Confluent云的集成供应层。它减少了跨平台管理的负担,并提供了在Azure基础设施上使用Confluent Cloud的统一体验,从而允许您轻松地将Confluent Cloud与您的Azure应用程序集成。
  • Azure Spring云:由于Azure Spring Cloud,将Spring Boot MicroServices部署到Azure更容易。Azure Spring Cloud缓解了基础架构问题,提供了配置管理,服务发现,CI / CD集成,蓝绿色部署等。服务确实所有繁重的举重,所以开发人员可以专注于他们的代码。万博最新版本下载苹果ag万博下载

请注意,为了简单起见,有些服务是本地托管的。在生产级部署中,您也希望在Azure中运行它们。例如,你可以在Azure Kubernetes Service中操作Kafka Connect集群和MQTT连接器。

总结一下,这是端到端流程:

  • 脚本生成模拟发送到本地MQTT经纪人设备数据。
  • 该数据由MQTT Kafka Connect源连接器接收,并发送到Azure中运行的Confluent Cloud Kafka集群中的一个主题。
  • 它是由在Azure Spring云中托管的Spring启动应用程序进一步处理,然后将其持续到redis实例的Azure缓存。

是时候开始用实用的东西了!在此之前,请确保您有以下操作。

先决条件:

建立基础设施组件

遵循文件提供Azure的缓存Redis的(企业级)它是由RedisTimeSeries模块提供的。

基础设施组件

条款Azure Marketplace上的Confluent云集群.也创建一个Kafka主题(使用名称mqtt.device-stats),创建凭据(API键和秘密)您将在稍后使用稍后使用以安全地连接到群集。

卡夫卡集群

您可以配置Azure Spring云的实例使用Azure门户或者使用Azure CLI

AZ Spring-Cloud Create-N  -g  -l <​​输入位置E.g Southeastasia>
东南亚

在继续之前,请务必克隆GitHub库:

混帐克隆https://github.com/abhirockzz/redis-timeseries-kafka CD Redis的,时间序列,卡夫卡

本地服务设置

该组件包括:

MQTT代理

我安装并启动了蚊子在本地Mac上的中间人。

酿酒安装蚊子酿酒服务启动蚊子

你可以按照操作系统对应的步骤进行操作或者你可以用这个码头工人形象

Grafana

我在Mac本地安装并启动了Grafana。

冲泡安装grafana BREW服务启动grafana

你可以为你的OS做同样的或随意使用这码头工人形象

搬运工运行-d -p 3000:3000 --name = grafana -e “GF_INSTALL_PLUGINS = redis的-数据源” grafana / grafana

卡夫卡连接

你应该能够找到在回购的connect-distributed.properties文件刚刚克隆。对于如bootstrap.servers,sasl.jaas.config等属性替换的值

首先,下载并解压Apache Kafka本地。

启动本地Kafka Connect集群:

export KAFKA_INSTALL_DIR= $KAFKA_INSTALL_DIR/bin/connect-distributed.sh connect-distributed.properties

手动安装MQTT源连接器

  • 下载连接器/插件ZIP文件从这个链接, 和,
  • 解压缩到所连接(Connect)工人的plugin.path配置属性中列出的目录之一

如果您正在本地使用Confluent平台,只需使用Confluent Hub CLI:confluent-hub安装最新confluentinc / kafka-connect-mqtt:

创建MQTT源连接器实例

一定要检查一下MQTT - 源config.json文件。确保输入正确的主题,可kafka.topic和离开mqtt.topics不变。

curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @mqtt-source-config. xml在检查连接器状态curl http://localhost:8083/connectors/mqtt-source/status之前,等待一分钟

部署设备数据处理器应用程序

在你刚克隆的GitHub repo中,寻找application.yaml文件中的消费者/ src /资源文件夹并替换以下的值:

  • Azure缓存的Redis主机,端口和主访问键
  • 汇合云在Azure API密钥和密码

构建应用程序JAR文件:

CD消费者出口JAVA_HOME = <例如输入绝对路径/Library/Java/JavaVirtualMachines/zulu-11.jdk/Contents/Home> MVN清洁包

创建一个Azure Spring云应用程序,并将JAR文件部署到其中:

az spring-cloud应用程序创建- n device-data-processor - s <春天Azure云实例名称> - g <资源组名称>——运行时版本Java_11 az spring-cloud应用部署- n device-data-processor - s <春天Azure云实例名称> - g <资源组名称>——目标/ device-data-processor-0.0.1-SNAPSHOT.jar jar路径

启动模拟设备数据生成器

你可以使用你刚克隆的GitHub repo中的脚本:

。/ gen-timeseries-data.sh

注意,它是所有使用mosquitto_pubCLI命令发送数据。

数据被发送到设备统计数据MQTT主题(这是卡夫卡的话题)。您可以使用CLI订户进行双重检查:

mosquito - sub -h localhost -t device-stats

检查汇合云门户卡夫卡的话题。你也应该检查日志在春季天青云的装置数据处理器的应用:

az Spring - Cloud app logs -f -n device-data-processor -s < Azure Spring Cloud实例的名称> -g <资源组>的名称

享受Grafana仪表板!

浏览到Grafana的UIlocalhost: 3000

Grafana破折号

Grafana的Redis Data Source插件可以与任何Redis数据库一起工作,包括Redis的Azure Cache。遵循在这篇博客文章的说明配置数据源。

导入github repo中的grafana_dashboards文件夹中的仪表板(参见)Grafana文档如果您需要关于如何导入仪表盘援助)。

例如,这里是一个仪表板,显示的平均压力(超过30秒)为设备5位置1(用途TS.MRANGE)。

平均压力

这里是另一个仪表板,显示了多个设备的最高温度(超过15秒)位置3(再次,感谢Ts.mrange)。

最高温度

所以,你想运行一些redistimeseries命令吗?

卷起了redis-cli并连接到Azure缓存的Redis实例:

redis-cli -h  -p 10000 -a ——tls

先从简单的查询:

#压力在装置5为位置1 TS.GET压力:1:5#温度在装置5的位置4 TS.GET温度:4:5

按位置并获得温度和压力过滤器全部设备:

TS.MGET WITHLABELS FILTER位置= 3

在特定时间范围内的一个或多个位置中的所有设备的温度和压力提取:

TS.MRANGE  -  + WITHLABELS FILTER位置= 3 TS.MRANGE  -  + WITHLABELS FILTER位置=(3,5)

- +指的是一切从开始直到最新时间戳,但你能更具体。

MRANGE正是我们所需要的!我们还可以在某个位置使用特定的设备进行过滤,并根据温度或压力进一步下钻:

TS.MRANGE - + WITHLABELS过滤位置=3 device=2 metric=temp

所有这些都可以与聚合组合。

#所有的临时数据点都没有用。用平均值(或最大值)代替每个临时数据点怎么样?TS.MRANGE - + WITHLABELS AGGREGATION max 10000过滤位置=3 metric=temp

它也可以创建一个规则来做到这一点的聚集,并将其存储在不同的时间序列。

一旦完成,不要忘记删除资源以避免不必要的成本。

删除资源

在您的本地机器上:

  • 停止Kafka Connect集群
  • 阻止蚊子中介(例如:煮食服务阻止蚊子)
  • 停止Grafana服务(例如brew服务停止Grafana)

我们探索了数据管道使用Redis的和卡夫卡采集,处理和查询时间序列数据。当你想到接下来的步骤和向生产级解决方案的举动,你应该考虑一些事情。

额外的注意事项

降采样

优化RedisTimeSeries

  • 保留策略:想想这一点,因为你的时间序列数据点不要默认修剪或删除。
  • 采样下来和聚合规则:你不想来存储数据一辈子,对吧?确保配置适当的规则来照顾这个(例如TS.CREATERULE温度的:1:2温度:AVG:30平均聚集30000)。
  • 重复的数据策略:您希望如何处理重复的样本?确保默认策略(块)确实是您需要的。如果没有,请考虑其他选项

这不是一个详尽的清单。有关其他配置选项,请参阅RedisTimeSeries文档

什么长期数据保留?

数据是宝贵的,包括时间序列!您可能需要进一步处理它(例如运行机器学习提取洞察,预测性维护等)。为了使这成为可能,你需要保留这些数据更长的时间框架,并为此具有成本效益和效率,你会希望使用一个可扩展的对象存储服务,例如Azure数据湖存储Gen2(第二代ADLS)。

数据保留

有一个连接器!您可以通过使用完全托管的数据管道来增强现有的数据管道用于融合云的Azure数据湖存储Gen2 Sink连接器处理和存储ADL中的数据,然后运行机器学习Azure突触分析或者Azure的Databricks

可扩展性

您的时间序列数据量只能向上移动一个方向!解决方案的可扩展性至关重要:

  • 核心基础设施:托管服务允许团队专注于解决方案,而不是建立和维护基础设施,特别是涉及到复杂的分布式系统,如数据库和流媒体平台,如Redis和Kafka。
  • Kafka Connect:就数据管道而言,由于Kafka Connect平台本质上是无状态的和水平可扩展的,所以你会得到很好的处理。对于Kafka Connect工作集群的架构和大小,你有很多选择。
  • 自定义应用程序:和这个解决方案一样,我们构建了一个自定义应用程序来处理Kafka主题中的数据。幸运的是,同样的可伸缩性特征也适用于它们。就水平规模而言,它只受Kafka主题分区数量的限制。

一体化不只是格拉瓦纳!RedisTimeSeries还集成了普罗米修斯克拉夫.然而,在这篇博客中的时间没有卡夫卡写的连接器,这将是一个伟大的附加!

结论

万物的Redis

当然,您可以使用REDIS(几乎)所有内容,包括时间序列工作负载!一定要考虑数据流水线的端到端架构和从时序数据源的集成,一直到Redis及更远的方式。

Baidu