The pom.xml file is as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.demo</groupId>
  <artifactId>spark-streaming-demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>spark-streaming-demo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>1.6.2</spark.version>
    <mysql-connector.version>5.1.35</mysql-connector.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>${mysql-connector.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.0.31</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.stratio.datasource</groupId>
      <artifactId>spark-mongodb_2.11</artifactId>
      <version>0.12.0</version>
    </dependency>
  </dependencies>
</project>
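
Two things are worth noting about this POM. First, the Spark artifacts are all Scala 2.10 builds (spark-core_2.10 and friends) while spark-mongodb_2.11 is built against Scala 2.11; mixing Scala versions on one classpath leads to binary-incompatibility errors, so if the MongoDB connector is actually used it should be swapped for a _2.10 build. Second, the program below reads and writes a kafka_offsets table in MySQL whose DDL the post does not show. A minimal sketch of a compatible schema, inferred from the queries in the code (the column names come from the post; the types and primary key are assumptions):

    CREATE TABLE kafka_offsets (
      `topic`     VARCHAR(255) NOT NULL,
      `partition` INT          NOT NULL,
      `offset`    BIGINT       NOT NULL,
      PRIMARY KEY (`topic`, `partition`)
    );

A (topic, partition) primary key keeps exactly one row per partition, which matches the select-then-insert/update logic inside foreachRDD below.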

The code is as follows:

package com.fosun.spark_streaming_demo;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import javax.sql.DataSource;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class SparkstreamingOnDirectKafka {

    public static JavaStreamingContext createContext() throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
        // jsc.checkpoint("/user/tenglq/checkpoint");

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "fonova-hadoop1.fx01:9092,fonova-hadoop2.fx01:9092");
        kafkaParams.put("auto.offset.reset", "smallest");
        Set<String> topics = new HashSet<String>();
        topics.add("tlqtest3");

        final Map<String, String> params = new HashMap<String, String>();
        params.put("driverClassName", "com.mysql.jdbc.Driver");
        params.put("url", "jdbc:mysql://172.16.100.49:3306/test_sparkstreaming");
        params.put("username", "root");
        params.put("password", "root123456");

        // Load any previously saved offsets from MySQL. `partition` and `offset`
        // are back-quoted because they collide with keywords in newer MySQL versions.
        Map<TopicAndPartition, Long> offsets = new HashMap<TopicAndPartition, Long>();
        DataSource ds = DruidDataSourceFactory.createDataSource(params);
        Connection conn = ds.getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(
                "SELECT topic, `partition`, `offset` FROM kafka_offsets WHERE topic = 'tlqtest3'");
        while (rs.next()) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(rs.getString(1), rs.getInt(2));
            offsets.put(topicAndPartition, rs.getLong(3));
        }
        rs.close();
        stmt.close();
        conn.close();

        final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
        JavaDStream<String> lines = null;
        if (offsets.isEmpty()) {
            // No saved offsets yet: start where auto.offset.reset says ("smallest").
            JavaPairInputDStream<String, String> pairDstream = KafkaUtils.createDirectStream(jsc, String.class,
                    String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
            lines = pairDstream
                    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
                        private static final long serialVersionUID = 1L;

                        public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
                            // Capture this batch's offset ranges; the cast only succeeds
                            // on the RDD that comes straight from the Kafka direct stream.
                            OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                            offsetRanges.set(ranges);
                            return rdd;
                        }
                    }).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                        private static final long serialVersionUID = 1L;

                        public Iterable<String> call(Tuple2<String, String> t) throws Exception {
                            return Arrays.asList(t._2);
                        }
                    });
        } else {
            // Saved offsets found: resume the direct stream exactly from them.
            JavaInputDStream<String> dstream = KafkaUtils.createDirectStream(jsc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, String.class, kafkaParams, offsets,
                    new Function<MessageAndMetadata<String, String>, String>() {
                        private static final long serialVersionUID = 1L;

                        public String call(MessageAndMetadata<String, String> v1) throws Exception {
                            return v1.message();
                        }
                    });
            lines = dstream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
                private static final long serialVersionUID = 1L;

                public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                    OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                    offsetRanges.set(ranges);
                    return rdd;
                }
            });
        }

        lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            private static final long serialVersionUID = 1L;

            public void call(JavaRDD<String> rdd) throws Exception {
                // Process the batch: collect() may return an immutable list,
                // so copy into a mutable one before sorting.
                List<String> values = new ArrayList<String>(rdd.collect());
                Collections.sort(values);
                for (String value : values) {
                    System.out.println(value);
                }

                // Persist the offsets. (Creating a new DataSource per batch, as the
                // original post does, is expensive; a shared pool would be better.)
                DataSource ds = DruidDataSourceFactory.createDataSource(params);
                Connection conn = ds.getConnection();
                Statement stmt = conn.createStatement();
                for (OffsetRange offsetRange : offsetRanges.get()) {
                    ResultSet rs = stmt.executeQuery("SELECT count(1) FROM kafka_offsets WHERE topic='"
                            + offsetRange.topic() + "' AND `partition`='" + offsetRange.partition() + "'");
                    if (rs.next()) {
                        int count = rs.getInt(1);
                        if (count > 0) {
                            stmt.executeUpdate("UPDATE kafka_offsets SET `offset`='" + offsetRange.untilOffset()
                                    + "' WHERE topic='" + offsetRange.topic() + "' AND `partition`='"
                                    + offsetRange.partition() + "'");
                        } else {
                            stmt.execute("INSERT INTO kafka_offsets(topic, `partition`, `offset`) VALUES('"
                                    + offsetRange.topic() + "','" + offsetRange.partition() + "','"
                                    + offsetRange.untilOffset() + "')");
                        }
                    }
                    rs.close();
                }
                stmt.close();
                conn.close();
            }
        });
        return jsc;
    }

    public static void main(String[] args) {
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                try {
                    return createContext();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        };
        // JavaStreamingContext jsc =
        // JavaStreamingContext.getOrCreate("/user/tenglq/checkpoint", factory);
        JavaStreamingContext jsc = factory.create();
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
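
One possible hardening of the offset-saving step, not in the original post: the foreachRDD block above issues a SELECT plus an INSERT or UPDATE per partition, splicing values directly into the SQL text. Given the (topic, partition) primary key sketched earlier, the same bookkeeping collapses into one parameterized upsert per partition. The helper below is a hypothetical sketch (the class and method names are mine, not the post's):

    import java.sql.Connection;
    import java.sql.PreparedStatement;

    import org.apache.spark.streaming.kafka.OffsetRange;

    // Hypothetical helper: persists one batch of offset ranges with a single
    // parameterized upsert per partition instead of select-then-insert/update.
    // Assumes the PRIMARY KEY (topic, partition) from the sketched DDL, so
    // MySQL's INSERT ... ON DUPLICATE KEY UPDATE applies atomically per row.
    public final class OffsetStore {

        public static void save(Connection conn, OffsetRange[] ranges) throws Exception {
            String sql = "INSERT INTO kafka_offsets(`topic`, `partition`, `offset`) VALUES (?, ?, ?) "
                    + "ON DUPLICATE KEY UPDATE `offset` = VALUES(`offset`)";
            PreparedStatement ps = conn.prepareStatement(sql);
            try {
                for (OffsetRange range : ranges) {
                    ps.setString(1, range.topic());
                    ps.setInt(2, range.partition());
                    ps.setLong(3, range.untilOffset());
                    ps.addBatch();
                }
                // Send all per-partition upserts to MySQL in one round trip.
                ps.executeBatch();
            } finally {
                ps.close();
            }
        }
    }

Even with this change, because offsets are written only after the batch has been processed, the pipeline is at-least-once: a crash between processing and the offset write replays that batch on restart. Exactly-once would require writing the results and the offsets in a single transaction.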
