文章目录

  • 一、 题目
    • 题目和数据
  • 二、 pom依赖
  • 三、建表语句
  • 四、 连接kafka配置类
  • 五、 自定义分区类
  • 六、 读取数据并发送数据
  • 七、 消费数据,把数据存储到mysql

一、 题目

题目和数据
链接: https://pan.baidu.com/s/1YVvhqy1u9rILqQWzJnNoVA
提取码: twt3
1、以下是RNG S8 8强赛失败后,官微发表道歉微博下一级评论
1.1、在kafak中创建rng_comment主题,设置2个分区2个副本
1.2、数据预处理,把空行过滤掉
1.3、请把给出的文件写入到kafka中,根据数据id进行分区,id为奇数的发送到一个分区中,偶数的发送到另一个分区
1.4、使用Spark Streaming对接kafka
1.5、使用Spark Streaming对接kafka之后进行计算
在mysql中创建一个数据库rng_comment
在数据库rng_comment创建vip_rank表,字段为数据的所有字段
在数据库rng_comment创建like_status表,字段为数据的所有字段
在数据库rng_comment创建count_conmment表,字段为 时间,条数 1.5.1、查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中1.5.2、查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中1.5.3、分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中

数据说明:
rng_comment.txt文件中的数据

字段 字段含义
index 数据id
child_comment 回复数量
comment_time 评论时间
content 评论内容
da_v 微博个人认证
like_status
pic 图片评论url
user_id 微博用户id
user_name 微博用户名
vip_rank 微博会员等级
stamp 时间戳

二、 pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>spark01</artifactId><version>1.0-SNAPSHOT</version><!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 --><repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository><repository><id>jboss</id><url>http://repository.jboss.com/nexus/content/groups/public</url></repository><repository><id>scala-tools.org</id><name>Scala-tools Maven2 Repository</name><url>http://scala-tools.org/repo-releases</url></repository></repositories><pluginRepositories><pluginRepository><id>scala-tools.org</id><name>Scala-tools Maven2 Repository</name><url>http://scala-tools.org/repo-releases</url></pluginRepository></pluginRepositories><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.11.8</scala.version><scala.compat.version>2.11</scala.compat.version><hadoop.version>2.7.4</hadoop.version><spark.version>2.2.0</spark.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive-thriftserver_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>${spark.version}</version></dependency><!-- <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>${spark.version}</version></dependency>--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency><!--<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.0-mr1-cdh5.14.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.0-cdh5.14.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.2.0-cdh5.14.0</version></dependency>--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.4</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.3.1</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.3.1</version></dependency><dependency><groupId>com.typesafe</groupId><artifactId>config</artifactId><version>1.3.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><!-- 指定编译java的插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version></plugin><!-- 指定编译scala的插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass></mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>

三、建表语句

-- 创建数据库
create database rng_comment character set utf8;
-- 使用数据库
use rng_comment;
-- 创建 vip_rank 表
create table vip_rank
(`index`       varchar(32) comment '数据id',child_comment varchar(32) comment '回复数量 ',comment_time  varchar(255) comment '评论时间',content       varchar(255) comment '评论内容',da_v          varchar(32) comment '微博个人认证',like_status   varchar(32) comment '赞',pic           varchar(255) comment '图片评论url',user_id       varchar(255) comment '微博用户id',user_name     varchar(32) comment '微博用户名',vip_rank      varchar(32) comment '微博会员等级',stamp         varchar(32) comment '时间戳'
);
-- 创建 like_status 表
create table like_status
(`index`       varchar(32) comment '数据id',child_comment varchar(32) comment '回复数量 ',comment_time  varchar(255) comment '评论时间',content       varchar(255) comment '评论内容',da_v          varchar(32) comment '微博个人认证',like_status   varchar(32) comment '赞',pic           varchar(255) comment '图片评论url',user_id       varchar(255) comment '微博用户id',user_name     varchar(32) comment '微博用户名',vip_rank      varchar(32) comment '微博会员等级',stamp         varchar(32) comment '时间戳'
);
-- 创建 count_comment 表
create table count_comment
(time  varchar(32) primary key comment '时间',count int comment '条数'
);
-- 查询表结构
describe vip_rank;
describe like_status;
describe count_comment;-- 查询表字段信息
select column_name,column_comment
from information_schema.columns
where table_schema = 'rng_comment'and table_name = 'vip_rank';select column_name,column_comment
from information_schema.columns
where table_schema = 'rng_comment'and table_name = 'like_status';select column_name,column_comment
from information_schema.columns
where table_schema = 'rng_comment'and table_name = 'count_comment';-- 存在则修改不存在则创建,要有唯一id
INSERT INTO count_comment (time,count) VALUES (?,?) ON DUPLICATE KEY UPDATE count = ?;
-- 清空表
-- truncate table vip_rank;
-- truncate table like_status;
-- truncate table count_comment;

