这段时间,群里的gofun大增,讨论了nsq在集群使用的使用问题。这里简单整理了一下,希望有所帮助。

作为实时的分布式消息处理平台,nsq设计的目的是用来大规模地处理每天数以十亿计级别的消息。
由于具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。nsq基本是go开发服务端的消息中间件首选。
如果您在使用过程中,发现nsq的consumer无法接受到集群消息,或者想知道consumer与nsqd之间的拓扑关系,可以看看。

##组件构成
nsq有三个组件以及辅助的几个工具构成。
nsqd
nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。
它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的(它在这能声明 topics 和 channels,以便大家能找到)。

  • 服务启动后有两个端口:一个给客户端,另一个是 HTTP API。还能够开启HTTPS。
  • 同一台服务器启动多个nsqd,要注意端口和数据路径必须不同,包括:–lookupd-tcp-address、 -tcp-address、–data-path。
  • 删除topic、channel需要http api调用。

nsqlookupd
nsqlookupd 是守护进程,负责管理拓扑信息并提供最终一致性的发现服务。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息。

  • 该服务运行后有两个端口:TCP 接口,nsqd 用它来广播;HTTP 接口,客户端用它来发现和管理。
  • 在生产环境中,为了高可用,最好部署三个nsqlookupd服务。

nsqadmin
nsqadmin 是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务。
运行后,能够通过4171端口查看并管理topic和channel。

  • 通常只需要运行一个。

utilities
常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq 。目前在apps目录下。

基本要点

Topic和Channel
官方有个非常漂亮的动态图,展示了一个topic对应多个channel的效果

表达了三个含义:

  • 没有router
    对于消息中间件,话题(topic)和通道(channel)是非常基本的,他们是1:N 的关系。
    相对于RabitMQ,NSQ没有router这一层,功能也简化了不少,因此运维非常容易上手。

  • 消费者对应Channel
    如果channel没有消费,消息将会保留。如果同一个channel有多个消费者,则会轮训,按序分配给就绪(当前无处理任务)的消费者。因此,多消费者情况下,无法保证有序执行。

  • 存储
    Topic和Channel缓冲的数据相互独立,防止缓慢消费者造成对其他通道造成积压(同样适用于话题级别)。

消息创建与接收

  • 发布者
    消息发布,只能面向具体的nsqd服务进行。在API中对应的是nsq.Producer,直接初始化,就可以用了,非常简单:
    config := nsq.NewConfig()p, err := nsq.NewProducer("127.0.0.1:4150", config) if err != nil {panic(err)}//发布一条消息p.Publish("test", []byte(time.Now().String()))

代码中有两个含义非常重要:

  1. 一个topic的发布者只对应一个具体的NSQD,但可以多个发布者同时向一个NSQD发送消息,他们是N:1的关系。
  2. NSQD与topic是1:N的关系。

代码中的config是连接配置,作为发布者,不用刻意修改,在集群中足够使用。

  • 消费者
    消费者的理解要复杂一些,集群中最容易碰到无法接受到多节点消息的问题。结合官方多个文档及踩过的坑,需要注意:
  1. consumer要接收消息,是要连接到具体的nsqd服务的。通常我们能通过封装好的方法,基于lookupd服务来获取所有的nsqd服务地址并连接。
  2. 一个消费者订阅的topic分布在哪些nsqd服务中,则会直接连接。**nsqd之间是绝对不会互传topic的具体数据的。**下图描绘了consumer与nsqd的关系:
  3. 当多个nsqd服务都有相同的topic的时候,consumer要修改默认设置config.MaxInFlight才能连接。
  4. consumer与topic没有直接联系,而是通过具体的channel接受数据。如果consumer退出,channel不会自动删除。 如果不再需要,需要通过http端口删除channel,否则很可能会导致磁盘空间不足。

只要注意这几点,就很容易写出基本符合业务的代码:

    config:=nsq.NewConfig()//最大允许向两台NSQD服务器接受消息,默认是1,要特别注意config.MaxInFlight=2c1, err1 := nsq.NewConsumer("test", "test-channel1", nsq.NewConfig()) // 新建一个消费者if err1 != nil {panic(err1)}//对消息进行处理的具体方法receive:=func(msg *nsq.Message)error{fmt.Println(string(msg.Body)return nil}// 添加消息处理的具体实现c1.AddHandler(nsq.HandlerFunc(receive)) //将消费者连接到具体的NSQD//if err := c1.ConnectToNSQD("127.0.0.1:4150"); err != nil { // panic(err)//}//或者,如果启动了Lookupd服务,可通过nsqlookupd再分发给具体的nsqdif err := c1.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {panic(err)}

