使用java API操作kafka

1.pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast</groupId><artifactId>KafkaDemo</artifactId><version>0.0.1-SNAPSHOT</version><dependencies><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>1.0.0</version></dependency></dependencies>
</project>

2.producer和consumer配置文件

  2.1producer.properties

#请求时候需要验证
acks=all
#请求失败时候需要重试
retries=0
#内存缓存区大小
buffer.memory=33554432
#分区类
partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
#broker地址
bootstrap.servers=192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092
#指定消息key序列化方式
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#指定消息本身的序列化方式
value.serializer=org.apache.kafka.common.serialization.StringSerializer

  2.2consumer.properties

#每个消费者分配独立的组号
group.id=test
#如果value合法,则自动提交偏移量
enable.auto.commit=true
#设置多久一次更新被消费消息的偏移量
auto.commit.interval.ms=1000
#设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
session.timeout.ms=30000
#指定消息key序列化方式
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消息本身的序列化方式
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#broker地址
bootstrap.servers=192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092

3.生产者和消费者代码

  3.1 KafkaProducerSimple.java

 1 package cn.itcast.kafka;
 2
 3 import java.io.IOException;
 4 import java.io.InputStream;
 5 import java.util.Properties;
 6 import java.util.UUID;
 7
 8 import org.apache.kafka.clients.producer.KafkaProducer;
 9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11
12 public class KafkaProducerSimple {
13     public static void main(String[] args) throws IOException {
14         Properties properties = new Properties();
15         InputStream inStream = KafkaProducerSimple.class.getClassLoader().getResourceAsStream("producer.properties");
16
17         properties.load(inStream);
18
19         Producer<String, String> producer = new KafkaProducer<>(properties);
20         String TOPIC = "orderMq6";
21         for (int messageNo = 1; messageNo < 10000; messageNo++) {
22             producer.send(new ProducerRecord<String, String>(TOPIC,messageNo + "", UUID.randomUUID() + "itcast"));
23         }
24     }
25 }

  3.2 KafkaConsumerSimple.java

 1 package cn.itcast.kafka;
 2
 3 import java.io.InputStream;
 4 import java.util.Arrays;
 5 import java.util.Properties;
 6
 7 import org.apache.kafka.clients.consumer.Consumer;
 8 import org.apache.kafka.clients.consumer.ConsumerRecord;
 9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11
12 public class KafkaConsumerSimple {
13
14     public static void main(String[] args) throws Exception {
15         Properties properties = new Properties();
16         InputStream inStream = KafkaConsumerSimple.class.getClassLoader().getResourceAsStream("consumer.properties");
17         properties.load(inStream);
18         Consumer<String, String> consumer = new KafkaConsumer<>(properties);
19         consumer.subscribe(Arrays.asList("orderMq6"));
20         while (true) {
21             ConsumerRecords<String, String> records = consumer.poll(100);
22             if (records.count() > 0) {
23                 for (ConsumerRecord<String, String> record : records) {
24                     System.out.println(record.value());
25                 }
26
27             }
28         }
29     }
30 }

  以上代码如果执行超时,必须在本地host文件中配置broker的hostname和ip的映射。

转载于:https://www.cnblogs.com/zhaobingqing/p/8579215.html

