目录

1. Kafka安装与使用

1.1 下载

1.2 安装

1.3 配置

1.4 运行

1.4.1 启动zookeeper

1.4.2 启动kafka

1.5 第一个消息

1.5.1 创建一个topic

1.5.2. 创建一个消息消费者

2. kafka清理数据和topic

3. python操作kafka

4. Python创建自定义的Kafka Topic

4.1 BROKER 的全局配置

4.2 CONSUMER 配置

4.3 PRODUCER 的配置


1. Kafka安装与使用

1.1 下载

可以在kafka官网:http://kafka.apache.org/downloads
下载到最新的kafka安装包,选择下载二进制版本的tgz文件,根据网络状态可能需要fq,这里我们选择的版本是kafka_2.11-1.1.0,目前的最新版。

1.2 安装

Kafka是使用scala编写的运行与jvm虚拟机上的程序,虽然也可以在windows上使用,但是kafka基本上是运行在linux服务器上,因此我们这里也使用linux来开始今天的实战。

首先确保你的机器上安装了jdk,kafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境,所以我们可以直接使用。

说是安装,如果只需要进行最简单的尝试的话我们只需要解压到任意目录即可,这里我们将kafka压缩包解压到/home目录。

1.3 配置

在kafka解压目录下下有一个config的文件夹,里面放置的是我们的配置文件

consumer.properites 消费者配置,这个配置文件用于配置于2.5节中开启的消费者,此处我们使用默认的即可

producer.properties 生产者配置,这个配置文件用于配置于2.5节中开启的生产者,此处我们使用默认的即可

server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,目前仅介绍几个最基础的配置

broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可
listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,例如:listeners=PLAINTEXT://192.168.180.128:9092。并确保服务器的9092端口能够访问
zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,使用默认配置即可
zookeeper.connect=localhost:2181
当我们有多个应用,在不同的应用中都使用zookeer,都使用默认的zk端口的话就会2181端口冲突,我们可以设置自己的端口号,在config文件夹下zookeeper.properties文件中修改为clientPort=2185

也就是zk开放接口为2185.

同时修改kafka的接入端口,server.properties文件中修改为

zookeeper.connect=localhost:2185

这样我们就成功修改了kafka里面的端口号

1.4 运行

1.4.1 启动zookeeper

cd进入kafka解压目录,输入

bin/zookeeper-server-start.sh config/zookeeper.properties

启动zookeeper成功后会看到如下的输出

 1.4.2 启动kafka

cd进入kafka解压目录,输入

bin/kafka-server-start.sh config/server.properties

启动kafka成功后会看到如下的输出

1.5 第一个消息

1.5.1 创建一个topic

Kafka通过topic对同一类的数据进行管理,同一类的数据使用同一个topic可以在处理数据时更加的便捷

在kafka解压目录打开终端,输入

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

创建一个名为test的topic

在创建topic后可以通过输入

bin/kafka-topics.sh --list --zookeeper localhost:2181

来查看已经创建的topic

1.5.2. 创建一个消息消费者

在kafka解压目录打开终端,输入

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

可以创建一个用于消费topic为test的消费者

消费者创建完成之后,因为还没有发送任何数据,因此这里在执行后没有打印出任何数据

不过别着急,不要关闭这个终端,打开一个新的终端,接下来我们创建第一个消息生产者

3. 创建一个消息生产者

在kafka解压目录打开一个新的终端,输入

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

在执行完毕后会进入的编辑器页面

在发送完消息之后,可以回到我们的消息消费者终端中,可以看到,终端中已经打印出了我们刚才发送的消息

2. kafka清理数据和topic

1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录

2、Kafka 删除topic的命令是:

./bin/kafka-topics  --delete --zookeeper 【zookeeper server】  --topic 【topic name】

如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion

你可以通过命令:

./bin/kafka-topics --zookeeper 【zookeeper server】 --list 来查看所有topic

此时你若想真正删除它,可以如下操作:

(1)登录zookeeper客户端:命令:./bin/zookeeper-client

(2)找到topic所在的目录:ls /brokers/topics

