zoukankan      html  css  js  c++  java
  • Ray

    Ray

    https://ray.io/

    https://github.com/ray-project/ray

    (1)机器学习生态基于python语言,但是python具有全局解释器锁缺点,限制了对单台机器的多核的利用

    (2)同时查大规模模型的数据的出现,需要依赖集群来解决类似问题,引入了分布式机器学习的需求,

    但是不需要引入更加高层的应用(spark)的基础上,ray基于python生态,单程的简单的分布式计算框架。

    ray同时也包括了机器学习应用。

    Ray provides a simple, universal API for building distributed applications.

    Ray is packaged with the following libraries for accelerating machine learning workloads:

    • Tune: Scalable Hyperparameter Tuning
    • RLlib: Scalable Reinforcement Learning
    • RaySGD: Distributed Training Wrappers
    • Ray Serve: Scalable and Programmable Serving

    https://docs.ray.io/en/latest/index.html

    Ray provides a simple, universal API for building distributed applications.

    Ray accomplishes this mission by:

    1. Providing simple primitives for building and running distributed applications.

    2. Enabling end users to parallelize single machine code, with little to zero code changes.

    3. Including a large ecosystem of applications, libraries, and tools on top of the core Ray to enable complex applications.

    https://www.ctolib.com/topics-138457.html

    传统编程依赖于两个核心概念:函数和类。使用这些构建块就可以构建出无数的应用程序。

    但是,当我们将应用程序迁移到分布式环境时,这些概念通常会发生变化。

    一方面,OpenMPI、Python 多进程和 ZeroMQ 等工具提供了用于发送和接收消息的低级原语。这些工具非常强大,但它们提供了不同的抽象,因此要使用它们就必须从头开始重写单线程应用程序。

    另一方面,我们也有一些特定领域的工具,例如用于模型训练的 TensorFlow、用于数据处理且支持 SQL 的 Spark,以及用于流式处理的 Flink。这些工具提供了更高级别的抽象,如神经网络、数据集和流。但是,因为它们与用于串行编程的抽象不同,所以要使用它们也必须从头开始重写应用程序。

    取代 Python 多进程!伯克利开源分布式框架 Ray 用于分布式计算的工具

    Ray 占据了一个独特的中间地带。它并没有引入新的概念,而是采用了函数和类的概念,并将它们转换为分布式的任务和 actor。Ray 可以在不做出重大修改的情况下对串行应用程序进行并行化。

    来源(论文)

    https://arxiv.org/abs/1703.03924

    Real-Time Machine Learning: The Missing Pieces

    Machine learning applications are increasingly deployed not only to serve predictions using static models, but also as tightly-integrated components of feedback loops involving dynamic, real-time decision making. These applications pose a new set of requirements, none of which are difficult to achieve in isolation, but the combination of which creates a challenge for existing distributed execution frameworks: computation with millisecond latency at high throughput, adaptive construction of arbitrary task graphs, and execution of heterogeneous kernels over diverse sets of resources. We assert that a new distributed execution framework is needed for such ML applications and propose a candidate approach with a proof-of-concept architecture that achieves a 63x performance improvement over a state-of-the-art execution framework for a representative application.

     

    架构

    https://www.cnblogs.com/fanzhidongyzby/p/7901139.html

    论文给出的架构图里并未画出Driver的概念,因此我在其基础上做了一些修改和扩充。

    Ray的Driver节点和和Slave节点启动的组件几乎相同,不过却有以下区别:

    1. Driver上的工作进程DriverProcess一般只有一个,即用户启动的PythonShell。Slave可以根据需要创建多个WorkerProcess。
    2. Driver只能提交任务,却不能接收来自全局调度器分配的任务。Slave可以提交任务,也可以接收全局调度器分配的任务。
    3. Driver可以主动绕过全局调度器给Slave发送Actor调用任务(此处设计是否合理尚不讨论)。Slave只能接收全局调度器分配的计算任务。

    https://zhuanlan.zhihu.com/p/41875076

    其中的原理是将代码序列化到 redis 上存储为 object (object 可以理解为高效的不可变对象和数据共享),实现各种异步执行和数据交换,优先在本地节点完成任务,如果完不成再由global scheduler 调配到其它节点(更正补充)。

    DEMO CODE

    单机版本,分布式任务示例。

    remote声明函数为一个任务。

    remote调用会将任务分发到一个计算进程中,并执行。

    import ray
    ray.init()
    
    @ray.remote
    def f(x):
        return x * x
    
    futures = [f.remote(i) for i in range(4)]
    print(ray.get(futures))

    聚类学习工作流改造

    https://github.com/fanqingsong/machine_learning_workflow_on_ray

    from csv import reader
    from sklearn.cluster import KMeans
    import joblib
    import ray
    
    
    ray.init()
    
    
    # Load a CSV file
    def load_csv(filename):
        file = open(filename, "rt")
        lines = reader(file)
        dataset = list(lines)
        return dataset
    
    # Convert string column to float
    def str_column_to_float(dataset, column):
        for row in dataset:
            row[column] = float(row[column].strip())
    
    # Convert string column to integer
    def str_column_to_int(dataset, column):
        class_values = [row[column] for row in dataset]
        unique = set(class_values)
        lookup = dict()
        for i, value in enumerate(unique):
            lookup[value] = i
        for row in dataset:
            row[column] = lookup[row[column]]
        return lookup
    
    def getRawIrisData():
        # Load iris dataset
        filename = 'iris.csv'
        dataset = load_csv(filename)
        print('Loaded data file {0} with {1} rows and {2} columns'.format(filename, len(dataset), len(dataset[0])))
        print(dataset[0])
        # convert string columns to float
        for i in range(4):
            str_column_to_float(dataset, i)
        # convert class column to int
        lookup = str_column_to_int(dataset, 4)
        print(dataset[0])
        print(lookup)
    
        return dataset
    
    @ray.remote
    def getTrainData():
        dataset = getRawIrisData()
        trainData = [ [one[0], one[1], one[2], one[3]] for one in dataset ]
    
        return trainData
    
    @ray.remote
    def getNumClusters():
        return 3
    
    @ray.remote
    def train(numClusters, trainData):
        print("numClusters=%d" % numClusters)
    
        model = KMeans(n_clusters=numClusters)
    
        model.fit(trainData)
    
        # save model for prediction
        joblib.dump(model, 'model.kmeans')
    
        return trainData
    
    @ray.remote
    def predict(irisData):
        # test saved prediction
        model = joblib.load('model.kmeans')
    
        # cluster result
        labels = model.predict(irisData)
    
        print("cluster result")
        print(labels)
    
    
    def machine_learning_workflow_pipeline():
        trainData = getTrainData.remote()
        numClusters = getNumClusters.remote()
        trainData = train.remote(numClusters, trainData)
        result = predict.remote(trainData)
    
        result = ray.get(result)
        print("result=", result)
    
    
    
    if __name__ == "__main__":
        machine_learning_workflow_pipeline()

    Ray 破冰学习

    https://github.com/anyscale/academy/blob/master/ray-crash-course/00-Ray-Crash-Course-Overview.ipynb

  • 相关阅读:
    微信企业号开发:UserAgent
    用sinopia搭建内部npm服务
    python format用法详解
    python正则表达式re之compile函数解析
    Socket通信原理
    TCP半开连接与半闭连接
    使用npm安装一些包失败了的看过来(npm国内镜像介绍)
    UI优秀框架(库)
    关于 WebView 知识点的详解
    CommonJS规范
  • 原文地址:https://www.cnblogs.com/lightsong/p/13814930.html
Copyright © 2011-2022 走看看