• 环境说明
  • 一Oracle高级消息队列AQ
    • 创建消息负荷payload
    • 创建队列表
    • 创建队列并启动
    • 队列的停止和删除
    • 入队消息
    • 出队消息
  • 二Java使用JMS监听并处理Oracle AQ队列
    • 创建连接参数类
    • 创建消息转换类
    • 主类进行消息处理
  • 三监控表记录变化通知Java
    • 创建表
    • 创建存储过程
    • 创建触发器

环境说明

本实验环境基于Oracle 12C和JDK1.8,其中Oracle 12C支持多租户特性,相较于之前的Oracle版本,使用‘C##用户名‘表示用户,例如如果数据库用户叫kevin,则登陆时使用C##kevin进行登陆。


一、Oracle高级消息队列AQ

Oracle AQ是Oracle中的消息队列,是Oracle中的一种高级应用,每个版本都在不断的加强,使用DBMS_AQ系统包进行相应的操作,是Oracle的默认组件,只要安装了Oracle数据库就可以使用。使用AQ可以在多个Oracle数据库、Oracle与Java、C等系统中进行数据传输。

下面分步骤说明如何创建Oracle AQ

1. 创建消息负荷payload

Oracle AQ中传递的消息被称为有效负荷(payloads),格式可以是用户自定义对象或XMLType或ANYDATA。本例中我们创建一个简单的对象类型用于传递消息。

create type demo_queue_payload_type as object (message varchar2(4000));

2. 创建队列表

队列表用于存储消息,在入队时自动存入表中,出队时自动删除。使用DBMS_AQADM包进行数据表的创建,只需要写表名,同时设置相应的属性。对于队列需要设置multiple_consumers为false,如果使用发布/订阅模式需要设置为true。

begindbms_aqadm.create_queue_table(queue_table   => 'demo_queue_table',queue_payload_type => 'demo_queue_payload_type',multiple_consumers => false);
end;

执行完后可以查看oracle表中自动生成了demo_queue_table表,可以查看影响子段(含义比较清晰)。

3. 创建队列并启动

创建队列并启动队列:

begindbms_aqadm.create_queue (queue_name  => 'demo_queue',queue_table => 'demo_queue_table');dbms_aqadm.start_queue(queue_name  =>  'demo_queue');
end;

至此,我们已经创建了队列有效负荷,队列表和队列。可以查看以下系统创建了哪些相关的对象:

SELECT object_name, object_type FROM user_objects WHERE object_name != 'DEMO_QUEUE_PAYLOAD_TYPE';OBJECT_NAME OBJECT_TYPE
------------------------------ ---------------
DEMO_QUEUE_TABLE TABLE
SYS_C009392 INDEX
SYS_LOB0000060502C00030$$ LOB
AQ$_DEMO_QUEUE_TABLE_T INDEX
AQ$_DEMO_QUEUE_TABLE_I INDEX
AQ$_DEMO_QUEUE_TABLE_E QUEUE
AQ$DEMO_QUEUE_TABLE VIEW
DEMO_QUEUE QUEUE

我们看到一个队列带出了一系列自动生成对象,有些是被后面直接用到的。不过有趣的是,创建了第二个队列。这就是所谓的异常队列(exception queue)。如果AQ无法从我们的队列接收消息,将记录在该异常队列中。

消息多次处理出错等情况会自动转移到异常的队列,对于异常队列如何处理目前笔者还没有找到相应的写法,因为我使用的场景并不要求消息必须一对一的被处理,只要起到通知的作用即可。所以如果消息转移到异常队列,可以执行清空队列表中的数据

delete from demo_queue_table;

4. 队列的停止和删除

如果需要删除或重建可以使用下面的方法进行操作:

BEGINDBMS_AQADM.STOP_QUEUE(queue_name => 'demo_queue');DBMS_AQADM.DROP_QUEUE(queue_name => 'demo_queue');DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'demo_queue_table');
END;

5. 入队消息

