zoukankan      html  css  js  c++  java
  • ALINK(三):PYALINK 以及ALINK 任务运行(本地模式与集群模式)

    一 前言

    目前不支持pyflink-shell.sh的任何模式。

    只支持jupyter notebook以及python shell以及jar包提交的方式.

    下面是来自官方钉钉群的回复:

     二 jupyter notebook 下

    1 本地模式

      使用方法创建本地运行环境:useLocalEnv(parallism, flinkHome=None, config=None)

      其中,

      参数 parallism 表示执行所使用的并行度;

      flinkHome 为 flink 的完整路径,默认使用 PyAlink 自带的 flink路径;

           config为Flink所接受的配置参数。运行后出现如下所示的输出,表示初始化运行环境成功:

           JVM listening on ***Python listening on ***

    2 远程集群模式

    ①启动hadoop集群

    ②启动flink集群

    $FLINK_HOME/bin/start-cluster.sh

    注意flink集群的端口号,默认是master:8081,这个要写入后面的代码中去的。

    由于不支持pyflink-shell.sh,所以只能打开jupyter notebook来做实验了

    过方法可以连接一个已经启动的 Flink 集群:useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", shipAlinkAlgoJar=True, config=None)

    其中,参数

    • host 和 port 表示集群的地址;
    • parallelism 表示执行作业的并行度;
    • flinkHome 为 flink 的完整路径,默认使用 PyAlink 自带的 flink-1.9.0 路径;
    • localIp 指定实现 Flink DataStream 的打印预览功能时所需的本机IP地址,需要 Flink 集群能访问。默认为localhost
    • shipAlinkAlgoJar 是否将 PyAlink 提供的 Alink 算法包传输给远程集群,如果远程集群已经放置了 Alink 算法包,那么这里可以设为 False,减少数据传输。

    Flink-1.10 及以上版本对应的 pyalink 包,还支持类似 pyflink 脚本的远程集群运行方式。

    完整测试代码如下(下面的Desktop和8082要改成自己的):

    from pyalink.alink import *
    useRemoteEnv("Desktop", 8082, 2, flinkHome=None, localIp="localhost", config=None)
     
    URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/review_rating_train.csv"
    SCHEMA_STR = "review_id bigint, rating5 bigint, rating3 bigint, review_context string"
    LABEL_COL = "rating5"
    TEXT_COL = "review_context"
    VECTOR_COL = "vec"
    PRED_COL = "pred"
    PRED_DETAIL_COL = "predDetail"
    source = CsvSourceBatchOp() 
        .setFilePath(URL)
        .setSchemaStr(SCHEMA_STR)
        .setFieldDelimiter("_alink_")
        .setQuoteChar(None)
     
    ## Split data for train and test
    trainData = SplitBatchOp().setFraction(0.9).linkFrom(source)
    testData = trainData.getSideOutput(0)
     
    pipeline = (
        Pipeline()
        .add(
            Segment()
            .setSelectedCol(TEXT_COL)
        )
        .add(
            StopWordsRemover()
            .setSelectedCol(TEXT_COL)
        ).add(
            DocHashCountVectorizer()
            .setFeatureType("WORD_COUNT")
            .setSelectedCol(TEXT_COL)
            .setOutputCol(VECTOR_COL)
        )
    )
     
     
     
    naiveBayes = (
        NaiveBayesTextClassifier()
        .setVectorCol(VECTOR_COL)
        .setLabelCol(LABEL_COL)
        .setPredictionCol(PRED_COL)
        .setPredictionDetailCol(PRED_DETAIL_COL)
    )
    model = pipeline.add(naiveBayes).fit(trainData)
     
     
    predict = model.transform(testData)
    metrics = (
        EvalMultiClassBatchOp()
        .setLabelCol(LABEL_COL)
        .setPredictionDetailCol(PRED_DETAIL_COL)
        .linkFrom(predict)
        .collectMetrics()
    )
     
    print("ConfusionMatrix:", metrics.getConfusionMatrix())
    print("LabelArray:", metrics.getLabelArray())
    print("LogLoss:", metrics.getLogLoss())
    print("Accuracy:", metrics.getAccuracy())
    print("Kappa:", metrics.getKappa())
    print("MacroF1:", metrics.getMacroF1())
    print("Label 1 Accuracy:", metrics.getAccuracy("1"))
    print("Label 1 Kappa:", metrics.getKappa("1"))
    print("Label 1 Precision:", metrics.getPrecision("1"))

    实验结果如下:

    ConfusionMatrix: [[4944, 374, 190, 181, 223], [29, 1207, 128, 137, 82], [1, 2, 317, 22, 10], [0, 0, 0, 62, 0], [1, 0, 1, 1, 187]]
    LabelArray: ['5', '4', '3', '2', '1']
    LogLoss: 1.3876163466154336
    Accuracy: 0.8293616495863687
    Kappa: 0.6641967288935378
    MacroF1: 0.6239089988842421
    Label 1 Accuracy: 0.960735893320163
    Label 1 Kappa: 0.5242700620715995
    Label 1 Precision: 0.9842105263157894
    下面的这个来自web ui

     三 在集群上运行Alink算法

    1. 准备Flink集群
      wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
      tar -xf flink-1.10.0-bin-scala_2.11.tgz && cd flink-1.10.0
      ./bin/start-cluster.sh
    1. 准备Alink算法包
      git clone https://github.com/alibaba/Alink.git
      cd Alink && mvn -Dmaven.test.skip=true clean package shade:shade
    1. 运行Java示例
      ./bin/flink run -p 1 -c com.alibaba.alink.ALSExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar
      # ./bin/flink run -p 2 -c com.alibaba.alink.GBDTExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar
      # ./bin/flink run -p 2 -c com.alibaba.alink.KMeansExample [path_to_Alink]/examples
  • 相关阅读:
    【2021-01-26】保持学习,也是一种科学的健康保养方式
    【2021-01-25】正常市场化的企业该有的现象
    day 42 mysql 数据库(2)
    day 41 数据库初学习
    在排序数组中查找元素的第一个和最后一个位置
    飞机游戏
    两个数组的交集 II
    重复N次的元素
    单词规律
    存在重复元素2
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14865059.html
Copyright © 2011-2022 走看看