Roshan Kumar是Redis Labs的高级产品经理。

Redis Streams是一种新的Redis数据结构,可让您管理生产者和消费者之间的数据通道。 在本文的前两篇文章中,我介绍了Redis Streams的基本知识和好处以及如何使用Redis Streams使用者组扩展数据流使用者。 在本文中,我将演示如何使用Redis Streams开发数据流处理应用程序。

为了使Redis Streams栩栩如生,让我们构建一个现实世界的解决方案来识别Twitter上的主要影响者。 我将这个应用程序称为TopSocialForce,它将从Twitter收集社交消息,将它们存储在Redis Streams中,并进行实时分析。 TopSocialForce将通过将具有10,000多个关注者的任何Twitter句柄标记为“影响者”并维护正在运行的目录来发现顶级社会力量。

TopSocialForce将通过名为PubNub的第三方服务收集Twitter流数据。 (另一种选择是GNIP。)这些推文遵循Twitter描述的JSON格式 ,因此每个推文都包含Twitter句柄及其关注者计数,我们将其用于影响者分类。

Redis实验室

图1. TopSocialForce的工作方式。

使用TopSocialForce,我们面临着一些挑战。 这些推文是随机到达的,并且每秒发送数千条推文。 当影响者发布消息时,我们必须利用这一机会来标记影响者,因为我们无法预测该帐户何时会再次发布消息。 因此,我们不能忽略任何推文。 TopSocialForce不仅必须能够抵抗连接丢失或软件故障,还必须具有适当的数据库和数据结构来保存和处理所有推文。

Redis Streams应用程序组件

具有Redis Streams数据结构的Redis数据库使我们可以在满足这些要求的同时收集,存储和驱动Twitter feed的处理。 在此示例中,我们将使用称为Lettuce的Redis客户端库在Java平台上构建TopSocialForce。 Redis客户端库也支持其他编程语言,因此您可以选择最适合您特定需求的语言平台和库。

由于TopSocialForce从名为PubNub的服务接收Twitter流,因此我们还需要PubNub的Java库。 具体来说,我们堆栈的组件包括:

  • 数据库:Redis 5.0
    Redis 5.0中提供了新的Redis Streams数据结构。
  • 语言平台:Java 1.8
    我们选择Java是因为它很受欢迎,但是您可以使用支持Redis Streams的许多其他编程语言来编写应用程序。 查看Redis客户端的完整列表以获取更多信息。
  • 样本Twitter feed:PubNub
    PubNub提供了一个用于Twitter数据的流API,以及一个免费版本,为开发人员提供了一组有限的推文。 因为我们开发TopSocialForce作为概念证明,所以我们使用了PubNub的免费服务。
  • Redis库: 生菜5.0
    Lettuce是Redis流行的Java客户端库。 Lettuce从Lettuce 5.0开始支持Redis流。 在撰写本文时,Lettuce 5.0是一个beta版本。 (注意: Redisson是另一个支持Redis Streams的Java库。)

Redis Streams解决方案:Twitter接收流和Twitter影响者分类器

TopSocialForce由两个主要过程组成,即Twitter接收流和Twitter影响者分类器。 这两个过程都使用Redis Streams作为基础流数据库来存储Twitter feed。

确保解决方案可以抵抗连接丢失

传统的消息传递模块(例如发布/订阅)要求发布者和订阅者始终处于执行状态,并始终连接到公共渠道。 但是,Redis Streams没有该限制。 相反,Redis Streams为流数据提供了持久数据存储。 如下图2和图3所示,我们的Redis流将输入流与影响者分类器解耦。

Redis实验室

图2.即使使用者断开连接,数据摄取流仍将继续收集数据。

Redis实验室

图3.接收流断开时,使用者必须等待新数据到达。

确保数据不会在传输中丢失

Redis支持持久性和复制,以确保您的数据不会丢失。 启用持久性后,Redis将数据保存到磁盘以防万一需要恢复。 通过复制,您还可以部署存储数据副本的副本服务器,以使静态数据具有持久性。 但是,当使用者使用数据时,通常它将数据从Redis移出到使用者的流程。 如果数据在过渡期间丢失,则可能会永远丢失。 Redis Streams通过在消费者组中使用明确的“ ack”命令来缓解此问题。 在上一篇文章中,我详细介绍了消费者群体。

