数组运算加速是至关科学计算重要的领域,本节我们以一个简单函数为例,使用C语言为python数组加速。
一、Cython
本函数为一维数组修剪最大最小值
version1
@cython.boundscheck(False) @cython.wraparound(False) cpdef clip(double[:] a, double min, double max, double[:] out): ''' Clip the values in a to be between min and max. Result in out ''' if min > max: raise ValueError("min must be <= max") if a.shape[0] != out.shape[0]: raise ValueError("input and output arrays must be the same size") for i in range(a.shape[0]): if a[i] < min: out[i] = min elif a[i] > max: out[i] = max else: out[i] = a[i]
利用Cython类型的内存视图,极大的简化了数组的操作。
cpdef clip()
声明了clip()
同时为C级别函数以及Python级别函数。 在Cython中,这个是很重要的,因为它表示此函数调用要比其他Cython函数更加高效 (比如你想在另外一个不同的Cython函数中调用clip())。- 类型参数
double[:] a
和double[:] out
声明这些参数为一维的双精度数组。 作为输入,它们会访问任何实现了内存视图接口的数组对象,这个在PEP 3118有详细定义。 包括了NumPy中的数组和内置的array库。 clip()
定义之前的两个装饰器可以优化下性能:@cython.boundscheck(False)
省去了所有的数组越界检查, 当你知道下标访问不会越界的时候可以使用它@cython.wraparound(False)
消除了相对数组尾部的负数下标的处理(类似Python列表)
version2_条件表达式
任何时候处理数组时,研究并改善底层算法同样可以极大的提示性能
@cython.boundscheck(False) @cython.wraparound(False) cpdef clip(double[:] a, double min, double max, double[:] out): if min > max: raise ValueError("min must be <= max") if a.shape[0] != out.shape[0]: raise ValueError("input and output arrays must be the same size") for i in range(a.shape[0]): out[i] = (a[i] if a[i] < max else max) if a[i] > min else min
version3_释放GIL
释放GIL,这样多个线程能并行运行,要这样做的话,需要修改代码,使用 with nogil:
@cython.boundscheck(False) @cython.wraparound(False) cpdef clip(double[:] a, double min, double max, double[:] out): if min > max: raise ValueError("min must be <= max") if a.shape[0] != out.shape[0]: raise ValueError("input and output arrays must be the same size") with nogil: for i in range(a.shape[0]): out[i] = (a[i] if a[i] < max else max) if a[i] > min else min
编写setup.py
from distutils.core import setup from distutils.extension import Extension from Cython.Distutils import build_ext ext_modules = [ Extension('sample', ['sample.pyx']) ] setup( name = 'Sample app', cmdclass = {'build_ext': build_ext}, ext_modules = ext_modules )
使用 python3 setup.py build_ext --inplace
来构建它
效率测试示意如下,
>>> import sample >>> import numpy >>> b = numpy.random.uniform(-10,10,size=1000000) >>> c = numpy.zeros_like(b)
>>> import timeit
>>> timeit.timeit('numpy.clip(b,-5,5,c)','from __main__ import b,c,numpy',number=1000)
>>> timeit.timeit('sample.clip(b,-5,5,c)','from __main__ import b,c,sample', ... number=1000)
其中使用numpy自己的clip对比试验,
2.6287411409430206 # numpy
2.8034782900940627 # v1
2.7247575907967985 # v2
2.6071253868285567 # v3
版本三近似于numpy的实现效果,其他版本差一些(每次试验结果都会略有差异,这里只是粗略的比较一下)。
二维数组处理版本参考:
@cython.boundscheck(False) @cython.wraparound(False) cpdef clip2d(double[:,:] a, double min, double max, double[:,:] out): if min > max: raise ValueError("min must be <= max") for n in range(a.ndim): if a.shape[n] != out.shape[n]: raise TypeError("a and out have different shapes") for i in range(a.shape[0]): for j in range(a.shape[1]): if a[i,j] < min: out[i,j] = min elif a[i,j] > max: out[i,j] = max else: out[i,j] = a[i,j]
二、自己写接口
sample.c中添加
/* n:longth of array */ void clip(double *a, int n, double min, double max, double *out) { double x; for (; n >= 0; n--, a++, out++) { x = *a; *out = x > max ? max : (x < min ? min : x); } }
pysample.c中添加
// void clip(double *a, int n, double min, double max, double *out); static PyObject *py_clip(PyObject *self, PyObject *args){ PyObject *a, *out; int min, max; if(!PyArg_ParseTuple(args, "OiiO", &a, &min, &max, &out)){ //py数组对象暂记 return NULL; } // printf("%i, %i ", min, max); Py_buffer view_a, view_out; //py数组对象接收对象 if (PyObject_GetBuffer(a, &view_a, PyBUF_ANY_CONTIGUOUS | PyBUF_FORMAT) == -1) { return NULL; } if (PyObject_GetBuffer(out, &view_out, PyBUF_ANY_CONTIGUOUS | PyBUF_FORMAT) == -1) { return NULL; } clip(view_a.buf, view_a.shape[0], min, max, view_out.buf); PyBuffer_Release(&view_a); PyBuffer_Release(&view_out); return Py_BuildValue(""); }
函数登记处添加
{"clip", py_clip, METH_VARARGS, "clip array"},
则可,实际测试发现,手动接口效率也很高,和numpy同一水平。