zoukankan      html  css  js  c++  java
  • An Online Machine Learning System Demo

    实时学习

    https://yuzhouwan.com/posts/4735/

    什么是机器学习

     Wikipedia 给出的定义是,一个计算机科学的子领域,由 模式识别 和 人工智能 中的计算机学习理论 演变而来
     探索 结构化的、可学习的规则引擎,如何用来对数据 进行训练 和 预测

    什么又是 Real-time 机器学习呢?

     一般性的 机器学习 的对象 是一堆 offline 的训练集,通过对这些数据的学习,来确立模型
     如果数据是快速变化的,这时就需要将 新数据 分配好权重,加入到目标训练集中;之后,将预测出来的结果,再次反馈到 数据模型中去

    Real-time 和 No Real-time 的本质区别在哪儿?

     因为 实时模型 是动态更新的,实现算法上,比 非实时的 ML 需要考虑,如何避免依赖 将 新数据 和 旧数据 整合在一块再计算所带来的性能问题
     更多时候,长期积累的数据,是很难再做到全量计算(比如,多项式贝叶斯 Multinomial naive bayes,用于处理 dataset 过大,而内存不足的情况)

    streaming-k-means

    https://stats.stackexchange.com/questions/222235/explain-streaming-k-means

     

    "Streaming kmeans" typically refers to what is also known as " kmeans". Online algorithms refer to computations that are performed iteratively, with data arriving during the computation in single observations or in batches, in contrast to "offline" algorithms, where all the data are available when the computation starts. Typically, some intermediate result is calculated based on initial data and then modified as new data arrive. For instance, exponential smoothing is a forecasting algorithm that is very naturally performed online.

    In the specific case of , we would first apply a standard algorithm to cluster an initial dataset. Then, the cluster centers would be updated as new data arrive. Such algorithms often keep and update clusterings with different numbers of clusters, because the optimal number of clusters may change over time as data arrives.

    Alternative names for algorithms include "sequential algorithms", and a number of other synonyms as per Wikipedia. For the specific case of online k-means, this SO question looks like a good place to start: Online k-means clustering

    SPARK API

    https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html?highlight=streaminglinearregressionwithsgd#pyspark.mllib.clustering.StreamingKMeans

    class pyspark.mllib.clustering.StreamingKMeans(k=2, decayFactor=1.0, timeUnit='batches')[source]

    Provides methods to set k, decayFactor, timeUnit to configure the KMeans algorithm for fitting and predicting on incoming dstreams. More details on how the centroids are updated are provided under the docs of StreamingKMeansModel.

    Parameters
    • k – Number of clusters. (default: 2)

    • decayFactor – Forgetfulness of the previous centroids. (default: 1.0)

    • timeUnit – Can be “batches” or “points”. If points, then the decay factor is raised to the power of number of new points and if batches, then decay factor will be used as is. (default: “batches”)

    New in version 1.5.0.

    latestModel()[source]

    Return the latest model

    New in version 1.5.0.

    predictOn(dstream)[source]

    Make predictions on a dstream. Returns a transformed dstream object

    New in version 1.5.0.

    predictOnValues(dstream)[source]

    Make predictions on a keyed dstream. Returns a transformed dstream object.

    New in version 1.5.0.

    setDecayFactor(decayFactor)[source]

    Set decay factor.

    New in version 1.5.0.

    setHalfLife(halfLife, timeUnit)[source]

    Set number of batches after which the centroids of that particular batch has half the weightage.

    New in version 1.5.0.

    setInitialCenters(centers, weights)[source]

    Set initial centers. Should be set before calling trainOn.

    New in version 1.5.0.

    setK(k)[source]

    Set number of clusters.

    New in version 1.5.0.

    setRandomCenters(dim, weight, seed)[source]

    Set the initial centres to be random samples from a gaussian population with constant weights.

    New in version 1.5.0.

    trainOn(dstream)[source]

    Train the model on the incoming dstream.

    New in version 1.5.0.

    SPARK TEST CASE

    https://spark-test.github.io/pyspark-coverage-site/pyspark_mllib_tests_test_streaming_algorithms_py.html

     
    
        @unittest.skip("SPARK-10086: Flaky StreamingKMeans test in PySpark") 
    
        def test_trainOn_predictOn(self): 
    
            """Test that prediction happens on the updated model.""" 
    
            stkm = StreamingKMeans(decayFactor=0.0, k=2) 
    
            stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0]) 
    
     
    
            # Since decay factor is set to zero, once the first batch 
    
            # is passed the clusterCenters are updated to [-0.5, 0.7] 
    
            # which causes 0.2 & 0.3 to be classified as 1, even though the 
    
            # classification based in the initial model would have been 0 
    
            # proving that the model is updated. 
    
            batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]] 
    
            batches = [self.sc.parallelize(batch) for batch in batches] 
    
            input_stream = self.ssc.queueStream(batches) 
    
            predict_results = [] 
    
     
    
            def collect(rdd): 
    
                rdd_collect = rdd.collect() 
    
                if rdd_collect: 
    
                    predict_results.append(rdd_collect) 
    
     
    
            stkm.trainOn(input_stream) 
    
            predict_stream = stkm.predictOn(input_stream) 
    
            predict_stream.foreachRDD(collect) 
    
     
    
            self.ssc.start() 
    
     
    
            def condition(): 
    
                self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]]) 
    
                return True 
    
     
    
            eventually(condition, catch_assertions=True) 

    Demo

    https://github.com/fanqingsong/machine_learning_system_realtime

    实现效果, train页面训练过程中,model不停地被更新,predict页面使用最新的model去测试。

    Architecture

    Generally, train process is time consumming, and predict process is quick. So set train flow as async mode, and predict flow as sync mode. BTW, train process is implemented by stream, with streaming kmeans library: https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html?highlight=streaminglinearregressionwithsgd#pyspark.mllib.clustering.StreamingKMeans

     

    train flow

    • user start model train from browser
    • django recieve the "start train" message
    • django schedule process manager celery process to launch a train process based on kafka.
    • browser transfer iris data one by one to django
    • django call sender process to send one iris data to train process by kafka
    • train process receive one iris data, training model with streaming kmeans library
    • after feeding all iris data, display the train result for all iris data

     

    predict flow

    • user input prediction features on browser, then click submit
    • browser send prediction features to django
    • django call prediction api with prediction features
    • django feedback the prediction result to browser
    +----------------+              +-------------+          +------------+       +---------------+
    |                |              |             |          |            |       |               |
    |     Browser    |              |   django    |          |   Celery   |       |   train proc  |
    |                |              |             |          |            |       |   ess         |
    +-------+--------+              +------+------+          +------+-----+       +-------+-------+
            |                              |                        |                     |
            |       request train          |                        |                     |
            +----------------------------->+                launch a train process with celery
            |                              +------------------------+-------------------->|
            |                              |                        |                     |
            |     send 1st iris data       |                        |                     |
            +----------------------------->+           send 1st iris data with celery     |
            |                              +------------------------+-------------------->+
            |                              |                        |                     |
            |                              |                        |                     |
            |                              |                        |                     |
            |                              |                        |                     |
            |     send last iris data      |                        |                     |
            +----------------------------->+           send last iris data with celery    |
            |                              +------------------------+-------------------->+
            |                              |                        |                     |
            |                              |                        |                     |
            |                              |                        |                     |
            |                              |                        |                     |
            |                              |                        |                     |
            |                              |                        |                     |
            |     send one iris features   |                        |                     |
            +----------------------------->+     predict cluster in celery                |
            |                              +------------------------>                     |
            |                              |                        |                     |
            |                              |                        |                     |
            |                              |                        |                     |
            +                              +                        +                     +
    
    

     

    technology stack

    categorynamecomment
    frontend reactjs frontend framework
    frontend redux state management
    frontend react-C3JS D3 based graph tool
    frontend react-bootstrap style component library
    frontend data-ui react data visualization tool
    backend django backend framework
    backend django-rest-knox authentication library
    backend djangorestframework restful framework
    backend spark.ml machine learning tool
    backend redis-server broker for celery
    backend celery worker for django
    backend kafka broker for spark.ml streaming kmeans train
  • 相关阅读:
    最近,我和隐私计算干上了。
    面试官疯了吗,问我为什么浮点数不精确?
    全网连夜修复的Log4j漏洞,如何做到一行代码都不改?
    Log4j未平,Logback 又起!再爆漏洞?
    聊聊云原生和微服务架构
    手把手一步一步教你使用Java开发一个大型街机动作闯关类游戏01游戏窗口
    手把手一步一步教你使用Java开发一个大型街机动作闯关类游戏03全屏显示游戏窗口
    手把手一步一步教你使用Java开发一个大型街机动作闯关类游戏02支持中文及显示FPS
    手把手一步一步教你使用Java开发一个大型街机动作闯关类游戏04图像资源的透明处理
    python及pygame雷霆战机游戏项目实战01 控制飞机
  • 原文地址:https://www.cnblogs.com/lightsong/p/13691313.html
Copyright © 2011-2022 走看看