一、镜像队列使用

1.镜像队列作用

​ RabbitMQ默认集群模式,并不包管队列的高可用性,尽管队列信息,交换机、绑定这些可以复制到集群里的任何一个节点,然则队列内容不会复制,固然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能守候重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,须要创建镜像队列。

2.策略设置

镜像队列设置可以基于策略设置,策略设置可以通过如下两种方法:

(1)RabbitMQ 管理后台

(2)rabbitmqctl 设置

policy 添加命令:

rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern>  <definition>

指令参数详情

参数名称 描述
-p 可选参数,针对指定 vhost 下的exchange或 queue
--priority 可选参数,policy 的优先级
--apply-to 可选参数,策略适用的对象类型,其值可为 "queues", "exchanges" 或 "all".默认是"all"
name policy 的名称
pattern 匹配模式(正则表达式)
definition 镜像定义,json 格式,包括三部分(ha-mode,ha-params,ha-sync-mode)具体配置见下表

definition参数详情

参数名称 描述
ha-mode 指名镜像队列模式,其值可为"all","exactly"或"nodes",all:表示在集群所有节点上进行镜像;exactly:表示在指定个数的节点上镜像,节点个数由 ha-params 指定;nodes:表示在指定节点上进行镜像,节点名称通过ha-params 指定。
ha-params ha-mode模式需要用到的参数:exactly 模式下为数字表述镜像节点数,nodes 模式下为节点列表表示需要镜像的节点。
ha-sync-mode 镜像队列中消息的同步方式,其值可为"automatic"或"manually".

例如:对队列名称为 hello 开头的所有队列镜像镜像,并且在集群的节点 rabbit@10.18.195.57上进行镜像,队列消息自动同步,policy 的设置命令:

rabbitmqctl set_policy --apply-to queues hello-ha "^hello" '{"ha-mode":"nodes","ha-params":["rabbit@10.18.195.57"],"ha-sync-mode":"automatic"}'

3.ha 策略确认

镜像队列策略是否生效可以通过如下两种方式验证:

(1)RabbitMQ 管理后台

可以通过策略管理验证策略是否配置正确

通过队列列表也可以查看队列应用的策略,如果是镜像策略,可以看到当前队列副本数

通过队列详情可以查看镜像队列当前主副本在哪个节点,从副本在哪几个节点

(2)rabbitmqctl 查看

查看策略详情指令:

rabbitmqctl list_policies

返回:

查看队列是否镜像指令:

rabbitmqctl list_queues name pid slave_pids

返回:

二、镜像队列实现原理

1.整体介绍

​ 通常队列由两部分组成:一部分是 amqqueue_process, 负责协议相关的消息处理,即接收生产者发布的消息,向消费者投递消息,处理消息 confirm,ack 等等;另外一部分是 backing_queue, 作为消息存储的具体形式和引擎,提供了相关接口供进程amqqueue_process调用,用来完成消息的存储及可能的持久化工作等。

​ 镜像队列和普通队列组成有所不同,镜像队列存在两类进程:master队列进程为 amqqueue_process,slave 队列进程为 rabbit_mirror_queue_slave,每个进程会创建一个 gm(guaranteed multicast)进程,镜像队列中所有 gm 进程会组成一个进程组用于广播和接收消息。同时和普通队列一样,每个进程都包含一个用于处理消息逻辑的队列 backing_queue(默认为rabbit_variable_queue)。集群中每个有客户端连接的节点都会启动若干个channel进程,channel进程中记录着镜像队列中master和所有slave进程的Pid,以便直接与队列进程通信。整体结构如下:

​ gm 负责消息广播,至于广播消息处理,master 队列上回掉处理是通过coordinator,消息相关协议操作是通过amqqueue_process处理,而 slave 队列都是由rabbit_mirror_queue_slave进行处理。

