开发环境:Hadoop+HBASE+Phoenix+flum+kafka+spark+MySQL

默认配置好了Hadoop的开发环境,并且已经安装好HBASE等组件。

下面通过一个简单的案例进行整合:

这是整个工作的流程图:

第一步:获取数据源

  由于外部埋点获取资源较为繁琐,因此,自己写了个自动生成类似数据代码:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;public class Genlog {static String[] srcurls={"http://www.baidu.com","http://www.sougou.com","http://www.360.com","http://www.taobao.com"};static String[] oss={"android","ios","mac","win","linux"};static String[] sexs={"f","m"};public static void main(String[] args) throws InterruptedException {//http://xxxxx?refurl=http://www.baidu.com&pid=xx&os=andriod&sex=f/m&wx=abcLogger logger=LogManager.getLogger(Genlog.class);while(true){String srcurl=srcurls[(int) (Math.random()*srcurls.length)];String os=oss[(int) (Math.random()*oss.length)];String sex=sexs[(int) (Math.random()*sexs.length)];String url=String.format("http://xxxxx?refurl=%s&pid=xx&os=%s&wx=abc&sex=%s/m",srcurl,os,sex);logger.info(url);Thread.sleep(300);}}
}

这部分代码表示,在启动程序后,将会不断生成类似文中注释类型的数据,这样flume的source端就可以源源不断的获取到数据。

pom.xml文件就是关于log4j的依赖api  core  和flum-ng即可,不再赘述。

  同时,在项目中,要编写连接虚拟机的配置文件,放在resource下,配置文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/></Console><Flume name ="hi" compress="false" type="avro"><agent host ="192.168.110.101" port="44444"></agent></Flume></Appenders><Loggers><Root level="INFO"><AppenderRef ref="Console"/><AppenderRef ref="hi"></AppenderRef></Root></Loggers>
</Configuration>

这样,我们的配置数据源的项目就已经完成了,当然,在实际生产中,肯定要比这复杂的多。

第二步:配置flume

配置flume/config/a1.conf,文件可以直接touch创建,配置如下:

# 定义资源  管道 目的地
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 设置源的属性
a1.sources.r1.type =avro
a1.sources.r1.bind=192.168.110.101
a1.sources.r1.port=44444# 设置目的地属性
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.topic = mylog
a1.sinks.k1.kafka.bootstrap.servers = 192.168.110.101:9092# 管道属性
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 把源通过管道连接到目的地
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

注意更换自己的IP地址,同时,根据需求更改acks的结果,如1、-1、0,具体介绍看官网即可。此时flume是依赖kafka的。所以启动顺序请先启动kafka,否则会报错。

第三步:编写spark stream项目

项目目标主要是将kafka中的数据拉取下来消费,通过内部逻辑,将数据转变为DataFrame格式,通过Phoenix存储在HBASE上,以方便对数据进行分析。

项目配置文件pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.yzhl</groupId><artifactId>spark-streaming-phoneix-kafkademo</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.2.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.2.1</version><scope>provided</scope></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>2.11.12</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.2.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.1</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version></plugin></plugins></build>
</project>

逻辑代码如下:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}object LogSave  extends App {//定义brokers, groupId, topics/*** 关于driver和worker的执行位置的代码*/val Array(brokers, groupId, topics) = Array("192.168.86.128:9092","mylog","mylog")//driver//spark上下文对象相当于connectionval spark = SparkSession.builder().appName("mylog").getOrCreate()//driver//创建spark streaming 上下文val ssc = new StreamingContext(spark.sparkContext, Seconds(5))//driverval topicsSet = topics.split(",").toSet//driver//定义kafka配置属性val kafkaParams = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.GROUP_ID_CONFIG -> groupId,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])//driver//使用KafkaUtils工具来的createDirectStream静态方法创建DStream对象val messages = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))//driver//messages中的每一条数据都是一个(key,value) 其中value指的是log中的一行数据val lines = messages.map(_.value)//workerimport spark.implicits._//driver  worker//在driver端编译成了class,之后上传到worker中case class MyRecord(id:String,time:String,srcUrl:String,os:String,sex:String)//为记录产生IDlines.print(5)//driver//foreachRDD在driver上执行,lines.foreachRDD((rdd,t) =>{val props = scala.collection.mutable.Map[String,String]()//driverprops += "table" -> "tb_mylog"props += "zkUrl" -> "jdbc:phoenix:hadoop"//从下面到toDF.都会放在worker上执行rdd.zipWithUniqueId().map( x =>{val p =""".+(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}).+refurl=(.*)&.+&os=(.+)&.+&sex=(.+)""".rx._1 match {case p(time,srcUrl,os,sex) => MyRecord(t.toString()+x._2,time,srcUrl,os,sex)case _ => MyRecord(null,null,null,null,null)}}).filter(_.id !=null).toDF().write.format("org.apache.phoenix.spark").mode("overwrite").options(props).save();//todf--save之间都是在worker上执行,save()是在driver上
  })ssc.start()//driverssc.awaitTermination()//driver/*** spark的所有上下文的创建都在driver上执行* spark的所有action都在driver上执行* spark的所有transformation都在worker上执行**/}

