vert.x 分布式锁

最近,似乎我们正在听到有关Java的最新和最好的框架的消息。 Ninja , SparkJava和Play等工具; 但是每个人都固执己见,使您感到需要重新设计整个应用程序以利用它们的出色功能。 这就是为什么当我发现Vert.x时令我感到宽慰的原因。 Vert.x不是一个框架,它是一个工具包,它不受质疑,而且正在解放。 Vert.x不想让您重新设计整个应用程序以使用它,它只是想让您的生活更轻松。 您可以在Vert.x中编写整个应用程序吗? 当然! 您可以将Vert.x功能添加到现有的Spring / Guice / CDI应用程序中吗? 是的 您可以在现有JavaEE应用程序中使用Vert.x吗? 绝对! 这就是让它变得惊人的原因。

背景

Vert.x诞生于Tim Fox决定他喜欢NodeJS生态系统中正在开发的许多东西,但他不喜欢在V8中进行权衡取舍:单线程,有限的库支持以及JavaScript本身。 Tim着手编写一个对如何使用它以及如何使用它没有质疑的工具箱,他决定在JVM上实现它的最佳位置。 因此,Tim和社区开始着手创建一个事件驱动的,非阻塞的,React性的工具包,该工具包在许多方面都可以反映NodeJS可以完成的工作,而且还利用了JVM内部的强大功能。 Node.x诞生了,后来发展成为Vert.x。

总览

Vert.x旨在实现事件总线,该事件总线使应用程序的不同部分可以以非阻塞/线程安全的方式进行通信。 它的一部分是根据Eralng和Akka展示的Actor方法建模的。 它还旨在充分利用当今的多核处理器和高度并发的编程需求。 因此,默认情况下,所有Vert.x VERTICLES默认都实现为单线程。 与NodeJS不同,Vert.x可以在许多线程中运行许多顶点。 另外,您可以指定某些顶点为“工作”顶点,并且可以是多线程的。 为了给蛋糕锦上添花,Vert.x通过使用Hazelcast对事件总线的多节点群集提供了底层支持。 它继续包含许多其他令人惊奇的功能,这些功能太多了,无法在此处列出,但是您可以在Vert.x官方文档中内容。

关于Vert.x,您需要了解的第一件事是,与NodeJS一样,永远不要阻塞当前线程。 默认情况下,Vert.x中的所有内容都设置为使用回调/未来/承诺。 Vert.x不执行同步操作,而是提供异步方法来执行大多数I / O和处理器密集型操作,这些操作可能会阻塞当前线程。 现在,使用回调可能很丑陋且很痛苦,因此Vert.x可以选择提供基于RxJava的API,该API使用Observer模式实现相同的功能。 最后,Vert.x通过在许多异步API上提供executeBlocking(Function f)方法,可以轻松使用现有的类和方法。 这意味着您可以选择喜欢使用Vert.x的方式,而不是由工具包指示必须如何使用它。

了解Vert.x的第二件事是它由顶点,模块和节点组成。 顶点是Vert.x中最小的逻辑单元,通常由单个类表示。 遵循UNIX Philosophy的原则,顶点应该是简单且具有单一用途的。 一组顶点可以放到一个模块中,该模块通常打包为单个JAR文件。 一个模块代表一组相关的功能,这些功能一起使用时,可以代表整个应用程序,也可以代表较大的分布式应用程序的一部分。 最后,节点是运行一个或多个模块/垂直模块的JVM的单个实例。 由于Vert.x具有从头开始内置的群集功能,因此Vert.x应用程序可以跨越一台计算机或跨多个地理位置的多台计算机跨越节点(尽管延迟可能会掩盖性能)。

示例项目

现在,我最近去过许多聚会和会议,在他们谈论React式编程时,它们向您展示的第一件事就是构建一个聊天室应用程序。 很好,但这并不能真正帮助您完全理解响应式开发的力量。 聊天室应用程序既简单又简单。 我们可以做得更好。 在本教程中,我们将使用旧版Spring应用程序并将其转换为利用Vert.x的优势。 这具有多个目的:表明该工具包易于与现有的Java项目集成,它使我们能够利用可能是生态系统中根深蒂固的现有工具的优势,并且使我们遵循DRY原则 ,因为我们不不必重写大量代码即可获得Vert.x的好处。

