对于StreamInsight这种不是很线性的架构,最好还是直接拿出来一个例子,简单但完整的把流程走过一遍,更能看清所谓“流”、“事件”、“适配器”之类到底是什么东西,有什么关系。

官方例子下载地址:http://go.microsoft.com/fwlink/?LinkId=180356,这里就理一遍其中最简单的例子:TrafficJoinQuery

场景描述

这个例子的场景可以描述为:有九个测速器,编号为1001~1009,分别放置在3个地点。每个测速器每20s会记录下这20s内通过的车辆数以及它们的平均速度。现在要统计出每个测速器记录的一分钟内车辆数的平均数:

比如1001号测速器,10:00:00~10:00:20记录了20辆车,10:00:20~10:00:40记录了15辆车,10:00:40~10:01:00记录了25辆车,10:01:00~10:01:20记录了5辆车,那么1001号测速器在10:00:00~10:01:00这一分钟内车辆数的平均数就是(20+15+25)/3=20,而在10:00:20~10:01:20这一分钟内车辆数的平均数就是(15+25+5)/3=15。

这里最重要的就是搞清每一次计数的时候,哪些数据是包括其中的

提供的数据是两个csv文件,一个是包含了时间、测速器编号、车数、车速的日志文件,另一个是测速器编号与所在地点(1,2,3)对应的表。最终的结果在对第一张表的聚合计算的基础上,再把这两张表连接起来。

准备工作

当然要先安装StreamInsight http://msdn.microsoft.com/zh-cn/library/ee378749.aspx 。然后注意把下载下来的例子里的

using (Server server = Server.Create("Default"))

改成

using (Server server = Server.Create("XXXXX"))

其中XXXXX就是你的StreamInsight的实例名。 如果想使用 Connect的方法的话,需要先开启一个 Host,提供一个 EndPoint :

Server serverInsight = Server.Create("StreamInsight");
ServiceHost host = new ServiceHost(serverInsight.CreateManagementService());
WSHttpBinding binding = new WSHttpBinding(SecurityMode.Message);
binding.HostNameComparisonMode = HostNameComparisonMode.Exact;
host.AddServiceEndpoint(typeof(IManagementService),binding,"http://localhost:80/StreamInsight/StreamInsight");
host.Open();

然后在程序中通过

