1、项目的流程:
每一个IP对应的名称:

2、需求
实时统计每个品类被点击的次数(用饼状图展示):

3、分析设计项目
新建一个Maven项目:

pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

<groupId>1711categorycount</groupId>
    <artifactId>1711categorycount</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>

<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.98.6-hadoop2</version>
        </dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>0.98.6-hadoop2</version>
        </dependency>

</dependencies>

</project>
4、模拟实时数据
往data.txt文件里面写入数据(Java代码):

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

public class SimulateData {
    public static void main(String[] args) {
        BufferedWriter bw = null;
        try {
            bw = new BufferedWriter(new FileWriter("G:\\Scala\\实时统计每日的品类的点击次数\\data.txt"));

int i = 0;
            while (i < 20000){
                long time = System.currentTimeMillis();

int categoryid = new Random().nextInt(23);
                bw.write("ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time="+time+"&p_url=http://list.iqiyi.com/www/"+categoryid+"/---.html");
                bw.newLine();
                i++;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                bw.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

}
}
data.txt文件部分结果:

ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174569&p_url=http://list.iqiyi.com/www/9/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/4/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/10/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/4/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/1/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/13/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/8/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/3/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/17/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/6/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/22/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/14/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/3/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/10/---.html
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1526975174570&p_url=http://list.iqiyi.com/www/14/---.html
把data.txt数据放入Linux系统。

模拟数据实时读取数据:

模拟数据实时的写入data.log:

实时读取data.log里面的数据:

5、配置kafka,Flume集群
Kafka学习(四)Kafka的安装

Flume学习(三)Flume的配置方式

6、flume发送数据到kafka
从data.log文件中读取实时数据到kafka:

第一步:配置Flume文件:(file2kafka.properties)

a1.sources = r1
a1.sinks = k1
a1.channels =c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data.log

a1.channel.c1 = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = aura
a1.sinks.k1.brokerList = hodoop02:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 5

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动Flume的时候,需要一直实时的往data.log文件中写入数据。也就是说那个写入的脚本文件需要一直启动着。

第二步:需要一直启动着:

[hadoop@hadoop02 ~]$ cat data.txt | while read line
> do
> echo "$line" >> data.log
> sleep 0.5
> done
第三步:启动kafka消费者

[hadoop@hadoop03 kafka_2.11-1.0.0]$  bin/kafka-console-consumer.sh --zookeeper hadoop02:2181 --from-beginning --topic aura
第四步:启动Flume命令:Flume官网

[hadoop@hadoop02 apache-flume-1.8.0-bin]$ bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/file2kafka.properties --name a1 -Dflume.root.logger=INFO,console

代码实现从kafka(2.11-1.0.0)读取数据(java):
package Category;

import kafka.serializer.StringDecoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.*;

public class CategoryRealCount {
    public static void main(String[] args) {

//初始化程序入口
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("CategoryRealCount");

JavaStreamingContext ssc = new JavaStreamingContext(conf,Durations.seconds(3));

//读取数据
        /*HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list","hadoop02:9092,hadoop03:9092,hadoop04:9092");*/
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "192.168.123.102:9092,192.168.123.103:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        /*HashSet<String> topics = new HashSet<>();
        topics.add("aura");*/
        Collection<String> topics = Arrays.asList("aura");

JavaDStream<String> logDStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        ).map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> stringStringConsumerRecord) throws Exception {
                return stringStringConsumerRecord.value();
            }
        });
        logDStream.print();
       /* JavaDStream<String> logDStream;
        logDStream = KafkaUtils.createDirectStream(
                ssc,
                String.class,
                String.class,
                StringDecoder.class,
                topics,
                StringDecoder.class,
                kafkaParams
        ).map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) throws Exception {
                return tuple2._2;
            }
        });*/

//代码的逻辑
        //启动应用程序
        ssc.start();

try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ssc.stop();

}
}
7、品类实时统计
实时统计每日的品类的点击次数,存储到HBase(HBase表示如何设计的,rowkey是怎样设计)

rowkey的设计是:时间+name

例:2018.05.22_电影。这样做为rowkey。

创建一个HBase表:(java代码)
package habase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

import java.io.IOException;

