

[root@ywxtdb opt]# wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz


[root@ywxtdb opt]# tar -xvf kafka_2.12-2.1.0.tgz 


[root@ywxtdb config]# vi /opt/kafka_2.12-2.1.0/config/server.properties 









[root@ywxtdb config]# vi /etc/profile

export KAFKA_HOME=/opt/kafka_2.12-2.1.0



[root@ywxtdb config]# kafka-server-start.sh ./server.properties 


[zk: localhost:2181(CONNECTED) 16] ls /
[kafka, zookeeper]




[root@ywxtdb ~]# kafka-topics.sh
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase
from one, then you should configure the number of parallel GC threads appropriately using-XX:ParallelGCThreads=N
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions,        replica assignment, and/or           configuration for the topic.
--config <String: name=value>            A topic configuration override for the topic being created or altered.The   following is a list of valid         configurations:                      cleanup.policy                        compression.type                      delete.retention.ms                   file.delete.delay.ms                  flush.messages                        flush.ms                              follower.replication.throttled.       replicas                             index.interval.bytes                  leader.replication.throttled.replicas max.message.bytes                     message.downconversion.enable         message.format.version                message.timestamp.difference.max.ms   message.timestamp.type                min.cleanable.dirty.ratio             min.compaction.lag.ms                 min.insync.replicas                   preallocate                           retention.bytes                       retention.ms                          segment.bytes                         segment.index.bytes                   segment.jitter.ms                     segment.ms                            unclean.leader.election.enable        See the Kafka documentation for full   details on the topic configs.
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be   removed for an existing topic (see   the list of configurations under the --config option).
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment
--exclude-internal                       exclude internal topics when running   list or describe command. The        internal topics will be listed by    default
--force                                  Suppress console prompts
--help                                   Print usage information.
--if-exists                              if set when altering or deleting       topics, the action will only execute if the topic exists
--if-not-exists                          if set when creating topics, the       action will only execute if the      topic does not already exist
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic being created or altered (WARNING:   If partitions are increased for a    topic that has a key, the partition  logic or ordering of the messages    will be affected
--replica-assignment <String:            A list of manual partition-to-broker   broker_id_for_part1_replica1 :           assignments for the topic being      broker_id_for_part1_replica2 ,           created or altered.                  broker_id_for_part2_replica1 :                                                broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each        replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to be create, alter or       describe. Can also accept a regular  expression except for --create option
--topics-with-overrides                  if set when describing topics, only    show topics that have overridden     configs
--unavailable-partitions                 if set when describing topics, only    show partitions whose leader is not  available
--under-replicated-partitions            if set when describing topics, only    show under replicated partitions
--zookeeper <String: hosts>              REQUIRED: The connection string for    the zookeeper connection in the form host:port. Multiple hosts can be     given to allow fail-over.    


kafka-topics.sh 处理topic的脚本,他需要依赖zookeeper去创建topic,需要加上zk的配置。

--create 说明当前是创建topic

--topic 要创建的topic名称

--partitions 创建的分区数,主要看你节点数量来配置

--replication-factor 创建分区的附本数量

[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --create --topic bobby --partitions 2 --replication-factor 2
Created topic "bobby".


[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --list bobby


[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --describe --topic bobby
Topic:bobby PartitionCount:2    ReplicationFactor:2 Configs:Topic: bobby    Partition: 0    Leader: 2   Replicas: 2,0   Isr: 2,0Topic: bobby    Partition: 1    Leader: 0   Replicas: 0,1   Isr: 0,1




[root@ywxtdb ~]# kafka-console-consumer.sh
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option                                   Description
------                                   -----------
--bootstrap-server <String: server to    REQUIRED: The server(s) to connect to. connect to>
--consumer-property <String:             A mechanism to pass user-defined       consumer_prop>                           properties in the form key=value to  the consumer.
--consumer.config <String: config file>  Consumer config properties file. Note  that [consumer-property] takes       precedence over this config.
--enable-systest-events                  Log lifecycle events of the consumer   in addition to logging consumed      messages. (This is specific for      system tests.)
--formatter <String: class>              The name of a class to use for         formatting kafka messages for        display. (default: kafka.tools.      DefaultMessageFormatter)
--from-beginning                         If the consumer does not already have  an established offset to consume     from, start with the earliest        message present in the log rather    than the latest message.
--group <String: consumer group id>      The consumer group id of the consumer.
--isolation-level <String>               Set to read_committed in order to      filter out transactional messages    which are not committed. Set to      read_uncommittedto read all          messages. (default: read_uncommitted)
--key-deserializer <String:                                                     deserializer for key>
--max-messages <Integer: num_messages>   The maximum number of messages to      consume before exiting. If not set,  consumption is continual.
--offset <String: consume offset>        The offset id to consume from (a non-  negative number), or 'earliest'      which means from beginning, or       'latest' which means from end        (default: latest)
--partition <Integer: partition>         The partition to consume from.         Consumption starts from the end of   the partition unless '--offset' is   specified.
--property <String: prop>                The properties to initialize the       message formatter. Default           properties include:                  print.timestamp=true|false            print.key=true|false                  print.value=true|false                key.separator=<key.separator>         line.separator=<line.separator>       key.deserializer=<key.deserializer>   value.deserializer=<value.            deserializer>                        Users can also pass in customized      properties for their formatter; more specifically, users can pass in      properties keyed with 'key.          deserializer.' and 'value.           deserializer.' prefixes to configure their deserializers.
--skip-message-on-error                  If there is an error when processing a message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>       If specified, exit if no message is    available for consumption for the    specified interval.
--topic <String: topic>                  The topic id to consume on.
--value-deserializer <String:                                                   deserializer for values>
--whitelist <String: whitelist>          Whitelist of topics to include for     consumption.    



--topic topic的名称

--group 创建一个分组,带上分组名

[root@node01 bin]#  kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bobby --group demo


[root@node01 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list


[root@node01 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group demoTOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
bobby           0          16              16              0               consumer-1-74554efe-e529-4c25-ab7f-c0128e3c7f22 / consumer-1
bobby           1          25              25              0               consumer-1-74554efe-e529-4c25-ab7f-c0128e3c7f22 / consumer-1
[root@node01 kafka-logs]#




[root@ywxtdb ~]# kafka-console-producer.sh
Read data from standard input and publish it to Kafka.
Option                                   Description
------                                   -----------
--batch-size <Integer: size>             Number of messages to send in a single batch if they are not being sent     synchronously. (default: 200)
--broker-list <String: broker-list>      REQUIRED: The broker list string in    the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String:             The compression codec: either 'none',  compression-codec]                       'gzip', 'snappy', 'lz4', or 'zstd'.  If specified without value, then it  defaults to 'gzip'
--line-reader <String: reader_class>     The class name of the class to use for reading lines from standard in. By   default each line is read as a       separate message. (default: kafka.   tools.                               ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on       The max time that the producer will    send>                                    block for during a send request      (default: 60000)
--max-memory-bytes <Long: total memory   The total memory used by the producer  in bytes>                                to buffer records waiting to be sent to the server. (default: 33554432)
--max-partition-memory-bytes <Long:      The buffer size allocated for a        memory in bytes per partition>           partition. When records are received which are smaller than this size the producer will attempt to             optimistically group them together   until this size is reached.          (default: 16384)
--message-send-max-retries <Integer>     Brokers can fail receiving the message for multiple reasons, and being      unavailable transiently is just one  of them. This property specifies the number of retires before the         producer give up and drop this       message. (default: 3)
--metadata-expiry-ms <Long: metadata     The period of time in milliseconds     expiration interval>                     after which we force a refresh of    metadata even if we haven't seen any leadership changes. (default: 300000)
--producer-property <String:             A mechanism to pass user-defined       producer_prop>                           properties in the form key=value to  the producer.
--producer.config <String: config file>  Producer config properties file. Note  that [producer-property] takes       precedence over this config.
--property <String: prop>                A mechanism to pass user-defined       properties in the form key=value to  the message reader. This allows      custom configuration for a user-     defined message reader.
--request-required-acks <String:         The required acks of the producer      request required acks>                   requests (default: 1)
--request-timeout-ms <Integer: request   The ack timeout of the producer        timeout ms>                              requests. Value must be non-negative and non-zero (default: 1500)
--retry-backoff-ms <Integer>             Before each retry, the producer        refreshes the metadata of relevant   topics. Since leader election takes  a bit of time, this property         specifies the amount of time that    the producer waits before refreshing the metadata. (default: 100)
--socket-buffer-size <Integer: size>     The size of the tcp RECV size.         (default: 102400)
--sync                                   If set message send requests to the    brokers are synchronously, one at a  time as they arrive.
--timeout <Integer: timeout_ms>          If set and the producer is running in  asynchronous mode, this gives the    maximum amount of time a message     will queue awaiting sufficient batch size. The value is given in ms.      (default: 1000)
--topic <String: topic>                  REQUIRED: The topic id to produce      messages to.        


--topic topic名称

--broker-list 指定kafka节点,关联当下的broker


[root@node01 bin]# kafka-console-producer.sh --topic bobby --broker-list node3:9092


[root@node01 bin]#  kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic bobby --group demo

一个生产者 , 一个消费者