我们的旧版Spring应用程序是使用Spring Boot,Spring Data JPA和Spring REST的REST API的简单示例。 源代码可以在“主”分支中找到该处 。 我们还将使用其他分支来演示进展,因此,只要对gitJava 8有一点经验的人都可以轻松进行。 让我们从检查常规Spring应用程序的Spring Configuration类开始。

@SpringBootApplication
@EnableJpaRepositories
@EnableTransactionManagement
@Slf4j
public class Application {public static void main(String[] args) {ApplicationContext ctx = SpringApplication.run(Application.class, args);System.out.println("Let's inspect the beans provided by Spring Boot:");String[] beanNames = ctx.getBeanDefinitionNames();Arrays.sort(beanNames);for (String beanName : beanNames) {System.out.println(beanName);}}@Beanpublic DataSource dataSource() {EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();return builder.setType(EmbeddedDatabaseType.HSQL).build();}@Beanpublic EntityManagerFactory entityManagerFactory() {HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();vendorAdapter.setGenerateDdl(true);LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();factory.setJpaVendorAdapter(vendorAdapter);factory.setPackagesToScan("com.zanclus.data.entities");factory.setDataSource(dataSource());factory.afterPropertiesSet();return factory.getObject();}@Beanpublic PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {final JpaTransactionManager txManager = new JpaTransactionManager();txManager.setEntityManagerFactory(emf);return txManager;}
}

正如您在课程顶部看到的那样,我们有一些非常标准的Spring Boot注释。 你还会看到@ SLF4J批注这是一部分Lombok库,旨在帮助降低锅炉板代码。 我们还有@Bean注释方法,用于提供对JPA EntityManager,TransactionManager和DataSource的访问。 这些项目中的每一个都提供可注入的对象,供其他类使用。 项目中的其余类也类似地简化。 有一个客户 POJO,它是服务中使用的实体类型。 通过Spring Data创建了一个CustomerDAO 。 最后,有一个CustomerEndpoints类,它是JAX-RS注释的REST控制器。

如前所述,这是Spring Boot应用程序中的所有标准票价。 该应用程序的问题在于,在大多数情况下,它的可伸缩性有限。 您可以在Servlet容器中运行此应用程序,也可以在Jetty或Undertow之类的嵌入式服务器中运行该应用程序。 无论哪种方式,每个请求都占用一个线程,因此在等待I / O操作时浪费了资源。

切换到Convert-To-Vert.x-Web分支,我们可以看到Application类发生了一些变化。 现在,我们有了一些新的@Bean批注方法来注入Vertx实例本身,以及ObjectMapper实例(Jackson JSON库的一部分)。 我们还用新的CustomerVerticle替换了CustomerEnpoints类。 几乎所有其他内容都是相同的。

CustomerVerticle类带有@Component注释,这意味着Spring将在启动时实例化该类。 它还具有用@PostConstruct注释的start方法,以便在启动时启动Verticle。 查看代码的实际内容,我们看到Vert.x代码的第一部分: Router

Router类是vertx-web库的一部分,它使我们能够使用流畅的API来定义HTTP URL,方法和标头过滤器以进行请求处理。 将BodyHandler实例添加到默认路由可以处理POST / PUT正文并将其转换为JSON对象,然后Vert.x可以将其作为RoutingContext的一部分进行处理。 Vert.x中的路由顺序可能很重要。 如果定义的路由具有某种形式的全局匹配(*或regex),则除非实现chaining ,否则它可能会吞噬在其后定义的路由的请求。 我们的示例最初显示了3条路线。

@PostConstructpublic void start() throws Exception {Router router = Router.router(vertx);router.route().handler(BodyHandler.create());router.get("/v1/customer/:id").produces("application/json").blockingHandler(this::getCustomerById);router.put("/v1/customer").consumes("application/json").produces("application/json").blockingHandler(this::addCustomer);router.get("/v1/customer").produces("application/json").blockingHandler(this::getAllCustomers);vertx.createHttpServer().requestHandler(router::accept).listen(8080);}

