最近有幸,公司让我研究了spring-integration,对于这个spring出品的功能强大的工具,功能繁多且复杂。写此博客分享一下心得,也为记录一下最近研究这么久的知识点。理解的不够深,如果有错误的地方,希望各位朋友能批评指出。

一、what

首先,什么是spring-integration?研究之初,对这根管道有些迷惑,这是队列?这个activeMQ有啥区别?待研究了一段时间之后,才发现,spring-integration越来越像曾经做过的esb组件。那么spring-integration到底是什么呢?

官网给出的解释是,spring-integration是一个功能强大的EIP(Enterprise Integration Patterns),即企业集成模式。对,spring-integration是一个集大成者。就我自己的理解,集成了众多功能的它,是一种便捷的事件驱动消息框架用来在系统之间做消息传递的。

二、why

那么,我们为什么用它呢?spring-integration的官网上,给出了以下说法

spring-integration的目标

  • 提供一个简单的模型来实现复杂的企业集成解决方案
  • 为基于spring的应用添加异步的、消息驱动的行为
  • 让更多的Spring用户来使用他

看这种解释,我的直观感觉是:啥玩意?不懂啊!接着看到spring-integration的原则

  • 组件之间应该是松散的,模块性的易测的
  • 应用框架应该强迫分离业务逻辑和集成逻辑
  • 扩展节点应该有更好的抽象和可以再使用的能力

感觉,这个应该说的是解耦吧。另外看了下其他人的理解,如果你的系统处在各个系统的中间,需要JMS交互,又需要Database/Redis/MongoDB,还需要监听Tcp/UDP等,还有固定的文件转移,分析。还面对着时不时的更改需求的风险。那么,它再适合不过了。

三、how

那么,重点来了,如何使用呢?在介绍之前,先简单的介绍几个名词。

1.Message

Message是它的基础构件和核心,所有的流程都围绕着Message运转,如图所示

Message,就是所说的消息体,用来承载传输的信息用的。Message分为两部分,header和payload。header是头部信息,用来存储传输的一些特性属性参数。payload是用来装载数据的,他可以携带的任何Object对象,放什么都行,随你 。

2.MessageChannel

消息管道,生产者生产一个消息到channel,消费者从channel消费一个消息,所以channel可以对消息组件解耦,并且提供一个方便的拦截功能和监控功能。

对于MessageChannel,有以下几种

(1).PublishSubscribeChannel

发布订阅式通道形式,多用于消息广播形式,发送给所有已经订阅了的用户。在3.x版本之前,订阅者如果是0,启动会报错或者发送的时候报错。在4.x版本后,订阅者是0,则仍然会返回true。当然,可以配置最小订阅者数量(min-subscribers)

(2).QueueChannel

队列模式通道,最常用的形式。与发布订阅通道不同,此通道实现点对点式的传输方式,管道内部是队列方式,可以设置管道的容量,如果内部的消息已经达到了最大容量,则会阻塞住,直到队列有时间,或者发送的消息被超时处理。

(3).PriorityChannel

优先级队列通道,我的理解为QueueChannel的升级版,可以无视排队,根据设置的优先级直接插队。(壕无人性)

(4).RendezvousChannel

前方施工,禁止通行!这个是一个强行阻塞的通道,当消息进入通道后,通道禁止通行,直到消息在对方通道receive()后,才能继续使用。

(5).DirectChannel

最简单的点对点通道方式,一个简单的单线程通道。是spring-integration的默认通道类型

(6).ExecutorChannel

多线程通道模式,开启多线程执行点对点通道形式。这个通道博主还未研究,不敢多说........

3.Message Endpoint

消息的终点,或者我称他为消息节点,在channel你不能操作消息,只能在endpoint操作。对于常用的消息节点,有以下几种

(1).Transformer

解释者,转换者,翻译者,怎么理解都可以。作用是可以将消息转为你想要的类型。可以将xml形式转换成string类型。

<!-- Filter过滤器 -->
<int:channel id="filterAChannel"/>
<int:filter input-channel="filterAChannel" output-channel="filterBChannel" expression="payload.name.equals('haha')"/>
<int:channel id="filterBChannel"/>
<int:service-activator input-channel="filterBChannel" expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)"/>

