目录

  • 目录
  • Kafka简介
  • 环境介绍
  • 术语介绍
  • 消费模式
  • 下载
  • 集群安装配置
  • 命令使用
  • JAVA实战
  • 参考文献

Kafka简介

由Scala和Java编写,Kafka是一种高吞吐量的分布式发布订阅消息系统.

环境介绍

操作系统:centos6.5
kafka:1.0.1
zookeeper:3.4.6

术语介绍

  • Broker : Kafka集群包含一个或多个服务器,这种服务器被称为broker
  • Topic : 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
  • Partition : Partition是物理上的概念,每个Topic包含一个或多个Partition.
  • Producer : 负责发布消息到Kafka broker
  • Consumer : 消息消费者,向Kafka broker读取消息的客户端。
  • Consumer Group : 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

消费模式

为了照顾对MQ不是很了解的同学,先讲一下MQ的原理.一般MQ都是在服务端存储一个队列.生产者把消息丢到MQ server,消费者从MQ server消费.这样一来解决了生产者和消费者的高耦合问题,同时也解决了生产速度和消费速度差异导致的消费者跟不上生产者的生产速度而导致的消费者压力过大问题.

在kafka中的topic就是一系列队列的总称,称为一个主题.当然ActiveMQ和RabbitMQ中都有这个概念.一类消息都会丢到一个topic中去.

讲完topic我们讲一下partition(分区),这个东西是kafka独有的东西,也是kafka实现横向扩展和高并发的一个重要设计.我们试想一下,如果每个topic只有一个队列,随着业务增加topic里消息越来越多.多到一台server装不下了怎么办.为了解决这个问题,我们引入了partition这个概念.一个partition(分区)代表了一个物理上存在的队列.topic只是一组partition(分区)的总称,也就是说topic仅是逻辑上的概念.这样一来当topic上的消息越来越多.我们就可以将新增的partition(分区)放在其他server上.也就是说topic里边的partition(分区)可以分属于不同的机器.实际生产中,也基本都是这样玩的.

这里说一个特殊情况,有时我们创建了一个topic没有指定partition(分区)数量或者指定了partition(分区)数量为1,这时实际也是有一个默认的partition(分区)的,名字我忘记了.

从Producer(生产者)角度,一个消息丢到topic中任务就完成了.至于具体丢到了topic中的哪个partition(分区),Producer(生产者)不需要关注.这里kafka自动帮助我们做了负载均衡.当然如果我们指定某个partition(分区)也是可以的.这个大家官方文档和百度.

接下里我们讲Consumer Group(消费组),Consumer Group(消费组)顾名思义就是一组Consumer(消费者)的总称.那有了组的概念以后能起到什么作用.如果只有一组内且组内只有一个Consumer,那这个就是传统的点对点模式,如果有多组,每组内都有一个Consumer,那这个就是发布-订阅(pub-sub)模式.每组都会收到同样的消息.

最后讲最难理解也是大家讨论最多的地方,partition(分区)和Consumer(消费者)的关系.首先,一个Consumer(消费者)的一个线程在某个时刻只能接收一个partition(分区)的数据,一个partition(分区)某个时刻也只会把消息发给一个Consumer(消费者).我们设计出来几种场景:

场景一: topic-1 下有partition-1和partition-2
group-1 下有consumer-1和consumer-2和consumer-3
所有consumer只有一个线程,且都消费topic-1的消息.
消费情况 : consumer-1只消费partition-1的数据
consumer-2只消费partition-2的数据
consumer-3不会消费到任何数据
原因 : 只能接受一个partition(分区)的数据

场景二: topic-1 下有partition-1和partition-2
group-1 下有consumer-1
consumer只有一个线程,且消费topic-1的消息.
消费情况 : consumer-1先消费partition-1的数据
consumer-1消费完partition-1数据后开始消费partition-2的数据
原因 : 这里是kafka检测到当前consumer-1消费完partition-1处于空闲状态,自动帮我做了负载.所以大家看到这里在看一下上边那句话的”某个时刻”
特例: consumer在消费消息时必须指定topic,可以不指定partition,场景二的情况就是发生在不指定partition的情况下,如果consumer-1指定了partition-1,那么consumer-1消费完partition-1后哪怕处于空闲状态了也是不会消费partition-2的消息的.