当消费者解析数据抛出错误后,channel会requene,但间隔时间将会越来越长。

##两节点集群示例
以下为同一台设备部署一个nsqlookupd、两个nsqd、一个admin的部署脚本和go代码,可以做一个简单的集群调试:
###nsq下载
进入官网nsq.io的下载链接,获取最新版的执行文件。
对于macos,可以直接brew install
###服务启动与停止
以下脚本,为服务启动、停止脚本,可直接运行。需要注意chmod +x

nsq_start.sh

#服务启动
#注意更改一下 --data-path 所指定的数据存放路径,否则会无法运行。
echo '删除日志文件'
rm -f nsqlookupd.log
rm -f nsqd1.log
rm -f nsqd2.log
rm -f nsqadmin.logecho '启动nsq服务'
nohup nsqlookupd >nsqlookupd.log 2>&1&echo '启动nsqd服务'
nohup nsqd --lookupd-tcp-address=0.0.0.0:4160 -tcp-address="0.0.0.0:4150"  --data-path=~/nsqd1  >nsqd1.log 2>&1&
nohup nsqd --lookupd-tcp-address=0.0.0.0:4160 -tcp-address="0.0.0.0:4152" -http-address="0.0.0.0:4153" --data-path=~/nsqd2 >nsqd2.log 2>&1&echo '启动nsqdadmin服务'
nohup nsqadmin --lookupd-http-address=0.0.0.0:4161 >nsqadmin.log 2>&1&

nsq_shutdown.sh

#nsq_shutdown.sh
#服务停止
ps -ef | grep nsq| grep -v grep | awk '{print $2}' | xargs kill -2

运行后,访问本机:4171端口,就能够通过web页面进行查看:

###运行代码

package mainimport ("github.com/nsqio/go-nsq""time""fmt""utils/waitwraper"
)func main() {var wg waitwraper.WaitWraper//接受消息consume()//分别向不同的服务节点发送消息wg.Wrap(func(){    produce("node1","localhost:4150")})wg.Wrap(func (){produce("node2","localhost:4152")})wg.Wait()
}
func produce(tag string,addr string) {config := nsq.NewConfig()p, err := nsq.NewProducer(addr, config)if err != nil {panic(err)}for {time.Sleep(time.Second*5)p.Publish("test", []byte(tag+":"+time.Now().String()))}
}
func consume() {config := nsq.NewConfig()//注意MaxInFlight的设置,默认只能接受一个节点config.MaxInFlight=2c, err := nsq.NewConsumer("test", "consum", config)if err != nil {panic(err)}hand := func(msg *nsq.Message) error{fmt.Println(string(msg.Body))return nil}c.AddHandler(nsq.HandlerFunc(hand))if err:= c.ConnectToNSQLookupd("localhost:4161");err!=nil{fmt.Println(err)}
}

代码运行后,看到的UI会有点变化:

##小结
最后再次强调一下非常有用的几个细节:

  • nsqd启动时,端口和数据存放要不同。

  • 消息发送必须指定具体的某个nsqd;而消费则可以通过lookupd获取再重定向

  • 消费者接受数据时,要设置 config.MaxInFlight。

  • channel在消费者退出后并不会删除,需要特别注意。如果仅仅是想利用消息中间件解耦服务,不考虑离线数据保存,不妨考虑nats。

  • channel的名字,有很多限制,基本ASSCI字符+数字,以及点号".",下划线"_"。中文、以及空格、冒号":"、横线"-"等都不得出现。

  • channel有多个消费者时,无法保证消息处理的有序性。