(2).Filter

过滤器,顾名思义,过滤用的,用来判断一个消息是否应该被传输。用我的理解看,他就是spring-integration里面的if语句。

<!-- transformer转换器 -->
<int:channel id="transformerInChannel"/>
<int:transformer input-channel="transformerInChannel" output-channel="transformerOutChannel"expression="payload.name.toUpperCase() + '- [' + T(java.lang.System).currentTimeMillis() + ']'"/>
<int:channel id="transformerOutChannel"><int:queue/>
</int:channel>
<int:outbound-channel-adapter channel="transformerOutChannel" ref="receiveServiceImpl" method="helloTransformer"><int:poller fixed-delay="0"/>
</int:outbound-channel-adapter>

(3).Router

路由器,用来管理一个消息应该被发送到哪个channel中。相当于JAVA里面的switch case语句吧。判断条件很多,可是使用header里面的参数具体值(比如header里面有个定义为testRouter的参数,数值为A,那么消息经过路由会发送到判断为A的通道内,后面使用中再详细讲解)

    (4).Service Activator 

我称他为服务激活器,是一个连接服务实例到消息系统的通用端点。对于服务激活器,可能是因为我理解的不够全面,我总是将他和通道适配器搞混,因为我自己测试发现,激活器和适配器都可以作为一个消息出通道的节点。

(5).Channel Adapter

通道适配器是将消息通道连接到某个其他系统或传输的端点。通道适配器可以是入站或出站。通常情况下,通道适配器将在消息与从其他系统(文件,HTTP请求,JMS消息等)接收或发送的任何对象或资源之间进行映射。

(6).Channel Bridge

通道桥梁,用来作为管道之间进行通信使用的,常用情景为:在一个输入管道,将管道的内容发送到另外N个管道输出,配置方式如下

    <!-- bridge --><int:channel id="bridgeSendChannel"/><int:bridge input-channel="bridgeSendChannel" output-channel="bridgeReceiveAChannel"/><int:channel id="bridgeReceiveAChannel"/><int:bridge input-channel="bridgeReceiveAChannel" output-channel="bridgeReceiveBChannel"/><int:channel id="bridgeReceiveBChannel"><int:queue/></int:channel><int:outbound-channel-adapter channel="bridgeReceiveBChannel"expression="@receiveServiceImpl.helloBridge(payload.name,payload.age)"><int:poller fixed-delay="0"/></int:outbound-channel-adapter>

另外还有Splitter(分解器),Aggregator(聚合器)等。对于其他的消息节点,博主还没有做过多研究,就不再次误人子弟了。后续会将未研究到的一一补上。

4.Channel Interceptor

管道拦截器,能够以非常优雅,非常温柔的方式捕获管道传递之间的节点。对于拦截器,spring-integration给了我们六种节点

分别是发送前,邮寄后,发送成功后,接收前,接收后,接受成功后。可以分别在不同的节点进行操作。

四、use(demo地址在本文最后)

下面使用到的Test类为

import lombok.Data;/*** 普通测试dto* @author lin*/
@Data
public class Test {private String name;private String age;
}

(1)普通方式

xml配置,这里配置了一个通道helloWorldChannel,配置了个接收激活点,即接收方的地址为helloServiceImpl里面的hello方法。(其中ref指对应接收的类名,method指类里面接收的方法)

    <!-- 测试dto模式传输 --><int:channel id="testChannel"/><int:service-activator input-channel="testChannel" ref="receiveServiceImpl" method="hello"/>

发送方Service里面

    /*** 测试传输dto*/@Overridepublic void testDto() {System.out.println("testDto方法");Test test = new Test();test.setName("testDto");test.setAge("18");testChannel.send(MessageBuilder.withPayload(test).build());}

接收方Service里面

    @Overridepublic void hello(Test test) {System.out.println(test.getName() + " " + test.getAge());}

(2)普通多参数方式

