kafka异步发送数据

对于一个项目,我试图记录用户的基本交易,例如添加和删除一个项目以及多种类型的项目,并为每笔交易向kafka发送一条消息。 日志机制的准确性不是至关重要的,在kafka服务器停机的情况下,我不希望它阻止我的业务代码。 在这种情况下,将数据发送到kafka的异步方法是更好的方法。

我的kafka生产者代码在其引导项目中。 为了使其异步,我只需要添加两个注释:@EnableAsync和@Async。

@EnableAsync将在您的配置类中使用(还要记住,带有@SpringBootApplication的类也是配置类),并将尝试查找TaskExecutor bean。 如果没有,它将创建一个SimpleAsyncTaskExecutor。 SimpleAsyncTaskExecutor适用于玩具项目,但对于任何大于此的项目都存在一定的风险,因为它不限制并发线程,也不会重用线程。 为了安全起见,我们还将添加一个任务执行者bean。

所以,

 @SpringBootApplication  public class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication. class , args); }  } 

会变成

 @EnableAsync  @SpringBootApplication  public class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication. class , args); } @Bean public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize( 2 ); executor.setMaxPoolSize( 2 ); executor.setQueueCapacity( 500 ); executor.setThreadNamePrefix( "KafkaMsgExecutor-" ); executor.initialize(); return executor; }  } 

如您所见,这里没有太多变化。 我设置的默认值应根据您的应用程序需求进行调整。

我们需要的第二件事是添加@Async。

我的旧代码是:

 @Service  public class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs" ; @Autowired private KafkaTemplate<String, KafkaInfo> kafkaTemplate; @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus); }  } 

如您所见,同步代码非常简单。 它只需要kafkaTemplate并将消息对象发送到“ logs”主题。 我的新代码比这更长。

 @Service  public class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs" ; @Autowired private KafkaTemplate kafkaTemplate; @Async @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { ListenableFuture<SendResult<String, KafkaInfo>> future = kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus)); future.addCallback( new ListenableFutureCallback<>() { @Override public void onSuccess( final SendResult<String, KafkaInfo> message) { // left empty intentionally } @Override public void onFailure( final Throwable throwable) { // left empty intentionally } }); }  } 

在这里,onSuccess()对我而言并不真正有意义。 但是onFailure()我可以记录该异常,以便通知我我的kafka服务器是否有问题。

