环境说明

一Oracle高级消息队列AQ创建消息负荷payload

创建队列表

创建队列并启动

队列的停止和删除

入队消息

出队消息

二Java使用JMS监听并处理Oracle AQ队列创建连接参数类

创建消息转换类

主类进行消息处理

三监控表记录变化通知Java创建表

创建存储过程

创建触发器

环境说明

本实验环境基于Oracle 12C和JDK1.8,其中Oracle 12C支持多租户特性,相较于之前的Oracle版本,使用‘C##用户名‘表示用户,例如如果数据库用户叫kevin,则登陆时使用C##kevin进行登陆。

一、Oracle高级消息队列AQ

Oracle AQ是Oracle中的消息队列,是Oracle中的一种高级应用,每个版本都在不断的加强,使用DBMS_AQ系统包进行相应的操作,是Oracle的默认组件,只要安装了Oracle数据库就可以使用。使用AQ可以在多个Oracle数据库、Oracle与Java、C等系统中进行数据传输。

下面分步骤说明如何创建Oracle AQ

1. 创建消息负荷payload

Oracle AQ中传递的消息被称为有效负荷(payloads),格式可以是用户自定义对象或XMLType或ANYDATA。本例中我们创建一个简单的对象类型用于传递消息。

create type demo_queue_payload_type as object (message varchar2(4000));

2. 创建队列表

队列表用于存储消息,在入队时自动存入表中,出队时自动删除。使用DBMS_AQADM包进行数据表的创建,只需要写表名,同时设置相应的属性。对于队列需要设置multiple_consumers为false,如果使用发布/订阅模式需要设置为true。

begin

dbms_aqadm.create_queue_table(

queue_table => 'demo_queue_table',

queue_payload_type => 'demo_queue_payload_type',

multiple_consumers => false

);

end;

执行完后可以查看oracle表中自动生成了demo_queue_table表,可以查看影响子段(含义比较清晰)。

3. 创建队列并启动

创建队列并启动队列:

begin

dbms_aqadm.create_queue (

queue_name => 'demo_queue',

queue_table => 'demo_queue_table'

);

dbms_aqadm.start_queue(

queue_name => 'demo_queue'

);

end;

至此,我们已经创建了队列有效负荷,队列表和队列。可以查看以下系统创建了哪些相关的对象:

SELECT object_name, object_type FROM user_objects WHERE object_name != 'DEMO_QUEUE_PAYLOAD_TYPE';

OBJECT_NAME OBJECT_TYPE

------------------------------ ---------------

DEMO_QUEUE_TABLE TABLE

SYS_C009392 INDEX

SYS_LOB0000060502C00030$$ LOB

AQ$_DEMO_QUEUE_TABLE_T INDEX

AQ$_DEMO_QUEUE_TABLE_I INDEX

AQ$_DEMO_QUEUE_TABLE_E QUEUE

AQ$DEMO_QUEUE_TABLE VIEW

DEMO_QUEUE QUEUE

我们看到一个队列带出了一系列自动生成对象,有些是被后面直接用到的。不过有趣的是,创建了第二个队列。这就是所谓的异常队列(exception queue)。如果AQ无法从我们的队列接收消息,将记录在该异常队列中。

消息多次处理出错等情况会自动转移到异常的队列,对于异常队列如何处理目前笔者还没有找到相应的写法,因为我使用的场景并不要求消息必须一对一的被处理,只要起到通知的作用即可。所以如果消息转移到异常队列,可以执行清空队列表中的数据

delete from demo_queue_table;

4. 队列的停止和删除

如果需要删除或重建可以使用下面的方法进行操作:

BEGIN

DBMS_AQADM.STOP_QUEUE(

queue_name => 'demo_queue'

);

DBMS_AQADM.DROP_QUEUE(

queue_name => 'demo_queue'

);

DBMS_AQADM.DROP_QUEUE_TABLE(

queue_table => 'demo_queue_table'

);

END;

5. 入队消息

入列操作是一个基本的事务操作(就像往队列表Insert),因此我们需要提交。

declare

r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;