public class CreatTableTest {
    public static void main(String[] args) {

//设置Hbase数据库的连接配置参数
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","192.168.123.102");
        conf.set("hbase.zookeeper.property.clientPort","2181");
        String tablename = "aura";
        String[] famliy = {"f"};
        try {
            HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
            //创建表对象
            HTableDescriptor tableDescriptor = new HTableDescriptor(tablename);
            for (int i = 0;i < famliy.length;i++){
                //设置表字段
                tableDescriptor.addFamily(new HColumnDescriptor(famliy[i]));
            }
            //判断表是否存在,不存在则创建,存在则打印提示信息
            if (hBaseAdmin.tableExists(tablename)){
                System.out.println("表存在");
                System.exit(0);
            }else {
                hBaseAdmin.createTable(tableDescriptor);
                System.out.println("创建表成功");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
读取Hbase数据的代码主程序:(java代码)
package Catefory1.Category;

import dao.HBaseDao;
import dao.factory.HBaseFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import utils.DateUtils;
import utils.Utils;

import java.util.*;

public class CategoryRealCount11 {
    public  static String ck = "G:\\Scala\\spark1711\\day25-项目实时统计\\资料\\新建文件夹";

public static void main(String[] args) {

//初始化程序入口
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("CategoryRealCount");

JavaStreamingContext ssc = new JavaStreamingContext(conf,Durations.seconds(3));
        ssc.checkpoint(ck);

//读取数据
        /*HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list","hadoop02:9092,hadoop03:9092,hadoop04:9092");*/
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "192.168.123.102:9092,192.168.123.103:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        /*HashSet<String> topics = new HashSet<>();
        topics.add("aura");*/
        Collection<String> topics = Arrays.asList("aura");

JavaDStream<String> logDStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        ).map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> stringStringConsumerRecord) throws Exception {
                return stringStringConsumerRecord.value();
            }
        });

logDStream.mapToPair(new PairFunction<String, String, Long>() {
            @Override
            public Tuple2<String, Long> call(String line) throws Exception {
                return new Tuple2<String, Long>(Utils.getKey(line),1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            @Override
            public Long call(Long aLong, Long aLong2) throws Exception {

return aLong + aLong2;
            }
        }).foreachRDD(new VoidFunction2<JavaPairRDD<String, Long>, Time>() {
            @Override
            public void call(JavaPairRDD<String, Long> RDD, Time time) throws Exception {
                RDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
                        HBaseDao hBaseDao = HBaseFactory.getHBaseDao();
                        while (partition.hasNext()){
                            Tuple2<String, Long> tuple = partition.next();
                            hBaseDao.save("aura",tuple._1,"f","name",tuple._2);
                            System.out.println(tuple._1+" "+  tuple._2);

}

}
                });
            }
        });

/* JavaDStream<String> logDStream;
        logDStream = KafkaUtils.createDirectStream(
                ssc,
                String.class,
                String.class,
                StringDecoder.class,
                topics,
                StringDecoder.class,
                kafkaParams
        ).map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) throws Exception {
                return tuple2._2;
            }
        });*/
        //代码的逻辑
        //启动应用程序
        ssc.start();

try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ssc.stop();
    }
}
辅助类:
(bean):
package bean;

import java.io.Serializable;

public class CategoryClickCount implements Serializable {
    //点击的品类
    private String name;

//点击的次数
    private long count;

public String getName() {
        return name;
    }

public void setName(String name) {
        this.name = name;
    }

public long getCount() {
        return count;
    }

public void setCount(long count) {
        this.count = count;
    }

public CategoryClickCount(String name, long count) {
        this.name = name;
        this.count = count;
    }
}
(Utils):
package utils;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;

public class Utils {
    public static String getKey(String line) {
        HashMap<String, String> map = new HashMap<String, String>();
        map.put("0", "其他");
        map.put("1", "电视剧");
        map.put("2", "电影");
        map.put("3", "综艺");
        map.put("4", "动漫");
        map.put("5", "纪录片");
        map.put("6", "游戏");
        map.put("7", "资讯");
        map.put("8", "娱乐");
        map.put("9", "财经");
        map.put("10", "网络电影");
        map.put("11", "片花");
        map.put("12", "音乐");
        map.put("13", "军事");
        map.put("14", "教育");
        map.put("15", "体育");
        map.put("16", "儿童");
        map.put("17", "旅游");
        map.put("18", "时尚");
        map.put("19", "生活");
        map.put("20", "汽车");
        map.put("21", "搞笑");
        map.put("22", "广告");
        map.put("23", "原创");

//获取到品类ID
        String categoryid = line.split("&")[9].split("/")[4];
        //获取到品类的名称
        String name = map.get(categoryid);
        //获取用户访问数据的时间
        String stringTime = line.split("&")[8].split("=")[1];
        //获取日期
        String date = getDay(Long.valueOf(stringTime));
        return date + "_" + name;

}
    public static String getDay(long time){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
        return simpleDateFormat.format(new Date());
    }

}
(dao):
package dao;

