Kafka-安装和使用
一、下载安装包
1、选择所需版本下载
[root@ywxtdb opt]# wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
2、解压安装包
[root@ywxtdb opt]# tar -xvf kafka_2.12-2.1.0.tgz
二、修改配置
[root@ywxtdb config]# vi /opt/kafka_2.12-2.1.0/config/server.properties
主要修改:
broker.id=0
listeners=PLAINTEXT://192.168.1.128:9092
log.dirs=/var/kafka-logs
zookeeper.connect=localhost:2181
集群:
zookeeper.connect=192.168.91.128:2181,192.168.91.129:2181,129.168.91.130:2181/kafka
三、配置环境变量
[root@ywxtdb config]# vi /etc/profile
export KAFKA_HOME=/opt/kafka_2.12-2.1.0
export PATH=$PATH:$KAFKA_HOME/bin
四、启动kafka
启动kafka前,记得先启动zookeeper。
[root@ywxtdb config]# kafka-server-start.sh ./server.properties
启动正常后,可以看到zookeeper下面创建了一个kafka节点
[zk: localhost:2181(CONNECTED) 16] ls /
[kafka, zookeeper]
五、创建topics
1、查看关于topics的相关脚本参数
其中REQUIRED是必填的参数
[root@ywxtdb ~]# kafka-topics.sh
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase
from one, then you should configure the number of parallel GC threads appropriately using-XX:ParallelGCThreads=N
Create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions, replica assignment, and/or configuration for the topic.
--config <String: name=value> A topic configuration override for the topic being created or altered.The following is a list of valid configurations: cleanup.policy compression.type delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. replicas index.interval.bytes leader.replication.throttled.replicas max.message.bytes message.downconversion.enable message.format.version message.timestamp.difference.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable See the Kafka documentation for full details on the topic configs.
--create Create a new topic.
--delete Delete a topic
--delete-config <String: name> A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).
--describe List details for the given topics.
--disable-rack-aware Disable rack aware replica assignment
--exclude-internal exclude internal topics when running list or describe command. The internal topics will be listed by default
--force Suppress console prompts
--help Print usage information.
--if-exists if set when altering or deleting topics, the action will only execute if the topic exists
--if-not-exists if set when creating topics, the action will only execute if the topic does not already exist
--list List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
--replica-assignment <String: A list of manual partition-to-broker broker_id_for_part1_replica1 : assignments for the topic being broker_id_for_part1_replica2 , created or altered. broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each replication factor> partition in the topic being created.
--topic <String: topic> The topic to be create, alter or describe. Can also accept a regular expression except for --create option
--topics-with-overrides if set when describing topics, only show topics that have overridden configs
--unavailable-partitions if set when describing topics, only show partitions whose leader is not available
--under-replicated-partitions if set when describing topics, only show under replicated partitions
--zookeeper <String: hosts> REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.
2、执行创建命令
kafka-topics.sh 处理topic的脚本,他需要依赖zookeeper去创建topic,需要加上zk的配置。
--create 说明当前是创建topic
--topic 要创建的topic名称
--partitions 创建的分区数,主要看你节点数量来配置
--replication-factor 创建分区的附本数量
[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --create --topic bobby --partitions 2 --replication-factor 2
Created topic "bobby".
3、查询创建的topic
[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --list bobby
bobby
4、查看描述信息
[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --describe --topic bobby
Topic:bobby PartitionCount:2 ReplicationFactor:2 Configs:Topic: bobby Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0Topic: bobby Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
六、启动consumer
1、查看下关于consumer处理脚本的参数
其中REQUIRED是必填的参数
[root@ywxtdb ~]# kafka-console-consumer.sh
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option Description
------ -----------
--bootstrap-server <String: server to REQUIRED: The server(s) to connect to. connect to>
--consumer-property <String: A mechanism to pass user-defined consumer_prop> properties in the form key=value to the consumer.
--consumer.config <String: config file> Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--enable-systest-events Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter <String: class> The name of a class to use for formatting kafka messages for display. (default: kafka.tools. DefaultMessageFormatter)
--from-beginning If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--group <String: consumer group id> The consumer group id of the consumer.
--isolation-level <String> Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommittedto read all messages. (default: read_uncommitted)
--key-deserializer <String: deserializer for key>
--max-messages <Integer: num_messages> The maximum number of messages to consume before exiting. If not set, consumption is continual.
--offset <String: consume offset> The offset id to consume from (a non- negative number), or 'earliest' which means from beginning, or 'latest' which means from end (default: latest)
--partition <Integer: partition> The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.
--property <String: prop> The properties to initialize the message formatter. Default properties include: print.timestamp=true|false print.key=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> key.deserializer=<key.deserializer> value.deserializer=<value. deserializer> Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key. deserializer.' and 'value. deserializer.' prefixes to configure their deserializers.
--skip-message-on-error If there is an error when processing a message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms> If specified, exit if no message is available for consumption for the specified interval.
--topic <String: topic> The topic id to consume on.
--value-deserializer <String: deserializer for values>
--whitelist <String: whitelist> Whitelist of topics to include for consumption.
2、执行启动脚本
客户端无需依赖zookeeper,使用bootstrap-server启动,
--topic topic的名称
--group 创建一个分组,带上分组名
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bobby --group demo
3、检查group
[root@node01 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list
demo
demo1
查看指定group情况
[root@node01 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group demoTOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
bobby 0 16 16 0 consumer-1-74554efe-e529-4c25-ab7f-c0128e3c7f22 /192.168.91.128 consumer-1
bobby 1 25 25 0 consumer-1-74554efe-e529-4c25-ab7f-c0128e3c7f22 /192.168.91.128 consumer-1
[root@node01 kafka-logs]#
七、启动producer
1、查看关于producer处理的脚本参数
其中REQUIRED是必填的参数
[root@ywxtdb ~]# kafka-console-producer.sh
Read data from standard input and publish it to Kafka.
Option Description
------ -----------
--batch-size <Integer: size> Number of messages to send in a single batch if they are not being sent synchronously. (default: 200)
--broker-list <String: broker-list> REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: The compression codec: either 'none', compression-codec] 'gzip', 'snappy', 'lz4', or 'zstd'. If specified without value, then it defaults to 'gzip'
--line-reader <String: reader_class> The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka. tools. ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on The max time that the producer will send> block for during a send request (default: 60000)
--max-memory-bytes <Long: total memory The total memory used by the producer in bytes> to buffer records waiting to be sent to the server. (default: 33554432)
--max-partition-memory-bytes <Long: The buffer size allocated for a memory in bytes per partition> partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. (default: 16384)
--message-send-max-retries <Integer> Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message. (default: 3)
--metadata-expiry-ms <Long: metadata The period of time in milliseconds expiration interval> after which we force a refresh of metadata even if we haven't seen any leadership changes. (default: 300000)
--producer-property <String: A mechanism to pass user-defined producer_prop> properties in the form key=value to the producer.
--producer.config <String: config file> Producer config properties file. Note that [producer-property] takes precedence over this config.
--property <String: prop> A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user- defined message reader.
--request-required-acks <String: The required acks of the producer request required acks> requests (default: 1)
--request-timeout-ms <Integer: request The ack timeout of the producer timeout ms> requests. Value must be non-negative and non-zero (default: 1500)
--retry-backoff-ms <Integer> Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default: 100)
--socket-buffer-size <Integer: size> The size of the tcp RECV size. (default: 102400)
--sync If set message send requests to the brokers are synchronously, one at a time as they arrive.
--timeout <Integer: timeout_ms> If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default: 1000)
--topic <String: topic> REQUIRED: The topic id to produce messages to.
2、执行命令
--topic topic名称
--broker-list 指定kafka节点,关联当下的broker
当producer输入hello后
[root@node01 bin]# kafka-console-producer.sh --topic bobby --broker-list node3:9092
>111
可以看到consumer端已经消费到111
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic bobby --group demo
111
一个生产者 , 一个消费者
一个生产者,两组消费者,demo组有两个消费者,demo1组有一个消费者
生产者
demo组1号消费者
demo组2号消费者
demo1组消费者
结论:
1、不同组可以消费相同的数据,一个组内不能重复消费数据。
2、不同的分区不能重复消费数据,一个分区不能同时给一个组内的多个消费者消费,
多个分区可以给一个消费者消费。
Kafka-安装和使用相关推荐
- 学习笔记Kafka(四)—— Kafka安装配置(3)—— Kafka多代理配置及常用操作
一.环境准备 Centos7, 1 CPU , 2G Memory ,20G Disk , Virtual System Hosts : node110, node111 , node112 全部配置 ...
- 学习笔记Kafka(三)—— Kafka安装配置(2)—— Kafka单代理及常用操作
一.Linux环境准备 Centos7, 1 CPU , 2G Memory ,20G Disk , Virtual System JDK Zookeeper 二.Kafka 安装 下载Kafka安装 ...
- linux上卸载kafka,kafka安装在linux上的安装
kafka安装 第一关 java的安装 捞得嘛,不谈 第二关 zookeeper的安装及配置 1. 直接打开Apach zookeeper进行下载 Tips: source 是源文件,需要编译后才能继 ...
- Kafka 安装和搭建 (一)
Kafka 安装和测试 博客分类: middleware kafkazookeeperscalajvm 1. 简介 kafka (官网地址:http://kafka.apache.org)是一款分布 ...
- java kafka搭建,Apache Kafka 安装步骤
概览 安装过程总共分为 3 大块,第一 Java 环境不必多说,第二 Zookeeper 安装,第三 Kafka 安装. 概念了解 Kafka 有几个重要的概念需要先了解一下 名词 解释 broker ...
- Linux环境Kafka安装配置
Linux环境Kafka安装配置 1. 认识Kafa (1) Kafa介绍 开源消息系统 官网:kafka.apache.org/ 用途:在流式计算中,Kafka一般用来缓存数据,Storm通过消费K ...
- Kafka安装及部署
阅读目录 一.环境配置 二.操作过程 Kafka介绍 安装及部署 回到顶部 一.环境配置 操作系统:Cent OS 7 Kafka版本:0.9.0.0 Kafka官网下载:请点击 JDK版本:1.7. ...
- kafka php 安装配置,kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展_PHP教程
kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展 话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用. 实话说,如 ...
- 【Kafka】Kafka安装部署
Kafka版本: 1.0.1 前期环境准备 准备好相应的服务器 本文中服务器共三台: node01,node02,node03 服务器已安装zookeeper集群 Kafka安装步骤 下载安装包 下载 ...
- 一、kafka安装下载与kafka初步应用
kafka安装下载 一.Kafka的下载 1.下载地址 Kafka的Windows下的运行 1.启动Zookeeper 2.启动Kafka Kafka的Linux下的运行 Kafka运行占用的端口 二 ...
最新文章
- 吴恩达:企业如何实现人工智能转型?
- Python多线程threading用法
- Element UI格式化日期
- Ant Design Pro 2.0/umijs站点配置到非站点根目录下处理
- 线程让出实验【RT-Thread学习笔记 4】
- Communications link failure,The last packet successfully received from the serve
- 反射 字段_java核心基础之反射
- 安卓+4.0.4+java模拟器_Android 4.0.4模拟器安装完全教程(图文)
- mysqlbinlog日志查看
- 如何战胜软件开发的复杂性?
- 阿发你好java_191122_01 纯前端JS实现的文字验证码
- 你真的弄清楚FocalLoss的细节了吗?
- 笔记(4)——Analyzing Communities and Their Evolutions in Dynamic Social Networks
- dnfdpl服务器维护了,梦想开始的地方丨山东沙排女将王鑫鑫奥运首秀止步八强 怕影响训练父母没来济南探望过...
- 001简谱的调号、拍号和情绪
- 盯住Z世代增量,汽车之家818车晚透露哪些营销信号?
- 安装python3缺少db4-devel
- 搭建企业知识库的意义
- 行业观察 | 从粗放走向精细,地产审计风控数字化改革与创新
- 与其这样挥霍时间,倒不如折腾折腾,尝试发展副业
热门文章
- 校招总结--建议全文背诵
- 2019中南大学计算机考研分数线,中南大学2019年硕士研究生招生复试基本分数线...
- 配置Dot1q终结子接口实现跨设备VLAN间通信示例
- [CF1504E]Travelling Salesman Problem
- 1971旗舰cpu intel_这就是近年来Intel最良心CPU!我彻底服了
- (附源码)SSM医院人事及科室病区管理JAVA计算机毕业设计项目
- ArcGIS Engine开发教程之图层符号化
- 【LOJ#6198】—谢特(后缀数组+01Trie)
- TLD文件自定义标签
- Canvas 图片加载