微吐槽

hello,world.


不想了,我等码农,还是看看怎么来处理分布式系统中的事务这个老大难吧!

本文略长,读者需要有一定耐心,如果你是高级码农或者架构师级别,你可以跳过。
本文注重实战或者实现,不涉及CAP,略提ACID。
本文适合基础分布式程序员:

  1. 本文会涉及集群中节点的failover和recover问题.
  2. 本文会涉及事务及不透明事务的问题.
  3. 本文会提到微博和tweeter,并引出一个大数据问题.

由于分布式这个话题太大,事务这个话题也太大,我们从一个集群的一个小小节点开始谈起。

集群中存活的节点与同步

分布式系统中,如何判断一个节点(node)是否存活?
kafka这样认为:

  1. 此节点和zookeeper能喊话.(Keep sessions with zookeeper through heartbeats.)
  2. 此节点如果是个从节点,必须能够尽可能忠实地反映主节点的数据变化。
    也就是说,必须能够在主节点写了新数据后,及时复制这些变化的数据,所谓及时,不能拉下太多哦.

那么,符合上面两个条件的节点就可以认为是存活的,也可以认为是同步的(in-sync).

关于第1点,大家对心跳都很熟悉,那么我们可以这样认为某个节点不能和zookeeper喊话了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
zookeeper-node:
var timer =
newtimer()
.setInterval(10sec)
.onTime(slave-nodes,function(slave-nodes){
    slave-nodes.forEach( node -> {
        booleanisAlive = node.heartbeatACK(15sec);
        if(!isAlive) {
            node.numNotAlive += 1;
            if(node.numNotAlive >= 3) {
                node.declareDeadOrFailed();
                slave-nodes.remove(node);
                //回调也可 leader-node-app.notifyNodeDeadOrFailed(node)
            }
        }else
        node.numNotAlive = 0;
    });
});
timer.run();
//你可以回调也可以像下面这样简单的计时判断
leader-node-app:
var timer =
newtimer()
.setInterval(10sec)
.onTime(slave-nodes,function(slave-nodes){
    slave-nodes.forEach(node -> {
        if(node.isDeadOrFailed) {
        //node不能和zookeeper喊话了
        }
    });
});
timer.run();

关于第二点,要稍微复杂点了,怎么搞呢?
来这么分析:

  • 数据 messages.
  • 操作 op-log.
  • 偏移 position/offset.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 1. 先考虑messages
// 2. 再考虑log的postion或者offset
// 3. 考虑msg和off都记录在同源数据库或者存储设备上.(database or storage-device.)
vartimer =
newtimer()
.setInterval(10sec)
.onTime(slave-nodes,function(nodes){
    varcore-of-cpu = 8;
    //嫌慢就并发呗 mod hash go!
    nodes.groupParallel(core-of-cpu)
    .forEach(node -> {
        boolean nodeSucked = false;
        if(node.ackTimeDiff > 30sec) {
            //30秒内没有回复,node卡住了
            nodeSucked = true;
        }
        if(node.logOffsetDiff > 100) {
            //node复制跟不上了,差距超过100条数据
            nodeSucked = true;
        }
        if(nodeSucked) {
            //总之node“死”掉了,其实到底死没死,谁知道呢?network-error在分布式系统中或者节点失败这个事情是正常现象.
            node.declareDeadOrFailed();
            //不和你玩啦,集群不要你了
            nodes.remove(node);
            //该怎么处理呢,抛个事件吧.
            fire-event-NodeDeadOrFailed(node);
        }
    });
});
timer.run();

上面的节点的状态管理一般由zookeeper来做,leader或者master节点也会维护那么点状态。

那么应用中的leader或者master节点,只需要从zookeeper拉状态就可以,同时,上面的实现是不是一定最佳呢?不是的,而且多数操作可以合起来,但为了描述节点是否存活这个事儿,咱们这么写没啥问题。

节点死掉、失败、不同步了,咋处理呢?

好嘛,终于说到failover和recover了,那failover比较简单,因为还有其它的slave节点在,不影响数据读取。

  1. 同时多个slave节点失败了?
    没有100%的可用性.数据中心和机房瘫痪、网络电缆切断、hacker入侵删了你的根,总之你rp爆表了.
  2. 如果主节点失败了,那master-master不行嘛?
    keep-alived或者LVS或者你自己写failover吧.
    高可用架构(HA)又是个大件儿了,此文不展开了。

