xxx系列文章

Zookeeper及Kafka系列(1)―ZooKeeper集群搭建(四节点)

Zookeeper及Kafka系列(2)―Kafka集群搭建(四节点)


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

    • xxx系列文章
  • 前言
  • 一、Kafka基本概念
    • 1. 版本
    • 2.Broker
    • 3.分区
    • 4.副本
    • Kafka数据持久化
    • 消费者组的理解
    • Broker
  • 消费者位移:
  • 消息位移:
  • 二、Kafka安装概要流程
    • 1.节点分配
    • 2.安装流程
      • 2.1 记住zk的目录下内容
      • 2.2 修改Kafka配置文件
      • 2.3 配置kafka环境变量
      • 2.4 不同节点同步数据
  • 三、常规操作及使用
    • 1.启动流程
    • 2.常规使用
      • 2.1kafka脚本
      • 2.2 创建topic
      • 2.3 消息的产生消费
      • 根据key推送消息
      • consumer获取消息的方式,是推送还是拉取?
      • 粒度,拉取一条还是一批?
      • 拉取一批数据,既有key为A的,也有key为B的,如何处理,怎么更新/持久化offset?
  • 四、Kafka分区策略
    • 1.轮询
    • 2.随机
    • 3.按消息键key保存
  • Kafka的java客户端使用TCP
  • 拦截器
  • 重要的生产者参数
  • 重要的消费者参数
  • 学习过程中如何清除zookeeper及kafka数据使其初始化
    • 1.zookeeper的清理流程
    • 2.kafka的清理流程
  • 总结
  • P.S.
    • Kafka的config目录
    • kafka原配置文件server.properties
    • Kafka的producer.properties配置文件
    • Kafka的consumer.properties配置文件

前言

继续上节的zookeeper集群搭建后开始搭建kafka集群

参考链接:
核心:

Kafka中文文档
Kafka官网

其他blog:

Kafka版本号 – 傻傻搞不清楚
kafka副本机制详解
Kafka 核心技术与实战
干货总结!Kafka 面试大全(万字长文,37 张图,28 个知识点)


一、Kafka基本概念

1. 版本

2.7版本用于测试
本集群使用版本为2.4.0
Kafka 的版本命名,这么长一串,其实呢,前面的版本号是编译 Kafka 源代码的 Scala 编译器版本,真正的版本号其实是后边的 2.4.0
Kafka 服务器端的代码完全由 Scala 语言编写,Scala 同时支持面向对象编程和函数式编程,用 Scala 写成的源代码编译之后也是普通的“.class”文件,因此我们说 Scala 是 JVM 系的语言,它的很多设计思想都是为人称道的。

2.Broker

Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成,Broker 负责接收和处理客户端发送过来的请求,以及对消息进行持久化。虽然多个 Broker 进程能够运行在同一台机器上,但更常见的做法是将不同的 Broker 分散运行在不同的机器上,这样如果集群中某一台机器宕机,即使在它上面运行的所有 Broker 进程都挂掉了,其他机器上的 Broker 也依然能够对外提供服务。这其实就是 Kafka 提供高可用的手段之一。

3.分区

Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区 0 中,要么在分区 1 中。如你所见,Kafka 的分区编号是从 0 开始的,如果 Topic 有 100 个分区,那么它们的分区号就是从 0 到 99。

讲到这里,你可能有这样的疑问:刚才提到的副本如何与这里的分区联系在一起呢?实际上,副本是在分区这个层级定义的。每个分区下可以配置若干个副本,其中只能有 1 个领导者副本和 N-1 个追随者副本。生产者向分区写入消息,每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一个生产者向一个空分区写入了 10 条消息,那么这 10 条消息的位移依次是 0、1、2、…、9。

4.副本

kafka副本定义

在kafka中一个主题下面可以有多个分区(partition),每个分区(partition)可以有多个副本,所以kafka的副本的维度是以分区为维度进行划分的;同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用。在生产环境中,每个分区的副本分布在不同的机器上,这样即使某些机器出现故障,集群依然可用。


