如有侵权,私信立删
修改时间:2020年3月30日
作者:pp_x
邮箱:pp_x12138@163.com

文章目录

  • Kafka集群搭建
    • 虚拟机中搭建 Kafka 集群
      • 准备工作
      • Zookeeper 集群搭建
      • 安装Kafka
        • 上传 Kafka 安装包并解压
        • 修改kakfa核心配置文件
        • 将配置好的kafka分发到其他二台主机
        • 启动 Kafka 集群
    • Dockers容器中安装kafka集群
    • 准备工作
    • 安装 docker - compose
    • 拉取镜像
    • 创建集群网络
      • 可能会出现的问题及解决方案
    • 搭建过程
  • kafka的基本使用
    • 在 docker 环境中操作
      • 创建Topic
      • 查看主题
      • 生产者生产数据
      • 消费者消费
      • 运行 describe 的命令
      • 增加 topic 分区数
      • 增加配置
      • 删除配置
      • 删除 topic
    • Java API 使用Kafka
    • 创建Maven工程导入相应的依赖
    • 生产者代码
    • 消费者
  • Apache Kafka 原理
    • 分区副本机制
    • Kafka保证数据不丢失机制
      • 生产者角度
      • 消费者角度
    • 消息存储及查询机制
    • 消息存储机制及查询机制
      • 存储机制
      • 通过 offset 查找 message
    • 生产者消息分发策略
    • 消费者负载均衡机制
    • kakfa配置文件说明

Kafka集群搭建

  • ZooKeeper 作为给分布式系统提供协调服务的工具被 Kafka 所依赖。在分布式系统中,消费者需要知道有哪些生产者是可用的,而如果每次消费者都需要和生产者建立连接并测试是否成功连接,那效率也太低了,显然是不可取的。而通过使用 ZooKeeper 协调服务,Kafka 就能将 Producer,Consumer,Broker 等结合在一起,同时借助 ZooKeeper,Kafka 就能够将所有组件在无状态的条件下建立起生产者和消费者的订阅关系,实现负载均衡。

虚拟机中搭建 Kafka 集群

准备工作

  • 环境准备:准备三台服务器,安装 jdk1.8,其中每一台 Linux 服务器的 hosts 文件中都需要配置如下的内容。如何准备自行百度
192.168.200.11
192.168.200.12
192.168.200.13
  • 安装目录创建:
创建各级目录命令:
mkdir -p /export/servers/  # 安装包存放的目录
mkdir -p /export/software/  # 安装程序存放的目录
mkdir -p /export/data/  # 数据目录
mkdir -p /export/logs/  # 日志目录
  • 修改 host:
  • vim /etc/hosts (没有vim工具的虚拟机可以使用vi)
192.168.200.11   node1
192.168.200.12   node2
192.168.200.13   node3
  • 执行命令 /etc/init.d/network restart 重启 hosts。
  • 执行命令 cat /etc/hosts 可以查看到 hosts 文件修改成功。

Zookeeper 集群搭建

  • 三台Linxu安装JDK(jdk自行下载,此处演示rpm格式)
# 使用 rpm 安装 JDK
rpm -ivh jdk-8u261-linux-x64.rpm# 默认的安装路径是 /usr/java/jdk1.8.0_261-amd64
# 配置 JAVA_HOME
vi /etc/profile# 文件最后添加两行
export JAVA_HOME=/usr/java/jdk1.8.0_261-amd64
export PATH=$PATH:$JAVA_HOME/bin# 退出 vi,使配置生效
source /etc/profile
  • 验证
java -version
  • Linux 安装 Zookeeper,三台 Linux 都安装,以搭建 Zookeeper 集群。
  • zookeeper安装包自行准备此处以zookeeper-3.4.14.tar.gz为例
  • 解压并配置 Zookeeper(配置 data 目录,集群节点)。
# node1操作# 解压到/opt 目录
tar -zxf zookeeper-3.4.14.tar.gz -C /opt# 配置
cd /opt/zookeeper-3.4.14/conf# 配置文件重命名后生效
cp zoo_sample.cfg zoo.cfg#编辑
vi zoo.cfg# 设置数据目录
dataDir=/var/lagou/zookeeper/data
# 添加配置 Zookeeper 集群节点
server.1=node1:2881:3881
server.2=node2:2881:3881
server.3=node3:2881:3881# 退出 vim
mkdir -p /var/lagou/zookeeper/data
echo 1 > /var/lagou/zookeeper/data/myid# 配置环境变量
vi /etc/profile# 添加
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log# 退出 vim,让配置生效
source /etc/profile
  • node2配置
