Preface

Previously, when I wrote a Spark Streaming job that connected to Kafka, it looked like this:

// Old approach, based on the spark-streaming-kafka-0-8 integration
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("Spark2Kafka")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("/xxx/xxx")
val kafkaParameters = Map[String, String]("metadata.broker.list" -> "Master:9092,Worker1:9092,Worker2:9092")
val topics = Set[String]("TOPIC_NAME")
val streaming = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParameters, topics)

But recently we needed to consume Kafka data from the data center, and all they gave me were three values: username, password and group_id. At first I was baffled: what are these for? I went ahead with the old approach anyway and ran into:

INFO consumer.SimpleConsumer: Reconnect due to error:
java.nio.channels.ClosedChannelException

Root cause analysis

The three fields from the data center looked like credentials for login authentication, so I went through the official documentation.

It turns out that authenticating to someone else's Kafka cluster requires a security protocol, in this case SASL_PLAINTEXT with the PLAIN mechanism rather than SSL, and that is exactly where the username and password come in. The 0.8 integration does not support this, so you have to switch to the spark-streaming-kafka-0-10 integration.
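Before wiring anything into Spark, it can be worth confirming that the username and password actually authenticate, using nothing but the plain Kafka consumer API. The sketch below is my own addition rather than part of the original troubleshooting; the broker addresses and the JAAS file path are placeholders (the JAAS file itself is shown later in this post), and it assumes kafka-clients 0.10+ on the classpath:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object SaslSanityCheck {
  def main(args: Array[String]): Unit = {
    // Point the JVM at a JAAS file holding the username/password; path is a placeholder
    System.setProperty("java.security.auth.login.config", "/path/to/jass.conf")

    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092,broker2:9092")   // placeholder brokers
    props.put("group.id", "EC_group")
    props.put("security.protocol", "SASL_PLAINTEXT")
    props.put("sasl.mechanism", "PLAIN")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    // If the credentials are wrong, this throws instead of returning topic metadata
    println(consumer.listTopics().asScala.keys.mkString(", "))
    consumer.close()
  }
}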

Detailed steps

  • Dependency
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 2.4.3
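For completeness, the same coordinates can also be declared in a build file. Below is a sketch of an sbt version; the provided scope on spark-streaming and the Scala 2.12 setting are assumptions, chosen to match a cluster-supplied Spark and the _2.12 artifact above:

// build.sbt (sketch)
scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"            % "2.4.3" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.3"
)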
  • Write the Spark code (based on the official documentation)
package cn.unisk

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object Spark2Kafka {
  def main(args: Array[String]) {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.161.xx.xxx:9092,10.161.xx.xxx:9092,10.161.xx.xxx:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.mechanism" -> "PLAIN",
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "group.id" -> "EC_group",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sparkConf = new SparkConf().setAppName("Spark2_KafkaSASL")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))
    val topics = Array("CB_JF_ACT1")

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Print the raw ConsumerRecords of each batch
    // (the map below builds (key, value) pairs but its result is not used here)
    stream.map(record => (record.key, record.value))
    stream.print()

    // Start the computation
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
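One note on the configuration above: enable.auto.commit is false and nothing in this job ever commits offsets, so every restart simply resumes from auto.offset.reset (latest). For anything beyond a smoke test you would normally commit the processed offset ranges yourself. The following is a minimal sketch of the usual pattern from the 0-10 integration, added here for illustration rather than taken from the original job; the println stands in for real processing:

stream.foreachRDD { rdd =>
  // Each RDD from createDirectStream carries the Kafka offset ranges it covers
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch (placeholder) ...
  rdd.foreach(record => println(s"${record.key} -> ${record.value}"))

  // Commit those ranges back to Kafka under group.id "EC_group"
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}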
  • On the machine you submit from, create a jass.conf file (a standard Kafka JAAS configuration) with the following content
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxxx"
password="xxxx";
};
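Side note, not from the original setup: Kafka clients from 0.10.2 onwards also accept the same JAAS entry inline through the sasl.jaas.config consumer property, which avoids shipping a separate file. The client jars in the logs below are 0.10.0, so that option does not apply here and the jass.conf file above is required; still, for newer clusters a sketch would look like this (credentials are placeholders):

// Only with Kafka client jars >= 0.10.2; username/password are placeholders
val kafkaParamsInlineJaas = kafkaParams + ("sasl.jaas.config" ->
  "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";")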
  • Submit the Spark job (this differs a bit from a normal submission)
    Note: run the following first:
export SPARK_KAFKA_VERSION=0.10

If you skip this, the submission fails straight away (on CDH, this environment variable appears to control which of the bundled Kafka integration jars spark2-submit puts on the classpath):

Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaUtils$
spark2-submit \
--master yarn \
--deploy-mode client \
--files /home/analysis/jass.conf \
--driver-java-options "-Djava.security.auth.login.config=./jass.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jass.conf" \
--class cn.unisk.Spark2Kafka \
/home/analysis/ss.jar

Compared with a normal submission, three extra options were added here:

--files /home/analysis/jass.conf \
--driver-java-options "-Djava.security.auth.login.config=./jass.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jass.conf" \

Key log output

19/05/21 15:59:53 INFO kafka010.DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@7cee5299
19/05/21 15:59:53 INFO dstream.ForEachDStream: Slide time = 5000 ms
19/05/21 15:59:53 INFO dstream.ForEachDStream: Storage level = Serialized 1x Replicated
19/05/21 15:59:53 INFO dstream.ForEachDStream: Checkpoint interval = null
19/05/21 15:59:53 INFO dstream.ForEachDStream: Remember interval = 5000 ms
19/05/21 15:59:53 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@16a44d14
19/05/21 15:59:53 INFO consumer.ConsumerConfig: ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [10.161.25.240:9092, 10.161.25.241:9092, 10.161.25.242:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = PLAIN
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = 
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    group.id = EC_group
    retry.backoff.ms = 100
    ssl.secure.random.implementation = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = SASL_PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest
19/05/21 15:59:54 INFO authenticator.AbstractLogin: Successfully logged in.
19/05/21 15:59:54 INFO consumer.ConsumerConfig: ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [10.161.25.xxx:9092, 10.161.25.xxx:9092, 10.161.25.xxx:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = PLAIN
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = consumer-1
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    group.id = EC_group
    retry.backoff.ms = 100
    ssl.secure.random.implementation = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = SASL_PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest
19/05/21 15:59:54 INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
19/05/21 15:59:54 INFO utils.AppInfoParser: Kafka commitId : unknown
......
-------------------------------------------
Time: 1558425595000 ms
-------------------------------------------
ConsumerRecord(topic = CB_JF_ACT1, partition = 8, offset = 1764926986, CreateTime = 1558425597715, checksum = 1171090595, serialized key size = 22, serialized value size = 247, key = JF_ACT1,8,2142980219,0, value = ACTJF115303700642285000001299800046163040,03000001299800046163040UCR_ACT1.TF_O_LEAVEREALFEEU2019-05-18 01:29:18.00341905532553218619012346215532861901234621553225845585531930192041152019-05-18:09:29:182019-05-18:07:27:08)
ConsumerRecord(topic = CB_JF_ACT1, partition = 8, offset = 1764926987, CreateTime = 1558425597716, checksum = 581909573, serialized key size = 22, serialized value size = 249, key = JF_ACT1,8,2142980220,0, value = ACTJF115303700642294000001299800046167850,00000001299800046167850UCR_ACT1.TF_O_LEAVEREALFEEU2019-05-18 01:29:18.0034190478047801111801256415478011180125641547802101031014333393335341152019-05-18:09:29:182019-05-18:09:24:46)
