爱奇艺分类点击实时统计
项目源码:
SparkStreaming部分:https://gitee.com/jenrey/project_two
SpringBoot部分:https://gitee.com/jenrey/project_two_two
1.项目需求
2.项目过程
3.数据格式
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1449137597979&p_url=http://list.iqiyi.com/www/4/---.html
4.项目开发
4.1 配置maven
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.aura.spark</groupId>
- <artifactId>1711categorycount</artifactId>
- <version>1.0-SNAPSHOT</version>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>2.6.5</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.11</artifactId>
- <version>2.3.0</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
- <version>2.3.0</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>0.98.6-hadoop2</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>0.98.6-hadoop2</version>
- </dependency>
- </dependencies>
- </project>
4.2 模拟生成样本数据
- package com.jenrey.spark.utils;
- import java.io.BufferedWriter;
- import java.io.FileWriter;
- import java.io.IOException;
- import java.util.Random;
- /**
- * 模拟两万条数据并且保存到指定目录
- */
- public class SimulateData {
- public static void main(String[] args) {
- BufferedWriter bw = null;
- try {
- bw = new BufferedWriter(new FileWriter("G:\\workspace\\categorycount\\src\\main\\java\\com\\jenrey\\spark\\data.txt"));
- int i = 0;
- while (i < 20000) {
- long time = System.currentTimeMillis();
- int categoryid = new Random().nextInt(23);
- bw.write("ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=" + time + "&p_url=http://list.iqiyi.com/www/" + categoryid + "/---.html");
- bw.newLine(); //换行
- i++;
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- try {
- bw.close();
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- }
- }
- }
- }
运行代码即可,会生成模拟数据
4.3 使用SparkStreaming从kafka中读取数据
- package com.jenrey.spark.category;
- import kafka.serializer.StringDecoder;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.streaming.Durations;
- import org.apache.spark.streaming.api.java.JavaDStream;
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
- import org.apache.spark.streaming.kafka.KafkaUtils;
- import scala.Tuple2;
- import java.util.HashMap;
- import java.util.HashSet;
- /**
- * SparkStreaming的数据来源来自于Kafka的topics的aura
- */
- public class CategoryRealCount {
- public static void main(String[] args) {
- //初始化程序入口
- SparkConf conf = new SparkConf();
- conf.setMaster("local");
- conf.setAppName("CategoryRealCount");
- JavaSparkContext sc = new JavaSparkContext(conf);
- JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(3));
- /*或者使用下面方法就自动创建SparkContext()
- JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3));*/
- //读取数据
- HashMap<String, String> KafkaParams = new HashMap<>();
- KafkaParams.put("metadata.broker.list", "hadoop04:9092");
- HashSet<String> topics = new HashSet<>();
- topics.add("aura");
- JavaDStream<String> logDStream = KafkaUtils.createDirectStream(
- ssc,
- String.class,
- String.class,
- StringDecoder.class,
- StringDecoder.class,
- KafkaParams,
- topics
- ).map(new Function<Tuple2<String, String>, String>() {
- @Override
- public String call(Tuple2<String, String> tuple2) throws Exception {
- //kafka读出来数据是kv的形式[String代表k的数据类型(k可就是偏移位置的信息, String代表v的数据类型(kafka内每一条数据), StringDecoder代表的就是解码器, StringDecoder]
- //直接返回的是InputDStream[(String,String)]的KV数据类型,因为偏移位置的信息对我们是没有用的所以我们要.map(_._2)
- return tuple2._2;
- }
- });
- //代码的逻辑
- logDStream.print();
- //启动应用程序
- ssc.start();
- try {
- ssc.awaitTermination();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- ssc.stop();
- }
- }
5.启动kafka的HA集群
先启动ZK
[hadoop@hadoop03 kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties
[hadoop@hadoop04 kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties
[hadoop@hadoop05 kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties
创建topics
[hadoop@hadoop04 kafka_2.11-0.8.2.0]$ bin/kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 3 --partitions 3 --topic aura
查看topics副本
[hadoop@hadoop03 kafka_2.11-0.8.2.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic aura
删除topics(此方式不一定会删除彻底,理论上还要去tmp/kafka-log/下删除数据及分区,如果flume写入topics不成功建议换个topics试试)
[hadoop@hadoop03 kafka_2.11-0.8.2.0]$ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic aura
那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
进入zk
zkCli.sh
找到topic所在的目录:ls /brokers/topics
到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。
6.模拟数据实时产生
[hadoop@hadoop04 ~]$ cat data.txt | while read line
> do
> echo "$line" >> data.log
> sleep 0.5
> done
克隆会话,使用 tail -F data.log 命令查看实时产生的数据
7.编写并运行flume
[hadoop@hadoop04 ~]$ touch file2kafka.properties
[hadoop@hadoop04 ~]$ vim file2kafka.properties
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- #source
- a1.sources.r1.type = exec
- a1.sources.r1.command = tail -F /home/hadoop/data.log
- #channel
- a1.channels.c1.type = memory
- #sink
- a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
- a1.sinks.k1.topic = aura
- a1.sinks.k1.brokerList = hadoop04:9092
- a1.sinks.k1.requiredAcks = 1
- a1.sinks.k1.batchSize = 5
- #source -> channel -> sink
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
启动flume:
[hadoop@hadoop04 ~]$ flume-ng agent --conf conf --conf-file file2kafka.properties --name a1 -Dflume.hadoop.logger=INFO,console
或者是下图的日志:
查看kafka消费者能否消费的到数据:
[hadoop@hadoop04 kafka_2.11-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper hadoop02:2181 --topic aura --from-beginning
运行categorycount.java程序后的效果图:
出现上图即代表:flume采集日志存到kafka中,然后SparkStreaming从kafka消费数据成功
8.开发HBase相关部分
8.0 开发bean 抽象出来存入HBase数据对象
- package com.jenrey.bean;
- import java.io.Serializable;
- /**
- * 使用面向对象的思想抽象出来的存入HBase中数据类型的类
- */
- public class CategoryClickCount implements Serializable {
- //点击的品类
- private String name;
- //点击的次数
- private long value;
- public CategoryClickCount(String name, long value) {
- this.name = name;
- this.value = value;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public long getValue() {
- return value;
- }
- public void setValue(long value) {
- this.value = value;
- }
- }
8.1 开发HBaseDao的代码
- package com.jenrey.hbase.dao;
- import com.jenrey.bean.CategoryClickCount;
- import java.util.List;
- /**
- * 访问HBase数据库的一些方法
- */
- public interface HBaseDao {
- //往hbase里面插入一条数据
- public void save(String tableName, String rowkey, String family, String q, long value);
- //按照表名和rowkey查询数据
- public List<CategoryClickCount> count(String tableName, String rowkey);
- }
8.2 HBase的API实现类
HBaseImpl.java的代码如下:
- package com.jenrey.hbase.dao.impl;
- import com.jenrey.bean.CategoryClickCount;
- import com.jenrey.hbase.dao.HBaseDao;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.Cell;
- import org.apache.hadoop.hbase.CellUtil;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.*;
- import org.apache.hadoop.hbase.filter.PrefixFilter;
- import org.apache.hadoop.hbase.util.Bytes;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- /**
- * HBaseAPI的实现类
- */
- public class HBaseImpl implements HBaseDao {
- //获取数据库连接,类似于JDBC里面的连接池的概念
- HConnection htablePool = null;
- //只要一new就执行下面构造方法里面的代码,即创建并连接HBase成功
- public HBaseImpl() {
- //创建HBase的配置文件
- Configuration conf = HBaseConfiguration.create();
- //设置zookeeper
- conf.set("hbase.zookeeper.quorum", "hadoop02:2181");
- try {
- //创建连接
- htablePool = HConnectionManager.createConnection(conf);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- /**
- * 根据表名获取表对象
- *
- * @param tableName 表名
- * @return 表对象
- * HTableInterface 得到这个表的名称(HBase自带API)
- */
- public HTableInterface getTable(String tableName) {
- HTableInterface table = null;
- try {
- //根据表名获取表对象
- table = htablePool.getTable(tableName);
- } catch (IOException e) {
- e.printStackTrace();
- }
- return table;
- }
- //
- /**
- * 实现往hbase里面插入一条数据
- *
- * @param tableName 表名
- * @param rowkey rowkey
- * @param family 列簇
- * @param q 品类
- * @param value 出现了多少次
- * hbase:只有一种数据类型,字节数组
- */
- @Override
- public void save(String tableName, String rowkey, String family, String q, long value) {
- HTableInterface table = getTable(tableName);
- try {
- //incrementColumnValue会自动帮我们累加value的值。不需要在读取、累加、返回值
- table.incrementColumnValue(rowkey.getBytes(), family.getBytes(), q.getBytes(), value);
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if (table != null) {
- try {
- table.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- //
- /**
- * 实现按照表名和rowkey查询数据
- *
- * @param tableName 表名
- * @param rowkey rowkey
- * @return
- */
- @Override
- public List<CategoryClickCount> count(String tableName, String rowkey) {
- //我们要的是电影和电视剧出现的次数,所以要封装成一个对象存到List集合中
- ArrayList<CategoryClickCount> list = new ArrayList<>();
- //获取表对象
- HTableInterface table = getTable(tableName);
- //前缀过滤器
- PrefixFilter prefixFilter = new PrefixFilter(rowkey.getBytes());
- //创建扫描仪
- Scan scan = new Scan();
- //给扫描仪安装过滤器
- scan.setFilter(prefixFilter);
- //拿着扫描仪去获取数据
- try {
- ResultScanner scanner = table.getScanner(scan);
- //遍历集合,一个结果就是一个Result对象
- for (Result result : scanner) {
- //我们要的是电影和电视剧出现的次数,所以要封装成一个对象存到List集合中,既是CategoryClickCount对象,name value
- //rowkey date_name
- //遍历里面的每一个东西,rawCells把里面的每一个东西都看成一个单元格
- for(Cell cell:result.rawCells()){
- byte[] date_name = CellUtil.cloneRow(cell);
- String name = new String(date_name).split("_")[1];
- byte[] value = CellUtil.cloneValue(cell);
- long count = Bytes.toLong(value);
- CategoryClickCount categoryClickCount = new CategoryClickCount(name, count);
- list.add(categoryClickCount);
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if (table != null) {
- try {
- table.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return list;
- }
- }
启动hbase集群:
(先启动zk,在启动hdfs)
启动 HBase 集群启动命令(只在hadoop02启动就可以了,在哪个节点启动哪个节点就是active):start-hbase.sh
查看是否启动成功:
通过访问浏览器页面,格式为”主节点:16010”http://hadoop02:16010/
进入hbase的客户端:
hbase shell
执行下面的命令创建表:
create 'aura','f'
8.3 测试HBaseAPI代码
先创建一个HbaseFactory.java文件,代码如下:
- package com.jenrey.hbase.dao.factory;
- import com.jenrey.hbase.dao.HBaseDao;
- import com.jenrey.hbase.dao.impl.HBaseImpl;
- /**
- * 返回一个HBaseDao对象,HBaseDao是一个接口,HBaseImpl是它的实现类
- */
- public class HBaseFactory {
- public static HBaseDao getHBaseDao() {
- return new HBaseImpl();
- }
- }
再创建一个测试类:
Test.java的代码如下:
- package com.jenrey.test;
- import com.jenrey.bean.CategoryClickCount;
- import com.jenrey.hbase.dao.HBaseDao;
- import com.jenrey.hbase.dao.factory.HBaseFactory;
- import java.util.List;
- /**
- * 测试HBaseImpl代码 (其实也就是测试HBase的API)
- */
- public class Test {
- public static void main(String[] args) {
- //TODO:向HBase中插入数据
- //获取到HBaseImpl对象
- HBaseDao hBaseDao = HBaseFactory.getHBaseDao();
- //调用HBaseImpl的实现类的save方法往hbase里面插入一条数据的方法
- hBaseDao.save("aura","2018-5-23_电影","f","name",10L);
- hBaseDao.save("aura","2018-5-23_电影","f","name",20L);
- List<CategoryClickCount> list = hBaseDao.count("aura", "2018-5-23_电影");
- for(CategoryClickCount cc:list){
- System.out.println(cc.getName()+" "+cc.getCount());
- }
- }
- }
运行测试代码,结果如下即为HBaseAPI成功
再次测试一下代码:
- package com.jenrey.test;
- import com.jenrey.bean.CategoryClickCount;
- import com.jenrey.hbase.dao.HBaseDao;
- import com.jenrey.hbase.dao.factory.HBaseFactory;
- import java.util.List;
- /**
- * 测试HBaseImpl代码 (其实也就是测试HBase的API)
- */
- public class Test {
- public static void main(String[] args) {
- //TODO:向HBase中插入数据
- //获取到HBaseImpl对象
- HBaseDao hBaseDao = HBaseFactory.getHBaseDao();
- //调用HBaseImpl的实现类的save方法往hbase里面插入一条数据的方法
- /*hBaseDao.save("aura","2018-5-23_电影","f","name",10L);
- hBaseDao.save("aura","2018-5-23_电影","f","name",20L);*/
- /* List<CategoryClickCount> list = hBaseDao.count("aura", "2018-5-23_电影");
- for(CategoryClickCount cc:list){
- System.out.println(cc.getName()+" "+cc.getCount());
- }*/
- hBaseDao.save("aura", "2018-5-21_电视剧", "f", "name", 11L);
- hBaseDao.save("aura", "2018-5-21_电视剧", "f", "name", 24L);
- hBaseDao.save("aura", "2018-5-23_电视剧", "f", "name", 110L);
- hBaseDao.save("aura", "2018-5-23_电视剧", "f", "name", 210L);
- List<CategoryClickCount> list = hBaseDao.count("aura", "2018-5-2");
- for (CategoryClickCount cc : list) {
- System.out.println(cc.getName() + " " + cc.getCount());
- }
- }
- }
效果如下:
8.4 完善SparkStreaming代码的开发逻辑
先编写工具类:Utils.java 代码如下
- package com.jenrey.utils;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.HashMap;
- /**
- * 工具类:传入一行数据,返回我们要的数据格式:日期_品类 的字符串
- */
- public class Utils {
- //传入一行内容,返回一个Key
- public static String getKey(String line) {
- HashMap<String, String> map = new HashMap<String, String>();
- map.put("0", "其他");
- map.put("1", "电视剧");
- map.put("2", "电影");
- map.put("3", "综艺");
- map.put("4", "动漫");
- map.put("5", "纪录片");
- map.put("6", "游戏");
- map.put("7", "资讯");
- map.put("8", "娱乐");
- map.put("9", "财经");
- map.put("10", "网络电影");
- map.put("11", "片花");
- map.put("12", "音乐");
- map.put("13", "军事");
- map.put("14", "教育");
- map.put("15", "体育");
- map.put("16", "儿童");
- map.put("17", "旅游");
- map.put("18", "时尚");
- map.put("19", "生活");
- map.put("20", "汽车");
- map.put("21", "搞笑");
- map.put("22", "广告");
- map.put("23", "原创");
- //TODO:切分数据,返回品类ID
- String categoryid = line.split("&")[9].split("/")[4];
- //获取到品类的名称
- String name = map.get(categoryid);
- //TODO:切分数据,返回日期
- String stringTime = line.split("&")[8].split("=")[1];
- //获取日期
- String date = getDay(Long.valueOf(stringTime));
- //返回我们要的日期_品类的对象
- return date+"_"+name;
- }
- //格式化日期为我们能看懂的格式
- public static String getDay(long time) {
- SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MMM-dd");
- return simpleDateFormat.format(new Date(time));
- }
- }
完善之前4.3 的代码CategoryRealCount.java 完善代码逻辑(不再是输出到控制台,而是写入HBase中),先不要运行。先运行Test.java查询一下是否有22号这天的数据,结果再下文有介绍
- package com.jenrey.spark.category;
- import com.jenrey.hbase.dao.HBaseDao;
- import com.jenrey.hbase.dao.factory.HBaseFactory;
- import com.jenrey.utils.Utils;
- import kafka.serializer.StringDecoder;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import org.apache.spark.api.java.function.VoidFunction;
- import org.apache.spark.streaming.Durations;
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
- import org.apache.spark.streaming.kafka.KafkaUtils;
- import scala.Tuple2;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.Iterator;
- /**
- * SparkStreaming的数据来源来自于Kafka的topics的aura
- */
- public class CategoryRealCount {
- public static void main(String[] args) {
- /**
- * 初始化程序入口
- */
- SparkConf conf = new SparkConf();
- conf.setMaster("local");
- conf.setAppName("CategoryRealCount");
- /*JavaSparkContext sc = new JavaSparkContext(conf);
- JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(3));*/
- //或者使用下面方法就自动创建SparkContext()
- JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3));
- ssc.checkpoint("/home/hadoop/checkpoint");
- /**
- * 读取数据
- */
- HashMap<String, String> KafkaParams = new HashMap<>();
- KafkaParams.put("metadata.broker.list", "hadoop04:9092");
- HashSet<String> topics = new HashSet<>();
- topics.add("test");
- // createDirectStream使用直连的方式读取kafka中的数据
- KafkaUtils.createDirectStream(
- ssc,
- String.class, //返回的key类型
- String.class, //返回的value类型
- StringDecoder.class, //解码器
- StringDecoder.class, //解码器
- KafkaParams,
- topics
- ).map(new Function<Tuple2<String, String>, String>() {
- @Override
- public String call(Tuple2<String, String> tuple2) throws Exception {
- //kafka读出来数据是kv的形式[String代表k的数据类型(k可就是偏移位置的信息, String代表v的数据类型(kafka内每一条数据), StringDecoder代表的就是解码器, StringDecoder]
- //直接返回的是InputDStream[(String,String)]的KV数据类型,因为偏移位置的信息对我们是没有用的所以我们要.map(_._2)
- return tuple2._2;
- }
- })
- /**
- * 代码的逻辑
- */
- //logDStream.print();
- .mapToPair(new PairFunction<String, String, Long>() {
- public Tuple2<String, Long> call(String line) throws Exception {
- return new Tuple2<String, Long>(Utils.getKey(line), 1L);
- }
- }).reduceByKey(new Function2<Long, Long, Long>() {
- @Override
- public Long call(Long x, Long y) throws Exception {
- return x + y;
- }
- })
- //TODO:插入到HBase数据库
- .foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
- @Override
- public void call(JavaPairRDD<String, Long> rdd) throws Exception {
- rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
- @Override
- public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
- //获取连接HBase的连接对象
- HBaseDao hBaseDao = HBaseFactory.getHBaseDao();
- while (partition.hasNext()) {
- Tuple2<String, Long> tuple = partition.next();
- hBaseDao.save("aura", tuple._1, "f", "name", tuple._2);
- System.out.println(tuple._1 + " " + tuple._2);
- }
- }
- });
- }
- });
- /**
- * 启动应用程序
- */
- ssc.start();
- try {
- ssc.awaitTermination();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- ssc.stop();
- }
- }
修改Test.java中的代码如下:(目的在于从HBase中查询上面的代码是否写入到HBase中)
下图是未运行写入HBase数据的代码(CategoryRealCount.java )截图:
运行CategoryRealCount.java 代码;
下图是运行CategoryRealCount.java 后的截图
下图是运行CategoryRealCount.java的结果截图:
需要注意的是图中为中文字,所以使用Test.java查询的时候要注意传入的日期
9 Springboot项目
9.1 创建SpringBoot
如果成功就一直下一步,如果失败请参照下图
进入https://start.spring.io
然后下载压缩包在解压。在通过IDEA导入项目
9.2 开发SpringBoot项目
9.2.1 新建templates 目录
如果我们要写前后台的项目的话,除了导入包还固定要有一个目录 templates 名字不能瞎起,必须叫这个名
9.2.2 配置pom.xml 文件
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.jenrey</groupId>
- <artifactId>spring_boot</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>jar</packaging>
- <name>spring_boot</name>
- <description>Demo project for Spring Boot</description>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.0.2.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <java.version>1.8</java.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-thymeleaf</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>2.6.5</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>0.98.6-hadoop2</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>0.98.6-hadoop2</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
注意我们要有下面这个包
按照上面的配置文件配置即可
9.2.3 SpringBoot快速入门之Hello代码
新建一个Hello.java
代码如下:
- package com.jenrey.spring_boot;
- import org.springframework.stereotype.Controller;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.ResponseBody;
- @Controller
- public class Hello {
- @RequestMapping(value = "/test1")
- @ResponseBody
- public String test1(){
- return "1711";
- }
- }
运行代码
点击右上角的运行按钮
出现下图所示即为发布好了。不需要Tomcat之类的。
打开浏览器输出下面的网址:
http://localhost:8080/test1
9.2.4 快速入门之页面跳转
修改Hello.java代码如下
- package com.jenrey.spring_boot;
- import org.springframework.stereotype.Controller;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.ResponseBody;
- @Controller
- public class Hello {
- @RequestMapping(value = "/test1")
- @ResponseBody
- public String test1(){
- return "1711";
- }
- /**
- * 其实下面名字可以不一样,但是我们一般是一样的
- * 注意我们没有写@ResponseBody ,所以我们要在templates目录下新建一个和index1一样名字的HTML文件
- * 这样我们输入网址http://localhost:8080/test2
- * 就会自动跳转到index1页面(因为名字一样,index1.html)
- */
- @RequestMapping(value = "/test2")
- public String test2(){
- return "index1";
- }
- }
在templates 目录下新建一个HTML的页面
代码如下:
- <!DOCTYPE html>
- <html lang="en">
- <head>
- <meta charset="UTF-8">
- <title>测试</title>
- </head>
- <body>
- <h1>今天天气不错</h1>
- </body>
- </html>
9.2.5 ECharts 使用
下载链接:http://echarts.baidu.com/download.html
引入ECharts及JQ:
注意需要先在resources目录下创建一个新的目录 static 目录
之后可以创建个js文件夹,因为后面可能还有css等
修改index1.html的代码如下
- <!DOCTYPE html>
- <html lang="en">
- <head>
- <meta charset="UTF-8">
- <title>测试</title>
- </head>
- <script src="js/echarts.min.js"></script>
- <body>
- <div id="main" style="width: 600px;height: 400px;"></div>
- <script type="text/javascript">
- // 基于准备好的dom,初始化echarts实例
- var myChart = echarts.init(document.getElementById('main'));
- // 指定图表的配置项和数据
- var option = {
- title: {
- text: 'ECharts 入门示例'
- },
- tooltip: {},
- legend: {
- data:['销量']
- },
- xAxis: {
- data: ["衬衫","羊毛衫","雪纺衫","裤子","高跟鞋","袜子"]
- },
- yAxis: {},
- series: [{
- name: '销量',
- type: 'bar',
- data: [5, 20, 36, 10, 10, 20]
- }]
- };
- // 【05】使用刚指定的配置项和数据显示图表。
- myChart.setOption(option);
- </script>
- </body>
- </html>
访问:http://localhost:8080/test2
到此这个简单案例展示完成,我们在新建一个index2.html网页
在Hello.java中增加下面代码
- @RequestMapping(value = "/test3")
- public String test3(){
- return "index2";
- }
在index2.html中写下面代码:
- <!DOCTYPE html>
- <html lang="en">
- <head>
- <meta charset="UTF-8">
- <title>实时统计品类点击</title>
- </head>
- <script src="js/echarts.min.js"></script>
- <body>
- <div id="main" style="width: 600px;height: 400px;"></div>
- <script type="text/javascript">
- // 基于准备好的dom,初始化echarts实例
- var myChart = echarts.init(document.getElementById('main'));
- // 指定图表的配置项和数据
- option = {
- title : {
- text: '某站点用户访问来源',
- subtext: '纯属虚构',
- x:'center'
- },
- tooltip : {
- trigger: 'item',
- formatter: "{a} <br/>{b} : {c} ({d}%)"
- },
- legend: {
- orient: 'vertical',
- left: 'left',
- data: ['直接访问','邮件营销','联盟广告','视频广告','搜索引擎']
- },
- series : [
- {
- name: '访问来源',
- type: 'pie',
- radius : '55%',
- center: ['50%', '60%'],
- data:[
- {value:335, name:'直接访问'},
- {value:310, name:'邮件营销'},
- {value:234, name:'联盟广告'},
- {value:135, name:'视频广告'},
- {value:1548, name:'搜索引擎'}
- ],
- itemStyle: {
- emphasis: {
- shadowBlur: 10,
- shadowOffsetX: 0,
- shadowColor: 'rgba(0, 0, 0, 0.5)'
- }
- }
- }
- ]
- };
- // 【05】使用刚指定的配置项和数据显示图表。
- myChart.setOption(option);
- </script>
- </body>
- </html>
发布
http://localhost:8080/test3
把之前的项目导入到我们的SpringBoot中。复制bean和hbase两个目录下的所有文件就行了。
10. 自动刷新页面实时数据开发展示阶段
编写Hello.java 的代码如下:
- package com.jenrey.spring_boot;
- import com.jenrey.bean.CategoryClickCount;
- import com.jenrey.hbase.dao.HBaseDao;
- import com.jenrey.hbase.dao.factory.HBaseFactory;
- import org.springframework.stereotype.Controller;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.ResponseBody;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.List;
- @Controller
- public class Hello {
- @RequestMapping(value = "/test1")
- @ResponseBody
- public String test1(){
- return "1711";
- }
- /**
- * 其实下面名字可以不一样,但是我们一般是一样的
- * 注意我们没有写@ResponseBody ,所以我们要在templates目录下新建一个和index1一样名字的HTML文件
- * 这样我们输入网址http://localhost:8080/test2
- * 就会自动跳转到index1页面(因为名字一样,index1.html)
- */
- @RequestMapping(value = "/test2")
- public String test2(){
- return "index1";
- }
- @RequestMapping(value = "/test3")
- public String test3(){
- return "index2";
- }
- @RequestMapping(value = "/getList")
- @ResponseBody
- public List<CategoryClickCount> getList(){
- System.out.println("======过来了么?======");
- /*获取当前系统时间,需要注意我们用的是昨天的数据,但是真正开发应该用下面注释的代码进行逻辑
- long time = System.currentTimeMillis();
- SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
- String today = simpleDateFormat.format(new Date(time));*/
- HBaseDao hBaseDao = HBaseFactory.getHBaseDao();
- //注意实际情况下面的日期不应该写死
- List<CategoryClickCount> list = hBaseDao.count("aura", "2018-五月-22");
- System.out.println(list.size());
- return list;
- }
- }
修改index2.html 代码如下:
- <!DOCTYPE html>
- <html lang="en">
- <head>
- <meta charset="UTF-8">
- <title>实时统计品类点击</title>
- </head>
- <script src="js/echarts.min.js"></script>
- <script src="js/jquery-3.1.1.min.js"></script>
- <body>
- <div id="main" style="width: 600px;height: 400px;"></div>
- <script type="text/javascript">
- // 基于准备好的dom,初始化echarts实例
- var myChart = echarts.init(document.getElementById('main'));
- var datas;
- //ajax
- $.ajax({
- //想访问的URL
- url:"getList",
- //异步还是同步,我们是同步
- async:false,
- //数据类型(指定返回数据类型自动变成json格式)
- dataType:'json',
- //调用回调函数
- success:function (data) {
- datas=data
- }
- })
- // 指定图表的配置项和数据
- option = {
- title : {
- text: '品类点击实时情况',
- subtext: '纯属虚构',
- x:'center'
- },
- tooltip : {
- trigger
爱奇艺分类点击实时统计相关推荐
- 【项目二】爱奇艺分类点击实时统计
项目源码: SparkStreaming部分:https://gitee.com/jenrey/project_two SpringBoot部分:https://gitee.com/jenrey/pr ...
- 爱奇艺评论爬虫、词频统计、词云、PaddleHub内容审核
需求 第一步:爱奇艺<青春有你2>评论数据爬取(参考链接:https://www.iqiyi.com/v_19ryfkiv8w.html#curid=15068699100_9f9bab7 ...
- 爱奇艺大数据实时项目统计项目
一.项目介绍 1.功能开发: 功能一:今天到现在为止,每个栏目的访问量 功能二:从搜索引擎引流过来的,每个栏目的访问量 二.Pyhton脚本实现模拟日志生产,定时执行 #coding=UTF-8 im ...
- 从 Spark Streaming 到 Apache Flink : 实时数据流在爱奇艺的演进
作者:陈越晨 整理:刘河 本文将为大家介绍Apache Flink在爱奇艺的生产与实践过程.你可以借此了解到爱奇艺引入Apache Flink的背景与挑战,以及平台构建化流程.主要内容如下: 爱奇艺在 ...
- kylin如何支持flink_日均万亿条数据如何处理?爱奇艺实时计算平台这样做
1.爱奇艺 Flink 服务现状 爱奇艺从 2012 年开始开展大数据业务,一开始只有二十几个节点,主要是 MapReduce.Hive 等离线计算任务.到 2014 年左右上线了 Storm.Spa ...
- 日均万亿条数据如何处理?爱奇艺实时计算平台这样做
摘要:本文由爱奇艺大数据服务负责人梁建煌分享,介绍爱奇艺如何基于 Apache Flink 技术打造实时计算平台,并通过业务应用案例分享帮助用户了解 Apache Flink 的技术特点及应用场景.提 ...
- 从Spark Streaming到Apache Flink: 实时数据流在爱奇艺的演进 | 技术头条
戳蓝字"CSDN云计算"关注我们哦! 技术头条:干货.简洁.多维全面.更多云计算精华知识尽在眼前,get要点.solve难题,统统不在话下! 作者:陈越晨 转自:高可用架构 本文将 ...
- 爱奇艺本地实时Cache方案
高并发系统离不开Cache,通过采用更多的本地Cache来提升系统吞吐量和稳定性是必然的,这其中的最大难点就是解决分布式本地Cache数据的实时性和一致性问题,否则本地Cache就无法更普遍应用于频繁 ...
- 从Spark Streaming到Apache Flink: 实时数据流在爱奇艺的演进
本文将为大家介绍Apache Flink在爱奇艺的生产与实践过程.你可以借此了解到爱奇艺引入Apache Flink的背景与挑战,以及平台构建化流程.主要内容如下: 爱奇艺在实时计算方面的的演化和遇到 ...
最新文章
- ​清华大学智能产业研究院AI医疗团队招聘知识图谱方向实习生
- MVVM框架下,WPF实现Datagrid里的全选和选择
- 图像降噪算法——维纳滤波
- BSP tag in CRM and JSP tag in Hybris
- 嘉奥丰农显示无法连接服务器,Arcaea无法连接服务器如何解决
- Nginx之gzip压缩配置
- 笔记本vm系统的分辨率不好调整_苹果笔记本电脑怎么设置使用今声优盒
- 使用SQL Server更改跟踪创建SQL Server审核
- 全网最快的网络服务器是什么,最好用最快的首选 DNS 服务器地址设置 (电信/联通/移动)...
- 传奇盗号木马清除手记(转)
- 华为python673集_[译] 使用 Python 的 Pandas 和 Seaborn 框架从 Kaggle 数据集中提取信息...
- 论文速递 EMNLP2022 | 接受论文抢先看!!!(内含下载列表)
- 使用STM32F4浮点运算(FPU)功能开启+使用DSP库
- 项目起名的一些小单词
- 2022年A特种设备相关管理(电梯)考试题模拟考试平台操作
- VisionPro联合C#编程,康耐视cognex
- 实用的60个CSS代码片段[转] 博客链接:http://blog.poetries.top
- 【大咖说Ⅱ】中科院信工所研究员林政:大规模预训练语言模型压缩技术
- 漫画:为什么C语言永不过时?
- FIFO(二):FIFO工作原理
热门文章
- linux activemq版本查看,Linux环境安装activemq
- 存在对其他服务器端口(TCP:8090)的攻击行为之我的服务器被黑了
- 【干货】选择外贸邮件群发软件,外贸邮件营销软件,邮件群发代发软件15条建议!
- android高仿微信拍摄,基于 VCamera,Android 仿微信录制短视频
- “千千静听”官方网站挂马(wxptdi.sys,msconkt.sys等***群的查杀)
- 结合源码谈谈Activity的exported属性
- 【概率论与数理统计】猴博士 笔记 p33-35 超几何分布、正态分布、二项分布
- K线与固定换手率情绪化换手率的一些思考
- Mysql之explain详解
- POI设置Excel表格的单元格格式及处理大数的科学计数问题