redis streams

Roshan Kumar是Redis Labs的高级产品经理。

Redis Streams是一种新的Redis数据结构,可让您管理生产者和消费者之间的数据通道。 在本文的前两篇文章中,我介绍了Redis Streams的基本知识和好处以及如何使用Redis Streams使用者组扩展数据流使用者。 在本文中,我将演示如何使用Redis Streams开发数据流处理应用程序。

为了使Redis Streams栩栩如生,让我们构建一个现实世界的解决方案,以识别Twitter上的主要影响者。 我将这个应用程序称为TopSocialForce,它将从Twitter收集社交消息,将它们存储在Redis Streams中,并进行实时分析。 TopSocialForce将通过将具有10,000多个关注者的任何Twitter句柄标记为“影响者”并维护正在运行的目录来发现顶级社会力量。

TopSocialForce将通过名为PubNub的第三方服务收集Twitter流数据。 (另一种选择是GNIP。)这些推文遵循Twitter描述的JSON格式 ,因此每个推文都包含Twitter句柄及其关注者计数,我们将使用它们进行影响者分类。

Redis实验室

图1. TopSocialForce的工作方式。

使用TopSocialForce,我们需要解决一些挑战。 这些推文是随机到达的,并且每秒发送数千条推文。 当影响者发布消息时,我们必须利用这一机会来标记影响者,因为我们无法预测该帐户何时会再次发布消息。 因此,我们不能忽略任何推文。 TopSocialForce不仅必须能够抵抗连接丢失或软件故障,而且还必须具有适当的数据库和数据结构来保存和处理所有推文。

Redis Streams应用程序组件

具有Redis Streams数据结构的Redis数据库使我们能够在满足这些要求的同时收集,存储和驱动Twitter feed的处理。 在此示例中,我们将使用称为Lettuce的Redis客户端库在Java平台上构建TopSocialForce。 Redis客户端库也支持其他编程语言,因此您可以选择最适合您的特定需求的语言平台和库。

由于TopSocialForce从名为PubNub的服务接收Twitter流,因此我们还需要PubNub的Java库。 具体来说,我们堆栈的组件包括:

  • 数据库:Redis 5.0
    Redis 5.0中提供了新的Redis Streams数据结构。
  • 语言平台:Java 1.8
    我们选择Java是为了使其受欢迎,但是您可以使用许多其他支持Redis Streams的编程语言来编写应用程序。 查看Redis客户端的完整列表以获取更多信息。
  • 样本Twitter feed:PubNub
    PubNub提供了一个用于Twitter数据的流API,以及一个免费版本,为开发人员提供了一组有限的推文。 因为我们开发TopSocialForce作为概念证明,所以我们使用了PubNub的免费服务。
  • Redis库: 生菜5.0
    Lettuce是Redis流行的Java客户端库。 Lettuce从Lettuce 5.0开始支持Redis流。 在撰写本文时,Lettuce 5.0是一个beta版本。 (注意: Redisson是另一个支持Redis Streams的Java库。)

Redis Streams解决方案:Twitter接收流和Twitter影响者分类器

TopSocialForce由两个主要过程组成,即Twitter接收流和Twitter影响者分类器。 这两个过程都使用Redis Streams作为基础流数据库来存储Twitter feed。

确保解决方案可以抵抗连接丢失

传统消息传递模块(例如发布/订阅)要求发布者和订阅者始终处于执行状态,并始终连接到公共渠道。 但是,Redis Streams没有该限制。 相反,Redis Streams为流数据提供了持久数据存储。 如下图2和图3所示,我们的Redis流将输入流与影响者分类器解耦。

Redis实验室

图2.即使使用者断开连接,数据摄取流仍将继续收集数据。

Redis实验室

图3.接收流断开时,使用者必须等待新数据到达。

确保数据不会在传输中丢失

Redis支持持久性和复制,以确保您的数据不会丢失。 启用持久性后,Redis将数据保存到磁盘以防万一需要恢复。 通过复制,您还可以部署存储数据副本的副本服务器,以使静态数据具有持久性。 但是,当使用者使用数据时,通常它将数据从Redis移出至使用者的流程。 如果数据在过渡期间丢失,则可能永远丢失。 Redis Streams通过在消费者组中使用明确的“ ack”命令来缓解此问题。 我在上一篇文章中详细介绍了消费者群体。

