
到目前为止,我们已经涵盖了Rx的全部知识点,并且我们已经有足够的知识开始使用Rx !尽管如此,许多开发人员还是希望在编写代码前编写测试用例。测试可以用来证明代码实际上满足了需求,提供了抵御回归的安全网,甚至可以帮助代码文档化。本章假设您熟悉依赖注入和单元测试的概念,如mock或stub。

Testing software has its roots in debugging and demonstrating code. Having largely matured past manual tests that try to "break the application", modern quality assurance standards demand a level of automation that can help evaluate and prevent bugs. While teams of testing specialists are common, more and more coders are expected to provide quality guarantees via automated test suites.

Up to this point, we have covered a broad scope of Rx, and we have almost enough knowledge to start using Rx in anger! Still, many developers would not dream of coding without first being able to write tests. Tests can be used to prove that code is in fact satisfying requirements, provide a safety net against regression and can even help document the code. This chapter makes the assumption that you are familiar with the concepts of dependency injection and unit testing with test-doubles, such as mocks or stubs.


  • 在测试场景中,调度(以及线程)通常被避免,因为它会引入可能导致非确定性测试的竞争条件
  • 测试应该尽可能快地运行
  • 对于许多人来说,Rx是一种新的技术/库。很自然地,当我们逐步掌握Rx的过程中,我们可能想重构一些以前的Rx代码。我们希望使用测试来确保重构没有改变代码库的内部行为
  • 同样,当我们升级Rx版本时,测试将保证功能没有被破坏。

Rx poses some interesting problems to our Test-Driven community:

  • Scheduling, and therefore threading, is generally avoided in test scenarios as it can introduce race conditions which may lead to non-deterministic tests
  • Tests should run as fast as possible
  • For many, Rx is a new technology/library. Naturally, as we progress on our journey to mastering Rx, we may want to refactor some of our previous Rx code. We want to use tests to ensure that our refactoring has not altered the internal behavior of our code base
  • Likewise, tests will ensure nothing breaks when we upgrade versions of Rx.


While we do want to test our code, we don't want to introduce slow or non-deterministic tests; indeed, the later would introduce false-negatives or false-positives. If we look at the Rx library, there are plenty of methods that involve scheduling (implicitly or explicitly), so using Rx effectively makes it hard to avoid scheduling. This LINQ query shows us that there are at least 26 extension methods that accept an IScheduler as a parameter.

var query = from method in typeof(Observable).GetMethods()
from parameter in method.GetParameters()
where typeof (IScheduler).IsAssignableFrom(parameter.ParameterType)
group method by method.Name
into morderby m.Keyselect m.Key;
foreach (var methodName in query)




























这些方法中的许多都有一个重载,它不使用IScheduler类型参数,而是使用他的一个默认实例。TDD/Test 程序员希望选择接受IScheduler的重载,这样他们就可以在测试中控制调度时序。我很快会解释为什么。


Many of these methods also have an overload that does not take an IScheduler and instead uses a default instance. TDD/Test First coders will want to opt for the overload that accepts the IScheduler, so that they can have some control over scheduling in our tests. I will explain why soon.

Consider this example, where we create a sequence that publishes values every second for five seconds.

var interval = Observable




If we were to write a test that ensured that we received five values and they were each one second apart, it would take five seconds to run. That would be no good; I want hundreds if not thousands of tests to run in five seconds. Another very common requirement is to test a timeout. Here, we try to test a timeout of one minute.

var never = Observable.Never<int>();

var exceptionThrown = false;



i => Console.WriteLine("This will never run."),

ex => exceptionThrown = true);



  1. 要么断言运行得太快,测试毫无意义,因为它总是失败,或者
  2. 为了进行准确的测试,我们必须增加一分钟的延迟


We have two problems here:

  1. either the Assert runs too soon, and the test is pointless as it always fails, or
  2. we have to add a delay of one minute to perform an accurate test

For this test to be useful, it would therefore take one minute to run. Unit tests that take one minute to run are not acceptable.




在本例中,我们使用简单的重载(schedule (Action))将任务调度到队列上,以便立即运行。然后,我们将虚拟时钟向前推进了一分钟。通过这样做,我们可以及时执行到那个时间点的所有计划。请注意,尽管我们计划立即执行一个操作,但是在手动推进时钟之前,它实际上不会执行。