入列操作是一个基本的事务操作(就像往队列表Insert),因此我们需要提交。

declarer_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;v_message_handle RAW(16);o_payload demo_queue_payload_type;
begino_payload := demo_queue_payload_type('what is you name ?');dbms_aq.enqueue(queue_name  => 'demo_queue',enqueue_options => r_enqueue_options,message_properties => r_message_properties,payload => o_payload,msgid => v_message_handle);commit;
end;

通过SQL语句查看消息是否正常入队:

select * from aq$demo_queue_table;
select user_data from aq$demo_queue_table;

6. 出队消息

使用Oracle进行出队操作,我没有实验成功(不确定是否和DBMS_OUTPUT的执行权限有关),代码如下,读者可以进行调试:

declarer_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;v_message_handle RAW(16);o_payload demo_queue_payload_type;
beginDBMS_AQ.DEQUEUE(queue_name => 'demo_queue',dequeue_options => r_dequeue_options,message_properties => r_message_properties,payload => o_payload,msgid => v_message_handle);DBMS_OUTPUT.PUT_LINE('***** Browse message is [' || o_payload.message || ']****');end;

二、Java使用JMS监听并处理Oracle AQ队列

Java使用JMS进行相应的处理,需要使用Oracle提供的jar,在Oracle安装目录可以找到:在linux中可以使用find命令进行查找,例如

find `pwd` -name 'jmscommon.jar'

需要的jar为:

  • app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/jmscommon.jar
  • app/oracle/product/12.1.0/dbhome_1/jdbc/lib/ojdbc7.jar
  • app/oracle/product/12.1.0/dbhome_1/jlib/orai18n.jar
  • app/oracle/product/12.1.0/dbhome_1/jlib/jta.jar
  • app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/aqapi_g.jar

1. 创建连接参数类

实际使用时可以把参数信息配置在properties文件中,使用Spring进行注入。

package org.kevin.jms;
/*** * @author 李文锴*  连接参数信息**/
public class JmsConfig {public String username = "c##kevin";public String password = "a111111111";public String jdbcUrl = "jdbc:oracle:thin:@127.0.0.1:1521:orcl";public String queueName = "demo_queue";
}

2. 创建消息转换类

因为消息载荷是Oracle数据类型,需要提供一个转换工厂类将Oracle类型转换为Java类型。

package org.kevin.jms;import java.sql.SQLException;import oracle.jdbc.driver.OracleConnection;
import oracle.jdbc.internal.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.CustomDatum;
import oracle.sql.CustomDatumFactory;
import oracle.sql.Datum;
import oracle.sql.STRUCT;/*** * @author 李文锴 * 数据类型转换类**/
@SuppressWarnings("deprecation")
public class QUEUE_MESSAGE_TYPE implements CustomDatum, CustomDatumFactory {public static final String _SQL_NAME = "QUEUE_MESSAGE_TYPE";public static final int _SQL_TYPECODE = OracleTypes.STRUCT;MutableStruct _struct;// 12表示字符串static int[] _sqlType = { 12 };static CustomDatumFactory[] _factory = new CustomDatumFactory[1];static final QUEUE_MESSAGE_TYPE _MessageFactory = new QUEUE_MESSAGE_TYPE();public static CustomDatumFactory getFactory() {return _MessageFactory;}public QUEUE_MESSAGE_TYPE() {_struct = new MutableStruct(new Object[1], _sqlType, _factory);}public Datum toDatum(OracleConnection c) throws SQLException {return _struct.toDatum(c, _SQL_NAME);}public CustomDatum create(Datum d, int sqlType) throws SQLException {if (d == null)return null;QUEUE_MESSAGE_TYPE o = new QUEUE_MESSAGE_TYPE();o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);return o;}public String getContent() throws SQLException {return (String) _struct.getAttribute(0);}}

3. 主类进行消息处理

package org.kevin.jms;import java.util.Properties;import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;import oracle.jms.AQjmsAdtMessage;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;/*** * @author 李文锴 消息处理类**/
public class Main {public static void main(String[] args) throws Exception {JmsConfig config = new JmsConfig();QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.jdbcUrl,new Properties());QueueConnection conn = queueConnectionFactory.createQueueConnection(config.username, config.password);AQjmsSession session = (AQjmsSession) conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);conn.start();Queue queue = (AQjmsDestination) session.getQueue(config.username, config.queueName);MessageConsumer consumer = session.createConsumer(queue, null, QUEUE_MESSAGE_TYPE.getFactory(), null, false);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {System.out.println("ok");AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;try {QUEUE_MESSAGE_TYPE payload = (QUEUE_MESSAGE_TYPE) adtMessage.getAdtPayload();System.out.println(payload.getContent());} catch (Exception e) {e.printStackTrace();}}});Thread.sleep(1000000);}}

