1.在清华镜像站下载kafka_2.10-0.10.0.0.tgz 和 zookeeper-3.4.10.tar.gz

分别解压到/usr/local目录下

2.进入zookeeper目录,在conf目录下将zoo_sample.cfg文件拷贝,并更名为zoo.cfg

参考 https://my.oschina.net/phoebus789/blog/730787

zoo.cfg文件的内容

# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/common/zookeeper/zookeeperdir/zookeeper-data
dataLogDir=/home/common/zookeeper/zookeeperdir/logs
# the port at which the clients will connect
clientPort=2181
server.1=10.10.100.10:2888:3888
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

新建下面这两个目录

/home/common/zookeeper/zookeeperdir/zookeeper-data
/home/common/zookeeper/zookeeperdir/logs

在zookeeper-data目录下新建一个myid文件,内容为1,代表这个服务器的编号是1,具体参考上面网址中的内容

最后在/etc/profile中添加环境变量,并source

export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=${ZOOKEEPER_HOME}/bin:$PATH

现在zookeeper就安装好了,现在启动zookeeper

bin/zkServer.sh start

查看状态

bin/zkServer.sh status

启动客户端脚本

bin/zkCli.sh -server localhost:2181

停止zookeeper

bin/zkServer.sh stop

1.现在安装kafka,同样是解压之后就安装好了

参考 http://www.jianshu.com/p/efc8b9dbd3bd

2.进入kafka目录下

kafka需要使用Zookeeper,首先需要启动Zookeeper服务,上面的操作就已经启动了Zookeeper服务

如果没有的话,可以使用kafka自带的脚本启动一个简单的单一节点Zookeeper实例

bin/zookeeper-server-start.sh config/zookeeper.properties

启动 Kafka服务

bin/kafka-server-start.sh config/server.properties

停止 Kafka服务

bin/kafka-server-stop.sh config/server.properties

3.创建一个主题

首先创建一个名为test的topic,只使用单个分区和一个复本

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

现在可以运行list topic命令看到我们的主题

bin/kafka-topics.sh --list --zookeeper localhost:2181

4.发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

如果要批量导入文件数据到kafka,参考:2.1 本地环境下kafka批量导入数据

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic < file_pat

如果要模拟实时数据到打入kafka的情况,可以写一个shell脚本

#!/usr/bin/env bashcat XXXX.log | while read line
dosleep 0.1echo "${line}"echo "${line}" | /home/lintong/software/apache/kafka_2.11-0.10.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicA
done

5.启动一个消费者,消费者会接收到消息

旧版消费者

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 2>/dev/null

新版消费者

bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic input --from-beginning 2>/dev/null

6.查看指定的topic的offset信息

对于结尾是ZK的消费者,其消费者的信息是存储在Zookeeper中的

对于结尾是KF的消费者,其消费者的信息是存在在Kafka的broker中的

都可以使用下面的命令进行查看

bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group xxx --topic xxx

结果

bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group test-consumer-group --topic xxx
[2018-09-03 20:34:57,595] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
test-consumer-group xxx              0   509             0               -509            none

或者

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group xxxx --topic xxxx

结果

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test-consumer-group
[2018-09-03 20:45:02,967] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
test-consumer-group xxx              0   509             509             0               none

lag是负数的原因是 topic中的消息数量过期(超过kafka默认的7天后被删除了),变成了0,所以Lag=logSize减去Offset,所以就变成了负数

7.删除一个topic

需要在 conf/server.properties 文件中设置

# For delete topic
delete.topic.enable=true

否则在执行了以下删除命令后,再 list 查看所有的topic,还是会看到该topic

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicB

再到 配置文件 中的kafka数据存储地址去删除物理数据了,我的地址为

/tmp/kafka-logs

最后需要到zk里删除kafka的元数据

./bin/zkCli.sh #进入zk shell
ls /brokers/topics
rmr /brokers/topics/topicA

参考:kafka 手动删除topic的数据

8.查看某个group的信息

新版

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group xxx

结果

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group group_id
GROUP          TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET   LAG                 OWNER
group_id      xxx              0          509             509             0               consumer-1_/127.0.0.1

如果这时候消费者进程关闭了之后,使用上面的命令和下面的-list命令将不会查出这个group_id,但是当消费者进程重新开启后,这个group_id又能重新查到,且消费的offset不会丢失

旧版

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group xxx --describe

9.查看consumer group的列表

ZK的消费者可以使用下面命令查看,比如上面的例子中的 test-consumer-group

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list

KF的消费者可以使用下面命令查看,比如上面的例子中的 console-consumer-xxx ,但是只会查看到类似于 KMOffsetCache-lintong-B250M-DS3H 的结果,这是由于这种消费者的信息是存放在 __consumer_offsets 中

