mongodb dsl_具有Java DSL的Spring Integration MongoDB适配器
mongodb dsl
1引言
这篇文章解释了如何使用Spring Integration从MongoDB数据库中保存和检索实体。 为了完成此任务,我们将使用Java DSL配置扩展来配置入站和出站MongoDB通道适配器。 例如,我们将构建一个应用程序,使您可以将订单写入MongoDB存储,然后检索它们进行处理。
应用程序流程可以分为两部分:
- 新订单将发送到消息传递系统,在该系统中它们将被转换为实际产品,然后存储到MongoDB。
- 另一方面,另一个组件正在连续轮询数据库并处理它找到的任何新产品。
可以在我的Spring Integration存储库中找到源代码。
2 MessagingGateway –进入消息传递系统
我们的应用程序对消息传递系统一无所知。 实际上,它只会创建新订单并将其发送到接口(OrderService):
@SpringBootApplication
@EnableIntegration
public class MongodbBasicApplication {public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(MongodbBasicApplication.class, args);new MongodbBasicApplication().start(context);}public void start(ConfigurableApplicationContext context) {resetDatabase(context);Order order1 = new Order("1", true);Order order2 = new Order("2", false);Order order3 = new Order("3", true);InfrastructureConfiguration.OrderService orderService = context.getBean(InfrastructureConfiguration.OrderService.class);orderService.order(order1);orderService.order(order2);orderService.order(order3);}private void resetDatabase(ConfigurableApplicationContext context) {ProductRepository productRepository = context.getBean(ProductRepository.class);productRepository.deleteAll();}
}
首先看一下配置,我们可以看到OrderService实际上是一个消息传递网关。
@Configuration
@ComponentScan("xpadro.spring.integration.endpoint")
@IntegrationComponentScan("xpadro.spring.integration.mongodb")
public class InfrastructureConfiguration {@MessagingGatewaypublic interface OrderService {@Gateway(requestChannel = "sendOrder.input")void order(Order order);}...
}
发送到order方法的任何订单都将通过“ sendOrder.input”直接通道作为Message <Order>引入消息系统。
3第一部分-处理订单
Spring Integration消息流的第一部分由以下组件组成:
我们使用lambda创建一个IntegrationFlow定义,该定义将DirectChannel注册为其输入通道。 输入通道的名称解析为'beanName + .input'。 因此,该名称就是我们在网关中指定的名称:“ sendOrder.input”
@Bean
@Autowired
public IntegrationFlow sendOrder(MongoDbFactory mongo) {return f -> f.transform(Transformers.converter(orderToProductConverter())).handle(mongoOutboundAdapter(mongo));
}
流程在收到新订单时要做的第一件事是使用变压器将订单转换为产品。 要注册一个变压器,我们可以使用DSL API提供的Transformers工厂。 在这里,我们有不同的可能性。 我选择的是使用PayloadTypeConvertingTransformer ,它将有效负载转换为对象的委托给转换器。
public class OrderToProductConverter implements Converter<Order, Product> {@Overridepublic Product convert(Order order) {return new Product(order.getId(), order.isPremium());}
}
订单流程的下一步是将新创建的产品存储到数据库中。 在这里,我们使用MongoDB出站适配器:
@Bean
@Autowired
public MessageHandler mongoOutboundAdapter(MongoDbFactory mongo) {MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongo);mongoHandler.setCollectionNameExpression(new LiteralExpression("product"));return mongoHandler;
}
如果您想知道消息处理程序在内部实际上在做什么,它将使用mongoTemplate保存该实体:
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class);Object payload = message.getPayload();this.mongoTemplate.save(payload, collectionName);
}
4第二部分–加工产品
在第二部分中,我们还有另一个用于处理产品的集成流程:
为了检索以前创建的产品,我们定义了一个入站通道适配器,它将继续轮询MongoDB数据库:
@Bean
@Autowired
public IntegrationFlow processProduct(MongoDbFactory mongo) {return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(Pollers.fixedDelay(3, TimeUnit.SECONDS))).route(Product::isPremium, this::routeProducts).handle(mongoOutboundAdapter(mongo)).get();
}
MongoDB入站通道适配器是负责从数据库轮询产品的适配器。 我们在构造函数中指定查询。 在这种情况下,我们每次都会轮询一种未加工的产品:
@Bean
@Autowired
public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'processed' : false}"));messageSource.setExpectSingleResult(true);messageSource.setEntityClass(Product.class);messageSource.setCollectionNameExpression(new LiteralExpression("product"));return messageSource;
}
路由器定义显示了如何根据“溢价”字段将产品发送到其他服务激活器方法:
private RouterSpec<Boolean, MethodInvokingRouter> routeProducts(RouterSpec<Boolean, MethodInvokingRouter> mapping) {return mapping.subFlowMapping(true, sf -> sf.handle(productProcessor(), "fastProcess")).subFlowMapping(false, sf -> sf.handle(productProcessor(), "process"));
}
作为服务激活器,我们有一个简单的Bean,它记录一条消息并将产品设置为已处理。 然后,它将返回产品,以便流程中的下一个端点可以处理它。
public class ProductProcessor {public Product process(Product product) {return doProcess(product, String.format("Processing product %s", product.getId()));}public Product fastProcess(Product product) {return doProcess(product, String.format("Fast processing product %s", product.getId()));}private Product doProcess(Product product, String message) {System.out.println(message);product.setProcessed(true);return product;}
}
将产品设置为已处理的原因是因为下一步是更新其在数据库中的状态,以便不再对其进行轮询。 我们通过将流再次重定向到mongoDb出站通道适配器来保存它。
5结论
您已经了解了必须使用哪些端点才能使用Spring Integration与MongoDB数据库进行交互。 出站通道适配器将产品被动地保存到数据库,而入站通道适配器则主动轮询数据库以检索新产品。
如果您发现此帖子有用,请分享或给我的存储库加注星标。 我很感激 :)
我正在Google Plus和Twitter上发布我的新帖子。 如果您要更新新内容,请关注我。
翻译自: https://www.javacodegeeks.com/2016/11/spring-integration-mongodb-adapters-java-dsl.html
mongodb dsl
mongodb dsl_具有Java DSL的Spring Integration MongoDB适配器相关推荐
- 带有Java DSL的Spring Integration MongoDB适配器
1引言 这篇文章解释了如何使用Spring Integration从MongoDB数据库中保存和检索实体. 为了实现这一点,我们将使用Java DSL配置扩展来配置入站和出站MongoDB通道适配器. ...
- mongo java 注解,在Java中使用Spring Data MongoDB操作Mong | zifangsky的个人博客
前言:在上一篇文章中(PS:https://www.zifangsky.cn/923.html)我简单介绍了如何在Linux中安装MongoDB以及MongoDB的增删改查等基本命令用法(PS:更多M ...
- Spring Integration Java DSL示例
现在已经为Spring Integration引入了新的基于Java的DSL ,这使得可以使用基于纯Java的配置而不是基于Spring XML的配置来定义Spring Integration消息流. ...
- Spring Data MongoDB示例
Spring Data MongoDB示例 欢迎使用Spring Data MongoDB示例.Spring Data MongoDB是将Spring Framework与最广泛使用的NoSQL数据库 ...
- Spring Integration入门
为什么使用Spring IntegrationSpring Integration是Spring框架创建的又一个API,面向企业应用集成(EAI).说到集成,并不缺"解决办法":硬 ...
- Spring data MongoDB 系列之一连接数据库并 插入数据
前言 最近开发新项目中用到MongoDB,为方便以后使用便记录下用法. 本文的数据库操作是以model对象进行展开,但是MongoDB是接受以Json,Map等为对象的操作的. (本人的项目中是以Js ...
- spring.data.mongodb.uri认证失败Authentication failed
使用spring.data.mongodb.uri=mongodb://root:ve#duj7-wa06yhg$@192.154.81.16:27017/xxxx老是出现Authentication ...
- spring集成mq_使用Spring Integration Java DSL与Rabbit MQ集成
spring集成mq 我最近参加了在拉斯维加斯举行的2016年Spring大会 ,很幸运地看到了我在软件世界中长期敬佩的一些人. 我亲自遇到了其中的两个人,他们实际上合并了几年前我与Spring In ...
- 使用Spring Integration Java DSL与Rabbit MQ集成
我最近参加了在拉斯维加斯举行的2016年Spring会议 ,很幸运地看到了我在软件世界中长期敬佩的一些人. 我亲自遇到了其中的两个人,他们实际上合并了几年前我与Spring Integration相关 ...
最新文章
- Science:豆科植物如何建造“固氮工厂”?Murray组在根瘤共生机制取得重要进展...
- python【Matlibplot绘图库】Animation动画(真の能看懂~!)
- 频域采样与恢复matlab实验,连续信号的采样与重构实验报告
- 图的割点、桥与双连通分支
- linux7.0怎么设置中文,CentOS 7 yum安装zabbix 设置中文界面
- Wine 4.3 发布,Windows 应用的兼容层
- 一道很熟悉的前端面试题,你怎么答?
- vue-cli proxy中跨域中pathRewrite配置理解
- 开发MIS系统需要的技术及其含义、作用
- 递归——黑白棋子的移动(洛谷 P1259)
- 服务器系统格式,服务器系统编码格式
- 【优化算法】多目标蜻蜓优化算法(MODA)【含Matlab源码 1350期】
- 查看服务器远程桌面端口命令,Windows系统服务器远程桌面端口查看和修改方法...
- Ubuntu20.04下opencv的安装
- matlab程序特殊符号,MATLAB——matlab特殊符号表【转载】
- python使用给定字符密码_使用python生成一个指定长度的字符串(随机密码),要求包括数字、字母、特殊符号(string库解析)...
- 【bzoj1050】 旅行comf
- 电阻的耐功率冲击与耐电压冲击
- 计算机常用屏幕分辨率,pc端常用电脑屏幕 ((响应式PC端媒体查询)电脑屏幕分辨率尺寸大全)...
- 风格迁移1-02:Liquid Warping GAN(Impersonator)-源码模型测试-报错解决