kafka安装和相关命令操作——修改中
目录
一、kafka开发环境安装
二、kafka相关操作
1、使用命令创建topic
2、列出所有的topic
3、删除topic:second_topic
4、查看topic的相关信息
三、kafka-console-producer-生产消息
四、kafka-console-consumer-消费消息
一、kafka开发环境安装
kafka的安装需要先安装docker,如果之前没有安装过docker,可以看我的这篇文章进行安装
Docker镜像、容器和数据管理
为了方便一次性安装kafka的开发环境,我们使用github开源的镜像
GitHub - lensesio/fast-data-dev: Kafka Docker for development. Kafka, Zookeeper, Schema Registry, Kafka-Connect, Landoop Tools, 20+ connectors
安装命令,可以直接用,不用改动任何东西
docker run --rm -it \
-p 2181:2181 -p 9092:9092 -p 3030:3030 \
-p 8081:8081 -p 8082:8082 -p 8083:8083 \
-e ADV_HOST=127.0.0.1 \
landoop/fast-data-dev
安装过程有点长,文件比较多,请耐心等待,如下是安装的服务列表
2021-07-19 07:32:06,764 INFO spawned: 'broker' with pid 173
2021-07-19 07:32:06,766 INFO spawned: 'caddy' with pid 174
2021-07-19 07:32:06,768 INFO spawned: 'connect-distributed' with pid 175
2021-07-19 07:32:06,772 INFO spawned: 'logs-to-kafka' with pid 177
2021-07-19 07:32:06,780 INFO spawned: 'rest-proxy' with pid 178
2021-07-19 07:32:06,787 INFO spawned: 'sample-data' with pid 179
2021-07-19 07:32:06,793 INFO spawned: 'schema-registry' with pid 180
2021-07-19 07:32:06,805 INFO spawned: 'smoke-tests' with pid 187
2021-07-19 07:32:06,809 INFO spawned: 'zookeeper' with pid 189
安装完后,访问http://192.168.43.36:3030/可以看到如下界面,此ip是我linux的地址
当然你也可以一个一个的进行安装,下边是不错的安装文档
Docker搭建kafka和zookeeper:http://blog.70ci.com/post/736.html
二、kafka相关操作
首先,我们在所有服务都启动的情况下,进入landoop/fast-data-dev容器
注意:此处我们需要重新开一个命令窗口,不要关闭当前服务的启动窗口
docker run --rm -it --net=host landoop/fast-data-dev bash
1、使用命令创建topic
使用kafka-topics创建第一个topic:first_topic
kafka-topics \
--zookeeper 127.0.0.1:2181 \
--create --topic first_topic \
--partitions 3 \
--replication-factor 1
创建topic,必须指定注册中心,topic的名字,创建多少个partitions,以及备份replication-factor的数量(实际中replication-factor应该为3,但我目前只在一台服务器上安装了kafka,只有一个broker,因此只能设置为1,因为replication-facto数量不能大于brokers的数量)
如果重复创建first_topic,服务会输出错误信息,从而保证topic的唯一性
2、列出所有的topic
// 同样需要带上注册中心的地址
kafka-topics --zookeeper 127.0.0.1:2181 --list
3、删除topic:second_topic
kafka-topics --zookeeper 127.0.0.1:2181 --delete --topic second_topic
4、查看topic的相关信息
kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic first_topic
可以看到以下相关信息,如Partition和replicationFactor的数量等
root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic first_topic
Topic: first_topic PartitionCount: 3 ReplicationFactor: 1 Configs: Topic: first_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0Topic: first_topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0Topic: first_topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
root@fast-data-dev / $
三、kafka-console-producer-生产消息
使用kafka-console-producer生产消息,需要指定broker和topic,命令如下
kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic
输入如上命令回车后,会出现输入提示,我们输入如下消息
root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic
>hi
>hello
>kafka
>very
>good
>producer
>
然后进入kafka可视化页面,我们看到输入的message,被随机分配在了三个partition中
有时,我们需要把消息进行顺序的存储,而不是随机的分布在所有partition中,此时,我们可以设置一个key,那么相同key的消息都会被顺序的存储在一个partiton中
如下,我们加了两个属性,使用key,以及指定key和value的分隔符
kafka-console-producer --broker-list 127.0.0.1:9092 \
--topic first_topic \
--property "parse.key=true" \
--property "key.separator=:"
我们来看下输入:
root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 \
--topic first_topic \
--property "parse.key=true" \
--property "key.separator=:"
>3:today
>3:brocker-ket
>3:kafka-key
>3:partition-key
>
然后进入kafka可视化页面,我们看到输入的message,被顺序的存储在相同partiton中
四、kafka-console-consumer-消费消息
使用kafka-console-consumer消费消息,需要指定broker和topic,命令如下
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic
输入如上命令时,如果此时我们没有生产message,那么我们将看不到任何效果,因为此时的消费者默认读取从当前启动时间后生产的message,因此启动时间之前的message都将不被消费
我们此时创建一个生产者,并输入一些message,来看看效果:
如果想要消费消息队列中所有的message(包括历史消息),需要加上参数 --from-beginning
kafka-console-consumer \
--bootstrap-server 127.0.0.1:9092 \
--topic first_topic \
--from-beginning
如果需要指定消费的partition,可以加上--partition参数
root@fast-data-dev / $ kafka-console-consumer \
> --bootstrap-server 127.0.0.1:9092 \
> --topic first_topic \
> --from-beginning \
> --partition 2
world
today
brocker-ket
kafka-key
partition-key
上边,我们指定消费partition2中的消息,一共消费了5条,我们通过UI界面,也可以看到partition2中确实有5条消息
ConsumerGruop
使用ConsumerGruop,把多个消费者划分到一个组中,组的名字通过group.id进行指定
kafka-console-consumer \
--bootstrap-server 127.0.0.1:9092 \
--topic first_topic \
--from-beginning \
--consumer-property group.id=my-group-name
通过group.id划分到组的consumer,每消费完一个message,都会提交一个committed offset,当组内的下一个consumer读数据时,会从committed offset处开始读取。因此,分组后的consumer,并不会重复消费组内中任意一个consumer已经消费完成的消息。
附:kafka中message存储的json形式
[ {"topic": "first_topic","key": "3","value": "world","partition": 2,"offset": 0},{"topic": "first_topic","key": "3","value": "today","partition": 2,"offset": 1},{"topic": "first_topic","key": "3","value": "brocker-ket","partition": 2,"offset": 2}
]
kafka安装和相关命令操作——修改中相关推荐
- 2 172.02 php,kafka安装、相关命令以及PHP使用
1.安装JAVA #下载安装包 https://www.oracle.com/technetwork/java/javase/downloads/index.html tar -xzvf jdk-8u ...
- Ubuntu安装NextCloud相关命令
Ubuntu安装NextCloud相关命令: 1.一键安装nextcloud 2.访问nextcloud 3.重启nextcloud 4.更改端口 5.本地电脑启用端口 1.一键安装nextcloud ...
- Redis 非关系型数据库 安装以及相关命令
目录 一.缓存简介 1.1 系统缓存 1.2 缓存保存位置及分层结构 1.2.1 DNS缓存 1.2.2 应用层缓存 1.2.3 数据层缓存 分布式缓存服务 数据库 1.2.4 硬件缓存 二. 关系数 ...
- docker安装+配置镜像+命令操作+数据卷+网络管理+DockerFile+镜像服务+项目部署+[高级使用]
文章目录 Docker基础 一.Docker介绍 1. 什么是虚拟化 2. 什么是Docker 3. 容器与虚拟机比较 4. Docker优势 5. Docker架构 [1]镜像服务(`Image`) ...
- Linux+Docker+腾讯云/阿里云服务器 安装MySQL相关命令整理
注:主要是收集整理一些常用的命令 主要参考:狂神说docker以及其他的一些命令收集.主要是因为平常安装的mysql外部无法访问 MYSQL的创建用户,授权用户,删除用户,查看用户_ZJE-CSDN博 ...
- nvm 下载安装及相关命令
一.nvm是什么 nvm 官网:https://nvm.uihtm.com/ nvm全英文也叫node.js version management,是一个nodejs 的版本管理工具.nvm和n都是n ...
- Redis-Linux中安装Redis、命令操作Redis
目录 一.Redis简介 NoSQL与SQL的区别 二.Linux上安装redis 上传并解压redis.gz 进入 redis的解压目录,执行命令 make 编辑 修改redis为守护进程 们测试 ...
- xshell中mysql命令大全_Xshell 相关命令操作
1.选择文件夹 # cd 2.列表显示竖排模式 # ll 3.列表显示横排模式 # ls 4.进入数据库 # mysql -uroot - 密码 !my进入数据库 因为上面已经输了mysql的那 ...
- Linux系统中计划任务及其相关命令操作(at和crontab)
Linux系统中计划任务分为两种类型,一种是一次性的计划任务--at,一种是长期性计划任务--crontab,接下来分别讲述它们的作用及其使用场景. 目录 at:一次性计划任务 crontab:长期性 ...
- kafka 安装及相关操作
下载 wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.1/kafka_2.11-2.2.1.tgz 安装 tar -zxvf kaf ...
最新文章
- 阿里云弹性计算-图形工作站(公测)发布
- ASP.NET基础教程-DataView对象的属性、方法、枚举成员
- 最新Ceph L版与openstack Pike对接
- DOS循环:bat/批处理for命令详解之一 (史上虽详尽的总结和说明~~)
- bzoj3522 Hotel
- Android.text.TextUtils类
- HQuery中html结构及文本的修改
- 【算法竞赛学习】金融风控之贷款违约预测-数据分析
- 【渝粤教育】国家开放大学2019年春季 2712园艺基础 参考试题
- mysql 屏蔽索引_mysql强制索引和禁止某个索引
- 尝鲜之在Github上搭建Octopress博客
- android 从网络加载图片并显示
- Python中的分组函数(groupby、itertools)
- led灯条维修_康佳液晶电视LED42F2200N灯条问题导致不开机故障修复
- 互联网理财系统开发-满足投资人随投随取、多种产品、多种选择、风险更小、收益更多
- 主流平面设计软件推荐,实用工具推荐必坑指南!
- 如何在靠tiktok在三个月内快速赚到20w的?实现人生逆转
- 腾讯技术官编写的594页MySQL优化手册,竟意外冲上GitHub调优热榜
- 高数(上) 第七章:微分方程
- 致物联网网关工程师:别浪费钱了来看看高度集成物联网无线AP模块应用的WiFi模组——模小块的成长记
热门文章
- 送给佳佳同学的礼物:测试流程及并行测试介绍
- Java代码复用的三种常用方式:继承、组合和代理
- 三年内人人有FIL,FIL 世界零撸板块引发全球流量狂潮!
- “大数据分析”和“数据分析”的区别与联系
- c语言输出成绩与排名,C语言算成绩 要求输完两个分数后 同时输出两个分数换算出来的成绩...
- DNS污染攻击详细教程
- 上任第十年,库克功与过
- python基础-数据类型与基本操作
- android实现推箱子代码,android开发--推箱子小游戏(二)
- 微信小程序对商户而言到底有什么用?