A Real-Time Clickstream Case Study with Spark Streaming, Kafka and HBase

Background

Kafka ingests records in real time from collection tools such as Flume or from business-system interfaces, and acts as a message buffer that provides a reliable data feed to the real-time computation framework. Since version 1.3, Spark has supported two mechanisms for integrating with Kafka (the Receiver-based Approach and the Direct Approach); see the official documentation links at the end of this article for details. HBase is used as the storage layer.
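To make the distinction concrete, here is a minimal sketch of the two entry points in the Spark 1.3 spark-streaming-kafka_2.10 API that is used later in this article. The ZooKeeper addresses, consumer group id, and topic name are illustrative assumptions that match the VM setup described below.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object IntegrationSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("IntegrationSketch"), Seconds(5))

    // Receiver-based approach: a receiver consumes via ZooKeeper, which also
    // tracks the offsets; received data lives in executor memory (a write-ahead
    // log can be enabled for reliability).
    val receiverStream = KafkaUtils.createStream(
      ssc,
      "hadoop1:2181,hadoop2:2181,hadoop3:2181",   // ZooKeeper quorum (assumed)
      "clickstream-group",                        // consumer group id (assumed)
      Map("user_events" -> 1))                    // topic -> receiver thread count

    // Direct approach: no receiver; Spark asks the brokers for offset ranges
    // in each batch, so every record is read exactly once by Spark itself
    // (end-to-end exactly-once still requires idempotent output).
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      Map("metadata.broker.list" -> "hadoop1:9092,hadoop2:9092,hadoop3:9092"),
      Set("user_events"))
  }
}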

Implementation Outline

  • Implement a Kafka message producer that simulates click events

  • Use Spark Streaming's Direct Approach to pull data from Kafka in real time

  • Run the business computation in Spark Streaming and store the results in HBase

Local VM Cluster Environment

Because of limited machine resources, the Hadoop, ZooKeeper and Kafka clusters are co-located on three virtual machines with hostnames hadoop1, hadoop2 and hadoop3; HBase runs as a single node on hadoop1.
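One practical note: the HBase table written to later (PageViewStream, with a Stat column family) has to exist before the streaming job runs; with a default HBase setup it can be created from the HBase shell, e.g. create 'PageViewStream', 'Stat'. The Kafka topic used by the generator either needs to be created up front or will be auto-created on the first write if the brokers allow it.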

Known Limitations

The code has some rough edges; in particular, the logic that saves the Spark Streaming results to HBase is inefficient (an improved sketch follows the streaming code below).

Code Implementation

Kafka Message Generator

package clickstream

import java.util.{Properties, Random, UUID}

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.codehaus.jettison.json.JSONObject

object KafkaMessageGenerator {

  private val random = new Random()
  private var pointer = -1
  private val os_type = Array("Android", "IPhone OS", "None", "Windows Phone")

  // Random click count between 0 and 9
  def click(): Double = {
    random.nextInt(10)
  }

  // Cycle through the device types
  def getOsType(): String = {
    pointer = pointer + 1
    if (pointer >= os_type.length) {
      pointer = 0
      os_type(pointer)
    } else {
      os_type(pointer)
    }
  }

  def main(args: Array[String]): Unit = {
    val topic = "user_events"
    // Kafka broker list on the local VMs
    val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while (true) {
      // Prepare event data
      val event = new JSONObject()
      event
        .put("uid", UUID.randomUUID())                          // random user id
        .put("event_time", System.currentTimeMillis.toString)   // event occurrence time
        .put("os_type", getOsType)                              // device type
        .put("click_count", click)                              // click count

      // Produce the event message
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      println("Message sent: " + event)

      Thread.sleep(200)
    }
  }
}
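When the generator runs, it publishes one JSON event roughly every 200 ms to the user_events topic, each carrying uid, event_time, os_type and click_count fields. The streaming job below subscribes to the same topic name, so the two must stay in sync.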

Spark Streaming Main Class

package clickstream

import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PageViewStream {
  def main(args: Array[String]): Unit = {
    var masterUrl = "local[2]"
    if (args.length > 0) {
      masterUrl = args(0)
    }

    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("PageViewStream")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configuration: subscribe to the topic written by the generator
    val topics = Set("user_events")
    // Kafka broker list on the local VMs
    val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder")

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Parse each Kafka message value as JSON
    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      Some(data)
    })

    // Compute click counts per user
    val userClicks = events
      .map(x => (x.getString("uid"), x.getInt("click_count")))
      .reduceByKey(_ + _)

    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          // HBase configuration
          val tableName = "PageViewStream"
          val hbaseConf = HBaseConfiguration.create()
          hbaseConf.set("hbase.zookeeper.quorum", "hadoop1")
          hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
          hbaseConf.set("hbase.defaults.for.version.skip", "true")

          val uid = pair._1    // user id
          val click = pair._2  // click count

          // Assemble the row
          val put = new Put(Bytes.toBytes(uid))
          put.add("Stat".getBytes, "ClickStat".getBytes, Bytes.toBytes(click))

          val StatTable = new HTable(hbaseConf, TableName.valueOf(tableName))
          StatTable.setAutoFlush(false, false)
          // Client-side write buffer
          StatTable.setWriteBufferSize(3 * 1024 * 1024)
          StatTable.put(put)
          // Flush the buffered writes to HBase
          StatTable.flushCommits()
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
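As noted under Known Limitations, the write path above builds a new HBaseConfiguration and HTable for every single record, which is the main performance problem. Below is a minimal sketch of one way to tighten it, reusing the imports and the userClicks stream from the main class: the table is created once per partition and the puts are sent as a single batch. This is an illustration of the per-partition pattern, not the original author's implementation.

    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        // One configuration and one table instance per partition, not per record
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "hadoop1")
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
        hbaseConf.set("hbase.defaults.for.version.skip", "true")
        val statTable = new HTable(hbaseConf, TableName.valueOf("PageViewStream"))
        statTable.setAutoFlush(false, false)
        statTable.setWriteBufferSize(3 * 1024 * 1024)

        // Buffer every record of the partition as a Put and send them together
        val puts = partitionOfRecords.map { case (uid, click) =>
          val put = new Put(Bytes.toBytes(uid))
          put.add("Stat".getBytes, "ClickStat".getBytes, Bytes.toBytes(click))
          put
        }.toList

        statTable.put(scala.collection.JavaConversions.seqAsJavaList(puts))
        statTable.flushCommits()
        statTable.close()
      })
    })

Going further, a connection shared across batches (or the connection API in later HBase client versions) would avoid re-creating even the per-partition table, but the version above already removes the per-record overhead.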

Maven POM File

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.guofei.spark</groupId>
  <artifactId>RiskControl</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>RiskControl</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- Spark -->
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.10</artifactId><version>1.3.0</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>1.3.0</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_2.10</artifactId><version>1.3.0</version></dependency>

    <!-- HBase and its transitive dependencies -->
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase</artifactId><version>0.96.2-hadoop2</version><type>pom</type></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>0.96.2-hadoop2</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>0.96.2-hadoop2</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>0.96.2-hadoop2</version></dependency>
    <dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>1.3.2</version></dependency>
    <dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>1.1.3</version></dependency>
    <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>
    <dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>2.5.0</version></dependency>
    <dependency><groupId>io.netty</groupId><artifactId>netty</artifactId><version>3.6.6.Final</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-protocol</artifactId><version>0.96.2-hadoop2</version></dependency>
    <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.5</version></dependency>
    <dependency><groupId>org.cloudera.htrace</groupId><artifactId>htrace-core</artifactId><version>2.01</version></dependency>
    <dependency><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId><version>1.9.13</version></dependency>
    <dependency><groupId>org.codehaus.jackson</groupId><artifactId>jackson-core-asl</artifactId><version>1.9.13</version></dependency>
    <dependency><groupId>org.codehaus.jackson</groupId><artifactId>jackson-jaxrs</artifactId><version>1.9.13</version></dependency>
    <dependency><groupId>org.codehaus.jackson</groupId><artifactId>jackson-xc</artifactId><version>1.9.13</version></dependency>
    <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.6.4</version></dependency>
    <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.6.4</version></dependency>

    <!-- Hadoop -->
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.4</version></dependency>
    <dependency><groupId>commons-configuration</groupId><artifactId>commons-configuration</artifactId><version>1.6</version></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>2.6.4</version></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.4</version></dependency>

    <!-- JSON -->
    <dependency><groupId>net.sf.json-lib</groupId><artifactId>json-lib</artifactId><version>2.4</version><classifier>jdk15</classifier></dependency>
    <dependency><groupId>org.codehaus.jettison</groupId><artifactId>jettison</artifactId><version>1.1</version></dependency>

    <!-- Redis -->
    <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.5.2</version></dependency>
    <dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.2</version></dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-make:transitive</arg>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
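With this POM, mvn package builds a fat jar via the shade plugin (the META-INF signature files are excluded so that the shaded jar does not fail signature verification), and the resulting jar can be submitted to the cluster with spark-submit, using clickstream.PageViewStream as the main class.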

FAQ

  • Maven fails to import json-lib: "Failure to find net.sf.json-lib:json-lib:jar:2.3 in http://repo.maven.apache.org/maven2 was cached in the local repository". Fix: depend on net.sf.json-lib:json-lib:2.4 with the jdk15 classifier, as in the POM above; see http://stackoverflow.com/questions/4173214/maven-missing-net-sf-json-lib

  • Running the Spark Streaming program throws org.apache.spark.SparkException: Task not serializable.

    Everything referenced inside userClicks.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { partitionOfRecords.foreach(...) }) }) is shipped to the executors and must therefore be serializable; objects that are not serializable (such as the HBase configuration and table) should be created inside the closure, as in the main class and the per-partition sketch above.

  • Maven packaging fails because dependency jars cannot be found: "error: not found: object kafka" / "ERROR import kafka.javaapi.producer.Producer". Cause: on the local Windows 10 system, the path to the user's /xxx/.m2/ repository contained Chinese characters.

References

  • Spark Streaming programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html

  • Spark Streaming + Kafka integration guide: http://spark.apache.org/docs/latest/streaming-kafka-integration.html

  • Spark Streaming + Flume integration guide: http://spark.apache.org/docs/latest/streaming-flume-integration.html

  • Spark Streaming custom receivers guide: http://spark.apache.org/docs/latest/streaming-custom-receivers.html

  • Official Spark Streaming Scala examples: https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming

  • 简单之美 blog: http://shiyanjun.cn/archives/1097.html

Author: MichaelFly. Original post: https://www.jianshu.com/p/ccba410462ba
