最新Kafka教程(包含kafka部署与基本操作、java连接kafka、spring连接kafka以及使用springboot)
最新Kafka教程(包含kafka部署与基本操作、java连接kafka、spring连接kafka以及使用springboot)
欢迎转载,转载请注明网址:https://blog.csdn.net/qq_41910280
简介:一篇全面Kafka教程。
文章目录
- 最新Kafka教程(包含kafka部署与基本操作、java连接kafka、spring连接kafka以及使用springboot)
- 1. Kafka概述
- 2. kafka部署与原生命令
- 3. java连接kafka
- 4. spring-kafka
- 5. 使用springboot
- 参考文献
1. Kafka概述
1.1 Kafka是什么
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。
1.2 Kafka有如下特性:
• 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
• 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
• 支持通过Kafka服务器和消费机集群来分区消息。
• 支持Hadoop并行数据加载。
1.3 相关术语介绍
Broker: Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic: 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition: Partition是物理上的概念,每个Topic包含一个或多个Partition.
Producer: 负责发布消息到Kafka broker
Consumer: 消息消费者,向Kafka broker读取消息的客户端。
Consumer Group: 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
1.4 实现原理
始终记住, kafka集群中每一个topic可以分布在不同broker, 每个broker可以有多个partition(也就是说,每个topic可以有多个partition)。 而每个消费者group中每个消费者都会接管相应的一些partition, 即每个partition在每个consumer group中始终由同一个cosumer消费。
订阅topic的不是consumer,而是consumer group。topic上的每一条消息分配到一个partition,然后由负责该partition的consumer去消费。
我们来看看官方的图, 事实上server 1中也有P1 P2的备份,server 2也有P0 P3的备份。对于每一个partition,在同一个consumer group中总由同一个cosumer消费。另外,在一个topic中,每一个partition的消息是有序的(即生产者produce的消息和消费者consume的顺序是一致的),而多个partition之间不保证顺序。
传统的消息队列有队列(点对点)和发布/订阅两种模式。而在kafka中,每个topic都具有这两种能力,如果有多个消费者组,那么就是发布/订阅模式,如果所有消费者在同一个group就和队列模式相似。并且每个topic都是可以扩展或者修改的,它可以是queuing也可以是publish/subscribe。
顺序性:传统消息队列中每个topic的消息是顺序保存在服务器上, 然后异步发送的消费者, 导致并行消费的消费者是无序消费的。传统消息队列通过“exclusive consumer”解决这个问题,这样只有一个消费者在消费消息,只有这一个comsumer挂了才换到另一个consumer,因此失去了并行性。 而kafka每个分区仅有一个consumer消费,保证了其顺序性,同时由于有多个分区,consumer可以均衡负载。建议消费者组中的消费者数量不要超过分区数量,否则部分consumer会打酱油。
1.5 kafka功能:
1.5.1 作为消息系统
见1.4实现原理
补充:
传统的消息队列有消息确认机制, 当一条消息发送给cosumer时将它标记为已发送未消费, 当消费者消费后发来确认再标记为已消费。但是这种确认机制有两个缺点, 一是comsumer在消费处理完成但发送确认之前故障, 则消息会被消费两次, 二是性能问题, 消息队列需要维护两种状态(已发送未消费、已消费)。因此考虑如何处理已发送但未确认的消息非常必要,这是一个棘手的问题。
而kafka采用offset替代消息确认机制。每个分区partition内的消息都是有序的,由同一个comsumer来消费,这使得状态消耗非常小,每个partition只需要维护一个整数。同时还可以故意回到以前的offset重新消费消息。具体的,针对不同场景有三种消费模式:
- At-most-once(最多一次),
客户端收到消息后,在处理消息前自动提交,这样kafka就认为consumer已经消费过了,偏移量增加。 - At-least-once(最少一次)
客户端收到消息,处理消息,再提交反馈。这样就可能出现消息处理完了,在提交反馈前,网络中断或者程序挂了,那么kafka认为这个消息还没有被consumer消费,产生重复消息推送。 - Exactly-once(正好一次)
保证消息处理和提交反馈在同一个事务中,即有原子性。
具体实现参考https://blog.csdn.net/laojiaqi/article/details/79034798
https://my.oschina.net/chuibilong/blog/896685
1.5.2 作为存储系统
kafka具有持久化以及备份等机制, 可以作为一种专用于高性能,低延迟提交日志存储,复制和传播的专用分布式文件系统。
1.5.3 用于流处理
在Kafka中,流处理器是指从输入主题获取连续的数据流,对这个输入执行一些处理,并产生连续的数据流到输出主题。
1.5.4 总结
这三个功能的组合似乎看起来不可思议, 但是它对kafka的作用至关重要。它允许kafka存储和处理过去的历史数据,也可以处理订阅topic后的未来数据。同时kafka可以具有极低的延迟,也可以定期加载数据,甚至可以与长时间停机维护的离线系统集成。
1.6 发布/订阅模式与观察者模式的区别
发布/订阅模式需要一个消息中间件,而消息的发布者(publisher)和订阅者(subscriber)不知道对方的存在,而观察者模式则是Subject发生变化时及时告知Observer,使其做出响应,有push和pull两种方式通知观察者。
2. kafka部署与原生命令
下载后解压到/opt/, 为了写教程我重新下载了最新稳定版本 (2019年4月11日) kafka_2.12-2.2.0, 下载地址
http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
修改config/server.properties, 添加如下参数
# 修改了broker.id listener端口:前面可以加ip host.name和port已经过时
broker.id=0
# IP 端口 日志存放路径 zookeeper地址和端口 (记得注释掉后面的这几个参数)
listeners=PLAINTEXT://192.168.253.128:9092
log.dirs=/opt/kafka_2.12-2.2.0/logs
zookeeper.connect=localhost:2181
其中192.168.253.128是本机ip
kafka命令
(首先, 你需要启动一个zookeeper,可以通过内置命令bin/zookeeper-server-start.sh config/zookeeper.properties启动, 也可以和我一样单独启动一个zookeeper)
启动命令:
bin/kafka-server-start.sh config/server.properties
后台启动:
nohup bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
停止命令
bin/kafka-server-stop.sh
创建topic:
新版:bin/kafka-topics.sh --create --bootstrap-server 192.168.253.128:9092 --replication-factor 1 --partitions 1 --topic test
旧版(兼容):bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看所有topic:
新版:bin/kafka-topics.sh --list --bootstrap-server 192.168.253.128:9092
旧版(兼容):bin/kafka-topics.sh -list -zookeeper 127.0.0.1:2181
查看指定topic
新版:bin/kafka-topics.sh --describe --bootstrap-server 192.168.253.128:9092 --topic test
旧版(兼容):bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
修改分区的replication和partition
修改replication https://blog.csdn.net/lizhitao/article/details/45894109
修改partition https://www.cnblogs.com/buxizhizhoum/p/8251494.html
启动producer:
bin/kafka-console-producer.sh --broker-list 192.168.253.128:9092 --topic test
启动consumer:
bin/kafka-console-consumer.sh -bootstrap-server 192.168.253.128:9092 --topic test --from-beginning
(不加--from-beginning就是从未被consume的消息开始读取数据)
删除topic
1.bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
2.进入zookeeper目录执行bin/zkCli.sh连接zookeeper, 然后执行rmr /brokers/topics/test
启动producer和consumer之后你可以尝试从控制台发送和接收消息(小提示:在producer控制台输入” abcHdHc”并按下回车或两下EOF(我也不知道为什么要按两下…), 在consumer的控制台上会显示”abc”, ”^H”是键盘输入的Backspace退格键), 你还可以尝试使用kafka-connect从文件中获取消息到kafka或者从kafka导出到文件,详见http://kafka.apache.org/quickstart#quickstart_kafkaconnect
部署集群
提示: 如果你的kafka自动退出, 你可以查看日志, 如果有以下提示表明你的内存不够用了
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
你可以修改bin/kafka-server-start.sh, 将
if [ “x$KAFKA_HEAP_OPTS” = “x” ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
中的内存改为-Xmx256M -Xms128M
> cp config/server.properties config/server-1.properties> cp config/server.properties config/server-2.properties> vi config/server-1.propertiesbroker.id=1listeners=PLAINTEXT://192.168.253.128:9093log.dirs=/opt/kafka_2.12-2.2.0/logs-1zookeeper.connect=localhost:2181> vi config/server-2.propertiesbroker.id=2listeners=PLAINTEXT://192.168.253.128:9094log.dirs=/opt/kafka_2.12-2.2.0/logs-2zookeeper.connect=localhost:2181> nohup bin/kafka-server-start.sh config/server-1.properties 1>/dev/null 2>&1 &> nohup bin/kafka-server-start.sh config/server-2.properties 1>/dev/null 2>&1 &
# 创建一个3个replication, 1个partition的节点
> bin/kafka-topics.sh --create --bootstrap-server 192.168.253.128:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
结果第一行是总览, 之后各行分别是各个partition的情况, 其中leader表示主节点, replicas表示全部节点, isr(in-sync)表示状态ok的节点
你可以kill其中的部分broker以测试其容错性
Kafka Connect
之前我们使用的是从控制台写入数据并写回控制台, 现在我们来尝试一下其他数据source与destination。
Kafka Connect是Kafka附带的工具,可以向Kafka导入和导出数据。
首先, stop之前的kafka, 然后将ip改为127.0.0.1重新启动(或者修改后面的properties文件中的ip)
创建数据源
> echo -e "foo\nbar" > test.txt
执行以下命令(该命令让source源连接器从test.txt读取数据到主题connect-test, 然后sink接收器连接器从connect-test主题读取数据写入到test.sink.txt)
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
另起一个窗口, 依次执行
> more test.sink.txt
> echo Another line>> test.txt
> more test.sink.txt
Kafka Streams
(新版–bootstrap-server 192.168.253.128:9092和旧版–zookeeper localhost:2181可以互换, 后面不废话了)
创建input topic
bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic streams-plaintext-input
创建output topic
bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic streams-wordcount-output
启动wordcount demo
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
在单独的终端启动input producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
启动output comsumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
–topic streams-wordcount-output
–from-beginning
–formatter kafka.tools.DefaultMessageFormatter
–property print.key=true
–property print.value=true
–property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
–property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
在producer控制台输入all streams lead to kafka
在comsumer控制台会打印
继续在producer输入hello kafka streams
comsumer控制台会多出三行
这是如何做到的呢?
实际上, 有一个KTable<String, Long>, 每次出现同样的String, 那么Long就会+1, 并且是逐个单词逐个单词来处理的, 而不是一句一句来处理的, 比如说你接着输入”kafka kafka kafka”, 控制台出打印 而不仅是”kafka 5”
停止kafka streams
我们可以按照顺序依次Ctrl + C掉comsumer、producer、wordcount、kafka、zookeeper
3. java连接kafka
先把kafka中server.properties的ip改回来, 然后启动3个kafka
pom.xml
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.12</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>
</dependencies>
consumer
package com.example.consumer;import java.util.Arrays;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;public class Consumer01 {public static void main(String[] args) {Properties props = new Properties();// 定义kakfa 服务的地址,不需要将所有broker指定上props.put("bootstrap.servers", "192.168.253.128:9092");// 制定consumer groupprops.put("group.id", "g1");// 是否自动确认offsetprops.put("enable.auto.commit", "true");// 自动确认offset的时间间隔props.put("auto.commit.interval.ms", "1000");// key的序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// value的序列化类props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 定义consumerKafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 消费者订阅的topic, 可同时订阅多个consumer.subscribe(Arrays.asList("test", "my-multipartition-topic"));while (true) {// 读取数据,读取超时时间为100msConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}
}
producer
package com.example.provider;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;public class Provider01 {public static void main(String[] args) {Properties props = new Properties();// Kafka服务端的主机名和端口号props.put("bootstrap.servers", "192.168.253.128:9092");// 等待所有副本节点的应答 "-1"与"all"相同props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);// 请求延时props.put("linger.ms", 1);// 发送缓存区内存大小props.put("buffer.memory", 33554432);// key序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 50; i++) {System.out.println(i);
// producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello world->" + i));producer.send(new ProducerRecord<String, String>("my-multipartition-topic", Integer.toString(i), "hello world->" + i));}System.out.println("producer close...");producer.close();}
}
通过上面的代码,你已经可以尝试发送消息和处理消息,还可以对比多个partition的topic和单个partition的结果,你会发现之前“特定partition中的消息是有序的,而多个partition的消息是无序的”这一结论的正确性。
callback producer
带回调函数的producer
package com.example.provider;import java.util.Properties;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class CallbackProvider {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();// Kafka服务端的主机名和端口号// props.put("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");props.put("bootstrap.servers", "192.168.253.128:9092");// 等待所有副本节点的应答props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);// 增加服务端请求延时props.put("linger.ms", 1);// 发送缓存区内存大小props.put("buffer.memory", 33554432);// key序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 自定义分区
// props.put("partitioner.class", "com.example.partitioner.Partitioner01");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);for (int i = 0; i < 50; i++) {Thread.sleep(500);kafkaProducer.send(new ProducerRecord<String, String>("my-multipartition-topic", "value->" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (metadata != null) {System.out.println(metadata.partition() + "---" + metadata.offset());}}});}kafkaProducer.close();}
}
指定分区(放开上面provider “自定义分区”的注释)
package com.example.partitioner;import java.util.Map;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;public class Partitioner01 implements Partitioner {@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 控制分区return 1;}@Overridepublic void close() {}}
producer拦截器
实现ProducerInterceptor接口, 里面有4个方法(描述转载至https://blog.csdn.net/u013256816/article/details/78573425):
- ProducerRecord<K, V> onSend(ProducerRecord<K, V> record):Producer在将消息序列化和分配分区之前会调用拦截器的这个方法来对消息进行相应的操作。一般来说最好不要修改消息ProducerRecord的topic、key以及partition等信息,如果要修改,也需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改key不仅会影响分区的计算,同样也会影响Broker端日志压缩(Log Compaction)的功能。
- void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被应答(Acknowledgement)之前或者消息发送失败时调用,优先于用户设定的Callback之前执行。这个方法运行在Producer的IO线程中,所以这个方法里实现的代码逻辑越简单越好,否则会影响消息的发送速率。
- void close():关闭当前的拦截器,此方法主要用于执行一些资源的清理工作。
- configure(Map<String, ?> configs):用来初始化此类的方法,这个是ProducerInterceptor接口的父接口Configurable中的方法。
拦截器01: 在record的value值前面在上时间错
package com.example.interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** 获得record数据, 在value前面加上时间戳*/
public class TimeInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 创建一个新的record,把时间戳写入消息体的最前部return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),record.key(), System.currentTimeMillis() + record.value());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
拦截器02: 统计发送消息成功和发送失败消息数,并在producer关闭时打印这两个计数器
package com.example.interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class CountInterceptor implements ProducerInterceptor<String, String> {private int errorCount = 0;private int successCount = 0;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 统计失败或者成功的次数if (exception == null)successCount++;elseerrorCount++;}@Overridepublic void close() {// 保存结果System.out.println("Successful sent: " + successCount);System.out.println("Failed sent: " + errorCount);}@Overridepublic void configure(Map<String, ?> configs) {}
}
修改Provider01: 添加拦截器配置
// 配置拦截器链
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList("com.example.interceptor.CountInterceptor", "com.example.interceptor.TimeInterceptor"));
Kafka Streams
- 将输入流” streams-plaintext-input”变成一个个单词输出到” streams-linesplit-output”
package com.example.streams;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;public class Linesplit {public static void main(String[] args) {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");// 唯一标识: 区别与kafka集群通信的其他应用程序props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.253.128:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());final StreamsBuilder builder = new StreamsBuilder();// 指定输入和输出的topicKStream<String, String> source = builder.stream("streams-plaintext-input");source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))).to("streams-linesplit-output");final Topology topology = builder.build();final KafkaStreams streams = new KafkaStreams(topology, props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);}
}
启动方式:
mvn clean package -Dmaven.test.skip=true
mvn exec:java -Dexec.mainClass=com.example.streams.Linesplit
按Ctrl+C退出streams
查看输出主题record命令:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.253.128:9092
–topic streams-linesplit-output
–from-beginning
–formatter kafka.tools.DefaultMessageFormatter
–property print.value=true
–property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
2. WordCount单词计数
package com.example.streams;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;public class WordCount {public static void main(String[] args) {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.253.128:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());final StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))).groupBy((key, value) -> value).count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")).toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final Topology topology = builder.build();final KafkaStreams streams = new KafkaStreams(topology, props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);}
}
启动方式:
mvn clean package -Dmaven.test.skip=true
mvn exec:java -Dexec.mainClass=com.example.streams.WordCount
按Ctrl+C退出streams
查看输出主题record命令:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.253.128:9092
–topic streams-linesplit-output
–from-beginning
–formatter kafka.tools.DefaultMessageFormatter
–property print.value=true
–property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
4. spring-kafka
- AutoCommitTest自动提交offset
- AckCommitTest手动提交offset
- ConfigTest使用java spring配置
请参考https://github.com/Spark4J/kafka-demo/tree/master/spring-kafka-demo,该项目遵循“Anti 996”协议,具体spring-kafka用法讲解在下一章节“使用springboot”部分
5. 使用springboot
pom及源码见上一章github项目
先来个quick start再逐一讲解 (看不懂的跳过, 后面有讲解)
application.yml
spring:kafka:bootstrap-servers: 192.168.253.128:9092consumer:group-id: group1auto-offset-reset: earliestenable-auto-commit: trueauto-commit-interval: 100key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:batch-size: 16384retries: 3acks: allkey-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
# transaction-id-prefix: myKafkaTransact
Appliaction.java
package com.example;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;@SpringBootApplication
public class Application implements CommandLineRunner {public static Logger logger = LoggerFactory.getLogger(Application.class);public static void main(String[] args) {SpringApplication.run(Application.class, args);}@Autowiredprivate KafkaTemplate<String, String> template;private final CountDownLatch latch = new CountDownLatch(3);@Overridepublic void run(String... args) throws Exception {this.template.send("topic1", "foo1");this.template.send("topic1", "foo2");this.template.send("topic1", "foo3");latch.await(60, TimeUnit.SECONDS);logger.info("All received");}@KafkaListener(topics = "topic1")public void listen(ConsumerRecord<?, ?> cr) throws Exception {logger.info(cr.toString());latch.countDown();}
}
1.配置topic
首先需要定义KafkaAdmin, 它可以自动向kafka添加主题。然后可以使用NewTopic创建主题。
@Configuration
public class TopicManager {@Value("${spring.kafka.bootstrap-servers}")private String bootstarp_servers;@Beanpublic KafkaAdmin admin() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstarp_servers);return new KafkaAdmin(configs);}@Beanpublic NewTopic topic1() {return new NewTopic("thing1", 10, (short) 2);}@Beanpublic NewTopic topic2() {return new NewTopic("thing2", 10, (short) 2);}
}
2.发送消息
@Slf4j
@Component
public class MyProducer {@Autowiredprivate KafkaTemplate<String, String> template;// 异步非阻塞public void sendToKafkaAsync(final ProducerRecord<String, String> record) {ListenableFuture<SendResult<String, String>> future = template.send(record);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("发送消息成功: record={}, result={}", record, result);}@Overridepublic void onFailure(Throwable ex) {log.info("发送消息失败: record={}, exception={}", record, ex.getMessage());ex.printStackTrace();}});}// 同步阻塞public void sendToKafkaSync(final ProducerRecord<String, String> record) {try {template.send(record).get(10, TimeUnit.SECONDS);log.info("发送消息成功: record={}", record);} catch (Exception e) {log.info("发送消息失败: record={}, exception={}", record, e.getMessage());e.printStackTrace();}}}
Test
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {@AutowiredMyProducer producer;@Test
// @Transactionalpublic void contextLoads() {long begin = System.currentTimeMillis();for (int i = 0; i < 100; i++) {// producer.sendToKafkaAsync(new ProducerRecord<>("topic1", "" + i, "val" + i));producer.sendToKafkaSync(new ProducerRecord<>("topic1", "" + i, "val" + i));}long end = System.currentTimeMillis();System.out.println("发送一百条消息花费的时间:" + (end - begin));// 异步53ms 同步289ms}}
事务
producer的幂等性: 当producer启动时Kafka会为每个producer分配一个PID(64位整数), producer发送的每条消息都有sequence number, 序列号从0开始递增, 同时broker会为每个producer保存其sequence number。这样,每次producer发送新的消息batch时,如果携带sequence number和缓存的sequence number冲突(序号差大于一说明中间有数据未写入broker, Producer 抛出 InvalidSequenceNumber; 序号小于一表示为重复数据即ack回传失败, Producer 抛出 DuplicateSequenceNumber),则会拒绝写入(重点:1.PID; 2. sequence number缓存更新机制)。注意: 每个partition的幂等性需要在同一个PID下, 单个Producer的同一个session中, 如果一个producer挂了被分配了新的PID则无法保证, 因此又有了事务。
producer的事务: 应用程序有一个transaction.id, 就算重启也不会改变, 这种情况下kafka根据transaction.Id获取对应的PID,这个对应关系是保存在事务日志中。这样可以确保相同的TransactionId返回相同的PID,用于恢复或者终止之前未完成的事务。同时每个producer注册到kafka时还会初始化一个epoch,如果两个producer具有相同的transaction.id, 其中epoch较老的视为僵尸进程, kafka不接受其输入。而消费者也需要将isolation.level设置为read_committed。前面也说过,消费消息是通过offset来确定,而各topic-partition的offset记录在名为__consumer_offsets的topic中,即消息的消费也是向__consumer_offsets写入消息,因此,原子性的向多个topic和partition写入消息也保证了原子性的consume和produce。
详见:https://juejin.im/post/5c00e1985188255125070ccb
https://blog.51cto.com/13739602/2161924
https://www.confluent.io/blog/transactions-apache-kafka/
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
开启事务:
第一种方式(不支持僵尸进程): 1.设置spring.kafka.producer.transaction-id-prefix 2.@Transactional
第二种方式: 将DefaultKafkaProducerFactory 的producerPerConsumerPartition设置为false, 可以预防僵尸进程
3.接收消息
package com.example.config;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Slf4j
@Configuration
public class MyListener {/*这里concurrency等于3, topicPartitions等于4,那么一个Container分配两个partition, 另外两个Container各分配一个partition*/@KafkaListener(id = "myListener"/*, topicPartitions ={@TopicPartition(topic = "thing1", partitions = {"0", "1"}),@TopicPartition(topic = "thing2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset ="100"))// Note: 不要在partitions和partitionOffsets中指定同一个分区}*/, topics = "thing2", concurrency = "${listen.concurrency:3}")public void listen(ConsumerRecord<?, ?> record/*, Acknowledgment acknowledgment 手动模式才可以用*/) {log.info("收到消息: {}", record);}
}
参考文献
- 官方文档
神奇的小尾巴:
本人邮箱:zhouyouchn@126.com zhoooooouyou@gmail.com
zhouyou@whut.edu.cn 欢迎交流,共同进步。
欢迎转载,转载请注明本网址。
最新Kafka教程(包含kafka部署与基本操作、java连接kafka、spring连接kafka以及使用springboot)相关推荐
- Apache Kafka教程--Kafka新手入门
Apache Kafka教程–Kafka新手入门 Kafka Assistant 是一款 Kafka GUI 管理工具--管理Broker,Topic,Group.查看消费详情.监控服务器状态.支持多 ...
- Apache Kafka教程
1.卡夫卡教程 今天,我们正在使用Apache Kafka Tutorial开始我们的新旅程.在这个Kafka教程中,我们将看到什么是Kafka,Apache Kafka历史以及Kafka的原因.此外 ...
- Kafka教程(一)基础入门:基本概念、安装部署、运维监控、命令行使用
Kafka教程(一)基础入门 1.基本概念 背景 领英->Apache 分布式.消息发布订阅系统 角色 存储系统 消息系统 流处理平台-Kafka Streaming 特点 高吞吐.低延迟 cg ...
- Docker-Compose部署kafka教程
Docker-Compose部署kafka教程 1. 环境 Ubuntu 20以上 Docker version 20以上 Docker-Compose version 1.25以上 安装Docker ...
- kafka教程_2018年机器学习趋势与Apache Kafka生态系统结合
kafka教程 在慕尼黑举行的OOP 2018大会上,我介绍了有关使用Apache Kafka生态系统和诸如TensorFlow,DeepLearning4J或H2O之类的深度学习框架构建可扩展,关键 ...
- Spring Apache Kafka教程
在本SpringApache Kafka课程中,我们将学习如何在Spring Boot项目中开始使用Apache Kafka,并开始生成和使用我们所选主题的消息. 除了一个简单的项目外,我们还将深入探 ...
- kafka启动_Kafka安装部署——单节点
1.1 Kafka的单节点部署 在实际的工作中,经常使用Kafka作为消息队列,然而并不是每一种业务场景都需要集群版的Kafka,有时单节点的Kafka就能满足了业务的需求.以下就是单节点kafka的 ...
- Kafka教程(一)Kafka入门教程
Kafka教程(一)Kafka入门教程 1 Kafka入门教程 1.1 消息队列(Message Queue) Message Queue消息传送系统提供传送服务.消息传送依赖于大量支持组件,这些组件 ...
- Docker最新超详细教程——安装与部署
Docker最新超详细教程--安装与部署 安装Docker CentOS安装Docker 1. 卸载(可选) 2. 安装yum工具 3. 更新本地镜像源 4. 安装docker 5. 注意事项 关闭防 ...
最新文章
- 数据库在EF中创建模型
- mybatis的#和$的差别
- nginx反向代理tomcat提示failed (13: Permission denied) while connecting to upstream
- Windows 如何在命令终端(CMD)使用命令来访问本地/远程的 Oracle 数据库呢?
- git 操作二进制文件
- 线性地址到物理地址的映射
- 中国人工智能学会通讯——基于视频的行为识别技术 1.5 基于深度学习的视频识别方法...
- java中 CopyOnWriteArrayList 的使用
- PostgreSQL(2)常用命令(附教程)
- 计算机视觉(CV)前沿国际国内期刊与会议
- Linux环境下搭建Java Web测试环境的具体步骤
- c语言程序设计双语版答案,C程序设计(双语版)习题答案.doc
- 13讲项目实战内页滚动图效果实现
- 企业级业务架构如何设计?
- 微型计算机常见的输入与输出设备,微型计算机的输入输出设备.doc
- HTTP: CDN缓存机制
- Kubernetes权威指南(下)
- 计算机辅助培训的策略,宁波诺丁汉大学学习策略培训对解决计算机辅助语言教学环境下信息过剩问题的启示...
- pip离线安装第三方包
- 计算机应用基础的题库,计算机应用基础题库
热门文章
- js判断网络链接的四种方法
- mac 安装node.js
- Android 快捷方式 shortcuts 使用
- Acquisition Attempt Failed!!! Clearing pending acquires. While trying to acquire a needed new resour
- 国外免费公共DNS解析服务器
- 自学考试-“运筹学基础”
- 机械键盘到底茶轴好还是黑轴好呢?第一次用,需要注意什么?
- Caffe简明教程1:Caffe简介
- Nachos系统调用的实现
- 奥利给! loading效果这么搞真的太棒了