四、 连接kafka配置类

import java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.rdd.RDD/*** @author 红尘丶世界* @version v 1.0*/
class KafkaProducerConf(topic: String, severs: String) extends Serializable {//kafka 配置def createKafkaConnection(): KafkaProducer[String, String] = {val props = new Properties()props.put("bootstrap.servers", severs)//todo: 一定别忘了 .getNameprops.put("key.serializer", classOf[StringSerializer].getName)props.put("value.serializer", classOf[StringSerializer].getName)props.put("retries", "1") //设置重试次数props.put("batch.size", "71680") //设置批量发送数据的大小props.put("buffer.memory", "33554432")//设置缓冲区大小props.put("linger.ms", "1000") //最多延迟1000毫秒props.put("partitioner.class", (new MyPartitioner).getClass)new KafkaProducer[String, String](props)}//创建生产者lazy val kafkaProducer: KafkaProducer[String, String] = createKafkaConnection()Runtime.getRuntime.addShutdownHook(new Thread() {override def run(): Unit = {kafkaProducer.close()}})//把数据保存到kafkadef save(vs: RDD[String]): Unit = {try {vs.foreach(x => {val record = new ProducerRecord[String, String](topic, x.split("\t")(0), x.toString)kafkaProducer.send(record)})} catch {case e: Exception => println("发送数据出错:" + e)}}
}

五、 自定义分区类