golang实战-nsq集群入门与坑相关推荐

  1. eureka 集群失败的原因_Eureka集群的那些坑

    今天遇到一个Eureka集群的一个坑. 问题现场类似是这样的:两台Eureka组成的服务注册中心集群,两台服务提供方server1.server2,两个服务调用方client1.client2. 正常 ...

  2. 二进制安装mysql集群_实战mysql集群搭建(一)--centos7下二进制安装mysql-5.6

    在超哥的帮助下,完成了基于InnoDb数据引擎的mysql数据库集群搭建,实现了主从复制的功能,本篇博文介绍如何使用二进制安装mysql的方法,具体实现步骤如下: 软件使用说明: Liunx系统:ce ...

  3. Linux部署集群入门

    Linux部署集群入门 1.克隆虚拟机 直接复制已有的虚拟机的文件夹复制及可 2.配置网卡和ip地址 2.1配置网卡 2.2 配置ip地址 2.3 重启!!! 3.关闭防火墙 4.关闭seLinux ...

  4. 实战04_redis-cluster集群搭建

    接上一篇:实战_03_Redis基础命令https://blog.csdn.net/weixin_40816738/article/details/99213524 #安装gcc yum instal ...

  5. 企业实战_18_MyCat_ZK集群安装部署

    接上一篇:企业实战_17_MyCat水平扩展_跨分片查询_ER分片 https://gblfy.blog.csdn.net/article/details/100066013 文章目录 一.使用ZK记 ...

  6. Elasticsearch集群扩容踩坑记录

    ES集群扩容构建踩坑总结 文章目录 ES集群扩容构建踩坑总结 @[toc] 需求 配置 参数说明 Data node's cluster uuid diffrent from master node' ...

  7. nacos集群遇到的坑

    记录下搭建nacos集群过程中遇到的坑:(集群机子代号这里列为:nacos-a,nacos-b,nacos-c) 1.  集群搭建,需要建立数据库 nacos_config(该数据库在nacos官网集 ...

  8. 云服务器搭建k8s集群的巨坑,node间网络不通问题

    最近用腾讯云服务器搭建了k8s集群,踩到一个巨坑.现象就是服务正常搭建完毕,各个必须的pod也处于ready状态,但是node不能访问别的node的pod.搭建的示例tomcat的demo也不能通过外 ...

  9. 构建高可用的LVS负载均衡集群 入门篇

    一.LVS简介 LVS是Linux Virtual Server的简称,也就是Linux虚拟服务器, 是一个由章文嵩博士发起的自由软件项目,它的官方站点是www.linuxvirtualserver. ...

最新文章

  1. SecureCRT 6.7.1 注冊机 和谐 破解 补丁 方法
  2. html5制作线路图,HTML5绘制上海地铁线路图
  3. C语言orC++,最大的区别?
  4. Java 多线程(并发)
  5. vue中组件间的传参
  6. js实现kmp算法_数据结构作业之完整KMP算法实现通讯录
  7. 蓄电池单格电压多少伏_直流屏蓄电池电压的常见问题小结
  8. NSGA-Ⅱ算法原理
  9. 计算机毕业设计ssm图书管理系统
  10. 吃豆豆--Java小游戏
  11. stc12c5a60s 超声波HC-SR04测距
  12. 暗色html模板,暗色个人主页网站模板
  13. 计算机软件技术有哪些,1.1 什么是计算机软件技术
  14. Gateway—网关服务
  15. tidymodels搞定二分类资料多个模型评价和比较
  16. mysql drop fulltext_MySQL使用全文索引(fulltext index)
  17. python期末版二版-习题题库
  18. 网易雷火2022秋招岗位(全职)总结
  19. 申银万国交易系统昨瘫痪半小时 沪券商今紧急开会
  20. 2.2nbsp;HOST主桥

热门文章

  1. APP下载链接被微信封杀拦截屏蔽怎么解封在微信中直接点击下载链接直接下载app...
  2. Java:字符输入流、字符输出流
  3. 【DGL教程】第1章 图(Graph)
  4. 迷失lost结局什么意思_LOST的最后结局是什么
  5. 中国行业新分类2008/06/22 12:45 P.M.
  6. 【Swoole】当SWOOLE遇上PHP
  7. HBITMAP与BITMAP 的区别 BMP图像的格式
  8. win10密码忘了怎么办_vivo S7忘了手机密码怎么办?手机怎么解锁?
  9. SSL P2293 暗黑游戏 题目
  10. 软考-信息安全工程师-2