文章目录

  • 一、概述
    • 1.1 Kafka Streams
    • 1.2 Kafka Streams 特点
    • 1.3 为什么要有 Kafka Streams
  • 二、Kafka Streams 数据清洗案例
    • 0)需求
    • 1)需求分析
    • 2)案例实操
  • 三、总结

一、概述

1.1 Kafka Streams

Kafka Streams。Apache Kafka 开源项目的一个组成部分。是一个功能强大,易于使用的 库。用于在 Kafka 上构建高可分布式、拓展性,容错的应用程序。

1.2 Kafka Streams 特点

1. 功能强大

  • 高扩展性,弹性,容错

2. 轻量级

  • 无需专门的集群
  • 一个库,而不是框架

3. 完全集成

  • 100%的 Kafka 0.10.0 版本兼容
  • 易于集成到现有的应用程序

4. 实时性

  • 毫秒级延迟
  • 并非微批处理
  • 窗口允许乱序数据
  • 允许迟到数据

1.3 为什么要有 Kafka Streams

当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有 Spark Streaming 和 Apache Storm。Apache Storm 发展多年,应用广泛,提供记录级别的处理能力, 当前也支持 SQL on Stream。而 Spark Streaming 基于 Apache Spark,可以非常方便与图计算, SQL 处理等集成,功能强大,对于熟悉其它 Spark 应用开发的用户而言使用门槛低。另外, 目前主流的 Hadoop 发行版,如 Cloudera 和 Hortonworks,都集成了 Apache Storm 和 Apache Spark,使得部署更容易。

既然 Apache Spark 与 Apache Storm 拥用如此多的优势,那为何还需要 Kafka Stream 呢?

主要有如下原因。

第一,Spark 和 Storm 都是流式处理框架,而 Kafka Streams 提供的是一个基于 Kafka 的 流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难 了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而 Kafka Streams 作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。

第二,虽然 Cloudera 与 Hortonworks 方便了 Storm 和 Spark 的部署,但是这些框架的部署仍然相对复杂。而 Kafka Streams 作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。

第三,就流式处理系统而言,基本都支持 Kafka 作为数据源。例如 Storm 具有专门的 kafka-spout,而 Spark 也提供专门的 spark-streaming-kafka 模块。事实上,Kafka 基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了 Kafka,此时使用 Kafka Streams 的成本非常低。

第四,使用 Storm 或 Spark Streaming 时,需要为框架本身的进程预留资源,如 Storm 的 supervisor 和 Spark on YARN 的 node manager。即使对于应用实例而言,框架本身也会占 用部分资源,如 Spark Streaming 需要为 shuffle 和 storage 预留内存。但是 Kafka 作为类库不 占用系统资源。

第五,由于 Kafka 本身提供数据持久化,因此 Kafka Streams 提供滚动部署和滚动升级以 及重新计算的能力。

第六,由于 Kafka Consumer Rebalance 机制,Kafka Stream 可以在线动态调整并行度。

二、Kafka Streams 数据清洗案例

0)需求

实时处理单词带有”>>>”前缀的内容。例如输入”aaa>>>bbb”,最终处理成 “bbb”

1)需求分析

2)案例实操

  1. 创建一个工程,并添加 jar 包
  2. 创建主类
