1、JAVA API操作kafka

 修改Windows的Host文件:

目录:C:\Windows\System32\drivers\etc (win10)

内容:

192.168.40.150 kafka1
192.168.40.150 kafka2
192.168.40.150 kafka3

创建maven工程导入对应maven坐标

    <properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>

1.1 生产者代码

package com.panghl.demo;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.TimeUnit;/*** @Author panghl* @Date 2021/9/16 22:07* @Description 消息生产者**/
public class ProducerDemo {private static final String TOPIC = "LGJY";public static void main(String[] args) throws InterruptedException {// 要构造一个消息生产者对象,关于kafka集群等相关配置,可以从Properties文件中加载也可以从一个Properties对象中加载// KafkaProducer按照固定的key取出对应的valueProperties properties = new Properties();// 指定集群节点properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.40.150:9092,192.168.40.150:9093,192.168.40.150:9094");// 发送消息,网络传输,需要对key和value指定对应的序列化类properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//producer无需等待来自broker的确认而继续发送下一批消息。
//这种情况下数据传输效率最高,但是数据可靠性确是最低的。
//        properties.put(ProducerConfig.ACKS_CONFIG,"0");
//producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。
//这里有一个地方需要注意,这个副本必须是leader副本。
//只有leader副本成功写入了,producer才会认为消息发送成功。properties.put(ProducerConfig.ACKS_CONFIG, "1");
//ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
//        properties.put(ProducerConfig.ACKS_CONFIG,"-1");// 创建消息生产者对象KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);// 发送一百条消息for (int i = 0; i < 100; i++) {// 设置消息的内容String msg = "hello," + i;// 构建一个消息对象: 主题(如果不存在,kafka会帮我们创建一个一个分区一个副本的主题),消息ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, msg);//发送producer.send(record);System.out.println("消息发送成功,msg=>" + msg);TimeUnit.SECONDS.sleep(1);}// 关闭消息生产者对象producer.close();}
}

1.2 消费者代码

package com.panghl.demo;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections;
import java.util.Properties;/*** @Author panghl* @Date 2021/9/16 22:24* @Description 消费者**/
public class ConsumerDemo {private static final String TOPIC = "LGJY";public static void main(String[] args) {// 属性对象Properties properties = new Properties();// 指定集群节点properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.40.150:9092,192.168.40.150:9093,192.168.40.150:9094");//反序列化类properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 取消自动提交 防止消息丢失properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");//指定分组的名称properties.put(ConsumerConfig.GROUP_ID_CONFIG, "LGJY-GROUP1");//消息消费者对象KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);//订阅消息kafkaConsumer.subscribe(Collections.singletonList(TOPIC));while (true) {// 获取消息的方法是一个阻断式方法ConsumerRecords<String, String> records = kafkaConsumer.poll(500);for (ConsumerRecord<String, String> record : records) {System.out.println("主题:--》" + record.topic() + ",偏移量:--》" + record.offset() + ", msg:-->" + record.value());// 手动提交kafkaConsumer.commitSync();}}}
}

2. Apache kafka原理

2.1 分区副本机制

kafka有三层结构:kafka有多个主题,每个主题有多个分区,每个分区又有多条消息。

分区机制:主要解决了单台服务器存储容量有限和单台服务器并发数限制的问题, 一个分片的不同副本不能放到同一个broker上。

当主题数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上。每个服务器上的数据,叫做 一个分片

副本:副本备份机制解决了数据存储的高可用问题

当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据拷贝几份,保存到不同的机器上。

多个follower副本通常存放在和leader副本不同的broker中。通过这样的机制实现了高可用,当某台机器挂掉后,其他follower副本也能迅 速”转正“,开始对外提供服务。

kafka的副本都有哪些作用?

在kafka中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由leader副本进行处理的follower副本仅有一 个功能,那就是从leader副本拉取消息,尽量让自己跟leader副本的内容一致。

说说follower副本为什么不对外提供服务?

这个问题本质上是对性能和一致性的取舍。试想一下,如果follower副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升 的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读

比如你现在写入一条数据到kafka主题a,消费者b从主题a消费数据,却发现消费不到,因为消费者b去读取的那个分区副本中,最新消 息还没写入。而这个时候,另一个消费者c却可以消费到最新那条数据,因为它消费了leader副本。

为了提高那么些性能而导致出现数据不一致问题,那显然是不值得的。

1.2 kafka保证数据不丢失机制

从Kafka的大体角度上可以分为数据生产者,Kafka集群,还有就是消费者,而要保证数据的不丢失也要从这三个角度去考虑。

1.2.1. 消息生产者

消息生产者保证数据不丢失:消息确认机制(ACK机制),参考值有三个:0,1,-1

//producer无需等待来自broker的确认而继续发送下一批消息。
//这种情况下数据传输效率最高,但是数据可靠性确是最低的。properties.put(ProducerConfig.ACKS_CONFIG,"0");
//producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。
//这里有一个地方需要注意,这个副本必须是leader副本。
//只有leader副本成功写入了,producer才会认为消息发送成功。properties.put(ProducerConfig.ACKS_CONFIG, "1");
//ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。properties.put(ProducerConfig.ACKS_CONFIG,"-1");

1.2.2 消息消费者

kafka消费消息的模型:

即消费消息,设置好offset,类比一下:

 什么时候消费者丢失数据呢?

由于Kafka consumer默认是自动提交位移的(先更新位移,再消费消息),如果消费程序出现故障,没消费完毕,则丢失了消息,此 时,broker并不知道。

解决方案

enable.auto.commit=false 关闭自动提交位移

在消息被完整处理之后再手动提交位移

// 取消自动提交 防止消息丢失
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
// 手动提交
kafkaConsumer.commitSync();

1.3 消息存储及查询机制

kafka 使用日志文件的方式来保存生产者消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。

Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上 的一个目录,这个目录的命名规则是<topic_name>_<partition_id>。

1.3.1 消息存储机制

1.4 生产者消息分发策略

kafka在数据生产的时候,有一个数据分发策略。默认的情况使用DefaultPartitioner.class类。 这个

类中就定义数据分发的策略。

public interface Partitioner extends Configurable, Closeable {
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes The serialized key to partition on( or null if no key)
* @param value The value to partition on or null
* @param valueBytes The serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster
cluster);
/**
* This is called when partitioner is closed.
*/
public void close();
}

默认实现类:org.apache.kafka.clients.producer.internals.DefaultPartitioner

1) 如果是用户指定了partition,生产就不会调用DefaultPartitioner.partition()方法

数据分发策略的时候,可以指定数据发往哪个partition。

当ProducerRecord 的构造参数中有partition的时候,就可以发送到对应partition上

/**
* Creates a record to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
* @param partition The partition to which the record should be sent
* @param key The key that will be included in the record
* @param value The record contents
*/
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}

2) DefaultPartitioner源码

如果指定key,是取决于key的hash值

如果不指定key,轮询分发

