文章目录

  • 线程
    • 使用线程的优点
    • 事件驱动编程
    • 线程中的挑战
    • Example: web crawler
      • Serial crawler
      • ConcurrentMutex crawler
      • ConcurrentChannel crawler
  • RPC
    • 调用流程
    • 异常处理
      • at-least-once
      • at-most-once
      • exactly-once
    • Example:K/V

学习本次课程需要对Go语言有一定的了解,推荐Google官方的Go教程 A Tour of Go

线程

多个线程允许一个程序同时进行多项任务,每个线程内部程序串行运行,并且有自己的程序计数器、寄存器和栈空间。

使用线程的优点

多线程的应用在分布式系统中非常常见,因为它能够支持并发操作,非常契合分布式系统的特点。

  • I/O concurrency
    – 使用线程可以同时处理大量的I/O任务。一种常见的场景是,client构建多个线程来向不同的server发起rpc请求,每个请求线程得到响应后再执行对应的处理任务。
  • Multicore performance
    – 使用多线程可以最大限度的利用多核CPU的性能。多个线程可以由不同的CPU核心进行处理,不同CPU核心的线程拥有独立的CPU周期。
  • Convenience
    – 很多时候多线程可以大大简化我们的编程难度。比如在分布式系统中,我们想要每隔一定的时间进行一次事件检查(如 MapReduce中Master节点检查Worker是否异常),我们就可以创建一个线程,让其专门负责定期检查Worker是否存活。

事件驱动编程

如果要实现并发I/O,除了采取多线程的方式,还可以采用事件驱动编程的思想来实现,如epoll模型等。在事件驱动编程中,有一个线程会负责循环检测所有的事件状态,如客户端发起的rpc请求等,当该线程检测到事件到来时,如服务器响应rpc请求,该线程就会调用相应的处理函数,并继续进行循环监听。事件驱动编程相比多线程的实现方式有以下不同:

  • 优点
    – 开销更小(多线程的创建和删除以及空间占用远大于事件驱动)
  • 缺点
    – 无法充分利用多核CPU的性能
    – 实现较为复杂

线程中的挑战

在进行多线程编程时,通常需要仔细考虑以下几个重要问题。

  • shared data
    – 线程间是可以共享进程数据的,但是在使用共享数据的过程中,很可能会出现冲突问题。如两个线程同时执行n=n+1,由于读写的先后顺序不一致,程序产生的结果也会不一样。
  • coordination
    – 我们经常需要线程间能够相互协作,比如经典的消费者-生产者模型。在Go语言中,线程间的相互协作通常有以下几种实现方式,channelsync.Condsync.WaitGroup
  • deadlock

Example: web crawler

下面用一个简单的网页爬虫来展示Go中多线程的应用,对于一个网页爬虫,我们需要其从给定的url出发不断递归查询,并且每个url只能爬取一次。我们首先先给出基本的数据结构。

//
// main
//func main() {fmt.Printf("=== Serial===\n")Serial("http://golang.org/", fetcher, make(map[string]bool))fmt.Printf("=== ConcurrentMutex ===\n")ConcurrentMutex("http://golang.org/", fetcher, makeState())fmt.Printf("=== ConcurrentChannel ===\n")ConcurrentChannel("http://golang.org/", fetcher)
}//
// Fetcher
//type Fetcher interface {// Fetch returns a slice of URLs found on the page.Fetch(url string) (urls []string, err error)
}// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResulttype fakeResult struct {body stringurls []string
}func (f fakeFetcher) Fetch(url string) ([]string, error) {if res, ok := f[url]; ok {fmt.Printf("found:   %s\n", url)return res.urls, nil}fmt.Printf("missing: %s\n", url)return nil, fmt.Errorf("not found: %s", url)
}// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{"http://golang.org/": &fakeResult{"The Go Programming Language",[]string{"http://golang.org/pkg/","http://golang.org/cmd/",},},"http://golang.org/pkg/": &fakeResult{"Packages",[]string{"http://golang.org/","http://golang.org/cmd/","http://golang.org/pkg/fmt/","http://golang.org/pkg/os/",},},"http://golang.org/pkg/fmt/": &fakeResult{"Package fmt",[]string{"http://golang.org/","http://golang.org/pkg/",},},"http://golang.org/pkg/os/": &fakeResult{"Package os",[]string{"http://golang.org/","http://golang.org/pkg/",},},
}

