Spark Streaming: reading data from Kafka and recording offsets
The pom.xml file is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.demo</groupId>
  <artifactId>spark-streaming-demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>spark-streaming-demo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>1.6.2</spark.version>
    <mysql-connector.version>5.1.35</mysql-connector.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>${mysql-connector.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.0.31</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- Note: this artifact targets Scala 2.11 while the Spark artifacts above
         target 2.10; it is not used by the code below and could be removed or
         switched to the matching _2.10 artifact. -->
    <dependency>
      <groupId>com.stratio.datasource</groupId>
      <artifactId>spark-mongodb_2.11</artifactId>
      <version>0.12.0</version>
    </dependency>
  </dependencies>
</project>
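The job persists its consumed offsets in a MySQL table named kafka_offsets. The original post does not show the table definition, so the following DDL is only a minimal sketch inferred from the columns the code reads and writes (topic, partition, offset); `partition` is backquoted because it is a reserved word in MySQL 5.6+:

CREATE TABLE IF NOT EXISTS kafka_offsets (
  topic       VARCHAR(255) NOT NULL,  -- Kafka topic name
  `partition` INT          NOT NULL,  -- partition id within the topic
  offset      BIGINT       NOT NULL,  -- next offset to consume for this partition
  PRIMARY KEY (topic, `partition`)
);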
The code is as follows:
package com.fosun.spark_streaming_demo;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import javax.sql.DataSource;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import scala.Tuple2;

public class SparkstreamingOnDirectKafka {

    public static JavaStreamingContext createContext() throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
        // jsc.checkpoint("/user/tenglq/checkpoint");

        // Kafka direct-stream parameters: broker list, and where to start when
        // no stored offsets exist ("smallest" = read from the beginning).
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "fonova-hadoop1.fx01:9092,fonova-hadoop2.fx01:9092");
        kafkaParams.put("auto.offset.reset", "smallest");
        Set<String> topics = new HashSet<String>();
        topics.add("tlqtest3");

        // MySQL connection parameters for the Druid connection pool.
        final Map<String, String> params = new HashMap<String, String>();
        params.put("driverClassName", "com.mysql.jdbc.Driver");
        params.put("url", "jdbc:mysql://172.16.100.49:3306/test_sparkstreaming");
        params.put("username", "root");
        params.put("password", "root123456");

        // Load the last committed offsets from MySQL; `partition` is backquoted
        // because it is a reserved word in MySQL 5.6+.
        Map<TopicAndPartition, Long> offsets = new HashMap<TopicAndPartition, Long>();
        DataSource ds = DruidDataSourceFactory.createDataSource(params);
        Connection conn = ds.getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(
                "SELECT topic, `partition`, offset FROM kafka_offsets WHERE topic = 'tlqtest3'");
        while (rs.next()) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(rs.getString(1), rs.getInt(2));
            offsets.put(topicAndPartition, rs.getLong(3));
        }
        rs.close();
        stmt.close();
        conn.close();

        final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
        JavaDStream<String> lines;
        if (offsets.isEmpty()) {
            // First run: no stored offsets, so start according to auto.offset.reset.
            JavaPairInputDStream<String, String> pairDstream = KafkaUtils.createDirectStream(jsc,
                    String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
            lines = pairDstream
                    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
                        private static final long serialVersionUID = 1L;

                        public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
                            // Capture this batch's offset ranges for later persistence.
                            OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                            offsetRanges.set(ranges);
                            return rdd;
                        }
                    }).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                        private static final long serialVersionUID = 1L;

                        public Iterable<String> call(Tuple2<String, String> t) throws Exception {
                            // Keep only the message value, dropping the key.
                            return Arrays.asList(t._2);
                        }
                    });
        } else {
            // Restart: resume from the offsets stored in MySQL.
            JavaInputDStream<String> dstream = KafkaUtils.createDirectStream(jsc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, String.class, kafkaParams, offsets,
                    new Function<MessageAndMetadata<String, String>, String>() {
                        private static final long serialVersionUID = 1L;

                        public String call(MessageAndMetadata<String, String> v1) throws Exception {
                            return v1.message();
                        }
                    });
            lines = dstream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
                private static final long serialVersionUID = 1L;

                public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                    OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                    offsetRanges.set(ranges);
                    return rdd;
                }
            });
        }

        lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            private static final long serialVersionUID = 1L;

            public void call(JavaRDD<String> rdd) throws Exception {
                // Process the RDD: collect, sort, and print this batch's messages.
                List<String> batch = new ArrayList<String>(rdd.collect());
                Collections.sort(batch);
                for (String value : batch) {
                    System.out.println(value);
                }

                // Save the until-offset of each partition so the next (re)start
                // resumes where this batch ended. Creating a new DataSource per
                // batch is wasteful, and the string-concatenated SQL would be
                // better served by PreparedStatements in production code.
                DataSource ds = DruidDataSourceFactory.createDataSource(params);
                Connection conn = ds.getConnection();
                Statement stmt = conn.createStatement();
                for (OffsetRange offsetRange : offsetRanges.get()) {
                    ResultSet rs = stmt.executeQuery("SELECT COUNT(1) FROM kafka_offsets WHERE topic='"
                            + offsetRange.topic() + "' AND `partition`='" + offsetRange.partition() + "'");
                    boolean exists = rs.next() && rs.getInt(1) > 0;
                    rs.close();
                    if (exists) {
                        stmt.executeUpdate("UPDATE kafka_offsets SET offset='" + offsetRange.untilOffset()
                                + "' WHERE topic='" + offsetRange.topic() + "' AND `partition`='"
                                + offsetRange.partition() + "'");
                    } else {
                        stmt.execute("INSERT INTO kafka_offsets(topic, `partition`, offset) VALUES('"
                                + offsetRange.topic() + "','" + offsetRange.partition() + "','"
                                + offsetRange.untilOffset() + "')");
                    }
                }
                stmt.close();
                conn.close();
            }
        });
        return jsc;
    }

    public static void main(String[] args) {
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                try {
                    return createContext();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        };
        // JavaStreamingContext jsc =
        // JavaStreamingContext.getOrCreate("/user/tenglq/checkpoint", factory);
        JavaStreamingContext jsc = factory.create();
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
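After the job has processed a batch (for example, after producing a few messages to tlqtest3), the persisted offsets can be checked directly in MySQL with a query like the following, assuming the schema sketched above:

SELECT topic, `partition`, offset FROM kafka_offsets WHERE topic = 'tlqtest3';

Note that because the offsets are saved after the batch output, a failure between printing the messages and updating the table can replay a batch on restart; this pattern therefore gives at-least-once rather than exactly-once semantics unless the output and the offset update are made atomic, for example by sharing a database transaction.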