项目场景

使用MongoDB cursor从中大批量拉取数据做ETL。


问题描述

处理程序连续运行25到35分钟肯定会遇到过早关闭连接,有时还会遇到游标找不到问题。

com.mongodb.MongoSocketReadException: Prematurely reached end of stream

at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)

at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:135)

at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:713)

at com.mongodb.internal.connection.InternalStreamConnection.receiveMessageWithAdditionalTimeout(InternalStreamConnection.java:571)

at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:410)

at com.mongodb.internal.connection.InternalStreamConnection.receive(InternalStreamConnection.java:369)

at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:221)

at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:157)

at java.lang.Thread.run(Thread.java:748)


第一次分析:

1. 前面链接关闭比较懵圈,但是游标这个很直接了。阅读MongoDB文档了解到每个Session默认的timeout是30min。timeout之后session会被标记删除,等待删除线程完成销毁。特别地,当session被删除时,无论其中的cursor是否正在被使用也会被一并删除。看起来就是session超时了。

2. session为什么会超时。因为我使用的是Spring的MongoTemplate来操作,其中的session是共享的。因此当session超时时,所有进行中的操作都得完蛋。这么看来,我需要调整这么几个点。

a. 基于cursor的操作调整为,专用session处理;

b. 开辟一个session刷新线程对session做刷新;

c. 当操作结束或者异常退出时,刷新线程能够感知session已不再使用,将其删除;


第一次验证:

1.  在service中增加获取session绑定的MongoTemplate的方法;

2. 调整方法不再基于@Autowired MongoTemplate来操作,而是基于传入的SessionTuple来操作;

3. 对游标进行操作前将其加入刷新线程的session队列中,并基于try-with-resource包裹保证无论异常/还是正常结束调用到close方法;

4. 刷新线程对队列中的对象做判断,如果已close则从队列中删除;

一顿操作猛如虎,欢天喜地地上测试,结果这个异常依然坚挺地存在。之所以说存在,是因为不是超过30分钟必现,而是连续运行几个小时还会出现,而且没啥时间规律。仅仅比之前出现需要的时间长而已。


第二次分析:

1.  按道理来说我刷新了session,可以保证session不会timeout。那么游标应该也不会timeout。再看文档,cursortimeout的时间为10分钟而且是idle timeout。MongoDB Server Parameters — MongoDB Manualhttps://www.mongodb.com/docs/v4.4/reference/parameters/#logical-session

当然也可以通过命令再确认,在mongoshell上切换到admin查看相关参数。

use admin 
db.adminCommand( { getParameter : "*" } )

看返回值里的

cursorTimeoutMillis: Long("600000"),

我的一直在获取数据并且每次都小于1s,肯定不是这里的问题。

2. 没其他思路,再看看日志吧,觉得日志太少。于是修改到TRACE级别,看看有没有更多信息。

3. 结果发现最终产生原因是ServerMonitor线程发现的。然后上代码分析,发现直接颠覆了对heartbeat的认知。

Caused by: com.mongodb.MongoSocketReadException: Prematurely reached end of stream

at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)

at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:135)

at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:713)

at com.mongodb.internal.connection.InternalStreamConnection.receiveMessageWithAdditionalTimeout(InternalStreamConnection.java:571)

at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:410)

at com.mongodb.internal.connection.InternalStreamConnection.receive(InternalStreamConnection.java:369)

at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:221)

at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:157)

... 1 common frames omitted

4. 在DefaultServerMonitor线程中有一个专用链接对集群进行状态探测在方法(lookupServerDescription)而这个探测的间隔就是心跳频次,也就是hearbeatFrequenceMs参数。如果成功,啥都好说。如果失败,那不好意思关闭整个ConnectionPool.

public class ServerMonitor{@Overridepublic void run() {ServerDescription currentServerDescription = unknownConnectingServerDescription(serverId, null);try {while (!isClosed) {ServerDescription previousServerDescription = currentServerDescription;// 具体的就在这个lookupServerDescription方法里currentServerDescription =
lookupServerDescription(currentServerDescription);if (isClosed) {continue;}if (currentCheckCancelled) {waitForNext();currentCheckCancelled = false;continue;}logStateChange(previousServerDescription, currentServerDescription);sdamProvider.get().update(currentServerDescription);if (((connection == null || shouldStreamResponses(currentServerDescription))&& currentServerDescription.getTopologyVersion() != null)|| (connection != null && connection.hasMoreToCome())|| (currentServerDescription.getException() instanceof MongoSocketException&& previousServerDescription.getType() != UNKNOWN)) {continue;}waitForNext();}} finally {if (connection != null) {connection.close();}}}
}

5. 所以,这次的失败应该是探测超时引起的。但是好端端地为啥超时了呢?总之就是实际心跳间隔与设置的心跳间隔不匹配。那实际到底需要多久呢?直接debug下发现不执行数据拉取3秒左右,数据任务一开始大概8秒左右,再联想到我还会开并发。这样不仅mongodb server压力大,提供query service的mongodb client机器的压力也会变大。再看我设置的值是10s。只好斗胆猜测是这里的问题,果断调整为20s。


第二次验证:

稳如老狗地运行了2个小时,然后放到测试环境来个拉力测试。结果,稳如老狗地跑了1天,问题总算解决。


反思

