2019独角兽企业重金招聘Python工程师标准>>>

[toc]

安装

pykafka github

$ pip install pykafka$ conda install -c conda-forge pykafka注意kafka版本只支持 kafka 1.1, 1.0,0.11, 0.10, 0.9,0.8 (201902)该作者在https://github.com/dpkp/kafka-python/pull/1152 这个推送增加了kerberos支持

验证kerberos

java或者文件中 对应python参数 描述
security.protocol security_protocol 安全协议
kerberos.domain.name sasl_kerberos_domain_name 域名
sasl.kerberos.service.name sasl_kerberos_service_name 服务名
sasl.enabled.mechanisms&sasl.mechanism.inter.broker.protocol sasl_mechanism 认证机制
principal sasl_plain_username 用户租户名称

kerberos知识

配置一般在consumer.properties中
拆解一个Principal:
xianglei/dmp-master1.hadoop@HADOOP.COM
一个完整的Principal由3个部分构成。用户名/FQDN(Full Quafilied Domain Name)的主机名@REALM(受保护的域,全大写)当然这个用户名需要是Linux下存在的用户FQDN全限定域名,就是一定要带上hostname.domain这种形式,当然,如果你的主机并没有给出domain,那么不写域名也可以。反正就是要全部的主机名加域名(如果存在域名的话)。但实际上,在Kerberos里面,这个并不称之为主机名,而是叫做Instance,实例名,他可以不是任何服务器的主机名称,但是便于理解和认识,我们还是先把他当初主机名来看待吧。REALM,受到Kerberos保护的域名称,就是一类或一组受到Kerberos保护服务的服务器集合,你可以想象成Windows里面的域。由于一个KDC可以同时保护多个域,比如你可以在一个KDC上既保护HADOOP服务器组,也保护MYSQL服务器组,所以我们通常会使用域名来进行区别。如果你的hostname里面使用了domain name,那么你必须在Principal的第二部分写完整,否则KDC将无法验证主机的合法性,加密的tgt是要带着主机名信息的。还有,特别需要注意的是,这里面第二部分的domain(域名),第三部分的realm(域),在中文里的字是一样,但是英文单词完全不同,他们所表达的含义也完全不同。由于通常Kerberos的Realm部分也会写成域名的形式,所以就会让人迷惑,而实际上,你把realm部分理解成windows里面的workgroup或者home这种域也是可以的。名称可以随便起,不一定用你的真实域名。只是个区分不同服务集合的代号。

使用

资料

我是用来连接华为kafka的,测试可以通过kerberos验证。具体代码就不贴了,引用一下其他作者的,感谢#coding=utf8from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=["xxxx:9200"],security_protocol="SASL_PLAINTEXT",sasl_mechanism="GSSAPI",sasl_kerberos_service_name="kafka")
print "connect success."
future = producer.send("xxxx", "test")
result = future.get(timeout=60)
print "send success."

其他示例代码

原贴

