kafka--Struct Streaming--kafka案例
需求
从kafka读取数据,用Struct Streaming处理,然后保存到kafka
过滤出含有Comedy的行,再送给kafka
kafka数据格式如下
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
案例
启动kafka生产者
kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
启动消费者
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
代码如下
import org.apache.spark.sql.{DataFrame, SparkSession}object KafkaSink extends App {//构建Spark Sessionprivate val session: SparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()private val df: DataFrame = session.readStream.format("kafka").option("kafka.bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092").option("subscribe", "pet").option("startingOffsets", "latest").load()import session.implicits._
//过滤行,并转为valueprivate val df1: DataFrame = df.selectExpr("cast(value as String)").as[String].filter((_.contains("Comedy"))).toDF("test").selectExpr("test as value")
//输出到kafkadf1.writeStream.format("kafka").option("checkpointLocation", "out3").option("topic", "pet").option("kafka.bootstrap.servers", "mypc01:9092").start().awaitTermination()
}
注意事项
输出的内容要合并为一个,否则会报错. 整体给起个别名value…必须的 坑贼大
Required attribute 'value' not found;
kafka--Struct Streaming--kafka案例相关推荐
- Spark Streaming 实战案例(五) Spark Streaming与Kafka
主要内容 Spark Streaming与Kafka版的WordCount示例(一) Spark Streaming与Kafka版的WordCount示例(二) 1. Spark Streaming与 ...
- Flume+Kafka+Spark Streaming+MySQL实时日志分析
文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0
如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装
一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...
- spark spark streaming + kafka receiver方式消费消息
2019独角兽企业重金招聘Python工程师标准>>> kafka + spark streaming 集群 前提: spark 安装成功,spark 1.6.0 zookeeper ...
- Scala Spark Streaming + Kafka + Zookeeper完成数据的发布和消费
一.Spark Streaming Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理.数据可以从许多来源(如Kafka,Flume,Kine ...
- kafka channle的应用案例
kafka channle的应用案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 最近在新公司负责大数据平台的建设,平台搭建完毕后,需要将云平台(我们公司使用的Ucloud的 ...
- spark streaming kafka Couldn't find leader
问题描述: 使用spark streaming接受kafka数据(使用direct方式)报错 Couldn't find leader offsets for Set([test,0], [test, ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。...
Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭 ...
- 使用spark.streaming.kafka.consumer.poll.ms和reconnect.backoff.ms解决spark streaming消费kafka时任务不稳定的问题
问题描述 在用spark streaming程序消费kafka的数据时,遇到了一个神奇的现象:同样的数据量.相似的数据,在消费时,有些批次的数据在做map操作时神奇的多了40多秒,具体看下面的数据:在 ...
最新文章
- scala言语基础学习八
- jQuery练习---- 超简单的表格悬停变色Demo
- 将python3.7降为3.5_python3.7降至3.5【python cookbook】python访问子字符串
- python语言百分号的含义_python【百分号】
- 2019.01.22【NOIP普及组】模拟赛C组总结
- C语言,期末复习之穷举法鸡兔同笼问题
- 禁用UpdateOrchestrator重新启动任务
- java的tcp通信项目_java实现TCP通信
- 公差带与配合 配合选择基础
- ArcGIS裁剪影像如何保持裁剪完全一致
- 大型网站技术架构-第4篇 架构师
- Photoshop 使用技巧
- 基于stm32f303cbt6的点灯实验(硬件+软件)
- 几款好用证件照制作工具推荐
- 信用卡被风控的原因是什么?如何应对风控?
- 从PaaS到GaaS,蔚领时代的“云”上愿景
- content root修改问题
- 如何重命名WordPress WP内容目录
- 单片机自制时钟(年月日星期时分秒显示、按键校准)
- 雷达数据 障碍物判断_数据科学的进入障碍
热门文章
- mysql+last_query_cost_辛星简译MySQL中的last_query_cost
- mysql explain索引_mysql 索引+explain
- python最大公约数计算。从键盘接收两个整数_python如何求解两数的最大公约数
- ubuntu linux 系统搭建我的世界基岩版 私服我的世界服务器
- vue3.0 抽奖 小功能
- Android常见界面布局(详细介绍)
- python输出图像plt_Matplotlib(pyplot)savefig输出空白图像
- 字符串当id用 转换成json对象
- python中tile的用法_Python:numpy中的tile函数
- viturbox网卡驱动_VirtualBox Host