Kafka系列三 java API操作相关推荐

  1. 大数据技术之_20_Elasticsearch学习_01_概述 + 快速入门 + Java API 操作 + 创建、删除索引 + 新建、搜索、更新删除文档 + 条件查询 + 映射操作

    大数据技术之_20_Elasticsearch学习_01 一 概述 1.1 什么是搜索? 1.2 如果用数据库做搜索会怎么样? 1.3 什么是全文检索和 Lucene? 1.4 什么是 Elastic ...

  2. 大数据技术之_20_Elasticsearch学习_01_概述 + 快速入门 + Java API 操作 + 创建、删除索引 + 新建、搜索、更新删除文档 + 条件查询 + 映射操作...

    一 概述1.1 什么是搜索?1.2 如果用数据库做搜索会怎么样?1.3 什么是全文检索和 Lucene?1.4 什么是 Elasticsearch?1.5 Elasticsearch 的适用场景1.6 ...

  3. kafka详解(JAVA API操作kafka、kafka原理、kafka监控)-step2

    1.JAVA API操作kafka  修改Windows的Host文件: 目录:C:\Windows\System32\drivers\etc (win10) 内容: 192.168.40.150 k ...

  4. Hadoop读书笔记(三)Java API操作HDFS

    Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的sh ...

  5. java api 操作helm

    文章目录 java api 操作helm 一.helm架构在云管理平台开发中的不足 二.captain介绍 安装captain 卸载captain chart repo问题 三.命令行安装mongod ...

  6. Hbase 完全分布式模式的搭建、命令行操作、Java API操作

    追风赶月莫停留,平芜尽处是春山. 文章目录 追风赶月莫停留,平芜尽处是春山. 环境 Hbase 完全分布式模式的搭建 一.下载安装包,解压到合适位置: 二.配置相关的文件: 三.将Hbase复制到其他 ...

  7. HDFS Java API 操作

    文章目录 HDFS Java API操作 零.启动hadoop 一.HDFS常见类接口与方法 1.hdfs 常见类与接口 2.FileSystem 的常用方法 二.Java 创建Hadoop项目 1. ...

  8. MongoDB Java API操作很全的整理以及共享分片模式下的常见操作整理

    MongoDB 是一个基于分布式文件存储的数据库.由 C++ 语言编写,一般生产上建议以共享分片的形式来部署. 但是MongoDB官方也提供了其它语言的客户端操作API.如下图所示: 提供了C.C++ ...

  9. 2021年大数据ZooKeeper(五):ZooKeeper Java API操作

    目录 ZooKeeper Java API操作 引入maven坐标 节点的操作 ZooKeeper Java API操作 这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端 ...

最新文章

  1. 北航云计算公开课 01 Introduction to Cloud Computing
  2. react项目---基本语法字符串数组(6)
  3. 吴裕雄 python 神经网络——TensorFlow 花瓣分类与迁移学习(1)
  4. python基础知识面试题-深入解答关于Python的11道基本面试题
  5. HL7 ADT Message Sample
  6. BERT源码分析(PART I)
  7. php计算时间差js,JavaScript如何计算时间差(引入外部字体文件)?
  8. (2) freemarker入门案例2
  9. Java面试2021,java数据可视化项目
  10. 关键词是用分号还是逗号隔开_逗号、顿号、分号、冒号、破折号的用法
  11. 团队管理---猴子管理管理法则
  12. matlab 开采沉陷 何,MATLAB在开采沉陷预计可视化中的应用
  13. python2多线程_python_并发编程——多线程2
  14. Unofficial Windows Binaries for Python Extensi...
  15. JDK8下载 (jdk-8u271-windows-x64和jdk-8u271-linux-x64.tar)
  16. css的div纵向居中
  17. HTML、CSS面试题
  18. 主引导扇区及主引导记录MBR的详细说明
  19. 美容院 php源代码,基于ThinkPHP+B-JUI框架开发的微信美容院SPA预约消费管理系统PHP源码...
  20. 山西大同大学计算机分数线,山西大同大学录取分数线2021是多少分(附历年录取分数线)...

热门文章

  1. C/C++中static关键字用法汇总
  2. Linux下常用的C/C++开源Socket库
  3. Windows7在Notepad++中配置Python+OpenCV
  4. 【数据库】sqlite中的限制:数据库大小、表数、列数、行数、参数个数、连接数等
  5. 【AI】图示:精确度(查准率)Precision、召回率(查全率)Recall
  6. 【leetcode】力扣刷题(2):两数相加(go语言)
  7. 【Qt】Qt样式表(Style Sheet):官网说明及例子
  8. C++之Boost使用
  9. github上好的c语言项目,2019 github热门项目
  10. 找不到第三方怎么理赔_车子被撞,找不到肇事者怎么办?