副本的角色定义
在kafka中副本是有一个leader节点和多个follower节点组成,leader节点负责接收消息和消费消息,follower既不提供写服务也不提供读服务,仅仅用于同步leader副本的消息。follower副本的唯一作用就是当leader副本出现问题时,通过ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。
副本的工作机制
副本的工作机制也很简单:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步。

kafka这样设计好处

方便实现读写一致:因为只在leader副本上进行读写操作,所以生产者写入什么消息,消费者就能读到什么消息,消费者不会从follower上进行读取操作,避免了主从同步过程中的延迟问题。
方便实现单调读(Monotonic Reads):在进行多次消费时,不会存在某条消息一会存在,一会消失的情况。如果2 个追随者副本 F1 和 F2,它们异步地拉取领导者副本数据。倘若 F1 拉取了 Leader 的最新消息而 F2 还未及时拉取,那么,此时如果有一个消费者先从 F1 读取消息之后又从 F2 拉取消息。
kafka不像Mysql那样提供读写分离,是因为kakfa本身的设计是通过分区来进行分担集群读写的压力,从架构层面可以很好的进行水平扩展,提供读写分离是在一定程序软件架构设计的不足,才需要进行读写分离,来分担集群的读写压力;更多的是一种弥补方法。

Kafka数据持久化

总的来说,Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。

消费者组的理解

如何理解Kafka的消费者组?
Kafka 为什么使用消费者组?

Broker

kafka核心概念和角色
Kafka中的Broker选主与副本同步机制

kafka集群的server,一台kafka服务器节点就是一个broker,负责处理消息读、写请求,存储消息,在kafka cluster这一层这里,其实里面是有很多个broker

一个集群由多个broker组成。一个broker可以容纳多个topic。
broker是组成kafka集群的节点,broker之间没有主从关系,各个broker之间的协调依赖于zookeeper,如数据在哪个节点上之类的
Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。

Controller的管理工作都是依赖于Zookeeper的。

控制器:
在启动Kafka集群时,每个代理(Broker)都会实例化并启动一个KafkaController,并将该代理的brokerId注册到Zookeeper的相应节点中。Kafka集群中各代理会根据选举机制选出其中一个代理作为Leader,即Leader控制器,当Leader控制器宕机后其他代理再次竞选出新的控制器。
控制器作用:负责主题的创建与删除、分区和副本的管理以及代理故障转移处理等。admin-api
新版本的kafka创建producer和consumer可以使用broker节点,
老版本producer使用zookeeper节点获取broker节点,新版本迁移元数据到broker,连接broker减少对zookeeper的网卡影响,在业务层次上不依赖zookeeper
但admin-api无论新版本还是旧版本都得使用zookeeper节点(admin要通过zookeeper找到controller节点)

消费者位移:

Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。

消息位移:

Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。

二、Kafka安装概要流程

1.节点分配

node01: 192.168.204.131
node02: 192.168.204.132
node03: 192.168.204.133
node04: 192.168.204.134

kafka: node01-node03
zookeeper: node02-node04

2.安装流程

2.1 记住zk的目录下内容

2.2 修改Kafka配置文件

# 依次递增
borker.id=0
#listeners默认被注释调
listeners=PLAINTEXT://:9092
log.dirs=/usr/local/kafka_2.12-2.4.0/logs
# 不要直接配根目录,最好一个项目一个一级目录
zookeeper.connect=node04:2181,node02:2182,node03:2182/kafkalearn

2.3 配置kafka环境变量

vim /etc/profile
$PATH表示在原先环境变量的基础上增加新的环境变量
冒号是分隔符

#JDK配置
JAVA_HOME=/usr/local/java/jdk1.8.0_291
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/binexport PATH JAVA_HOME CLASSPATH
# zookeeper环境变量配置
export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.6.3-bin
export PATH=$PATH:$ZOOKEEPER_HOME/bin#kafka环境变量配置
export KAFKA_HOME=/usr/local/kafka_2.12-2.4.0/
export PATH=$PATH:$KAFKA_HOME/bin

