原创地址: https://www.cnblogs.com/lilixin/p/5775877.html

Kafka安装与使用

下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz

安装以及启动kafka

步骤1:安装kafka

$ tar -xzf kafka_2.10-0.8.1.1.tgz
$ cd kafka_2.10-0.8.1.1.tgz 

步骤2:配置server.properties

配置zookeeper(假设您已经安装了zookeeper,如果没有安装,请再网上搜索安装方法)

进入kafka安装工程根目录编辑

vim config/server.properties  

修改属性 zookeeper.connect=ip:2181,ip2: 2181

步骤3:server.properties配置说明

kafka最为重要三个配置依次为:broker.id、log.dir、zookeeper.connect

kafka server端config/server.properties参数说明和解释如下:

(参考配置说明地址:http://blog.csdn.net/lizhitao/article/details/25667831)

#实际使用案例 这里211上面的kafka 配置文件
broker.id=1
port=9092
host.name=192.168.1.211
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
zookeeper.connection.timeout.ms=1000000
#kafka实际使用案例 210服务器kafka配置
broker.id=2
port=9092
host.name=192.168.1.210
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
zookeeper.connection.timeout.ms=1000000

步骤4: 启动kafka

(先启动zookeeper $:  bin/zkServer.sh start config/zookeeper.properties &)

cd kafka-0.8.1

$ bin/kafka-server-start.sh -daemon config/server.properties &

(实验时,需要启动至少两个broker   bin/kafka-server-start.sh -daemon config/server-1.properties &)

步骤5:创建topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

步骤6:验证topic是否创建成功

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

localhost为zookeeper地址

topic描述:

bin/kafka-topics.sh --describe --zookeeper 192.168.1.8:2181 --topic test
//启动报错Unrecognized VM option '+UseCompressedOops'
查看 bin/kafka-run-class.sh
找到
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; thenKAFKA_JVM_PERFORMANCE_OPTS="-server  -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi
去掉-XX:+UseCompressedOops
启动报错 Could not reserve enough space for object heap
原因及解决办法:
查看kafka-server-start.sh配置文件,发现有heap设置信息:KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"    更改这里的内存为256(因为测试机内存总共才1G ,所以报错)

步骤7:发送消息

发送一些消息验证,在console模式下,启动producer

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

(此处localhost改为本机ip,否则报错,I don’t  know why)

消息案例:

{"price":"100000","userId":14615501351480021,"payType":3,"code":"AFD3B8","payTime":{"time":1457330791333,"minutes":6,"seconds":31,"hours":14,"month":2,"year":116,"timezoneOffset":-480,"day":1,"date":7},"orderId":12222096,"goodsName":"会员"} 

步骤8:启动一个consumer

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning




删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除

bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181




配置kafka集群模式,需要由多个broker组成

和单机环境一样,只是需要修改下broker 的配置文件而已。

1、将单机版的kafka 目录复制到其他几台电脑上。

2、修改每台电脑上的kafka 目录下的server.properties 文件。

broker.id=1//这个参数在kafka 的broker 集群中必须唯一,且为正整数。

3、启动每台电脑上的kafka 即可。

本机配置伪分布式

首先为每个节点编写配置文件:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

在拷贝出的新文件中添加以下参数:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

现在启动另外两个节点:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

创建一个拥有3个副本的topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

运行“"describe topics”命令知道每个节点的信息
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
replicas:列出了所有的副本节点,不管节点是否在服务中.
isr:是正在服务中的节点.





搭建Kafka开发环境

1 在pom.xml中引入kafka依赖jar包
<!-- kafka配置 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>${kafka.version}</version>
<exclusions>
<!-- 实际应用中单独引入下面的jar包,不使用kafka带的 -->
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
<exclusion>
<artifactId>zkclient</artifactId>
<groupId>com.101tec</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>

2.属性文件 kafka.properties
#zookeeper.connect=192.168.1.8:2181,192.168.1.13:2181,192.168.1.16:2181
#zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181
zookeeper.connect=192.168.1.179:2181
metadata.broker.list=192.168.1.179:9092
#metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092#zookeeper.connect.timeout=15000
#zookeeper.session.timeout.ms=15000
#zookeeper.sync.time.ms=20000
#auto.commit.interval.ms=20000
#auto.offset.reset=smallest
#serializer.class=kafka.serializer.StringEncoder
#producer.type=async
#queue.buffering.max.ms=6000group.id=llx
kafka.sellstat.topics=llx

在spring配置文件中引入此properties文件
<!-- 这个是加载给spring 用的.-->
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:kafka.properties</value>
</list>
</property>
</bean>
<!-- 这个是用来在代码中注入用的.-->
<bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="locations">
<list>
<value>classpath:kafka.properties</value>
</list>
</property>
</bean>

3.定义收信人
<!-- 定义收信人 receiver -->
<bean id="testReceiver" class="cn.vko.index.Receiver"><constructor-arg index="0" value="${zookeeper.connect}" /><constructor-arg index="1" value="${group.id}" /><constructor-arg index="2" value="${kafka.sellstat.topics}"/><constructor-arg index="3" ref="testConsumer" />
</bean>

4. spring中定义一个消息处理器(需要实现vkoConsumer)
<!-- 定义消息处理器 -->
<bean id="testConsumer" class="cn.vko.index.TestConsumer" ></bean>

5.消息生产者项目引入producer

<bean id="topProducer" class="top.lilixin.TopProducer"><constructor-arg index="0" value="kafka.server1:9092,kafka.server2:9092" /></bean>

kafka的安装和使用(详细版)相关推荐

  1. IDEA如何配置 Gradle 及 Gradle 安装过程(详细版)

    IDEA如何配置 Gradle(详细版) 一.安装 Gradle 1.下载 Gradle 安装包 官网下载链接:https://gradle.org/releases/ 2.下载后解压 3.文件夹如图 ...

  2. IDEA如何配置 Maven 及 Maven 安装过程(详细版)

    IDEA如何配置 Maven(详细版) 一.安装Maven 1.下载Maven 安装包 官网:https://maven.apache.org/ 2.点击Download ===>选择 apac ...

  3. 2008r装mysql_RMySQL安装及操作(详细版)

    系统配置: Windows XP 32位 R 3.1.1 一.安装RMySQL: 1,安装RTools并配置环境变量: 我的电脑--属性--高级--环境变量,在系统变量一栏中选择PATH,点击编辑,在 ...

  4. SPSS免费安装教程(详细版)

      前几天,刷到一位博主写了一篇关于SPSS安装的文章,讲的很详细,保姆级别的!!!我把自己安装过程中遇到的问题以及评论区里其他小伙伴遇到的问题进行了总结,希望能帮助到友友们!!!这位博主的文章链接我 ...

  5. python安装教程——最新详细版

    文章目录 环境 安装包下载 选择下载版本 其它版本介绍 通过下载好的离线安装包,安装python 自定义安装特征 选项解释 高级选项 等待安装 安装成功 验证 送python编程手册 环境 windo ...

  6. 使用U盘安装CentOS7(详细版)

    准备工作: 8G以上U盘 UltraISO虚拟光驱(试用版即可)最新版 下载地址:添加链接描述 点击下载试用 下载Centos7包 官方下载地址:点击下载 Centos中文站:http://www.c ...

  7. electron安装【纯详细版】

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言 一.首先node.js的下载 二.node.js安装 1.检查node.js和npm是否安装成功 2.读入数据 总结 ...

  8. VMware虚拟机安装Linux系统(详细版)

    所谓虚拟机(virtual machine),就是通过软件技术虚拟出来的一台计算机,它在使用层面和真实的计算机并没有什么区别. 常见的虚拟机软件有 VMware Workstation(简称 VMwa ...

  9. 安装superset(详细版)

    1.安装minanaconda (1)将minanaconda拖拽到虚拟机中,然后使用命令进行安装 [tom@bigdata1 software]$ rz -E rz waiting to recei ...

  10. linux aarch64 java环境安装(图文详细版)

    文章目录 前言 一.java环境 1.确认服务器版本 2.把包传到服务器上,然后进行解压 3.修改文件名 4.配置环境变量 5.查看java环境 前言 安装过程很简单,请认真确认版本号及服务,直接进入 ...

最新文章

  1. 创建data_微软推出 Microsoft.Data.SqlClient,替代 System.Data.SqlClient
  2. Asprise OCR SDK 15.3试用版破解
  3. 僵尸进程和孤儿进程 转载
  4. yum php fpm5.6,CentOS 7 yum 安装php5.6
  5. WPF01(xaml)
  6. 为产业数字化赋能!施耐德电气数字产业示范园落户北京
  7. uni-app h5 分享好友与朋友圈等功能
  8. 软件测试的标准操作流程
  9. 阿辉DirectX 11学习笔记一
  10. css格式规范stylelint在vscode使用教程(ctrl+s自动智能修复)
  11. android 设内网固定ip,如何给手机设置一个固定的内网ip
  12. 万年历单片机C语言报告,C语言单片机万年历系统设报告优秀文档.doc
  13. 高颜值,类似Fliqlo的翻页时钟-BdTab组件
  14. Codeforces Round #717 (Div. 2) 1516 A. Tit for Tat(模拟)
  15. 工作中使用到的单词(软件开发)_20210317_备份
  16. 使用redis实现页面缓存
  17. 基于PHP+MySQL小区快递自助取件系统的设计与实现
  18. oracle12c 异机恢复,Oracle 11g RMAN 异机恢复
  19. 京东高效转链API接口—商品优惠券二合一转链—京东联盟优惠券如何转链?
  20. 用两个队列实现栈——C语言实现

热门文章

  1. 全国地址数据库下载 mysql版更新共668389条记录 精确到村
  2. OBS第三方推流直播教程
  3. 漂亮图表html,这9个精美的HTML5图表应用,让你的网页专业精确
  4. --go_out: protoc-gen-go: plugins are not supported问题处理
  5. 超过100项改进 100tv聚好看全新发布
  6. 小程序为什么有的方法要写在methods,有的可直接写在page下
  7. 机器学习-54-RL-06-Actor-Critic(强化学习-A2C,A3C,Pathwise Derivative Policy Gradient)
  8. i7 10750h和i7 10870h 的区别
  9. 计算机网络教学方式探讨论文,高职计算机网络安全课程教学改革探讨
  10. android录音mediaRecord\AudioRecord\openSL\PCM tinyalsa总结和优缺点