(注意这是java类,不是scala类,同样可以创建在scala包下)

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** @author 红尘丶世界* @version v 1.0*/
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String s = key.toString();int i = Integer.parseInt(s);if (i % 2 == 0) {return 0;} else {return 1;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

六、 读取数据并发送数据

import org.apache.spark.{SparkConf, SparkContext}/*** @author 红尘丶世界* @version v 1.0*/
object Producer {def main(args: Array[String]): Unit = {//创建 sparkContextval conf: SparkConf = new SparkConf().setAppName("sparkStream").setMaster("local[*]")val sc = new SparkContext(conf)//读取文件并过滤空行val lines = sc.textFile("D:\\dev\\大数据\\大数据资料\\spark\\4.14号练习题\\rng_comment.txt").filter(_.trim != "")//指定连接的节点val sink: KafkaProducerConf = new KafkaProducerConf("rng_comment", "hadoop01:9092,hadoop02:9092,hadoop03:9092")//发送数据到kafkasink.save(lines)//关闭连接sink.createKafkaConnection().close()}
}

七、 消费数据,把数据存储到mysql

package com.czxy.exercise.exercise01import java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/*** @author 红尘丶世界* @version v 1.0*/
object SparkStreamingKafka {def main(args: Array[String]): Unit = {//1 创建sparkConfvar conf = new SparkConf().setMaster("local[*]").setAppName("SparkStremingDemo1")//2 创建一个sparkContextvar sc = new SparkContext(conf)sc.setLogLevel("WARN")//3 创建streamingContextvar ssc = new StreamingContext(sc, Seconds(5))//设置缓存数据的位置ssc.checkpoint("./TmpCount")var kafkaParams = Map[String, Object]("bootstrap.servers" -> "hadoop01:9092,hadoop01:9092,hadoop01:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "SparkKafkaDemo",//earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费//latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据//none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常//这里配置latest自动重置偏移量为最新的偏移量,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费"auto.offset.reset" -> "earliest",//false表示关闭自动提交.由spark帮你提交到Checkpoint或程序员手动维护"enable.auto.commit" -> (false: java.lang.Boolean))//4 接收kafka数据并根据业务逻辑进行计算//设置位置策略   均衡,//kafkaDatas  含有key和value//key是kafka成产数据时指定的key(可能为空)//value是真实的数据(100%有数据)val data: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array("rng_comment"), kafkaParams))data.foreachRDD(_.foreach(row => {//获取一行转化成listval list: List[String] = row.value().split("\t").toList//判断如果 微博会员等级为5 就写入到 vip_rank表if (list(9) == "5") {//掉用方法把数据写入mysqlsaveDataToMysqlVipRankAndLikeStatus(list, "vip_rank")}if (list(5).toInt >= 10) {//掉用方法把数据写入mysqlsaveDataToMysqlVipRankAndLikeStatus(list, "like_status")}}))//按天进行计算val count: DStream[(String, Int)] = data.map(_.value().split("\t")(2).split(" ")(0)) //截取日期中的天.map((_, 1)).updateStateByKey(updateFunc) //实时统计总数(历史累加)//遍历统计结果count.foreachRDD(_.foreach(row => {//调用方法把数据存储到mysqlsaveDataToMysqlCountComment(row._1, row._2)}))//5 开启计算任务ssc.start()//6 等待关闭ssc.awaitTermination()}//创建连接,使用jdbc连接mysqldef mysqlConnection(): Connection = {DriverManager.getConnection("jdbc:mysql://hadoop01:3306/rng_comment?characterEncoding=UTF-8", "root", "123456")}/*** 将数据存入到mysql中** @param data 数据*/def saveDataToMysqlVipRankAndLikeStatus(data: List[String], tableName: String): Unit = {//获取连接val connection: Connection = mysqlConnection()//创建一个变量用来保存sql语句val sql = s"insert into ${tableName} (`index`, child_comment, comment_time, content, da_v, like_status, pic, user_id, user_name,vip_rank, stamp) values (?,?,?,?,?,?,?,?,?,?,?)"//将一条数据存入到mysqlval ps: PreparedStatement = connection.prepareStatement(sql)ps.setString(1, data.head)ps.setString(2, data(1))ps.setString(3, data(2))ps.setString(4, data(3))ps.setString(5, data(4))ps.setString(6, data(5))ps.setString(7, data(6))ps.setString(8, data(7))ps.setString(9, data(8))ps.setString(10, data(9))ps.setString(11, data(10))//提交ps.execute()connection.close()}/*** 将数据存入到mysql中** @param time  时间* @param count 数量*/def saveDataToMysqlCountComment(time: String, count: Int): Unit = {println(s"${time}\t ${count}")if (time.contains("2018/10/20") || time.contains("2018/10/21") || time.contains("2018/10/22") || time.contains("2018/10/23")) {//获取连接val connection: Connection = mysqlConnection()//创建一个变量用来保存sql语句val sql = "INSERT INTO count_comment (time,count) VALUES (?,?) ON DUPLICATE KEY UPDATE count = ?"//将一条数据存入到mysqlval ps: PreparedStatement = connection.prepareStatement(sql)ps.setString(1, time)ps.setInt(2, count)ps.setInt(3, count)//提交ps.execute()connection.close()}}/*** 历史累加** @param currentValues 当前值* @param historyValue  历史值* @return*/def updateFunc(currentValues: Seq[Int], historyValue: Option[Int]): Option[Int] = {//历史累加val result: Int = currentValues.sum + historyValue.getOrElse(0)//返回结果Some(result)}
}

使用Spark Streaming从kafka中读取数据把数据写入到mysql 实例相关推荐

  1. Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

    1.创建Maven项目 创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2.启动Kafka A:安装kafka集 ...

  2. 【Kafka】从kafka中读取最新数据

    [Kafka]从kafka中读取最新数据 一.死循环无限拉取kafka数据 1.1 整体框架剖析 1.2 测试 二.@KafkaListener注解 实现监听kafka数据 三.参考资料 前情提要:我 ...

  3. Spark Streaming使用Kafka保证数据零丢失

    为什么80%的码农都做不了架构师?>>>    源文件放在github,随着理解的深入,不断更新,如有谬误之处,欢迎指正.原文链接https://github.com/jacksu/ ...

  4. Spark Streaming从Kafka中拉取数据,并且使用过“窗口函数”统计一些流量信息

    一.应用案例场景: 在Spark Streaming中,我们通常计算的是一段时间间隔内的数据.比如http://blog.csdn.net/tototuzuoquan/article/details/ ...

  5. sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据

    Spark Streaming官方提供Receiver-based和Direct Approach两种方法接入Kafka数据,本文简单介绍两种方式的pyspark实现. 1.Spark Streami ...

  6. Spark中如何管理Spark Streaming消费Kafka的偏移量

    spark streaming 版本 2.1 kafka 版本0.9.0.0 在这之前,先重述下spark streaming里面管理偏移量的策略,默认的spark streaming它自带管理的of ...

  7. Spark Streaming 遇到 kafka

    Spark Streaming 遇到 kafka 站酷 | 插画 搭建流程略,主要讲一下如何更好的结合使用,看图说话. Kafka 结合 Spark Streaming 实现在线的准实时流分析,除了保 ...

  8. Spark Streaming之Kafka的Receiver和Direct方式

    一 Receiver方式 Receiver是使用Kafka的high level的consumer API来实现的.Receiver从Kafka中获取数据都是存储在Spark Executor内存中的 ...

  9. spark kafka java api_java实现spark streaming与kafka集成进行流式计算

    java实现spark streaming与kafka集成进行流式计算 2017/6/26补充:接手了搜索系统,这半年有了很多新的心得,懒改这篇粗鄙之文,大家看综合看这篇新博文来理解下面的粗鄙代码吧, ...

最新文章

  1. 神经网络感知器算法调整原理是什么
  2. 这份HCIE-Routing Switching笔试试题,你能答对几道?
  3. 数学狂想曲(十)——复变函数, 平稳离散时间随机过程, 功率谱
  4. CentOS7 上安装 Zookeeper-3.4.9 服务
  5. .Net之配置文件自定义
  6. 消息队列 应用场景 解析
  7. SQL-92标准 中文翻译——定义、记号和约定 (记号)
  8. 在html中如何写日期的代码,日期html代码
  9. C语言基础项目:200 行代码实现贪吃蛇,思路+源码详解
  10. 脑电时频分析I:谱分析
  11. PPT如何压缩?PPT文件压缩的方法有哪些
  12. ele-ui表单验证规则中的手机号码和邮箱的验证规则
  13. Tomcat Caused by:java.lang.IllegalArgumentException: 指定的主资源集[……]无效
  14. JS 把 Wed Jul 15 2015 00:00:00 GMT+0800 转换成2015-07-15
  15. c++中的trivial
  16. 华为交换机dhcp获取不到_华为S7706交换机DHCP Server 配置不成功问题
  17. 华为5g鸿蒙麒麟,华为5G手机渲染图曝光,鸿蒙+麒麟985+5G基带,参考价格很良心...
  18. 大数据 搜索 ES 一
  19. 人脸识别考勤系统安卓APP(手把手教学-手动滑稽)
  20. JavaScript 求平均数的方法(实参个数不确定)

热门文章

  1. Andorid PopWindow使用总结
  2. mysql查询数据库每张表数据量大小和占用多少空间
  3. Inconel 718实际有多厚 锻造注意事项
  4. simulink调用C/C++语言 S-Function Builder使用
  5. Linux虚拟机的创建
  6. 数据之路 Day6 - Python基础6
  7. Powershell 中 报错Set-Location : 找不到接受实际参数
  8. 江南大学c语言程序(本)期末考试2020,江南大学
  9. Unity Audio -- (3)创建3D音效
  10. 可以SE,vmp过强壳检测的硬件级虚拟机--virtualbox/vbox超能版 去虚拟化