应用场景可能不太一样解决方式也大有不同,就按照我之前的场景的统一回复下吧:

1、内存溢出的原因:

当kafka集群(或单机)服务挂了,生产者继续向kafka发送消息时,有两个超时设置会导致线程不被及时释放,另外还有一个缓冲区大小的设置也会导致异常抛出,三个参数分别如下:

max.block.ms:指定生产者调用send()方法或使用partitionsFor()方法获取元数据时的阻塞时间,默认值60000ms(60秒);

request.timeout.ms:指定了生产者在发送数据时等待服务器返回响应的时间,默认值30000ms(30秒);

buffer.memory:设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息,如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。

即使用默认配置,当kafka挂了,线程调用send()方法向kafka发送消息至少会被阻塞60s,线程分分钟就会全部被阻塞,web容器在没有可用线程时收到的请求一般还会存放在队列中等待响应,线程得不到释放意味着内存同样无法被释放,所以很快内存就溢出了。

解决思路:

因此适当减少阻塞超时时长(测试设置为300ms)、增加生产者内存缓冲区,即便kafka挂了只要能即时释放线程及内存,应用服务就不至于挂掉,但阻塞时长过小有可能导致kafka网络波动时部分数据丢失,对数据有严格要求的场景并不适用。另外也可以从线程池里做限制,避免高并发场景下线程堵死的情况。

样例代码及分析:

package com.demo.kafka.util;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.kafka.support.SendResult;

import org.springframework.util.concurrent.ListenableFuture;

import com.demo.kafka.util.KafkaProducerFactory;

public class KafkaAppender{

private static final Logger log = LoggerFactory.getLogger(KafkaAppender.class);

// 线程数

private static int threadPoolNum = 20;

private static ExecutorService exec = Executors.newFixedThreadPool(threadPoolNum);

/**

* 解决当kafka挂掉时生产者内存溢出的三种思路:

* 1.获取当前ExecutorService线程池活动线程数,当活动线程数等于创建线程池线程数量时,表所有线程均处于阻塞状态,

* 此时return释放当前线程,不执行发送kafka; 特点:高并发时会丢失部分日志

*

* 2.当活动线程数等于创建线程池线程数量时,执行TimeUnit.MILLISECONDS.sleep(1000)

* 表示让出当前线程资源1秒,然后重新竞争发送; 特点:适用于kafka集群,或能及时恢复kafka服务的环境

*

* 3.设置最大阻塞时长max.block.ms,和最大请求时长request.timeout.ms为300ms(默认60s)

* 表示执行发送kafka超过300ms即认为发送失败,直接结束当前线程;(测试平均发送一条日志耗时为5ms)

*

*/

public void sendMsg2Kafka(final String msg) {

// 查询当前线程池活动线程数

int threadNum = ((ThreadPoolExecutor) exec).getActiveCount();

log.info("当前线程池活动线程数:{}", threadNum);

// 1(供参考).当线程池没有可用线程时接结束该线程任务(丢弃日志)

// while (threadNum == threadPoolNum) {

// log.info("线程池可用线程数为0,丢弃该条日志");

// return;

// }

// 2(供参考).当线程池没有可用线程时,调用线程进入睡眠状态,并让出执行机会给其它线程1000ms

// while (threadNum == threadPoolNum) {

// try {

// log.info("调用线程进入睡眠1000ms");

// TimeUnit.MILLISECONDS.sleep(1000);

// } catch (InterruptedException e) {

// log.error("Exception:", e);

// }

// }

// 3. 发送消息到kafka

exec.execute(new Runnable() {

@Override

public void run() {

long start = System.nanoTime();

try {

// 日志消息发送到kafka,并获获取返回结果

// KafkaProducerFactory是手动封装的一个获取KafkaTemplate的工厂类

ListenableFuture> result = KafkaProducerFactory.getKafkaTemplate().sendDefault(msg);

// 解析回调函数确认是否发送成功,失败时打印失败信息及阻塞时长

if (result != null) {

Long offsetIndex = result.get().getRecordMetadata().offset();

if (offsetIndex != null && offsetIndex >= 0) {

// 发送成功

long end = System.nanoTime();

log.info("日志发送成功,offset:{},发送耗时:{}ms", offsetIndex,TimeUnit.NANOSECONDS.toMillis(end - start));

} else {

// 发送失败

long end = System.nanoTime();

log.info("日志发送失败,阻塞时长:{}ms", TimeUnit.NANOSECONDS.toMillis(end - start));

}

}

} catch (Exception e) {

// 发送异常

long end = System.nanoTime();

log.error("日志发送异常,阻塞时长:{}ms", TimeUnit.NANOSECONDS.toMillis(end - start), e);

}

}

});

}

}