进而我们总结出了一条经验,同组内的消费者(单线程消费)数量不应多于topic下的partition(分区)数量,不然就会出有消费者空闲的状态,此时并发线程数=partition(分区)数量.反之消费者数量少于topic下的partition(分区)数量也是不理想的,原因是此时并发线程数=消费者数量,并不能完全发挥kafka并发效率.

最后我们看下上边的图,Consumer Group A的两个机器分别开启两个线程消费P0 P1 P2 P3的消息Consumer Group B的四台机器单线程消费P0 P1 P2 P3的消息就可以了.此时效率最高.

下载

下载地址:http://kafka.apache.org/downloads
这里我们下载到/usr/local目录下

集群安装配置

解压 : cd /usr/local && tar -xzvf kafka_2.11-1.0.1.tgz

创建log目录 : cd /usr/local/kafka_2.11-1.0.1 && mkdir kafkaLogs

配置:vi /usr/local/kafka_2.11-1.0.1/config/server.properties需改下边五个地方

#broker的id,集群中的每台机器id唯一,其他两台分别1和2
broker.id=0
#是Kafka绑定的interface,这里需要写本机内网ip地址,不然bind端口失败
#其他两台分别是192.168.1.5和192.168.1.9
host.name=192.168.1.3
#向zookeeper注册的对外暴露的ip和port,118.212.149.51是192.168.1.3的外网ip地址
#如果不配置kafka部署在外网服务器的话本地是访问不到的.
advertised.listeners=PLAINTEXT://118.212.149.51:9092
#zk集群的ip和port,zk集群教程:
zookeeper.connect=192.168.1.3:2181,192.168.1.5:2181,192.168.1.9:2181
#log目录,刚刚上边建好的.
log.dirs=/usr/local/kafka_2.11-1.0.1/kafkaLogs

启动集群(分别在三台broker执行):进入bin目录cd /usr/local/kafka_2.11-1.0.1/bin/执行启动脚本并指定配置文件./kafka-server-start.sh -daemon ../config/server.properties

验证集群是否启动成功:

[root@template ~]# cd /usr/local/zookeeper-3.4.6/bin/
[root@template bin]# ./zkCli.sh -server 127.0.0.1:2181
...
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /brokers/ids
[0, 1, 2] #这里的012分别是三个broker的id

查看某个broker信息:注意endpoints信息的ip:port,这个就是我们对外服务暴露的地址,我这里是外网访问,所以暴露的是外网ip和端口

[zk: 127.0.0.1:2181(CONNECTED) 1] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://118.212.149.51:9092"],"jmx_port":-1,"host":"118.212.149.51","timestamp":"1521010377533","port":9092,"version":4}
cZxid = 0x700000626
ctime = Wed Mar 14 14:52:57 CST 2018
mZxid = 0x700000626
mtime = Wed Mar 14 14:52:57 CST 2018
pZxid = 0x700000626
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3621e366ae20014
dataLength = 198
numChildren = 0

命令使用

创建topic :

#--replication-factor 创建的副本数,这个使用来备份的.副本数不能大于broker数
#--partitions 1 创建的分区数.根据实际情况创建
./kafka-topics.sh --create --zookeeper 192.168.1.3:2181 --replication-factor 1 --partitions 1 --topic milo

查看topic :

./kafka-topics.sh --list --zookeeper 192.168.1.3:2181

查看topic详细信息 :

./kafka-topics.sh --describe --zookeeper 192.168.1.3:2181

结果如下:

第一行topic信息摘要:分别是topic名字(Topic),partition数量(PartitionCount),副本数量(ReplicationFactor),配置(Config)
第二行~第四行分别列出了名为milo的topic的所有partition.依次为topic名字(Topic),partition号(Partition),此partition所在的borker(Leader),副本所在的broker(Replicas),Isr列表(Isr)
ps:同步状态的副本的集合(a set of in-sync replicas),简称ISR,通俗理解就是替补队员,不是每个broker都可以作为替补队员.首先这个broker得存有副本,其次副本还得满足条件.就像我们大学足球队,有的人是替补,有的人连大名单都没进去,原因是他不会踢球. ^ ^

生产消息 :

./kafka-console-producer.sh --broker-list 118.212.149.51:9092 --topic test\
>hello world

消费消息 :

./kafka-console-consumer.sh --zookeeper 118.212.149.51:2181 --topic milo --from-beginning
hello world

JAVA实战

pom.xml

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>1.0.1</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.1</version></dependency></dependencies>

Producer.java

package cn.milo.kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;import java.util.Properties;/************************************************************ @ClassName   : Producer.java                                            ****** @author      : milo ^ ^                     ****** @date        : 2018 03 14 11:34     ****** @version     : v1.0.x                      *******************************************************/
public class Producer {static Logger log = Logger.getLogger(Producer.class);private static final String TOPIC = "milo2";private static final String BROKER_LIST = "118.212.149.51:9092";private static KafkaProducer<String,String> producer = null;/*初始化生产者*/static {Properties configs = initConfig();producer = new KafkaProducer<String, String>(configs);}/*初始化配置*/private static Properties initConfig(){Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());return properties;}public static void main(String[] args) throws InterruptedException {//消息实体ProducerRecord<String , String> record = null;for (int i = 0; i < 1000; i++) {record = new ProducerRecord<String, String>(TOPIC, "value"+(int)(10*(Math.random())));//发送消息producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (null != e){log.info("send error" + e.getMessage());}else {System.out.println(String.format("offset:%s,partition:%s",recordMetadata.offset(),recordMetadata.partition()));}}});}producer.close();}
}

Consumer :

package cn.milo.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;import java.util.Properties;/************************************************************ @ClassName   : Consumer.java                                            ****** @author      : milo ^ ^                     ****** @date        : 2018 03 14 15:50     ****** @version     : v1.0.x                      *******************************************************/
public class Consumer {static Logger log = Logger.getLogger(Producer.class);private static final String TOPIC = "milo2";private static final String BROKER_LIST = "118.212.149.51:9092";private static KafkaConsumer<String,String> consumer = null;static {Properties configs = initConfig();consumer = new KafkaConsumer<String, String>(configs);}private static Properties initConfig(){Properties properties = new Properties();properties.put("bootstrap.servers",BROKER_LIST);properties.put("group.id","0");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("enable.auto.commit", "true");properties.setProperty("auto.offset.reset", "earliest");return properties;}public static void main(String[] args) {while (true) {ConsumerRecords<String, String> records = consumer.poll(10);for (ConsumerRecord<String, String> record : records) {log.info(record);}}}
}

参考文献

[1].kafka 学习 非常详细的经典教程 : http://blog.csdn.net/tangdong3415/article/details/53432166
[2].Kafka入门与实践.牟大恩

