pyspark steaming常规语句及操作
参考官网:http://spark.apache.org/docs/latest/streaming-programming-guide.html
pyspark steaming 流批处理,类strom、flink、kafak stream;核心抽象是Dstream,一个系列的rdd组成
案例:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import os
os.environ["PYSPARK_PYTHON"]="/Users/lonng/opt/anaconda3/python.app/Contents/MacOS/python"# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate#
# from pyspark import SparkContext
# from pyspark.streaming import StreamingContext
#
# sc = SparkContext(master, appName)
# ssc = StreamingContext(sc, 1)
1、命令行窗口先运行 nc -lk 9999 (socket连接)
2、运行上面代码,可以直接idea工具里pycharm运行
3、可以web看过程 http://localhost:4040/streaming/
现在程序就1秒每次批处理进行监听;现在在刚打开的 nc -lk 9999 窗口可以一行一行的传入数据,spark streaming安装刚程序编写的处理逻辑进行处理数据了,然后web里也能看到记录
pyspark steaming常规语句及操作相关推荐
- 连接excel执行Insert Into语句出现“操作必须使用一个可更新的查询”的解决
C#使用oledb连接excel执行Insert Into语句出现"操作必须使用一个可更新的查询"的解决办法 我发生错误时的环境:Windows 7,Framework 4.0,M ...
- mysql怎么给表设置查询语句_MySQL查询语句简单操作示例
本文实例讲述了MySQL查询语句简单操作.分享给大家供大家参考,具体如下: 查询 创建数据库.数据表 -- 创建数据库 create database python_test_1 charset=ut ...
- mongodb常用语句(集合操作)
mongodb常用语句(集合操作) 查看集合帮助 db.songs.help(); 查看集合总数据量 db.songs.count(); 查看表空间大小 db.songs.dataSize(); 查看 ...
- DOTNET零碎总结---VB.NET修改数据存在多个txtbox时,SQL语句的操作
2.DOTNET零碎总结---VB.NET修改数据存在多个txtbox时,SQL语句的操作 1. 一个Button1的text为查询和一个DataGridView1,点击查询按钮的代码 Private ...
- Kotlin入门(7)循环语句的操作
上一篇文章介绍了简单分支与多路分支的实现,控制语句除了这两种条件分支之外,还有对循环处理的控制,那么本文接下来继续阐述Kotlin如何对循环语句进行操作. Koltin处理循环语句依旧采纳了for和w ...
- mysql查询性别语句_MySQL查询语句简单操作示例
本文实例讲述了MySQL查询语句简单操作.分享给大家供大家参考,具体如下: 查询 -- 创建数据库 create database python_test_1 charset=utf8; -- 使用数 ...
- MySQL---关于数据表的基本语句及操作 (上)
目录 一.MySQL中的数据类型 二.MySQL数据表的创建 三.MySQL对表的一些操作 1.向表中添加数据 2.修改表中的数据 3.删除表中的数据 4.查询表中的数据 4.1模糊查询: 4.2计数 ...
- 数据库DML语句:数据库操作语句
DML语句:数据库操作语句 DML语句之插入表记录 插入数据的语法1:insert into 表名 values(值1,值2,值3,值4...值n); 插入全部数据 INSERT INTO stude ...
- 项目名称:银行ATM存取款机系统设计与实现(sql语句模拟操作)
项目名称:银行ATM存取款机系统设计与实现 一.创建数据库....................................................................... ...
最新文章
- 返岗上班应该注意什么?五个细节必须牢记
- nginx 配置文件nginx.conf结构
- arcgis dem栅格立体感_如何使用ArcGIS从DEM数据中提取水系
- Spring的配置文件详解
- java的class和object_Java中Class/Object/T的关系
- excel转txt工具
- Linux安装redis和部署
- C++ 结构体字节对齐
- WIN11右键菜单默认展开
- 做视频自媒体,选择一个合适的剪辑软件很重要,这些或许适合你
- VR分享会邀请函 | 如何利用VR影像创造商业应用新价值?
- 基于python的购物比价_python比价
- 视频号拍摄技巧和制作方法有哪些?
- tyflow雨滴在物体上滑落测试
- 摄影曝光口诀_通过学习曝光元素来改善摄影
- android第三方应用商店,Android第三方应用商店成长迅猛
- 《蓝桥杯CT107D单片机竞赛板》:蜂鸣器模块
- 异步电动机双闭环矢量控制SVPWM
- 当一个对象被当作参数传递到一个方法后,此方法可改变这个对象的属性,并可返回变化后的结果,那么这里到底是值传递还是引用传递
- Android小闹钟程序【安卓进化十三】