1、官方源码的不足

HelloInsightObservable官方源码利用toToPointStream方法将观察者的实例转化为点事件流,接着在点事件流中使用linq查询e>50的输入,并将其输出

运行结果如下:

其不足之处在于代码有点混乱,而且只有一个观察者。

接下来本文就逐步修改,并且实现多个观察者的情况。

program.cs中定义观察者,将观察者“订阅”到目标对象的语句如下:

var outputObserver = new OutputObserver();

var outputObservable = query.ToObservable();//将事件流转化为可观察的输出

outputObservable.Subscribe(outputObserver);//提供通知信息到outputObserver

那容易想到的思路是直接在program.cs中添加多个观察者,再使用Subscribe方法订阅多个观察者。但是输出每次都有变动,由于不同观察者输出一样也看不出明显规律,偶尔还会由于枚举观察者的过程中观察者集合变动而产生异常

这是因为InputObservable.cs中模拟输入流的GenerateInput是Timer的回调函数。每一个观察者在运行之后都会将Timer设为停止状态,别的观察者在Timer已经启动的情况下加入不是很恰当。令人奇怪的是官方源码在InputObservable. cs的构造函数中启动了Timer,既然没打算添加多个观察者,那在GenerateInput中遍历观察者集合Observers的语句有什么意义?

2、修改OutputObserver,添加name属性

2.1、新建项目C#控制台应用HelloInsight_edit

添加如下引用:

Microsoft.ComplexEventProcessing;

Microsoft.ComplexEventProcessing.Observable;

System.Reactive;

System.Reactive.Providers;

2.2、实现接口IObserver

namespace HelloInsight_edit

{

public class OutputObserver:IObserver//实现IObserver接口

{

private string name;

public OutputObserver(string name){

this.name = name;

}

public virtual void OnCompleted()

{

Console.WriteLine("Stopping query...");

}

public virtual void OnError(Exception e)

{

Console.WriteLine("Unexpected error occured");

}

public virtual void OnNext(int value)

{

Console.WriteLine("{0}观察到的value: {1}", this.name,value);

}

}

}

为简单起见,IObserver的抽象类型都使用int型,以后Main方法创建事件流的时候也会相应修改。

3、修改事件源,实现IObservable接口

我们要删掉构造方法中的timer.change(timeSpan,timeSpan),新建了update方法,用来调用这句话。这样可以使得多个observer都添加到observers中之后再启动Timer。

public class EventSource:IObservable

{

private List> observers = new List>();

private readonly int dataNumber;

private int generatedNumber;

private Random random;

private readonly Timer timer;

private readonly int timeSpan;

//add

private int _randomNumber;

public EventSource(int dataNumber)

{

Console.WriteLine("我是构造方法");

this.random = new Random();

this.dataNumber = dataNumber;

this.generatedNumber = 0;

this.timer = new Timer(GenerateInput);//callback是一个委托,表示要执行的方法

this.timeSpan = 100;//每个随机数字产生的时间间隔 1000ms

//timer.Change(timeSpan, timeSpan);//此语句控制数据

this._randomNumber = -1;//初始化随机数字

}

public int RandomNumber

{

get { return _randomNumber; }

set { this._randomNumber = value; }

}

public void Update()

{

timer.Change(timeSpan, timeSpan);

}

private void GenerateInput(object _)

{

foreach (var observer in observers)

{

_randomNumber= random.Next(100);

Console.WriteLine("Random generated data {0} : {1}", generatedNumber, _randomNumber);

observer.OnNext(_randomNumber);

generatedNumber++;

if (generatedNumber >= dataNumber)

{

observer.OnCompleted();

timer.Change(Timeout.Infinite, timeSpan);

return;

}

}

timer.Change(timeSpan, timeSpan);

}

public void AddObserver(IObserver observer)

{

observers.Add(observer);

}

public void RemoveObserver(IObserver observer)

{

observers.Remove(observer);

}

//必须实现的方法

public IDisposable Subscribe(IObserver observer)

{

if (observer != null && !observers.Contains(observer))

{

observers.Add(observer);

}

Console.WriteLine("我是subscriber");

return observer as IDisposable;

}

}

4、修改program.cs

