nsq的源码比较简单,值得一读,特别是golang开发人员,下面重点介绍nsqd,nsqd是nsq的核心,其他的都是辅助工具,看完这篇文章希望你能对消息队列的原理和实现有一定的了解。

nsqd是一个守护进程,负责接收,排队,投递消息给客户端,并不保证消息的严格顺序,nsqd默认监听一个tcp端口 (4150) 和一个http端口 (4151) 以及一个可选的https端口

对订阅了同一个topic的同一个channel的消费者使用负载均衡策略,其实就是多个协程消费同一个channel

只要channel存在,即使没有该channel的消费者,也会将生产者的message缓存到队列(内存队列和磁盘队列)中,当有新的消费者产生后,就开始消费队列中的所有消息

保证队列中的 message 至少会被消费一次(在进程意外退出的时候这点都保证不了),并不能保证成功消费一次,即使 nsqd退出,也会将队列中的消息暂存磁盘上(进程退出的时候会将缓存中的消息存到磁盘上,意外情况如掉电就不行了,缓存中的消息就没有机会存盘而丢失,在实战中一般不会使用缓存队列即内存buffer为0,全部使用磁盘队列)

限定内存占用,能够配置nsqd中每个channel队列在内存中缓存的message数量,一旦channel的buffer写满,就将message写到磁盘中,这点使用golang select的优先级功能,default优先级最低

topic,channel 一旦建立,将会一直存在,要及时在管理台或者用代码清除无效的 topic 和 channel,避免资源的浪费,每个topic和channel都有独立的协程处理自身的消息,默认的buffer和其他的一些信息

nsq消息没有备份,一旦出现进程意外情况退出,可能会出现消息丢失,如没有消费成功的消息,写入文件但没有真正落盘的消息,这种意外情况很难杜绝,像意外退出这种情况kafka,redis等都会遇到这样的问题,最后都会采用一个折中的策略,定时将数据落盘

//原文:https://www.cnblogs.com/hlxs/p/11445103.html 作者:啊汉
type Topic struct {// 64bit atomic vars need to be first for proper alignment on 32bit platformsmessageCount uint64  //消息总数量messageBytes uint64  //消息总长度sync.RWMutexname              string   //topic namechannelMap        map[string]*Channel //保存topic下面的所有channelbackend           BackendQueue //磁盘队列memoryMsgChan     chan *Message //内存队列startChan         chan intexitChan          chan intchannelUpdateChan chan intwaitGroup         util.WaitGroupWrapperexitFlag          int32  //退出标记idFactory         *guidFactory //生成msg id的工厂ephemeral      bool  //是否临时topicdeleteCallback func(*Topic)  //删除topic方法指针deleter        sync.Oncepaused    int32   //暂停标记,1暂停, 0正常pauseChan chan intctx *context
}

Topic创建

nsqd用map[string]*Topic来保存所有topic,producter在发消息的时候回指定topic,nsqd在收到消息后会判断topic是否存在,不存在就会自动创建,每创建一个新的topic就会启动一个协程,用于处理topic相关的消息,如将内存/磁盘中的消息复制给topic中的每个channel、channel数量变化、channel暂停、topic退出

消息结构

// Command represents a command from a client to an NSQ daemon//原文:https://www.cnblogs.com/hlxs/p/11445103.html 作者:啊汉
type Command struct {Name   []byte   //命令名称,可选:IDENTIFY、FIN、RDY、REQ、PUB、MPUB、DPUB、NOP、TOUCH、SUB、CLS、AUTHParams [][]byte //不同的命令做不同解析,涉及到topic的,Params[0]为topic nameBody   []byte   //消息内容
}// WriteTo implements the WriterTo interface and
// serializes the Command to the supplied Writer.
//
// It is suggested that the target Writer is buffered
// to avoid performing many system calls.
func (c *Command) WriteTo(w io.Writer) (int64, error) {var total int64var buf [4]byten, err := w.Write(c.Name)  //命名名称,nsqd根据这个名称执行相关功能total += int64(n)if err != nil {return total, err}for _, param := range c.Params {n, err := w.Write(byteSpace)  //空格total += int64(n)if err != nil {return total, err}n, err = w.Write(param)  //参数total += int64(n)if err != nil {return total, err}}n, err = w.Write(byteNewLine)   //空行ntotal += int64(n)if err != nil {return total, err}//消息内容if c.Body != nil {   bufs := buf[:]binary.BigEndian.PutUint32(bufs, uint32(len(c.Body)))n, err := w.Write(bufs)   //消息长度4字节total += int64(n)if err != nil {return total, err}n, err = w.Write(c.Body)  //消息内容total += int64(n)if err != nil {return total, err}}return total, nil
}

