一 前言
目前不支持pyflink-shell.sh的任何模式。
只支持jupyter notebook以及python shell以及jar包提交的方式.
下面是来自官方钉钉群的回复:
二 jupyter notebook 下
1 本地模式
使用方法创建本地运行环境:useLocalEnv(parallism, flinkHome=None, config=None)。
其中,
参数 parallism 表示执行所使用的并行度;
flinkHome 为 flink 的完整路径,默认使用 PyAlink 自带的 flink路径;
config为Flink所接受的配置参数。运行后出现如下所示的输出,表示初始化运行环境成功:
JVM listening on ***Python listening on ***
2 远程集群模式
①启动hadoop集群
②启动flink集群
$FLINK_HOME/bin/start-cluster.sh
注意flink集群的端口号,默认是master:8081,这个要写入后面的代码中去的。
由于不支持pyflink-shell.sh,所以只能打开jupyter notebook来做实验了
过方法可以连接一个已经启动的 Flink 集群:useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", shipAlinkAlgoJar=True, config=None)
。
其中,参数
host
和port
表示集群的地址;parallelism
表示执行作业的并行度;flinkHome
为 flink 的完整路径,默认使用 PyAlink 自带的 flink-1.9.0 路径;localIp
指定实现Flink DataStream
的打印预览功能时所需的本机IP地址,需要 Flink 集群能访问。默认为localhost
。shipAlinkAlgoJar
是否将 PyAlink 提供的 Alink 算法包传输给远程集群,如果远程集群已经放置了 Alink 算法包,那么这里可以设为 False,减少数据传输。
Flink-1.10 及以上版本对应的 pyalink 包,还支持类似 pyflink 脚本的远程集群运行方式。
完整测试代码如下(下面的Desktop和8082要改成自己的):
from pyalink.alink import * useRemoteEnv("Desktop", 8082, 2, flinkHome=None, localIp="localhost", config=None) URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/review_rating_train.csv" SCHEMA_STR = "review_id bigint, rating5 bigint, rating3 bigint, review_context string" LABEL_COL = "rating5" TEXT_COL = "review_context" VECTOR_COL = "vec" PRED_COL = "pred" PRED_DETAIL_COL = "predDetail" source = CsvSourceBatchOp() .setFilePath(URL) .setSchemaStr(SCHEMA_STR) .setFieldDelimiter("_alink_") .setQuoteChar(None) ## Split data for train and test trainData = SplitBatchOp().setFraction(0.9).linkFrom(source) testData = trainData.getSideOutput(0) pipeline = ( Pipeline() .add( Segment() .setSelectedCol(TEXT_COL) ) .add( StopWordsRemover() .setSelectedCol(TEXT_COL) ).add( DocHashCountVectorizer() .setFeatureType("WORD_COUNT") .setSelectedCol(TEXT_COL) .setOutputCol(VECTOR_COL) ) ) naiveBayes = ( NaiveBayesTextClassifier() .setVectorCol(VECTOR_COL) .setLabelCol(LABEL_COL) .setPredictionCol(PRED_COL) .setPredictionDetailCol(PRED_DETAIL_COL) ) model = pipeline.add(naiveBayes).fit(trainData) predict = model.transform(testData) metrics = ( EvalMultiClassBatchOp() .setLabelCol(LABEL_COL) .setPredictionDetailCol(PRED_DETAIL_COL) .linkFrom(predict) .collectMetrics() ) print("ConfusionMatrix:", metrics.getConfusionMatrix()) print("LabelArray:", metrics.getLabelArray()) print("LogLoss:", metrics.getLogLoss()) print("Accuracy:", metrics.getAccuracy()) print("Kappa:", metrics.getKappa()) print("MacroF1:", metrics.getMacroF1()) print("Label 1 Accuracy:", metrics.getAccuracy("1")) print("Label 1 Kappa:", metrics.getKappa("1")) print("Label 1 Precision:", metrics.getPrecision("1"))
实验结果如下:
ConfusionMatrix: [[4944, 374, 190, 181, 223], [29, 1207, 128, 137, 82], [1, 2, 317, 22, 10], [0, 0, 0, 62, 0], [1, 0, 1, 1, 187]]
LabelArray: ['5', '4', '3', '2', '1']
LogLoss: 1.3876163466154336
Accuracy: 0.8293616495863687
Kappa: 0.6641967288935378
MacroF1: 0.6239089988842421
Label 1 Accuracy: 0.960735893320163
Label 1 Kappa: 0.5242700620715995
Label 1 Precision: 0.9842105263157894
下面的这个来自web ui
三 在集群上运行Alink算法
- 准备Flink集群
wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
tar -xf flink-1.10.0-bin-scala_2.11.tgz && cd flink-1.10.0
./bin/start-cluster.sh
- 准备Alink算法包
git clone https://github.com/alibaba/Alink.git
cd Alink && mvn -Dmaven.test.skip=true clean package shade:shade
- 运行Java示例
./bin/flink run -p 1 -c com.alibaba.alink.ALSExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar
# ./bin/flink run -p 2 -c com.alibaba.alink.GBDTExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar
# ./bin/flink run -p 2 -c com.alibaba.alink.KMeansExample [path_to_Alink]/examples