第1关:QueueStream

编程要求

在右侧编辑器补充代码,完成以下需求:

  • 将时间戳转换成规定格式的时间形式(格式为:yyyy-MM-dd HH:mm:ss

  • 提取数据中的起始URL(切割符为空格)

  • 拼接结果数据,格式如下:

  1. Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl: https://search.yahoo.com/search?p=反叛的鲁鲁修,statusCode:200
    package net.educoder;import org.apache.spark.SparkConf;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.LinkedBlockingQueue;public class Step1 {private static SparkConf conf;static {conf = new SparkConf().setMaster("local[*]").setAppName("Step1");conf.set("spark.streaming.stopGracefullyOnShutdown", "true");}public static void main(String[] args) throws InterruptedException {/*********begin*********///1.初始化JavaStreamingContext并设置处理批次的时间间隔,Durations.seconds(1)  --> 1秒一个批次JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));//2.获取QueueStream流LinkedBlockingQueue queue = QueueStream.queueStream(ssc);JavaDStream<String> dStream = ssc.queueStream(queue);/**** 数据格式如下:*      100.143.124.29,1509116285000,'GET www/1 HTTP/1.0',https://www.baidu.com/s?wd=反叛的鲁鲁修,404* 数据从左往右分别代表:用户IP、访问时间戳、起始URL及相关信息(访问方式,起始URL,http版本)、目标URL、状态码*** 原始数据的切割符为逗号,(英文逗号)** 需求:*      1.将时间戳转换成规定时间(格式为:yyyy-MM-dd HH:mm:ss )*      2.提取数据中的起始URL(切割符为空格)*      3.拼接结果数据,格式如下:* Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl:https://search.yahoo.com/search?p=反叛的鲁鲁修,statusCode:200*      4.判断rdd是否为空,如果为空,调用  ssc.stop(false, false)与sys.exit(0) 两个方法,反之将结果数据存储到mysql数据库中,调用JdbcTools.saveData(Iterator[String])即可*///3.获取队列流中的数据,进行清洗、转换(按照上面的需求)SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");JavaDStream<String> map = dStream.map(x -> {String[] split = x.split(",");String ip = split[0];String time = simpleDateFormat.format(new Date(new Long(split[1])));String startUrl = split[2].split(" ")[1];String targetUrl = split[3];String statusCode = split[4];return "Ip:" + ip + ",visitTime:" + time + ",startUrl:" + startUrl + ",targetUrl:" + targetUrl + ",statusCode:" + statusCode;});//4.判断rdd是否为空,如果为空,调用  ssc.stop(false, false)与sys.exit(0) 两个方法,反之将结果数据存储到mysql数据库中,调用JdbcTools.saveData(Iterator[String])即可map.foreachRDD(rdd -> {if (rdd.isEmpty()) {ssc.stop(false, false);System.exit(1);} else {rdd.foreachPartition(partitionOfRecords -> {JdbcTools.saveData(partitionOfRecords);});}});//5.启动SparkStreamingssc.start();//6.等待计算结束ssc.awaitTermination();/*********end*********/}}

第2关:File Streams

编程要求

在右侧编辑器中补全代码,要求如下:

  • /root/step11_fils下有两个文件,文件内容分别为:
  1. hadoop hadoop hadoop hadoop hadoop hadoop hadoop hadoop spark spark
  2. hello hello hello hello hello hello hello hello study study
  • 要求清洗数据并实时统计单词个数,并将最终结果导入MySQL

step表结构:

列名 数据类型 长度 非空
word varchar 255
count int 255
    package com.educoder;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.streaming.Duration;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;import java.io.Serializable;import java.sql.*;import java.util.Arrays;import java.util.Iterator;public class SparkStreaming  {public static void main(String[] args) throws Exception {SparkConf conf=new SparkConf().setAppName("edu").setMaster("local");/********** Beign **********///1.初始化StreamingContext,设置时间间隔为1sJavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));//2.设置文件流,监控目录/root/step11_filsJavaDStream<String> DStream = ssc.textFileStream("file:///root/step11_fils");/* *数据格式如下:hadoop hadoop spark spark*切割符为空格*需求:*累加各个批次单词出现的次数*将结果导入Mysql*判断MySQL表中是否存在即将要插入的单词,不存在就直接插入,存在则把先前出现的次数与本次出现的次数相加后插入*库名用educoder,表名用step,单词字段名用word,出现次数字段用count*///3.对数据进行清洗转换JavaPairDStream<String, Integer> wordcount = DStream.flatMap(x -> Arrays.asList(x.split(" ")).iterator()).mapToPair(x -> new Tuple2<String, Integer>(x, 1)).reduceByKey((x, y) -> x + y);//4.将结果导入MySQL,wordcount.foreachRDD(rdd->{rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {@Overridepublic void call(Iterator<Tuple2<String, Integer>> r) throws Exception {Connection connection=  myconn();while(r.hasNext()){Tuple2<String, Integer> record = r.next();String querySql = "SELECT t.count FROM step t WHERE t.word = '" + record._1 + "'";ResultSet   queryResultSet = connection.createStatement().executeQuery(querySql);Boolean hasNext = queryResultSet.next();if (!hasNext) {String insertSql = "insert into step(word,count) values('" + record._1 + "'," + record._2 + ")";connection.createStatement().execute(insertSql);} else {Integer newWordCount = queryResultSet.getInt("count") + record._2;String updateSql = "UPDATE step SET count = " + newWordCount + " where word = '" + record._1 + "'";connection.createStatement().execute(updateSql);}}connection.close();}});});//5.启动SparkStreamingssc.start();/********** End **********/Thread.sleep(15000);ssc.stop();ssc.awaitTermination();}/***获取mysql连接*@return*/public static Connection myconn()throws SQLException,Exception{Class.forName("com.mysql.jdbc.Driver");Connection conn= DriverManager.getConnection("jdbc:mysql://localhost:3306/educoder","root","123123");return conn;}}

第3关:socketTextStream

编程要求

在右侧编辑器中补全代码,要求如下:

  • 要求清洗数据并实时统计单词个数,并将最终结果导入MySQL

word表结构

列名 数据类型 长度 主键 非空
word varchar 255
wordcount int 11
    package com;import java.util.Arrays;import java.util.List;import org.apache.spark.SparkConf;import org.apache.spark.api.java.Optional;import org.apache.spark.api.java.function.Function2;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;import java.sql.Connection;import java.sql.DriverManager;public class JSocketSpark {public static void main(String[] args) throws InterruptedException{SparkConf conf = new SparkConf().setAppName("socketSparkStreaming").setMaster("local[*]");conf.set("spark.streaming.stopGracefullyOnShutdown", "true");JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));/**********begin**********///1.连接socket流 主机名:localhost 端口:5566JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 5566);//2.切分压平JavaDStream<String> rdd1 = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());//3.组装JavaPairDStream<String, Integer> rdd2 = rdd1.mapToPair(x -> new Tuple2<>(x, 1));//4.设置检查点ssc.checkpoint("/root/check");//5.每个时间窗口内得到的统计值都累加到上个时间窗口得到的值,将返回结果命名为reducedJavaPairDStream<String, Integer> rdd3 = rdd2.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {//对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)@Overridepublic Optional<Integer> call(List<Integer> values, Optional<Integer> state)throws Exception {//第一个参数就是key传进来的数据,第二个参数是曾经已有的数据//如果第一次,state没有,updatedValue为0,如果有,就获取Integer updatedValue = 0;if (state.isPresent()) {updatedValue = state.get();}//遍历batch传进来的数据可以一直加,随着时间的流式会不断去累加相同key的value的结果。for (Integer value : values) {updatedValue += value;}return Optional.of(updatedValue);//返回更新的值}});//6.将结果写入MySQL// 语法:如果存在这个单词就更新它所对应的次数//      如果不存在将其添加rdd3.foreachRDD(rdd -> {rdd.foreachPartition(x -> {Connection myconn = myconn();while (x.hasNext()){Tuple2<String, Integer> record = x.next();String sql = "insert into wordcount (word,wordcount) values('" + record._1 + "',"+record._2+") on DUPLICATE key update wordcount="+record._2;myconn.createStatement().execute(sql);}myconn.close();});});/********** End **********/ssc.start();ssc.awaitTermination();}public static Connection myconn()throws Exception{Class.forName("com.mysql.jdbc.Driver");Connection conn= DriverManager.getConnection("jdbc:mysql://localhost:3306/edu","root","123123");return conn;}}

第4关:KafkaStreaming

编程要求

在右侧编辑器补充代码,完成以下需求:

  • 读取 kafka 的名为 test 、分区号为 0 、偏移量为 0Topic

  • 将时间戳转换成规定格式的时间形式(格式为:yyyy-MM-dd HH:mm:ss

  • 提取数据中的起始URL(切割符为空格)

  • 拼接结果数据,格式如下:

  1. Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl:https://search.yahoo.com/search?p=反叛的鲁鲁修,statusCode:200
    package net.educoder;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.spark.SparkConf;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.kafka010.ConsumerStrategies;import org.apache.spark.streaming.kafka010.KafkaUtils;import org.apache.spark.streaming.kafka010.LocationStrategies;import java.text.SimpleDateFormat;import java.util.*;public class Step2 {private static SparkConf conf;static {conf = new SparkConf().setMaster("local[*]").setAppName("Step2");conf.set("spark.streaming.stopGracefullyOnShutdown", "true");}public static void main(String[] args) throws InterruptedException {Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "127.0.0.1:9092");kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("group.id", "sparkStreaming");kafkaParams.put("enable.auto.commit", "false");TopicPartition topicPartition = new TopicPartition("test", 0);List<TopicPartition> topicPartitions = Arrays.asList(topicPartition);HashMap<TopicPartition, Long> offsets = new HashMap<>();offsets.put(topicPartition, 0l);/********** Begin **********///1.初始化JavaStreamingContext并设置处理批次的时间间隔,Durations.seconds(1)  --> 1秒一个批次JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));//2.使用 KafkaUtils 对象创建流,使用 Assign 订阅主题(Topic),上面已经为你定义好了 Topic列表:topicPartitions,kafka参数:kafkaParams,偏移量:offsetsJavaInputDStream<ConsumerRecord<String, String>> javaInputDStream = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.Assign(topicPartitions, kafkaParams, offsets));JavaDStream<String> dStream = javaInputDStream.map(x -> x.value());/**** 数据格式如下:*      100.143.124.29,1509116285000,'GET www/1 HTTP/1.0',https://www.baidu.com/s?wd=反叛的鲁鲁修,404* 数据从左往右分别代表:用户IP、访问时间戳、起始URL及相关信息(访问方式,起始URL,http版本)、目标URL、状态码*** 原始数据的切割符为逗号,(英文逗号)** 需求:*      1.将时间戳转换成规定时间(格式为:yyyy-MM-dd HH:mm:ss )*      2.提取数据中的起始URL(切割符为空格)*      3.拼接结果数据,格式如下:* Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl:https://search.yahoo.com/search?p=反叛的鲁鲁修,statusCode:200*      4.判断rdd是否为空,如果为空,调用  ssc.stop(false, false)与sys.exit(0) 两个方法,反之将结果数据存储到mysql数据库中,调用JdbcTools.saveData2(Iterator[String])即可*///3.获取kafka流中的数据,进行清洗、转换(按照上面的需求)SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");JavaDStream<String> map = dStream.map(x -> {String[] split = x.split(",");String ip = split[0];String time = simpleDateFormat.format(new Date(new Long(split[1])));String startUrl = split[2].split(" ")[1];String targetUrl = split[3];String statusCode = split[4];return "Ip:" + ip + ",visitTime:" + time + ",startUrl:" + startUrl + ",targetUrl:" + targetUrl + ",statusCode:" + statusCode;});//4.判断rdd是否为空,如果为空,调用  ssc.stop(false, false)与sys.exit(0) 两个方法,反之将结果数据存储到mysql数据库中,调用JdbcTools.saveData2(Iterator[String])即可map.foreachRDD(rdd -> {if (rdd.isEmpty()) {ssc.stop(false, false);System.exit(0);} else {rdd.foreachPartition(partitionOfRecords -> {JdbcTools.saveData2(partitionOfRecords);});}});//5.启动SparkStreamingssc.start();//6.等待计算结束ssc.awaitTermination();/********** End **********/}}

Educoder中Spark算子--java版本相关推荐

  1. 在IDEA中关于项目java版本问题

    在IDEA中关于项目java版本问题 当出现错误如:java无效的源发行版11或IDEA Error:java:Compliation failed:internal java complier er ...

  2. 五-中, Spark 算子 吐血总结(转化+行动算子共三十七个)

    文章目录 五-中, Spark 算子吐血总结 5.1.4.3 RDD 转换算子(Transformation) 1. Value类型 1.1 `map` 1.2 `mapPartitions` 1.3 ...

  3. Spark算子--Scala版本 educoder

    第1关:转换算子之map和distinct算子 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkCon ...

  4. 头歌educoder Spark算子--Scala版本 实训答案

    第1关:转换算子之map和distinct算子 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkCon ...

  5. Mac中如何查看java版本

    在Mac中如何使用终端进行查看Java的版本号呢? 首先打开终端comman+空白键 输入ter+enter 就快速打开了终端 在终端中输入 java -version +enter 终端就会显示出对 ...

  6. educoder中Spark GraphX—构建图及相关操作

    第1关:GraphX-构建图及相关基本操作 import org.apache.log4j.{Level, Logger} import org.apache.spark.graphx._ impor ...

  7. Educoder中Spark任务提交

    第1关:spark-submit提交 #!/bin/bashcp -r Spark/SparkRDD/target/project.jar /root cd /opt/spark/dist/bin # ...

  8. 【错误记录】Android 编译时技术版本警告 ( 注解处理器与主应用支持的 Java 版本不匹配 )

    文章目录 一.报错信息 二.问题分析 三.解决方案 一.报错信息 在使用 Android 编译时技术 , 涉及 编译时注解 , 注解处理器 ; 开发注解处理器后 , 编译报如下警告 ; 该警告不会影响 ...

  9. jenv java_mac 上使用jenv 管理的多个java 版本

    由于服务器是java1.7, mac上是1.8,因此mac编译的java代码会在服务器上报错.因此,需要修改mac上java版本,自己折腾了很久,放弃,决定使用jenv 管理! 结果是非常方便 使用步 ...

最新文章

  1. Mat矩阵基本操作与示例 OpenCV
  2. 聊聊《柒个我》这部剧
  3. 面向对象--内部属性类型
  4. 装车机器人_智造春天脚步近 青岛这家机器人公司着手打造模块化、标准化技术平台...
  5. 控制uibutton的title范围
  6. es6 --- 对任意对象部署可遍历接口
  7. Java数据库篇1——数据库配置
  8. PS如何生成svg代码格式的path路径 - PS技巧篇
  9. PCL计算点云的法线
  10. 调用发票管理系统的方法2
  11. php 漏洞扫描,Webvulscan:一款基于PHP的漏洞扫描器
  12. 小组学习电子教室等同屏工具调研
  13. STM32F207时钟系统解析
  14. 【性能之旅】Andrew 领衔,RWP 团队再临北京
  15. apk编辑器android源码,apk编辑器电脑版_apk编辑器电脑版下载[apk编译]- 下载之家
  16. 计算机系统启动项设置密码,电脑开机第一道密码怎么设置 - 卡饭网
  17. 让ros机器人行走、建图、路径规划、定位和导航
  18. General Sultan UVA - 11604(建图暴力)
  19. 电脑数据迁移高招,怎么把旧电脑的数据迁移到新电脑
  20. linux7.4离线内核升级,CentOS 7.4升级Linux内核

热门文章

  1. matlab错位相减,matlab随手记
  2. 12032 解救小哈
  3. Mockplus: 让小白产品经理上手就用的原型图设计工具
  4. 支持向量机SVM--线性
  5. python爬虫代理的使用_从零开始写Python爬虫 --- 2.4 爬虫实践:代理的爬取和验证...
  6. 23种设计模式之代理模式(动态代理)
  7. SpringBoot单元测试RunWith注解无法解析
  8. BAPI货物移动时报错
  9. 网上插画教学哪家质量好,汇总5大插画培训班
  10. SQLiteSpy下载安装