fakeFetcher是一个网页爬虫器,其实现了Fetcher接口,其会根据给出的url找到对应的结果。爬虫的结构被存储到fakeResult中。

Serial crawler

//
// Serial crawler
//func Serial(url string, fetcher Fetcher, fetched map[string]bool) {if fetched[url] {return}fetched[url] = trueurls, err := fetcher.Fetch(url)if err != nil {return}for _, u := range urls {Serial(u, fetcher, fetched)}

在串行爬虫中,我们通过递归调用Serial函数来实现需求,但是这种方式一次只能爬一个网页,效率很低。

ConcurrentMutex crawler

我们使用shared data + WaitGroup的方式来实现并发爬虫。

//
// Concurrent crawler with shared state and Mutex
//type fetchState struct {mu      sync.Mutexfetched map[string]bool
}func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {f.mu.Lock()already := f.fetched[url]f.fetched[url] = truef.mu.Unlock()if already {return}urls, err := fetcher.Fetch(url)if err != nil {return}var done sync.WaitGroupfor _, u := range urls {done.Add(1)go func(u string) {defer done.Done()ConcurrentMutex(u, fetcher, f)}(u)}done.Wait()return
}func makeState() *fetchState {f := &fetchState{}f.fetched = make(map[string]bool)return f
}

相比串行版本,该版本进行了以下改变:

  • 由于我们需要每个goroutine在执行fetch任务时保证url的唯一性,因此我们需要使用一个map来作为线程间的共享变量。
  • ConcurrentMutex中,我们使用sync.Mutex来保证map结构的读写正确
  • 使用sync.WaitGroup来同步等待创建的gorountines执行完毕后再退出函数。
  • 在go func入口使用defer done.Done(),来确保即使goroutinue执行异常,也能正确的更新WaitGroup计数器

ConcurrentChannel crawler

func worker(url string, ch chan []string, fetcher Fetcher) {urls, err := fetcher.Fetch(url)if err != nil {ch <- []string{}} else {ch <- urls}
}func coordinator(ch chan []string, fetcher Fetcher) {n := 1fetched := make(map[string]bool)for urls := range ch {for _, u := range urls {if fetched[u] == false {fetched[u] = truen += 1go worker(u, ch, fetcher)}}n -= 1if n == 0 {break}}
}func ConcurrentChannel(url string, fetcher Fetcher) {ch := make(chan []string)go func() {ch <- []string{url}}()coordinator(ch, fetcher)
}

区别于使用MutexWaitGroup,我们也可以用channel来实现功能:

  • coordinator函数负责分配任务,worker负责执行任务
  • coordinatorchannel中循环读取数据,并使用变量n来记录分配的任务数量。
  • worker将查询到的结果放入到channel中,并等待coordinator接收。

RPC

调用流程

RPC用于clientserver进行远程通信的一种调用框架,其基本组成如下图所示。

RPC框架调用流程如下:

  • client调用server上的函数f(x,y)
  • client stub将调用的函数及相关参数进行打包,通过网络发送给server
  • server stub接收到数据包后进行参数和函数解析,调用server中的方法f(x,y)
  • server将函数调用解决通过server stub返回,返回过程与发送过程相同

异常处理

在进行RPC通信的时候,可能出现的异常情况是client在发送了rpc request之后,没有收到server的响应。对于这种异常错误,一般有以下几种处理机制。

at-least-once

client会一直等待server的回复,并不断的重复的发送请求,直到达到发送上限或受到服务器的应答。可以发现,这种处理机制对读操作是可以正常运行的,但是对于写操作,需要server有处理重复写操作的能力。比如有一个K/V数据库,我们要求使用Put(10)方法往银行账户上增长10块钱,如果server端没有处理重复写操作的能力,就会造成数据错误。

at-most-once