注意:消息的发布和消费都是通过 master 队列完成,master 队列对消息进行处理同时将消息的处理动作通过 gm 广播给所有 slave 队列,slave 的 gm 收到消息后,通过回调交由 rabbit_mirror_queue_slave 进行实际处理。

2.gm(Guaranteed Muticast)

​ 镜像队列 gm 组通过将所有 gm 进程形成一个循环链表,每个 gm 都会监控位于自己左右两边的 gm,当有 gm 新增时,相邻的 gm 保证当前广播的消息会通知到新的 gm 上;当有 gm 失效时,相邻的 gm 会接管保证本次广播消息会通知到所有 gm。

​ gm 组信息会记录在本地数据库(mnesia)中,不同的镜像队列行程的 gm 组也是不同的。

​ 消息从 master 队列对应的 gm 发出后,顺着链表依次传送到所有 gm 进程,由于所有 gm 进程组成一个循环链表,master 队列的 gm 线程最终会收到自己发送的消息,这个时候 master 队列就知道消息已经复制到所有 slave 队列了。

3.重要的数据结构

queue 队列相关信息

-record(q, { q,                    %% 队列信息数据结构amqqueueexclusive_consumer,   %% 当前队列的独有消费者has_had_consumers,    %% 当前队列中是否有消费者的标识backing_queue,        %% backing_queue对应的模块名字backing_queue_state,  %% backing_queue对应的状态结构consumers,            %% 消费者存储的优先级队列expires,              %% 当前队列未使用就删除自己的时间sync_timer_ref,       %% 同步confirm的定时器,当前队列大部分接收一次消息就要确保当前定时器的存在(200ms的定时器)rate_timer_ref,       %% 队列中消息进入和出去的速率定时器expiry_timer_ref,     %% 队列中未使用就删除自己的定时器stats_timer,          %% 向rabbit_event发布信息的数据结构状态字段msg_id_to_channel,    %% 当前队列进程中等待confirm的消息gb_trees结构,里面的结构是Key:MsgId Value:{SenderPid, MsgSeqNo}ttl,                  %% 队列中设置的消息存在的时间ttl_timer_ref,        %% 队列中消息存在的定时器ttl_timer_expiry,     %% 当前队列头部消息的过期时间点senders,              %% 向当前队列发送消息的rabbit_channel进程列表dlx,                  %% 死亡消息要发送的exchange交换机(通过队列声明的参数或者policy接口来设置)dlx_routing_key,      %% 死亡消息要发送的路由规则(通过队列声明的参数或者policy接口来设置)max_length,           %% 当前队列中消息的最大上限(通过队列声明的参数或者policy接口来设置)max_bytes,            %% 队列中消息内容占的最大空间args_policy_version,  %% 当前队列中参数设置对应的版本号,每设置一次都会将版本号加一status                %% 当前队列的状态}).

state 记录 gm 进程状态