r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;

v_message_handle RAW(16);

o_payload demo_queue_payload_type;

begin

o_payload := demo_queue_payload_type('what is you name ?');

dbms_aq.enqueue(

queue_name => 'demo_queue',

enqueue_options => r_enqueue_options,

message_properties => r_message_properties,

payload => o_payload,

msgid => v_message_handle

);

commit;

end;

通过SQL语句查看消息是否正常入队:

select * from aq$demo_queue_table;

select user_data from aq$demo_queue_table;

6. 出队消息

使用Oracle进行出队操作,我没有实验成功(不确定是否和DBMS_OUTPUT的执行权限有关),代码如下,读者可以进行调试:

declare

r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;

r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;

v_message_handle RAW(16);

o_payload demo_queue_payload_type;

begin

DBMS_AQ.DEQUEUE(

queue_name => 'demo_queue',

dequeue_options => r_dequeue_options,

message_properties => r_message_properties,

payload => o_payload,

msgid => v_message_handle

);

DBMS_OUTPUT.PUT_LINE(

'***** Browse message is [' || o_payload.message || ']****'

);

end;

二、Java使用JMS监听并处理Oracle AQ队列

Java使用JMS进行相应的处理,需要使用Oracle提供的jar,在Oracle安装目录可以找到:在linux中可以使用find命令进行查找,例如

find `pwd` -name 'jmscommon.jar'

需要的jar为:

app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/jmscommon.jar

app/oracle/product/12.1.0/dbhome_1/jdbc/lib/ojdbc7.jar

app/oracle/product/12.1.0/dbhome_1/jlib/orai18n.jar

app/oracle/product/12.1.0/dbhome_1/jlib/jta.jar

app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/aqapi_g.jar

1. 创建连接参数类

实际使用时可以把参数信息配置在properties文件中,使用Spring进行注入。

package org.kevin.jms;

/**

*

*@author 李文锴

* 连接参数信息

*

*/

public class JmsConfig {

public String username = "c##kevin";

public String password = "a111111111";

public String jdbcUrl = "jdbc:oracle:thin:@127.0.0.1:1521:orcl";

public String queueName = "demo_queue";

}

2. 创建消息转换类

因为消息载荷是Oracle数据类型,需要提供一个转换工厂类将Oracle类型转换为Java类型。

package org.kevin.jms;

import java.sql.SQLException;

import oracle.jdbc.driver.OracleConnection;

import oracle.jdbc.internal.OracleTypes;

import oracle.jpub.runtime.MutableStruct;

import oracle.sql.CustomDatum;

import oracle.sql.CustomDatumFactory;

import oracle.sql.Datum;

import oracle.sql.STRUCT;

/**

*

*@author 李文锴

* 数据类型转换类

*

*/

@SuppressWarnings("deprecation")

public class QUEUE_MESSAGE_TYPE implements CustomDatum, CustomDatumFactory {

public static final String _SQL_NAME = "QUEUE_MESSAGE_TYPE";

public static final int _SQL_TYPECODE = OracleTypes.STRUCT;

MutableStruct _struct;

// 12表示字符串

static int[] _sqlType = { 12 };

static CustomDatumFactory[] _factory = new CustomDatumFactory[1];

static final QUEUE_MESSAGE_TYPE _MessageFactory = new QUEUE_MESSAGE_TYPE();

public static CustomDatumFactory getFactory() {

return _MessageFactory;

}

public QUEUE_MESSAGE_TYPE() {

_struct = new MutableStruct(new Object[1], _sqlType, _factory);

}

public Datum toDatum(OracleConnection c) throws SQLException {

return _struct.toDatum(c, _SQL_NAME);

}

public CustomDatum create(Datum d, int sqlType) throws SQLException {

if (d == null)

return null;

QUEUE_MESSAGE_TYPE o = new QUEUE_MESSAGE_TYPE();

o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);

return o;

}

public String getContent() throws SQLException {

return (String) _struct.getAttribute(0);

}

}

3. 主类进行消息处理

package org.kevin.jms;

import java.util.Properties;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Queue;

