快速数据摄取

什么是快速数据摄取?

收集,存储和处理大量的高繁多,高速数据呈现出几种复杂的设计挑战 - 特别是在物联网(物联网),电子商务,安全,通信,娱乐,金融和零售等领域。鉴于响应性,及时和准确的数据驱动决策是这些业务的核心,实时数据收集和分析至关重要。

提供实时数据分析的重要第一步是确保有效地捕获快速数据流的充分资源。虽然物理基础架构(包括高速网络,计算,存储和内存)在此处播放重要角色,但软件堆栈必须与其物理层或组织的性能匹配,可能最终有一个大量的数据,缺少数据万博电竞客服,或不完整,误导性数据。

快速数据摄取的挑战和最佳实践

高速数据摄取通常涉及不同类型的复杂性:

  1. 到达突发的大量数据:BURSTY数据需要一个能够处理大量数据的解决方案,具有最小的延迟。理想情况下,使用最小资源,它应该能够使用子毫秒延迟执行数百万的写入。
  2. 来自多个源/格式的数据:数据摄取解决方案还必须足够灵活,以处理多种不同格式的数据,在需要时保留源标识,并实时进行转换或规范化。
  3. 需要过滤,分析或转发所需的数据:大多数数据摄取解决方案都有一个或多个消耗数据的用户。这些通常是在与多种假设的相同或不同位置中起作用的不同应用程序。在这种情况下,数据库不仅必须转换数据,还必须根据消费应用程序的要求过滤或聚合它。
  4. 管理生产者和各种类型的消费者之间的稳定数据通道:如果数据到达模式不是连续的,那么生产者和消费者需要一个频道,它将让它们异步传输数据。该通道还必须适用于连接丢失和硬件故障。在许多用例中,生产者和消费者无法以相同的速度运行。这可以导致数据积压,以延迟消费者在数据上行动。
  5. 来自地理分布来源的数据:在这种情况下,底层架构通常可以方便地将数据收集节点分布在靠近源的地方。这样,节点本身就成为快速数据摄取解决方案的一部分,以收集、处理、转发或重新路由摄取数据。

Redi万博体育彩s企业版是如何轻松快速获取数据的

以最少的服务器数量实现高性能
谈到性能时,Redis Enterprise一直在万博体育彩基准测试要处理每秒200多万次读/写操作,则在AWS上仅具有40节点集群的子毫秒延迟。这使得Redis En万博体育彩terprise成为市场上最资产的高效NoSQL数据库。

灵活的数据结构和实时分析模块:Redis流,Pub/Sub,列表,排序集,RedisTimeSeries
Redis提供各种数据结构,如流,列表,集,排序集和散列,提供简单且多功能的数据处理,以便有效地结合高速数据摄取和实时分析。

Redis'Pub / sub功能允许它作为在地理上分布式数据摄取节点之间的高效消息代理。数据生成应用程序将流数据发布到所需格式的频道,并消费应用程序订阅与它们相关的那些通道,以其异步接收消息。

列表和排序集可以用作连接生产者和消费者的数据通道。您还可以使用这些数据结构来异步传输数据。与发布/订阅不同,列表和排序集提供持久性。

流还可以做更多,提供生产者和消费者之间的持久数据摄取通道。使用Streams,您可以使用消费者组来扩展消费者的数量。当消费者在消费和处理数据时出现故障时,消费者组还实现类似事务的数据安全。

最后重新定位提供增强的快速数据摄取功能集,包括下注采样,上次摄入值的特殊计数器操作,以及双倍Δ压缩与实时分析功能相结合,如具有内置搜索,聚合,范围查询和构建的数据标签,以及构建的数据标签在连接器到领先的监控和分析工具,如Grafana和Prometheus。

主动主动地理分配部署
万博体育彩复述,企业的CRDTs-based active - active技术在Geo位置实现复杂的数据摄取和消息传递操作,并使应用程序以完全分布式方式部署,以显着提高可用性和应用响应时间。

使用SSD和持久存储器扩展Redis DRAM
万博体育彩复述,企业的复述在闪光技术使扩展DRAM SSD和持久的记忆,允许存储很大的tb数据集使用相同的基础设施成本的基于磁盘的数据库和数据库延迟,同时保持在毫秒级的水平,即使摄取物品超过1米/秒的复述,企业集群的每个节点上。万博体育彩

如何实现快速数据摄取与Redis