其他相同
echo 1 > /var/lagou/zookeeper/data/myid
  • node3配置
其他相同
echo 3 > /var/lagou/zookeeper/data/myid
  • 启动zookeeper
# 在三台 Linux 上启动 Zookeeper
[root@node1 ~]# zkServer.sh start
[root@node2 ~]# zkServer.sh start
[root@node3 ~]# zkServer.sh start# 在三台 Linux 上查看 Zookeeper 的状态
[root@node1 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower[root@node2 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: leader[root@node3 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower

安装Kafka

  • 安装包自行准备,此处以2.11为例

上传 Kafka 安装包并解压

 切换目录上传安装包cd /export/software用ftp工具上传即可解压安装包到指定目录下
tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/
cd /export/servers/重命名
mv kafka_2.11-1.0.0 kafka

修改kakfa核心配置文件

cd   /export/servers/kafka/config/
vi   server.properties# 主要修改一下 6 个地方:
#    1) broker.id            需要保证每一台 kafka 都有一个独立的 broker
#    2) log.dirs             数据存放的目录
#    3) zookeeper.connect    zookeeper 的连接地址信息
#    4) delete.topic.enable  是否直接删除 topic
#    5) host.name            主机的名称
#    6) 修改: listeners=PLAINTEXT://node1:9092# broker.id 标识了 kafka 集群中一个唯一 broker
broker.id=0
listeners=PLAINTEXT://node1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600# 存放生产者生产的数据,数据一般以 topic 的方式存放
log.dirs=/export/data/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000# zk 的信息
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0delete.topic.enable=true
host.name=node1

将配置好的kafka分发到其他二台主机

cd /export/servers
scp -r kafka/ node2:$PWD
scp -r kafka/ node3:$PWD
  • Linux scp 命令用于 Linux 之间复制文件和目录。
  • scp 是 secure copy 的缩写, scp 是 linux 系统下基于 ssh 登陆进行安全的远程文件拷贝命令
  • $PWD是当前绝对路径
  • 拷贝后, 需要修改每一台的broker.idhost.name和 listeners
ip为11的服务器: broker.id=0 , host.name=node1 listeners=PLAINTEXT://node1:9092
ip为12的服务器: broker.id=1 , host.name=node2 listeners=PLAINTEXT://node2:9092
ip为13的服务器: broker.id=2 , host.name=node3 listeners=PLAINTEXT://node3:9092
  • 在每一台的服务器执行创建数据文件的命令
mkdir -p /export/data/kafka

启动 Kafka 集群

  • 前提是zookeeper要启动起来
cd /export/servers/kafka/bin # 前台启动
./kafka-server-start.sh /export/servers/kafka/config/server.properties# 后台启动
nohup ./kafka-server-start.sh /export/servers/kafka/config/server.properties 2>&1 &
# 注意:可以启动一台 broker,单机版。也可以同时启动三台 broker,组成一个 kafka 集群版# kafka 停止
./kafka-server-stop.sh
  • 登录的前提是,通过 jps 是可以看到 Kafka 的进程。
  • 登录 zookeeper /opt/zookeeper-3.4.14/bin/zkCli.sh;然后执行 ls /brokers/ids,可以看到输出了[0,1,2]

Dockers容器中安装kafka集群

hostname ip 映射端口 监听
zoo1 192.168.0.11 2184:2181
zoo2 192.168.0.12 2185:2181
zoo3 192.168.0.13 2186:2181
kafka1 192.168.0.14 9092:9092
kafka2 192.168.0.15 9093:9092 kafka1
kafka3 192.168.0.16 9094:9092 kafka1
kafka-manager 192.168.0.17 9090:9090 kafka1
宿主机 192.168.200.20

准备工作

  • 克隆VM,修改IP地址为192.168.200.20
  • 修改网络配置:vi /etc/sysconfig/network-scrpits/ifcfg-ens33
TYPE=Ethernet
PROXY_METHOD=none
BROWSER_ONLY=no
BOOTPROTO=static
DEFROUTE=yes
IPV4_FAILURE_FATAL=no
IPV6INIT=yes
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_FAILURE_FATAL=no
IPV6_ADDR_GEN_MODE=stable-privacy
NAME=ens33
UUID=b8fd5718-51f5-48f8-979b-b9f1f7a5ebf2
DEVICE=ens33
ONBOOT=yes
IPADDR=192.168.200.20
GATEWAY=192.168.200.2
NETMASK=255.255.255.0
NM_CONTROLLED=no
DNS1=8.8.8.8
DNS2=8.8.4.4

安装 docker - compose

  • Compose 是用于定义和运行多容器 Docker 应用程序的工具。
  • 如果还是使用原来的方式操作 docker,那么就需要下载三个镜像:Zookeeper、Kafka、Kafka-Manager,需要对 Zookeeper 安装三次并配置集群、需要对 Kafka 安装三次,修改配置文件,Kafka-Manager 安装一次,但是需要配置端口映射机器 Zookeeper、Kafka 容器的信息。
  • 但是引入 Compose 之后可以使用 yaml 格式的配置文件配置好这些信息,每个 image 只需要编写一个 yaml 文件,可以在文件中定义集群信息、端口映射等信息,运行该文件即可创建完成集群。
  • 通过 Compose,可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配置中创建并启动所有服务。
  • Compose 使用的两个步骤:
    • 使用 docker-compose.yml 定义构成应用程序的服务,这样它们可以在隔离环境中一起运行。
    • 执行 docker-compose up 命令来启动并运行整个应用程序。
# curl 是一种命令行工具,作用是发出网络请求,然后获取数据
curl -L https://github.com/docker/compose/releases/download/1.8.0/run.sh > /usr/local/bin/docker-compose# chmod(change mode)命令是控制用户对文件的权限的命令
chmod +x /usr/local/bin/docker-compose# 查看版本
docker-compose --version

拉取镜像

# 拉取 Zookeeper 镜像
docker pull zookeeper:3.4# 拉取 kafka 镜像
docker pull wurstmeister/kafka# 拉取 kafka-manager 镜像
docker pull sheepkiller/kafka-manager:latest

创建集群网络

  • 基于 Linux 宿主机而工作的,也是在 Linux 宿主机创建,创建之后 Docker 容器中的各个应用程序可以使用该网络。
# 创建
docker network create --driver bridge --subnet 192.168.0.0/24 --gateway 192.168.0.1 kafka# 查看
docker network ls

可能会出现的问题及解决方案

  • 新建网段之后可能会出现:WARNING: IPv4 forwarding is disabled. Networking will not work.
  • 解决方案
    • 在宿主机上执行 - echo "net.ipv4.ip_forward=1" >>/usr/lib/sysctl.d/00-system.conf
    • 重启 networkdocker 服务 systemctl restart network && systemctl restart docker

搭建过程

  • 每个镜像一个 yml 文件,Zookeeper、Kafka、Kafka-Manager 一个;编写 yml 文件。
  • docker-compose-zookeeper.yml
# 指定 compose 文件的版本
version: '2'
# 通过镜像安装容器的配置
services:zoo1:# 使用的镜像image: zookeeper:3.4# 当 Docker 重启时,该容器重启restart: always# 类似于在基于 Linux 虚拟机 Kafka 集群中 hosts 文件的值hostname: zoo1container_name: zoo1ports:# 端口映射- 2184:2181# 集群环境environment:# 当前 Zookeeper 实例的 idZOO_MY_ID: 1# 集群节点ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888# 使用的网络配置networks:kafka:ipv4_address: 192.168.0.11zoo2:image: zookeeper:3.4restart: alwayshostname: zoo2container_name: zoo2ports:- 2185:2181environment:ZOO_MY_ID: 2ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888networks:kafka:ipv4_address: 192.168.0.12zoo3:image: zookeeper:3.4restart: alwayshostname: zoo3container_name: zoo3ports:- 2186:2181environment:ZOO_MY_ID: 3ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888networks:kafka:ipv4_address: 192.168.0.13
networks:kafka:external:name: kafka
  • docker-compose-kafka.yml
version: '2'
services:kafka1:image: wurstmeister/kafkarestart: alwayshostname: kafka1container_name: kafka1privileged: trueports:- 9092:9092# 集群环境配置environment:KAFKA_ADVERTISED_HOST_NAME: kafka1KAFKA_LISTENERS: PLAINTEXT://kafka1:9092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092KAFKA_ADVERTISED_PORT: 9092KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181# 配置 Zookeeper 集群的地址external_links:- zoo1- zoo2- zoo3networks:kafka:ipv4_address: 192.168.0.14kafka2:image: wurstmeister/kafkarestart: alwayshostname: kafka2container_name: kafka2privileged: trueports:- 9093:9093environment:KAFKA_ADVERTISED_HOST_NAME: kafka2KAFKA_LISTENERS: PLAINTEXT://kafka2:9093KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093KAFKA_ADVERTISED_PORT: 9093KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181external_links:- zoo1- zoo2- zoo3networks:kafka:ipv4_address: 192.168.0.15kafka3:image: wurstmeister/kafkarestart: alwayshostname: kafka3container_name: kafka3privileged: trueports:- 9094:9094environment:KAFKA_ADVERTISED_HOST_NAME: kafka3KAFKA_LISTENERS: PLAINTEXT://kafka3:9094KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094KAFKA_ADVERTISED_PORT: 9094KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181external_links:- zoo1- zoo2- zoo3networks:kafka:ipv4_address: 192.168.0.16
networks:kafka:external:name: kafka
  • docker-compose-manager.yml
version: '2'
services:kafka-manager:image: sheepkiller/kafka-manager:latestrestart: alwayscontainer_name: kafka-managerhostname: kafka-managerports:- 9000:9000# 可以管理 zoo 集群和 kafka 集群environment:ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181KAFKA_BROKERS: kafka1:9092,kafka2:9092,kafka3:9092APPLICATION_SECRET: letmeinKM_ARGS: -Djava.net.preferIPv4Stack=truenetworks:kafka:ipv4_address: 192.168.0.17
networks:kafka:external:name: kafka
  • 将 yaml 文件上传到 Docker 宿主机中。
  • 开始部署:
# 使用命令:
# 参数说明:up 表示启动,-d 表示后台运行。
docker-compose up -d# 参数说明:  -f:表示加载指定位置的yaml文件
docker-compose -f /home/docker-compose-zookeeper.yml up -d
docker-compose -f /home/docker-compose-kafka.yml up -d
docker-compose -f /home/docker-compose-manager.yml up -d
  • 测试浏览器访问宿主机端口来访问kafka管理端

    • http://192.168.200.20:9000/

kafka的基本使用

在 docker 环境中操作

创建Topic

  • 创建一个名字为 test 的主题, 有一个分区,有三个副本。一个主题下可以有多个分区,每个分区可以用对应的副本。
# 登录到 Kafka 容器
docker exec -it a44b97cb4f00 /bin/bash# 切换到 bin 目录
cd opt/kafka/bin/# 执行创建
kafka-topics.sh --create --zookeeper zoo1:2181 --replication-factor 3 --partitions 1 --topic test
  • create:新建命令
  • zookeeper:Zookeeper节点,一个或多个
  • replication-factor:指定副本,每个分区有三个副本。
  • partitions:1

查看主题

  • 查看kafka当中存在的主题
kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181
  • __consumer_offsets 这个topic是由kafka自动创建的,默认50个分区,存储消费位移信息(offset),老版本架构中是存储在Zookeeper中

生产者生产数据

  • Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群
  • 默认情况下,每行将作为单独的message发送
  • 运行 producer,然后在控制台输入一些消息以发送到服务器。
bash-4.4# kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test
>This is a new Message
>This is another new Message

消费者消费

bash-4.4# kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test --from-beginning
This is a new Message
This is another new Message
  • 在使用的时候会用到 bootstrap-serverbroker-list 其实是实现一个功能,broker-list 是旧版本命令。
  • 确保消费者消费的消息是顺序的,需要把消息存放在同一个 topic 的同一个分区。
  • 一个主题多个分区,分区内消息有序。

运行 describe 的命令

  • 运行 describe 查看 topic 的相关详细信息。
# 查看 topic 主题详情,Zookeeper 节点写一个和全部写,效果一致
kafka-topics.sh --describe --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --topic test# 结果列表
Topic: test1    PartitionCount: 3       ReplicationFactor: 3    Configs: Topic: test1    Partition: 0    Leader: 1001    Replicas: 1001,1003,1002        Isr: 1001,1003,1002Topic: test1    Partition: 1    Leader: 1002    Replicas: 1002,1001,1003        Isr: 1002,1001,1003Topic: test1    Partition: 2    Leader: 1003    Replicas: 1003,1002,1001        Isr: 1003,1002,1001
  • 结果说明:这是输出的解释。第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。有几个分区,下面就显示几行
  • Leader:是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。
  • Replicas:显示给定 partiton 所有副本所存储节点的节点列表,不管该节点是否是 leader 或者是否存活。
  • Isr:副本都已同步的的节点集合,这个集合中的所有节点都是存活状态,并且跟 leader 同步。

增加 topic 分区数

bash-4.4# kafka-topics.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --alter --topic test --partitions 8
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

增加配置

  • flush.messages:此项配置指定时间间隔,强制进行 fsync 日志,默认值为 None。
  • 一般来说,建议不要设置这个值。此参数的设置需要在"数据可靠性"与"性能"之间做必要的权衡
  • 如果此值过大将会导致每次 fsync 的时间较长 (IO 阻塞)。
  • 如果此值过小将会导致 fsync 的次数较多,这也意味着整体的 client 请求有一定的延迟,物理 server 故障,将会导致没有 fsync 的消息丢失
bash-4.4# kafka-topics.sh --zookeeper zoo1:2181 --alter --topic test --config flush.messages=1
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.Going forward, please use kafka-configs.sh for this functionality
Updated config for topic test.

删除配置

bash-4.4# kafka-topics.sh --zookeeper zoo1:2181 --alter --topic test --delete-config flush.messages
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.Going forward, please use kafka-configs.sh for this functionality
Updated config for topic test.

删除 topic

  • 目前删除 topic 在默认情况只是打上一个删除的标记,在重新启动 kafka 后才删除。如果需要立即删除,则需要在 server.properties 文件中配置:delete.topic.enable=true(集群中的所有实例节点),一个主题会在不同的 Kafka 节点中分配分组信息和副本信息
bash-4.4# kafka-topics.sh --zookeeper zoo1:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Java API 使用Kafka

  • 修改Windows的Host文件:
192.168.200.20 kafka1
192.168.200.20 kafka2
192.168.200.20 kafka3

创建Maven工程导入相应的依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin></plugins>
</build>

生产者代码

public class ProducerDemo {// 定义主题public static String topic = "renda";public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.20:9092,192.168.200.20:9093,192.168.200.20:9094");// 网络传输, 对 key 和 value 进行序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 创建消息生产对象,需要从 properties 对象或者从 properties 文件中加载信息KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);try {while (true) {// 设置消息内容String msg = "Hello, " + new Random().nextInt(100);// 将消息内容封装到 ProducerRecord 中ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, msg);kafkaProducer.send(producerRecord);System.out.println("Message Sent Successfully: " + msg);Thread.sleep(500);}} catch (Exception e) {e.printStackTrace();} finally {kafkaProducer.close();}}}

消费者

public class ConsumerDemo {public static String topic = "renda";public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.20:9092,192.168.200.20:9093,192.168.200.20:9094");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 指定组名properties.put(ConsumerConfig.GROUP_ID_CONFIG, "renda-1");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 订阅消息kafkaConsumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.println(String.format("topic: %s, offset: %d. msg: %s",record.topic(), record.offset(), record.value()));}}}}

Apache Kafka 原理

分区副本机制

  • Kafka 有三层结构:Kafka 有多个主题,每个主题有多个分区,每个分区又有多条消息
  • 分区机制:主要解决了单台服务器存储容量有限和单台服务器并发数限制的问题。一个分片的不同副本不能放到同一个 broker 上
  • 当主题数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上。每个服务器上的数据,叫做一个分片
  • 分区对于kafka集群的好处是:实现了负载均衡,高存储能力,高伸缩性,分期对于消费者来说,可以提高并发度,提高效率
  • 副本:副本备份机制解决的了数据存储的高可用问题
    • 当数据之保存一份时有丢失的风险
  • kafka的副本都有哪些作用?
    • 在kafka中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由leader副本进行处理的。follower副本仅有一个功能,那就是从leader副本拉取消息,尽量让自己跟leader副本的内容一致。
  • 说说follower副本为什么不对外提供服务?
    • 如果follower副本也对外服务,都可以被消费者读写,那么性能肯定会有所提升,因为可供访问的节点增多,但是会产生如数据库事务中的幻读,脏读事件
    • 比如你现在写入一条数据到kafka主题a,消费者b从主题a消费数据,却发现消费不到,因为消费者b去读取的那个分区副本中,最新消息还没写入。而这个时候,另一个消费者c却可以消费到最新那条数据,因为它消费了leader副本。

Kafka保证数据不丢失机制

  • 从Kafka的大体角度上可以分为数据生产者,Kafka集群,还有就是消费者,而要保证数据的不丢失也要从这三个角度去考虑

生产者角度

  • 消息生产者保证数据不丢失:消息确认机制(ACK机制),参考值有三个:0,1,-1
// producer 无需等待来自 broker 的确认而继续发送下一批消息。
// 这种情况下数据传输效率最高,但是数据可靠性确是最低的。
properties.put(ProducerConfig.ACKS_CONFIG,"0");// producer 只要收到一个分区副本成功写入的通知就认为推送消息成功了。
// 这里有一个地方需要注意,这个副本必须是 leader 副本。
// 只有 leader 副本成功写入了,producer 才会认为消息发送成功。
properties.put(ProducerConfig.ACKS_CONFIG,"1");// ack=-1,简单来说就是,producer 只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
properties.put(ProducerConfig.ACKS_CONFIG,"-1");

消费者角度

  • 消费者丢失数据的情况

    • 由于Kafka consumer默认是自动提交位移的(先更新位移,再消费消息),如果消费程序出现故障,没消费完毕,则丢失了消息,此时,broker并不知道。
  • 解决方案
    • enable.auto.commit=false 关闭自动提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

消息存储及查询机制

  • Kafka 使用日志文件的方式来保存生产者消息,每条消息都有一个 offset 值来表示它在分区中的偏移量
  • Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是 <topic_name>_<partition_id>
  • Kafka 容器数据目录:/kafka/kafka-logs-kafka1

消息存储机制及查询机制

存储机制

  • Kafka 作为消息中间件,只负责消息的临时存储,并不是永久存储,所以需要删除过期的数据。如果将所有的数据都存储在一个文件中,要删除过期的数据的时候,就变得非常的麻烦。如果将其进行切分成多个文件后,如果要删除过期数据,就可以根据文件的日期属性删除即可。默认只保留 168 小时,即七天之内的数据。因此 Kafka 的数据存储方案是多文件存储。
  • Log 分段:
    • 每个分片目录中,kafka 通过分段的方式将数据分为多个 LogSegment。
    • 一个 LogSegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件 (00000000000000000000.index)。
    • 其中日志文件是用来记录消息的,索引文件是用来保存消息的索引
    • 每个 LogSegment 的大小可以在 server.propertieslog.segment.bytes=107370(设置分段大小,默认是 1 GB)选项进行设置。
    • 当 log 文件等于 1 G 时,新的会写入到下一个 segment 中。
    • timeindex 文件,是 kafka 的具体时间日志

通过 offset 查找 message

  • 结构:一个主题 --> 多个分区 --> 多个日志段(多个文件)。
  • 第一步 - 查询 segment file
    • segment file 命名规则跟 offset有关,根据 segment file 可以知道它的起始偏移量,因为 Segment file 的命名规则是上一个 segment文件最后一条消息的 offset。所以只要根据 offset 二分查找文件列表,就可以快速定位到具体文件
    • 比如,第一个 segment file 是 00000000000000000000.index 表示最开始的文件,起始偏移量 (offset) 为 0。第二个是 00000000000000091932.index - 代表消息量起始偏移量为 91933 = 91932 + 1。那么 offset=5000 时应该定位 00000000000000000000.index
  • 第二步 - 通过 segment file 查找 message
    • 通过第一步定位到 segment file,当 offset=5000 时,依次定位到 00000000000000000000.index的元数据物理位置和 00000000000000000000.log 的物理偏移地址,然后再通过 00000000000000000000.log顺序查找直到 offset=5000 为止。

生产者消息分发策略

  • Kafka 在数据生产的时候,有一个数据分发策略。默认的情况使用 DefaultPartitioner.class 类。
public interface Partitioner extends Configurable, Closeable {/*** Compute the partition for the given record.** @param topic The topic name* @param key The key to partition on (or null if no key)* @param keyBytes The serialized key to partition on( or null if no key)* @param value The value to partition on or null* @param valueBytes The serialized value to partition on or null* @param cluster The current cluster metadata*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);/*** This is called when partitioner is closed.*/public void close();}
  • 默认实现类:org.apache.kafka.clients.producer.internals.DefaultPartitioner
  • 如果是用户指定了 partition,生产就不会调用 DefaultPartitioner.partition()方法。
  • 当 ProducerRecord 的构造参数中有 partition 的时候,就可以发送到对应 partition 上。
  • ProducerRecord源码
/*** Creates a record to be sent to a specified topic and partition** @param topic The topic the record will be appended to* @param partition The partition to which the record should be sent* @param key The key that will be included in the record* @param value The record contents* @param headers The headers that will be included in the record*/
public ProducerRecord(String topic, Integer partition, K key, V value,  Iterable<Header> headers) {this(topic, partition, null, key, value, headers);
}
  • DefaultPartitioner 源码
  • 整体实现逻辑如下
    • 如果指定 key,是取决于 key 的 hash 值。
    • 如不不指定key,轮询
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取该 topic 的分区列表List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 获得分区的个数int numPartitions = partitions.size();// 如果 key 值为 null; 如果没有指定 key,那么就是轮询if (keyBytes == null) {// 维护一个 key 为 topic 的 ConcurrentHashMap,并通过 CAS 操作的方式对 value 值执行递增 +1 操作int nextValue = nextValue(topic);// 获取该 topic 的可用分区列表List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);// 如果可用分区大于 0if (availablePartitions.size() > 0) {// 执行求余操作,保证消息落在可用分区上int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// 指定了 key,key 肯定就不为 null// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}
}

消费者负载均衡机制

  • 同一个分区中的数据,只能被一个消费者组中的一个消费者所消费。例如 P0 分区中的数据不能被 Consumer Group A 中 C1 与 C2 同时消费。
  • 消费组:一个消费组中可以包含多个消费者,properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupName");
  • 如果该消费组有四个消费者,主题有四个分区,那么每人一个。多个消费组可以重复消费消息。
  • 如果有3个Partition, p0/p1/p2,同一个消费组有3个消费者,c0/c1/c2,则为一一对应关系;
  • 如果有3个Partition, p0/p1/p2,同一个消费组有2个消费者,c0/c1,则其中一个消费者消费2个分区的数据,另一个消费者消费一个分区的数据;(必须等两个消费完,另一个才消费
  • 如果有2个Partition, p0/p1,同一个消费组有3个消费者,c0/c1/c3,则其中有一个消费者空闲,另外2个消费者消费分别各自消费一个分区的数据;

kakfa配置文件说明

  • server.properties

    • broker.id = 0

      • Kafka 集群是由多个节点组成的,每个节点称为一个 broker,中文翻译是代理。每个 broker 都有一个不同的 brokerId,由 broker.id 指定,是一个不小于 0 的整数,各 brokerId 必须不同,但不必连续。如果想扩展 kafka 集群,只需引入新节点,分配一个不同的 broker.id 即可。
      • 启动 kafka 集群时,每一个 broker 都会实例化并启动一个 kafkaController,并将该 broker 的 brokerId 注册到 zooKeeper 的相应节点中。集群各 broker 会根据选举机制选出其中一个 broker 作为 leader,即 leader kafkaController。Leader kafkaController 负责主题的创建与删除、分区和副本的管理等。当 leader kafkaController 宕机后,其他 broker 会再次选举出新的 leader kafkaController。
    • log.dir = /export/data/kafka/
      • Broker 持久化消息到哪里,数据目录。
    • log.retention.hours = 168
      • Log 文件最小存活时间,默认是 168h,即 7 天。相同作用的还有 log.retention.minuteslog.retention.ms
      • 数据存储的最大时间超过这个时间会根据 log.cleanup.policy 设置的策略处理数据,也就是消费端能够多久去消费数据。log.retention.byteslog.retention.hours 任意一个达到要求,都会执行删除,会被 topic 创建时的指定参数覆盖。
    • log.retention.check.interval.ms
      • 多长时间检查一次是否有 log 文件要删除。默认是 300000ms,即 5 分钟。
    • log.retention.bytes
      • 限制单个分区的 log 文件的最大值,超过这个值,将删除旧的 log,以满足 log 文件不超过这个值。默认是 -1,即不限制。
    • log.roll.hours
      • 多少时间会生成一个新的 log segment,默认是 168h,即 7 天。相同作用的还有 log.roll.mssegment.ms
    • log.segment.bytes
      • Log segment 多大之后会生成一个新的 log segment,默认是 1073741824,即 1G。
    • log.flush.interval.messages
      • 指定 broker 每收到几个消息就把消息从内存刷到硬盘(刷盘)。默认是9223372036854775807。
      • Kafka 官方不建议使用这个配置,建议使用副本机制和操作系统的后台刷新功能,因为这更高效。这个配置可以根据不同的 topic 设置不同的值,即在创建 topic 的时候设置值。
      • 补充说明
    在 Linux 操作系统中,把数据写入到文件系统之后,数据其实在操作系统的 page cache 里面,并没有刷到磁盘上去。如果此时操作系统挂了,其实数据就丢了。
    1、kafka 是多副本的,当配置了同步复制之后。多个副本的数据都在 page cache 里面,出现多个副本同时挂掉的概率比 1 个副本挂掉,概率就小很多了。
    2、操作系统有后台线程,定期刷盘。如果应用程序每写入 1 次数据,都调用一次 fsync,那性能损耗就很大,所以一般都会在性能和可靠性之间进行权衡。因为对应一个应用来说,虽然应用挂了,只要操作系统不挂,数据就不会丢。
    
    • log.flush.interval.ms

      • 指定 broker 每隔多少毫秒就把消息从内存刷到硬盘。默认值同 log.flush.interval.messages 一样, 9223372036854775807。
    • delete.topic.enable=true
      • 是否允许从物理上删除 topic。

Linux上安装Kafka和Kafka的使用相关推荐

  1. Windows OS上安装运行Apache Kafka教程

    Windows OS上安装运行Apache Kafka教程 下面是分步指南,教你如何在Windows OS上安装运行Apache Zookeeper和Apache Kafka. 简介 本文讲述了如何在 ...

  2. linux上安装mysql,tomcat,jdk

    Linux 上安装 1 安装jdk 检测是否安装了jdk 运行 java –version 若有 需要将其卸载 a)         查看安装哪些jdk rmp –qa |grep java b)   ...

  3. linux下安装sbt_如何在Linux上安装SBT

    linux下安装sbt 介绍 (Introduction) Hi! I am Sanjula, and in this guide I hope to teach you how to install ...

  4. 在Linux上安装QT4

    比起在Windows上安装QT,在Linux上安装QT要容易多了,这都得意于Linux系统的autoconf和automake工具. 1. 将QT的源码包放至到一个目录下,笔者的目录是:/home/k ...

  5. 明明安装了模块,还是出现 错误 ImportError: No module named ‘pandas‘ 原因LINUX上安装了多个python环境,将脚本中python 改为python3问题解

    明明安装了模块,还是出现 错误 ImportError: No module named 'pandas'  原因LINUX上安装了多个python环境,将脚本中python 改为python3问题解 ...

  6. linux上安装redis

    Redis Redis在linux上安装 将redis-3.0.7.tar.gz上传到linux虚拟机上 编译: a)       因为redis是C语言开发,所以需要先编译,在linux上编译需要依 ...

  7. Linux上安装paramiko模块

    Linux上安装paramiko模块 一.paramiko模块作用 paramiko是用python语言写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接.由于使用的是pyt ...

  8. 在linux上安装Filezilla文件传输FTP软件

    在linux上安装Filezilla文件传输FTP软件 1 Filezilla安装方法一:使用apt-get安装 2 Filezilla安装方法二:使用filezilla安装包进行安装 1 Filez ...

  9. 如何在 Linux 上安装服务器管理软件 Cockpit

    如何在 Linux 上安装服务器管理软件 Cockpit Cockpit 是一个自由开源的服务器管理软件,它使得我们可以通过它好看的 web 前端界面轻松地管理我们的 GNU/Linux 服务器.Co ...

  10. kali linux 的ssh服务器,如何在 Kali Linux 上安装 SSH 服务

    目的 我们的目的是 Kali Linux 上安装 SSH(安全 shell). 要求 你需要有特权访问你的 Kali Linux 安装或者 Live 系统. 困难程度 很容易! 惯例 #– 给定命令需 ...

最新文章

  1. (二十一)数组的初始化
  2. 图像相似度算法的C#实现及测评
  3. C++中cin、cin.get()、cin.getline()、getline()、gets()、getchar()、scanf()等函数的用法
  4. C语言开发笔记(二)volatile
  5. win10必须禁用的服务_7寸屏的迷你电脑,就算是8GB运行内存,也必须关闭的系统选项...
  6. 【操作系统复习】操作系统的特征
  7. 解释汇编中的AUGW LABEL WORD、ADDW LABEL WORD
  8. hibernate一对多自关联的记录(以树形菜单为例)
  9. 2022五一数学建模b题完成代码
  10. input 框隐藏光标问题
  11. python apscheculer 报错 skipped: maximum number of running instances reached (1)
  12. 天津科技大学计算机二级报名,2018年9月天津计算机二级报名6月20-25日
  13. 番茄ToDo帮助文档
  14. 输出200以内所有能被7整除的数
  15. 深入理解Spring----PostConstruct和PreDestroy
  16. 关于华为昆仑关键业务服务器
  17. 多表操作-外键级联操作
  18. QT的Listwidget控件使用
  19. 探测C库malloc元数据捕获野指针
  20. 3d打印光固化好还是热固化好_细胞3D培养解决方案

热门文章

  1. 中关村归国留学人员联创中心揭牌仪式,Jina AI 受邀出席活动
  2. 守得云开见日出——危机之下的音视频技术驱动产品创新
  3. html 滚动条 scrolltop scrollheight,JavaScript之scrollTop、scrollHeight、offsetTop、offsetHeight等属性学习笔记...
  4. 属性浏览器控件QtTreePropertyBrowser编译成动态库(设计师插件)
  5. 小白快速入门----个人微信公众号制作
  6. 【Java高级特性】I/O流——使用字符流读写文件
  7. 在EXCEL中搜索关键字
  8. springboot的定时任务的方法周期比方法的运行时间长
  9. 2020面试美团的朋友看过来,牛客网搜集整理2018—2020美团面筋需要的自行领取
  10. 艾美捷Cy5.5单琥珀酰亚基酯 Cy5.5 NHS酯解决方案