当第一次看到链接过早关闭,首先想到的是心跳,觉得自己设置的心跳挺合理。但Mongodb Driver sync 提供的ConnectionPool仅基于Connection的使用空闲时间做管理,也就是多久不使用则关闭链接。同时没有对connection做心跳探活。实际心跳是用在对集群状态的探活上,如果该专用连接上探活失败,则将connection pool销毁而后重建,相当于此时是一个全新的cluster以及connection pool。同时在connection层面也确实有关于generation的设计,连接池管理中也会删除上一个generation的connection。过往的经验是connection有心跳设置,则肯定是没问题的。这种经验主义影响了判断。

Prematurely reached end of stream相关推荐

  1. mongodb偶尔报错com.mongodb.MongoSocketReadException: Prematurely reached end of stream

    项目开发中,链接mongodb的项目,偶尔报错com.mongodb.MongoSocketReadException: Prematurely reached end of stream 报错的详细 ...

  2. docker搭建mongodb高可用集群

    docker搭建mongodb集群 参考资料: 基于 Docker 的 MongoDB 主从集群 http://www.imooc.com/article/details/id/258885 mong ...

  3. 追源索骥:透过源码看懂Flink核心框架的执行流程

    https://www.cnblogs.com/bethunebtj/p/9168274.html 追源索骥:透过源码看懂Flink核心框架的执行流程 前言 1.从 Hello,World WordC ...

  4. c语言标准io中可读可写,C语言标准IO: [先读再feof] VS [先feof再读]

    刚学习C语言读取文件的时候,可能都遇到过这个"bug",读到末尾时数据有重复. 解决方案也是五花八门,甚至有人把数据先缓存了,再忽略掉最后一组.... 不妨看一段代码,两种解决方案 ...

  5. ExoPlayer播放器剖析(六)ExoPlayer同步机制分析

    关联博客 ExoPlayer播放器剖析(一)进入ExoPlayer的世界 ExoPlayer播放器剖析(二)编写exoplayer的demo ExoPlayer播放器剖析(三)流程分析-从build到 ...

  6. 追源索骥:透过源码看懂Flink核心框架的执行流程--来自GitHub

    追源索骥:透过源码看懂Flink核心框架的执行流程 联系qq2499496272可进行删除,需要文件版本的私聊!!~ 文章目录 追源索骥:透过源码看懂Flink核心框架的执行流程 前言 1.从 ~~H ...

  7. Android视频编解码

    简介 从广义上讲,编解码器就是处理输入数据来产生输出数据.MediaCode采用异步方式处理数据,并且使用了一组输入输出缓存(input and output buffers).简单来讲,你请求或接收 ...

  8. 阿里云大学Apache Flink大数据学习笔记

    之前有看过一些基础的大数据课程,现在又回来发现这里的课程更新还是蛮快的,讲的内容干货也很多,继续学习一下,下面是一些主要内容说明. 地址:https://developer.aliyun.com/le ...

  9. c语言 读文件 小数,请用C语言解决问题一个文件,每行131个小数,共61行,如何用? 爱问知识人...

    #include #include void main( void ) { ? ? ? ? ?int ?count, total = 0; ? ? ? ? ?char buffer[100]; ? F ...

  10. OpenGL.ES在Android上的简单实践:21-水印录制(MediaCodec输出h264+MediaMuxer合成mp4 上)

    OpenGL.ES在Android上的简单实践:21-水印录制(MediaCodec输出h264+MediaMuxer合成mp4 上) 1.录制视频需要什么? 在上篇文章,我们已经成功的满足了需求,在 ...

最新文章

  1. 白话Elasticsearch44-深入聚合数据分析之案例实战_颜色+品牌下钻分析时按最深层metric进行排序
  2. Java中list.forEach方法的使用示例-根据key获取对应的value
  3. Nginx配置实例-反向代理实现浏览器请求Nginx跳转到服务器某页面
  4. 如何运行 SAP Spartacus cypress 端到端测试
  5. java 堆转储快照_捕获Java堆转储的7个选项
  6. Spring中HibernateTemplate类的使用
  7. python 计算订单_python实现apriori算法的关联规则之支持度、置信度、提升度
  8. Gartner最新发布:影响2022年基础设施和运营重大趋势
  9. kotlin 编译时常量_Kotlin程序| 编译时常量示例
  10. 应用程序架构指导袖珍版
  11. 程序员必知必会的十大排序算法
  12. 大数据是如何改变制造业
  13. 两种方式从Mac计算机上的启动板菜单中删除应用程序?
  14. 【图像分割】基于K-means聚类算法图像分割【含Matlab源码 1476期】
  15. 计算机组成原理白中英第四章,白中英计算机组成原理第四章答案.ppt
  16. mysql虚拟列表_动态网页制作-官方版合集下载-多特
  17. heritrix3.x--SURT / 限定heritrix的爬行域
  18. ar 微信小程序_微信小程序开放AR功能,全面提升交互体验
  19. VBA 数值-文本转换
  20. Python绘制MACD指标图

热门文章

  1. 向中级程序员进击之路
  2. 使用pygame制作Flappy bird小游戏
  3. 阴阳师服务器维护内容,阴阳师8月1日维护内容介绍_友人帐弈鬼切内容介绍_3DM手游...
  4. 力扣刷题 DAY_85 贪心
  5. jQuery--下载方法
  6. Stata:各类全要素生产率TFP估算方法
  7. 临时码农敲门砖 有效 2022-10-16 (详情 见文末)
  8. 目标检测之YOLOv4算法分析
  9. html怎么直接修改,如何编辑运行HTML网页文件(HTML编辑工具使用介绍)
  10. JSP课程设计——民航售票管理系统