一、ReactiveX & RxGo介绍


ReactiveX 是 Reactive Extensions 的缩写,一般简写为 Rx。它最初是 LINQ 的一个扩展,由微软的架构师 Erik Meijer 领导的团队开发,在 2012 年 11 月开源。Rx 是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便地处理异步数据流。Rx 库支持 .NET、JavaScript 和 C++;Rx 近几年越来越流行,现在已经支持几乎全部的流行编程语言。Rx 的大部分语言库由 ReactiveX 这个组织负责维护,比较流行的有 RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。中文文档


Go 语言的 RxGo 看上去就像 Go 入门不久的人写的,很怪异,但 RxJava 库写得很好。

pmlpml/RxGo 模仿 Java 版写了 Go 版本新实现,已基本实现了 Creating Observables 和 Transforming Observables 两类算子。



硬件信息:使用 VirtualBox 配置虚拟机(内存 3G、磁盘 30G)
编程语言:Go 1.15.2


阅读 ReactiveX 文档。请在 pmlpml/RxGo 基础上,

  1. 修改、改进它的实现
  2. 或添加一组新的操作,如 filtering


rxgo.go 给出了基础类型、抽象定义、框架实现、Debug工具等

generators.go 给出了 sourceOperater 的通用实现和具体函数实现

transforms.go 给出了 transOperater 的通用实现和具体函数实现



Filtering Observables: Operators that selectively emit items from a source Observable.

  • Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
  • Distinct — suppress duplicate items emitted by an Observable
  • ElementAt — emit only item n emitted by an Observable
  • Filter — emit only those items from an Observable that pass a predicate test
  • First — emit only the first item, or the first item that meets a condition, from an Observable
  • IgnoreElements — do not emit any items from an Observable but mirror its termination notification
  • Last — emit only the last item emitted by an Observable
  • Sample — emit the most recent item emitted by an Observable within periodic time intervals
  • Skip — suppress the first n items emitted by an Observable
  • SkipLast — suppress the last n items emitted by an Observable
  • Take — emit only the first n items emitted by an Observable
  • TakeLast — emit only the last n items emitted by an Observable



