运行环境:mac os

1. 启动zookeeper

./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

2. 启动kafka服务

./bin/kafka-server-start /usr/local/etc/kafka/server.properties

3. 查看topic列表

./bin/kafka-topics --list --zookeeper localhost:2181

4. 创建topic

./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic hezhixiong--create:指定创建topic动作
--zookeeper:指定kafka连接zk的地址
--replication-factor:指定每个分区的复制因子个数,默认1个
--partitions:指定当前创建的kafka分区数量,默认为1个
--topic:指定新建topic的名称

5. 查看topic的描述信息

./bin/kafka-topics --describe --zookeeper localhost:2181 --topic hezhixiong

6. 修改topic信息

./bin/kafka-topics --zookeeper localhost:2181 --alter --topic hezhixiong --partitions 4
<!-- Kafka分区数量只允许增加,不允许减少 -->

7. 删除topic

./bin/kafka-topics --zookeeper localhost:2181 --delete --topic hezhixiong
Topic hezhixiong is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.默认情况下Kafka的Topic是没法直接删除的,而是标记删除而已。如果需要彻底删除topic,有以下两种方式:
1. 通过delete命令删除后,手动将本地磁盘以及zk上的相关topic的信息删除
2. 配置server.properties文件,给定参数delete.topic.enable=true,重启kafka服务,此时执行delete命令表示允许进行Topic的删除

8. 生产者

./bin/kafka-console-producer --broker-list localhost:9092 --topic hezhixiong

9. 消费者

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic hezhixiong --from-beginning

 10. broker集群

  首先为每个broker创建一个配置文件:

cd /usr/local/etc/kafka
cp server.properties server_1.properties
cp server.properties server_2.propertiesvi server_1.propertiesbroker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs/one
vi server_2.propertiesbroker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs/two

  broker.id是集群中每个节点的唯一且永久的名称,我们修改端口和日志目录是因为我们现在在同一台机器上运行,我们要防止broker在同一端口上注册和覆盖对方的数据。

  我们已经运行了zookeeper和刚才的一个kafka节点,所有我们只需要在启动2个新的kafka节点。

./bin/kafka-server-start /usr/local/etc/kafka/server_1.properties
./bin/kafka-server-start /usr/local/etc/kafka/server_2.properties

  现在,我们创建一个新topic,把备份设置为3

./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic hzx-new./bin/kafka-topics --describe --zookeeper localhost:2181 --topic hzx-new  查看topic得到输出如下:Topic:hzx-new PartitionCount:1 ReplicationFactor:3 Configs:       Topic: hzx-new Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0

从上面的输出,我们可以得出结论,第一行给出所有分区的摘要,显示主题名称,分区数量和我们已经选择的复制因子。在第二行中,每个节点将是分区的随机选择部分的领导者。

在我的例子中,我看到的第一个broker(with broker.id 2)是领导者。 然后Replicas:2,1,0 意味着所有代理复制主题。最后 Isr 是 in-sync 副本的集合。 那么,这是副本的子集,当前活着并被领导者赶上。

"leader":该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的。
"replicas":备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
"isr":“同步备份”的节点列表,也就是活着的节点并且正在同步leader。

  启动生产者和消费者,在生产者端发送消息,在消费者端能看到消息的消息

./bin/kafka-console-producer --broker-list localhost:9092 --topic hzx-new   (生产者)
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic hzx-new --from-beginning  (消费者)

  体验并测试kafka集群的容错,目前Leader是broker2,所以结束调broker2,并查看topic:hzx-new 的信息,输出如下

Topic:hzx-new    PartitionCount:1    ReplicationFactor:3    Configs:Topic: hzx-new    Partition: 0    Leader: 1    Replicas: 2,1,0    Isr: 1,0可以看出 broker2已经不再同步备份集合里了,备份节点之一的broker1成为了新的leader了。

参考资料:

http://orchome.com/6

转载于:https://www.cnblogs.com/hezhixiong/p/10078994.html

kafka operation相关推荐

  1. 最简单的kafka接入方式(kafka配置),kafka整合Spring

    文章目录 一.前言. 二.主要流程. 三.各个细节,步骤 一.前言. 本文主要介绍了Springboot项目整合kafka的最简单的方式. 二.主要流程. 1.引入Maven 2.增加消费者和生产者配 ...

  2. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

  3. 《kafka中文手册》- 构架设计(一)

    4. DESIGN 设计 4.1 Motivation 目的 4.2 Persistence 存储 Don't fear the filesystem! 不要对文件系统感到恐惧 Constant Ti ...

  4. kafka Detailed Replication Design V3

    参考,https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3 Major chan ...

  5. 跟我学Kafka源码Producer分析

    2019独角兽企业重金招聘Python工程师标准>>> 跟我学Kafka源码Producer分析 博客分类: MQ 本章主要讲解分析Kafka的Producer的业务逻辑,分发逻辑和 ...

  6. Kafka深度解析(如何在producer中指定partition)(转)

    原文链接:Kafka深度解析 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统.主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能 ...

  7. How Kafka’s Storage Internals Work

    In this post I'm going to help you understand how Kafka stores its data. I've found understanding th ...

  8. Kafka源码深度解析-序列3 -Producer -Java NIO

    原文地址 在上一篇我们分析了Metadata的更新机制,其中涉及到一个问题,就是Sender如何跟服务器通信,也就是网络层.同很多Java项目一样,Kafka client的网络层也是用的Java N ...

  9. Apache Kafka:大数据的实时处理时代

    在过去几年,对于 Apache Kafka 的使用范畴已经远不仅是分布式的消息系统:我们可以将每一次用户点击,每一个数据库更改,每一条日志的生成,都转化成实时的结构化数据流,更早的存储和分析它们,并从 ...

最新文章

  1. 数据库存在即更新的高并发处理 - 转
  2. 多个tomcat部署的端口问题
  3. 本年度读书计划-看几本必须好好琢磨的书
  4. thinkphp __hash__
  5. VC 消息钩子编程
  6. 日常生活开支记账明细_你的理财小管家!简单明了!皮面理财记账本明细流水支出笔记本!...
  7. 现在的BAT,就是曾经的外企
  8. 大数据应用型产品设计方法及行业案例介绍(附110页PPT)
  9. PyScripter could not load a Python engine解决方案
  10. 靶机渗透练习84-The Planets:Earth
  11. [Linux]生产者消费者模型(基于BlockQueue的生产者消费者模型 | 基于环形队列的生产者消费者模型 | 信号量 )
  12. 配置samba服务器@手把手
  13. JavaWeb基础核心技术-佟刚-专题视频课程
  14. 远程开启或唤醒休眠的计算机
  15. 树莓派实验室python人脸识别_使用树莓派进行简易人脸识别
  16. Excel 2007版的常用功能(1):Excel基本操作
  17. 企业数据打通有什么好处?不同行业怎么解决数据打通难题?
  18. 《高性能MySQL 第四版》正式上市
  19. 《大数据技术原理与应用》(第八章Hadoop 课后答案)
  20. Oracle10g Bug 4612267 补丁安装备忘录

热门文章

  1. java setdaemon_Java ThreadGroup setDaemon()方法
  2. 做系统ghost步骤图解_用好这工具,小孩都能会重装系统!
  3. 项目实战-药品采购系统-day01
  4. java EE : tomacat 基础
  5. Codeforces #471
  6. 如何查看QQ和微信查看授权过那些应用?
  7. java 调用linux 脚本并获取返回值
  8. ConcurrentHashMap底层原理?
  9. centos 宝塔面板 mongodb 设置用户账号密码登录
  10. wmode解决flash透明及层深问题