import javax.jms.QueueConnection;

import javax.jms.QueueConnectionFactory;

import javax.jms.Session;

import oracle.jms.AQjmsAdtMessage;

import oracle.jms.AQjmsDestination;

import oracle.jms.AQjmsFactory;

import oracle.jms.AQjmsSession;

/**

*

*@author 李文锴 消息处理类

*

*/

public class Main {

public static void main(String[] args) throws Exception {

JmsConfig config = new JmsConfig();

QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.jdbcUrl,

new Properties());

QueueConnection conn = queueConnectionFactory.createQueueConnection(config.username, config.password);

AQjmsSession session = (AQjmsSession) conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

conn.start();

Queue queue = (AQjmsDestination) session.getQueue(config.username, config.queueName);

MessageConsumer consumer = session.createConsumer(queue, null, QUEUE_MESSAGE_TYPE.getFactory(), null, false);

consumer.setMessageListener(new MessageListener() {

@Override

public void onMessage(Message message) {

System.out.println("ok");

AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;

try {

QUEUE_MESSAGE_TYPE payload = (QUEUE_MESSAGE_TYPE) adtMessage.getAdtPayload();

System.out.println(payload.getContent());

} catch (Exception e) {

e.printStackTrace();

}

}

});

Thread.sleep(1000000);

}

}

使用Oracle程序块进行入队操作,在没有启动Java时看到队列表中存在数据。启动Java后,控制台正确的输出的消息;通过Oracle程序块再次写入消息,发现控制台正确处理消息。Java的JMS监听不是立刻进行处理,可能存在几秒中的时间差,时间不等。

三、监控表记录变化通知Java

下面的例子创建一个数据表,然后在表中添加触发器,当数据变化后触发器调用存储过程给Oracle AQ发送消息,然后使用Java JMS对消息进行处理。

1. 创建表

创建student表,包含username和age两个子段,其中username时varchar2类型,age时number类型。

2. 创建存储过程

创建send_aq_msg存储过程,因为存储过程中调用dbms数据包,系统包在存储过程中执行需要进行授权(使用sys用户进行授权):

grant execute on dbms_aq to c##kevin;

注意存储过程中包含commit语句。

create or replace

PROCEDURE send_aq_msg (info IN VARCHAR2) as

r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;

r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;

v_message_handle RAW(16);

o_payload demo_queue_payload_type;

begin

o_payload := demo_queue_payload_type(info);

dbms_aq.enqueue(

queue_name => 'demo_queue',

enqueue_options => r_enqueue_options,

message_properties => r_message_properties,

payload => o_payload,

msgid => v_message_handle

);

commit;

end send_aq_msg;

3. 创建触发器

在student表中创建触发器,当数据写入或更新时,如果age=18,则进行入队操作。需要调用存储过程发送消息,但触发器中不能包含事物提交语句,因此需要使用pragma autonomous_transaction;声明自由事物:

CREATE OR REPLACE TRIGGER STUDENT_TR

AFTER INSERT OR UPDATE OF AGE ON STUDENT FOR EACH ROW

DECLARE

pragma autonomous_transaction;

BEGIN

if :new.age = 18 then

send_aq_msg(:new.username);

end if;

END;

创建完触发器后向执行插入或更新操作:

insert into student (username,age) values ('jack.lee.3k', 18);

update student set age=18 where username='jack003';

Java JMS可以正确的处理消息。