(3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。

另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】,

如果你删除了此处的topic,那么marked for deletion 标记消失

zookeeper 的config中也有有关topic的信息: ls /config/topics/【topic name】暂时不知道有什么用

总结:

彻底删除topic:

1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录

2、如果配置了delete.topic.enable=true直接通过命令删除,如果命令删除不掉,直接通过zookeeper-client 删除掉broker下的topic即可。

3. python操作kafka

我们已经知道了kafka是一个消息队列,下面我们来学习怎么向kafka中传递数据和如何从kafka中获取数据

首先安装python的kafka库

pip install kafka

按照官网的样例,先跑一个应用

1、生产者demo:

from kafka import KafkaProducer
from kafka.errors import KafkaErrorproducer = KafkaProducer(bootstrap_servers=['broker1:1234'])# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')# Block for 'synchronous' sends
try:record_metadata = future.get(timeout=10)
except KafkaError:# Decide what to do if produce request failed...log.exception()pass# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})# produce asynchronously
for _ in range(100):producer.send('my-topic', b'msg')def on_send_success(record_metadata):print(record_metadata.topic)print(record_metadata.partition)print(record_metadata.offset)def on_send_error(excp):log.error('I am an errback', exc_info=excp)# handle exception# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)# block until all async messages are sent
producer.flush()# configure multiple retries
producer = KafkaProducer(retries=5)
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253

启动后生产者便可以将字节流发送到kafka服务器.

2、消费者(简单demo):

from kafka import KafkaConsumerconsumer = KafkaConsumer('test',bootstrap_servers=['127.0.0.1:9092'])  #参数为接收主题和kafka服务器地址# 这是一个永久堵塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都有偏移
for message in consumer:  # consumer是一个消息队列,当后台有消息时,这个消息队列就会自动增加.所以遍历也总是会有数据,当消息队列中没有数据时,就会堵塞等待消息带来print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))123456789

启动后消费者可以从kafka服务器获取数据.

3、消费者(消费群组)

from kafka import KafkaConsumer
# 使用group,对于同一个group的成员只有一个消费者实例可以读取数据
consumer = KafkaConsumer('test',group_id='my-group',bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))123456

启动多个消费者,只有其中某一个成员可以消费到,满足要求,消费组可以横向扩展提高处理能力

4、消费者(读取目前最早可读的消息)

from kafka import KafkaConsumerconsumer = KafkaConsumer('test',auto_offset_reset='earliest',bootstrap_servers=['127.0.0.1:9092'])for message in consumer:print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))1234567

auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest
源码定义:{‘smallest’: ‘earliest’, ‘largest’: ‘latest’}

5、消费者(手动设置偏移量)

# ==========读取指定位置消息===============
from kafka import KafkaConsumer
from kafka.structs import TopicPartitionconsumer = KafkaConsumer('test',bootstrap_servers=['127.0.0.1:9092'])print(consumer.partitions_for_topic("test"))  #获取test主题的分区信息
print(consumer.topics())  #获取主题列表
print(consumer.subscription())  #获取当前消费者订阅的主题
print(consumer.assignment())  #获取当前消费者topic、分区信息
print(consumer.beginning_offsets(consumer.assignment())) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic='test', partition=0), 5)  #重置偏移量,从第5个偏移量消费
for message in consumer:print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))12345678910111213141516

6、消费者(订阅多个主题)

# =======订阅多个消费者==========from kafka import KafkaConsumer
from kafka.structs import TopicPartitionconsumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test','test0'))  #订阅要消费的主题
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0))) #获取当前主题的最新偏移量
for message in consumer:print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))12345678910111213

7、消费者(手动拉取消息)

from kafka import KafkaConsumer
import timeconsumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test','test0'))
while True:msg = consumer.poll(timeout_ms=5)   #从kafka获取消息print(msg)time.sleep(2)12345678910

8、消费者(消息挂起与恢复)

# ==============消息恢复和挂起===========from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import timeconsumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))  # pause执行后,consumer不能读取,直到调用resume后恢复。
num = 0
while True:print(num)print(consumer.paused())   #获取当前挂起的消费者msg = consumer.poll(timeout_ms=5)print(msg)time.sleep(2)num = num + 1if num == 10:print("resume...")consumer.resume(TopicPartition(topic='test', partition=0))print("resume......")
12345678910111213141516171819202122

pause执行后,consumer不能读取,直到调用resume后恢复。

下面是一个完整的demo

from kafka import KafkaConsumer# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',group_id='my-group',bootstrap_servers=['localhost:9092'])
for message in consumer:# message value and key are raw bytes -- decode if necessary!# e.g., for unicode: `message.value.decode('utf-8')`print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',group_id='my-group',bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',group_id='my-group',bootstrap_servers='my.server.com')
12345678910111213141516171819202122232425262728293031323334353637

4. Python创建自定义的Kafka Topic