我们来关注下recover方面的东西,这里把视野打开点,不仅关注slave节点重启后追log来同步数据,我们看下在实际应用中,数据请求(包括读、写、更新)失败怎么办?

大家可能都会说,重试(retry)呗、重放(replay)呗或者干脆不管了呗!
行,都行,这些都是策略,但具体怎么个搞法,你真的清楚了?


一个bigdata问题

我们先摆个探讨的背景:

问题:消息流,比如微博的微博(真绕),源源不断地流进我们的应用中,要处理这些消息,有个需求是这样的:

Reach is the number of unique people exposed to a URL on Twitter.

那么,统计一下3小时内的本条微博(url)的reach总数。

怎么解决呢?

把某时间段内转发过某条微博(url)的人拉出来,把这些人的粉丝拉出来,去掉重复的人,然后求总数,就是要求的reach.

为了简单,我们忽略掉日期,先看看这个方法行不行:

1
2
3
4
5
6
7
8
9
10
11
12
/** ---------------------------------
* 1. 求出转发微博(url)的大V.
* __________________________________*/
方法 :getUrlToTweetersMap(String url_id)
SQL : /* 数据库A,表url_user存储了转发某url的user */
SELECT url_user.user_id as tweeter_id
FROM url_user
WHERE url_user.url_id = ${url_id}
返回 :[user_1,...,user_m]

1
2
3
4
5
6
7
8
9
10
11
12
/** ---------------------------------
* 2. 求出大V的粉丝
* __________________________________*/
方法 : getFollowers(String tweeter_id);
SQL :   /* 数据库B */
SELECT users.id as user_id
FROM users
WHERE users.followee_id = ${tweeter_id}
返回:tweeter的粉丝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** ---------------------------------
* 3. 求出Reach
* __________________________________*/
varurl = queryArgs.getUrl();
vartweeters = getUrlToTweetersMap();
varresult = newHashMap<String,Integer>();
tweeters.forEach(t -> {
    // 你可以批量in + 并发读来优化下面方法的性能
    varfollowers = getFollowers(t.tweeter_id);
    followers.forEach(f -> {
        //hash去重
        result.put(f.user_id,1);
    });
});
//Reach
returnresult.size();

其实这又引出了一个很重要的问题,也是很多大谈框架、设计、模式却往往忽视的问题:性能和数据库建模的关系。

  1. 数据量有多大?
    不知道读者有木有对这个问题的数据库I/O有点想法,或者虎躯一震呢?
    Computing reach is too intense for a single machine – it can require thousands of database calls and tens of millions of tuples.
    在上面的数据库设计中避免了JOIN,为了提高求大V粉丝的性能,可以将一批大V作为batch/bulk,然后多个batch并发读,誓死搞死数据库。
    这里将微博到转发者表所在的库,与粉丝库分离,如果数据更大怎么办?
    库再分表…
    OK,假设你已经非常熟悉传统关系型数据库的分库分表及数据路由(读路径的聚合、写路径的分发)、或者你对于sharding技术也很熟悉、或者你良好的结合了HBase的横向扩展能力并有一致性策略来解决其二级索引问题.
    总之,存储和读取的问题假设你已经解决了,那么分布式计算呢?
  2. 微博这种应用,人与人之间的关系成图状(网),你怎么建模存储?而不仅仅对应这个问题,比如:
    某人的好友的好友可能和某人有几分相熟?

看看用storm怎么来解决分布式计算,并提供流式计算的能力:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// url到大V -> 数据库1
TridentState urlToTweeters =
    topology.newStaticState(getUrlToTweetersState());
// 大V到粉丝 -> 数据库2
TridentState tweetersToFollowers =
    topology.newStaticState(getTweeterToFollowersState());
topology.newDRPCStream("reach")
    .stateQuery(urlToTweeters,newFields("args"),newMapGet(), newFields("tweeters"))
    .each(newFields("tweeters"),newExpandList(), newFields("tweeter"))
    .shuffle()/* 大V的粉丝很多,所以需要分布式处理*/
    .stateQuery(tweetersToFollowers,newFields("tweeter"),newMapGet(), newFields("followers"))
    .parallelismHint(200)/* 粉丝很多,所以需要高并发 */
    .each(newFields("followers"),newExpandList(), newFields("follower"))
    .groupBy(newFields("follower"))
    .aggregate(newOne(), newFields("one"))/* 去重 */
    .parallelismHint(20)
    .aggregate(newCount(), newFields("reach"));/* 计算reach数 */

最多处理一次(At most once)

