本文分享自华为云社区《高性能消息中间件 NSQ 解析-nsqlookupd 实现细节介绍》,原文作者:aoho 。

本篇将会结合源码介绍 nsqlookupd 的实现细节。nsqlookupd 主要流程与nsqd 执行逻辑相似,区别在于具体运行的任务不同。

nsqlookupd是nsq管理集群拓扑信息以及用于注册和发现nsqd服务。所以,也可以把nsqlookupd理解为注册发现服务。当nsq集群中有多个nsqlookupd服务时,因为每个nsqd都会向所有的nsqlookupd上报本地信息,因此nsqlookupd具有最终一致性。

入口函数

在 nsq/apps/nsqlookupd/main.go 可以找到执行入口文件。

// 位于apps/nsqlookupd/main.go:45
func main() {prg := &program{}if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {logFatal("%s", err)}
}func (p *program) Init(env svc.Environment) error {if env.IsWindowsService() {dir := filepath.Dir(os.Args[0])return os.Chdir(dir)}return nil
}func (p *program) Start() error {opts := nsqlookupd.NewOptions()flagSet := nsqlookupdFlagSet(opts)...
}

同样,通过第三方 svc 包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqlookupd 实例。

// 位于 apps/nsqlookupd/main.go:80
options.Resolve(opts, flagSet, cfg)nsqlookupd, err := nsqlookupd.New(opts)if err != nil {logFatal("failed to instantiate nsqlookupd", err)}p.nsqlookupd = nsqlookupdgo func() {err := p.nsqlookupd.Main()if err != nil {p.Stop()os.Exit(1)}}()

初始化配置参数(优先级:flagSet-命令行参数 > cfg-配置文件 > opts-默认值),开启协程,进入 nsqlookupd.Main() 主函数。

监听请求

我们来看下 nsqlookupd 是如何监听请求的,代码实现如下:

// 位于 nsqlookupd/nsqlookupd.go:53
func (l *NSQLookupd) Main() error {ctx := &Context{l}exitCh := make(chan error)var once sync.OnceexitFunc := func(err error) {once.Do(func() {if err != nil {l.logf(LOG_FATAL, "%s", err)}exitCh <- err})}tcpServer := &tcpServer{ctx: ctx}l.waitGroup.Wrap(func() {exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf))})httpServer := newHTTPServer(ctx)l.waitGroup.Wrap(func() {exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))})err := <-exitChreturn err
}

开启 goroutine 执行 tcpServer, httpServer,分别监听 nsqd, nsqadmin 的客户端请求。

处理请求

// 位于 internal/protocol/tcp_server.go:17
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {logf(lg.INFO, "TCP: listening on %s", listener.Addr())for {clientConn, err := listener.Accept()if err != nil {if nerr, ok := err.(net.Error); ok && nerr.Temporary() {logf(lg.WARN, "temporary Accept() failure - %s", err)runtime.Gosched()continue}// theres no direct way to detect this error because it is not exposedif !strings.Contains(err.Error(), "use of closed network connection") {return fmt.Errorf("listener.Accept() error - %s", err)}break}go handler.Handle(clientConn)}logf(lg.INFO, "TCP: closing %s", listener.Addr())return nil
}

TCPServer 循环监听客户端请求,建立长连接进行通信,并开启 handler 处理每一个客户端 conn。

装饰 http 路由

httpServer 通过 http_api.Decorate 装饰器实现对各 http 路由进行 handler 装饰,如加 log 日志、V1 协议版本号的统一格式输出等;

func newHTTPServer(ctx *Context) *httpServer {log := http_api.Log(ctx.nsqlookupd.logf)router := httprouter.New()router.HandleMethodNotAllowed = truerouter.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf)router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf)router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf)s := &httpServer{ctx:    ctx,router: router,}router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))// v1 negotiaterouter.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1))router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
}

处理客户端命令

tcp 解析 V1 协议,内部协议封装的 prot.IOLoop(conn) 进行循环处理客户端命令,直到客户端命令全部解析处理完毕才关闭连接。

var prot protocol.Protocolswitch protocolMagic {case "  V1":prot = &LookupProtocolV1{ctx: p.ctx}default:protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))clientConn.Close()p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",clientConn.RemoteAddr(), protocolMagic)return}err = prot.IOLoop(clientConn)

执行命令

通过内部协议进行 p.Exec(执行命令)、p.SendResponse(返回结果),保证每个 nsqd 节点都能正确的进行服务注册(register)与注销(unregister),并进行心跳检测(ping)节点的可用性,确保客户端取到的 nsqd 节点列表都是最新可用的。

for {line, err = reader.ReadString('\n')if err != nil {break}line = strings.TrimSpace(line)params := strings.Split(line, " ")var response []byteresponse, err = p.Exec(client, reader, params)if err != nil {ctx := ""if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {ctx = " - " + parentErr.Error()}_, sendErr := protocol.SendResponse(client, []byte(err.Error()))if sendErr != nil {p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)break}continue}if response != nil {_, err = protocol.SendResponse(client, response)if err != nil {break}}}conn.Close()

