扯在前面

Oracle通过AQ (Oracle Streams Advanced Queuing)来提供“进程间” (或者跨会话 -- inter-session) 通信的功能。关于inter-session communication, 貌似DBMS_PIPE也可以做到,这个打算写另外一片水文来介绍,在此不表。 AQ 其实类似于一个message queue, 至于为什么叫Advanced queue,就不清楚了:) message queue典型的应用场景是“生产者-消费者” 模式 (or 发布/订阅 (publisher/subscriber))。

AQ基础在于有个实实在在的"queue table", 因为数据表中的数据是持久化的,共享的,可以被多个session同时访问,因此很容易就实现了多个session信息传递的目的。有了queue table是不够的,自然还是需要有个queue, 然后可以通过这个介质publisher可以把 信息发布到queue table中,subscriber然后可以通过queue来访问queue table来取得数据。在这个过程中,最重要的还是subscriber 怎么知道何时来queue table里面取message, 当然subscriber可以一直不停地查询queue table 或者每隔多长时间来查询一次,这显然都不是很好,最好还是有种方式来提醒subscriber。 AQ支持所谓的callback方式来让subscriber及时获得queue table中的数据。

AQ相关接口

Oracle Advanced Queue 主要提供了DBMS_AQ 和 DBMS_AQADM两个package程序接口。有必要先了解这两个package都提供了什么东西...

相关类型定义

主要包括如下一些:
- ENQUEUE_OPTIONS_T

- DEQUEUE_OPTIONS_T

- MESSAGE_PROPERTIES_T

- AQ$_REG_INFO

- AQ$_DESCRIPTOR

DBMS_AQ

这个包主要定义了enqueue/dequeue等过程。  如下所示 (Note that this is far from a complete list of interfaces)

- ENQUEUE procedure

- ENQUEUE_ARRAY function

- DEQUEUE procedure

- DEQUEUE_ARRAY function

- REGISTER Procedure (Register for message notifications)

- UNREGISTER procedure (Unregister a subscription which turns off notification)

- LISTEN procedures  ( Listen to one or more queues on behalf of a list of agents)

- POST procedures (Posts to an anonymous subscription which allows all clients who are registered for the subscription to get notifications)

AQ 比较有用的应该是它提供的 callback procedure来支持异步调用的功能。不过有个限制就是自定义的callback procedure必须满足一定的接口规范,如下所示:

如果message的类型是RAW, 则接口如下

procedure plsqlcallback(
context IN RAW,
reginfo IN SYS.AQ$_REG_INFO,
descr IN SYS.AQ$_DESCRIPTOR,
payload IN RAW,
payloadl IN NUMBER);

如果message的类型的自定义的object类型,则接口如下:

procedure plsqlcallback(
context IN RAW,
reginfo IN SYS.AQ$_REG_INFO,
descr IN SYS.AQ$_DESCRIPTOR,
payload IN VARCHAR2,
payloadl IN NUMBER);

DBMS_AQADM

顾名思义,这个包提供了用来管理AQ的接口。 主要包括以下一些接口,

- CREATE_QUEUE_TABLE procedure

- CREATE_QUEUE procedure

- DROP_QUEUE procedure

- DROP_QUEUE_TABLE procedure

- PURGE_QUEUE_TABLE procedure

- START_QUEUE procedure

- STOP_QUEUE procedure

- ADD_SUBSCRIBER procedure

- REMOVE_SUBSCRIBER procedure

AQ in Action

1. Create a message type ( a.k.a. payload type)

SQL> create type test_msg_type as
2 object (message varchar2(4000));
3 /

Type created.

2. Create a queue table based on the payload type just created.

SQL> begin
2 dbms_aqadm.create_queue_table
3 ( queue_table => 'test_queue_table',
4 queue_payload_type => 'test_msg_type');
5 end;
6 /

PL/SQL procedure successfully completed.

3. Create a queue and start the queue

SQL> begin
2 dbms_aqadm.create_queue
3 ( queue_name => 'test_queue',
4 queue_table => 'test_queue_table');
5
6 dbms_aqadm.start_queue
7 ( queue_name => 'test_queue');
8
9 end;
10 /

PL/SQL procedure successfully completed.

Now let's see what Oracle has created behind the scene so far.

SQL> select object_name, object_type from user_objects;

OBJECT_NAME OBJECT_TYPE
---------------------------------------- --------------------
TEST_QUEUE_TABLE TABLE
TEST_MSG_TYPE TYPE
SYS_C0054669 INDEX
SYS_LOB0000262448C00030$$ LOB
AQ$_TEST_QUEUE_TABLE_T INDEX
AQ$_TEST_QUEUE_TABLE_I INDEX
AQ$_TEST_QUEUE_TABLE_E QUEUE
AQ$_TEST_QUEUE_TABLE_F VIEW
AQ$TEST_QUEUE_TABLE VIEW
TEST_QUEUE QUEUE

