环境
  虚拟机:VMware 10
  Linux版本:CentOS-6.5-x86_64
  客户端:Xshell4
  FTP:Xftp4
  jdk1.8
  scala-2.10.4(依赖jdk1.8)
  spark-1.6

一、PV & UV

  PV是网站分析的一个术语,用以衡量网站用户访问的网页的数量。对于广告主,PV值可预期它可以带来多少广告收入。一般来说,PV与来访者的数量成正比,但是PV并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的PV。

1、什么是PV值
PV(page view)即页面浏览量或点击量,是衡量一个网站或网页用户访问量。具体的说,PV值就是所有访问者在24小时(0点到24点)内看了某个网站多少个页面或某个网页多少次。PV是指页面刷新的次数,每一次页面刷新,就算做一次PV流量。
度量方法就是从浏览器发出一个对网络服务器的请求(Request),网络服务器接到这个请求后,会将该请求对应的一个网页(Page)发送给浏览器,从而产生了一个PV。那么在这里只要是这个请求发送给了浏览器,无论这个页面是否完全打开(下载完成),那么都是应当计为1个PV。

package com.wjy.test;import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;public class Pv {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("PV"); JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/pvuvdata");//根据PV定义 某个页面/网址的访问数量  将每一条记录根据网址解析出一条访问量JavaPairRDD<String, Integer> ipwebrdd = rdd.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {//7.213.213.208    吉林    2018-03-29    1522294977303    1920936170939152672    www.dangdang.com    LoginString[] ss = line.split("\t");return new Tuple2<String, Integer>(ss[5],1);}});//累加页面访问量JavaPairRDD<String, Integer> mapToPair = ipwebrdd.reduceByKey(new Function2<Integer, Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {private static final long serialVersionUID = 1L;//换个  用于按照整数key排序
            @Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)throws Exception {return tuple.swap();}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)throws Exception {return tuple.swap();}});List<Tuple2<String, Integer>> list = mapToPair.take(5);for(Tuple2<String, Integer> t:list){System.out.println(t);}sc.stop();}}

结果:

(www.baidu.com,18791)
(www.dangdang.com,18751)
(www.suning.com,18699)
(www.mi.com,18678)
(www.taobao.com,18613)

2、什么是UV值
UV(unique visitor)即独立访客数,指访问某个站点或点击某个网页的不同IP地址的人数。在同一天内,UV只记录第一次进入网站的具有独立IP的访问者,在同一天内再次访问该网站则不计数。UV提供了一定时间内不同观众数量的统计指标,而没有反应出网站的全面活动。

package com.wjy.test;import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;public class Uv {/*** 根据IP网址来确定唯一用户访问  然后排重  累计* @param args*/public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("UV");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/pvuvdata");JavaPairRDD<String, Integer> rdd2 = rdd.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {String[] split = line.split("\t");return new Tuple2<String, Integer>(split[0]+"_"+split[5],1);}}).distinct().mapToPair(new PairFunction<Tuple2<String,Integer>, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<String, Integer> tuple)throws Exception {return new Tuple2<String, Integer>(tuple._1.split("_")[1],1);}});//累加JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {private static final long serialVersionUID = 1L;//反转 数值做KEY 用于排序
            @Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)throws Exception {return tuple.swap();}}).sortByKey(false)//降序排序.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {private static final long serialVersionUID = 1L;//排序之后  反转回来
            @Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)throws Exception {return tuple.swap();}});//取前5个元素List<Tuple2<String, Integer>> list = rdd3.take(5);for(Tuple2<String, Integer> t:list){System.out.println(t);}sc.stop();}}

结果:

(www.baidu.com,15830)
(www.suning.com,15764)
(www.mi.com,15740)
(www.jd.com,15682)
(www.dangdang.com,15641)

二、二次排序

对于两列以上的数据,要求对第一列排序之后,之后的列也要依次排序,思路就是:先对第一列进行排序,对于第一列数值相同,再对第二列进行排序。