xml配置,这里通过获取payload里面的具体参数来传参的形式

    <!-- 测试多参数传递 --><int:channel id="moreParamChannel"/><int:service-activator input-channel="moreParamChannel"expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)"/>

发送方Service里面,将所有的参数通过Map形式装载到payload里面

    /*** 测试多参数传输*/@Overridepublic void moreParamm() {System.out.println("greetMoreParam方法");HashMap<String, String> map = new HashMap();map.put("name", "moreParam");map.put("age", "18");helloWorldMoreParamChannel.send(MessageBuilder.withPayload(map).build());}

接收方Service里面

    @Overridepublic void helloMoreParam(String name, String age) {System.out.println(name + " " + age);}

(3)JMS方式

xml配置,这里配置了个MQ,将消息放入mq中进行传递

    <!-- 测试Mq配置--><int:channel id="topicChannel"/><bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL"><value>tcp://127.0.0.1:61616?trace=true&keepAlive=true</value></property><property name="useAsyncSend" value="true"/></bean><int-jms:outbound-channel-adapter channel="topicChannel" destination-name="topic.myTopic" pub-sub-domain="true"/><int:channel id="listenerChannel"/><int-jms:message-driven-channel-adapter id="messageDrivenAdapter" channel="listenerChannel"destination-name="topic.myTopic" pub-sub-domain="true"/><int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage"/>

发送方Service里面

    /*** 使用mq进行传输发送方法*/@Overridepublic void send() {HashMap<String,Object> map = new HashMap<>();map.put("name","MqService");map.put("age","18");topicChannel.send(MessageBuilder.withPayload(map).build());}

接收方Service里面

public void processMessage(HashMap<String,Object> map) {System.out.println("MessageListener::::::Received message: " + map.toString());}

(4)订阅方式

xml配置,这里配置了两个订阅者,订阅者分别是两个方法

    <!-- 测试订阅发布 --><!--min-subscribers=""参数为预期最小订阅者,如果必须有订阅者,则这里填写最少数;默认值为0--><int:publish-subscribe-channel id="pubsubChannel"/><int:outbound-channel-adapter channel="pubsubChannel" ref="receiveServiceImpl" method="helloReceiveOne"></int:outbound-channel-adapter><int:outbound-channel-adapter channel="pubsubChannel" ref="receiveServiceImpl" method="helloReceiveTwo"></int:outbound-channel-adapter>

发送方Service里面

    @Overridepublic void pubsubSend() {Test test = new Test();test.setName("pubsubSend");test.setAge("18");publishSubscribeChannel.send(MessageBuilder.withPayload(test).build());}

接收方Service里面

    @Overridepublic void helloReceiveOne(Test test){System.out.println("One:"+test.getName()+" "+test.getAge());}@Overridepublic void helloReceiveTwo(Test test){System.out.println("Two:"+test.getName()+" "+test.getAge());}

(5)router方式

xml配置,这里配置了一个入口通道,当消息进入入口后,通过判断header里面的'tsetHeader'参数的值,如果值为A,则进入routerAChannel通道,如果为B则进入routerBChannel通道。进入通道后分别进入两者的接收方法中。其中两种方法用了传递类,和多参数传递的形式。

    <!-- 测试路由 --><!-- 路由入口 --><int:channel id="routingChannel"><int:queue/></int:channel><!-- 路由器 --><int:header-value-router input-channel="routingChannel" header-name="testHeader"><int:poller fixed-delay="0"/><int:mapping value="A" channel="routerAChannel"/><int:mapping value="B" channel="routerBChannel"/></int:header-value-router><!-- 路由出口 --><int:channel id="routerAChannel"><int:queue/></int:channel><int:outbound-channel-adapter channel="routerAChannel" ref="receiveServiceImpl" method="helloRouterTest"><int:poller fixed-delay="0"/></int:outbound-channel-adapter><int:channel id="routerBChannel"><int:queue/></int:channel><int:outbound-channel-adapter channel="routerBChannel"expression="@receiveServiceImpl.helloRouterMap(payload.name,payload.age)"><int:poller fixed-delay="0"/></int:outbound-channel-adapter>