public int partition (String topic, Object key,byte[] keyBytes, Object value,byte[] valueBytes, Clustercluster){
//获取该topic的分区列表List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//获得分区的个数int numPartitions = partitions.size();
//如果key值为nullif (keyBytes == null) {//如果没有指定key,那么就是轮询
//维护一个key为topic的ConcurrentHashMap,并通过CAS操作的方式对value值执行递增+1操作int nextValue = nextValue(topic);
//获取该topic的可用分区列表List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {//如果可用分区大于0
//执行求余操作,保证消息落在可用分区上int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {
// 没有可用分区的话,就给出一个不可用分区return Utils.toPositive(nextValue) % numPartitions;}} else {//不过指定了key,key肯定就不为null
// 通过计算key的hash,确定消息分区return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}

1.5 消费者负载均衡机制

同一个分区中的数据,只能被一个消费者组中的一个消费者所消费。例如 P0分区中的数据不能被Consumer Group A中C1与C2同时消费。

消费组:一个消费组中可以包含多个消费者,properties.put(ConsumerConfig.GROUP_ID_CONFIG,"groupName");如果该消费组有四个 消费者,主题有四个分区,那么每人一个。多个消费组可以重复消费消息。

  • 如果有3个Partition, p0/p1/p2,同一个消费组有3个消费者,c0/c1/c2,则为一一对应关系;
  • 如果有3个Partition, p0/p1/p2,同一个消费组有2个消费者,c0/c1,则其中一个消费者消费2个分区的数据,另一个消费者消费一个 分区的数据;
  • 如果有2个Partition, p0/p1,同一个消费组有3个消费者,c0/c1/c3,则其中有一个消费者空闲,另外2个消费者消费分别各自消费一个 分区的数据;

2、 kakfa配置文件说明

server.properties

1、broker.id=0:

kafka集群是由多个节点组成的,每个节点称为一个broker,中文翻译是代理。每个broker都有一个不同的brokerId,由broker.id指定, 是一个不小于0的整数,各brokerId必须不同,但不必连续。如果我们想扩展kafka集群,只需引入新节点,分配一个不同的broker.id即 可。

启动kafka集群时,每一个broker都会实例化并启动一个kafkaController,并将该broker的brokerId注册到zooKeeper的相应节点中。集 群各broker会根据选举机制选出其中一个broker作为leader,即leader kafkaController。leader kafkaController负责主题的创建与删除、 分区和副本的管理等。当leader kafkaController宕机后,其他broker会再次选举出新的leader kafkaController。

2、log.dir = /export/data/kafka/

broker持久化消息到哪里,数据目录

3、log.retention.hours = 168

log文件最小存活时间,默认是168h,即7天。相同作用的还有log.retention.minutes、log.retention.ms。retention是保存的意思。

数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据。 log.retention.bytes和log.retention.hours任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖。

4、log.retention.check.interval.ms

多长时间检查一次是否有log文件要删除。默认是300000ms,即5分钟。

5、log.retention.bytes

限制单个分区的log文件的最大值,超过这个值,将删除旧的log,以满足log文件不超过这个值。默认是-1,即不限制。

6、log.roll.hours

多少时间会生成一个新的log segment,默认是168h,即7天。相同作用的还有log.roll.ms、segment.ms。

7、log.segment.bytes

log segment多大之后会生成一个新的log segment,默认是1073741824,即1G。

8、log.flush.interval.messages

指定broker每收到几个消息就把消息从内存刷到硬盘(刷盘)。默认是9223372036854775807 好大。

kafka官方不建议使用这个配置,建议使用副本机制和操作系统的后台刷新功能,因为这更高效。这个配置可以根据不同的topic设置不同的 值,即在创建topic的时候设置值。

补充说明:

在Linux操作系统中,当我们把数据写入到文件系统之后,数据其实在操作系统的page cache里面,并没有刷到磁盘上去。如果此时操作系统
挂了,其实数据就丢了。
    1、kafka是多副本的,当你配置了同步复制之后。多个副本的数据都在page cache里面,出现多个副本同时挂掉的概率比1个副本挂掉,概率
就小很多了
    2、操作系统有后台线程,定期刷盘。如果应用程序每写入1次数据,都调用一次fsync,那性能损耗就很大,所以一般都会在性能和可靠性之间
进行权衡。因为对应一个应用来说,虽然应用挂了,只要操作系统不挂,数据就不会丢。

9、log.flush.interval.ms

指定broker每隔多少毫秒就把消息从内存刷到硬盘。默认值同log.flush.interval.messages一样, 9223372036854775807。 同log.flush.interval.messages一样,kafka官方不建议使用这个配置。

10、delete.topic.enable=true

是否允许从物理上删除topic

3. kafka监控与运维

3.1 kafka-eagle概述

在生产环境下,在Kafka集群中,消息数据变化是我们关注的问题,当业务前提不复杂时,我们可以使用Kafka 命令提供带有Zookeeper 客户端工具的工具,可以轻松完成我们的工作。随着业务的复杂性,增加Group和 Topic,那么我们使用Kafka提供命令工具,已经感到无能 为力,那么Kafka监控系统目前尤为重要,我们需要观察 消费者应用的细节。

为了简化开发者和服务工程师维护Kafka集群的工作有一个监控管理工具,叫做 Kafka-eagle。这个管理工具可以很容易地发现分布在集 群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建 Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具,

3.2 搭建安装 kafka-eagle

环境要求:需要安装jdk,启动zk以及kafka的服务

# 启动Zookeeper
zkServer.sh start

#启动Kafka
nohup ./kafka-server-start.sh /export/servers/kafka/config/server.properties 2>&1 &

修改windows host文件

192.168.40.171 docker-node1
192.168.40.172 docker-node2
192.168.40.173 docker-node3

搭建步骤:

1) 下载kafka-eagle的源码包

