王家林老师的课程:2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式作业。

    一、基本背景

Spark-Streaming获取kafka数据的两种方式Receiver与Direct的方式,本文介绍Direct的方式。具体的流程是这样的:

1、Direct方式是直接连接到kafka的节点上获取数据了。

2、基于Direct的方式:周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。

3、当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

这种方式有如下优点:

1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。;

2、高性能:不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复;

3、一次且仅一次的事务机制:Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。

Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

二、配置文件及编码

     flume版本:1.6.0,此版本直接支持到kafka,不用在单独安装插件。

     kafka版本2.10-0.8.2.1,必须是0.8.2.1,刚开始我用的是0.10,结果出现了下

      四、各类错误大全的第2个错误。

     spark版本:1.6.1。

     

    

      kafka配文件:producer.properties,红色文字为特别要注意的配置坑,呵呵

    

#agentsection

producer.sources= s

producer.channels= c

producer.sinks= r

#sourcesection

producer.sources.s.type= exec

producer.sources.s.command= tail -f -n+1 /opt/test/test.log

producer.sources.s.channels= c

# Eachsink's type must be defined

producer.sinks.r.type= org.apache.flume.plugins.KafkaSink

producer.sinks.r.metadata.broker.list=192.168.0.10:9092

producer.sinks.r.partition.key=0

producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

producer.sinks.r.serializer.class=kafka.serializer.StringEncoder

producer.sinks.r.request.required.acks=0

producer.sinks.r.max.message.size=1000000

producer.sinks.r.producer.type=sync

producer.sinks.r.custom.encoding=UTF-8

producer.sinks.r.custom.topic.name=flume2kafka2streaming930

#Specifythe channel the sink should use

producer.sinks.r.channel= c

# Eachchannel's type is defined.

producer.channels.c.type= memory

producer.channels.c.capacity= 1000

producer.channels.c.transactionCapacity= 100

核心代码如下:

 SparkConf conf = SparkConf().setMaster().setAppName().setJars(String[] {})Map<StringString> kafkaParameters = HashMap<StringString>()kafkaParameters.put()Set<String> topics =  HashSet<String>()topics.add()JavaPairInputDStream<StringString> lines = KafkaUtils.(jscString.String.StringDecoder.StringDecoder.kafkaParameterstopics)JavaDStream<String> words = lines.flatMap(FlatMapFunction<Tuple2<StringString>String>() { Iterable<String> (Tuple2<StringString> tuple) Exception {Arrays.(tuple..split())}})JavaPairDStream<StringInteger> pairs = words.mapToPair(PairFunction<StringStringInteger>() {Tuple2<StringInteger> (String word) Exception {Tuple2<StringInteger>(word)}})JavaPairDStream<StringInteger> wordsCount = pairs.reduceByKey(Function2<IntegerIntegerInteger>() { Integer (Integer v1Integer v2) Exception {v1 + v2}})wordsCount.print()jsc.start()jsc.awaitTermination()jsc.close()

  三、启动脚本

启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

启动kafka broker

bin/kafka-server-start.sh config/server.properties &

创建topic

bin/kafka-topics.sh --create --zookeeper 192.168.0.10:2181 --replication-factor 1 --partitions 1 --topic flume2kafka2streaming930

启动flume

bin/flume-ng agent --conf conf/  -f conf/producer.properties  -n producer -Dflume.root.logger=INFO,console

bin/spark-submit --class com.dt.spark.sparkstreaming.SparkStreamingOnKafkaDirected  --jars /lib/kafka_2.10-0.8.2.1/kafka-clients-0.8.2.1.jar,/lib/kafka_2.10-

0.8.2.1/kafka_2.10-0.8.2.1.jar,/lib/kafka_2.10-0.8.2.1/metrics-core-2.2.0.jar,/lib/spark-1.6.1/spark-streaming-kafka_2.10-1.6.1.jar --master local[5] SparkApps.jar

echo "hadoop spark hive storm spark hadoop hdfs" >> /opt/test/test.log

echo "hive storm " >> /opt/test/test.log

echo "hdfs" >> /opt/test/test.log

echo "hadoop spark hive storm spark hadoop hdfs" >> /opt/test/test.log

    输出结果如下:

* 结果如下:* -------------------------------------------* Time: 1475282360000 ms* -------------------------------------------*(spark,8)*(storm,4)*(hdfs,4)*(hive,4)*(hadoop,8)


    四、各类错误大全

    1、Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils
        at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main

        一概是没有提交jar包,一概会报错,无法执行,一概在submit脚本里添加:

       

bin/spark-submit --class com.dt.spark.sparkstreaming.SparkStreamingOnKafkaDirected  --jars /lib/kafka_2.10-0.8.2.1/kafka-clients-0.8.2.1.jar,/lib/kafka_2.10-

0.8.2.1/kafka_2.10-0.8.2.1.jar,/lib/kafka_2.10-0.8.2.1/metrics-core-2.2.0.jar,/lib/spark-1.6.1/spark-streaming-kafka_2.10-1.6.1.jar --master local[5] SparkApps.jar

   2、Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker。

         上stackoverflow.com及spark官网查询,这个是因为版本不兼容引起。官网提供的版本:Spark Streaming 1.6.1 is compatible with Kafka 0.8.2.1