-record(state,{ self,                 %% gm本身的IDleft,                 %% 该节点左边的节点right,                %% 该节点右边的节点group_name,           %% group名称与队列名一致module,               %% 回调模块rabbit_mirror_queue_slave或者rabbit_mirror_queue_coordinatorview,                 %% group成员列表视图信息,记录了成员的ID及每个成员的左右邻居节点(组装成一个循环列表)pub_count,            %% 当前已发布的消息计数members_state,        %% group成员状态列表 记录了广播状态:[#member{}]callback_args,        %% 回调函数的参数信息,rabbit_mirror_queue_slave/rabbit_mirror_queue_coordinator进程PIDconfirms,             %% confirm列表broadcast_buffer,     %% 缓存待广播的消息broadcast_buffer_sz,  %% 当前缓存带广播中消息实体总的大小broadcast_timer,      %% 广播消息定时器txn_executor          %% 操作Mnesia数据库的操作函数}).

gm_group 整个镜像队列群组的信息,该信息会存储到Mnesia数据库

-record(gm_group, { name,    %% group的名称,与queue的名称一致version, %% group的版本号, 新增节点/节点失效时会递增members  %% group的成员列表, 按照节点组成的链表顺序进行排序}).

view_member 镜像队列群组视图成员数据结构

-record(view_member, { id,       %% 单个镜像队列(结构是{版本号,该镜像队列的Pid})aliases,  %% 记录id对应的左侧死亡的GM进程列表left,     %% 当前镜像队列左边的镜像队列(结构是{版本号,该镜像队列的Pid})right     %% 当前镜像队列右边的镜像队列(结构是{版本号,该镜像队列的Pid})}).

三、镜像队列组群维护

1.节点新加入组群

目前已有节点 A,B,C,新加入节点 B,如图:

节点加入集群流程如下:

(1)新增节点先从 gm_group 中获取对应 group 成员信息;

(2)随机选择一个节点并向这个节点发送加入请求;

(3)集群节点收到新增节点请求后,更新 gm_group 对应信息,同时更新左右节点更新邻居信息(调整对左右节点的监控);

(4)集群节点回复通知新增节点成功加入 group;

(5)新增节点收到回复后更新 rabbit_queue 中的相关信息,同时根据策略同步消息。

核心流程详解:

(1)新增节点 D 的 GM 进程请求加入组群

%% 同步处理将自己加入到镜像队列的群组中的消息
handle_cast(join, State = #state { self          = Self,group_name    = GroupName,members_state = undefined,module        = Module,callback_args = Args,txn_executor  = TxnFun })->%% join_group函数主要执行逻辑%% 1.判断时候有存活节点,如果没有存活,则重新创建gm_group数据库数据%% 2.如果有存活GM进程,随机选择一个GM进程%% 3.将当前新增节点GM进程加入到选择的GM进程右侧%% 4.将所有存活的镜像队列组装成镜像队列循环队列视图A->D->B->C->AView = join_group(Self, GroupName, TxnFun),MembersState =%% 获取镜像队列视图的所有key列表case alive_view_members(View) of%% 如果是第一个GM进程的启动则初始化成员状态数据结构[Self] -> blank_member_state();%% 如果不是第一个GM进程加入到Group中,则成员状态先不做初始化,让自己左侧的GM进程发送过来的信息进行初始化_      -> undefinedend,%% 检查当前镜像队列的邻居信息(根据消息镜像队列的群组循环视图更新自己最新的左右两边的镜像队列)State1 = check_neighbours(State #state { view = View, members_state = MembersState }),%% 通知启动该GM进程的进程已经成功加入镜像队列群组(rabbit_mirror_queue_coordinator或rabbit_mirror_queue_slave模块回调)handle_callback_result({Module:joined(Args, get_pids(all_known_members(View))), State1});

(2)GM 进程 A 处理新增 GM 进程到自己右侧

%% 处理将新的镜像队列加入到本镜像队列的右侧的消息
handle_call({add_on_right, NewMember}, _From,State = #state { self          = Self,group_name    = GroupName,members_state = MembersState,txn_executor  = TxnFun }) ->%% 记录将新的镜像队列成员加入到镜像队列组中,将新加入的镜像队列写入gm_group结构中的members字段中(有新成员加入群组的时候,则将版本号增加一)Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun),%% 根据组成员信息生成新的镜像队列视图数据结构View1 = group_to_view(Group),%% 删除擦除的成员MembersState1 = remove_erased_members(MembersState, View1),%% 向新加入的成员即右边成员发送加入成功的消息ok = send_right(NewMember, View1,{catchup, Self,          prepare_members_state(MembersState1)}),%% 根据新的镜像队列循环队列视图和老的视图修改视图,同时根据镜像队列循环视图更新自己左右邻居信息{Result, State1} = change_view(View1, State #state {members_state = MembersState1 }),%% 向请求加入的镜像队列发送最新的当前镜像队列的群组信息handle_callback_result({Result, {ok, Group}, State1}).

(3) GM进程 D 处理 GM 进程 A 发送过来成员状态信息

%% 左侧的GM进程通知右侧的GM进程最新的成员状态(此情况是本GM进程是新加入Group的,等待左侧GM进程发送过来的消息进行初始化成员状态)
handle_msg({catchup, Left, MembersStateLeft},State = #state { self          = Self,left          = {Left, _MRefL},right         = {Right, _MRefR},view          = View,%% 新加入的GM进程在加入后是没有初始化成员状态,是等待左侧玩家发送消息来进行初始化members_state = undefined }) ->%% 异步向自己右侧的镜像队列发送最新的所有成员信息,让Group中的所有成员更新成员信息ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),%% 将成员信息转化成字典数据结构MembersStateLeft1 = build_members_state(MembersStateLeft),%% 新增加的GM进程更新最新的成员信息{ok, State #state { members_state = MembersStateLeft1 }};

2.节点失效

​ 当 Slave 节点失效时,仅仅是相邻节点感知,然后重新调整邻居节点信息,更新 rabbit_queue, gm_group的记录。

​ 当 Master 节点失效时流程如下:

(1)由于所有 mirror_queue_slave进程会对 amqqueue_process 进程监控,如果 Master 节点失效,mirror_queue_slave感知后通过 GM 进行广播;

(2)存活最久的 Slave 节点会提升自己为 master 节点;

(3)该节点会创建出新的 coordinator,并通知 GM 进程修改回调处理器为 coordinator;

(4)原来的 mirror_queue_slave 作为 amqqueue_process 处理生产发布的消息,向消费者投递消息。

核心流程详解:

(1)GM 进程挂掉处理

%% 接收到自己左右两边的镜像队列GM进程挂掉的消息
handle_info({'DOWN', MRef, process, _Pid, Reason},State = #state { self          = Self,left          = Left,right         = Right,group_name    = GroupName,confirms      = Confirms,txn_executor  = TxnFun }) ->%% 得到挂掉的GM进程Member = case {Left, Right} of%% 左侧的镜像队列GM进程挂掉的情况{{Member1, MRef}, _} -> Member1;%% 右侧的镜像队列GM进程挂掉的情况{_, {Member1, MRef}} -> Member1;_                    -> undefinedend,case {Member, Reason} of{undefined, _} ->noreply(State);{_, {shutdown, ring_shutdown}} ->noreply(State);_ -> timer:sleep(100),%% 先记录有镜像队列成员死亡的信息,然后将所有存活的镜像队列组装镜像队列群组循环队列视图%% 有成员死亡的时候会将版本号增加一,record_dead_member_in_group函数是更新gm_group数据库表中的数据,将死亡信息写入数据库表View1 = group_to_view(record_dead_member_in_group(Member, GroupName, TxnFun)),handle_callback_result(case alive_view_members(View1) of%% 当存活的镜像队列GM进程只剩自己的情况[Self] -> maybe_erase_aliases(State #state {members_state = blank_member_state(),confirms      = purge_confirms(Confirms) },View1);%% 当存活的镜像队列GM进程不止自己(根据新的镜像队列循环队列视图和老的视图修改视图,同时根据镜像队列循环视图更新自己左右邻居信息)%% 同时将当前自己节点的消息信息发布到自己右侧的GM进程_      -> change_view(View1, State)end)end.

(2)主镜像队列回调 rabbit_mirror_queue_coordinator处理 GM 进程挂掉

%% 处理循环镜像队列中有死亡的镜像队列(主镜像队列接收到死亡的镜像队列不可能是主镜像队列死亡的消息,它监视的左右两侧的从镜像队列进程)
handle_cast({gm_deaths, DeadGMPids},State = #state { q  = #amqqueue { name = QueueName, pid = MPid } })when node(MPid) =:= node() ->%% 返回新的主镜像队列进程,死亡的镜像队列进程列表,需要新增加镜像队列的节点列表case rabbit_mirror_queue_misc:remove_from_queue(QueueName, MPid, DeadGMPids) of{ok, MPid, DeadPids, ExtraNodes} ->%% 打印镜像队列死亡的日志rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,DeadPids),%% 异步在ExtraNodes的所有节点上增加QName队列的从镜像队列rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes, async),noreply(State);{error, not_found} ->{stop, normal, State}end;

(3)从镜像队列回调 rabbit_mirror_queue_coordinator处理 GM 进程挂掉

%% 从镜像队列处理有镜像队列成员死亡的消息(从镜像队列接收到主镜像队列死亡的消息)
handle_call({gm_deaths, DeadGMPids}, From,State = #state { gm = GM, q = Q = #amqqueue {name = QName, pid = MPid }}) ->Self = self(),%% 返回新的主镜像队列进程,死亡的镜像队列进程列表,需要新增加镜像队列的节点列表case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of{error, not_found} -> gen_server2:reply(From, ok),{stop, normal, State};{ok, Pid, DeadPids, ExtraNodes} ->%% 打印镜像队列死亡的日志(Self是副镜像队列)rabbit_mirror_queue_misc:report_deaths(Self, false, QName, DeadPids),case Pid of%% 此情况是主镜像队列没有变化MPid ->gen_server2:reply(From, ok),%% 异步在ExtraNodes的所有节点上增加QName队列的副镜像队列rabbit_mirror_queue_misc:add_mirrors(QName, ExtraNodes, async),noreply(State);%% 此情况是本从镜像队列成为主镜像队列Self ->%% 将自己这个从镜像队列提升为主镜像队列QueueState = promote_me(From, State),%% 异步在ExtraNodes的所有节点上增加QName队列的副镜像队列rabbit_mirror_queue_misc:add_mirrors(QName, ExtraNodes, async),%% 返回消息,告知自己这个从镜像队列成为主镜像队列{become, rabbit_amqqueue_process, QueueState, hibernate};_ ->%% 主镜像队列已经发生变化gen_server2:reply(From, ok),[] = ExtraNodes,%% 确认在主节点宕机时否有为完成传输的数据,确认所有从节点都接收到主节点宕机的消息,然后传输未传输的消息。ok = gm:broadcast(GM, process_death),noreply(State #state { q = Q #amqqueue { pid = Pid } })endend;

(4)主镜像队列挂掉否选取新的主镜像队列

%% 返回新的主镜像队列进程,死亡的镜像队列进程列表,需要新增加镜像队列的节点列表
remove_from_queue(QueueName, Self, DeadGMPids) ->rabbit_misc:execute_mnesia_transaction(fun () ->%% 代码运行到这一步有可能队列已经被删除case mnesia:read({rabbit_queue, QueueName}) of[] -> {error, not_found};[Q = #amqqueue { pid        = QPid,slave_pids = SPids,gm_pids    = GMPids }] ->%% 获得死亡的GM列表和存活的GM列表{DeadGM, AliveGM} = lists:partition(fun ({GM, _}) ->lists:member(GM, DeadGMPids)end, GMPids),%% 获得死亡的实际进程的Pid列表DeadPids  = [Pid || {_GM, Pid} <- DeadGM],%% 获得存活的实际进程的Pid列表AlivePids = [Pid || {_GM, Pid} <- AliveGM],%% 获得slave_pids字段中存活的队列进程Pid列表Alive     = [Pid || Pid <- [QPid | SPids],lists:member(Pid, AlivePids)],%% 从存活的镜像队列提取出第一个镜像队列进程Pid,它是最老的镜像队列,它将作为新的主镜像队列进程{QPid1, SPids1} = promote_slave(Alive),Extra =case {{QPid, SPids}, {QPid1, SPids1}} of{Same, Same} ->[];%% 此处的情况是主镜像队列没有变化,或者调用此接口的从镜像队列成为新的主镜像队列_ when QPid =:= QPid1 orelse QPid1 =:= Self ->%% 主镜像队列已经变化,当前从队列变更为主队列,信息更新到数据库(mnesia)Q1 = Q#amqqueue{pid        = QPid1,slave_pids = SPids1,gm_pids    = AliveGM},store_updated_slaves(Q1),%% 根据队列的策略如果启动的从镜像队列需要自动同步,则进行同步操作maybe_auto_sync(Q1),%% 根据当前集群节点和从镜像队列进程所在的节点得到新增加的节点列表slaves_to_start_on_failure(Q1, DeadGMPids);%% 此处的情况是主镜像队列已经发生变化,且调用此接口的从镜像队列没有成为新的主镜像队列_ ->%% 更新最新的存活的从镜像队列进程Pid列表和存活的GM进程列表Q1 = Q#amqqueue{slave_pids = Alive,gm_pids    = AliveGM},%% 存储更新队列的从镜像队列信息store_updated_slaves(Q1),[]end,{ok, QPid1, DeadPids, Extra}endend).

四、镜像队列消息同步

1.消息广播

消息广播流程如下:

(1)Master 节点发出消息,顺着镜像队列循环列表发送;

(2)所有 Slave 节点收到消息会对消息进行缓存(Slave 节点缓存消息用于在广播过程中,有节点失效或者新增节点,这样左侧节点感知变化后会重新将消息推送给右侧节点);

(3)当 Master 节点收到自己发送的消息后意味着所有节点都收到了消息,会再次广播 Ack 消息;

(4)Ack 消息同样会顺着循环列表经过所有 Slave 节点,通知 Slave 节点可以清除缓存消息;

(5)当 Ack 消息回到 Master 节点,对应消息的广播结束。

核心流程详解:

(1)GM 组群中消息广播

%% 节点挂掉的情况或者新增节点发送给自己右侧GM进程的信息
%% 左侧的GM进程通知右侧的GM进程最新的成员状态(此情况是有新GM进程加入Group,但是自己不是最新加入的GM进程,但是自己仍然需要更新成员信息)
handle_msg({catchup, Left, MembersStateLeft},State = #state { self = Self,left = {Left, _MRefL},view = View,members_state = MembersState })when MembersState =/= undefined ->%% 将最新的成员信息转化成字典数据结构MembersStateLeft1 = build_members_state(MembersStateLeft),%% 获取左侧镜像队列传入的成员信息和自己进程存储的成员信息的ID的去重AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++?DICT:fetch_keys(MembersStateLeft1)),%% 根据左侧GM进程发送过来的成员状态和自己GM进程里的成员状态得到需要广播给后续GM进程的信息{MembersState1, Activity} =lists:foldl(fun (Id, MembersStateActivity) ->%% 获取左侧镜像队列传入Id对应的镜像队列成员信息#member { pending_ack = PALeft, last_ack = LA } =find_member_or_blank(Id, MembersStateLeft1),with_member_acc(%% 函数的第一个参数是Id对应的自己进程存储的镜像队列成员信息fun (#member { pending_ack = PA } = Member, Activity1) ->%% 发送者和自己是一个人则表示消息已经发送回来,或者判断发送者是否在死亡列表中case is_member_alias(Id, Self, View) of%% 此情况是发送者和自己是同一个人或者发送者已经死亡true ->%% 根据左侧GM进程发送过来的ID最新的成员信息和本GM进程ID对应的成员信息得到已经发布的信息{_AcksInFlight, Pubs, _PA1} = find_prefix_common_suffix(PALeft, PA),%% 重新将自己的消息发布{Member #member { last_ack = LA },%% 组装发送的内容和ack消息结构activity_cons(Id, pubs_from_queue(Pubs), [], Activity1)};false ->%% 根据左侧GM进程发送过来的ID最新的成员信息和本GM进程ID对应的成员信息得到Ack和Pub列表%% 上一个节点少的消息就是已经得到确认的消息,多出来的是新发布的消息{Acks, _Common, Pubs} =find_prefix_common_suffix(PA, PALeft),{Member,%% 组装发送的发布和ack消息结构activity_cons(Id, pubs_from_queue(Pubs), acks_from_queue(Acks), Activity1)}endend, Id, MembersStateActivity)end, {MembersState, activity_nil()}, AllMembers),handle_msg({activity, Left, activity_finalise(Activity)},State #state { members_state = MembersState1 });

(2) GM 进程内部广播

%% GM进程内部广播的接口(先调用本GM进程的回调进程进行处理消息,然后将广播数据放入广播缓存中)
internal_broadcast(Msg, SizeHint,State = #state { self                = Self,pub_count           = PubCount,module              = Module,callback_args       = Args,broadcast_buffer    = Buffer,broadcast_buffer_sz = BufferSize }) ->%% 将发布次数加一PubCount1 = PubCount + 1,{%% 先将消息调用回调模块进行处理Module:handle_msg(Args, get_pid(Self), Msg),%% 然后将广播消息放入广播缓存State #state { pub_count           = PubCount1,broadcast_buffer    = [{PubCount1, Msg} | Buffer],broadcast_buffer_sz = BufferSize + SizeHint}}.

(3)缓存消息发送定时器

%% 确保广播定时器的关闭和开启,当广播缓存中有数据则启动定时器,当广播缓存中没有数据则停止定时器
%% 广播缓存中没有数据,同时广播定时器不存在的情况
ensure_broadcast_timer(State = #state { broadcast_buffer = [],broadcast_timer  = undefined }) ->State;
%% 广播缓存中没有数据,同时广播定时器存在,则直接将定时器删除掉
ensure_broadcast_timer(State = #state { broadcast_buffer = [],broadcast_timer  = TRef }) ->erlang:cancel_timer(TRef),State #state { broadcast_timer = undefined };
%% 广播缓存中有数据且没有定时器的情况
ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush),State #state { broadcast_timer = TRef };
ensure_broadcast_timer(State) ->State.

注:当处理消息时,缓存中的内容大小超过100M 则不会等定时器触发,会立刻将消息发给自己右侧的 GM 进程。

2.消息同步

​ 配置镜像队列时有一个属性ha-sync-mode,支持两种模式 automatic 或 manually 默认为 manually。

​ 当 ha-sync-mode = manually,新节点加入到镜像队列组后,可以从左节点获取当前正在广播的消息,但是在加入之前已经广播的消息无法获取,所以会处于镜像队列之间数据不一致的情况,直到加入之前的消息都被消费后,主从镜像队列数据保持一致。当加入之前的消息未全部消费完之前,主节点宕机,新节点选为主节点时,这部分消息将丢失。

​ 当 ha-sync-mode = automatic,新加入组群的 Slave 节点会自动进行消息同步,使主从镜像队列数据保持一致。

作者:jaredCoder
链接:https://www.jianshu.com/p/f917067bcee3
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

RabbitMQ镜像队列实现原理相关推荐

  1. RabbitMQ镜像队列原理分析

    对于RabbitMQ的节点来说,有单节点模式和集群模式两种,其中集群模式又分为普通集群模式和镜像队列集群模式,在<RabbitMQ集群架构搭建与高可用性实现>文中,介绍了RabbitMQ的 ...

  2. RabbitMQ + 镜像队列 + HAProxy 实现负载均衡的集群

    RabbitMQ + 镜像队列 + HAProxy 实现负载均衡的集群 一.集群管理(RabbitMQ扩容) 1. 环境介绍 hostname ip mq1 192.168.80.16 mq2 192 ...

  3. rabbitmq——镜像队列

    转自:http://my.oschina.net/hncscwc/blog/186350?p=1 1. 镜像队列的设置 镜像队列的配置通过添加policy完成,policy添加的命令为: rabbit ...

  4. RabbitMQ镜像队列与负载均衡

    镜像队列 RabbitMQ集群是由多个broker节点构成的,那么从服务的整体可用性上来讲,该集群对于单点失效是有弹性的,但是同时也需要注意:尽管exchange和binding能够在单点失效问题上幸 ...

  5. 【深入理解RabbitMQ原理】RabbitMQ 普通队列与镜像队列 底层原理

    RabbitMQ 底层实现原理 普通MQ的结构 MQ内部大致又可以分为两部分:amqueue和backing queue, amqqueue负责实现amqp协议规定的mq的基本逻辑, backing ...

  6. 消息队列探秘 – RabbitMQ 消息队列工作原理

    1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有 ...

  7. RabbitMQ消息队列工作原理及集成使用

    消息 指的是两个应用间传递的数据.数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象. 消息队列(Message Queue) 是在消息的传输过程中保存消息的容器. 在消息队列中,通常有 ...

  8. RabbitMQ 的延时队列和镜像队列原理与实战

    在阿里云栖开发者沙龙PHP技术专场上,掌阅资深后端工程师.掘金小测<Redis深度历险>作者钱文品为大家介绍了RabbitMQ的延时队列和镜像队列的原理与实践,重点比较了RabbitMQ提 ...

  9. Rabbitmq集群,镜像队列和分布式原理

    前言 基于前两次的分享会,结合rabbitmq相关知识,做一个小结.说明一致性的设计思想,在此说明相关的基础理论. CAP定理: 在计算机科学里,CAP定理又被称作布鲁尔定理(Brewer theor ...

最新文章

  1. 隐藏驱动模块(源码)
  2. 关于C++中的友元函数的总结
  3. jQuery中操作元素节点appendTo()与prependTo()的区别
  4. 05 HTML字符串转换成jQuery对象、绑定数据到元素上
  5. mysql数据库业务逻辑_Mysql业务设计(逻辑设计)
  6. Lesson Plan 教学计划 翻译
  7. 隐藏终端、暴露终端和RTS、CTS机制
  8. linux系统搭建监控,Linux系统搭建zabbix监控系统实例讲解
  9. 每天Leetcode 刷题 初级算法篇-设计问题-最小栈
  10. 高数————思维导图(上岸必备)(向量代数与几何部分)
  11. 321影音 多功能播放器
  12. 网络流(二) 最大流算法的实现
  13. 阿里首席架构师讲解“双十一”亿级流量高并发的系统架构搭建方法
  14. 怎么简单快速一个钟头入侵网站
  15. charles+安卓模拟器采集豆果美食app
  16. LogAnomaly: Unsupervised Detection of Sequential and Quantitative Anomalies in Unstructured Logs
  17. 维基解密再爆猛料:CIA利用漏洞入侵全球数十亿个人电子设备
  18. 【无人机三维路径规划】基于A算法解决三维路径规划问题含危险障碍地形含Matlab源码
  19. challenge是什么意思_英语单词学习-challenge是什么意思_翻译_用法_例句
  20. UE4.26源码版学习广域网独立服务器时遇到的客户端运行黑屏问题

热门文章

  1. 主动被动获取隐藏的SSID名称
  2. CenterNet: Keypoint Triplets for Object Detection ----- 论文翻译理解
  3. 维修计算机起名字,电脑维修店起名,电脑维修店起名大全
  4. numpy.random函数整合(部分)
  5. html radio 设置不选中,JQuery控制radio选中和不选中方法
  6. 9位混合密码压缩包解密工具
  7. 2021-08-19集合框架综合案例
  8. Vue前端js循环遍历数组八种方法总结最新
  9. PHP imagecreate - 生成自定义图片
  10. uniapp app端导出excel的探索和实现(二)