Java Kafka 简单示例
Java Kafka 简单示例
简介
Java kafka 简单代码示例
maven依赖配置
<!-- kafka -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version>
</dependency>
kakfa生产和消费者生成
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.*;/*** @author lw1243925457*/
public class KafkaUtil {public static KafkaConsumer<String, String> createConsumer(String servers, String topic) {Properties properties = new Properties();properties.put("bootstrap.servers", servers);properties.put("group.id", "group-1");properties.put("enable.auto.commit", "false");properties.put("auto.commit.interval.ms", "1000");properties.put("auto.offset.reset", "earliest");properties.put("session.timeout.ms", "30000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);kafkaConsumer.subscribe(Arrays.asList(topic));return kafkaConsumer;}public static void readMessage(KafkaConsumer<String, String> kafkaConsumer, int timeout) {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(timeout);for (ConsumerRecord<String, String> record : records) {String value = record.value();kafkaConsumer.commitAsync();System.out.println(value);}}}public static KafkaProducer<String, String> createProducer(String servers) {Properties properties = new Properties();properties.put("bootstrap.servers", servers);properties.put("acks", "all");properties.put("retries", 0);properties.put("batch.size", 16384);properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer<String, String>(properties);}public static void send(KafkaProducer<String, String> producer, String topic, String message) {producer.send(new ProducerRecord<String, String>(topic, message));}
}
运行
public class Main {public static void main(String[] args) {String servers = "localhost:9092,localhost:9093,localhost:9094";String topic = "TestTopic";String message = "test";KafkaProducer<String, String> producer = KafkaUtil.createProducer(servers);KafkaUtil.send(producer, topic, message);KafkaConsumer<String, String> consumer = KafkaUtil.createConsumer(servers, topic);KafkaUtil.readMessage(consumer, 100);}
}
使用心得
总是读取最老的消息
可能是group-id的问题,新起一个group-id名称
- earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
- none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
参考链接
- java 实现kafka消息生产者和消费者
- kafka(三)—Kafka的Java代码示例和配置说明
- Kafka - 偏移量提交
- Kafka系列(四)Kafka消费者:从Kafka中读取数据
- Kafka auto.offset.reset值详解
Java Kafka 简单示例相关推荐
- java webwork_WebWork简单示例
[实例简介] [实例截图] [核心代码] package action; import bean.UserBean; import com.opensymphony.xwork.Action; pub ...
- java读取ACCESS数据库的简单示例
java读取ACCESS数据库的简单示例 虽然简单,对初学者来说,如果没有一段可以成功执行的代码供参考,还真难调试 先用ACCESS建一个数据库 DB1.MDB,里面有一表"table1 ...
- java实现账号单一ip登录,使用Java实现简单后台访问并获取IP示例
使用Java实现简单后台访问并获取IP示例 发布时间:2020-10-28 21:57:57 来源:亿速云 阅读:92 作者:Leah 使用Java实现简单后台访问并获取IP示例?针对这个问题,这篇文 ...
- java播放声音类和一个简单示例
java播放声音类和一个简单示例 播放声音的类 复制代码 代码如下: import java.io.File; import java.io.IOException; import javax.so ...
- java中的mapper是什么_Java使用ObjectMapper的简单示例
一.什么是ObjectMapper? ObjectMapper类是Jackson库的主要类,它提供一些功能将数据集或对象转换的实现. 它将使用JsonParser和JsonGenerator实例来实现 ...
- java操作redis简单示例
java操作redis简单示例 初学redis,在java语言和环境下完成redis的入门学习. 首先,官网下载源码,编译,安装,修改配置文件redis.conf中的 ...
- java调用百度地图api简单示例--获取国内任意两地之间距离
老师让我们从百度地图的api上获取数据源最为两地运输距离,结果百度地图api的开发文档居然连个示例都没有...于是上网找了半天,都是一百多行的源码,我就想用个api,你给我这玩意???终于最后还是找到 ...
- java avg_JPA 查询AVG简单示例
JPA教程 - JPA查询AVG简单示例 JPQL中的聚合查询的语法与SQL的语法非常相似. 有五个支持的聚合函数AVG COUNT MIN MAX SUM 结果可以分组在GROUP BY子句中,并使 ...
- Java TCP 抓包简单示例
Java TCP 抓包简单示例 由于目前网上没有一篇能真正方便读者操作的此类文章,本文对此通过示例做个简单介绍. 缘起 有一天本来在看头条,然后看到一则游戏的广告,看画面可能是我喜欢的建造类型(纪元1 ...
最新文章
- linux+软盘启动程序,软盘上的Linux系统方案
- HashMap的实现原理-----哈希讲解
- Jenkins Pipeline动态使用Git分支名称的技巧
- 【正一专栏】欧冠小组赛第四轮综述——有钱就能风清扬
- 学习python的基础是什么_学python需要什么基础
- SAP Spartacus使用cxComponentWrapper测试MiniCart
- 浅谈HR谈薪水的技巧
- php 所有子类,php获取分类以下的全部子类方法
- 求出一个整型数组的最大子集和
- 清除Mac OS X文件系统的附加属性@
- js函数内部定义函数的理解
- 宏病毒专杀软件测试大乐,推荐几个宏病毒专杀工具
- x390开机键_【ThinkPadX390评测】ThinkPad X390 4G版全球首测:全时在线超长续航的便携商务利器(全文)_ThinkPad X390_笔记本评测-中关村在线...
- {dede-list flag='h'}DedeCMS支持flag标签解决办法
- 把你的手机、平板变成电脑第二屏:Windows 屏幕扩展工具横评
- mysql本机地址_mysql连接时用的IP地址是不是电脑本机上的IP地址呢!
- tomcat jdbc数据库连接池详解之PoolCleaner
- 微信企业号开发模式的PHP代码
- 原生 APP 与 web APP的区别比较
- 技术人修炼之道阅读笔记(七)系统性思维方法
热门文章
- IList对象排序算法
- 解决苹果手机点击返回键页面不刷新问题
- 解决window.open被拦截问题
- 专访头条搜索:从推荐到搜索,如何构建搜索技术的另一种可能?
- 哈佛大学推荐:让自己变幸福的20件小事(值得收藏)
- 图卷积神经网络入门详解
- 同质异质网络——(F(fraud) A(Analytics) UDPSNT(Wylie_2015))
- 二进制文本编辑器_Textadept for mac(文本编辑) v10.5免费版
- centos7修改ip地址自动获取_南京课工场IT培训:如何搭建DHCP服务器及自动获取IP地址及相关操作...
- 独立站可以一个人做吗?