王家林_DT大数据梦工厂

简介: 王家林:DT大数据梦工厂创始人和首席专家.微信公众号DT_Spark .

联系邮箱18610086859@vip.126.com

电话:18610086859

微信号:18610086859

微博为:http://weibo.com/ilovepains

转载于:https://blog.51cto.com/36006798/1858272

2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式...相关推荐

  1. Flume下读取kafka数据后再打把数据输出到kafka,利用拦截器解决topic覆盖问题

    1:如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体表现为,Kafka Source可以正常从指 ...

  2. Doris系列之导入Kafka数据操作

    Doris系列 注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Sp ...

  3. 利用Flume将MySQL表数据准实时抽取到HDFS

    转自:http://blog.csdn.net/wzy0623/article/details/73650053 一.为什么要用到Flume 在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取 ...

  4. flume mysql hdfs_利用Flume将MySQL表数据准实时抽取到HDFS

    一.为什么要用到Flume 在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取从MySQL数据库增量抽取数据到HDFS,然后用HAWQ的外部表进行访问.这种方式只需要很少量的配置即可完成数据抽 ...

  5. 采集京东网数据的10个经典方法

    京东网数据采集全网抓取网页数据.商品销量.全网搜索.网页爬虫.采集网站数据.网页数据采集软件.python爬虫.HTM网页提取.APP数据抓包.APP数据采集.一站式网站采集技术.BI数据的数据分析. ...

  6. 采集学校网站数据的10个经典方法

    采集学校网站数据的10个经典方法 学校网站数据采集全网抓取网页数据.全网搜索.网页爬虫.采集网站数据.网页数据采集软件.python爬虫.HTM网页提取.APP数据抓包.APP数据采集.一站式网站采集 ...

  7. Flume采集日志数据

    一.为什么选用Flume? Flume vs Logstash vs Filebeat 当时选择数据采集工具时,我们主要参考了市面上热度比较高的Flume和Logstash还有Filebeat,据目前 ...

  8. 计算机读取数据的接囗教程,八爪鱼采集怎样获取数据API链接 八爪鱼采集获取数据API链接的方法...

    今天给大家带来八爪鱼采集怎样获取数据API链接,八爪鱼采集获取数据API链接的方法,让您轻松解决问题.八爪鱼采集如何获取数据API链接 具体方法如下:1 java.cs.php示例代码点击下载 这个教 ...

  9. 详细讲解如何用爬虫工具批量采集阿里巴巴商品数据

    阿里巴巴是全球最大的B2B电子商务平台之一,它提供了海量的商品信息,为采购商和供应商间牵线搭桥.然而,要想在如此庞大的商品库中找到适合自己的商品,需要耗费大量的时间和精力.为了提高工作效率,我们可以使 ...

  10. 2016年大数据Spark“蘑菇云”行动代码学习之AdClickedStreamingStats模块分析

    2016年大数据Spark"蘑菇云"行动代码学习之AdClickedStreamingStats模块分析     系统背景:用户使用终端设备(IPAD.手机.浏览器)等登录系统,系 ...

最新文章

  1. 【Qt】一个使用QEventLoop时,遇到的教训
  2. Head First JSP---随笔八(传统标记)
  3. 计算机组装方案及分析,《计算机组装与维护》课程整体教学方案
  4. 洛谷——P1603 斯诺登的密码
  5. idea打war的问题
  6. 休眠:保存vs持久并保存或更新
  7. Linux基础之命令练习Day2-useradd(mod,del),groupadd(mod,del),chmod,chown,
  8. 层次聚类算法 算法_聚类算法简介
  9. Go语言基础之1--标识符、关键字、变量和常量、数据类型、Go的基本程序结构、Golang的特性...
  10. shell 字典_腾讯T4周末不陪对象,就为了手打这份shell编程笔记
  11. 【软件质量】软件质量
  12. linux 下  qserialport waitforreadyread_北师大版初中数学八年级(下)第二章第一节不等关系(精品)...
  13. number string java_java基础系列(一):Number,Character和String类及操作
  14. linux常用分区大小,Linux基本知识点总结——硬盘分区及LVM
  15. Hololens Holographic Remoting
  16. 使用mybatisplus中的selectone方法,查询一条信息。报错
  17. adb 查看固件版本
  18. 路由器克隆电脑mac地址,破解电脑连接固定网线ip
  19. Tomcat+Nginx动静分离
  20. 无人驾驶传感器之GPS和IMU

热门文章

  1. 二分法02:寻找第一个和最后一个的满足条件的位置
  2. 65 ----点到平面及直线的距离、两异面直线间的距离
  3. 线程与进程的区别及其通信方式
  4. Docker教程:dokcer的配置和命令
  5. python 大图找小图_20 M 的图片能压缩到 2 M?20行Python代码,无损压缩千百张图片...
  6. mongodb 默认端口号_MongoDB集群方案ReplicaSet
  7. 计算机二级python真题3和答案_计算机二级python真题:第3套综合应用题
  8. start-dfs.sh\stop-dfs.sh启动失败
  9. 力扣-80 删除有序数组中的重复项 II
  10. Spring常用注解用法总结