package com.atguigu.kafka.stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder; public class Application { public static void main(String[] args) { // 定义输入的 topic String from = "first"; // 定义输出的 topic String to = "second"; // 设置参数 Properties settings = new Properties();settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");StreamsConfig config = new StreamsConfig(settings); // 构建拓扑 TopologyBuilder builder = new TopologyBuilder(); builder.addSource("SOURCE", from) .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() { @Override public Processor<byte[], byte[]> get() { // 具体分析处理return new LogProcessor();} }, "SOURCE").addSink("SINK", to, "PROCESS"); // 创建 kafka streams KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); }
}
  1. 具体业务处理
package com.atguigu.kafka.stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext; public class LogProcessor implements Processor<byte[], byte[]> { private ProcessorContext context; @Override public void init(ProcessorContext context) { this.context = context; }@Override public void process(byte[] key, byte[] value) { String input = new String(value); // 如果包含“>>>”则只保留该标记后面的内容 if (input.contains(">>>")) { input = input.split(">>>")[1].trim(); // 输出到下一个topic context.forward("logProcessor".getBytes(), input.getBytes());}else{context.forward("logProcessor".getBytes(), input.getBytes()); } }@Override public void punctuate(long timestamp) { }@Override public void close() { }
}

(4)运行程序

(5)在 hadoop104 上启动生产者

[root@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first >hello>>>world
>>aaa>>>bbb
>>hahaha

(6)在 hadoop103 上启动消费者

[root@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic second
world
bbb
hahaha

三、总结

  • Kafka Streams的并行模型完全基于Kafka的分区机制和Rebalance机制,实现了在线动态调整并行度;
  • 同一Task包含了一个子Topology的所有Processor,使得所有处理逻辑都在同一线程内完成,避免了不必的网络通信开销,从而提高了效率;
  • through方法提供了类似Spark的Shuffle机制,为使用不同分区策略的数据提供了Join的可能;
  • log compact提高了基于Kafka的state store的加载效率;
  • state store为状态计算提供了可能;
  • 基于offset的计算进度管理以及基于state store的中间状态管理为发生Consumer rebalance或Failover时从断点处继续处理提供了可能,并为系统容错性提供了保障;
  • KTable的引入,使得聚合计算拥用了处理乱序问题的能力;

【Kafka】(二十四)轻量级流计算 Kafka Streams 实践总结相关推荐

  1. 云计算设计模式(二十四)——仆人键模式

    云计算设计模式(二十四)--仆人键模式 使用一个令牌或密钥,向客户提供受限制的直接訪问特定的资源或服务,以便由应用程序代码卸载数据传输操作. 这个模式是在使用云托管的存储系统或队列的应用中特别实用,而 ...

  2. 异常处理程序和软件异常——Windows核心编程学习手札之二十四

    异常处理程序和软件异常 --Windows核心编程学习手札之二十四 CPU负责捕捉无效内存访问和用0除一个数值这种错误,并相应引发一个异常作为对错误的反应,CPU引发的异常称为硬件异常(hardwar ...

  3. 数字图像处理领域的二十四个典型算法及vc实现、第一章

    数字图像处理领域的二十四个典型算法及vc实现.第一章 作者:July   二零一一年二月二十六日. 参考:百度百科.维基百科.vc数字图像处理. --------------------------- ...

  4. OpenCV学习(二十四 ):角点检测(Corner Detection):cornerHarris(),goodFeatureToTrack()

    OpenCV学习(二十四 ):角点检测(Corner Detection):cornerHarris(),goodFeatureToTrack() 参考博客: Harris角点检测原理详解 Harri ...

  5. 20190827 On Java8 第十四章 流式编程

    第十四章 流式编程 流的一个核心好处是,它使得程序更加短小并且更易理解.当 Lambda 表达式和方法引用(method references)和流一起使用的时候会让人感觉自成一体.流使得 Java ...

  6. 华为鸿蒙参与者,一起来搞机 篇二十四:不止有鸿蒙,参加华为开发者大会是一种怎样的体验...

    一起来搞机 篇二十四:不止有鸿蒙,参加华为开发者大会是一种怎样的体验 2019-08-19 21:30:00 10点赞 6收藏 4评论 2019年8月9日至11日,HDC.2019华为开发者大会在广东 ...

  7. 美学心得(第二百二十四集)罗国正

    美学心得(第二百二十四集) 罗国正 (2021年7月) 2987.清朝画家方士庶认为:绘画是"于天地之外,别构一种灵奇"."因心造境,以手运心"."山 ...

  8. Power BI(二十四)power pivot之产品/客户分类分析(ABC分析)

    Power BI(二十四)power pivot之产品/客户分类分析(ABC分析) 之前我们介绍过帕累托分析,现在我们使用power pivot进行产品/客户分类分析(ABC分析) 我们就以产品ABC ...

  9. 数字图像处理领域的二十四个典型算法

    数字图像处理领域的二十四个典型算法及vc实现.第一章 一.256色转灰度图 二.Walsh变换 三.二值化变换 四.阈值变换 五.傅立叶变换 六.离散余弦变换 数字图像处理领域的二十四个典型算法及vc ...

  10. 【Microsoft Azure 的1024种玩法】二十四.通过Azure Front Door 的 Web 应用程序防火墙来对 OWASP TOP 10 威胁进行防御

    [简介] 我们都知道像 SQL 注入.跨站点脚本攻击(XSS)之类的恶意攻击以及 OWASP 发现的十大威胁都可能会导致服务中断或数据丢失,让 Web 应用程序所有者受到巨大威胁.那么如何有效的解决O ...

最新文章

  1. 【Flutter】Flutter 拍照示例 ( 创建应用 | 安装 image_picker 插件 )
  2. [转]Java——Servlet的配置和测试
  3. python闭环最短路径_python实现最短路径的实例方法
  4. Oracle 字符串函数
  5. 远程开发初探 - VS Code Remote Development
  6. [渝粤教育] 温州医科大学 医用高等数学 参考 资料
  7. 【渝粤题库】广东开放大学 传播学理论与实务 形成性考核
  8. 【Linux】查看文件内容的相关命令总结
  9. Python:为什么必须在方法定义和调用中明确使用'self'?
  10. C++:值传递、指针传递、引用传递
  11. 腾讯音乐计划以介绍形式在港交所主板二次上市
  12. 在ASP.NET中面向对象的编程思想
  13. ssms18还原数据_SSMS 18中的静态数据屏蔽
  14. json处理为字符串,主要函数,dumps和loads
  15. 项目01——图书进、销、存(jxc)系统(单机版)
  16. 图论与复杂网络建模工具Networkx的四种网络模型
  17. Spring Boot整合mybatis报错Invalid bound statement (not found)
  18. Python 第六章 面向对象编程(MD模式)
  19. linux测坏道脚本,linux测试硬盘坏道
  20. 2019依图科技笔试题

热门文章

  1. vue3使用dayjs
  2. 保险产业拥抱“大数据时代” 或带来颠覆性变革
  3. Mac版 微信 撤回消息拦截两种方法
  4. org.gradle.api.tasks.TaskExecutionException: Execution failed for task ':app:processDebugManifest
  5. Linux内核网络:实现与理论--介绍
  6. 关于 类的常成员函数 声明和定义处 都需要加 const的原因
  7. 中南大学计算机学院复试2021,34所自划线院校2021考研复试分数线-2021中南大学考研分数线已公布...
  8. encapsulation dot1q vlan-id命令
  9. macbook插入耳机不出声
  10. python创建列表以及列表的操作(插入-删除-索引-交换元素值-切片)