Learn Riak Core Step By Step 3

Riak Core, Conflict Resolution

这一章主要描述最终一致性和如何实现强一致性。

Object

非重重要的一个数据结构:

rts_obj : #rts_obj | not_found record( rts_obj,  {                    val ::  #incr  | statebox:statebox,
                    vclock  ::   vclock:vclock}
)vclock :: [ vclock_node(), {counter() , timestamp()}]record( incr , {
                    total :: pos_integer(),
                    counts :: dict() }).record( statebox , {
                        value :: term(),
                        %% sorted list of operations (oldest first)
                        queue :: [event()],
                        last_modified :: timestamp()}).event() :: {timestamp(), op()}op() :: basic_op | [op()]basic_op () :: {MFA}

incr记录的total表示总数, counts表示每一项的数,如: counts = [1, 2, 3, 4] ,那么 total 就等于10;incr会被用在incr或者incrBy中,而statebox会被用在sets中。

注意

因为statebox和incr没有node的概念,所以才结合vclock一起使用。

源码:

%% @doc A suite of functions that operate on the algebraic data type
%% `rts_obj'.
%%
%% TODO Possibly move type/record defs in there and use accessor funs
%% and opaque types.
-module(rts_obj).
-export([ancestors/1, children/1, equal/1, equal/2, merge/1, unique/1,update/3]).
-export([val/1, vclock/1]).-include("rts.hrl").%% @pure
%%
%% @doc Given a list of `rts_obj()' return a list of all the
%% ancestors.  Ancestors are objects that all the other objects in the
%% list have descent from.
-spec ancestors([rts_obj()]) -> [rts_obj()].
ancestors(Objs0) ->Objs = [O || O <- Objs0, O /= not_found],As = [[O2 || O2 <- Objs,ancestor(O2#rts_obj.vclock,O1#rts_obj.vclock)] || O1 <- Objs],unique(lists:flatten(As)).%% @pure
%%
%% @doc Predicate to determine if `Va' is ancestor of `Vb'.
-spec ancestor(vclock:vclock(), vclock:vclock()) -> boolean().
ancestor(Va, Vb) ->vclock:descends(Vb, Va) andalso (vclock:descends(Va, Vb) == false).%% @pure
%%
%% @doc Given a list of `rts_obj()' return a list of the children
%% objects.  Children are the descendants of all others objects.
children(Objs) ->unique(Objs) -- ancestors(Objs).%% @pure
%%
%% @doc Predeicate to determine if `ObjA' and `ObjB' are equal.
-spec equal(ObjA::rts_obj(), ObjB::rts_obj()) -> boolean().
equal(#rts_obj{vclock=A}, #rts_obj{vclock=B}) -> vclock:equal(A,B);
equal(not_found, not_found) -> true;
equal(_, _) -> false.%% @pure
%%
%% @doc Closure around `equal/2' for use with HOFs (damn verbose
%% Erlang).
-spec equal(ObjA::rts_obj()) -> fun((ObjB::rts_obj()) -> boolean()).
equal(ObjA) ->fun(ObjB) -> equal(ObjA, ObjB) end.%% @pure
%%
%% @doc Merge the list of `Objs', calling the appropriate reconcile
%% fun if there are siblings.
-spec merge([rts_obj()]) -> rts_obj().
merge([not_found|_]=Objs) ->P = fun(X) -> X == not_found end,case lists:all(P, Objs) oftrue -> not_found;false -> merge(lists:dropwhile(P, Objs))end;merge([#rts_obj{}|_]=Objs) ->case rts_obj:children(Objs) of[] -> not_found;[Child] -> Child;Chldrn ->Val = rts_get_fsm:reconcile(lists:map(fun val/1, Chldrn)),MergedVC = vclock:merge(lists:map(fun vclock/1, Chldrn)),#rts_obj{val=Val, vclock=MergedVC}end.%% @pure
%%
%% @doc Given a list of `Objs' return the list of uniques.
-spec unique([rts_obj()]) -> [rts_obj()].
unique(Objs) ->F = fun(not_found, Acc) ->Acc;(Obj, Acc) ->case lists:any(equal(Obj), Acc) oftrue -> Acc;false -> [Obj|Acc]endend,lists:foldl(F, [], Objs).%% @pure
%%
%% @doc Given a `Val' update the `Obj'.  The `Updater' is the name of
%% the entity performing the update.
-spec update(val(), node(), rts_obj()) -> rts_obj().
update(Val, Updater, #rts_obj{vclock=VClock0}=Obj0) ->VClock = vclock:increment(Updater, VClock0),Obj0#rts_obj{val=Val, vclock=VClock}.-spec val(rts_obj()) -> any().
val(#rts_obj{val=Val}) -> Val;
val(not_found) -> not_found.%% @pure
%%
%% @doc Given a vclock type `Obj' retrieve the vclock.
-spec vclock(rts_obj()) -> vclock:vclock().
vclock(#rts_obj{vclock=VC}) -> VC.

sadd

根据 obj来理清sadd操作的细节:

sadd 用来管理日志中的代理项,也就是agents

Coordinator : 协调器,就是节点,节点会被更新到vclock中去。

操作,如:statebox:modify({sets, add_element, [Val]}, SB0), 会被更新到statebox中去,statebox主要用来存储操作(MFA)

handle_command({sadd, {ReqID, Coordinator}, StatName, Val},
               _Sender, #state{stats=Stats0}=State) ->
    SB =
        case dict:find(StatName, Stats0) of
            {ok, #rts_obj{val=SB0}=O} ->
                SB1 = statebox:modify({sets, add_element, [Val]}, SB0),
                SB2 = statebox:expire(?STATEBOX_EXPIRE, SB1),
                rts_obj:update(SB2, Coordinator, O);
            error ->
                SB0 = statebox:new(fun sets:new/0),
                SB1 = statebox:modify({sets, add_element, [Val]}, SB0),
                VC0 = vclock:fresh(),
                VC = vclock:increment(Coordinator, VC0),
                #rts_obj{val=SB1, vclock=VC}
        end,
    Stats = dict:store(StatName, SB, Stats0),
    {reply, {ok, ReqID}, State#state{stats=Stats}};

操作方法是sets 集合,满足statebox对操作函的约束。

StateName: "agents" Coordinator: 'rts@127.0.0.1' Val: "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.25 (KHTML, like Gecko) Ubuntu/10.10 Chromium/12.0.705.0 Chrome/12.0.705.0 Safari/534.25" StateName: "agents" Coordinator: 'rts@127.0.0.1' Val: "AppEngine-Google; (+http://code.google.com/appengine; appid: ahrefs)"

srem

srem 和sadd相反,一个是增加元素的操作一个是删除的操作。

incrby 和 incr

某个协调器要对某个属性进行增量操作,这里没有增量减的原因是操作数可以表示增加和减少的操作,其实也没有减操作。

incr只要把操作数设为1即可。

会走此路径的选项有: total_sent发送的总字节数, GET请求数目, total_reqs总请求数, code(200, 400, 500)数目。

handle_command({incrby, {ReqID, Coordinator}, StatName, IncrBy}, _Sender, #state{stats=Stats0}=State) ->Obj =case dict:find(StatName, Stats0) of{ok, #rts_obj{val=#incr{total=T0, counts=C0}}=O} ->T = T0 + IncrBy,C = dict:update_counter(Coordinator, IncrBy, C0),Val = #incr{total=T, counts=C},rts_obj:update(Val, Coordinator, O);error ->Val = #incr{total=IncrBy,counts=dict:from_list([{Coordinator, IncrBy}])},VC0 = vclock:fresh(),VC = vclock:increment(Coordinator, VC0),#rts_obj{val=Val, vclock=VC}end,Stats = dict:store(StatName, Obj, Stats0),{reply, {ok, ReqID}, State#state{stats=Stats}};

冲突解决

如果出现冲突时,会在读操作的时候解决。

Veclock Clocks

Riak一个使用vector clocks来检测童一对象的版本冲突,rts中使用了rts_obj这个模块适配vclocks. 如果你了解riak_object,你会发现有很多相似的地方。希望在未来,这些冲突解决策略会被集成到riak core中(其实在2.0以后已经集成了,在2.0后的模块是riak_dts)。

vlock难易程度在于个人, 本章主要介绍向量时钟如何使用逻辑顺序来检测同一对象的不同版本。通过分配一个逻辑时间序列给每个版本,然后通过比较可以知道是否在某些时候发生了分区。如果分区发生了,那么意味每个版本都有丢失数据的可能,然而这还得依赖于对喜爱的存储方式。比如:集合在只有增加元素的情况下,即使平行版本出现对同一个对象的两次相加,也不会出现冲突,因为他们最后在集合中也只会出现一个,在rts中的agents就是使用sets,另一方面就是计数统计counters stats,如total_sent(跟踪网络服务器发送字节的总大小)。对于这种情况,如果出现平行版本,那么意味着每一个版本都有可能出现丢失一些由其他版本发送的字节, 如:

Node A Node B Node C
total_sent + 500 on Coordinator A
500 [{A,1}] 500 [{A,1}] 500 [{A,1}]
total_sent + 200 on Coordinator A
700 [{A,2}] 700 [{A,2}] 700 [{A,2}]
total_sent + 350 on Coordinator C
1050 [{A,2}, {C,1}] 1050 [{A,2}, {C,1}] 1050 [{A,2}, {C,1}]
Network Split -- (A,B), (C)
total_sent + 100 on Coordinator C
1050 [{A,2}, {C,1}] 1050 [{A,2}, {C,1}] 1150 [{A,2}, {C,2}]
total_sent + 500 on Coordinator B
1550 [{A,2}, {B,1}, {C,1}] 1550 [{A,2}, {B,1}, {C,1}] 1150 [{A,2}, {C,2}]
Network Repaired -- (A,B,C)
total_sent + 50 on Coordinator A
1600 [{A,3}, {B,1}, {C,1}] 1600 [{A,3}, {B,1}, {C,1}] 1200 [{A,3}, {C,2}]
GET total_sent on Coordinator A

上面的例子中,在版本A和B中丢失了100个字节,这些字节是在发生分区期间作用于节点C的。如果你查看向量时钟你会发现节点A和节点B是相同的,但是和节点C不一样,节点C多了一个在节点C协调器下的操作日志。这就说明这些版本版本出现了冲突,必须要解决。

rts_obj会检测冲突,coodinator(rts_get_fsm)会调用merge函数合并虚拟节点的多备份合并为一个。一般来说,如果返回是一个有序列的,无平衡排序的,那么合并根据逻辑时间返回简单地返回一个最新的对象,否则,如果出现平衡版本,他会平衡这些值,合并他们的向量时钟.

merge([#rts_obj{}|_]=Objs) ->case rts_obj:children(Objs) of[] -> not_found;[Child] -> Child;Chldrn ->Val = rts_get_fsm:reconcile(lists:map(fun val/1, Chldrn)),MergedVC = vclock:merge(lists:map(fun vclock/1, Chldrn)),#rts_obj{val=Val, vclock=MergedVC}end.

可以看到merge不会真正的检测冲突,他会交给children/1函数。只有多个孩子返回的时候才会出现冲突,如果只有一个孩子返回,说明所有对喜爱都是依次线性存在的,彼此之间没有冲突,不要只相信作者^?^, 相信自己^@^.

%% @doc Given a list of `rts_obj()' return a list of the children
%% objects.  Children are the descendants of all others objects.
children(Objs) ->unique(Objs) -- ancestors(Objs).

如果你还还不相信,你得好好看一下unique/1ancestors/1了,注意not_fould是一种特殊的例子,他是所有值的祖先,因此会被过滤掉。

%% @doc Given a list of `rts_obj()' return a list of all the
%% ancestors.  Ancestors are objects that all the other objects in the
%% list have descent from.
-spec ancestors([rts_obj()]) -> [rts_obj()].
ancestors(Objs0) ->Objs = [O || O <- Objs0, O /= not_found],As = [[O2 || O2 <- Objs,ancestor(O2#rts_obj.vclock,O1#rts_obj.vclock)] || O1 <- Objs],unique(lists:flatten(As)).%% @doc Predicate to determine if `Va' is ancestor of `Vb'.
-spec ancestor(vclock:vclock(), vclock:vclock()) -> boolean().
ancestor(Va, Vb) ->vclock:descends(Vb, Va) andalso (vclock:descends(Va, Vb) == false).%% @doc Given a list of `Objs' return the list of uniques.
-spec unique([rts_obj()]) -> [rts_obj()].
unique(Objs) ->F = fun(not_found, Acc) ->Acc;(Obj, Acc) ->case lists:any(equal(Obj), Acc) oftrue -> Acc;false -> [Obj|Acc]endend,lists:foldl(F, [], Objs).

检测冲突是成功的一般,另一半是解决冲突。

Reconciling Conflicts

在你的系统中要调节冲突,你必须了解存储的数据格式。如:在Riak中,数据是一个不通明的二进制对象,意味着Riak不会关心他们,也不会解决冲突,因为他们不了解这些数据。默认Riak采用一种vlock 时间戳的Last Write Wins (Lww)的方式,来选择最新的对象版本。如果用户不能接受这种方式,用户就要把allow_mult设置为true, 这样riak会让多个版本共存,在读取的时候,会返回所有的版本给调用者,调用者需要自己解决。

有时候,为了协调冲突,我们需要不仅要保存数据还要保存很多上下文信息。上面的例子中返回了16001200,最后我们无法知道丢失了多少数据, 只能采用折中的方法取1600, 是想一下,对A加50,上面的例子会是这样:

A B C
50{A,1} 50{A,1} 50{A,1}

如果采用这种策略,增加多点上下文信息:

50{A,50} 50{A,50} 50{A,50}

下面的例子可以适用在incr 或者incrBy

Node A Node B Node C
incr on Coordinator A
1 [{A,1}] 1 [{A,1}] 1 [{A,1}]
incr Coordinator A
2 [{A,2}] 2 [{A,2}] 2 [{A,2}]
incr on Coordinator C
3 [{A,2}, {C,1}] 3 [{A,2}, {C,1}] 3 [{A,2}, {C,1}]
Network Split -- (A,B), (C)
incr on Coordinator C
3 [{A,2}, {C,1}] 3 [{A,2}, {C,1}] 4 [{A,2}, {C,2}]
incr on Coordinator B
4 [{A,2}, {B,1}, {C,1}] 4 [{A,2}, {B,1}, {C,1}] 4 [{A,2}, {C,2}]
Network Repaired -- (A,B,C)
incr on Coordinator A
5 [{A,3}, {B,1}, {C,1}] 5 [{A,3}, {B,1}, {C,1}] 5 [{A,3}, {C,2}]
GET total_sent on Coordinator A

然后reconciling一下:


Reconciled Object = {A,3} + {B,1} + {C,2} => 6 [{A,3}, {B,1}, {C,2}]

其实在estatebox也有这种解决方案, 当然使用vclock +#incr容易点。

handle_command({incrby, {ReqID, Coordinator}, StatName, IncrBy}, _Sender, #state{stats=Stats0}=State) ->Obj =case dict:find(StatName, Stats0) of{ok, #rts_obj{val=#incr{total=T0, counts=C0}}=O} ->T = T0 + IncrBy,C = dict:update_counter(Coordinator, IncrBy, C0),Val = #incr{total=T, counts=C},rts_obj:update(Val, Coordinator, O);error ->Val = #incr{total=IncrBy,counts=dict:from_list([{Coordinator, IncrBy}])},VC0 = vclock:fresh(),VC = vclock:increment(Coordinator, VC0),#rts_obj{val=Val, vclock=VC}end,Stats = dict:store(StatName, Obj, Stats0),{reply, {ok, ReqID}, State#state{stats=Stats}};

接着就可以调用reconciling 来计算counters了:

-spec reconcile([A::any()]) -> A::any().
reconcile([#incr{}|_]=Vals) ->Get = fun(K, L) -> proplists:get_value(K, L, 0) end,Counts = [dict:to_list(V#incr.counts) || V <- Vals],Nodes = unique(lists:flatten([[Node || {Node,_} <- C] || C <- Counts])),MaxCounts = [{Node, lists:max([Get(Node, C) || C <- Counts])}|| Node <- Nodes],Total = lists:sum([lists:max([Get(Node, C) || C <- Counts])|| Node <- Nodes]),#incr{total=Total, counts=dict:from_list(MaxCounts)};

这是一种很好的方法在计数方面,如果是他数据结构会怎样呢,下面介绍其他的的数据结构时所采用的方法。

Reconcoling With StateBox

rts里既有计算器也有集合的功能,目前集合主要用在跟踪用户代理的服务器,这就意味着只能增加元素到集合里,然后通过union进行协调,然而如果你有删除操作,他就不能工作了。例如:

rts跟踪用户登录与注销事件,通过集合来跟踪所有登录的用户:

Node A Node B
user_login rzezeski on coordinator A
{rzezeski} {rzezeski}
user_login whilton on coordinator B
{rzezeski, whilton} {rzezeski, whilton}
Network Split -- (A) (B)
user_logout rzezeski on coordinator A
{whilton} {rzezeski, whilton}
user_logout whilton on coordinator B
{whilton} {rzezeski}
Partition Heal -- (A,B)
GET online_users

在分区恢复之后,如果RTS只是简单地union集合,那么rzezeskiwhilton会一直在线,事实上,他们都已经下线了.和计数器一样,我们需要更多的上下文信息来解决这些错误,更具体来说,我们需要知道操作是在网络分区发生的.以至于在分区恢复之后,可以重现他们. statebox就说跟这种活的.

本质上说,statebox 提供了一个事件窗口,以及其导致的结果值,我强调窗口,因为他是有限制的.你不能记住以前所有的事件,因为存储他们和遍历他们都需要很大的代价. 另外,如果你的集群大部分事件都是相连的,也没有什么原因需要记住的些老得事件,因为他们已经成功的传播到集群中去了.比较麻烦的时在发生网络分区的时候.比较关键的是statebox的窗口一定要大于partition窗口(包括时间项和操作总数),否则你将会丢失事件进而丢失数据.是否可接受完全在于你的应用和应用的数据需求.

继续讨论例子,statebox如何解决上面问题??, statebox会选择这些值中的一个然后追加到合并之后的所有操作中运算,这也就意味着statebox能处理的数据是有限制的,具体细节Lick

rts_state_vnode 使用了statebox来跟踪集合的sadd 和 srem的操作,需要注意的是我的窗口是按时间来排序的,也就是说,每一个statebox都会跟踪发生在有效期间内(?STATEBOX_EXPIRE)的窗口的所有的操作. 也就意味着操作数的跟踪是无限的,这就有可能发生很多的问题如果有大量的写操作发生.这也意味着分区的持续时间比?STATEBOX_EXPIRE长,那么就有可能出现数据的丢失. 因为超时的处理操作是要显式执行的,并且是在rts写的过程中.

handle_command({sadd, {ReqID, Coordinator}, StatName, Val},_Sender, #state{stats=Stats0}=State) ->SB = case dict:find(StatName, Stats0) of{ok, #rts_obj{val=SB0}=O} ->SB1 = statebox:modify({sets, add_element, [Val]}, SB0),SB2 = statebox:expire(?STATEBOX_EXPIRE, SB1),rts_obj:update(SB2, Coordinator, O);error ->SB0 = statebox:new(fun sets:new/0),SB1 = statebox:modify({sets, add_element, [Val]}, SB0),VC0 = vclock:fresh(),VC = vclock:increment(Coordinator, VC0),#rts_obj{val=SB1, vclock=VC}end,Stats = dict:store(StatName, SB, Stats0),{reply, {ok, ReqID}, State#state{stats=Stats}};

如果出现冲突,那么处理也是非常简单:

reconcile([V|_]=Vals) when element(1, V) == statebox -> statebox:merge(Vals).

其实在agents功能中是不需要reconcile的,因为他只有append操作,但是为了更加全面介绍statebox,作者才加入了reconcile.

Read Repaire

finalize(timeout, SD=#state{replies=Replies, stat_name=StatName}) ->MObj = merge(Replies),case needs_repair(MObj, Replies) oftrue ->repair(StatName, MObj, Replies),{stop, normal, SD};false ->{stop, normal, SD}end.

read repair函数并不会在merge后就调用, 它会一直等到N == 已经读取的节点数时才会被调用,merge只是解决冲突而已,冲突发生的原因是某些节点出现了问题,所以要对出现问题的节点进行修复.

rts_get_fsm:repair:

repair(StatName, MObj, [{IdxNode,Obj}|T]) ->case rts_obj:equal(MObj, Obj) of¦   true -> repair(StatName, MObj, T);¦   false ->¦   ¦   rts_stat_vnode:repair(IdxNode, StatName, MObj),¦   ¦   repair(StatName, MObj, T)end.

每个节点的信息依次和merge后的结果比较,如果不一样,那么给节点就需要修复.

rts_stat_vnode:repair:

 repair(IdxNode, StatName, Obj) ->riak_core_vnode_master:command(IdxNode,¦   ¦   ¦   ¦   ¦   ¦   ¦   ¦  {repair, undefined, StatName, Obj},¦   ¦   ¦   ¦   ¦   ¦   ¦   ¦  ignore,¦   ¦   ¦   ¦   ¦   ¦   ¦   ¦  ?MASTE

rts_stat_handle_command:

handle_command({repair, undefined, StatName, Obj}, _Sender, #state{stats=Stats0}=State) ->error_logger:error_msg("repair performed ~p~n", [Obj]),Stats = dict:store(StatName, Obj, Stats0),{noreply, State#state{stats=Stats}};

Failure Scenarios

下面演示一下:

先把全部的数据写到rts cluster去.

<我是在mac OS 下面操作的
bash-3.2$ cat cluster.sh
#!/bin/bashfor d in dev/dev*; do $d/bin/rts stop; done
for d in dev/dev*; do $d/bin/rts start; done
for d in dev/dev{2,3};  do $d/bin/rts-admin join rts1@127.0.0.1; done
./dev/dev1/bin/rts-admin ringready
cat progski.access.log | head -20 | bash replay --devrel progski

Node Goes Down

bash-3.2$ ./dev/dev1/bin/rts attach
Attaching to /tmp//Users/r/workspace/try-try-try/2011/riak-core-conflict-resolution/rts/dev/dev1/erlang.pipe.1 (^D to exit)(rts1@127.0.0.1)1> rts:get("progski", "GET").
Replies: [{{296867520082839655260123481645494988367611297792,'rts2@127.0.0.1'},{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}},{{274031556999544297163190906134303066185487351808,'rts1@127.0.0.1'},{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}}]Objs: [{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]},{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}]Objs: [{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]},{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]},{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}]19
(rts1@127.0.0.1)2> rts:get_dbg_preflist("progski", "GET").
[{{274031556999544297163190906134303066185487351808,'rts1@127.0.0.1'},{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],...},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],...}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}},{{296867520082839655260123481645494988367611297792,'rts2@127.0.0.1'},{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],...},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],...}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}},{{319703483166135013357056057156686910549735243776,'rts3@127.0.0.1'},{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],...},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],...}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}}]
(rts1@127.0.0.1)3>

因为我的程序修改过了,所以输出和原版的不一样.

然后退出rts1这个节点:

(rts1@127.0.0.1)3>
BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded(v)ersion (k)ill (D)b-tables (d)istribution
a
[End]
bash-3.2$

登录到rts2节点看一下:

bash-3.2$ ./dev/dev2/bin/rts attach
Attaching to /tmp//Users/r/workspace/try-try-try/2011/riak-core-conflict-resolution/rts/dev/dev2/erlang.pipe.1 (^D to exit)(rts2@127.0.0.1)1> rts:get_dbg_preflist("progski", "GET").
[{{296867520082839655260123481645494988367611297792,'rts2@127.0.0.1'},{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],...},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],...}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}},{{319703483166135013357056057156686910549735243776,'rts3@127.0.0.1'},{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],...},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],...}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}},{{274031556999544297163190906134303066185487351808,'rts2@127.0.0.1'},not_found}]
(rts2@127.0.0.1)2> rts:get("progski", "GET").
Replies: [{{274031556999544297163190906134303066185487351808,'rts2@127.0.0.1'},not_found},{{296867520082839655260123481645494988367611297792,'rts2@127.0.0.1'},{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}}]Objs: [not_found,{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}]19
(rts2@127.0.0.1)3> Objs: [{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]},not_found,{rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}]
=ERROR REPORT==== 30-Nov-2014::16:47:28 ===
repair performed {rts_obj,{incr,19,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[['rts2@127.0.0.1'|4],['rts1@127.0.0.1'|9],['rts3@127.0.0.1'|6]],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}},[{'rts3@127.0.0.1',{6,63584556111}},{'rts2@127.0.0.1',{4,63584556110}},{'rts1@127.0.0.1',{9,63584556110}}]}(rts2@127.0.0.1)3>

注意

rts2 返回的not_fould

节点rts1已经被rts2给替代, 调用rts:get("progski", "GET").时, 数据将被修复,再次调用时,不会有修复的情况,因为已经被修复.其实rts2只是启动了另外一个进程来接管rts1,详细的信息可以看第2篇教程.

使用演示多节点挂掉的情况

  • 启动:

for d in dev/dev*; do $d/bin/rts start; done

  • 登录到rts1节点:
./dev/dev1/bin/rts attach
(rts1@127.0.0.1)1> rts:get("progski", "GET").
...
Ctrl^C Ctrl^C
Now attach to rts2 and confirm one replica reports not_found.

然后退出节点rts1

  • 登录节点rts2
./dev/dev2/bin/rts attach
(rts2@127.0.0.1)1> rts:get_dbg_preflist("progski", "GET").
...
Increment the GET stat, confirm conflicting values, and then perform a read. If everything goes well the read should return 20.(rts2@127.0.0.1)3> rts:incr("progski", "GET").
ok
(rts2@127.0.0.1)4> rts:get_dbg_preflist("progski", "GET").
...{{274031556999544297163190906134303066185487351808,'rts2@127.0.0.1'},{rts_obj,{incr,1,
...
(rts2@127.0.0.1)5> rts:get("progski", "GET").
20
(rts2@127.0.0.1)6>
=ERROR REPORT==== 16-Jun-2011::23:24:07 ===
repair performed {rts_obj,{incr,20,...
(rts2@127.0.0.1)6> rts:get_dbg_preflist("progski", "GET").
What if you fail multiple nodes?Ctrl^C Ctrl^C

然后退出rts2

  • 登录节点rts3
./dev/dev3/bin/rts attach
(rts3@127.0.0.1)1> rts:get_dbg_preflist("progski", "GET").
...{{274031556999544297163190906134303066185487351808,'rts3@127.0.0.1'},not_found},{{296867520082839655260123481645494988367611297792,'rts3@127.0.0.1'},not_found}]
...
(rts3@127.0.0.1)2> rts:get("progski", "GET").
not_found
(rts3@127.0.0.1)3>
=ERROR REPORT==== 16-Jun-2011::23:29:48 ===
repair performed {rts_obj,{incr,20,...
(rts3@127.0.0.1)3> rts:get("progski", "GET").
20

此时rts:get("progski", "GET").返回not_fould,原因很简单,因为R = 2, 读到的节点都是not_fould,当再次读取的时候由于修复了,所以读取的话,能够读到真确的值.

(rts3@127.0.0.1)4> rts:get("progski", "GET", [{r,3}]).
20

Partitioned Writes

为了减少分区的出现,最好的方法还是把写操作尽可能地交给唯一一个协调器来处理,这种思想在很多分布式系统中都用用到,如master-slave这种分布式系统就把master作为写节点.

分区的产生跟节点down是一样的,下面是total_sent的例子.

  • 启动cluster
for d in dev/dev*; do $d/bin/rts start; done
  • 登录节点rts1
./dev/dev1/bin/rts attach
(rts1@127.0.0.1)1> rts:get("progski", "total_sent").
...
(rts1@127.0.0.1)2> rts:get_dbg_preflist("progski", "total_sent").
...

确定所有的备份都是95216,现在分区的方式写入到两个节点中{rts2, rts3}.

(rts1@127.0.0.1)5> rts:dbg_op(incrby, 'rts2@127.0.0.1', ['rts3@127.0.0.1'], "progski", "total_sent", 10000).
ok
(rts1@127.0.0.1)6> rts:get_dbg_preflist("progski", "total_sent").
...

10000加入到两个节点中, 其中第二个参数为协调器, 第三个参数为另外一起操作的节点,现在节点的数值应该是105126.

执行一个读操作将会触发修复行为,即使你是在节点rts1上执行的读操作:

(rts1@127.0.0.1)7> rts:get("progski", "total_sent").
105216
(rts1@127.0.0.1)8>
=ERROR REPORT==== 17-Jun-2011::00:39:17 ===
repair performed {rts_obj,{incr,105216,...(rts1@127.0.0.1)8> rts:get_dbg_preflist("progski", "total_sent").
...{{1233142006497949337234359077604363797834693083136,'rts1@127.0.0.1'},{rts_obj,{incr,105216,
...

Partitioned Writes and Node Down

模拟一种情况,这种情况会造成数据的丢失,假设A,B,C3个节点,C节点发生分区,然后协调器是C,想C写入一个事件,然后分区恢复,但是没用读操作,然后分区又发生了,节点C被分出去了,并且接着节点C Down机了,因为节点C的协调记录到时在内存中的,所以当节点C重启后,这些事件记录已经不存在,就算分区修复,发生在节点C的操作也是会被丢失,自然造成数据的丢失.解决的方法就是将数据和操作持久化.

for d in dev/dev*; do $d/bin/rts start; done
./dev/dev1/bin/rts attach
(rts1@127.0.0.1)19> rts:get("progski", "agents").
...
(rts1@127.0.0.1)20> rts:get_dbg_preflist("progski", "agents").
...
(rts1@127.0.0.1)21> rts:dbg_op(sadd, 'rts1@127.0.0.1', [], "progski", "agents", "Bar Agent").
ok
(rts1@127.0.0.1)22> rts:get_dbg_preflist("progski", "agents").
...
Ctrl^C Ctrl^C./dev/dev2/bin/rts attach
rts:get_dbg_preflist("progski", "agents").
...
(rts2@127.0.0.1)2> rts:get("progski", "agents").
...
(rts2@127.0.0.1)3> rts:get("progski", "agents").
...

再次启动节点:for d in dev/dev*; do $d/bin/rts start; done

此时节点rts1的数据全丢失了,分区发生的事件自然而然也别丢失.

Hinted Handoff & Conflict Resolution

冲突修复不限于在读期间,如在写期间有同步的需要也是要修复冲突的,除此之外,Hinted off也是一种选择.

当一个fallback节点意识到primary节点上线后,它就会进行数据的转移.然而在后备节点意识到主节点上线和进行数据转移之间有一定的延迟,在此时间窗口内,写操作可能在主节点上发生,如果这种情况发生,handoff数据就不能是简单地覆盖本地数据,否则会造成数据的丢失.

最好不要造成节点的Down机,因为数据转移会大大加重系统的负载.

handle_handoff_data(Data, #state{stats=Stats0}=State) ->{StatName, HObj} = binary_to_term(Data),MObj =case dict:find(StatName, Stats0) of{ok, Obj} -> rts_obj:merge([Obj,HObj]);error -> HObjend,Stats = dict:store(StatName, MObj, Stats0),{reply, ok, State#state{stats=Stats}}.

大家可以按照下面的步骤测试一下:

  • Take a node down -- this will cause fallback vnodes to be created.

  • Write some data -- this will cause the fallback vnode to be populated with parallel/conflicting objects relative to the other vnodes. It's important that you not perform a rts:get or else read repair will reconcile them.

  • Restart the downed node -- this will cause the primary to come online with no data.

  • Perform a rts:get to invoke read repair. At this point all primaries have the correct data but you have a fallback that has conflicting data. After some time the fallback will realize the primary is up and will begin handoff.

  • Wait for handoff messages to appear in the console. Retry the rts:get and make sure the data is still correct and no further read repair was made. This proves that the data was reconciled prior to writing it.

注意

handle_handoff_data(Data, #state{stats=Stats0}=State)是在后备节点发现主节点上线后,然后发送通知给主节点,主节点会调用该函数,并且分次调用的.




#番外篇

merge

-spec merge([rts_obj()]) -> rts_obj().
merge([not_found|_]=Objs) ->P = fun(X) -> X == not_found end,case lists:all(P, Objs) oftrue -> not_found;false -> merge(lists:dropwhile(P, Objs))end;merge([#rts_obj{}|_]=Objs) ->case rts_obj:children(Objs) of[] -> not_found;[Child] -> Child;Chldrn ->Val = rts_get_fsm:reconcile(lists:map(fun val/1, Chldrn)),MergedVC = vclock:merge(lists:map(fun vclock/1, Chldrn)),#rts_obj{val=Val, vclock=MergedVC}end.

首先过滤掉not_foundnodeObj, 然后在过虑子孙
如果出现冲突,merge会调用Val = rts_get_fsm:reconcile(lists:map(fun val/1, Chldrn))MergedVC = vclock:merge(lists:map(fun vclock/1, Chldrn))

%% @pure
%%
%% @doc Reconcile conflicts among conflicting values.
-spec reconcile([A::any()]) -> A::any().
reconcile([#incr{}|_]=Vals) ->Get = fun(K, L) -> proplists:get_value(K, L, 0) end,Counts = [dict:to_list(V#incr.counts) || V <- Vals],Nodes = unique(lists:flatten([[Node || {Node,_} <- C] || C <- Counts])),MaxCounts = [{Node, lists:max([Get(Node, C) || C <- Counts])}|| Node <- Nodes],Total = lists:sum([lists:max([Get(Node, C) || C <- Counts])|| Node <- Nodes]),#incr{total=Total, counts=dict:from_list(MaxCounts)};reconcile([V|_]=Vals) when element(1, V) == statebox -> statebox:merge(Vals).

至于reconcile的计算方法,这里就不多讲了,注意的就是计数器(#incr --> vclock)集合(sets --> statebox)的计算法是不一样的,可以参照本人的博客或者这个项目:EStateBox

rts_obj:children:

  • uniqu过滤not_found, 返回唯一序列

  • ancestors 把子孙找出来

最后过滤掉多余的子孙,返回结果,如果结果还不等于[]或者1,那么可以推断出冲突已经发生了;如:

没有冲突:[{1, 2}, {1, 2, 3}, not_found, {1, 2, 3}] -->merge() ---> [{1, 2, 3}]
出现冲突:[{1, 2}, {1, 2, 3}, not_found, {1,4}}] --> [{1, 2, 3}, {1, 4}]
-spec ancestors([rts_obj()]) -> [rts_obj()].
ancestors(Objs0) ->Objs = [O || O <- Objs0, O /= not_found],As = [[O2 || O2 <- Objs,ancestor(O2#rts_obj.vclock,O1#rts_obj.vclock)] || O1 <- Objs],unique(lists:flatten(As)).%% @pure
%%
%% @doc Predicate to determine if `Va' is ancestor of `Vb'.
-spec ancestor(vclock:vclock(), vclock:vclock()) -> boolean().
ancestor(Va, Vb) ->vclock:descends(Vb, Va) andalso (vclock:descends(Va, Vb) == false).%% @pure
%%
%% @doc Given a list of `rts_obj()' return a list of the children
%% objects.  Children are the descendants of all others objects.
children(Objs) ->unique(Objs) -- ancestors(Objs).%% @pure
%%
%% @doc Given a list of `Objs' return the list of uniques.
-spec unique([rts_obj()]) -> [rts_obj()].
unique(Objs) ->F = fun(not_found, Acc) ->Acc;(Obj, Acc) ->case lists:any(equal(Obj), Acc) oftrue -> Acc;false -> [Obj|Acc]endend,lists:foldl(F, [], Objs).
%% @pure
%%
%% @doc Predeicate to determine if `ObjA' and `ObjB' are equal.
-spec equal(ObjA::rts_obj(), ObjB::rts_obj()) -> boolean().
equal(#rts_obj{vclock=A}, #rts_obj{vclock=B}) -> vclock:equal(A,B);
equal(not_found, not_found) -> true;
equal(_, _) -> false.%% @pure
%%
%% @doc Closure around `equal/2' for use with HOFs (damn verbose
%% Erlang).
-spec equal(ObjA::rts_obj()) -> fun((ObjB::rts_obj()) -> boolean()).
equal(ObjA) ->fun(ObjB) -> equal(ObjA, ObjB) end.


这是riak core 最后一篇教程了, 谢谢大家!!!

感谢rzezeski

Riak Core Guide 3相关推荐

  1. Riak Core Guide 1

    Learn Riak Core Step By Step riak core 是 riak的主要组成部分,主要负责分布式的部分,虽然官方有自己的存储后端,但是我们也可以使用其他的后端存储. Parti ...

  2. 对Riak Core的探索 (1) Hello

    haogongju.人人IT网.59n南龙.360doc不要抄我的烂博客了,私人备忘用. [size=x-large]基于Riak Core的开发指南[/size] [size=large]1. he ...

  3. 点评可调整大小哈希表:Riak Core和随机切片技术

    \ 本文要点 \\ 哈希表是一种用于管理空间的数据结构.它最初用于单一应用的内存,之后应用于大规模的计算集群.\\t 随着哈希表在一些新领域的应用,原有的哈希表大小调整方法存在一些不好的副作用.\\t ...

  4. TensorFlow新功能「AutoGraph」:将Python转换为计算图

    伊瓢 编译自 TensorFlow博客  量子位 报道 | 公众号 QbitAI 昨天,TensorFlow推出了一个新功能「AutoGraph」,可以将Python代码(包括控制流print()和其 ...

  5. 使用 DS-MDK 开发 NXP iMX7

    1). 简介 NXP  i.MX7 处理器是一款具有 Cortex-A7 和 M4 的异构多核处理器.A7能够运行 Linux 等操作系统,完成GUI.网络.文件管理和算法运算等复杂任务,而 M4 则 ...

  6. 【UCB操作系统CS162项目】Pintos Lab1:线程调度 Threads

    实验文档链接:Lab1: Threads 我的实现(更新至Lab 2):Altair-Alpha/pintos 开始之前 如文档所述,在开始编写代码之前需要先阅读掌握 GETTING STARTED ...

  7. 【UCB操作系统CS162项目】Pintos Lab0:项目上手 (Getting Real)

    前言 Stanford 的 CS144 计网完成后让我们继续挑战一项更难的课程项目:UCB 操作系统 CS162 的 Pintos,这个也是多个 CS 顶校都在用的项目.老规矩讲课部分因为本科基本都学 ...

  8. AUTOSAR文档如何阅读 -- 这些缩写是干嘛的!!!

    目录 1 Autosar BSW Module List 2 Autosar规范文档的类型 2.1 Autosar文档中提到的缩写 3 如何快速查看相邻CP Autosar版本之间的差异 结尾 优质博 ...

  9. BPF CO-RE reference guide

    BPF CO-RE reference guide 目录 The missing manual Reading kernel data bpf_core_read() bpf_core_read_st ...

最新文章

  1. 深度学习之核心要素:输入输出、目标函数、前向传播、后向传播、学习率、梯度下降
  2. mybatis Android,mybatis使用selectByPrimaryKey出错
  3. mysql5.7 启动报发生系统错误2
  4. spring jdbctemplate调用存储过程,返回list对象
  5. JUnit与TestNG:您应该选择哪种测试框架?
  6. python paramiko长连接_【Python】 SSH连接的paramiko
  7. Installation error code: -103签名不一致错误
  8. STM32(十一)- 串行FLASH文件系统FatFs
  9. STM32串口中断接收
  10. -XX:NewRatio 命令
  11. 普渡大学计算机科学师生比,公立常春藤高校普渡大学,附申请要求+录取难度!...
  12. Cannot find SourceMap 'XXX.js.map'问题解决
  13. Backtrack5 搭建Nessus
  14. 塑料制品行业市场产业集中度分析预测及经营状况可行性研究
  15. #千锋逆战班,拼搏永向前#
  16. qq空间微博等更多社交平台分享
  17. MacOS中dyld: Library not loaded的错误修正
  18. msp430g2553 ADC10
  19. 《计算机网络》局域网
  20. Lazarus for Raspbian安装

热门文章

  1. 要怎么才能有效降低论文的查重率呢?
  2. P7791 [COCI2014-2015#7] TETA 题解返回题目
  3. 盘点数据库2013之一:数据分析崛起
  4. 四种访问修饰符---(Java版)
  5. 用java编写输出欢迎光临_编写一个完整的Java applet程序,程序功能为:在屏幕上输出“欢迎光临Java世界!”的字符串信息。...
  6. ospf的一类,二类,和三类LSA详解
  7. linux字符点阵,ASCII 字符的点阵显示
  8. 一些论文审稿方面的体会
  9. macOS万能音视频转换器-Permute 3 for mac完整安装-简单易学的使用方法
  10. 2020.7月做题记录