这部分代码可以将拉取的数据进行格式化 的存储。其中正则表达式是对数据行的拆分,并通过Phoenix存储到HBASE上。

第四步:项目打包

我用的idea,打包很简单,maven-->plugins-->scala:compile(编译)-->Lifecycle的package   即可打包完成,可在target目录下查看。

eclipse的打包也很简单,网上一大堆。

到此,在代码阶段的操作基本完成,接下来就是在集群上的运行过程。

第五步:启动各个进程

本次的部署是在yarn上的,所以肯定有yarn的启动。我们按照顺序启动。

1,启动HDFS:start-dfs.sh

2.启动yarn:start-yarn.sh

3.启动zookeeper:如果是自己安装的zookeeper,可以直接用./zkServer.sh start

   如果是用kafka自带的zookeeper,启动命令:bin/zookeeper-server-start.sh config/zookeeper.properties

4.启动kafka:bin/kafka-server-start.sh config/server.properties

5.启动flume:bin/flume-ng agent -n a1 -c conf -f conf/a1.conf    此时可以启动数据源的生成项目运行

6.启动kafka的消费者consumer:bin/kafka-console-consumer.sh --bootstrap-server 192.168.110.101:9092 --topic mylog

7.启动HBASE:start-hbase.sh

8.启动Phoenix: ./sqlline.py localhost

第六步:以上进程都启动成功后,可以将打包好的jar包上传到系统路径

此时有一个问题一定要注意,不然肯定会报错,列如空指针的异常,但无法查询错误具体信息,根本原因是缺少对于的依赖包。

在下载依赖包的时候,我们还需要将两个必须的依赖包导入到spark的jars文件中,因为我们打包的瘦包,无法包含所有的依赖包。

这两包是:spark-streaming-kafka-0-10_2.11和他的依赖包kafka_2.11。根据你自己的版本不同,找到对应的版本依赖包,否则会报出版本依赖的异常信息。

添加方法:cd到spark的jars目录先,在maven官网,右键点击相应的依赖包的jar,复制路径,运用命令 ”wget 复制的路径”,也可以自己下载到本地后上传。

接着,在启动的Phoenix中,创建我们自己的表,在编码中的表名为tb_mylog,所以创建表:

!create table tb_mylog(id varchar(255) primary key,time varchar(255),srcUrl varchar(255),os varchar(255),sex varchar(20));

此时!tables里面就会存在了tb_mylog个表。

第七步:运行上传的jar包,处理数据

运行命令:spark-submit --master yarn --deploy-mode client --class 包名  jar包

运行后,可以看到数据在不断的写入,spark Stream在不断的获取,此时,进入Phoenix中,

