剖析nsq消息队列(一) 简介及去中心化实现原理
分布式消息队列nsq,简单易用,去中心化的设计使nsq
更健壮,nsq
充分利用了go
语言的goroutine
和channel
来实现的消息处理,代码量也不大,读不了多久就没了。后期的文章我会把nsq
的源码分析给大家看。
主要的分析路线如下
- 分析
nsq
的整体框架结构,分析如何做到的无中心化分布式拓扑结构,如何处理的单点故障。 - 分析
nsq
是如何保证消息的可靠性,如何保证消息的处理,对于消息的持久化是如何处理和扩展的。 - 分析
nsq
是如何做的消息的负载处理,即如何把合理的、不超过客户端消费能力的情况下,把消息分发到不同的客户端。 - 分析
nsq
提供的一些辅助组件。
这篇帖子,介绍nsq
的主体结构,以及他是如何做到去中心化的分布式拓扑结构,如何处理的单点故障。
几个组件是需要先大概说一下
nsqd
消息队列的主体,对消息的接收,处理和把消息分发到客户端。
nsqlookupd
nsq
拓扑结构信息的管理者,有了他才能组成一个简单易用的无中心化的分布式拓扑网络结构。
go-nsq
nsq
官方的go语言客户端,基本上市面上的主流编程语言都有相应的客户端在这里
还有可视化的组件nsqadmin
和一些工具像nsq_to_file
、nsq_stat
、等等,这些在后期的帖子里会介绍
使用方式
两种方式一种是直接连接另一种是通过nsqlookupd
进行连接
直连方式
nsqd
是独立运行的,我们可以直接使用部署几个nsqd
然后使用客户端直连的方式使用
例子
目前资源有限,我就都在一台机器上模拟了
启动两个nsqd
./nsqd -tcp-address ":8000" -http-address ":8001" -data-path=./a
./nsqd -tcp-address ":7000" -http-address ":7001" -data-path=./b
正常启动会有类似下面的输出
[nsqd] 2019/08/29 18:42:56.928345 INFO: nsqd v1.1.1-alpha (built w/go1.12.7)
[nsqd] 2019/08/29 18:42:56.928512 INFO: ID: 538
[nsqd] 2019/08/29 18:42:56.928856 INFO: NSQ: persisting topic/channel metadata to b/nsqd.dat
[nsqd] 2019/08/29 18:42:56.935797 INFO: TCP: listening on [::]:7000
[nsqd] 2019/08/29 18:42:56.935891 INFO: HTTP: listening on [::]:7001
简单使用
func main() {adds := []string{"127.0.0.1:7000", "127.0.0.1:8000"}config := nsq.NewConfig()topicName := "testTopic1"c, _ := nsq.NewConsumer(topicName, "ch1", config)testHandler := &MyTestHandler{consumer: c}c.AddHandler(testHandler)if err := c.ConnectToNSQDs(adds); err != nil {panic(err)}stats := c.Stats()if stats.Connections == 0 {panic("stats report 0 connections (should be > 0)")}stop := make(chan os.Signal)signal.Notify(stop, os.Interrupt)fmt.Println("server is running....")<-stop
}type MyTestHandler struct {consumer *nsq.Consumer
}func (m MyTestHandler) HandleMessage(message *nsq.Message) error {fmt.Println(string(message.Body))return nil
}
方法 c.ConnectToNSQDs(adds)
,连接多个nsqd
服务
然后运行多个客户端实现
这时,我们发送一个消息,
curl -d 'hello world 2' 'http://127.0.0.1:7001/pub?topic=testTopic1'
nsqd会根据他的算法,把消息分配到一个客户端
客户端的输入如下
2019/08/30 12:05:32 INF 1 [testTopic1/ch1] (127.0.0.1:7000) connecting to nsqd
2019/08/30 12:05:32 INF 1 [testTopic1/ch1] (127.0.0.1:8000) connecting to nsqd
server is running....
hello world 2
但是这种做的话,需要客户端做一些额外的工作,需要频繁的去检查所有nsqd
的状态,如果发现出现问题需要客户端主动去处理这些问题。
总结
我使用的客户端库是官方库 go-nsq
,使用直接连nsqd
的方式,
- 如果有
nsqd
出现问题,现在的处理方式,他会每隔一段时间执行一次重连操作。想去掉这个连接信息就要额外做一些处理了。 - 如果对
nsqd
进行横向扩充,只能是自己民额外的写一些代码调用ConnectToNSQDs
或者ConnectToNSQD
方法
去中心化连接方式 nsqlookupd
官方推荐使用连接nsqlookupd
的方式,nsqlookupd
用于做服务的注册和发现,这样可以做到去中心化。
图中我们运行着多个nsqd
和多个nsqlookupd
的实例,客户端去连接nsqlookupd
来操作nsqd
例子
我们要先启动nsqlookupd
,为了演示方便,我启动两个nsqlookupd
实例, 三个nsqd
实例
./nsqlookupd -tcp-address ":8200" -http-address ":8201"
./nsqlookupd -tcp-address ":7200" -http-address ":7201"
为了演示横向扩充,先启动两个,客户端连接后,再启动第三个。
./nsqd -tcp-address ":8000" -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a
./nsqd -tcp-address ":7000" -http-address ":7001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./b
--lookupd-tcp-address
用于指定lookup
的连接地址
客户端简单代码
package mainimport ("fmt""os""os/signal""time""github.com/nsqio/go-nsq"
)func main() {adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"}config := nsq.NewConfig()config.MaxInFlight = 1000config.MaxBackoffDuration = 5 * time.Secondconfig.DialTimeout = 10 * time.SecondtopicName := "testTopic1"c, _ := nsq.NewConsumer(topicName, "ch1", config)testHandler := &MyTestHandler{consumer: c}c.AddHandler(testHandler)if err := c.ConnectToNSQLookupds(adds); err != nil {panic(err)}stop := make(chan os.Signal)signal.Notify(stop, os.Interrupt)fmt.Println("server is running....")<-stop
}type MyTestHandler struct {consumer *nsq.Consumer
}func (m MyTestHandler) HandleMessage(message *nsq.Message) error {fmt.Println(string(message.Body))return nil
}
方法ConnectToNSQLookupds
就是用于连接nsqlookupd
的,但是需要注意的是,连接的是http
端口7201
和8201
,库go-nsq
是通过请求其中一个nsqlookupd
的 http 方法http://127.0.0.1:7201/lookup?topic=testTopic1
来得到所有提供topic=testTopic1
的nsqd
列表信息,然后对所有的nsqd进行
连接,
2019/08/30 13:47:26 INF 1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1
2019/08/30 13:47:26 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:7000) connecting to nsqd
2019/08/30 13:47:26 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) connecting to nsqd
目前我们已经连接了两个。
我们演示一下橫向扩充,启动第三个nsqd
./nsqd -tcp-address ":6000" -http-address ":6001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./c
这里会有一个问题,当我启动了一个亲的nsqd
但是他的topic是空的,我们需指定这新的nsqd
处理哪些topic。
我们可以用nsqadmin
查看所有的topic
./nsqadmin --lookupd-http-address=127.0.0.1:8201 --lookupd-http-address=127.0.0.1:7201
然后去你的nsqd
上去建topic
curl -X POST 'http://127.0.0.1:6001/topic/create?topic=testTopic1'
当然也可以自己写一些自动化的角本
查看客户端的日志输出
2019/08/30 14:56:01 INF 1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1
2019/08/30 14:56:01 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:6000) connecting to nsqd
已经连上我们的新nsqd
了
我手动关闭一个nsqd
实例
客户端的日志输出已经断开了连接
2019/08/30 15:04:20 ERR 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) IO error - EOF
2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) beginning close
2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) readLoop exiting
2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) breaking out of writeLoop
2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) writeLoop exiting
2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) finished draining, cleanup exiting
2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) clean close complete
2019/08/30 15:04:20 WRN 1 [testTopic1/ch1] there are 2 connections left alive
并且nsqd
和nsqlookupd
也断开了连接,客户端再次从nsqlookupd
取所有的nsqd
的地址时得到的总是可用的地址。
去中心化实现原理
nsqlookupd
用于管理整个网络拓扑结构,nsqd用他实现服务的注册,客户端使用他得到所有的nsqd服务节点信息,然后所有的consumer端连接
实现原理如下,
nsqd
把自己的服务信息广播给一个或者多个nsqlookupd
客户端
连接一个或者多个nsqlookupd
,通过nsqlookupd
得到所有的nsqd
的连接信息,进行连接消费,- 如果某个
nsqd
出现问题,down机了,会和nsqlookupd
断开,这样客户端
从nsqlookupd
得到的nsqd
的列表永远是可用的。客户端
连接的是所有的nsqd
,一个出问题了就用其他的连接,所以也不会受影响。
转载于:https://www.cnblogs.com/li-peng/p/11435083.html
剖析nsq消息队列(一) 简介及去中心化实现原理相关推荐
- 技术动态 | 去中心化知识图谱协作平台建设实践
转载公众号 | DataFunTalk 文章作者:Epik 铭识协议 出品平台:DataFunTalk 导读:1月10日,由EpiK铭识协议主办的"2021开源知识运动"主题活动 ...
- 去中心化保证金交易平台Lever完成60万美元种子轮融资,NGC Venture等领投
官方消息,基于AMM的去中心化保证金交易平台Lever宣布完成60万美元种子轮融资,NGC Venture,ArkStream Capital,LD Capital和AU21 Capital领投,DA ...
- Redis集群——去中心化模式
去中心化模式特点 之前介绍的主从模式和哨兵模式都只有一个master主节点,如果写操作并发比较大时,这两个模式就会堵塞.这时就可以使用集群化模式,也称之为去中心化模式,其特点是多master和多sla ...
- Kafka消息队列组件简介
Kafka消息队列组件简介 消息队列的工作模式 消息队列的优点 Kafka架构 生产者写入流程 消费者组 消息队列的工作模式 点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除) 点对点模型通 ...
- 去中心化借贷Compound项目简介
自从有了借贷,经济便开始以前所未有的速度增长.借贷体系需要某种形式的信任和中介,当前金融中介的角色由银行承担,但是我们有理由质疑当前借贷系统的各种缺陷,如强制性的筹资标准.获取银行服务的地理位置或法律 ...
- 去中心化数字身份DID简介——一、基本概念
本人最近正在研究数字身份DID技术,该技术在区块链领域目前还比较冷门,并没有什么实际的应用案例,但是数字身份的应用场景广阔,是一个必然会火起来的技术.于是打算写几篇文章,好好讲一下其中涉及到的概念,技 ...
- 区块链要去中心化么?
本文摘自< 深度探索区块链:Hyperledger技术与应用 >,原文发布在华章计算机. 区块链(Blockchain)技术自身仍然在飞速发展中,目前还缺乏统一的规范和标准.Wikiped ...
- 叶胜超区块链:Aelf(ELF)---去中心化的云计算区块链网络!
01 项目简介 Aelf代币简称为ELF,中文被戏称为:阿姨父或者姨父,和它对应的一个币被称为"阿姨",也就是AE, ælf是一个去中心化的云计算区块链网络,具有高性能,资源隔离和 ...
- 获国际架构顶会ATC2021最佳论文!Fuxi2.0去中心化的调度架构详解
简介: 近日,在国际体系架构顶会USENIX ATC2021上,阿里云飞天伏羲团队与香港中文大学合作的一篇论文<Scaling Large Production Clusters with Pa ...
最新文章
- NSwagStudio for Swagger Api
- ceph-deploy mod add_我的世界基座(Pedestals)Mod
- 阿里巴巴开源项目 Druid 负责人温少访谈
- 服务器指纹识别之 DNS TXT
- elk示例-精简版2
- android roboto字体下载,Android字体设置及Roboto字体使用方法
- oracle表格颜色,如何在oracle中使用光标更新特定颜色
- java上机实验答案_java上机实验答案与解析
- 【Elasticsearch】 6 种 能使 es 挂掉的方法
- JAVA泛型只能用引用类型_Java泛型和设计模式:不参数化对泛型类型的引用总是一件坏事吗?...
- linux系统硬盘数据恢复软件下载,R-Linux|R-Linux(linux数据恢复软件)下载 v5.1中文免费版 - 121下载站...
- paip.提升效率--批量变量赋值 “多元”赋值
- 威纶触摸屏与电脑连接_威纶通TK6070IP触摸屏下载线MT6071IE触摸屏编程线连接电脑USB线...
- 【2019年02月11日】股息率分红最高排名
- 计算机英语中CISC的汉语意思是,CISC是什么意思_CISC在线翻译_读音_用法_例句_含义-查字典网...
- 人工智能基础部分11-图像识别实战(网络层联想记忆,代码解读)
- 煤矿用计算机,煤矿安全生产中计算机的运用
- 赛灵思 PL 和 PS IBIS 模型解码器
- 小米路由器显示网络未连接到服务器,小米路由器4不能上网了如何解决?小米路由器4无法上网的解决方法汇总介绍...
- gdb调试之快速入门
热门文章
- C#基础知识学习——特性(Conditional特性、Obsolete特性、自定义特性)(十六)
- 国内期刊投稿用 CTeX(CTeX_2.9.2.164_Full)
- 对一次微信升级事件的自我反省:思维方式决定解决问题能力
- SparkSQL in中使用子查询耗时高如何优化
- java程序员标准指法_作为一个程序员,标准指法盲打都不会的该反省一下自己了...
- 基于驱动框架编写驱动代码
- java使用阿里云oss sdk
- 机器学习 决策树算法
- java微信二维码第三方后台登陆实现 ( 一 )
- Zcoin 项目评级:BB ,展望稳定 | TokenInsight