Learn Riak Core Step By Step

riak core 是 riak的主要组成部分,主要负责分布式的部分,虽然官方有自己的存储后端,但是我们也可以使用其他的后端存储。

Partitioning & Distributing Work

riak core 在每个节点上都是使用master/worker配置,这样作为一个工作单元来执行,riak core的worker为vnode worker, 在每个节点上由riak_core_sup生成,vnode worker对应的模块为riak_core_vnode

如:

节点1:

Node: 'mfmn2@127.0.0.1', Process: <0.80.0>
[{registered_name,riak_core_vnode_sup},{current_function,{gen_server,loop,6}},{initial_call,{proc_lib,init_p,5}},{status,waiting},{message_queue_len,0},{messages,[]},{links,[<0.148.0>,<0.152.0>,<0.154.0>,<0.155.0>,<0.153.0>,<0.150.0>,<0.151.0>,<0.149.0>,<0.140.0>,<0.144.0>,<0.146.0>,<0.147.0>,<0.145.0>,<0.142.0>,<0.143.0>,<0.141.0>,<0.136.0>,<0.138.0>,<0.139.0>,<0.137.0>,<0.77.0>,<0.135.0>]},{dictionary,[{'$ancestors',[riak_core_sup,<0.76.0>]},{'$initial_call',{supervisor_pre_r14b04,init,1}}]},{trap_exit,true},{error_handler,error_handler},{priority,normal},{group_leader,<0.75.0>},{total_heap_size,3571},{heap_size,2584},{stack_size,9},{reductions,4359},{garbage_collection,[{min_bin_vheap_size,46368},{min_heap_size,233},{fullsweep_after,10},{minor_gcs,6}]},{suspending,[]}]

节点2:

Node: 'mfmn1@127.0.0.1', Process: <0.80.0>
[{registered_name,riak_core_vnode_sup},{current_function,{gen_server,loop,6}},{initial_call,{proc_lib,init_p,5}},{status,waiting},{message_queue_len,0},{messages,[]},{links,[<0.183.0>,<0.274.0>,<0.337.0>,<0.375.0>,<0.387.0>,<0.371.0>,<0.310.0>,<0.326.0>,<0.226.0>,<0.262.0>,<0.210.0>,<0.218.0>,<0.153.0>,<0.165.0>,<0.177.0>,<0.171.0>,<0.159.0>,<0.135.0>,<0.147.0>,<0.141.0>,<0.123.0>,<0.129.0>,<0.77.0>]},{dictionary,[{'$ancestors',[riak_core_sup,<0.76.0>]},{'$initial_call',{supervisor_pre_r14b04,init,1}}]},{trap_exit,true},{error_handler,error_handler},{priority,normal},{group_leader,<0.75.0>},{total_heap_size,1974},{heap_size,987},{stack_size,9},{reductions,8777},{garbage_collection,[{min_bin_vheap_size,46368},{min_heap_size,233},{fullsweep_after,10},{minor_gcs,2}]},{suspending,[]}]

节点3:

Node: 'mfmn3@127.0.0.1', Process: <0.80.0>
[{registered_name,riak_core_vnode_sup},{current_function,{gen_server,loop,6}},{initial_call,{proc_lib,init_p,5}},{status,waiting},{message_queue_len,0},{messages,[]},{links,[<0.152.0>,<0.167.0>,<0.179.0>,<0.185.0>,<0.182.0>,<0.173.0>,<0.176.0>,<0.170.0>,<0.161.0>,<0.164.0>,<0.158.0>,<0.155.0>,<0.137.0>,<0.143.0>,<0.149.0>,<0.146.0>,<0.140.0>,<0.128.0>,<0.134.0>,<0.131.0>,<0.125.0>,<0.77.0>]},{dictionary,[{'$ancestors',[riak_core_sup,<0.76.0>]},{'$initial_call',{supervisor_pre_r14b04,init,1}}]},{trap_exit,true},{error_handler,error_handler},{priority,normal},{group_leader,<0.75.0>},{total_heap_size,3194},{heap_size,2584},{stack_size,9},{reductions,4507},{garbage_collection,[{min_bin_vheap_size,46368},{min_heap_size,233},{fullsweep_after,10},{minor_gcs,10}]},{suspending,[]}]