nsqd收到这个结构做解析,就能知道命令名称(干什么),topic name,消息内容等,不同的命令,命令参数不一样

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {if bytes.Equal(params[0], []byte("IDENTIFY")) {return p.IDENTIFY(client, params)}err := enforceTLSPolicy(client, p, params[0])if err != nil {return nil, err}switch {case bytes.Equal(params[0], []byte("FIN")):return p.FIN(client, params)case bytes.Equal(params[0], []byte("RDY")):return p.RDY(client, params)case bytes.Equal(params[0], []byte("REQ")):return p.REQ(client, params)case bytes.Equal(params[0], []byte("PUB")):return p.PUB(client, params)case bytes.Equal(params[0], []byte("MPUB")):return p.MPUB(client, params)case bytes.Equal(params[0], []byte("DPUB")):return p.DPUB(client, params)case bytes.Equal(params[0], []byte("NOP")):return p.NOP(client, params)case bytes.Equal(params[0], []byte("TOUCH")):return p.TOUCH(client, params)case bytes.Equal(params[0], []byte("SUB")):return p.SUB(client, params)case bytes.Equal(params[0], []byte("CLS")):return p.CLS(client, params)case bytes.Equal(params[0], []byte("AUTH")):return p.AUTH(client, params)}return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

Topic收到消息

nsqd收到上面这个结构,解析之后,就会执行相关功能,我们以PUB命令为例:

1:读到空行处,能拿到命令名称和参数,命令名称=PUB,命令参数为topicName

2:检查topicName是否有效

3:获取消息内容长度,读取4个字节

4:分配对应内容长度空间,读取对应长度字节存入

5:获取topicName信息,没有就创建

6:构造消息结构体nsqd.Message,自动生成消息id

7:将消息提交给对应的topic,Topic.PutMessage

8:将消息写入topic对应的内存消息通道,内存消息通道默认大小为10000,如通道满了则写入磁盘

Topic中的消息分发给channel

在创建topic的时候回启动一个协程处理各种消息,其中就包括消费topic中的消息,topic只是将消息投递到其中的每个channel中,如topic下面有10个channel,则要复制9个nsqd.Message,每个channel一个nsqd.Message,但是消息id和消息内容是一样的,消息内容并不会被复制,topic收到消息将消息分发给channel就完事了,消息怎么发给消费者,由channel负责

type Channel struct {// 64bit atomic vars need to be first for proper alignment on 32bit platformsrequeueCount uint64   //重新入队数量messageCount uint64   //消息数量timeoutCount uint64   //超时数量,已经消费,但没有反馈结果,会重新加入队列,messageCount不会自增sync.RWMutextopicName string    //topic namename      string    //channel namectx       *contextbackend BackendQueue  //将消息写入磁盘的队列,维护磁盘消息的读写memoryMsgChan chan *Message //内存消息队列,通道buffer默认10000exitFlag      int32   //退出标记,1表示退出,0没有退出exitMutex     sync.RWMutex// state trackingclients        map[int64]Consumer   //连接到这个topic-channel的所有clientpaused         int32  //暂停标记,0不暂停,1暂停,暂停就不会往这个channel中copy消息ephemeral      bool   //临时channel标记,临时channel不会存到文件中deleteCallback func(*Channel) //用于从topic中删除channeldeleter        sync.Once  // Stats trackinge2eProcessingLatencyStream *quantile.Quantile// TODO: these can be DRYd updeferredMessages map[MessageID]*pqueue.Item  //延迟消息map,方便查找deferredPQ       pqueue.PriorityQueue  //延迟消息队列deferredMutex    sync.MutexinFlightMessages map[MessageID]*Message  //消费中的消息map,方便查找inFlightPQ       inFlightPqueue   //消费中的消息队列inFlightMutex    sync.Mutex
}

client订阅topic消息

订阅发送的还是Command这个结构,只不过订阅没有消息内容而已,指定topic和channel就行,如果topic和channel不存在都会自动创建,client和server建立的是tcp长连接,server会启动两个协程,一个用于发消息,一个用于接收消息,建立连接后,channel会把client加入它的map[int64]Consumer中,key为clientId,当topic收到消息后,会分发给channel,channel通过发消息的协程发给client

channel将消息推给消费者

channel中的消息存在两个地方:内存通道和磁盘队列,topic将消息分发给channel时,通过go的select将消息分发给内存通道或是磁盘队列,由于select的default分支优先级比case低,所以只要内存通道没满,就会往内存通道中写,否则就写入磁盘,

diskqueue.diskQueue维护着磁盘数据的读写,每个非临时的topic和channel都有这样一个字段。

发消息的协程就会一直读内存通道和磁盘队列中的数据,将消息发给client

nsq消息类型有三种如下:

// frame types
const (FrameTypeResponse int32 = 0   //响应FrameTypeError    int32 = 1   //错误FrameTypeMessage  int32 = 2   //消息
)

消息发送给client之后,也不知道消息到底有没有消费成功,有可能client收到消息之后就崩溃了,所以消息发给client之后,需要client给server发一个FIN消息告诉server,这个消息我消费成功,所以在将消息发送给client之后,消息出了内存队列/磁盘队列,进入了一个新的队列,叫飞行队列,表示这个消息正在运输消费中,为了维护在消费中的消息,nsq使用了两个数据结构:

type inFlightPqueue []*Message
inFlightPQ       inFlightPqueue //按照超时时间排序的最小堆
inFlightMessages map[MessageID]*Message //保存消息

消息发送给client之后,同时会将消息存入inFlightPQ和inFlightMessages中,inFlightPQ中的消息都设置了超时时间默认是1分钟,如果1分钟后还没有收到client发过来的FIN消息,会将消息重新加入待消费队列,让client重新消费,目的是想保证每个消息至少被消费一次,由于消息可保存在内存中,进程可能随时挂掉并不能保证每个消息都至少被消费一次,如果不用内存队列,完全使用磁盘队列,当进程意外崩掉的时候,消息是否丢失要看磁盘队列的具体实现,完全使用磁盘队列性能差点,安全性更高

inFlightMessages就是为了方便通过消息id查找消息,收到client发送过来的FIN消息时就会将消息从inFlightPQ和inFlightMessages中删除,表示这个消息已经消费成功,数据也就被扔掉了

延迟消息

发延迟消息和发普通消息的区别是producter在生成延迟消息的时候指定了延迟时间,单位毫秒,命令:DPUB

延迟消息存在内存中,并没有存到磁盘中,延迟消息要是存在磁盘中,实现起来还是比较复杂

延迟消息同样使用了一个队列和一个map,结构如下:

type Item struct {Value    interface{}  //*MessagePriority int64 //执行的时间戳,单位毫秒Index    int   //队列索引
}
type PriorityQueue []*Item
deferredPQ       pqueue.PriorityQueue
deferredMessages map[MessageID]*pqueue.Item
deferredPQ和inFlightPQ一样,是按照时间排序的最小堆

那么nsq是怎么判断消息超时,延迟消息的执行时间到了呢?

nsq有一个专门的协程来处理这两种情况,实现也很简单,就是每100毫秒检查一次,看是否有超时的消息,延迟消息是否执行时间是否到了,如果消息超时,则重新将消息加入待消费队列,每次将消息发送给client的时候,重试次数都会加一,即Message.Attempts++

延迟消息执行时间要是到了,就会当做一个普通的消息加入待消费队列,后面的流程都是一样的,默认最大延迟时间为1小时,所有的默认值在进程启动时都是可重新指定的

nsqd启动过程

1:加载启动参数

启动参数定义了结构nsqd.Options,并初始化好了默认值,在进程启动的时候可以指定对应的值,通过反射将这些参数赋给nsqd.Options,通过nsqd.Options就能方便的使用各个参数

2:加载topic和channel并启动

在nsqd启动的时候会加载配置文件nsqd.dat,验证topic和channel名称格式是否有效,然后启动所有topic,该暂停的就暂停,当topic和channel发生变更的时候回将所有信息重新保存到nsqd.dat中,如新增/删除/暂停/启动topic和channel会保存文件

topic和channel保存到文件中的结构

type meta struct {Topics []struct {Name     string `json:"name"`Paused   bool   `json:"paused"`Channels []struct {Name   string `json:"name"`Paused bool   `json:"paused"`} `json:"channels"`} `json:"topics"`
}

3:启动tcp/http/https服务

nsq可以通过tcp和http通过服务,http和https提供的服务是一样,区别在于协议本身,当client通过tcp和server建立连接后,server会启动两个协程,一个用于发消息,一个用于收消息

tcp提供的服务如下:

服务命令 服务描述
INENTIFY 认证
FIN 消费完成
RDY 指定可同时处理的消息数量
REQ 消息重新加入队列
PUB 发布单条消息
MPUB 发布多条消息
DPUB 发布单条延迟消息
NOP 不做任何处理
TOUCH 重新设置消息处理超时时间
SUB 订阅,订阅后才能消费消息
CLS 关闭停止消费
AUTH 授权

client和server建立连接后,client通过命令INENTIFY将认证信息发给服务端,如果server在启动的时候指定了授权地址,server就会告诉client你需要认证,client就会通过命令AUTH将秘钥发给server,server去授权地址进行验证,验证通过后,就可以进行正常的消息发布和订阅了

http和https提供服务如下:

服务名称
发布单条/多条消息
topic新增/删除/情况topic中消息/暂停/启动
channel新增/删除/情况topic中消息/暂停/启动
nsq状态信息
ping
启动参数查询和修改

tcp服务能发布和消费消息,http/https则只能发布消息,发布消息最后调的是同一个接口

端口信息

协议名称 默认端口
tcp 4150
http 4151
https 4152

心跳

心跳默认30秒,在认证(INENTIFY)的时候client可以指定心跳时间间隔,server按照心跳给client发消息,消息内容:_heartbeat_,如果发送失败,发送消息的协程就会退出,这样server就不在给client发消息了,server如果从client读消息失败,接收消息的协程就会退出,关闭和client的连接,从channel中将client移除,这样就不在收client发来的消息,server中也就没有client的任何信息了

consumer和producter连着nsqd的同一个端口,为什么consumer能消费消息,而producter却不会呢?

nsq是个基于发布和订阅的消息队列,只有订阅了才能消费消息,consumer和producter虽然连着同一个端口,consumer在建立连接后,会发送SUB命令,告诉server我要订阅,而producter并没有,consumer在发送SUB命令后还会发送RDY命令告诉server能同时处理消息的个数,当rdyCount=0时,server也不会给consumer推消息,所以SUB和RDY这两个命令缺一不可

nsq消息文件的存取

nsq可以将消息存在内存中或是文件中,存在内存的好处就是速度快,确定就是一旦进程退出消息就丢失了,所以在实战中消息都会写到磁盘文件,虽然慢点但不容易丢消息

封装消息存取文件的实现在github.com/nsqio/go-diskqueue/diskqueue.go中

topic收到消息后,可以将消息存在内存中或是文件中,当内存channel写满之后就会写入文件,当我们把channel的buffer设置成0后,所有的消息就会写文件

每个topic都会启动一个协程将其收到的消息复制给其下面的每个channel,channel在将消息推送给consumer,channel收到topic发过来(函数调用)的消息,可将消息存入内存或是文件

消息写入内存,topic下面的channel其实是共享一份数据,因为数据都是自读的,而写入文件却是每个channel都有一组文件并将消息吸入,真正做到了读时复制,每个topic和channel都会实例化一个diskQueue,其结构如下

// diskQueue implements a filesystem backed FIFO queue//原文:https://www.cnblogs.com/hlxs/p/11445103.html 作者:啊汉
type diskQueue struct {// 64bit atomic vars need to be first for proper alignment on 32bit platforms// run-time state (also persisted to disk)readPos      int64   //已经读的位置writePos     int64   //已经写的位置readFileNum  int64   //正在读的文件编号writeFileNum int64   //正在写的文件编号depth        int64   //没有消费的消息数量sync.RWMutex// instantiation time metadataname            string  // topicName 或者 topicName + ":" + channelNamedataPath        string  //存消息文件的目录maxBytesPerFile int64   // currently this cannot change once createdminMsgSize      int32   //消息最小值maxMsgSize      int32   //消息最大值syncEvery       int64   // number of writes per fsyncsyncTimeout     time.Duration // duration of time per fsyncexitFlag        int32   //退出标记needSync        bool    //强制将文件缓冲区的数据写入磁盘// keeps track of the position where we have read// (but not yet sent over readChan)nextReadPos     int64   //下次读的位置nextReadFileNum int64   //下次读的文件编号readFile  *os.File      //正在读的文件writeFile *os.File      //正在写的文件reader    *bufio.Reader //读缓冲区,默认4KwriteBuf  bytes.Buffer  //写缓冲区// exposed via ReadChan()readChan chan []byte    //读channel// internal channelswriteChan         chan []byte   //写channelwriteResponseChan chan error //写结果通知emptyChan         chan int   //删除所有文件channelemptyResponseChan chan error //删除通知channelexitChan          chan int   //退出channelexitSyncChan      chan int   //退出命令同步等待channellogf AppLogFunc    //写日志
}

文件名命名:目录 + topicName:channelName + .diskqueue.000001.dat

func (d *diskQueue) fileName(fileNum int64) string {return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum)
}

