sparksteaming的idea配置及入门程序
sparksteaming原理图
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>doit23_spark</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><spark.version>3.0.1</spark.version><encoding>UTF-8</encoding><kafka.version>2.6.2</kafka.version></properties><dependencies><!-- 导入spark的依赖,core指的是RDD编程API --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><!-- 导入spark sql的依赖 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><!-- 导入spark streaming的依赖 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><!-- 导入scala的依赖 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.10</version></dependency><!-- kafka依赖jar包--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><!-- 导入mysql的依赖 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.48</version></dependency><!-- https://mvnrepository.com/artifact/com.google.code.gson/gson --><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.5</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency><!-- 时间的工具类jar包 --><dependency><groupId>joda-time</groupId><artifactId>joda-time</artifactId><version>2.9.2</version></dependency><!-- spark整合hive --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version></dependency><!--导入redis的客户端jedis jar包--><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.8.1</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/druid --><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.20</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/druid --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.2.5</version></dependency></dependencies><build><pluginManagement><plugins><!-- 编译scala的插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version></plugin><!-- 编译java的插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version></plugin></plugins></pluginManagement><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>scala-test-compile</id><phase>process-test-resources</phase><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><executions><execution><phase>compile</phase><goals><goal>compile</goal></goals></execution></executions></plugin><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>
</project>
wordcount入门程序(不累加历史数据)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object _01_SparkStreaming_WordCount_01 {def main(args: Array[String]): Unit = {//创建sparkStreaming的对象val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))//5秒中处理一次//注意需要在linux上安装nc yum install -y nc//另外,需要先开启nc -lk 8888//创建抽象数据集val dstream: DStream[String] = ssc.socketTextStream("centos01", 8888)//先不对历史数据进行累加val words: DStream[String] = dstream.flatMap(_.split(" "))val wordAndOne = words.map((_, 1))val reduced = wordAndOne.reduceByKey(_ + _)//触发actionreduced.print()//将程序启动ssc.start()//让Driver挂起ssc.awaitTermination()}}
wordcount入门程序(累加历史数据)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}object _01_SparkStreaming_WordCount_02 {def main(args: Array[String]): Unit = {//创建sparkStreaming的对象val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))//5秒中处理一次//需要注意的是,因为有历史累加的数据,不能丢失,因此需要定期写入ssc.checkpoint("./spark")//注意需要在linux上安装nc yum install -y nc//另外,需要先开启nc -lk 8888//创建抽象数据集val dstream: DStream[String] = ssc.socketTextStream("centos01", 8888)//对历史数据进行累加val words: DStream[String] = dstream.flatMap(_.split(" "))val wordAndOne = words.map((_, 1))//reduceByKey不能对历史数据进行累加//val reduced = wordAndOne.reduceByKey(_ + _)//updateFunc: (Seq[V], Option[S]) => Option[S]/*Seq[Int] 表示当前批次key值相同的value,因为有多个分区,所以放在一个seq中Option[Int] 初始值或者是中间累加的历史数据,因此可能不存在*/val updateFun = (seq:Seq[Int], op:Option[Int]) => {Some(seq.sum + op.getOrElse(0))}val reduced = wordAndOne.updateStateByKey(updateFun)//触发actionreduced.print()//将程序启动ssc.start()//让Driver挂起ssc.awaitTermination()}}
sparksteaming的idea配置及入门程序相关推荐
- Mahout学习之Mahout简介、安装、配置、入门程序测试
一.Mahout简介 查了Mahout的中文意思--驭象的人,再看看Mahout的logo,好吧,想和小黄象happy地玩耍,得顺便陪陪这位驭象人耍耍了... 附logo: (就是他,骑在象头上的那个 ...
- 详细程序注解学OpenCL一 环境配置和入门程序
本专栏是通过注解程序的方法学习OpenCL,我觉得一个一个地去抠原理也不是办法,干脆直接学习程序,然后把相关原理都直接注解到程序语句当中. 原创地址:http://blog.csdn.net/kend ...
- 用C#进行ArcGIS 10 Engine 开发 - 安装配置和入门程序说明
一 安装 1 其安装包解压后如下图: 文件比较大,有3G多,无法上传:如果网上下不到,可联系鄙人q号513979805,发一份给你:不过上图的授权文件我记不清哪个能用了,不行的话需要自己在网上找可 ...
- 图文解说OpenCV开发一 - 环境配置和入门程序详解
1 我用的是OpenCV 2.4.3版本,当前最新版本已经比这个新了:安装好的目录结构如下图: OpenCV 2.4.3的安装包可以到我网盘下载: http://pan.baidu.com/s/1kT ...
- Java大数据学习第一天---DOS命令、jdk安装及环境配置、入门程序
达内慕课网 网址:www.tmooc.cn 账号:qq邮箱 密码:身份证后四位+手机后四位 计算机功能键及快捷键 功能 指令 屏幕截图 PrtSc QQ截图 CTRL+ALT+A 全选 CTRL+A ...
- C# OpenGL 环境配置和入门程序
效果: 环境:Win10 + vs2015: 需要2个东西: 下载tao库: http://www.taoframework.com 直接安装: FreeGlut.dll FreeGlut.dll ...
- JavaWeb——MyBatis入门程序
一.引言 一般MyBatis与springMVC常常一起使用,而且与hibernate相比有着天然的优势,继续推进. MyBatis应用程序根据XML配置文件创建SqlSessionFactory,S ...
- ijidea搭建springMVC入门程序,配置TomCat
ijidea搭建springMVC入门程序,适用于超级新手.保姆教程 1. 2. 3. 4.输入可以更快创建项目,不需要去下载 archetypeCatalog internal 5.此时的目录结构: ...
- 首页布局跟小程序如何配置Iconfont—小程序入门与实战(七)
4-1 经过上一章节的学习我们已经搭建了项目的整个框架,雏形已成.接下来我们只需要的往里面堆东西就可以了. 设计稿网址(14天分享有效,过期的可以找我拿): https://lanhuapp.com/ ...
最新文章
- WebAssembly Studio:Mozilla提供的WASM工具
- Xcode升级到8之后的一些需要我们手动配置的地方
- android 不同activity之间传递数据
- RPC 的概念模型与实现解析
- 网站域名解析端口_Linux Nginx网站服务——2
- native react 图片多选_开源一个图片组件 react-native-border-radius-image
- Python学习笔记__4.1章 高阶函数
- C#判断闰年函数及举例
- 开源关系型数据库架构
- 第57条:将局部变量的作用域最小化
- react中使用antd按需加载(第一部)
- C# CAD二次开发之字体替换 文字样式 处理DBText
- CRC校验工具----CRC8校验 (x8+x2+x+1)
- 小团团云上城在哪个服务器,云上城之歌小团团
- 云计算采用的各种虚拟化技术比较
- 鼠标画上去图片旋转360度
- RC522读取NFC Forum Type2 Tag流程及代码解析——Mifare Ultralight卡片读取(采用PHY6212平台,可移植)
- Java8获取年、月、周数据和某一天的开始结束时间
- Python图像增强之高斯模糊、中值模糊、均值模糊
- 【Python 实战基础】Pandas如何输出表格数据标题名称列表