2.4 不同节点同步数据

scp命令同步profile和server.properties

  第一段   第二段

1.xxx
  内容关键字,内容。

2.xxx
 内容关键字,关键字onApplicationEvent,内容。

  第三段关键字
  第四段

三、常规操作及使用

1.启动流程

先启动zookeeper

zkServer.sh start

查看zookeeper是否运行

zkServer.sh status

再启动kafka

kafka-server-start.sh  /usr/local/kafka_2.12-2.4.0/config/server.properties
后台启动:
kafka-server-start.sh  -daemon /usr/local/kafka_2.12-2.4.0/config/server.properties

zookeeper变化

zkCli.sh


查看集群信息

查看controller信息

查看broker信息

2.常规使用

2.1kafka脚本

2.2 创建topic

 kafka-topics.sh --zookeeper node02:2181,node03:2181/kafkalearn --create --topic kafkatopic --partitions 2 --replication-factor 2


删除topic:

kafka-topics.sh --zookeeper node02:2181,node03:2181/kafkalearn --delete --topic kafkatopic

显示topic信息

 kafka-topics.sh --zookeeper node02:2181,node03:2181/kafkalearn --listkafka-topics.sh --zookeeper node02:2181,node03:2181/kafkalearn --describe --topic kafkatopic

增删改查都压在leader,从只是增加他的数据可靠性

分区不能水平扩展

创建topic后zookeeper发生的变化(我在第二次创建topic的时候把分区数设置为了3)

2.3 消息的产生消费

创建消费者

kafka-console-consumer.sh  --bootstrap-server node01:9092,node02:9092 --topic kafkatopic  --group kafkagroup

创建生产者

kafka-console-producer.sh --broker-list node03:9092 --topic kafkatopic

