Kafka包含四种核心的API:

1、Producer API支持应用将数据流发送到Kafka集群的主题

2、Consumer API支持应用从Kafka集群的主题中读取数据流

3、Streams API支持数据流从输入主题转化到输出主题

4、Connect API支持实现持续地从一些源系统或应用划入Kafka或者从Kafka推入一些源系统或应用的接口。

我们这里主要讨论Producer API和Consumer API的使用,由于最新版的kafka java api中使用了一些jdk8的新特性,所以要求我们在本机上jdk版本要在8以上。

pom.xml如下:

org.springframework.kafka

spring-kafka

Producer API

Producer用来向Kafka集群中发布消息记录的Kafka客户端。Producer是线程安全的,并且通常来讲,在多个线程间共享一个producer要比每个线程都创建一个producer速度更快。producer代码示例:

packagecom.example.demo;importjava.util.Properties;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.Producer;importorg.apache.kafka.clients.producer.ProducerRecord;public classMyProducer {public static voidmain(String[] args) {

Properties props= newProperties();

props.put("bootstrap.servers", "192.168.1.124:9092");

props.put("acks", "all");

props.put("retries", 0);

props.put("batch.size", 16384);

props.put("linger.ms", 1);

props.put("partitioner.class", "com.example.demo.MyPartitioner");

props.put("buffer.memory", 33554432);

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++)

producer.send(new ProducerRecord("powerTopic", Integer.toString(i), Integer.toString(i)));

producer.close();

}

}

properties里用到的配置参数在kafka的源码里org.apache.kafka.clients.producer.ProducerConfig类中,这里说一下常用的:

bootstrap.servers 配置项处需要填写我们要发送到的Kafka集群地址。

ack 配置项用来控制producer要求leader确认多少消息后返回调用成功。当值为0时producer不需要等待任何确认消息。当值为1时只需要等待leader确认。当值为-1或all时需要全部ISR集合返回确认才可以返回成功。

retries 当 retries > 0 时,如果发送失败,会自动尝试重新发送数据。发送次数为retries设置的值。

buffer.memory、batch.size、linger.ms三个参数用来控制缓冲区大小和延迟发送时间,具体含义可以参考官方文档的配置。

key.serializer 和 value.serializer 指定使用什么序列化方式将用户提供的key和value进行序列化。

Consumer API

Consumer的API分为High-level API和Low-level API。前者提供了高度抽象的API,使用起来简单、方便。因此本文将主要讲述High-level API。Low-level API提供了更强的控制能力,但使用起来较为繁琐。自动提交consumer代码示例:

packagecom.example.demo;importjava.util.Arrays;importjava.util.Properties;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;public classMyAutoCommitConsumer {public static voidmain(String[] args) {

Properties props= newProperties();

props.put("bootstrap.servers", "192.168.1.124:9092");

props.put("group.id", "test");

props.put("enable.auto.commit", "true");

props.put("auto.commit.interval.ms", "1000");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

@SuppressWarnings("resource")

KafkaConsumer consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList("powerTopic"));while (true) {

ConsumerRecords records = consumer.poll(100);for (ConsumerRecordrecord : records)

System.out.printf("partition = %d,offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());

}

}

}

properties里用到的配置参数在kafka的源码里org.apache.kafka.clients.consumer.ConsumerConfig类中,本例中用到参数解释如下:

bootstrap.servers配置项指定了consumer需要连接的服务器集群。多台服务器用“,”分隔

enable.auto.commit配置项指定了提交offset的方式为自动提交,auto.commit.interval.ms配置项配置了每次自动提交的时间间隔。

group.id 即消费者组标签,本例中消费者组的名称为test。

key.deserializer和value.deserializer指用什么方式进行反序列化。

自动提交offset的方式非常简单,但多数情况下,我们不会使用自动提交的方式。因为不论从Kafka集群中拉取的数据是否被处理成功,offset都会被更新,也就是如果处理过程中出现错误可能会出现数据丢失的情况。所以多数情况下我们会选择手动提交方式,我们看到 enable.auto.commit 配置项被设置为false,代表手动提交。示例代码如下:

packagecom.example.demo;importjava.util.ArrayList;importjava.util.Arrays;importjava.util.List;importjava.util.Properties;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;public classMyManualCommitConsumer {public static voidmain(String[] args) {

Properties props= newProperties();

props.put("bootstrap.servers", "192.168.1.124:9092");

props.put("group.id", "test");

props.put("enable.auto.commit", "false");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

@SuppressWarnings("resource")

KafkaConsumer consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList("myFirstTopic"));final int minBatchSize = 200;

List> list = new ArrayList<>();while (true) {

ConsumerRecords records = consumer.poll(100);for (ConsumerRecordrecord : records) {

System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

list.add(record);

}if (list.size() >=minBatchSize) {

System.out.println("list中的缓存数据大于minBatchSize时批量进行处理");

consumer.commitSync();

System.out.println("全部数据处理成功后手动提交");

list.clear();

}

}

}

}