回到主题,引出上面的例子,一是为了引出一个有关分布式(存储+计算)的问题,二是透漏这么点意思:
码农,就应该关注设计和实现的东西,比如Jay Kreps是如何发明Kafka这个轮子的 : ]

如果你还是码农级别,咱来务点实吧,前面我们说到recover,节点恢复的问题,那么我们恢复几个东西?

基本的:

  • 节点状态
  • 节点数据

本篇从数据上来讨论下这个问题,为使问题再简单点,我们考虑写数据的场景,如果我们用write-ahead-log的方式来保证数据复制和一致性,那么我们会怎么处理一致性问题呢?

  1. 主节点有新数据写入.
  2. 从节点追log,准备复制这批新数据。从节点做两件事:
    (1). 把数据的id偏移写入log;
    (2). 正要处理数据本身,从节点挂了。

那么根据上文的节点存活条件,这个从节点挂了这件事被探测到了,从节点由维护人员手动或者其自己恢复了,那么在加入集群和小伙伴们继续玩耍之前,它要同步自己的状态和数据。
问题来了:

如果根据log内的数据偏移来同步数据,那么,因为这个节点在处理数据之前就把偏移写好了,可是那批数据lost-datas没有得到处理,如果追log之后的数据来同步,那么那批数据lost-datas就丢了。

在这种情况下,就叫作数据最多处理一次,也就是说数据会丢失。

最少处理一次(At least once)

好吧,丢失数据不能容忍,那么我们换种方式来处理:

  1. 主节点有新数据写入.
  2. 从节点追log,准备复制这批新数据。从节点做两件事:
    (1). 先处理数据;
    (2). 正要把数据的id偏移写入log,从节点挂了。

问题又来了:

如果从节点追log来同步数据,那么因为那批数据duplicated-datas被处理过了,而数据偏移没有反映到log中,如果这样追,会导致这批数据重复。

这种场景,从语义上来讲,就是数据最少处理一次,意味着数据处理会重复。


仅处理一次(Exactly once)

Transaction

好吧,数据重复也不能容忍?要求挺高啊。
大家都追求的强一致性保证(这里是最终一致性),怎么来搞呢?
换句话说,在更新数据的时候,事务能力如何保障呢?
假设一批数据如下:

1
2
3
4
5
6
// 新到数据
{
    transactionId:4
    urlId:99
    reach:5
}

现在要更新这批数据到库里或者log里,那么原来的情况是:

1
2
3
4
5
6
// 老数据
{
    transactionId:3
    urlId:99
    reach:3
}

如果说可以保证如下三点:

  1. 事务ID的生成是强有序的.(隔离性,串行)
  2. 同一个事务ID对应的一批数据相同.(幂等性,多次操作一个结果)
  3. 单条数据会且仅会出现在某批数据中.(一致性,无遗漏无重复)

那么,放心大胆的更新好了:

1
2
3
4
5
6
7
// 更新后数据
{
    transactionId:4
    urlId:99
    //3 + 5 = 8
    reach:8
}

注意到这个更新是ID偏移和数据一起更新的,那么这个操作靠什么来保证:原子性。
你的数据库不提供原子性?后文略有提及。

这里是更新成功了。如果更新的时候,节点挂了,那么库里或者log里的id偏移不写,数据也不处理,等节点恢复,就可以放心去同步,然后加入集群玩耍了。

所以说,要保证数据仅处理一次,还是挺困难的吧?

上面的保障“仅处理一次”这个语义的实现有什么问题呢?

性能问题。

这里已经使用了batch策略来减少到库或磁盘的Round-Trip Time,那么这里的性能问题是什么呢?

考虑一下,采用master-master架构来保证主节点的可用性,但是一个主节点失败了,到另一个主节点主持工作,是需要时间的。
假设从节点正在同步,啪!主节点挂了!因为要保证仅处理一次的语义,所以原子性发挥作用,失败,回滚,然后从主节点拉失败的数据(你不能就近更新,因为这批数据可能已经变化了,或者你根本没缓存本批数据),结果是什么呢?

老主节点挂了, 新的主节点还没启动,所以这次事务就卡在这里,直到数据同步的源——主节点可以响应请求。

如果不考虑性能,就此作罢,这也不是什么大事。

你似乎意犹未尽?来吧,看看“银弹”是什么?

Opaque-Transaction

现在,我们来追求这样一种效果:

某条数据在一批数据中(这批数据对应着一个事务),很可能会失败,但是它会在另一批数据中成功。
换句话说,一批数据的事务ID一定相同。