Kafka : Kafka入门教程和JAVA客户端使用相关推荐

  1. Swig超详细入门教程(Java调用C/C++, CMake)——更新于2021.12

    目录 相关教程 环境配置 0基础上手例子(C/C++) 使用CMake的例子(C语言) 使用CMake的例子(C++) 本文主要是手把手教萌新们如何用官方用例构建(有许多本人亲身踩坑血泪史) 相关教程 ...

  2. KafKa基本入门教程

    2019独角兽企业重金招聘Python工程师标准>>> 本教程开始前,假设你已经下载并解压到相应目录下 windows 下启动kafka 第1步:启动服务器 进入到kafka安装目录 ...

  3. Spring Cloud入门教程(二):客户端负载均衡(Ribbon)

    对于大型应用系统负载均衡(LB:Load Balancing)是首要被解决一个问题.在微服务之前LB方案主要是集中式负载均衡方案,在服务消费者和服务提供者之间又一个独立的LB,LB通常是专门的硬件,如 ...

  4. Java经典入门教程!java集合框架面试

    JAVA基础 JAVA异常分类及处理 异常分类 异常的处理方式 Throw和throws的区别 JAVA反射 动态语言 反射机制概念 (运行状态中知道类所有的属性和方法) Java反射API 反射使用 ...

  5. air调用java,AIR2.0入门教程:与Java应用交互

    在之前的一篇文章中,我介绍了如何使用AIR2.0新增的NativeProcess类与本地进程进行交互和通讯,在那个例子里面我们使用了C++ 的代码,实际上只要是基于命令行的标准输入输出,AIR2.0的 ...

  6. Kafka使用入门教程

    转载自http://www.linuxidc.com/Linux/2014-07/104470.htm https://www.cnblogs.com/dhl-2013/p/5678704.html ...

  7. Java开发入门教程!java开发架构师职责

    什么是Service Mesh 作为Service Mesh技术探索和实践的先行者,全球第一个真正的Service Mesh项目Linkerd负责人.Buoyant公司创始人兼CEO William ...

  8. Java api 入门教程 之 JAVA的IO处理

    IO是输入和输出的简称,在实际的使用时,输入和输出是有方向的.就像现实中两个人之间借钱一样,例如A借钱给B,相对于A来说是借出,而相对于B来说则是借入.所以在程序中提到输入和输出时,也需要区分清楚是相 ...

  9. Java入门教程:Java初学者容易犯的错误

    万事开头难,Java编程的初学者常常会遇到各种各样的问题.对于自学的读者来说,则是需要花费更多的时间.精力来解决这些问题,而且一旦遇到的问题几天都得不到解决,往往会带来很大的挫败感. 所以本节介绍一些 ...

最新文章

  1. VSCode环境下配置ESLint 对Vue单文件的检测
  2. linux rpm安装不成功,rpm 包不能成功安装
  3. 吴恩达机器学习笔记:(二)代价函数
  4. Python:目录和文件的操作模块os.path和OS常用方法
  5. maven项目的一键构建
  6. 腾讯技术课|基于Elastic Stack 搭建日志分析平台
  7. 使用ADF绑定创建视图对象行CreateInsert操作
  8. 写两个函数,分别求两个整数的最大公约数和最小公倍数,用主函数调用这两个函数,并输出结果两个整数由键盘输入。
  9. jQuery插件FontSizer实现自定义动态调整网页文字大小
  10. Unity渲染管线-百人计划笔记
  11. 【springboot基础】配置日志输出级别以及输出位置
  12. linux下超强命令(shell语句)组合
  13. 米线店结账程序 装饰着模式_真实数据:外卖销售9999+ 长沙米线万单店 它究竟是怎么做到的?...
  14. Security+ 学习笔记35 配置管理
  15. poj1274 匈牙利算法 二分图最大匹配
  16. 【UCOSIII操作系统】事件篇
  17. android setting 开发者模式,Android 设置 Setting ---开发者选项 中选项为默认配置
  18. 谷歌用AI技术预测病患死亡时间 到底是福是祸?
  19. 【VMD-SSA-LSSVM】基于变分模态分解与麻雀优化Lssvm的负荷预测【多变量】(Matlab代码实现)
  20. The source branch is being deleted

热门文章

  1. Struts1、Struts2及SpringMVC对比
  2. Cortex-M3处理器内核与基于Cortex-M3的MCU关系
  3. DANN-经典论文概念及源码梳理
  4. 创建属于自己的全景图简明教程——ptgui+720
  5. Sony Xperia SP M35H刷MIUI
  6. 中国建筑装饰装修——刘昊威设计作品:切尔西·扬画廊
  7. CommandArgument的一种用法
  8. StratoVirt 中的 PCI 设备热插拔实现
  9. CAD二次开发:用C#在AutoCAD中插入栅格图像
  10. 文件服务器文件删除记录查询,Windows server2008r2共享文件操作记录、查询服务器日志方法...