kafaka生产者消费者demo(简易上手demo)

文章目录

  • kafaka生产者消费者demo(简易上手demo)
      • 导包
        • kafka官方client
        • spring官方template
        • spring官方springcloud stream starter
    • kafka官方client使用
      • 生产者Demo
      • 消费者Demo
      • 简易的多线程生产者
        • 生产
        • 消费
      • 使用线程池优化生产者
        • ProducerThreadPool
        • 测试使用
        • 测试结果
    • spring官方template使用
      • 配置
      • 生产者Demo
      • 消费者Demo
    • spring官方springcloud stream starter使用
      • 配置
      • 启动类
      • 生产者Demo
      • 消费者Demo
      • 测试类
      • 结果

导包

kafka官方client

kafka官方提供的Java client jar包

 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.1.0</version></dependency>

spring官方template

也可以使用spring官方提供的kafaka template

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.4</version>
</dependency>

spring官方springcloud stream starter

使用spring-cloud-starter-stream-kafka可以整合kafka进入到spring项目中

<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-kafka -->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.2.2</version>
</dependency>

kafka官方client使用

生产者Demo

使用KafkaProducer做生产者,可以使用多线程模拟多个生产者,这里提供简单的test来供以参考。

  • bootstrap.servers: kafka服务器的地址。

    • acks:消息的确认机制,默认值是0。
    • acks=0:如果设置为0,生产者不会等待kafka的响应。
    • acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。
    • acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。
  • retries:配置为大于0的值的话,客户端会在消息发送失败时重新发送。(允许重发的情况)
  • batch.size:当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。
  • key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
  • value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
 @Testpublic void testPost(){//主题(当主题不存在,自动创建主题)String topic = "product_post";//配置Properties properties = new Properties();//kafka服务器地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//反序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);//生产者KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);//生产信息for (int i = 0; i < 100; i++) {String msg = String.format("hello,第%d条信息", i);//消息(key可以为null,key值影响消息发往哪个分区)ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, String.valueOf(i), msg);//发送kafkaProducer.send(producerRecord);System.out.println("发送第"+i+"条信息");}//关闭kafkaProducer.close();}

消费者Demo

使用KafkaConsumer做消费者client API,可以通过多线程模拟生产订阅关系。这里给一个简单的消费者demo。

  • bootstrap.servers: kafka的地址。
  • group.id:组名,不同组名可以重复消费。(同组重复消费会抛异常)
  • enable.auto.commit:是否自动提交,默认为true。
  • auto.commit.interval.ms: 从poll(拉)的回话处理时长。
  • session.timeout.ms:超时时间。
  • max.poll.records:一次最大拉取的数据条数。
  • auto.offset.reset:消费规则,默认earliest 。
    • earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 。
    • latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 。
    • none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
  • key.deserializer: 键反序列化器,默认org.apache.kafka.common.serialization.StringDeserializer
  • value.deserializer:值反序列化器,默认org.apache.kafka.common.serialization.StringDeserializer
 @Testpublic void testGet() throws InterruptedException {//主题String topic = "product_post";//配置Properties properties = new Properties();//kafka服务器地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//k,v的序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);//消费者分组properties.put(ConsumerConfig.GROUP_ID_CONFIG,"Consumer-Group-1");//offset重置模式properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");//消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);//订阅(可以订阅多个主题)kafkaConsumer.subscribe(Collections.singletonList(topic));//消费while (true){//获取信息ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));//遍历records.forEach(o->{System.out.println(String.format("topic==%s,offset==%s,key==%s,value==%s",o.topic(),o.offset(),o.key(),o.value()));});//睡眠Thread.sleep(500);}}

简易的多线程生产者

生产

实现Runnable接口可以实现简易的多线程生产者,模拟多个生产者生产

@Getter
public class MyselfProducer implements Runnable{//主题(当主题不存在,自动创建主题)private final String topic;//配置private final Properties properties;//主题和配置的多线程共享public MyselfProducer(String topic,Properties properties){this.topic = topic;this.properties = properties;}@Overridepublic void run() {//每个线程单独的生产者KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);//生产信息for (int i = 0; i < 100; i++) {String msg = String.format("hello,线程%s发送第%d条信息",Thread.currentThread().getName() , i);//消息(key可以为null,key值影响消息发往哪个分区)ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, String.valueOf(i), msg);//发送kafkaProducer.send(producerRecord);//控制台显示System.out.println(msg);}//关闭kafkaProducer.close();}
}
消费

使用多线程进行生产,然后使用消费者Demo进行消费,获得以下结果

使用线程池优化生产者

ProducerThreadPool
public class ProducerThreadPool{//主题(当主题不存在,自动创建主题)private final String topic;//配置private final Properties properties;//要产生的生产者线程类private final Class<? extends Runnable> producerClass;//线程池private final ThreadPoolExecutor executor;public ProducerThreadPool(String topic,Properties properties,Class<? extends Runnable> c){//初始化线程池this.executor = new ThreadPoolExecutor(5,10,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());//主题this.topic = topic;//配置this.properties = properties;//线程类this.producerClass = c;}public Future<?> createAndsubmit(){try {//反射出构造器Constructor<? extends Runnable> constructor = producerClass.getConstructor(String.class, Properties.class);//实例化生产者线程Runnable runnable = constructor.newInstance(topic, properties);System.out.println("提交线程池");//提交到线程池return executor.submit(runnable);} catch (NoSuchMethodException e) {e.printStackTrace();} catch (IllegalAccessException e) {e.printStackTrace();} catch (InstantiationException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}return null;}}
测试使用

写一个Test使用自己写的ProducerThreadPool生产者线程池

 @Testpublic void testProducerThreadPool() throws InterruptedException {//主题(当主题不存在,自动创建主题)String topic = "threadPool_topic";//配置Properties properties = new Properties();//kafka服务器地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);ProducerThreadPool producerThreadPool = new ProducerThreadPool(topic, properties, MyselfProducer.class);//生产并提交Future<?> futureA = producerThreadPool.createAndsubmit();Future<?> futureB = producerThreadPool.createAndsubmit();Future<?> futureC = producerThreadPool.createAndsubmit();Thread.sleep(5000);System.out.println(String.format("线程A状态%s",futureA.isDone()));System.out.println(String.format("线程B状态%s",futureB.isDone()));System.out.println(String.format("线程C状态%s",futureC.isDone()));}
测试结果

生产过程结果

消费结果

spring官方template使用

配置

使用spring官方提供的kafka template就需要配置Bean,讲bean注入到上下文中。

@Configuration
@EnableKafka
public class KafkaConfiguration {//ConcurrentKafkaListenerContainerFactory为创建Kafka监听器的工厂类@Beanpublic ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(@Qualifier("consumerFactory") ConsumerFactory<Integer, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);return factory;}//kafkaTemplate实现了Kafka 生产者等功能@Beanpublic KafkaTemplate<Integer, String> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<Integer, String> producerFactory) {KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory);return template;}//根据consumerProps填写的参数创建消费者工厂@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerProps());}//根据senderProps填写的参数创建生产者工厂@Beanpublic ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(ProducerProps());}//消费者配置参数private Map<String, Object> consumerProps() {Map<String, Object> props = new HashMap<>();//连接地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//GroupIDprops.put(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-Kafka-1");//是否自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交的频率props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");//Session超时设置props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");//键的反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);//值的反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}//生产者配置private Map<String, Object> ProducerProps (){Map<String, Object> props = new HashMap<>();//Kafka服务器连接地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//重试机制,0为不启用重试机制props.put(ProducerConfig.RETRIES_CONFIG, 1);//控制批处理大小,单位为字节props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,减少网络IO次数props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//生产者可以使用的总内存字节来缓冲等待发送到服务器的记录props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);//键的序列化器props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//值的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}}

生产者Demo

可以通过kafkaTemplate发送消息,也可以通过spring提供的工厂生产produce并进行消息的发送。

@Component
public class MsgProducer {//主题static final String topic = "spring-kafka";//spring提供的模板类(生产)@Autowiredprivate KafkaTemplate kafkaTemplate;//spring提供的生产者工厂@Autowiredprivate ProducerFactory producerFactory;//使用template发送消息public void sendMsg(Integer key, String msg){kafkaTemplate.send(topic,key,msg);}public void sendMsg(String msg){kafkaTemplate.send(topic,msg);}//使用原生Producer client API发送消息public void sendMsgByProducer(Integer key, String msg){Producer producer = producerFactory.createProducer();producer.send(new ProducerRecord(topic,key,msg));producer.close();}public void sendMsgByProducer(String msg){Producer producer = producerFactory.createProducer();producer.send(new ProducerRecord(topic,msg));producer.close();}
}

消费者Demo

更具上面的配置,这些Consumer在组Consumer-Kafka-1,组里面有两个不同的Consumer,分别是Consumer-1Consumer-2

@Slf4j
@Component
public class MsgConsumer {static final String topicA = "spring-kafka";static final String topicB = "spring-kafka-B";//订阅一个主题@KafkaListener(id = "Consumer-1",topics = {topicA})public String getMsg(String msg){return msg;} //订阅多个主题@KafkaListener(id = "Consumer-2",topics = {topicA,topicB})public String getMsgBytwo(String msg){return msg;}//指定主题分区,并指定读取的分区offset位置@KafkaListener(id = "Consumer-3",topicPartitions = {@TopicPartition(topic = topicA,partitions = {"0","1"}),@TopicPartition(topic = topicB,partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "100"))})public String getMsgByPartition(String msg){return msg;}//通过原生Consumer获取消息public ConsumerRecords getMsgByConsumer(){Consumer consumer = consumerFactory.createConsumer();consumer.subscribe(Collections.singleton(topicA));ConsumerRecords poll = consumer.poll(Duration.ofMillis(500));consumer.close();return poll;}}

spring官方springcloud stream starter使用

spring官方提供了一套统一的消息中间件的编程框架,对外提供统一的编程方式,隐藏底层消息中间件编程的差异。

关于springcloud stream 的概念可以查看:Spring Cloud Stream 体系及原理介绍-阿里云开发者社区 (aliyun.com)

配置

spring:cloud:stream:kafka:binder:brokers: localhost:9092bindings:input:  #channelName,官方提供的默认输入通道名(消费者)destination: topicA  #消费者订阅的topicgroup: consumer-group-1  #消费者分组content-type: text/plainoutput:destination: topicA  #生产者将数据发送的topiccontentType: text/plain

启动类

因为测试需要,本人同时bind输入和输出channel(Source,Sink)。

@SpringBootApplication
@EnableBinding({Source.class, Sink.class})
@ComponentScan("org.example.**")
public class WebApplication {public static void main(String[] args) {SpringApplication.run(WebApplication.class,args);}
}

生产者Demo

@Component
public class SourceProducer {@Autowiredprivate Source source;//默认有一个叫output的MessageChannel@Autowiredprivate MessageChannel output;//通过source发送public void send(String msg){//source.output获得是MessageChannelMessageChannel output = source.output();System.out.println("发送消息:"+msg);output.send(MessageBuilder.withPayload(msg).build());}//通过MessageChannel直接发送public void sendByChannel(String msg){System.out.println("发送消息:"+msg);output.send(MessageBuilder.withPayload(msg).build());}
}

消费者Demo

@Component
public class SinkConsumer {@StreamListener(Sink.INPUT)public void getMsg(Message<String> msg){System.out.println("收到消息:"+msg.getPayload());}
}

测试类

因为SpringRunner会启动spring容器,而容器里面有StreamListener监听着Stream,

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProductorPostTest {@Autowiredprivate SourceProducer sourceProducer;@Testpublic void testSource() throws InterruptedException {String msg = "消息A";while (true){sourceProducer.send(msg);Thread.sleep(1000);}}
}

结果

发送消息:消息A
收到消息:消息A
发送消息:消息A
收到消息:消息A
发送消息:消息A
收到消息:消息A

kafaka生产者消费者demo(简易上手demo)相关推荐

  1. 图文并茂的生产者消费者应用实例demo

    前面的几篇文章<<.NET 中的阻塞队列BlockingCollection的正确打开方式>><<项目开发中应用如何并发处理的一二事>>从代码以及理论角 ...

  2. 微信小程序引入高德地图Demo 快速上手

    文章目录 前言 一.获取高德key 二.引入官方实例 总结 前言 本文参照官方文档进行编写 最后引入官方实例 最终效果 ` 一.获取高德key 注册账号 https://lbs.amap.com/?r ...

  3. TencentOS-tiny官方开发板EVB_MX上手Demo

    文章目录 零. 序 一.问题清单 二.上手Demo 1.板载LED 2.板载OLED 3.E53_IA1使用 问题1:浮点类型:交叉编译器对(float)浮点类型的支持 4.esp8266联网 01 ...

  4. Java synchronized 实现生产者-消费者模型

    synchronized synchronized (临界资源) {// 访问临界资源的代码 } 上述代码的作用是给临界资源"加锁",其他线程访问临界资源会被阻塞,目的是保证同一时 ...

  5. java消费者模式_基于Java 生产者消费者模式(详细分析)

    生产者消费者模式是多线程中最为常见的模式:生产者线程(一个或多个)生成面包放进篮子里(集合或数组),同时,消费者线程(一个或多个)从篮子里(集合或数组)取出面包消耗.虽然它们任务不同,但处理的资源是相 ...

  6. 实现一个通用的生产者消费者队列(c语言版本)

    背景:笔者之前一直从事嵌入式音视频相关的开发工作,对于音视频的数据的处理,生产者消费者队列必不可少,而如何实现一个高效稳定的生产者消费者队列则十分重要,不过按照笔者从业的经验,所看到的现象,不容乐观, ...

  7. Java 并发(生产者/消费者 模式)

    >生产者/消费者 模式角色:生产者,消费者都是线程,两者中间是容器,容器内部是产品. 要求: 容器 里面要定义容量 容器 往里面添加(满时等待) 或者 从里面删除(空时等待) ,都要是阻塞的(等 ...

  8. Java生产者 消费者模型的一种实现

    本文主要介绍java中生产者/消费者模式的实现,对java线程锁机制的一次深入理解. 生产者/消费者模型 生产者/消费者模型要保证,同一个资源在同一时间节点下只能被最多一个线程访问,这个在java中用 ...

  9. 生产者消费者模型实现方式:管程法,信号灯法

    管程法 生产者:负责生产的数据模块 消费者:负责处理数据的模块 缓冲区:消费者不能直接从生产者获取产品,生产者生产进入缓冲区 public class Demo {public static void ...

最新文章

  1. static_cast, dynamic_cast, reinterpret_cast, const_cast区别比较
  2. 中国移动将向广大开发者开放了SDK/API等开发工具
  3. 预训练模型的前世今生(有福利!)
  4. mysql 存储过程的使用;
  5. 转载 网络上的8051 free IP core资源
  6. 【OpenCV学习笔记】【函数学习】十九(感兴趣区域)
  7. 安装ffmpeg及nginx模块
  8. android之socket编程实例
  9. 百度地图街景图片爬取
  10. Gitee 多人协作开发教程
  11. Tomcat修行之路-7.Tomcat-Mapper组件机制以及请求处理机制
  12. spacy教程--基础
  13. 【Blender3D模型库】飞机摇身变玩具?Blender视觉特效教程
  14. 前端修改input上传的图片大小
  15. Python编辑器UliPad安装
  16. 王半仙儿的日记-0013
  17. Adobe 官方公布的 RTMP 规范+未公布的部分
  18. Coke Rejection Risks Big Spill
  19. sql server 函数大全
  20. CSS_变换(transform)

热门文章

  1. 利用jquery的qrcode.js插件生成二维码的两种方式的使用
  2. OpenSSL 之 RSA 相关命令学习笔记
  3. python debug
  4. 【参数】REMOTE_LOGIN_PASSWORDFILE参数三种取值及其行为特性分析
  5. 0207.Domino R8.0.x群集配置手册
  6. python中frozenset( )和set()的用法区别
  7. Spring中BeanPostProcessor
  8. 0b3398php,思想道德修养与法律基础(九江职业技术学院)知到2020题目及答案
  9. windows系统如何查看端口被占用、杀进程
  10. Variable、Tensor、Numpy的转换