To our rescue comes the TestScheduler; it introduces the concept of a virtual scheduler to allow us to emulate and control time.

A virtual scheduler can be conceptualized as a queue of actions to be executed. Each are assigned a point in time when they should be executed. We use the TestScheduler as a substitute, or test double, for the production IScheduler types. Using this virtual scheduler, we can either execute all queued actions, or only those up to a specified point in time.

In this example, we schedule a task onto the queue to be run immediately by using the simple overload (Schedule(Action)). We then advance the virtual clock forward by one tick. By doing so, we execute everything scheduled up to that point in time. Note that even though we schedule an action to be executed immediately, it will not actually be executed until the clock is manually advanced.

var scheduler = new TestScheduler();
var wasExecuted = false;
scheduler.Schedule(() => wasExecuted = true);
scheduler.AdvanceBy(1); //execute 1 tick of queued actions



Running and debugging this example may help you to better understand the basics of the TestScheduler.

The TestScheduler implements the IScheduler interface (naturally) and also extends it to allow us to control and monitor virtual time. We are already familiar with the IScheduler.Schedule methods, however the AdvanceBy(long), AdvanceTo(long) and Start() methods unique to the TestScheduler are of most interest. Likewise, the Clock property will also be of interest, as it can help us understand what is happening internally.

public class TestScheduler : ...
{//Implementation of ISchedulerpublic DateTimeOffset Now { get; }public IDisposable Schedule<TState>(TState state,Func<IScheduler, TState, IDisposable> action)public IDisposable Schedule<TState>(TState state,TimeSpan dueTime,Func<IScheduler, TState, IDisposable> action)public IDisposable Schedule<TState>(TState state,DateTimeOffset dueTime,Func<IScheduler, TState, IDisposable> action)//Useful extensions for testingpublic bool IsEnabled { get; private set; }public TAbsolute Clock { get; protected set; }public void Start()public void Stop()public void AdvanceTo(long time)public void AdvanceBy(long time)//Other methods...



The AdvanceTo(long) method will execute all the actions that have been scheduled up to the absolute time specified. The TestScheduler uses ticks as its measurement of time. In this example, we schedule actions to be invoked now, in 10 ticks, and in 20 ticks.

var scheduler = new TestScheduler();
scheduler.Schedule(() => Console.WriteLine("A"));
//Schedule immediatelys
cheduler.Schedule(TimeSpan.FromTicks(10), () => Console.WriteLine("B"));
scheduler.Schedule(TimeSpan.FromTicks(20), () => Console.WriteLine("C"));










Note that nothing happened when we advanced to 15 ticks. All work scheduled before 15 ticks had been performed and we had not advanced far enough yet to get to the next scheduled action.



The AdvanceBy(long) method allows us to move the clock forward a relative amount of time. Again, the measurements are in ticks. We can take the last example and modify it to use AdvanceBy(long).

var scheduler = new TestScheduler();
scheduler.Schedule(() => Console.WriteLine("A"));
//Schedule immediately
scheduler.Schedule(TimeSpan.FromTicks(10), () => Console.WriteLine("B"));
scheduler.Schedule(TimeSpan.FromTicks(20), () => Console.WriteLine("C"));











The TestScheduler's Start() method is an effective way to execute everything that has been scheduled. We take the same example again and swap out the AdvanceBy(long) calls for a single Start() call.

var scheduler = new TestScheduler();
scheduler.Schedule(() => Console.WriteLine("A"));
//Schedule immediately
scheduler.Schedule(TimeSpan.FromTicks(10), () => Console.WriteLine("B"));
scheduler.Schedule(TimeSpan.FromTicks(20), () => Console.WriteLine("C"));
Console.WriteLine("scheduler.Clock:{0}", scheduler.Clock);









Note that once all of the scheduled actions have been executed, the virtual clock matches our last scheduled item (20 ticks).

We further extend our example by scheduling a new action to happen after Start() has already been called.

var scheduler = new TestScheduler();
scheduler.Schedule(() => Console.WriteLine("A"));
scheduler.Schedule(TimeSpan.FromTicks(10), () => Console.WriteLine("B"));
scheduler.Schedule(TimeSpan.FromTicks(20), () => Console.WriteLine("C"));
Console.WriteLine("scheduler.Clock:{0}", scheduler.Clock);
scheduler.Schedule(() => Console.WriteLine("D"));








Note that the output is exactly the same; If we want our fourth action to be executed, we will have to call Start() again.




In previous releases of Rx, the Start() method was called Run(). Now there is a Stop() method whose name seems to imply some symmetry with Start(). All it does however, is set the IsEnabled property to false. This property is used as an internal flag to check whether the internal queue of actions should continue being executed. The processing of the queue may indeed be instigated by Start(), however AdvanceTo or AdvanceBy can be used too.

In this example, we show how you could use Stop() to pause processing of scheduled actions.

var scheduler = new TestScheduler();
scheduler.Schedule(() => Console.WriteLine("A"));
scheduler.Schedule(TimeSpan.FromTicks(10), () => Console.WriteLine("B"));
scheduler.Schedule(TimeSpan.FromTicks(15), scheduler.Stop);
scheduler.Schedule(TimeSpan.FromTicks(20), () => Console.WriteLine("C"));
Console.WriteLine("scheduler.Clock:{0}", scheduler.Clock);







Note that "C" never gets printed as we stop the clock at 15 ticks. I have been testing Rx successfully for nearly two years now, yet I have not found the need to use the Stop() method. I imagine that there are cases that warrant its use; however I just wanted to make the point that you do not have to be concerned about the lack of use of it in your tests.

调度冲突(Schedule collisions)


When scheduling actions, it is possible and even likely that many actions will be scheduled for the same point in time. This most commonly would occur when scheduling multiple actions for now. It could also happen that there are multiple actions scheduled for the same point in the future. The TestScheduler has a simple way to deal with this. When actions are scheduled, they are marked with the clock time they are scheduled for. If multiple items are scheduled for the same point in time, they are queued in order that they were scheduled; when the clock advances, all items for that point in time are executed in the order that they were scheduled.

var scheduler = new TestScheduler();
scheduler.Schedule(TimeSpan.FromTicks(10), () => Console.WriteLine("A"));
scheduler.Schedule(TimeSpan.FromTicks(10), () => Console.WriteLine("B"));
scheduler.Schedule(TimeSpan.FromTicks(10), () => Console.WriteLine("C"));
Console.WriteLine("scheduler.Clock:{0}", scheduler.Clock);








Note that the virtual clock is at 10 ticks, the time we advanced to.

测试Rx代码(Testing Rx code)


Now that we have learnt a little bit about the TestScheduler, let's look at how we could use it to test our two initial code snippets that use Interval and Timeout. We want to execute tests as fast as possible but still maintain the semantics of time. In this example we generate our five values one second apart but pass in our TestScheduler to the Interval method to use instead of the default scheduler.

public void Testing_with_test_scheduler()
{var expectedValues = new long[] {0, 1, 2, 3, 4};var actualValues = new List<long>();var scheduler = new TestScheduler();var interval = Observable.Interval(TimeSpan.FromSeconds(1), scheduler).Take(5);interval.Subscribe(actualValues.Add);scheduler.Start();CollectionAssert.AreEqual(expectedValues, actualValues);//Executes in less than 0.01s "on my machine"


While this is mildly interesting, what I think is more important is how we would test a real piece of code. Imagine, if you will, a ViewModel that subscribes to a stream of prices. As prices are published, it adds them to a collection. Assuming this is a WPF or Silverlight implementation, we take the liberty of enforcing that the subscription be done on the ThreadPool and the observing is executed on the Dispatcher.

public class MyViewModel : IMyViewModel
{private readonly IMyModel _myModel;private readonly ObservableCollection<decimal> _prices;public MyViewModel(IMyModel myModel){_myModel = myModel;_prices = new ObservableCollection<decimal>();}public void Show(string symbol){//TODO: resource mgt, exception handling etc..._myModel.PriceStream(symbol).SubscribeOn(Scheduler.ThreadPool).ObserveOn(Scheduler.Dispatcher).Timeout(TimeSpan.FromSeconds(10), Scheduler.ThreadPool).Subscribe(Prices.Add,ex=>{if(ex is TimeoutException)IsConnected = false;});IsConnected = true;}public ObservableCollection<decimal> Prices{get { return _prices; }}public bool IsConnected { get; private set; }

注入调度器依赖(Injecting scheduler dependencies)


While the snippet of code above may do what we want it to, it will be hard to test as it is accessing the schedulers via static properties. To help my testing, I have created my own interface that exposes the same IScheduler implementations that the Scheduler type does, i suggest you adopt this interface too.

public interface ISchedulerProvider
{IScheduler CurrentThread { get; }IScheduler Dispatcher { get; }IScheduler Immediate { get; }IScheduler NewThread { get; }IScheduler ThreadPool { get; }//IScheduler TaskPool { get; }

TaskPool属性是否应该包含取决于您的目标平台。如果您采用了这中方式,请根据您的命名约定来命名这个类型,例如,SchedulerService, Schedulers。用于生产环境下的默认实现如下:

Whether the TaskPool property should be included or not depends on your target platform. If you adopt this concept, feel free to name this type in accordance with your naming conventions e.g. SchedulerService, Schedulers. The default implementation that we would run in production is implemented as follows:

public sealed class SchedulerProvider : ISchedulerProvider
{public IScheduler CurrentThread{get { return Scheduler.CurrentThread; }}public IScheduler Dispatcher{get { return DispatcherScheduler.Instance; }}public IScheduler Immediate{get { return Scheduler.Immediate; }}public IScheduler NewThread{get { return Scheduler.NewThread; }}public IScheduler ThreadPool{get { return Scheduler.ThreadPool; }}//public IScheduler TaskPool { get { return Scheduler.TaskPool; } }


This now allows me to substitute implementations of ISchedulerProvider to help with testing. I could mock the ISchedulerProvider, but I find it easier to provide a test implementation. My implementation for testing is as follows.

public sealed class TestSchedulers : ISchedulerProvider


private readonly TestScheduler _currentThread = new TestScheduler();

private readonly TestScheduler _dispatcher = new TestScheduler();

private readonly TestScheduler _immediate = new TestScheduler();

private readonly TestScheduler _newThread = new TestScheduler();

private readonly TestScheduler _threadPool = new TestScheduler();

#region Explicit implementation of ISchedulerService

IScheduler ISchedulerProvider.CurrentThread { get { return _currentThread; } }

IScheduler ISchedulerProvider.Dispatcher { get { return _dispatcher; } }

IScheduler ISchedulerProvider.Immediate { get { return _immediate; } }

IScheduler ISchedulerProvider.NewThread { get { return _newThread; } }

IScheduler ISchedulerProvider.ThreadPool { get { return _threadPool; } }


public TestScheduler CurrentThread { get { return _currentThread; } }

public TestScheduler Dispatcher { get { return _dispatcher; } }

public TestScheduler Immediate { get { return _immediate; } }

public TestScheduler NewThread { get { return _newThread; } }

public TestScheduler ThreadPool { get { return _threadPool; } }



Note that ISchedulerProvider is implemented explicitly. This means that, in our tests, we can access the TestScheduler instances directly, but our system under test (SUT) still just sees the interface implementation. I can now write some tests for my ViewModel. Below, we test a modified version of the MyViewModel class that takes an ISchedulerProvider and uses that instead of the static schedulers from the Scheduler class. We also use the popular Moq framework in order to mock out our model.


public void SetUp()


_myModelMock = new Mock<IMyModel>();

_schedulerProvider = new TestSchedulers();

_viewModel = new MyViewModel(_myModelMock.Object, _schedulerProvider);



public void Should_add_to_Prices_when_Model_publishes_price()


decimal expected = 1.23m;

var priceStream = new Subject<decimal>();

_myModelMock.Setup(svc => svc.PriceStream(It.IsAny<string>())).Returns(priceStream);


//Schedule the OnNext

_schedulerProvider.ThreadPool.Schedule(() => priceStream.OnNext(expected));

Assert.AreEqual(0, _viewModel.Prices.Count);

//Execute the OnNext action


Assert.AreEqual(0, _viewModel.Prices.Count);

//Execute the OnNext handler


Assert.AreEqual(1, _viewModel.Prices.Count);

Assert.AreEqual(expected, _viewModel.Prices.First());



public void Should_disconnect_if_no_prices_for_10_seconds()


var timeoutPeriod = TimeSpan.FromSeconds(10);

var priceStream = Observable.Never<decimal>();

_myModelMock.Setup(svc => svc.PriceStream(It.IsAny<string>())).Returns(priceStream);


_schedulerProvider.ThreadPool.AdvanceBy(timeoutPeriod.Ticks - 1);






2 passed, 0 failed, 0 skipped, took 0.41 seconds (MSTest 10.0).


  1. 随着模型不断发送数据项,Price属性中不断添加新的价格
  2. 序列订阅到线程池上
  3. Price属性在Dispatcher上进行更新,及序列在Dispatcher上观察
  4. 十秒内没有给ViewModel设置价格数据,则超时断开
  5. 测试运行得很快。虽然运行测试的时间并不是很长,但大部分时间似乎都花在了测试工具的本身。此外,将测试计数增加到10只会增加0.03秒。通常,在现代的CPU上,我希望看到单元测试以每秒+1000个测试的速度运行

These two tests ensure five things:

  1. That the Price property has prices added to it as the model produces them
  2. That the sequence is subscribed to on the ThreadPool
  3. That the Price property is updated on the Dispatcher i.e. the sequence is observed on the Dispatcher
  4. That a timeout of 10 seconds between prices will set the ViewModel to disconnected.
  5. The tests run fast. While the time to run the tests is not that impressive, most of that time seems to be spent warming up my test harness. Moreover, increasing the test count to 10 only adds 0.03seconds. In general, on a modern CPU, I expect to see unit tests run at a rate of +1000 tests per second


在某些场景中,您对调度器不感兴趣,希望将测试集中在其他功能上。如果是这样,那么您可能想要创建ISchedulerProvider的另一个测试实现,它的所有成员都返回ImmediateScheduler 。这可以帮助你减少测试中的噪音。

Usually, I would not have more than one assert/verify per test, but here it does help illustrate a point. In the first test, we can see that only once both the ThreadPool and the Dispatcher schedulers have been run will we get a result. In the second test, it helps to verify that the timeout is not less than 10 seconds.

In some scenarios, you are not interested in the scheduler and you want to be focusing your tests on other functionality. If this is the case, then you may want to create another test implementation of the ISchedulerProvider that returns the ImmediateScheduler for all of its members. That can help reduce the noise in your tests.

public sealed class ImmediateSchedulers : ISchedulerService


public IScheduler CurrentThread { get { return Scheduler.Immediate; } }

public IScheduler Dispatcher { get { return Scheduler.Immediate; } }

public IScheduler Immediate { get { return Scheduler.Immediate; } }

public IScheduler NewThread { get { return Scheduler.Immediate; } }

public IScheduler ThreadPool { get { return Scheduler.Immediate; } }


高级特性 - ITestableObserver


The TestScheduler provides further advanced features. I find that I am able to get by quite well without these methods, but others may find them useful. Perhaps this is because I have found myself accustomed to testing without them from using earlier versions of Rx.


Start函数有三个重载,它们用于在给定时间启动一个可观察序列,记录它发出的通知并在给定时间进行订阅和释放订阅。这在一开始可能会让人困惑,因为这与没有参数的Start重载是完全不相关的。这三个重载返回一个ITestableObserver,它允许你记录来自一个可观察序列的通知,很像我们在Transformation chapter中看到的Materialize方法。

There are three overloads to Start, which are used to start an observable sequence at a given time, record the notifications it makes and dispose of the subscription at a given time. This can be confusing at first, as the parameterless overload of Start is quite unrelated. These three overloads return an ITestableObserver<T> which allows you to record the notifications from an observable sequence, much like the Materialize method we saw in the Transformation chapter.

public interface ITestableObserver<T> : IObserver<T>


// Gets recorded notifications received by the observer.

IList<Recorded<Notification<T>>> Messages { get; }



  1. 一个可观测序列工厂委托
  2. 指定执行工厂的时间
  3. 指定订阅工厂返回的可观测序列的时间
  4. 指定释放订阅的时间

While there are three overloads, we will look at the most specific one first. This overload takes four parameters:

  1. an observable sequence factory delegate
  2. the point in time to invoke the factory
  3. the point in time to subscribe to the observable sequence returned from the factory
  4. the point in time to dispose of the subscription


The time for the last three parameters is measured in ticks, as per the rest of the TestScheduler members.

public ITestableObserver<T> Start<T>(

Func<IObservable<T>> create,

long created,

long subscribed,

long disposed)



We could use this method to test the Observable.Interval factory method. Here, we create an observable sequence that spawns a value every second for 4 seconds. We use the TestScheduler.Start method to create and subscribe to it immediately (by passing 0 for the second and third parameters). We dispose of our subscription after 5 seconds. Once the Start method has run, we output what we have recorded.

var scheduler = new TestScheduler();

var source = Observable.Interval(TimeSpan.FromSeconds(1), scheduler)


var testObserver = scheduler.Start(

() => source,




Console.WriteLine("Time is {0} ticks", scheduler.Clock);

Console.WriteLine("Received {0} notifications", testObserver.Messages.Count);

foreach (Recorded<Notification<long>> message in testObserver.Messages)


Console.WriteLine("{0} @ {1}", message.Value, message.Time);



Time is 50000000 ticks

Received 5 notifications

OnNext(0) @ 10000001

OnNext(1) @ 20000001

OnNext(2) @ 30000001

OnNext(3) @ 40000001

OnCompleted() @ 40000001



Note that the ITestObserver<T> records OnNext and OnCompleted notifications. If the sequence was to terminate in error, the ITestObserver<T> would record the OnError notification instead.

We can play with the input variables to see the impact it makes. We know that the Observable.Interval method is a Cold Observable, so the virtual time of the creation is not relevant. Changing the virtual time of the subscription can change our results. If we change it to 2 seconds, we will notice that if we leave the disposal time at 5 seconds, we will miss some messages.

var testObserver = scheduler.Start(

() => Observable.Interval(TimeSpan.FromSeconds(1), scheduler).Take(4),





Time is 50000000 ticks

Received 2 notifications

OnNext(0) @ 30000000

OnNext(1) @ 40000000




We start the subscription at 2 seconds; the Interval produces values after each second (i.e. second 3 and 4), and we dispose on second 5. So we miss the other two OnNext messages as well as the OnCompleted message.

There are two other overloads to this TestScheduler.Start method.

public ITestableObserver<T> Start<T>(Func<IObservable<T>> create, long disposed)


if (create == null)

throw new ArgumentNullException("create");


return this.Start<T>(create, 100L, 200L, disposed);


public ITestableObserver<T> Start<T>(Func<IObservable<T>> create)


if (create == null)

throw new ArgumentNullException("create");


return this.Start<T>(create, 100L, 200L, 1000L);



As you can see, these overloads just call through to the variant we have been looking at, but passing some default values. I am not sure why these default values are special; I can not imagine why you would want to use these two methods, unless your specific use case matched that specific configuration exactly.



Just as we can record an observable sequence, we can also use CreateColdObservable to playback a set of Recorded<Notification<int>>. The signature for CreateColdObservable simply takes a params array of recorded notifications.

// Creates a cold observable from an array of notifications.

// Returns a cold observable exhibiting the specified message behavior.

public ITestableObservable<T> CreateColdObservable<T>(

params Recorded<Notification<T>>[] messages)


CreateColdObservable 返回 ITestableObservable<T>. 这个接口继承了IObservable<T> ,添加了订阅List和产生的消息List。

The CreateColdObservable returns an ITestableObservable<T>. This interface extends IObservable<T> by exposing the list of "subscriptions" and the list of messages it will produce.

public interface ITestableObservable<T> : IObservable<T>


// Gets the subscriptions to the observable.

IList<Subscription> Subscriptions { get; }

// Gets the recorded notifications sent by the observable.

IList<Recorded<Notification<T>>> Messages { get; }


使用CreateColdObservable, 可以模拟Observable.Interval.

Using CreateColdObservable, we can emulate the Observable.Interval test we had earlier.

var scheduler = new TestScheduler();

var source = scheduler.CreateColdObservable(

new Recorded<Notification<long>>(10000000, Notification.CreateOnNext(0L)),

new Recorded<Notification<long>>(20000000, Notification.CreateOnNext(1L)),

new Recorded<Notification<long>>(30000000, Notification.CreateOnNext(2L)),

new Recorded<Notification<long>>(40000000, Notification.CreateOnNext(3L)),

new Recorded<Notification<long>>(40000000, Notification.CreateOnCompleted<long>())


var testObserver = scheduler.Start(

() => source,




Console.WriteLine("Time is {0} ticks", scheduler.Clock);

Console.WriteLine("Received {0} notifications", testObserver.Messages.Count);

foreach (Recorded<Notification<long>> message in testObserver.Messages)


Console.WriteLine(" {0} @ {1}", message.Value, message.Time);



Time is 50000000 ticks

Received 5 notifications

OnNext(0) @ 10000001

OnNext(1) @ 20000001

OnNext(2) @ 30000001

OnNext(3) @ 40000001

OnCompleted() @ 40000001


Note that our output is exactly the same as the previous example with Observable.Interval.




We can also create hot test observable sequences using the CreateHotObservable method. It has the same parameters and return value as CreateColdObservable; the difference is that the virtual time specified for each message is now relative to when the observable was created, not when it is subscribed to as per the CreateColdObservable method.

This example is just that last "cold" sample, but creating a Hot observable instead.

var scheduler = new TestScheduler();

var source = scheduler.CreateHotObservable(

new Recorded<Notification<long>>(10000000, Notification.CreateOnNext(0L)),



Time is 50000000 ticks

Received 5 notifications

OnNext(0) @ 10000000

OnNext(1) @ 20000000

OnNext(2) @ 30000000

OnNext(3) @ 40000000

OnCompleted() @ 40000000



Note that the output is almost the same. Scheduling of the creation and subscription do not affect the Hot Observable, therefore the notifications happen 1 tick earlier than their Cold counterparts.

We can see the major difference a Hot Observable bears by changing the virtual create time and virtual subscribe time to be different values. With a Cold Observable, the virtual create time has no real impact, as subscription is what initiates any action. This means we can not miss any early message on a Cold Observable. For Hot Observables, we can miss messages if we subscribe too late. Here, we create the Hot Observable immediately, but only subscribe to it after 1 second (thus missing the first message).

var scheduler = new TestScheduler();

var source = scheduler.CreateHotObservable(

new Recorded>Notification>long<<(10000000, Notification.CreateOnNext(0L)),

new Recorded>Notification>long<<(20000000, Notification.CreateOnNext(1L)),

new Recorded>Notification>long<<(30000000, Notification.CreateOnNext(2L)),

new Recorded>Notification>long<<(40000000, Notification.CreateOnNext(3L)),

new Recorded>Notification>long<<(40000000, Notification.CreateOnCompleted>long<())


var testObserver = scheduler.Start(

() =< source,




Console.WriteLine("Time is {0} ticks", scheduler.Clock);

Console.WriteLine("Received {0} notifications", testObserver.Messages.Count);

foreach (Recorded>Notification>long<< message in testObserver.Messages)


Console.WriteLine(" {0} @ {1}", message.Value, message.Time);



Time is 50000000 ticks

Received 4 notifications

OnNext(1) @ 20000000

OnNext(2) @ 30000000

OnNext(3) @ 40000000

OnCompleted() @ 40000000




Finally, if you do not want to use the TestScheduler.Start methods, and you need more fine-grained control over your observer, you can use TestScheduler.CreateObserver(). This will return an ITestObserver that you can use to manage the subscriptions to your observable sequences with. Furthermore, you will still be exposed to the recorded messages and any subscribers.

Current industry standards demand broad coverage of automated unit tests to meet quality assurance standards. Concurrent programming, however, is often a difficult area to test well. Rx delivers a well-designed implementation of testing features, allowing deterministic and high-throughput testing. The TestScheduler provides methods to control virtual time and produce observable sequences for testing. This ability to easily and reliably test concurrent systems sets Rx apart from many other libraries.

Rx第六部分 测试相关推荐

  1. 2017-2018-1 20155308 《信息安全系统设计基础》课堂第六章测试(补做)

    2017-2018-1 20155308 <信息安全系统设计基础>课堂第六章测试 1 下面代码中,对数组x填充后,采用直接映射高速缓存,所有对x和y引用的命中率为() A. 1 B. 1/ ...

  2. 小学生体测测试环境怎么填_小学体测在各个学校展开 最新六年级测试项目及评价标准表一览...

    ­ 体测来了 体育锻炼又热了 ­ 每年一度的小学体育测试在各个学校开始. ­ 随之,各个小区楼下练习跳绳的小学生们逐渐增多,家长们也都重视起来,"亲训"的.陪练的.报班的--渠道不 ...

  3. 2022SDUT知到/智慧树----C语言第六章测试题解

    ** 第六章测试 ** 1[判断题] (10分) 有下列程序段,程序段运行后的输出结果##2##3##4##5( ). int k; for (k=2;k<6;k++,k++) printf(& ...

  4. 与六年测试工程师促膝长谈,他分享的这些让我对软件测试工作有了全新的认知~

    不知不觉已经从事软件测试六年了,2016年毕业到进入外包公司外包给微软做软件测试, 到现在加入著名的外企.六年的时间过得真快.长期的测试工作也让我对软件测试有了比较深入的认识.但是我至今还是一个底层的 ...

  5. 单元测试 代码里面都绝对路径怎么处理_原创 | 编写单元测试和实践TDD (六)测试哪些内容:Right-BICEP...

    上一章通过实例讲了"第一个单元测试"到底应该怎么做,这一章我们讲讲"对一个工作单元需要测试它哪些方面的内容"? 有6个值得测试的部位,统称为:Right-BIC ...

  6. 原创 | 使用JUnit、AssertJ和Mockito编写单元测试和实践TDD (六)测试哪些内容:Right-BICEP

    上一章通过实例讲了"第一个单元测试"到底应该怎么做,这一章我们讲讲"对一个工作单元需要测试它哪些方面的内容"? 有6个值得测试的部位,统称为:Right-BIC ...

  7. 思科 计算机网络 第六章测试考试答案

    测试 1.如果有两个或多个路由均可到达同一个目的网络,度量指标用于决定要在路由表中使用的路由. 请参见图示.填空题. 离开 PC-1 的数据包必须经过3跳才能到达 PC-4. 3.填空题. 缩写词NA ...

  8. 云开发微信小程序 - 最近火到爆的的MBTI十六人格测试

    写在开头 - 什么是MBTI人格测试? 迈尔斯-布里格斯类型指标(Myers–Briggs Type Indicator,MBTI)是由美国作家伊莎贝尔·布里格斯·迈尔斯和她的母亲凯瑟琳·库克·布里格 ...

  9. Python入门(二十六)测试(一)

    测试(一) 1.概述 2.测试函数 2.1 单元测试和测试用例 2.2 可通过的测试 2.3 未通过的测试 2.4 测试未通过怎办 2.5 添加新测试 1.概述 编写函数或类时,还可为其编写测试.通过 ...


  1. Unsupervised Feature Selection in Signed Social Networks 阅读笔记
  2. 初等数论--原根--a^k对模m的阶
  3. 胡珀:从危到机,AI 时代下的安全挑战
  4. 方立勋_30天掌握JavaWeb_JavaBean、mvc开发模式、el表达式、jstl标签
  5. 侧边栏_第四课 侧边栏和过滤器
  6. 【实用工具】交叉编译android版本的GDB
  7. JS中移动端项目取余数和switch于PC端的不同
  8. freemarker mysql 生成bean_基于数据库的代码自动生成工具,生成JavaBean、生成数据库文档、生成前后端代码等(v6.6.6版)...
  9. 阶段3 1.Mybatis_04.自定义Mybatis框架基于注解开发_1 今日课程内容介绍
  10. 19.1.27 laravel框架学习笔记
  11. Java web 实战项目案例
  12. Vue登录页面源代码分享
  13. log4j日志级别小结
  14. key去掉下划线自动大写首字母工具类
  15. 远程桌面链接怎么共享本地磁盘
  16. windows查看电池损耗
  17. 带你手摸手搭建vuepress站点
  18. 3D游戏设计-智能巡逻兵
  19. 牛津英语字典pdf下载_从1到18岁,这款牛津认证的免费APP是学英语最好的装备
  20. 杭州 职称 计算机免试,浙职称评审政策调整外语计算机免考年限有变动


  1. 高飞助教介绍的他们所用无人机机架组成
  2. Spring+SpringMVC+Jsp实现校园二手交易系统
  3. 【线上直播】微生物组学数据分析与挖掘专题会议
  4. ​IBM、Google、Oracle三巨头的公有云之殇(下)
  5. gt710显卡驱动linux,Ubuntu18.04导入nVidiaGT710显卡
  6. 想要从编程小白成为达人,这些你必须知道!(附STM32学习指南)
  7. 7-29 二分法求多项式单根 (20 分)
  8. 下拉框无法收回的解决方法:focus-outside使用方式
  9. 数学在计算机图形学中的应用
  10. 药店计算机信息系统知识培训,的药店信息管理系统.docx