diskQueue在实例化的时候回初始化相关的属性,当文件大小大于指定文件的最大值时,文件编号writeFileNum就会自增1,新来的消息就会写入新的文件

按顺序读写文件,每个消息写文件的格式是:消息长度(4字节) + 消息内容,这样读消息也就很容易了,先读4字节,知道消息的长度,接着读消息内容,下一个消息也是这样读,当下一个消息读的位置大于文件的最大值时说明这个文件读完了,可以从下一个文件开始写了,

写文件是同步的,写完之后直接反馈消息是否写入成功,由于文件系统的缓存原因,系统并不是把消息马上写入磁盘,而是写入了文件的缓冲区,所以需要定时的将文件缓冲区的内容写入磁盘,nsq使用了两个策略将文件缓冲区的内容写入磁盘。两个策略同时进行

1:默认每2500条消息强制将文件缓存内容写入磁盘

2:默认每两秒强制将文件缓存内容写入磁盘

在将消息强制写入磁盘的同时,也会将队列当前状态写入另一个文件,若程序退出,下次启动后就能正常进行文件的读写,写入内容包括:

1:剩余消息数量

2:正在读的文件编号

3:读文件偏移量

4:正在写的文件编号

5:写文件偏移量

磁盘文件的删除,如果一个文件中的消息全部被消费了,那这个文件将被删除

