1、kafka版本:kafka_2.11-1.0.1
2、配置pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>kafka-demo</groupId><artifactId>kafka-demo</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>1.0.1</version><!--<exclusions><exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion><exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion><exclusion><artifactId>jms</artifactId><groupId>javax.jms</groupId></exclusion></exclusions>--></dependency></dependencies>
</project>

编写producer demo案例

package kafka;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;import java.util.Properties;public class KafkaProducer {private final Producer<String, String> producer;public final static String TOPIC = "TEST-TOPIC";private KafkaProducer(){Properties props = new Properties();//此处配置的是kafka的端口props.put("metadata.broker.list", "192.168.18.140:9092");//配置value的序列化类props.put("serializer.class", "kafka.serializer.StringEncoder");//配置key的序列化类props.put("key.serializer.class", "kafka.serializer.StringEncoder");//request.required.acks//0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).//1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).//-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.props.put("request.required.acks","-1");producer = new Producer<String, String>(new ProducerConfig(props));}void produce() {int messageNo = 1000;final int COUNT = 10000;while (messageNo < COUNT) {String key = String.valueOf(messageNo);String data = "hello kafka message " + key;producer.send(new KeyedMessage<String, String>(TOPIC, key ,data));System.out.println(data);messageNo ++;}}public static void main( String[] args ){new KafkaProducer().produce();}
}

运行结果:

Connected to the target VM, address: '127.0.0.1:56726', transport: 'socket'
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
hello kafka message 1000
hello kafka message 1001
hello kafka message 1002
hello kafka message 1003

编写consumer demo 案例

package kafka;import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;public class KafkaConsumer {private final ConsumerConnector consumer;private KafkaConsumer() {Properties props = new Properties();//zookeeper 配置props.put("zookeeper.connect", "192.168.18.140:2181");//group 代表一个消费组props.put("group.id", "jd-group");//zk连接超时props.put("zookeeper.session.timeout.ms", "4000");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "smallest");//序列化类props.put("serializer.class", "kafka.serializer.StringEncoder");ConsumerConfig config = new ConsumerConfig(props);consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);}void consume() {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());Map<String, List<KafkaStream<String, String>>> consumerMap =consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);ConsumerIterator<String, String> it = stream.iterator();while (it.hasNext()) {System.out.println(it.next().message());}}public static void main(String[] args) {new KafkaConsumer().consume();}
}

运行结果:

"C:\Program Files\Java\jdk1.8.0_101\bin\java.exe" -javaagent:D:\Installed\ideaIU-2018.1.5.win-scala\lib\idea_rt.jar=56756:D:\Installed\ideaIU-2018.1.5.win-scala\bin -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_101\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_101\jre\lib\rt.jar;E:\workspace\kafka\kafka-demo\target\classes;D:\maven3.3\localRepository\org\apache\kafka\kafka_2.11\1.0.1\kafka_2.11-1.0.1.jar;D:\maven3.3\localRepository\org\apache\kafka\kafka-clients\1.0.1\kafka-clients-1.0.1.jar;D:\maven3.3\localRepository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;D:\maven3.3\localRepository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;D:\maven3.3\localRepository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;D:\maven3.3\localRepository\com\fasterxml\jackson\core\jackson-databind\2.9.1\jackson-databind-2.9.1.jar;D:\maven3.3\localRepository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;D:\maven3.3\localRepository\com\fasterxml\jackson\core\jackson-core\2.9.1\jackson-core-2.9.1.jar;D:\maven3.3\localRepository\net\sf\jopt-simple\jopt-simple\5.0.4\jopt-simple-5.0.4.jar;D:\maven3.3\localRepository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;D:\maven3.3\localRepository\org\scala-lang\scala-library\2.11.12\scala-library-2.11.12.jar;D:\maven3.3\localRepository\org\slf4j\slf4j-log4j12\1.7.25\slf4j-log4j12-1.7.25.jar;D:\maven3.3\localRepository\log4j\log4j\1.2.17\log4j-1.2.17.jar;D:\maven3.3\localRepository\com\101tec\zkclient\0.10\zkclient-0.10.jar;D:\maven3.3\localRepository\org\apache\zookeeper\zookeeper\3.4.10\zookeeper-3.4.10.jar" kafka.KafkaConsumer
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
hello kafka message 1000
hello kafka message 1002
hello kafka message 1004
hello kafka message 1006

代码来源:http://outofmemory.cn/code-snippet/33051/java-kafka-producer-consumer-example

最简单的kafka demo案例相关推荐

  1. bootstrap datetimepicker 用法+demo案例下载

    bootstrap datetimepicker 用法+demo案例下载 官网文档地址是:http://www.bootcss.com/p/bootstrap-datetimepicker/ date ...

  2. restTemplate loadbalance 负载均衡使用demo 案例 原理以及全网最细源码解析

    restTemplate 是spring 提供的http请求工具,类似于httpclient, 默认情况下与其他的http 工具类没有区别 但是当添加了@Loadbalance 注解之后,则具备了负载 ...

  3. vue配置文件读取_Vue+Spring Boot简单用户登录Demo实现

    ❝ 「如果觉得文章好看,欢迎点赞.」「同时欢迎关注微信公众号:氷泠之路.」 ❞ 这是一个前后端分离的简单用户登录Demo. 技术栈 Vue BootstrapVue Kotlin Spring Boo ...

  4. flask简单的登录demo

    flask框架(二):简单的登录demo 一:main.py # -*- coding: utf-8 -*- # @Author : Felix Wang # @time : 2018/7/3 22: ...

  5. python爬虫简单实例-最简单的Python爬虫案例,看得懂说明你已入门,附赠教程

    原标题:最简单的Python爬虫案例,看得懂说明你已入门,附赠教程 这是最简单的Python爬虫案例,如果你能看懂,那么请你保持信心,因为你已经入门Python爬虫,只要带着信心和努力,你的技术能力在 ...

  6. PHP酒店管理demo案例(数组遍历)

    PHP酒店管理demo案例(数组遍历) 目录 PHP酒店管理demo案例(数组遍历) PHP酒店管理前台编码: PHP酒店管理后台编码: 执行效果: 点击入住测试: 退房测试: 源码地址: PHP酒店 ...

  7. 简单的vue入门案例

    一. 简单入门Hello World案例 二.插值表达式 三.点击事件 四.按键事件 1.如果按下不是 0 - 9 则阻止事件执行 2.打印按下什么按键 五.鼠标事件 1.打印绝对坐标 2.打印相对坐 ...

  8. mmdetection 使用笔记 01: 安装与简单的推理demo

    mmdetection 使用笔记 01: 安装与简单的推理demo mmdetection是来自商汤和港中文联合实验室openmmlab推出的目标检测工具包,与其同系列的还有基础视觉包mmcv,图像分 ...

  9. mui组件 a 锚点定位(Demo案例演示)- 代码篇

    文章目录 `从踩坑,入坑,到跳出坑`:mui框架(在mui-scroll中如何进行页内锚点跳转) `那么,用什么方法实现锚点跳转?` `思路就是:` - 我们可以使用另外一种MUI组件,即:`(顶部选 ...

最新文章

  1. 4键电子手表说明书_电子手表怎么调(电子手表的四个键的功能各是什么)
  2. Django:模型model和数据库mysql(一)
  3. python之Argparse模块
  4. WebBrowser内存泄露
  5. 关于拖拽上传 [一个拖拽上传修改头像的流程]
  6. linux ssh密钥对,Mac使用ssh密钥登录Linux
  7. C#窗体在任务栏对窗体放大或缩小
  8. Spring整合RabbitMQ
  9. jar 打包命令详解
  10. o00o0o php,PHP $O00OO0=urldecode eval 解密,记一次商业源码的去后门
  11. java统计计数_java – 使用LongAdder计算统计计数器的最大值?
  12. 2022内推 | 字节跳动校招 + 社招,包括NLP、CV和ASR和研究员等
  13. 关于用Sql Server 2008 搭建一个多评委多客户端的比赛打分平台的整体构想
  14. WMS系统仓库条码管理流程解析
  15. AIC、BIC、QAIC及HQ准则
  16. 现有开发语言以及适用范围
  17. Spring Boot(二):整合 JPA 及 事务控制
  18. VAE 中后验坍塌问题
  19. 如何使用谷歌浏览器Chrome把整个网页保存成图片
  20. 产品分析中如何去做用户调研

热门文章

  1. java 中的vector_详解Java中的Vector
  2. python3网络爬虫代码_《Python3网络爬虫开发实战代码》
  3. 《机器学习实战》第十三章 PCA
  4. Python自动化运维——IP地址处理模块
  5. 用Python处理图片九宫格
  6. Uipath 学习栏目基础教学:2Uipath变量介绍
  7. pyqt5讲解3:QComboBox,QSpinBox,QSlider
  8. VTK:绘制BorderPixelSize边框像素大小用法实战
  9. wxWidgets:wxNavigationEnabled< W >类模板的用法
  10. boost::johnson_all_pairs_shortest_paths用法的测试程序