Riak Core Guide 1
Learn Riak Core Step By Step
riak core 是 riak的主要组成部分,主要负责分布式的部分,虽然官方有自己的存储后端,但是我们也可以使用其他的后端存储。
Partitioning & Distributing Work
riak core 在每个节点上都是使用master/worker配置,这样作为一个工作单元来执行,riak core的worker为vnode worker, 在每个节点上由riak_core_sup生成,vnode worker对应的模块为riak_core_vnode
。
如:
节点1:
Node: 'mfmn2@127.0.0.1', Process: <0.80.0> [{registered_name,riak_core_vnode_sup},{current_function,{gen_server,loop,6}},{initial_call,{proc_lib,init_p,5}},{status,waiting},{message_queue_len,0},{messages,[]},{links,[<0.148.0>,<0.152.0>,<0.154.0>,<0.155.0>,<0.153.0>,<0.150.0>,<0.151.0>,<0.149.0>,<0.140.0>,<0.144.0>,<0.146.0>,<0.147.0>,<0.145.0>,<0.142.0>,<0.143.0>,<0.141.0>,<0.136.0>,<0.138.0>,<0.139.0>,<0.137.0>,<0.77.0>,<0.135.0>]},{dictionary,[{'$ancestors',[riak_core_sup,<0.76.0>]},{'$initial_call',{supervisor_pre_r14b04,init,1}}]},{trap_exit,true},{error_handler,error_handler},{priority,normal},{group_leader,<0.75.0>},{total_heap_size,3571},{heap_size,2584},{stack_size,9},{reductions,4359},{garbage_collection,[{min_bin_vheap_size,46368},{min_heap_size,233},{fullsweep_after,10},{minor_gcs,6}]},{suspending,[]}]
节点2:
Node: 'mfmn1@127.0.0.1', Process: <0.80.0> [{registered_name,riak_core_vnode_sup},{current_function,{gen_server,loop,6}},{initial_call,{proc_lib,init_p,5}},{status,waiting},{message_queue_len,0},{messages,[]},{links,[<0.183.0>,<0.274.0>,<0.337.0>,<0.375.0>,<0.387.0>,<0.371.0>,<0.310.0>,<0.326.0>,<0.226.0>,<0.262.0>,<0.210.0>,<0.218.0>,<0.153.0>,<0.165.0>,<0.177.0>,<0.171.0>,<0.159.0>,<0.135.0>,<0.147.0>,<0.141.0>,<0.123.0>,<0.129.0>,<0.77.0>]},{dictionary,[{'$ancestors',[riak_core_sup,<0.76.0>]},{'$initial_call',{supervisor_pre_r14b04,init,1}}]},{trap_exit,true},{error_handler,error_handler},{priority,normal},{group_leader,<0.75.0>},{total_heap_size,1974},{heap_size,987},{stack_size,9},{reductions,8777},{garbage_collection,[{min_bin_vheap_size,46368},{min_heap_size,233},{fullsweep_after,10},{minor_gcs,2}]},{suspending,[]}]
节点3:
Node: 'mfmn3@127.0.0.1', Process: <0.80.0> [{registered_name,riak_core_vnode_sup},{current_function,{gen_server,loop,6}},{initial_call,{proc_lib,init_p,5}},{status,waiting},{message_queue_len,0},{messages,[]},{links,[<0.152.0>,<0.167.0>,<0.179.0>,<0.185.0>,<0.182.0>,<0.173.0>,<0.176.0>,<0.170.0>,<0.161.0>,<0.164.0>,<0.158.0>,<0.155.0>,<0.137.0>,<0.143.0>,<0.149.0>,<0.146.0>,<0.140.0>,<0.128.0>,<0.134.0>,<0.131.0>,<0.125.0>,<0.77.0>]},{dictionary,[{'$ancestors',[riak_core_sup,<0.76.0>]},{'$initial_call',{supervisor_pre_r14b04,init,1}}]},{trap_exit,true},{error_handler,error_handler},{priority,normal},{group_leader,<0.75.0>},{total_heap_size,3194},{heap_size,2584},{stack_size,9},{reductions,4507},{garbage_collection,[{min_bin_vheap_size,46368},{min_heap_size,233},{fullsweep_after,10},{minor_gcs,10}]},{suspending,[]}]
3个节点的vnode worker 加起来刚好22 + 23 + 22 - 3 = 64
(mfmn3@127.0.0.1)3> supervisor:count_children(riak_core_vnode_sup). [{specs,1},{active,21},{supervisors,0},{workers,21}] (mfmn3@127.0.0.1)4>
减去3的原因。
从上面那张图可以看出, riak_core_vnode_master负责与vnode的通信,这些vnode都是fsm,如:
获取当前节点的所有vnode:
(mfmn3@127.0.0.1)8> riak_core_vnode_master:all_nodes(mfmn_vnode). [<0.173.0>,<0.179.0>,<0.185.0>,<0.143.0>,<0.155.0>,<0.164.0>,<0.182.0>,<0.170.0>,<0.161.0>,<0.140.0>,<0.146.0>,<0.152.0>,<0.158.0>,<0.176.0>,<0.167.0>,<0.149.0>,<0.137.0>,<0.125.0>,<0.128.0>,<0.131.0>,<0.134.0>] (mfmn3@127.0.0.1)9>
这也再次证明了该节点的vnode个数为21.
向某个vnode发出Ping请求
这个例子是try-try-try
的例子:
前面会携带一个Pid,而这个Pid是根据hash在ets表中索引出来的,这个Pid是vnode 的Pid,根据这个Pid可以索引到具体的vnode,最后Mod:handle_command是用户的回调函数。
如果master没有找到对应的vnode,那么他会新建一个vnode:
get_vnode(Idx, State=#state{vnode_mod=Mod}) -> case idx2vnode(Idx, State) of no_match -> {ok, Pid} = riak_core_vnode_sup:start_vnode(Mod, Idx), MonRef = erlang:monitor(process, Pid), add_vnode_rec(#idxrec{idx=Idx,pid=Pid,monref=MonRef}, State), Pid; X -> X end.
因为vnode下面是存储后端,所以只要定位到vnode就可以访问后端存储。
下面是try try try
对应的代码:
start(_StartType, _StartArgs) -> case mfmn_sup:start_link() of {ok, Pid} -> ok = riak_core:register([{vnode_module, mfmn_vnode}]), ok = riak_core_ring_events:add_guarded_handler(mfmn_ring_event_handler, []), ok = riak_core_node_watcher_events:add_guarded_handler(mfmn_node_event_handler, []), ok = riak_core_node_watcher:service_up(mfmn, self()), {ok, Pid}; {error, Reason} -> {error, Reason} end.stop(_State) -> ok.
启动master, 注册vnode, 铁添加mfmn_ring_event_handler,mfmn_node_event_handler...
下面就讲解一下try try try
的例子,高手勿喷!!!
第二个例子 Riak Core, The vnode
这是一个Real Time Stastics
,简称RTS
--实时统计应用。
这个系统要解决的两个问题是解析记录和分发记录,这交给entry vnode
处理;第二个时接收实时统计,交给stat vnode
处理。
注意
Entry = DocIdx = riak_core_util:chash_key({list_to_binary(Client), term_to_binary(now())}), Stat = riak_core_util:chash_key({list_to_binary(Client), list_to_binary(StatName)}),
从哈希上可以看出,rts只支持单客户端。
什么是vnode
- vnode是一个虚拟节点,和物理节点不一样 - 一个虚拟节点对应一个`erlang process` - 一个虚拟节点是一个behaviour - gen_fsm behaviour - 一个虚拟节点处理进来的请求 - 一个虚拟节点可能会存储数据,这些数据可以被以后检索到 - 很多虚拟节点会运行在同一个物理节点上 - 每个虚拟机都有一个主虚拟节点,他主要用于和它的所有存活的节点保持联系如你所见,虚拟节点要处理很多东西,不过Basho已经帮我们处理掉,我们只要实现所提供的`vnode behaviour`即可。用户只要理解输入和输出,然后定义所需的回调函数即可。
生命周期
init/1和termiante/2回调函数是虚拟节点的生命边缘,既是起止和终止位置,当一个连接到vnode的进程崩溃,那么handle_exit/3将会被调用。
init([Index]) -> Result
Index :: int() >= 0 Result :: {ok, State} State :: term()
其中Index
为VNode
的hash
值。
rts注册了3种vnode -- rts_vnode
、rts_entry_vnode
、 rts_stat_vnode
、 对应的master vnode 分别为:rts_vnode_master
、rts_entry_vnode_master
、rts_stat_vnode_master
。
每种vnode负责不同服务. rts_vnode
提供ping服务, rts_entry_vnode
提供日志分析服务, rts_stat_vnode
对日志分析结果进行统计的服务。
terminate(Reason, State)
Reason :: normal | shutdown | {shutdown, term()} | term() State :: term() Result :: term()
用于vnode清理资源,由于entry 和 stat所有东西都是在内存中的,所以不用做清理工作,直接交由erlang vm处理即可:
terminate(_Reason, _State) -> ok.
handle_exit(Pid, Reason, State) -> Result
Pid :: pid() Reason :: term() State :: term() Result :: {noreply, NewState} | {stop, NewState}
当列链接到该vnode的进程挂掉时,这个回调函数将被调用,返回值有两种选择:1. 返回{stop, NewState}, 这样vnode也会被停止掉;2.返回{noreply, NewState},Vnode继续工作。
Commands
所有vnode的请求都变成命令请求。例如,一个riak kv的“get”请求最终会被handle_command处理,这个handle_command在vnode里定义。
为了实现一个命令,你需要增加一个新的handle_command/3
函数,用于匹配到来的请求。例如:为了得到一个{get, StateName}的统计请求,需要增加一个handle_command({get, StatName}, ...)
。
handle_command(Request, Sender, State) -> Result
Request :: term() Sender :: sender() State :: term() NewState :: term() Result :: {reply, Reply, NewState} | {noreply, NewState} | {stop, Reason, NewState}
请求可以是任何东西,但经常是一个标签化的元组,Sender
代表客户端进程,但经常是一个不透明的值,这个值用在riak_core_vnode:reply/2
中。State
更像一个gen_server state
,主要用于跟踪回调函数轨迹。
有3中返回值:
1) reply: 发送Reply
给客户端
2) noreply: 不发送应答
3) stop: 中断vnode
entry vnode
需要比较传进来的日志,然后在转发出去:
handle_command({entry, Client, Entry}, _Sender, #state{reg=Reg}=State) -> io:format("~p~n", [{entry, State#state.partition}]), lists:foreach(match(Client, Entry), Reg), {noreply, State}.
stat vnode
就像一个迷你的Redis
, 支持本地更新。
handle_command({get, StatName}, _Sender, #state{stats=Stats}=State) -> Reply = case dict:find(StatName, Stats) of error -> not_found; Found -> Found end, {reply, Reply, State};handle_command({set, StatName, Val}, _Sender, #state{stats=Stats0}=State) -> Stats = dict:store(StatName, Val, Stats0), {reply, ok, State#state{stats=Stats}};handle_command({incr, StatName}, _Sender, #state{stats=Stats0}=State) -> Stats = dict:update_counter(StatName, 1, Stats0), {reply, ok, State#state{stats=Stats}};handle_command({incrby, StatName, Val}, _Sender, #state{stats=Stats0}=State) -> Stats = dict:update_counter(StatName, Val, Stats0), {reply, ok, State#state{stats=Stats}};handle_command({append, StatName, Val}, _Sender, #state{stats=Stats0}=State) -> Stats = try dict:append(StatName, Val, Stats0) catch _:_ -> dict:store(StatName, [Val], Stats0) end, {reply, ok, State#state{stats=Stats}};handle_command({sadd, StatName, Val}, _Sender, #state{stats=Stats0}=State) -> F = fun(S) -> sets:add_element(Val, S) end, Stats = dict:update(StatName, F, sets:from_list([Val]), Stats0), {reply, ok, State#state{stats=Stats}}.
"Covering" Commands
riak_core_vnode_master:coverage
命令的回调为handle_coverage/4
,这是一个很方便的操作,例如:可以用在列出riak的keys, 或者二级索引的范围查找。注意,
首先,我们没有正在拉取对象。他所有的操作都落到vnode中去,这也意味着它是非常的高效的。比较酷的事情是这就好像是在本地操作一样,让你高性能的查询主键,并且根据匹配返回相应的对象;这是一个有效的方当使用像leveldb
这种有序的存储后端时。
Handoff
handoff
会在一个vnode
意识到自己不是一个正确的node
时会触发。如 :1. 由于增加/删除节点导致环的改变;2.一个node down掉后又up了
Riak Core实现了hinted handoff, hint
是一段数据,用于直线所在分区的vnode。实现handloff看起来比较困难,只要记住它的主要目的是vnode之间的数据传输就行了,如果你的vnode只是用来计算,那么可以无视它的存在。
handoff的成员有:is_empty/1
, delete/1
, handoff_starting/2
, handoff_cancelled/1
, encode_handoff_item/2
, handle_handoff_data/2
,handle_handoff_command/3
, and handoff_finished/2
.
is_empty(State) -> Result
State :: term() NewState :: term() Result :: {true, NewState} | {false, NewState}
一旦容器确定了一个vnode位置发生变化时,它的第一个操作就是确定这里是否有任何的数据需要被传输,如果需要则返回true
否则返回false
。当一个vnode被认为是空的话,delete/3
将被调用。
例子中的stat vnode
检测stats dict
的大小来确定其是否为空。
is_empty(State) -> case dict:size(State#state.stats) of 0 -> {true, State}; _ -> {false, State} end.
delete(State) -> Result
State :: term() NewState :: term() Result :: {ok, NewState}
如果vnode没有数据需要传输了,容器就会调用delete。
handoff_starting(TargetNode, State) -> Result
TargetNode :: node() Result :: {true, NewState} | {false, NewState} State :: term() NewState :: term()
当handoof一定发生的时候,容器就会调用handoff_starting/2
,vnode最后会指示handoff是否会发生,返回true
表示继续,返回false
表示取消。vnode可能会有很多策略来决定它的负载,或者在超载的时候选择不参与数据传输.目标节点也是一个数据传输节点。
handoff_cancelled(State) -> Result
State :: term() NewState :: term() Result :: {ok, NewState}
传输管理可以设置并发传输的操作数量,默认是4,但是可以调整。如果并发数量超标,那末容器就会调用handoff_cancelled/1
,这时候你可以撤销在handoff_starting/2完成的事情。
如果在handoff期间发生错误,handoff_cancelled/1
也会被调用。
encode_handoff_item(K, V) -> Result
K :: {Bucket, Key} Bucket :: riak_object:bucket() Key :: riak_object:key() V :: term() Result :: binary()
这个函数用于容器加码数据,如序列化数据,需要了解的3件事是:
1) Key
和Value
是一起加码的,比如,他们需要一起访问
2) 这个函数必须返回二进制
3) 和handle_handle_off_data/2一起工作
如:
handle_handoff_data(Data, #state{stats=Stats0}=State) -> {StatName, Val} = binary_to_term(Data), Stats = dict:store(StatName, Val, Stats0), {reply, ok, State#state{stats=Stats}}. encode_handoff_item(StatName, Val) -> term_to_binary({StatName,Val}).
handle_handoff_data(BinObj, State) -> Result
BinObj :: binary() State :: term() NewState :: term() Result :: {reply, ok, NewState} | {reply, {error, Error}, NewState}
这个函数主要用来反序列化handoff data
, 他的主要工作从而机制对象中重构vnode 的 state, 如果在解码数据时发生错误,那么返回{error, Error}来描述错误。
为了解码数据,例子中使用了binary_to_term/1
并且把它插入到本地词典中。
handle_handoff_data(Data, #state{stats=Stats0}=State) -> {StatName, Val} = binary_to_term(Data), Stats = dict:store(StatName, Val, Stats0), {reply, ok, State#state{stats=Stats}}.
handle_handoff_command(Request, Sender, State) -> Result
Request :: term() Sender :: sender() State :: term() NewState :: term() Result :: {reply, Reply, NewState} | {noreply, NewState} | {forward, NewState} | {drop, NewState} | {stop, Reason, NewState}
这个回调函数和handle_command/3
非常相识,不同的是是在一个请求到达handoff时被调用。有两个额外的返回值:继续或者终止. 其中终止和noreply
的行为是一样的,只是它用于提醒这是dropping
操作,你也可以不实现它。
如果你想要你的节点传输数据,那么你必须实现?FOLD_REQ
在handle_handoff_command/3中。你必须这样做,因为handoff manager
通过这个方法来迭代你的数据。?FOLD_REQ
是一个#riak_core_fold_req_v1
的record
,这个record包含了一个foldfun
函数和一个acc0 (accumulator)
迭代器。foldfun
函数需要3个参数: the key
, value
,和accumulator
.也就是说如果你的数据格式不支持foldfun
,那么就需要手动调整。例如,如果你的数据是在是在一个列表里面,那么你应该实现一个适配函数来转化。
F = fun({Key,Val}, Acc) -> FoldFun(Key, Val, Acc) end, Acc = lists:foldl(F, Acc0, State#state.data),
...
例子中的stat vnode
使用一个dict
,它支持3个参数的fold function
, 迭代器是一个不透明的值,并且要和socket
保持联系,定期的出发同步命令。有兴趣可以看一下handoff sender
. 最后,确保返回迭代器的最终的值。
handle_handoff_command(?FOLD_REQ{foldfun=Fun, acc0=Acc0}, _Sender, State) -> Acc = dict:fold(Fun, Acc0, State#state.stats), {reply, Acc, State}.
我也不知道fold request在vnode behaviour的回调,在下一天在讨论。
handoff_finished(TargetNode, State) -> Result
TargetNode :: node() Result :: any() State :: term()
当所有数据都传输到目标节点时,这个函数将会被调用,这个返回值可以是任何值,这个返回值会被容器忽略掉。
日志分析到日志统计
rts提供了http的接口方便用户录入数据,其中rts_wm_entry:process_post的数据结构如下
Client: "progski" Entry: "0.0.0.0 - - [21/Mar/2011:18:18:19 +0000] \"GET /blog/2011/aol_meet_riak.html HTTP/1.1\" 200 5865 \"http://www.google.com/\" \"Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.648.151 Safari/534.16\""
Client是gunzip -c progski.access.log.gz | ./replay progski
传递过来的, progski
为client,日志格式如下:
[_Host, _, _User, _Time, Req, Code, BodySize, _Referer, Agent] 0.0.0.0 - - [21/Mar/2011:18:47:27 +0000] "GET /blog/2011/aol_meet_riak.html HTTP/1.1" 200 12754 "-" "Java/1.6.0_24"
然后,entry master vnode
根据哈希来调用entry vnode
, entry vnode
切分日志,把日志切分为[_Host, _, _User, _Time, Req, Code, BodySize, _Referer, Agent]
,然后把每一项哈希出来,交由stat master vnode
转发到对应的stat vnode
中去。
PrefList = riak_core_apl:get_apl(DocIdx, 1, rts_entry), [IdxNode] = PrefList, rts_entry_vnode:entry(IdxNode, Client, Entry).
发送命令:
riak_core_vnode_master:command(IdxNode,{entry, Client, Entry},?MASTER).
其中?MASTER为rts_entry_vnode_master
.
如图:
最后交给stat vnode 进行统计。
Riak Core Guide 1相关推荐
- Riak Core Guide 3
Learn Riak Core Step By Step 3 Riak Core, Conflict Resolution 这一章主要描述最终一致性和如何实现强一致性. Object 非重重要的一个数 ...
- 对Riak Core的探索 (1) Hello
haogongju.人人IT网.59n南龙.360doc不要抄我的烂博客了,私人备忘用. [size=x-large]基于Riak Core的开发指南[/size] [size=large]1. he ...
- 点评可调整大小哈希表:Riak Core和随机切片技术
\ 本文要点 \\ 哈希表是一种用于管理空间的数据结构.它最初用于单一应用的内存,之后应用于大规模的计算集群.\\t 随着哈希表在一些新领域的应用,原有的哈希表大小调整方法存在一些不好的副作用.\\t ...
- TensorFlow新功能「AutoGraph」:将Python转换为计算图
伊瓢 编译自 TensorFlow博客 量子位 报道 | 公众号 QbitAI 昨天,TensorFlow推出了一个新功能「AutoGraph」,可以将Python代码(包括控制流print()和其 ...
- 使用 DS-MDK 开发 NXP iMX7
1). 简介 NXP i.MX7 处理器是一款具有 Cortex-A7 和 M4 的异构多核处理器.A7能够运行 Linux 等操作系统,完成GUI.网络.文件管理和算法运算等复杂任务,而 M4 则 ...
- 【UCB操作系统CS162项目】Pintos Lab1:线程调度 Threads
实验文档链接:Lab1: Threads 我的实现(更新至Lab 2):Altair-Alpha/pintos 开始之前 如文档所述,在开始编写代码之前需要先阅读掌握 GETTING STARTED ...
- 【UCB操作系统CS162项目】Pintos Lab0:项目上手 (Getting Real)
前言 Stanford 的 CS144 计网完成后让我们继续挑战一项更难的课程项目:UCB 操作系统 CS162 的 Pintos,这个也是多个 CS 顶校都在用的项目.老规矩讲课部分因为本科基本都学 ...
- AUTOSAR文档如何阅读 -- 这些缩写是干嘛的!!!
目录 1 Autosar BSW Module List 2 Autosar规范文档的类型 2.1 Autosar文档中提到的缩写 3 如何快速查看相邻CP Autosar版本之间的差异 结尾 优质博 ...
- BPF CO-RE reference guide
BPF CO-RE reference guide 目录 The missing manual Reading kernel data bpf_core_read() bpf_core_read_st ...
最新文章
- svn 目录结构 trunk java_如何彻底删除SVN中的文件和文件夹(附恢复方法)
- 技术雷达峰会2020:从技术趋势看行业挑战
- 从源文件中读出最后10KB内容到目的文件中
- C# HashTable 使用用法详解
- Spring Data Solr教程:Solr简介
- 汇编指令处理的数据长度
- WINDOWS SERVER 2008/2008 R2/2012 最大内存支持
- 史上最全的JFinal源码分析(不间断更新)
- Oracle 语句连接字符,oracle拼接字符串当sql语句
- 不再惧怕!二叉树结构相关算法总结 | 原力计划
- Mysql锁机制简单了解一下
- 使用slf4j和log4j记录日志
- Python全栈开发记录_第六篇(生成器和迭代器)
- hashcode值相同的字符串
- cheerio获取outerHTML
- SSM+基于ssm的汽车租赁平台的设计与实现 毕业设计-附源码211708
- vscode vim插件(updating)
- 信息的定义与特征,构成世界的三大要素:物质、能量、信息
- html file 英文,(转)如何将input type=file显示的浏览 变成英文的?
- 【Linux C】进程、线程和进程间通信