10 rows selected.

Note there is another queue -- AQ$_TEST_QUEUE_TABLE_E  created for us. Just as the suffix "E" implies, this queue will be used to store the message if the AQ cannot retrieve a message from our user-queue.

Now let's see what is inside our queue table. Obviously, nothing!

SQL> select * from test_queue_table;

no rows selected

SQL>

4. Enqueue messages


1 declare
2 v_enqueue_options dbms_aq.enqueue_options_t;
3 v_message_properties dbms_aq.message_properties_t;
4 v_message_handle raw(16);
5 v_payload test_msg_type;
6 begin
7 v_payload := test_msg_type('Hello There');
8 dbms_aq.enqueue
9 ( queue_name => 'test_queue',
10 enqueue_options => v_enqueue_options,
11 message_properties => v_message_properties,
12 payload => v_payload,
13 msgid => v_message_handle);
14 commit;
15* end;
16 /

PL/SQL procedure successfully completed.

SQL>

Note the enqueue action is essentially a transaction (insert into the queue table), hence we needed to commit it to let other sessions can see the data in the queue table.

Now let's see what's inside the queue table.

SQL> select count(*) from test_queue_table;

COUNT(*)
----------
1

SQL> select count(*) from aq$test_queue_table;

COUNT(*)
----------
1

SQL> select user_data from aq$test_queue_table;

USER_DATA(MESSAGE)
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TEST_MSG_TYPE('Hello There')

SQL>

5. Browsing messages

DBMS_AQ.DEQUEUE can be used to either dequeue message from the queue (remove the message from the queue table) which is the default behavior, or browse the message from the queue ( will not remove the message from the queue).

To browse the messages, we can set the dequeue message mode to be DBMS_AQ.BROWSE. See below an example,

SQL> declare
2 v_dequeue_options dbms_aq.dequeue_options_t;
3 v_message_properties dbms_aq.message_properties_t;
4 v_message_handle raw(16);
5 v_payload test_msg_type;
6 begin
7
8 v_dequeue_options.dequeue_mode := dbms_aq.browse;
9
10 dbms_aq.dequeue(
11 queue_name => 'test_queue',
12 dequeue_options => v_dequeue_options,
13 message_properties => v_message_properties,
14 payload => v_payload,
15 msgid => v_message_handle);
16
17 dbms_output.put_line('Browsed message: ' || v_payload.message);
18
19 end;
20 /

PL/SQL procedure successfully completed.

SQL> set serveroutput on
SQL> /
Browsed message: Hello There

PL/SQL procedure successfully completed.

SQL>

We can verify the message is still out there in the queue (table) by querying the view aq$test_queue_table.

SQL> select user_data from aq$test_queue_table;

USER_DATA(MESSAGE)
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TEST_MSG_TYPE('Hello There')

SQL>

6. Dequeue messages

Now let's do the real dequeuing operation. Note this doesn't have to be from the same session since enqueues are committed transactions and AQ is table-based. Similarly, dequeue is also a transaction. If we are happy with the message, we must commit the dequeue as well.


1 declare
2 v_dequeue_options dbms_aq.dequeue_options_t;
3 v_message_properties dbms_aq.message_properties_t;
4 v_message_handle raw(16);
5 v_payload test_msg_type;
6 begin
7 dbms_aq.dequeue(
8 queue_name => 'test_queue',
9 dequeue_options => v_dequeue_options,
10 message_properties => v_message_properties,
11 payload => v_payload,
12 msgid => v_message_handle);
13 dbms_output.put_line('Dequeue message: ' || v_payload.message);
14 commit;
15* end;
16 /
Dequeue message: Hello There

PL/SQL procedure successfully completed.

SQL>

We can confirm the message is gone by querying the queue table...

SQL> select count(*) from test_queue_table;

COUNT(*)
----------
0

7. Clean Up

Before going on to the topic of notification, let's do the clean up first. 
SQL> begin
2 dbms_aqadm.stop_queue('test_queue');
3 dbms_aqadm.drop_queue('test_queue');
4 dbms_aqadm.drop_queue_table('test_queue_table');
5 end;
6 /

PL/SQL procedure successfully completed.

SQL>

SQL> select object_name, object_type from user_objects;

OBJECT_NAME OBJECT_TYPE
---------------------------------------- --------------------
TEST_MSG_TYPE TYPE

SQL>

8. Notification

The examples above shows how to dequeue the messages manually. This is not pleasant in the real world. Most of the time, we'd like there would be some mechanism to notify dequeuing instead of dequeuing initiatively.

