StreamInsight 学习地址:http://www.cnblogs.com/StreamInsight/archive/2011/10/26/StreamInsight-Query-Series-Summarizing.html
有的时候我就想快速地试验一个技术想法,而不想因此去构建一个完整的解决方案。StreamInsight作为一个新产品,也有这样的问题。理想情况,我们应当可以试验一个产品,将其运行在某一个场景下并作出快速的评估。现在有了LINQPad,StreamInsight的快速试验变为了可能。这篇博文将一步步介绍如何安装组件和使用各种各样的数据源测试StreamInsight查询。如果你能坚持阅读下来,作为奖励,我会告诉你怎样使用OData数据,并在其上执行LINQ查询。
第一步:安装StreamInsight 1.1
你需要安装StreamInsight的第二个发布版本来使用LINQPad集成功能,从微软下载中心可以下载到StreamInsight 1.1。安装StreamInsight时,你可以选择评估版本,也可以使用SQL Server 2008 R2 许可密钥(可以在SQL Server安装媒介x86DefaultSetup.ini中找到)来进行激活。
第二步:安装LINQPad 4.0
你可以选择LINQPad的免费版本(从这里下载),也可以选择具有内置智能感知功能的付费版本。
第三步:添加StreamInsight的LINQPad驱动
当运行LINQPad时,你可以看到一个叫做“Add connection”的选项。
之后你会看到一些内置的驱动,包括LINQ-to-SQL和OData等。
选择“View more drivers…”按钮,你会看到由微软创建的最新StreamInsight驱动。
这个驱动安装时间大约200毫秒,然后你就可以在LINQPad驱动列表中看到它了。
第四步:创建与StreamInsight驱动的新连接
如果窗口还处于打开状态选择驱动,否则回到LINQPad,选择“Add connection”,接下去在“Choose Data Context”向导页上点击Next按钮。现在你会看到一个标题为“StreamInsight Context Chooser”的弹出窗口,在其中你可以选择微软提供的数据集,也可以选择新的上下文。这里我选择Default Context。
第五步:编写简单的查询
现在我们连接到了StreamInsight的上下文,在编写查询之前请确保“Language”值选择为“C# Statements”,且“Database”值为“StreamInsight: Default Context”。
由于默认上下文中并没有一个输入数据源,因此我们创建了一个简单的点事件集合,并将其转换为流进行处理。我们的第一个查询获取所有Count大于4的事件。
//define event collection var source = new[] { PointEvent.CreateInsert(new DateTime(2010, 12, 1), new { ID = "ABC", Type="Customer", Count=4 }), PointEvent.CreateInsert(new DateTime(2010, 12, 2), new { ID = "DEF", Type="Customer", Count=9 }), PointEvent.CreateInsert(new DateTime(2010, 12, 3), new { ID = "GHI", Type="Partner", Count=5 }) }; //convert to stream var input = source.ToStream(Application,AdvanceTimeSettings.IncreasingStartTime); var largeCount = from i in input where i.Count > 4 select i; //emit results to LINQPad largeCount.Dump();
输出结果如下,注意只有两条记录输出。
(译者注:StartTime是GMT时间,不同时区下结果会不一样)
为了更多地展示一些StreamInsight的能力,我创建了另外一个查询,该查询在三个事件(放在同一天以使得所有的点事件在同一个快照中)之上创建一个快照窗口,并对每一个类型计算Count值之和。
var source = new[] { PointEvent.CreateInsert(new DateTime(2010, 12, 1), new { ID = "ABC", Type="Customer", Count=4 }), PointEvent.CreateInsert(new DateTime(2010, 12, 1), new { ID = "DEF", Type="Customer", Count=9 }), PointEvent.CreateInsert(new DateTime(2010, 12, 1), new { ID = "GHI", Type="Partner", Count=5 }) }; var input = source.ToStream(Application, AdvanceTimeSettings.IncreasingStartTime); var custSum = from i in input group i by i.Type into TypeGroups from window in TypeGroups.SnapshotWindow(SnapshotWindowOutputPolicy.Clip) select new { Type = TypeGroups.Key, TypeSum = window.Sum(e => e.Count) }; custSum.Dump();
这个查询同样得到两个消息,但是请注意新的TypeSum值是所有Type值相同事件的聚合。
通过上面5个步骤(花费大约8分钟的时间),我们安装了本机组件并且成功执行了一对StreamInsight查询。
其实现在我就可以停笔了,但是还是让我们尝试一下更有趣的事情吧。试想如何在一个已有的OData数据源执行一个查询?下面的另外三个步骤将进一步展示LINQPad和StreamInsight。
*第六步:编写Northwind条目的OData连接
点击LINQPad中的“Add connection”按钮,选择“WCF Data Services(OData)”驱动,接下去选择OData作为提供者,并在“URI”文本框中填入OData源
(http://services.odata.org/Northwind/Northwind.svc),点击OK。在LINQPad中你会看到Northwind OData源中包含的所有实体。
现在让我们执行一个非常简单的查询。这个查询遍历所有的员工(Employee)记录,最后输出每一个员工的ID,受雇日期和国家。
var emps = from e in Employees orderby e.HireDate ascending select new { HireDate = (DateTime)e.HireDate, EmpId = e.EmployeeID, Country = e.Country }; emps.Dump();
输出结果如下:
*第七步:在Northwind数据之上增添StreamInsight查询能力
如果希望在一个特定窗口时间内,按照国家分类来查看员工的雇佣情况该怎么做呢?我们完全可以通过一个直接的LINQ查询做到,但是这样又有什么乐趣呢?言归正传,想想看在员工数据上使用实时分析能做出什么样的有趣事情,当然这个话题并不是这篇博文要谈的重点。
LINQPad仅允许一次使用一个数据上下文,因此想要同时使用OData源和StreamInsight查询,我们需要做一点变通。Mark Simms已经写了一篇颇有深度的文章来解释这个问题,这里我给出一个简短的版本。
右击包含OData查询的LINQPad查询选项卡,选择“Query Properties”,添加StreamInsight dll的引用。点击“Additional Reference”选项卡中的“Add”按钮,查找并选择Microsoft.ComplexEventProcessing.dll和Microsoft.ComplexEventProcessing.Observable.dll(如果看不到这两个dll的话,记得选中Show GAC Assemblies选项)。
切换到“Additional Namespace Imports”选项卡,手工输入查询所需要的命名空间。
现在我们准备在OData源上使用StreamInsight LINQ查询了。
*第八步:在Northwind数据之上增添StreamInsight查询能力
我把先前写好的查询拷贝过来,在此基础上进行后续工作。
查询下方,我实例化了一个StreamInsight “server”用来托管查询,并定义了一个StreamInsight 应用来包含查询。接下去,我将OData结果转换为了CEP流,并创建了一个使用Tumbling窗口的StreamInsight查询,用以输出每60天窗口中的每个国家的雇用人数,最后将结果输出到LINQPad中。
var emps = from e in Employees orderby e.HireDate ascending select new { HireDate = (DateTime)e.HireDate, EmpId = e.EmployeeID, Country = e.Country }; //define StreamInsight server using (Server siServer = Server.Create("RSEROTERv2")) { //create StreamInsight app Application empApp = siServer.CreateApplication("demo"); //map odata query to the StreamInsight input stream var empStream = emps.ToPointStream(empApp, s => PointEvent.CreateInsert(s.HireDate, s), AdvanceTimeSettings.IncreasingStartTime); var counts = from f in empStream group f by f.Country into CountryGroup from win in CountryGroup.TumblingWindow(TimeSpan.FromDays(60), HoppingWindowOutputPolicy.ClipToWindowEnd) select new { EmpCountry = CountryGroup.Key, Count = win.Count() }; //turn results into enumerable var sink = from g in counts.ToPointEnumerable() where g.EventKind == EventKind.Insert select new { WinStart = g.StartTime, Country = g.Payload.EmpCountry, Count = g.Payload.Count }; sink.Dump(); }
查询的结果如下图:
小结
现在你应该有所了解了!前面5个步骤大概可以在10分钟内完成,而剩下的星号步骤可以在另外5分钟内完成。
这是一个相当快速,低投资的方法来尝鲜一个强大产品。