一、Kafka Consumer API

1.1、Consumer

1.2、KafkaConsumer

1.3、ConsumerRecords

1.4、ConsumerRecord

1.5、KafkaConsumer in Practice

package demo02;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SimpleConsumer {
    public static void main(String[] args) {
        String topic = "test_02_02";
        String group = "test_group";

        Map<String, Object> kafkaProperties = new HashMap<>();
        kafkaProperties.put("bootstrap.servers", "node100:9092,node101:9092,node102:9092");
        kafkaProperties.put("group.id", group);
        kafkaProperties.put("enable.auto.commit", "true");
        kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            // Block for up to 100 ms waiting for new records, then process the batch
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }
}

Result:
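The key.deserializer and value.deserializer settings above name the classes that turn the raw bytes Kafka stores back into Java objects. As a rough, broker-free sketch (plain JDK only; DeserializerSketch is a hypothetical class, not part of the Kafka API), StringDeserializer essentially does the following:

```java
import java.nio.charset.StandardCharsets;

public class DeserializerSketch {
    // Kafka stores keys and values as byte[]; the configured deserializer
    // converts them back to the declared type. For StringDeserializer that
    // is essentially a UTF-8 decode, with null passed through unchanged.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(wire));   // prints "hello kafka"
    }
}
```

This is why the consumer is typed KafkaConsumer&lt;String, String&gt;: the two type parameters must match what the two deserializers produce.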

二、Producer &amp; Consumer Integration in Practice

  • 1、Design a utility class that returns random strings

WordUtil.java

package demo03;

import java.util.Random;

public class WordUtil {
    // Sample text (from the Kafka documentation) split into individual words;
    // generateRandom() picks one at random and uses its index as the key
    public static final String[] WORDS = "A traditional queue retains records in-order on the server, and if multiple consumers consume from the queue then the server hands out records in the order they are stored. However, although the server hands out records in order, the records are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the records is lost in the presence of parallel consumption. Messaging systems often work around this by having a notion of \"exclusive consumer\" that allows only one process to consume from a queue, but of course this means that there is no parallelism in processing. Kafka does it better. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances in a consumer group than partitions.".split(" ");

    static Random random = new Random();

    public static KV generateRandom() {
        int index = random.nextInt(WORDS.length);
        return new KV(String.valueOf(index), WORDS[index]);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            KV kv = generateRandom();
            System.out.printf("key: %s, value: %s\n", kv.getK(), kv.getV());
        }
    }
}
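Note the invariant generateRandom() maintains: the key is always the index of the value inside WORDS, so the word's position can be recovered from the key alone. A small stdlib-only check of that invariant (WordUtilCheck and its String[]-pair return are illustrative stand-ins for WordUtil and KV):

```java
import java.util.Random;

public class WordUtilCheck {
    // A short stand-in word list; the real WordUtil splits a long paragraph
    static final String[] WORDS = "Kafka provides ordering guarantees per partition".split(" ");
    static final Random RANDOM = new Random();

    // Same shape as WordUtil.generateRandom(): element 0 is the index-as-key,
    // element 1 is the word stored at that index
    static String[] generateRandom() {
        int index = RANDOM.nextInt(WORDS.length);
        return new String[]{String.valueOf(index), WORDS[index]};
    }

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            String[] kv = generateRandom();
            // The value must be exactly the word at the key's index
            if (!WORDS[Integer.parseInt(kv[0])].equals(kv[1]))
                throw new AssertionError("key/value mismatch at key " + kv[0]);
        }
        System.out.println("invariant holds");
    }
}
```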

KV.java

package demo03;

public class KV {
    public String k;
    public String v;

    public KV(String k, String v) {
        this.k = k;
        this.v = v;
    }

    public String getK() { return k; }
    public void setK(String k) { this.k = k; }
    public String getV() { return v; }
    public void setV(String v) { this.v = v; }
}

Result of running WordUtil.java:

  • 2、Design a Producer that sends data every second

TimerProducer.java

package demo03;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;

public class TimerProducer {
    public static void main(String[] args) throws InterruptedException {
        String topic = "test_02_02";

        Map<String, Object> kafkaProperties = new HashMap<>();
        kafkaProperties.put("bootstrap.servers", "node100:9092,node101:9092,node102:9092");
        kafkaProperties.put("acks", "all");
        kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(kafkaProperties);
        int size = 60;
        for (int i = 0; i < size; i++) {
            Thread.sleep(1000L);   // send one record per second, 60 in total
            KV kv = WordUtil.generateRandom();
            producer.send(new ProducerRecord<>(topic, kv.getK(), kv.getV()));
        }
        producer.close();
    }
}
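Because each record carries a key, Kafka's default partitioner will route all records with the same key to the same partition (it hashes the serialized key bytes with murmur2 and takes the result modulo the partition count). The sketch below illustrates that routing idea with a simplified non-negative hash; PartitionSketch and String.hashCode() are illustrative stand-ins, not the real murmur2-based partitioner:

```java
public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner: hash the key,
    // then map the hash into [0, numPartitions). Real Kafka applies murmur2
    // to the serialized key bytes; the modulo step is the same idea.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        // Equal keys always land on the same partition, which is what
        // preserves per-key ordering despite parallel consumers
        System.out.println(partitionFor("42", partitions) == partitionFor("42", partitions));   // prints "true"
    }
}
```

This is why WordUtil's index-as-key scheme keeps all occurrences of one word in order on a single partition.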

TimerConsumer.java

package demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class TimerConsumer {
    public static void main(String[] args) {
        String topic = "test_02_02";
        String group = "test_group";

        Map<String, Object> kafkaProperties = new HashMap<>();
        kafkaProperties.put("bootstrap.servers", "node100:9092,node101:9092,node102:9092");
        kafkaProperties.put("group.id", group);
        kafkaProperties.put("enable.auto.commit", "true");
        kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            // Wait up to 3 seconds per poll, then print whatever batch arrived
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
            System.out.printf("\nTime: %s\n", new Date());
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }
}

Result:
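The consumer loop above blocks in poll for up to 3 seconds and then processes whatever batch arrived in that window. The same drain-with-timeout pattern can be sketched with a plain BlockingQueue, no broker required (PollLoopSketch is an illustrative stand-in, not Kafka code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollLoopSketch {
    // Drain the queue in the style of the consumer loop: each iteration waits
    // up to timeoutMs for the next item (like consumer.poll), processes it,
    // and stops once a wait elapses empty. Returns the number of items processed.
    static int drain(BlockingQueue<String> queue, long timeoutMs) {
        int processed = 0;
        try {
            while (true) {
                String record = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (record == null) break;   // the "poll returned an empty batch" case
                System.out.println("got " + record);
                processed++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore interrupt status
        }
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        queue.offer("record-1");
        queue.offer("record-2");
        System.out.println("drained=" + drain(queue, 100));   // prints "drained=2"
    }
}
```

The difference from the real consumer is that KafkaConsumer.poll also drives group membership and (with enable.auto.commit=true) offset commits, so the loop must keep calling poll regularly even when no records arrive.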