java内存溢出无法创建线程_kafka生产者发送消息失败导致内存溢出java.lang.OutOfMemoryError:Java heap space,请教如何解决?...相关推荐

  1. 应用jacob组件造成的内存溢出解决方案(java.lang.OutOfMemoryError: Java heap space)

    http://www.educity.cn/wenda/351088.html 使用jacob组件造成的内存溢出解决方案(java.lang.OutOfMemoryError: Java heap s ...

  2. java内存溢出模拟_模拟实战排查堆内存溢出(java.lang.OutOfMemoryError: Java heap space)问题...

    前言: 模拟实战中排查堆内存溢出(java.lang.OutOfMemoryError: Java heap space)的问题. 堆内存溢出的原因:一般都是创建了大量的对象,这些对象一直被引用着,无 ...

  3. executor线程池框架_如何使用Java 5 Executor框架创建线程池

    executor线程池框架 Java 5以Executor框架的形式在Java中引入了线程池,它允许Java程序员将任务提交与任务执行分离. 如果要使用Java进行服务器端编程,则线程池是维护系统可伸 ...

  4. 如何使用Java 5 Executor框架创建线程池

    Java 5以Executor框架的形式在Java中引入了线程池,它允许Java程序员将任务提交与任务执行分离. 如果要使用Java进行服务器端编程,则线程池是维护系统可伸缩性,鲁棒性和稳定性的重要概 ...

  5. Java学习笔记:创建线程的两种方法

    Java学习笔记:创建线程的两种方法 一.预备工作 1.创建Maven项目ThreadDemo 2.在pom.xml里添加依赖 二.继承Thread类创建子线程

  6. ES内存溢出,报错:java.lang.OutOfMemoryError: Java heap space

    es集群挂掉,报错信息如下: [WARN ][o.e.h.n.Netty4HttpServerTransport] [capitalcentre-14] caught exception while ...

  7. java.lang.OutOfMemoryError: Java heap space内存溢出解决方案

    java.lang.OutOfMemoryError: Java heap space内存溢出解决方案 一.设置环境变量 JAVA_OPTS= -Xms2048m -Xmx2048m 二.在Eclil ...

  8. Java 并发 多线程:创建线程的四种方式

    Java 并发 多线程: 创建线程的四种方式 继承 Thread 类并重写 run 方法 实现 Runnable 接口 实现 Callable 接口 使用线程池的方式创建 1. 通过继承 Thread ...

  9. java缓存内存泄漏_记一次mybaits缓存导致的内存溢出 java.lang.OutOfMemoryError: Java heap space...

    先贴一下错误截图 org.springframework.web.util.NestedServletException: Handler dispatch failed; nested except ...

  10. 内存不足 java.lang.OutOfMemoryError: Java heap space

    问题描述 Exception in thread "main" java.lang.OutOfMemoryError: Java heap space 解决方案[转] 一直都知道可 ...

最新文章

  1. spark应用程序转换_打包并提交运行Spark应用程序jar包
  2. python脚本中执行另一个脚本_如何用python调用另一个python脚本?
  3. 一天就能上线音乐教学APP?网易云信首推音乐教学解决方案!
  4. 并发执行变成串行_网易Java研发面试官眼中的Java并发——安全性、活跃性、性能...
  5. java学习(148):三个参数的输入流
  6. hexo github搭建博客常用的命令
  7. defineProperty AND defineProperties
  8. [Ajax] 案例 -- 三级联动
  9. 12大深度学习开源框架(caffe,tensorflow,pytorch,mxnet等)汇总详解
  10. PC网站实现微信扫码登录功能(二)
  11. 今天二月二龙抬头:除了理发 各地还有哪些习俗?
  12. 最简单的方法来压缩图片,改变图片大小
  13. mysql analyze_MySQL数据库执行analyze采集信息
  14. CentOS 7教程(二)-网络设置
  15. OneWay广告Unity版SDK接入
  16. android xml加密解密,华为配置加解密工具
  17. win10怎么用计算机的搜索,win10搜索文件内容怎么操作_win10如何搜索文档内的内容...
  18. 为什么机器人运动学逆解最好采用双变量反正切函数atan2而不用反正/余弦函数?
  19. mysql dump -a -b_MySQL数据导出工具 mysqldump
  20. Webix - JavaScript UI 9.2.0

热门文章

  1. sweetalert
  2. 你要看看这些有趣的函数方法吗?
  3. Office 365中的密码过期策略
  4. UITableView(二)
  5. Image Gallery
  6. Openlayers 杂项
  7. 移动端overflow-x去掉滑动条
  8. 比对字段判断字段是否为空,合并列字段(合并多列)
  9. PHP sql IN查询改成子查询
  10. 敢从头写一个OFFICE,你这么厉害,怎么不来解几个BUG