kafka官方文档学习笔记2--QuickStart
下载kafka
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz
解压安装包
> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0/bin
查看bin目录下主要几个脚本功能如下:
脚本 | 功能 |
---|---|
kafka-server-start.sh | 启动kafka服务器; |
kafka-server-stop.sh | 停止kafka服务器; |
kafka-topics.sh | topic管理; |
kafka-console-producer.sh | 基于命令行的生产者; |
kafka-console-consumer.sh | 基于命令行的消费者; |
kafka-run-class.sh | 运行java类的脚本,由kafka-server-start.sh和kafka-server-stop.sh、kafka-topics.sh等脚本调用; |
zookeeper-server-start.sh | 启动kafka自带的zookeeper服务器; |
zookeeper-server-stop.sh | 停止kafka自带的zookeeper服务器; |
zookeeper-shell.sh | 在命令行连接zookeeper的客户端工具; |
connect-standalone.sh | 在命令行启动单点的connector; |
connect-distributed.sh | 在命令行启动基于集群connector; |
注:kafka的安装包除了包括kafka自身的工具以外,也包括了一系列简易的zookeeper工具,能够通过zookeeper-server-start.sh脚本启动简易的单点zookeeper实例,供kafka使用。但一般仅限于测试环境使用;
config目录下存放的是kafka服务、自带zk服务以及基于命令行的生产者、消费者工具对应的配置文件,常用如下:
脚本 | 功能 |
---|---|
server.properties | kafka实例的配置文件,配置kafka最重要的配置文件; |
log4j.properties | kafka日志配置; |
zookeeper.properties | 自带zk的配置文件; |
producer.properties | 基于命令行的生产者工具配置文件;(测试用) |
consumer.properties | 基于命令行的消费者工具配置文件;(测试用) |
connect-standalone.properties | 自带单点connector的配置文件,存放connector的序列化方式、监听broker的地址端口等通用配置;(测试用) |
connect-file-source.properties | 配置文件读取connector,用于逐行读取文件,导入入topic;(测试用) |
connect-file-sink.properties | 配置文件写入connector,用于将topic中的数据导出到topic中;(测试用) |
启动zk服务,默认端口:2181
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2018-01-16 20:22:52,327] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
启动kafka服务,默认端口:9092
> bin/kafka-server-start.sh config/server.properties
[2018-01-16 20:23:52,758] INFO KafkaConfig values:
...
经过如上两步,我们就启动了一个简易的kafka集群(具有1个zookeeper实例和1个kafka实例的集群)
查看zookeeper中存放的kafka信息
> bin/zookeeper-shell.sh localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabledWATCHER::WatchedEvent state:SyncConnected type:None path:null
ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
ls /brokers
[ids, topics, seqid]
ls /brokers/topics
[test]
ls /brokers/ids
[0]
"ls /"命令列出了zk根节点下的所有元素,可以看到kafka在zk中存放了集群(cluster)、实例(brokers)、消费者(consumers)等信息;zookeeper服务作为kafka的元数据管理服务,因而每次对kafka服务操作都需要指定zookeeper服务的地址,以便于获取kafka的元数据,连接到正确的kafka集群;
创建topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
创建一个名为test的topic,包含1个复本,1个分区;
查看集群中的所有topic
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
启动生产者,并写入测试消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> Hello World1
> I'm a programer
启动消费者,接收消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Hello World1
I'm a programer
可以看到生产者写入的消息,都能够立刻被消费者接收并打印出来。需要注意的是,生产者和消费者通过topic这个概念来建立联系,只有消费者指定与生产者相同的topic,才能够消费其产生的消费;
删除topic
> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
建立多个kafka实例的集群
拷贝配置文件,修改实例ID、日志目录、监听端口:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
修改配置项如下:
config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2
启动实例:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
新建topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".
新建一个名为my-replicated-topic的topic,有3个副本和1个分区;
查看topic状态描述
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
topic上有几个partition,就会展示几行记录;字段含义如下:
- leader:标识当前partition的leader节点是那个,通过broker.id标识;一个partition只有一个leader节点,负责接收和处理读写请求;
- replicas:标识当前partition的所有副本所在的节点,无论节点是否是leader节点,也无论节点是否"存活",通过broker.id标识;
- isr:标识存活且与leader节点同步的节点,即可用的副本(包括leader借点);通过broker.id标识;
查看最初创建的test状态描述:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
可以看到,因为test只有1个副本、1个partition,所以只能分布在一个实例上;
模拟leader切换
对于包含多个副本的topic而言,当一个副本所在的实例不可用时,将会从其它可用副本中选择一个作为leader;
在集群节点都正常的情况下,查看topic的状态:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
关掉broker.id=0的实例,再次查看,发现leader节点已经切换,同时isr中不包含"不可用"节点0:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2
重新启动broker.id=0的实例,再次查看,发现isr中包括了节点0,说明可用。
使用kafka connect导入/导出数据
kafka connect是kafka与外部系统交互的工具,通过运行不同的connector,实现与不同外部系统的交互,包括数据的导入/导出。如下模拟从文件导入数据到kafka,以及从kafka导出数据到文件;
- 首先,创建文件,写入测试数据:
> cd kafka_2.11-1.0.0
> echo "Hello World" > test.txt
注:一定是在kafka根目录中创建名为test.txt的文件,否则不会读取;
2.启动2个单点的connector,这两个connector都是kafka自带的,一个用于读取文件写入topic,另一个用于将topic中数据导出到文件;
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2018-01-17 10:37:32,568] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:65)
connect-console-source.properties文件内容:
name=local-console-source
# connector入口
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# connector关联的topic
topic=connect-test
connect-console-sink.properties文件内容:
name=local-console-sink
# connector入口
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
# connector关联的topic
topics=connect-test
在kafka根目录可以看到生成了名为test.sink.txt的文件,其中的内容即为test.txt中的内容,持续向test.txt中append内容,test.sink.txt中的内容也随之append;
注:因为同步过程是监听文件的增量变化,如果改变test.txt中旧有内容,则旧数据不发生变化,覆盖同一行旧数据,貌似会产生一个空行;
整个同步过程是:
test.txt -> FileStreamSourceConnector -> connect-test(topic) -> FileStreamSinkConnector -> test.sink.txt
由于是通过topic存放过往数据,因此在topic中也可以看到相应的数据:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"Hello World"}
{"schema":{"type":"string","optional":false},"payload":""}
{"schema":{"type":"string","optional":false},"payload":"Hello World1"}
{"schema":{"type":"string","optional":false},"payload":"Hello World2"}
使用kafka stream处理数据
参考官方文档:http://kafka.apache.org/10/documentation/streams/quickstart
kafka生态
kafka周边包含很多组件,参看wiki:https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem
kafka官方文档学习笔记2--QuickStart相关推荐
- kafka官方文档学习笔记3--配置简述
Kafka使用key-value键值对格式的配置,这些配置即可以在进程启动时,根据指定的properties文件加载,也可以通过编程的方式,在程序中动态指定:根据集群中角色的不同分为6种配置: bro ...
- ZooKeeper官方文档学习笔记03-程序员指南03
我的每一篇这种正经文章,都是我努力克制玩心的成果,我可太难了,和自己做斗争. ZooKeeper官方文档学习笔记04-程序员指南03 绑定 Java绑定 客户端配置参数 C绑定 陷阱: 常见问题及故障 ...
- Open3D官方文档学习笔记
Open3D官方文档学习笔记 第一部分--点云 1 可视化点云 2 体素降采样 3 顶点法线评估 4 访问顶点法线 补充:Numpy在Open3D中的应用 5 裁剪点云 补充1:获取点云坐标 补充2: ...
- ZooKeeper官方文档学习笔记01-zookeeper概述
纠结了很久,我决定用官方文档学习 ZooKeeper概述 学习文档 学习计划 ZooKeeper:分布式应用程序的分布式协调服务 设计目标 数据模型和分层名称空间 节点和短命节点 有条件的更新和监视 ...
- vue.js 2.0 官方文档学习笔记 —— 01. vue 介绍
这是我的vue.js 2.0的学习笔记,采取了将官方文档中的代码集中到一个文件的形式.目的是保存下来,方便自己查阅. !官方文档:https://cn.vuejs.org/v2/guide/ 01. ...
- Android 开发之ViewPage官方文档学习笔记
2019独角兽企业重金招聘Python工程师标准>>> 以下为官网的官方文档,我将从翻译该文档开始学习. ViewPager extends ViewGroup java.lang. ...
- xarray官方文档 学习笔记(序章)
个人需要开了这个坑 不定时更新 希望能和大家共同学习和交流 工作中难免有不足和错误,希望大家多多批评指正 xarray官方文档首页:http://xarray.pydata.org/en/stable ...
- OpenCV-Python官方文档学习笔记(上)
整理自OpenCV-Python官方文档 一. OpenCV-Python Tutorials 1 安装及验证 2 图片读写,展示 3 视频读写,展示 4 绘图功能(绘制几何形状:线.圆.椭圆.矩形. ...
- Django企业开发读书笔记(及官方文档学习笔记) 老男孩2019Go语言视频学习
目标:深入理解Django以及web开发深入知识,掌握Gin ,Go micro 框架 ,Kafka ,Zookeeper 为kubernetes Istio 做微服务,服务网格做铺垫 实现Pytho ...
最新文章
- 经验 | 秋招总结(拼多多,腾讯,百度,字节)
- python实现记事本的查找功能_Python + PyQt4 实现记事本功能
- Java Maximum Subarray debug
- How to use external classes and PHP files in Laravel Controller?
- lol1月8日服务器维护,LOL1月16日更新维护到几点 8.1版本更新内容
- ES6中 Map 数据结构
- 最近安装了Netscape Navigator 9.0英文版,真好用!
- Git for windows 配置
- Windows安装curl及基本命令
- 论文推荐到计算机科学期刊,计算机类论文参考文献推荐 计算机类核心期刊参考文献哪里找...
- 计算机的垃圾站是在硬盘,电脑垃圾回收站在哪里
- 2022年黄石市高企申报奖励补贴以及认定奖励补贴汇总!
- 【报错解决】Parameter ‘XXXX‘ not found. Available parameters are [XXXX1, XXXX2]
- 权重衰减weight_decay参数从入门到精通
- php 获取到当前ip,获取当前IP地址,跳转到对应城市网站。
- win8服务器防火墙配置文件,Win8自带防火墙吗,Win8防火墙在哪里(适用于Win8.1)?
- 武汉科技大学计算机学院廖光忠,武汉科技大学考研研究生导师简介-段宁
- 数学建模代码速成~赛前一个月~matlab~代码模板~吐血总结~三大模型代码(预测模型、优化模型、评价模型)
- 天津二级计算机考试地点,2016年9月天津计算机一级二级三级四级考点地址电话...
- B2C电子商务网站是如何打造成功之城
热门文章
- android studio 抓log,Android studio保存logcat日志到本地的操作
- 史上最大“云办公”实验开始,你参加了吗?
- 首个镜子分割网络问世,大连理工、鹏城实验室、香港城大出品 | ICCV 2019
- iPhone遭遇最强烈的黑客攻击:密码、位置、联系人,敏感数据都泄露,谷歌发现的...
- Linux进程描述符task_struct结构体详解--Linux进程的管理与调度(一)
- 12.21 php-fpm的pool 12.22 php-fpm慢执行日志 12.23 open_basedir 12.24 php-fpm进程管理
- 发布本人汉化最后一个CommunityServer的版本,blog名称也改为 Asp.net源码交流中心...
- 如何去掉桌面标签背影
- Hyperledger Fabric 1.0 从零开始(五)——运行测试e2e
- escape()、encodeURI()、encodeURIComponent() difference