First create queue table for multiple consumers..


1 begin
2 dbms_aqadm.create_queue_table
3 ( queue_table => 'test_queue_table',
4 queue_payload_type => 'test_msg_type',
5 multiple_consumers => true);
6* end;
SQL> /

PL/SQL procedure successfully completed.

SQL>

Then create the queue and start it as usual,

SQL> begin
2 dbms_aqadm.create_queue
3 ( queue_name => 'test_queue',
4 queue_table => 'test_queue_table');
5
6 dbms_aqadm.start_queue
7 ( queue_name => 'test_queue');
8 end;
9 /

PL/SQL procedure successfully completed.

SQL>

To demonstrate the asynchronous nature of notification via callback, we are going to store the queued message in a normal application table.

SQL> create table test_message_table
2 ( message varchar2(4000));

Table created.

Now the key point comes, we create a callback plsql procedure. This procedure will dequeue the message and save it in the table test_message_table when there is notification. Remember the callback procedure interface signature must be as follows,

1 create or replace procedure test_queue_callback_procedure
2 ( context raw,
3 reginfo sys.aq$_reg_info,
4 descr sys.aq$_descriptor,
5 payload raw,
6 payloadl number)
7 AS
8 v_dequeue_options dbms_aq.dequeue_options_t;
9 v_message_properties dbms_aq.message_properties_t;
10 v_message_handle raw(16);
11 v_payload test_msg_type;
12 begin
13 v_dequeue_options.msgid := descr.msg_id;
14 v_dequeue_options.consumer_name := descr.consumer_name;
15 dbms_aq.dequeue
16 ( queue_name => descr.queue_name,
17 dequeue_options => v_dequeue_options,
18 message_properties => v_message_properties,
19 payload => v_payload,
20 msgid => v_message_handle);
21 insert into test_message_table(message)
22 values('Message [' || v_payload.message || ']' ||
23 ' dequeued at [' || to_char(systimestamp, 'yyyy-mm-dd hh24:mi:ss.FF3') || ']');
24 commit;
25* end;
SQL> /

Procedure created.

We need to add a named subscriber to the queue and register the action that the subscriber will take on notification.

1 begin
2 dbms_aqadm.add_subscriber
3 ( queue_name => 'test_queue',
4 subscriber => sys.aq$_agent
5 ('test_queue_subscriber',
6 null,
7 null)
8 );
9 dbms_aq.register
10 (
11 sys.aq$_reg_info_list
12 ( sys.aq$_reg_info
13 ( 'test_queue:test_queue_suscriber',
14 dbms_aq.namespace_aq,
15 'plsql://test_queue_callback_procedure',
16 HEXTORAW('FF')
17 )
18 ),
19 1
20 );
21* end;
22 /

PL/SQL procedure successfully completed.

SQL>

Refer to AQ$_REG_INFO Type for the detailed definition.

Now let's see what will happen when we enqueue a message...

1 declare
2 v_enqueue_options dbms_aq.enqueue_options_t;
3 v_message_properties dbms_aq.message_properties_t;
4 v_message_handle raw(16);
5 v_payload test_msg_type;
6 begin
7 v_payload := test_msg_type(
8 to_char(systimestamp,
9 'yyyy-mm-dd hh24:mi:ss.ff3'));
10 dbms_aq.enqueue
11 (
12 queue_name => 'test_queue',
13 enqueue_options => v_enqueue_options,
14 message_properties => v_message_properties,
15 payload => v_payload,
16 msgid => v_message_handle);
17 commit;
18* end;
SQL> /

PL/SQL procedure successfully completed.

SQL>

To see if the message was automatically dequeued, let's check out the table test_message_table,

SQL> select * from test_message_table;

MESSAGE
----------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------- ----------------------
Message [2010-11-06 16:07:47.537] dequeued at [2010-11-06 16:07:51.599]

SQL> select count(*) from test_queue_table;

COUNT(*)
----------
0

SQL>

Clean up: remove the subscriber as follows,

1 begin
2 DBMS_AQADM.REMOVE_SUBSCRIBER
3 ( queue_name => 'test_queue',
4 subscriber => sys.aq$_agent
5 ('test_queue_subscriber', null, null)
6 );
7* end;
SQL> /

PL/SQL procedure successfully completed.

Acknowledgements

本文的例子来自Adrian Billington的 introduction to advanced queuing

