SparkSteaming使用

环境设置

首先确保已经按安装Spark,使用maven构建工程。

在pox.xml中添加:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>SparkStreaming</groupId><artifactId>NetworkWordCount</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><packaging>jar</packaging><repositories><repository><id>alimaven</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository></repositories><dependencies><!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.3.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version></dependency></dependencies></project>

会自动添加依赖

WordCount

编写程序

这里我们监听一个本地的端口( TCP source),例如9999,将该端口的数据作为WordCount输入,然后使用与spark一样的操作进行处理即可。

需要注意,这里的local[2]表示两个worker线程:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;import java.util.Arrays;public class NetworkWordCount {public static void main(String[] args) throws Exception{// Create a local StreamingContext with two working thread and batch interval of 1 secondSparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(20));// Create a DStream that will connect to hostname:port, like localhost:9999JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);// Split each line into wordsJavaDStream<String> words = lines.flatMap(x-> Arrays.asList(x.split(" ")).iterator());// Count each word in each batchJavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((a, b) -> a+b);// Print the first ten elements of each RDD generated in this DStream to the consolewordCounts.print();jssc.start();              // Start the computationjssc.awaitTermination();   // Wait for the computation to terminate}}

运行程序

首先我们需要启动Netcat 9999端口号作为数据源:

nc -lk 9999

然后将打包好的JAR包提交到本地集群:

spark-submit --class NetworkWordCount NetworkWordCount.jar localhost 9999

在端口中输入数据,则sparkSteaming会统计20s内的词频:

SparkSteaming使用相关推荐

  1. Spark基础(五)SparkSteaming

    Spark基础(四)SparkSteaming 从批处理到流处理 批处理 流处理 批处理和流处理之间差异的含义 Spark Steaming设计 SparkSteaming与Flume Flume中的 ...

  2. SparkSteaming整合Kafka的方式

    1.基于Receiver方式 这种方式构建出的DStream有一个接收者Receiver,通过这个接收者将数据保存在Executor中. 这种方式是需要独享CPU的core,也就是说需要独立占用若干个 ...

  3. SparkSteaming细节问题

    batch interval可以根据你的应用程序的延迟要求以及可用的集群资源情况来设置. 注意如果虚拟机/服务器配置不行,这个时间不能设置太短,否则SparkSteaming会跑不起来.

  4. Spark系列十七:经典案列使用直连的方式,Kafka,SparkSteaming,Redis

    先一个一个java程序,读取日志文件中的数据,然后将数据写入到Kafka中,然后写一个SparkSteaming程序,使用直连的方式读取Kafka中的数据,计算如下指标 该文件是一个电商网站某一天用户 ...

  5. sparksteaming的idea配置及入门程序

    sparksteaming原理图 pom文件 <?xml version="1.0" encoding="UTF-8"?> <project ...

  6. SparkSteaming程序异常问题排查步骤

    SparkSteaming程序异常问题排查步骤 程序运行环境:Spark on yarn  cluster环境 异常问题排查步骤: 1.首先在yarn页面查看程序的运行情况,点击running页面查看 ...

  7. kafka sparksteaming

    代码部分: 请合理的尊重原创作者的辛苦付出. 原文链接:https://github.com/wangliangbd/SparkStreaming_Store_KafkaTopicOffset_To_ ...

  8. 实时计算与SparkSteaming的对比

    实时计算概述 实时计算 阿里云实时计算(Alibaba Cloud Realtime Compute)是一套基于Apache Flink构建的一站式.高性能实时大数据处理平台,广泛适应于流式数据处理. ...

  9. 如何收集SparkSteaming运行日志实时进入kafka中

    用过sparkstreaming的人都知道,当使用sparkstreaming on yarn模式的时候,如果我们想查看系统运行的log,是没法直接看的,就算能看也只是一部分. 这里的log分: (1 ...

最新文章

  1. 两阶段提交(Two-Phase Commit)
  2. jsp中 input placeholder_前端工作中的方法总结
  3. CssSelector之selenium元素定位
  4. case class到底啥用
  5. qt 复制字符串_QT中字符串的转化与拼接
  6. 广度优先搜索——USACO08FEB(洛谷 P2895)
  7. 二十个让你泪流满面的瞬间
  8. Java、JavaScript和JScript
  9. 关于RTB实时竞价的基本原理
  10. ASP.NET Web API 2框架揭秘
  11. php的入门是html5,h5自学教程:6个适合初学者的零基础html5入门自学教程推荐
  12. 斯蒂文斯理工学院计算机排名,2020年斯蒂文斯理工学院QS世界排名
  13. 数据库笔试面试(第一版)——根据题目完成以下50道SQL语句
  14. PEGA 十合一吉他控制器连接电脑玩《吉他英雄3》
  15. IGBT静态参数测试
  16. 什么情况下你会毫不犹豫地辞职?
  17. [siggraph13]《巫师3》角色渲染
  18. 判断手机音量大小,做出提醒
  19. 【Atlas 200 DK】(一)简介 Atlas 200 DK 开发者套件(型号:3000)
  20. 淘宝理财 中证500 中证300 基金收益计算

热门文章

  1. CLIP:从自然语言监督中学习可迁移的视觉模型
  2. 生鲜配送系统有哪些功能?搭建生鲜配送系统有哪些好处?
  3. 计算机开机显示器闪,电脑开机后显示器闪烁怎么办
  4. 计算机组成原理学习笔记一
  5. MiniGUI学习日记一----MiniGUI基础编程篇
  6. 教程篇(7.0) 04. FortiGate安全 NAT ❀ Fortinet 网络安全专家 NSE 4
  7. 2017网易雷火实习生招聘编程题
  8. 互联网人群画像和你所不知道的真相
  9. 一阶矩+二阶矩估计求解一个参数
  10. 记一次 打包报错:Keystore was tampered with, or password was incorrect