对于一个项目,我试图记录用户的基本交易,例如添加和删除一个项目以及多种类型的项目,并为每笔交易向kafka发送一条消息。 日志机制的准确性不是至关重要的,在kafka服务器停机的情况下,我不希望它阻止我的业务代码。 在这种情况下,将数据发送到kafka的异步方法是一种更好的方法。

我的kafka生产者代码在其引导项目中。 为了使其异步,我只需要添加两个注释:@EnableAsync和@Async。

@EnableAsync将在您的配置类中使用(还要记住,带有@SpringBootApplication的类也是配置类),并将尝试查找TaskExecutor bean。 如果没有,它将创建一个SimpleAsyncTaskExecutor。 SimpleAsyncTaskExecutor适用于玩具项目,但对于任何大于此的项目都存在一定的风险,因为它不限制并发线程,也不会重用线程。 为了安全起见,我们还将添加一个任务执行者bean。

所以,

 @SpringBootApplication  public class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication. class , args); }  } 

会变成

 @EnableAsync  @SpringBootApplication  public class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication. class , args); } @Bean public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize( 2 ); executor.setMaxPoolSize( 2 ); executor.setQueueCapacity( 500 ); executor.setThreadNamePrefix( "KafkaMsgExecutor-" ); executor.initialize(); return executor; }  } 

如您所见,这里没有太多变化。 我设置的默认值应根据您的应用程序需求进行调整。

我们需要的第二件事是添加@Async。

我的旧代码是:

 @Service  public class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs" ; @Autowired private KafkaTemplate<String, KafkaInfo> kafkaTemplate; @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus); }  } 

如您所见,同步代码非常简单。 它只需要kafkaTemplate并将消息对象发送到“ logs”主题。 我的新代码比这更长。

 @Service  public class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs" ; @Autowired private KafkaTemplate kafkaTemplate; @Async @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { ListenableFuture<SendResult<String, KafkaInfo>> future = kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus)); future.addCallback( new ListenableFutureCallback<>() { @Override public void onSuccess( final SendResult<String, KafkaInfo> message) { // left empty intentionally } @Override public void onFailure( final Throwable throwable) { // left empty intentionally } }); }  } 

在这里,onSuccess()对我而言并不真正有意义。 但是onFailure()可以记录异常,因此可以通知我我的kafka服务器是否存在问题。