nsqlookupd 服务同时开启 tcp 和 http 两个监听服务,nsqd 会作为客户端,连上 nsqlookupd 的 tcp 服务,并上报自己的 topic 和 channel 信息,以及通过心跳机制判断 nsqd 状态;还有个 http 服务提供给 nsqadmin 获取集群信息。

小结

本文主要介绍 nsqlookupd 的实现,nsqlookupd 同样是一个守护进程,负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题( topic )的生产者,并且 nsqd 节点广播话题(topic)和通道( channel )信息。有两个接口: TCP 接口, nsqd 用它来广播。 HTTP 接口,客户端用它来发现和管理。

下一篇文章,将会继续介绍 nsq 中其他模块实现的细节。

点击关注,第一时间了解华为云新鲜技术~

nsqlookupd:高性能消息中间件 NSQ 解析相关推荐

  1. 高性能消息中间件 nsq 解析-介绍

    随着互联网技术在各行各业的应用高速普及与发展,各层应用之间调用关系越来越复杂,架构.开发.运维成本越来越高,高内聚.低耦合.可扩展.高可用已成为了行业需求. 一提到消息队列 MQ(Message Qu ...

  2. 高性能消息中间件——NATS

    在介绍NATS之前先了解下什么是分布式系统和消息中间件对于分布式系统的定义,一直以来我都没有找到或者想到特别简练而又合适的定义,这里引用一下Distributed System Concepts an ...

  3. Kafka设计解析(六)- Kafka高性能关键技术解析

    http://www.infoq.com/cn/articles/kafka-analysis-part-6 宏观架构层面 利用Partition实现并行处理 Partition提供并行处理的能力 K ...

  4. Artemis架构解析

    目录 前言 1.Artemis Broker 1.1 外部工具与接口 1.1.1 命令行工具 1.1.2 RESTful API 1.1.3 JMX 1.1.4 管理控制页面 1.2 Artemis核 ...

  5. 基于supersocket、C#对JT808协议进行解析构建gps监控平台服务端

    1)为什么使用SuperSocket? gps监控平台.车联网.物联网系统中GPRS网络数据的并发通讯和处理解析,主要功能有socket的UDP和TCP链路建立和维持,网络数据协议包接收与解析,分发上 ...

  6. Golang入门教程(十七)Linux/Windows下快速搭建和配置NSQ

    前言 NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,代码托管在GitHub,其当前最新版本是0.3.1版.NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消 ...

  7. 【clickhouse】clickhouse 解析器

    文章目录 1.概述 1.概述 ClickHouse 中有两种类型的解析器, full parser 和 data format parser ,前者是一个完整的 SQL 解析器,后者是一个高性能的流解 ...

  8. 高性能ORM 框架之 MySqlSugar

    mysql 3.X API地址:  http://www.cnblogs.com/sunkaixuan/p/5987308.html MySqlSugar 1.5 API 一.介简 SqlSugar ...

  9. python语法详解_关于python:NLTK中解析的英语语法

    是否有可以立即使用并可以在NLTK中使用的即用型英语语法? 我搜索了使用NLTK进行解析的示例,但似乎我必须在解析句子之前手动指定语法. 非常感谢! 您可以看一下pyStatParser,这是一个简单 ...

最新文章

  1. 下载nodejs的mysql安装包下载_nodejs安装包下载|nodejs(javascript运行环境) v5.3.0 最新稳定版 - 软件下载 - 绿茶软件园|33LC.com...
  2. Apress水果大餐——移动开发
  3. python要不要装pycharm-Python和pyCharm安装
  4. python之父去面试-前端两年月入30K,高频面试题整理(含答案)
  5. JProfiler 11中文版
  6. Dubbo负载均衡机制
  7. ustc小道消息20211225
  8. 【转】WCF请求应答(Request-Reply)、单向操作(One-Way)、回调操作(Call Back)
  9. HTMl中3d变换卡片制作方法,CSS如何实现卡片3D翻转效果
  10. UNIQLO 11月销售额逆市上涨7.9%
  11. hadoop处理excel数据
  12. Android11安装谷歌,Android 11正式版
  13. 夜神模拟器adb连接电脑
  14. Jenkins 打包项目出错汇总(持续)
  15. 类型多样的人物ps后期素材素材,速来收藏
  16. excel中sheet不见了,怎么办
  17. 使用TCPDF插件生成pdf以及pdf的中文处理
  18. CST学习------网格类型及设置方法和技巧
  19. poky: PACKAGECONFIG的用法
  20. POJ-29932996

热门文章

  1. async 与 await 的用法详解
  2. es6 super 关键字
  3. goland创建一个不限长度的字节切片_Go语言3 : 切片
  4. 视觉SLAM笔记(63) RGB-D 稠密建图
  5. python3 一年中的天数 时间转化为北京时_Python3?环境搭建
  6. bootstrap4高度占一半_减肥选对了碳水,意味着成功了一半
  7. 2017.10.6 Java命名规范及使用情况
  8. 【CSS3】 理解CSS3 transform中的Matrix(矩阵)
  9. HTML5要点(四)对象全整理
  10. java设计模式2-观察者模式