hazelcast 提供了3中方法调用startCleanup:

第一种是在ConcuurentMapManager的构造函数中,通过调用node的executorManager中的ScheduledExecutorService来创建每秒运行一次cleanup操作的线程(代码例如以下)。

因为这是ConcuurentMapManager构造函数的代码,所以这样的调用startCleanup的操作是默认就会有的。

node.executorManager.getScheduledExecutorService().scheduleAtFixedRate(new Runnable() {publicvoid run() {for (CMap cMap : maps.values()) {cMap.startCleanup(false);}}}, 1, 1, TimeUnit.SECONDS);

另外一种是通过配置文件来触发startCleanup的运行。配置 PutOperationhandlerif overcapacity policy。我们系统的配置文件没有配置这方面的policy,全部这样的方式在我们系统中没有使用。

第三种是自己直接写代码去调用startCleanup函数(public方法。线程安全的). 这个没有实如今我们的系统中。

所以我的调查方向放在了第一种调用的情况,hazelcast里面的ScheduledExecutorService是通过java.util.ScheduledThreadPoolExecutor 来实现的.

esScheduled = new ScheduledThreadPoolExecutor(5, new ExecutorThreadFactory(node.threadGroup,node.getThreadPoolNamePrefix("scheduled"), classLoader), new RejectionHandler()) {protected void beforeExecute(Thread t, Runnable r) {threadPoolBeforeExecute(t, r);}}

查看ScheduledThreadPoolExecutor的实现,它把线程实现分成了3个部分: runnable tasks可运行任务, workers to execute the tasks运行任务的详细线程 以及 ScheduledThreadPoolExecutor 调度workers依照要求运行runnable tasks。

我们通过scheduleAtFixdRate提交了task,scheduleAtFixedRate先把它打包成反复运行的ScheduleFutureTask

<pre name="code" class="java">    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();RunnableScheduledFuture<?

> t = decorateTask(command, new <strong>ScheduledFutureTas</strong>k<Object>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period))); delayedExecute(t); return t; }

ScheduleFutureTask的run方法实现又一次schedule:

public void  run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();<strong> reExecutePeriodic(outerTask);</strong>}
}

delayedExecute里面假设当前worker的数目小于初始化定义的CorePool的数目,就创建新的worker线程,然后把task放到queue里面

private void delayedExecute(Runnable command) {if (isShutdown()) {reject(command);return;}// Prestart a thread if necessary. We cannot prestart it// running the task because the task (probably) shouldn't be// run yet, so thread will just idle until delay elapses.if (getPoolSize() < getCorePoolSize())prestartCoreThread();<strong> super.getQueue().add(command);</strong>
}
public boolean prestartCoreThread() {return addIfUnderCorePoolSize(null);}private boolean addIfUnderCorePoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < corePoolSize && runState == RUNNING)t = addThread(firstTask);} finally {mainLock.unlock();}return t != null;}
private Thread addThread(Runnable firstTask) {Worker w = new Worker(firstTask);Thread t = threadFactory.newThread(w);boolean workerStarted = false;if (t != null) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();w.thread = t;workers.add(w);int nt = ++poolSize;if (nt > largestPoolSize)largestPoolSize = nt;try {t.start();workerStarted = true;}finally {if (!workerStarted)workers.remove(w);}}return t;
}