使用Oracle程序块进行入队操作,在没有启动Java时看到队列表中存在数据。启动Java后,控制台正确的输出的消息;通过Oracle程序块再次写入消息,发现控制台正确处理消息。Java的JMS监听不是立刻进行处理,可能存在几秒中的时间差,时间不等。


三、监控表记录变化通知Java

下面的例子创建一个数据表,然后在表中添加触发器,当数据变化后触发器调用存储过程给Oracle AQ发送消息,然后使用Java JMS对消息进行处理。

1. 创建表

创建student表,包含username和age两个子段,其中username时varchar2类型,age时number类型。

2. 创建存储过程

创建send_aq_msg存储过程,因为存储过程中调用dbms数据包,系统包在存储过程中执行需要进行授权(使用sys用户进行授权):

grant execute on dbms_aq to c##kevin;

注意存储过程中包含commit语句。

create or replace
PROCEDURE send_aq_msg (info IN VARCHAR2) asr_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;v_message_handle RAW(16);o_payload demo_queue_payload_type;
begino_payload := demo_queue_payload_type(info);dbms_aq.enqueue(queue_name  => 'demo_queue',enqueue_options => r_enqueue_options,message_properties => r_message_properties,payload => o_payload,msgid => v_message_handle);commit;
end send_aq_msg;

3. 创建触发器

在student表中创建触发器,当数据写入或更新时,如果age=18,则进行入队操作。需要调用存储过程发送消息,但触发器中不能包含事物提交语句,因此需要使用pragma autonomous_transaction;声明自由事物:

CREATE OR REPLACE TRIGGER STUDENT_TR
AFTER INSERT OR UPDATE OF AGE ON STUDENT FOR EACH ROW
DECLARE
pragma autonomous_transaction;
BEGINif :new.age = 18 thensend_aq_msg(:new.username);  end if;
END;

创建完触发器后向执行插入或更新操作:

insert into student (username,age) values ('jack.lee.3k', 18);
update student set age=18 where username='jack003';

Java JMS可以正确的处理消息。