// filteringOperator is the filtering node implementation of streamOperator.
// It wraps a per-item callback that the op method invokes for every value
// read from the upstream flow.
type filteringOperator struct {
	// opFunc handles one upstream item and may forward values to out.
	// Returning end == true tells op to stop processing further items.
	opFunc func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool)
} // op: the op function
// op starts the goroutine that drives this filtering node: it reads every
// item from the predecessor's outflow, hands it to sop.opFunc, and finally
// flushes any values buffered in o.flip before closing the node's outflow.
//
// ctx is passed through to opFunc and to every sendToFlow call.
// o is the Observable this operator belongs to; o.pred must already be
// connected.
//
// The loop stops as soon as opFunc reports end. It does not wait for one
// more upstream item first, so a finished operator (for example one that has
// already produced all the items it wants) completes without needing further
// input.
//
// NOTE(review): opFunc implementations that emit from their own goroutines
// may still be running when closeFlow is called here; confirm that
// sendToFlow/closeFlow tolerate a late send.
func (sop filteringOperator) op(ctx context.Context, o *Observable) {
	// Capture the flow channels up front: they are allocated when the chain
	// is connected and may be replaced while this routine is running.
	in := o.pred.outflow
	out := o.outflow

	go func() {
		for x := range in {
			// Pass an upstream error straight through when the operator
			// does not accept errors as items.
			if err, ok := x.(error); ok && !o.flip_accept_error {
				o.sendToFlow(ctx, err, out)
				continue
			}
			// Hand the operator a reflect.Value of the item rather than the
			// loop variable itself, which changes on every iteration.
			if sop.opFunc(ctx, o, reflect.ValueOf(x), out) {
				break
			}
		}

		// Operators such as Last/TakeLast park their result in o.flip;
		// emit whatever they left there once the input is finished.
		if o.flip != nil {
			buffer := reflect.ValueOf(o.flip)
			for i := 0; i < buffer.Len(); i++ {
				o.sendToFlow(ctx, buffer.Index(i).Interface(), out)
			}
		}
		o.closeFlow(out)
	}()
} // newFilterObservable creates a new filter Observable
func (parent *Observable) newFilterObservable(name string) (o *Observable) {//new Observableo = newObservable()o.Name = name//chain Observablesparent.next = oo.pred = parento.root = parent.root//set optionso.buf_len = BufferLenreturn o

3. Debounce函数


var count = 0
var tempTime time.Duration//Debounce 按时间防抖动
func (parent *Observable) Debounce(timespan time.Duration) (o *Observable) {tempTime = timespano = parent.newFilterObservable("debounce")o.flip_accept_error = trueo.flip_sup_ctx = truecount = 0o.operator = debounceOpreturn o
}var debounceOp = filteringOperator{opFunc: func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool) {count++go func() {tempCount := counttime.Sleep(tempTime)select {case <-ctx.Done():returndefault:if tempCount == count {o.sendToFlow(ctx, item.Interface(), out)}}}()return false},

4. Distinct函数


var tempMap map[string]bool//Distinct 过滤掉重复出现的元素
func (parent *Observable) Distinct() (o *Observable) {o = parent.newFilterObservable("distinct")o.flip_accept_error = trueo.flip_sup_ctx = truetempMap = map[string]bool{}o.operator = distinctOpreturn o
}var distinctOp = filteringOperator{opFunc: func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool) {itemStr := fmt.Sprintf("%v", item)_, ok := tempMap[itemStr]if !ok {tempMap[itemStr] = trueo.sendToFlow(ctx, item.Interface(), out)}return false},

5. ElementAt函数


var tempNum int//ElementAt 取第几个元素
func (parent *Observable) ElementAt(num int) (o *Observable) {tempNum = numo = parent.newFilterObservable("elementAt")o.flip_accept_error = trueo.flip_sup_ctx = truecount = 0o.operator = elementAtOpreturn o
}var elementAtOp = filteringOperator{opFunc: func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool) {if count == tempNum {o.sendToFlow(ctx, item.Interface(), out)return true}count++return false},

6. Filter函数


// Filter `func(x anytype) bool` filters items in the original Observable and returns
// a new Observable with the filtered items.
func (parent *Observable) Filter(f interface{}) (o *Observable) {// check validation of ffv := reflect.ValueOf(f)inType := []reflect.Type{typeAny}outType := []reflect.Type{typeBool}b, ctx_sup := checkFuncUpcast(fv, inType, outType, true)if !b {panic(ErrFuncFlip)}o = parent.newTransformObservable("filter")o.flip_accept_error = checkFuncAcceptError(fv)o.flip_sup_ctx = ctx_supo.flip = fv.Interface()o.operator = filterOperaterreturn o
}var filterOperater = transOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {fv := reflect.ValueOf(o.flip)var params = []reflect.Value{x}rs, skip, stop, e := userFuncCall(fv, params)var item interface{} = rs[0].Interface()if stop {end = truereturn}if skip {return}if e != nil {item = e}// send dataif !end {if b, ok := item.(bool); ok && b {end = o.sendToFlow(ctx, x.Interface(), out)}}return

7. First函数


//First 完成时返回第一个元素
func (parent *Observable) First() (o *Observable) {o = parent.newFilterObservable("first")o.flip_accept_error = trueo.flip_sup_ctx = trueo.operator = firstOpreturn o
}var firstOp = filteringOperator{opFunc: func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool) {o.sendToFlow(ctx, item.Interface(), out)return true},

8. IgnoreElements函数


9. Last函数


//Last 完成时返回最后一个元素
func (parent *Observable) Last() (o *Observable) {o = parent.newFilterObservable("last")o.flip_accept_error = trueo.flip_sup_ctx = trueo.operator = lastOpreturn o
}var lastOp = filteringOperator{opFunc: func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool) {o.flip = append([]interface{}{}, item.Interface())return false},

10. Sample函数


var tempSample chan interface{}//Sample 定期发射Observable最近发射的数据项
func (parent *Observable) Sample(sample chan interface{}) (o *Observable) {tempSample = sampleo = parent.newFilterObservable("sample")o.flip_accept_error = trueo.flip_sup_ctx = trueo.operator = sampleOPreturn o
}var sampleOP = filteringOperator{opFunc: func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool) {var latest interface{} = nillatest = item.Interface()go func() {tempEnd := truefor tempEnd {select {case <-ctx.Done():tempEnd = truecase <-tempSample:if latest != nil {if o.sendToFlow(ctx, latest, out) {tempEnd = false}latest = nil}}}}()return false},

11. Skip函数


//Skip 跳过前n个数据
func (parent *Observable) Skip(num int) (o *Observable) {tempNum = numo = parent.newFilterObservable("skip")o.flip_accept_error = trueo.flip_sup_ctx = truecount = 0o.operator = skipOpreturn o
}var skipOp = filteringOperator{opFunc: func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool) {count++if count > tempNum {o.sendToFlow(ctx, item.Interface(), out)}return false},

12. SkipLast函数


var tempLasts []interface{}//SkipLast 跳过最后n个数据
func (parent *Observable) SkipLast(num int) (o *Observable) {tempNum = numo = parent.newFilterObservable("skipLast")o.flip_accept_error = trueo.flip_sup_ctx = truecount = 0o.operator = skipLastOpreturn o
}var skipLastOp = filteringOperator{opFunc: func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool) {//var lasts []interface{}if count == tempNum {o.sendToFlow(ctx, tempLasts[0], out)tempLasts = tempLasts[1:]} else {count++}tempLasts = append(tempLasts, item.Interface())return false},

13. Take函数


//Take 取前n个数据
func (parent *Observable) Take(num int) (o *Observable) {tempNum = numo = parent.newFilterObservable("take")o.flip_accept_error = trueo.flip_sup_ctx = truecount = 0o.operator = takeOpreturn o
}var takeOp = filteringOperator{opFunc: func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool) {count++if count > tempNum {return true}o.sendToFlow(ctx, item.Interface(), out)return false},

14. TakeLast函数


var tempLasts2 []interface{}//TakeLast 取最后n个数据
func (parent *Observable) TakeLast(num int) (o *Observable) {tempNum = numo = parent.newFilterObservable("takeLast")o.flip_accept_error = trueo.flip_sup_ctx = truecount = 0o.operator = takeLastOpreturn o
}var takeLastOp = filteringOperator{opFunc: func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool) {count++if count <= tempNum {tempLasts2 = append(tempLasts2, item.Interface())} else {tempLasts2 = tempLasts2[1:]tempLasts2 = append(tempLasts2, item.Interface())}o.flip = tempLasts2return false},




go build




package main

import (
	"fmt"
	"time"

	"github.com/user/rxgo"
)

// main runs every filtering operator once against the same sample sequence
// and prints what each one emits, one labelled section per operator.
func main() {
	// source builds a fresh copy of the demo sequence for each section.
	source := func() *rxgo.Observable {
		return rxgo.Just(1, 8, 3, 4, 2, 0, 2, 6)
	}
	// show prints one emitted value followed by a space.
	show := func(x int) {
		fmt.Print(x)
		fmt.Print(" ")
	}
	// finish ends a section with a line break and a blank line.
	finish := func() {
		fmt.Println()
		fmt.Println()
	}

	fmt.Print("Debounce: ")
	source().Debounce(2).Subscribe(show)
	finish()

	fmt.Print("Distinct: ")
	source().Distinct().Subscribe(show)
	finish()

	fmt.Print("ElementAt 3: ")
	source().ElementAt(3).Subscribe(show)
	finish()

	fmt.Print("First: ")
	source().First().Subscribe(show)
	finish()

	// Filter: collect first, print after the subscription returns.
	fmt.Print("Filter value < 4: ")
	res := []int{}
	source().Filter(func(x int) bool {
		return x < 4
	}).Subscribe(func(x int) {
		res = append(res, x)
	})
	for _, v := range res {
		show(v)
	}
	finish()

	fmt.Print("IgnoreElements: ")
	source().IgnoreElements().Subscribe(func(x int) {
		fmt.Print(x)
	})
	finish()

	fmt.Print("Last: ")
	source().Last().Subscribe(show)
	finish()

	fmt.Print("Sample: ")
	observableP := make(chan interface{})
	// Tick generator: a second pipeline whose delayed items act as the
	// sampling signal.
	go func() {
		rxgo.Just(1, 2).Map(func(x int) int {
			delay := 10 * time.Millisecond
			if x == 2 {
				delay = 5 * time.Millisecond
			}
			time.Sleep(delay)
			return x
		}).Subscribe(func(x int) {
			observableP <- x
		})
	}()
	source().Map(func(x int) int {
		time.Sleep(3 * time.Millisecond)
		return x
	}).Sample(observableP).Subscribe(show)
	finish()

	fmt.Print("Skip 2: ")
	source().Skip(2).Subscribe(show)
	finish()

	fmt.Print("SkipLast 2: ")
	source().SkipLast(2).Subscribe(show)
	finish()

	fmt.Print("Take 4: ")
	source().Take(4).Subscribe(show)
	finish()

	fmt.Print("TakeLast 3: ")
	source().TakeLast(3).Subscribe(show)
	finish()
}





package rxgo_test

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/user/rxgo"
)

// Each test feeds a fixed sequence through an identity Map and the operator
// under test, collects what the subscriber receives, and compares it with
// the expected slice. res always starts as an empty, non-nil slice so that
// "nothing emitted" compares equal to []int{}.

// TestDebounce checks Debounce with a 1ns timespan.
// NOTE(review): the expected output depends on goroutine timing inside
// Debounce; confirm this assertion is stable on the target machine.
func TestDebounce(t *testing.T) {
	res := []int{}
	ob := rxgo.Just(1, 8, 3, 4, 2, 0, 2, 6).Map(func(x int) int {
		return x
	}).Debounce(1)
	ob.Subscribe(func(x int) {
		res = append(res, x)
	})
	assert.Equal(t, []int{1}, res, "Debounce Test Errorr")
}

// TestDistinct checks that the repeated 2 is emitted only once.
func TestDistinct(t *testing.T) {
	res := []int{}
	ob := rxgo.Just(1, 8, 3, 4, 2, 0, 2, 6).Map(func(x int) int {
		return x
	}).Distinct()
	ob.Subscribe(func(x int) {
		res = append(res, x)
	})
	assert.Equal(t, []int{1, 8, 3, 4, 2, 0, 6}, res, "Distinct Test Errorr")
}

// TestElementAt checks that index 2 selects the third item.
func TestElementAt(t *testing.T) {
	res := []int{}
	ob := rxgo.Just(1, 8, 3, 4, 2, 0, 6).Map(func(x int) int {
		return x
	}).ElementAt(2)
	ob.Subscribe(func(x int) {
		res = append(res, x)
	})
	// The failure label names the operator actually under test.
	assert.Equal(t, []int{3}, res, "ElementAt Test Errorr")
}

// TestFirst checks that only the first item is emitted.
func TestFirst(t *testing.T) {
	res := []int{}
	ob := rxgo.Just(1, 8, 3, 4, 2, 0, 6).Map(func(x int) int {
		return x
	}).First()
	ob.Subscribe(func(x int) {
		res = append(res, x)
	})
	assert.Equal(t, []int{1}, res, "First Test Errorr")
}

// TestIgnoreElements checks that no item reaches the subscriber.
func TestIgnoreElements(t *testing.T) {
	res := []int{}
	ob := rxgo.Just(1, 8, 3, 4, 2, 0, 6).Map(func(x int) int {
		return x
	}).IgnoreElements()
	ob.Subscribe(func(x int) {
		res = append(res, x)
	})
	assert.Equal(t, []int{}, res, "IgnoreElementsTest Errorr")
}

// TestLast checks that only the final item is emitted.
func TestLast(t *testing.T) {
	res := []int{}
	ob := rxgo.Just(1, 8, 3, 4, 2, 0, 6).Map(func(x int) int {
		return x
	}).Last()
	ob.Subscribe(func(x int) {
		res = append(res, x)
	})
	assert.Equal(t, []int{6}, res, "Last Test Errorr")
}

// TestSkip checks that the first three items are dropped.
func TestSkip(t *testing.T) {
	res := []int{}
	ob := rxgo.Just(1, 8, 3, 4, 2, 0, 6).Map(func(x int) int {
		return x
	}).Skip(3)
	ob.Subscribe(func(x int) {
		res = append(res, x)
	})
	assert.Equal(t, []int{4, 2, 0, 6}, res, "Skip Test Errorr")
}

// TestSkipLast checks that the final three items are dropped.
func TestSkipLast(t *testing.T) {
	res := []int{}
	ob := rxgo.Just(1, 8, 3, 4, 2, 0, 6).Map(func(x int) int {
		return x
	}).SkipLast(3)
	ob.Subscribe(func(x int) {
		res = append(res, x)
	})
	assert.Equal(t, []int{1, 8, 3, 4}, res, "SkipLast Test Errorr")
}

// TestTake checks that only the first four items are emitted.
func TestTake(t *testing.T) {
	res := []int{}
	ob := rxgo.Just(1, 8, 3, 4, 2, 0, 6).Map(func(x int) int {
		return x
	}).Take(4)
	ob.Subscribe(func(x int) {
		res = append(res, x)
	})
	assert.Equal(t, []int{1, 8, 3, 4}, res, "Take Test Errorr")
}

// TestTakeLast checks that only the final four items are emitted.
func TestTakeLast(t *testing.T) {
	res := []int{}
	ob := rxgo.Just(1, 8, 3, 4, 2, 0, 6).Map(func(x int) int {
		return x
	}).TakeLast(4)
	ob.Subscribe(func(x int) {
		res = append(res, x)
	})
	assert.Equal(t, []int{4, 2, 0, 6}, res, "TakeLast Test Errorr")
}


[henryhzy@localhost rxgo]$ go test -v
=== RUN   TestDebounce
--- PASS: TestDebounce (0.00s)
=== RUN   TestDistinct
--- PASS: TestDistinct (0.00s)
=== RUN   TestElementAt
--- PASS: TestElementAt (0.00s)
=== RUN   TestFirst
--- PASS: TestFirst (0.00s)
=== RUN   TestIgnoreElements
--- PASS: TestIgnoreElements (0.00s)
=== RUN   TestLast
--- PASS: TestLast (0.00s)
=== RUN   TestSkip
--- PASS: TestSkip (0.00s)
=== RUN   TestSkipLast
--- PASS: TestSkipLast (0.00s)
=== RUN   TestTake
--- PASS: TestTake (0.00s)
=== RUN   TestTakeLast
--- PASS: TestTakeLast (0.00s)
ok      github.com/user/rxgo    0.005s
[henryhzy@localhost rxgo]$

五、中文 api 文档


git clone https://github.com/golang/tools $GOPATH/src/golang.org/x/tools
go build golang.org/x/tools


go install
go doc
godoc -url="pkg/github.com/user/rxgo" > API.html





  1. 课程博客
  2. ReactiveX官网