kafka-eagle官网:

http://download.kafka-eagle.org/

我们可以从官网上面直接下载最新的安装包即可kafka-eagle-bin-1.3.2.tar.gz这个版本即可

代码托管地址:

https://github.com/smartloli/kafka-eagle/releases

2) 上传安装包并解压:

这里我们选择将kafak-eagle安装在第三台

如果要解压的是zip格式,需要先安装命令支持。

yum install unzip

unzip xxxx.zip

#将安装包上传至 docker-node02服务器的/opt路径下, 然后解压
cd /opt
unzip kafka-eagle.zip
cd cd kafka-eagle-web/target/
tar -zxf kafka-eagle-web-2.0.1-bin.tar.gz -C /opt/

3) 准备数据库:

kafka-eagle需要使用一个数据库来保存一些元数据信息,我们这里直接使用msyql数据库来保存即可,在node01服务器执行以下命 令创建一个mysql数据库即可

SQLite、MySQL

--进入mysql客户端:

create database eagle;

4) 修改kafka-eagle配置文件

cd /export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2/conf
vim system-config.properties
#内容如下:
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=zk主机名:2181,zk主机名:2181,zk主机名:2181
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://192.168.40.1:3306/eagle
kafka.eagle.username=root
kafka.eagle.password=123456

默认情况下MySQL只允许本机连接到MYSQL实例中,所以如果要远程访问,必须开放权限: update user set host = '%' where user ='root'; //修改权限

flush privileges; //刷新配置

5) 配置环境变量

kafka-eagle必须配置环境变量,node02服务器执行以下命令来进行配置环境变量

vi /etc/profile
#内容如下:
export KE_HOME=/opt/kafka-eagle-web-2.0.1/
export PATH=:$KE_HOME/bin:$PATH
#让修改立即生效,执行
source /etc/profile

6) 启动kakfa-eagle

cd /opt/kafka-eagle-web-2.0.1/binchmod u+x ke.sh
./ke.sh start

7) 访问主界面:

http://docker-node3:8048/ke/account/signin?/ke/

用户名:admin 密码:123456