client = KafkaClient(bootstrap_servers=brokers)if topic not in client.cluster.topics(exclude_internal_topics=True):  # Topic不存在request = admin.CreateTopicsRequest_v0(create_topic_requests=[(topic,num_partitions,-1,  # replication unset.[],  # Partition assignment.[(key, value) for key, value in configs.items()],  # Configs)],timeout=timeout_ms)future = client.send(2, request)  # 2是Controller,发送给其他Node都创建失败。client.poll(timeout_ms=timeout_ms, future=future, sleep=False)  # 这里result = future.value# error_code = result.topic_error_codes[0][1]print("CREATE TOPIC RESPONSE: ", result)  # 0 success, 41 NOT_CONTROLLER, 36 ALREADY_EXISTSclient.close()
else:  # Topic已经存在print("Topic already exists!")return1234567891011121314151617181920212223242526

kafka的配置
在kafka/config/目录下面有3个配置文件:

producer.properties

consumer.properties

server.properties

kafka的配置分为 broker(server.properties)、producter(producer.properties)、consumer(consumer.properties)三个不同的配置

4.1 BROKER 的全局配置

最为核心的三个配置 broker.id、log.dir、zookeeper.connect 。

------------------------------------------- 系统 相关 -------------------------------------------
##每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id =1##kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs = /tmp/kafka-logs##提供给客户端响应的端口
port =6667##消息体的最大大小,单位是字节
message.max.bytes =1000000## broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads =3## broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
num.io.threads =8## 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
background.threads =4## 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,算是一种自我保护机制
queued.max.requests =500##broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
host.name## 打广告的地址,若是设置的话,会提供给producers, consumers,其他broker连接,具体如何使用还未深究
advertised.host.name## 广告地址端口,必须不同于port中的设置
advertised.port## socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes =100*1024## socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes =100*1024## socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes =100*1024*1024------------------------------------------- LOG 相关 -------------------------------------------
## topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes =1024*1024*1024## 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖
log.roll.hours =24*7## 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete## 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.minutes=7days指定日志每隔多久检查看是否可以被删除,默认1分钟
log.cleanup.interval.mins=1## topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes 。-1没有大小限制
## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.bytes=-1## 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=5minutes## 是否开启日志压缩
log.cleaner.enable=false## 日志压缩运行的线程数
log.cleaner.threads =1## 日志压缩时候处理的最大大小
log.cleaner.io.max.bytes.per.second=None## 日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好
log.cleaner.dedupe.buffer.size=500*1024*1024## 日志清理时候用到的IO块大小 一般不需要修改
log.cleaner.io.buffer.size=512*1024## 日志清理中hash表的扩大因子 一般不需要修改
log.cleaner.io.buffer.load.factor =0.9## 检查是否处罚日志清理的间隔
log.cleaner.backoff.ms =15000## 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
log.cleaner.min.cleanable.ratio=0.5## 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
log.cleaner.delete.retention.ms =1day## 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
log.index.size.max.bytes =10*1024*1024## 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.index.interval.bytes =4096## log文件"sync"到磁盘之前累积的消息条数
## 因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段
## 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.
## 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)
## 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.
## 物理server故障,将会导致没有fsync的消息丢失.
log.flush.interval.messages=None## 检查是否需要固化到硬盘的时间间隔
log.flush.scheduler.interval.ms =3000## 仅仅通过interval来控制消息的磁盘写入时机,是不足的.
## 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔
## 达到阀值,也将触发.
log.flush.interval.ms = None## 文件在索引中清除后保留的时间 一般不需要去修改
log.delete.delay.ms =60000## 控制上次固化硬盘的时间点,以便于数据恢复 一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000------------------------------------------- TOPIC 相关 -------------------------------------------
## 是否允许自动创建topic ,若是false,就需要通过命令创建topic
auto.create.topics.enable =true## 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数
default.replication.factor =1## 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
num.partitions =1实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。----------------------------------复制(Leader、replicas) 相关 ----------------------------------
## partition leader与replicas之间通讯时,socket的超时时间
controller.socket.timeout.ms =30000## partition leader与replicas数据同步时,消息的队列尺寸
controller.message.queue.size=10## replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
replica.lag.time.max.ms =10000## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
## 通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后
## 如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移
## 到其他follower中.
## 在broker数量较少,或者网络不足的环境中,建议提高此值.
replica.lag.max.messages =4000##follower与leader之间的socket超时时间
replica.socket.timeout.ms=30*1000## leader复制时候的socket缓存大小
replica.socket.receive.buffer.bytes=64*1024## replicas每次获取数据的最大大小
replica.fetch.max.bytes =1024*1024## replicas同leader之间通信的最大等待时间,失败了会重试
replica.fetch.wait.max.ms =500## fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
replica.fetch.min.bytes =1## leader 进行复制的线程数,增大这个数值会增加follower的IO
num.replica.fetchers=1## 每个replica检查是否将最高水位进行固化的频率
replica.high.watermark.checkpoint.interval.ms =5000## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
controlled.shutdown.enable =false## 控制器关闭的尝试次数
controlled.shutdown.max.retries =3## 每次关闭尝试的时间间隔
controlled.shutdown.retry.backoff.ms =5000## 是否自动平衡broker之间的分配策略
auto.leader.rebalance.enable =false## leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leader.imbalance.per.broker.percentage =10## 检查leader是否不平衡的时间间隔
leader.imbalance.check.interval.seconds =300## 客户端保留offset信息的最大空间大小
offset.metadata.max.bytes----------------------------------ZooKeeper 相关----------------------------------
##zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect = localhost:2181## ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.session.timeout.ms=6000## ZooKeeper的连接超时时间
zookeeper.connection.timeout.ms =6000## ZooKeeper集群中leader和follower之间的同步实际那
zookeeper.sync.time.ms =2000
配置的修改
其中一部分配置是可以被每个topic自身的配置所代替,例如
新增配置
bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1修改配置
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000删除配置 :
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219