kafka简介(摘自百度百科)一、简介:
详见:https://blog.csdn.net/Beyond_F4/article/details/80310507二、安装
详见博客:https://blog.csdn.net/beyond_f4/article/details/80095689              三、按照官网的样例,先跑一个应用
1、生产者:
from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092'])  #此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]for i in range(3):msg = "msg%d" % iproducer.send('test', msg)
producer.close()2、消费者(简单demo):
from kafka import KafkaConsumerconsumer = KafkaConsumer('test',bootstrap_servers=['172.21.10.136:9092'])for message in consumer:print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))启动后生产者、消费者可以正常消费。3、消费者(消费群组)
from kafka import KafkaConsumerconsumer = KafkaConsumer('test',group_id='my-group',bootstrap_servers=['172.21.10.136:9092'])for message in consumer:print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))启动多个消费者,只有其中可以可以消费到,满足要求,消费组可以横向扩展提高处理能力4、消费者(读取目前最早可读的消息)
from kafka import KafkaConsumerconsumer = KafkaConsumer('test',auto_offset_reset='earliest',bootstrap_servers=['172.21.10.136:9092'])for message in consumer:print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest
源码定义:{'smallest': 'earliest', 'largest': 'latest'}5、消费者(手动设置偏移量)
from kafka import KafkaConsumer
from kafka.structs import TopicPartitionconsumer = KafkaConsumer('test',bootstrap_servers=['172.21.10.136:9092'])print consumer.partitions_for_topic("test")  #获取test主题的分区信息
print consumer.topics()  #获取主题列表
print consumer.subscription()  #获取当前消费者订阅的主题
print consumer.assignment()  #获取当前消费者topic、分区信息
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic=u'test', partition=0), 5)  #重置偏移量,从第5个偏移量消费
for message in consumer:print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))6、消费者(订阅多个主题)
from kafka import KafkaConsumer
from kafka.structs import TopicPartitionconsumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))  #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量
for message in consumer:print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))7、消费者(手动拉取消息)
from kafka import KafkaConsumer
import timeconsumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))
while True:msg = consumer.poll(timeout_ms=5)   #从kafka获取消息print msgtime.sleep(1)8、消费者(消息挂起与恢复)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import timeconsumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:print numprint consumer.paused()   #获取当前挂起的消费者msg = consumer.poll(timeout_ms=5)print msgtime.sleep(2)num = num + 1if num == 10:print "resume..."consumer.resume(TopicPartition(topic=u'test', partition=0))print "resume......"pause执行后,consumer不能读取,直到调用resume后恢复。

示例代码2

原贴

"""生产者"""
def produce(self):producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)for i in range(4):msg = "msg%d" %iproducer.send(self.topic,key=str(i),value=msg)producer.close()"""一个消费者消费一个topic"""
def consume(self):#consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers)consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers)print consumer.partitions_for_topic(self.topic)  #获取test主题的分区信息
print consumer.topics()  #获取主题列表
print consumer.subscription()  #获取当前消费者订阅的主题
print consumer.assignment()  #获取当前消费者topic、分区信息
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)  #重置偏移量,从第1个偏移量消费for message in consumer:print ("%s:%d:%d: key=%s value=%s" % (message.topic,message.partition,message.offset, message.key,message.value))"""一个消费者订阅多个topic """
def consume2(self):consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))  #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic='TEST', partition=0)) #获取当前主题的最新偏移量
for message in consumer:print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))"""消费者(手动拉取消息)"""
def consume3(self):consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))
while True:message = consumer.poll(timeout_ms=5)   #从kafka获取消息if message:print messagetime.sleep(1)

示例代码3

原贴