kafka详解(JAVA API操作kafka、kafka原理、kafka监控)-step2相关推荐

  1. Elastic search入门到集群实战操作详解(原生API操作、springboot整合操作)-step1

    Elastic search入门到集群实战操作详解(原生API操作.springboot整合操作)-step2 https://blog.csdn.net/qq_45441466/article/de ...

  2. Kafka系列三 java API操作

    使用java API操作kafka 1.pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xs ...

  3. 【Java】内存机制详解(new操作的执行原理)

    0.参考资料: http://www.j2megame.org/index.php/content/view/2246/125.html 1.Java的内存机制 Java 把内存划分成两种:一种是栈内 ...

  4. kafka详解及搭建

    一.基本概念 介绍 Kafka是一个分布式的.可分区的.可复制的消息系统.它提供了普通消息系统的功能,但具有自己独特的设计. 这个独特的设计是什么样的呢? 首先让我们看几个基本的消息系统术语: Kaf ...

  5. kafka详解及集群环境搭建

    一.kafka详解 安装包下载地址:https://download.csdn.net/download/weixin_45894220/87020758 1.1Kafka是什么? 1.Kafka是一 ...

  6. Hadoop详解(四):HDFS shell操作和Java API操作

    1. HDFS环境准备 1.1 HDFS的格式化与启动 HDFS配置完之后就可以对其进行格式化操作.在NameNode所在机器上执行如下命令进行HDFS的格式化操作: hadoop namenode ...

  7. java jdbc 回滚_java_详解Java的JDBC API中事务的提交和回滚,如果JDBC连接是在自动提交模式 - phpStudy...

    详解Java的JDBC API中事务的提交和回滚 如果JDBC连接是在自动提交模式下,它在默认情况下,那么每个SQL语句都是在其完成时提交到数据库. 这可能是对简单的应用程序,但有三个原因,你可能想关 ...

  8. java实现标准化考试系统详解(四)-----初始化操作实现

    (一)初始化操作实现 如上图所示当管理员需要更改适用工程.试题数量.考试时间时直接在文本中更改就好我们只需要每次在用户打开程序时初始化这些参数就可以 1.初始化试题模型,这里需要实现随机抽题,方法是用 ...

  9. 详解Java解析XML的四种方法

    http://developer.51cto.com  2009-03-31 13:12  cnlw1985  javaeye  我要评论(8) XML现在已经成为一种通用的数据交换格式,平台的无关性 ...

最新文章

  1. 10种Git技巧,让你省时省力又省心!
  2. Windows核心编程 第十七章 -内存映射文件(上)
  3. 记录git命令:本地创建项目后如何上传到github上
  4. 三维重建面试0:*SLAM滤波方法的串联综述
  5. postgreSQL源码分析——索引的建立与使用——Hash索引(1)
  6. PHP ceil()函数
  7. 目标检测——域自适应只对同源的样本有效
  8. 程序员平均年薪 70 万、40 岁后收入下滑?尽在 2019 程序员薪资报告
  9. unordered_set/unordered_map 增删查操作
  10. java 文件下载 并发_高并发下载tomcat下的文件时,发生java.net.SocketException: Connection reset解决方案...
  11. python协程,asyncIO
  12. 漫画 | 前端发展史的江湖恩怨情仇
  13. Win7如何简单的关闭445端口及445端口入侵详解
  14. pandas分组计算平均值_Pandas之分组计算
  15. python函数使用格式刷_Excel格式刷用法汇总分享,学到就是赚到!
  16. 计算机专业的黑板报内容,新学期黑板报文字资料参考
  17. 利用PS抠出水印字并添加到图片
  18. 人工智能数学基础--概率与统计7:学习中一些术语的称呼或表示变化说明以及独立事件的一些补充推论
  19. 80%的经理人都不知道的邮件常识
  20. 高通平台的usb2.0测试_深圳格拉布斯研究院全自动高通量催化剂制备及筛选设备已正式启用...

热门文章

  1. 鸟哥的Linux学习笔记
  2. 教你如何做好团购和团购推广
  3. 基于android的团购app设计与实现,基于Android平台的团购系统设计与实现
  4. java学生综合素质评价系统_ssm学生综合素质评价系统
  5. 2-网络市场与网络消费者行为分析网络营销
  6. Windows Server 2008(基本环境配置)
  7. 深度完美 GhostXP_SP3 纯净选择版_2011.08
  8. Win32下Foxbase+数据库浏览程序的编写 (转)
  9. 包含mshtml, SHDocVw, AxSHDocVw的引用以及如何添加axWebBrowser控件到工具箱
  10. 如何查看电脑连接过的WiF的密码(Windows版)