在我们的TopSocialForce应用程序中,影响者分类器进程具有一个属于消费者组的消费者,称为ImpactrConsumerGroup。 传输中的数据的生命周期如下:

  1. 我们的影响者分类器调用Redis Stream的XREADGROUP命令。
  2. 为了响应该呼叫,Redis做两件事:
    1. 将最新数据对象复制到该使用者的待处理对象列表中。
    2. 将对象交付给消费者。
  3. 使用者(影响者分类器)处理数据并发送XACK,这提示Redis从待处理列表中删除数据。
Redis实验室

图4. Redis Streams数据生命周期可确保传输期间的数据安全。

如果消费者在传输过程中或处理过程中丢失了数据,则数据保留在待处理列表中,因此不会丢失。 影响者分类程序始终可以使用XREADGROUP命令从待处理列表中检索数据。

影响者目录数据结构

TopSocialForce使用内置的Redis数据结构(即排序集和哈希)来存储影响者数据。 在Sorted Set中,我们将Twitter句柄及其关注者计数存储为每个分数(请参见图5)。 我们对每个影响者使用带有密钥的哈希数据结构来存储其帐户详细信息。 哈希值由关键影响者:[twitter handle]索引。 图6显示了示例哈希数据结构。

Redis实验室

图5. Redis的Sorted Set数据结构用于存储影响者。

Redis实验室

图6. Redis的Hash数据结构用于存储影响者详细信息。

Redis Streams应用程序设计:类层次结构和关系

因为我们使用Java作为语言平台,所以我们利用Java的面向对象的编程功能使TopSocialForce易于扩展。 下面的图7中的UML类图给出了我们的类结构的一般概念,您可以在GitHub上找到这个完整的包。 在下一节中,我将在较高级别上解释程序的组织方式和作用。 随时阅读程序内部的文档以获取更多详细信息。

共同组成

  • LettuceConnection.java:这是一个实用程序类,用于通过Lettuce库管理我们与Redis数据库的连接。 包中的其他类使用LettuceConnection对象连接到Redis。
  • InitializeConsumerGroup.java:在此类中,我们创建了Redis Streams数据结构和一个新的使用者组。 这样,我们初始化了数据库以从生产者那里接收新数据并将其传递给消费者。 该程序只需要运行一次即可初始化数据结构。

摄取流(生产者)

  • IngestStream.java:这是一个父类,具有任何种类的摄取流程序所需的通用属性和方法集。
  • TwitterIngestStream.java:此类扩展了IngestStream,继承了其所有受保护的属性和公共属性和方法。 除了通用功能之外,TwitterIngestStream还添加了特定于Twitter数据流的功能。

影响者分类器(消费者)

  • ImpactrCollectorMain.java:这是启动该过程的主要Java类。
  • MessageProcessor.java:这是一个Java接口,用于定义processMessage()方法。
  • StreamConsumer.java:这是一个通用类,可在新数据到达我们的Redis Stream时读取它。 StreamConsumer扩展了Thread,因此可以作为单独的线程运行。 StreamConsumer对象将新数据传递给已注册的MessageProcessor对象。
  • ImpactrMessageProcessor.java:此类实现MessageProcessor接口。 在我们的程序中,我们将一个ImpactrMessageProcessor对象作为MessageProcessor类型传递给StreamConsumer。 然后ImpactorMessageProcessor使用Sorted Set和Hash数据结构将我们的影响者数据存储在Redis中。 您可以实现自己的MessageProcessor版本,并将其传递给StreamConsumer。
Redis实验室

图7.显示为UML类图的TopSocialForce应用程序的Java类。

运行和测试Redis Streams应用程序

我们在GitHub上的文档提供了有关如何编译和运行这些程序的详细信息。 但是,在开始之前,请确保在LettuceConnection.java中设置了正确的连接参数。 简而言之,只需执行以下步骤:

  1. InitializeConsumerGroup :该程序初始化Redis流及其使用者组。 您必须先运行此程序,然后再收集和处理Twitter数据,并且在初始化数据结构后,该数据将持续存在。
  2. TwitterIngestStream :在编译TwitterIngestStream.java之前,请确保已更新PubNub连接凭据。 当您编译并运行该程序时,它将连接到PubNub并开始接收Twitter数据流。 然后,程序使用我们在上一步中初始化的Redis Streams数据结构存储数据。 TwitterIngestStream连续读取Twitter数据,并且不会自行终止。
  3. ImpactrCollectorMain :此类将StreamConsumer作为单独的线程启动。 StreamConsumer读取Redis流中的新数据,并将其传递给MessageProcessor对象。 StreamConsumer线程对Redis Streams进行阻塞调用。 因此,它不会自行终止,而是等待Redis流中的新数据(如果没有)。