对于如何查看存储于 __consumer_offsets 中的新版消费者的信息,可以参考huxihx的博文: Kafka 如何读取offset topic内容 (__consumer_offsets)

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

10.在zk中删除一个consumer group

rmr /consumers/test-consumer-group

11.查看topic的offset的最小值

参考:重置kafka的offset

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --time -2
xxxx:0:0

12.查看topic的offset的最大值

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --time -1

Ubuntu下安装和使用zookeeper和kafka相关推荐

  1. Ubuntu 下安装thttpd Web服务器

    不知道大家是不是真的需要用appache这么复杂的功能这么强大的web server,其实有很多时候使用webserver也只是一种远程共享访问的方式.这里,Ubuntu repository的提供了 ...

  2. Ubuntu下安装Apache+PHP+Mysql

    Ubuntu下安装 apache+php+mysql文本服务器! ------------------------------------------------------------------- ...

  3. ubuntu下安装ftp服务器

    ubuntu下安装ftp服务器 Ftp服务器是使用以vsftp为例. 1. 安装     $sudo aptitude install vsftpd     $ps aux | grep 'ftp' ...

  4. linux pureftp mysql_在Ubuntu下安装apache2+php5+mysql5+pureftp+ftp

    在Ubuntu下安装apache2+php5+mysql5+pureftp+ftp 一.安装Ubuntu7.04 Desktop版 二.ubuntu Linux下手工安装mysql5 1.下载mysq ...

  5. ubuntu下安装windows虚拟机

    ubuntu下安装win7虚拟机总结 ubuntu16.04 虚拟机 安装win7/win10 http://WIN10:你不能访问此共享文件夹,解决方法 VirtualBox虚拟机剪贴板共享

  6. Linux :debian(ubuntu)下安装和使用haskell

    文章目录 Linux :debian(ubuntu)下安装haskell 安装 使用 Linux :debian(ubuntu)下安装haskell 安装 直接使用apt进行安装: sudo apt- ...

  7. Linux: debian/ubuntu下安装和使用Java 11

    Linux: debian/ubuntu下安装和使用Java 11 只需6行命令: su - echo "deb http://ppa.launchpad.net/linuxuprising ...

  8. Linux: debian/ubuntu下安装和使用Java 8

    Linux: debian/ubuntu下安装和使用Java 8 7行命令解决问题: su - echo "deb http://ppa.launchpad.net/webupd8team/ ...

  9. Linux: debian/ubuntu下安装Neo4j

    文章目录 Linux: debian/ubuntu下安装Neo4j Linux: debian/ubuntu下安装Neo4j Neo4j的官方仓库地址:neo4j/neo4j: Graphs for ...

最新文章

  1. 乌镇现场·帅初:公有链的未来——链上校验,链下计算
  2. vue用户行为收集_vue 实现移动端键盘搜索事件监听
  3. mesh threejs 属性_threeJS创建mesh,创建平面,设置mesh的平移,旋转、缩放、自传、透明度、拉伸...
  4. Kube Controller Manager 源码分析
  5. Http状态行和状态码介绍
  6. 2020蓝桥杯省内模拟赛C++B组1-8(详细解析,看完就会)
  7. H.264标准(二)FLV封装格式详解
  8. js laypage mysql_laypage 物理分页与逻辑分页实例
  9. node mysql菜鸟教程_Node.js 路由
  10. scanf函数读取缓冲区数据的问题
  11. 开学数码必买清单推荐,2022年开学季最值得入手的好物
  12. TCP SYN洪水 (SYN Flood) 攻击原理与实现
  13. 数据科学的重要支柱——统计学的最佳入门书籍
  14. 【新知实验室】TRTC
  15. 工业图像处理实战--九点标定法
  16. PHP获取十月九号星期几,php日期获取星期几
  17. h3c端口聚合实现服务器增加带宽,H3C动态链路聚合对接服务器双网卡
  18. python结束任务之后如何关闭_如何终止python程序运行
  19. 《资本论》 商品的拜物教性质及其秘密
  20. 低温烹饪过程中真空压力的自动控制

热门文章

  1. mysql查询各科前3_MySQL 查询各科前三的数据
  2. Java中print、printf、println的区别 详解
  3. 网络无线AP信号走场测试软件(Ekahau 使用说明)
  4. js把word转html在线预览,js实现word转换为html
  5. openstack的云主机相关命令
  6. Zabbix的模板管理与配置
  7. php5.3中ZendGuardLoader与wincache冲突问题的解决方法
  8. Linux使用storcli工具查看服务器硬盘和raid组信息
  9. 移动端H5混合开发设置复盘与总结
  10. 即使会溢出,也能得到正确的结果