消息传递

什么是消息代理?

现代软件应用程万博电竞客服序已经从一个单一的整体单元转变为松散耦合的服务集合。虽然这个新的体系结构带来了许多好处,但这些服务仍然需要相互交互,因此需要健壮和高效的消息传递解决方案。

复述,流作为构建流架构的通信通道和用于持久化数据的类似日志的数据结构,使Streams成为事件来源的完美解决方案。

复述,Pub / Sub是一种非常轻量级的消息传递协议,设计用于在系统中直播通知。当低延迟和大吞吐量至关重要时,它是传播短时间消息的理想选择。

Redis列表和Redis排序集是实现消息队列的基础。它们既可以直接用于构建定制的解决方案,也可以通过使消息处理更适合您所选择的编程语言的框架来使用。

构建消息代理解决方案时遇到的挑战

1.服务之间的通信必须是可靠的

当一个服务想要与另一个服务通信时,它不能总是立即这样做。当出现故障时,独立的部署可能会使服务在一段时间内不可用。对于大规模的应用程序,服务变得不可用不是“如果”或“何时”的问题,而是频率的问题。为了缓解这个问题,最佳实践是限制服务之间的同步通信数量(例如,直接调用服务的api,例如通过发送HTTP(S)请求),而在实际情况下选择持久通道,这样服务就可以方便地使用消息。这类异步通信的两个主要范例是事件流和消息队列。

消息队列

  1. 消息队列基于可变列表,有时通过帮助实现公共模式的工具使用。消息队列和事件流之间有两个主要区别:消息队列使用“推送”类型的通信——当有新消息需要关注时,服务将新消息推送到另一个服务的收件箱。流以相反的方式运行。
  2. 消息包含可变状态(例如,重试次数),当成功处理时,它们将从系统中删除。流事件是不可变的,并且历史记录在被裁剪后通常被保存在冷库中。

Redis列表和排序集实现这种类型行为的两种数据类型都可以用来构建定制的解决方案,以及生态系统特定框架的后端吗芹菜(Python),(JavaScript),Sidekiq(Ruby),机械(去),还有许多其他的。

事件流

事件流基于日志数据类型,这在查找其历史记录和将新项追加到其末尾方面非常有效。这两个属性使不可变日志成为一种很好的通信原语,也是一种有效的数据存储方式。

通过流进行通信与使用消息队列不同。如前所述,消息队列是“push”,而流是“pull”。在实践中,这意味着每个服务写入自己的流,而其他服务将有选择地观察(即“拉”)它。这使得一对多通信比消息队列高效得多。

当一个服务需要另一个服务执行操作时,消息队列工作得最好。在这种情况下,第二个服务的消息队列充当“请求收件箱”。当服务需要发布事件(即多个服务感兴趣的消息)时,发布服务将需要将消息推到与该事件相关的每个服务的队列中。在实践中,大多数工具(例如企业服务总线)可以透明地完成这一工作,但是为每个接收方生成和存储单独的消息副本仍然效率低下。

在一对多通信模式中,事件流通过反向协议优于消息队列:只有原始事件的一个副本存在,任何想要访问它的服务都可以按照自己的速度通过事件流(即发布服务的流)查找。与消息队列相比,事件流还有另一个实际优势:您不需要预先指定事件订阅者。在消息队列中,系统需要知道向哪个队列交付事件的副本,因此如果稍后添加新服务,它将只接收新事件。有了事件流,这个问题就不存在了——甚至可以创建一个新的服务来查看完整的事件历史,这对于添加新的分析并仍然能够追溯计算它们来说非常棒。这意味着你不需要立即想出未来可能需要的每一个指标。你可以跟踪你现在需要的那些,并在你去的时候添加更多,因为你知道你仍然能够看到完整的历史,即使是在以后添加的那些。

2.存储空间必须有效

对于所有持久化消息的通信通道来说,空间效率是一个值得欢迎的特性。然而,对于事件流来说,这是基本的,因为它们通常用于长期的信息存储。(我们在前面提到过,不可变日志在追加新条目和查找历史记录方面速度很快。)

复述,流是使用基数树作为底层数据结构的不可变日志的实现。每个流条目都由时间戳标识,并且可以包含一组任意的字段值对。同一个流的条目可以有不同的字段,但是Redis能够压缩共享同一模式的多个事件。这意味着,如果您的事件有稳定的字段集,那么您就不必为每个字段名支付存储费用,这样就可以使用更长的、更具描述性的键名,而不会有任何缺点。

如前所述,可以对流进行调整以删除旧的条目,删除的历史通常以存档格式保存。Redis Streams的另一个特性是能够将任何中游条目标记为“已删除”,以帮助遵守GDPR等法规。

缩放处理吞吐量

事件流和消息队列有助于应对通信突发。但是,直接API调用的另一个问题是,当流量达到峰值时,服务可能会被淹没。异步通信通道可以充当缓冲区,帮助消除峰值,但处理吞吐量必须足够健壮,以维持正常的流量,否则系统将崩溃,而缓冲区将需要无限地增长。

在Redis Streams中,可以通过通过一个消费者组读取流来增加处理吞吐量。属于同一消费组的读者以互斥的方式查看消息。当然,一个流可以有多个消费者组。在实践中,您可能希望为每个服务创建一个单独的消费者组,这样每个服务就可以根据需要启动多个读取器实例来增加并行性。

3.消息传递语义必须清晰

