zoukankan      html  css  js  c++  java
  • A machine learning system on spark

    简介

    https://github.com/fanqingsong/machine_learning_system_on_spark

    a simple machine learning system demo, for ML study. Based on machine_learning_system repo, add new process for ml model service with celery and spark.

    技术栈

    categorynamecomment
    frontend reactjs frontend framework
    frontend redux state management
    frontend react-C3JS D3 based graph tool
    frontend react-bootstrap style component library
    frontend data-ui react data visualization tool
    backend django backend framework
    backend django-rest-knox authentication library
    backend djangorestframework restful framework
    backend spark.ml machine learning tool

    架构

    Generally, train process is time consumming, and predict process is quick. So set train flow as async mode, and predict flow as sync mode.

     

    train flow

    • user start model train from browser
    • django recieve the "start train" message
    • django schedule spark.ml celery process to train model and return to user immediately.
    • browser query train status to django
    • django query train status from spark.ml celery process.
    • django feedback the train over status to browser

     

    predict flow

    • user input prediction features on browser, then click submit
    • browser send prediction features to django
    • django call prediction api with prediction features
    • django feedback the prediction result to browser
    +---------+            +-------------+            +------------+
    |         | start train|             |            |            |
    |         +------------>             |   start    |            |
    |         |            |             +---train---->            |
    |         |  query train             |            |            |
    |         +--status---->             |            |            |
    |         |            |             +----query -->            |
    |         <---train ---+             |    train   |            |
    |         |   over     |             |    status  |            |
    |  browser|            |   django    |            |  spark.ml  |
    |         |            |             |            |  on celery |
    |         |  predict   |             |            |            |
    |         +------------>             |    predict |            |
    |         |            |             +----------->+            |
    |         <--predict---+             |            |            |
    |         |  result    |             |            |            |
    |         |            |             |            |            |
    |         |            |             |            |            |
    +---------+            +-------------+            +------------+
    

    Django和Celery集成

    https://www.cnblogs.com/wdliu/p/9530219.html

    https://www.pythonf.cn/read/7143

    Celery

    https://docs.celeryproject.org/en/stable/getting-started/index.html

    Task queues are used as a mechanism to distribute work across threads or machines.

    A task queue’s input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.

    https://www.celerycn.io/

    任务队列一般用于线程或计算机之间分配工作的一种机制。

    任务队列的输入是一个称为任务的工作单元,有专门的工作进行不断的监视任务队列,进行执行新的任务工作。

    Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和职程(Worker)调节。启动一个任务,客户端向消息队列发送一条消息,然后中间人(Broker)将消息传递给一个职程(Worker),最后由职程(Worker)进行执行中间人(Broker)分配的任务。

    Celery 可以有多个职程(Worker)和中间人(Broker),用来提高Celery的高可用性以及横向扩展能力。

    Celery 是用 Python 编写的,但协议可以用任何语言实现。除了 Python 语言实现之外,还有Node.js的node-celery和php的celery-php

    可以通过暴露 HTTP 的方式进行,任务交互以及其它语言的集成开发。

    https://docs.celeryproject.org/en/stable/reference/index.html

    celery — Distributed processing


    This module is the main entry-point for the Celery API. It includes commonly needed things for calling tasks, and creating Celery applications.

    Celery

    Celery application instance

    group

    group tasks together

    chain

    chain tasks together

    chord

    chords enable callbacks for groups

    signature()

    create a new task signature

    Signature

    object describing a task invocation

    current_app

    proxy to the current application instance

    current_task

    proxy to the currently executing task

    Spark.ml

    https://spark.apache.org/docs/2.1.0/api/python/pyspark.ml.html#

    http://dblab.xmu.edu.cn/blog/1779-2/

    KMeans 是一个迭代求解的聚类算法,其属于 划分(Partitioning) 型的聚类方法,即首先创建K个划分,然后迭代地将样本从一个划分转移到另一个划分来改善最终聚类的质量。

    ML包下的KMeans方法位于org.apache.spark.ml.clustering包下,其过程大致如下:

    1.根据给定的k值,选取k个样本点作为初始划分中心;
    2.计算所有样本点到每一个划分中心的距离,并将所有样本点划分到距离最近的划分中心;
    3.计算每个划分中样本点的平均值,将其作为新的中心;

    循环进行2~3步直至达到最大迭代次数,或划分中心的变化小于某一预定义阈值
    显然,初始划分中心的选取在很大程度上决定了最终聚类的质量,和MLlib包一样,ML包内置的KMeans类也提供了名为 KMeans|| 的初始划分中心选择方法,它是著名的 KMeans++ 方法的并行化版本,其思想是令初始聚类中心尽可能的互相远离,具体实现细节可以参见斯坦福大学的B Bahmani在PVLDB上的论文Scalable K-Means++,这里不再赘述。

    spark sql

    https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#

    https://www.cnblogs.com/Finley/p/6390528.html

    DataFrame提供了一些常用操作的实现, 可以使用这些接口查看或修改DataFrame:

    • df.collect(): 以Row列表的方式显示df中的所有数据
    • df.show(): 以可视化表格的方式打印df中的所有数据
    • df.count(): 显示df中数据的行数
    • df.describe() 返回一个新的DataFrame对象包含对df中数值列的统计数据
    • df.cache(): 以MEMORY_ONLY_SER方式进行持久化
    • df.persist(level): 以指定的方式进行持久化
    • df.unpersist(): 删除缓存

    DataFrame的一些属性可以用于查看它的结构信息:

    • df.columns: 返回各列名称的列表

    • df.schema: 以StructType对象的形式返回df的表结构

    • df.dtypes: 以列表的形式返回每列的名称和类型。
      [('name', 'string'), ('id', 'int')]

    • df.rdd 将DataFrame对象转换为rdd

    DataFrame支持使用Map和Reduce操作:

    • df.map(func): 等同于df.rdd.map(func)

    • df.reduce(func): 等同于 df.rdd.reduce(func)

  • 相关阅读:
    工作总结
    JSON数据使用
    DataTable知识
    树形结构菜单
    区域树前后台
    跨域总结
    工作一年感想
    项目整体架构分析
    springboot 和 mongdb连接问题 Exception in thread "main" com.mongodb.MongoSecurityException:
    go函数、方法、接口笔记
  • 原文地址:https://www.cnblogs.com/lightsong/p/13541677.html
Copyright © 2011-2022 走看看