Kafka介绍

Kafka最初是LinkedIn的一个内部基础设施系统,它可以帮助处理持续数据流的组件。在最初开发设计理念上,开发者不想只是开发一个能够存储数据的系统,比如关系型数据库、Nosql数据库等,更希望把数据看成一个持续变化和不断增长的流,因此基于这种想法构建出了一个数据系统。
Kafka作为一种分布式消息系统,允许消息发布和订阅,但是Kafka又不仅仅是一个消息中间件,它和传统的消息系统有很大的差异:
(1)Kafka作为一个分布式系统,以集群的方式运行,支持节点自由伸缩;
(2)Kafka消息是记录在磁盘日志文件中,天生持久化,支持按照时间和文件大小进行消息保存;
(3)传统的消息系统只传递数据,而Kafka提供了流式处理将数据处理的层次提升到了新高度,它可以让开发者使用很少的代码就能动态地处理派生流和数据集。

Kafka基本概念

消息和批次

消息作为Kafka的数据单元,由字节数组组成,消息除了传递的消息内容之外,还可以包含消息键,主要用于对消息选取分区。并且 Kafka作为一个高效的消息系统,支持对消息进行分批次写入,而批次就是一组消息,这一组消息同属于一个主题和分区。
对Kafka生产者来说发送一组消息,如果每次只传递单条消息,会导致大量的网络开销,而消息分批次传递,批次中包含的消息越多,单位时间内处理的消息也就越多,那么网络开销也会越少,但是带来的是单个消息的传输时间更长,所以是否使用批次需要在时间延迟和吞吐量之间做权衡。
消息对于Kafka来说是晦涩难懂的字节数组,所以对Kafka传递和消费消息需要指定序列化和反序列化器。常用的序列化格式有JSON和XML,还有Avro(Hadoop开发的一款序列化框架),具体怎么使用依据自身的业务来定。

保留消息

Kafka中的消息时存放在磁盘日志文件中的,它不会像Mysql之类的永久存储,而是配置了在一定期限内保留消息。Kafka默认的保留策略:要么保留一段时间(默认7天),要么保留一定大小(比如1G),那么到了限制之后,旧消息会过期并删除,在开发时可以为每个主题根据不同业务特性配置不同的保留策略。

主题和分区

Kafka里的消息是通过主题进行分类的,主题好比数据库中的表,一个主题下又可以被分为若干个分区,好比分表的技术。一个分区本质上是一个提交日志文件,对于消息会以追加的方式写入分区日志文件中,并按照先入先出的顺序读取。

由于一个主题下会有多个分区,因此在整个主题的范围内无法保证消息的顺序,但是对单个分区来说是可以保证顺序的。Kafka通过分区实现数据冗余和伸缩性,分区可以分布在集群中不同的服务器节点上,也就是说一个主题能够跨域多个服务器节点,因此对主题下分区的磁盘读写,会分散到多台服务器,这也就是Kafka高性能的一个原因。

生产者和消费者、偏移量、消费者群组

生产者和消费者是作为消息中间件的基本概念,在Kafka中生产者负责将一个消息发布到指定的的主题上,而如果一个主题下存在多个分区,默认情况下生产者会把消息均衡发布到主题的所有分区上,而不会关心消息写入哪个分区。当前某些情况需要将消息写入到指定分区时,则可以通过消息里的消息键和分区器来实现。
Kafka中消费者会通过订阅一个或多个主题,并按照消息的生成顺序进行读取。消费者通过消息的偏移量来区分消息是否被读取过,而偏移量也属于Kafka的一种元数据,一个不断递增的整数值,每一个消息被创建时,Kafka都把它添加到消息里。在一个主题的一个分区里,每个消息的偏移量都是唯一的,并且每个分区最后读取的消息偏移量会保存在Zookeeper或者Kafka上,这样即使消费者关闭或者重启,分区的消息读取状态也不会丢失。
消费者群组由共同读取一个主题的多个消费者构成,群组可以保证主题下的每个分区只能被一个消费者消费,消费者和分区之间的这种映射关系叫做消费者对分区的所有权关系,从下图可以很明显看出,一个分区只有一个消费者,而一个消费者可以有多个分区。

Broker 和集群

一个独立的Kafka服务器称为Broker。Broker负责接收生产者消息,为消息设置偏移量,并提交消息到磁盘进行保存。Broker为消费者提供服务,对读取主题下分区的请求进行响应,返回已经提交到磁盘上的消息。在合适的硬件基础上,单个Broker可以处理上千个分区和每秒百万级的消息量。
多个Broker组成一个集群,每个集群中所有Broker会通过选举,选出一个Broker充当集群控制器的角色,控制器负责管理工作,包括将分区分配给Broker和监控Broker。在集群里,一个分区从属于一个Broker,这个Broker称为首领,但是一个分区又可以被分配到多个Broker上,这个时候就会发生分区复制,而集群中Kafka内部通过使用管道技术进行高效的复制。