我还要与您分享另一件事。 为了通过kafkatemplate发送对象,我必须为其配备序列化文件。

 public class KafkaInfoSerializer implements Serializer<kafkainfo> { @Override public void configure(Map map, boolean b) { } @Override public byte [] serialize(String arg0, KafkaInfo info) { byte [] retVal = null ; ObjectMapper objectMapper = new ObjectMapper(); try { retVal = objectMapper.writeValueAsString(info).getBytes(); } catch (Exception e) { // log the exception } return retVal; } @Override public void close() { }  } 

另外,不要忘记为其添加配置。 有几种定义kafka的序列化器的方法。 最简单的方法之一是将其添加到application.properties。

spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer = com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer

现在,您有了一个启动项目,该项目可以将异步对象发送到所需的主题。

翻译自: https://www.javacodegeeks.com/2020/01/send-your-data-async-on-kafka.html

在Kafka上异步发送数据相关推荐

  1. kafka异步发送数据_在Kafka上异步发送数据

    kafka异步发送数据 对于一个项目,我试图记录用户的基本交易,例如添加和删除一个项目以及多种类型的项目,并为每笔交易向kafka发送一条消息. 日志机制的准确性不是至关重要的,在kafka服务器停机 ...

  2. kafka实现异步发送_Kafka Producer 异步发送消息居然也会阻塞?

    Kafka 一直以来都以高吞吐量的特性而家喻户晓,就在上周,在一个性能监控项目中,需要使用到 Kafka 传输海量消息,在这过程中遇到了一个 Kafka Producer 异步发送消息会被阻塞的问题, ...

  3. kafka实现异步发送_深入理解Kafka的发送流程

    引子 上一篇介绍了KafkaProducer的初始化的过程,这一篇将会介绍KafkaProducer消息发送流程,同样以源码中的examples为例. 我们实现的回调方法,用于提供请求完成的异步处理, ...

  4. kafka实现异步发送_Kafka 异步消息也会阻塞?记一次 Dubbo 频繁超时排查过程

    线上某服务 A 调用服务 B 接口完成一次交易,一次晚上的生产变更之后,系统监控发现服务 B 接口频繁超时,后续甚至返回线程池耗尽错误 Thread pool is EXHAUSTED.因为服务 B ...

  5. 【kafka】flink 发送 数据到 kafka 报错 Memory records is not writable

    1.场景1 1.1 概述 本次环境,kafka的版本为0.9.0.1,主要情况是flink写入数据到kafka shell端是可以发送的,我的程序重启后也是好的,运行一端时间后消息就发不出去了 然后程 ...

  6. rust里mp5a4_rust tokio mpsc 异步发送数据

    启动两个异步任务,一个阻塞在键盘输入.一个间歇的读取通道,有数据就输出.在快速输入的时候会卡住,因为 mpsc 的 buffer 设置为了 5. 代码如下: use std::io; use toki ...

  7. discard connection丢失数据_python kafka 生产者发送数据的三种方式

    python kafka 生产者发送数据的三种方式 发送方式 同步发送 发送数据耗时最长 有发送数据的状态,不会丢失数据,数据可靠性高 以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断 ...

  8. Kafka生产者——消息发送流程,同步、异步发送API

    生产者消息发送流程 发送原理 Kafka的Producer发送消息采用的是异步发送的方式. 在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAc ...

  9. C# 的TCPClient异步连接与异步读数据

    Socket的TCP通讯 一. socket的通讯原理 服务器端的步骤如下. (1)建立服务器端的Socket,开始侦听整个网络中的连接请求. (2)当检测到来自客户端的连接请求时,向客户端发送收到连 ...

最新文章

  1. python-9-IO编程
  2. 引用数据类型的深拷贝
  3. bat 启动 不弹出对话框_CAD中转换出的PDF文件模糊要怎么办
  4. 微服务介绍及Asp.net Core实战项目系列之微服务介绍
  5. 计算机键盘打出来都是英语大写怎么办,电脑键盘上大小写怎么切换
  6. 著名模拟鼠标点击软件小点点被收购
  7. Zcash已发布ZIP 313提案
  8. Kotlin 能取代 Java 吗?
  9. 虚函数 动态绑定 实现方式是:虚函数表
  10. 下载频道岁末领任务~赚下载分~~活动开始啦!!!!
  11. matlab高级教程,Matlab绘图系列之高级绘图教程
  12. 大学BBS年度十大原创淡黄笑话
  13. 网络工程师考试资料汇总
  14. 恶补地理知识--四大洋,七大洲
  15. WEB打印控件Lodop(V6.x)使用说明及样例
  16. Cobbler自动装机服务搭建步骤
  17. 一篇文章搞定前端单元测试框架 Jest
  18. [转]辨别常见与不常见音乐文件格式的质量好坏!
  19. 风口的猪(小米实习生笔试)
  20. MyEclipse中jer配置

热门文章

  1. ssl2345-繁忙的都市
  2. OJ4008-糖果【各种dp之3】
  3. codeforces1493 D. GCD of an Array(数论)
  4. codeforces1469 E. A Bit Similar
  5. 【bfs】重力球(luogu 7473/NOI Online 2021 普及组 T3)
  6. 1D/1D动态规划的三种优化方法
  7. 节操大师 北方大学生程序设计竞赛 南开大学
  8. 汇编语言(三十五)之输入字符串以$结束然后输出字母个数
  9. Oracle入门(十四.8)之迭代控制:基本循环Loop
  10. java之StringBuider与StringBuffer