通过Java 8流使用Oracle AQ
Oracle数据库最令人敬畏的功能之一是Oracle AQ:Oracle数据库高级队列 。 AQ API直接在数据库中实现了完整的事务性消息传递系统。
在数据库位于系统中心的经典体系结构中,使用AQ进行进程间通信时,多个应用程序(其中一些应用程序用Java编写,其他应用程序用Perl或PL / SQL编写)访问同一数据库。太好了 如果您更喜欢Java EE,则可以购买基于Java的MQ解决方案,并将该消息总线/中间件放在系统体系结构的中心。 但是,为什么不使用数据库呢?
如何在jOOQ中使用PL / SQL AQ API
用于AQ消息入队和出队的PL / SQL API非常简单,可以使用jOOQ的OracleDSL.DBMS_AQ
API从Java轻松访问它。
此处使用的队列配置如下所示:
CREATE OR REPLACE TYPE message_t AS OBJECT (ID NUMBER(7),title VARCHAR2(100 CHAR)
)
/BEGINDBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'message_aq_t',queue_payload_type => 'message_t');DBMS_AQADM.CREATE_QUEUE(queue_name => 'message_q',queue_table => 'message_aq_t');DBMS_AQADM.START_QUEUE(queue_name => 'message_q');COMMIT;
END;
/
并且jOOQ代码生成器将生成有用的类,并将所有类型信息直接与它们相关联(简化示例):
class Queues {static final Queue<MessageTRecord> MESSAGE_Q = new QueueImpl<>("NEW_AUTHOR_AQ", MESSAGE_T);
}class MessageTRecord {void setId(Integer id) { ... }Integer getId() { ... }void setTitle(String title) { ... }String getTitle() { ... }MessageTRecord(Integer id, String title) { ... }
}
然后,可以使用这些类直接在生成的队列引用上安全地使消息类型入队和出队:
// The jOOQ configuration
Configuration c = ...// Enqueue a message
DBMS_AQ.enqueue(c, MESSAGE_Q, new MessageTRecord(1, "test"));// Dequeue it again
MessageTRecord message = DBMS_AQ.dequeue(c, MESSAGE_Q);
很简单,不是吗?
现在,让我们利用Java 8功能
消息队列就是无限(阻塞)消息流。 从Java 8开始,我们为此类消息流提供了强大的API,即Stream API。
这就是为什么我们为即将到来的jOOQ 3.8添加了新的API,将现有的jOOQ AQ API与Java 8 Streams相结合的原因:
// The jOOQ configuration
Configuration c = ...DBMS_AQ.dequeueStream(c, MESSAGE_Q).filter(m -> "test".equals(m.getTitle())).forEach(System.out::println);
上面的流管道将侦听MESSAGE_Q
队列,使用所有消息,过滤掉不包含"test"
的消息,并打印其余消息。
阻止流
有趣的是,这是一个无限的阻塞流。 只要队列中没有新消息,流管道处理就会简单地在队列上阻塞,等待新消息。 这对于顺序流来说不是问题,但是在调用Stream.parallel()
时会发生什么呢?
jOOQ将消耗事务中的每个消息。 jOOQ 3.8事务在ForkJoinPool.ManagedBlocker
运行:
static <T> Supplier<T> blocking(Supplier<T> supplier) {return new Supplier<T>() {volatile T result;@Overridepublic T get() {try {ForkJoinPool.managedBlock(new ManagedBlocker() {@Overridepublic boolean block() {result = supplier.get();return true;}@Overridepublic boolean isReleasable() {return result != null;}});}catch (InterruptedException e) {throw new RuntimeException(e);}return asyncResult;}};
}
这不是很多魔术。 当ManagedBlocker
由ForkJoinWorkerThread
运行时,它会运行一些特殊的代码,以确保线程的ForkJoinPool
不会由于线程耗尽而死锁。 有关更多信息,请在此处阅读此有趣的文章:http: //zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health
或以下堆栈溢出答案: http : //stackoverflow.com/a/35272153/521799
因此,如果您想要超快速的并行AQ出队过程,请运行:
// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...DBMS_AQ.dequeueStream(c, MESSAGE_Q).parallel().filter(m -> "test".equals(m.getTitle())).forEach(System.out::println);
而且您将拥有几个线程,这些线程将使消息并行出队。
不想等待jOOQ 3.8?
没问题。 使用当前版本并将dequeue
操作包装在您自己的Stream
:
Stream<MessageTRecord> stream = Stream.generate(() ->DSL.using(config).transactionResult(c ->dequeue(c, MESSAGE_Q))
);
做完了
奖励:异步出队
在我们讨论时,排队系统的另一个很好的功能是它们的异步性。 在Java 8中, CompletionStage
是一个非常有用的用于建模(和组合)异步算法的类型,它是默认实现CompletableFuture
,它再次在ForkJoinPool
执行任务。
使用jOOQ 3.8,您可以再次简单地调用
// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...CompletionStage<MessageTRecord> stage =
DBMS_AQ.dequeueAsync(c, MESSAGE_Q).thenCompose(m -> ...)...;
敬请期待jOOQ博客上的另一篇文章,我们将研究更复杂的异步用例,并使用jOOQ 3.8和Java 8阻止SQL语句。
翻译自: https://www.javacodegeeks.com/2016/02/using-oracle-aq-via-java-8-streams.html
通过Java 8流使用Oracle AQ相关推荐
- oracle aq_通过Java 8流使用Oracle AQ
oracle aq Oracle数据库最令人敬畏的功能之一是Oracle AQ:Oracle数据库高级队列 . AQ API直接在数据库中实现了完整的事务性消息传递系统. 在数据库处于系统中心的经典体 ...
- 通过JMS监听Oracle AQ,在数据库变化时触发执行Java程序
环境说明 一Oracle高级消息队列AQ 创建消息负荷payload 创建队列表 创建队列并启动 队列的停止和删除 入队消息 出队消息 二Java使用JMS监听并处理Oracle AQ队列 创建连接参 ...
- java监听oracle aq,JMS监听Oracle AQ
该文档中,oracle版本为11g,jdk版本1.8,java项目使用maven构建,并使用了定时任务来做AQ监听的重连功能,解决由于外部原因导致连接断裂之后,需要手动重启项目才能恢复连接的问题 一. ...
- java监听oracle aq,透过JMS监听Oracle AQ,在数据库变化时触发执行Java程序
环境说明 一Oracle高级消息队列AQ创建消息负荷payload 创建队列表 创建队列并启动 队列的停止和删除 入队消息 出队消息 二Java使用JMS监听并处理Oracle AQ队列创建连接参数类 ...
- 通过JMS监听Oracle AQ,在数据苦表变化时触发并执行Java程序
环境说明 一Oracle高级消息队列AQ 创建消息负荷payload 创建队列表 创建队列并启动 队列的停止和删除 入队消息 出队消息 二Java使用JMS监听并处理Oracle AQ队列 创建连接参 ...
- java文件流插入数据库_使用Java流查询数据库
java文件流插入数据库 在本文中,您将学习如何编写纯Java应用程序,这些应用程序能够处理现有数据库中的数据,而无需编写一行SQL(或类似的语言,例如HQL),而无需花费数小时将所有内容放在一起. ...
- clob类型用java怎么存,Java 储存和读取 oracle CLOB 类型字段的实用方法
当前位置:我的异常网» 编程 » Java 储存和读取 oracle CLOB 类型字段的实用方法 Java 储存和读取 oracle CLOB 类型字段的实用方法 www.myexceptions. ...
- Java 8流中的数据库CRUD操作
在开始使用新工具时要克服的最大障碍是让您着手处理小事情. 到目前为止,您可能对新的Java 8 Stream API的工作方式充满信心,但是您可能尚未将其用于数据库查询. 为了帮助您开始使用Strea ...
- java 8流在另一个流_Java 8流图
java 8流在另一个流 Java 8 Stream map function can be used to perform some operation on all of it's element ...
最新文章
- Linux shell 学习笔记(7)— 构建基本脚本(变量、重定向、管道、状态码)
- 肝了三天,万字长文教你玩转 tcpdump,从此抓包不用愁
- android的 selector 背景选择器和 shape 详解(转)
- 计算机网络OSI架构详细图
- 我的世界1.13的服务器网站,我的世界1.13纯净版
- (2,1,3)卷积码与一种QC-LDPC码的译码性能对比
- 【Kubernetes】mac 安装minikube
- 网络工程师英语系列2(CISCO IP Telephony)
- 在JS中使用Ajax
- 数据结构上机实践第七周项目2 - 自建算法库——链队(链式队列)
- 突破灰色按钮原理讲解
- 使用Python爬取百度热搜榜
- 快压、360压缩、WinRAR关于打开快压通过超高压缩比压缩后的文件不兼容的问题
- linux文件放在哪个目录,linux中驱动放在哪个目录下
- 财务分析中三张财务报表计算公式
- 经典SFM步骤——Lowe2005
- 请问Bat文件是用什么什么语言写的?
- unity animator动画播放完毕后执行
- 人工智能架构图和产业链构成
- 【java多线程】多线程为什么跑的比单线程还要慢?!
热门文章
- java面试常考系列四
- 单链表基本操作在主函数中的实现
- 捡到东西说给钱才给东西?算不算敲诈勒索……
- javaweb实现分页(二)
- 循环录(输)入 java 课的学生成绩(5个学生),统计分数大于等于 80 分的学生
- ssh(Spring+Spring mvc+hibernate)——applicationContext.xml
- Mybatis+mysql动态分页查询数据案例——配置映射文件(HouseDaoMapper.xml)
- Json、Gson、Jackson
- Hibernate使用最新的MySQL8.+版本出现的问题!
- java design按钮_DesignJava 设计模式,讲述 的各种 方便在项目中进行 框架结构 Develop 238万源代码下载- www.pudn.com...