Java Kafka 简单示例


简介

    Java kafka 简单代码示例

maven依赖配置

<!-- kafka -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version>
</dependency>

kakfa生产和消费者生成

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.*;/*** @author lw1243925457*/
public class KafkaUtil {public static KafkaConsumer<String, String> createConsumer(String servers, String topic) {Properties properties = new Properties();properties.put("bootstrap.servers", servers);properties.put("group.id", "group-1");properties.put("enable.auto.commit", "false");properties.put("auto.commit.interval.ms", "1000");properties.put("auto.offset.reset", "earliest");properties.put("session.timeout.ms", "30000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);kafkaConsumer.subscribe(Arrays.asList(topic));return kafkaConsumer;}public static void readMessage(KafkaConsumer<String, String> kafkaConsumer, int timeout) {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(timeout);for (ConsumerRecord<String, String> record : records) {String value = record.value();kafkaConsumer.commitAsync();System.out.println(value);}}}public static KafkaProducer<String, String> createProducer(String servers) {Properties properties = new Properties();properties.put("bootstrap.servers", servers);properties.put("acks", "all");properties.put("retries", 0);properties.put("batch.size", 16384);properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer<String, String>(properties);}public static void send(KafkaProducer<String, String> producer, String topic, String message) {producer.send(new ProducerRecord<String, String>(topic, message));}
}

运行

public class Main {public static void main(String[] args) {String servers = "localhost:9092,localhost:9093,localhost:9094";String topic = "TestTopic";String message = "test";KafkaProducer<String, String> producer = KafkaUtil.createProducer(servers);KafkaUtil.send(producer, topic, message);KafkaConsumer<String, String> consumer = KafkaUtil.createConsumer(servers, topic);KafkaUtil.readMessage(consumer, 100);}
}

使用心得

总是读取最老的消息

    可能是group-id的问题,新起一个group-id名称

  • earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

参考链接

  • java 实现kafka消息生产者和消费者
  • kafka(三)—Kafka的Java代码示例和配置说明
  • Kafka - 偏移量提交
  • Kafka系列(四)Kafka消费者:从Kafka中读取数据
  • Kafka auto.offset.reset值详解

Java Kafka 简单示例相关推荐

  1. java webwork_WebWork简单示例

    [实例简介] [实例截图] [核心代码] package action; import bean.UserBean; import com.opensymphony.xwork.Action; pub ...

  2. java读取ACCESS数据库的简单示例

    java读取ACCESS数据库的简单示例  虽然简单,对初学者来说,如果没有一段可以成功执行的代码供参考,还真难调试  先用ACCESS建一个数据库 DB1.MDB,里面有一表"table1 ...

  3. java实现账号单一ip登录,使用Java实现简单后台访问并获取IP示例

    使用Java实现简单后台访问并获取IP示例 发布时间:2020-10-28 21:57:57 来源:亿速云 阅读:92 作者:Leah 使用Java实现简单后台访问并获取IP示例?针对这个问题,这篇文 ...

  4. java播放声音类和一个简单示例

    java播放声音类和一个简单示例 播放声音的类 复制代码 代码如下:  import java.io.File; import java.io.IOException; import javax.so ...

  5. java中的mapper是什么_Java使用ObjectMapper的简单示例

    一.什么是ObjectMapper? ObjectMapper类是Jackson库的主要类,它提供一些功能将数据集或对象转换的实现. 它将使用JsonParser和JsonGenerator实例来实现 ...

  6. java操作redis简单示例

    java操作redis简单示例     初学redis,在java语言和环境下完成redis的入门学习.              首先,官网下载源码,编译,安装,修改配置文件redis.conf中的 ...

  7. java调用百度地图api简单示例--获取国内任意两地之间距离

    老师让我们从百度地图的api上获取数据源最为两地运输距离,结果百度地图api的开发文档居然连个示例都没有...于是上网找了半天,都是一百多行的源码,我就想用个api,你给我这玩意???终于最后还是找到 ...

  8. java avg_JPA 查询AVG简单示例

    JPA教程 - JPA查询AVG简单示例 JPQL中的聚合查询的语法与SQL的语法非常相似. 有五个支持的聚合函数AVG COUNT MIN MAX SUM 结果可以分组在GROUP BY子句中,并使 ...

  9. Java TCP 抓包简单示例

    Java TCP 抓包简单示例 由于目前网上没有一篇能真正方便读者操作的此类文章,本文对此通过示例做个简单介绍. 缘起 有一天本来在看头条,然后看到一则游戏的广告,看画面可能是我喜欢的建造类型(纪元1 ...

最新文章

  1. linux+软盘启动程序,软盘上的Linux系统方案
  2. HashMap的实现原理-----哈希讲解
  3. Jenkins Pipeline动态使用Git分支名称的技巧
  4. 【正一专栏】欧冠小组赛第四轮综述——有钱就能风清扬
  5. 学习python的基础是什么_学python需要什么基础
  6. SAP Spartacus使用cxComponentWrapper测试MiniCart
  7. 浅谈HR谈薪水的技巧
  8. php 所有子类,php获取分类以下的全部子类方法
  9. 求出一个整型数组的最大子集和
  10. 清除Mac OS X文件系统的附加属性@
  11. js函数内部定义函数的理解
  12. 宏病毒专杀软件测试大乐,推荐几个宏病毒专杀工具
  13. x390开机键_【ThinkPadX390评测】ThinkPad X390 4G版全球首测:全时在线超长续航的便携商务利器(全文)_ThinkPad X390_笔记本评测-中关村在线...
  14. {dede-list flag='h'}DedeCMS支持flag标签解决办法
  15. 把你的手机、平板变成电脑第二屏:Windows 屏幕扩展工具横评
  16. mysql本机地址_mysql连接时用的IP地址是不是电脑本机上的IP地址呢!
  17. tomcat jdbc数据库连接池详解之PoolCleaner
  18. 微信企业号开发模式的PHP代码
  19. 原生 APP 与 web APP的区别比较
  20. 技术人修炼之道阅读笔记(七)系统性思维方法

热门文章

  1. IList对象排序算法
  2. 解决苹果手机点击返回键页面不刷新问题
  3. 解决window.open被拦截问题
  4. 专访头条搜索:从推荐到搜索,如何构建搜索技术的另一种可能?
  5. 哈佛大学推荐:让自己变幸福的20件小事(值得收藏)
  6. 图卷积神经网络入门详解
  7. 同质异质网络——(F(fraud) A(Analytics) UDPSNT(Wylie_2015))
  8. 二进制文本编辑器_Textadept for mac(文本编辑) v10.5免费版
  9. centos7修改ip地址自动获取_南京课工场IT培训:如何搭建DHCP服务器及自动获取IP地址及相关操作...
  10. 独立站可以一个人做吗?