大数据技术与架构点击右侧关注,大数据开发领域最强公众号!

暴走大数据点击右侧关注,暴走大数据!

Apache Kafka 是一个可扩展,高性能,低延迟的平台,允许我们像消息系统一样读取和写入数据。我们可以很容易地在 Java 中使用 Kafka。

Spark Streaming 是 Apache Spark 的一部分,是一个可扩展、高吞吐、容错的实时流处理引擎。虽然是使用 Scala 开发的,但是支持 Java API。

Apache Cassandra 是分布式的 NoSQL 数据库。在这篇文章中,我们将介绍如何通过这三个组件构建一个高扩展、容错的实时数据处理平台。

准备

在进行下面文章介绍之前,我们需要先创建好 Kafka 的主题以及 Cassandra 的相关表,具体如下:

在 Kafka 中创建名为 messages 的主题

$KAFKA_HOME$\bin\windows\kafka-topics.bat --create \--zookeeper localhost:2181 \--replication-factor 1 --partitions 1 \--topic messages

在 Cassandra 中创建 KeySpace 和 table

CREATE KEYSPACE vocabularyWITH REPLICATION = {'class' : 'SimpleStrategy','replication_factor' : 1    };USE vocabulary;CREATE TABLE words (word text PRIMARY KEY, count int);

上面我们创建了名为 vocabulary 的 KeySpace,以及名为 words 的表。

添加依赖

我们使用 Maven 进行依赖管理,这个项目使用到的依赖如下:

org.apache.sparkspark-core_2.112.3.0provided

org.apache.sparkspark-sql_2.112.3.0provided

org.apache.sparkspark-streaming_2.112.3.0provided

org.apache.sparkspark-streaming-kafka-0-10_2.112.3.0

com.datastax.sparkspark-cassandra-connector_2.112.3.0

com.datastax.sparkspark-cassandra-connector-java_2.111.5.2

数据管道开发

我们将使用 Spark 在 Java 中创建一个简单的应用程序,它将与我们之前创建的Kafka主题集成。应用程序将读取已发布的消息并计算每条消息中的单词频率。然后将结果更新到 Cassandra 表中。整个数据架构如下:

现在我们来详细介绍代码是如何实现的。

获取 JavaStreamingContext

Spark Streaming 中的切入点是 JavaStreamingContext,所以我们首先需要获取这个对象,如下:

SparkConf sparkConf = new SparkConf();sparkConf.setAppName("WordCountingApp");sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");

JavaStreamingContext streamingContext = new JavaStreamingContext(  sparkConf, Durations.seconds(1));

从 Kafka 中读取数据

有了 JavaStreamingContext 之后,我们就可以从 Kafka 对应主题中读取实时流数据,如下:

Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "localhost:9092");kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");kafkaParams.put("auto.offset.reset", "latest");kafkaParams.put("enable.auto.commit", false);Collection<String> topics = Arrays.asList("messages");

JavaInputDStreamString, String>> messages =  KafkaUtils.createDirectStream(    streamingContext,    LocationStrategies.PreferConsistent(),    ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));

我们在程序中提供了 key 和 value 的 deserializer。这个是 Kafka 内置提供的。我们也可以根据自己的需求自定义 deserializer。

处理 DStream

我们在前面只是定义了从 Kafka 中哪张表中获取数据,这里我们将介绍如何处理这些获取的数据:

JavaPairDStream<String, String> results = messages  .mapToPair(      record -> new Tuple2<>(record.key(), record.value())  );JavaDStream<String> lines = results  .map(      tuple2 -> tuple2._2()  );JavaDStream<String> words = lines  .flatMap(      x -> Arrays.asList(x.split("\\s+")).iterator()  );JavaPairDStream<String, Integer> wordCounts = words  .mapToPair(      s -> new Tuple2<>(s, 1)  ).reduceByKey(      (i1, i2) -> i1 + i2    );

将数据发送到 Cassandra 中

最后我们需要将结果发送到 Cassandra 中,代码也很简单。

wordCounts.foreachRDD(    javaRdd -> {Map<String, Integer> wordCountMap = javaRdd.collectAsMap();for (String key : wordCountMap.keySet()) {        List wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));        JavaRDD rdd = streamingContext.sparkContext().parallelize(wordList);        javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class)).saveToCassandra();      }    }  );

启动应用程序

最后,我们需要将这个 Spark Streaming 程序启动起来,如下:

streamingContext.start();streamingContext.awaitTermination();

使用 Checkpoints

在实时流处理应用中,将每个批次的状态保存下来通常很有用。比如在前面的例子中,我们只能计算单词的当前频率,如果我们想计算单词的累计频率怎么办呢?这时候我们就可以使用 Checkpoints。新的数据架构如下

为了启用 Checkpoints,我们需要进行一些改变,如下:

streamingContext.checkpoint("./.checkpoint");

这里我们将 checkpoint 的数据写入到名为 .checkpoint 的本地目录中。但是在现实项目中,最好使用 HDFS 目录。

现在我们可以通过下面的代码计算单词的累计频率:

JavaMapWithStateDStream> cumulativeWordCounts = wordCounts  .mapWithState(    StateSpec.function((word, one, state) -> {          int sum = one.orElse(0) + (state.exists() ? state.get() : 0);          Tuple2 output = new Tuple2<>(word, sum);          state.update(sum);return output;        }      )    );

部署应用程序

最后,我们可以使用 spark-submit 来部署我们的应用程序,具体如下:

$SPARK_HOME$\bin\spark-submit \  --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \  --master local[2]  \target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar

最后,我们可以在 Cassandra 中查看到对应的表中有数据生成了。完整的代码可以参见 https://github.com/eugenp/tutorials/tree/master/apache-spark

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ?

maven依赖 spark sql_使用Kafka+Spark+Cassandra构建实时处理引擎相关推荐

  1. 利用Kafka和Cassandra构建实时异常检测实验

    导言 异常检测是一种跨行业的方法,用于发现事件流中的异常事件 - 它适用于物联网传感器,财务欺诈检测,安全性,威胁检测,数字广告欺诈以及许多其他应用程序.此类系统检查流数据以检查异常或不规则,并在检测 ...

  2. 使用Elasticsearch,Kafka和Cassandra构建流式数据中心

    在过去的一年里,我遇到了一些软件公司讨论如何处理应用程序的数据(通常以日志和metrics的形式).在这些讨论中,我经常会听到挫折感,他们不得不用一组零碎的工具,随着时间的推移将这些数据汇总起来.这些 ...

  3. spark两种kafka偏移量维护方式

    1.spark可以通过checkpoint的方式来维护kafka的偏移量,配置简单,只需要配置checkpoint的路径就可以完成偏移量的维护,如果本身spark业务就采用了state状态,那么既不需 ...

  4. maven依赖 spark sql_Spark开发实例

    本文将介绍如何实际动手进行 RDD 的转换与操作,以及如何编写.编译.打包和运行 Spark 应用程序. 启动 Spark Shell Spark 的交互式脚本是一种学习 API 的简单途径,也是分析 ...

  5. 最新spark,hive,flink,kafka,hadoop,zookeeper,flume,java,maven,Apache历史版本大全下载

    最新spark,hive,flink,kafka,hadoop,zookeeper,flume,java,maven,Apachek开源框架历史版本下载 TP通道 >>  www.apac ...

  6. spark streaming 消费 kafka入门采坑解决过程

    kafka 服务相关的命令 # 开启kafka的服务器 bin/kafka-server-start.sh -daemon config/server.properties & # 创建top ...

  7. [Kafka与Spark集成系列三] Spark编程模型

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  8. Java 版spark Streaming 维护kafka 的偏移量

    基于Direct  API  手动维护kafka 的偏移量,  将偏移量同步导了 redis 中, 我将对比较重要的代码拿出来说明, 完整代码在下方: 首先是通过Direct AIP 获取 JavaI ...

  9. kafka maven 依赖_SpringBoot入门建站全系列(二十八)整合Kafka做日志监控

    SpringBoot入门建站全系列(二十八)整合Kafka做日志监控 一.概述 Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端 ...

最新文章

  1. Java基于对象基础 基于对象和面向对象的区别(转)
  2. +z +Z compiler flag for HP
  3. python能表示多大整数_Python无法表示99999999999999999999这样大的整数。
  4. java.util.NoSuchElementException: Unable to validate object
  5. set集合python_python基础-set集合
  6. 小程序WXML基本使用
  7. 热血沙城-3.2移植-古月-cocos2dx源码
  8. p批处理替换目录下文本中的字符串
  9. C# Zip解压缩,规避 [content_types].xml 文件
  10. bzoj 2194 快速傅立叶之二
  11. 汇编语言寄存器相关知识(AX/BX/CX/DX+mov/add+物理地址+段+CS/IP+jmp)
  12. 腾讯会议共享屏幕 共享PPT视频声音
  13. 一年增加 1.2w 星,Dapr 能否引领云原生中间件的未来?
  14. React中实现防抖功能的两种方式
  15. java中这些判断空的用法,太优雅了
  16. 京东市值增近500亿,刘强东却接二连三卸任,这下的是什么棋?
  17. 如何在PostgreSQL13和以下版本中使用lz4压缩文本和二进制数据
  18. 51单片机定时器初值计算详解
  19. 2012年广州市户口搭户指南——可以搭到朋友家里?
  20. 公云(3322)动态域名指定ip解析脚本

热门文章

  1. [Kafka与Spark集成系列一] Spark入门
  2. 2021 音视频技术趋势不完全预测
  3. 解读腾讯极速高清AR-SR的画质改善机制
  4. 《长安十二时辰》背后的文娱大脑:如何提升爆款的确定性?
  5. 张睿:OpenH264拥有产品级的鲁棒性 欢迎contribute
  6. 腾讯海量存储与CDN的自动化运维
  7. Python生成器(send,close,throw)方法详解
  8. devStack安装OpenStack Ocata版本 (Linux Bridge+VLAN)
  9. linux部署Nexus OSS
  10. 【Java基础】static初始化块