在我们的TopSocialForce应用程序中,影响者分类器流程具有一个属于消费者组的消费者,称为InfluencerConsumerGroup。 传输中的数据的生命周期如下:

  1. 我们的影响者分类器调用Redis Stream的XREADGROUP命令。
  2. 为了响应该呼叫,Redis做两件事:
    1. 将最新数据对象复制到该使用者的待处理对象列表中。
    2. 将对象交付给消费者。
  3. 使用者(影响者分类器)处理数据并发送XACK,这提示Redis从待处理列表中删除数据。
Redis实验室

图4. Redis Streams数据生命周期可确保传输期间的数据安全。

如果消费者在传输过程中或处理过程中丢失了数据,则数据保留在待处理列表中,因此不会丢失。 影响者分类程序始终可以使用XREADGROUP命令从待处理列表中检索数据。

影响者目录数据结构

TopSocialForce使用内置的Redis数据结构(即排序集和哈希)来存储影响者数据。 在Sorted Set中,我们将Twitter句柄及其关注者计数存储为每个分数(请参见图5)。 我们对每个影响者使用带有密钥的哈希数据结构来存储其帐户详细信息。 哈希值由关键影响者:[twitter handle]索引。 图6显示了示例哈希数据结构。

Redis实验室

图5. Redis的Sorted Set数据结构用于存储影响者。

Redis实验室

图6. Redis的Hash数据结构用于存储影响者详细信息。

Redis Streams应用程序设计:类层次结构和关系

因为我们使用Java作为语言平台,所以我们利用Java的面向对象的编程功能使TopSocialForce易于扩展。 下面的图7中的UML类图给出了我们的类结构的一般概念,您可以在GitHub上找到这个完整的包。 在以下部分中,我将在较高级别上解释程序的组织方式和作用。 随时阅读程序内部的文档以获取更多详细信息。

共同组成

  • LettuceConnection.java:这是一个实用程序类,它通过Lettuce库管理我们与Redis数据库的连接。 包中的其他类使用LettuceConnection对象连接到Redis。
  • InitializeConsumerGroup.java:在此类中,我们创建了Redis Streams数据结构和一个新的使用者组。 通过这样做,我们初始化了数据库以从生产者那里接收新数据并将其传递给消费者。 该程序只需要运行一次即可初始化数据结构。

摄取流(生产者)

  • IngestStream.java:这是一个父类,具有任何种类的摄取流程序所需的通用属性和方法集。
  • TwitterIngestStream.java:此类扩展了IngestStream,继承了其所有受保护的属性和公共属性和方法。 除了通用功能之外,TwitterIngestStream还添加了特定于Twitter数据流的功能。

影响者分类器(消费者)

  • ImpactrCollectorMain.java:这是启动该过程的主要Java类。
  • MessageProcessor.java:这是一个Java接口,用于定义processMessage()方法。
  • StreamConsumer.java:这是一个通用类,可在新数据到达我们的Redis Stream时读取它。 StreamConsumer扩展了Thread,因此可以作为单独的线程运行。 StreamConsumer对象将新数据传递给已注册的MessageProcessor对象。
  • ImpactrMessageProcessor.java:此类实现MessageProcessor接口。 在我们的程序中,我们将一个ImpactrMessageProcessor对象作为MessageProcessor类型传递给StreamConsumer。 然后ImpactorMessageProcessor使用Sorted Set和Hash数据结构将我们的影响者数据存储在Redis中。 您可以实现自己的MessageProcessor版本,并将其传递给StreamConsumer。
Redis实验室

图7. TopSocialForce应用程序的Java类显示为UML类图。

运行和测试Redis Streams应用程序