#coding=utf-8
from pykafka import KafkaClient
import codecs
import logging
logging.basicConfig(level = logging.INFO)client = KafkaClient(hosts = "172.16.82.163:9091")#生产kafka数据,通过字符串形式
def produce_kafka_data(kafka_topic):with kafka_topic.get_sync_producer() as producer:for i in range(4):producer.produce('test message' + str(i ** 2))#消费kafka数据
def consume_simple_kafka(kafka_topic, timeout):consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms = timeout)for message in consumer:if message is not None:print message.offset, message.value#消费同一份kafka topic时,建议使用 get_balanced_consumer(),暂时不能使用
#问题:kazoo.handlers.threading.KazooTimeoutError: Connection time-out
def consume_kafka(kafka_topic, zkhost):balanced_consumer = kafka_topic.get_balanced_consumer(consumer_group = "testgroup",auto_commit_enable = False,zookeeper_connect = zkhost,#zookeeper = zkhost,zookeeper_connection_timeout_ms = 6000,consumer_timeout_ms = 10000,)for message in balanced_consumer:if message is not None:print message.offset, message.value#通过文件,往kafka刷数据
def produce_kafka_file(filename, kafka_topic):with kafka_topic.get_sync_producer() as producer:with codecs.open(filename, "r", "utf8") as rf:for line in rf:line = line.strip()if not line:continueproducer.produce(line)#===========================================================topic = client.topics["mytest"]#在consumer_timeout_ms内没有任何信息返回,则中断接受消息
cosumer = topic.get_simple_consumer(consumer_timeout_ms = 10000)
cnt = 0
for message in cosumer:if message is not None:print message.offset, message.valuecnt += 1
print cnt

kafka实战教程

转载于:https://my.oschina.net/u/3005325/blog/3016005

python kafka kerberos 验证 消费 生产相关推荐

  1. python子进程进行kinit认证_使用kafka-python客户端进行kafka kerberos认证

    之前说过python confluent kafka客户端做kerberos认证的过程,如果使用kafka python客户端的话同样也可以进行kerberos的认证,具体的认证机制这里不再描述,主要 ...

  2. python kafka offset自动提交_Spring-Kafka —— 实现批量消费和手动提交offset

    spring-kafka的官方文档介绍,可以知道自1.1版本之后, @KafkaListener开始支持批量消费,只需要设置batchListener参数为true 把application.yml中 ...

  3. kafka 查看待消费数据_kafka查看消费数据

    一.如何查看 在老版本中,使用kafka-run-class.sh 脚本进行查看.但是对于最新版本,kafka-run-class.sh 已经不能使用,必须使用另外一个脚本才行,它就是kafka-co ...

  4. Kafka Consumer多线程消费

    概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...

  5. 【kafka】浅谈Kafka的多线程消费的设计

    1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...

  6. 【kafka】kafka 指定分区消费 不会触发 reblance

    文章目录 1.概述 2.验证 2.1 2个都是subscribeTopic 2.2 指定消费与全部消费 2.3 两个指定消费 2.4 2个都消费同样的分区呢? 1.概述 今天在博客:Kafka-消费, ...

  7. 【Kafka】二.Kafka消息发布/消费流程

    Kafka 通过对消费方进行分组管理来支持消息一写多读. 我画的图:工具(processon在线画图) 这个 Topic 分为 4 个 Partition,就是图中的 P1到 P4,上部的生产方根据规 ...

  8. java kafka 集群消费_kafka集群搭建和使用Java写kafka生产者消费者

    转自:http://chengjianxiaoxue.iteye.com/blog/2190488 1 kafka集群搭建 1.zookeeper集群 搭建在110, 111,112 2.kafka使 ...

  9. python使用正则表达式验证邮箱地址语法有效性

    python使用正则表达式验证邮箱地址语法有效性 #python使用正则表达式验证邮箱地址语法有效性 import re # mail regular expression formula# rege ...

最新文章

  1. miniui文件上传 linux,MINIUI grid学习笔记
  2. Delphi 中取本机的计算机名、IP地址、Windows登录的用户名
  3. mysql获取服务器的剩余空间_mysql服务器内存耗尽,并占用大量swap
  4. Linux启动/停止/重启Mysql数据库的方法
  5. 反射获取空参数成员方法并运行
  6. [深入浅出Cocoa]iOS网络编程之NSStream
  7. ADF:使用HTTP POST方法进行URL任务流调用
  8. 【LeetCode笔记】253. 会议室 II(Java、偏数学)
  9. POJ 1003 Hangover
  10. 95-240-052-原理-State-MemoryStateBackend
  11. uploadify 上传时丢失session
  12. CF1139D Steps to One
  13. css字体倾斜角度_css如何设置字体倾斜样式
  14. 哨兵2号L1C数据下载及预处理
  15. java web 邮件_Java Web(十二) JavaMail发送邮件
  16. CSS3D魔法——旋转魔方
  17. 这么久了,深爱的人,还好吗?
  18. Linux快捷键总结
  19. oracle smon 执行记录,Oracle SMON进程中系统监视进程SMON
  20. 手把手教你读财报----银行业---第五课

热门文章

  1. [转载] Python中pandas dataframe删除一行或一列:drop函数
  2. [转载] python中的eval函数
  3. BZOJ2325[ZJOI2011]道馆之战——树链剖分+线段树
  4. 自由缩放属性-resize(禁止textarea的自由缩放尺寸功能)
  5. 多线程:C#线程同步lock,Monitor,Mutex,同步事件和等待句柄(上)
  6. 《一分钟经理人》学习笔记第五部分---一分钟表扬为什么有效
  7. 目标检测回归损失函数——IOU、GIOU、DIOU、CIOU、EIOU
  8. 【字符串】面试题之替换子串
  9. oracle中表空间实例,oracle的表空间实例详解
  10. mysql socket错误处理_Mysql 报错处理