oracle querytimeout,聊聊pg jdbc的queryTimeout及next方法
序
本文主要介绍一下pg jdbc statement的queryTimeout及resultSet的next方法
实例程序
@Test
public void testReadTimeout() throws SQLException {
Connection connection = dataSource.getConnection();
//https://jdbc.postgresql.org/documentation/head/query.html
connection.setAutoCommit(false); //NOTE 为了设置fetchSize,必须设置为false
String sql = "select * from demo_table";
PreparedStatement pstmt;
try {
pstmt = (PreparedStatement)connection.prepareStatement(sql);
pstmt.setQueryTimeout(1); //NOTE 设置Statement执行完成的超时时间,前提是socket的timeout比这个大
pstmt.setFetchSize(5000); //NOTE 这样设置为了模拟query timeout的异常
System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout());
System.out.println("ps.getFetchSize():" + pstmt.getFetchSize());
System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection());
System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize());
ResultSet rs = pstmt.executeQuery(); //NOTE 设置Statement执行完成的超时时间,前提是socket的timeout比这个大
//NOTE 这里返回了就代表statement执行完成,默认返回fetchSize的数据
int col = rs.getMetaData().getColumnCount();
System.out.println("============================");
while (rs.next()) { //NOTE 这个的timeout由socket的超时时间设置,oracle.jdbc.ReadTimeout=60000
for (int i = 1; i <= col; i++) {
System.out.print(rs.getObject(i));
}
System.out.println("");
}
System.out.println("============================");
} catch (SQLException e) {
e.printStackTrace();
} finally {
//close resources
}
}
PgStatement
ostgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgStatement.java
executeInternal()
private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
throws SQLException {
closeForNextExecution();
// Enable cursor-based resultset if possible.
if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit()
&& !wantsHoldableResultSet()) {
flags |= QueryExecutor.QUERY_FORWARD_CURSOR;
}
if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS;
// If the no results flag is set (from executeUpdate)
// clear it so we get the generated keys results.
//
if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) {
flags &= ~(QueryExecutor.QUERY_NO_RESULTS);
}
}
if (isOneShotQuery(cachedQuery)) {
flags |= QueryExecutor.QUERY_ONESHOT;
}
// Only use named statements after we hit the threshold. Note that only
// named statements can be transferred in binary format.
if (connection.getAutoCommit()) {
flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
}
// updateable result sets do not yet support binary updates
if (concurrency != ResultSet.CONCUR_READ_ONLY) {
flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER;
}
Query queryToExecute = cachedQuery.query;
if (queryToExecute.isEmpty()) {
flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
}
if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers
&& (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
// Simple 'Q' execution does not need to know parameter types
// When binaryTransfer is forced, then we need to know resulting parameter and column types,
// thus sending a describe request.
int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY;
StatementResultHandler handler2 = new StatementResultHandler();
connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0,
flags2);
ResultWrapper result2 = handler2.getResults();
if (result2 != null) {
result2.getResultSet().close();
}
}
StatementResultHandler handler = new StatementResultHandler();
result = null;
try {
startTimer();
connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
fetchSize, flags);
} finally {
killTimerTask();
}
result = firstUnclosedResult = handler.getResults();
if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
generatedKeys = result;
result = result.getNext();
if (wantsGeneratedKeysOnce) {
wantsGeneratedKeysOnce = false;
}
}
}
注意,这里在执行前后分别调用了startTimer()和killTimerTask()
startTimer()
private void startTimer() {
/*
* there shouldn't be any previous timer active, but better safe than sorry.
*/
cleanupTimer();
STATE_UPDATER.set(this, StatementCancelState.IN_QUERY);
if (timeout == 0) {
return;
}
TimerTask cancelTask = new TimerTask() {
public void run() {
try {
if (!CANCEL_TIMER_UPDATER.compareAndSet(PgStatement.this, this, null)) {
// Nothing to do here, statement has already finished and cleared
// cancelTimerTask reference
return;
}
PgStatement.this.cancel();
} catch (SQLException e) {
}
}
};
CANCEL_TIMER_UPDATER.set(this, cancelTask);
connection.addTimerTask(cancelTask, timeout);
}
startTimer调用了cleanupTimer()
cancelTask调用的是PgStatement.this.cancel()
最后调用connection.addTimerTask添加定时任务
cleanupTimer()
/**
* Clears {@link #cancelTimerTask} if any. Returns true if and only if "cancel" timer task would
* never invoke {@link #cancel()}.
*/
private boolean cleanupTimer() {
TimerTask timerTask = CANCEL_TIMER_UPDATER.get(this);
if (timerTask == null) {
// If timeout is zero, then timer task did not exist, so we safely report "all clear"
return timeout == 0;
}
if (!CANCEL_TIMER_UPDATER.compareAndSet(this, timerTask, null)) {
// Failed to update reference -> timer has just fired, so we must wait for the query state to
// become "cancelling".
return false;
}
timerTask.cancel();
connection.purgeTimerTasks();
// All clear
return true;
}
注意这里更新statement状态之后,调用task的cancel,以及connection.purgeTimerTasks()
cancel()
public void cancel() throws SQLException {
if (!STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY, StatementCancelState.CANCELING)) {
// Not in query, there's nothing to cancel
return;
}
try {
// Synchronize on connection to avoid spinning in killTimerTask
synchronized (connection) {
connection.cancelQuery();
}
} finally {
STATE_UPDATER.set(this, StatementCancelState.CANCELLED);
synchronized (connection) {
connection.notifyAll(); // wake-up killTimerTask
}
}
}
executeQuery超时了则直接调用connection.cancelQuery()
public void cancelQuery() throws SQLException {
checkClosed();
queryExecutor.sendQueryCancel();
}
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/QueryExecutorBase.java
public void sendQueryCancel() throws SQLException {
if (cancelPid <= 0) {
return;
}
PGStream cancelStream = null;
// Now we need to construct and send a cancel packet
try {
if (logger.logDebug()) {
logger.debug(" FE=> CancelRequest(pid=" + cancelPid + ",ckey=" + cancelKey + ")");
}
cancelStream =
new PGStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), cancelSignalTimeout);
if (cancelSignalTimeout > 0) {
cancelStream.getSocket().setSoTimeout(cancelSignalTimeout);
}
cancelStream.sendInteger4(16);
cancelStream.sendInteger2(1234);
cancelStream.sendInteger2(5678);
cancelStream.sendInteger4(cancelPid);
cancelStream.sendInteger4(cancelKey);
cancelStream.flush();
cancelStream.receiveEOF();
} catch (IOException e) {
// Safe to ignore.
if (logger.logDebug()) {
logger.debug("Ignoring exception on cancel request:", e);
}
} finally {
if (cancelStream != null) {
try {
cancelStream.close();
} catch (IOException e) {
// Ignored.
}
}
}
}
向数据库server发送cancel指令
killTimerTask()
private void killTimerTask() {
boolean timerTaskIsClear = cleanupTimer();
// The order is important here: in case we need to wait for the cancel task, the state must be
// kept StatementCancelState.IN_QUERY, so cancelTask would be able to cancel the query.
// It is believed that this case is very rare, so "additional cancel and wait below" would not
// harm it.
if (timerTaskIsClear && STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY, StatementCancelState.IDLE)) {
return;
}
// Being here means someone managed to call .cancel() and our connection did not receive
// "timeout error"
// We wait till state becomes "cancelled"
boolean interrupted = false;
while (!STATE_UPDATER.compareAndSet(this, StatementCancelState.CANCELLED, StatementCancelState.IDLE)) {
synchronized (connection) {
try {
// Note: wait timeout here is irrelevant since synchronized(connection) would block until
// .cancel finishes
connection.wait(10);
} catch (InterruptedException e) { // NOSONAR
// Either re-interrupt this method or rethrow the "InterruptedException"
interrupted = true;
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
这里先调用cleanupTimer,然后更新statement的状态
PgConnection
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgConnection.java
getTimer()
private synchronized Timer getTimer() {
if (cancelTimer == null) {
cancelTimer = Driver.getSharedTimer().getTimer();
}
return cancelTimer;
}
这里创建或获取一个timer
addTimerTask()
public void addTimerTask(TimerTask timerTask, long milliSeconds) {
Timer timer = getTimer();
timer.schedule(timerTask, milliSeconds);
}
这个添加timerTask就是直接调度了
purgeTimerTasks()
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgConnection.java
public void purgeTimerTasks() {
Timer timer = cancelTimer;
if (timer != null) {
timer.purge();
}
}
在cleanupTimer中被调用,用来清理已经被cancel掉的timer task
PgResultSet
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.java
next()
public boolean next() throws SQLException {
checkClosed();
if (onInsertRow) {
throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."),
PSQLState.INVALID_CURSOR_STATE);
}
if (current_row + 1 >= rows.size()) {
if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) {
current_row = rows.size();
this_row = null;
rowBuffer = null;
return false; // End of the resultset.
}
// Ask for some more data.
row_offset += rows.size(); // We are discarding some data.
int fetchRows = fetchSize;
if (maxRows != 0) {
if (fetchRows == 0 || row_offset + fetchRows > maxRows) {
// Fetch would exceed maxRows, limit it.
fetchRows = maxRows - row_offset;
}
}
// Execute the fetch and update this resultset.
connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);
current_row = 0;
// Test the new rows array.
if (rows.isEmpty()) {
this_row = null;
rowBuffer = null;
return false;
}
} else {
current_row++;
}
initRowBuffer();
return true;
}
这里的fetch没有像executeQuery那样加timer
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java
public synchronized void fetch(ResultCursor cursor, ResultHandler handler, int fetchSize)
throws SQLException {
waitOnLock();
final Portal portal = (Portal) cursor;
// Insert a ResultHandler that turns bare command statuses into empty datasets
// (if the fetch returns no rows, we see just a CommandStatus..)
final ResultHandler delegateHandler = handler;
handler = new ResultHandlerDelegate(delegateHandler) {
public void handleCommandStatus(String status, int updateCount, long insertOID) {
handleResultRows(portal.getQuery(), null, new ArrayList(), null);
}
};
// Now actually run it.
try {
processDeadParsedQueries();
processDeadPortals();
sendExecute(portal.getQuery(), portal, fetchSize);
sendSync();
processResults(handler, 0);
estimatedReceiveBufferBytes = 0;
} catch (IOException e) {
abort();
handler.handleError(
new PSQLException(GT.tr("An I/O error occurred while sending to the backend."),
PSQLState.CONNECTION_FAILURE, e));
}
handler.handleCompletion();
}
小结
queryTimeout是采用添加timer来控制,如果请求过多,可能会造成timer过多
timeout时间不宜过长,不过正常执行完sql,会调用killTimerTask()方,里头会先cleanupTimer,取消timerTask,然后调用purgeTimerTasks()清理cancel掉的task,避免timeout时间过长导致task堆积最后内存溢出
超时之后会timer task会向数据库server发送cancel query指令
发送完cancel query指令之后,client端的查询按预期应该抛出SQLException(这里头的机制有待深入研究,可能是server端返回timeout error)
executeQuery方法默认会拉取fetchSize的数据并返回
next()方法根据需要再去fetch,这个fetch方法就没有timer来限制时间了,但是最底层应该是受socketTimeout限制
doc
oracle querytimeout,聊聊pg jdbc的queryTimeout及next方法相关推荐
- java的maxrow_聊聊pg jdbc statement的maxRows参数
序 本文主要解析一下pg jdbc statement的maxRows参数 Statement.setMaxRows void setMaxRows(int max) throws SQLExcept ...
- java access jdbc_Java jdbc连接Access数据库的方法学习(ucanaccess驱动)
我们如何使用java jdbc连接Access数据库并进行读写呢?这个只需要使用UcanaccessDriver这样的驱动就可以了,首先java连接Access数据库需要引入net相关的jar包,Ac ...
- oracle中按指定条数批量处理的方法
oracle中按指定条数批量处理的方法 示例 --每处理10000条提交一次 loop insert into t2 select * from t1 where rownum <= 10000 ...
- oracle ORA-01000: maximum open cursors exceeded问题的解决方法
oracle ORA-01000: maximum open cursors exceeded问题的解决方法 参考文章: (1)oracle ORA-01000: maximum open curso ...
- Oracle用户密码过期和用户被锁解决方法【转】
Oracle用户密码过期和用户被锁解决方法[转] 参考文章: (1)Oracle用户密码过期和用户被锁解决方法[转] (2)https://www.cnblogs.com/paul8339/p/590 ...
- Oracle怎样创建共享文件夹,Oracle vm要如何使用共享文件夹的解决方法
了解过的Oracle vm虚拟机的人,用过的朋友都知道,这是一个很方便的虚拟机,当然唯一不足的是它不支持直接的主机到虚拟机的文件拖拽,但是有其他的方式可以解决这个问题.下面是学习啦小编为大家整理的关于 ...
- Oracle数据库的impdp导入操作以及dba_directories使用方法
Oracle数据库的impdp导入操作以及dba_directories使用方法 今天从同事那里拿到了导出的dmp文件,当导入时发现了很多问题,记下来以免以后忘记,以下是本人的操作过程: 1.首先是创 ...
- oracle精确匹配时间,Oracle时间精确到时、分、秒处理方法
Oracle时间精确到时.分.秒处理方法 一般数据库中时间的格式为DATE类型,而我们从页面中获取的时间往往为String类型,这个就需要类型的转换.一般我们会通过调用 Java.text.Simpl ...
- oracle数据库重建em,oracle 11g em重建报唯一约束错误解决方法
oracle 11g em重建报唯一约束错误解决方法 更新时间:2012年11月27日 15:07:33 作者: 今天在手工配置Oracle11g的EM时总是报如下错误,也没有找到解决办法,以下是 ...
最新文章
- Git创建本地分支并关联远程分支
- 建立TCP连接时的三次握手与四次挥手问题
- RunDll32.exe 详解及[Windows批处理]清除IE缓存
- python 编辑距离 2组匹配_Python文本相似性计算之编辑距离详解
- 关于MARATHON和容器的端口映射
- 文华wh6如何修改服务器,文华财经 软件特色功能介绍修改
- Win10卸载微软sql服务器,Win10 64位如何彻底卸载Sql Server 2012 SQL2012卸载后无法重装的解决方法...
- easyrecovery15新版绿色序列号数据恢复软件
- cad剪裁地形图lisp_CAD怎么在完整地形图里截取需要的部分地形图
- 2020年阴历二月二十九 投资理财~业余投资者如果不深入研究财报该怎么办?
- 利用APPInventor开发手机APP,实现OBLOQ-IOT与Arduino设备通信
- 《Adobe After Effects CS6完全剖析》——第2章 时间标尺 营造整洁的工作流程之梦...
- 去掉串口硬盘的安全删除硬件图标
- 矫正ubuntu系统时间
- php视频直播系统源码Android 修改状态栏颜色 白色、透明色
- WebSocket connection to ‘wss://xxx.com’ failed: Error in NET::ERR_SSL_OBSOLETE_VERSION
- C++ 多线程学习总结
- 移动端h5网页调用支付宝支付接口
- 用Facebook做广告和营销,你需要注意哪些问题?
- 使用flex 布局让子元素 左右间距相等
热门文章
- 【Flink on k8s】JConsole 远程监控 TaskManager
- 【scala】IDEA运行scala程序:Error:scalac: bad option: ‘-make:transitive‘
- 【Elasticsearch】es mapper_parsing_exception
- 【Flink】Flink 单个任务 多个流的消费同一个topic的时候其中一个流卡死 不消费
- 【Java】java中的Fork/Join
- 【Siddhi】Siddhi的window操作
- Spring : spring的aware
- java.lang.Runtime.availableProcessors返回可用处理器的Java虚拟机的数量
- 贼好用的Java工具类库,GitHub星标10k+,你在用吗?
- Jdk14 都要出了,Jdk9 的新特性还不了解一下?