需要下载对应 spark-streaming-kafka-0-8-assembly jar包(版本要对于)

下载地址:
https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8-assembly_2.11

一定要下载对应的assembly版本,不然不识别

版本对应说明比如:spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar
2.11是scale版本,2.4.4是spark版本
2.11可以查kafka版本 find / -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*' 看到像kafka_2.10-0.8.2-beta.jar这样的文件,其中2.10是Scala版本,0.8.2-beta是Kafka版本

代码区:

启动steaming程序后,zookeeper和kafka也启动起来
运行:

spark-submit --jars /Us****oads/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar pyspark01/pyspark_steaming02.py

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import os
import json
from pyspark.streaming.kafka import KafkaUtils
os.environ["PYSPARK_PYTHON"]="/Users/lonng/opt/anaconda3/python.app/Contents/MacOS/python"# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
sc.setLogLevel("OFF")
ssc = StreamingContext(sc, 5)# checkpoint一定要设置,否则报错
ssc.checkpoint("./")zookeeper = "localhost:2181"topic = {"test1": 1}
group_id = "test"
line1 = KafkaUtils.createStream(ssc, zookeeper, group_id, topic)
print(line1)
# lines = KafkaUtils.createDirectStream(ssc, ["hello"], {"metadata.broker.list": "127.0.0.1:9092"})
lines = line1.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.reduceByKey(lambda a, b: a+b)
counts.pprint()
# print(lines)
# # # Split each line into words
# # words = lines.flatMap(lambda line: line.split(" "))
#
# # Count each word in each batch
# pairs = lines.map(lambda word: (word, 1))
# wordCounts = pairs.reduceByKey(lambda x, y: x + y)
#
# # Print the first ten elements of each RDD generated in this DStream to the console
# wordCounts.pprint()
# print(wordCounts)ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

pyspark steaming 连接kafka数据实时处理(也可以对接flume+kafka+spark)相关推荐

  1. kafka数据 落盘_终于知道Kafka为什么这么快了!

    " 无论 Kafka 作为 MQ 也好,还是作为存储层也罢,无非就是两个功能,一是 Producer 生产的数据存到 Broker,二是 Consumer 从 Broker 读取数据. 图片 ...

  2. kafka数据 落盘_Kafka架构原理?也就这么回事!

    本文主要讲解 Kafka 是什么.Kafka 的架构包括工作流程和存储机制,以及生产者和消费者. 最终大家会掌握 Kafka 中最重要的概念,分别是 Broker.Producer.Consumer. ...

  3. 大数据Hadoop学习系列之Hadoop、Spark学习路线

    1 Java基础: 视频方面:推荐毕老师<毕向东JAVA基础视频教程>. 学习hadoop不需要过度的深入,java学习到javase,在多线程和并行化多多理解实践即可. 书籍方面:推荐李 ...

  4. DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36

    前言 本文所需要的安装包&Flume配置文件,博主都已上传,链接为本文涉及安装包&Flume配置文件本文涉及的安装包&Flume配置文件,请自行下载~ flume作为日志实时采 ...

  5. Flume+kafka+flink+es 构建大数据实时处理

    大数据目前的处理方法有两种:一种是离线处理,一种是实时处理.如何构建我们自己的实时数据处理系统我们选用flume+kafka+flink+es来作为我们实时数据处理工具.因此我们的架构是: flume ...

  6. Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

    http://blog.51cto.com/xpleaf/2104160?cid=704690 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进 ...

  7. tcp实时传输kafka数据_关于Kafka producer管理TCP连接的讨论

    在Kafka中,TCP连接的管理交由底层的Selector类(org.apache.kafka.common.network)来维护.Selector类定义了很多数据结构,其中最核心的当属java.n ...

  8. pyspark steaming常规语句及操作

    参考官网:http://spark.apache.org/docs/latest/streaming-programming-guide.html pyspark steaming 流批处理,类str ...

  9. Cris 玩转大数据系列之消息队列神器 Kafka

    Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 文章目录 Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 1. Kafka 概述 1.1 消息队 ...

最新文章

  1. python中多重if语句用法_python – 在Pandas中使用Apply使用多个if语句的Lambda函数
  2. php vbcrlf,我是这样打造自已的“菜刀”的,让一句话飞一会
  3. linux更换网卡不识别_详解Linux双网卡绑定脚本的方法示例
  4. TensorFlow入门:第一个机器学习Demo
  5. 智能布线系统,“智”在何方
  6. c语言 隐式声明,关于C#:隐式函数声明和链接
  7. 04737 c++程序设计 第二章 课后程序设计题 第一题
  8. Oralce 导入dpm 文件数据
  9. AI CC2017安装后,安装目录里找不到amtlib.dll文件的问题
  10. 微信小程序合成海报_微信小程序生成海报实现方式
  11. 删除ttf字体文件中无用文字
  12. 那些年,我们一起追过的球队
  13. linux kvm切换器,PS2系列KVM切换器
  14. CPSR 和 SPSR
  15. 官方活动|最高可免费领60天会员时长
  16. linux 支持7代cpu,Intel第七代cpu有哪些型号
  17. solidworks宏按钮的制作
  18. 最小二乘拟合多项式(利用构造正交多项式的方法)C++
  19. 2012第三届蓝桥杯预赛题
  20. 袁永福对北京奥运会的评论

热门文章

  1. BigWorld Pty. Ltd.是一家全球领先的大型多人在线游戏(MMOG)开发解决方案供应商...
  2. 重磅!图森王乃岩团队最新工作—TridentNet:处理目标检测中尺度变化新思路
  3. 计算机显示器外壳怎么防水,电脑显示器怎么拆开外壳
  4. rm -rf 命令 与正则表达式
  5. 通过DCF模型对股票进行估值
  6. android项目修改名字(app名称),运行在移动设备和模拟器上的项目名字
  7. 读文献——《Curriculum learning》
  8. fragment内嵌webView,输入框获得焦点禁用系统输入法,弹出自定义输入法的处理
  9. 计算机四年级上册语文教案,四年级语文上册的教案
  10. 哈工大C语言程序设计精髓第十三周