下载kafka

https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

解压安装包

> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0/bin

查看bin目录下主要几个脚本功能如下:

脚本 功能
kafka-server-start.sh 启动kafka服务器;
kafka-server-stop.sh 停止kafka服务器;
kafka-topics.sh topic管理;
kafka-console-producer.sh 基于命令行的生产者;
kafka-console-consumer.sh 基于命令行的消费者;
kafka-run-class.sh 运行java类的脚本,由kafka-server-start.sh和kafka-server-stop.sh、kafka-topics.sh等脚本调用;
zookeeper-server-start.sh 启动kafka自带的zookeeper服务器;
zookeeper-server-stop.sh 停止kafka自带的zookeeper服务器;
zookeeper-shell.sh 在命令行连接zookeeper的客户端工具;
connect-standalone.sh 在命令行启动单点的connector;
connect-distributed.sh 在命令行启动基于集群connector;

注:kafka的安装包除了包括kafka自身的工具以外,也包括了一系列简易的zookeeper工具,能够通过zookeeper-server-start.sh脚本启动简易的单点zookeeper实例,供kafka使用。但一般仅限于测试环境使用;

config目录下存放的是kafka服务、自带zk服务以及基于命令行的生产者、消费者工具对应的配置文件,常用如下:

脚本 功能
server.properties kafka实例的配置文件,配置kafka最重要的配置文件;
log4j.properties kafka日志配置;
zookeeper.properties 自带zk的配置文件;
producer.properties 基于命令行的生产者工具配置文件;(测试用)
consumer.properties 基于命令行的消费者工具配置文件;(测试用)
connect-standalone.properties 自带单点connector的配置文件,存放connector的序列化方式、监听broker的地址端口等通用配置;(测试用)
connect-file-source.properties 配置文件读取connector,用于逐行读取文件,导入入topic;(测试用)
connect-file-sink.properties 配置文件写入connector,用于将topic中的数据导出到topic中;(测试用)

启动zk服务,默认端口:2181

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2018-01-16 20:22:52,327] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

启动kafka服务,默认端口:9092

> bin/kafka-server-start.sh config/server.properties
[2018-01-16 20:23:52,758] INFO KafkaConfig values:
...

经过如上两步,我们就启动了一个简易的kafka集群(具有1个zookeeper实例和1个kafka实例的集群)

查看zookeeper中存放的kafka信息

> bin/zookeeper-shell.sh localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabledWATCHER::WatchedEvent state:SyncConnected type:None path:null
ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
ls /brokers
[ids, topics, seqid]
ls /brokers/topics
[test]
ls /brokers/ids
[0]

"ls /"命令列出了zk根节点下的所有元素,可以看到kafka在zk中存放了集群(cluster)、实例(brokers)、消费者(consumers)等信息;zookeeper服务作为kafka的元数据管理服务,因而每次对kafka服务操作都需要指定zookeeper服务的地址,以便于获取kafka的元数据,连接到正确的kafka集群;

创建topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

创建一个名为test的topic,包含1个复本,1个分区;

查看集群中的所有topic

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

启动生产者,并写入测试消息

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> Hello World1
> I'm a programer

启动消费者,接收消息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Hello World1
I'm a programer

可以看到生产者写入的消息,都能够立刻被消费者接收并打印出来。需要注意的是,生产者和消费者通过topic这个概念来建立联系,只有消费者指定与生产者相同的topic,才能够消费其产生的消费;

删除topic

> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

建立多个kafka实例的集群

拷贝配置文件,修改实例ID、日志目录、监听端口:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

修改配置项如下:

config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2

启动实例:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

新建topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

新建一个名为my-replicated-topic的topic,有3个副本和1个分区;

查看topic状态描述

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:Topic: my-replicated-topic    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2

topic上有几个partition,就会展示几行记录;字段含义如下:

  • leader:标识当前partition的leader节点是那个,通过broker.id标识;一个partition只有一个leader节点,负责接收和处理读写请求;
  • replicas:标识当前partition的所有副本所在的节点,无论节点是否是leader节点,也无论节点是否"存活",通过broker.id标识;
  • isr:标识存活且与leader节点同步的节点,即可用的副本(包括leader借点);通过broker.id标识;

