需求

  • Structured Streaming作为消费者读取kafka生产的内容
  • 输出内容到控制台

案例

启动kafka生产者

kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}object kafkaSource extends App {//构建spark Sessionprivate val session: SparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()//读取kafka上内容,指定format为kafka,以及订阅主题,服务器地址private val df: DataFrame = session.readStream.format("kafka").option("kafka.bootstrap.servers","mypc01:9092,mypc02:9092,mypc03:9092").option("subscribe","pet")//可选项,设置offset.option("staringOffsets", "earliest")// 可选项,设置消费者组.option("kafka.consumer.commit.groupid", "test1").load()//输出数据到控制台
//不可以用show方法df.writeStream.outputMode(OutputMode.Update()).format("console").start().awaitTermination()
}

之后利用生产者生产数据,即可在控制台看到输出,示例如下

因为没有key,所以为null,值的类型为二进制,需要处理一下才能看到

Batch: 4
-------------------------------------------
+----+-------+-----+---------+------+--------------------+-------------+
| key|  value|topic|partition|offset|           timestamp|timestampType|
+----+-------+-----+---------+------+--------------------+-------------+
|null|[64 64]|  pet|        2|   399|2020-12-09 17:33:...|            0|
|null|[66 66]|  pet|        0|   371|2020-12-09 17:33:...|            0|
|null|[65 65]|  pet|        1|   388|2020-12-09 17:33:...|            0|
+----+-------+-----+---------+------+--------------------+-------------+

在上面代码的输出前加一句,解析二进制内容为字符串

 private val df1: DataFrame = df.selectExpr("cast(value as String)")

输出结果示例

|value|
+-----+
|   bb|
|   cc|
+-----+

因为只选择了1列,所以只会输出一列,也可以多选

private val df1: DataFrame = df.selectExpr("cast(value as String)","offset")

结果示例如下

+-----+------+
|value|offset|
+-----+------+
|   aa|   404|
+-----+------+

总结

  • kafka source需要指定format,kafka.bootstrap.servers,以及subscribe属性
  • kafka读取的内容为二进制内容,需要进行解码方便阅读

kafka--Struct Streaming--console案例入门相关推荐

  1. Flume+Kafka+Spark Streaming+MySQL实时日志分析

    文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...

  2. kafka channle的应用案例

      kafka channle的应用案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 最近在新公司负责大数据平台的建设,平台搭建完毕后,需要将云平台(我们公司使用的Ucloud的 ...

  3. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

  4. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

    一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...

  5. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。...

    Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭 ...

  6. Kafka之四:Kafka与Streaming集成

    Kafka之四:Kafka与Streaming集成 文章目录 Kafka之四:Kafka与Streaming集成 1. 修改IEDA的maven配置 2. 程序一 3. 程序二:统计次数 4. 提交任 ...

  7. 快速入门Struts2㊀一个案例入门Struts2

    文章目录 1 Struct2 ? 2 一个案例入门Struct2 2.1 搭建项目环境 2.2 配置核心过滤器(web.xml) 2.3 添加用户.jsp与用户列表.jsp 2.4 编写action类 ...

  8. Kafka+Spark Streaming如何保证exactly once语义

    大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 在Kafka.Storm.Flink.Spark Streaming等分布式流处理系统中(没错,Ka ...

  9. 大数据Spark “蘑菇云”行动第76课: Kafka+Spark Streaming+Redis项目实战

    大数据Spark "蘑菇云"行动第76课:   Kafka+Spark Streaming+Redis项目实战 jedis插件 redis <dependency>   ...

  10. 源码系列第1弹 | 带你快速攻略Kafka源码之旅入门篇

    大家过年好,我是 华仔, 又跟大家见面了. 从今天开始我将为大家奉上 Kafka 源码剖析系列文章,正式开启 「Kafka的源码之旅」,跟我一起来掌握 Kafka 源码核心架构设计思想吧. 今天这篇我 ...

最新文章

  1. 3D打印机分类与速度
  2. java面向对象第一章
  3. 每天一道LeetCode-----计算两个序列最长的公共子序列长度
  4. 1.1 torch_数据操作
  5. python通过封装可以实现代码复用_Python学习笔记(五)函数和代码复用
  6. Mac翻译系列软件推荐三:Mate Translate for Mac多国语言翻译工具
  7. 贝叶斯分析好坏_交易必读|浅谈贝叶斯分析
  8. 尔雅国学智慧课后答案
  9. 计算机网络ip地址划分范围,ip地址分类及范围划分有哪些
  10. 10个提升写作手法的方法
  11. 年中Flag挑战日榜:最终挑战王会花落谁家?
  12. 什么样的岗位会最先被人工智能 (AI) 取代?
  13. cdp备份和oracle备份,CDP与快照:两种不同数据保护方法
  14. 开山ORC螺杆膨胀发电机
  15. Retrofit2网络框架的使用
  16. Elasticsearch7.7修改network.host IP地址 start启动失败
  17. win10鼠标右击 新建文件夹 反应缓慢、迟钝
  18. spring-cloud-gateway报错Failed to bind properties under ‘‘ to org.springframework.cloud.gateway.handle
  19. Unity3D客户端在游戏场景中创建阻挡并用二进制导出
  20. 计算机网络是显性课程还是隐性课程,显性课程与隐性课程

热门文章

  1. JS 对象(Object)和字符串(String)互转方法
  2. iphone如何信任软件_如何在越狱后 iPhone 上多开软件?
  3. The path ‘E:\ZERO‘ does not belong to a directory.
  4. 外键查询_详解MySQL数据库删除所有表的外键约束、禁用外键约束相关脚本
  5. unity怎么设置游戏页面_王者荣耀李小龙粤语语音包怎么得?李小龙粤语语音包获取与设置方法介绍[多图] - 游戏攻略...
  6. matlab画迟滞迥线,[画图的问题]怎么画类似于磁滞回线的图像?一个x值对应两个y值的...
  7. 项目管理六大制约因素_项目管理的制约因素
  8. 3dm游戏运行包_权势纵横捭阖,战场龙血玄黄!三国志14火爆来袭电脑游戏
  9. 实验方法怎么写_小学作文怎么写?“把短句变长句”等3种方法帮孩子提高作文水平!...
  10. java接口多态的变量能_「JAVA」多态的灵魂,面向接口的程序设计,这才是你该懂得的接口(interface)...