全部启动的worker就做一件事情,从queue中取task运行

     try {hasRun = true;Runnable task = firstTask;firstTask = null;while (task != null || (task = <strong>getTask</strong>()) != null) {<strong>runTask(task);</strong>task = null;}} finally {workerDone(this);}}}Runnable getTask() {<strong> for (;;) {</strong>try {int state = runState;if (state > SHUTDOWN)return null;Runnable r;if (state == SHUTDOWN)  // Help drain queuer = workQueue.poll();else if (poolSize > corePoolSize || allowCoreThreadTimeOut)r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);else<strong> r = workQueue.take();</strong>if (r != null)return r;if (workerCanExit()) {if (runState >= SHUTDOWN) // Wake up othersinterruptIdleWorkers();return null;}// Else retry} catch (InterruptedException ie) {// On interruption, re-check runState}}
}
private void runTask(Runnable task) {final ReentrantLock runLock = this.runLock;runLock.lock();try {if ((runState >= STOP ||(Thread.interrupted() && runState >= STOP)) &&hasRun)thread.interrupt();boolean ran = false;beforeExecute(thread, task);<strong> try {task.run();ran = true;afterExecute(task, null);++completedTasks;} catch (RuntimeException ex) {if (!ran)afterExecute(task, ex);throw ex;}</strong>} finally {runLock.unlock();}}

了解了java threadpool的工作原理之后。我们能够知道。startCleanup是代码pass给ScheduledThreadPoolExecutor的runnable task,它不被运行,可能的原因有:

1. ScheduledThreadPoolExecutor初始化时候出错,task全然没有提交成功。因为lastCleanup并非系统应用的启动时间,已经过了几个月了,所以。非常明显在系统初始化的时候,esScheduled(ScheduledThreadPoolExecutor)还是正常工作的,仅仅是突然在2月4号停止了工作,所以这样的可能性能够排除。

2.    Worker 没有正常工作。不在从ScheduledThreadPoolExecutor的queue里面取数据,这个非常快就被我排除了:

首先heap dump中有5个pending workers in esScheduled (0/2/3/5/9):

其次从thread dump中能够看出,这五个线程都是在等着从queue里面取数据:

    ……<strong> at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)[optimiz</strong>ed]at java/util/concurrent/DelayQueue.take(DelayQueue.java:164)[optimized]at java/util/concurrent/ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)[inlined]at java/util/concurrent/ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)[optimized]at java/util/concurrent/ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)[optimized]at java/util/concurrent/ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)at java/lang/Thread.run(Thread.java:662)at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method)-- end of trace
hz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-2" id=51 idx=0xd8 tid=32639 prio=5 alive, parked, native_blocked
hz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-3" id=52 idx=0xdc tid=32640 prio=5 alive, parked, native_blocked
hz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-4" id=53 idx=0xe0 tid=32641 prio=5 alive, parked, native_blocked
hz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-5" id=75590 idx=0x3cc tid=3308 prio=5 alive, parked, native_blocked

所以worker不正常也被排除了。

3.  我们提交给系统的runner task自己主动从queue里面消失了,从memory dump中确实发现queue没有tasks了

而没有task的原因非常明显是由于当前task运行完之后没有又一次reschedule,至于原因,由于scheduledFutrueTask已经不存在,无法从memory dump和thread dump中分析出结果,成为了一个谜。。

。。

public void  run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();<strong> reExecutePeriodic(outerTask);</strong>}
}

转载于:https://www.cnblogs.com/liguangsunls/p/6898371.html