请注意,定义了HTTP方法,定义了“ Accept”标头(通过消耗),定义了“ Content-Type”标头(通过生产)。 我们还看到我们正在通过对blockingHandler方法的调用传递对请求的处理。 Vert.x路由的阻塞处理程序接受RoutingContext对象,因为它是唯一的参数。 RoutingContext保存Vert.x请求对象,响应对象和任何参数/ POST主体数据(例如“:id”)。 您还将看到,我使用方法引用而不是lambda来将逻辑插入blockingHandler(我发现它更具可读性)。 这3条请求路由的每个处理程序都在该类中一个单独的方法中定义。 这些方法基本上只是调用DAO上的方法,根据需要进行序列化或反序列化,设置一些响应头,并通过发送响应来结束请求。 总体而言,非常简单明了。

private void addCustomer(RoutingContext rc) {try {String body = rc.getBodyAsString();Customer customer = mapper.readValue(body, Customer.class);Customer saved = dao.save(customer);if (saved!=null) {rc.response().setStatusMessage("Accepted").setStatusCode(202).end(mapper.writeValueAsString(saved));} else {rc.response().setStatusMessage("Bad Request").setStatusCode(400).end("Bad Request");}} catch (IOException e) {rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");log.error("Server error", e);}}private void getCustomerById(RoutingContext rc) {log.info("Request for single customer");Long id = Long.parseLong(rc.request().getParam("id"));try {Customer customer = dao.findOne(id);if (customer==null) {rc.response().setStatusMessage("Not Found").setStatusCode(404).end("Not Found");} else {rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(dao.findOne(id)));}} catch (JsonProcessingException jpe) {rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");log.error("Server error", jpe);}}private void getAllCustomers(RoutingContext rc) {log.info("Request for all customers");List customers = StreamSupport.stream(dao.findAll().spliterator(), false).collect(Collectors.toList());try {rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(customers));} catch (JsonProcessingException jpe) {rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");log.error("Server error", jpe);}}

您可能会说:“但是,这比我的Spring注释和类还要更多的代码和混乱”。 这可能是正确的,但实际上取决于您如何实现代码。 这只是一个介绍性的示例,因此我使代码非常简单易懂。 我可以使用Vert.x的注释库以类似于JAX-RS的方式实现端点。 此外,我们还获得了可扩展性的巨大改进。 在幕后,Vert.x Web使用Netty进行低级异步I / O操作,从而使我们能够处理更多并发请求(受数据库连接池的大小限制)。

通过使用Vert.x Web库,我们已经对该应用程序的可伸缩性和并发性进行了一些改进,但是通过实现Vert.x EventBus ,我们可以做一些改进。 通过将数据库操作分为Worker Verticles,而不是使用blockingHandler,我们可以更有效地处理请求处理。 这在“ 转换为工作人员垂直”分支中显示。 应用程序类保持不变,但是我们更改了CustomerEndpoints类,并添加了一个名为CustomerWorker的新类。 此外,我们添加了一个名为Spring Vert.x Extension的新库,该库为Vert.x Verticles提供了Spring Dependency Injections支持。 首先查看新的CustomerEndpoints类。

@PostConstructpublic void start() throws Exception {log.info("Successfully create CustomerVerticle");DeploymentOptions deployOpts = new DeploymentOptions().setWorker(true).setMultiThreaded(true).setInstances(4);vertx.deployVerticle("java-spring:com.zanclus.verticles.CustomerWorker", deployOpts, res -> {if (res.succeeded()) {Router router = Router.router(vertx);router.route().handler(BodyHandler.create());final DeliveryOptions opts = new DeliveryOptions().setSendTimeout(2000);router.get("/v1/customer/:id").produces("application/json").handler(rc -> {opts.addHeader("method", "getCustomer").addHeader("id", rc.request().getParam("id"));vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));});router.put("/v1/customer").consumes("application/json").produces("application/json").handler(rc -> {opts.addHeader("method", "addCustomer");vertx.eventBus().send("com.zanclus.customer", rc.getBodyAsJson(), opts, reply -> handleReply(reply, rc));});router.get("/v1/customer").produces("application/json").handler(rc -> {opts.addHeader("method", "getAllCustomers");vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));});vertx.createHttpServer().requestHandler(router::accept).listen(8080);} else {log.error("Failed to deploy worker verticles.", res.cause());}});}