3个节点的vnode worker 加起来刚好22 + 23 + 22 - 3 = 64

(mfmn3@127.0.0.1)3> supervisor:count_children(riak_core_vnode_sup).
[{specs,1},{active,21},{supervisors,0},{workers,21}]
(mfmn3@127.0.0.1)4>

减去3的原因。

从上面那张图可以看出, riak_core_vnode_master负责与vnode的通信,这些vnode都是fsm,如:

获取当前节点的所有vnode:

(mfmn3@127.0.0.1)8> riak_core_vnode_master:all_nodes(mfmn_vnode).
[<0.173.0>,<0.179.0>,<0.185.0>,<0.143.0>,<0.155.0>,<0.164.0>,<0.182.0>,<0.170.0>,<0.161.0>,<0.140.0>,<0.146.0>,<0.152.0>,<0.158.0>,<0.176.0>,<0.167.0>,<0.149.0>,<0.137.0>,<0.125.0>,<0.128.0>,<0.131.0>,<0.134.0>]
(mfmn3@127.0.0.1)9>

这也再次证明了该节点的vnode个数为21.

向某个vnode发出Ping请求

这个例子是try-try-try的例子:

前面会携带一个Pid,而这个Pid是根据hash在ets表中索引出来的,这个Pid是vnode 的Pid,根据这个Pid可以索引到具体的vnode,最后Mod:handle_command是用户的回调函数。

如果master没有找到对应的vnode,那么他会新建一个vnode:

get_vnode(Idx, State=#state{vnode_mod=Mod}) ->
    case idx2vnode(Idx, State) of
        no_match ->
            {ok, Pid} = riak_core_vnode_sup:start_vnode(Mod, Idx),
            MonRef = erlang:monitor(process, Pid),
            add_vnode_rec(#idxrec{idx=Idx,pid=Pid,monref=MonRef}, State),
            Pid;
        X -> X
    end.

因为vnode下面是存储后端,所以只要定位到vnode就可以访问后端存储。

下面是try try try对应的代码:

start(_StartType, _StartArgs) ->
    case mfmn_sup:start_link() of
        {ok, Pid} ->
            ok = riak_core:register([{vnode_module, mfmn_vnode}]),
            ok = riak_core_ring_events:add_guarded_handler(mfmn_ring_event_handler, []),
            ok = riak_core_node_watcher_events:add_guarded_handler(mfmn_node_event_handler, []),
            ok = riak_core_node_watcher:service_up(mfmn, self()),
            {ok, Pid};
        {error, Reason} ->
            {error, Reason}
    end.stop(_State) ->
    ok.

启动master, 注册vnode, 铁添加mfmn_ring_event_handler,mfmn_node_event_handler...

下面就讲解一下try try try 的例子,高手勿喷!!!



第二个例子 Riak Core, The vnode

这是一个Real Time Stastics,简称RTS--实时统计应用。

这个系统要解决的两个问题是解析记录和分发记录,这交给entry vnode处理;第二个时接收实时统计,交给stat vnode处理。

注意

Entry = DocIdx = riak_core_util:chash_key({list_to_binary(Client), term_to_binary(now())}),
Stat = riak_core_util:chash_key({list_to_binary(Client), list_to_binary(StatName)}),

从哈希上可以看出,rts只支持单客户端。

什么是vnode

-   vnode是一个虚拟节点,和物理节点不一样
-   一个虚拟节点对应一个`erlang process`
-   一个虚拟节点是一个behaviour - gen_fsm behaviour
-   一个虚拟节点处理进来的请求
-   一个虚拟节点可能会存储数据,这些数据可以被以后检索到
-   很多虚拟节点会运行在同一个物理节点上
-   每个虚拟机都有一个主虚拟节点,他主要用于和它的所有存活的节点保持联系如你所见,虚拟节点要处理很多东西,不过Basho已经帮我们处理掉,我们只要实现所提供的`vnode behaviour`即可。用户只要理解输入和输出,然后定义所需的回调函数即可。

生命周期

init/1和termiante/2回调函数是虚拟节点的生命边缘,既是起止和终止位置,当一个连接到vnode的进程崩溃,那么handle_exit/3将会被调用。

init([Index]) -> Result

Index :: int() >= 0
Result :: {ok, State}
State :: term()

其中IndexVNodehash值。

rts注册了3种vnode -- rts_vnoderts_entry_vnode、 rts_stat_vnode、 对应的master vnode 分别为:rts_vnode_masterrts_entry_vnode_masterrts_stat_vnode_master

每种vnode负责不同服务. rts_vnode提供ping服务, rts_entry_vnode提供日志分析服务, rts_stat_vnode对日志分析结果进行统计的服务。

terminate(Reason, State)

Reason      :: normal | shutdown | {shutdown, term()} | term()
State       :: term()
Result      :: term()

用于vnode清理资源,由于entry 和 stat所有东西都是在内存中的,所以不用做清理工作,直接交由erlang vm处理即可:

terminate(_Reason, _State) ->
    ok.

handle_exit(Pid, Reason, State) -> Result

Pid         :: pid()
Reason      :: term()
State       :: term()
Result      :: {noreply, NewState}
             | {stop, NewState}

当列链接到该vnode的进程挂掉时,这个回调函数将被调用,返回值有两种选择:1. 返回{stop, NewState}, 这样vnode也会被停止掉;2.返回{noreply, NewState},Vnode继续工作。

Commands

所有vnode的请求都变成命令请求。例如,一个riak kv的“get”请求最终会被handle_command处理,这个handle_command在vnode里定义。

为了实现一个命令,你需要增加一个新的handle_command/3函数,用于匹配到来的请求。例如:为了得到一个{get, StateName}的统计请求,需要增加一个handle_command({get, StatName}, ...)

handle_command(Request, Sender, State) -> Result

Request     :: term()
Sender      :: sender()
State       :: term()
NewState    :: term()
Result      :: {reply, Reply, NewState}
             | {noreply, NewState}
             | {stop, Reason, NewState}

请求可以是任何东西,但经常是一个标签化的元组,Sender代表客户端进程,但经常是一个不透明的值,这个值用在riak_core_vnode:reply/2中。State更像一个gen_server state,主要用于跟踪回调函数轨迹。

有3中返回值:

1) reply: 发送Reply给客户端

2) noreply: 不发送应答