当您执行TwitterIngestStream和ImpactrCollectorMain时,请确保测试以下各项:

  1. TwitterIngestStream和ImpactrCollector正在运行。
  2. ImpactrCollector对象的实例正在使用数据。
  3. 失败方案:TwitterIngestStream关闭。
  4. 失败方案:影响者收集器已关闭。

验证Redis流中的数据

验证数据的方法不只一种。 在这里,我将使用redis-cli界面显示一些示例命令:

Redis流中的数据

数据在增长吗? 几次运行以下命令以查看计数是否正在更改。

XLEN twitterstream

如何了解有关Redis流中数据的更多信息? 下面的命令显示基本信息,例如流的长度,最后生成的ID,第一个条目,最后一个条目等。

XINFO STREAM twitterstream

我可以获取流中附加的消费群体列表吗? 以下命令列出了与流关联的所有组。

XINFO GROUPS twitterstream

消费者组中的消费者是否闲置或正在运行? 此命令列出了使用者组影响者分类器中的所有使用者,它们的空闲时间以及要处理的待处理项目的数量。

XINFO CONSUMERS twitterstream influencerclassifiers

影响者目录

到目前为止,我收集了多少影响者?

ZCARD influencers

ABCD说,关于网红的细节如何?

HGETALL influencer:ABCD

Redis Streams的后续步骤

我关于Redis Streams的三部分系列文章从介绍Redis Streams在不同用例开始。 在第二篇文章中,我从生产者和消费者的角度介绍了Redis Streams数据生命周期的详细信息。 在最后一篇文章中,我展示了如何使用我们前面介绍的概念来使用Redis Streams开发端到端应用程序。

在扩展诸如TopSocialForce之类的应用程序时,无论您要管理多少个数据生产者或消费者,Redis Streams都将保持光滑,简单且稳定的状态,这将使您体会到。 我衷心希望本系列文章能为您如何从新的Redis Streams数据结构中受益提供一些思路。 从Redis 5.0开始, Redis中提供Redis Streams。 它在Redis Enterprise中也可用,它通过内存中复制提供了高可用性和持久性。

Roshan Kumar是 Redis Labs 的高级产品经理 他在软件开发和技术营销方面拥有丰富的经验。 Roshan在惠普和许多成功的硅谷初创公司工作,包括ZillionTV,Salorix,Alopa和ActiveVideo。 作为一个热情的程序员,他设计并开发了mindzeal.com,这是一个为年轻学生提供计算机编程课程的在线平台。 Roshan拥有计算机科学学士学位,并拥有圣塔克拉拉大学的MBA学位。

-

新技术论坛提供了一个以前所未有的深度和广度探索和讨论新兴企业技术的场所。 选择是主观的,是基于我们对InfoWorld读者认为最重要和最感兴趣的技术的选择。 InfoWorld不接受发布的营销担保,并保留编辑所有贡献内容的权利。 将所有查询发送到 newtechforum@infoworld.com

From: https://www.infoworld.com/article/3323064/how-to-use-redis-streams.html