路由相同,但实现代码不同。 现在,我们不再使用对blockingHandler的调用,而是实现了适当的异步处理程序,该处理程序在事件总线上发送事件。 此Verticle中不再进行任何数据库处理。 我们已将数据库处理移至一个工作线程,该线程具有多个实例,以线程安全的方式并行处理多个请求。 我们还为这些事件的回复时间注册了一个回调,以便我们可以向发出请求的客户端发送适当的响应。 现在,在CustomerWorker Verticle中,我们已经实现了数据库逻辑和错误处理。

@Override
public void start() throws Exception {vertx.eventBus().consumer("com.zanclus.customer").handler(this::handleDatabaseRequest);
}public void handleDatabaseRequest(Message<Object> msg) {String method = msg.headers().get("method");DeliveryOptions opts = new DeliveryOptions();try {String retVal;switch (method) {case "getAllCustomers":retVal = mapper.writeValueAsString(dao.findAll());msg.reply(retVal, opts);break;case "getCustomer":Long id = Long.parseLong(msg.headers().get("id"));retVal = mapper.writeValueAsString(dao.findOne(id));msg.reply(retVal);break;case "addCustomer":retVal = mapper.writeValueAsString(dao.save(mapper.readValue(((JsonObject)msg.body()).encode(), Customer.class)));msg.reply(retVal);break;default:log.error("Invalid method '" + method + "'");opts.addHeader("error", "Invalid method '" + method + "'");msg.fail(1, "Invalid method");}} catch (IOException | NullPointerException e) {log.error("Problem parsing JSON data.", e);msg.fail(2, e.getLocalizedMessage());}
}

CustomerWorker工人垂直服务器在事件总线上注册消费者以获取消息。 代表事件总线上地址的字符串是任意的,但是建议使用反向tld样式的命名结构,以确保地址唯一(“ com.zanclus.customer”)很简单。 每当有新消息发送到该地址时,它将被传递到一个,只有一个工作层。 然后,工作层将调用handleDatabaseRequest来完成数据库工作,JSON序列化和错误处理。

你有它。 您已经看到Vert.x可以集成到旧版应用程序中,以提高并发性和效率,而不必重写整个应用程序。 我们可以使用现有的Google Guice或JavaEE CDI应用程序完成类似的操作。 当我们在Vert.x中尝试添加响应功能时,所有业务逻辑都可能保持相对不变。 下一步由您决定。 接下来的一些想法包括Clustering , WebSockets和ReactiveX sugar的VertxRx 。

翻译自: https://www.javacodegeeks.com/2015/12/reactive-development-using-vert-x.html

vert.x 分布式锁

