This post walks through an incident in which the offset lag of a Kafka consumer kept growing.

Checking the consumer's progress

Group       Topic        Pid    Offset     logSize         Lag             Owner
demo-group demo-topic     0    9678273         9858394         180121          xxx-service-dpqpc-1510557406684-e2171bd6-0
demo-group demo-topic     1    9689443         9873522         184079          xxx-service-dpqpc-1510557406684-e2171bd6-1
demo-group demo-topic     2    9676875         9855874         178999          xxx-service-q7vch-1510557399475-b1d7d22c-0
demo-group demo-topic     3    9683393         9864518         181125          xxx-service-q7vch-1510557399475-b1d7d22c-1

The gap between the consumer offset and logSize is far too large: the lag on every partition exceeds 100,000.

The normal case

Group           Topic         Pid   Offset          logSize         Lag             Owner
demo-group      demo-topic    0     9860587         9860587         0               demo-group_tomcat2-1512984437115-fc1ee57b-0
demo-group      demo-topic    1     9875814         9875814         0               demo-group_tomcat2-1512984437115-fc1ee57b-0
demo-group      demo-topic    2     9858213         9858214         1               demo-group_tomcat2-1512984437115-fc1ee57b-1
demo-group      demo-topic    3     9866744         9866744         0               demo-group_tomcat2-1512984437115-fc1ee57b-2

A small lag like this is normal.
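The Lag column in both tables is simply logSize minus Offset. The sketch below recomputes it for the figures shown above and flags partitions whose lag exceeds a threshold. The class name `LagCheck` and the 100,000 threshold are assumptions chosen for illustration; they are not part of Kafka or of the original service.

```java
// Hypothetical helper, not part of Kafka: lag = logSize - committed offset.
class LagCheck {
    // Illustrative threshold only; pick a value that fits your own traffic.
    static final long LAG_THRESHOLD = 100_000L;

    static long lag(long logSize, long offset) {
        return logSize - offset;
    }

    static boolean isLagging(long logSize, long offset) {
        return lag(logSize, offset) > LAG_THRESHOLD;
    }

    public static void main(String[] args) {
        // {offset, logSize} per partition, copied from the abnormal table above.
        long[][] partitions = {
            {9678273L, 9858394L},
            {9689443L, 9873522L},
            {9676875L, 9855874L},
            {9683393L, 9864518L},
        };
        for (int pid = 0; pid < partitions.length; pid++) {
            long offset = partitions[pid][0];
            long logSize = partitions[pid][1];
            System.out.println("partition " + pid
                    + " lag=" + lag(logSize, offset)
                    + " lagging=" + isLagging(logSize, offset));
        }
    }
}
```

A check like this is only useful if it runs continuously against fresh offsets; a single snapshot cannot tell a stalled consumer from a brief burst.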

Checking the topic's partitions

    Topic:demo-topic    PartitionCount:4    ReplicationFactor:2 Configs:
        Topic: demo-topic   Partition: 0    Leader: 3   Replicas: 3,4   Isr: 4,3
        Topic: demo-topic   Partition: 1    Leader: 4   Replicas: 4,1   Isr: 1,4
        Topic: demo-topic   Partition: 2    Leader: 1   Replicas: 1,2   Isr: 1,2
        Topic: demo-topic   Partition: 3    Leader: 2   Replicas: 2,3   Isr: 2,3

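The topic has 4 partitions, so having 4 consumers on it is normal. The likely cause is either that the consumers are consuming too slowly or that consumption has failed in some way.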

Troubleshooting

jstack -l pid

2017-12-27 04:06:23
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode):

"Attach Listener" #12286 daemon prio=9 os_prio=0 tid=0x00007f2920001000 nid=0x3087 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"ConsumerFetcherThread-xxx-service-dpqpc-1510557406684-e2171bd6-0-3" #9263 prio=5 os_prio=0 tid=0x00007f287400d800 nid=0x2440 waiting on condition [0x00007f285e6eb000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007048874b0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"ConsumerFetcherThread-xxx-service-dpqpc-1510557406684-e2171bd6-0-4" #9262 prio=5 os_prio=0 tid=0x00007f28740c2800 nid=0x243f waiting on condition [0x00007f291950d000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007048086d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"xxx-service-dpqpc-1510557406684-e2171bd6-leader-finder-thread" #9261 prio=5 os_prio=0 tid=0x0000000002302800 nid=0x243e waiting on condition [0x00007f28bd1df000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000703d06518> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:61)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"consume-2" #62 prio=5 os_prio=0 tid=0x00007f28f8e86000 nid=0x51 waiting on condition [0x00007f28bd3e1000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000070440cd38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"consume-1" #61 prio=5 os_prio=0 tid=0x00007f28f8e84800 nid=0x50 waiting on condition [0x00007f28bd4e2000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000070440cd38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"xxx-service-dpqpc-1510557406684-e2171bd6_watcher_executor" #59 prio=5 os_prio=0 tid=0x00007f28fb685800 nid=0x4e waiting on condition [0x00007f28bd8e4000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007048878d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:544)

The consume-1 and consume-2 threads above are the business threads that actually consume from Kafka.

error log

2017-12-16 12:53:34.257  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], begin rebalancing consumer xxx-service-q7vch-1510557399475-b1d7d22c try #1
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510557399586] Stopping leader finder thread
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510557399586] Stopping all fetchers
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510557399586] All connections stopped
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], Cleared all relevant queues for this fetcher
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], Cleared the data chunks in all the consumer message iterators
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], Committing all offsets after clearing the fetcher queues

The log shows almost no trace of messages being consumed, yet the lag is that large.

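At first I looked at the error log and found the entries above. Combined with the jstack output, where the ConsumerFetcherThread threads are blocked in PartitionTopicInfo.enqueue, this made me suspect a deadlock or stall caused by a rebalance. On an earlier attempt I had forgotten to pass -l to jstack, so no deadlock information was available. Searching online turned up a thread titled "ConsumerFetcherThread deadlock?" that describes a similar problem, but it dates from 2014, and Kafka 0.8.2.2 should already include the fix. I then came across this reply: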

The fetchers are blocked on the queue since it is full, is your consumer iterator stopped and hence not getting more data from it?

That made me suspect that my own business threads had died from an uncaught exception and had therefore stopped consuming. I restarted the application, and the log showed messages being consumed rapidly. Here is a second jstack for comparison:

"ConsumerFetcherThread-xxx-376jt-1514353818187-b37be1c0-0-3" #81 prio=5 os_prio=0 tid=0x00007fe39c004000 nid=0x63 waiting on condition [0x00007fe3931f4000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007822ac4e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"ConsumerFetcherThread-xxx-376jt-1514353818187-b37be1c0-0-4" #80 prio=5 os_prio=0 tid=0x00007fe39c003000 nid=0x62 waiting on condition [0x00007fe3926ea000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007821c9a68> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"xxx-376jt-1514353818187-b37be1c0-leader-finder-thread" #79 prio=5 os_prio=0 tid=0x0000000001f5a000 nid=0x61 waiting on condition [0x00007fe3920e7000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000782154c30> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:61)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"consume-2" #62 prio=5 os_prio=0 tid=0x00007fe48da13800 nid=0x51 runnable [0x00007fe392ff1000]
   java.lang.Thread.State: RUNNABLE
    //......
    at org.springframework.data.mongodb.core.MongoTemplate.executeFindMultiInternal(MongoTemplate.java:1948)
    at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1768)
    at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1751)
    at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:625)
    at org.springframework.data.mongodb.core.MongoTemplate.findOne(MongoTemplate.java:590)
    at org.springframework.data.mongodb.core.MongoTemplate.findOne(MongoTemplate.java:582)
    at com.xxx.consumer.KafkaStreamProcessor.process(KafkaStreamProcessor.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"consume-1" #61 prio=5 os_prio=0 tid=0x00007fe48e310000 nid=0x50 runnable [0x00007fe3930f2000]
   java.lang.Thread.State: RUNNABLE
    //...
    at org.springframework.data.mongodb.core.MongoTemplate$12.doInCollection(MongoTemplate.java:1157)
    at org.springframework.data.mongodb.core.MongoTemplate$12.doInCollection(MongoTemplate.java:1137)
    at org.springframework.data.mongodb.core.MongoTemplate.execute(MongoTemplate.java:463)
    at org.springframework.data.mongodb.core.MongoTemplate.doUpdate(MongoTemplate.java:1137)
    at org.springframework.data.mongodb.core.MongoTemplate.upsert(MongoTemplate.java:1099)
    at com.xxx.consumer.KafkaStreamProcessor.process(KafkaStreamProcessor.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"xxx-376jt-1514353818187-b37be1c0_watcher_executor" #59 prio=5 os_prio=0 tid=0x00007fe48fe7c000 nid=0x4e waiting on condition [0x00007fe3934f5000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000782155248> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:544)

Comparing the two dumps, the ConsumerFetcherThread threads I originally suspected are still blocked in PartitionTopicInfo.enqueue after the restart, so that is probably normal behavior.
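The fetcher stacks end in LinkedBlockingQueue.put, which parks the calling thread whenever a bounded queue is full. The sketch below reproduces only that mechanism with plain JDK classes: a stand-in producer fills a small bounded queue that nobody drains and ends up in the same WAITING (parking) state. The class name `FullQueueDemo`, the thread name, and the capacity are assumptions for illustration; this is not Kafka's code.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a producer parks in put() once a bounded queue is full.
class FullQueueDemo {

    // Starts a producer that puts capacity + 1 items into a bounded queue
    // that nobody drains, then reports the state the producer settles in.
    static Thread.State producerStateWhenQueueIsFull(int capacity) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i <= capacity; i++) {
                    queue.put(i); // the final put blocks because the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "fetcher-stand-in");
        producer.setDaemon(true);
        producer.start();

        // Poll for up to 5 seconds until the producer has parked inside put().
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (producer.getState() != Thread.State.WAITING
                && System.nanoTime() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        Thread.State state = producer.getState();
        producer.interrupt(); // release the parked producer
        return state;
    }

    public static void main(String[] args) {
        System.out.println(producerStateWhenQueueIsFull(2));
    }
}
```

Seen this way, a parked fetcher only says that the queue is full at that instant. Whether that is healthy backpressure or a symptom depends on whether something is still taking items off the other end.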

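Comparing consume-1 and consume-2 next reveals the problem: in the faulty dump their stacks contain no business method, whereas after the restart the business method is there. The cause is becoming clear: an exception was not caught.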

The business method

The original business method looked roughly like this:

@Async
public void process(KafkaStream<byte[], byte[]> stream){
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        System.out.println(Thread.currentThread().getName()+":"+new String(it.next().message()));
    }
}

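One question remains: if a thread dies from an uncaught exception, the replacement thread created for it should in theory get a higher id. Experiments showed, however, that for methods invoked through @Async the thread id stays the same after an exception is thrown.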

spring-core-4.3.13.RELEASE-sources.jar!/org/springframework/util/CustomizableThreadCreator.java

public class CustomizableThreadCreator implements Serializable {

    private final AtomicInteger threadCount = new AtomicInteger(0);

    /**
     * Template method for the creation of a new {@link Thread}.
     * <p>The default implementation creates a new Thread for the given
     * {@link Runnable}, applying an appropriate thread name.
     * @param runnable the Runnable to execute
     * @see #nextThreadName()
     */
    public Thread createThread(Runnable runnable) {
        Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
        thread.setPriority(getThreadPriority());
        thread.setDaemon(isDaemon());
        return thread;
    }

    /**
     * Return the thread name to use for a newly created {@link Thread}.
     * <p>The default implementation returns the specified thread name prefix
     * with an increasing thread count appended: e.g. "SimpleAsyncTaskExecutor-0".
     * @see #getThreadNamePrefix()
     */
    protected String nextThreadName() {
        return getThreadNamePrefix() + this.threadCount.incrementAndGet();
    }

    //...
}

Nothing here ever decrements threadCount, so if a thread died from an exception, the id of its replacement should in theory be higher.

/Library/Java/JavaVirtualMachines/jdk1.8.0_71.jdk/Contents/Home/src.zip!/java/util/concurrent/ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

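Debugging showed that completedAbruptly was always false, meaning no exception ever escaped from the task into the worker thread, which seemed contradictory. Then I remembered the interception performed for the @Async annotation, and things fell into place.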

AsyncExecutionInterceptor

spring-aop-4.3.13.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    Callable<Object> task = new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        }
    };

    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

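The @Async annotation is intercepted by AsyncExecutionInterceptor, which wraps the invocation in another layer that handles the exception, so no exception ever surfaces in the thread pool.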
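The difference can be reproduced with a plain JDK thread pool, without Spring. In the sketch below a single-thread pool names its workers consume-1, consume-2, and so on. When a task lets an exception escape, the worker dies and the next task runs on a replacement with a higher number. When the task catches the exception itself, as the wrapper above does, the same worker keeps running. The class `WorkerReplacementDemo`, its method, and the thread factory are hypothetical; they only imitate the behavior discussed above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: which worker thread runs the task that follows a failure?
class WorkerReplacementDemo {

    static String threadNameAfterFailure(boolean catchInsideTask) {
        AtomicInteger threadCount = new AtomicInteger(0);
        // Names workers like the dumps above: consume-1, consume-2, ...
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "consume-" + threadCount.incrementAndGet());
            // Swallow the uncaught exception report to keep the demo output quiet.
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };
        ExecutorService pool = Executors.newFixedThreadPool(1, factory);
        try {
            Runnable failing = () -> {
                throw new IllegalStateException("simulated business failure");
            };
            Runnable first;
            if (catchInsideTask) {
                // Imitates a wrapper that handles the exception inside the task.
                first = () -> {
                    try {
                        failing.run();
                    } catch (Throwable ex) {
                        // handled here, so the pool never sees it
                    }
                };
            } else {
                first = failing; // the exception escapes and kills the worker
            }
            pool.execute(first);

            // The second task reports the name of the thread that runs it.
            CompletableFuture<String> name = new CompletableFuture<>();
            pool.execute(() -> name.complete(Thread.currentThread().getName()));
            return name.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("exception escapes: " + threadNameAfterFailure(false));
        System.out.println("exception caught:  " + threadNameAfterFailure(true));
    }
}
```

With the exception caught inside the task, the thread name stays the same, which matches the unchanged thread ids observed with @Async.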

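Summary

  • When consuming from Kafka, monitor the offset lag in real time to confirm that consumption is keeping up.
  • The thread that consumes through the KafkaStream iterator must catch exceptions; otherwise, once an exception is thrown, consumption stops.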
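The second point can be sketched as a loop that isolates each message, so that one failing message cannot end the loop. To keep the example self-contained it uses a plain java.util.Iterator<byte[]> as a stand-in for the ConsumerIterator in the business method above; the class `SafeConsumeLoop`, the `handler` parameter, and the failure counter are assumptions, not the original service's code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: per-message exception handling in a consume loop.
class SafeConsumeLoop {

    // Drains the iterator and returns how many messages failed to process.
    // A java.util.Iterator stands in for Kafka's ConsumerIterator here.
    static int consume(Iterator<byte[]> it, Consumer<String> handler) {
        int failures = 0;
        while (it.hasNext()) {
            byte[] payload = it.next();
            try {
                handler.accept(new String(payload, StandardCharsets.UTF_8));
            } catch (Exception ex) {
                // Log and continue; the loop must survive a bad message.
                failures++;
                System.err.println("failed to process message: " + ex);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<byte[]> messages = Arrays.asList(
                "a".getBytes(StandardCharsets.UTF_8),
                "bad".getBytes(StandardCharsets.UTF_8),
                "c".getBytes(StandardCharsets.UTF_8));
        List<String> processed = new ArrayList<>();
        int failures = consume(messages.iterator(), m -> {
            if (m.equals("bad")) {
                throw new IllegalStateException("cannot handle " + m);
            }
            processed.add(m);
        });
        System.out.println("processed=" + processed + " failures=" + failures);
    }
}
```

This only guards the business handler. Failures raised by the iterator itself, and what to do with a message that keeps failing (retry, skip, or park it elsewhere), are separate decisions that the sketch leaves open.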

doc

  • ConsumerFetcherThread deadlock?
  • Java Highlevel Consumer is stuck and the lag is increasing
