MIT 6.824 Lec2.RPC and Threads
文章目录
- 线程
- 使用线程的优点
- 事件驱动编程
- 线程中的挑战
- Example: web crawler
- Serial crawler
- ConcurrentMutex crawler
- ConcurrentChannel crawler
- RPC
- 调用流程
- 异常处理
- at-least-once
- at-most-once
- exactly-once
- Example:K/V
学习本次课程需要对Go语言有一定的了解,推荐Google官方的Go教程 A Tour of Go
线程
多个线程允许一个程序同时进行多项任务,每个线程内部程序串行运行,并且有自己的程序计数器、寄存器和栈空间。
使用线程的优点
多线程的应用在分布式系统中非常常见,因为它能够支持并发操作,非常契合分布式系统的特点。
- I/O concurrency
– 使用线程可以同时处理大量的I/O任务。一种常见的场景是,client
构建多个线程来向不同的server
发起rpc
请求,每个请求线程得到响应后再执行对应的处理任务。 - Multicore performance
– 使用多线程可以最大限度的利用多核CPU的性能。多个线程可以由不同的CPU核心进行处理,不同CPU核心的线程拥有独立的CPU周期。 - Convenience
– 很多时候多线程可以大大简化我们的编程难度。比如在分布式系统中,我们想要每隔一定的时间进行一次事件检查(如 MapReduce中Master节点检查Worker是否异常),我们就可以创建一个线程,让其专门负责定期检查Worker是否存活。
事件驱动编程
如果要实现并发I/O,除了采取多线程的方式,还可以采用事件驱动编程的思想来实现,如epoll
模型等。在事件驱动编程中,有一个线程会负责循环检测所有的事件状态,如客户端发起的rpc
请求等,当该线程检测到事件到来时,如服务器响应rpc
请求,该线程就会调用相应的处理函数,并继续进行循环监听。事件驱动编程相比多线程的实现方式有以下不同:
- 优点
– 开销更小(多线程的创建和删除以及空间占用远大于事件驱动) - 缺点
– 无法充分利用多核CPU的性能
– 实现较为复杂
线程中的挑战
在进行多线程编程时,通常需要仔细考虑以下几个重要问题。
- shared data
– 线程间是可以共享进程数据的,但是在使用共享数据的过程中,很可能会出现冲突问题。如两个线程同时执行n=n+1
,由于读写的先后顺序不一致,程序产生的结果也会不一样。 - coordination
– 我们经常需要线程间能够相互协作,比如经典的消费者-生产者模型。在Go
语言中,线程间的相互协作通常有以下几种实现方式,channel
,sync.Cond
和sync.WaitGroup
。 - deadlock
Example: web crawler
下面用一个简单的网页爬虫来展示Go
中多线程的应用,对于一个网页爬虫,我们需要其从给定的url
出发不断递归查询,并且每个url
只能爬取一次。我们首先先给出基本的数据结构。
//
// main
//func main() {fmt.Printf("=== Serial===\n")Serial("http://golang.org/", fetcher, make(map[string]bool))fmt.Printf("=== ConcurrentMutex ===\n")ConcurrentMutex("http://golang.org/", fetcher, makeState())fmt.Printf("=== ConcurrentChannel ===\n")ConcurrentChannel("http://golang.org/", fetcher)
}//
// Fetcher
//type Fetcher interface {// Fetch returns a slice of URLs found on the page.Fetch(url string) (urls []string, err error)
}// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResulttype fakeResult struct {body stringurls []string
}func (f fakeFetcher) Fetch(url string) ([]string, error) {if res, ok := f[url]; ok {fmt.Printf("found: %s\n", url)return res.urls, nil}fmt.Printf("missing: %s\n", url)return nil, fmt.Errorf("not found: %s", url)
}// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{"http://golang.org/": &fakeResult{"The Go Programming Language",[]string{"http://golang.org/pkg/","http://golang.org/cmd/",},},"http://golang.org/pkg/": &fakeResult{"Packages",[]string{"http://golang.org/","http://golang.org/cmd/","http://golang.org/pkg/fmt/","http://golang.org/pkg/os/",},},"http://golang.org/pkg/fmt/": &fakeResult{"Package fmt",[]string{"http://golang.org/","http://golang.org/pkg/",},},"http://golang.org/pkg/os/": &fakeResult{"Package os",[]string{"http://golang.org/","http://golang.org/pkg/",},},
}
fakeFetcher是一个网页爬虫器,其实现了Fetcher接口,其会根据给出的url
找到对应的结果。爬虫的结构被存储到fakeResult
中。
Serial crawler
//
// Serial crawler
//func Serial(url string, fetcher Fetcher, fetched map[string]bool) {if fetched[url] {return}fetched[url] = trueurls, err := fetcher.Fetch(url)if err != nil {return}for _, u := range urls {Serial(u, fetcher, fetched)}
在串行爬虫中,我们通过递归调用Serial函数来实现需求,但是这种方式一次只能爬一个网页,效率很低。
ConcurrentMutex crawler
我们使用shared data + WaitGroup的方式来实现并发爬虫。
//
// Concurrent crawler with shared state and Mutex
//type fetchState struct {mu sync.Mutexfetched map[string]bool
}func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {f.mu.Lock()already := f.fetched[url]f.fetched[url] = truef.mu.Unlock()if already {return}urls, err := fetcher.Fetch(url)if err != nil {return}var done sync.WaitGroupfor _, u := range urls {done.Add(1)go func(u string) {defer done.Done()ConcurrentMutex(u, fetcher, f)}(u)}done.Wait()return
}func makeState() *fetchState {f := &fetchState{}f.fetched = make(map[string]bool)return f
}
相比串行版本,该版本进行了以下改变:
- 由于我们需要每个
goroutine
在执行fetch任务时保证url
的唯一性,因此我们需要使用一个map
来作为线程间的共享变量。 - 在
ConcurrentMutex
中,我们使用sync.Mutex
来保证map
结构的读写正确 - 使用
sync.WaitGroup
来同步等待创建的gorountines
执行完毕后再退出函数。 - 在go func入口使用
defer done.Done()
,来确保即使goroutinue
执行异常,也能正确的更新WaitGroup
计数器
ConcurrentChannel crawler
func worker(url string, ch chan []string, fetcher Fetcher) {urls, err := fetcher.Fetch(url)if err != nil {ch <- []string{}} else {ch <- urls}
}func coordinator(ch chan []string, fetcher Fetcher) {n := 1fetched := make(map[string]bool)for urls := range ch {for _, u := range urls {if fetched[u] == false {fetched[u] = truen += 1go worker(u, ch, fetcher)}}n -= 1if n == 0 {break}}
}func ConcurrentChannel(url string, fetcher Fetcher) {ch := make(chan []string)go func() {ch <- []string{url}}()coordinator(ch, fetcher)
}
区别于使用Mutex
和WaitGroup
,我们也可以用channel
来实现功能:
coordinator
函数负责分配任务,worker
负责执行任务coordinator
从channel
中循环读取数据,并使用变量n
来记录分配的任务数量。worker
将查询到的结果放入到channel
中,并等待coordinator
接收。
RPC
调用流程
RPC
用于client
与server
进行远程通信的一种调用框架,其基本组成如下图所示。
RPC
框架调用流程如下:
client
调用server
上的函数f(x,y)
client stub
将调用的函数及相关参数进行打包,通过网络发送给server
server stub
接收到数据包后进行参数和函数解析,调用server
中的方法f(x,y)
server
将函数调用解决通过server stub
返回,返回过程与发送过程相同
异常处理
在进行RPC
通信的时候,可能出现的异常情况是client
在发送了rpc request
之后,没有收到server
的响应。对于这种异常错误,一般有以下几种处理机制。
at-least-once
client
会一直等待server
的回复,并不断的重复的发送请求,直到达到发送上限或受到服务器的应答。可以发现,这种处理机制对读操作是可以正常运行的,但是对于写操作,需要server
有处理重复写操作的能力。比如有一个K/V数据库,我们要求使用Put(10)
方法往银行账户上增长10块钱,如果server
端没有处理重复写操作的能力,就会造成数据错误。
at-most-once
当发生异常时,server
会检测重复的rpc request
并且会返回之前运行的请求,而不是重新执行该请求。每个client
在发送请求时,都会携带一个XID
的唯一标识符,XID
通常由随机数,IP和sequence number组合而成。
server:if seen[xid]:r = old[xid]elser = handler()old[xid] = rseen[xid] = true
如何确保rpc
可以安全的丢弃重复的rpc request
,具体的做法可以参考TCP
的实现思路。
Go
中的rpc
框架采用的简化版的at-most-once
做法:
- 使用的是
TCP
连接 client
不会发送重复的request
client
会返回错误,如果其没有受到response
exactly-once
舒服分布式系统中的难题,比较难实现,目前通用的解决方案是重传+冗余检测+异常处理。
Example:K/V
下面我们用一个简单的K/V数据库来学习如何用Go
来实现RPC
通信。该例子中的数据库包含两个功能,put
和get
,put
操作支持client
向server
中插入一个任意的键值对数据,get
操作支持client
查询server
中的数据。
package mainimport ("fmt""log""net""net/rpc""sync"
)//
// Common RPC request/reply definitions
//const (OK = "OK"ErrNoKey = "ErrNoKey"
)type Err stringtype PutArgs struct {Key stringValue string
}type PutReply struct {Err Err
}type GetArgs struct {Key string
}type GetReply struct {Err ErrValue string
}//
// Client
//func connect() *rpc.Client {client, err := rpc.Dial("tcp", ":1234")if err != nil {log.Fatal("dialing:", err)}return client
}func get(key string) string {client := connect()args := GetArgs{"subject"}reply := GetReply{}err := client.Call("KV.Get", &args, &reply)if err != nil {log.Fatal("error:", err)}client.Close()return reply.Value
}func put(key string, val string) {client := connect()args := PutArgs{"subject", "6.824"}reply := PutReply{}err := client.Call("KV.Put", &args, &reply)if err != nil {log.Fatal("error:", err)}client.Close()
}//
// Server
//type KV struct {mu sync.Mutexdata map[string]string
}func server() {kv := new(KV)kv.data = map[string]string{}rpcs := rpc.NewServer()rpcs.Register(kv)l, e := net.Listen("tcp", ":1234")if e != nil {log.Fatal("listen error:", e)}go func() {for {conn, err := l.Accept()if err == nil {go rpcs.ServeConn(conn)} else {break}}l.Close()}()
}func (kv *KV) Get(args *GetArgs, reply *GetReply) error {kv.mu.Lock()defer kv.mu.Unlock()val, ok := kv.data[args.Key]if ok {reply.Err = OKreply.Value = val} else {reply.Err = ErrNoKeyreply.Value = ""}return nil
}func (kv *KV) Put(args *PutArgs, reply *PutReply) error {kv.mu.Lock()defer kv.mu.Unlock()kv.data[args.Key] = args.Valuereply.Err = OKreturn nil
}//
// main
//func main() {server()put("subject", "6.824")fmt.Printf("Put(subject, 6.824) done\n")fmt.Printf("get(subject) -> %s\n", get("subject"))
}
下面对代码进行分析和解释。
通用数据结构
- 定义
get
和put
的请求格式和应答格式 - 定义K/V数据的基本格式,包括键值对和互斥锁
client
流程
connect()
函数用于与server
建立tcp
连接。get()
和put()
函数相当于client stubs
,用于打包请求数据Call()
函数通知RPC
准备发起请求。在本例中我们在Call
之前,已经定义好了请求和应答的格式,RPC
库会打包参数,发送请求,然后等待回复,收到回复后再根据回复格式解析参数。
server
流程
- 创建一个基于
tcp
连接的server
,并将K/V
数据库注册到RPC
库中 RPC
框架在收到请求后,会为新的请求启动一个goroutine
。新线程会解析请求参数,并在已注册的服务中找到匹配的服务,调用对应的函数,打包执行结果写入TCP
连接。Get()
和Put()
函数必须加锁,因为RPC
会为每个请求都单独创建一个goroutine
。
MIT 6.824 Lec2.RPC and Threads相关推荐
- mit 6.824 Distributed System
文章目录 LEC1 Introduction LEC2 RPC and Threads LEC3 GFS LEC4 Primary-Backup Replication LEC5 Go, Thread ...
- MIT 6.824 学习笔记(一)--- RPC 详解
从本文开始,将记录作者学习 MIT 6.824 分布式系统的学习笔记,如果有志同道合者,欢迎一起交流. RPC 的定义和结构 RPC 全称为 Remote Procedure Call,他表示一种远程 ...
- MIT 6.824 l01 Introduction
6.824 2020 Lecture 1: Introduction 6.824: Distributed Systems Engineering What is a distributed syst ...
- mit 6.824 lab1A解析
1.前言 想不想自己实现一下分布式的主从选举算法?reids,tidb,kafka主从协调都用到了raft一致性算法.mit6.824分布式系统的第二次作业lab2A就是用golang实现一个简易版的 ...
- MIT 6.824 Lab2A (raft) -- Leader Election
文章目录 实验要求 Leader Election流程 及详细实现介绍 基本角色 关键超时变量 关键的两个RPC实现 RequestVote RPC AppendEntries RPC Go并发编程实 ...
- MIT 6.824学习笔记
introduction 为什么需要分布式计算? 并行,容错,物理因素(地理位置),安全 分布式计算面临的挑战? 并发性,部分错误,性能问题 基础设施 存储,通信,计算----目标是抽象成从外部看上去 ...
- MIT 6.824 Raft论文精读
文章目录 Introduction Raft Consensus Algorithm Raft Basics Leader Election Log Replication Safety Electi ...
- MIT 6.824涉及的部分论文翻译
引言 这篇文章用于记录在学习6.824过程中所涉及到的论文的翻译,以帮助像我一样的英语蒻蒻愉快的享受6.824.因为很多论文并不是很常见,导致很多连论文阅读笔记都没有,所以希望看到这篇文章的朋友找到或 ...
- 「论文自译」MIT 6.824 In Search of an Understandable Consensus Algorithm (Extended Version)
文章目录 Abstract 1 - Introduction 2 - Replicated state machines 3 - What's wrong with Paxos? 4 - Design ...
最新文章
- 关于学习过程中走过的弯路
- MFC List Control 控件添加单元格编辑,实现可编辑重写
- JAVA基础-面向对象07
- Bluetooth Low Energy 嗅探
- C#StreamWriter的操作解析
- 【Qt】New Features in Qt 5.15
- 了解使用Android ConstraintLayout
- python学习笔记(10)--组合数据类型(序列类型)
- c语言编程安全队列,C语言编程队列的实现
- mysql+一致性非锁定读_MySQL探秘(六):InnoDB一致性非锁定读
- Oracle行迁移和行链接
- 【数据库系统设计】关系数据库标准语言SQL(3)
- Django开发密码管理表实例【附源码】
- java动态代理学习笔记
- java handlerbase_Java Firebase.AuthResultHandler方法代码示例
- linux find内容替换,利用find和sed批量替换文件内容
- 创建一个简单的MFC程序
- java项目经验总结
- 最详细农行招聘面试经历
- 泰拉瑞亚Terraria+蒲公英联机平台联机教程
热门文章
- html如何判断文本框是否为空,javascript怎么判断文本框是否为空?
- 成为女神同桌,我只用一行代码
- Linux手动安装Anaconda3与whl安装包
- 南京工业大学python试卷_南京工业大学《生物信息学》期末复习试卷.doc
- residural gated graph convnets 翻译
- 世界上最强大的SoC——NVIDIA Xavier学习笔记
- java二维数组坐标_Java 二维数组
- 【高效日程管理】基于iOS快捷指令自动化,这是一个记录长期目标、统计专注时长、记录健康数据(需要智能手环、手表)、并生成每日报告与自我总结的高效日程管理方式
- 中国app开发公司前十名的共同特点
- 【算法】灰狼算法GWO优化支持向量机回归SVR模型