select *  from tb_mylog,可以看到数据在表中存在,并不断的增长,如果机器性能不是很好,建议运行一段时间后,可以停掉源数据的生成。

对于关闭HBASE,需要注意,不可直接stop掉HBASE,这样数据就会丢失或者出发预写机制,无法将数据完全的保存到HDFS上,所以停掉HBASE的最好方式是:先运行hbase-daemon.sh stop master,然后在运行stop-hbase.sh. 这样既可。

由于是基于yarn模式,所以要读取到yarn-site.xml文件,所以在spark-env.sh中配置HADOOP_CONF-DIR=Hadoop路径,或者YARN_CONF_DIR=yarn路径。

注意:

  如果用Phoenix连接spark,那么需要Phoenix里的Phoenix-spark-hbase.jar和Phoenix-HBASE-client.jar。

,worker节点通过Phoenix连接HBASE时,自己有了客户端,那么HBASE的regionserver端需要Phoenix-HBASE-server.jar和Phoenix-spark-hbase.jar两个包。

flume通信数据源:通过通信协议avro.  给到flume的source处,通过配置channel后,得到下沉的位置,即得到kafka的producer,然后通过worker节点进行消费,消费形式是kafkaDStream。

接下来是数据的分析,然后存储到MySQL中。

第八步:存储到数据库中的编码

新建项目:

import org.apache.spark.sql.{SaveMode, SparkSession}object ETLSparkSql extends App {val spark = SparkSession.builder().appName("from-hbase-etl-to-mysql using spark+phoenix").getOrCreate()//driverval props = scala.collection.mutable.Map[String,String]() //driverprops += "table" -> "tb_mylog"props += "zkUrl" -> "hadoop:2181"val df = spark.read.format("org.apache.phoenix.spark").options(props).load();df.createOrReplaceTempView("tb_mylog")val df2 = spark.sql("select srcUrl,count(1) as count_nums from tb_mylog group by srcUrl");df2.createOrReplaceTempView("tb_url_count")val sql ="""
      |select|       case when srcUrl = 'http://www.baidu.com' then count_nums|                                else 0 end as baidu,|       case when srcUrl = 'http://www.souguo.com' then count_nums|                                else 0 end as souguo,|       case when srcUrl = 'http://www.360.com' then count_nums|                                else 0 end as `360`,|       case when srcUrl = 'http://www.taobao.com' then count_nums|                                else 0 end as `taobao`,|       case when srcUrl not in  ('http://www.baidu.com','http://www.souguo.com','http://www.taobao.com','http://www.360.com') then count_nums|                                else 0 end as `qita`|       from  tb_url_count""".stripMarginval df3 = spark.sql(sql)df3.createOrReplaceTempView("tb_case")val jdbcops = scala.collection.mutable.Map[String,String]() //driverprops += "table" -> "tb_log_count"props += "url" -> "jdbc:mysql://192.168.86.1:3306/logdb"props += "user" -> "root"props += "password" -> "root"props += "driver" -> "com.mysql.jdbc.Driver"spark.sql("select sum(baidu),sum(souguo),sum(`360`),sum(taobao),sum(qita) from tb_case").write.format("jdbc").mode(SaveMode.Append).options(jdbcops).save()println("任务提交,等待结果")}

第九步:创建数据库和表

创建logdb的数据库,创建表tb_log_count,列名分别为id,baidu,souguo,360,taobao,qita。

然后对项目进行编译和打包,上传到客户端driver上,

启动HDFS,启动yarn,启动HBASE,同时可以执行编译运行语句:

spark-submit  --master yarn --deploy-mode client ETLSparkSql 包名

  到此为止,我们的数据的获取,数据的处理,数据的存储,数据的存库都已经完成,可以在MySQL数据库中查看结果了。

第十步:数据库数据的展示

我们用到的技术是Dubbo,对项目做微服务。本项目的Dubbo框架如下:

下面开始建立我们的项目:

1.建立entity:

  建立一个maven项目,创建一个实体类对象,并实现序列化接口,以便读取数据库对象。设置对应数据库的属性,并添加set和get方法,以方便后面的过程调用。

  同时,在pom文件中,添加<packaging>jar</packaging>用来打包,此时可以通过install进行打包,可以在本地磁盘的.m2相应的目录中找到对用的jar文件。

2.创建dao-interface项目

此时,创建的项目pom文件中同样加入jar,另外,将上一个entity项目中pom文件中的信息作为本项目的依赖,这样两个项目就可以关联到一起了。接口类写到了一个装载实体的列表list方法。然后同样,通过install进行打包。

3.创建dao-impl类,即dao的实现类:

此时创建的项目是spring-boot项目,这个项目要用到mybatis进行整合。

创建后,首先导入依赖问题,在pom文件中加入依赖:

       <dependency><groupId>com.yzhl</groupId><artifactId>dao-api</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>com.alibaba.boot</groupId><artifactId>dubbo-spring-boot-starter</artifactId><version>0.2.0</version></dependency>

证明此时依赖的时上一个项目dao接口,同时还依赖了Dubbo.

接下来,创建一个接口类,同样具有的时实体类的集合方法。有了接口,需要做映射文件,创建映射文件mapper.xml,文件内容大致为

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org//dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yzhl.dao.LogMapper"><select id="list" resultType="com.yzhl.commen.Logentity">select * from tb_log_count</select></mapper>

映射完成,需要通过App做扫描,添加扫描注解:@MapperScan(basePackages = "com.yzhl.dao")

接下来编写实现类:

@Service
@Component
public class LogServiceImpl implements  LogService {@Autowiredprivate LogMapper logMapper;@Overridepublic List<Logentity> list() {return logMapper.list();}
}

同时配置properties.yml文件:

spring:datasource:url: jdbc:mysql://localhost:3306/logdbusername: rootpassword: XXoo0321driver-class-name: com.mysql.jdbc.Driver
mybatis:mapper-locations: classpath:mapping/*xml
dubbo:application:id: dao-implname: dao-implprotocol:id: dubboname: dubboport: 9999registry:id: my-1address: zookeeper://192.168.110.101:2181scan:basePackages: com.yzhl.dao

到此,dao的实现类也已经完成了。

4.创建web项目:

同样是spring-boot项目,pom文件依然需要dao接口项目和Dubbo的依赖,导入即可。

配置properties.yml文件:

server:port: 8888
dubbo:application:id: webname: webprotocol:id: dubboname: dubboregistry:id: my-2address: zookeeper://192.168.110.101:2181scan:basePackages: com.yzhl.webs

如果是非本地操作,需要在protocol中添加port端口号,且不能与前面实现类的相同,本地操作可不用添加。

创建Controller对象:

@RestController
@RequestMapping("/log")
public class LogController {@Reference//因为是外部的对象,这个注入只能用阿里的
private LogService logService;@GetMapping("list")@ResponseBodypublic List<Logentity> list(){return logService.list();}
}

到此,我们对数据库的资源获取已经完成,接下来就是利用Angular进行展示效果的编写。

第十一步:Angular展示效果图

新手上路,有不对的地方还请指正。

转载于:https://www.cnblogs.com/qianshuixianyu/p/9805714.html

Spark Stream整合flum和kafka,数据存储在HBASE上,分析后存入数据库相关推荐

  1. 用户画像 | 标签数据存储之HBase真实应用

    本文已收录github:https://github.com/BigDataScholar/TheKingOfBigData,里面有大数据高频考点,Java一线大厂面试题资源,上百本免费电子书籍,作者 ...

  2. 云时代的大数据存储-云HBase

    纵观数据库发展的几十年,从网状数据库.层次数据库到RDBMS数据库,在最近几年的NewSQL的兴起,加上开源的运动,再加上云的特性,可以说是日新月异.在20世纪80年代后,大部分的业务确定了使用RDB ...

  3. Kafka数据存储概述

    Kafka这款分布式消息队列使用文件系统和操作系统的页缓存(page cache)分别存储和缓存消息,摒弃了Java的堆缓存机制,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了 ...

  4. Kafka数据存储结构

    Kafka 的数据分为两部分:元数据.消息数据. 元数据:元数据包括集群信息.节点信息.队列信息.主从信息.分区信息.分区分布信息等,这类信息都存储在 Zookeeper 上,Kafka 的任何一个节 ...

  5. GoLang—使用net/http构建Web服务(文件数据存储)(上)

    数据存储可以分为三大类:文件存储.关系型的数据库(SQL)和非关系型的数据库(NoSQL).本文主要讲述文件存储的实现方式. 文件存储根据不同的文件实现不同的存储方式:普通文件(如txt读写).CSV ...

  6. java消费kafka数据之后,进行堆积之后在插入数据库

    java高频的获取kafka数据,导致数据库数据一致在高频读写,为了降低数据库的高频连接搞高频读写,可以将数据堆积一段时间之后,进行插入数据库操作. 主要采用了队列和缓存,将获取到的数据放入java队 ...

  7. python中文件的存储类型_关于python中数据存储大总结,涵盖文件系统和数据库存储两种方法-文件系统类型...

    存储数据是python必不可免的话题,数据的存储类型也多种多样,文件系统存储(.txt..csv..json.多媒体存储).关系型数据库存储(MySQL等).非关系型数据库存储(MongoDB).今天 ...

  8. csv处理数据后存入数据库

    我自己测试的数据量在9万行多一点儿,数据库是sqlserver,改成自己的就行 package Test;import java.io.BufferedReader; import java.io.F ...

  9. Kafka数据存储详解

    1.存储格式概述 每一个partion(文件夹)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件里.但每一个段segment file消息数量不一定相等,这样的特性方便old s ...

最新文章

  1. 那些上学的时候没很好理解的数学概念(原)
  2. 憋个大招!4年小Java的心路历程
  3. Eclipse | 使用
  4. 关于vscode插件autoprefixer 3.0无法使用的问题
  5. IntelliJ Idea注释模板--类注释、方法注释
  6. contenttype添加field
  7. python datetime strptime_python datetime模块strptime/strptime format常见格式命令_施罗德_新浪博客...
  8. android apk 微信登入_Android集成微信登录的步骤详解
  9. 学java用什么编译器_学习Java用什么编译软件好
  10. 如何做出优质的Scratch作品
  11. java谷歌翻译_Java 调用 google 翻译
  12. mp4视频在flash中边下载边播放
  13. Lample-Ziv文本压缩(java实现)--学习笔记
  14. iOS内测分发平台的选择与标准
  15. iTween基础之Move(移动)
  16. centos 把文件打包为tar.gz命令
  17. 奢侈品典当价格以及流程又是如何的?现今哪些品牌的奢侈品押呗可以典当!
  18. 项目经理的职业规划,建议收藏
  19. 使用cl_gui_docking_container实现多ALV
  20. 【C++】win 10:VC 6.0 中文版下载、安装、使用

热门文章

  1. 实验四、主存空间的分配和回收模拟
  2. MVC URL参数传递+变为空格解决方法
  3. [linux内核][LINUX内核编程]学习笔记(一)
  4. 怎样在CSDN博客里插入代码块并且让代码有颜色,显示高亮?(只需要指定语言种类就好,附详细方法)
  5. react源代码重点难点分析
  6. 我也来说说js的事件机制
  7. 计算机网络的硬件连接图,路由器设置图和网络电缆连接图_计算机硬件和网络_IT /计算机_数据...
  8. c语言中removeDir的相反函数,C语言编程常见问题解答之常用函数的包括文件.doc
  9. HTML table 标签的 frame 属性
  10. CSS中float属性详解