kafka--Struct Streaming--console案例入门
需求
- Structured Streaming作为消费者读取kafka生产的内容
- 输出内容到控制台
案例
启动kafka生产者
kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}object kafkaSource extends App {//构建spark Sessionprivate val session: SparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()//读取kafka上内容,指定format为kafka,以及订阅主题,服务器地址private val df: DataFrame = session.readStream.format("kafka").option("kafka.bootstrap.servers","mypc01:9092,mypc02:9092,mypc03:9092").option("subscribe","pet")//可选项,设置offset.option("staringOffsets", "earliest")// 可选项,设置消费者组.option("kafka.consumer.commit.groupid", "test1").load()//输出数据到控制台
//不可以用show方法df.writeStream.outputMode(OutputMode.Update()).format("console").start().awaitTermination()
}
之后利用生产者生产数据,即可在控制台看到输出,示例如下
因为没有key,所以为null,值的类型为二进制,需要处理一下才能看到
Batch: 4
-------------------------------------------
+----+-------+-----+---------+------+--------------------+-------------+
| key| value|topic|partition|offset| timestamp|timestampType|
+----+-------+-----+---------+------+--------------------+-------------+
|null|[64 64]| pet| 2| 399|2020-12-09 17:33:...| 0|
|null|[66 66]| pet| 0| 371|2020-12-09 17:33:...| 0|
|null|[65 65]| pet| 1| 388|2020-12-09 17:33:...| 0|
+----+-------+-----+---------+------+--------------------+-------------+
在上面代码的输出前加一句,解析二进制内容为字符串
private val df1: DataFrame = df.selectExpr("cast(value as String)")
输出结果示例
|value|
+-----+
| bb|
| cc|
+-----+
因为只选择了1列,所以只会输出一列,也可以多选
private val df1: DataFrame = df.selectExpr("cast(value as String)","offset")
结果示例如下
+-----+------+
|value|offset|
+-----+------+
| aa| 404|
+-----+------+
总结
- kafka source需要指定
format,kafka.bootstrap.servers
,以及subscribe
属性 - 从
kafka
读取的内容为二进制内容
,需要进行解码方便阅读
kafka--Struct Streaming--console案例入门相关推荐
- Flume+Kafka+Spark Streaming+MySQL实时日志分析
文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...
- kafka channle的应用案例
kafka channle的应用案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 最近在新公司负责大数据平台的建设,平台搭建完毕后,需要将云平台(我们公司使用的Ucloud的 ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0
如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装
一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。...
Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭 ...
- Kafka之四:Kafka与Streaming集成
Kafka之四:Kafka与Streaming集成 文章目录 Kafka之四:Kafka与Streaming集成 1. 修改IEDA的maven配置 2. 程序一 3. 程序二:统计次数 4. 提交任 ...
- 快速入门Struts2㊀一个案例入门Struts2
文章目录 1 Struct2 ? 2 一个案例入门Struct2 2.1 搭建项目环境 2.2 配置核心过滤器(web.xml) 2.3 添加用户.jsp与用户列表.jsp 2.4 编写action类 ...
- Kafka+Spark Streaming如何保证exactly once语义
大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 在Kafka.Storm.Flink.Spark Streaming等分布式流处理系统中(没错,Ka ...
- 大数据Spark “蘑菇云”行动第76课: Kafka+Spark Streaming+Redis项目实战
大数据Spark "蘑菇云"行动第76课: Kafka+Spark Streaming+Redis项目实战 jedis插件 redis <dependency> ...
- 源码系列第1弹 | 带你快速攻略Kafka源码之旅入门篇
大家过年好,我是 华仔, 又跟大家见面了. 从今天开始我将为大家奉上 Kafka 源码剖析系列文章,正式开启 「Kafka的源码之旅」,跟我一起来掌握 Kafka 源码核心架构设计思想吧. 今天这篇我 ...
最新文章
- 3D打印机分类与速度
- java面向对象第一章
- 每天一道LeetCode-----计算两个序列最长的公共子序列长度
- 1.1 torch_数据操作
- python通过封装可以实现代码复用_Python学习笔记(五)函数和代码复用
- Mac翻译系列软件推荐三:Mate Translate for Mac多国语言翻译工具
- 贝叶斯分析好坏_交易必读|浅谈贝叶斯分析
- 尔雅国学智慧课后答案
- 计算机网络ip地址划分范围,ip地址分类及范围划分有哪些
- 10个提升写作手法的方法
- 年中Flag挑战日榜:最终挑战王会花落谁家?
- 什么样的岗位会最先被人工智能 (AI) 取代?
- cdp备份和oracle备份,CDP与快照:两种不同数据保护方法
- 开山ORC螺杆膨胀发电机
- Retrofit2网络框架的使用
- Elasticsearch7.7修改network.host IP地址 start启动失败
- win10鼠标右击 新建文件夹 反应缓慢、迟钝
- spring-cloud-gateway报错Failed to bind properties under ‘‘ to org.springframework.cloud.gateway.handle
- Unity3D客户端在游戏场景中创建阻挡并用二进制导出
- 计算机网络是显性课程还是隐性课程,显性课程与隐性课程
热门文章
- JS 对象(Object)和字符串(String)互转方法
- iphone如何信任软件_如何在越狱后 iPhone 上多开软件?
- The path ‘E:\ZERO‘ does not belong to a directory.
- 外键查询_详解MySQL数据库删除所有表的外键约束、禁用外键约束相关脚本
- unity怎么设置游戏页面_王者荣耀李小龙粤语语音包怎么得?李小龙粤语语音包获取与设置方法介绍[多图] - 游戏攻略...
- matlab画迟滞迥线,[画图的问题]怎么画类似于磁滞回线的图像?一个x值对应两个y值的...
- 项目管理六大制约因素_项目管理的制约因素
- 3dm游戏运行包_权势纵横捭阖,战场龙血玄黄!三国志14火爆来袭电脑游戏
- 实验方法怎么写_小学作文怎么写?“把短句变长句”等3种方法帮孩子提高作文水平!...
- java接口多态的变量能_「JAVA」多态的灵魂,面向接口的程序设计,这才是你该懂得的接口(interface)...