当发生异常时,server会检测重复的rpc request并且会返回之前运行的请求,而不是重新执行该请求。每个client在发送请求时,都会携带一个XID的唯一标识符,XID通常由随机数,IP和sequence number组合而成。

  server:if seen[xid]:r = old[xid]elser = handler()old[xid] = rseen[xid] = true

如何确保rpc可以安全的丢弃重复的rpc request,具体的做法可以参考TCP的实现思路。

Go中的rpc框架采用的简化版的at-most-once做法:

  • 使用的是TCP连接
  • client不会发送重复的request
  • client会返回错误,如果其没有受到response

exactly-once

舒服分布式系统中的难题,比较难实现,目前通用的解决方案是重传+冗余检测+异常处理。

Example:K/V

下面我们用一个简单的K/V数据库来学习如何用Go来实现RPC通信。该例子中的数据库包含两个功能,putgetput操作支持clientserver中插入一个任意的键值对数据,get操作支持client查询server中的数据。

package mainimport ("fmt""log""net""net/rpc""sync"
)//
// Common RPC request/reply definitions
//const (OK       = "OK"ErrNoKey = "ErrNoKey"
)type Err stringtype PutArgs struct {Key   stringValue string
}type PutReply struct {Err Err
}type GetArgs struct {Key string
}type GetReply struct {Err   ErrValue string
}//
// Client
//func connect() *rpc.Client {client, err := rpc.Dial("tcp", ":1234")if err != nil {log.Fatal("dialing:", err)}return client
}func get(key string) string {client := connect()args := GetArgs{"subject"}reply := GetReply{}err := client.Call("KV.Get", &args, &reply)if err != nil {log.Fatal("error:", err)}client.Close()return reply.Value
}func put(key string, val string) {client := connect()args := PutArgs{"subject", "6.824"}reply := PutReply{}err := client.Call("KV.Put", &args, &reply)if err != nil {log.Fatal("error:", err)}client.Close()
}//
// Server
//type KV struct {mu   sync.Mutexdata map[string]string
}func server() {kv := new(KV)kv.data = map[string]string{}rpcs := rpc.NewServer()rpcs.Register(kv)l, e := net.Listen("tcp", ":1234")if e != nil {log.Fatal("listen error:", e)}go func() {for {conn, err := l.Accept()if err == nil {go rpcs.ServeConn(conn)} else {break}}l.Close()}()
}func (kv *KV) Get(args *GetArgs, reply *GetReply) error {kv.mu.Lock()defer kv.mu.Unlock()val, ok := kv.data[args.Key]if ok {reply.Err = OKreply.Value = val} else {reply.Err = ErrNoKeyreply.Value = ""}return nil
}func (kv *KV) Put(args *PutArgs, reply *PutReply) error {kv.mu.Lock()defer kv.mu.Unlock()kv.data[args.Key] = args.Valuereply.Err = OKreturn nil
}//
// main
//func main() {server()put("subject", "6.824")fmt.Printf("Put(subject, 6.824) done\n")fmt.Printf("get(subject) -> %s\n", get("subject"))
}

下面对代码进行分析和解释。

通用数据结构

  • 定义getput的请求格式和应答格式
  • 定义K/V数据的基本格式,包括键值对和互斥锁

client流程

  • connect()函数用于与server建立tcp连接。
  • get()put()函数相当于client stubs,用于打包请求数据
  • Call()函数通知RPC准备发起请求。在本例中我们在Call之前,已经定义好了请求和应答的格式,RPC库会打包参数,发送请求,然后等待回复,收到回复后再根据回复格式解析参数。

server流程

  • 创建一个基于tcp连接的server,并将K/V数据库注册到RPC库中
  • RPC框架在收到请求后,会为新的请求启动一个goroutine。新线程会解析请求参数,并在已注册的服务中找到匹配的服务,调用对应的函数,打包执行结果写入TCP连接。
  • Get()Put()函数必须加锁,因为RPC会为每个请求都单独创建一个goroutine