同时开启三个consumer(上面的命令),producer生产的消息会被三个consumer随机消费
原因:
1.指定了相同的消费者组(- -group kafkagroup
2.Kafka命令行创建的生产者默认发送消息策略是随机,见末尾consumer.properties配置文件

# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=

查看消费者组信息

 kafka-consumer-groups.sh --bootstrap-server node02:9092 --listkafka-consumer-groups.sh --bootstrap-server node02:9092 --describe  --group kafkagroup


可以看到topic多了一个__consumer_offsets,这是kafka提供的维护消费offset的topic

根据key推送消息

consumer获取消息的方式,是推送还是拉取?

拉取(rocktmq也是拉取)

原因:
推送说的是server,主动去推送,会导致网卡打满
拉取说的是consumer,自主按需去订阅拉取server的数据

粒度,拉取一条还是一批?

批次优于单条,所有kafka选择的是批次
消费数据的时候是单线程还是多线程

拉取一批数据,既有key为A的,也有key为B的,如何处理,怎么更新/持久化offset?

场景1:单线程,事务方式处理,一条一条处理,来更新offset,比较精准,不丢失数据,不重复消费,但速度处理慢,资源浪费
场景2:多线程,并行处理,有一条处理失败,按条?还是按批维护?

四、Kafka分区策略

需要配置生产者端的参数partitioner.class。这个参数该怎么设定呢?方法很简单,在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简单,只定义了两个方法:partition()和close(),通常你只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:

int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。只要你自己的实现类定义好了 partition 方法,同时设置partitioner.class参数为你自己实现类的 Full Qualified Name,那么生产者程序就会按照你的代码逻辑对消息进行分区。虽说可以有无数种分区的可能,但比较常见的分区策略也就那么几种,下面我来详细介绍一下。

1.轮询

轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。

2.随机

3.按消息键key保存

Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略。

Kafka的java客户端使用TCP

Apache Kafka 的所有通信都是基于 TCP 的,而不是基于 HTTP 或其他协议。无论是生产者、消费者,还是 Broker 之间的通信都是如此。

拦截器

kafka拦截器有两种,一种是生产者拦截器、一种是消费者拦截器。

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。

重要的生产者参数

重要的生产者参数

  1. acks:用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。

acks = 1, 默认值即为1.生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入leader副本,比如在leader副本崩溃、重新选举新的leader副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为新选举的leader副本中并没有这条对应的消息。acks设置为1,是消息可靠性和吞吐量之间的折中方案。
acks = 0, 生产者发送消息之后不需要等待任何服务的相应。如果在消息从发送到写入kafka的过程中出现某些异常,导致kafka并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks设置为0可以达到最大的吞吐量。
acks = -1 或acks = all。 生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够对来自服务端的成功响应。在其他配置环境相同的情况下,acks设置为-1(all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有leader副本,这样就退化成了acks = 1 的情况。要获得更高的消息可靠性需要配合min.insync.replicas等参数的联动。
2. max.request.size: 用来限制生产者客户端能发送的消息的最大值,默认为1048576B,即1MB。

  1. retries和retry.bakcoff.ms: retries参数用来配置生产者重试的次数。默认值为0,即在发生异常的时候不进行任何重试动作。retry.backoff.ms默认值为100,用来设置两次重试之间的时间间隔。避免无效的频繁重试。

  2. compression.type:这个参数用来指定消息的压缩方式,默认值为“none”,默认情况下,消息不会被压缩。可配置为“gzip”、“snappy”、和“lz4”。

  3. connections.max.idle.ms: 用来指定在多久之后关闭闲置的连接,默认值是540000(ms),及9分钟。

  4. linger.ms: 用来指定生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入ProducerBatch的时间,默认值为0.生产者客户端会在ProducerBatch被填满或等待时间超过linger.ms值时发送出去。

  5. receive.buffer.bytes: 用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为131072(B),即32KB。

  6. send.buffer.bytes: 用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即32KB。

  7. request.timeout.ms: 用来配置Producer等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择重试。注意: 这个参数需要比broker端参数raplica.lag.time.max.ms的值要大,这样可以减少因客户端重试而引起的消息重复的概率。

重要的消费者参数

学习过程中如何清除zookeeper及kafka数据使其初始化

1.zookeeper的清理流程

彻底删除Kafka和zookeeper数据
zookeeper 清理数据

2.kafka的清理流程

kafka清空全部数据
kafka全部数据清空与某一topic数据清空

总结

P.S.

Kafka的config目录

核心文件四个
consumer
producer
server
zookeeper

kafka原配置文件server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.# see kafka.server.KafkaConfig for additional details and defaults############################# Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.
broker.id=0############################# Socket Server Settings ############################## The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000############################# Zookeeper ############################## Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000############################# Group Coordinator Settings ############################## The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

Kafka的producer.properties配置文件

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.producer.ProducerConfig for more details############################# Producer Basics ############################## list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

Kafka的consumer.properties配置文件

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092# consumer group id
# test-consumer-group是默认的消费者组
group.id=test-consumer-group# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=

Kafka集群搭建(四节点)相关推荐

  1. 转-Kafka【第一篇】Kafka集群搭建

    转自: https://www.cnblogs.com/luotianshuai/p/5206662.html Kafka[第一篇]Kafka集群搭建 Kafka初识 1.Kafka使用背景 在我们大 ...

  2. kafka集群搭建(消息)

    1.Kafka使用背景 在我们大量使用分布式数据库.分布式计算集群的时候,是否会遇到这样的一些问题: 我们想分析下用户行为(pageviews),以便我们设计出更好的广告位 我想对用户的搜索关键词进行 ...

  3. zookeeper 和 kafka 集群搭建

    Kafka初识 1.Kafka使用背景 在我们大量使用分布式数据库.分布式计算集群的时候,是否会遇到这样的一些问题: 我们想分析下用户行为(pageviews),以便我们设计出更好的广告位 我想对用户 ...

  4. Zookeeper+Kafka集群搭建

    Zookeeper集群搭建 Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群. 1.软件环境 (3台服务器-我的测试) 192.168.30.204 server1 ...

  5. kafka 集群服役新节点

    前言 在生产环境下,kafka集群中发现遇到了性能瓶颈,主要体现在现有的集群下,某个topic的分区存储不够用了,生产者生产消息量太大,或者消费端消费速度跟不上生产进度,导致分区数据大量堆积: 在这种 ...

  6. kafka集群搭建教程(使用自带的zookeeper)

    kafka集群搭建教程(使用自带的zookeeper) 一.kafka简介 二.kafka名词解释 三.zookeeper与Kafka 四.kafka集群搭建前准备 1.下载 2.检验jdk 3.系统 ...

  7. Kafka集群搭建实战

    Kafka集群搭建实战 集群结构: 集群环境信息: 192.168.157.130 192.168.157.131 192.168.157.129 Zookeeper集群搭建 三台机器上均安装JDK( ...

  8. Windows下Kafka集群搭建

    一.Windows下Zookeeper集群搭建. 1.集群版本:2.8.1,3.0版本不再支持JDK8,不在需要Zookeeper. 2.Zookeeper版本:3.8.0. 3.Zookeeper三 ...

  9. 云计算大数据之 Kafka集群搭建

    云计算大数据之 Kafka集群搭建 版权声明: 本文为博主学习整理原创文章,如有不正之处请多多指教. 未经博主允许不得转载.https://blog.csdn.net/qq_42595261/arti ...

  10. Zookeeper+Hadoop+Hbase+Hive+Kylin+Nginx集群搭建四(Hbase篇)

    Zookeeper+Hadoop+Hbase+Hive+Kylin+Nginx集群搭建四(Hbase篇) 五.Hbase集群搭建 1.准备工作(下载与时间同步) 2.安装解压 3.环境变量配置 4.文 ...

最新文章

  1. vbs脚本读写INI文件
  2. opengl加载显示3D模型lwx类型文件
  3. 怎么用vc采集ni卡数据_8bit,200MS/s 低成本模拟输入高速采集卡FCFR-PCI9850
  4. embedding_Keras嵌入层
  5. ProtoBuf3语法指南(Protocol Buffers)_下
  6. python 结构体数组_关于python:将结构化数组转换为常规NumPy数组
  7. #0的作用及time slot的划分
  8. debian 安装五笔输入法
  9. 康佳电视维修记 LED55M5580AF
  10. 服务器虚拟化怎么配置,教你如何配置服务器虚拟化环境
  11. 听说程序员普遍缺乏数据结构,惊得我熬了一宿滋味浓郁的数据结构,滋一口回味悠长(建议珍藏)
  12. 豆瓣电影top250信息爬取
  13. 2020计算机应用基础终结性考试,2019-2020年电大考试《计算机应用基础》形成性考核.docx-文档在线预览...
  14. 最全音视频基础知识学习详解(含多个干货链接)
  15. 计算机中什么符号代表除号,电脑怎么打除号?word除号怎么打出来?键盘上÷号是哪个键?除以符号电脑怎么打?...
  16. C语言实现字母的大写转换成小写
  17. STM32片内FLASH烧写错误导致ST-LINK烧录不进程序的问题解决过程
  18. Python基础——循环语句
  19. URULE库文件的使用
  20. 期末学生HTML个人网页作业作品~蓝色的异清轩响应式个人博客模板源码~bootstrap响应式博客网站模板html

热门文章

  1. uiuc大学计算机排名2019,2019年QS世界大学排名伊利诺伊大学香槟分校排名第71
  2. 机器学习6scikit-learn中的scaler
  3. Spring Boot 错误页配置
  4. Windows更新后双系统引导消失manjaro启动项丢失修复
  5. windows2008 RD授权管理器下空的没服务器(授权管理器无法链接到许可证服务器)问题处理...
  6. COVID-19 AI-related Technical Paper
  7. 纵横职场20条黄金法则,知人善用的五个标准,李嘉诚14句经典财富格言
  8. Apache架设代理服务器
  9. Android护眼功能
  10. EXCEL折线图修改横坐标间隔