断开重连
断开后如果不能自动重连,那就是死都不知道怎么死的,所以nsq是有断开重连功能的
server短发现断开后,不会自动重连,鬼知道你是不是主动断开,所以server发现断开了,就将client的相关信息完全删除,就像client从没有出现过
client断开后会自动重连,client分consumer和producer
consumer自动重连:consumer作为消费者就是读,所以当读失败的时候,consumer会关闭读写功能,就断开连接,当consumer收到的所有消息处理完成后,就会自动重连,注意写失败并不会自动重连
producer自动重连:producer作为生产者就是写,所以当写失败的时候,producer按照状态来决定是否重连,如果发现状态为非连接状态就连接,收到断开是不会重连的,在写失败的时候才会重连

参考资料

https://www.cnblogs.com/hlxs/p/11445103.html

https://github.com/nsqio/nsq

https://github.com/nsqio/go-nsq

https://mp.weixin.qq.com/s/lrbIx88Z1HwWNTO_5aABJQ

https://www.infoq.cn/article/2015/02/nsq-distributed-message-platform

消息长度_nsq消息队列源码分析相关推荐

  1. 并发编程5:Java 阻塞队列源码分析(下)

    上一篇 并发编程4:Java 阻塞队列源码分析(上) 我们了解了 ArrayBlockingQueue, LinkedBlockingQueue 和 PriorityBlockingQueue,这篇文 ...

  2. 并发-阻塞队列源码分析

    阻塞队列 参考: http://www.cnblogs.com/dolphin0520/p/3932906.html http://endual.iteye.com/blog/1412212 http ...

  3. LiteOS内核源码分析:消息队列Queue

    本文分享自华为云社区<LiteOS内核源码分析系列十 消息队列Queue>,原文作者:zhushy . 队列(Queue)是一种常用于任务间通信的数据结构.任务能够从队列里面读取消息,当队 ...

  4. 【TencentOS tiny】深度源码分析(4)——消息队列

    消息队列 在前一篇文章中[TencentOS tiny学习]源码分析(3)--队列 我们描述了TencentOS tiny的队列实现,同时也点出了TencentOS tiny的队列是依赖于消息队列的, ...

  5. 【队列源码研究】消息队列beanstalkd源码详解

    顺风车运营研发团队 李乐 1.消息队列简介 计算机软件发展的一个重要目标是降低软件耦合性: 网站架构中,系统解耦合的重要手段就是异步,业务之间的消息传递不是同步调用,而是将一个业务操作分为多个阶段,每 ...

  6. 从源码分析RocketMQ系列-RocketMQ消息持久化源码详解

    导语   在上篇分析中,提到了一个概念处理器,并且在进入到最终NettyIO的时候看到了一个Pair的对象,这个对象存储了两个对象,一个是执行器,一个是处理器,在进入Runable对象的时候看到封装到 ...

  7. SparkRPC源码分析之RPC管道与消息类型

    SparkRPC源码分析之RPC管道与消息类型 我们前面看过了netty基础知识扫盲,那我们应该明白,ChannelHandler这个组件内为channel的各种事件提供了处理逻辑,也就是主要业务逻辑 ...

  8. RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

    在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...

  9. producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四)

    有序消息 消息有序指的是可以按照消息的发送顺序来消费. RocketMQ可以严格的保证消息有序.但这个顺序,不是全局顺序,只是分区(queue)顺序. 顺序消息生产者 public static vo ...