以下是在Java中编写的一些代码片段。他们都使用Jedis库。首先,按照JEDIS的说明进行操作开始页面下载最新版的绝地武士。

  1. 使用Redis Streams进行快速数据摄取
    1. 将消息发布到流数据结构。此程序使用XADD将新项目添加到流中。文件名:StreamPublish.java。导入java.util.hashmap;
      导入java.util.map;

      导入redis.clients.jedis.jedis;
      导入redis.clients. jdis . streamentryid;
      静态Jedis Jedis = New Jedis(“localhost”,6379);公共静态void main(String [] args)抛出异常{
      尝试 {
      Map kv = new HashMap();
      kv.put(“a”,“100”);//键 - > a;价值 - > 100
      jedis.xadd(“mystream”,streamentryid.new_entry,kv);
      }最后 {
      jedis.close ();
      }
      }
      }
    2. 异步地使用流中的数据。如果流为空,则等待消息。这个程序使用XREAD命令。文件名称:StreamConsumeAsync.java。导入java.util.abstractmap.simpleentry;
      导入java.util.hashmap;
      导入java.util.list;
      导入java.util.map;
      进口java.util.Map.Entry;

      导入redis.clients.jedis.jedis;
      进口redis.clients.jedis.StreamEntry;
      public class StreamConsumeAsync{static Jedis = new Jedis(" localhost ", 6379);public static void main(String[] args) throws Exception{//从0开始。对于后续查询,从最后一个id + 1读取
      字符串laststreamDataid =“0-0”;
      int count = 1000;
      long waitTimeInMillis = 5000;try {
      //在循环中异步阅读新数据
      而(真){
      List next = getNext(" MyStream ", lastStreamDataId,
      伯爵·瓦蒂米米尔(Waittimeinmillis);
      如果(下一个!= null){
      list stlist = getstreamentries(下一个);
      if(stlist!= null){
      //在这里处理数据
      for (int j = 0;j < stList.size ();j + +) {
      streamentry streamdata =(streamentry)stlist.get(j);//读取数据流的字段(键值对)
      地图字段= streamData.getFields();//从最后一个+ 1读取后续数据
      LastStreamDataid = StreamData.getId()。GetTime()
      +“-”
      +(StreamData.getID()。GetSequence()+ 1);system.out.println(stlist.get(j));
      System.out.println (lastStreamDataId);
      }
      其他}{
      system . out。println("流中没有新数据");
      }
      }
      } }最后 {
      jedis.close ();
      }
      } //从流中读取下一组数据
      私有静态列表GetNext(String StreamID,String Lastid,Int Count,Long WaittimeInmillis)抛出异常{
      HashMap map = new HashMap();
      String ReadFrom = Lastid;
      Map.put(StreamID,新的streamentId(Readfrom));
      列表列表= jedis.xread(count,waittimeinmillis,
      (条目<字符串,StreamEntryID >)
      .toArray map.entrySet () () [0]);
      返回清单;
      } //读取流条目
      //假设简化列表只有一条流
      private static List getStreamEntries(List streamList) throws Exception{
      如果(streamList.size () > 0) {
      SimpleEntry stEntry = (SimpleEntry)streamList.get(0);
      返回(list )stentry.getvalue();
      }返回null;
      }
      }
    3. 使用XRange命令查询流。文件名:StreamQuery.java导入java.util.list;
      导入java.util.map;

      导入redis.clients.jedis.jedis;
      进口redis.clients.jedis.StreamEntry;
      public class StreamQuery{static Jedis Jedis = new Jedis(" localhost ", 6379);public static void main(String[] args) throws Exception{String streamID = " MyStream ";
      start = new StreamEntryID(0,0);
      streamentryid end = null;// null - >直到流中的最后一个项目
      int count = 2;尝试{
      list stlist = jedis.xrange(StreamID,开始,结束,计数);
      if(stlist!= null){
      //在这里处理数据
      for (int j = 0;j < stList.size ();j + +) {
      streamentry streamdata =(streamentry)stlist.get(j);system.out.println(StreamData);//读取数据流的字段(键值对)
      地图字段= streamData.getFields();//从最后一个+ 1读取后续数据
      StreamEntryID nextStart =
      .getTime新StreamEntryID (streamData.getID () (),
      (streamData.getID () .getSequence () + 1));
      }
      其他}{
      system . out。println("流中没有新数据");
      }
      }最后 {
      jedis.close ();
      }
      }
      }
  2. 使用Pub/Sub快速摄取数据
    1. 发布到频道。文件名:pubsubpublish.java导入redis.clients.jedis.jedis;

      公共类PubSubPublish {
      静态绝地武士=新绝地武士(" localhost ", 6379);
      公共静态void main(String [] args)抛出异常{TRY {
      字符串channel = " MyChannel ";
      字符串消息=“你好!”;
      能。发布(通道、消息);
      }最后 {
      jedis.close ();
      }
      }
      }
    2. 订阅频道。文件名:pubsubpublish.java导入redis.clients.jedis.jedis;
      导入redis.clients.jedis.jedispubsub;

      公共类Pubsubsubscribe扩展了JedisPubsub {
      静态Jedis Jedis = New Jedis(“localhost”,6379);公共静态void main(String [] args)抛出异常{TRY {
      PubSubSubscribe mySubscriber = new PubSubSubscribe();
      字符串channel = " MyChannel ";
      能。订阅(mySubscriber、通道);
      }最后 {
      jedis.close ();
      }
      } / /接收消息
      @Override
      public void onMessage(String channel, String message) {
      system.out.println(消息);
      }
      }
  3. 使用列表快速摄取数据
    1. 将数据推送到列表。文件名称:ListPush.java导入redis.clients.jedis.jedis;

      public class ListPush {
      静态绝地武士=新绝地武士(" localhost ", 6379);
      公共静态void main(String [] args)抛出异常{TRY {
      String list = " MyList ";
      字符串消息=“你好!”;
      jedis.lpush(列表,消息);
      }最后 {
      jedis.close ();
      }
      }
      }
    2. 从列表中流行数据。文件名:listpop.java导入redis.clients.jedis.jedis;

      public class ListPop {
      静态绝地武士=新绝地武士(" localhost ", 6379);
      公共静态void main(String [] args)抛出异常{TRY {
      String list = " MyList ";
      字符串消息= jedis.rpop(list);
      system.out.println(消息);
      }最后 {
      jedis.close ();
      }
      }
      }

探索更多


下一个步骤

Baidu