参考官网:http://kafka.apache.org/quickstart

一、下载Kafka

官网下载地址 http://kafka.apache.org/downloads

截至2019年7月8日 最新版本为 2.3.0 2.12为编译的scala版本 2.3.0为kafka版本

  • Scala 2.12  - kafka_2.12-2.3.0.tgz (asc, sha512)

    解压
    > tar -xzf kafka_2.12-2.3.0.tgz
    > cd kafka_2.12-2.3.0

二、启动服务

要先启动zookeeper kafka内置了一个 也可以不用

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

三、创建topic

replication-factor为1   partitions为1
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看topic
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

也可以不创建topic 设置自动创建 当publish的时候

四、发送消息

用command line client 进行测试 一行就是一条消息

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

五、消费者

command line consumer 可以接收消息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

六、设置多broker集群

单broker没有意思 我们可以设置三个broker

首先为每个broker 复制配置文件

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

然后编辑

config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dirs=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dirs=/tmp/kafka-logs-2

broker.id是唯一的 cluster中每一个node的名字 我们在same machine上 所有要设置listeners和log.dirs 以防冲突

建一个topic 一个partitions 三个replication-factor

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
用describe看看都是什么情况
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
  • 有几个概念 :

  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.

  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.

  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

刚才那个topic
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

发送 接收

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

试一下容错 fault-tolerance

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564看一下变化:Leader换了一个  因为1被干掉了
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0
还是收到了消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

七、使用kafka import/export data

刚才都是console 的数据,其他的sources other systems呢 用Kafka Connect

弄一个数据
> echo -e "foo\nbar" > test.txt
启动  指定配置文件
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
验证一下
> more test.sink.txt
foo
bar
消费者端
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
可以继续写入
> echo Another line>> test.txt

八、使用Kafka Streams

http://kafka.apache.org/22/documentation/streams/quickstart

WordCountDemo

https://github.com/apache/kafka/blob/2.2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

代码片段

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",Consumed.with(stringSerde, stringSerde);KTable<String, Long> wordCounts = textLines// Split each text line, by whitespace, into words..flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))// Group the text words as message keys.groupBy((key, value) -> value)// Count the occurrences of each word (message key)..count()// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

建一个 Kafka producer 指定input topic output topic

> bin/kafka-topics.sh --create \--bootstrap-server localhost:9092 \--replication-factor 1 \--partitions 1 \--topic streams-wordcount-output \--config cleanup.policy=compact
Created topic "streams-wordcount-output".

启动WordCount demo application

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

启动一个生产者写数据

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams

启动一个消费者接数据

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic streams-wordcount-output \--from-beginning \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializerall     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
kafka   1

转载于:https://www.cnblogs.com/tree1123/p/11150927.html

Kafka学习(一)-------- Quickstart相关推荐

  1. [Big Data - Kafka] kafka学习笔记:知识点整理

    一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余:消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许 ...

  2. kafka学习笔记:知识点整理

    一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余: 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险. ...

  3. 大数据 -- kafka学习笔记:知识点整理(部分转载)

    一 为什么需要消息系统 1.解耦 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多 ...

  4. Kafka学习-入门

    在上一篇kafka简介的基础之上,本篇主要介绍如何快速的运行kafka. 在进行如下配置前,首先要启动Zookeeper. 配置单机kafka 1.进入kafka解压目录 2.启动kafka bin\ ...

  5. Kafka学习笔记(3)----Kafka的数据复制(Replica)与Failover

    1. CAP理论 1.1 Cosistency(一致性) 通过某个节点的写操作结果对后面通过其他节点的读操作可见. 如果更新数据后,并发访问的情况下可立即感知该更新,称为强一致性 如果允许之后部分或全 ...

  6. Kafka学习之四 Kafka常用命令

    2019独角兽企业重金招聘Python工程师标准>>> Kafka学习之四 Kafka常用命令 Kafka常用命令 以下是kafka常用命令行总结: 1.查看topic的详细信息 . ...

  7. J1angの小白式kafka学习总结(1)

    kafka学习阶段性总结(1) kafka概述 基本概念:什么是kafka 消息队列的两种模式 为什么要使用kafka kafka的基本架构组成 kafka架构 kafka组成 集群配置 jar包下载 ...

  8. kafka学习武林秘籍

    kafka学习教程分享 链接:https://pan.baidu.com/s/13nqDiX5yrXb6-X2OJddsoQ  提取码:njqf

  9. Kafka学习【1】

    Kafka学习[1] Kafka的用途有哪些?使用场景如何?### 消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦.冗余存储.流量削峰.缓冲.异步通信.扩展性.可恢复性等功 ...

最新文章

  1. Open source robotics toolkits: use virtual arenas to test your robotics algorithms
  2. 金山办公或将陷入低增长,再去哪里找客户?
  3. 半导体二极管和晶体三极管
  4. P1199 三国游戏
  5. OSPF协议介绍及配置 (下)
  6. 抛物面天线的工作原理是什么?
  7. [EOJ439] 强制在线
  8. 深入浅出 Java Concurrency (6): 锁机制 part 1[转]
  9. java怎么实现邮箱机制_JAVAMAIL实现与详细机制
  10. 疯狂python讲义pdf_如何自学成Python大神?这份学习宝典火爆 IT 圈!
  11. 命令行开发、编译、打包Android应用程序
  12. Thread 1: signal SIGABRT
  13. [转载] 的士速递3
  14. nohup命令简单的使用
  15. SpringBoot+zk+dubbo架构实践(二):SpringBoot 集成 zookeeper
  16. 设计模式(八)桥接模式
  17. S3cCTF-gyy-Writeup
  18. 无法获得下列许可solidworks standard无效的(不一致的)使用许可号码(-8,544,0) solidworks2020 (亲测有效)
  19. matlab norm函数_机器人手眼标定MATLAB及C++实现(二十九)
  20. 如何快速在手机上修改证件照底色

热门文章

  1. 算法题:在一个字符串中找到只出现一次的字符。如输入abaccdeeff,则输出bd。
  2. 管理任务执行-如何制定有效的机制
  3. volatile、static
  4. Linux查看系统信息的一些命令及查看已安装软件包的命令
  5. eclipse和jdk的版本问题,比如printf()出错
  6. B.一个人的旅行 (dijkstra算法)
  7. Android中SQLiteDatabase操作【附源码】
  8. 使用 Python ElementTree 生成 xml
  9. 在MFC,Win32程序中向控制台(Console)窗口输出调试信息
  10. python做算法题优势_Python语言在科学算法中的优势