MIT 6.824 Lec2.RPC and Threads相关推荐

  1. mit 6.824 Distributed System

    文章目录 LEC1 Introduction LEC2 RPC and Threads LEC3 GFS LEC4 Primary-Backup Replication LEC5 Go, Thread ...

  2. MIT 6.824 学习笔记(一)--- RPC 详解

    从本文开始,将记录作者学习 MIT 6.824 分布式系统的学习笔记,如果有志同道合者,欢迎一起交流. RPC 的定义和结构 RPC 全称为 Remote Procedure Call,他表示一种远程 ...

  3. MIT 6.824 l01 Introduction

    6.824 2020 Lecture 1: Introduction 6.824: Distributed Systems Engineering What is a distributed syst ...

  4. mit 6.824 lab1A解析

    1.前言 想不想自己实现一下分布式的主从选举算法?reids,tidb,kafka主从协调都用到了raft一致性算法.mit6.824分布式系统的第二次作业lab2A就是用golang实现一个简易版的 ...

  5. MIT 6.824 Lab2A (raft) -- Leader Election

    文章目录 实验要求 Leader Election流程 及详细实现介绍 基本角色 关键超时变量 关键的两个RPC实现 RequestVote RPC AppendEntries RPC Go并发编程实 ...

  6. MIT 6.824学习笔记

    introduction 为什么需要分布式计算? 并行,容错,物理因素(地理位置),安全 分布式计算面临的挑战? 并发性,部分错误,性能问题 基础设施 存储,通信,计算----目标是抽象成从外部看上去 ...

  7. MIT 6.824 Raft论文精读

    文章目录 Introduction Raft Consensus Algorithm Raft Basics Leader Election Log Replication Safety Electi ...

  8. MIT 6.824涉及的部分论文翻译

    引言 这篇文章用于记录在学习6.824过程中所涉及到的论文的翻译,以帮助像我一样的英语蒻蒻愉快的享受6.824.因为很多论文并不是很常见,导致很多连论文阅读笔记都没有,所以希望看到这篇文章的朋友找到或 ...

  9. 「论文自译」MIT 6.824 In Search of an Understandable Consensus Algorithm (Extended Version)

    文章目录 Abstract 1 - Introduction 2 - Replicated state machines 3 - What's wrong with Paxos? 4 - Design ...

最新文章

  1. 关于学习过程中走过的弯路
  2. MFC List Control 控件添加单元格编辑,实现可编辑重写
  3. JAVA基础-面向对象07
  4. Bluetooth Low Energy 嗅探
  5. C#StreamWriter的操作解析
  6. 【Qt】New Features in Qt 5.15
  7. 了解使用Android ConstraintLayout
  8. python学习笔记(10)--组合数据类型(序列类型)
  9. c语言编程安全队列,C语言编程队列的实现
  10. mysql+一致性非锁定读_MySQL探秘(六):InnoDB一致性非锁定读
  11. Oracle行迁移和行链接
  12. 【数据库系统设计】关系数据库标准语言SQL(3)
  13. Django开发密码管理表实例【附源码】
  14. java动态代理学习笔记
  15. java handlerbase_Java Firebase.AuthResultHandler方法代码示例
  16. linux find内容替换,利用find和sed批量替换文件内容
  17. 创建一个简单的MFC程序
  18. java项目经验总结
  19. 最详细农行招聘面试经历
  20. 泰拉瑞亚Terraria+蒲公英联机平台联机教程

热门文章

  1. html如何判断文本框是否为空,javascript怎么判断文本框是否为空?
  2. 成为女神同桌,我只用一行代码
  3. Linux手动安装Anaconda3与whl安装包
  4. 南京工业大学python试卷_南京工业大学《生物信息学》期末复习试卷.doc
  5. residural gated graph convnets 翻译
  6. 世界上最强大的SoC——NVIDIA Xavier学习笔记
  7. java二维数组坐标_Java 二维数组
  8. 【高效日程管理】基于iOS快捷指令自动化,这是一个记录长期目标、统计专注时长、记录健康数据(需要智能手环、手表)、并生成每日报告与自我总结的高效日程管理方式
  9. 中国app开发公司前十名的共同特点
  10. 【算法】灰狼算法GWO优化支持向量机回归SVR模型