查看最初创建的test状态描述:

>  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

可以看到,因为test只有1个副本、1个partition,所以只能分布在一个实例上;

模拟leader切换

对于包含多个副本的topic而言,当一个副本所在的实例不可用时,将会从其它可用副本中选择一个作为leader;
在集群节点都正常的情况下,查看topic的状态:

>  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:Topic: my-replicated-topic    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2

关掉broker.id=0的实例,再次查看,发现leader节点已经切换,同时isr中不包含"不可用"节点0:

>  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 0,1,2    Isr: 1,2

重新启动broker.id=0的实例,再次查看,发现isr中包括了节点0,说明可用。

使用kafka connect导入/导出数据

kafka connect是kafka与外部系统交互的工具,通过运行不同的connector,实现与不同外部系统的交互,包括数据的导入/导出。如下模拟从文件导入数据到kafka,以及从kafka导出数据到文件;

  1. 首先,创建文件,写入测试数据:
> cd kafka_2.11-1.0.0
> echo "Hello World" > test.txt

注:一定是在kafka根目录中创建名为test.txt的文件,否则不会读取;

2.启动2个单点的connector,这两个connector都是kafka自带的,一个用于读取文件写入topic,另一个用于将topic中数据导出到文件;

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2018-01-17 10:37:32,568] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:65)

connect-console-source.properties文件内容:

name=local-console-source
# connector入口
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# connector关联的topic
topic=connect-test

connect-console-sink.properties文件内容:

name=local-console-sink
# connector入口
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
# connector关联的topic
topics=connect-test

在kafka根目录可以看到生成了名为test.sink.txt的文件,其中的内容即为test.txt中的内容,持续向test.txt中append内容,test.sink.txt中的内容也随之append;

注:因为同步过程是监听文件的增量变化,如果改变test.txt中旧有内容,则旧数据不发生变化,覆盖同一行旧数据,貌似会产生一个空行;

整个同步过程是:

test.txt ->   FileStreamSourceConnector -> connect-test(topic) -> FileStreamSinkConnector -> test.sink.txt

由于是通过topic存放过往数据,因此在topic中也可以看到相应的数据:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"Hello World"}
{"schema":{"type":"string","optional":false},"payload":""}
{"schema":{"type":"string","optional":false},"payload":"Hello World1"}
{"schema":{"type":"string","optional":false},"payload":"Hello World2"}

使用kafka stream处理数据

参考官方文档:http://kafka.apache.org/10/documentation/streams/quickstart

kafka生态

kafka周边包含很多组件,参看wiki:https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