Kafka安装和启动

Kafka需要Zookeeper保存集群的元数据和消费者信息,但Kafka本身会自带Zookeeper,但是从稳定性考虑,应用使用单独的Zookeeper服务,并建立集群。

下载Kafka

前往http://kafka.apache.org/downloads 上寻找合适的版本下载,下载后解压到本地目录

>tar -xzf kafka_2.11-2.3.0.tgz
>cd kafka_2.11-2.3.0

启动服务

运行Kafka需要先启动Zookeeper服务,这里使用Kafka自带配置好的Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

再启动Kafka服务

bin/kafka-server-start.sh config/server.properties

出现以下画面表示成功

Kafka常用命令行操作

1、列出所有主题

bin/kafka-topics.sh --zookeeper localhost:2181 --list

2、创建主题,创建一个名为test的主题,指定1个副本,1个分区

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

3、创建生产者(发送消息),当执行producer脚本后,会出现输入提示符,可以输入消息,然后它会发送到对应的Broker上

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Hello Kafka

4、创建消费者(消费消息),执行consumer脚本后,可以看到,消费者会一直处于监听状态,每当生产者发送一条消息,就会更新一条消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

5、增加分区

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --partitions 16

6、列出消费者群组

bin/kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --list

简单的Kafka应用程序

引入maven依赖包

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.0</version>
</dependency>

生产者发送消息

    // 定义主题private static final String TOPIC = "mytopic";public static void main(String[] args) throws Exception {Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:8081");//broker地址清单,多个broker可以用逗号隔开//指定key和value序列化器props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//创建生产者实例KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);try {ProducerRecord<String,String> record;try {// 发送业务消息record = new ProducerRecord<String,String>(TOPIC, null,"hello kafka!");producer.send(record);} catch (Exception e) {e.printStackTrace();}} finally {producer.close();}}

消费者接收消息

    private static final String TOPIC = "mytopic";public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:8081");//单节点,kafka多节点时候使用,逗号隔开props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//可以指定消费者群组props.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);try {consumer.subscribe(Arrays.asList(TOPIC));//订阅主题,支持订阅多个while (true) {//拉取消息,其中Duration.ofMillis(500)拉取时会把元数据获取也记入整个超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}//默认会自动提交偏移量}} finally {consumer.close();}}

Spring集成Kafka

引入spring-kafka的maven依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.0.RELEASE</version>
</dependency>

Spring整合Kafka工程中主要的一些配置文件

kafka.properties文件中,配置Kafka生产者和消费者的相关属性

# brokers配置
bootstrap.servers=localhost:9092
# 发送方确认机制,缺省1
kafka.producer.acks = 1
# 失败重试次数
kafka.producer.retries = 2
# 指定了生产者在发送批次前等待更多消息加入批次的时间,  缺省0
kafka.producer.linger.ms =  10
# 生产者内存缓冲区大小
kafka.producer.buffer.memory = 32 * 1024 * 1024
# 一个批次可以使用的内存大小 缺省16384(16k)
kafka.producer.batch.size = 16384
# 序列化器
kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer
kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer# Kafka consumer
kafka.consumer.bootstrap.servers = localhost:9092
kafka.consumer.concurrency = 3
# 是否自动提交偏移量
kafka.consumer.enable.auto.commit = true
# 自动提交偏移量的周期
kafka.consumer.auto.commit.interval.ms=1000
# 指定消费者组
kafka.consumer.group.id= group1
# 反序列化器
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer

kafka-producer.xml配置信息

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-4.0.xsd"><context:property-placeholder location="classpath*:kafka/kafka.properties" />    <!-- 定义producer的参数 --><bean id="producerProperties" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="${bootstrap.servers}" /><entry key="retries" value="${kafka.producer.retries}" /><entry key="batch.size" value="${kafka.producer.batch.size}" /><entry key="linger.ms" value="${kafka.producer.linger.ms}" /><entry key="buffer.memory" value="${kafka.producer.buffer.memory}" /><entry key="acks" value="${kafka.producer.acks}" /><entry key="key.serializer" value="${kafka.producer.key.serializer}" /><entry key="value.serializer" value="${kafka.producer.value.serializer}" /></map></constructor-arg></bean>    <!-- 指定使用的Producerfactory --><bean id="producerFactory"class="org.springframework.kafka.core.DefaultKafkaProducerFactory"><constructor-arg><ref bean="producerProperties"/></constructor-arg></bean><!-- 指定监听器 --><bean id="producerSendListener" class="cn.kafka.config.ProducerSendListener" /><!-- 指定KafkaTemplate这个bean,使用的时候,只需要注入KafkaTemplate,就可以对kafka进行操作 --><bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"><constructor-arg ref="producerFactory" /><constructor-arg name="autoFlush" value="true" /><!-- 配置发送监听器bean --><property name="producerListener" ref="producerSendListener"></property></bean>
</beans>

