Spark Streaming实现WordCount
利用Spark Streaming实现WordCount
需求:监听某个端口上的网络数据,实时统计出现的不同单词个数。
1,需要安装一个nc工具:sudo yum install -y nc
2,执行指令:nc -lk 9999 -v
import os
#### 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = " " # pyspark 路径
JAVA_HOME=' ' # java 路径
SPARK_HOME = " " # spark 路径
#### 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOMEfrom pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContextif __name__ == "__main__":spark = SparkSession.builder.appName("xxx").getOrCreate()sc = spark.sparkContext#参数2:指定执行计算的时间间隔ssc = StreamingContext(sc, 1)#监听ip,端口上的上的数据lines = ssc.socketTextStream('localhost',9999)#将数据按空格进行拆分为多个单词words = lines.flatMap(lambda line: line.split(" "))#将单词转换为(单词,1)的形式pairs = words.map(lambda word:(word,1))#统计单词个数wordCounts = pairs.reduceByKey(lambda x,y:x+y)#打印结果信息,会使得前面的transformation操作执行wordCounts.pprint()#启动StreamingContextssc.start()#等待计算结束ssc.awaitTermination()
可视化查看效果:http://主机IP:4040
点击streaming,查看效果
Spark Streaming实现WordCount相关推荐
- Spark Streaming实时数据分析
1.Spark Streaming功能介绍 1)定义 Spark Streaming is an extension of the core Spark API that enables scalab ...
- Spark Streaming实现实时WordCount,DStream的使用,updateStateByKey(func)实现累计计算单词出现频率
一. 实战 1.用Spark Streaming实现实时WordCount 架构图: 说明:在hadoop1:9999下的nc上发送消息,消费端接收消息,然后并进行单词统计计算. * 2.安装并启动生 ...
- Spark Streaming 常见的输入数据源(以WordCount计算为例)
SparkStreaming中的数据抽象叫做DStream.DStream是抽象类,它把连续的数据流拆成很多的小RDD数据块, 这叫做"微批次", spark的流式处理, 都是 ...
- spark streaming 接收 kafka 数据java代码WordCount示例
1. 首先启动zookeeper 2. 启动kafka 3. 核心代码 生产者生产消息的java代码,生成要统计的单词 package streaming;import java.util.Prope ...
- Spark Streaming整合logstash + Kafka wordCount
1.安装logstash,直接解压即可 测试logstash是否可以正常运行 bin/logstash -e 'input { stdin { } } output { stdout {codec = ...
- DStream实战之Spark Streaming接收socket数据实现WordCount 31
需求 现在想要通过socket发送数据, 然后Spark Streaming接收数据并且统计socket发送的每个单词的个数. 1. 架构图 2. 实现流程 安装并启动生产者 首先在linux服务器上 ...
- Spark Streaming笔记整理(二):案例、SSC、数据源与自定义Receiver
[TOC] 实时WordCount案例 主要是监听网络端口中的数据,并实时进行wc的计算. Java版 测试代码如下: package cn.xpleaf.bigdata.spark.java.str ...
- Spark Streaming高级特性在NDCG计算实践
从storm到spark streaming,再到flink,流式计算得到长足发展, 依托于spark平台的spark streaming走出了一条自己的路,其借鉴了spark批处理架构,通过批处理方 ...
- spark spark streaming + kafka receiver方式消费消息
2019独角兽企业重金招聘Python工程师标准>>> kafka + spark streaming 集群 前提: spark 安装成功,spark 1.6.0 zookeeper ...
最新文章
- Redis 为什么默认 16 个数据库?
- 解析gui-config.json出差_LUA解析json小demo
- 计算机科学家 成就,25年来的最高成就!MIT科学家让计算机提供创意,可自动设计机器人形态...
- 2015年部分互联网公司笔试综合题及答案
- qtabwidget放大_Qt自定义弹窗屏蔽父窗口(QWidget设置setWindowModality(Qt::ApplicationModal);以后再show)...
- 二级缓存失效_缓存核心技术:缓存穿透、缓存并发、缓存失效之思路变迁
- nand linux bbt存储,Linux NAND BBT管理
- 语言 高速公路超速处罚_880关注 拆除!高速公路不合理限速标志!
- c语言字符串截取_一文搞懂 C 语言 #、##、__VA_ARGS__
- Java中如何将以byte数组给出的数据转换为double数组形式
- 通过keepalived搭建lvs高可用集群
- Android -- proxy
- springSecurity+redis反序列化失败--problem deserializing ‘setterless‘ property (“authorities“)
- 征服spring源码(一)
- 浩辰CAD机械 2021,正式发布!
- 如何写好一个2Take1 Lua - 搭建Lua环境
- html图片高度撑开,CSS背景图撑开盒子高度
- Multisim3.8应用实例
- Ubuntu 20.04安装CUDA失败导致系统黑屏消息nvidia 0000:01:00.0: can‘t change power state from D3cold to D0 的解决方法
- 从零基础到高级程序员需要走多久?