Oracle Advanced Queue (DBMS_AQ/DMBS_AQADM)相关推荐

  1. Oracle Advanced Queuing 触发器入列和异步通知消息出列

    1. 官方文档: https://docs.oracle.com/database/121/ADQUE/aq_opers.htm 2. 授权用户操作dbms_aq/dbms_aqadm 的权限 gra ...

  2. Oracle Advanced Security:Column Encryption Overhead

    在Oracle 10g中出现了column encryption列加密特性,通过对列上的数据加密实现数据安全性的目的.当然实现这一加密特性是有代价的,一方面会导致所加密列数据每行所占磁盘空间字节数增长 ...

  3. oracle 删除 queue,C++ stl队列Queue用法介绍:删除,插入等操作代码举例

    c++队列queue模板类的定义在头文件中,queue 模板类需要两个模板参数,一个是元素类型,一个容器类型,元素类型是必要的,容器类型是可选的,默认为deque 类型. C++队列Queue是一种容 ...

  4. oracle翻译Advanced,Oracle高级复制,Oracle advanced replication,音标,读音,翻译,英文例句,英语词典...

    大肠杆菌等原核生物的环状染色体dna复制时,首先在dna的复制起点上解螺旋.dnab蛋白结合在复制起点处两个解旋了的单链上,分别形成两个前导链(leading strand)的引物(primer),当 ...

  5. excel字母数字排序_Excel数字不能正确排序或添加

    excel字母数字排序 Last week, I heard from someone who was having a problem sorting some numbers in Excel. ...

  6. Oracle Data Guard 理论知识

    RAC, Data Gurad, Stream 是Oracle 高可用性体系中的三种工具,每个工具即可以独立应用,也可以相互配合. 他们各自的侧重点不同,适用场景也不同. RAC 它的强项在于解决单点 ...

  7. oracle流复制实现,Oracle流复制技术的基本概念、工作流程及其容灾备份

    龙源期刊网 http://www.doczj.com/doc/9e45114b3186bceb18e8bb1b.html Oracle流复制技术的基本概念.工作流程及其容灾备份 作者:周军 来源:&l ...

  8. oracle stream 主键,oracle stream配置向导

    1. Stream 的工作原理 Stream 是Oracle Advanced Queue技术的一种扩展应用,这种技术最基本的原理就是收集事件,把时间保存在队列中,然后把这些事件发布给不同的订阅者. ...

  9. Oracle Stream Replication技术

    Stream 是Oracle 的消息队列(也叫Oracle Advanced Queue)技术的一种扩展应用. Oracle 的消息队列是通过发布/订阅的方式来解决事件管理.流复制(Stream re ...

最新文章

  1. linux tcp窗口大小设置,高性能Linux:TCP/IP内核参数调优之TCP窗口扩大因子(TCP Window Scaling)选项(理论篇)...
  2. 随笔 —— 当下不晚
  3. SwiftUI之深入解析如何实现3D Scroll效果
  4. 算法图解学习笔记01:二分查找大O表示法
  5. 《父亲家书》选:母亲的手摔伤了
  6. c++变量的作用域、生存期和可见性
  7. python2字符串编码方式_一、基础部分-2.字符串编码
  8. 通过回调函数阻止进程创建(验证结束,方案完全可行)
  9. 手势在c语言的作用,手势态度在人际交往中的重要性
  10. python中的zip是什么意思_python中zip是什么函数
  11. iterator [ɪtə'reɪtə] 遍历器
  12. 使用System Center Essentials 2007进行软件部署
  13. JavaScript如何获取css属性
  14. 【Linux】15 张 Vim 速查表奉上,帮你提高 N 倍效率!
  15. 高通9008工具 qpst 安装时报错 qpst server returned unexpected error attempting 解决办法
  16. 周期信号波形识别及参数测量装置(J 题) 【高职高专组】--2021 年全国大学生电子设计竞赛
  17. EcShop二次开发学习方法和Ecshop二次开发必备基础
  18. SQL中的Round函数
  19. hp390计算机硬盘模式设置,Bios设置中三种硬盘模式详解
  20. 改革IMF首先要增加中国发言权

热门文章

  1. 【面经】[华傲数据C++/Python系统开发实习]数据分析方向
  2. 芦溪中学2021高考成绩查询,芦溪中学2020年高考喜报!
  3. ​租房APP开发多功能服务来满足人们的需求,提高人们租房体验​
  4. boost::intrusive_ptr用法
  5. pycharm获取yelp相关数据(一)
  6. ucloud对象存储装宝塔_UCloud云服务器建站教程3 – UCloud安装宝塔面板/LNMP一键包WEB环境 | 老左笔记...
  7. 华为汽车再创奇迹,月订单可望破3万,将成特斯拉有力挑战者
  8. 最优质的空投糖果——平台币
  9. 前端基础第四天项目 社交媒体黑马头条项目-登录注册和个人中心
  10. BZOJ 1109: [POI2007]堆积木Klo 神分析, LIS, BIT, 二分