3) stop: 中断vnode

entry vnode 需要比较传进来的日志,然后在转发出去:

handle_command({entry, Client, Entry}, _Sender, #state{reg=Reg}=State) ->
    io:format("~p~n", [{entry, State#state.partition}]),
    lists:foreach(match(Client, Entry), Reg),
    {noreply, State}.

stat vnode就像一个迷你的Redis, 支持本地更新。

handle_command({get, StatName}, _Sender, #state{stats=Stats}=State) ->
    Reply =
        case dict:find(StatName, Stats) of
            error ->
                not_found;
            Found ->
                Found
        end,
    {reply, Reply, State};handle_command({set, StatName, Val}, _Sender, #state{stats=Stats0}=State) ->
    Stats = dict:store(StatName, Val, Stats0),
    {reply, ok, State#state{stats=Stats}};handle_command({incr, StatName}, _Sender, #state{stats=Stats0}=State) ->
    Stats = dict:update_counter(StatName, 1, Stats0),
    {reply, ok, State#state{stats=Stats}};handle_command({incrby, StatName, Val}, _Sender, #state{stats=Stats0}=State) ->
    Stats = dict:update_counter(StatName, Val, Stats0),
    {reply, ok, State#state{stats=Stats}};handle_command({append, StatName, Val}, _Sender, #state{stats=Stats0}=State) ->
    Stats = try dict:append(StatName, Val, Stats0)
            catch _:_ -> dict:store(StatName, [Val], Stats0)
            end,
    {reply, ok, State#state{stats=Stats}};handle_command({sadd, StatName, Val}, _Sender, #state{stats=Stats0}=State) ->
    F = fun(S) ->
                sets:add_element(Val, S)
        end,
    Stats = dict:update(StatName, F, sets:from_list([Val]), Stats0),
    {reply, ok, State#state{stats=Stats}}.

"Covering" Commands

riak_core_vnode_master:coverage命令的回调为handle_coverage/4,这是一个很方便的操作,例如:可以用在列出riak的keys, 或者二级索引的范围查找。注意,首先,我们没有正在拉取对象。他所有的操作都落到vnode中去,这也意味着它是非常的高效的。比较酷的事情是这就好像是在本地操作一样,让你高性能的查询主键,并且根据匹配返回相应的对象;这是一个有效的方当使用像leveldb这种有序的存储后端时。

Handoff

handoff会在一个vnode意识到自己不是一个正确的node时会触发。如 :1. 由于增加/删除节点导致环的改变;2.一个node down掉后又up了

Riak Core实现了hinted handoff, hint 是一段数据,用于直线所在分区的vnode。实现handloff看起来比较困难,只要记住它的主要目的是vnode之间的数据传输就行了,如果你的vnode只是用来计算,那么可以无视它的存在。

handoff的成员有:is_empty/1delete/1handoff_starting/2handoff_cancelled/1encode_handoff_item/2handle_handoff_data/2,handle_handoff_command/3, and handoff_finished/2.

is_empty(State) -> Result

State       :: term()
NewState    :: term()
Result      :: {true, NewState}
             | {false, NewState}

一旦容器确定了一个vnode位置发生变化时,它的第一个操作就是确定这里是否有任何的数据需要被传输,如果需要则返回true否则返回false。当一个vnode被认为是空的话,delete/3将被调用。

例子中的stat vnode检测stats dict的大小来确定其是否为空。

is_empty(State) ->
    case dict:size(State#state.stats) of
        0 -> {true, State};
        _ -> {false, State}
    end.

delete(State) -> Result

State       :: term()
NewState    :: term()
Result      :: {ok, NewState}

如果vnode没有数据需要传输了,容器就会调用delete。

handoff_starting(TargetNode, State) -> Result

TargetNode  :: node()
Result      :: {true, NewState} | {false, NewState}
State       :: term()
NewState    :: term()

当handoof一定发生的时候,容器就会调用handoff_starting/2,vnode最后会指示handoff是否会发生,返回true表示继续,返回false表示取消。vnode可能会有很多策略来决定它的负载,或者在超载的时候选择不参与数据传输.目标节点也是一个数据传输节点。

handoff_cancelled(State) -> Result

State       :: term()
NewState    :: term()
Result      :: {ok, NewState}

传输管理可以设置并发传输的操作数量,默认是4,但是可以调整。如果并发数量超标,那末容器就会调用handoff_cancelled/1,这时候你可以撤销在handoff_starting/2完成的事情。

如果在handoff期间发生错误,handoff_cancelled/1也会被调用。

encode_handoff_item(K, V) -> Result

K           :: {Bucket, Key}
Bucket      :: riak_object:bucket()
Key         :: riak_object:key()
V           :: term()
Result      :: binary()

这个函数用于容器加码数据,如序列化数据,需要了解的3件事是:

1) KeyValue是一起加码的,比如,他们需要一起访问

2) 这个函数必须返回二进制

3) 和handle_handle_off_data/2一起工作

如:

handle_handoff_data(Data, #state{stats=Stats0}=State) ->
    {StatName, Val} = binary_to_term(Data),
    Stats = dict:store(StatName, Val, Stats0),
    {reply, ok, State#state{stats=Stats}}.    encode_handoff_item(StatName, Val) ->
    term_to_binary({StatName,Val}).

handle_handoff_data(BinObj, State) -> Result

BinObj      :: binary()
State       :: term()
NewState    :: term()
Result      :: {reply, ok, NewState}
             | {reply, {error, Error}, NewState}

这个函数主要用来反序列化handoff data, 他的主要工作从而机制对象中重构vnode 的 state, 如果在解码数据时发生错误,那么返回{error, Error}来描述错误。

为了解码数据,例子中使用了binary_to_term/1并且把它插入到本地词典中。

handle_handoff_data(Data, #state{stats=Stats0}=State) ->
    {StatName, Val} = binary_to_term(Data),
    Stats = dict:store(StatName, Val, Stats0),
    {reply, ok, State#state{stats=Stats}}.

handle_handoff_command(Request, Sender, State) -> Result

Request     :: term()
Sender      :: sender()
State       :: term()
NewState    :: term()
Result      :: {reply, Reply, NewState}
             | {noreply, NewState}
             | {forward, NewState}
             | {drop, NewState}
             | {stop, Reason, NewState}

这个回调函数和handle_command/3非常相识,不同的是是在一个请求到达handoff时被调用。有两个额外的返回值:继续或者终止. 其中终止和noreply的行为是一样的,只是它用于提醒这是dropping操作,你也可以不实现它。

如果你想要你的节点传输数据,那么你必须实现?FOLD_REQ 在handle_handoff_command/3中。你必须这样做,因为handoff manager 通过这个方法来迭代你的数据。?FOLD_REQ是一个#riak_core_fold_req_v1record,这个record包含了一个foldfun函数和一个acc0 (accumulator)迭代器。foldfun函数需要3个参数: the keyvalue,和accumulator.也就是说如果你的数据格式不支持foldfun,那么就需要手动调整。例如,如果你的数据是在是在一个列表里面,那么你应该实现一个适配函数来转化。

F = fun({Key,Val}, Acc) -> FoldFun(Key, Val, Acc) end,
Acc = lists:foldl(F, Acc0, State#state.data),

...
例子中的stat vnode使用一个dict,它支持3个参数的fold function, 迭代器是一个不透明的值,并且要和socket保持联系,定期的出发同步命令。有兴趣可以看一下handoff sender. 最后,确保返回迭代器的最终的值。

handle_handoff_command(?FOLD_REQ{foldfun=Fun, acc0=Acc0}, _Sender, State) ->
    Acc = dict:fold(Fun, Acc0, State#state.stats),
    {reply, Acc, State}.

我也不知道fold request在vnode behaviour的回调,在下一天在讨论。

handoff_finished(TargetNode, State) -> Result

TargetNode  :: node()
Result      :: any()
State       :: term()

当所有数据都传输到目标节点时,这个函数将会被调用,这个返回值可以是任何值,这个返回值会被容器忽略掉。

日志分析到日志统计

rts提供了http的接口方便用户录入数据,其中rts_wm_entry:process_post的数据结构如下

Client: "progski"
Entry: "0.0.0.0 - - [21/Mar/2011:18:18:19 +0000] \"GET /blog/2011/aol_meet_riak.html HTTP/1.1\" 200 5865 \"http://www.google.com/\" \"Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.648.151 Safari/534.16\""

Client是gunzip -c progski.access.log.gz | ./replay progski传递过来的, progski 为client,日志格式如下:

[_Host, _, _User, _Time, Req, Code, BodySize, _Referer, Agent]
0.0.0.0 - - [21/Mar/2011:18:47:27 +0000] "GET /blog/2011/aol_meet_riak.html HTTP/1.1" 200 12754 "-" "Java/1.6.0_24"

然后,entry master vnode根据哈希来调用entry vnode, entry vnode 切分日志,把日志切分为[_Host, _, _User, _Time, Req, Code, BodySize, _Referer, Agent],然后把每一项哈希出来,交由stat master vnode转发到对应的stat vnode中去。

PrefList = riak_core_apl:get_apl(DocIdx, 1, rts_entry),
    [IdxNode] = PrefList,
    rts_entry_vnode:entry(IdxNode, Client, Entry).

发送命令:

riak_core_vnode_master:command(IdxNode,{entry, Client, Entry},?MASTER).

其中?MASTER为rts_entry_vnode_master.

如图:

最后交给stat vnode 进行统计。

Riak Core Guide 1相关推荐

  1. Riak Core Guide 3

    Learn Riak Core Step By Step 3 Riak Core, Conflict Resolution 这一章主要描述最终一致性和如何实现强一致性. Object 非重重要的一个数 ...

  2. 对Riak Core的探索 (1) Hello

    haogongju.人人IT网.59n南龙.360doc不要抄我的烂博客了,私人备忘用. [size=x-large]基于Riak Core的开发指南[/size] [size=large]1. he ...

  3. 点评可调整大小哈希表:Riak Core和随机切片技术

    \ 本文要点 \\ 哈希表是一种用于管理空间的数据结构.它最初用于单一应用的内存,之后应用于大规模的计算集群.\\t 随着哈希表在一些新领域的应用,原有的哈希表大小调整方法存在一些不好的副作用.\\t ...

  4. TensorFlow新功能「AutoGraph」:将Python转换为计算图

    伊瓢 编译自 TensorFlow博客  量子位 报道 | 公众号 QbitAI 昨天,TensorFlow推出了一个新功能「AutoGraph」,可以将Python代码(包括控制流print()和其 ...

  5. 使用 DS-MDK 开发 NXP iMX7

    1). 简介 NXP  i.MX7 处理器是一款具有 Cortex-A7 和 M4 的异构多核处理器.A7能够运行 Linux 等操作系统,完成GUI.网络.文件管理和算法运算等复杂任务,而 M4 则 ...

  6. 【UCB操作系统CS162项目】Pintos Lab1:线程调度 Threads

    实验文档链接:Lab1: Threads 我的实现(更新至Lab 2):Altair-Alpha/pintos 开始之前 如文档所述,在开始编写代码之前需要先阅读掌握 GETTING STARTED ...

  7. 【UCB操作系统CS162项目】Pintos Lab0:项目上手 (Getting Real)

    前言 Stanford 的 CS144 计网完成后让我们继续挑战一项更难的课程项目:UCB 操作系统 CS162 的 Pintos,这个也是多个 CS 顶校都在用的项目.老规矩讲课部分因为本科基本都学 ...

  8. AUTOSAR文档如何阅读 -- 这些缩写是干嘛的!!!

    目录 1 Autosar BSW Module List 2 Autosar规范文档的类型 2.1 Autosar文档中提到的缩写 3 如何快速查看相邻CP Autosar版本之间的差异 结尾 优质博 ...

  9. BPF CO-RE reference guide

    BPF CO-RE reference guide 目录 The missing manual Reading kernel data bpf_core_read() bpf_core_read_st ...

最新文章

  1. svn 目录结构 trunk java_如何彻底删除SVN中的文件和文件夹(附恢复方法)
  2. 技术雷达峰会2020:从技术趋势看行业挑战
  3. 从源文件中读出最后10KB内容到目的文件中
  4. C# HashTable 使用用法详解
  5. Spring Data Solr教程:Solr简介
  6. 汇编指令处理的数据长度
  7. WINDOWS SERVER 2008/2008 R2/2012 最大内存支持
  8. 史上最全的JFinal源码分析(不间断更新)
  9. Oracle 语句连接字符,oracle拼接字符串当sql语句
  10. 不再惧怕!二叉树结构相关算法总结 | 原力计划
  11. Mysql锁机制简单了解一下
  12. 使用slf4j和log4j记录日志
  13. Python全栈开发记录_第六篇(生成器和迭代器)
  14. hashcode值相同的字符串
  15. cheerio获取outerHTML
  16. SSM+基于ssm的汽车租赁平台的设计与实现 毕业设计-附源码211708
  17. vscode vim插件(updating)
  18. 信息的定义与特征,构成世界的三大要素:物质、能量、信息
  19. html file 英文,(转)如何将input type=file显示的浏览 变成英文的?
  20. 【Linux C】进程、线程和进程间通信

热门文章

  1. js-jquery-001-条形码概述
  2. 中国污泥处理处置行业前景趋势研究及投资风险分析报告2022-2027年新版
  3. Windows 12 网页版HTML源码
  4. egret引擎下,微信分包,微信登陆,微信分享例子
  5. 2015-2016年中国固态硬盘市场研究报告
  6. pop()函数的用法
  7. 中文、日文、韩文的unicode范围
  8. java微信群自动回复_功能强大,手机微信群控系统和云控哪个好?
  9. 计世网:IT人员秘密思考的十件事情
  10. DNS协议从入门到部署DNS服务器