当进行异步通信时,考虑可能的失败场景是很重要的。例如,在处理消息时,服务实例可能崩溃或失去连接。由于通信失败是不可避免的,消息传递系统将自己分为两类:至多一次“至少一次”交付。(一些消息传递系统声称只提供一次传递,但这并不是全部。在任何可靠的消息传递系统中,为了克服故障,消息偶尔需要传递不止一次。这是在不可靠网络上进行通信时不可避免的特点。

为了正确处理故障,参与系统的所有服务必须能够执行幂等消息处理。“幂等”意味着系统的状态在重复消息传递的情况下不会改变。幂等性通常通过应用任何必要的状态更改和保存以原子方式处理的最后一条消息(例如,在一个事务中)来实现。这样,在发生故障时,故障将永远不会保持不一致的状态,并且通过检查新消息标识符是否在最后处理的消息之前,读取器将能够判断给定消息是否已经处理。

复述,流,作为一个可靠的流通信通道,是一个“至少一次”系统。当通过一个消费者组读取一个流时,Redis会记住哪个事件被分配给哪个消费者。然后,消费者有责任正确地确认消息已被成功处理。当一个使用者死亡时,一个事件可能仍然被卡住。为了解决这个问题,消费者组提供了一种检查挂起消息状态的方法,并在必要时将事件重新分配给另一个消费者。

我们在上面提到事务(和原子操作)是实现幂等的主要方法。为了解决这个问题,Redis Transactions和Lua脚本允许使用全有或全无事务语义组合多个命令。

复述,Pub / Sub是一个至多一次消息传递系统,允许发布者向一个或多个通道广播消息。更准确地说,Redis Pub/Sub是为实例之间的实时通信而设计的,低延迟是最重要的,因此没有任何形式的持久性或确认。其结果是最精简的实时消息系统,非常适合金融和游戏应用,因为在这些应用中,每一毫秒都很重要。

为什么是万博体育彩Redis企业版?

万博体育彩Redis Enterprise是基于无共享、对称的建筑这使得数据集的大小可以线性无缝地增长,而不需要更改应用程序代码。

万博体育彩Redis Enterprise提供了多种高可用性和地理分布模型,可以在需要时为用户提供本地延迟。

多个持久性选项(每写的AOF、每秒的AOF和快照)不会影响性能,确保您不必在发生故障后重新构建数据库服务器。

支持非常大的数据集通过使用对内存(RAM、持久内存或Flash)的智能分层访问,确保您可以扩展数据集以满足用户的需求,而不会显著影响性能。

如何使用一个Pub/Sub与Redis企业万博体育彩

Redis Streams和Pub/Sub在不同的编程语言中有稳定的api,所以下面的Python示例可以很容易地翻译成你选择的语言。

连接到复述:

进口复述,
#连接到本地redis实例
r =复述。复述,(host='localhost', port=6379, db=0)

写入流:

event = {"eventType": "purchase", "amount": 5, "item_id": "XXX"}
r.xadd(“stream_key”、‘*’事件)
# ' * '表示redis自动生成和事件id

直接读取流:

Last_id = '$' # '$'表示只有新消息
而真正的:
事件= r.xread({"stream_key": last_id}, block=0, count=10)
对于事件中的_,e:
Print (f"新事件,数量:{e['amount']}")
last_id = e(“id”)

通过消费者群体读取流:

#从读取任何潜在的未决事件开始
#以前不知道的(例如,
#因为崩溃)。“0”表示挂起事件。
pending = r.xreadgroup("service-1", " c2c ", {"stream_key": "0"})
pending_ids = []
对于_,e in pending:
Print (f"找到旧事件,amount: {e['amount']}")
pending_ids.append (e [' id '])
#将挂起事件标记为已处理
r.xack(“stream_key”、“service-1”* pending_ids)

#现在我们处理了所有之前的事件,
#开始要求新的。“>”表示“仅新增事件”。
而真正的:
事件= r.xreadgroup(" service-1 ", " consumer-A ", {" stream_key ": " > "}, count=10)
event_ids = []
对于事件中的_,e:
Print (f "新事件,数量:{e[' amount ']} ")
event_ids.append (e [' id '])
r.xack(“stream_key”、“service-1”* event_ids)
#如果我们在r之前坠毁。xack”,在重新加载,
#我们将重试此消息批处理。

处理一些事件,原子地承认和应用变化:

而真正的:
events = r.xreadgroup("service-1", "consumer-A", {"stream_key": ">"}, count=10)
event_ids = []

#启动redis事务
事务= r.multi ()
对于事件中的_,e:
事务。incrby (f”项目:{e[‘item_id}:总”,e[‘量’])
event_ids.append (e [' id '])
事务。xack(“stream_key”、“service-1”* event_ids)
transaction.exec ()
#如果我们在提交事务之前崩溃,没有
#操作将发生,以确保一致性。

在发布/订阅发布:

#发布一条消息到' redis '通道
r.publish(“复述”,“hello world”)

在Pub/Sub订阅频道:

子= r.pubsub ()
sub.subscribe(“复述”)
而真正的:
味精= sub.get_message ()
打印(f”新消息:{味精(“数据”)}”)

订阅发布/订阅模式:

子= r.pubsub ()
#该订阅将返回消息
#从所有以“红色”开头的频道。
sub.psubscribe(“红色*”)
而真正的:
味精= sub.get_message ()
Print (f"新消息在通道{msg['channel']}: {msg['data']}")


探索更多的


下一个步骤

Baidu