我还要与您分享另一件事。 为了通过kafkatemplate发送对象,我必须为其配备序列化文件。

 public class KafkaInfoSerializer implements Serializer<kafkainfo> { @Override public void configure(Map map, boolean b) { } @Override public byte [] serialize(String arg0, KafkaInfo info) { byte [] retVal = null ; ObjectMapper objectMapper = new ObjectMapper(); try { retVal = objectMapper.writeValueAsString(info).getBytes(); } catch (Exception e) { // log the exception } return retVal; } @Override public void close() { }  } 

另外,不要忘记为其添加配置。 有几种方法可以为kafka定义序列化器。 最简单的方法之一是将其添加到application.properties。

spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer = com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer

现在,您有了一个启动项目,该项目可以将异步对象发送到所需的主题。

翻译自: https://www.javacodegeeks.com/2020/01/send-your-data-async-on-kafka.html

kafka异步发送数据

kafka异步发送数据_在Kafka上异步发送数据相关推荐

  1. 云中数据_免费备份和共享云中数据的最佳网站

    云中数据 We've been told many times how important backups are, although we may not realize it until it's ...

  2. Java_Hive自定义函数_UDF函数清洗数据_清洗出全国的省份数据

    Java_Hive_UDF函数清洗数据_清洗出全国的省份数据 最近用Hadoop搞数据清洗,需要根据原始的地区数据清洗出对应的省份数据,当然我这里主要清洗的是内陆地区的数据,原始数据中不包含港澳台地区 ...

  3. ajax post提交数据_详解Ajax异步加载

    前言: 作为资深球迷,提起Ajax,第一反应想到的是阿贾克斯,那个曾培养出伊布,范德法特,苏亚雷斯,亨特拉尔等一众球星的荷甲著名球队. 很显然,我们今天说的Ajax,跟足球没有任何关系,我们说的是这个 ...

  4. python 钉钉机器人发送图片_利用Python自动发送钉钉数据消息

    现在大部分公司都使用钉钉作为内部的主要沟通工具,钉钉消息基本都上都能快速有效的被阅读,打开率会比邮件高上不少.所以准备使用钉钉来播报平台每日的成交额,并附上一些鼓励的话和图片.起到一个激励团队的作用 ...

  5. Dws同步mysql数据_数据库技术丨GaussDB(DWS)数据同步状态查看方法

    摘要:针对数据同步状态查看方法,GaussDB(DWS)提供了丰富的系统函数.视图.工具等可以直观地对同步进度进行跟踪,尤其是为方便定位人员使用,gs_ctl工具已集合了大部分相关系统函数的调用,可做 ...

  6. 机器学习 处理不平衡数据_在机器学习中处理不平衡数据

    机器学习 处理不平衡数据 As an ML engineer or data scientist, sometimes you inevitably find yourself in a situat ...

  7. 如何给mysql表添加百万条数据_给mysql一百万条数据的表添加索引

    直接alter table add index 添加索引,执行一个小时没反应,并且会导致锁表:故放弃该办法,最终解决办法如下: 一.打开mysql 命令行客户端 这里我们那可以看到导出的数据文件所存放 ...

  8. wireshark抓组播数据_捕获广播或多播地址数据MAC地址数据端口应用程序数据Wireshark网络分析实例集锦大学霸...

    捕获广播或多播地址数据MAC地址数据端口应用程序数据Wireshark网络分析实例集锦大学霸 Wireshark网络分析实例集锦大学霸 3.8.3  捕获广播或多播地址数据广播地址就是当IP地址的网络 ...

  9. 有效数据外含有额外数据_basemap之地图上画额外数据

    有时候我们想将自己的数据画在地图上,比如点,线,热体图等.我们先画地图地形底图,然后将数据点画在地图之上. 1. 散点 Basemap.scatter() 比如我们经常会将站点画在地图上,如地震台站, ...

  10. cesium 3dtiles 加载本地数据_记一次Cesium地形数据生成过程

    问题描述 有一小块带高程值的点状数据,需要根据该数据生成Cesium支持的3dtiles数据,在Cesium中显示.经过一周多时间的摸索,终于能够在Cesium中加载成功.现将数据处理流程做个记录,以 ...

最新文章

  1. python的zip函数
  2. 别再用 BeanUtils 了,这款 PO VO DTO 转换神器不香么?
  3. 软件质量保证计划_软件测试计划 笔记
  4. Nature会议:驾驭植物微生物组(21年10月22-24,在线,优惠截止9月24日)
  5. 发展中国家如何炼成发达国家?
  6. Oracle复杂查询21道题精析
  7. ACM技巧 - O(1)快速乘(玄学) 总结
  8. kendo grid输入框验证方法
  9. AAA验证和ciscorescue v4.2 验证服务器的搭建(telnet方式和级别的设置)
  10. php中ignore_user_abort函数的用法(定时)
  11. [转载] Python的双端队列deque
  12. 幻灯片形式设计:从方法到技巧
  13. Lync Server 2013 实战系列之五:标准版-定义拓扑生成器
  14. “我男友是程序员,修BUG比我重要,服了!”
  15. 写一个小程序实现win系统定时锁屏
  16. java语言的基本介绍及相关特性
  17. java 堆栈的声明_Java 堆栈
  18. 银发经济崛起:什么才是“收割”老年人的正确姿势?
  19. coffeescript基本语法
  20. c++除法保留小数_小学数学整数和小数的应用题解答方法公式汇总,新学期必备...

热门文章

  1. 2020牛客暑期多校训练营(第六场)
  2. 【刷题记录】排列dp
  3. 模板:容斥优化多重方案背包
  4. CF505E-Mr. Kitayuta vs. Bamboos【贪心,二分】
  5. P3466-[POI2008]KLO-Building blocks【Treap】
  6. CDQ 分治与整体二分
  7. 【Floyed】【最短路】商店选址问题(ssl 1760)
  8. 博客文章列表(二)——算法、数据结构、数据库、ABCD
  9. Java中对象的三种状态
  10. 数据结构(三)之单链表反向查找