Kafka系列三 java API操作
使用java API操作kafka
1.pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast</groupId><artifactId>KafkaDemo</artifactId><version>0.0.1-SNAPSHOT</version><dependencies><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>1.0.0</version></dependency></dependencies> </project>
2.producer和consumer配置文件
2.1producer.properties
#请求时候需要验证 acks=all #请求失败时候需要重试 retries=0 #内存缓存区大小 buffer.memory=33554432 #分区类 partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner #broker地址 bootstrap.servers=192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092 #指定消息key序列化方式 key.serializer=org.apache.kafka.common.serialization.StringSerializer #指定消息本身的序列化方式 value.serializer=org.apache.kafka.common.serialization.StringSerializer
2.2consumer.properties
#每个消费者分配独立的组号 group.id=test #如果value合法,则自动提交偏移量 enable.auto.commit=true #设置多久一次更新被消费消息的偏移量 auto.commit.interval.ms=1000 #设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息 session.timeout.ms=30000 #指定消息key序列化方式 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer #指定消息本身的序列化方式 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer #broker地址 bootstrap.servers=192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092
3.生产者和消费者代码
3.1 KafkaProducerSimple.java
1 package cn.itcast.kafka; 2 3 import java.io.IOException; 4 import java.io.InputStream; 5 import java.util.Properties; 6 import java.util.UUID; 7 8 import org.apache.kafka.clients.producer.KafkaProducer; 9 import org.apache.kafka.clients.producer.Producer; 10 import org.apache.kafka.clients.producer.ProducerRecord; 11 12 public class KafkaProducerSimple { 13 public static void main(String[] args) throws IOException { 14 Properties properties = new Properties(); 15 InputStream inStream = KafkaProducerSimple.class.getClassLoader().getResourceAsStream("producer.properties"); 16 17 properties.load(inStream); 18 19 Producer<String, String> producer = new KafkaProducer<>(properties); 20 String TOPIC = "orderMq6"; 21 for (int messageNo = 1; messageNo < 10000; messageNo++) { 22 producer.send(new ProducerRecord<String, String>(TOPIC,messageNo + "", UUID.randomUUID() + "itcast")); 23 } 24 } 25 }
3.2 KafkaConsumerSimple.java
1 package cn.itcast.kafka; 2 3 import java.io.InputStream; 4 import java.util.Arrays; 5 import java.util.Properties; 6 7 import org.apache.kafka.clients.consumer.Consumer; 8 import org.apache.kafka.clients.consumer.ConsumerRecord; 9 import org.apache.kafka.clients.consumer.ConsumerRecords; 10 import org.apache.kafka.clients.consumer.KafkaConsumer; 11 12 public class KafkaConsumerSimple { 13 14 public static void main(String[] args) throws Exception { 15 Properties properties = new Properties(); 16 InputStream inStream = KafkaConsumerSimple.class.getClassLoader().getResourceAsStream("consumer.properties"); 17 properties.load(inStream); 18 Consumer<String, String> consumer = new KafkaConsumer<>(properties); 19 consumer.subscribe(Arrays.asList("orderMq6")); 20 while (true) { 21 ConsumerRecords<String, String> records = consumer.poll(100); 22 if (records.count() > 0) { 23 for (ConsumerRecord<String, String> record : records) { 24 System.out.println(record.value()); 25 } 26 27 } 28 } 29 } 30 }
以上代码如果执行超时,必须在本地host文件中配置broker的hostname和ip的映射。
转载于:https://www.cnblogs.com/zhaobingqing/p/8579215.html
Kafka系列三 java API操作相关推荐
- 大数据技术之_20_Elasticsearch学习_01_概述 + 快速入门 + Java API 操作 + 创建、删除索引 + 新建、搜索、更新删除文档 + 条件查询 + 映射操作
大数据技术之_20_Elasticsearch学习_01 一 概述 1.1 什么是搜索? 1.2 如果用数据库做搜索会怎么样? 1.3 什么是全文检索和 Lucene? 1.4 什么是 Elastic ...
- 大数据技术之_20_Elasticsearch学习_01_概述 + 快速入门 + Java API 操作 + 创建、删除索引 + 新建、搜索、更新删除文档 + 条件查询 + 映射操作...
一 概述1.1 什么是搜索?1.2 如果用数据库做搜索会怎么样?1.3 什么是全文检索和 Lucene?1.4 什么是 Elasticsearch?1.5 Elasticsearch 的适用场景1.6 ...
- kafka详解(JAVA API操作kafka、kafka原理、kafka监控)-step2
1.JAVA API操作kafka 修改Windows的Host文件: 目录:C:\Windows\System32\drivers\etc (win10) 内容: 192.168.40.150 k ...
- Hadoop读书笔记(三)Java API操作HDFS
Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的sh ...
- java api 操作helm
文章目录 java api 操作helm 一.helm架构在云管理平台开发中的不足 二.captain介绍 安装captain 卸载captain chart repo问题 三.命令行安装mongod ...
- Hbase 完全分布式模式的搭建、命令行操作、Java API操作
追风赶月莫停留,平芜尽处是春山. 文章目录 追风赶月莫停留,平芜尽处是春山. 环境 Hbase 完全分布式模式的搭建 一.下载安装包,解压到合适位置: 二.配置相关的文件: 三.将Hbase复制到其他 ...
- HDFS Java API 操作
文章目录 HDFS Java API操作 零.启动hadoop 一.HDFS常见类接口与方法 1.hdfs 常见类与接口 2.FileSystem 的常用方法 二.Java 创建Hadoop项目 1. ...
- MongoDB Java API操作很全的整理以及共享分片模式下的常见操作整理
MongoDB 是一个基于分布式文件存储的数据库.由 C++ 语言编写,一般生产上建议以共享分片的形式来部署. 但是MongoDB官方也提供了其它语言的客户端操作API.如下图所示: 提供了C.C++ ...
- 2021年大数据ZooKeeper(五):ZooKeeper Java API操作
目录 ZooKeeper Java API操作 引入maven坐标 节点的操作 ZooKeeper Java API操作 这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端 ...
最新文章
- 北航云计算公开课 01 Introduction to Cloud Computing
- react项目---基本语法字符串数组(6)
- 吴裕雄 python 神经网络——TensorFlow 花瓣分类与迁移学习(1)
- python基础知识面试题-深入解答关于Python的11道基本面试题
- HL7 ADT Message Sample
- BERT源码分析(PART I)
- php计算时间差js,JavaScript如何计算时间差(引入外部字体文件)?
- (2) freemarker入门案例2
- Java面试2021,java数据可视化项目
- 关键词是用分号还是逗号隔开_逗号、顿号、分号、冒号、破折号的用法
- 团队管理---猴子管理管理法则
- matlab 开采沉陷 何,MATLAB在开采沉陷预计可视化中的应用
- python2多线程_python_并发编程——多线程2
- Unofficial Windows Binaries for Python Extensi...
- JDK8下载 (jdk-8u271-windows-x64和jdk-8u271-linux-x64.tar)
- css的div纵向居中
- HTML、CSS面试题
- 主引导扇区及主引导记录MBR的详细说明
- 美容院 php源代码,基于ThinkPHP+B-JUI框架开发的微信美容院SPA预约消费管理系统PHP源码...
- 山西大同大学计算机分数线,山西大同大学录取分数线2021是多少分(附历年录取分数线)...
热门文章
- C/C++中static关键字用法汇总
- Linux下常用的C/C++开源Socket库
- Windows7在Notepad++中配置Python+OpenCV
- 【数据库】sqlite中的限制:数据库大小、表数、列数、行数、参数个数、连接数等
- 【AI】图示:精确度(查准率)Precision、召回率(查全率)Recall
- 【leetcode】力扣刷题(2):两数相加(go语言)
- 【Qt】Qt样式表(Style Sheet):官网说明及例子
- C++之Boost使用
- github上好的c语言项目,2019 github热门项目
- 找不到第三方怎么理赔_车子被撞,找不到肇事者怎么办?