Spark Streaming Project (Real-Time Count of Clicks per Category)
1. Project workflow:
The hostname corresponding to each IP:
2. Requirements
Count, in real time, how many times each category is clicked (displayed as a pie chart):
3. Project analysis and design
Create a new Maven project:
pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>1711categorycount</groupId>
    <artifactId>1711categorycount</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
        <!-- Used only by the commented-out legacy 0-8 API code below -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- The main code below uses the kafka010 classes, which need this artifact -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.98.6-hadoop2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>0.98.6-hadoop2</version>
        </dependency>
    </dependencies>
</project>
4. Simulating real-time data
Write data into the data.txt file (Java code):
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

public class SimulateData {
    public static void main(String[] args) {
        BufferedWriter bw = null;
        try {
            bw = new BufferedWriter(new FileWriter("G:\\Scala\\实时统计每日的品类的点击次数\\data.txt"));
            int i = 0;
            while (i < 20000) {
                long time = System.currentTimeMillis();
                int categoryid = new Random().nextInt(23);
                bw.write("ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=" + time + "&p_url=http://list.iqiyi.com/www/" + categoryid + "/---.html");
                bw.newLine();
                i++;
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (bw != null) {   // guard against an NPE when the writer failed to open
                try {
                    bw.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Partial contents of data.txt:
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174569&p_url=http://list.iqiyi.com/www/9/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/4/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/10/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/4/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/1/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/13/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/8/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/3/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/17/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/6/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/22/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/14/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/3/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/10/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/14/---.html
Put the data.txt file onto the Linux system.
Simulating real-time reads of the data:
Simulate writing the data into data.log in real time:
Read the data from data.log in real time:
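The shell loop shown later in this post does the simulated writing from the command line; the same idea can be sketched in Java (the file name data.log, the line contents, and the 500 ms interval here are illustrative assumptions that mirror that loop):

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

public class RealtimeWriterSketch {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Append simulated log lines to data.log, one every 500 ms,
        // so a consumer tailing the file sees a live stream.
        // (Sketch only: 5 placeholder lines instead of the full data.txt contents.)
        try (BufferedWriter bw = new BufferedWriter(new FileWriter("data.log", true))) {
            for (int i = 0; i < 5; i++) {
                bw.write("simulated-line-" + i);
                bw.newLine();
                bw.flush();          // flush per line so tail -F picks it up immediately
                Thread.sleep(500);
            }
        }
    }
}
```

In a real run, the loop would read lines from data.txt instead of generating placeholders.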
5. Configure the Kafka and Flume clusters
Kafka Study (4): Installing Kafka
Flume Study (3): Configuring Flume
6. Flume sends data to Kafka
Read real-time data from the data.log file into Kafka:
Step 1: configure the Flume file (file2kafka.properties):
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data.log
a1.channels.c1.type = memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = aura
a1.sinks.k1.brokerList = hadoop02:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 5
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
While Flume is running, data must keep being written into data.log in real time; in other words, the writing script has to stay running.
Step 2: keep this running:
[hadoop@hadoop02 ~]$ cat data.txt | while read line
> do
> echo "$line" >> data.log
> sleep 0.5
> done
Step 3: start the Kafka consumer
[hadoop@hadoop03 kafka_2.11-1.0.0]$ bin/kafka-console-consumer.sh --zookeeper hadoop02:2181 --from-beginning --topic aura
Step 4: start Flume (see the Flume website for the command reference):
[hadoop@hadoop02 apache-flume-1.8.0-bin]$ bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/file2kafka.properties --name a1 -Dflume.root.logger=INFO,console
Code that reads the data from Kafka (2.11-1.0.0) (Java):
package Category;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.*;

public class CategoryRealCount {
    public static void main(String[] args) {
        // Initialize the entry point
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("CategoryRealCount");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3));

        // Read the data (Kafka 0-10 direct stream)
        /*HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list","hadoop02:9092,hadoop03:9092,hadoop04:9092");*/
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "192.168.123.102:9092,192.168.123.103:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        /*HashSet<String> topics = new HashSet<>();
        topics.add("aura");*/
        Collection<String> topics = Arrays.asList("aura");

        JavaDStream<String> logDStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        ).map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> record) throws Exception {
                return record.value();
            }
        });
        logDStream.print();

        /* Legacy spark-streaming-kafka-0-8 version of the same stream:
        JavaDStream<String> logDStream;
        logDStream = KafkaUtils.createDirectStream(
                ssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topics
        ).map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) throws Exception {
                return tuple2._2;
            }
        });*/

        // Business logic goes here

        // Start the application
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ssc.stop();
    }
}
7. Real-time category counting
Count each day's clicks per category in real time and store them in HBase (how the HBase table is designed and how the rowkey is designed):
The rowkey design is: date + name.
Example: 2018.05.22_电影; this serves as the rowkey.
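The rowkey construction described above can be sketched as a small helper (the class and method names here are illustrative, not from the project; the project's own version is Utils.getKey below):

```java
import java.text.SimpleDateFormat;
import java.util.Date;

public class RowkeyDemo {
    // Build a rowkey of the form "<date>_<category name>", as designed above.
    public static String rowkey(long clickTimeMillis, String categoryName) {
        String day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(clickTimeMillis));
        return day + "_" + categoryName;
    }

    public static void main(String[] args) {
        System.out.println(rowkey(System.currentTimeMillis(), "电影"));
    }
}
```

Putting the date first means all of one day's categories are adjacent in HBase, so a prefix scan on the date retrieves a whole day's counts.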
Create an HBase table (Java code):
package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

import java.io.IOException;

public class CreatTableTest {
    public static void main(String[] args) {
        // Set the HBase connection configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.123.102");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        String tablename = "aura";
        String[] family = {"f"};
        try {
            HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
            // Create the table descriptor
            HTableDescriptor tableDescriptor = new HTableDescriptor(tablename);
            for (int i = 0; i < family.length; i++) {
                // Add the column families
                tableDescriptor.addFamily(new HColumnDescriptor(family[i]));
            }
            // If the table already exists, print a message; otherwise create it
            if (hBaseAdmin.tableExists(tablename)) {
                System.out.println("Table already exists");
                System.exit(0);
            } else {
                hBaseAdmin.createTable(tableDescriptor);
                System.out.println("Table created successfully");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The main program that computes the counts and writes them to HBase (Java code):
package Catefory1.Category;

import dao.HBaseDao;
import dao.factory.HBaseFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import utils.Utils;

import java.util.*;

public class CategoryRealCount11 {
    public static String ck = "G:\\Scala\\spark1711\\day25-项目实时统计\\资料\\新建文件夹";

    public static void main(String[] args) {
        // Initialize the entry point
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("CategoryRealCount");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3));
        ssc.checkpoint(ck);

        // Read the data (Kafka 0-10 direct stream)
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "192.168.123.102:9092,192.168.123.103:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        Collection<String> topics = Arrays.asList("aura");

        JavaDStream<String> logDStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        ).map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> record) throws Exception {
                return record.value();
            }
        });

        // Business logic: map each line to (rowkey, 1), sum per batch,
        // then add the batch totals into HBase
        logDStream.mapToPair(new PairFunction<String, String, Long>() {
            @Override
            public Tuple2<String, Long> call(String line) throws Exception {
                return new Tuple2<String, Long>(Utils.getKey(line), 1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            @Override
            public Long call(Long aLong, Long aLong2) throws Exception {
                return aLong + aLong2;
            }
        }).foreachRDD(new VoidFunction2<JavaPairRDD<String, Long>, Time>() {
            @Override
            public void call(JavaPairRDD<String, Long> rdd, Time time) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
                        // One HBase connection per partition
                        HBaseDao hBaseDao = HBaseFactory.getHBaseDao();
                        while (partition.hasNext()) {
                            Tuple2<String, Long> tuple = partition.next();
                            hBaseDao.save("aura", tuple._1, "f", "name", tuple._2);
                            System.out.println(tuple._1 + " " + tuple._2);
                        }
                    }
                });
            }
        });

        // Start the application
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ssc.stop();
    }
}
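What the mapToPair/reduceByKey pair above computes for each micro-batch is a per-rowkey count; only those per-batch totals reach HBase. A plain-Java sketch of that per-batch aggregation (class and method names here are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BatchCountDemo {
    // Per-batch equivalent of mapToPair((rowkey, 1)) followed by reduceByKey(sum):
    // group identical rowkeys and count them.
    public static Map<String, Long> countBatch(List<String> rowkeys) {
        return rowkeys.stream()
                .collect(Collectors.groupingBy(k -> k, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("2018-05-23_电影", "2018-05-23_电影", "2018-05-23_综艺");
        System.out.println(countBatch(batch));
    }
}
```

Because HBase then increments the stored values, the daily totals accumulate across batches without Spark having to keep state in memory.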
Helper classes:
(bean):
package bean;

import java.io.Serializable;

public class CategoryClickCount implements Serializable {
    // Name of the clicked category
    private String name;
    // Number of clicks
    private long count;

    public CategoryClickCount(String name, long count) {
        this.name = name;
        this.count = count;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }
}
(Utils):
package utils;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;

public class Utils {
    public static String getKey(String line) {
        HashMap<String, String> map = new HashMap<String, String>();
        map.put("0", "其他");
        map.put("1", "电视剧");
        map.put("2", "电影");
        map.put("3", "综艺");
        map.put("4", "动漫");
        map.put("5", "纪录片");
        map.put("6", "游戏");
        map.put("7", "资讯");
        map.put("8", "娱乐");
        map.put("9", "财经");
        map.put("10", "网络电影");
        map.put("11", "片花");
        map.put("12", "音乐");
        map.put("13", "军事");
        map.put("14", "教育");
        map.put("15", "体育");
        map.put("16", "儿童");
        map.put("17", "旅游");
        map.put("18", "时尚");
        map.put("19", "生活");
        map.put("20", "汽车");
        map.put("21", "搞笑");
        map.put("22", "广告");
        map.put("23", "原创");
        // Extract the category id from the p_url field
        String categoryid = line.split("&")[9].split("/")[4];
        // Look up the category name
        String name = map.get(categoryid);
        // Extract the time of the visit from the c_time field
        String stringTime = line.split("&")[8].split("=")[1];
        // Convert it to a date string
        String date = getDay(Long.valueOf(stringTime));
        return date + "_" + name;
    }

    public static String getDay(long time) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
        // Format the event time itself (the original formatted new Date(),
        // i.e. "now", and ignored the time parameter)
        return simpleDateFormat.format(new Date(time));
    }
}
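Applied to the first sample line from data.txt above, the two split expressions in getKey pull out the category id and the timestamp like this (a self-contained demo; the class name is illustrative):

```java
public class GetKeyDemo {
    // First sample line from data.txt above
    static final String LINE = "ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080"
            + "&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN"
            + "&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263"
            + "&c_time=1526975174569"
            + "&p_url=http://list.iqiyi.com/www/9/---.html";

    public static void main(String[] args) {
        // The same two split expressions used in Utils.getKey:
        // field 9 is p_url, whose 5th "/"-segment is the category id;
        // field 8 is c_time, whose value is the epoch timestamp.
        String categoryid = LINE.split("&")[9].split("/")[4];
        String stringTime = LINE.split("&")[8].split("=")[1];
        System.out.println(categoryid + " " + stringTime);  // prints: 9 1526975174569
    }
}
```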
(dao):
package dao;

import bean.CategoryClickCount;

import java.util.List;

public interface HBaseDao {
    // Insert (increment) one value in HBase
    public void save(String tableName, String rowkey,
                     String family, String q, long value);

    // Query data by rowkey prefix
    public List<CategoryClickCount> count(String tableName, String rowkey);
}
(dao.impl):
package dao.impl;

import bean.CategoryClickCount;
import dao.HBaseDao;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HBaseImpl implements HBaseDao {
    HConnection hatablePool = null;

    public HBaseImpl() {
        Configuration conf = HBaseConfiguration.create();
        // ZooKeeper that ships with HBase; quorum hosts and client port are set separately
        conf.set("hbase.zookeeper.quorum", "hadoop02");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            hatablePool = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Get a table object by table name
     * @param tableName table name
     * @return table object
     */
    public HTableInterface getTable(String tableName) {
        HTableInterface table = null;
        try {
            table = hatablePool.getTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    /**
     * Insert (increment) one value in HBase
     * @param tableName table name
     * @param rowkey rowkey
     * @param family column family
     * @param q qualifier
     * @param value number of clicks to add
     * e.g. 2018-12-12_电影 f q 19
     * Note: updateStateByKey would need more memory; reduceByKey plus an
     * HBase increment, as used here, needs less.
     */
    @Override
    public void save(String tableName, String rowkey, String family, String q, long value) {
        HTableInterface table = getTable(tableName);
        try {
            table.incrementColumnValue(rowkey.getBytes(), family.getBytes(), q.getBytes(), value);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Return data matching a rowkey prefix
     * @param tableName table name
     * @param rowkey rowkey prefix
     * @return list of category click counts
     */
    @Override
    public List<CategoryClickCount> count(String tableName, String rowkey) {
        ArrayList<CategoryClickCount> list = new ArrayList<>();
        HTableInterface table = getTable(tableName);
        // Prefix filter: scan all rows whose key starts with the given prefix
        PrefixFilter prefixFilter = new PrefixFilter(rowkey.getBytes());
        Scan scan = new Scan();
        scan.setFilter(prefixFilter);
        try {
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                for (Cell cell : result.rawCells()) {
                    byte[] date_name = CellUtil.cloneRow(cell);
                    String name = new String(date_name).split("_")[1];
                    byte[] value = CellUtil.cloneValue(cell);
                    long count = Bytes.toLong(value);
                    list.add(new CategoryClickCount(name, count));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return list;
    }
}
(dao.factory):
package dao.factory;

import dao.HBaseDao;
import dao.impl.HBaseImpl;

public class HBaseFactory {
    public static HBaseDao getHBaseDao() {
        return new HBaseImpl();
    }
}
Test class:
(Test):
package test;

import bean.CategoryClickCount;
import dao.HBaseDao;
import dao.factory.HBaseFactory;

import java.util.List;

public class Test {
    public static void main(String[] args) {
        HBaseDao hBaseDao = HBaseFactory.getHBaseDao();
        hBaseDao.save("aura", "2018-05-23_电影", "f", "name", 10L);
        hBaseDao.save("aura", "2018-05-23_电影", "f", "name", 20L);
        hBaseDao.save("aura", "2018-05-21_电视剧", "f", "name", 11L);
        hBaseDao.save("aura", "2018-05-21_电视剧", "f", "name", 24L);
        hBaseDao.save("aura", "2018-05-23_电视剧", "f", "name", 110L);
        hBaseDao.save("aura", "2018-05-23_电视剧", "f", "name", 210L);
        List<CategoryClickCount> list = hBaseDao.count("aura", "2018-05-21");
        for (CategoryClickCount cc : list) {
            System.out.println(cc.getName() + " " + cc.getCount());
        }
    }
}
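Because save uses incrementColumnValue, repeated saves for the same rowkey accumulate: the two writes of 11 and 24 for 2018-05-21_电视剧 above leave 35 in the table. A sketch of that accumulation with a plain in-memory map standing in for the HBase table (the class name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class IncrementDemo {
    public static void main(String[] args) {
        // incrementColumnValue adds the new value to whatever is already stored,
        // so repeated saves for the same rowkey build a running total.
        Map<String, Long> table = new HashMap<>();
        table.merge("2018-05-21_电视剧", 11L, Long::sum);  // first save
        table.merge("2018-05-21_电视剧", 24L, Long::sum);  // second save
        System.out.println(table.get("2018-05-21_电视剧")); // prints: 35
    }
}
```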
8. Workflow
1. Project architecture
Original post: https://blog.csdn.net/qq_41851454/article/details/80402483