4.2 CONSUMER 配置

最为核心的配置是group.id、zookeeper.connect

## Consumer归属的组ID,broker是根据group.id来判断是队列模式还是发布订阅模式,非常重要group.id## 消费者的ID,若是没有设置的话,会自增consumer.id## 一个用于跟踪调查的ID ,最好同group.id相同client.id = group id value## 对于zookeeper集群的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使用同样的zk配置zookeeper.connect=localhost:2182## zookeeper的心跳超时时间,超过这个时间就认为是dead消费者zookeeper.session.timeout.ms =6000## zookeeper的等待连接时间zookeeper.connection.timeout.ms =6000## zookeeper的follower同leader的同步时间zookeeper.sync.time.ms =2000## 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anythingelse:抛出异常auto.offset.reset = largest## socket的超时时间,实际的超时时间是:max.fetch.wait + socket.timeout.ms.socket.timeout.ms=30*1000## socket的接受缓存空间大小socket.receive.buffer.bytes=64*1024##从每个分区获取的消息大小限制fetch.message.max.bytes =1024*1024## 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offsetauto.commit.enable =true## 自动提交的时间间隔auto.commit.interval.ms =60*1000## 用来处理消费消息的块,每个块可以等同于fetch.message.max.bytes中数值queued.max.message.chunks =10## 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新
## 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册
##"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点,
## 此值用于控制,注册节点的重试次数.rebalance.max.retries =4## 每次再平衡的时间间隔rebalance.backoff.ms =2000## 每次重新选举leader的时间refresh.leader.backoff.ms## server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求fetch.min.bytes =1## 若是不满足最小大小(fetch.min.bytes)的话,等待消费端请求的最长等待时间fetch.wait.max.ms =100## 指定时间内没有消息到达就抛出异常,一般不需要改consumer.timeout.ms = -112345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364

4.3 PRODUCER 的配置

比较核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class

## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vipmetadata.broker.list##消息的确认模式##0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP##1:发送消息,并会等待leader 收到确认后,一定的可靠性## -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性request.required.acks =0## 消息发送的最长等待时间request.timeout.ms =10000## socket的缓存大小send.buffer.bytes=100*1024## key的序列化方式,若是没有设置,同serializer.classkey.serializer.class## 分区的策略,默认是取模partitioner.class=kafka.producer.DefaultPartitioner## 消息的压缩模式,默认是none,可以有gzip和snappycompression.codec = none## 可以针对默写特定的topic进行压缩compressed.topics=null## 消息发送失败后的重试次数message.send.max.retries =3## 每次失败后的间隔时间retry.backoff.ms =100## 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据topic.metadata.refresh.interval.ms =600*1000## 用户随意指定,但是不能重复,主要用于跟踪记录消息client.id=""------------------------------------------- 消息模式 相关 -------------------------------------------## 生产者的类型 async:异步执行消息的发送 sync:同步执行消息的发送producer.type=sync## 异步模式下,那么就会在设置的时间缓存消息,并一次性发送queue.buffering.max.ms =5000## 异步的模式下 最长等待的消息数queue.buffering.max.messages =10000## 异步模式下,进入队列的等待时间 若是设置为0,那么要么进入队列,要么直接抛弃queue.enqueue.timeout.ms = -1## 异步模式下,每次发送的最大消息数,前提是触发了queue.buffering.max.messages或是queue.buffering.max.ms的限制batch.num.messages=200## 消息体的系列化处理类 ,转化为字节流进行传输serializer.class= kafka.serializer.DefaultEncoder
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657

