Kafka学习(一)-------- Quickstart
参考官网:http://kafka.apache.org/quickstart
一、下载Kafka
官网下载地址 http://kafka.apache.org/downloads
截至2019年7月8日 最新版本为 2.3.0 2.12为编译的scala版本 2.3.0为kafka版本
Scala 2.12 - kafka_2.12-2.3.0.tgz (asc, sha512)
解压 > tar -xzf kafka_2.12-2.3.0.tgz > cd kafka_2.12-2.3.0
二、启动服务
要先启动zookeeper kafka内置了一个 也可以不用
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
三、创建topic
replication-factor为1 partitions为1
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看topic
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test
也可以不创建topic 设置自动创建 当publish的时候
四、发送消息
用command line client 进行测试 一行就是一条消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
五、消费者
command line consumer 可以接收消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
六、设置多broker集群
单broker没有意思 我们可以设置三个broker
首先为每个broker 复制配置文件
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
然后编辑
config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dirs=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dirs=/tmp/kafka-logs-2
broker.id是唯一的 cluster中每一个node的名字 我们在same machine上 所有要设置listeners和log.dirs 以防冲突
建一个topic 一个partitions 三个replication-factor
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
用describe看看都是什么情况
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
有几个概念 :
"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
"replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
刚才那个topic
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
发送 接收
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
试一下容错 fault-tolerance
> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564看一下变化:Leader换了一个 因为1被干掉了
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
还是收到了消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
七、使用kafka import/export data
刚才都是console 的数据,其他的sources other systems呢 用Kafka Connect
弄一个数据
> echo -e "foo\nbar" > test.txt
启动 指定配置文件
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
验证一下
> more test.sink.txt
foo
bar
消费者端
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
可以继续写入
> echo Another line>> test.txt
八、使用Kafka Streams
http://kafka.apache.org/22/documentation/streams/quickstart
WordCountDemo
https://github.com/apache/kafka/blob/2.2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
代码片段
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",Consumed.with(stringSerde, stringSerde);KTable<String, Long> wordCounts = textLines// Split each text line, by whitespace, into words..flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))// Group the text words as message keys.groupBy((key, value) -> value)// Count the occurrences of each word (message key)..count()// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
建一个 Kafka producer 指定input topic output topic
> bin/kafka-topics.sh --create \--bootstrap-server localhost:9092 \--replication-factor 1 \--partitions 1 \--topic streams-wordcount-output \--config cleanup.policy=compact
Created topic "streams-wordcount-output".
启动WordCount demo application
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
启动一个生产者写数据
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
启动一个消费者接数据
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic streams-wordcount-output \--from-beginning \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializerall 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
kafka 1
转载于:https://www.cnblogs.com/tree1123/p/11150927.html
Kafka学习(一)-------- Quickstart相关推荐
- [Big Data - Kafka] kafka学习笔记:知识点整理
一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余:消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许 ...
- kafka学习笔记:知识点整理
一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余: 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险. ...
- 大数据 -- kafka学习笔记:知识点整理(部分转载)
一 为什么需要消息系统 1.解耦 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多 ...
- Kafka学习-入门
在上一篇kafka简介的基础之上,本篇主要介绍如何快速的运行kafka. 在进行如下配置前,首先要启动Zookeeper. 配置单机kafka 1.进入kafka解压目录 2.启动kafka bin\ ...
- Kafka学习笔记(3)----Kafka的数据复制(Replica)与Failover
1. CAP理论 1.1 Cosistency(一致性) 通过某个节点的写操作结果对后面通过其他节点的读操作可见. 如果更新数据后,并发访问的情况下可立即感知该更新,称为强一致性 如果允许之后部分或全 ...
- Kafka学习之四 Kafka常用命令
2019独角兽企业重金招聘Python工程师标准>>> Kafka学习之四 Kafka常用命令 Kafka常用命令 以下是kafka常用命令行总结: 1.查看topic的详细信息 . ...
- J1angの小白式kafka学习总结(1)
kafka学习阶段性总结(1) kafka概述 基本概念:什么是kafka 消息队列的两种模式 为什么要使用kafka kafka的基本架构组成 kafka架构 kafka组成 集群配置 jar包下载 ...
- kafka学习武林秘籍
kafka学习教程分享 链接:https://pan.baidu.com/s/13nqDiX5yrXb6-X2OJddsoQ 提取码:njqf
- Kafka学习【1】
Kafka学习[1] Kafka的用途有哪些?使用场景如何?### 消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦.冗余存储.流量削峰.缓冲.异步通信.扩展性.可恢复性等功 ...
最新文章
- Open source robotics toolkits: use virtual arenas to test your robotics algorithms
- 金山办公或将陷入低增长,再去哪里找客户?
- 半导体二极管和晶体三极管
- P1199 三国游戏
- OSPF协议介绍及配置 (下)
- 抛物面天线的工作原理是什么?
- [EOJ439] 强制在线
- 深入浅出 Java Concurrency (6): 锁机制 part 1[转]
- java怎么实现邮箱机制_JAVAMAIL实现与详细机制
- 疯狂python讲义pdf_如何自学成Python大神?这份学习宝典火爆 IT 圈!
- 命令行开发、编译、打包Android应用程序
- Thread 1: signal SIGABRT
- [转载] 的士速递3
- nohup命令简单的使用
- SpringBoot+zk+dubbo架构实践(二):SpringBoot 集成 zookeeper
- 设计模式(八)桥接模式
- S3cCTF-gyy-Writeup
- 无法获得下列许可solidworks standard无效的(不一致的)使用许可号码(-8,544,0) solidworks2020 (亲测有效)
- matlab norm函数_机器人手眼标定MATLAB及C++实现(二十九)
- 如何快速在手机上修改证件照底色
热门文章
- 算法题:在一个字符串中找到只出现一次的字符。如输入abaccdeeff,则输出bd。
- 管理任务执行-如何制定有效的机制
- volatile、static
- Linux查看系统信息的一些命令及查看已安装软件包的命令
- eclipse和jdk的版本问题,比如printf()出错
- B.一个人的旅行 (dijkstra算法)
- Android中SQLiteDatabase操作【附源码】
- 使用 Python ElementTree 生成 xml
- 在MFC,Win32程序中向控制台(Console)窗口输出调试信息
- python做算法题优势_Python语言在科学算法中的优势