using (Server server = Server.Connect(new System.ServiceModel.EndpointAddress(@http://localhost/StreamInsight/StreamInsight))) 

连接到EndPoint。

适配器

例子的Solution下包括三个项目,其中“SimpleTextFileReader”和“SimpleTextFileWriter”是两个适配器项目,分别对应输出、输入适配器。从例子中可以看出,推荐的做法是适配器项目与主程序项目独立,这样能很容易的切换适配器

查看这两个项目,可以看出输入适配器与输出适配器的结构是类似的,都包含一个工厂 Factory 类,一个提供配置信息的 Config 类,三个分别对应三种事件模型的适配器。

Factory

对于输出适配器,Factory类要完成的就是用Create方法,根据输入的事件模型(EventShape)来返回对应的适配器。而输入适配器的Factory类由于应用了 IDeclareAdvanceTimeProperties 接口,还要额外实现 DeclareAdvanceTimeProperties 方法来进行一些配置,主要是CTI事件的生成频率、延迟时长以及超时事件的处理策略的配置。具体可参见代码中的注释和 AdvanceTimeGenerationSettings 以及 AdapterAdvanceTimeSettings 这两个类的构造函数在 MSDN 中的解释。

Config

虽然一般 Config 类都带有"Config"的后缀,但事实上 Config 类并没有统一的基类或者接口。它的作用就是由外部传递一些配置信息给 Factory 并进一步传递到适配器中。

一般来说 Config 类中不包含公开的方法,而是由一些基本类型的属性构成。

在这个例子中,TextFileReaderConfig 类中配置了输入文件的名称(InputFileName),列的分隔符(Delimiter),文件的文化属性(CultureName),各列的顺序(InputFieldOrders),它们的用处可以在适配器中看到。而 CtiFrequency 则指明了 CTI 事件的频率,作用于 TextFileReaderFactory 。

Adapter

不同的事件模型对应的适配器,其代码往往是类似的。比照 SimpleTextFileReader 工程下的三个适配器类,我们会发现除了 CreateEventFromLine 方法内部有不同,其他都是近似甚至一样的。

这里关键的方法是 ProduceEvents,Start 方法和 Resume 方法都调用了这个方法:

/// <summary>
/// Main driver to read events from the CSV file and enqueue them.
/// </summary>
private void ProduceEvents()
{IntervalEvent currentEvent = default(IntervalEvent);try{// Keep reading lines from the file.while (true){if (AdapterState.Stopping == AdapterState){Stopped();return;}// Did we enqueue the previous line successfully?if (this.currentLine == null){this.currentLine = this.streamReader.ReadLine();if (this.currentLine == null){// Stop adapter (and hence the query) at the end of the file.Stopped();return;}}try{// Create and fill event structure with data from text file line.currentEvent = this.CreateEventFromLine(this.currentLine);// In case we just went into the stopping state.if (currentEvent == null){continue;}}catch (Exception e){// The line couldn't be transformed into an event.// Just ignore it, and release the event's memory.ReleaseEvent(ref currentEvent);this.consoleTracer.WriteLine(this.currentLine + " could not be read into a CEP event: " + e.Message);// Make sure we read a new line next time.this.currentLine = null;continue;}if (EnqueueOperationResult.Full == Enqueue(ref currentEvent)){// If the enqueue was not successful, we keep the event.// It is good practice to release the event right away and// not hold on to it.ReleaseEvent(ref currentEvent);// We are suspended now. Tell the engine we are ready to be resumed.Ready();// Leave thread to wait for call into Resume().return;}// Enqueue was successful, so we can read a new line again.this.currentLine = null;}}catch (AdapterException e){this.consoleTracer.WriteLine("ProduceEvents - " + e.Message + e.StackTrace);}
}

在 While 循环中每次从日志文件中读取一行记录,然后利用 CreateEventFromLine 方法将该行记录转化为相应的事件 currentEvent,最后通过 Enqueue 方法,把新的事件插入队列中。如果理解了上一篇文章中的适配器的状态机,就会注意在每次读取日志前先判断适配器的状态是否为 Stopping ,并在日志读取空行(日志读完)后停止适配器运行。

当 Enqueue 的结果为 Full 时,说明队列已满,这次插入是失败的,而且当前的状态是 Suspended(由输出适配器或者其他的适配器导致)。所以一方面通过 Ready 方法将状态重置为 Running 好进行下一次的插入。同时为了节省内存,释放 currentEvent 。

这里要注意几个 return ,因为在这里说明直接退出了方法,循环中止,日志读取中止。直到再次调用 ProduceEvents 方法,也就是外部调用 Resume 方法(在整个Query过程中,Start 方法只会在初始时调用一次),才会再次启动循环,读取日志。

至于 CreateEventFromLine 方法,就是通过一行日志生成对应的事件。对于非类型化的适配器,事件负载要通过 SetField 方法赋值,这里通过 Config 中的 InputFieldOrders,将 csv 日志的各列分别对应到事件负载的各字段中。

主程序

主项目 TrafficJoinQuery 中的三个文件,在 EventTypes 中的两个类对应两种事件负载——测量日志与地理信息。这就体现了非类型化的适配器的优势——对于两种事件负载,只需要同一个适配器就可以了,负载字段在运行时根据配置信息动态确定

查询模板

Program中,最复杂的是 QueryTemplate 的创建。所谓 QueryTemplate,顾名思义,就是查询模板,通过预先设定一套计算方法和规则,将输入流转化为输出流。这里有两段 Linq 代码:

// Extend duration of each sensor reading, so that they fall in
// a one-minute sliding window. Group by sensor ID and calculate the
// average vehicular count per group within each window.
// Include the grouping key in the aggregation result.
var avgCount = from oneMinReading in sensorStream.AlterEventDuration(e => TimeSpan.FromMinutes(1))group oneMinReading by oneMinReading.SensorId into oneGroupfrom eventWindow in oneGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)select new { avgCount = eventWindow.Avg(e => e.VehicularCount), SensorId = oneGroup.Key };// Join sensors and locations. Moreover, filter the count
// result by a threshold, which is looked up based on the
// sensor location through a user-defined function.
var joined = from averageEvent in avgCountjoin locationData in locationStreamon averageEvent.SensorId equals locationData.SensorIdwhere averageEvent.avgCount > UserFunctions.LocationCountThreshold(locationData.LocationId)select new{SensorId = locationData.SensorId,LocationID = locationData.LocationId,VehicularCount = averageEvent.avgCount};

在第一段中先利用 AlterEventDuration 方法将每条记录的有效时间延续至一分钟——因为我们要统计的是一分钟的平均值。之后对 SensorId 做聚合分组,最后用 SnapshotWindow 方法截取每组每个时间段的平均值。这里 SnapshotWindow 可以认为是给事件流的横切面拍了一个快照,获取的是一个时间点上的数据。

而第二段就是将第一段获得的事件流与地点数据做连接,而且还利用 UserFunctions 提供的 LocationCountThreshold 方法过滤了一部分数据。最终我们得到的事件负载包含了 SensorId 、LocationID 、VehicularCount 三个字段。

关于聚合、连接、时间窗口以及其他的 Linq 语法,具体会在以后介绍。

查询绑定

有了查询模板,也只是打了一个空架子,只有连上输入、输出适配器,才能得到一个能实际运作的系统。在 BindQuery 方法中就将两个输入适配器和一个输出适配器与查询模板绑定在了一起。

两个输入适配器一个是边缘事件适配器,一个是时间段事件适配器。前者对应的是地理数据,因为边缘事件在没有接收到结束边缘事件时,它的结束时间是无穷大,也就是在整个查询过程中是有效的,正适合需要一直有效的地理数据。而时间段事件在生成时就明确了开始时间和结束时间,符合这里车数日志记录的情况。

输出适配器是点事件,说明我们要得到的结果是每个时间点意义上的值。

查询启动、停止与诊断

// Start the query
query.Start();// Wait for the query to be suspended - that is the state
// it will be in as soon as the output adapter stops due to
// the end of the stream.
DiagnosticView dv = server.GetDiagnosticView(query.Name);while ((string)dv[DiagnosticViewProperty.QueryState] == "Running")
{// Sleep for 1s and check againThread.Sleep(1000);dv = server.GetDiagnosticView(query.Name);
}// Retrieve some diagnostic information from the CEP server
// about the query.
Console.WriteLine(string.Empty);
RetrieveDiagnostics(server.GetDiagnosticView(new Uri("cep:/Server/EventManager")), Console.Out);
RetrieveDiagnostics(server.GetDiagnosticView(new Uri("cep:/Server/PlanManager")), Console.Out);
RetrieveDiagnostics(server.GetDiagnosticView(new Uri("cep:/Server/Application/TrafficJoinSample/Query/TrafficSensorQuery")), Console.Out);query.Stop();

启动、停止不需细说。由于 query.Start() 后实际是适配器用另外的线程执行相应的方法(ProduceEvents),主线程需要等待适配器线程执行结束。所以这里用 DiagnosticView 获得当前查询的状态。直到不为 Running,才输出查询的诊断报告。最后停止查询。

这里的诊断报告会列出一些查询数据,比如总事件数、查询时间等。但从中很难看出查询的具体流程是怎样的,即使你进行调试,由于具体的查询实际是在各个线程中执行的,无法顺序跟踪事件的产生、计算、输出。所以,StreamInsight 提供了一个图形化的调试工具,StreamInsight Event Flow Debugger。关于这个工具的使用,会在下一篇文章详细介绍。

转载于:https://www.cnblogs.com/smjack/archive/2010/10/29/1864429.html

【原】StreamInsight 浅入浅出(四)—— 例子相关推荐

  1. 浅入浅出 1.7和1.8的 HashMap

    HashMap 是我们最最最常用的东西了,它就是我们在大学中学习数据结构的时候,学到的哈希表这种数据结构.面试中,HashMap 的问题也是常客,现在卷到必须答出来了,是必须会的知识. 我在学习 Ha ...

  2. [科普]浅入浅出Liunx Shellcode

    创建时间:2008-05-13 文章属性:原创 文章提交: pr0cess  (pr0cess_at_cnbct.org) 浅入浅出Liunx Shellcode /*---------------- ...

  3. 浅入深出之Java集合框架(上)

    Java中的集合框架(上) 由于Java中的集合框架的内容比较多,在这里分为三个部分介绍Java的集合框架,内容是从浅到深,如果已经有java基础的小伙伴可以直接跳到浅入深出之Java集合框架(下). ...

  4. 浅入深出之Java集合框架(中)

    Java中的集合框架(中) 由于Java中的集合框架的内容比较多,在这里分为三个部分介绍Java的集合框架,内容是从浅到深,如果已经有java基础的小伙伴可以直接跳到浅入深出之Java集合框架(下). ...

  5. 浅入浅出深度学习理论实践

    全文共9284个字,40张图,预计阅读时间30分钟. 前言 之前在知乎上看到这么一个问题:在实际业务里,在工作中有什么用得到深度学习的例子么?用到 GPU 了么?,回头看了一下自己写了这么多东西一直围 ...

  6. Spring浅入浅出——不吹牛逼不装逼

    Spring浅入浅出--不吹牛逼不装逼 前言: 今天决定要开始总结框架了,虽然以前总结过两篇,但是思维是变化的,而且也没有什么规定说总结过的东西就不能再总结了,是吧.这次总结我命名为浅入浅出,主要在于 ...

  7. Angular浅入深出系列 - 写在前面

    本系列目录: 写在前面 基础知识 控制器(Controller) 作用域(Scope) 集合(Collection) 模块(Module) 依赖注入(Dependency Injection) 服务( ...

  8. 处理中文乱码_浅入深出:一次提问引发的深思,从此再也不怕“乱码”问题

    这是恋习Python之浅入深出系列第3篇原创首发文章 作者|丁彦军 来源|恋习Python(ID:sldata2017) 转载请联系授权(微信ID:2394608316) 近日,有位粉丝向我请教,在爬 ...

  9. Spring注解浅入浅出——不吹牛逼不装逼

    Spring注解浅入浅出--不吹牛逼不装逼 前情提要 上文书咱们说了<Spring浅入浅出>,对Spring的核心思想看过上篇的朋友应该已经掌握了,此篇用上篇铺垫,引入注解,继续深入学习. ...

  10. 浅入浅出Oracle Spatial GeoRaster 10g影像数据管理(2)

    浅入浅出Oracle Spatial GeoRaster  10g 影像数据管理(2)--物理存储 1.物理存储方式概要      在上个部分<浅入浅出Oracle Spatial GeoRas ...

最新文章

  1. 鼠标控制,扇形的大小
  2. 四、设计模式——策略模式
  3. 【CodeForces - 1150C】Prefix Sum Primes(思维)
  4. SaaS 不懂留存!别玩
  5. android tv 帮助,android TV端各类焦点问题
  6. Asp.net防止盗链
  7. 华为手表表盘的数字什么意思_华为gt2表盘上的数字是什么意思
  8. spring boot面试问题集锦
  9. CCF-A类+B类+C类(2019)
  10. 51单片机之模拟IIC总线
  11. win10系统文件拖拽卡顿_终于找到Win10卡顿病根了!看完秒懂
  12. 【已解决】 Unable to attach or mount volumes: unmounted volumes
  13. 通信基础笔记 ----奈奎斯特和香农定理
  14. 深圳首辆数字人民币主题观光巴士亮相
  15. 阿里最新秋招面经,腾讯/美团/字节1万道Java中高级面试题
  16. 为何要从用户角度出发来思考问题
  17. ges resource dynamic 和 ges enqueues较高导致数据库宕机
  18. 应用上K8S:K8S集成Java应用
  19. linux怎么用命令打开wine,Linux系统运维:10分钟教你如何使用Wine在Linux下玩魔兽世界...
  20. 本题要求实现一个函数,输入一个正整数n(1<=n<=9),输出n行空心的数字金字塔。要求定义和调用函数hollow_pyramid(n)打印出n行空心的数字金字塔。

热门文章

  1. Pandas入门1(DataFrame+Series读写/Index+Select+Assign)
  2. 动态规划应用--“杨辉三角”最短路径 LeetCode 120
  3. POJ 1064 分割线缆(二分查找)
  4. python清洗文本数据_02.数据预处理之清洗文本信息
  5. python中response.text_Sanic response text() 函数用法和示例
  6. mqtt如何判断设备离线_反渗透纯水设备膜元件如何离线清洗?
  7. 岭回归预测PM2.5
  8. Git中非常重要的一个文件——.gitignore详解
  9. 全栈深度学习第3期: 怎样科学管理实验数据?
  10. 史上最强多线程面试44题和答案:线程锁+线程池+线程同步等