vert.x 分布式锁_使用Vert.x进行响应式开发相关推荐

  1. mysql 分布式锁_【分布式锁的演化】分布式锁居然还能用MySQL?

    前言 之前的文章中通过电商场景中秒杀的例子和大家分享了单体架构中锁的使用方式,但是现在很多应用系统都是相当庞大的,很多应用系统都是微服务的架构体系,那么在这种跨jvm的场景下,我们又该如何去解决并发. ...

  2. 请列举你了解的分布式锁_面试官想要你回答的分布式锁实现原理

    写在前面 在了解分布式锁具体实现方案之前,我们应该先思考一下使用分布式锁必须要考虑的一些问题.​ 互斥性:在任意时刻,只能有一个进程持有锁. 防死锁:即使有一个进程在持有锁的期间崩溃而未能主动释放锁, ...

  3. redis setnx 分布式锁_手写Redis分布式锁

    分布式锁使用场景 现在的系统都是集群部署,每个服务都不是单节点的了.比如库存服务,可能部署到3台机器上分别命名为节点1,节点2,节点3.库存服务需要扣减库存,扣减库存肯定需要锁吧,如果使用Lock或者 ...

  4. 请列举你了解的分布式锁_终于搞懂分布式锁是什么了!

    当下在互联网技术架构中,最流行的莫过于分布式架构了.为什么大家纷纷都采用分布式架构呢? 1.高效低廉,将部署在高性能机的程序分散在多个小型机中部署: 2.扩展性强,可随着业务的扩展而横向扩展系统的性能 ...

  5. 请列举你了解的分布式锁_这几种常见的“分布式锁”写法,搞懂再也不怕面试官,安排!...

    什么是分布式锁? 大家好,我是jack xu,今天跟大家聊一聊分布式锁.首先说下什么是分布式锁,当我们在进行下订单减库存,抢票,选课,抢红包这些业务场景时,如果在此处没有锁的控制,会导致很严重的问题. ...

  6. zookeeper 分布式锁_关于redis分布式锁,zookeeper分布式锁原理的一些学习与思考

    编辑:业余草来源:https://www.xttblog.com/?p=4946 首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法 ...

  7. redis cluster 分布式锁_关于分布式锁原理的一些学习与思考redis分布式锁,zookeeper分布式锁...

    首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法.变量. 在一个进程中,也就是一个jvm 或者说应用中,我们很容易去处理控制,在j ...

  8. 使用Vert.x进行响应式开发

    最近,似乎我们正在听到有关Java的最新和最好的框架的消息. 忍者 , SparkJava和Play等工具; 但是每个人都固执己见,使您感到您需要重新设计整个应用程序以利用它们的出色功能. 这就是为什 ...

  9. 浅析“分布式锁”的实现方式丨C++后端开发丨底层原理

    线程锁.进程锁以及分布式锁相关视频讲解:详解线程锁.进程锁以及分布式锁 如何高效学习使用redis相关视频讲解:10年大厂程序员是如何高效学习使用redis Linux服务器开发高级架构学习视频:C/ ...

最新文章

  1. Mysql InnoDB索引分析
  2. reactjs typescript数据传递
  3. piovt table python_python – Pandas:pivot和pivot_table之间的区别...
  4. 15分钟内开始使用Amazon Web Services和全自动资源调配
  5. 十八、MySQL之TCL事务控制语言(详解)
  6. linux perl 单例模式,Perl脚本学习经验(三)--Perl中ftp的使用
  7. 第 5-1 课:线程与死锁 + 面试题
  8. 如何有效开展小组教学_如何有效地开展小组合作学习——数学科主题教研活动...
  9. WPF 虚拟化 VirtualizingWrapPanel 和 VirtualLizingTilePanel
  10. 一个脚本就能明白Shell 脚本中的位置参数的含义
  11. 程序员修炼之道:从小工到专家pdf
  12. 大型企业网络配置系列课程详解(六) --PPP链路的配置与相关概念的理解
  13. mysql整段注释_MySQL 添加注释(comment)
  14. bootstrap你让前端小狮子们又喜又恨
  15. 解决【Windows+Delphi+多线程+String】效率低的问题
  16. 最土团购系统常见问题的汇总
  17. NoSQLBooster4MongoDB - 用SQL查询MongoDB
  18. 大数据教孩子如何写好作文
  19. 误Ghost后的分区恢复
  20. python实现提取视频里的语音转换为文字

热门文章

  1. P5659-[CSP-S2019]树上的数【贪心】
  2. jzoj3461-小麦亩产一千八【斐波那契数列】
  3. 【模板】KMP算法、fail树
  4. Sentinel(四)之工作主流程
  5. Tomcat 使用apr优化
  6. Class类中的getEnclosingXX、getDeclaredXX
  7. Java开发必须掌握的8种网站攻防技术
  8. MyBatis中ThreadLocal
  9. 2020蓝桥杯省赛---java---B---5( REPEAT 程序)
  10. 二叉树的前中后序查找+思路分析