如何构建Redis Streams应用程序相关推荐

  1. redis streams_初步了解Redis Streams以及如何在Java中使用它们

    redis streams 自今年年初以来,Redis Streams已进入Redis的unstable分支,并且第一个客户开始采用Redis Streams API. 因此,这是一个绝佳的时间,可以 ...

  2. 初步了解Redis Streams以及如何在Java中使用它们

    自今年年初以来,Redis Streams已进入Redis的unstable分支,并且第一个客户端始于采用Redis Streams API. 因此,这是一个绝佳的时机,可以从客户端角度看一下Redi ...

  3. Redis Streams 介绍

    Stream是Redis 5.0版本引入的一个新的数据类型,它以更抽象的方式模拟日志数据结构,但日志仍然是完整的:就像一个日志文件,通常实现为以只附加模式打开的文件,Redis流主要是一个仅附加数据结 ...

  4. 【译】Redis喜提新数据结构:Redis Streams

    本文是Redis作者antirez的一篇博客 原文地址:antirez.com/news/128 我们在Redis5版本迎来了一个新的数据结构,它的名字叫做"Streams".(撒 ...

  5. 优化redis key 迁移程序(云原生版本)

    文章目录 优化redis key 迁移程序(云原生版本) 问题 迁移程序的Dockerfile 优化 docker build 构建 docker run 启动程序 优化 存在的问题 没有权限的原因 ...

  6. Docker-高级篇(1)-Dockerfile(核心构建Redis构建JDK8)

    文章目录 一.基本介绍 二.体系结构 2.1 Docker保留字 2.2 案例解释 当前目录下标准文件名可以不指定路径 三.构建Redis 四.构建java8 一.基本介绍 Dockerfile用来构 ...

  7. 构建一个移动应用程序要花多少钱?

    构建一个移动应用程序要花多少钱? How much does it cost to build a mobile app? 不幸的是,对于一个移动应用程序的开发成本应该是多少这个问题,没有一个单一的答 ...

  8. 如何构建虚拟护士应用程序?

    如何构建虚拟护士应用程序? How to build a virtual nurse app like Sensely? 传统上,技术的进步引发了企业的变革.由最先进的计算机软件提供的交互式工具意味着 ...

  9. node mongoose_如何使用Express,Mongoose和Socket.io在Node.js中构建实时聊天应用程序

    node mongoose by Arun Mathew Kurian 通过阿伦·马修·库里安(Arun Mathew Kurian) 如何使用Express,Mongoose和Socket.io在N ...

  10. 电子界卡组构建2019_2018–2019年构建现代Android应用程序的路线图

    电子界卡组构建2019 Kriptofolio应用程序系列-简介 (Kriptofolio app series - Introduction) Welcome to this series of b ...

最新文章

  1. 一个ThreadLocal和面试官大战30个回合
  2. 多目标优化蚁群算法的matlab_深入浅出多目标优化10分钟多目标优化入门
  3. ylbtech-LanguageSamples-Arrays(数组)
  4. CLEARTEXT communication to xxx not permitted by network security policy
  5. 如果在系统里面无法格式化磁盘可以尝试以下方法
  6. java运行安全_Java运行时环境
  7. oracle 最近的sql语句,oracle最近执行的sql语句
  8. 刚刚,阿里云上线六大“战疫情”项目
  9. LINQ to CSV,一种类型安全,动态的高性能方法
  10. 《统计学习方法》—— 朴素贝叶斯方法、详细推导及其python3实现(一)
  11. COM中关于使用DLL的一些知识点
  12. 看拉扎维《模拟CMOS集成电路设计》的一些总结和思考(一)——绪论
  13. 第十三届蓝桥杯省赛Java-B组
  14. 教你如何选择弱电工程中使用的交换机?
  15. php8新特性全览【超详细】
  16. 高考2021加3科目成绩查询,官方解读:2021新高考实施方案正式公布,首选科目按原始分计入总成绩!...
  17. Justinmind使用教程(1)——概述部分
  18. Spark3 读写 S3 Parquet, Hive, Hudi
  19. 罗切斯特大学计算机科学硕士介绍,罗切斯特大学研究生计算机科学专业介绍
  20. sinon.stub_JavaScript测试工具对决:Sinon.js vs testdouble.js

热门文章

  1. Zend_Cache
  2. 计算机是如何存储矩阵,如何存储稀疏邻接矩阵(How to store sparse adjacency matrix)
  3. 微信小程序图片加载太慢;uni-app微信小程序加载图片优化;微信小程序图片image加载成功事件@load;图片加载成功触发@load事件
  4. CSS实现平行四边形
  5. Linux 下摄像头驱动支持情况(arm linux 同样适用)
  6. 小马哥robofly四轴代码解读:PWM电机输出
  7. 入职一个月老大教我如何在做测试中运用Linux
  8. Kteer软件 创建.ktr文件
  9. anaconda如何配置环境变量
  10. php receivemail下载,php receivemail,php mail,preceive