zoukankan      html  css  js  c++  java
  • IronPython and LINQ to Objects (V): 支持PLINQ

    当IronPython 2.6 Beta发布时,我阅读了《LINQ in Action》和DevHawkIronPython and LINQ to XML系列文章。受他们的启发,我撰写了三篇博文介绍了如何在IronPython中实现流水线风格的LINQ to Objects查询。

    当IronPython 2.6发布时,我发现IronPython改变了生成器(generator)的实现,于是撰写了第四篇博文,介绍如何利用Adapter模式和Iterator模式来配接IronPython生成器和LINQ的IEnumerable<Object>接口。

    不久前,IronPython 2.6.1和.NET Framework 4发布。为了支持.NET 4引入的PLINQ,我重构了linq.py,并在IronPython 2.6.1上测试通过。本文将介绍这次更新的实作方法。

     

    1. 调用扩展函数

    LINQ的底层基础是查询操作符,它们被实现为一批扩展函数。LINQ to Objects的查询操作符是定义在System.Linq.Enumerable中的扩展函数;PLINQ的查询操作符是定义在Sytem.Linq.ParallelEnumerable中的扩展函数。这些扩展函数是有数个重载的范型函数,例如Enumerable.Where是两个范型函数:

    static IEnumerable<TSource> Where<TSource>(this IEnumerable<TSource> source, Func<TSource, bool> predicate);

    static IEnumerable<TSource> Where<TSource>(this IEnumerable<TSource> source, Func<TSource, int, bool> predicate);

    对于C#这样的静态语言,范型函数的实例化和重载函数的选择是在编译时完成的。编译器利用静态类型系统提供的类型信息,使用类型推演生成具现函数,并使用重载规则将函数调用绑定到具现函数。IronPython是动态语言,范型函数具现和重载绑定是在运行时由IronPython引擎完成的。如果IronPython引擎不能完成类型推演或重载绑定,它会抛出运行时异常。

    1: Enumerable.Where([1, 2, 3], lambda x: x > 1)
    2: Enumerable.Where((i for i in [1, 2, 3]), lambda x: x > 1)

    在IronPython 2.6.1中,第一行的代码可以正确执行,说明IronPython引擎可以对列表(list)完成类型推演和重载绑定。但是,第2行代码将抛出异常:”TypeError: The type arguments for method 'Where' cannot be inferred from the usage. Try specifying the type arguments explicitly.” 这说明IronPython引擎对生成器(generator)无法完成类型推演,它要求程序员显式地指定范型函数的参数类型。

    考虑到生成器是Python实现“延迟求值”最重要的惯用法,而“延迟求值”是LINQ最重要的优点之一,我为每一个扩展函数都进行了如下包装,以显式指定范型函数的类型参数。因为IronPython的所有类型都继承自object(即System.Object),所以我将Where的类型参数TSource绑定到object,而IronPython引擎会完成剩下的工作。

    def Where(col, fun):

        return Enumerable.Where[object](col, Func[object, bool](fun))

    在我看来,以上代码展示了IronPython的本质困难之一:如何将动态类型配接到静态类型系统、如何在动态语言中调用为静态语言编写的程序库。IronPython团队给出了一个较好的解决方案:IronPython引擎在大多数情况下可以完成类型推演和重载绑定;当它需要程序员帮助时,程序员只要编写少量代码就可以完成任务。

     

    2. 构建流水线风格的查询

    C#允许对象像调用成员函数一样调用扩展函数,从而方便地编写出流水线风格的代码。IronPython将扩展函数视为普通的静态函数,不支持使用成员函数的语法调用扩展函数。为了构建流水线风格的查询,linq.py提供了一个包装类LinqWrapper,其核心代码如下。

    1:  def pipeline(func):
    2:      def wrapper(self, *args, **kws):
    3:          result = func(self, *args, **kws)
    4:          return LinqWrapper(self.ext, result)
    5:      return wrapper
    6:   
    7:  class LinqWrapper(object):
    8:      def __init__(self, ext, col):
    9:          self.ext = ext
    10:          self.col = col
    11:   
    12:      def Count(self, fun = lambda x: True):
    13:          return self.ext.Count[object](self.col, Func[object, bool](fun))
    14:   
    15:      @pipeline
    16:      def Where(self, fun):
    17:          return self.ext.Where[object](self.col, Func[object, bool](fun))
    18:   
    19:  assert (LinqWapper(Enumerable, [1,2,3])
    20:      .Where(lambda x: x > 1)
    21:      .Count()) == 2

    LinqWarpper用于包装Enumerable、ParallelEnumerable等实现了LINQ扩展函数的静态类。它的初始化函数有两个参数:ext代表被包装的静态类,col代表待处理的数据源。在LINQ to Objects中,数据源是实现了接口IEnumerable<TSource>的对象;在PLINQ中,数据源是实现了接口ParallelQuery<TSource>的对象。函数pipeline是修饰器(decorator),用于修饰LinqWarpper中返回数据源的函数。它将数据源封装为一个新的LinqWarpper对象,这样便可以实现19~21行的流水线风格的代码

    • 第19行,LinqWarpper包装了LINQ to Objects的静态类Enumerable,并作用于列表[1, 2, 3]。
    • 第20行,调用LinqWrapper.Where,该函数调用Enumerable.Where,并返回查询结果。由于LinqWrapper.Where被装饰器pipeline修饰,LinqWrapper.Where的返回值是pipeline的内嵌函数wrapper的返回值:一个新的LinqWrapper对象,它包装了Eunumerable,并作用于查询结果。
    • 第21行,调用LinqWrapper.Count,该函数调用Enumerable.Count,并返回查询结果。

     

    3. 一些辅助函数

    为了简化流水线查询的构建,linq.py提供了一批辅助函数。

    1:  def From(col):
    2:      if is_parallel_enumerable(col):
    3:          return LinqWrapper(ParallelEnumerable, col)
    4:      else:
    5:          return LinqWrapper(Enumerable, get_enumerable(col))
    6:     
    7:  def PFrom(col):
    8:      col = get_enumerable(col)
    9:      col = ParallelEnumerable.AsParallel(col)
    10:      return LinqWrapper(ParallelEnumerable, col)
    11:   
    12:  def get_enumerable(col):
    13:      return col if is_enumerable(col) else (c for c in col)
    14:   
    15:  def is_enumerable(obj):
    16:      return isinstance(obj, System.Collections.IEnumerable)
    17:     
    18:  def is_parallel_enumerable(obj):
    19:      return str(type(obj)) == "<type 'ParallelEnumerable'>"

     

    • 第1~5行,函数From的输入是IronPython中的可迭代对象,输出是LinqWrapper对象,其目的是简化LINQ to Objects查询的构造。
    • 第7~10行,函数PFrom的输入是IronPython的可迭代对象,输出是封装了ParrallelEnumerable的LinqWrapper对象,其目的是简化PLINQ查询的构造。
    • 第12~13行,函数get_enumerable检查可迭代对象col是否实现了接口IEnumerable,如果没有实现,则使用生成器将其配接到IEnumerable。在IronPython 2.6.1中,生成器对象实现了接口IEnumerable,可作为LINQ查询操作符的输入。

    有了以上辅助函数,我们就可以方便地构造LINQ查询。

    From([1, 2, 3]).Where(lambda x: x > 1).Count()

    PFrom([1, 2, 3]).Count(lambda x: x > 1)

    4. 测试

     

    本系列的最大缺点是没有提供测试用例以构建安全网。这使得linq.py在IronPython版本升级和重构过程中显得非常脆弱。为了亡羊补牢,我在最新的linq.py中增加了一批测试用例。详细测试逻辑可参考源代码,这里仅列出被测试覆盖的对象。

    li = [1, 2, 3, 4, 5]

    test(lambda: System.Array[System.Int32](li))

    test(lambda: System.Collections.ArrayList(li))

    test(lambda: li)

    test(lambda: tuple(li))

    test(lambda: set(li))

    test(lambda: dict((i, i) for i in li))

    test(lambda: (i for i in li))

    test(lambda: MyContainer(li))

    由以上代码可知,目前的测试覆盖了.NET强类型容器(整型数组)、弱类型容器(ArrayList)、Python内建容器(列表、元组、集合、字典)、生成器、以及自定义容器(参考源代码可知,该容器支持序列协议(sequence protocol),即实现了函数__getitem__)。

     

    5. linq.py

    最后列出linq.py的全部源代码。感谢IronPython团队将IronPython平滑地集成于CLR,使得我们可以用简短的代码将LINQ的威力引入Python的世界。

       1:  import clr
       2:  clr.AddReference('System.Core')
       3:   
       4:  import System
       5:  from System.Linq import Enumerable, ParallelEnumerable
       6:  from System import Func
       7:
       8:  def pipeline(func):
       9:      def wrapper(self, *args, **kws):
      10:          result = func(self, *args, **kws)
      11:          return LinqWrapper(self.ext, result)
      12:      return wrapper
      13:   
      14:  class LinqWrapper(object):
      15:      def __init__(self, ext, col):
      16:          self.ext = ext
      17:          self.col = col
      18:   
      19:      def __iter__(self):
      20:          return iter(self.col)
      21:         
      22:      def __str__(self):
      23:          return '[%s]' % ', '.join( (str(v) for v in self) )  24:         
      25:      def __repr__(self):
      26:          return str(self)
      27:         
      28:      def Count(self, fun = lambda x: True):
      29:          return self.ext.Count[object](self.col, Func[object, bool](fun))
      30:   
      31:      @pipeline
      32:      def Distinct(self):
      33:          return self.ext.Distinct[object](self)
      34:   
      35:      def First(self):
      36:          return self.ext.First[object](self)
      37:   
      38:      @pipeline
      39:      def GroupBy(self, fun):
      40:          return self.ext.GroupBy[object, object](self.col, Func[object, object](fun))
      41:   
      42:      @pipeline
      43:      def Join(outer, inner, outerKey, innerKey, fun):
      44:          return self.ext.Join[object, object, object, object](outer, inner
      45:              , Func[object, object](outerKey), Func[object, object](innerKey)
      46:              , Func[object, object, object](fun))

      48:      @pipeline
      49:      def OrderBy(self, fun):
      50:          return self.ext.OrderBy[object, object](self.col, Func[object, object](fun))
      51:   
      52:      @pipeline
      53:      def OrderByDesc(self, fun):
      54:          return self.ext.OrderByDescending[object, object](self.col, Func[object, object](fun))
      55:     
      56:      @pipeline
      57:      def ThenBy(self, fun):
      58:          return self.ext.ThenBy[object, object](self.col, Func[object, object](fun))
      59:   
      60:      @pipeline
      61:      def ThenByDesc(self, fun):
      62:          return self.ext.ThenByDescending[object, object](self.col, Func[object, object](fun))
      63:   
      64:      @pipeline
      65:      def Take(self, count):
      66:        return self.ext.Take[object](self.col, count)
      67:   
      68:      @pipeline
      69:      def Select(self, fun):
      70:          return self.ext.Select[object, object](self.col, Func[object, object](fun))
      71:   
      72:      def Single(self, fun):
      73:          return self.ext.Single[object](self.col, Func[object, bool](fun))
      74:   
      75:      @pipeline
      76:      def Where(self, fun):
      77:          return self.ext.Where[object](self.col, Func[object, bool](fun))
      78:   
      79:  def From(col):
      80:      if is_parallel_enumerable(col):
      81:          return LinqWrapper(ParallelEnumerable, col)
      82:      else:
      83:          return LinqWrapper(Enumerable, get_enumerable(col))
      84:     
      85:  def PFrom(col):
      86:      col = get_enumerable(col)
      87:      col = ParallelEnumerable.AsParallel(col)
      88:      return LinqWrapper(ParallelEnumerable, col)
      89:   
      90:  def get_enumerable(col):
      91:      return col if is_enumerable(col) else (c for c in col)
      92:   
      93:  def is_enumerable(obj):
      94:      return isinstance(obj, System.Collections.IEnumerable)
      95:     
      96:  def is_parallel_enumerable(obj):
      97:      return str(type(obj)) == "<type 'ParallelEnumerable'>"
      98:   
      99:  if __name__ == '__main__':
     100:      def test_query(ctr, from_):
     101:          print 'test_query', from_.__name__
     102:          query = (
     103:              from_(ctr())
     104:              .Where(lambda x: x > 2)
     105:              .Select(lambda x: x * 2)
     106:          )
     107:          expect = set(r for r in query)
     108:          actual = set(x * 2 for x in (y for y in ctr() if y > 2))
     109:          assert expect == actual, expect
     110:         
     111:      def test_count(ctr, from_):
     112:          print 'test_count', from_.__name__
     113:          expect = from_(ctr()).Where(lambda x: x > 2).Count()
     114:          actual = len([x for x in ctr() if x > 2])
     115:          assert expect == actual, expect
     116:         
     117:      def show(ctr):
     118:          col = ctr()
     119:          print 'type', type(col)
     120:          print 'clrtype', clr.GetClrType(type(col))
     121:          print 'is_enumerable', isinstance(col, System.Collections.IEnumerable)
     122:          print 'is_enumerable[object]', isinstance(col, System.Collections.Generic.IEnumerable[object])
     123:          print 'is_enumerable[Int32]', isinstance(col, System.Collections.Generic.IEnumerable[System.Int32])
     124:   
     125:      def test(ctr):
     126:          show(ctr)
     127:          for from_ in (From, PFrom):
     128:              test_query(ctr, from_)
     129:              test_count(ctr, from_)
     130:             
     131:      class MyContainer(object):
     132:          def __init__(self, col):
     133:              self.col = col
     134:             
     135:          def __getitem__(self, idx):
     136:              return self.col[idx]
     137:   
     138:      li = [1, 2, 3, 4, 5]
     139:      test(lambda: System.Array[System.Int32](li))
     140:      test(lambda: System.Collections.ArrayList(li))
     141:      test(lambda: li)
     142:      test(lambda: tuple(li))
     143:      test(lambda: set(li))
     144:      test(lambda: dict((i, i) for i in li))
     145:      test(lambda: (i for i in li))
     146:      test(lambda: MyContainer(li))

  • 相关阅读:
    机器学习书籍推荐
    25个机器学习面试题,期待你来解答
    观点 | 如何优雅地从四个方面加深对深度学习的理解
    Azure Public IP DNS域名
    SSH不允许Root登陆的方法
    MySQL on Azure高可用性设计 DRBD
    Linux ssh 不需要输入密码的方法
    MySQL on Azure高可用性设计 DRBD
    Express Route的配置
    Azure PIP (Instance Level Public IP)
  • 原文地址:https://www.cnblogs.com/liangshi/p/1726413.html
Copyright © 2011-2022 走看看