我们在GitHub上的文档提供了有关如何编译和运行这些程序的详细信息。 但是,在开始之前,请确保在LettuceConnection.java中设置了正确的连接参数。 简而言之,只需执行以下步骤:

  1. InitializeConsumerGroup :该程序初始化Redis流及其使用者组。 您必须先运行此程序,然后再收集和处理Twitter数据,并且在初始化数据结构后,该数据将持续存在。
  2. TwitterIngestStream :在编译TwitterIngestStream.java之前,请确保已更新PubNub连接凭据。 当您编译并运行该程序时,它会连接到PubNub并开始接收Twitter数据流。 然后,程序使用我们在上一步中初始化的Redis Streams数据结构存储数据。 TwitterIngestStream连续读取Twitter数据,并且不会自行终止。
  3. ImpactrCollectorMain :此类将StreamConsumer作为单独的线程启动。 StreamConsumer读取Redis流中的新数据,并将其传递给MessageProcessor对象。 StreamConsumer线程对Redis Streams进行阻塞调用。 因此,它不会自行终止,而是等待Redis流中的新数据(如果没有)。

当您执行TwitterIngestStream和ImpactrCollectorMain时,请确保测试以下各项:

  1. TwitterIngestStream和ImpactrCollector正在运行。
  2. ImpactrCollector对象的实例正在使用数据。
  3. 失败方案:TwitterIngestStream关闭。
  4. 失败方案:影响者收集器已关闭。

验证Redis流中的数据

验证数据的方法不只一种。 在这里,我将使用redis-cli界面显示一些示例命令:

Redis流中的数据

数据在增长吗? 几次运行以下命令以查看计数是否正在更改。

XLEN twitterstream

如何了解有关Redis流中数据的更多信息? 下面的命令显示基本信息,例如流的长度,最后生成的ID,第一个条目,最后一个条目等。

XINFO STREAM twitterstream

我可以获取流中附加的消费群体的列表吗? 以下命令列出了与流关联的所有组。

XINFO GROUPS twitterstream

消费者组中的消费者是否闲置或正在运行? 该命令列出了使用者组影响者分类器中的所有使用者,它们的空闲时间以及要处理的待处理项目的数量。

XINFO CONSUMERS twitterstream influencerclassifiers

影响者目录

到目前为止,我收集了多少影响者?

ZCARD influencers

ABCD说,关于网红的细节如何?

HGETALL influencer:ABCD

Redis Streams的后续步骤

我关于Redis Streams的三部分系列文章从介绍将Redis Streams用于不同用例开始。 在第二篇文章中,我从生产者和消费者的角度介绍了Redis Streams数据生命周期的详细信息。 在最后一篇文章中,我展示了如何使用我们前面介绍的概念来使用Redis Streams开发端到端应用程序。

当您扩展诸如TopSocialForce之类的应用程序时,无论您管理多少数据生产者或消费者,Redis Streams都将保持光滑,简单而稳定的状态。 我衷心希望本系列文章能为您如何从新的Redis Streams数据结构中受益提供一些思路。 从Redis 5.0开始, Redis中提供Redis Streams。 它在Redis Enterprise中也可用,它通过内存中复制提供了高可用性和持久性。

Roshan Kumar是 Redis Labs 的高级产品经理 他在软件开发和技术营销方面拥有丰富的经验。 Roshan曾在惠普和许多成功的硅谷初创公司工作,包括ZillionTV,Salorix,Alopa和ActiveVideo。 作为一个热情的程序员,他设计和开发了mindzeal.com,这是一个为年轻学生提供计算机编程课程的在线平台。 Roshan拥有计算机科学学士学位,并拥有圣塔克拉拉大学的MBA学位。

-

新技术论坛提供了一个以前所未有的深度和广度探索和讨论新兴企业技术的场所。 选择是主观的,是基于我们选择的技术,我们认为这些技术对InfoWorld读者来说是重要的,也是他们最感兴趣的。 InfoWorld不接受发布的营销担保,并保留编辑所有贡献内容的权利。 将所有查询发送到 newtechforum@infoworld.com

翻译自: https://www.infoworld.com/article/3323064/how-to-use-redis-streams.html

redis streams