将输入源的实例es转化为点事件流stream,query过滤得到stream中大于50的事件流,query2过滤得到stream大于70的事件流。建立了3个观察者roger、luffy和nami,我们用luffy观察query,用nami观察query2。

修好program.cs之后就可以调试了噢耶……

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

//add

using Microsoft.ComplexEventProcessing;

using Microsoft.ComplexEventProcessing.Linq;

namespace HelloInsight_edit

{

class Program

{

static void Main(string[] args)

{

//将EventSource类作为CEP引擎的输入。

EventSource es = new EventSource(10);

var server = Server.Create("Default");

var application = server.CreateApplication("Observable Application");

//注意以下4行,这里与适配器方式的程序不同的是,没有插入CTI事件。

var stream = es.ToPointStream(application,

e => PointEvent.CreateInsert(DateTime.Now, e),

AdvanceTimeSettings.StrictlyIncreasingStartTime,

"Observable Stream");

var query = from e in stream

where e > 50

select e;

OutputObserver roger = new OutputObserver("roger");

OutputObserver luffy = new OutputObserver("luffy");

OutputObserver nami = new OutputObserver("nami");

Console.WriteLine("Starting query...");

//直接对原始流添加观察者

//es.AddObserver(roger); es.AddObserver(luffy); es.AddObserver(nami);

//对newStream添加观察者

var newStream = query.ToObservable();

newStream.Subscribe(luffy); //newStream.Subscribe(nami);//添加多个订阅者可能会有异常

//对newStream2添加观察者

var query2 = from e in stream

where e > 70

select e;

var newStream2 = query2.ToObservable();

newStream2.Subscribe(nami);

//调用timer.change(定义callback的等待时间和时间间隔)

es.Update();

Console.ReadLine();

}

}

}

运行结果:

可以看出,Subscribe两个观察者的操作先执行。

在遍历观察者集合observers的过程中,每组显示2个随机数。luffy和nami依次观察第一个和第二个。

{?个人理解为newStream.Subscribe(luffy);的功能类似于一个绑定了luffy的线程,遍历结束之后全部用户开始依次输出。全局变量generatedNumber负责整体次数}

这不是我们要的功能。

对于流中每个事件,不同观察者都观察到才行。

4.1、重写GenerateInput(object _)

将生成随机数的语句放到遍历操作foreach之前

private void GenerateInput(object _)

{

_randomNumber = random.Next(100);

if (generatedNumber <= dataNumber)

{

Console.WriteLine("Random generated data {0} : {1}", generatedNumber, _randomNumber);

foreach (var observer in observers) observer.OnNext(_randomNumber);//使用最大程度实现的OnNext

}

else

{

observers.ElementAt(0).OnCompleted();

timer.Change(Timeout.Infinite, timeSpan);

}

generatedNumber++;

timer.Change(timeSpan, timeSpan);

}

运行结果:

可以看出,对于流中每个事件,luffy检测到了大于50的事件,nami检测到了大于70的事件,实现了预定的目标。

{!接下来我们要将观察者模式、点事件流检测和WCF(Windows Communication Foundation)相结合,实现事件源和观察者WCF通信,便于接下来部署到网络中}

5、参考资料

[1]IObserver接口

[2]IDisposable接口

[3]virtual方法

