利用Spark Streaming实现WordCount

需求:监听某个端口上的网络数据,实时统计出现的不同单词个数。

1,需要安装一个nc工具:sudo yum install -y nc

2,执行指令:nc -lk 9999 -v

import os
#### 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = " "    # pyspark 路径
JAVA_HOME=' '    # java 路径
SPARK_HOME = " "    # spark 路径
#### 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOMEfrom pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContextif __name__ == "__main__":spark = SparkSession.builder.appName("xxx").getOrCreate()sc = spark.sparkContext#参数2:指定执行计算的时间间隔ssc = StreamingContext(sc, 1)#监听ip,端口上的上的数据lines = ssc.socketTextStream('localhost',9999)#将数据按空格进行拆分为多个单词words = lines.flatMap(lambda line: line.split(" "))#将单词转换为(单词,1)的形式pairs = words.map(lambda word:(word,1))#统计单词个数wordCounts = pairs.reduceByKey(lambda x,y:x+y)#打印结果信息,会使得前面的transformation操作执行wordCounts.pprint()#启动StreamingContextssc.start()#等待计算结束ssc.awaitTermination()

可视化查看效果:http://主机IP:4040

点击streaming,查看效果

Spark Streaming实现WordCount相关推荐

  1. Spark Streaming实时数据分析

    1.Spark Streaming功能介绍 1)定义 Spark Streaming is an extension of the core Spark API that enables scalab ...

  2. Spark Streaming实现实时WordCount,DStream的使用,updateStateByKey(func)实现累计计算单词出现频率

    一. 实战 1.用Spark Streaming实现实时WordCount 架构图: 说明:在hadoop1:9999下的nc上发送消息,消费端接收消息,然后并进行单词统计计算. * 2.安装并启动生 ...

  3. Spark Streaming 常见的输入数据源(以WordCount计算为例)

      SparkStreaming中的数据抽象叫做DStream.DStream是抽象类,它把连续的数据流拆成很多的小RDD数据块, 这叫做"微批次", spark的流式处理, 都是 ...

  4. spark streaming 接收 kafka 数据java代码WordCount示例

    1. 首先启动zookeeper 2. 启动kafka 3. 核心代码 生产者生产消息的java代码,生成要统计的单词 package streaming;import java.util.Prope ...

  5. Spark Streaming整合logstash + Kafka wordCount

    1.安装logstash,直接解压即可 测试logstash是否可以正常运行 bin/logstash -e 'input { stdin { } } output { stdout {codec = ...

  6. DStream实战之Spark Streaming接收socket数据实现WordCount 31

    需求 现在想要通过socket发送数据, 然后Spark Streaming接收数据并且统计socket发送的每个单词的个数. 1. 架构图 2. 实现流程 安装并启动生产者 首先在linux服务器上 ...

  7. Spark Streaming笔记整理(二):案例、SSC、数据源与自定义Receiver

    [TOC] 实时WordCount案例 主要是监听网络端口中的数据,并实时进行wc的计算. Java版 测试代码如下: package cn.xpleaf.bigdata.spark.java.str ...

  8. Spark Streaming高级特性在NDCG计算实践

    从storm到spark streaming,再到flink,流式计算得到长足发展, 依托于spark平台的spark streaming走出了一条自己的路,其借鉴了spark批处理架构,通过批处理方 ...

  9. spark spark streaming + kafka receiver方式消费消息

    2019独角兽企业重金招聘Python工程师标准>>> kafka + spark streaming 集群 前提: spark 安装成功,spark 1.6.0 zookeeper ...

最新文章

  1. Redis 为什么默认 16 个数据库?
  2. 解析gui-config.json出差_LUA解析json小demo
  3. 计算机科学家 成就,25年来的最高成就!MIT科学家让计算机提供创意,可自动设计机器人形态...
  4. 2015年部分互联网公司笔试综合题及答案
  5. qtabwidget放大_Qt自定义弹窗屏蔽父窗口(QWidget设置setWindowModality(Qt::ApplicationModal);以后再show)...
  6. 二级缓存失效_缓存核心技术:缓存穿透、缓存并发、缓存失效之思路变迁
  7. nand linux bbt存储,Linux NAND BBT管理
  8. 语言 高速公路超速处罚_880关注 拆除!高速公路不合理限速标志!
  9. c语言字符串截取_一文搞懂 C 语言 #、##、__VA_ARGS__
  10. Java中如何将以byte数组给出的数据转换为double数组形式
  11. 通过keepalived搭建lvs高可用集群
  12. Android -- proxy
  13. springSecurity+redis反序列化失败--problem deserializing ‘setterless‘ property (“authorities“)
  14. 征服spring源码(一)
  15. 浩辰CAD机械 2021,正式发布!
  16. 如何写好一个2Take1 Lua - 搭建Lua环境
  17. html图片高度撑开,CSS背景图撑开盒子高度
  18. Multisim3.8应用实例
  19. Ubuntu 20.04安装CUDA失败导致系统黑屏消息nvidia 0000:01:00.0: can‘t change power state from D3cold to D0 的解决方法
  20. 从零基础到高级程序员需要走多久?

热门文章

  1. UIKit should not be called from a secondary thread.
  2. xp的guest访问
  3. LeetCode 103. Binary Tree Zigzag Level Order Traversal
  4. Leetcode 1219.黄金矿工
  5. Cache计算的再总结
  6. pyplot设置刻度字体大小以及标签字体大小
  7. 多层full-connect 神经网络测试
  8. Numpy的学习6-深浅赋值(copydeep copy)
  9. 读取JSON文件并 排序,分组,
  10. python基础语法学习常见小问题