实现指定监听器,提供发送方确认

/*** 自定义监听器,发送方确认*/
public class ProducerSendListener implements ProducerListener {public void onSuccess(String topic, Integer partition,Object key, Object value, RecordMetadata recordMetadata) {//发送成功System.out.println("topic:"+recordMetadata.topic()+"-offset: "+recordMetadata.offset()+"-" +"partition: "+recordMetadata.partition());}public void onError(String topic, Integer partition,Object key, Object value, Exception exception) {//消息发送失败,执行其它业务操作,或者重试}public boolean isInterestedInSuccess() {return true;}
}

kafka-consumer.xml配置信息

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-4.0.xsd"><context:property-placeholder location="classpath*:kafka/kafka.properties" />    <!-- 定义Consumer的参数 --><bean id="consumerProperties" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="${kafka.consumer.bootstrap.servers}" /><entry key="group.id" value="${kafka.consumer.group.id}" /><entry key="enable.auto.commit" value="${kafka.consumer.enable.auto.commit}" /><entry key="auto.commit.interval.ms" value="${kafka.consumer.auto.commit.interval.ms}" /><entry key="key.deserializer" value="${kafka.consumer.key.deserializer}" /><entry key="value.deserializer" value="${kafka.consumer.value.deserializer}" /></map></constructor-arg></bean><!-- 创建ConsumerFactory --><bean id="consumerFactory"class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" ><constructor-arg><ref bean="consumerProperties" /></constructor-arg></bean><!-- 指定消费者实现类 --><bean id="kafkaConsumerService" class="cn.kafka.service.KafkaConsumerService" /><!-- 消费者容器配置信息 --><bean id="containerProperties"class="org.springframework.kafka.listener.ContainerProperties"><constructor-arg name="topics"><list><value>my-topic</value></list></constructor-arg><property name="messageListener" ref="kafkaConsumerService"></property></bean><!-- 消费者并发消息监听容器,执行doStart方法 --><bean id="messageListenerContainer"class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"init-method="doStart" ><constructor-arg ref="consumerFactory" /><constructor-arg ref="containerProperties" /><property name="concurrency" value="${kafka.consumer.concurrency}" /></bean>
</beans>

实现消息者监听,接收消息

/*** 消费者监听,对消费的消息进行处理* 实现MessageListener接口,消费者会默认自动提交偏移量* 实现AcknowledgingMessageListener接口,消费者可以手动提交偏移量*/
public class KafkaConsumerService implements MessageListener<String,String> {public void onMessage(ConsumerRecord<String, String> data) {//接收业务消息,执行响应业务方法String name = Thread.currentThread().getName();System.out.println(name+"|"+String.format("主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",data.topic(),data.partition(),data.offset(),data.key(),data.value()));}
}

Spring配置文件中引入Kafka配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-4.0.xsd"><!-- 配置扫描路径 --><context:component-scan base-package="cn.kafka"><context:exclude-filter type="annotation"expression="org.springframework.stereotype.Controller"/></context:component-scan><!-- 引入kafka配置文件,根据个人文件位置--><import resource="classpath*:kafka/kafka-consumer.xml"/><import resource="classpath*:kafka/kafka-producer.xml"/>
</beans>

向Kafka的指定主题发送消息

@Controller
@RequestMapping("/kafka")
public class KafkaController {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;private Logger logger = LoggerFactory.getLogger(KafkaController.class);private  static final String TOPIC="my-topic" ; /*** @param message* @return String*/@ResponseBody@RequestMapping("/pushMessage")public String queueSender(@RequestParam("message")String message){try {kafkaTemplate.send(TOPIC,message);} catch (Exception e) {logger.error("send message error: "+e.getMessage());return "failed";}return "Success";}
}

Kafka入门和使用相关推荐

  1. Kafka入门教程与详解

    1 Kafka入门教程 1.1 消息队列(Message Queue) Message Queue消息传送系统提供传送服务.消息传送依赖于大量支持组件,这些组件负责处理连接服务.消息的路由和传送.持久 ...

