
    DASK

    https://github.com/dask/dask

    https://dask.org/

    Dask provides parallel computing and task scheduling capabilities.

    It integrates with many data science tools.

    It could be called the Spark for data scientists.

    Dask provides advanced parallelism for analytics, enabling performance at scale for the tools you love.

    Integrates with existing projects

    Built with the broader community

    Dask is open source and freely available. It is developed in coordination with other community projects like Numpy, Pandas, and Scikit-Learn.

    Numpy

    Dask arrays scale Numpy workflows, enabling multi-dimensional data analysis in earth science, satellite imagery, genomics, biomedical applications, and machine learning algorithms.
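
    For instance, a minimal sketch of the NumPy-style API (the array shape and chunk size here are arbitrary):

    import dask.array as da

    # a 10000x10000 array of random numbers, split into 1000x1000 chunks
    x = da.random.random((10000, 10000), chunks=(1000, 1000))

    # operations mirror the NumPy API but only build a lazy task graph
    y = (x + x.T).mean(axis=0)

    # nothing is computed until compute() is called
    print(y.compute())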

    Pandas

    Dask dataframes scale Pandas workflows, enabling applications in time series, business intelligence, and general data munging on big data.
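
    A minimal sketch of the Pandas-style API (the file pattern and the column names are hypothetical):

    import dask.dataframe as dd

    # read many CSV files as one logical dataframe (hypothetical paths/columns)
    df = dd.read_csv('data/2020-*.csv')

    # Pandas-style operations build a lazy graph over the partitions
    result = df.groupby('user_id').amount.sum()

    # trigger the actual computation
    print(result.compute())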

    Scikit-Learn

    Dask-ML scales machine learning APIs like Scikit-Learn and XGBoost to enable scalable training and prediction on large models and large datasets.
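
    A minimal sketch using Dask-ML's Scikit-Learn-style estimators (requires the dask-ml package; the random data stands in for a large feature matrix):

    import dask.array as da
    from dask_ml.cluster import KMeans

    # a chunked dataset larger than comfortable for a single NumPy array
    X = da.random.random((100000, 4), chunks=(10000, 4))

    # same fit/predict convention as Scikit-Learn, but trained on dask arrays
    model = KMeans(n_clusters=3)
    model.fit(X)

    labels = model.predict(X)
    print(labels[:10].compute())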

    Differences from Spark

    https://docs.dask.org/en/latest/spark.html

    Spark is mature and all-inclusive.

    Dask is lightweight and easy to integrate into existing code; it adds flexible parallelism to existing solutions.

    • Spark is mature and all-inclusive. If you want a single project that does everything and you’re already on Big Data hardware, then Spark is a safe bet, especially if your use cases are typical ETL + SQL and you’re already using Scala.
    • Dask is lighter weight and is easier to integrate into existing code and hardware. If your problems vary beyond typical ETL + SQL and you want to add flexible parallelism to existing solutions, then Dask may be a good fit, especially if you are already using Python and associated libraries like NumPy and Pandas.

    Workflow interface

    https://docs.dask.org/en/latest/delayed.html

    The workflow interface is not Dask's primary offering; its primary offerings are the data collections.

    When the collections do not fit a problem, the lower-level interface can be used to build custom parallel algorithms.

    Sometimes problems don’t fit into one of the collections like dask.array or dask.dataframe.

    In these cases, users can parallelize custom algorithms using the simpler dask.delayed interface.

    This allows one to create graphs directly with a light annotation of normal Python code:

    >>> import dask
    >>> def inc(i):
    ...     return i + 1
    >>> def add(a, b):
    ...     return a + b
    >>> x = dask.delayed(inc)(1)
    >>> y = dask.delayed(inc)(2)
    >>> z = dask.delayed(add)(x, y)
    >>> z.compute()
    5
    >>> z.visualize()  # renders the task graph (requires graphviz)

    DEMO

    https://github.com/fanqingsong/machine_learning_workflow_on_dask/blob/master/kmeans_with_workflow.py

    from csv import reader
    from sklearn.cluster import KMeans
    import joblib
    from dask import delayed
    import dask
    
    
    # Load a CSV file
    def load_csv(filename):
        with open(filename, "rt") as file:
            dataset = list(reader(file))
        return dataset
    
    # Convert string column to float
    def str_column_to_float(dataset, column):
        for row in dataset:
            row[column] = float(row[column].strip())
    
    # Convert string column to integer
    def str_column_to_int(dataset, column):
        class_values = [row[column] for row in dataset]
        unique = set(class_values)
        lookup = dict()
        for i, value in enumerate(unique):
            lookup[value] = i
        for row in dataset:
            row[column] = lookup[row[column]]
        return lookup
    
    def getRawIrisData():
        # Load iris dataset
        filename = 'iris.csv'
        dataset = load_csv(filename)
        print('Loaded data file {0} with {1} rows and {2} columns'.format(filename, len(dataset), len(dataset[0])))
        print(dataset[0])
        # convert string columns to float
        for i in range(4):
            str_column_to_float(dataset, i)
        # convert class column to int
        lookup = str_column_to_int(dataset, 4)
        print(dataset[0])
        print(lookup)
    
        return dataset
    
    @dask.delayed
    def getTrainData():
        dataset = getRawIrisData()
        trainData = [ [one[0], one[1], one[2], one[3]] for one in dataset ]
    
        return trainData
    
    @dask.delayed
    def getNumClusters():
        return 3
    
    @dask.delayed
    def train(numClusters, trainData):
        print("numClusters=%d" % numClusters)
    
        model = KMeans(n_clusters=numClusters)
    
        model.fit(trainData)
    
        # save model for prediction
        joblib.dump(model, 'model.kmeans')
    
        # return the data so that predict() depends on train() in the task graph
        return trainData
    
    @dask.delayed
    def predict(irisData):
        # test saved prediction
        model = joblib.load('model.kmeans')
    
        # cluster result
        labels = model.predict(irisData)
    
        print("cluster result")
        print(labels)
    
    
    def machine_learning_workflow_pipeline():
        trainData = getTrainData()
        numClusters = getNumClusters()
        trainData = train(numClusters, trainData)
        total = predict(trainData)
    
        # total.visualize()  # optionally render the task graph before running
    
        total.compute()
    
    
    
    if __name__ == "__main__":
        machine_learning_workflow_pipeline()
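
    The pipeline above runs on Dask's default single-machine scheduler. An equivalent explicit invocation (a sketch; 'threads' is one of the built-in single-machine schedulers) is:

    total = predict(train(getNumClusters(), getTrainData()))
    total.compute(scheduler='threads')

    Note that train() hands the fitted model to predict() through the 'model.kmeans' file on disk; the trainData it returns is what creates the dependency edge in the task graph, ensuring predict() runs only after train().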