来看看例子吧,老数据不变,只是多了个字段:prevReach

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 老数据
{
    transactionId:3
    urlId:99
    //注意这里多了个字段,表示之前的reach的值
    prevReach:2
    reach:3
}
// 新到数据
{
    transactionId:4
    urlId:99
    reach:5
}

这种情况,新事务的ID更大、更靠后,表明新事务可以执行,还等什么,直接更新,更新后数据如下:

1
2
3
4
5
6
7
8
9
// 新到数据
{
    transactionId:4
    urlId:99
    //注意这里更新为之前的值
    prevReach:3
    //3 + 5 = 8
    reach:8
}

现在来看下另外的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 老数据
{
    transactionId:3
    urlId:99
    prevReach:2
    reach:3
}
// 新到数据
{
    //注意事务ID为3,和老数据中的事务ID相同
    transactionId:3
    urlId:99
    reach:5
}

这种情况怎么处理?是跳过吗?因为新数据的事务ID和库里或者log里的事务ID相同,按事务要求这次数据应该已经处理过了,跳过?
不,这种事不能靠猜的,想想我们有的几个性质,其中关键一点就是:

给定一批数据,它们所属的事务ID相同。

仔细体会下,上面那句话和下面这句话的差别:
给定一个事务ID,任何时候,其所关联的那批数据相同。

我们应该这么做,考虑到新到数据的事务ID和存储中的事务ID一致,所以这批数据可能被分别或者异步处理了,但是,这批数据对应的事务ID永远是同一个,那么,即使这批数据中的A部分先处理了,由于大家都是一个事务ID,那么A部分的前值是可靠的。

所以,我们将依靠prevReach而不是Reach的值来更新:

1
2
3
4
5
6
7
8
9
// 更新后数据
{
    transactionId:3
    urlId:99
    //这个值不变
    prevReach:2
    //2 + 5 = 7
    reach:7
}

你发现了什么呢?
不同的事务ID,导致了不同的值:

  1. 当事务ID为4,大于存储中的事务ID3,Reach更新为3+5 = 8.
  2. 当事务ID为3,等于存储中的事务ID3,Reach更新为2+5 = 7.

这就是Opaque Transaction.

这种事务能力是最强的了,可以保证事务异步提交。所以不用担心被卡住了,如果说集群中:

Transaction:

  • 数据是分批处理的,每个事务ID对应一批确定、相同的数据.
  • 保证事务ID的产生是强有序的.
  • 保证分批的数据不重复、不遗漏.
  • 如果事务失败,数据源丢失,那么后续事务就卡住直到数据源恢复.

Opaque-Transaction:

  • 数据是分批处理的,每批数据有确定而唯一的事务ID.
  • 保证事务ID的产生是强有序的.
  • 保证分批的数据不重复、不遗漏.
  • 如果事务失败,数据源丢失,不影响后续事务,除非后续事务的数据源也丢了.

其实这个全局ID的设计也是门艺术:

  • 冗余关联表的ID,以减少join,做到O(1)取ID.
  • 冗余日期(long型)字段,以避免order by.
  • 冗余过滤字段,以避免无二级索引(HBase)的尴尬.
  • 存储mod-hash的值,以方便分库、分表后,应用层的数据路由书写.

这个内容也太多,话题也太大,就不在此展开了。

你现在知道twitter的snowflake生成全局唯一且有序的ID的重要性了。


两阶段提交

现在用zookeeper来做两阶段提交已经是入门级技术,所以也不展开了。

如果你的数据库不支持原子操作,那么考虑两阶段提交吧。


结语

To be continued.

from: http://www.importnew.com/23597.html

