大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

今天查资料的时候看到一个朋友的博客写的很好,加了好友,对方表示大二的学生,写的Spark的练习题非常接地气并且很适合练手,大家可以看看。

一、基础练习题

首先让我们准备好该题所需的数据 test.txt

数据结构如下依次是:班级 姓名 年龄 性别 科目 成绩

12 宋江 25 男 chinese 5012 宋江 25 男 math 6012 宋江 25 男 english 7012 吴用 20 男 chinese 5012 吴用 20 男 math 5012 吴用 20 男 english 5012 杨春 19 女 chinese 7012 杨春 19 女 math 7012 杨春 19 女 english 7013 李逵 25 男 chinese 6013 李逵 25 男 math 6013 李逵 25 男 english 7013 林冲 20 男 chinese 5013 林冲 20 男 math 6013 林冲 20 男 english 5013 王英 19 女 chinese 7013 王英 19 女 math 8013 王英 19 女 english 70

题目如下:

1. 读取文件的数据test.txt

2. 一共有多少个小于20岁的人参加考试?

3. 一共有多少个等于20岁的人参加考试?

4. 一共有多少个大于20岁的人参加考试?

5. 一共有多个男生参加考试?

6. 一共有多少个女生参加考试?

7. 12班有多少人参加考试?

8. 13班有多少人参加考试?

9. 语文科目的平均成绩是多少?

10. 数学科目的平均成绩是多少?

11. 英语科目的平均成绩是多少?

12. 每个人平均成绩是多少?

13. 12班平均成绩是多少?

14. 12班男生平均总成绩是多少?

15. 12班女生平均总成绩是多少?

16. 13班平均成绩是多少?

17. 13班男生平均总成绩是多少?

18. 13班女生平均总成绩是多少?

19. 全校语文成绩最高分是多少?

20. 12班语文成绩最低分是多少?

21. 13班数学最高成绩是多少?

22. 总成绩大于150分的12班的女生有几个?

23. 总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少?

答案在这里:

object test {def main(args: Array[String]): Unit = {val config = new SparkConf().setMaster("local[*]").setAppName("test")    val sc = new SparkContext(config)// 1.读取文件的数据test.txt    // 返回包含所有行数据的列表    val data: RDD[String]  = sc.textFile("E:\\2020大数据新学年\\BigData\\05-Spark\\0403\\test.txt")//val value: RDD[Array[String]] = sc.makeRDD(List("12 宋江 25 男 chinese 50")).map(x=>x.split(" "))// 2. 一共有多少个小于20岁的人参加考试?2    val count1: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt<20).groupBy(_(1)).count()// 3. 一共有多少个等于20岁的人参加考试?2    val count2: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt==20).groupBy(_(1)).count()// 4. 一共有多少个大于20岁的人参加考试?2    val count3: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt>20).groupBy(_(1)).count()// 5. 一共有多个男生参加考试?4    val count4: Long = data.map(x=>x.split(" ")).filter(x=>x(3).equals("男")).groupBy(_(1)).count()// 6.  一共有多少个女生参加考试?2    val count5: Long = data.map(x=>x.split(" ")).filter(x=>x(3).equals("女")).groupBy(_(1)).count()// 7.  12班有多少人参加考试?3    val count6: Long = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12")).groupBy(_(1)).count()// 8.  13班有多少人参加考试?3    val count7: Long = data.map(x=>x.split(" ")).filter(x=>x(0).equals("13")).groupBy(_(1)).count()// 9.  语文科目的平均成绩是多少?58.333333333333336    val mean1 = data.map(x=>x.split(" ")).filter(x=>x(4).equals("chinese")).map(x=>x(5).toInt).mean()// 10.  数学科目的平均成绩是多少?63.333333333333336    val mean2 = data.map(x=>x.split(" ")).filter(x=>x(4).equals("math")).map(x=>x(5).toInt).mean()// 11. 英语科目的平均成绩是多少?63.333333333333336    val mean3 = data.map(x=>x.split(" ")).filter(x=>x(4).equals("english")).map(x=>x(5).toInt).mean()// 12. 每个人平均成绩是多少?    //(王英,73)    //(杨春,70)    //(宋江,60)    //(李逵,63)    //(吴用,50)    //(林冲,53)    val every_socre: RDD[(String, Any)] = data.map(x=>x.split(" ")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum /t._2.size))// 13. 12班平均成绩是多少?60.0    var mean5 = data.map(x => x.split(" ")).filter(x => x(0).equals("12")).map(x => x(5).toInt).mean()// 14. 12班男生平均总成绩是多少?165.0    // (宋江,180)    // (吴用,150)    val boy12_avgsocre: Double = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12") && x(3).equals("男")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).map(x=>x._2).mean()// 15. 12班女生平均总成绩是多少?210.0    // (杨春,210)    val girl12_avgsocre: Double = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12") && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).map(x=>x._2).mean()// 16. 13班平均成绩是多少?63.333333333333336    var mean8 = data.map(x => x.split(" ")).filter(x => x(0).equals("13")).map(x => x(5).toInt).mean()// 17. 13班男生平均总成绩是多少?175.0    //(李逵,190)    //(林冲,160)    val boy13_avgsocre: Double = data.map(x=>x.split(" ")).filter(x=>x(0).equals("13") && x(3).equals("男")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).map(x=>x._2).mean()// 18. 13班女生平均总成绩是多少?    //(王英,220)    val girl13_avgsocre: Double = data.map(x=>x.split(" ")).filter(x=>x(0).equals("13") && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).map(x=>x._2).mean()// 19. 全校语文成绩最高分是多少?70    var max1 = data.map(x => x.split(" ")).filter(x => x(4).equals("chinese")).map(x => x(5).toInt).max()// 20. 12班语文成绩最低分是多少?50    var max2 = data.map(x => x.split(" ")).filter(x => x(4).equals("chinese") && x(0).equals("12")).map(x => x(5).toInt).min()// 21. 13班数学最高成绩是多少?80    var max3 = data.map(x => x.split(" ")).filter(x => x(4).equals("math") && x(0).equals("13")).map(x => x(5).toInt).max()// 22. 总成绩大于150分的12班的女生有几个?1    //(杨春,210)    val count12_gt150girl: Long = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12") && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).filter(x=>x._2>150).count()// 23. 总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少?    //val countall: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt>=19 && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).filter(x=>x._2>150).count()    val complex1 = data.map(x => {val line = x.split(" "); (line(0)+","+line(1)+","+line(3),line(5).toInt)})    //(13,李逵,男 , 60)    val complex2 = data.map(x => {val line = x.split(" "); (line(0)+","+line(1)+","+line(2)+","+line(3)+","+line(4),line(5).toInt)})    //(12,宋江,男,chinese , 50)// 过滤出总分大于150的,并求出平均成绩    (13,李逵,男,(60,1))               (13,李逵,男,(190,3))             总成绩大于150                (13,李逵,男,63)    val com1: RDD[(String, Int)] = complex1.map(x=>(x._1,(x._2,1))).reduceByKey((a, b)=>(a._1+b._1,a._2+b._2)).filter(a=>(a._2._1>150)).map(t=>(t._1,t._2._1/t._2._2))    // 注意:reduceByKey 自定义的函数 是对同一个key值的value做聚合操作    //(12,杨春,女 , 70)    //(13,王英,女 , 73)    //(12,宋江,男 , 60)    //(13,林冲,男 , 53)    //(13,李逵,男 , 63)//过滤出 数学大于等于70,且年龄大于等于19岁的学生                filter方法返回一个boolean值 【数学成绩大于70并且年龄>=19】                                       为了将最后的数据集与com1做一个join,这里需要对返回值构造成com1格式的数据    val com2: RDD[(String, Int)] = complex2.filter(a=>{val line = a._1.split(",");line(4).equals("math") && a._2>=70 && line(2).toInt>=19}).map(a=>{val line2 = a._1.split(",");(line2(0)+","+line2(1)+","+line2(3),a._2.toInt)})    //(12,杨春,女 , 70)    //(13,王英,女 , 80)// val common: RDD[(String, (Int, Int))] = com1.join(com2)    // common.foreach(println)    // (12,杨春,女 , (70,70))    // (13,王英,女 , (73,80))// 使用join函数聚合相同key组成的value元组    // 再使用map函数格式化元素    val result = com1.join(com2).map(a =>(a._1,a._2._1))    //(12,杨春,女,70)    //(13,王英,女,73)    //到这里就大功告成了!!!!!!!!!!}}

二、基础练习题

题目如下:

以下是RNG S8 8强赛失败后,官微发表道歉微博下一级评论:

题目如下:

1. 在kafak中创建rng_comment主题,设置2个分区2个副本

2. 数据预处理,把空行和缺失字段的行过滤掉

3. 请把给出的文件写入到kafka中,根据数据id进行分区,id为奇数的发送到一个分区中,偶数的发送到另一个分区

4. 使用Spark Streaming对接kafka

5. 使用Spark Streaming对接kafka之后进行计算

在mysql中创建一个数据库rng_comment

在数据库rng_comment创建vip_rank表,字段为数据的所有字段

在数据库rng_comment创建like_status表,字段为数据的所有字段

在数据库rng_comment创建count_conmment表,字段为 时间,条数

6. 查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中

7. 查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中

8. 分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中

答案在这里:

1. 创建Topic

在命令行窗口执行Kafka创建Topic的命令,并指定对应的分区数和副本数

/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 2 --topic rng_comment

2. 读取文件,并对数据做过滤并输出到新文件

object test01_filter {  def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("demo01").getOrCreate()val sc: SparkContext = spark.sparkContext// 读取数据    //testFile是多行数据    val rddInfo: RDD[String] = sc.textFile("E:\\rng_comment.txt")// 对数据进行一个过滤    val RNG_INFO: RDD[String] = rddInfo.filter(data => {// 判断长度:将每行的内容用tab键切割,判断最后的长度      // 判读是否为空字符: trim之后不为empty      data.split("\t").length == 11 && !data.trim.isEmpty})//    // 如果想直接将数据写入到Kafka,而不通过输出文件的方式//    val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)    def saveToKafka(INFO:RDD[String]): Unit ={      try {        INFO.foreach(x=>{//          val record: ProducerRecord[String, String] = new ProducerRecord[String,String]("rng_test",x.split("\t")(0),x.toString)          kafkaProducer.send(record)//        })      }catch {//        case e:Exception => println("发送数据出错:"+e)//      }    }// 导入隐式转换    // 将RDD转换成DF    import spark.implicits._    val df: DataFrame = RNG_INFO.toDF()// 输出数据【默认分区数为2,这里我们指定分区数为1】    df.repartition(1).write.text("E:\\outputtest")// 关闭资源    sc.stop()    spark.stop()}}

3. 读取新文件,将数据按照题意发送到Kafka的不同分区

需要先写一个实现自定义分区逻辑的java类

/*编写自定义分区逻辑 */public class ProducerPartition implements Partitioner {    @Override    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {/*   编写自定义分区代码    */        //System.out.println(value.toString());        String[] str = value.toString().split("\t");// 由题意可得,id为奇数的发送到一个分区中,偶数的发送到另一个分区        if (Integer.parseInt(str[0]) % 2 == 0){            return 0;        }else {            return 1;        }}@Override    public void close() {}@Override    public void configure(Map<String, ?> configs) {}}

然后在下面的程序中引用分区类的类路径

public class test02_send {/*   程序的入口    */    public static void main(String[] args) throws IOException {//编写生产数据的程序//1、配置kafka集群环境(设置)        Properties props = new Properties();        //kafka服务器地址        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");        //消息确认机制        props.put("acks", "all");        //重试机制        props.put("retries", 0);        //批量发送的大小        props.put("batch.size", 16384);        //消息延迟        props.put("linger.ms", 1);        //批量的缓冲区大小        props.put("buffer.memory", 33554432);        // kafka   key 和value的序列化        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 根据题意得,需要自定义分区        props.put("partitioner.class", "com.czxy.scala.demo12_0415.han.ProducerPartition");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);// 指定需要读取的文件        File file = new File("E:\\outputtest\\part-00000-fe536dc7-523d-4fdd-b0b5-1a045b8cb1ab-c000.txt");// 创建对应的文件流,进行数据的读取        FileInputStream fileInputStream = new FileInputStream(file);        //   指定编码格式进行读取        InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream, "UTF-8");        // 创建缓冲流        BufferedReader bufferedReader = new BufferedReader(inputStreamReader);// 创建一个变量,用来保存每次读取的数据        String tempString = null;// 循环遍历读取文件内容        while ((tempString = bufferedReader.readLine()) != null) {// 利用kafka对象发送数据            kafkaProducer.send(new ProducerRecord<>("rng_comment", tempString));// 发送完成之后打印数据            System.out.println("已发送:" + tempString);        }System.out.println("数据发送完毕!");// 关闭kafka数据生产者        kafkaProducer.close();}}

4. 先在数据库中创建好接收数据需要用到的表

create table vip_rank(  `index` varchar(100) null comment '数据id',  child_comment varchar(100) null comment '回复数量',  comment_time DATE null comment '评论时间',  content TEXT null comment '评论内容',  da_v varchar(100) null comment '微博个人认证',  like_status varchar(100) null comment '赞',  pic varchar(100) null comment '图片评论url',  user_id varchar(100) null comment '微博用户id',  user_name varchar(100) null comment '微博用户名',  vip_rank int null comment '微博会员等级',  stamp varchar(100) null comment '时间戳');create table like_status(  `index` varchar(100) null comment '数据id',  child_comment varchar(100) null comment '回复数量',  comment_time DATE null comment '评论时间',  content varchar(10000) null comment '评论内容',  da_v varchar(100) null comment '微博个人认证',  like_status varchar(100) null comment '赞',  pic varchar(100) null comment '图片评论url',  user_id varchar(100) null comment '微博用户id',  user_name varchar(100) null comment '微博用户名',  vip_rank int null comment '微博会员等级',  stamp varchar(100) null comment '时间戳');create table count_comment(  time DATE null comment '时间',  count int null comment '出现的次数',  constraint rng_comment_pk    primary key (time));

5. 使用Spark Streaming对接kafka之后进行计算

下面的代码完成了:

查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中

查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中

object test03_calculate {/*将数据从kafka集群中读取,并将数据做进一步的处理过后,写入到mysql数据库中*/def ConnectToMysql() ={// 连接驱动,设置需要连接的MySQL的位置以及数据库名 + 用户名 + 密码DriverManager.getConnection("jdbc:mysql://localhost:3306/rng_comment?characterEncoding=UTF-8", "root", "root")}/*** 将数据写入到MySQL的方法* @param tableName 表名* @param data List类型的数据*/def saveDataToMysql(tableName:String,data:List[String]): Unit ={// 获取连接val connection: Connection = ConnectToMysql()// 创建一个变量用来保存sql语句val sql = s"insert into ${tableName} (`index`, child_comment, comment_time, content, da_v,like_status,pic,user_id,user_name,vip_rank,stamp) values (?,?,?,?,?,?,?,?,?,?,?)"// 将数据存入到mysql中val ps: PreparedStatement = connection.prepareStatement(sql)ps.setString(1,data.head)ps.setString(2,data(1))ps.setString(3,data(2))ps.setString(4,data(3))ps.setString(5,data(4))ps.setString(6,data(5))ps.setString(7,data(6))ps.setString(8,data(7))ps.setString(9,data(8))ps.setString(10,data(9))ps.setString(11,data(10))// 提交[因为是插入数据,所以这里需要更新]ps.executeUpdate()// 关闭连接connection.close()}def main(args: Array[String]): Unit = {//1 创建sparkConfvar conf = new SparkConf().setMaster("local[*]").setAppName("SparkStremingDemo1")//2 创建一个sparkcontextvar sc = new SparkContext(conf)sc.setLogLevel("WARN")//3 创建streamingcontextvar ssc = new StreamingContext(sc,Seconds(3))//设置kafka对接参数var  kafkaParams= Map[String, Object]("bootstrap.servers" -> "node01:9092,node02:9092,node03:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "SparkKafkaDemo",//earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费//latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据//none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常//这里配置latest自动重置偏移量为最新的偏移量,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费"auto.offset.reset" -> "earliest",//false表示关闭自动提交.由spark帮你提交到Checkpoint或程序员手动维护"enable.auto.commit" -> (false: java.lang.Boolean))// 设置检查点的位置ssc.checkpoint("sparkstreaming/")//kafkaDatas  含有key和value//key是kafka成产数据时指定的key(可能为空)//value是真实的数据(100%有数据)val kafkaDatas: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,//设置位置策略   均衡LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array("rng_comment"), kafkaParams))kafkaDatas.foreachRDD(rdd=>rdd.foreachPartition(line=>{// 遍历每一个分区的数据for (row <- line){// 获取到行数据组成的array数组val str: Array[String] = row.value().split("\t")// 将数据转成List集合val list: List[String] = str.toList/* 查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中 */if (list(9).equals("5")){// 调用方法,将集合数据写入到指定的表中saveDataToMysql("vip_rank",list)}/* 查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中 */if (Integer.parseInt(list(5))>10){saveDataToMysql("like_status",list)}}}))//5 开启计算任务ssc.start()//6 等待关闭ssc.awaitTermination()}}

运行成功后的效果

vip_rank

like_status

下面的代码完成了:
分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中

object test04_count {def ConnectToMysql() ={// 连接驱动,设置需要连接的MySQL的位置以及数据库名 + 用户名 + 密码DriverManager.getConnection("jdbc:mysql://localhost:3306/rng_test?characterEncoding=UTF-8", "root", "root")}/*** 将数据存入到mysql中** @param time  时间* @param count 数量*/def saveDataToMysql(time: String, count: Int): Unit = {println(s"$time\t $count")if (time.contains("2018/10/20") || time.contains("2018/10/21") || time.contains("2018/10/22") || time.contains("2018/10/23")) {//获取连接val connection: Connection = ConnectToMysql()//创建一个变量用来保存sql语句val sql: String = "INSERT INTO count_comment (time,count) VALUES (?,?) ON DUPLICATE KEY UPDATE count = ?"//将一条数据存入到mysqlval ps: PreparedStatement = connection.prepareStatement(sql)ps.setString(1, time)ps.setInt(2, count)ps.setInt(3, count)//提交ps.executeUpdate()//关闭连接connection.close()}}def main(args: Array[String]): Unit = {//1 创建sparkConfvar conf: SparkConf =new SparkConf().setMaster("local[*]").setAppName("SparkStremingDemo1")//2 创建一个sparkcontextvar sc: SparkContext =new SparkContext(conf)sc.setLogLevel("WARN")//3 创建StreamingContextvar ssc: StreamingContext =new   StreamingContext(sc,Seconds(5))//设置缓存数据的位置ssc.checkpoint("./TmpCount")// 设置kafka的参数var  kafkaParams: Map[String, Object] = Map[String, Object]("bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",  // 集群位置"key.deserializer" -> classOf[StringDeserializer],  // key序列化标准"value.deserializer" -> classOf[StringDeserializer],  // value序列化标准"group.id" -> "SparkKafkaDemo",  // 分组id//earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费//latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据//none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常//这里配置latest自动重置偏移量为最新的偏移量,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费"auto.offset.reset" -> "earliest",//false表示关闭自动提交.由spark帮你提交到Checkpoint或程序员手动维护"enable.auto.commit" -> (false: java.lang.Boolean))// 接收Kafka的数据并根据业务逻辑进行计算val kafkaDatas: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String,String](ssc,   // StreamingContext对象LocationStrategies.PreferConsistent,  // 位置策略ConsumerStrategies.Subscribe[String,String](Array("rng_comment"),kafkaParams)  // 设置需要消费的topic和kafka参数)// 2018/10/23 16:09  需要先获取到下标为2的数据,再按照空格进行切分,获取到年月日即可val kafkaWordOne: DStream[(String, Int)] = kafkaDatas.map(z=>z.value().split("\t")(2).split(" ")(0)).map((_,1))// 更新数据val wordCounts: DStream[(String, Int)] = kafkaWordOne.updateStateByKey(updateFunc)// 遍历RDDwordCounts.foreachRDD(rdd=>rdd.foreachPartition(line=>{for(row <- line){saveDataToMysql(row._1,row._2)//println("保存成功!")}}))println("完毕!")// 开启计算任务ssc.start()// 等待关闭ssc.awaitTermination()}//currentValues:当前批次的value值,如:1,1,1 (以测试数据中的hadoop为例)//historyValue:之前累计的历史值,第一次没有值是0,第二次是3//目标是把当前数据+历史数据返回作为新的结果(下次的历史数据)def updateFunc(currentValues:Seq[Int], historyValue:Option[Int] ):Option[Int] ={// currentValues当前值// historyValue历史值val result: Int = currentValues.sum + historyValue.getOrElse(0)Some(result)}
}

运行成功后的效果

count_comment

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

来看看一个大二学生的Spark练习题相关推荐

  1. 一个大二学生送给大一学弟学妹的建议

    博主简介:先简单的介绍一下我吧,本人是一名大二学生,来自四川.目前所学专业是人工智能,致力于在CSDN平台分享自己的学习内容. 我为什么要写这篇文章? 我来到CSDN也已经一年了,在这一年里面,我学会 ...

  2. 来自一个大二学生自学Java一个月的感受,新人学习

    我,大二学生,一个期盼自己有一天成为黑客大佬的癞蛤蟆,而且还是金皮的. 癞蛤蟆的开始 因为自己从小对电脑有特别的兴趣,而且看了许多关于黑客的文字或视频资料,让自己一度想成为像剧情里的黑客一样,可以呼风 ...

  3. 一个大二学生从屌丝开始的逆袭

    从职业高中考上大学,选了个物联网,然后两年就快没了,其中发生了不少的事情,但是庆幸一切都很顺利,没有什么让自己感到太闹心.       大学里我的迷茫 有的人可能根本不知道自己要要什么,或许更像我一样 ...

  4. 作为一个专科大二学生真的应该有紧迫感了

    好久都没写blog了,又要到秋招了,作为一个正在学习的大二学生来讲,离自己出去实习找工作的时间也不长了.最近看了些CSDN的技术博客论坛,想就此总结谈谈自己的想法. 从学历谈起作为一个专科生出去面试和 ...

  5. python实习做什么工作-大一/大二学生Python实习的困惑?

    题主是一名非名校的CS本科学生,现在遇到了一些困惑,想请教一下热心的segmentfault网友.因为不是985/211名校,现在题主所在的这所学校我感觉学风非常不好,第一是整体水平太低,一学期结束了 ...

  6. 获诱人的实习offer、在Kaggle推AI大赛,大二学生如何做到?

    一早起来,我与远在万里之外的儿子视频,听他聊在波士顿的暑假实习工作,听他讲业余时间和团队一起设计.研发.组织的人工智能大赛Lux-AI Challenge,听得很开心.很兴奋.周末有闲,写篇文章来分享 ...

  7. 获诱人的实习 offer 、在 Kaggle 推 AI大 赛,大二学生如何做到?

    作者:陶建辉 来源:爱倒腾的程序员 前序 一早起来,我与远在万里之外的儿子视频,听他聊在波士顿的暑假实习工作,听他讲业余时间和团队一起设计.研发.组织的人工智能大赛Lux-AI Challenge,听 ...

  8. 逆天了!看大二学生做的超写实CG卷尾猴!

    在短片<Fusspot>中,一只超写实的CG猴子学习<美食总动员>中的老鼠瑞米,担任了主厨.这是由一批来自在赫特福德大学才华横溢的大二学生赋予了它生命. 短片中,一只CG卷尾猴 ...

  9. 答大二学生:跟着自己的兴趣定方向

    [来信] 贺老师,你好: 我是河南XX大学计算机专业的一名大二学生,关注您的博客很久了,收获颇丰,谢谢您的付出.最近我们专业要分方向了,有硬件和软件两个选择:我对硬件非常的感兴趣况且编程对我而言有点力 ...

最新文章

  1. Bitcoin Unlimited发布BCH新版本,石墨烯技术迎来进展
  2. ML之xgboost:利用xgboost算法对Boston(波士顿房价)数据集【特征列分段→独热编码】进行回归预测(房价预测)+预测新数据得分
  3. iOS使用 xcconfig配置文件的若干坑
  4. 2019招商银行M-Geeker线上比赛题解析
  5. 未找到要求的 from 关键字_性能优化|这恐怕是解释Explain关键字最全的一篇文章
  6. (转)学习directx遇到的问题
  7. html文件如何转php文件,怎么把动态的php文件转换成静态的html文件,html文件是php文件…...
  8. 键是什么意思_化学提高不饱和键是什么意思
  9. Fiddler4入门——手机抓包
  10. XTU,C语言,Digit String
  11. img html 文件怎么打开,img文件怎么打开?img文件用什么打开?
  12. Nonebot QQ机器人插件八:点歌(网易云音乐)
  13. centos7校正系统时间
  14. 记项目现场的翻车事故
  15. 转如何用ps制作名片
  16. opencv的NORM_MINMAX参数
  17. MongoDB数据库的简介与安装步骤
  18. AGV调度系统实现(一)
  19. python 斗地主发牌_tkinter模拟斗地主发牌
  20. 1833 雪糕的最大数量(排序)

热门文章

  1. 智慧医疗健康领域数字孪生应用框架
  2. hive连接报错:Unauthorized connection for super-user: root from IP 192.168.216.128:14:13
  3. Rebound库使用分享
  4. 地铁时光机第一阶段冲刺四
  5. 聊一聊关于Glide在面试中的那些事
  6. Matlab实现绘制圆柱体
  7. 风投掘金可穿戴设备:大数据才是背后真金
  8. 想同你 看尽人家烟火,游过万代山河(HTML实现点击烟火特效)
  9. lesson-3 photoshop之套索工具,渐变、移动
  10. DFS BFS简单理解