redis streams_如何构建Redis Streams应用程序相关推荐

  1. redis streams_初步了解Redis Streams以及如何在Java中使用它们

    redis streams 自今年年初以来,Redis Streams已进入Redis的unstable分支,并且第一个客户开始采用Redis Streams API. 因此,这是一个绝佳的时间,可以 ...

  2. [译] 用 Redis 和 Python 构建一个共享单车的 app

    原文地址:Build a bikesharing app with Redis and Python 原文作者:Tague Griffith 译文出自:掘金翻译计划 本文永久链接:github.com ...

  3. dockerfile 构建 redis 镜像

    Dockfile是一种被Docker程序解释的脚本,Dockerfile由一条一条的指令组成,每条指令对应Linux下面的一条命令.Docker程序将这些Dockerfile指令翻译真正的Linux命 ...

  4. Docker-高级篇(1)-Dockerfile(核心构建Redis构建JDK8)

    文章目录 一.基本介绍 二.体系结构 2.1 Docker保留字 2.2 案例解释 当前目录下标准文件名可以不指定路径 三.构建Redis 四.构建java8 一.基本介绍 Dockerfile用来构 ...

  5. 浅析redis与zookeeper构建分布式锁的异同

    作者:架构小菜 链接:https://www.jianshu.com/p/508620a76e00 进程请求分布式锁时一般包含三个阶段:1. 进程请求获取锁:2. 获取到锁的进程持有锁并执行业务逻辑: ...

  6. 阅读准备-构建redis容器

    2019独角兽企业重金招聘Python工程师标准>>> ##Supervisor ##Centos Supervisor安装 ###要求 python环境 因为是一个 Python ...

  7. redis取出list最边的一个_这几个Redis使用技巧,让你的程序快如闪电

    一.Redis封装架构讲解 实际上NewLife.Redis是一个完整的Redis协议功能的实现,但是Redis的核心功能并没有在这里面,而是在NewLife.Core里面. 这里可以打开看一下,Ne ...

  8. Docker构建redis集群

    Docker构建redis集群 创建虚拟网卡 docker network create net-redis --subnet 172.38.0.0/16 查看: 配置6个redis配置文件 使用脚本 ...

  9. linux redis-trib.rb,linux 关于redis-trib.rb构建redis集群

    之前搭建集群漏下的坑, 今次再搭一次. 环境 ruby环境 yum install ruby rubygems -y redis的gem环境 gem install redis-3.2.2.gem 部 ...

  10. Redis高级项目实战,西安java程序员工资

    一面问题:MySQL+Redis+Kafka+线程+算法 mysql知道哪些存储引擎,它们的区别 mysql索引在什么情况下会失效 mysql在项目中的优化场景,慢查询解决等 mysql有什么索引,索 ...

最新文章

  1. element-ui中el-tree树形控件-树节点的选择(选中当前节点,获取当前id并且获取其父级id)...
  2. 好程序员Web前端分享无法忽视的JavaScript技巧
  3. 北京内推 | 微软亚洲研究院MSRA STCA招聘多模态算法实习生
  4. python 测试mysql数据库_Python操作MySQL数据库----继续安装和测试
  5. C++网络编程实例(初识多线程)
  6. Java Web实战篇-代码之美
  7. Charles安装破解和基础配置
  8. 使用Photon PUN创建简单对战游戏
  9. 春暖花开,放慢脚步,享受生活!
  10. 《网络是这样连接的》读书笔记2
  11. html5禁止显示相册,手机相册图片莫名被屏蔽,显示“涉嫌违规,系统审查中”半年了...
  12. WinForm中绘制网格线,Load和Piant事件区别
  13. Android Studio 修改 APP名称 和标题 为汉字
  14. Cesium结合高德SDK路径规划
  15. 《剑指Offer》题解与笔记(Java实现)
  16. 商业 v. s. 自由 ——W*ndows v. s. 现代UNIX
  17. 给系统添加个“任意门”日常设置来去自如
  18. 简单的弹窗应用(二)--AlertDialog
  19. 详细解读ARM寄存器之CPSR
  20. android studio logcat 配置颜色

热门文章

  1. systemctl status network.service命令,Failed to start LSB: Bring up/down networking.完美解决
  2. TNS-12547和TNS-12555错误解决
  3. 终于有一款视频社交应用打破微信的榜首垄断了,Faceu再登Appstore总榜第一
  4. 程序员必备算法——排列组合
  5. 计算机count是什么函数,计算机里COUNT是什么函数?怎么用的?好评!!
  6. verilog代码中避免出现latch方法
  7. python画图旋转图形_python简单实现旋转图片的方法
  8. 梅特勒托利多xk3124电子秤说明书_托利多电子秤设置说明
  9. 谁知道qq会员怎么退款呢
  10. K-means聚类算法原理及python实现