举例:

待排序数据:secondSort.txt

3 1
5 2
6 5
8 123
1 4
4 123
5 432
3 54
5 121
8 654
3 98

package com.wjy.test;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;public class SecondSort{public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("SecondSort");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/secondSort.txt");//转成K-V格式//PairFunction 入参1-rdd的一行记录 入参2 入参3是call的出参JavaPairRDD<SecondSortKey, String> mapToPair = rdd.mapToPair(new PairFunction<String, SecondSortKey, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<SecondSortKey, String> call(String line)throws Exception {String[] sps = line.split(" ");int first = Integer.valueOf(sps[0]);int second = Integer.valueOf(sps[1]);SecondSortKey ss = new SecondSortKey(first,second);return new Tuple2<SecondSortKey, String>(ss,line);}});//sortByKey 会使用key也就是SecondSortKey的compareTo方法mapToPair.sortByKey(false).foreach(new VoidFunction<Tuple2<SecondSortKey,String>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<SecondSortKey, String> tuple) throws Exception {System.out.println(tuple._2);}});            sc.stop();          }}

对于KEY自定义类型 实现comparable接口 实现comparTo方法

package com.wjy.test;import java.io.Serializable;public class SecondSortKey  implements Serializable ,Comparable<SecondSortKey>{private static final long serialVersionUID = 1L;private int first;private int second;public SecondSortKey(int first,int second){super();this.first=first;this.second=second;}public int getFirst() {return first;}public void setFirst(int first) {this.first = first;}public int getSecond() {return second;}public void setSecond(int second) {this.second = second;}@Overridepublic int compareTo(SecondSortKey o) {//先比较第一个数值 如果相同再比较第二个值 否则直接返回第一个值的比较结果if (getFirst()-o.getFirst() == 0){return getSecond() - o.getSecond();}else{return getFirst()-o.getFirst();}}}

排序结果:

8 654
8 123
6 5
5 432
5 121
5 2
4 123
3 98
3 54
3 1
1 4

三、分组取topN

对于多组数据,去每一组数据前N个数据,比如列出每个班级的前三名等等问题。
解决的思路:先分组,然后每一组排序,取前N个。
案例:有三个班级的分数清单scores.txt,取出每班前三名。

class1    100
class2    85
class3    70
class1    102
class2    65
class1    45
class2    85
class3    70
class1    16
class2    88
class1    95
class2    37
class3    98
class1    99
class2    23

groupByKey+排序算法:

package com.wjy.test;import java.util.Iterator;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;public class TopNtest {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("TopOs");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/scores.txt");//转成K-V格式 方便下一步分组和排序//PairFunction 入参1rdd的一行数据  入参2、3是call的出参元素JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {String[] ss = line.split("\t");return new Tuple2<String, Integer>(ss[0],Integer.valueOf(ss[1]));}});//使用groupByKey 将相同班级的数据放在一个集合里mapToPair.groupByKey().foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {String classname = tuple._1;Iterator<Integer> it = tuple._2.iterator();Integer[] top3 = new Integer[3];while(it.hasNext()){Integer score = it.next();//排序for (int i = 0; i < top3.length; i++) {if(top3[i] == null){top3[i] = score;break;}else if(score > top3[i]){for (int j = 2; j > i; j--) {top3[j] = top3[j-1];}top3[i] = score;break;}}}System.out.println("classname="+classname);for (Integer i:top3){System.out.println(i);}}});
sc.stop();
} }

topN 结果:

classname=class3
98
70
70
classname=class1
102
100
99
classname=class2
88
85
85

转载于:https://www.cnblogs.com/cac2020/p/10684754.html

【Spark-core学习之九】 Spark案例相关推荐

  1. 【Spark深入学习 -14】Spark应用经验与程序调优

    ----本节内容------- 1.遗留问题解答 2.Spark调优初体验 2.1 利用WebUI分析程序瓶颈 2.2 设置合适的资源 2.3 调整任务的并发度 2.4 修改存储格式 3.Spark调 ...

  2. spark SQL学习(认识spark SQL)

    spark SQL初步认识 spark SQL是spark的一个模块,主要用于进行结构化数据的处理.它提供的最核心的编程抽象就是DataFrame. DataFrame:它可以根据很多源进行构建,包括 ...

  3. 【Spark深入学习 -15】Spark Streaming前奏-Kafka初体验

    ----本节内容------- 1.Kafka基础概念 1.1 出世背景 1.2 基本原理 1.2.1.前置知识 1.2.2.架构和原理 1.2.3.基本概念 1.2.4.kafka特点 2.Kafk ...

  4. 分布式实时计算—Spark—Spark Core

    原文作者:bingoabin 原文地址:Spark Core 目录 一.Spark Core 1. 主要功能 2. Spark Core子框架 3. Spark架构 4. Spark计算模型 二.组件 ...

  5. Spark基础学习笔记01:初步了解Spark

    文章目录 零.本讲学习目标 一.大数据开发总体架构 二.Spark简介 三.Spark发展史 四.Spark特点 (一)快速 (二)易用 (三)通用 (四)随处运行 (五)代码简洁 1.采用MR实现词 ...

  6. Spark入门学习交流—Spark生态圈

    1.简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架.Spark在2013年 ...

  7. Spark Core项目实战(1) | 准备数据与计算Top10 热门品类(附完整项目代码及注释)

      大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...

  8. Spark学习笔记(8)---Spark Streaming学习笔记

    Spark Streaming学习笔记 同Spark SQL一样,Spark Streaming学习也是放在了github https://github.com/yangtong123/RoadOfS ...

  9. 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...

  10. Spark入门实战系列--1.Spark及其生态圈简介

    1.简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架.Spark在2013年 ...

最新文章

  1. nginx 日志切割
  2. SQL Server主从数据库同步方式及同步问题解决方案总结
  3. linux lvm添加磁盘,Linux下添加磁盘创建lvm分区
  4. Two Arrays and Sum of Functions
  5. java lambda表达式_凯哥带你从零学大数据系列之Java篇---第二十二章:Lambda表达式...
  6. JS iframe父子页面元素调用方法 window parent top 解释
  7. win10安装iis(亲测,工作需要)
  8. linux 显示文件多少行
  9. 运输层课后第13题解析
  10. docker php安装gd扩展_php安装redis扩展
  11. ENVI 5.3软件安装教程(附带安装包获取方式)
  12. Qt配置OpenCV时mingw32-make反复出错的原因
  13. linux下texlive的卸载,Linux 下 texlive 2018 的安装
  14. MySql分页查询limit
  15. 【论】PISCES: A Programmable, Protocol-Independent Software Switch
  16. C语言实现模拟银行存取款管理系统课程设计(纯C语言版)
  17. Systick中断延时
  18. pygame实现 飞机大战-第三版-仿写版
  19. 毫米波雷达处理流程、算法、代码合集
  20. 跳板攻击中如何追踪定位攻击者主机(上)

热门文章

  1. 测试人员在需求阶段应做哪些工作
  2. 页面是可以这样设计的
  3. NoneBot2插件——打印系统状态
  4. nedc和epa续航里程什么意思_景区电动观光车的续航里程为什么会逐渐变短?
  5. 基于ASP.Net Core开发的一套通用后台框架
  6. 解决 C# GetPixel 和 SetPixel 效率问题(转)
  7. .9-Vue源码之AST(5)
  8. Servlet 生命周期、工作原理
  9. JAVA-集合作业-已知有十六支男子足球队参加2008 北京奥运会。写一个程序,把这16 支球队随机分为4 个组。采用List集合和随机数...
  10. 关于生成随机数的疑点