kafka官方文档学习笔记2--QuickStart相关推荐

  1. kafka官方文档学习笔记3--配置简述

    Kafka使用key-value键值对格式的配置,这些配置即可以在进程启动时,根据指定的properties文件加载,也可以通过编程的方式,在程序中动态指定:根据集群中角色的不同分为6种配置: bro ...

  2. ZooKeeper官方文档学习笔记03-程序员指南03

    我的每一篇这种正经文章,都是我努力克制玩心的成果,我可太难了,和自己做斗争. ZooKeeper官方文档学习笔记04-程序员指南03 绑定 Java绑定 客户端配置参数 C绑定 陷阱: 常见问题及故障 ...

  3. Open3D官方文档学习笔记

    Open3D官方文档学习笔记 第一部分--点云 1 可视化点云 2 体素降采样 3 顶点法线评估 4 访问顶点法线 补充:Numpy在Open3D中的应用 5 裁剪点云 补充1:获取点云坐标 补充2: ...

  4. ZooKeeper官方文档学习笔记01-zookeeper概述

    纠结了很久,我决定用官方文档学习 ZooKeeper概述 学习文档 学习计划 ZooKeeper:分布式应用程序的分布式协调服务 设计目标 数据模型和分层名称空间 节点和短命节点 有条件的更新和监视 ...

  5. vue.js 2.0 官方文档学习笔记 —— 01. vue 介绍

    这是我的vue.js 2.0的学习笔记,采取了将官方文档中的代码集中到一个文件的形式.目的是保存下来,方便自己查阅. !官方文档:https://cn.vuejs.org/v2/guide/ 01. ...

  6. Android 开发之ViewPage官方文档学习笔记

    2019独角兽企业重金招聘Python工程师标准>>> 以下为官网的官方文档,我将从翻译该文档开始学习. ViewPager extends ViewGroup java.lang. ...

  7. xarray官方文档 学习笔记(序章)

    个人需要开了这个坑 不定时更新 希望能和大家共同学习和交流 工作中难免有不足和错误,希望大家多多批评指正 xarray官方文档首页:http://xarray.pydata.org/en/stable ...

  8. OpenCV-Python官方文档学习笔记(上)

    整理自OpenCV-Python官方文档 一. OpenCV-Python Tutorials 1 安装及验证 2 图片读写,展示 3 视频读写,展示 4 绘图功能(绘制几何形状:线.圆.椭圆.矩形. ...

  9. Django企业开发读书笔记(及官方文档学习笔记) 老男孩2019Go语言视频学习

    目标:深入理解Django以及web开发深入知识,掌握Gin ,Go micro 框架 ,Kafka ,Zookeeper 为kubernetes Istio 做微服务,服务网格做铺垫 实现Pytho ...

最新文章

  1. 经验 | 秋招总结(拼多多,腾讯,百度,字节)
  2. python实现记事本的查找功能_Python + PyQt4 实现记事本功能
  3. Java Maximum Subarray debug
  4. How to use external classes and PHP files in Laravel Controller?
  5. lol1月8日服务器维护,LOL1月16日更新维护到几点 8.1版本更新内容
  6. ES6中 Map 数据结构
  7. 最近安装了Netscape Navigator 9.0英文版,真好用!
  8. Git for windows 配置
  9. Windows安装curl及基本命令
  10. 论文推荐到计算机科学期刊,计算机类论文参考文献推荐 计算机类核心期刊参考文献哪里找...
  11. 计算机的垃圾站是在硬盘,电脑垃圾回收站在哪里
  12. 2022年黄石市高企申报奖励补贴以及认定奖励补贴汇总!
  13. 【报错解决】Parameter ‘XXXX‘ not found. Available parameters are [XXXX1, XXXX2]
  14. 权重衰减weight_decay参数从入门到精通
  15. php 获取到当前ip,获取当前IP地址,跳转到对应城市网站。
  16. win8服务器防火墙配置文件,Win8自带防火墙吗,Win8防火墙在哪里(适用于Win8.1)?
  17. 武汉科技大学计算机学院廖光忠,武汉科技大学考研研究生导师简介-段宁
  18. 数学建模代码速成~赛前一个月~matlab~代码模板~吐血总结~三大模型代码(预测模型、优化模型、评价模型)
  19. 天津二级计算机考试地点,2016年9月天津计算机一级二级三级四级考点地址电话...
  20. B2C电子商务网站是如何打造成功之城

热门文章

  1. android studio 抓log,Android studio保存logcat日志到本地的操作
  2. 史上最大“云办公”实验开始,你参加了吗?
  3. 首个镜子分割网络问世,大连理工、鹏城实验室、香港城大出品 | ICCV 2019
  4. iPhone遭遇最强烈的黑客攻击:密码、位置、联系人,敏感数据都泄露,谷歌发现的...
  5. Linux进程描述符task_struct结构体详解--Linux进程的管理与调度(一)
  6. 12.21 php-fpm的pool 12.22 php-fpm慢执行日志 12.23 open_basedir 12.24 php-fpm进程管理
  7. 发布本人汉化最后一个CommunityServer的版本,blog名称也改为 Asp.net源码交流中心...
  8. 如何去掉桌面标签背影
  9. Hyperledger Fabric 1.0 从零开始(五)——运行测试e2e
  10. escape()、encodeURI()、encodeURIComponent() difference