SparkSteaming使用
SparkSteaming使用
环境设置
首先确保已经按安装Spark
,使用maven构建工程。
在pox.xml中添加:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>SparkStreaming</groupId><artifactId>NetworkWordCount</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><packaging>jar</packaging><repositories><repository><id>alimaven</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository></repositories><dependencies><!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.3.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version></dependency></dependencies></project>
会自动添加依赖
WordCount
编写程序
这里我们监听一个本地的端口( TCP source),例如9999
,将该端口的数据作为WordCount输入,然后使用与spark一样的操作进行处理即可。
需要注意,这里的local[2]
表示两个worker线程:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;import java.util.Arrays;public class NetworkWordCount {public static void main(String[] args) throws Exception{// Create a local StreamingContext with two working thread and batch interval of 1 secondSparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(20));// Create a DStream that will connect to hostname:port, like localhost:9999JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);// Split each line into wordsJavaDStream<String> words = lines.flatMap(x-> Arrays.asList(x.split(" ")).iterator());// Count each word in each batchJavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((a, b) -> a+b);// Print the first ten elements of each RDD generated in this DStream to the consolewordCounts.print();jssc.start(); // Start the computationjssc.awaitTermination(); // Wait for the computation to terminate}}
运行程序
首先我们需要启动Netcat 9999端口号作为数据源:
nc -lk 9999
然后将打包好的JAR包提交到本地集群:
spark-submit --class NetworkWordCount NetworkWordCount.jar localhost 9999
在端口中输入数据,则sparkSteaming会统计20s内的词频:
SparkSteaming使用相关推荐
- Spark基础(五)SparkSteaming
Spark基础(四)SparkSteaming 从批处理到流处理 批处理 流处理 批处理和流处理之间差异的含义 Spark Steaming设计 SparkSteaming与Flume Flume中的 ...
- SparkSteaming整合Kafka的方式
1.基于Receiver方式 这种方式构建出的DStream有一个接收者Receiver,通过这个接收者将数据保存在Executor中. 这种方式是需要独享CPU的core,也就是说需要独立占用若干个 ...
- SparkSteaming细节问题
batch interval可以根据你的应用程序的延迟要求以及可用的集群资源情况来设置. 注意如果虚拟机/服务器配置不行,这个时间不能设置太短,否则SparkSteaming会跑不起来.
- Spark系列十七:经典案列使用直连的方式,Kafka,SparkSteaming,Redis
先一个一个java程序,读取日志文件中的数据,然后将数据写入到Kafka中,然后写一个SparkSteaming程序,使用直连的方式读取Kafka中的数据,计算如下指标 该文件是一个电商网站某一天用户 ...
- sparksteaming的idea配置及入门程序
sparksteaming原理图 pom文件 <?xml version="1.0" encoding="UTF-8"?> <project ...
- SparkSteaming程序异常问题排查步骤
SparkSteaming程序异常问题排查步骤 程序运行环境:Spark on yarn cluster环境 异常问题排查步骤: 1.首先在yarn页面查看程序的运行情况,点击running页面查看 ...
- kafka sparksteaming
代码部分: 请合理的尊重原创作者的辛苦付出. 原文链接:https://github.com/wangliangbd/SparkStreaming_Store_KafkaTopicOffset_To_ ...
- 实时计算与SparkSteaming的对比
实时计算概述 实时计算 阿里云实时计算(Alibaba Cloud Realtime Compute)是一套基于Apache Flink构建的一站式.高性能实时大数据处理平台,广泛适应于流式数据处理. ...
- 如何收集SparkSteaming运行日志实时进入kafka中
用过sparkstreaming的人都知道,当使用sparkstreaming on yarn模式的时候,如果我们想查看系统运行的log,是没法直接看的,就算能看也只是一部分. 这里的log分: (1 ...
最新文章
- 两阶段提交(Two-Phase Commit)
- jsp中 input placeholder_前端工作中的方法总结
- CssSelector之selenium元素定位
- case class到底啥用
- qt 复制字符串_QT中字符串的转化与拼接
- 广度优先搜索——USACO08FEB(洛谷 P2895)
- 二十个让你泪流满面的瞬间
- Java、JavaScript和JScript
- 关于RTB实时竞价的基本原理
- ASP.NET Web API 2框架揭秘
- php的入门是html5,h5自学教程:6个适合初学者的零基础html5入门自学教程推荐
- 斯蒂文斯理工学院计算机排名,2020年斯蒂文斯理工学院QS世界排名
- 数据库笔试面试(第一版)——根据题目完成以下50道SQL语句
- PEGA 十合一吉他控制器连接电脑玩《吉他英雄3》
- IGBT静态参数测试
- 什么情况下你会毫不犹豫地辞职?
- [siggraph13]《巫师3》角色渲染
- 判断手机音量大小,做出提醒
- 【Atlas 200 DK】(一)简介 Atlas 200 DK 开发者套件(型号:3000)
- 淘宝理财 中证500 中证300 基金收益计算
热门文章
- CLIP:从自然语言监督中学习可迁移的视觉模型
- 生鲜配送系统有哪些功能?搭建生鲜配送系统有哪些好处?
- 计算机开机显示器闪,电脑开机后显示器闪烁怎么办
- 计算机组成原理学习笔记一
- MiniGUI学习日记一----MiniGUI基础编程篇
- 教程篇(7.0) 04. FortiGate安全 NAT ❀ Fortinet 网络安全专家 NSE 4
- 2017网易雷火实习生招聘编程题
- 互联网人群画像和你所不知道的真相
- 一阶矩+二阶矩估计求解一个参数
- 记一次 打包报错:Keystore was tampered with, or password was incorrect