  • 机器学习sklearn(三十一):Pipeline(管道)和 FeatureUnion(特征联合): 合并的评估器

    变换器(Transformers)通常与分类器,回归器或其他的学习器组合在一起以构建复合估计器。 完成这件事的最常用工具是 Pipeline。 Pipeline 经常与 FeatureUnion 结合起来使用。 FeatureUnion 用于将变换器(transformers)的输出串联到复合特征空间(composite feature space)中。 TransformedTargetRegressor 用来处理变换 target (即对数变化 y)。 作为对比,Pipelines类只用来变换(transform)观测数据(X)。

    1. Pipeline: 链式评估器

    Pipeline 可以把多个评估器链接成一个。这个是很有用的,因为处理数据的步骤一般都是固定的,例如特征选择、标准化和分类。Pipeline 在这里有多种用途:

    • 便捷性和封装性 你只要对数据调用 fit和 predict 一次来适配所有的一系列评估器。
    • 联合的参数选择 你可以一次grid search管道中所有评估器的参数。
    • 安全性 训练转换器和预测器使用的是相同样本,管道有助于防止来自测试数据的统计数据泄露到交叉验证的训练模型中。

    管道中的所有评估器,除了最后一个评估器,管道的所有评估器必须是转换器。 (例如,必须有 transform 方法). 最后一个评估器的类型不限(转换器、分类器等等)

    1.1. 用法

    1.1.1. 构造

    Pipeline 使用一系列 (key, value) 键值对来构建,其中 key 是你给这个步骤起的名字, value 是一个评估器对象:

    >>> from sklearn.pipeline import Pipeline
    >>> from sklearn.svm import SVC
    >>> from sklearn.decomposition import PCA
    >>> estimators = [('reduce_dim', PCA()), ('clf', SVC())]
    >>> pipe = Pipeline(estimators)
    >>> pipe
             steps=[('reduce_dim', PCA(copy=True,...)),
                    ('clf', SVC(C=1.0,...))], verbose=False)

    功能函数 make_pipeline 是构建管道的缩写; 它接收多个评估器并返回一个管道,自动填充评估器名:

    >>> from sklearn.pipeline import make_pipeline
    >>> from sklearn.naive_bayes import MultinomialNB
    >>> from sklearn.preprocessing import Binarizer
    >>> make_pipeline(Binarizer(), MultinomialNB())
             steps=[('binarizer', Binarizer(copy=True, threshold=0.0)),
                    ('multinomialnb', MultinomialNB(alpha=1.0,

    1.1.2. 访问步骤

    管道中的评估器作为一个列表保存在 steps 属性内,但可以通过索引或名称([idx])访问管道:

    >>> pipe.steps[0]  
    ('reduce_dim', PCA(copy=True, iterated_power='auto', n_components=None,
                       random_state=None, svd_solver='auto', tol=0.0,
    >>> pipe[0]  
    PCA(copy=True, iterated_power='auto', n_components=None, random_state=None,
        svd_solver='auto', tol=0.0, whiten=False)
    >>> pipe['reduce_dim']  
    PCA(copy=True, ...)


    >> pipe.named_steps.reduce_dim is pipe['reduce_dim']


    >>> pipe[:1]
    Pipeline(memory=None, steps=[('reduce_dim', PCA(copy=True, ...))],...)
    >>> pipe[-1:]
    Pipeline(memory=None, steps=[('clf', SVC(C=1.0, ...))],...)

    1.1.3. 嵌套参数

    管道中的评估器参数可以通过 <estimator>__<parameter> 语义来访问:

    >>> pipe.set_params(clf__C=10)
             steps=[('reduce_dim', PCA(copy=True, iterated_power='auto',...)),
                    ('clf', SVC(C=10, cache_size=200, class_weight=None,...))],


    >>> from sklearn.model_selection import GridSearchCV
    >>> param_grid = dict(reduce_dim__n_components=[2, 5, 10],
    ...                   clf__C=[0.1, 10, 100])
    >>> grid_search = GridSearchCV(pipe, param_grid=param_grid)

    单独的步骤可以用多个参数替换,除了最后步骤,其他步骤都可以设置为 passthrough 来跳过

    >>> from sklearn.linear_model import LogisticRegression
    >>> param_grid = dict(reduce_dim=['passthrough', PCA(5), PCA(10)],
    ...                   clf=[SVC(), LogisticRegression()],
    ...                   clf__C=[0.1, 10, 100])
    >>> grid_search = GridSearchCV(pipe, param_grid=param_grid)


    >>> pipe[0]  
    PCA(copy=True, ...)

    示例 :


    1.2. 注意

    对管道调用 fit 方法的效果跟依次对每个评估器调用 fit 方法一样, 都是transform 输入并传递给下个步骤。 管道中最后一个评估器的所有方法,管道都有。例如,如果最后的评估器是一个分类器, Pipeline 可以当做分类器来用。如果最后一个评估器是转换器,管道也一样可以。

    1.3. 缓存转换器:避免重复计算

    适配转换器是很耗费计算资源的。设置了memory 参数, Pipeline 将会在调用fit方法后缓存每个转换器。 如果参数和输入数据相同,这个特征用于避免重复计算适配的转换器。典型的示例是网格搜索转换器,该转化器只要适配一次就可以多次使用。

    memory 参数用于缓存转换器。memory 可以是包含要缓存的转换器的目录的字符串或一个 joblib.Memory 对象:

    >>> from tempfile import mkdtemp
    >>> from shutil import rmtree
    >>> from sklearn.decomposition import PCA
    >>> from sklearn.svm import SVC
    >>> from sklearn.pipeline import Pipeline
    >>> estimators = [('reduce_dim', PCA()), ('clf', SVC())]
    >>> cachedir = mkdtemp()
    >>> pipe = Pipeline(estimators, memory=cachedir)
    >>> pipe
             steps=[('reduce_dim', PCA(copy=True,...)),
                    ('clf', SVC(C=1.0,...))], verbose=False)
    >>> # Clear the cache directory when you don't need it anymore
    >>> rmtree(cachedir)


    使用 Pipeline 而不开启缓存功能,还是可以通过查看原始实例的,例如:

    >> from sklearn.datasets import load_digits
    >> digits = load_digits()
    >> pca1 = PCA()
    >> svm1 = SVC(gamma='scale')
    >> pipe = Pipeline([('reduce_dim', pca1), ('clf', svm1)])
    >> pipe.fit(digits.data, digits.target)
               steps=[('reduce_dim', PCA(...)), ('clf', SVC(...))],
    >> # The pca instance can be inspected directly
    >> print(pca1.components_)
          [[-1.77484909e-19  ... 4.07058917e-18]]

    开启缓存会在适配前触发转换器的克隆。因此,管道的转换器实例不能被直接查看。 在下面示例中, 访问 PCA 实例 pca2 将会引发 AttributeError 因为 pca2 是一个未适配的转换器。 这时应该使用属性 named_steps 来检查管道的评估器:

    >> cachedir = mkdtemp()
    >> pca2 = PCA()
    >> svm2 = SVC(gamma='scale')
    >> cached_pipe = Pipeline([('reduce_dim', pca2), ('clf', svm2)],memory=cachedir)
    >> cached_pipe.fit(digits.data, digits.target)
                steps=[('reduce_dim', PCA(...)), ('clf', SVC(...))],
    >> print(cached_pipe.named_steps['reduce_dim'].components_)
          [[-1.77484909e-19  ... 4.07058917e-18]]
    >> # Remove the cache directory
    >> rmtree(cachedir)

    示例 :

    2. 回归中的目标转换


    >>> import numpy as np
    >>> from sklearn.datasets import load_boston
    >>> from sklearn.compose import TransformedTargetRegressor
    >>> from sklearn.preprocessing import QuantileTransformer
    >>> from sklearn.linear_model import LinearRegression
    >>> from sklearn.model_selection import train_test_split
    >>> boston = load_boston()
    >>> X = boston.data
    >>> y = boston.target
    >>> transformer = QuantileTransformer(output_distribution='normal')
    >>> regressor = LinearRegression()
    >>> regr = TransformedTargetRegressor(regressor=regressor,
    ...                                   transformer=transformer)
    >>> X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
    >>> regr.fit(X_train, y_train)
    >>> print('R2 score: {0:.2f}'.format(regr.score(X_test, y_test)))
    R2 score: 0.67
    >>> raw_target_regr = LinearRegression().fit(X_train, y_train)
    >>> print('R2 score: {0:.2f}'.format(raw_target_regr.score(X_test, y_test)))
    R2 score: 0.64


    >>> def func(x):
    ...     return np.log(x)
    >>> def inverse_func(x):
    ...     return np.exp(x)


    >>> regr = TransformedTargetRegressor(regressor=regressor,
    ...                                   func=func,
    ...                                   inverse_func=inverse_func)
    >>> regr.fit(X_train, y_train)
    >>> print('R2 score: {0:.2f}'.format(regr.score(X_test, y_test)))
    R2 score: 0.65


    >>> def inverse_func(x):
    ...     return x
    >>> regr = TransformedTargetRegressor(regressor=regressor,
    ...                                   func=func,
    ...                                   inverse_func=inverse_func,
    ...                                   check_inverse=False)
    >>> regr.fit(X_train, y_train)
    >>> print('R2 score: {0:.2f}'.format(regr.score(X_test, y_test)))
    R2 score: -4.50

    注意 可以通过设置transformer或函数对func和inverse_func来触发转换。但是,同时设置这两个选项会产生错误。


    3. FeatureUnion(特征联合): 复合特征空间

    FeatureUnion 合并了多个转换器对象形成一个新的转换器,该转换器合并了他们的输出。一个 FeatureUnion 可以接收多个转换器对象。在适配期间,每个转换器都单独的和数据适配。 对于转换数据,转换器可以并发使用,且输出的样本向量被连接成更大的向量。

    FeatureUnion 功能与 Pipeline 一样- 便捷性和联合参数的估计和验证。

    可以结合:FeatureUnion和 Pipeline 来创造出复杂模型。

    (一个 FeatureUnion 没办法检查两个转换器是否会产出相同的特征。它仅仅在特征集合不相关时产生联合并确认是调用者的职责。)

    3.1. 用法

    一个 FeatureUnion 是通过一系列 (key, value) 键值对来构建的,其中的 key 给转换器指定的名字 (一个绝对的字符串; 他只是一个代号), value 是一个评估器对象:

    >>> from sklearn.pipeline import FeatureUnion
    >>> from sklearn.decomposition import PCA
    >>> from sklearn.decomposition import KernelPCA
    >>> estimators = [('linear_pca', PCA()), ('kernel_pca', KernelPCA())]
    >>> combined = FeatureUnion(estimators)
    >>> combined
                 transformer_list=[('linear_pca', PCA(copy=True,...)),
                                   ('kernel_pca', KernelPCA(alpha=1.0,...))],
                 transformer_weights=None, verbose=False)

    跟管道一样,特征联合有一个精简版的构造器叫做:func:make_union ,该构造器不需要显式给每个组价起名字。

    正如 Pipeline, 单独的步骤可能用set_params替换 ,并设置为drop来跳过:

    >>> combined.set_params(kernel_pca='drop')
                 transformer_list=[('linear_pca', PCA(copy=True,...)),
                                   ('kernel_pca', 'drop')],
                 transformer_weights=None, verbose=False)

    示例 :

    4. 用于异构数据的列转换器

    警告:compose.ColumnTransformer 还在实验中,它的 API可能会变动的。

    许多数据集包含不同类型的特性,比如文本、浮点数和日期,每种类型的特征都需要单独的预处理或特征提取步骤。 通常,在应用scikit-learn方法之前,最容易的是对数据进行预处理,例如 pandas。 在将数据传递给scikit-learn之前处理数据可能会出现问题,原因如下:

    • 将来自测试数据的统计信息集成到预处理程序中,使得交叉验证分数不可靠(被称为数据泄露)。 例如,在尺度变换或计算缺失值的情况下。
    • 你可能想要在parameter search中包含预处理器参数。

    compose.ColumnTransformer对数据的不同列执行不同的变换,该管道不存在数据泄漏,并且可以参数化。ColumnTransformer可以处理数组、稀疏矩阵和pandas DataFrames

    对每一列,都会应用一个不同的变换, 比如preprocessing或某个特定的特征抽取方法:

    >>> import pandas as pd
    >>> X = pd.DataFrame(
    ...     {'city': ['London', 'London', 'Paris', 'Sallisaw'],
    ...      'title': ["His Last Bow", "How Watson Learned the Trick",
    ...                "A Moveable Feast", "The Grapes of Wrath"],
    ...      'expert_rating': [5, 3, 4, 5],
    ...      'user_rating': [4, 5, 4, 3]})

    对于这些数据,我们可能希望使用preprocessing.OneHotEncodercity列编码为一个分类变量,同时使用feature_extraction.text.CountVectorizer来处理title列。由于我们可能会把多个特征抽取器用在同一列上, 我们给每一个变换器取一个唯一的名字,比如“city_category”和“title_bow”。默认情况下,忽略其余的ranking列(remainder='drop'):

    >>> from sklearn.compose import ColumnTransformer
    >>> from sklearn.feature_extraction.text import CountVectorizer
    >>> column_trans = ColumnTransformer(
    ...     [('city_category', CountVectorizer(analyzer=lambda x: [x]), 'city'),
    ...      ('title_bow', CountVectorizer(), 'title')],
    ...     remainder='drop')
    >>> column_trans.fit(X)
    ColumnTransformer(n_jobs=None, remainder='drop', sparse_threshold=0.3,
    >>> column_trans.get_feature_names()
    ['city_category__London', 'city_category__Paris', 'city_category__Sallisaw',
    'title_bow__bow', 'title_bow__feast', 'title_bow__grapes', 'title_bow__his',
    'title_bow__how', 'title_bow__last', 'title_bow__learned', 'title_bow__moveable',
    'title_bow__of', 'title_bow__the', 'title_bow__trick', 'title_bow__watson',
    >>> column_trans.transform(X).toarray()
    array([[1, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0],
           [1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0],
           [0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
           [0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1]]...)




    >>> column_trans = ColumnTransformer(
    ...     [('city_category', OneHotEncoder(dtype='int'),['city']),
    ...      ('title_bow', CountVectorizer(), 'title')],
    ...     remainder='passthrough')
    >>> column_trans.fit_transform(X)
    array([[1, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 5, 4],
           [1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 3, 5],
           [0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 4, 4],
           [0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 5, 3]]...)


    >>> from sklearn.preprocessing import MinMaxScaler
    >>> column_trans = ColumnTransformer(
    ...     [('city_category', OneHotEncoder(), ['city']),
    ...      ('title_bow', CountVectorizer(), 'title')],
    ...     remainder=MinMaxScaler())
    >>> column_trans.fit_transform(X)[:, -2:]
    array([[1. , 0.5],
           [0. , 1. ],
           [0.5, 0.5],
           [1. , 0. ]])

    函数 make_column_transformer可用来更简单的创建类对象 ColumnTransformer 。 特别的,名字将会被自动给出。上面的示例等价于

    >>> from sklearn.compose import make_column_transformer
    >>> column_trans = make_column_transformer(
    ...     (OneHotEncoder(), ['city']),
    ...     (CountVectorizer(), 'title'),
    ...     remainder=MinMaxScaler())
    >>> column_trans
    ColumnTransformer(n_jobs=None, remainder=MinMaxScaler(copy=True, ...),
             transformers=[('onehotencoder', ...)


     class sklearn.pipeline.Pipeline(steps*memory=Noneverbose=False)

    Pipeline of transforms with a final estimator.

    Sequentially apply a list of transforms and a final estimator. Intermediate steps of the pipeline must be ‘transforms’, that is, they must implement fit and transform methods. The final estimator only needs to implement fit. The transformers in the pipeline can be cached using memory argument.

    The purpose of the pipeline is to assemble several steps that can be cross-validated together while setting different parameters. For this, it enables setting parameters of the various steps using their names and the parameter name separated by a ‘__’, as in the example below. A step’s estimator may be replaced entirely by setting the parameter with its name to another estimator, or a transformer removed by setting it to ‘passthrough’ or None.

    Read more in the User Guide.

    New in version 0.5.


    List of (name, transform) tuples (implementing fit/transform) that are chained, in the order in which they are chained, with the last object an estimator.

    memorystr or object with the joblib.Memory interface, default=None

    Used to cache the fitted transformers of the pipeline. By default, no caching is performed. If a string is given, it is the path to the caching directory. Enabling caching triggers a clone of the transformers before fitting. Therefore, the transformer instance given to the pipeline cannot be inspected directly. Use the attribute named_steps or steps to inspect estimators within the pipeline. Caching the transformers is advantageous when fitting is time consuming.

    verbosebool, default=False

    If True, the time elapsed while fitting each step will be printed as it is completed.


    Dictionary-like object, with the following attributes. Read-only attribute to access any step parameter by user given name. Keys are step names and values are steps parameters.

    >>> from sklearn.svm import SVC
    >>> from sklearn.preprocessing import StandardScaler
    >>> from sklearn.datasets import make_classification
    >>> from sklearn.model_selection import train_test_split
    >>> from sklearn.pipeline import Pipeline
    >>> X, y = make_classification(random_state=0)
    >>> X_train, X_test, y_train, y_test = train_test_split(X, y,
    ...                                                     random_state=0)
    >>> pipe = Pipeline([('scaler', StandardScaler()), ('svc', SVC())])
    >>> # The pipeline can be used as any other estimator
    >>> # and avoids leaking the test set into the train set
    >>> pipe.fit(X_train, y_train)
    Pipeline(steps=[('scaler', StandardScaler()), ('svc', SVC())])
    >>> pipe.score(X_test, y_test)



    Apply transforms, and decision_function of the final estimator

    fit(X[, y])

    Fit the model

    fit_predict(X[, y])

    Applies fit_predict of last step in pipeline after transforms.

    fit_transform(X[, y])

    Fit the model and transform with the final estimator


    Get parameters for this estimator.

    predict(X, **predict_params)

    Apply transforms to the data, and predict with the final estimator


    Apply transforms, and predict_log_proba of the final estimator


    Apply transforms, and predict_proba of the final estimator

    score(X[, y, sample_weight])

    Apply transforms, and score with the final estimator


    Apply transforms, and score_samples of the final estimator.


    Set the parameters of this estimator.

    class sklearn.pipeline.FeatureUnion(transformer_list*n_jobs=Nonetransformer_weights=Noneverbose=False)

    Concatenates results of multiple transformer objects.

    This estimator applies a list of transformer objects in parallel to the input data, then concatenates the results. This is useful to combine several feature extraction mechanisms into a single transformer.

    Parameters of the transformers may be set using its name and the parameter name separated by a ‘__’. A transformer may be replaced entirely by setting the parameter with its name to another transformer, or removed by setting to ‘drop’.

    Read more in the User Guide.

    New in version 0.13.

    transformer_listlist of (string, transformer) tuples

    List of transformer objects to be applied to the data. The first half of each tuple is the name of the transformer. The tranformer can be ‘drop’ for it to be ignored.

    Changed in version 0.22: Deprecated None as a transformer in favor of ‘drop’.

    n_jobsint, default=None

    Number of jobs to run in parallel. None means 1 unless in a joblib.parallel_backend context. -1 means using all processors. See Glossary for more details.

    Changed in version v0.20: n_jobs default changed from 1 to None

    transformer_weightsdict, default=None

    Multiplicative weights for features per transformer. Keys are transformer names, values the weights. Raises ValueError if key not present in transformer_list.

    verbosebool, default=False

    If True, the time elapsed while fitting each transformer will be printed as it is completed.



    >>> from sklearn.pipeline import FeatureUnion
    >>> from sklearn.decomposition import PCA, TruncatedSVD
    >>> union = FeatureUnion([("pca", PCA(n_components=1)),
    ...                       ("svd", TruncatedSVD(n_components=2))])
    >>> X = [[0., 1., 3], [2., 2., 5]]
    >>> union.fit_transform(X)
    array([[ 1.5       ,  3.0...,  0.8...],
           [-1.5       ,  5.7..., -0.4...]])


    fit(X[, y])

    Fit all transformers using X.

    fit_transform(X[, y])

    Fit all transformers, transform the data and concatenate results.


    Get feature names from all transformers.


    Get parameters for this estimator.


    Set the parameters of this estimator.


    Transform X separately by each transformer, concatenate results.