最新文章

  1. php 的cookie设置时间,php cookie时间设置的方法-PHP问题
  2. 实现微信朋友圈动态列表
  3. 微软私有云分享(R2)16PowerShell查看虚拟机信息
  4. .Net的后台服务技术有哪些?
  5. MYSQL正式环境主从复制(不锁表,不停服务)
  6. c专家编程(C专家编程pdf)
  7. python代码示例-Python代码样例列表
  8. 无聊日常——对QQ邮箱盗号邮件的垃圾账号填充
  9. HCL_路由器_三层交换
  10. 1_数据分析应掌握的Python基础
  11. 嵌入式linux,增加串口登陆密码
  12. C# AO/ArcEngine 栅格数据总结
  13. SAS学习(8)——自定义proc means的数据导出
  14. select 函数用法
  15. 有什么APP软件可以测试耳环,心理测试选择自己喜欢的耳环,测试自己最招桃花的地方...
  16. 【QT】实现贪吃蛇小游戏(附源码)
  17. 使用ajax——ajax四部曲
  18. 杂谈 :女人过了三十,只好嫁给流氓
  19. Mac 教程:OS X「剪切」移动文件的三种方法
  20. Selenium 实现下载文件 Firefox,Chrome

热门文章

  1. 2.Rails程序框架
  2. Latex个人常用清单--不断更新
  3. 【python】热力图绘制: intensity_heatmap,density_heatmap
  4. 手把手教你使用 Clion 开发 Linux C++ 项目
  5. MySql基础笔记(三)其他重要的事情
  6. C#/.NET整数的三种强制类型转换(int)、Convert.ToInt32()、int.Parse()的区别
  7. 用js实现图片的无缝滚动效果
  8. PDFSAM:简朴好用的 PDF 抢救器材
  9. datetime使用
  10. xunsearch全文检索初体验