你真的很熟分布式和事务吗?相关推荐

  1. 面试官:你说对MySQL事务很熟?那我问你10个问题

    大部分人学习和工作中用惯了CRUD,对面试官刨根问底的灵魂拷问你还能对答如流吗?我们有必要了解一些更深层次的数据库基础原理. 整理了面试中,关于MySQL事务和存储引擎10个FAQ(Frequentl ...

  2. 面试官:你说对 MySQL 事务很熟?那我问你 10 个问题

    作者 | LemonCoder 责编 | 胡巍巍 本文系作者投稿 学习关系型数据库MySQL是很好的切入点,大部分人学习和工作中用惯了CRUD,对面试官刨根问底的灵魂拷问你还能对答如流吗?我们有必要了 ...

  3. 值得推荐的C/C++框架和库 (真的很强大)

    值得学习的C语言开源项目 - 1. Webbench Webbench是一个在linux下使用的非常简单的网站压测工具.它使用fork()模拟多个客户端同时访问我们设定的URL,测试网站在压力下工作的 ...

  4. 天将 转:值得推荐的C/C++框架和库(真的很强大)

    转:值得推荐的C/C++框架和库(真的很强大) - 天将 - 博客园 天将 转:值得推荐的C/C++框架和库(真的很强大) 值得学习的C语言开源项目 - 1 Webbench - 2 Tinyhttp ...

  5. 分布式的事务该怎么做?

    分布式八大坑 分布式就是魔鬼啊! 张大胖最近十分感慨,他所在的公司原来有个电商系统,后来随着用户量越来越大,对系统的可用性要求越来越高. CTO要求把系统进行拆分, 从一个单体的应用,拆分成微服务组成 ...

  6. 《看聊天记录都学不会C语言?太菜了吧》(15)你学了一节课的函数我5分钟搞定了,还很熟

    好消息2020年4月13日晚7.30我在CSDN开播,等你来聊天 预约连接:https://live.csdn.net/room/A757291228/MJWK0Gem 本系列文章将会以通俗易懂的对话 ...

  7. CSDN开发者周刊 TDengine:专为物联网订制的大数据平台 YugaByte DB:高性能的分布式ACID事务数据库

    CSDN开发者周刊:   TDengine:专为物联网订制的大数据平台 YugaByte DB:高性能的分布式ACID事务数据库 CSDN开发者周刊:只为传递"有趣/有用"的开发者 ...

  8. 做饭真的很简单!做饭扫盲贴!

    http://bbs.mosh.cn/889225.html   <上一页 12下一页>   第1页第2页 评分 Is meng 威望值 530/800 发表 23 回复 345 积分 1 ...

  9. 一个人窝在摇椅里乘凉 我承认这样真的很安详 和楼下老爷爷一样

    素颜 作词:许嵩 作曲:许嵩 演唱:许嵩&何曼婷 又是一个安静的晚上 一个人窝在摇椅里乘凉 我承认这样真的很安详 和楼下老爷爷一样 听说你还在搞什么原创 搞来搞去好像也就这样 不如花点时间想想 ...

最新文章

  1. OO第三单元作业总结
  2. 北方大学 ACM 多校训练赛 第十五场 欢度6.18
  3. 【java】字符串和基本数据类型之间的转换
  4. Mathematica 13 for Mac(科学计算软件)
  5. 聚焦智能制造 香洲区产学研资对接合作活动 盈致科技成功牵手北理珠
  6. JsonFormat注解转换时间错误问题解决方案
  7. mysql relay log 查看_Mysql-relay log
  8. 远程桌面计算机密码是多少,局域网远程桌面连接密码
  9. 巧用防火墙端口来禁止迅雷、BT等软件
  10. 使用 Marvelous Designer 为DAZ Studio 的 Genesis8 Female做衣服 1
  11. java使用aspose实现Excel转PDF加入密码保护并解密
  12. 面试官问:MySQL锁与事物隔离级别你知道吗?
  13. Docker容器之harbor私有仓库部署与管理
  14. python红楼梦人物统计_基于共现使用Python来分析《红楼梦》中贾宝玉与金陵十二钗的关系...
  15. Linux常用命令及解析
  16. 【OBS】vs2019 + QT5.15.2 : obs-studio-27.2.4 configure和vs工程生成
  17. Cris 玩转大数据系列之消息队列神器 Kafka
  18. wubi-shell-crack
  19. 从此以后不再用“陈桥五笔”
  20. 两个局域网的引起的头脑风暴

热门文章

  1. 【Git】Git教程
  2. 新闻上的文本分类:机器学习大乱斗 王岳王院长 王岳王院长 5 个月前 目标 从头开始实践中文短文本分类,记录一下实验流程与遇到的坑 运用多种机器学习(深度学习 + 传统机器学习)方法比较短文本分类处
  3. 史蒂夫·乔布斯很懂团队建设
  4. 用c语言编程求主析取范式,求主析取范式.cpp · wangzhankun/C-Programming-Learn - Gitee.com...
  5. BM算法的shift1表是在所有情况下移动都是最快的吗?
  6. 实战并发编程 - 07循环等待死锁问题
  7. android:descendantFocusability
  8. html怎么消除打印的进纸,打印机缺纸状态怎么消除?
  9. python 两数之和
  10. android gradle is插件,android gradle 插件创建 configuration