Event Bus(事件总线) 是Vert.x的神经系统,负责应用系统消息的传递。Vert.x各模块(Verticle)之间的相互调用就是通过Event Bus实现的,因此各Verticle之间是高度解耦的。

Event Bus提供发布订阅功能和点对点的消息服务,类似于消息队列,每条消息在Event Bus上都有一个地址(address),发布者向这个地址发送消息,接收者从这个地址接收消息。

Event Bus独立于应用系统,它使用TCP协议进行通信。因此,在任何的应用中,只要能够创建TCP连接,都可以通过Event Bus连接到Vert.x实例,如下图所示。也正是因为这个,Vert.x天生就对分布式支持非常好。

Event Bus的api使用非常简单,下面我们分别从获取Event Bus实例,发布消息,订阅消息以及服务调用来逐一说明。

1. 获取Event Bus实例

获取Event Bus实例非常简单,直接通过Vert.x实例获取,代码如下

EventBus eb = vertx.eventBus();

可以看到,通过Vert.x实例获取Event Bus的实例和获取文件系统fileSystem以及创建HTTP服务createHttpServer非常类似。拿到Event Bus的实例之后,发现Event Bus和Vertx一样,都是接口,查看接口的代码可以非常清楚的看到Event Bus对外提供的API,代码如下

@Fluent
EventBus send(String address, Object message);@Fluent
EventBus publish(String address, Object message);<T> MessageConsumer<T> consumer(String address);

为了节约篇幅,去掉了部分重载的方法。主要的几个方法send,publish,consumer,sender,publisher通过方法的名字就可以猜测出方法的用途,后面根据具体的场景来分别对这些方法进行说明。

2.发布消息

发布消息的方法有多个方法的重载,最简单的发布方法接收一个地址,和一个Object类型的消息对象,如下所示

@Fluent
EventBus publish(String address, Object message);

这种方式发布的消息,所有订阅到address的消费者都能够收到消息,这也就是发布-订阅模式。还有一种是点对点模式,这种模式发布的消息只能有一个消费者收到,方法如下

@Fluent
<T> EventBus send(String address, Object message,DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler);

3.订阅消息

对于订阅消息比较简单,只有一个方法,就是consumer,方法的声明如下

<T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);

如果想要通过其他的开发语言或者在其他应用中调用Vert.x实例模块,可以通过发起TCP请求,连接到EventBus,Event Bus使用的通信协议如下,但一定不要忘记,需要创建Event Bus Bride。

<Length: uInt32><{type: String,address: String,(replyAddress: String)?,headers: JsonObject,body: JsonObject}: JsonObject>

4.远程服务调用

最后,附一个使用Event Bus进行远程组件调用的Demo,这里例子相对来讲可能有些复杂,但相比较真实企业项目还差的远。这个例子就是一个简单的微服务,多个服务通过Event Bus进行通信。

我们把每个Verticle单独为一个Maven模块,这个模块中核心有三个类,一个Verticle,一个对外暴露的接口Interface,一个接口的具体实现类。为了简单,这里只有两个模块。

1.第一个Verticle是对外提供服务的,暂且称之为FirstVerticle,对外提供一个sayHello服务,模块结构如下

(1)FirstVerticle中就是发布Service到总线上,代码非常简单,这里只列出start方法的代码

@Override
public void start() throws Exception {Service service = Service.create(vertx);new ServiceBinder(vertx).setAddress(address).register(Service.class, service);
}

发布服务,并没有调用consumer方法,而是调用了ServiceBinder类的register方法,实际上这个方法的底层就是在调用consumer方法,只是帮我们做了很多的其实事情,这里暂且不去关注。

(2)Service接口中定义sayHello方法,并且提供了获取当前实例的方法(JDK8的新特性)

@ProxyGen
@VertxGen
public interface Service {/*** 这个方法是提供给自己使用,方便创建服务的实现类** @param vertx* @return*/static Service create(Vertx vertx) {return new ServiceImpl(vertx);}/*** 这个方法给消费者使用,便于消费者创建生产者的代理类,以此来消费生产者的服务** @param vertx* @param address* @return*/static Service createProxy(Vertx vertx, String address) {return new ServiceVertxEBProxy(vertx, address);}/*** 测试接口方法** @param name*/void sayHello(String name, Handler<AsyncResult<JsonObject>> resultHandle);}