实战Java内存泄漏问题分析 -- hazelcast2.0.3使用时内存泄漏 -- 2相关推荐

  1. linux无效内存访问,x86_64 Linux 3.0:无效的内存地址

    x86_64体系结构上的Linux 3.0上的进程具有64位虚拟地址空间. 很明显,在该地址空间中,保证0是无效的内存地址[请参见下面的定义],因为该地址用于指示NULL指针. 保证哪些其他64位数字 ...

  2. Keil | 解决Keil与Source Insight4.0配合使用时,代码与注释位置(乱码)不一样的问题

    文章目录 一.前言 二.解决问题 2.1.Keil 2.2.Source Insight4.0 一.前言 Keil | 解决Keil与VScode配合使用时,代码与注释位置不一样的问题 上一次解决VS ...

  3. 流计算 Oceanus | Flink JVM 内存超限的分析方法总结

    作者:董伟柯,腾讯 CSIG 高级工程师 问题背景 前段时间,某客户线上运行的大作业(并行度 200 左右)遇到了 TaskManager JVM 内存超限问题(实际内存用量 4.1G > 容器 ...

  4. MapReduce 在Shuffle阶段 内存溢出原因分析及处理方法

    在Reduce运行中,有时出现内存溢出错误,抛出的异常信息如下: Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError ...

  5. FreeRTOS源码分析与应用开发10:内存管理

    目录 1. 概述 1.1 RTOS中内存分配特点 1.2 内存堆(heap space)来源 1.2.1 ucHeap数组 1.2.2 链接器设置的堆 1.2.3 多个非连续内存堆 1.3 关于字节对 ...

  6. android手机内存使用情况分析

    android手机内存使用情况分析         通常客户经常纠结手机内存的使用率不合理,占有的内存太大,可用内存很少,客户往往需要给出解决方案或在给出原因,那么你首先需要知道手机的内存都被什么应用 ...

  7. jstat 内存泄漏_基于Java内存dump文件分析解决内存泄漏问题

    概述 本文介绍一次解决现场java内存泄漏问题的经过,希望能提供后续遇到类似情况的读者一点思路. 生产环境发现的问题问题 生产环境运维人员反馈,服务器(windows系统)卡死,相关的服务都运行异常, ...

  8. 从内存分配角度分析c和java里的static 关键字.

    即使作为Java的初学者, 对this 和 static 这两个关键字都不会陌生. 其实也不难理解: this 关键字:  指的是对象的本身(注意不是类本身)  跟.net 语言的Me 关键字类似. ...

  9. java thread 内存泄露_Java ThreadLocal 内存泄露问题分析及解决方法。

    前言 在分析ThreadLocal导致的内存泄露前,需要普及了解一下内存泄露.强引用与弱引用以及GC回收机制,这样才能更好的分析为什么ThreadLocal会导致内存泄露呢?更重要的是知道该如何避免这 ...

最新文章

  1. python中csv文件操作_python中操作csv文件
  2. iOS开发之自定义View的一些坑
  3. PreparedStatement动态参数的引入
  4. Java 客户端界面功能:停止当前操作
  5. navicat连接mysql报10061错
  6. 拼接的option会多出空行_Word空格,空行,页眉横线等问题,我只花一分钟就全解决了...
  7. linux收发outlook的邮件,Linux邮箱服务器配置:如何让outlook收发邮件,怎么样控制中继...
  8. java script 环境搭建_TypeScript环境搭建
  9. python时heatmap_就业寒冬,从拉勾招聘看Python就业前景
  10. Linux共享内存(二) (转载)
  11. 开始位置 环状图_【技术分享】如何找到压铸模具中真空阀的最佳位置?
  12. python爬虫什么意思-python的爬虫是什么意思
  13. Open Cube 时信魔方介绍
  14. 基于bim二次开发的智能楼宇管理系统
  15. 中国各省GDP数据集(1949年-2020年)
  16. 小程序组件传值解决页面数据刷新
  17. git push报错:fatal: unable to access ‘https://XXX.git/‘: Failed toconnect to github.com port 443
  18. html5允许属性值不使用引号,HTML5概述 - 阿振的个人空间 - OSCHINA - 中文开源技术交流社区...
  19. 轻轻松松背单词软件测试,完美单词王app
  20. YUV编码为H264 H264封装为MP4

热门文章

  1. JDBC学习笔记 day1
  2. 无表头单链表的总结----输出链表
  3. 关于 asp.net 点击确定按钮 获取不到新值问题
  4. iOS:Cocoa编码规范 -[译]Coding Guidelines for Cocoa
  5. !function(){}()
  6. PHP中的__toString方法(实现JS里的链式操作)
  7. webapi get请求 FromUri list参数传递
  8. js对象序列化为json字符串
  9. 南邮计算机学院是211,南京邮电大学是211还是985
  10. Pytorch LSTM实例2