另外需注意,consumer是有状态的,所以不是线程安全的,所以在进行多线程操作时需要在每个线程实例化一个consumer。

java kafka api_kafka java API的使用相关推荐

  1. java kafka client_Kafka Client API 基本使用

    之前讲过了[Kafka基本概念及原理][1],这次我们来看看Kafka Client的API.要使用Kafka Client的API,首先需要先部署Kafka集群,部署过程请参见[官网][2].然后在 ...

  2. java kafka client_Kafka Java Client基本使用及整合SpringBoot

    kafka-clients 添加依赖 org.apache.kafka kafka-clients 2.5.0 消费者 Consumer 代码上总体可以分为三部分:消费者的配置消费者的配置在 org. ...

  3. 2021年大数据Kafka(五):❤️Kafka的java API编写❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的java API编写 一.生产者代码 第一步: ...

  4. java consumer.poll_kafka消费者API consumer.poll()没有错误,没有异常,只是阻止

    我正在学习遵循Apache kafka文档的kafka . 我用默认配置启动它 . bin/zookeeper-server-start.sh config/zookeeper.properties ...

  5. (转)Kafka 消费者 Java 实现

    转自: Kafka 消费者 Java 实现 - 简书应用程序使用 KafkaConsumer向 Kafka 订阅 Topic 接收消息,首先理解 Kafka 中消费者(consumer)和消费者组(c ...

  6. java kafka 集群消费_kafka集群搭建和使用Java写kafka生产者消费者

    转自:http://chengjianxiaoxue.iteye.com/blog/2190488 1 kafka集群搭建 1.zookeeper集群 搭建在110, 111,112 2.kafka使 ...

  7. atitit.js javascript 调用c# java php后台语言api html5交互的原理与总结p97

    atitit.js javascript 调用c# java php后台语言api html5交互的原理与总结p97 1. 实现html5化界面的要解决的策略1 1.1. Js交互1 1.2. 动态参 ...

  8. java+JBroFuzz对restful api进行fuzz测试

    @本文原创,转载请注明 0X00: 序言 fuzz测试作为安全测试的一个基本策略,被越来越多的引入整个测试过程,来避免一些简单的可能引发的安全问题. 如何将fuzzing测试引入软件自动化测试过程是本 ...

  9. 外汇汇率接口 java_基于JAVA的货币汇率api调用代码实例

    代码描述:基于JAVA的货币汇率api调用代码实例 关联数据:货币汇率 接口地址:http://www.juhe.cn/docs/api/id/23 1.[代码][Java]代码 import jav ...

最新文章

  1. synchronize和lock的区别 synchionzie与volatile的区别
  2. 华为服务器部署项目,服务器部署项目
  3. 图像语义分割(12)-重新思考空洞卷积: 为弱监督和半监督语义分割设计的简捷方法
  4. python检查超过两个条件_python-基于多个条件检查,将值从另一个数据...
  5. js打印插件_使用 Nodejs 开发一个 commitlint-release 插件
  6. 419.甲板上的战舰
  7. LibSass 的二进制文件(P:\HBuilderX\plugins\compile-node-sass\node_modules\node-sass-china\vendor\win32-ia32
  8. Robots协议(摘)
  9. if函数多个条件怎么用c语言,条件函数怎么用(if函数多个条件怎么用)
  10. 两表关联去重查询全部数据
  11. html闹钟设置,设置闹钟标签.html
  12. centOS服务器 netstat命令 查看TCP连接数信息(转)
  13. WiMAX与Wi-Fi、DSL和3G的竞合关系
  14. untiy2020 与 HubSetup 安装教程
  15. HMI-44-【多媒体】开启新篇章
  16. word中如何将所有一级标题统一格式(转载)
  17. 借呗还完之后为什么关闭了_蚂蚁借呗怎么突然关闭了 蚂蚁借呗关闭后怎么重开...
  18. 高端配置台式计算机,高配置台式电脑清单 3款高性能主机推荐
  19. Java实现模拟斗地主洗牌、发牌、看牌并排序
  20. 基于matlab的神经网络设计,matlab神经网络训练图片

热门文章

  1. python3安装-mac python3 轻松安装教程
  2. python怎么学最快-零基础怎么样才能学好Python?Python入门必看
  3. python在中国的发展-python在中国的现状和发展趋势
  4. 用python画皮卡丘教程-利用Python绘制萌萌哒的皮卡丘
  5. python的编程模式-python编程(python开发的三种运行模式)【转】
  6. python3.6安装pyqt5-Python3.6安装PyQt5的方法
  7. python代码需要背吗-python程序需要编译吗
  8. python写一个游戏多少代码-使用Python写一个贪吃蛇游戏实例代码
  9. python编程入门书-清华大学出版社-图书详情-《Python编程入门与案例详解》
  10. LeetCode ZigZag Conversion