目录

1.背景

2.什么是业务流程驱动引擎

3.核心概念

4.技术实现

5.思考及后续规划


引言:在方舟(业务能力研发协作平台)、水滴(业务能力扩展框架)、业务能力共享模型(业务侧可视化的自定义业务对象)等数字中台基础设施的帮助下,我们的技术体系从幼儿园升入了小学,而事件可视化配置平台和业务流程驱动引擎则让我们完成了小升初。

1.背景

目前我司的业务环境和数据环境基于政府的监管要求而日趋复杂,因业务合规和数据合规的要求导致我们对业务和数据的隔离格外慎重。在这样的政策限定条件下,如何在面对不确定性的复杂业务和环境下环境,如何以安全、高效的方式进行业务复杂度的讲解及业务能力沉淀和复用是我们当前面临的课题。为什么需要业务流程驱动引擎?从业务的流程和业务活动的组织层面来讲,企业的业务实质上是一系列的业务活动基于特定的业务场景或业务规则在时间维度的时序性组合表达。在传统的研发模式中,开发人员在应用内部基于上帝之手般的代码组织聚合业务代码,但是在跨应用的消息触达和通知的场景上帝之手也难免难堪。

例如在业务的抽象和设计中,对于订单创建这个场景,在业务系统中的关联业务需要对订单创建的事件进行感知。如用户购买了一张从杭州飞往背景的机票,在支付环节因网络或某种因素导致支付失败,客服系统需要感知这个订单创建的事件然后基于订单的状态进行相关的障碍进行预测和行程预测等用户体验服务的提升,产品运营人员需要及时感知该机票支付失败后某段时间内依旧没有尝试重新支付,我们需要热线进行跟进。 在这种流程式聚合的场景上帝之手也不好使了。

2.什么是业务流程驱动引擎

业务流程驱动引擎是我们基于业务中台的业务场景打造的无服务器事件总线服务,解决跨应用之间的路由事件,消息注册、订阅、管理等问题,简化接入配合配置,帮助研发人员轻松构建松耦合、分布式的事件驱动架构的中台化的基础设施组件。

3.核心概念

事件业务中数据变化的记录或动作。

生产应用产生数据变化并发送事件消息的应用。

订阅应用:监听或消费事件消息的应用。

业务线数据产生业务部门或业务实体如新零售事业群、客服体验事业群。

消息投递通道:数据产生消息发送的通道如RockMateq或kafk。

事件名称:数据产生数据变化所产生的业务活动如创建订单/修改收货地址等。

消息主题:消息队列 RocketMQ的Topic。

事件编码:消息队列 RocketMQ的Tag。

运行模式:消息投递后消息消费的模式有自主消费模式(订阅方自己监听并处理消息)和托管消费模式(业务流程驱动引擎统一监听并处理消息)。

触发类型:消息触发处理的通道。

系统服务:业务提供的消息触达后调用的处理程序或服务接口。

4.技术实现

为实现事件驱动架构,我们从消息通道统一、云服务配置简化、业务系统接入集成、业务配置可视、业务流程可编排五个方面入手,来建设基于事件消息驱动的业务流程驱动引擎引擎系统。

4.1 统一消息通道

目前的业务中使用了多种消息中间件,在中台化建设中我们统一了消息中间件的选型,统一使用阿里云的消息队列 RocketMQ进行消息生产和消费,通过提供统一的消息总线服务,消息的接入可以简化到只需要调用消息发送的接口即可;系统无需额外进行配置,无需关心业务环境的隔离,我们通过SpringBoot规范提供了我司内部的Starter集成。

案列一:使用消息总线发送事件消息

1 引入 依赖包:

            <dependency><groupId>com.xxxx.event</groupId><artifactId>business-driven-engine-client</artifactId><version>${business-driven-engine-version}</version></dependency>

使用示例代码:

@Reference(version = "1.0.0", timeout = 3000, check = false, interfaceClass = EventSourceService.class)private EventSourceService eventSourceService;/*** 同步发送事件消息* @param eventSourceMessage 事件消息* @return*/public  Result<Boolean>  publish(EventSourceMessage eventSourceMessage) {try {Result<Boolean> result = eventSourceService.publish(eventSourceMessage);logger.info("EventSourceService.publish### 同步发送信息到事件中心. eventSourceMessage:{}, result={}", eventSourceMessage.toString(), result);return result;} catch (Exception e) {logger.error("EventSourceService.publish### 同步消息发送异常, eventSourceMessage={} msg={}", eventSourceMessage.toString(), e.getMessage());return  Result.failure(ErrorCode.DATA_COLLECT_MESSAGE_SEND_EXCEPTION.getErrCode(), e.getMessage());}}/*** 异步发送事件消息* @param eventSourceMessage 事件消息* @return*/public  Result<Boolean>  asyncPublish(EventSourceMessage eventSourceMessage) {try {Result<Boolean> result = eventSourceService.asyncPublish(eventSourceMessage);logger.info("EventSourceService.asyncPublish### 异步发送信息到事件中心. eventSourceMessage:{}, result={}", eventSourceMessage.toString(), result);return result;} catch (Exception e) {logger.error("EventSourceService.asyncPublish### 异步消息发送异常, eventSourceMessage={} msg={}", eventSourceMessage.toString(), e.getMessage());return  Result.failure(ErrorCode.DATA_COLLECT_MESSAGE_SEND_EXCEPTION.getErrCode(), e.getMessage());}}

案列二:使用集成Starter发送事件消息

1 引入依赖包:

            <dependency><groupId>com.xxxx.event.bus</groupId><artifactId>event-bus-spring-boot-starter</artifactId><version>1.0.0</version></dependency>

2 引用消息生产者

@Autowired
private ProducerBean producerBean;

ProducerBean已经在应用启动的时候构建完成。