这里有三个方法,create方法是为了便于获得类的实例,这个比较好理解。createProxy方法实际上在我们当前的演示案例中并没有使用到,这里读者可以先不用去关注。对于sayHello方法的resultHandle这个参数可能不是很理解,这个参数非常简单,因为Vert.x是一个异步框架,sayHello方法也是异步执行的,那么方法的返回结构就是通过这个参数封装的。

(3)ServiceImpl的实现类的代码就更简单了,就一个sayHello方法的实现,如下

@Override
public void sayHello(String name, Handler<AsyncResult<JsonObject>> resultHandle) {resultHandle.handle(Future.succeededFuture(new JsonObject().put("msg", "SUCCESS")));
}

给调用者回了个Json对象,这个对象中包含一个msg,值为SUCCESS

(4)还有四,不就上面三个吗,还有一个很关键的部分,package-info

@ModuleGen(groupPackage = "stu.vertx.cluster.service.hello", name = "FirstVerticle")
package stu.vertx.cluster.service.hello;import io.vertx.codegen.annotations.ModuleGen;

这里描述的是这个模块的包,这个是必须的,否则ServiceProxy不能生成,其他的也都是白扯。所以这里,很关键,很关键,很关键。

2.第一个组件就完成了,下面是第二个组件,第二个组件来调用第一个组件,完成一系列操作。这个组件接收客户端请求,然后接收客户端上送的name属性,传递给第二个组件,并拿到第二个组件的返回值。这个组件不对外提供服务,因此就一个Verticle就可以了。代码如下

