DASK
https://github.com/dask/dask
https://dask.org/
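Dask provides parallel computing and task scheduling.
It integrates with many data science tools.
It could fairly be called the Spark of data scientists.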
Dask provides advanced parallelism for analytics, enabling performance at scale for the tools you love.
Integrates with existing projects
Built with the broader community
Dask is open source and freely available. It is developed in coordination with other community projects like Numpy, Pandas, and Scikit-Learn.
Numpy
Dask arrays scale Numpy workflows, enabling multi-dimensional data analysis in earth science, satellite imagery, genomics, biomedical applications, and machine learning algorithms.
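A minimal sketch of what "scaling a NumPy workflow" looks like, assuming dask and NumPy are installed; the array size and the 250 x 250 chunk shape are arbitrary illustration values, not recommendations.

```python
import numpy as np
import dask.array as da

# An ordinary in-memory NumPy array (illustrative size).
data = np.arange(1_000_000, dtype="float64").reshape(1_000, 1_000)

# Wrap it as a Dask array split into 250 x 250 chunks, i.e. a 4 x 4 grid
# of NumPy blocks that the scheduler can process in parallel.
x = da.from_array(data, chunks=(250, 250))

# Same NumPy-style API; this line only builds a lazy task graph.
expr = (x ** 2).sum(axis=0)

# compute() executes the graph and returns a plain NumPy array.
result = expr.compute()
print(x.numblocks, result.shape)  # (4, 4) (1000,)
```

Each chunk is an ordinary NumPy array, so the per-block work is plain NumPy; Dask only adds the blocking and the scheduling.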
Pandas
Dask dataframes scale Pandas workflows, enabling applications in time series, business intelligence, and general data munging on big data.
Scikit-Learn
Dask-ML scales machine learning APIs like Scikit-Learn and XGBoost to enable scalable training and prediction on large models and large datasets.
Differences from Spark
https://docs.dask.org/en/latest/spark.html
Spark is mature and all-inclusive.
Dask is lightweight and easy to integrate into existing code; it brings flexible parallelism to existing solutions.
- Spark is mature and all-inclusive. If you want a single project that does everything and you’re already on Big Data hardware, then Spark is a safe bet, especially if your use cases are typical ETL + SQL and you’re already using Scala.
- Dask is lighter weight and is easier to integrate into existing code and hardware. If your problems vary beyond typical ETL + SQL and you want to add flexible parallelism to existing solutions, then Dask may be a good fit, especially if you are already using Python and associated libraries like NumPy and Pandas.
Workflow interface
https://docs.dask.org/en/latest/delayed.html
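The workflow interface is not what Dask mainly offers; its main offerings are data structures (collections such as arrays and dataframes).
When those collections cannot be used, the lower-level interface lets you build custom parallel algorithms.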
Sometimes problems don’t fit into one of the collections like dask.array or dask.dataframe. In these cases, users can parallelize custom algorithms using the simpler dask.delayed interface. This allows one to create graphs directly with a light annotation of normal Python code:
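>>> import dask
>>> def inc(x): return x + 1
>>> def add(x, y): return x + y
>>> x = dask.delayed(inc)(1)
>>> y = dask.delayed(inc)(2)
>>> z = dask.delayed(add)(x, y)
>>> z.compute()
5
>>> z.visualize()  # draws the task graph; needs the graphviz package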
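The example above wraps functions one call at a time. A common alternative is the `@dask.delayed` decorator, which makes every call lazy, and a plain Python list of delayed values can be passed to another delayed call. A minimal sketch, assuming only that dask is installed; `inc` here is a toy function:

```python
import dask


@dask.delayed
def inc(x):
    return x + 1


# Nothing runs yet: each call just adds a node to the task graph.
incremented = [inc(i) for i in range(5)]  # 5 independent tasks

# One more task that depends on all five; delayed looks inside the list.
total = dask.delayed(sum)(incremented)

print(total.compute())  # 15
```

Because the five `inc` tasks do not depend on each other, the scheduler is free to run them in parallel before the final `sum`.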
DEMO
https://github.com/fanqingsong/machine_learning_workflow_on_dask/blob/master/kmeans_with_workflow.py
from csv import reader

import dask
import joblib
from sklearn.cluster import KMeans


# Load a CSV file
def load_csv(filename):
    # "with" closes the file once it has been read
    with open(filename, "rt") as file:
        # skip blank lines (e.g. a trailing newline at the end of the file)
        dataset = [row for row in reader(file) if row]
    return dataset


# Convert string column to float
def str_column_to_float(dataset, column):
    for row in dataset:
        row[column] = float(row[column].strip())


# Convert string column to integer
def str_column_to_int(dataset, column):
    class_values = [row[column] for row in dataset]
    unique = set(class_values)
    lookup = dict()
    for i, value in enumerate(unique):
        lookup[value] = i
    for row in dataset:
        row[column] = lookup[row[column]]
    return lookup


def getRawIrisData():
    # Load iris dataset
    filename = 'iris.csv'
    dataset = load_csv(filename)
    print('Loaded data file {0} with {1} rows and {2} columns'.format(filename, len(dataset), len(dataset[0])))
    print(dataset[0])
    # convert string columns to float
    for i in range(4):
        str_column_to_float(dataset, i)
    # convert class column to int
    lookup = str_column_to_int(dataset, 4)
    print(dataset[0])
    print(lookup)
    return dataset


@dask.delayed
def getTrainData():
    dataset = getRawIrisData()
    # keep the four numeric feature columns, drop the class label
    trainData = [[one[0], one[1], one[2], one[3]] for one in dataset]
    return trainData


@dask.delayed
def getNumClusters():
    return 3


@dask.delayed
def train(numClusters, trainData):
    print("numClusters=%d" % numClusters)
    model = KMeans(n_clusters=numClusters)
    model.fit(trainData)
    # save model for prediction
    joblib.dump(model, 'model.kmeans')
    # return the data so that predict() depends on train() in the task graph
    return trainData


@dask.delayed
def predict(irisData):
    # test saved prediction
    model = joblib.load('model.kmeans')
    # cluster result
    labels = model.predict(irisData)
    print("cluster result")
    print(labels)
    return labels


def machine_learning_workflow_pipeline():
    trainData = getTrainData()
    numClusters = getNumClusters()
    trainData = train(numClusters, trainData)
    total = predict(trainData)
    # total.visualize()  # optional: render the task graph (requires graphviz)
    total.compute()


if __name__ == "__main__":
    machine_learning_workflow_pipeline()
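In the demo, train() hands the model to predict() through the file model.kmeans, and the ordering is kept only because train() returns trainData. One possible variation, sketched here under stated assumptions, returns the fitted model itself so Dask passes it between tasks and no file is needed. It assumes dask, NumPy and scikit-learn are installed, and it replaces iris.csv with hypothetical synthetic data (three well-separated 4-dimensional blobs) so the sketch runs on its own; function names are illustrative, not from the original repository.

```python
import numpy as np
import dask
from sklearn.cluster import KMeans


@dask.delayed
def get_train_data():
    # Hypothetical stand-in for the iris features: 3 blobs of 50 points each.
    rng = np.random.default_rng(0)
    centers = np.array([[0, 0, 0, 0], [5, 5, 5, 5], [10, 10, 10, 10]])
    return np.vstack([c + rng.normal(scale=0.5, size=(50, 4)) for c in centers])


@dask.delayed
def train(num_clusters, train_data):
    # fit() returns the estimator, so the fitted model becomes the task result.
    model = KMeans(n_clusters=num_clusters, n_init=10, random_state=0)
    return model.fit(train_data)


@dask.delayed
def predict(model, data):
    # The model arrives as an argument, so the graph records the dependency.
    return model.predict(data)


def pipeline():
    data = get_train_data()  # computed once, shared by train and predict
    model = train(3, data)
    labels = predict(model, data)
    return labels.compute()


if __name__ == "__main__":
    print(pipeline())
```

Passing the model as a value keeps the dependency visible in the task graph; saving it with joblib remains useful when the model must outlive the process.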