java监听oracle aq,透过JMS监听Oracle AQ,在数据库变化时触发执行Java程序
环境说明
一Oracle高级消息队列AQ创建消息负荷payload
创建队列表
创建队列并启动
队列的停止和删除
入队消息
出队消息
二Java使用JMS监听并处理Oracle AQ队列创建连接参数类
创建消息转换类
主类进行消息处理
三监控表记录变化通知Java创建表
创建存储过程
创建触发器
环境说明
本实验环境基于Oracle 12C和JDK1.8,其中Oracle 12C支持多租户特性,相较于之前的Oracle版本,使用‘C##用户名‘表示用户,例如如果数据库用户叫kevin,则登陆时使用C##kevin进行登陆。
一、Oracle高级消息队列AQ
Oracle AQ是Oracle中的消息队列,是Oracle中的一种高级应用,每个版本都在不断的加强,使用DBMS_AQ系统包进行相应的操作,是Oracle的默认组件,只要安装了Oracle数据库就可以使用。使用AQ可以在多个Oracle数据库、Oracle与Java、C等系统中进行数据传输。
下面分步骤说明如何创建Oracle AQ
1. 创建消息负荷payload
Oracle AQ中传递的消息被称为有效负荷(payloads),格式可以是用户自定义对象或XMLType或ANYDATA。本例中我们创建一个简单的对象类型用于传递消息。
create type demo_queue_payload_type as object (message varchar2(4000));
2. 创建队列表
队列表用于存储消息,在入队时自动存入表中,出队时自动删除。使用DBMS_AQADM包进行数据表的创建,只需要写表名,同时设置相应的属性。对于队列需要设置multiple_consumers为false,如果使用发布/订阅模式需要设置为true。
begin
dbms_aqadm.create_queue_table(
queue_table => 'demo_queue_table',
queue_payload_type => 'demo_queue_payload_type',
multiple_consumers => false
);
end;
执行完后可以查看oracle表中自动生成了demo_queue_table表,可以查看影响子段(含义比较清晰)。
3. 创建队列并启动
创建队列并启动队列:
begin
dbms_aqadm.create_queue (
queue_name => 'demo_queue',
queue_table => 'demo_queue_table'
);
dbms_aqadm.start_queue(
queue_name => 'demo_queue'
);
end;
至此,我们已经创建了队列有效负荷,队列表和队列。可以查看以下系统创建了哪些相关的对象:
SELECT object_name, object_type FROM user_objects WHERE object_name != 'DEMO_QUEUE_PAYLOAD_TYPE';
OBJECT_NAME OBJECT_TYPE
------------------------------ ---------------
DEMO_QUEUE_TABLE TABLE
SYS_C009392 INDEX
SYS_LOB0000060502C00030$$ LOB
AQ$_DEMO_QUEUE_TABLE_T INDEX
AQ$_DEMO_QUEUE_TABLE_I INDEX
AQ$_DEMO_QUEUE_TABLE_E QUEUE
AQ$DEMO_QUEUE_TABLE VIEW
DEMO_QUEUE QUEUE
我们看到一个队列带出了一系列自动生成对象,有些是被后面直接用到的。不过有趣的是,创建了第二个队列。这就是所谓的异常队列(exception queue)。如果AQ无法从我们的队列接收消息,将记录在该异常队列中。
消息多次处理出错等情况会自动转移到异常的队列,对于异常队列如何处理目前笔者还没有找到相应的写法,因为我使用的场景并不要求消息必须一对一的被处理,只要起到通知的作用即可。所以如果消息转移到异常队列,可以执行清空队列表中的数据
delete from demo_queue_table;
4. 队列的停止和删除
如果需要删除或重建可以使用下面的方法进行操作:
BEGIN
DBMS_AQADM.STOP_QUEUE(
queue_name => 'demo_queue'
);
DBMS_AQADM.DROP_QUEUE(
queue_name => 'demo_queue'
);
DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => 'demo_queue_table'
);
END;
5. 入队消息
入列操作是一个基本的事务操作(就像往队列表Insert),因此我们需要提交。
declare
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
begin
o_payload := demo_queue_payload_type('what is you name ?');
dbms_aq.enqueue(
queue_name => 'demo_queue',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
commit;
end;
通过SQL语句查看消息是否正常入队:
select * from aq$demo_queue_table;
select user_data from aq$demo_queue_table;
6. 出队消息
使用Oracle进行出队操作,我没有实验成功(不确定是否和DBMS_OUTPUT的执行权限有关),代码如下,读者可以进行调试:
declare
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
begin
DBMS_AQ.DEQUEUE(
queue_name => 'demo_queue',
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
DBMS_OUTPUT.PUT_LINE(
'***** Browse message is [' || o_payload.message || ']****'
);
end;
二、Java使用JMS监听并处理Oracle AQ队列
Java使用JMS进行相应的处理,需要使用Oracle提供的jar,在Oracle安装目录可以找到:在linux中可以使用find命令进行查找,例如
find `pwd` -name 'jmscommon.jar'
需要的jar为:
app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/jmscommon.jar
app/oracle/product/12.1.0/dbhome_1/jdbc/lib/ojdbc7.jar
app/oracle/product/12.1.0/dbhome_1/jlib/orai18n.jar
app/oracle/product/12.1.0/dbhome_1/jlib/jta.jar
app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/aqapi_g.jar
1. 创建连接参数类
实际使用时可以把参数信息配置在properties文件中,使用Spring进行注入。
package org.kevin.jms;
/**
*
*@author 李文锴
* 连接参数信息
*
*/
public class JmsConfig {
public String username = "c##kevin";
public String password = "a111111111";
public String jdbcUrl = "jdbc:oracle:thin:@127.0.0.1:1521:orcl";
public String queueName = "demo_queue";
}
2. 创建消息转换类
因为消息载荷是Oracle数据类型,需要提供一个转换工厂类将Oracle类型转换为Java类型。
package org.kevin.jms;
import java.sql.SQLException;
import oracle.jdbc.driver.OracleConnection;
import oracle.jdbc.internal.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.CustomDatum;
import oracle.sql.CustomDatumFactory;
import oracle.sql.Datum;
import oracle.sql.STRUCT;
/**
*
*@author 李文锴
* 数据类型转换类
*
*/
@SuppressWarnings("deprecation")
public class QUEUE_MESSAGE_TYPE implements CustomDatum, CustomDatumFactory {
public static final String _SQL_NAME = "QUEUE_MESSAGE_TYPE";
public static final int _SQL_TYPECODE = OracleTypes.STRUCT;
MutableStruct _struct;
// 12表示字符串
static int[] _sqlType = { 12 };
static CustomDatumFactory[] _factory = new CustomDatumFactory[1];
static final QUEUE_MESSAGE_TYPE _MessageFactory = new QUEUE_MESSAGE_TYPE();
public static CustomDatumFactory getFactory() {
return _MessageFactory;
}
public QUEUE_MESSAGE_TYPE() {
_struct = new MutableStruct(new Object[1], _sqlType, _factory);
}
public Datum toDatum(OracleConnection c) throws SQLException {
return _struct.toDatum(c, _SQL_NAME);
}
public CustomDatum create(Datum d, int sqlType) throws SQLException {
if (d == null)
return null;
QUEUE_MESSAGE_TYPE o = new QUEUE_MESSAGE_TYPE();
o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
return o;
}
public String getContent() throws SQLException {
return (String) _struct.getAttribute(0);
}
}
3. 主类进行消息处理
package org.kevin.jms;
import java.util.Properties;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import oracle.jms.AQjmsAdtMessage;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
/**
*
*@author 李文锴 消息处理类
*
*/
public class Main {
public static void main(String[] args) throws Exception {
JmsConfig config = new JmsConfig();
QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.jdbcUrl,
new Properties());
QueueConnection conn = queueConnectionFactory.createQueueConnection(config.username, config.password);
AQjmsSession session = (AQjmsSession) conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
Queue queue = (AQjmsDestination) session.getQueue(config.username, config.queueName);
MessageConsumer consumer = session.createConsumer(queue, null, QUEUE_MESSAGE_TYPE.getFactory(), null, false);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("ok");
AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
try {
QUEUE_MESSAGE_TYPE payload = (QUEUE_MESSAGE_TYPE) adtMessage.getAdtPayload();
System.out.println(payload.getContent());
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread.sleep(1000000);
}
}
使用Oracle程序块进行入队操作,在没有启动Java时看到队列表中存在数据。启动Java后,控制台正确的输出的消息;通过Oracle程序块再次写入消息,发现控制台正确处理消息。Java的JMS监听不是立刻进行处理,可能存在几秒中的时间差,时间不等。
三、监控表记录变化通知Java
下面的例子创建一个数据表,然后在表中添加触发器,当数据变化后触发器调用存储过程给Oracle AQ发送消息,然后使用Java JMS对消息进行处理。
1. 创建表
创建student表,包含username和age两个子段,其中username时varchar2类型,age时number类型。
2. 创建存储过程
创建send_aq_msg存储过程,因为存储过程中调用dbms数据包,系统包在存储过程中执行需要进行授权(使用sys用户进行授权):
grant execute on dbms_aq to c##kevin;
注意存储过程中包含commit语句。
create or replace
PROCEDURE send_aq_msg (info IN VARCHAR2) as
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
begin
o_payload := demo_queue_payload_type(info);
dbms_aq.enqueue(
queue_name => 'demo_queue',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
commit;
end send_aq_msg;
3. 创建触发器
在student表中创建触发器,当数据写入或更新时,如果age=18,则进行入队操作。需要调用存储过程发送消息,但触发器中不能包含事物提交语句,因此需要使用pragma autonomous_transaction;声明自由事物:
CREATE OR REPLACE TRIGGER STUDENT_TR
AFTER INSERT OR UPDATE OF AGE ON STUDENT FOR EACH ROW
DECLARE
pragma autonomous_transaction;
BEGIN
if :new.age = 18 then
send_aq_msg(:new.username);
end if;
END;
创建完触发器后向执行插入或更新操作:
insert into student (username,age) values ('jack.lee.3k', 18);
update student set age=18 where username='jack003';
Java JMS可以正确的处理消息。
java监听oracle aq,透过JMS监听Oracle AQ,在数据库变化时触发执行Java程序相关推荐
- 通过JMS监听Oracle AQ,在数据库变化时触发执行Java程序
环境说明 一Oracle高级消息队列AQ 创建消息负荷payload 创建队列表 创建队列并启动 队列的停止和删除 入队消息 出队消息 二Java使用JMS监听并处理Oracle AQ队列 创建连接参 ...
- 通过JMS监听Oracle AQ,在数据苦表变化时触发并执行Java程序
环境说明 一Oracle高级消息队列AQ 创建消息负荷payload 创建队列表 创建队列并启动 队列的停止和删除 入队消息 出队消息 二Java使用JMS监听并处理Oracle AQ队列 创建连接参 ...
- java的tey语句return了_Java中try、finally语句中有return时的执行情况
在Java中当try.finally语句中包含return语句时,执行情况到底是怎样的,finally中的代码是否执行,大家众说纷纭,有的说会执行,有的说不会执行,到底哪种说法正确,现在通过下面的例子 ...
- vue 深度监听watch(如何watch监听一个对象内部的变化)
第一个handler:其值是一个回调函数.即监听到变化时应该执行的函数. 第二个是deep:其值是true或false:确认是否深入监听.(一般监听时是不能监听到对象属性值的变化的,数组的值变化可以听 ...
- java计算机毕业设计云端小区物业智能管理系统源码+系统+mysql数据库+lw文档+部署
java计算机毕业设计云端小区物业智能管理系统源码+系统+mysql数据库+lw文档+部署 java计算机毕业设计云端小区物业智能管理系统源码+系统+mysql数据库+lw文档+部署 本源码技术栈: ...
- 为什么java可跨平台执行,java为什么可以跨平台执行
java为什么可以跨平台执行以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! java为什么可以跨平台执行 因为java程 ...
- windows计划任务启动bat执行java文件
系统:win7 环境:需要配置好jdk的环境变量 需求:每次开机,用bat批处理执行将一个位于D:\workspace\console目录底下的console.txt重命名的java文件 拿到需求,我 ...
- Jexl表达式引擎-根据字符串动态执行JAVA
Table of Contents generated with DocToc 一.使用场景 二.市面上表达式引擎比较 2.1 Aviator 2.2 Jexl 一.使用场景 在做某些项目的时候,有时 ...
- java计算机毕业设计Vue垃圾分类指南平台设计与实现源码+数据库+系统+lw文档
java计算机毕业设计Vue垃圾分类指南平台设计与实现源码+数据库+系统+lw文档 java计算机毕业设计Vue垃圾分类指南平台设计与实现源码+数据库+系统+lw文档 本源码技术栈: 项目架构:B/S ...
最新文章
- ATS 6.2.1中缓存文件过期并不回源校验的“坑”
- 鸿蒙之后华为把欧拉也捐了,还承诺不做欧拉商用发行版
- android 读取其他应用程序,android – 在另一个应用程序中请求我自己的ContentProvider的读取权限...
- 浅谈腾讯微博与新浪微博的优劣
- 在英特尔® 凌动™ 处理器上将 OpenGL* 游戏移植到 Android* (第一部分)
- ProjeQtOr(项目管理软件) v9.0.2
- Python中remove,pop,del的区别
- svn服务的安装与设置 .
- 品牌诞生于两个驱动力
- 回归(regression)——统计学习方法
- python operator.itemgetter
- TypeScript 的声明文件的使用与编写
- WebResponse 跨域访问
- sklearn中的损失函数
- java 如何执行dig 命令_linux dig 命令使用方法
- Linux系统可视化界面与Shell界面切换
- Qt面对高分辨率屏幕的解决方法思考
- 软件工程:数据流图和结构图怎么画?
- 海尔正式发布COSMO平台,世界智能制造将要去哪?
- Android MTU 值修改
热门文章
- 本周开课 | 第15期高级转录组分析和R数据可视化培训
- 解决A component required a bean named ‘entityManagerFactory‘ that could not be found
- 字符串的常用内置方法
- 判断一个字符串是否是合法IP地址
- JPEG公布智能图像编码提案结果,火山引擎排名主观质量评测第一
- OpenGL实用开源库 汇总
- 啰里吧嗦式讲解java静态代理动态代理模式
- 如何写好宣传软文?软文推广对企业有什么帮助?
- 如何通过今日头条引精准流量,学完即用
- SpringMVC(狂神学习笔记)2021-10-5