通过nacos配置中心对项目环境、日常环境、预发环境、生产环境进行了隔离,配置统一在配置中心配置,业务方无需关注技术细节。

    private static final Logger logger = LoggerFactory.getLogger(ProducerRepository.class);/*** Job处理线程池*/ExecutorService callbackExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1,Runtime.getRuntime().availableProcessors() + 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1000),new ThreadFactoryBuilder().setNameFormat("event-driven-engine-callback-executor-pool-%d").build());@Autowiredprivate ProducerBean producerBean;/*** 发布事件消息** @param eventSourceMessage*/public SendResult rocketPublish(EventSourceMessage eventSourceMessage, EventDO event) {//消息结构化MessageBody messageBody = new MessageBody(JSONObject.toJSONString(eventSourceMessage));Message message = new Message(event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey(), messageBody.getMessage().getBytes());return producerBean.send(message);}/*** 异步发送事件消息,只要不抛异常就是成功* @param eventSourceMessage* @param event*/public void asyncPublish(EventSourceMessage eventSourceMessage, EventDO event) {producerBean.setCallbackExecutor(callbackExecutor);//消息结构化MessageBody messageBody = new MessageBody(JSONObject.toJSONString(eventSourceMessage));Message message = new Message(event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey(), messageBody.getMessage().getBytes());producerBean.sendAsync(message, new SendCallback() {@Overridepublic void onSuccess(final SendResult sendResult) {if (StringUtils.isNotBlank(sendResult.getMessageId())) {logger.info("异步消息发送成功: topic={}, eventCode={}, bizKey={}", event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey());} else {logger.info("异步消息发送失败: topic={}, eventCode={}, bizKey={}", event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey());}}@Overridepublic void onException(final OnExceptionContext context) {logger.info("异步消息发送异常: topic={}, eventCode={}, bizKey={}, messageId={}", event.getEventTopic(), event.getEventCode(), eventSourceMessage.getKey(), context.getMessageId());}});}

案列三:使用消息托管模式处理相关业务(业务驱动引擎统一消费消息并调用绑定的业务处理逻辑)

1 引入二方包

            <dependency><groupId>com.xxxx.event</groupId><artifactId>business-driven-engine-client</artifactId><version>${business-driven-engine-version}</version></dependency>

2 实现EventHandleService接口

基于便捷和统一的考量,我们统一提供泛化调用接口,使用Dubbo的泛化调用和版本隔离机制。具体实现如下:

托管消费会在消息触达后通过泛化调用工厂调用这个接口进行相关的业务处理。

案列四:使用自主消费模式处理相关业务(业务方监听消息并业务处理逻辑)

1 引入二方包

<dependency><groupId>com.xxxx.event.bus</groupId><artifactId>event-bus-spring-boot-starter</artifactId><version>1.0.0</version></dependency>

2 在yaml文件中配置订阅分组

spring:event:rocketmqConfig:groupId: GID_RMQ_TOPIC_EVENT_BUS

3 编写自主消费的订阅处理程序

@Slf4j
@Subscribe(topic = "RMQ_TOPIC_EVENT_BUS", tag = "EVENT_EVENT-PLATFORM_EVENT_BUS_TAG")
@Component
public class BusinessDrivenEngineMessageHandler implements MessageHandler {@Overridepublic Action handle(String topic, String tag, Message message) {if(null == message){throw new EventBizException(EventEngineErrorCode.EVENT_MESSAGE_IS_EMPTY.getErrCode());}String appName =  DomainApplicationConstant.APPLICATION.toUpperCase();Subscribe subscribeAnn = this.getClass().getDeclaredAnnotation(Subscribe.class);String eventCode = subscribeAnn.tag();String subscribeCode = CommonConstant.SUBSCRIBE_PREFIX + eventCode + CommonConstant.SEPARATOR +  appName;log.info("EventDrivenEngineListener.handle#  messageId={}, topic={}, tag={}, key={}", message.getMsgID(), message.getTopic(), message.getTag(), message.getKey());Entry entry = null;try{// 定义资源。为了便于标识,资源名称定义为Group ID和Topic的组合。Group ID和Topic可以通过消息队列RocketMQ控制台获得。entry = SphU.entry("EventDrivenEngineListener.consume:" + topic);int reconsume = message.getReconsumeTimes();/** 消息已经重试了3次,如果不需要再次消费,则返回成功*/if (reconsume == CommonConstant.RETRIES_NUMBER) {log.info("EventTopicConsumeHandler.consume# 消息已经重试了{}次, 设置为消费成功", reconsume);return Action.CommitMessage;}return Action.CommitMessage;} catch(BlockException e){log.error("EventDrivenEngineListener.consume# 消息消费异常. messages={}, msg={}, exception={}", JSONObject.toJSONString(message), e.getMessage(), e);return Action.ReconsumeLater;} finally {if (entry != null) {entry.exit();}}}
}

4 注册订阅处理服务

    @Autowiredprivate BusinessDrivenEngineMessageHandler businessDrivenEngineMessageHandler;@Beanpublic EventBusRocketmqConsumeListener busRocketmqConsumeListener(){EventBusRocketmqConsumeListener eventBusRocketmqConsumeListener = new EventBusRocketmqConsumeListener();eventBusRocketmqConsumeListener.registerHandler(businessDrivenEngineMessageHandler);return eventBusRocketmqConsumeListener;}

备注:使用该注解的时候需要设置topic和tag(tag = "*"表示定约该消息下的所有Tag)

Subscribe(topic = "RMQ_TOPIC_EVENT_BUS", tag = "EVENT_EVENT-PLATFORM_EVENT_BUS_TAG")

案列五:使用自主消费模式触发业务驱动引擎

此方式适用于业务方自主监听消息,然后通过触发器触发在配置平台配置好的任务清单,详请请下下文。

1 引入二方包

<dependency><groupId>com.xxxx.event.bus</groupId><artifactId>event-bus-spring-boot-starter</artifactId><version>1.0.0</version></dependency>

2 在yaml文件中配置订阅分组

spring:event:rocketmqConfig:groupId: GID_RMQ_TOPIC_EVENT_BUS

3 编写自主消费的订阅处理程序

@Slf4j
@Subscribe(topic = "RMQ_TOPIC_EVENT_BUS", tag = "EVENT_EVENT-PLATFORM_EVENT_BUS_TAG")
@Component
public class BusinessDrivenEngineMessageHandler implements MessageHandler {@Reference(version = "1.0.0", timeout = 3000, check = false, interfaceClass = EventSourceService.class)private EventSourceService eventSourceService;@Overridepublic Action handle(String topic, String tag, Message message) {if(null == message){throw new EventBizException(EventEngineErrorCode.EVENT_MESSAGE_IS_EMPTY.getErrCode());}log.info("EventDrivenEngineListener.handle#  messageId={}, topic={}, tag={}, key={}", message.getMsgID(), message.getTopic(), message.getTag(), message.getKey());try{Result<Boolean> result = eventSourceService.trigger(new EventMessageTriggerParam(message.getMsgID(), message.getTopic(), message.getTag(), message.getKey(), new String(message.getBody(), StandardCharsets.UTF_8)));return result.isSuccess() && result.getData() ? Action.CommitMessage : Action.ReconsumeLater;} catch(EventBizException e){log.error("EventDrivenEngineListener.consume# 消息消费异常. messages={}, msg={}, exception={}", JSONObject.toJSONString(message), e.getMessage(), e);return Action.ReconsumeLater;}}
}

4 注册订阅处理服务

    @Autowiredprivate BusinessDrivenEngineMessageHandler businessDrivenEngineMessageHandler;@Beanpublic EventBusRocketmqConsumeListener busRocketmqConsumeListener(){EventBusRocketmqConsumeListener eventBusRocketmqConsumeListener = new EventBusRocketmqConsumeListener();eventBusRocketmqConsumeListener.registerHandler(businessDrivenEngineMessageHandler);return eventBusRocketmqConsumeListener;}

4.2 阿里云服务配置简化

在业务调研中发现业务使用了多种消息通道,大量的Topic和队列不知道用在了什么业务场景、Topic是那也业务那个应用创建的、有哪些业务和应用订阅消费Topic、这些生产者和订阅者是为什么业务服务的,更有甚者在同一实例中跑着测试的和生产的Topic。这里的乱象我不过多叙述,基于这样的乱象,我们决定对事件的生产者和订阅者的关系进行维护,使其能够结构化的展现其组织关系,同时保证事件业务语义的场景化使其可以被复用。

4.2.1. 事件配置管理

为寻求事件治理的可视、可控、可管理,我们决定在研发协同层面做文章,具体实现方案如下:

  • 事件定义场景化:每个事件都有唯一的标识,每个事件都有业务化的语义描述;

  • 统一配置管理:提供统一的配合操作界面;

  • 集成消息通道:屏蔽底层的消息队列,对事件的消息投递业务可以选择多种消息投递通道;

  • 确定生产和消费的关系:通过结构化的方式展现事件原宿关系,如这个事件是那个业务线的哪个应用生产的、事件的业务场景是什么、使用什么消息通道、关联那个Topic以及这个事件被哪些应用订阅、订阅方订阅消息的用途、运行的模式等信息。

4.2.2 订阅配置管理

为管理业务中的订阅者我们对订阅信息使用配置界面进行配置,具体方案如下:

  • 订阅关系层次化:每个订阅者都有唯一的标识及订阅分组;

  • 订阅数据同步:通过阿里云接口同步创建订阅者的分组信息;

  • 业务可配置化:订阅者消息消费、触达运行等都通过界面进行配置下发;

订阅配置界面

4.2.3. 订阅消费服务可配置可编排

为实现一个业务事件触发后,多个订阅者的消息触发任务的编排,我们对订阅者的消费处理逻辑做了抽象和封装,具体方案如下:

  • 任务可编排:事件订阅者可以同时挂载多个触发处理任务,任务之间的执行顺序可以通过拖拽实现调整;

  • 任务可复用:挂载的任务是独立的系统服务接口,是封装的细粒度的业务逻辑,可以复用;

  • 调用通道泛化:提供统一的接口,通过实现接口和版本进行泛化调用;

5.思考及后续规划

业务驱动引擎一期上线后,彻底解决了依靠手工维护数据和配置的困境,使数据归属更加明确,使业务场景和业务语义更清晰,使业务关系更明确,使事件驱动和复用变得可能。但随着业务的发展,驱动任务的串行和并行的场景快速增加,我们又面临着新的课题。归纳起来有以下几点:

  • 如何保证并行网关和串行网关之间的输入和输出可以管道式的执行

  • 如何构建企业级的业务流程编排引擎实现地代码的业务开发?

数字化转型--基于事件驱动的中台架构实践(一)相关推荐

  1. 透过数字化转型再谈数据中台(三):一文遍历大数据架构变迁史

    编者按:<透过数字化转型再谈数据中台>系列连载 6-8 篇左右,作者结合自己在数据中台领域多年实践经验,总结了数据架构知识.BI 知识,以及分享给大家一些产业互联网实施经验.本文是系列文章 ...

  2. 32页精华图解金融业数字化转型发展报告:12大实践特点、4大展望(附PDF下载)

    <国民经济和社会发展第十四个五年规划和2035 年远景目标纲要>中提出加快数字化发展.建设数字中国的任务,要求金融业稳妥发展金融科技.加快金融机构数字化转型. 在此背景下,我国金融数字转型 ...

  3. 传统银行业务的数字化转型-中原银行大数据建设实践

    在以"场景赋能·驱动有数"为主题的神策 2018 数据驱动大会现场,中原银行刘远东发表了名为<传统银行业务的数字化转型-中原银行大数据建设实践>的主题演讲,以下内容根据 ...

  4. 汽车在转型!福特中国的架构实践

    软件正在吞噬汽车!当数字化.智能化逐渐渗透汽车时,也给传统车企带来了诸多的技术挑战.在此背景下,本文作者对云边协同及车联网的众多特性展开深入研究,并进行了基于微服务与容器化,深入的车云实践. 作者 | ...

  5. 中台干货!百度/小米/滴滴/京东,中台架构实践大比拼!

    点击"技术领导力"关注∆  每天早上8:30推送 作者| Mr.K    编辑| Emma 来源| 引力山丘 小米中台建设实践 01 小米的三大中台建设:业务+数据+技术 业务中台 ...

  6. 透过数字化转型再谈数据中台(一):关于数字化转型的几个见解

    备注:该系列连载6-8篇左右,InfoQ 首发. 本文中不分享与涉及数字化转型概念以及方法论, 这一篇是随笔记录方式,不是一篇完整的文章.(各大媒体以及网上各大网站以及很多专家都在讲述数字化转型各种方 ...

  7. 金电联行程小龙:企业数字化转型的目标、决策与实践

    在2021第六届大数据产业生态大会上,金电联行首席数据官.金电联行上海大数据产业研究院院长程小龙带来主题为"数智中的企业价值"的演讲. 在他看来,企业数字化转型首先需要明确目标,搞 ...

  8. 苏宁数据中台架构实践 附下载

    作为国内由传统零售转型数字化.互联网零售的典型企业,苏宁大数据平台汇集了30年线上.线下零售多业态数据资源,沉淀了丰富的零售经营模型,形成宝贵数据资产,并建设自研大数据产品矩阵.数据不仅支撑着苏宁各项 ...

  9. 阿里总监谢纯良,讲透《阿里中台架构实践与思考》,PPT 音频!

    欢迎关注"技术领导力",每天早上8:30推送 来源| AS大会 本文整理了,阿里技术方案总监--谢纯良,在AS大会上的题为<阿里巴巴中台技术架构--实践与思考>的分享. ...

最新文章

  1. java 自然语言处理_Java自然语言处理
  2. imagesc demo
  3. pyinstaller打包后闪现cmd黑色窗口解决方案
  4. 滴滴快车奖励政策,高峰奖励,翻倍奖励,按成交率,指派单数分级(3月21日)...
  5. sqlserver与mysql的一些不同的T-SQL语句
  6. Python学习总结(1)——编程准备和基本语法
  7. QUEST管径测试仪,非接触式测量系统
  8. Linux下socket编程的简单实例
  9. FPGA串口波特率计算
  10. 民数记研读3——于宏洁
  11. 睡眠即醒 蓝牙_如果您的Mac从睡眠中随机醒来,请尝试防止蓝牙唤醒功能 | MOS86...
  12. 自建云存储:Nextcloud vs. ownCloud vs. Seafile
  13. 「跑象科技」获得天使+融资,打造新一代实时数据基础平台
  14. 把句子拆分成单词 java_java – 将句子分成单词和标点符号
  15. DNS概述和DNS服务器部署(详细正向解析)
  16. 基于javaMail发送邮件的实现
  17. potplayer 多个进程_搞懂进程组、会话、控制终端关系,才能明白守护进程干嘛的?...
  18. Nuitka打包教程
  19. 424、电梯监控无线桥解决方案及注意事项
  20. 变频电源有哪些优点及特点

热门文章

  1. 【数据结构】-线索二叉树(后序)
  2. 关于SSM整合中遇到的org.springframework.beans.factory.BeanDefinitionStoreException: IOException pars 问题
  3. thinkphp5 获取sql语句
  4. HanLP 基于SVM支持向量机 训练 文本分类
  5. adb 钉钉自动打卡
  6. js 正则是否包含某些字符串_web前端:JavaScript 字符串是否包含某个字符串
  7. 【shell】shell标准输出与错误输出重定向
  8. 手工破解还原精灵密码(简单得很),另附,在线读取还原精灵密码
  9. toString()方法
  10. youtube 网红主博邮箱采集笔记