java监听oracle aq,透过JMS监听Oracle AQ,在数据库变化时触发执行Java程序相关推荐

  1. 通过JMS监听Oracle AQ,在数据库变化时触发执行Java程序

    环境说明 一Oracle高级消息队列AQ 创建消息负荷payload 创建队列表 创建队列并启动 队列的停止和删除 入队消息 出队消息 二Java使用JMS监听并处理Oracle AQ队列 创建连接参 ...

  2. 通过JMS监听Oracle AQ,在数据苦表变化时触发并执行Java程序

    环境说明 一Oracle高级消息队列AQ 创建消息负荷payload 创建队列表 创建队列并启动 队列的停止和删除 入队消息 出队消息 二Java使用JMS监听并处理Oracle AQ队列 创建连接参 ...

  3. java的tey语句return了_Java中try、finally语句中有return时的执行情况

    在Java中当try.finally语句中包含return语句时,执行情况到底是怎样的,finally中的代码是否执行,大家众说纷纭,有的说会执行,有的说不会执行,到底哪种说法正确,现在通过下面的例子 ...

  4. vue 深度监听watch(如何watch监听一个对象内部的变化)

    第一个handler:其值是一个回调函数.即监听到变化时应该执行的函数. 第二个是deep:其值是true或false:确认是否深入监听.(一般监听时是不能监听到对象属性值的变化的,数组的值变化可以听 ...

  5. java计算机毕业设计云端小区物业智能管理系统源码+系统+mysql数据库+lw文档+部署

    java计算机毕业设计云端小区物业智能管理系统源码+系统+mysql数据库+lw文档+部署 java计算机毕业设计云端小区物业智能管理系统源码+系统+mysql数据库+lw文档+部署 本源码技术栈: ...

  6. 为什么java可跨平台执行,java为什么可以跨平台执行

    java为什么可以跨平台执行以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! java为什么可以跨平台执行 因为java程 ...

  7. windows计划任务启动bat执行java文件

    系统:win7 环境:需要配置好jdk的环境变量 需求:每次开机,用bat批处理执行将一个位于D:\workspace\console目录底下的console.txt重命名的java文件 拿到需求,我 ...

  8. Jexl表达式引擎-根据字符串动态执行JAVA

    Table of Contents generated with DocToc 一.使用场景 二.市面上表达式引擎比较 2.1 Aviator 2.2 Jexl 一.使用场景 在做某些项目的时候,有时 ...

  9. java计算机毕业设计Vue垃圾分类指南平台设计与实现源码+数据库+系统+lw文档

    java计算机毕业设计Vue垃圾分类指南平台设计与实现源码+数据库+系统+lw文档 java计算机毕业设计Vue垃圾分类指南平台设计与实现源码+数据库+系统+lw文档 本源码技术栈: 项目架构:B/S ...

最新文章

  1. ATS 6.2.1中缓存文件过期并不回源校验的“坑”
  2. 鸿蒙之后华为把欧拉也捐了,还承诺不做欧拉商用发行版
  3. android 读取其他应用程序,android – 在另一个应用程序中请求我自己的ContentProvider的读取权限...
  4. 浅谈腾讯微博与新浪微博的优劣
  5. 在英特尔® 凌动™ 处理器上将 OpenGL* 游戏移植到 Android* (第一部分)
  6. ProjeQtOr(项目管理软件) v9.0.2
  7. Python中remove,pop,del的区别
  8. svn服务的安装与设置 .
  9. 品牌诞生于两个驱动力
  10. 回归(regression)——统计学习方法
  11. python operator.itemgetter
  12. TypeScript 的声明文件的使用与编写
  13. WebResponse 跨域访问
  14. sklearn中的损失函数
  15. java 如何执行dig 命令_linux dig 命令使用方法
  16. Linux系统可视化界面与Shell界面切换
  17. Qt面对高分辨率屏幕的解决方法思考
  18. 软件工程:数据流图和结构图怎么画?
  19. 海尔正式发布COSMO平台,世界智能制造将要去哪?
  20. Android MTU 值修改

热门文章

  1. 本周开课 | 第15期高级转录组分析和R数据可视化培训
  2. 解决A component required a bean named ‘entityManagerFactory‘ that could not be found
  3. 字符串的常用内置方法
  4. 判断一个字符串是否是合法IP地址
  5. JPEG公布智能图像编码提案结果,火山引擎排名主观质量评测第一
  6. OpenGL实用开源库 汇总
  7. 啰里吧嗦式讲解java静态代理动态代理模式
  8. 如何写好宣传软文?软文推广对企业有什么帮助?
  9. 如何通过今日头条引精准流量,学完即用
  10. SpringMVC(狂神学习笔记)2021-10-5