有兴趣可以关注我的微信公众号“自动化测试全栈”,微信号:QAlife,学习更多自动化测试技术。

也可加入我们的自动化测试技术交流群,QQ群号码:301079813

主要探讨loadrunner/JMeter测试、Selenium/RobotFramework/Appium自动化测试、接口自动化测试,测试工具等测试技术,让我们来这里分享经验、交流技术、结交朋友、拓展视野、一起奋斗!

参考:

https://kafka-python.readthedocs.io/en/master/index.html

使用Python读写kafka相关推荐

  1. python读写kafka集群(转载+自己验证)

    ###############################版本信息######################################### 组件/系统 版本 Python 3.6 Kaf ...

  2. kafka实战教程(python操作kafka),kafka配置文件详解

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...

  3. python使用kafka原理详解_Python操作Kafka原理及使用详解

    Python操作Kafka原理及使用详解 一.什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理 ...

  4. python读写压缩文件使用gzip和bz2

    python读写压缩文件使用gzip和bz2 #读取压缩文件 # gzip compression import gzip with gzip.open('somefile.gz', 'rt') as ...

  5. python读写二进制文件(读写字节数据)

    python读写二进制文件(读写字节数据) 你想读写二进制文件,比如图片,声音文件等就是常见的二进制文件. 使用模式为 rb 或 wb 的 open() 函数来读取或写入二进制数据.比如: # Rea ...

  6. python 读写 csv

    python 读写 csv 列表写入csv # 列表写入csv import csvheaders = ['列1', '列2', '列3', '列4', '列5']rows = [["1行1 ...

  7. python读写csv时中文乱码问题解决办法

    参考1 参考2 参考3 CSV是英文Comma Separate Values(逗号分隔值)的缩写,顾名思义,文档的内容是由 "," 分隔的一列列的数据构成的,可以使用excel和 ...

  8. python输出csv文件中文乱码-python读写csv时中文乱码问题解决办法

    CSV是英文Comma Separate Values(逗号分隔值)的缩写,顾名思义,文档的内容是由 "," 分隔的一列列的数据构成的,可以使用excel和文本编辑器等打开.CSV ...

  9. python文件对象提供了3个读方法、分别是-Python读写文件模式和文件对象方法实例详解...

    本文实例讲述了Python读写文件模式和文件对象方法.分享给大家供大家参考,具体如下: 一. 读写文件模式 利用open() 读写文件时,将会返回一个 file 对象,其基本语法格式如: open ( ...

最新文章

  1. Erlang OTP学习(3) supervisor
  2. linux tcp在传输数据的时候断网了_选择最合适的协议 让传输数据更灵敏
  3. Java常用接口与类——main方法/Object类/Scanner类
  4. html5做一个相册_HTML5最新版本介绍
  5. 【目标检测】单阶段算法--YOLOv3详解
  6. call()与apply()的区别与作用
  7. AI部署从EonStor GSi存储解决方案开始
  8. 铁子们,2019博客之星投票活动开始了!帮我投个票呗
  9. k8s集群部署八(DNS服务发现)
  10. Executor与线程池
  11. puppet的使用:ERB模板介绍
  12. pywebview:使用python构建桌面客户端应用
  13. 跨境电商还有发展前景吗?跨境电商应该怎么运营?
  14. 最短路径模板+解析——(FLoyd算法)
  15. matlab安装到U盘,matlab u盘便携移动版
  16. 基于PIE-Engine 监测黄海海域浒苔绿潮发展过程
  17. JavaScript中linux时间戳与日期的转换
  18. 手机如何批量导入通讯录,批量删除通讯录?
  19. win10消息推送服务器,win10怎么对更新的推送消息进行设定
  20. MongoDB 默认端口

热门文章

  1. 单靠MySQL进了字节,高端玩法才是王道!
  2. 面试必备,各种技术知识集大成之项目~
  3. 每日一皮:当你要下班的时候,突然测试叫住了你...
  4. 有比 ReadWriteLock更快的锁?
  5. Java 必会的 9 大技能,我请部门大神给你讲讲
  6. ThreadPoolExecutor 的八种拒绝策略 | 含番外!
  7. 使用 Contour 接管 Kubernetes 的南北流量
  8. 「图解」ThreadLocal 在并发问题中的应用
  9. css文字背景虚化,通过实现背景模糊、文字颜色流光渐变、边框扩展等效果学习transition、transform、@keyframes等属性及伪元素的使用...
  10. chemdraw怎么画拐弯的箭头_性感皮衣皮裤的质感服装该怎么画?