sparkStreaming连接kafka整合hbase和redis
sparkStreaming消费kafka数据,并将数据保存到redis和hbase当中去,实现实时
import org.apache.hadoop.hbase.client.{Admin, Connection}
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedisobject StreamingKafka extends Logging{def main(args: Array[String]): Unit = {val brokers = ConfigUtil.getConfig(Constants.KAFKA_BOOTSTRAP_SERVERS)val topics = Array(ConfigUtil.getConfig(Constants.CHENG_DU_GPS_TOPIC),ConfigUtil.getConfig(Constants.HAI_KOU_GPS_TOPIC))val conf = new SparkConf().setMaster("local[1]").setAppName("sparkKafka")val group:String = "gps_consum_group"val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> group,"auto.offset.reset" -> "latest",// earliest,latest,和none"enable.auto.commit" -> (false: java.lang.Boolean))val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()val context: SparkContext = sparkSession.sparkContextcontext.setLogLevel("WARN")// val streamingContext = new StreamingContext(conf,Seconds(5))//获取streamingContextval streamingContext: StreamingContext = new StreamingContext(context,Seconds(1))val result: InputDStream[ConsumerRecord[String, String]] = HbaseTools.getStreamingContextFromHBase(streamingContext,kafkaParams,topics,group,"(.*)gps_topic")result.foreachRDD(eachRdd =>{if(!eachRdd.isEmpty()){eachRdd.foreachPartition(eachPartition =>{val connection: Connection = HBaseUtil.getConnectionval jedis: Jedis = JedisUtil.getJedis//判断表是否存在,如果不存在就进行创建val admin: Admin = connection.getAdminif(!admin.tableExists(TableName.valueOf(Constants.HTAB_GPS))){val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_GPS))htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))admin.createTable(htabgps)}if(!admin.tableExists(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))){val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))admin.createTable(htabgps)}eachPartition.foreach(record =>{//保存到HBase和redisval consumerRecords: ConsumerRecord[String, String] = HbaseTools.saveToHBaseAndRedis(connection,jedis, record)})JedisUtil.returnJedis(jedis)connection.close()})//更新offsetval offsetRanges: Array[OffsetRange] = eachRdd.asInstanceOf[HasOffsetRanges].offsetRanges//result.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) //将offset提交到默认的kafka的topic里面去保存for(eachrange <- offsetRanges){val startOffset: Long = eachrange.fromOffset //起始offsetval endOffset: Long = eachrange.untilOffset //结束offsetval topic: String = eachrange.topicval partition: Int = eachrange.partitionHbaseTools.saveBatchOffset(group,topic,partition+"",endOffset)}}})streamingContext.start()streamingContext.awaitTermination()}
}
sparkStreaming连接kafka整合hbase和redis相关推荐
- flink连接kafka整合hbase,scala
解析kafka当中的json格式的数据,入hbase import java.util.Propertiesimport com.alibaba.fastjson.{JSON, JSONObject} ...
- 2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
目录 Kafka快速回顾 消息队列: 发布/订阅模式: Kafka 重要概念: 常用命令 整合说明 两种方式 两个版本API 在实际项目中,无论使用Storm还是SparkStreaming与Flin ...
- 2.4-2.5、Hive整合(整合Spark、整合Hbase)、连接方式Cli、HiveServer和hivemetastore、Squirrel SQL Client等
2.4其它整合 2.4.1Hive整合Spark Spark整合hive,需要将hive_home下的conf下的hive_site.xml放到spark_home下的conf目录下.(3台服务器都做 ...
- flink入门3-Flink连接Kafka、Redis,实现Kafka Source/Redis Sink
本篇文章将会一步步实现如何使用Flink对接Kafka和Redis,并将Kafka中的数据存储到Redis中,这种场景也是真实项目会遇到的. 1.单机部署Kafka 1.1 下载Kafka压缩包,解压 ...
- Maven+Eclipse+SparkStreaming+Kafka整合
2019独角兽企业重金招聘Python工程师标准>>> 版本号: maven3.5.0 scala IDE for Eclipse:版本(4.6.1) spark-2. ...
- 大数据Hadoop、Hive、Kafka、Hbase、Spark等框架面经
大数据组件 学习路线: 阶段1:学习绿色箭头的知识点: 阶段2:学习红色箭头的知识点: 阶段3:学习蓝色箭头的知识点: 1 Hadoop 1.1 Hadoop1.x与Hadoop2.x的区别 1.2 ...
- SpringBoot整合HBase将数据写入Docker中的HBase
在之前的项目里,docker容器中已经运行了HBase,现将API操作HBase实现数据的增删改查 通过SpringBoot整合Hbase是一个很好的选择 首先打开IDEA,创建项目(project) ...
- 阿里P8架构师谈:MongoDB、Hbase、Redis等NoSQL优劣势、应用场景
NoSQL的四大种类 NoSQL数据库在整个数据库领域的江湖地位已经不言而喻.在大数据时代,虽然RDBMS很优秀,但是面对快速增长的数据规模和日渐复杂的数据模型,RDBMS渐渐力不从心,无法应对很多数 ...
- apache druid 与kafka整合使用
前言 在上一篇,我们了解了apache druid的搭建,以及如何快速导入外部数据源到apache druid中进行数据分析和使用 本篇,我们结合一个实际的简单的应用场景,来说说apache drui ...
最新文章
- Go 语言中手动内存管理
- sql 一对多获得一条数据_从真实销售数据获得insights——SQL部分
- 按功能顺序列出的 HTML 4.01/XHTML 1.0
- Linux命令操作,文件复制,删除修改等
- HDU 1257 最少拦截系统
- Android UI线程和非UI线程
- SQL语句修改主键列
- 爬虫——————爬取中金所,深交所,上交所期权数据
- 一道内存分配的面试题
- 全民 Transformer (二): Transformer在深度学习和NLP中如何发挥作用
- euraka有哪些组件_SpringCloud及其五大常用组件之Eureka和Zuul
- GB2312、GBK、UTF-8 如何转换
- 中文技术文档写作规范(汇总整理版)
- 重启tomcat服务器步骤
- ISCC2018 Misc WriteUp
- vue使用报错记录(cli4):[vue/valid-v-for] Custom elements in iteration require ‘v-bind:key‘ direc
- 关于恶意说说自动在QQ空间转发的机制
- 是不是每个软件测试人员都有一份跟我差不多的心酸历程?
- 哈工大计算机专业复试科目,哈工大 计算机科学与技术学院复试科目.doc
- ARM嵌入式开发板推荐
热门文章
- jenkins+k8s实现持续集成
- JAVA进阶教学之(源码及API文档概述)
- xss植入_网站xss漏洞的利用过程
- python整数类型在每一台计算机上的取值范围是一样的_关于python统计一个整数列表中不同数值种类数的问题。...
- java substring 越界_我在java中用substrng()提取某一字符串的子串是老是出现越界的问题,求指教~~~...
- armbian nginx 部署博客_通过Git将Hexo博客部署到服务器
- java 做ui_【原创】JavaApplication的UI也可以做的很美
- pandas 更改单元格的值_懂Excel轻松入门Python数据分析包pandas(二十四):连续区域...
- java 判断范围_java判断一个点是否在一个围栏范围内
- java nutz_jnutz: 基于nutz的java+js混合开发项目