高性能消息中间件——NATS
在介绍NATS之前先了解下什么是分布式系统和消息中间件对于分布式系统的定义,一直以来我都没有找到或者想到特别简练而又合适的定义,这里引用一下Distributed System Concepts and Design (Thrid Edition)中的一句话A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messages,从这句话我们可以看到几个重点,一是组件分布在网络计算机上,二是组件之间仅仅通过消息传递来通信并协调行动。消息中间件维基百科给出的定义为Message-oriented middleware(MOM) is software infrastructure focused on sending and receiving messages between distrubuted systems,意思就是面向消息的系统(消息中间件)是在分布式系统中完成消息的发送和接收的基础软件
复制代码
图片描述(最多50字)
消息中间件常被提及的好处即异步和解耦,市面上常常被使用到的中间件有RabbitMQ, ActiveMQ, Kafka等,他们的关注度和使用率都非常的高,并且使用起来也非常的方便。公司的WiseCloud产品就集成了RabbitMQ。而在下一个版本的更新中将会使用NATS来替换RabbitMQ。使用NATS的好处比较多首先就是其性能非常好,下面引用官网的性能对比图:
复制代码
图片描述(最多50字)
NATS介绍NATS是一个开源、轻量级、高性能的分布式消息中间件,实现了高可伸缩性和优雅的Publish/Subscribe模型,使用Golang语言开发。NATS的开发哲学认为高质量的QoS应该在客户端构建,故只建立了Request-Reply,不提供 1.持久化 2.事务处理 3.增强的交付模式 4.企业级队列。NATS消息传递模型NATS支持各种消息传递模型,包括:发布订阅(Publish Subscribe)请求回复(Request Reply)队列订阅(Queue Subscribers )提供的功能:纯粹的发布订阅模型(Pure pub-sub)服务器集群(Cluster mode server)自动精简订阅者(Auto-pruning of subscribers)基于文本协议(Text-based protocol)多服务质量保证(Multiple qualities of service - QoS)发布订阅(Publish Subscribe)NATS将publish/subscribe消息分发模型实现为一对多通信,发布者在Subject上发送消息,并且监听该Subject在任何活动的订阅者都会收到该消息
复制代码
图片描述(最多50字)
java:
复制代码
//publish Connection nc = Nats.connect("nats://127.0.0.1:4222"); nc.publish("subject", "hello world".getBytes(StandardCharsets.UTF_8)); //subscribe Subscription sub = nc.subscribe("subject"); Message msg = sub.nextMessage(Duration.ofMillis(500)); String response = new String(msg.getData(), StandardCharsets. UTF_8); 或者是基于回调的subscribe
//subscribe Dispatcher d = nc.createDispatcher(msg - >{ String response = new String(msg.getData(), StandardCharsets.UTF_8) //do something }) d.subscribe("subject"); 请求响应(Request Reply)
NATS支持两种请求响应消息:点对点或多对多。点对点涉及最快或首先响应。在一对多的消息交换中,需要限制请求响应的限制在Request Reply过程中,发布请求发布带有响应主题的消息,期望对该subject做出响应操作
复制代码
图片描述(最多50字)
java:
复制代码
// publish Connection connection = Nats.connect("nats://127.0.0.1:4222"); String reply = "replyMsg"; //请求回应方法回调 Dispatcher d = connection.createDispatcher(msg ->
System.out.println("reply: " + JSON.toJSONString(msg)); }) d.unsubscribe(repl , 1); //订阅请求 d.subscribe(reply); //发布请求 connection.publish("requestSub", reply, "request".getBytes(StandardCharsets. UTF_8)); //subscribe Connection nc = Nats.connect("nats://127.0.0.1:4222"); //注册订阅 Dispatcher dispatcher = nc.createDispatcher(msg -> { System.out.println(JSON.toJSONString(msg)); nc.publish(msg.getReplyTo(), "this is reply".getBytes(StandardCharsets.UTF_8)); }); dispatcher.subscribe("requestSub"); 队列订阅&分享工作(Queue Subscribers & Sharing Work)
NATS提供称为队列订阅的负载均衡功能,虽然名字为queue(队列)但是并不是我们所认为的那样。他的主要功能是将具有相同queue名字的subject进行负载均衡。使用队列订阅功能消息发布者不需要做任何改动,消息接受者需要具有相同的对列名
复制代码
图片描述(最多50字)
// Subscribe Connection nc = Nats.connect(); Dispatcher d = nc.createDispatcher(msg -> { //do something System.out.println("msg: " + new String(msg.getData(),StandardCharsets.UTF_8)); }); d.subscribe("queSub", "queName"); Nats-Spring集成
NATS虽说是一个性能非常好的消息中间键,但是和Spring的集成不是很好。这里提供两个集成的思路
复制代码
CloudFoundry-Community/java-nats Wanlinus/nats-spring java-nats
这是一个由CloudFoundry主导的一个NATS java客户端。提供了区别于官方的nats客户端,支持注解配置,对Spring有比较好的支持,但是此项目已经有1年多没有更新且不支持NATS Streaming。相应用法参考Github,这里不做详细讲解.nats-spring由于开源社区只提供一个简单的NATS Client,缺少对注解和Spring的支持,所以我基于官方jnats客户端写了一个SpringBoot的兼容插件.主要是为了兼容spring boot amqp开发模式,尽量使用注解解决问题开发出来的,所以使用方法类似于在代码中使用@RabbitListener.具体使用方法如下{{git clonecd nats-springmvn clean install}}}
复制代码
cn.wanlinus nats-spring 1.0.0.RELEASE application.yml
spring: nats: urls:
nats://127.0.0.1:4222 @EnableNats @SpringBootApplication public class NatsDemo2Application { public static void main(String[] args) { SpringApplication.run(NatsDemo2Application.class, args); } } @Component public class Foo{ @NatsSubscribe("haha") public void message(Message message) { System.out.println(message.getSubject() + " : " + new String(message.getData())); } }
NATS Streaming介绍
NATS由于不能保证消息的投递正确性和存在其他的缺点,NATS Streaming就孕育而生.他是一个由NATS提供支持的数据流系统,采用Go语言编写,NATS Streaming与核心NATS平台无缝嵌入,扩展和互操作.除了核心NATS平台的功能外,他还提供了以下功能:
NATS Streaming特征
增强消息协议(Enhanced message protocol)
消息/事件持久化(Message/event persistence)
至少一次数据传输(At-least-once-delivery)
Publisher限速(Publisher rate limiting)
Subscriber速率匹配(Rate matching/limiting per subscriber)
按主题重发消息(Historical message replay by subject)
持续订阅(Durable subscriptions)
基本用法
在使用NATS Streaming之前首先要启动服务器,在这里我选择使用docker容器
4222 client默认连接端口
8222 Web端口6222 集群通信端口$ docker run -d -p 4222:4222 -p 8222:8222 -p 6222:6222 nats-streamingSTREAM: Starting nats-streaming-server[test-cluster] version 0.11.0STREAM: ServerID: bzkKJL3jI4KW9Hqb0bC1AeSTREAM: Go version: go1.11Starting nats-server version 1.3.0Git commit [not set]Starting http monitor on 0.0.0.0:8222Listening for client connections on 0.0.0.0:4222Server is readySTREAM: Recovering the state...STREAM: No recovered stateSTREAM: Message store is MEMORYSTREAM: ---------- Store Limits ----------STREAM: Channels: 100 *STREAM: --------- Channels Limits --------STREAM: Subscriptions: 1000 *STREAM: Messages : 1000000 *STREAM: Bytes : 976.56 MB *STREAM: Age : unlimited *STREAM: Inactivity : unlimited *STREAM: ----------------------------------java:
复制代码
// 第一个参数表示clusterId,在启动NATS Streaming容器的时候确定 // 第二个参数表示clientID,连接客户端的唯一标识符 StreamingConnectionFactory cf = new StreamingConnectionFactory ("test-cluster", "bar"); //设置Nats服务器地址和端口,默认是nats://127.0.0.1:4222 cf.setNatsConnection(Nats.connect("nats://127.0.0.1:4222")); StreamingConnection sc = cf.createConnection(); Publish: sc.publish("foo", "Hello World".getBytes());
Subscribe:
复制代码
sc.subscribe("foo", msg -> { System.out.println(new String(msg.getData(), StandardCharsets.UTF_8)); }, new SubscriptionOptions.Builder() .durableName("aa") .deliverAllAvailable().build()); 在使用NATS Streaming的时候需要注意订阅主题不支持通配符,在订阅消息时传入MessageHandler函数是接口实现和SubscriptionOptions对象.MessageHandler提供消息回调处理, SubscriptionOptions用于设置订阅选项,比如设置Queue, durableName, ack等。
Streaming-Spring集成作为一款优秀的消息中间件,却没有对Spring做集成,这是非常的可惜的事情.所以为了在工作中方便的使用他,我开发了一个很小的插件.虽然还有很大的改进空间,不过在公司的项目中却能够很好的运行.他开发思路和nats-spring差不多,所以使用方式也是大同小异,具体如下:{{git clone https://github.com/wanlinus/na ... g.gitcd nats-streaming-springmvn clean install}}}
复制代码
cn.wanlinus nats-streaming-spring 1.0.0-SNAPSHOT application.yml
spring: nats: streaming: nats-url: nats://127.0.0.1:4222 cluster-id: test-cluster @EnableNatsStreaming @SpringBootApplication public class StreamingDemoApplication { public static void main(String[] args) { SpringApplication.run(StreamingDemoApplication.class, args); } //发布消息只需要注入StreamingConnection @Autowired private StreamingConnection sc; public void sendMsg(){ sc.publish("foo", "publish message".getBytes()) } } @Service public class A { @Subscribe(value = "foo", durableName = "dname", queue = "queue") public void asd(Message message) throws IOException { System.out.println(new String(message.getData(), StandardCharsets.UTF_8)); } }
两个插件由于是为了结合项目所写的,所以里面有些部分并不通用。后续的开发中我将会继续进行抽象和改进。
复制代码
欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 855835163 群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!
高性能消息中间件——NATS相关推荐
- nsqlookupd:高性能消息中间件 NSQ 解析
本文分享自华为云社区<高性能消息中间件 NSQ 解析-nsqlookupd 实现细节介绍>,原文作者:aoho . 本篇将会结合源码介绍 nsqlookupd 的实现细节.nsqlooku ...
- 高性能消息中间件 nsq 解析-介绍
随着互联网技术在各行各业的应用高速普及与发展,各层应用之间调用关系越来越复杂,架构.开发.运维成本越来越高,高内聚.低耦合.可扩展.高可用已成为了行业需求. 一提到消息队列 MQ(Message Qu ...
- 流行的通讯库/消息中间件
网络上各种各样的通讯中间件/MQ多不胜数.具作者所知,比较有名的有ACE.ICE.Boost::ASIO.MSMQ.ActiveMQ.RabbitMQ.ZeroMQ等等. 其中ACE.ICE是经典,网 ...
- ActiveMQ;RabbitMQ;ZeroMQ
中间件类型: Embedded middleware: As the name suggests, this typeof middleware handles embedded applicatio ...
- 简历上的“熟练掌握 RPC”,到底是个什么水平?
最近一朋友和我吐槽,说看到几个不错的简历,但一面试发现水分太大,让我想起去年面的一个高级开发,简历上写着"熟练掌握 RPC 框架",我就试探着问了几个问题"大概说下 RP ...
- 读者吐槽:Go 面试总被问到 RPC
最近一朋友和我吐槽,说看到几个不错的简历,一面试水分太高,这让我想起去年面的一个高级开发,简历上写着"熟练掌握 RPC 框架",我就试探着问了几个问题"大概说下 RP ...
- 是Dubbo不香了吗?阿里为啥又搞一套Spring Cloud Alibaba?
11 月 11 日零点刚过 26 秒,天猫双十一订单峰值达到 58.3 万笔/秒(如丝般顺滑),30分钟后双十一总成交额突破 3723 亿. 阿里内部是如何应对双十一这种高并发.大流量的场景的可能是很 ...
- MySQL索引的底层数据结构衍变史
原文链接,本文重新排版优化 目录 1. MySQL为什么要建立索引? 2. 可选数据结构 2.1 Hash结构(自适应哈希索引) 2.2 有序数组 2.3 二叉树结构 2.3.1 二叉树的增删(简单举 ...
- Artemis架构解析
目录 前言 1.Artemis Broker 1.1 外部工具与接口 1.1.1 命令行工具 1.1.2 RESTful API 1.1.3 JMX 1.1.4 管理控制页面 1.2 Artemis核 ...
最新文章
- get_headers()请求https报错解决思路
- 快速傅里叶变换(FFT)算法【详解】
- Oracle数据库实例的创建、删除、修改
- linux内核 header.s,Linux启动代码header.S研究
- java 同步方式 lock_Java的同步锁(Lock)
- Invalid character found in the request target. The valid characters are defined in RFC 7230 and RFC
- core和node开发小程序_node+微信小程序实现商城案例
- Java小知识-----Map 按Key排序和按Value排序
- redis笔记1---基础
- Red5开发第一步-Hello World
- 好文章推荐 数据库mysql
- CENTOS6.6上搭建单实例ORACLE12C
- 生信识图之 点图进阶-3(MA)
- 笔记本电脑数据怎么恢复?笔记本电脑数据恢复用什么工具?
- Linux服务器需要安装代理软件EPS(agent)数据库
- Xilinx基于PCIE的部分重配置实现(一)
- 给寸照换底色(抠头发)
- 阅文集团 php,腾讯开源|腾讯与阅文技术合作 微服务框架Tars再添PHP
- 实现不同海拔高度空气参数自由
- n76e003引脚图_n76e003at20数据手册
热门文章
- RN子组件获取redux数据
- SQL横表与纵表互转
- Helper Devise: could not find the `Warden::Proxy` instance on request environment
- win7 蓝屏信息获取和处理
- caffe学习系列(1):图像数据转换成db(leveldb/lmdb)文件
- [Java] 获取本月周次和日期时间段信息
- 整理90部好看的经典喜剧片
- iOS开发 - 不进入待机(屏幕保持唤醒)---UIApplication学习
- ​4种实现多列布局css
- linux下 apache启动、停止、重启命令