发送方Service里面

    @Overridepublic void routerA(String name, String age) {Test test = new Test();test.setAge(age);test.setName(name);routingChannel.send(MessageBuilder.withPayload(test).setHeader("testHeader", "A").build());}@Overridepublic void routerB(String name, String age) {HashMap<String,String> map = new HashMap<>();map.put("name", name);map.put("age", age);routingChannel.send(MessageBuilder.withPayload(map).setHeader("testHeader", "B").build());}

接收方Service里面

    @Overridepublic void helloRouterTest(Test test){System.out.println("routerA方法");System.out.println("helloRouterTest:"+test.getName()+" "+test.getAge());}@Overridepublic void helloRouterMap(String name,String age){System.out.println("routerB方法");System.out.println("helloRouterMap:"+name+" "+age);}

(6)网关方式

xml配置,在这里面配置了一个接口类,当调用这个接口的方法时,就会进入网关配置的通道

    <!-- 网关通道口模式,dto --><int:channel id="getWayChannel"><int:queue/></int:channel><int:gateway service-interface="com.lin.integration.service.interfaces.UseGetWaySender" id="helloGetWaySender"default-request-channel="getWayChannel"/><int:outbound-channel-adapter channel="getWayChannel" ref="receiveServiceImpl" method="hello"><int:poller fixed-delay="0"></int:poller></int:outbound-channel-adapter><!-- 网关通道口模式,多参数传递 --><int:channel id="getWayMoreParamChannel"><int:queue/></int:channel><int:gateway service-interface="com.lin.integration.service.interfaces.MoreParamSender" id="getWayMoreParamSender"default-request-channel="getWayMoreParamChannel"/><int:outbound-channel-adapter channel="getWayMoreParamChannel"expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)"><int:poller fixed-delay="0"></int:poller></int:outbound-channel-adapter>

网关interface里面

public interface UseGetWaySender {void sendMessage(Test test);}
public interface MoreParamSender {void sendMessage(Map map);}

发送方Service里面

    /*** 测试网关dto*/@Overridepublic void getWay() {Test test = new Test();test.setAge("18");test.setName("getWay");useGetWaySender.sendMessage(test);}/*** 测试网关多参数*/@Overridepublic void getWayMoreParam() {HashMap<String, String> map = new HashMap();map.put("name", "getWayMoreParam");map.put("age", "18");moreParamSender.sendMessage(map);}

(7)全局拦截器

拦截器中,将需要拦截的管道进行拦截,拦截之后就会对这个管道的发送端,接收端进行拦截,拦截的接口在上文已经提到过,拦截的配置如下

    <!-- 全局拦截器 --><int:channel-interceptor pattern="testInterceptorChannel" order="3" ref="countingChannelInterceptor"></int:channel-interceptor><int:channel id="testInterceptorChannel"/><int:service-activator input-channel="testInterceptorChannel" ref="receiveServiceImpl" method="hello"/>

对于近期的spring-integration研究,这些只是“初探”,如此好的一个框架模式,我也将在今后进行深入研究,会将文章进行补充,希望各位对于我文章里面的不足与错误的地方进行批评指出,从而能互相交流研究,多谢。

参考文献:

https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/

https://www.aliyun.com/jiaocheng/301276.html

https://blog.csdn.net/xiayutai1/article/details/53302652?locationNum=4&fps=1

http://www.iteye.com/topic/744524

https://blog.csdn.net/slivefox/article/details/3740541

https://my.oschina.net/zhzhenqin/blog/86586

http://www.importnew.com/16538.html

demo码云地址(10.21更新,增加了java dsl):

https://gitee.com/doubletreelin/spring-integration-mydemo.git