import bean.CategoryClickCount;

import java.util.List;

public interface HBaseDao {

//往hbase里面插入一条数据
    public void save (String tableName,String rowkey,
                      String family,String q ,long value);

//根据条件查询数据
    public List<CategoryClickCount> count(String tableName, String rowkey);

}
(dao.impl):
package dao.impl;

import bean.CategoryClickCount;
import dao.HBaseDao;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HBaseImpl implements HBaseDao {

HConnection hatablePool = null;
    public HBaseImpl(){
        Configuration conf = HBaseConfiguration.create();
        //HBase自带的zookeeper
        conf.set("hbase.zookeeper.quorum","hadoop02:2181");
        try {
            hatablePool = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

/**
     * 根据表名获取表对象
     * @param tableName  表名
     * @return 表对象
     */
    public HTableInterface getTable(String tableName){
        HTableInterface table = null;
        try {
            table = hatablePool.getTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

/**
     * 往hbase里面插入一条数据
     * @param tableName 表名
     * @param rowkey rowkey
     * @param family 列族
     * @param q 品类
     * @param value 出现了的次数
     *              2018-12-12_电影 f q 19
     *              updateStateBykey 对内存的要求高一点
     *              reduceBykey 对内存要求低一点
     */
    @Override
    public void save(String tableName, String rowkey, String family, String q, long value) {

HTableInterface table = getTable(tableName);
        try {
            table.incrementColumnValue(rowkey.getBytes(),family.getBytes(),q.getBytes(),value);
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if (table != null){
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

}

/**
     * 根据rowkey 返回数据
     * @param tableName 表名
     * @param rowkey rowkey
     * @return
     */
    @Override
    public List<CategoryClickCount> count(String tableName, String rowkey) {
        ArrayList<CategoryClickCount> list = new ArrayList<>();
        HTableInterface table = getTable(tableName);
        PrefixFilter prefixFilter = new PrefixFilter(rowkey.getBytes());//用左查询进行rowkey查询
        Scan scan = new Scan();
        scan.setFilter(prefixFilter);

try {
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner){
                for (Cell cell : result.rawCells()){
                    byte[] date_name = CellUtil.cloneRow(cell);
                    String name = new String(date_name).split("_")[1];
                    byte[] value = CellUtil.cloneValue(cell);
                    long count = Bytes.toLong(value);
                    CategoryClickCount categoryClickCount = new CategoryClickCount(name, count);
                    list.add(categoryClickCount);

}
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if (table != null){
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return list;
    }
}
(dao.factory):
package dao.factory;

import dao.HBaseDao;
import dao.impl.HBaseImpl;

public class HBaseFactory {
    public static HBaseDao getHBaseDao(){
        return new HBaseImpl();
    }
}
测试类:
(Test):
package test;

import bean.CategoryClickCount;
import dao.HBaseDao;
import dao.factory.HBaseFactory;

import java.util.List;

public class Test {
    public static void main(String[] args) {

HBaseDao hBaseDao = HBaseFactory.getHBaseDao();

hBaseDao.save("aura",
                "2018-05-23_电影","f","name",10L);
        hBaseDao.save("aura",
                "2018-05-23_电影","f","name",20L);
        hBaseDao.save("aura",
                "2018-05-21_电视剧","f","name",11L);
        hBaseDao.save("aura",
                "2018-05-21_电视剧","f","name",24L);
        hBaseDao.save("aura",
                "2018-05-23_电视剧","f","name",110L);
        hBaseDao.save("aura",
                "2018-05-23_电视剧","f","name",210L);

List<CategoryClickCount> list = hBaseDao.count("aura", "2018-05-21");
        for (CategoryClickCount cc : list){
            System.out.println(cc.getName() + " "+ cc.getCount());
        }

}
}
8、流程

1、项目架构

原文链接:https://blog.csdn.net/qq_41851454/article/details/80402483

SparkStreaming项目(实时统计每个品类被点击的次数)相关推荐

  1. 用户行为分析大数据系统(实时统计每个分类被点击的次数,实时计算商品销售额,统计网站PV、UV )

    Spark Streaming实战对论坛网站动态行为pv,uv,注册人数,跳出率的多维度分析_小强签名设计 的博客-CSDN博客_spark streaming uv 实时统计每天pv,uv的spar ...

  2. Spark日志分析项目Demo(8)--SparkStream,广告点击流量实时统计

    广告点击统计需求: (1)对接kafka,获得数据 (2)发现某个用户某天对某个广告的点击量已经大于等于100,写入黑名单,进行过滤 (3)计算广告点击流量实时统计结果 (4)实时统计每天每个省份to ...

  3. 【项目二】爱奇艺分类点击实时统计

    项目源码: SparkStreaming部分:https://gitee.com/jenrey/project_two SpringBoot部分:https://gitee.com/jenrey/pr ...

  4. 视频访问量实时统计项目学习

    视频访问量实时统计项目学习 (一)效果图 先来两个效果图看看 图1 图2 (二)日志产生 图1显示的效果表示的是对于某个视频网站的访问的视频类别,做的模拟统计示意效果图,比如爱奇艺视频,对于爱奇艺视频 ...

  5. Kafka项目实战-用户日志上报实时统计之编码实践

    1.概述 本课程的视频教程地址:<Kafka实战项目之编码实践>  该课程我以用户实时上报日志案例为基础,带着大家去完成各个KPI的编码工作,实现生产模块.消费模块,数据持久化,以及应用调 ...

  6. 基于Flink的电影数据实时统计平台(一):项目展示

    文章目录 一.项目介绍 二.项目演示 2.1 前端观影/电影首页 2.2 前端观影/播放电影 2.3 数据查询/电影筛选 2.4 数据查询/评分细查 2.5 数据查询/可视化数据 三.相关博客 一.项 ...

  7. Spark Streaming 项目实战 (4) | 得到最近1小时广告点击量实时统计并写入到redis

      大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...

  8. 112.Spark大型电商项目-广告点击流量实时统计-需求分析、技术方案设计以及数据设计

    目录 需求分析 技术方案设计 数据表设计 ad_user_click_count //用户点击广告表 ad_blacklist //用户黑名单 ad_stat  //广告状态表 ad_province ...

  9. 114.Spark大型电商项目-广告点击流量实时统计-使用高性能方式将实时计算结果写入MySQL中

    目录 误区 Spark Streaming foreachRDD的正确使用方式 对于这种实时计算程序的mysql插入,有两种pattern(模式) 代码 AdUserClickCount.java I ...

最新文章

  1. 过程改进建设中的常见奖励措施
  2. 重磅!!!微软发布.NET Core 2.2
  3. 最佳字符串对齐的Java实现
  4. 【Java从入门到头秃专栏 】(三) 控制流程 Math Date DateFormat Calendar System BigDecimal Random
  5. Oracle一条SQL语句插入多条记录
  6. JavaScript 函数replace揭秘
  7. UltraCompare 22 for Mac(文件比较工具)
  8. 超市仓库管理系统python+tkinter
  9. 图片自适应页面大小的简单HTML代码
  10. Matlab求方差,均值
  11. VMware虚拟机安装黑苹果
  12. 最新互联网架构师视频教程+源码20G
  13. 201912月全国计算机二级考试,201912月天津计算机二级报名时间:12月5日-12月7日!附报名入口...
  14. bigsur cdr文件_clover和oc的杂交-openclover,big sur也可以直接用clover来引导了,小白一步一步教你...
  15. 团队作业3 需求改进系统设计
  16. 蓝牙鼠标windows linux,关于windows linux双系统共用蓝牙鼠标的教程
  17. 京东,想说爱你,并不容易!
  18. 微信7.0.10正式版来了!朋友圈斗图彻底关闭了!
  19. 安装WordPress的一些注意事项
  20. AI黑科技:目前最流行的人工智能换脸软件(FakeAPP/Faceswap/Openfaceswap/Deepfacelab)的简介、对比之详细攻略

热门文章

  1. idea怎么找到路径下面的js_怎么找到Win7桌面存储路径?怎么把Win7桌面转到D盘?...
  2. Java知识系统回顾整理01基础04操作符02关系操作符
  3. mongodb4简明笔记
  4. Luogu 1941 【NOIP2014】飞扬的小鸟 (动态规划)
  5. QT调用百度语音REST API实现语音合成
  6. iOS webview自适应实际内容高度的4种方法
  7. eclips git中的add to Index无效解决
  8. mysql 乱码解决方案
  9. 啥叫“Functional Programming ”???
  10. Spark源码分析之Checkpoint机制