@Override
public void start() throws Exception {HttpServer httpServer = vertx.createHttpServer();httpServer.requestHandler(request -> {// 获取到response对象HttpServerResponse response = request.response();// 设置响应头response.putHeader("Content-type", "text/html;charset=utf-8");// 通过配置action参数,指定要走哪一个方法DeliveryOptions options = new DeliveryOptions();options.addHeader("action", "sayHello");// 这个是给方法传入的参数JsonObject config = new JsonObject();config.put("name", "xiaozhang");// 通过eventBus调用方法vertx.eventBus().<JsonObject>send("service.demo.firstverticle", config, options, res -> {// 响应数据response.end(res.result().body().getString("msg"));});});httpServer.listen(1234);
}

OK,到这里,一个远程服务调用就完成了,是不是非常简单!

(一)Vert.x 简明介绍 https://blog.csdn.net/king_kgh/article/details/80772657

(二)Vert.x创建简单的HTTP服务 https://blog.csdn.net/king_kgh/article/details/80804078

(三)Vert.x Web开发之路由 https://blog.csdn.net/king_kgh/article/details/80848571

(四)Vert.x TCP服务实现 https://blog.csdn.net/king_kgh/article/details/84870775

(五)Vert.x数据库访问 https://blog.csdn.net/king_kgh/article/details/84894599

(六)Vert.x认证和授权 https://blog.csdn.net/king_kgh/article/details/85218454

(七)Vert.x事件总线(Event Bus)与远程服务调用 https://blog.csdn.net/king_kgh/article/details/86993812

Vert.x 案例代码:https://github.com/happy-fly

笔者就职于一家互联网支付公司,公司的核心项目使用的是Vert.x技术体系。记得笔者刚进入公司,接触Vert.x的时候,找遍了大大小小的网站,发现市面上关于Vert.x的文档除了官方文档外,几乎找不到其他资料。当时就励志要出一个专栏来写写Vert.x,以此帮助在Vert.x上采坑的朋友。因为笔者能力有限,文章中难免会有错误和疏漏,如果您对文章有意见或者好的建议,可以直接留言或者发送邮件到18366131816@163.com。如果您也对Vert.感兴趣,可以加入到我们,共同学习Vert.x,并推进国内开发者对Vert.x的认知。

Vert.x(vertx) 事件总线(EventBus)与 远程服务调用相关推荐

  1. vue 事件总线EventBus的概念、使用以及注意点

    vue组件中的数据传递最最常见的就是父子组件之间的传递.父传子通过props向下传递数据给子组件:子传父通过$emit发送事件,并携带数据给父组件.而有时两个组件之间毫无关系,或者他们之间的结构复杂, ...

  2. vue中央事件总线eventBus的简单理解和使用

    公共事件总线eventBus的实质就是创建一个vue实例,通过一个空的vue实例作为桥梁实现vue组件间的通信.它是实现非父子组件通信的一种解决方案. 用法如下: 第一步:项目中创建一个js文件(我通 ...

  3. Google guava 事件总线 EventBus 进程内消息队列

    Google guava 事件总线 EventBus 创建事件总线流程 码代码 引入依赖 一个简单的事件处理 监听者 创建事件生产者总线.注册事件监听者.发送事件 运行结果 扩展 多个事件监听者加De ...

  4. Vert.x实战 事件总线:Vert.x应用程序的主干

    本章包括: 什么是事件总线 如何在事件总线上拥有点对点通信[point-to-point].请求-应答通信[request-reply].发布/订阅通信[publish/subscribe] 用于在网 ...

  5. Android之事件总线EventBus详解

    顾名思义,AndroidEventBus是一个Android平台的事件总线框架,它简化了Activity.Fragment.Service等组件之间的交互,很大程度上降低了它们之间的耦合,使我们的代码 ...

  6. 自己动手写事件总线(EventBus)

    2019独角兽企业重金招聘Python工程师标准>>> 本文由云+社区发表 事件总线核心逻辑的实现. <!--more--> EventBus的作用 Android中存在 ...

  7. Android事件总线——EventBus的使用

    前言 首先我们来说下事件总线,它的作用:为了更简化并更高质量的在Activity,Fragment,Thread和Service等之间的通信,解决组件之间高耦合的同时仍能进行高效的通信. 什么是Eve ...

  8. 200代码写一套属于自己的事件总线(EventBus)库

    理论千万篇,不如实战来一篇. 源码 https://github.com/harvie1208/EventBus 关键词:观察者模式.反射.自定义注解.线程调度 手写200行代码,一步一步实现Even ...

  9. 2021-12-14 vue移动端卖座电影项目(十二) 使用mapState控制封装选项卡tabbar的显隐,以及回顾使用中央事件总线eventbus和vuex的state控制tabbar显隐的异同

    文章目录 0.实现场景:进入详情页时,底部选项卡隐藏 1.使用中央事件总线控制tabbar的v-show的值 2.使用vuex的state控制tabbar的v-show的值 3.使用vuex的muta ...

最新文章

  1. Google Protocol Buffers介绍
  2. php mysql 分组 分页_简单的PHP+Mysql实现分页
  3. 4号团队-团队任务5:项目总结会
  4. 清翔电子单片机原理图stc89c52_1000. 电子编程入门到工程师--从看得到开始
  5. 牛客假日团队赛1 A.蹄球锦标赛
  6. 不搞虚的!快速把你拉入Docker 的门里 | 原力计划
  7. 测开之路十六:@classmethod与@staticmethod
  8. php if多条件_通过PHP与Python代码对比浅析语法差异
  9. c语言printf输出格式
  10. linux安装qt4支持包,CentOS安装QT4遇到的问题
  11. 计算机鼠标双击怎么,鼠标双击变成属性怎么办 鼠标双击变成属性解决办法【详解】...
  12. 抑郁症:从自毁到重生,可能你只差一个它
  13. Tomcat部署静态页面
  14. Java面试题(持续日更)
  15. python文件中单词的删除_使用python删除文件中的多余单词
  16. PS高效处理图片总结
  17. Flutter 流式布局、自动换行(Wrap、Flow)
  18. python花式输出_Python——花式打印对象的若干种方法
  19. 基于Asterisk和TTS/ASR语音识别配置示例
  20. oracle用数据泵,Navicat 教程:Oracle 数据泵是什么

热门文章

  1. 局部变量与成员变量的·区别!
  2. springboot集成rabbitmq,根据查询的信息创建多个消息中心和消息队列,并实现不同的消息发送到不同的消息中心
  3. Caused by: com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedC
  4. scrolltop一直为0_「超级玛丽max2.0」「达尔文3号」「超级玛丽max3.0」,三军之战最全解析...
  5. 知乎点赞过万留学生自述:我们为什么需要代写???
  6. 基于Python文本内容/情感的对微博文本自动二元分类
  7. 【传感器模块】 HC-SR501 人体红外感应模块 热释电 红外传感器
  8. 【C/C++】char * ,char ** ,char a[ ] ,char *a[]
  9. 【2022研电赛】商业计划书赛道华南区二等奖:基于机器视觉的智能驾驶辅助系统
  10. c语言函数未定义的引用,c – CMake“未定义的函数引用”