matlab cep,【CEP】重构和改进HelloInsightObservable相关推荐

  1. 基于matlab蓝牙跳频系统,基于Matlab软件的蓝牙跳频改进算法

    60 基于Matlab软件的蓝牙跳频改进算法 [毛淑华 岩淑霞 雷伯录] 介绍了蓝牙跳频系统对跳频序列的要求,并且提出了一种改进的自适应跳频方案.基于C语言和MATLAB工具对原跳频系统和改进后系统分 ...

  2. 机器学习 | MATLAB实现PSO-IELM粒子群改进极限学习机回归和分类预测

    分机器学习 | MATLAB实现PSO-IELM粒子群改进极限学习机回归和分类预测 目录 分机器学习 | MATLAB实现PSO-IELM粒子群改进极限学习机回归和分类预测 基本介绍 程序设计 回归主 ...

  3. 【图像分割】基于matlab形态学重建和过滤改进FCM算法(FRFCM)的图像分割【含Matlab源码 085期】

    ⛄一.简介 首先,通过引入形态学重构操作将图像的局部空间信息纳入FRFCM中,以保证抗噪性和图像细节保留.其次,基于局部空间邻居和聚类中心内像素之间距离的成员资格分区的修改被仅依赖于成员资格分区的空间 ...

  4. 【MATLAB第6期】基于MATLAB的粒子群及若干改进的粒子群算法原理介绍 持续更新

    一.经典粒子群PSO算法 1 思想来源 粒子群优化(Particle Swarm Optimization,PSO) 作为进化计算的一个分支,是由 Eberhart 和 Kennedy 于 1995 ...

  5. matlab重排矩阵,重构和重新排列数组

    重构 reshape 函数可以更改数组的大小和形状.例如,将 3×4 矩阵重构成 2×6 矩阵. A = [1 4 7 10; 2 5 8 11; 3 6 9 12] A = 3×4 1 4 7 10 ...

  6. matlab遗传算法无人机问题,基于改进遗传算法的无人机路径规划

    [1] 杨陆强, 果霖, 朱加繁, 等. 我国农用无人机发展概况与展望[J]. 农机化研究,2017,39(8):6-11.(YANG L Q,GUO L, ZHU J F, et al. The d ...

  7. 相空间重构 matlab 程序源,matlab求相空间重构延迟时间和嵌入维数

    关联积分计算 function C_I=correlation_integral(X,M,r) %该函数用来计算关联积分 %C_I:关联积分的返回值 %X:重构的相空间矢量,是一个m*M的矩阵 %M: ...

  8. 【Matlab WSN通信】A_Star改进LEACH多跳传输协议【含源码 487期】

    一.代码运行视频(哔哩哔哩) [Matlab路径规划]蚁群算法机器人大规模栅格地图最短路径规划[含源码 1860期] 二.蚁群算法及栅格地图简介 随着机器人技术在诸多领域的应用, 如机器人协作焊接.灾 ...

  9. matlab 光度 三维重构 code,基于matlab的三维点云数据三维重建

    基于matlab的三维点云数据三维重建,通过快速构建三角网,实现三维模型的建立,并提供实例数据用于实验. 三维重建的英文术语名称是3D Reconstruction. 三维重建是指对三维物体建立适合计 ...

最新文章

  1. Atitit 实现java的linq 以及与stream api的比较
  2. 5.7 随机采样最小二乘法
  3. linux私有组信息存放在哪,【Linux】Linux私有组,主要组和附加组
  4. select,poll,epoll区别
  5. 智能优化算法:适应度相关优化算法 - 附代码
  6. dvwa学习笔记之xss
  7. 专家看台:阿里软件研发总监叶伟:如何处理技术和需求的矛盾
  8. plsqldev显示语言有问题
  9. 做webgl遇到的两个坑
  10. 芯片行业常用英文术语最详细总结(图文快速掌握)
  11. 【Unity3D开发小游戏】Unity3D零基础一步一步教你制作跑酷类游戏
  12. ui设计一般用什么软件(ui学哪些软件)
  13. python下载vip素材_Python下载素材脚本
  14. 不用找,你想要的手抄报 小报印刷模板素材都在这里
  15. 从“真快乐”APP看国美的野心,不止娱乐零售
  16. 04.奇特的一生(笔记)
  17. 数据分析模型:OGSM模型
  18. 傻瓜式ensp380启用NGFW USG6000v教程,并附web网管教程
  19. 华为HCIE云计算之FA桌面云业务发放
  20. IGARSS2019-项目实战总结-keras

热门文章

  1. linux多系统更改启动顺序
  2. Resnet的pytorch官方实现代码解读
  3. vs2013 error MSB8031 MBCSMFC问题的解决
  4. rn webview加载本地静态html,RNwebview加载本地html.htm
  5. [云炬创业基础笔记]第七张创业团队测试8
  6. [云炬ThinkPython阅读笔记]1.3 第一个程序
  7. 编程打怪升级之路2018-06-01
  8. 实验技术杂志文献20180126
  9. [2DPIC调试笔记]parameter_antenna_radiation1013(3)
  10. xamarin textview 滚动_微软测试 Win10 Chromium/Edge CPU 优化和滚动新效果