sparkStreaming消费kafka数据,并将数据保存到redis和hbase当中去,实现实时

import org.apache.hadoop.hbase.client.{Admin, Connection}
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedisobject StreamingKafka  extends Logging{def main(args: Array[String]): Unit = {val brokers = ConfigUtil.getConfig(Constants.KAFKA_BOOTSTRAP_SERVERS)val topics = Array(ConfigUtil.getConfig(Constants.CHENG_DU_GPS_TOPIC),ConfigUtil.getConfig(Constants.HAI_KOU_GPS_TOPIC))val conf = new SparkConf().setMaster("local[1]").setAppName("sparkKafka")val group:String = "gps_consum_group"val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> group,"auto.offset.reset" -> "latest",// earliest,latest,和none"enable.auto.commit" -> (false: java.lang.Boolean))val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()val context: SparkContext = sparkSession.sparkContextcontext.setLogLevel("WARN")// val streamingContext = new StreamingContext(conf,Seconds(5))//获取streamingContextval streamingContext: StreamingContext =  new StreamingContext(context,Seconds(1))val result: InputDStream[ConsumerRecord[String, String]] = HbaseTools.getStreamingContextFromHBase(streamingContext,kafkaParams,topics,group,"(.*)gps_topic")result.foreachRDD(eachRdd =>{if(!eachRdd.isEmpty()){eachRdd.foreachPartition(eachPartition =>{val connection: Connection = HBaseUtil.getConnectionval jedis: Jedis = JedisUtil.getJedis//判断表是否存在,如果不存在就进行创建val admin: Admin = connection.getAdminif(!admin.tableExists(TableName.valueOf(Constants.HTAB_GPS))){val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_GPS))htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))admin.createTable(htabgps)}if(!admin.tableExists(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))){val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))admin.createTable(htabgps)}eachPartition.foreach(record =>{//保存到HBase和redisval consumerRecords: ConsumerRecord[String, String] = HbaseTools.saveToHBaseAndRedis(connection,jedis, record)})JedisUtil.returnJedis(jedis)connection.close()})//更新offsetval offsetRanges: Array[OffsetRange] = eachRdd.asInstanceOf[HasOffsetRanges].offsetRanges//result.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)  //将offset提交到默认的kafka的topic里面去保存for(eachrange <-  offsetRanges){val startOffset: Long = eachrange.fromOffset  //起始offsetval endOffset: Long = eachrange.untilOffset  //结束offsetval topic: String = eachrange.topicval partition: Int = eachrange.partitionHbaseTools.saveBatchOffset(group,topic,partition+"",endOffset)}}})streamingContext.start()streamingContext.awaitTermination()}
}

sparkStreaming连接kafka整合hbase和redis相关推荐

  1. flink连接kafka整合hbase,scala

    解析kafka当中的json格式的数据,入hbase import java.util.Propertiesimport com.alibaba.fastjson.{JSON, JSONObject} ...

  2. 2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明

    目录 Kafka快速回顾 消息队列: 发布/订阅模式: Kafka 重要概念: 常用命令 整合说明 两种方式 两个版本API 在实际项目中,无论使用Storm还是SparkStreaming与Flin ...

  3. 2.4-2.5、Hive整合(整合Spark、整合Hbase)、连接方式Cli、HiveServer和hivemetastore、Squirrel SQL Client等

    2.4其它整合 2.4.1Hive整合Spark Spark整合hive,需要将hive_home下的conf下的hive_site.xml放到spark_home下的conf目录下.(3台服务器都做 ...

  4. flink入门3-Flink连接Kafka、Redis,实现Kafka Source/Redis Sink

    本篇文章将会一步步实现如何使用Flink对接Kafka和Redis,并将Kafka中的数据存储到Redis中,这种场景也是真实项目会遇到的. 1.单机部署Kafka 1.1 下载Kafka压缩包,解压 ...

  5. Maven+Eclipse+SparkStreaming+Kafka整合

    2019独角兽企业重金招聘Python工程师标准>>> 版本号: maven3.5.0     scala IDE for Eclipse:版本(4.6.1)    spark-2. ...

  6. 大数据Hadoop、Hive、Kafka、Hbase、Spark等框架面经

    大数据组件 学习路线: 阶段1:学习绿色箭头的知识点: 阶段2:学习红色箭头的知识点: 阶段3:学习蓝色箭头的知识点: 1 Hadoop 1.1 Hadoop1.x与Hadoop2.x的区别 1.2 ...

  7. SpringBoot整合HBase将数据写入Docker中的HBase

    在之前的项目里,docker容器中已经运行了HBase,现将API操作HBase实现数据的增删改查 通过SpringBoot整合Hbase是一个很好的选择 首先打开IDEA,创建项目(project) ...

  8. 阿里P8架构师谈:MongoDB、Hbase、Redis等NoSQL优劣势、应用场景

    NoSQL的四大种类 NoSQL数据库在整个数据库领域的江湖地位已经不言而喻.在大数据时代,虽然RDBMS很优秀,但是面对快速增长的数据规模和日渐复杂的数据模型,RDBMS渐渐力不从心,无法应对很多数 ...

  9. apache druid 与kafka整合使用

    前言 在上一篇,我们了解了apache druid的搭建,以及如何快速导入外部数据源到apache druid中进行数据分析和使用 本篇,我们结合一个实际的简单的应用场景,来说说apache drui ...

最新文章

  1. Go 语言中手动内存管理
  2. sql 一对多获得一条数据_从真实销售数据获得insights——SQL部分
  3. 按功能顺序列出的 HTML 4.01/XHTML 1.0
  4. Linux命令操作,文件复制,删除修改等
  5. HDU 1257 最少拦截系统
  6. Android UI线程和非UI线程
  7. SQL语句修改主键列
  8. 爬虫——————爬取中金所,深交所,上交所期权数据
  9. 一道内存分配的面试题
  10. 全民 Transformer (二): Transformer在深度学习和NLP中如何发挥作用
  11. euraka有哪些组件_SpringCloud及其五大常用组件之Eureka和Zuul
  12. GB2312、GBK、UTF-8 如何转换
  13. 中文技术文档写作规范(汇总整理版)
  14. 重启tomcat服务器步骤
  15. ISCC2018 Misc WriteUp
  16. vue使用报错记录(cli4):[vue/valid-v-for] Custom elements in iteration require ‘v-bind:key‘ direc
  17. 关于恶意说说自动在QQ空间转发的机制
  18. 是不是每个软件测试人员都有一份跟我差不多的心酸历程?
  19. 哈工大计算机专业复试科目,哈工大 计算机科学与技术学院复试科目.doc
  20. ARM嵌入式开发板推荐

热门文章

  1. jenkins+k8s实现持续集成
  2. JAVA进阶教学之(源码及API文档概述)
  3. xss植入_网站xss漏洞的利用过程
  4. python整数类型在每一台计算机上的取值范围是一样的_关于python统计一个整数列表中不同数值种类数的问题。...
  5. java substring 越界_我在java中用substrng()提取某一字符串的子串是老是出现越界的问题,求指教~~~...
  6. armbian nginx 部署博客_通过Git将Hexo博客部署到服务器
  7. java 做ui_【原创】JavaApplication的UI也可以做的很美
  8. pandas 更改单元格的值_懂Excel轻松入门Python数据分析包pandas(二十四):连续区域...
  9. java 判断范围_java判断一个点是否在一个围栏范围内
  10. java nutz_jnutz: 基于nutz的java+js混合开发项目