  2. kafka入门:简介、使用场景、设计原理、主要配置及集群搭建

    为什么80%的码农都做不了架构师?>>>    kafka入门:简介.使用场景.设计原理.主要配置及集群搭建(转) 问题导读: 1.zookeeper在kafka的作用是什么? 2. ...

  3. Kafka 入门和 Spring Boot 集成

    2019独角兽企业重金招聘Python工程师标准>>> Kafka 入门和 Spring Boot 集成 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流 ...

  4. kafka分区与分组原理_大数据技术-Kafka入门

    在大数据学习当中,主要的学习重点就是大数据技术框架,针对于大数据处理的不同环节,需要不同的技术框架来解决问题.以Kafka来说,主要就是针对于实时消息处理,在大数据平台当中的应用也很广泛.大数据学习一 ...

  5. kafka topic 一段时间不消费_全网最通俗易懂的 Kafka 入门

    众所周知,消息队列的产品有好几种,这里我选择学习Kafka的原因,无他,公司在用. 我司使用的是Kafka和自研的消息队列(Kafka和RocketMQ)改版,于是我就想学学Kafka这款消息队列啦. ...

  6. Kafka入门篇学习笔记整理

    Kafka入门篇学习笔记整理 Kafka是什么 Kafka的特性 应用场景 Kafka的安装 单机版部署 集群部署环境准备 Kafka 2.x集群部署 Kafka 3.x集群部署 监听器和内外网络 K ...

  7. Kafka : Kafka入门教程和JAVA客户端使用

    目录 目录 Kafka简介 环境介绍 术语介绍 消费模式 下载 集群安装配置 命令使用 JAVA实战 参考文献 Kafka简介 由Scala和Java编写,Kafka是一种高吞吐量的分布式发布订阅消息 ...

  8. Kafka入门教程(一)

    转自:https://blog.csdn.net/yuan_xw/article/details/51210954 1 Kafka入门教程 1.1 消息队列(Message Queue) Messag ...

  9. Kafka教程(一)Kafka入门教程

    Kafka教程(一)Kafka入门教程 1 Kafka入门教程 1.1 消息队列(Message Queue) Message Queue消息传送系统提供传送服务.消息传送依赖于大量支持组件,这些组件 ...

  10. Kafka 入门 (一)

    Kafka 入门(一) Apache Kafka起源于LinkedIn,后来于2011年成为开源Apache项目,然后于2012年成为First-class Apache项目.Kafka是用Scala ...

最新文章

  1. Firefox 与 IE 已死?Chrome 一统天下!
  2. 电机控制应用中的电磁兼容性设计与测试标准
  3. nginx服务器安装及配置文件详解
  4. 优化SQL步骤——查看SQL执行频率 || 定位低效率执行SQL
  5. 基于vue-cli、elementUI的Vue超简单入门小例子
  6. freemarker的空值和默认值
  7. 推荐一个硬核嵌入式的原创公众号
  8. html生成xml文件,字符串xml生成xml文件
  9. 【每日SQL打卡】​​​​​​​​​​​​​​​DAY 15丨查询活跃业务【难度中等】
  10. LoadRunner调用Oracle存储过程
  11. 阶段3 2.Spring_06.Spring的新注解_6 Qualifier注解的另一种用法
  12. 深度原理与框架-图像超分辨重构-tensorlayer
  13. 用C语言实现死亡之ping
  14. 微信商户支付平台微信支付怎么开通
  15. 一分钟解决微信小程序截图(截屏问题)
  16. 电脑桌面计算机图标下不显示文字,电脑桌面图标下面的文字有时会突然不见,然后 – 手机爱问...
  17. 第三集 怪物学院 第十八章
  18. 平面设计基础学习-1
  19. 手机厂商“卷”到了手腕上
  20. 第二集:你真的会吸气吗 ?科学呼吸法(汇播课程演说笔记)

热门文章

  1. Centos安装radis
  2. python --- 常见题目 2019.01.03
  3. Java 重载(什么是重载?什么时候重载?重载有什么好处?)
  4. kubernetes cordon原理
  5. 如何解决Windows 无法完成格式化SD卡问题?
  6. Duplicate class okhttp3...
  7. Word学习笔记:P1-页面简介文字编辑
  8. Spark GraphX-航班飞行网图分析
  9. 华为鸿蒙电视什么屏幕,荣耀智慧屏出世,鸿蒙真容貌!和智能电视究竟有什么区别?...
  10. 使用MySQL管理工具-SQLyog 9.63报错号码2058,超详细解析