通过JMS监听Oracle AQ,在数据库变化时触发执行Java程序相关推荐

  1. java监听oracle aq,透过JMS监听Oracle AQ,在数据库变化时触发执行Java程序

    环境说明 一Oracle高级消息队列AQ创建消息负荷payload 创建队列表 创建队列并启动 队列的停止和删除 入队消息 出队消息 二Java使用JMS监听并处理Oracle AQ队列创建连接参数类 ...

  2. 通过JMS监听Oracle AQ,在数据苦表变化时触发并执行Java程序

    环境说明 一Oracle高级消息队列AQ 创建消息负荷payload 创建队列表 创建队列并启动 队列的停止和删除 入队消息 出队消息 二Java使用JMS监听并处理Oracle AQ队列 创建连接参 ...

  3. java监听oracle aq,JMS监听Oracle AQ

    该文档中,oracle版本为11g,jdk版本1.8,java项目使用maven构建,并使用了定时任务来做AQ监听的重连功能,解决由于外部原因导致连接断裂之后,需要手动重启项目才能恢复连接的问题 一. ...

  4. 监听浏览器最小化与最大化时,执行的动作。

    本周做项目时,实现页面内容滚动的一个功能时, 发现在chrome与FF浏览器最小化后,当再次最大化时,滚动出现错位的现象.鉴于此,就去查了下http://html5test.com/ 中是否有相关的支 ...

  5. Oracle笔记-Oracle Net Manager添加监听IP(当服务器IP变化时要用)

    这里以window为例: 这里的程序为:Net Manager 这里添加一个地址,然后将主机填写需要当前局域网的IP地址即可.

  6. oracle 数据库起监听,oracle启动数据库监听

    linux/Aix启动.关闭Oracle及监听 Aix环境下管理Oracle 1.Aix下查看.启动数据库监听器 以oracle用户执行以下命令: ① lsnrctl status //-查看监听器状 ...

  7. oracle什么时候使用静态监听,Oracle监听之动态监听与静态监听特点

    动态注册不需要显示的配置listener.ora文件,实例启动的时候,PMON进程根据instance_name,service_name参数将实例和服务动态注册 1.如何查询某服务是静态监听注册还是 ...

  8. oracle实例注册监听,Oracle 19C 监听无法动态注册实例

    Oracle 19C 监听无法动态注册实例 环境: DB:Oracle 19.3.0.0.0 OS:Red Hat Enterprise Linux Server release 7.5 (Maipo ...

  9. vue 监听表格里的数据变化_vue中监听数据变化 watch

    今天做项目的时候,子组件中数据(原本固定的数据)需要父组件动态传入,如果一开始初始化用到的数据.但当时还没有获取到,初始化结束就不会更新数据了.只有监听这两个属性,再重新执行初始化. 1.watch是 ...

最新文章

  1. 如何在基于Bytom开发过程中集成IPFS
  2. JDK、Spring、Dubbo SPI 原理介绍
  3. 用Ant编译Flex项目的几点注意事项
  4. Google Chrome浏览器可能在您不知情的情况下破坏了您的测试
  5. Linux 系统应用编程——进程间通信(上)
  6. OJ1008: 美元和人民币
  7. java 文件url地址_简单的解析文件,取URL地址,并根据地址抓下页面
  8. Linux下coredump调试3:补录
  9. firefox启动很慢 linux_Win10安装和使用Linux子系统(WSL 2)完整指南
  10. 赛锐信息:在云中交付SAP解决方案
  11. python 元组和列表区别_Python基础教程,第三讲,列表和元组
  12. VB6源代码收藏页面
  13. 王招治计算机财务管理,计算机财务管理——以Excel为分析工具
  14. 推荐好用的ssh远程连接 linux的工具
  15. 在windows server 2008 R2上安装SVN的时候,提示安装kb2999226
  16. linux 模拟手机浏览器,Firefox模拟手机浏览器(iOS+Android) – UserAgent Switcher使用方法...
  17. python m4a转mp3_python脚本实现音频m4a格式转成MP3格式
  18. Oracle ILM相关(Information lifecycle management)
  19. 深入浅出matplotlib(101):研究最有名的滤波函数:sinc函数
  20. 嵌入式设备时间同步管理

热门文章

  1. 小胖游海南(Mar,2010)-三亚,博鳌,海口
  2. iOS开发-AppDelegate
  3. 软件测试周刊(第18期):一个精确的测量胜过一千个专家的意见
  4. HTML中属性manifest格式,manifest是啥 MANIFEST文件是什么文件?
  5. Dhtml,html,xhtml的区别
  6. 微信团队原创分享:Android版微信后台保活实战分享(进程保活篇)
  7. 计算机应用pns,基于PNS系统的计算机实验室管理应用研究——以重庆文理学院为例...
  8. 天河计算机学院,23名90后加入国防科大“天河”超级计算机团队
  9. 在虚拟机上搭建云平台环境(7)云平台组件服务安装
  10. Witt向量简介 §3.2:Witt向量的环结构概述