需求

从kafka读取数据,用Struct Streaming处理,然后保存到kafka
过滤出含有Comedy的行,再送给kafka
kafka数据格式如下

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action

案例

启动kafka生产者

kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet

启动消费者

kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet

代码如下

import org.apache.spark.sql.{DataFrame, SparkSession}object KafkaSink extends App {//构建Spark Sessionprivate val session: SparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()private val df: DataFrame = session.readStream.format("kafka").option("kafka.bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092").option("subscribe", "pet").option("startingOffsets", "latest").load()import session.implicits._
//过滤行,并转为valueprivate val df1: DataFrame = df.selectExpr("cast(value as String)").as[String].filter((_.contains("Comedy"))).toDF("test").selectExpr("test as value")
//输出到kafkadf1.writeStream.format("kafka").option("checkpointLocation", "out3").option("topic", "pet").option("kafka.bootstrap.servers", "mypc01:9092").start().awaitTermination()
}

注意事项

输出的内容要合并为一个,否则会报错. 整体给起个别名value…必须的 坑贼大

Required attribute 'value' not found;

kafka--Struct Streaming--kafka案例相关推荐

  1. Spark Streaming 实战案例(五) Spark Streaming与Kafka

    主要内容 Spark Streaming与Kafka版的WordCount示例(一) Spark Streaming与Kafka版的WordCount示例(二) 1. Spark Streaming与 ...

  2. Flume+Kafka+Spark Streaming+MySQL实时日志分析

    文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...

  3. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

  4. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

    一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...

  5. spark spark streaming + kafka receiver方式消费消息

    2019独角兽企业重金招聘Python工程师标准>>> kafka + spark streaming 集群 前提: spark 安装成功,spark 1.6.0 zookeeper ...

  6. Scala Spark Streaming + Kafka + Zookeeper完成数据的发布和消费

    一.Spark Streaming Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理.数据可以从许多来源(如Kafka,Flume,Kine ...

  7. kafka channle的应用案例

      kafka channle的应用案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 最近在新公司负责大数据平台的建设,平台搭建完毕后,需要将云平台(我们公司使用的Ucloud的 ...

  8. spark streaming kafka Couldn't find leader

    问题描述: 使用spark streaming接受kafka数据(使用direct方式)报错 Couldn't find leader offsets for Set([test,0], [test, ...

  9. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。...

    Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭 ...

  10. 使用spark.streaming.kafka.consumer.poll.ms和reconnect.backoff.ms解决spark streaming消费kafka时任务不稳定的问题

    问题描述 在用spark streaming程序消费kafka的数据时,遇到了一个神奇的现象:同样的数据量.相似的数据,在消费时,有些批次的数据在做map操作时神奇的多了40多秒,具体看下面的数据:在 ...

最新文章

  1. scala言语基础学习八
  2. jQuery练习---- 超简单的表格悬停变色Demo
  3. 将python3.7降为3.5_python3.7降至3.5【python cookbook】python访问子字符串
  4. python语言百分号的含义_python【百分号】
  5. 2019.01.22【NOIP普及组】模拟赛C组总结
  6. C语言,期末复习之穷举法鸡兔同笼问题
  7. 禁用UpdateOrchestrator重新启动任务
  8. java的tcp通信项目_java实现TCP通信
  9. 公差带与配合 配合选择基础
  10. ArcGIS裁剪影像如何保持裁剪完全一致
  11. 大型网站技术架构-第4篇 架构师
  12. Photoshop 使用技巧
  13. 基于stm32f303cbt6的点灯实验(硬件+软件)
  14. 几款好用证件照制作工具推荐
  15. 信用卡被风控的原因是什么?如何应对风控?
  16. 从PaaS到GaaS,蔚领时代的“云”上愿景
  17. content root修改问题
  18. 如何重命名WordPress WP内容目录
  19. 单片机自制时钟(年月日星期时分秒显示、按键校准)
  20. 雷达数据 障碍物判断_数据科学的进入障碍

热门文章

  1. mysql+last_query_cost_辛星简译MySQL中的last_query_cost
  2. mysql explain索引_mysql 索引+explain
  3. python最大公约数计算。从键盘接收两个整数_python如何求解两数的最大公约数
  4. ubuntu linux 系统搭建我的世界基岩版 私服我的世界服务器
  5. vue3.0 抽奖 小功能
  6. Android常见界面布局(详细介绍)
  7. python输出图像plt_Matplotlib(pyplot)savefig输出空白图像
  8. 字符串当id用 转换成json对象
  9. python中tile的用法_Python:numpy中的tile函数
  10. viturbox网卡驱动_VirtualBox Host