消息长度_nsq消息队列源码分析
nsq的源码比较简单,值得一读,特别是golang开发人员,下面重点介绍nsqd,nsqd是nsq的核心,其他的都是辅助工具,看完这篇文章希望你能对消息队列的原理和实现有一定的了解。
nsqd是一个守护进程,负责接收,排队,投递消息给客户端,并不保证消息的严格顺序,nsqd默认监听一个tcp端口 (4150) 和一个http端口 (4151) 以及一个可选的https端口
对订阅了同一个topic的同一个channel的消费者使用负载均衡策略,其实就是多个协程消费同一个channel
只要channel存在,即使没有该channel的消费者,也会将生产者的message缓存到队列(内存队列和磁盘队列)中,当有新的消费者产生后,就开始消费队列中的所有消息
保证队列中的 message 至少会被消费一次(在进程意外退出的时候这点都保证不了),并不能保证成功消费一次,即使 nsqd退出,也会将队列中的消息暂存磁盘上(进程退出的时候会将缓存中的消息存到磁盘上,意外情况如掉电就不行了,缓存中的消息就没有机会存盘而丢失,在实战中一般不会使用缓存队列即内存buffer为0,全部使用磁盘队列)
限定内存占用,能够配置nsqd中每个channel队列在内存中缓存的message数量,一旦channel的buffer写满,就将message写到磁盘中,这点使用golang select的优先级功能,default优先级最低
topic,channel 一旦建立,将会一直存在,要及时在管理台或者用代码清除无效的 topic 和 channel,避免资源的浪费,每个topic和channel都有独立的协程处理自身的消息,默认的buffer和其他的一些信息
nsq消息没有备份,一旦出现进程意外情况退出,可能会出现消息丢失,如没有消费成功的消息,写入文件但没有真正落盘的消息,这种意外情况很难杜绝,像意外退出这种情况kafka,redis等都会遇到这样的问题,最后都会采用一个折中的策略,定时将数据落盘
//原文:https://www.cnblogs.com/hlxs/p/11445103.html 作者:啊汉
type Topic struct {// 64bit atomic vars need to be first for proper alignment on 32bit platformsmessageCount uint64 //消息总数量messageBytes uint64 //消息总长度sync.RWMutexname string //topic namechannelMap map[string]*Channel //保存topic下面的所有channelbackend BackendQueue //磁盘队列memoryMsgChan chan *Message //内存队列startChan chan intexitChan chan intchannelUpdateChan chan intwaitGroup util.WaitGroupWrapperexitFlag int32 //退出标记idFactory *guidFactory //生成msg id的工厂ephemeral bool //是否临时topicdeleteCallback func(*Topic) //删除topic方法指针deleter sync.Oncepaused int32 //暂停标记,1暂停, 0正常pauseChan chan intctx *context
}
Topic创建
nsqd用map[string]*Topic来保存所有topic,producter在发消息的时候回指定topic,nsqd在收到消息后会判断topic是否存在,不存在就会自动创建,每创建一个新的topic就会启动一个协程,用于处理topic相关的消息,如将内存/磁盘中的消息复制给topic中的每个channel、channel数量变化、channel暂停、topic退出
消息结构
// Command represents a command from a client to an NSQ daemon//原文:https://www.cnblogs.com/hlxs/p/11445103.html 作者:啊汉
type Command struct {Name []byte //命令名称,可选:IDENTIFY、FIN、RDY、REQ、PUB、MPUB、DPUB、NOP、TOUCH、SUB、CLS、AUTHParams [][]byte //不同的命令做不同解析,涉及到topic的,Params[0]为topic nameBody []byte //消息内容
}// WriteTo implements the WriterTo interface and
// serializes the Command to the supplied Writer.
//
// It is suggested that the target Writer is buffered
// to avoid performing many system calls.
func (c *Command) WriteTo(w io.Writer) (int64, error) {var total int64var buf [4]byten, err := w.Write(c.Name) //命名名称,nsqd根据这个名称执行相关功能total += int64(n)if err != nil {return total, err}for _, param := range c.Params {n, err := w.Write(byteSpace) //空格total += int64(n)if err != nil {return total, err}n, err = w.Write(param) //参数total += int64(n)if err != nil {return total, err}}n, err = w.Write(byteNewLine) //空行ntotal += int64(n)if err != nil {return total, err}//消息内容if c.Body != nil { bufs := buf[:]binary.BigEndian.PutUint32(bufs, uint32(len(c.Body)))n, err := w.Write(bufs) //消息长度4字节total += int64(n)if err != nil {return total, err}n, err = w.Write(c.Body) //消息内容total += int64(n)if err != nil {return total, err}}return total, nil
}
nsqd收到这个结构做解析,就能知道命令名称(干什么),topic name,消息内容等,不同的命令,命令参数不一样
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {if bytes.Equal(params[0], []byte("IDENTIFY")) {return p.IDENTIFY(client, params)}err := enforceTLSPolicy(client, p, params[0])if err != nil {return nil, err}switch {case bytes.Equal(params[0], []byte("FIN")):return p.FIN(client, params)case bytes.Equal(params[0], []byte("RDY")):return p.RDY(client, params)case bytes.Equal(params[0], []byte("REQ")):return p.REQ(client, params)case bytes.Equal(params[0], []byte("PUB")):return p.PUB(client, params)case bytes.Equal(params[0], []byte("MPUB")):return p.MPUB(client, params)case bytes.Equal(params[0], []byte("DPUB")):return p.DPUB(client, params)case bytes.Equal(params[0], []byte("NOP")):return p.NOP(client, params)case bytes.Equal(params[0], []byte("TOUCH")):return p.TOUCH(client, params)case bytes.Equal(params[0], []byte("SUB")):return p.SUB(client, params)case bytes.Equal(params[0], []byte("CLS")):return p.CLS(client, params)case bytes.Equal(params[0], []byte("AUTH")):return p.AUTH(client, params)}return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}
Topic收到消息
nsqd收到上面这个结构,解析之后,就会执行相关功能,我们以PUB命令为例:
1:读到空行处,能拿到命令名称和参数,命令名称=PUB,命令参数为topicName
2:检查topicName是否有效
3:获取消息内容长度,读取4个字节
4:分配对应内容长度空间,读取对应长度字节存入
5:获取topicName信息,没有就创建
6:构造消息结构体nsqd.Message,自动生成消息id
7:将消息提交给对应的topic,Topic.PutMessage
8:将消息写入topic对应的内存消息通道,内存消息通道默认大小为10000,如通道满了则写入磁盘
Topic中的消息分发给channel
在创建topic的时候回启动一个协程处理各种消息,其中就包括消费topic中的消息,topic只是将消息投递到其中的每个channel中,如topic下面有10个channel,则要复制9个nsqd.Message,每个channel一个nsqd.Message,但是消息id和消息内容是一样的,消息内容并不会被复制,topic收到消息将消息分发给channel就完事了,消息怎么发给消费者,由channel负责
type Channel struct {// 64bit atomic vars need to be first for proper alignment on 32bit platformsrequeueCount uint64 //重新入队数量messageCount uint64 //消息数量timeoutCount uint64 //超时数量,已经消费,但没有反馈结果,会重新加入队列,messageCount不会自增sync.RWMutextopicName string //topic namename string //channel namectx *contextbackend BackendQueue //将消息写入磁盘的队列,维护磁盘消息的读写memoryMsgChan chan *Message //内存消息队列,通道buffer默认10000exitFlag int32 //退出标记,1表示退出,0没有退出exitMutex sync.RWMutex// state trackingclients map[int64]Consumer //连接到这个topic-channel的所有clientpaused int32 //暂停标记,0不暂停,1暂停,暂停就不会往这个channel中copy消息ephemeral bool //临时channel标记,临时channel不会存到文件中deleteCallback func(*Channel) //用于从topic中删除channeldeleter sync.Once // Stats trackinge2eProcessingLatencyStream *quantile.Quantile// TODO: these can be DRYd updeferredMessages map[MessageID]*pqueue.Item //延迟消息map,方便查找deferredPQ pqueue.PriorityQueue //延迟消息队列deferredMutex sync.MutexinFlightMessages map[MessageID]*Message //消费中的消息map,方便查找inFlightPQ inFlightPqueue //消费中的消息队列inFlightMutex sync.Mutex
}
client订阅topic消息
订阅发送的还是Command这个结构,只不过订阅没有消息内容而已,指定topic和channel就行,如果topic和channel不存在都会自动创建,client和server建立的是tcp长连接,server会启动两个协程,一个用于发消息,一个用于接收消息,建立连接后,channel会把client加入它的map[int64]Consumer中,key为clientId,当topic收到消息后,会分发给channel,channel通过发消息的协程发给client
channel将消息推给消费者
channel中的消息存在两个地方:内存通道和磁盘队列,topic将消息分发给channel时,通过go的select将消息分发给内存通道或是磁盘队列,由于select的default分支优先级比case低,所以只要内存通道没满,就会往内存通道中写,否则就写入磁盘,
diskqueue.diskQueue维护着磁盘数据的读写,每个非临时的topic和channel都有这样一个字段。
发消息的协程就会一直读内存通道和磁盘队列中的数据,将消息发给client
nsq消息类型有三种如下:
// frame types
const (FrameTypeResponse int32 = 0 //响应FrameTypeError int32 = 1 //错误FrameTypeMessage int32 = 2 //消息
)
消息发送给client之后,也不知道消息到底有没有消费成功,有可能client收到消息之后就崩溃了,所以消息发给client之后,需要client给server发一个FIN消息告诉server,这个消息我消费成功,所以在将消息发送给client之后,消息出了内存队列/磁盘队列,进入了一个新的队列,叫飞行队列,表示这个消息正在运输消费中,为了维护在消费中的消息,nsq使用了两个数据结构:
type inFlightPqueue []*Message
inFlightPQ inFlightPqueue //按照超时时间排序的最小堆
inFlightMessages map[MessageID]*Message //保存消息
消息发送给client之后,同时会将消息存入inFlightPQ和inFlightMessages中,inFlightPQ中的消息都设置了超时时间默认是1分钟,如果1分钟后还没有收到client发过来的FIN消息,会将消息重新加入待消费队列,让client重新消费,目的是想保证每个消息至少被消费一次,由于消息可保存在内存中,进程可能随时挂掉并不能保证每个消息都至少被消费一次,如果不用内存队列,完全使用磁盘队列,当进程意外崩掉的时候,消息是否丢失要看磁盘队列的具体实现,完全使用磁盘队列性能差点,安全性更高
inFlightMessages就是为了方便通过消息id查找消息,收到client发送过来的FIN消息时就会将消息从inFlightPQ和inFlightMessages中删除,表示这个消息已经消费成功,数据也就被扔掉了
延迟消息
发延迟消息和发普通消息的区别是producter在生成延迟消息的时候指定了延迟时间,单位毫秒,命令:DPUB
延迟消息存在内存中,并没有存到磁盘中,延迟消息要是存在磁盘中,实现起来还是比较复杂
延迟消息同样使用了一个队列和一个map,结构如下:
type Item struct {Value interface{} //*MessagePriority int64 //执行的时间戳,单位毫秒Index int //队列索引
}
type PriorityQueue []*Item
deferredPQ pqueue.PriorityQueue
deferredMessages map[MessageID]*pqueue.Item
deferredPQ和inFlightPQ一样,是按照时间排序的最小堆
那么nsq是怎么判断消息超时,延迟消息的执行时间到了呢?
nsq有一个专门的协程来处理这两种情况,实现也很简单,就是每100毫秒检查一次,看是否有超时的消息,延迟消息是否执行时间是否到了,如果消息超时,则重新将消息加入待消费队列,每次将消息发送给client的时候,重试次数都会加一,即Message.Attempts++
延迟消息执行时间要是到了,就会当做一个普通的消息加入待消费队列,后面的流程都是一样的,默认最大延迟时间为1小时,所有的默认值在进程启动时都是可重新指定的
nsqd启动过程
1:加载启动参数
启动参数定义了结构nsqd.Options,并初始化好了默认值,在进程启动的时候可以指定对应的值,通过反射将这些参数赋给nsqd.Options,通过nsqd.Options就能方便的使用各个参数
2:加载topic和channel并启动
在nsqd启动的时候会加载配置文件nsqd.dat,验证topic和channel名称格式是否有效,然后启动所有topic,该暂停的就暂停,当topic和channel发生变更的时候回将所有信息重新保存到nsqd.dat中,如新增/删除/暂停/启动topic和channel会保存文件
topic和channel保存到文件中的结构
type meta struct {Topics []struct {Name string `json:"name"`Paused bool `json:"paused"`Channels []struct {Name string `json:"name"`Paused bool `json:"paused"`} `json:"channels"`} `json:"topics"`
}
3:启动tcp/http/https服务
nsq可以通过tcp和http通过服务,http和https提供的服务是一样,区别在于协议本身,当client通过tcp和server建立连接后,server会启动两个协程,一个用于发消息,一个用于收消息
tcp提供的服务如下:
服务命令 | 服务描述 |
INENTIFY | 认证 |
FIN | 消费完成 |
RDY | 指定可同时处理的消息数量 |
REQ | 消息重新加入队列 |
PUB | 发布单条消息 |
MPUB | 发布多条消息 |
DPUB | 发布单条延迟消息 |
NOP | 不做任何处理 |
TOUCH | 重新设置消息处理超时时间 |
SUB | 订阅,订阅后才能消费消息 |
CLS | 关闭停止消费 |
AUTH | 授权 |
client和server建立连接后,client通过命令INENTIFY将认证信息发给服务端,如果server在启动的时候指定了授权地址,server就会告诉client你需要认证,client就会通过命令AUTH将秘钥发给server,server去授权地址进行验证,验证通过后,就可以进行正常的消息发布和订阅了
http和https提供服务如下:
服务名称 |
发布单条/多条消息 |
topic新增/删除/情况topic中消息/暂停/启动 |
channel新增/删除/情况topic中消息/暂停/启动 |
nsq状态信息 |
ping |
启动参数查询和修改 |
tcp服务能发布和消费消息,http/https则只能发布消息,发布消息最后调的是同一个接口
端口信息
协议名称 | 默认端口 |
tcp | 4150 |
http | 4151 |
https | 4152 |
心跳
心跳默认30秒,在认证(INENTIFY)的时候client可以指定心跳时间间隔,server按照心跳给client发消息,消息内容:_heartbeat_,如果发送失败,发送消息的协程就会退出,这样server就不在给client发消息了,server如果从client读消息失败,接收消息的协程就会退出,关闭和client的连接,从channel中将client移除,这样就不在收client发来的消息,server中也就没有client的任何信息了
consumer和producter连着nsqd的同一个端口,为什么consumer能消费消息,而producter却不会呢?
nsq是个基于发布和订阅的消息队列,只有订阅了才能消费消息,consumer和producter虽然连着同一个端口,consumer在建立连接后,会发送SUB命令,告诉server我要订阅,而producter并没有,consumer在发送SUB命令后还会发送RDY命令告诉server能同时处理消息的个数,当rdyCount=0时,server也不会给consumer推消息,所以SUB和RDY这两个命令缺一不可
nsq消息文件的存取
nsq可以将消息存在内存中或是文件中,存在内存的好处就是速度快,确定就是一旦进程退出消息就丢失了,所以在实战中消息都会写到磁盘文件,虽然慢点但不容易丢消息
封装消息存取文件的实现在github.com/nsqio/go-diskqueue/diskqueue.go中
topic收到消息后,可以将消息存在内存中或是文件中,当内存channel写满之后就会写入文件,当我们把channel的buffer设置成0后,所有的消息就会写文件
每个topic都会启动一个协程将其收到的消息复制给其下面的每个channel,channel在将消息推送给consumer,channel收到topic发过来(函数调用)的消息,可将消息存入内存或是文件
消息写入内存,topic下面的channel其实是共享一份数据,因为数据都是自读的,而写入文件却是每个channel都有一组文件并将消息吸入,真正做到了读时复制,每个topic和channel都会实例化一个diskQueue,其结构如下
// diskQueue implements a filesystem backed FIFO queue//原文:https://www.cnblogs.com/hlxs/p/11445103.html 作者:啊汉
type diskQueue struct {// 64bit atomic vars need to be first for proper alignment on 32bit platforms// run-time state (also persisted to disk)readPos int64 //已经读的位置writePos int64 //已经写的位置readFileNum int64 //正在读的文件编号writeFileNum int64 //正在写的文件编号depth int64 //没有消费的消息数量sync.RWMutex// instantiation time metadataname string // topicName 或者 topicName + ":" + channelNamedataPath string //存消息文件的目录maxBytesPerFile int64 // currently this cannot change once createdminMsgSize int32 //消息最小值maxMsgSize int32 //消息最大值syncEvery int64 // number of writes per fsyncsyncTimeout time.Duration // duration of time per fsyncexitFlag int32 //退出标记needSync bool //强制将文件缓冲区的数据写入磁盘// keeps track of the position where we have read// (but not yet sent over readChan)nextReadPos int64 //下次读的位置nextReadFileNum int64 //下次读的文件编号readFile *os.File //正在读的文件writeFile *os.File //正在写的文件reader *bufio.Reader //读缓冲区,默认4KwriteBuf bytes.Buffer //写缓冲区// exposed via ReadChan()readChan chan []byte //读channel// internal channelswriteChan chan []byte //写channelwriteResponseChan chan error //写结果通知emptyChan chan int //删除所有文件channelemptyResponseChan chan error //删除通知channelexitChan chan int //退出channelexitSyncChan chan int //退出命令同步等待channellogf AppLogFunc //写日志
}
文件名命名:目录 + topicName:channelName + .diskqueue.000001.dat
func (d *diskQueue) fileName(fileNum int64) string {return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum)
}
diskQueue在实例化的时候回初始化相关的属性,当文件大小大于指定文件的最大值时,文件编号writeFileNum就会自增1,新来的消息就会写入新的文件
按顺序读写文件,每个消息写文件的格式是:消息长度(4字节) + 消息内容,这样读消息也就很容易了,先读4字节,知道消息的长度,接着读消息内容,下一个消息也是这样读,当下一个消息读的位置大于文件的最大值时说明这个文件读完了,可以从下一个文件开始写了,
写文件是同步的,写完之后直接反馈消息是否写入成功,由于文件系统的缓存原因,系统并不是把消息马上写入磁盘,而是写入了文件的缓冲区,所以需要定时的将文件缓冲区的内容写入磁盘,nsq使用了两个策略将文件缓冲区的内容写入磁盘。两个策略同时进行
1:默认每2500条消息强制将文件缓存内容写入磁盘
2:默认每两秒强制将文件缓存内容写入磁盘
在将消息强制写入磁盘的同时,也会将队列当前状态写入另一个文件,若程序退出,下次启动后就能正常进行文件的读写,写入内容包括:
1:剩余消息数量
2:正在读的文件编号
3:读文件偏移量
4:正在写的文件编号
5:写文件偏移量
磁盘文件的删除,如果一个文件中的消息全部被消费了,那这个文件将被删除
断开重连
断开后如果不能自动重连,那就是死都不知道怎么死的,所以nsq是有断开重连功能的
server短发现断开后,不会自动重连,鬼知道你是不是主动断开,所以server发现断开了,就将client的相关信息完全删除,就像client从没有出现过
client断开后会自动重连,client分consumer和producer
consumer自动重连:consumer作为消费者就是读,所以当读失败的时候,consumer会关闭读写功能,就断开连接,当consumer收到的所有消息处理完成后,就会自动重连,注意写失败并不会自动重连
producer自动重连:producer作为生产者就是写,所以当写失败的时候,producer按照状态来决定是否重连,如果发现状态为非连接状态就连接,收到断开是不会重连的,在写失败的时候才会重连
参考资料
https://www.cnblogs.com/hlxs/p/11445103.html
https://github.com/nsqio/nsq
https://github.com/nsqio/go-nsq
https://mp.weixin.qq.com/s/lrbIx88Z1HwWNTO_5aABJQ
https://www.infoq.cn/article/2015/02/nsq-distributed-message-platform
消息长度_nsq消息队列源码分析相关推荐
- 并发编程5:Java 阻塞队列源码分析(下)
上一篇 并发编程4:Java 阻塞队列源码分析(上) 我们了解了 ArrayBlockingQueue, LinkedBlockingQueue 和 PriorityBlockingQueue,这篇文 ...
- 并发-阻塞队列源码分析
阻塞队列 参考: http://www.cnblogs.com/dolphin0520/p/3932906.html http://endual.iteye.com/blog/1412212 http ...
- LiteOS内核源码分析:消息队列Queue
本文分享自华为云社区<LiteOS内核源码分析系列十 消息队列Queue>,原文作者:zhushy . 队列(Queue)是一种常用于任务间通信的数据结构.任务能够从队列里面读取消息,当队 ...
- 【TencentOS tiny】深度源码分析(4)——消息队列
消息队列 在前一篇文章中[TencentOS tiny学习]源码分析(3)--队列 我们描述了TencentOS tiny的队列实现,同时也点出了TencentOS tiny的队列是依赖于消息队列的, ...
- 【队列源码研究】消息队列beanstalkd源码详解
顺风车运营研发团队 李乐 1.消息队列简介 计算机软件发展的一个重要目标是降低软件耦合性: 网站架构中,系统解耦合的重要手段就是异步,业务之间的消息传递不是同步调用,而是将一个业务操作分为多个阶段,每 ...
- 从源码分析RocketMQ系列-RocketMQ消息持久化源码详解
导语 在上篇分析中,提到了一个概念处理器,并且在进入到最终NettyIO的时候看到了一个Pair的对象,这个对象存储了两个对象,一个是执行器,一个是处理器,在进入Runable对象的时候看到封装到 ...
- SparkRPC源码分析之RPC管道与消息类型
SparkRPC源码分析之RPC管道与消息类型 我们前面看过了netty基础知识扫盲,那我们应该明白,ChannelHandler这个组件内为channel的各种事件提供了处理逻辑,也就是主要业务逻辑 ...
- RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)
在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...
- producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四)
有序消息 消息有序指的是可以按照消息的发送顺序来消费. RocketMQ可以严格的保证消息有序.但这个顺序,不是全局顺序,只是分区(queue)顺序. 顺序消息生产者 public static vo ...
最新文章
- php 的cookie设置时间,php cookie时间设置的方法-PHP问题
- 实现微信朋友圈动态列表
- 微软私有云分享(R2)16PowerShell查看虚拟机信息
- .Net的后台服务技术有哪些?
- MYSQL正式环境主从复制(不锁表,不停服务)
- c专家编程(C专家编程pdf)
- python代码示例-Python代码样例列表
- 无聊日常——对QQ邮箱盗号邮件的垃圾账号填充
- HCL_路由器_三层交换
- 1_数据分析应掌握的Python基础
- 嵌入式linux,增加串口登陆密码
- C# AO/ArcEngine 栅格数据总结
- SAS学习(8)——自定义proc means的数据导出
- select 函数用法
- 有什么APP软件可以测试耳环,心理测试选择自己喜欢的耳环,测试自己最招桃花的地方...
- 【QT】实现贪吃蛇小游戏(附源码)
- 使用ajax——ajax四部曲
- 杂谈 :女人过了三十,只好嫁给流氓
- Mac 教程:OS X「剪切」移动文件的三种方法
- Selenium 实现下载文件 Firefox,Chrome
热门文章
- 2.Rails程序框架
- Latex个人常用清单--不断更新
- 【python】热力图绘制: intensity_heatmap,density_heatmap
- 手把手教你使用 Clion 开发 Linux C++ 项目
- MySql基础笔记(三)其他重要的事情
- C#/.NET整数的三种强制类型转换(int)、Convert.ToInt32()、int.Parse()的区别
- 用js实现图片的无缝滚动效果
- PDFSAM:简朴好用的 PDF 抢救器材
- datetime使用
- xunsearch全文检索初体验