kafaka生产者消费者demo(简易上手demo)
kafaka生产者消费者demo(简易上手demo)
文章目录
- kafaka生产者消费者demo(简易上手demo)
- 导包
- kafka官方client
- spring官方template
- spring官方springcloud stream starter
- kafka官方client使用
- 生产者Demo
- 消费者Demo
- 简易的多线程生产者
- 生产
- 消费
- 使用线程池优化生产者
- ProducerThreadPool
- 测试使用
- 测试结果
- spring官方template使用
- 配置
- 生产者Demo
- 消费者Demo
- spring官方springcloud stream starter使用
- 配置
- 启动类
- 生产者Demo
- 消费者Demo
- 测试类
- 结果
导包
kafka官方client
kafka官方提供的Java client jar包
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.1.0</version></dependency>
spring官方template
也可以使用spring官方提供的kafaka template
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.4</version>
</dependency>
spring官方springcloud stream starter
使用spring-cloud-starter-stream-kafka可以整合kafka进入到spring项目中
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-kafka -->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.2.2</version>
</dependency>
kafka官方client使用
生产者Demo
使用KafkaProducer做生产者,可以使用多线程模拟多个生产者,这里提供简单的test来供以参考。
- bootstrap.servers: kafka服务器的地址。
- acks:消息的确认机制,默认值是0。
- acks=0:如果设置为0,生产者不会等待kafka的响应。
- acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。
- acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。
- retries:配置为大于0的值的话,客户端会在消息发送失败时重新发送。(允许重发的情况)
- batch.size:当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。
- key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
- value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
@Testpublic void testPost(){//主题(当主题不存在,自动创建主题)String topic = "product_post";//配置Properties properties = new Properties();//kafka服务器地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//反序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);//生产者KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);//生产信息for (int i = 0; i < 100; i++) {String msg = String.format("hello,第%d条信息", i);//消息(key可以为null,key值影响消息发往哪个分区)ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, String.valueOf(i), msg);//发送kafkaProducer.send(producerRecord);System.out.println("发送第"+i+"条信息");}//关闭kafkaProducer.close();}
消费者Demo
使用KafkaConsumer做消费者client API,可以通过多线程模拟生产订阅关系。这里给一个简单的消费者demo。
- bootstrap.servers: kafka的地址。
- group.id:组名,不同组名可以重复消费。(同组重复消费会抛异常)
- enable.auto.commit:是否自动提交,默认为true。
- auto.commit.interval.ms: 从poll(拉)的回话处理时长。
- session.timeout.ms:超时时间。
- max.poll.records:一次最大拉取的数据条数。
- auto.offset.reset:消费规则,默认earliest 。
- earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 。
- latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 。
- none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
- key.deserializer: 键反序列化器,默认org.apache.kafka.common.serialization.StringDeserializer
- value.deserializer:值反序列化器,默认org.apache.kafka.common.serialization.StringDeserializer
@Testpublic void testGet() throws InterruptedException {//主题String topic = "product_post";//配置Properties properties = new Properties();//kafka服务器地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//k,v的序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);//消费者分组properties.put(ConsumerConfig.GROUP_ID_CONFIG,"Consumer-Group-1");//offset重置模式properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");//消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);//订阅(可以订阅多个主题)kafkaConsumer.subscribe(Collections.singletonList(topic));//消费while (true){//获取信息ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));//遍历records.forEach(o->{System.out.println(String.format("topic==%s,offset==%s,key==%s,value==%s",o.topic(),o.offset(),o.key(),o.value()));});//睡眠Thread.sleep(500);}}
简易的多线程生产者
生产
实现Runnable接口可以实现简易的多线程生产者,模拟多个生产者生产
@Getter
public class MyselfProducer implements Runnable{//主题(当主题不存在,自动创建主题)private final String topic;//配置private final Properties properties;//主题和配置的多线程共享public MyselfProducer(String topic,Properties properties){this.topic = topic;this.properties = properties;}@Overridepublic void run() {//每个线程单独的生产者KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);//生产信息for (int i = 0; i < 100; i++) {String msg = String.format("hello,线程%s发送第%d条信息",Thread.currentThread().getName() , i);//消息(key可以为null,key值影响消息发往哪个分区)ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, String.valueOf(i), msg);//发送kafkaProducer.send(producerRecord);//控制台显示System.out.println(msg);}//关闭kafkaProducer.close();}
}
消费
使用多线程进行生产,然后使用消费者Demo进行消费,获得以下结果
使用线程池优化生产者
ProducerThreadPool
public class ProducerThreadPool{//主题(当主题不存在,自动创建主题)private final String topic;//配置private final Properties properties;//要产生的生产者线程类private final Class<? extends Runnable> producerClass;//线程池private final ThreadPoolExecutor executor;public ProducerThreadPool(String topic,Properties properties,Class<? extends Runnable> c){//初始化线程池this.executor = new ThreadPoolExecutor(5,10,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());//主题this.topic = topic;//配置this.properties = properties;//线程类this.producerClass = c;}public Future<?> createAndsubmit(){try {//反射出构造器Constructor<? extends Runnable> constructor = producerClass.getConstructor(String.class, Properties.class);//实例化生产者线程Runnable runnable = constructor.newInstance(topic, properties);System.out.println("提交线程池");//提交到线程池return executor.submit(runnable);} catch (NoSuchMethodException e) {e.printStackTrace();} catch (IllegalAccessException e) {e.printStackTrace();} catch (InstantiationException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}return null;}}
测试使用
写一个Test使用自己写的ProducerThreadPool生产者线程池
@Testpublic void testProducerThreadPool() throws InterruptedException {//主题(当主题不存在,自动创建主题)String topic = "threadPool_topic";//配置Properties properties = new Properties();//kafka服务器地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);ProducerThreadPool producerThreadPool = new ProducerThreadPool(topic, properties, MyselfProducer.class);//生产并提交Future<?> futureA = producerThreadPool.createAndsubmit();Future<?> futureB = producerThreadPool.createAndsubmit();Future<?> futureC = producerThreadPool.createAndsubmit();Thread.sleep(5000);System.out.println(String.format("线程A状态%s",futureA.isDone()));System.out.println(String.format("线程B状态%s",futureB.isDone()));System.out.println(String.format("线程C状态%s",futureC.isDone()));}
测试结果
生产过程结果
消费结果
spring官方template使用
配置
使用spring官方提供的kafka template就需要配置Bean,讲bean注入到上下文中。
@Configuration
@EnableKafka
public class KafkaConfiguration {//ConcurrentKafkaListenerContainerFactory为创建Kafka监听器的工厂类@Beanpublic ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(@Qualifier("consumerFactory") ConsumerFactory<Integer, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);return factory;}//kafkaTemplate实现了Kafka 生产者等功能@Beanpublic KafkaTemplate<Integer, String> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<Integer, String> producerFactory) {KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory);return template;}//根据consumerProps填写的参数创建消费者工厂@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerProps());}//根据senderProps填写的参数创建生产者工厂@Beanpublic ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(ProducerProps());}//消费者配置参数private Map<String, Object> consumerProps() {Map<String, Object> props = new HashMap<>();//连接地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//GroupIDprops.put(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-Kafka-1");//是否自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交的频率props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");//Session超时设置props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");//键的反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);//值的反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}//生产者配置private Map<String, Object> ProducerProps (){Map<String, Object> props = new HashMap<>();//Kafka服务器连接地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//重试机制,0为不启用重试机制props.put(ProducerConfig.RETRIES_CONFIG, 1);//控制批处理大小,单位为字节props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,减少网络IO次数props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//生产者可以使用的总内存字节来缓冲等待发送到服务器的记录props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);//键的序列化器props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//值的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}}
生产者Demo
可以通过kafkaTemplate发送消息,也可以通过spring提供的工厂生产produce并进行消息的发送。
@Component
public class MsgProducer {//主题static final String topic = "spring-kafka";//spring提供的模板类(生产)@Autowiredprivate KafkaTemplate kafkaTemplate;//spring提供的生产者工厂@Autowiredprivate ProducerFactory producerFactory;//使用template发送消息public void sendMsg(Integer key, String msg){kafkaTemplate.send(topic,key,msg);}public void sendMsg(String msg){kafkaTemplate.send(topic,msg);}//使用原生Producer client API发送消息public void sendMsgByProducer(Integer key, String msg){Producer producer = producerFactory.createProducer();producer.send(new ProducerRecord(topic,key,msg));producer.close();}public void sendMsgByProducer(String msg){Producer producer = producerFactory.createProducer();producer.send(new ProducerRecord(topic,msg));producer.close();}
}
消费者Demo
更具上面的配置,这些Consumer在组Consumer-Kafka-1
,组里面有两个不同的Consumer,分别是Consumer-1
,Consumer-2
。
@Slf4j
@Component
public class MsgConsumer {static final String topicA = "spring-kafka";static final String topicB = "spring-kafka-B";//订阅一个主题@KafkaListener(id = "Consumer-1",topics = {topicA})public String getMsg(String msg){return msg;} //订阅多个主题@KafkaListener(id = "Consumer-2",topics = {topicA,topicB})public String getMsgBytwo(String msg){return msg;}//指定主题分区,并指定读取的分区offset位置@KafkaListener(id = "Consumer-3",topicPartitions = {@TopicPartition(topic = topicA,partitions = {"0","1"}),@TopicPartition(topic = topicB,partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "100"))})public String getMsgByPartition(String msg){return msg;}//通过原生Consumer获取消息public ConsumerRecords getMsgByConsumer(){Consumer consumer = consumerFactory.createConsumer();consumer.subscribe(Collections.singleton(topicA));ConsumerRecords poll = consumer.poll(Duration.ofMillis(500));consumer.close();return poll;}}
spring官方springcloud stream starter使用
spring官方提供了一套统一的消息中间件的编程框架,对外提供统一的编程方式,隐藏底层消息中间件编程的差异。
关于springcloud stream 的概念可以查看:Spring Cloud Stream 体系及原理介绍-阿里云开发者社区 (aliyun.com)
配置
spring:cloud:stream:kafka:binder:brokers: localhost:9092bindings:input: #channelName,官方提供的默认输入通道名(消费者)destination: topicA #消费者订阅的topicgroup: consumer-group-1 #消费者分组content-type: text/plainoutput:destination: topicA #生产者将数据发送的topiccontentType: text/plain
启动类
因为测试需要,本人同时bind输入和输出channel(Source,Sink)。
@SpringBootApplication
@EnableBinding({Source.class, Sink.class})
@ComponentScan("org.example.**")
public class WebApplication {public static void main(String[] args) {SpringApplication.run(WebApplication.class,args);}
}
生产者Demo
@Component
public class SourceProducer {@Autowiredprivate Source source;//默认有一个叫output的MessageChannel@Autowiredprivate MessageChannel output;//通过source发送public void send(String msg){//source.output获得是MessageChannelMessageChannel output = source.output();System.out.println("发送消息:"+msg);output.send(MessageBuilder.withPayload(msg).build());}//通过MessageChannel直接发送public void sendByChannel(String msg){System.out.println("发送消息:"+msg);output.send(MessageBuilder.withPayload(msg).build());}
}
消费者Demo
@Component
public class SinkConsumer {@StreamListener(Sink.INPUT)public void getMsg(Message<String> msg){System.out.println("收到消息:"+msg.getPayload());}
}
测试类
因为SpringRunner会启动spring容器,而容器里面有StreamListener监听着Stream,
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProductorPostTest {@Autowiredprivate SourceProducer sourceProducer;@Testpublic void testSource() throws InterruptedException {String msg = "消息A";while (true){sourceProducer.send(msg);Thread.sleep(1000);}}
}
结果
发送消息:消息A
收到消息:消息A
发送消息:消息A
收到消息:消息A
发送消息:消息A
收到消息:消息A
kafaka生产者消费者demo(简易上手demo)相关推荐
- 图文并茂的生产者消费者应用实例demo
前面的几篇文章<<.NET 中的阻塞队列BlockingCollection的正确打开方式>><<项目开发中应用如何并发处理的一二事>>从代码以及理论角 ...
- 微信小程序引入高德地图Demo 快速上手
文章目录 前言 一.获取高德key 二.引入官方实例 总结 前言 本文参照官方文档进行编写 最后引入官方实例 最终效果 ` 一.获取高德key 注册账号 https://lbs.amap.com/?r ...
- TencentOS-tiny官方开发板EVB_MX上手Demo
文章目录 零. 序 一.问题清单 二.上手Demo 1.板载LED 2.板载OLED 3.E53_IA1使用 问题1:浮点类型:交叉编译器对(float)浮点类型的支持 4.esp8266联网 01 ...
- Java synchronized 实现生产者-消费者模型
synchronized synchronized (临界资源) {// 访问临界资源的代码 } 上述代码的作用是给临界资源"加锁",其他线程访问临界资源会被阻塞,目的是保证同一时 ...
- java消费者模式_基于Java 生产者消费者模式(详细分析)
生产者消费者模式是多线程中最为常见的模式:生产者线程(一个或多个)生成面包放进篮子里(集合或数组),同时,消费者线程(一个或多个)从篮子里(集合或数组)取出面包消耗.虽然它们任务不同,但处理的资源是相 ...
- 实现一个通用的生产者消费者队列(c语言版本)
背景:笔者之前一直从事嵌入式音视频相关的开发工作,对于音视频的数据的处理,生产者消费者队列必不可少,而如何实现一个高效稳定的生产者消费者队列则十分重要,不过按照笔者从业的经验,所看到的现象,不容乐观, ...
- Java 并发(生产者/消费者 模式)
>生产者/消费者 模式角色:生产者,消费者都是线程,两者中间是容器,容器内部是产品. 要求: 容器 里面要定义容量 容器 往里面添加(满时等待) 或者 从里面删除(空时等待) ,都要是阻塞的(等 ...
- Java生产者 消费者模型的一种实现
本文主要介绍java中生产者/消费者模式的实现,对java线程锁机制的一次深入理解. 生产者/消费者模型 生产者/消费者模型要保证,同一个资源在同一时间节点下只能被最多一个线程访问,这个在java中用 ...
- 生产者消费者模型实现方式:管程法,信号灯法
管程法 生产者:负责生产的数据模块 消费者:负责处理数据的模块 缓冲区:消费者不能直接从生产者获取产品,生产者生产进入缓冲区 public class Demo {public static void ...
最新文章
- static_cast, dynamic_cast, reinterpret_cast, const_cast区别比较
- 中国移动将向广大开发者开放了SDK/API等开发工具
- 预训练模型的前世今生(有福利!)
- mysql 存储过程的使用;
- 转载 网络上的8051 free IP core资源
- 【OpenCV学习笔记】【函数学习】十九(感兴趣区域)
- 安装ffmpeg及nginx模块
- android之socket编程实例
- 百度地图街景图片爬取
- Gitee 多人协作开发教程
- Tomcat修行之路-7.Tomcat-Mapper组件机制以及请求处理机制
- spacy教程--基础
- 【Blender3D模型库】飞机摇身变玩具?Blender视觉特效教程
- 前端修改input上传的图片大小
- Python编辑器UliPad安装
- 王半仙儿的日记-0013
- Adobe 官方公布的 RTMP 规范+未公布的部分
- Coke Rejection Risks Big Spill
- sql server 函数大全
- CSS_变换(transform)
热门文章
- 利用jquery的qrcode.js插件生成二维码的两种方式的使用
- OpenSSL 之 RSA 相关命令学习笔记
- python debug
- 【参数】REMOTE_LOGIN_PASSWORDFILE参数三种取值及其行为特性分析
- 0207.Domino R8.0.x群集配置手册
- python中frozenset( )和set()的用法区别
- Spring中BeanPostProcessor
- 0b3398php,思想道德修养与法律基础(九江职业技术学院)知到2020题目及答案
- windows系统如何查看端口被占用、杀进程
- Variable、Tensor、Numpy的转换