Real-time Learning
https://yuzhouwan.com/posts/4735/
What is Machine Learning?
Wikipedia defines it as a subfield of computer science that evolved from the study of pattern recognition and computational learning theory in artificial intelligence: it explores how structured, learnable rule engines can be trained on data and then used for prediction.

What, then, is Real-time machine learning?
Conventional machine learning works on an offline training set: the model is established by learning from that fixed batch of data.
If the data changes rapidly, newly arriving data must instead be assigned weights and merged into the target training set as it arrives; the resulting predictions are then fed back into the data model.

Where is the essential difference between Real-time and Non-real-time ML?
Because a real-time model is updated dynamically, its algorithm must address something non-real-time ML never faces: avoiding the performance cost of merging new data with all the accumulated old data and recomputing from scratch.
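As a toy illustration of why this matters, many statistics can be updated in O(1) from the new observation alone, with no need to re-scan the accumulated data. A minimal Python sketch:

```python
# Incremental (running) mean: the update needs only the previous mean and
# count, never the old observations themselves.
def update_mean(old_mean, old_count, new_value):
    new_count = old_count + 1
    new_mean = old_mean + (new_value - old_mean) / new_count
    return new_mean, new_count

mean, count = 0.0, 0
for x in [1.0, 2.0, 3.0]:
    mean, count = update_mean(mean, count, x)
print(mean)  # 2.0, identical to recomputing over the full history
```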
More often than not, data accumulated over a long period can no longer be processed in one full computation at all (for example, Multinomial Naive Bayes is used when the dataset is too large to fit in memory).
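For that out-of-core case, a minimal sketch using scikit-learn's MultinomialNB.partial_fit, trained chunk by chunk; scikit-learn and the synthetic chunks are assumptions for illustration, since the rest of this post uses Spark:

```python
import numpy as np
from sklearn.naive_bayes import MultinomialNB

clf = MultinomialNB()
classes = np.array([0, 1])  # all classes must be declared on the first call

rng = np.random.default_rng(42)
for _ in range(10):  # pretend each iteration loads one chunk from disk
    X_chunk = rng.integers(0, 5, size=(100, 20))  # count features
    y_chunk = rng.integers(0, 2, size=100)
    clf.partial_fit(X_chunk, y_chunk, classes=classes)  # incremental update

print(clf.predict(rng.integers(0, 5, size=(1, 20))))
```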
streaming-k-means
https://stats.stackexchange.com/questions/222235/explain-streaming-k-means
"Streaming kmeans" typically refers to what is also known as "online kmeans". Online algorithms refer to computations that are performed iteratively, with data arriving during the computation in single observations or in batches, in contrast to "offline" algorithms, where all the data are available when the computation starts. Typically, some intermediate result is calculated based on initial data and then modified as new data arrive. For instance, exponential smoothing is a forecasting algorithm that is very naturally performed online.
In the specific case of k-means, we would first apply a standard k-means algorithm to cluster an initial dataset. Then, the cluster centers would be updated as new data arrive. Such algorithms often keep and update clusterings with different numbers of clusters, because the optimal number of clusters may change over time as data arrives.
Alternative names for online algorithms include "sequential algorithms", and a number of other synonyms as per Wikipedia. For the specific case of online k-means, this SO question looks like a good place to start: Online k-means clustering
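To make the update rule concrete, here is a toy online k-means step in NumPy: assign each arriving point to its nearest centroid, then move that centroid by a running-mean step. This only illustrates the idea described above; it is not Spark's implementation, which additionally applies a decay factor:

```python
import numpy as np

def online_kmeans_step(centers, counts, point):
    """Assign `point` to the nearest center, then move that center toward it."""
    j = int(np.argmin(np.linalg.norm(centers - point, axis=1)))
    counts[j] += 1
    centers[j] += (point - centers[j]) / counts[j]  # running-mean update
    return centers, counts

# centers would normally come from a standard k-means run on an initial batch
centers = np.array([[0.0], [1.0]])
counts = np.array([1, 1])
for x in ([-0.5], [0.6], [0.8], [0.2]):
    centers, counts = online_kmeans_step(centers, counts, np.array(x))
print(centers)
```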
SPARK API
https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html?highlight=streaminglinearregressionwithsgd#pyspark.mllib.clustering.StreamingKMeans
class pyspark.mllib.clustering.StreamingKMeans(k=2, decayFactor=1.0, timeUnit='batches')

Provides methods to set k, decayFactor and timeUnit to configure the KMeans algorithm for fitting and predicting on incoming dstreams. More details on how the centroids are updated are provided in the docs of StreamingKMeansModel.
Parameters:
- k – Number of clusters. (default: 2)
- decayFactor – Forgetfulness of the previous centroids. (default: 1.0)
- timeUnit – Can be "batches" or "points". If "points", the decay factor is raised to the power of the number of new points; if "batches", the decay factor is used as is. (default: "batches")
New in version 1.5.0.
Methods (all new in version 1.5.0):
- latestModel() – Return the latest model.
- predictOn(dstream) – Make predictions on a dstream; returns a transformed dstream object.
- predictOnValues(dstream) – Make predictions on a keyed dstream; returns a transformed dstream object.
- setDecayFactor(decayFactor) – Set the decay factor.
- setHalfLife(halfLife, timeUnit) – Set the number of batches (or points) after which the centroids of that particular batch have half their weight.
- setInitialCenters(centers, weights) – Set the initial centers; should be called before trainOn.
- setK(k) – Set the number of clusters.
- setRandomCenters(dim, weight, seed) – Set the initial centers to random samples from a Gaussian population, with constant weights.
- trainOn(dstream) – Train the model on the incoming dstream.
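Putting the API above together, a minimal local sketch; queueStream stands in for a real source such as Kafka, and all data and names here are illustrative:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.linalg import Vectors

sc = SparkContext("local[2]", "streaming-kmeans-demo")
ssc = StreamingContext(sc, 1)  # 1-second batches

# training stream: a DStream of feature vectors
train_data = ssc.queueStream(
    [sc.parallelize([Vectors.dense([0.0]), Vectors.dense([1.0])])])
# test stream: a DStream of (key, vector) pairs, as predictOnValues expects
test_data = ssc.queueStream(
    [sc.parallelize([(0, Vectors.dense([0.2])), (1, Vectors.dense([0.9]))])])

model = StreamingKMeans(k=2, decayFactor=1.0, timeUnit="batches")
model.setRandomCenters(dim=1, weight=0.0, seed=42)

model.trainOn(train_data)                  # centroids updated per batch
model.predictOnValues(test_data).pprint()  # emits (key, cluster) pairs

ssc.start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop(stopSparkContext=True, stopGraceFully=False)
```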
SPARK TEST CASE
https://spark-test.github.io/pyspark-coverage-site/pyspark_mllib_tests_test_streaming_algorithms_py.html
```python
@unittest.skip("SPARK-10086: Flaky StreamingKMeans test in PySpark")
def test_trainOn_predictOn(self):
    """Test that prediction happens on the updated model."""
    stkm = StreamingKMeans(decayFactor=0.0, k=2)
    stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])

    # Since decay factor is set to zero, once the first batch
    # is passed the clusterCenters are updated to [-0.5, 0.7]
    # which causes 0.2 & 0.3 to be classified as 1, even though the
    # classification based in the initial model would have been 0
    # proving that the model is updated.
    batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
    batches = [self.sc.parallelize(batch) for batch in batches]
    input_stream = self.ssc.queueStream(batches)
    predict_results = []

    def collect(rdd):
        rdd_collect = rdd.collect()
        if rdd_collect:
            predict_results.append(rdd_collect)

    stkm.trainOn(input_stream)
    predict_stream = stkm.predictOn(input_stream)
    predict_stream.foreachRDD(collect)
    self.ssc.start()

    def condition():
        self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
        return True

    eventually(condition, catch_assertions=True)
```
Demo
https://github.com/fanqingsong/machine_learning_system_realtime
The result: while training runs on the train page, the model is continuously updated, and the predict page uses the latest model for testing.
Architecture
Generally, the train process is time-consuming while the predict process is quick, so the train flow runs in async mode and the predict flow in sync mode. The train process is implemented as a stream, using the streaming k-means library: https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html?highlight=streaminglinearregressionwithsgd#pyspark.mllib.clustering.StreamingKMeans
train flow
- the user starts model training from the browser
- django receives the "start train" message
- django schedules a Celery worker to launch a Kafka-based train process
- the browser transfers the iris data to django one record at a time
- django calls the sender task to forward each iris record to the train process via Kafka (see the sketch after this list)
- the train process receives each iris record and trains the model with the streaming k-means library
- after all iris data has been fed in, the train result for the whole dataset is displayed
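A minimal sketch of the sender side of this flow (step 5), assuming Redis as the Celery broker and a hypothetical "iris-train" Kafka topic; neither name is taken from the demo repo:

```python
import json

from celery import Celery
from kafka import KafkaProducer  # kafka-python client

app = Celery("train_sender", broker="redis://localhost:6379/0")
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

@app.task
def send_iris_sample(sample):
    """Forward one iris record, e.g. [5.1, 3.5, 1.4, 0.2], to the train process."""
    producer.send("iris-train", sample)  # hypothetical topic name
    producer.flush()
```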
predict flow
- the user enters prediction features in the browser and clicks submit
- the browser sends the prediction features to django
- django calls the prediction API with those features (see the sketch after this list)
- django returns the prediction result to the browser
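A minimal sketch of the synchronous predict endpoint, using djangorestframework from the stack below; `latest_model` is a hypothetical handle to the most recently trained model, not code from the demo repo:

```python
from rest_framework.views import APIView
from rest_framework.response import Response

class PredictView(APIView):
    def post(self, request):
        features = request.data["features"]  # e.g. [5.1, 3.5, 1.4, 0.2]
        # latest_model is hypothetical: the newest model produced by the
        # train process (cf. StreamingKMeans.latestModel above)
        cluster = latest_model.predict(features)
        return Response({"cluster": int(cluster)})
```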
```
+----------+          +----------+          +----------+          +---------------+
| Browser  |          |  django  |          |  Celery  |          | train process |
+----+-----+          +----+-----+          +----+-----+          +-------+-------+
     |                     |                     |                        |
     | request train       |                     |                        |
     +-------------------->| launch a train process with celery          |
     |                     +-------------------->+----------------------->|
     |                     |                     |                        |
     | send 1st iris data  |                     |                        |
     +-------------------->| send 1st iris data with celery              |
     |                     +-------------------->+----------------------->|
     |        ...          |                     |                        |
     | send last iris data |                     |                        |
     +-------------------->| send last iris data with celery             |
     |                     +-------------------->+----------------------->|
     |                     |                     |                        |
     | send iris features  |                     |                        |
     +-------------------->| predict cluster in celery                   |
     |                     +-------------------->|                        |
     |                     |                     |                        |
```
technology stack
| category | name | comment |
| --- | --- | --- |
| frontend | reactjs | frontend framework |
| frontend | redux | state management |
| frontend | react-C3JS | D3-based graph tool |
| frontend | react-bootstrap | style component library |
| frontend | data-ui | react data visualization tool |
| backend | django | backend framework |
| backend | django-rest-knox | authentication library |
| backend | djangorestframework | restful framework |
| backend | spark.ml | machine learning tool |
| backend | redis-server | broker for celery |
| backend | celery | worker for django |
| backend | kafka | broker for spark.ml streaming kmeans train |