spring-integration初探相关推荐

  1. #翻译NO.3# --- Spring Integration Framework

    为什么80%的码农都做不了架构师?>>>    2.4 Message Endpoints A Message Endpoint represents the "filte ...

  2. 响应式Spring Cloud初探

    响应式Spring Cloud初探 分类:工程 原文链接:The Road to Reactive Spring Cloud 作者:  JOSH LONG 译者: helloworldtang 日期: ...

  3. Spring Integration学习资料

    Spring Integration学习资料 1.1     背景 Spring框架的一个重要主题是控制反转.从广义上来说,Spring处理其上下文中管理的组件的职责.只要组件减轻了职责,它们同时也被 ...

  4. Spring Integration 4.3.10 发布,Spring 消息通信

    Spring Integration 4.3.10 发布了.Spring Integration 能在基于 Spring 的应用中进行简单的消息通信,并通过简单的适配器与外部系统集成.这些适配器提供了 ...

  5. #翻译NO.5# --- Spring Integration Framework

    为什么80%的码农都做不了架构师?>>>    本人觉得这一章很重要,那就是 Spring默认的channel 的实现 DirectChannel,这个要大家多按照原文理解,开发者为 ...

  6. ESB学习笔记(Spring Integration实战)

    http://wangleifire.iteye.com/blog/351749 介绍 Spring Integration是Spring公司的一套ESB框架. 前面ESB介绍中我也做了一定了解.我们 ...

  7. #翻译NO.4# --- Spring Integration Framework

    为什么80%的码农都做不了架构师?>>>    Part III. Core Messaging This section covers all aspects of the cor ...

  8. java中channelmessage,MessageStore支持的QueueChannel与Spring Integration Java Config

    Spring Integration reference guide指的是使用MessageStore实现为QueueChannel提供持久性. 提到了很多次,但是所有示例都使用XML配置,即 但是Q ...

  9. Java开源项目:Spring Integration

    2019独角兽企业重金招聘Python工程师标准>>> Java开源项目:Spring Integration http://www.importnew.com/16538.html ...

  10. streaming api_通过Spring Integration消费Twitter Streaming API

    streaming api 1.概述 众所周知, Spring Integration具有用于与外部系统交互的大量连接器. Twitter也不例外,而且很长一段时间以来,因为Spring Social ...

最新文章

  1. 关于0bug中一处读者质疑的回复
  2. 《大话存储》读书笔记一
  3. VS2010 VC++ 编译出错 ---error LNK2005: public: virtual __thiscall CMemDC::~CMemDC(void)
  4. python训练手势分类器_python-Keras分类器的准确性在训练过程中稳定...
  5. python发动机悬置解耦计算-按重心处整车坐标系解耦
  6. linux内核实现ipsec,基于IPv6的IPSec原理分析和在Linux内核中的实现
  7. Java Web乱码分析及解决方式(一)——GET请求乱码
  8. 深度学习 轻量级卷积神经网络设计综述
  9. 如何将qrc文件添加至VS
  10. 用python打印平行四边形_shell脚本实现图形打印(三角形 平行四边形等)
  11. Spring源码系列(五)——@Aspect源码解析
  12. 【论文笔记】RSE//结合遥感数据和气象数据改进关中平原小麦产量估算的LSTM神经网络
  13. GBase8s数据库以 RESTRICT 方式或 CASCADE 方式删除安全标签对象
  14. Java零基础必看学习教程,Java开发环境配置详解
  15. 矩阵的基变换及对应基变换下向量的坐标变换
  16. Ansoft_ansys_hfss_12.1.rar
  17. Python科研绘图第一期——线型图(Line)、条型图(Bar)、散点图(Scatter)、子图(subplot)
  18. 【§睡觉win7主题之热门电脑主题下载§】
  19. stc8a_步进电机控制,加减速
  20. html新闻网页包括主页面,工作报告之html网页制作实验报告(8页)-原创力文档...

热门文章

  1. Mac 上用 Safari 一键轻松翻译网页
  2. Jsoncpp与中文出现的问题
  3. Leetcode之无重复字符的最长字符串
  4. 月薪三千做电商?新手创业做shopee需要准备多少启动资金?
  5. 二战前,请想好这些事!
  6. STM8L探索套件学习笔记-EXTI外部中断
  7. Linux上安装Adminer
  8. 西门子S7-1500PLC大型程序,各种FB块PTO控制20多个轴
  9. PDF417码制尺寸定义
  10. Tyvj 1004 滑雪~