Note: if you find this blog useful, please like and bookmark it. I post new content on artificial intelligence and big data every week, most of it original: Python, Java, Scala and SQL code, CV, NLP, recommender systems, and Spark, Flink, Kafka, HBase, Hive, Flume and more. Everything here is hands-on material, along with walkthroughs of papers from the top conferences. Let's improve together.
Today I continue with the MoMo comprehensive case study.
#博学谷IT学习技术支持

Table of Contents

  • Preface
  • 1. Apache Flume
  • 2. Kafka
  • 3. MoMo case: consume messages and write them to HBase
    • 1. Create the table in HBase
    • 2. Rowkey design
    • 3. Create the Maven project and add the dependencies
  • 4. MoMo case: integrating with Phoenix
  • 5. MoMo case: integrating with Hive
  • 6. MoMo case: real-time statistics with Flink
    • 1. Create the Maven project and add the dependencies
    • 2. Define the POJO class
    • 3. Define the MySQL sink class
    • 4. Code implementation: write the results to MySQL
  • 7. MoMo case: MySQL table
  • 8. FineBI real-time integration
  • Summary

Preface


This is a comprehensive, end-to-end case study built on realistic MoMo message data.
The project architecture is as follows:
1. Offline pipeline: Flume + Kafka + HBase + Hive/Phoenix
2. Real-time pipeline: Flume + Kafka + Flink + MySQL + FineBI


1. Apache Flume

Flume is a top-level open-source project under Apache. It was originally developed by Cloudera and later donated to the Apache Software Foundation. Flume is a tool dedicated to data collection: its job is to move data from one end to the other.
Once started, a Flume process runs as an agent instance, and an agent is typically made up of three components:

    1. Source: connects to the data source and pulls data from it; Flume ships with many source types.
    2. Sink: the destination; it takes the collected data and delivers it to the target system; Flume ships with many sink types.
    3. Channel: a buffer between source and sink; the source writes events into the channel, the sink reads them out and delivers them onward; Flume ships with many channel types.

  • Collection requirement:
    Monitor the file /export/data/momo_data/MOMO_DATA.dat; whenever new content appears in it, write the new records to Kafka. The setup must also leave room for growth: it tails a single file today but should be able to monitor a whole directory in the future.
Create the agent configuration file (vim momo_tailDir_kafka.conf) and add the following:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /export/data/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/data/momo_data/MOMO_DATA.dat
a1.sources.r1.maxBatchCount = 10

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = MOMO_MSG
a1.sinks.k1.kafka.bootstrap.servers = node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. Kafka

Flume acts as the producer that feeds the collected data into Kafka. Consumer 1 is the HBase writer, consumer 2 is the Flink job.
Create the MOMO_MSG topic in Kafka.

Create the topic:
./kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --topic MOMO_MSG --partitions 6 --replication-factor 2
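
If you prefer to create the topic from code rather than the shell, here is a minimal sketch using the Kafka AdminClient from the kafka-clients dependency used later in this case; the bootstrap servers, partition count and replication factor mirror the command above, but this class name is my own and not part of the original setup.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateMomoTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions, replication factor 2, same as the kafka-topics.sh command above
            NewTopic topic = new NewTopic("MOMO_MSG", 6, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}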

Start the Flume agent so data collection can begin.

Start Flume:
cd /export/server/flume/bin
./flume-ng agent -n a1  -c ../conf  -f ../conf/momo_tailDir_kafka.conf  -Dflume.root.logger=INFO,console

Verify that data is being collected correctly:

cd /export/server/kafka/bin/
./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic MOMO_MSG

3. MoMo case: consume messages and write them to HBase

1. Create the table in HBase

create_namespace 'MOMO_CHAT'
create 'MOMO_CHAT:MOMO_MSG',{NAME=>'C1',COMPRESSION=>'GZ'},{NUMREGIONS=>6,SPLITALGO=>'HexStringSplit'}
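
The shell commands above are how the table is actually created in this case. As an alternative sketch, the same namespace and column family (with GZ compression) can also be created from Java with the HBase 2.x Admin API; note that the six-region HexStringSplit pre-split from the shell command is omitted here for brevity, and the class name is my own.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

public class CreateMomoTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // namespace MOMO_CHAT
            admin.createNamespace(NamespaceDescriptor.create("MOMO_CHAT").build());
            // table MOMO_CHAT:MOMO_MSG with column family C1 and GZ compression
            ColumnFamilyDescriptor cf = ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes("C1"))
                    .setCompressionType(Compression.Algorithm.GZ)
                    .build();
            TableDescriptor table = TableDescriptorBuilder
                    .newBuilder(TableName.valueOf("MOMO_CHAT:MOMO_MSG"))
                    .setColumnFamily(cf)
                    .build();
            admin.createTable(table);
        }
    }
}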

2. Rowkey design

MD5 hash prefix _ sender account _ receiver account _ message time (timestamp)
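
To make the layout concrete, here is a minimal sketch of how a rowkey is assembled; it mirrors the getRowkey() method in the consumer code below, and the two accounts and the message time are made-up sample values.

import org.apache.hadoop.hbase.util.MD5Hash;

import java.text.SimpleDateFormat;

public class RowkeyDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical sample values, only to illustrate the rowkey layout
        String senderAccount = "13800000000";
        String receiverAccount = "13900000000";
        String msgTime = "2021-11-01 10:00:00";

        // 8-character MD5 prefix over "sender_receiver"
        String md5Prefix = MD5Hash.getMD5AsHex((senderAccount + "_" + receiverAccount).getBytes()).substring(0, 8);
        // message time converted to an epoch timestamp in milliseconds
        long timestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(msgTime).getTime();

        // <8-char md5>_<sender account>_<receiver account>_<epoch millis>
        String rowkey = md5Prefix + "_" + senderAccount + "_" + receiverAccount + "_" + timestamp;
        System.out.println(rowkey);
    }
}

The idea behind this layout is that the MD5 prefix spreads writes across the pre-split regions (avoiding a hotspot on the time-ordered part), while every message between the same pair of accounts shares one prefix, so a whole conversation can be read back with a single prefix scan ordered by timestamp.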

3. Create the Maven project and add the dependencies:

    <repositories>
        <!-- repository mirror -->
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases><enabled>true</enabled></releases>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
        </repository>
    </repositories>
    <dependencies>
        <!-- HBase client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <!-- Kafka client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                </configuration>
            </plugin>
        </plugins>
    </build>

Code implementation:

package com.itheima.momo_chat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MomoChatConsumerToHBase {

    private static Connection hbaseConn;
    private static Table table;

    static {
        try {
            // 2.1 Create the HBase connection through the connection factory
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
            hbaseConn = ConnectionFactory.createConnection(conf);
            // 2.2 Get a handle to the target table
            table = hbaseConn.getTable(TableName.valueOf("MOMO_CHAT:MOMO_MSG"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. Consume messages from Kafka: topic MOMO_MSG
        // 1.1 Create the Kafka consumer
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.setProperty("group.id", "MOMO_G1");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // 1.2 Subscribe to the topic
        consumer.subscribe(Arrays.asList("MOMO_MSG"));

        // 1.3 Poll messages from Kafka
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                String msg = consumerRecord.value();
                System.out.println(msg);
                // 2. Write to HBase
                if (msg != null && !"".equals(msg.trim()) && msg.split("\001").length == 20) {
                    // 2.3.1 Build the rowkey: MD5 hash prefix _ sender account _ receiver account _ message time (timestamp)
                    byte[] rowkey = getRowkey(msg);
                    // 2.3.2 Build one row of data
                    String[] fields = msg.split("\001");
                    Put put = new Put(rowkey);
                    put.addColumn("C1".getBytes(), "msg_time".getBytes(), fields[0].getBytes());
                    put.addColumn("C1".getBytes(), "sender_nickyname".getBytes(), fields[1].getBytes());
                    put.addColumn("C1".getBytes(), "sender_account".getBytes(), fields[2].getBytes());
                    put.addColumn("C1".getBytes(), "sender_sex".getBytes(), fields[3].getBytes());
                    put.addColumn("C1".getBytes(), "sender_ip".getBytes(), fields[4].getBytes());
                    put.addColumn("C1".getBytes(), "sender_os".getBytes(), fields[5].getBytes());
                    put.addColumn("C1".getBytes(), "sender_phone_type".getBytes(), fields[6].getBytes());
                    put.addColumn("C1".getBytes(), "sender_network".getBytes(), fields[7].getBytes());
                    put.addColumn("C1".getBytes(), "sender_gps".getBytes(), fields[8].getBytes());
                    put.addColumn("C1".getBytes(), "receiver_nickyname".getBytes(), fields[9].getBytes());
                    put.addColumn("C1".getBytes(), "receiver_ip".getBytes(), fields[10].getBytes());
                    put.addColumn("C1".getBytes(), "receiver_account".getBytes(), fields[11].getBytes());
                    put.addColumn("C1".getBytes(), "receiver_os".getBytes(), fields[12].getBytes());
                    put.addColumn("C1".getBytes(), "receiver_phone_type".getBytes(), fields[13].getBytes());
                    put.addColumn("C1".getBytes(), "receiver_network".getBytes(), fields[14].getBytes());
                    put.addColumn("C1".getBytes(), "receiver_gps".getBytes(), fields[15].getBytes());
                    put.addColumn("C1".getBytes(), "receiver_sex".getBytes(), fields[16].getBytes());
                    put.addColumn("C1".getBytes(), "msg_type".getBytes(), fields[17].getBytes());
                    put.addColumn("C1".getBytes(), "distance".getBytes(), fields[18].getBytes());
                    put.addColumn("C1".getBytes(), "message".getBytes(), fields[19].getBytes());
                    table.put(put);
                }
            }
        }
    }

    private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // Build the rowkey: MD5 hash prefix _ sender account _ receiver account _ message time (timestamp)
    private static byte[] getRowkey(String msg) throws Exception {
        // 1. Split the message
        String[] fields = msg.split("\001");
        // 2. Extract sender account, receiver account and message time
        String msgTime = fields[0];
        String sender_account = fields[2];
        String receiver_account = fields[11];
        // 3. Assemble the rowkey
        // 8-character MD5 hash prefix
        String md5Hash = MD5Hash.getMD5AsHex((sender_account + "_" + receiver_account).getBytes()).substring(0, 8);
        // Convert the message time to an epoch timestamp
        long time = format.parse(msgTime).getTime();
        return (md5Hash + "_" + sender_account + "_" + receiver_account + "_" + time).getBytes();
    }
}
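
A quick way to check that rows are actually landing in HBase is a small scan. This is only a verification sketch reusing the same connection settings as above; the class name is my own.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanMomoMsg {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("MOMO_CHAT:MOMO_MSG"));
             ResultScanner scanner = table.getScanner(new Scan().setLimit(5))) {
            // print the rowkey and the message column of the first few rows
            for (Result result : scanner) {
                String rowkey = Bytes.toString(result.getRow());
                String message = Bytes.toString(result.getValue(Bytes.toBytes("C1"), Bytes.toBytes("message")));
                System.out.println(rowkey + " -> " + message);
            }
        }
    }
}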

4. MoMo case: integrating with Phoenix

-- create a Phoenix view over the HBase table
create view MOMO_CHAT.MOMO_MSG(
    "id" varchar primary key,
    C1."msg_time" varchar,
    C1."sender_nickyname" varchar,
    C1."sender_account" varchar,
    C1."sender_sex" varchar,
    C1."sender_ip" varchar,
    C1."sender_os" varchar,
    C1."sender_phone_type" varchar,
    C1."sender_network" varchar,
    C1."sender_gps" varchar,
    C1."receiver_nickyname" varchar,
    C1."receiver_ip" varchar,
    C1."receiver_account" varchar,
    C1."receiver_os" varchar,
    C1."receiver_phone_type" varchar,
    C1."receiver_network" varchar,
    C1."receiver_gps" varchar,
    C1."receiver_sex" varchar,
    C1."msg_type" varchar,
    C1."distance" varchar,
    C1."message" varchar
);
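
Once the view exists, the HBase data can be queried with ordinary SQL over JDBC. Below is a sketch; it assumes the Phoenix client jar is on the classpath and that the ZooKeeper quorum matches the rest of this case, and the query itself is just an example.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PhoenixQueryDemo {
    public static void main(String[] args) throws Exception {
        // Phoenix thick-client JDBC driver; the quorum below is an assumption about the environment
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:node1,node2,node3:2181");
             Statement stmt = conn.createStatement();
             // example: message count per sender, over the view defined above
             ResultSet rs = stmt.executeQuery(
                     "select \"sender_account\", count(1) as cnt "
                   + "from MOMO_CHAT.MOMO_MSG group by \"sender_account\" limit 10")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + " -> " + rs.getLong(2));
            }
        }
    }
}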

5. MoMo case: integrating with Hive

create database if not exists MOMO_CHAT;
use MOMO_CHAT;
create external table MOMO_CHAT.MOMO_MSG (
    id string,
    msg_time string,
    sender_nickyname string,
    sender_account string,
    sender_sex string,
    sender_ip string,
    sender_os string,
    sender_phone_type string,
    sender_network string,
    sender_gps string,
    receiver_nickyname string,
    receiver_ip string,
    receiver_account string,
    receiver_os string,
    receiver_phone_type string,
    receiver_network string,
    receiver_gps string,
    receiver_sex string,
    msg_type string,
    distance string,
    message string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,
C1:sender_nickyname,
C1:sender_account,
C1:sender_sex,
C1:sender_ip,
C1:sender_os,
C1:sender_phone_type,
C1:sender_network,
C1:sender_gps,
C1:receiver_nickyname,
C1:receiver_ip,
C1:receiver_account,
C1:receiver_os,
C1:receiver_phone_type,
C1:receiver_network,
C1:receiver_gps,
C1:receiver_sex,
C1:msg_type,
C1:distance,
C1:message')
tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');
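
With this mapping table in place, the same HBase data can be analysed offline from Hive. A sketch of querying it through the HiveServer2 JDBC driver follows; the hive-jdbc dependency, the host/port node1:10000 and the login are assumptions about the environment, so adjust them to yours.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveQueryDemo {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Assumed HiveServer2 address and credentials
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://node1:10000/momo_chat", "root", "");
             Statement stmt = conn.createStatement();
             // example: message count per sender account
             ResultSet rs = stmt.executeQuery(
                     "select sender_account, count(*) as cnt from momo_msg group by sender_account limit 10")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + " -> " + rs.getLong(2));
            }
        }
    }
}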

6. MoMo case: real-time statistics with Flink

1- Total message count, in real time
2- Messages sent per region, in real time
3- Messages received per region, in real time
4- Messages sent per user, in real time
5- Messages received per user, in real time

1. Create the Maven project and add the dependencies:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.36</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.41</version>
    </dependency>

2. Define the POJO class:

package com.itheima.momo_chat.pojo;

public class MoMoCountBean {

    private Integer id;
    private Long moMoTotalCount;
    private String moMoProvince;
    private String moMoUsername;
    private Long moMo_MsgCount;
    private String groupType;

    public String getGroupType() {
        return groupType;
    }

    public void setGroupType(String groupType) {
        this.groupType = groupType;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public Long getMoMoTotalCount() {
        return moMoTotalCount;
    }

    public void setMoMoTotalCount(Long moMoTotalCount) {
        this.moMoTotalCount = moMoTotalCount;
    }

    public String getMoMoProvince() {
        return moMoProvince;
    }

    public void setMoMoProvince(String moMoProvince) {
        this.moMoProvince = moMoProvince;
    }

    public String getMoMoUsername() {
        return moMoUsername;
    }

    public void setMoMoUsername(String moMoUsername) {
        this.moMoUsername = moMoUsername;
    }

    public Long getMoMo_MsgCount() {
        return moMo_MsgCount;
    }

    public void setMoMo_MsgCount(Long moMo_MsgCount) {
        this.moMo_MsgCount = moMo_MsgCount;
    }
}

3. Define the MySQL sink class:

package com.itheima.momo_chat.steam;

import com.itheima.momo_chat.pojo.MoMoCountBean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.*;

public class MysqlSink extends RichSinkFunction<MoMoCountBean> {

    private Statement stat;
    private Connection connection;
    // the metric group this sink instance is responsible for ("1" to "5")
    private String status;

    public MysqlSink() {
    }

    public MysqlSink(String status) {
        this.status = status;
    }

    /**
     * Open the JDBC connection in open(), so it is not created and released on every invoke().
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        stat = connection.createStatement();
    }

    @Override
    public void close() throws Exception {
        super.close();
        // release the connection and the statement
        if (connection != null) {
            connection.close();
        }
        if (stat != null) {
            stat.close();
        }
    }

    /**
     * invoke() is called once per record: check whether the row already exists,
     * then update it or insert a new one.
     */
    @Override
    public void invoke(MoMoCountBean value, Context context) throws Exception {
        if (status.equals("1")) {
            String sql = "select * from momo_count where momo_groupType = '1'";
            ResultSet resultSet = stat.executeQuery(sql);
            boolean flag = resultSet.next();
            if (flag) {
                sql = "update momo_count set momo_totalcount= '" + value.getMoMoTotalCount()
                        + "' where momo_groupType = '1'";
            } else {
                sql = "insert into momo_count( momo_totalcount,momo_groupType) values ("
                        + value.getMoMoTotalCount() + ",'1') ";
            }
            stat.executeUpdate(sql);
        } else if (status.equals("2")) {
            String sql = "select * from momo_count where momo_groupType = '2' and momo_province= '"
                    + value.getMoMoProvince() + "' ";
            ResultSet resultSet = stat.executeQuery(sql);
            boolean flag = resultSet.next();
            if (flag) {
                sql = "update momo_count set momo_msgcount= '" + value.getMoMo_MsgCount()
                        + "' where momo_groupType = '2' and momo_province= '" + value.getMoMoProvince() + "' ";
            } else {
                sql = "insert into momo_count( momo_province,momo_msgcount,momo_groupType) values ('"
                        + value.getMoMoProvince() + "'," + value.getMoMo_MsgCount() + ",'2') ";
            }
            stat.executeUpdate(sql);
        } else if (status.equals("3")) {
            String sql = "select * from momo_count where momo_groupType = '3' and momo_province= '"
                    + value.getMoMoProvince() + "' ";
            ResultSet resultSet = stat.executeQuery(sql);
            boolean flag = resultSet.next();
            if (flag) {
                sql = "update momo_count set momo_msgcount= '" + value.getMoMo_MsgCount()
                        + "' where momo_groupType = '3' and momo_province= '" + value.getMoMoProvince() + "' ";
            } else {
                sql = "insert into momo_count( momo_province,momo_msgcount,momo_groupType) values ('"
                        + value.getMoMoProvince() + "'," + value.getMoMo_MsgCount() + ",'3') ";
            }
            stat.executeUpdate(sql);
        } else if (status.equals("4")) {
            String sql = "select * from momo_count where momo_groupType = '4' and momo_username= '"
                    + value.getMoMoUsername() + "' ";
            ResultSet resultSet = stat.executeQuery(sql);
            boolean flag = resultSet.next();
            if (flag) {
                sql = "update momo_count set momo_msgcount= '" + value.getMoMo_MsgCount()
                        + "' where momo_groupType = '4' and momo_username= '" + value.getMoMoUsername() + "' ";
            } else {
                sql = "insert into momo_count( momo_username,momo_msgcount,momo_groupType) values ('"
                        + value.getMoMoUsername() + "'," + value.getMoMo_MsgCount() + ",'4') ";
            }
            stat.executeUpdate(sql);
        } else if (status.equals("5")) {
            String sql = "select * from momo_count where momo_groupType = '5' and momo_username= '"
                    + value.getMoMoUsername() + "' ";
            ResultSet resultSet = stat.executeQuery(sql);
            boolean flag = resultSet.next();
            if (flag) {
                sql = "update momo_count set momo_msgcount= '" + value.getMoMo_MsgCount()
                        + "' where momo_groupType = '5' and momo_username= '" + value.getMoMoUsername() + "' ";
            } else {
                sql = "insert into momo_count( momo_username,momo_msgcount,momo_groupType) values ('"
                        + value.getMoMoUsername() + "'," + value.getMoMo_MsgCount() + ",'5') ";
            }
            stat.executeUpdate(sql);
        }
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection(
                    "jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}
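
The sink above builds SQL by string concatenation and needs a select before every write. A leaner variant, shown here only as a sketch of an alternative and not what this case actually uses, would add a unique key over the grouping columns and let MySQL upsert in a single statement through a PreparedStatement. The helper below is hypothetical and would live inside MysqlSink (it reuses the connection and status fields), covering the province metrics (group types 2 and 3):

    // Hypothetical helper; assumes a unique key exists, e.g.
    //   alter table momo_count add unique key uk_group (momo_grouptype, momo_province);
    private void upsertProvinceCount(MoMoCountBean value) throws SQLException {
        String sql = "insert into momo_count (momo_province, momo_msgcount, momo_grouptype) values (?, ?, ?) "
                   + "on duplicate key update momo_msgcount = values(momo_msgcount)";
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setString(1, value.getMoMoProvince());
            ps.setLong(2, value.getMoMo_MsgCount());
            ps.setString(3, status);   // "2" or "3"
            ps.executeUpdate();
        }
    }

Besides saving a round trip per record, the parameterized statement also avoids the SQL-injection risk of concatenating field values into the query text.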

4. Code implementation: write the results to MySQL

package com.itheima.momo_chat.steam;
import com.itheima.momo_chat.pojo.MoMoCountBean;
import com.itheima.momo_chat.utils.HttpClientUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class MomoFlinkSteam {

    public static void main(String[] args) throws Exception {
        // 1. Create the Flink streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Wire up the three parts of the job
        // 2.1 Source: consume the MOMO_MSG topic from Kafka
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.setProperty("group.id", "MOMO_G2");
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<String>("MOMO_MSG", new SimpleStringSchema(), props);
        DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);

        // 2.2 Transformations: process the messages in real time
        // Requirement 1: total message count
        totalMsgCount(streamSource);
        // Requirement 2: messages sent per region
        totalProvinceSenderMsgCount(streamSource);
        // Requirement 3: messages received per region
        totalProvinceReceiverMsgCount(streamSource);
        // Requirement 4: messages sent per user
        totalUserSenderMsgCount(streamSource);
        // Requirement 5: messages received per user
        totalUserReceiverMsgCount(streamSource);

        // 3. Start the Flink job
        env.execute("FlinkMoMo");
    }

    private static void totalMsgCount(DataStreamSource<String> streamSource) {
        SingleOutputStreamOperator<Tuple1<Long>> streamOperator = streamSource
                .map(new MapFunction<String, Tuple1<Long>>() {
                    @Override
                    public Tuple1<Long> map(String msg) throws Exception {
                        return new Tuple1<>(1L);
                    }
                }).keyBy(0).sum(0);

        SingleOutputStreamOperator<MoMoCountBean> operator = streamOperator
                .map(new MapFunction<Tuple1<Long>, MoMoCountBean>() {
                    @Override
                    public MoMoCountBean map(Tuple1<Long> tuple1) throws Exception {
                        Long totalMsgCount = tuple1.f0;
                        MoMoCountBean moMoCountBean = new MoMoCountBean();
                        moMoCountBean.setMoMoTotalCount(totalMsgCount);
                        return moMoCountBean;
                    }
                });
        // 2.3 Sink: write the result to MySQL
        operator.addSink(new MysqlSink("1"));
    }

    private static void totalProvinceSenderMsgCount(DataStreamSource<String> streamSource) {
        SingleOutputStreamOperator<String> filterOperator = streamSource
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String msg) throws Exception {
                        return msg != null && !"".equals(msg.trim()) && msg.split("\001").length == 20;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> sumOperator = filterOperator
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String msg) throws Exception {
                        String[] fields = msg.split("\001");
                        // sender GPS field: "lng,lat"
                        String[] latAndLng = fields[8].split(",");
                        String lng = latAndLng[0].trim();
                        String lat = latAndLng[1].trim();
                        String province = HttpClientUtils.findByLatAndLng(lat, lng);
                        return new Tuple2<>(province, 1L);
                    }
                }).keyBy(0).sum(1);

        SingleOutputStreamOperator<MoMoCountBean> operator = sumOperator
                .map(new MapFunction<Tuple2<String, Long>, MoMoCountBean>() {
                    @Override
                    public MoMoCountBean map(Tuple2<String, Long> tuple2) throws Exception {
                        String province = tuple2.f0;
                        Long msgCount = tuple2.f1;
                        MoMoCountBean moMoCountBean = new MoMoCountBean();
                        moMoCountBean.setMoMoProvince(province);
                        moMoCountBean.setMoMo_MsgCount(msgCount);
                        return moMoCountBean;
                    }
                });
        operator.addSink(new MysqlSink("2"));
    }

    private static void totalProvinceReceiverMsgCount(DataStreamSource<String> streamSource) {
        SingleOutputStreamOperator<String> filterOperator = streamSource
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String msg) throws Exception {
                        return msg != null && !"".equals(msg.trim()) && msg.split("\001").length == 20;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> sumOperator = filterOperator
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String msg) throws Exception {
                        String[] fields = msg.split("\001");
                        // receiver GPS field: "lng,lat"
                        String[] latAndLng = fields[15].split(",");
                        String lng = latAndLng[0].trim();
                        String lat = latAndLng[1].trim();
                        String province = HttpClientUtils.findByLatAndLng(lat, lng);
                        return new Tuple2<>(province, 1L);
                    }
                }).keyBy(0).sum(1);

        SingleOutputStreamOperator<MoMoCountBean> operator = sumOperator
                .map(new MapFunction<Tuple2<String, Long>, MoMoCountBean>() {
                    @Override
                    public MoMoCountBean map(Tuple2<String, Long> tuple2) throws Exception {
                        String province = tuple2.f0;
                        Long msgCount = tuple2.f1;
                        MoMoCountBean moMoCountBean = new MoMoCountBean();
                        moMoCountBean.setMoMoProvince(province);
                        moMoCountBean.setMoMo_MsgCount(msgCount);
                        return moMoCountBean;
                    }
                });
        operator.addSink(new MysqlSink("3"));
    }

    private static void totalUserSenderMsgCount(DataStreamSource<String> streamSource) {
        SingleOutputStreamOperator<String> filterOperator = streamSource
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String msg) throws Exception {
                        return msg != null && !"".equals(msg.trim()) && msg.split("\001").length == 20;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> sumOperator = filterOperator
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String msg) throws Exception {
                        String[] fields = msg.split("\001");
                        String senderName = fields[1];
                        return new Tuple2<>(senderName, 1L);
                    }
                }).keyBy(0).sum(1);

        SingleOutputStreamOperator<MoMoCountBean> operator = sumOperator
                .map(new MapFunction<Tuple2<String, Long>, MoMoCountBean>() {
                    @Override
                    public MoMoCountBean map(Tuple2<String, Long> tuple2) throws Exception {
                        String senderName = tuple2.f0;
                        Long msgCount = tuple2.f1;
                        MoMoCountBean moMoCountBean = new MoMoCountBean();
                        moMoCountBean.setMoMoUsername(senderName);
                        moMoCountBean.setMoMo_MsgCount(msgCount);
                        return moMoCountBean;
                    }
                });
        operator.addSink(new MysqlSink("4"));
    }

    private static void totalUserReceiverMsgCount(DataStreamSource<String> streamSource) {
        SingleOutputStreamOperator<String> filterOperator = streamSource
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String msg) throws Exception {
                        return msg != null && !"".equals(msg.trim()) && msg.split("\001").length == 20;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> sumOperator = filterOperator
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String msg) throws Exception {
                        String[] fields = msg.split("\001");
                        String receiverName = fields[9];
                        return new Tuple2<>(receiverName, 1L);
                    }
                }).keyBy(0).sum(1);

        SingleOutputStreamOperator<MoMoCountBean> operator = sumOperator
                .map(new MapFunction<Tuple2<String, Long>, MoMoCountBean>() {
                    @Override
                    public MoMoCountBean map(Tuple2<String, Long> tuple2) throws Exception {
                        String receiverName = tuple2.f0;
                        Long msgCount = tuple2.f1;
                        MoMoCountBean moMoCountBean = new MoMoCountBean();
                        moMoCountBean.setMoMoUsername(receiverName);
                        moMoCountBean.setMoMo_MsgCount(msgCount);
                        return moMoCountBean;
                    }
                });
        operator.addSink(new MysqlSink("5"));
    }
}
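
The job above imports com.itheima.momo_chat.utils.HttpClientUtils, a small helper that reverse-geocodes a GPS point into a province name, but that class is not shown in this post. Below is a purely hypothetical sketch of what it might look like, using the httpclient and fastjson dependencies already declared; the endpoint URL, API key and JSON paths are placeholders for whatever geocoding service you actually call, not the real implementation.

package com.itheima.momo_chat.utils;

import com.alibaba.fastjson.JSONObject;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class HttpClientUtils {

    // Hypothetical reverse-geocoding endpoint and key; replace with the service you actually use.
    private static final String URL_TEMPLATE =
            "https://example-geo-api/reverse?location=%s,%s&key=YOUR_KEY";

    public static String findByLatAndLng(String lat, String lng) {
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            HttpGet get = new HttpGet(String.format(URL_TEMPLATE, lng, lat));
            try (CloseableHttpResponse response = client.execute(get)) {
                String body = EntityUtils.toString(response.getEntity(), "UTF-8");
                // The JSON path below is a placeholder; adapt it to the API response you actually receive.
                return JSONObject.parseObject(body)
                        .getJSONObject("regeocode")
                        .getJSONObject("addressComponent")
                        .getString("province");
            }
        } catch (Exception e) {
            // fall back to a fixed bucket so the aggregation keeps running
            return "unknown";
        }
    }
}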

7. MoMo case: MySQL table

Before running step 6, create the MySQL table that backs all five metrics:

CREATE DATABASE /*!32312 IF NOT EXISTS*/ `momo` /*!40100 DEFAULT CHARACTER SET utf8mb4 */;

USE `momo`;

/* Table structure for table `momo_count` */
CREATE TABLE `momo_count` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `momo_totalcount` bigint(20) DEFAULT '0' COMMENT 'total message count',
  `momo_province` varchar(20) DEFAULT '-1' COMMENT 'province',
  `momo_username` varchar(20) DEFAULT '-1' COMMENT 'username',
  `momo_msgcount` bigint(20) DEFAULT '0' COMMENT 'message count',
  `momo_grouptype` varchar(20) DEFAULT '-1' COMMENT 'metric type: 1 total messages, 2 sent per province, 3 received per province, 4 sent per user, 5 received per user',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

8. FineBI real-time integration

Finally, connect FineBI to the MySQL table to build the real-time dashboards.


Summary

This case walks through the whole path from data collection to front-end visualization.
1. Offline pipeline: Flume + Kafka + HBase + Hive/Phoenix
2. Real-time pipeline: Flume + Kafka + Flink + MySQL + FineBI
