概要
ODPS GRAPH是一套面向迭代的图计算处理框架。图计算作业使用图进行建模,图由点(Vertex)和边(Edge)组成,点和边包含权值(Value),ODPS GRAPH支持下述图编辑操作:
- 修改点或边的权值;
- 增加/删除点;
- 增加/删除边;
备注:
- 编辑点和边时,点与边的关系需要用户维护。
通过迭代对图进行编辑、演化,最终求解出结果,典型应用:PageRank,单源最短距离算法 ,K-均值聚类算法 等等。用户可以使用 ODPS GRAPH 提供的接口Java SDK编写图计算程序。
Graph数据结构
ODPS GRAPH能够处理的图必须是是一个由点(Vertex)和边(Edge)组成的有向图。由于ODPS仅提供二维表的存储结构,因此需要用户自行将图数据分解为二维表格式存储在ODPS中,在进行图计算分析时,使用自定义的GraphLoader将二维表数据转换为ODPS Graph引擎中的点和边。至于如何将图数据分解为二维表格式,用户可以根据各自的业务场景做决定。在 示例程序 中,我们给出的示例分别使用不同的表格式来表达图的数据结构,仅供大家参考。
点的结构可以简单表示为 < ID, Value, Halted, Edges >,分别表示点标识符(ID),权值(Value),状态(Halted, 表示是否要停止迭代),出边集合(Edges,以该点为起始点的所有边列表)。边的结构可以简单表示为,分别表示目标点(DestVertexID)和权值(Value)。[图片上传失败...(image-c43eb7-1555648148484)]
例如,上图由下面的点组成:
Vertex | |
---|---|
v0 | <0, 0, false, [ <1, 5 >, <2, 10 > ] > |
v1 | <1, 5, false, [ <2, 3>, <3, 2>, <5, 9>]> |
v2 | <2, 8, false, [<1, 2>, <5, 1 >]> |
v3 | <3, Long.MAX_VALUE, false, [<0, 7>, <5, 6>]> |
v5 | <5, Long.MAX_VALUE, false, [<3, 4 > ]> |
Graph 程序逻辑
1. 加载图:
图加载:框架调用用户自定义的GraphLoader将输入表的记录解析为点或边;分布式化:框架调用用户自定义的Partitioner对点进行分片(默认分片逻辑:点ID哈希值然后对Worker数取模),分配到相应的Worker;[图片上传失败...(image-982ae4-1555648148484)]
例如,上图假设Worker数是2,那么v0, v2会被分配到Worker0,因为ID对2取模结果为0,而v1, v3, v5将被分配到Worker1,ID对2取模结果为1;
2. 迭代计算:
-
一次迭代为一个”超步”(SuperStep),遍历所有非结束状态(Halted值为false)的点或者收到消息的点(处于结束状态的点收到信息会被自动唤醒),并调用其compute(ComputeContext context, Iterable messages)方法;
-
在用户实现的compute(ComputeContext context, Iterable messages)方法中:
- 处理上一个超步发给当前点的消息(Messages);
- 根据需要对图进行编辑:1). 修改点/边的取值;2). 发送消息给某些点;3). 增加/删除点或边;
- 通过Aggregator汇总信息到全局信息;
- 设置当前点状态,结束或非结束状态;
- 迭代进行过程中,框架会将消息以异步的方式发送到对应Worker并在下一个超步进行处理,用户无需关心;
3. 迭代终止(满足以下任意一条):
- 所有点处于结束状态(Halted值为true)且没有新消息产生;
- 达到最大迭代次数;
- 某个Aggregator的terminate方法返回true;
伪代码描述如下:
Aggregator
Aggregator是ODPS-GRAPH作业中常用的feature之一,特别是解决机器学习问题时。ODPS-GRAPH中Aggregator用于汇总并处理全局信息。本文将详细介绍的Aggregator的执行机制、相关API,并以Kmeans Clustering为例子说明Aggregator的具体用法。
Aggregator机制
Aggregator的逻辑分两部分,一部分在所有Worker上执行,即分布式执行,另一部分只在AggregatorOwner所在Worker上执行,即单点。其中在所有Worker上执行的操作包括创建初始值及局部聚合,然后将局部聚合结果发送给AggregatorOwner所在Worker上。AggregatorOwner所在Worker上聚合普通Worker发送过来的局部聚合对象,得到全局聚合结果,然后判断迭代是否结束。全局聚合的结果会在下一轮超步分发给所有Worker,供下一轮迭代使用。 如下图所示 :[图片上传失败...(image-3b5f84-1555648148484)]
Aggregator的API
Aggregator共提供了五个API供用户实现。下面逐个介绍5个API的调用时机及常规用途。
-
createStartupValue(context)
该API在所有Worker上执行一次,调用时机是所有超步开始之前,通常用以初始化AggregatorValue。在第0轮超步中,调用WorkerContext.getLastAggregatedValue() 或ComputeContext.getLastAggregatedValue()可以获取该API初始化的AggregatorValue对象。 -
createInitialValue(context)
该API在所有Worker上每轮超步开始时调用一次,用以初始化本轮迭代所用的AggregatorValue。通常操作是通过WorkerContext.getLastAggregatedValue() 得到上一轮迭代的结果,然后执行部分初始化操作。 -
aggregate(value, item)
该API同样在所有Worker上执行,与上述API不同的是,该API由用户显示调用ComputeContext#aggregate(item)来触发,而上述两个API,则由框架自动调用。该API用以执行局部聚合操作,其中第一个参数value是本Worker在该轮超步已经聚合的结果(初始值是createInitialValue返回的对象),第二个参数是用户代码调用ComputeContext#aggregate(item)传入的参数。该API中通常用item来更新value实现聚合。所有aggregate执行完后,得到的value就是该Worker的局部聚合结果,然后由框架发送给AggregatorOwner所在的Worker。 -
merge(value, partial)
该API执行于AggregatorOwner所在Worker,用以合并各Worker局部聚合的结果,达到全局聚合对象。与aggregate类似,value是已经聚合的结果,而partial待聚合的对象,同样用partial更新value。
假定有3个worker,分别是w0、w1、w2,其局部聚合结果是p0、p1、p2。假定发送到AggregatorOwner所在Worker的顺序为p1、p0、p2。那么merge执行次序为,首先执行merge(p1, p0),这样p1和p0就聚合为p1',然后执行merge(p1', p2),p1'和p2聚合为p1'',而p1''即为本轮超步全局聚合的结果。
从上述示例可以看出,当只有一个worker时,不需要执行merge方法,也就是说merge()不会被调用。 -
terminate(context, value)
当AggregatorOwner所在Worker执行完merge()后,框架会调用terminate(context, value)执行最后的处理。其中第二个参数value,即为merge()最后得到全局聚合,在该方法中可以对全局聚合继续修改。执行完terminate()后,框架会将全局聚合对象分发给所有Worker,供下一轮超步使用。
terminate()方法的一个特殊之处在于,如果返回true,则整个作业就结束迭代,否则继续执行。在机器学习场景中,通常判断收敛后返回true以结束作业。
Kmeans Clustering示例
下面以典型的KmeansClustering作为示例,来看下Aggregator具体用法。附件有完整代码,这里我们逐个部分解析代码。
- GraphLoader部分
GraphLoader部分用以加载输入表,并转换为图的点或边。这里我们输入表的每行数据为一个样本,一个样本构造一个点,并用Vertex的value来存放样本。
我们首先定义一个Writable类KmeansValue作为Vertex的value类型:java
- static class KmeansValue implements Writable {
DenseVector sample;
public KmeansValue() {
}
public KmeansValue(DenseVector v) {
this.sample = v;
}
@Override
public void write(DataOutput out) throws IOException {
wirteForDenseVector(out, sample);
}
@Override
public void readFields(DataInput in) throws IOException {
sample = readFieldsForDenseVector(in);
}
}
KmeansReader中,每读入一行数据(一个Record)创建一个点,这里用recordNum作为点的ID,将record内容转换成DenseVector对象并封装进VertexValue中。
- Vertex部分
自定义的KmeansVertex代码如下。逻辑非常简单,每轮迭代要做的事情就是将自己维护的样本执行局部聚合。具体逻辑参见下面Aggregator的实现。java
- static class KmeansVertex extends
Vertex {
@Override
public void compute(
context.aggregate(getValue());
}
}
KmeansAggrValue中维护了三个对象,其中centroids是当前的K个中心点,假定样本是m维的话,centroids就是一个K*m的矩阵。sums是和centroids大小一样的矩阵,每个元素记录了到特定中心点最近的样本特定维之和,例如sums(i,j)是到第i个中心点最近的样本的第j维度之和。
counts是个K维的向量,记录到每个中心点距离最短的样本个数。sums和counts一起用以计算新的中心点,也是要聚合的主要内容。 接下来是自定义的Aggregator实现类KmeansAggregator,我们按照上述API的顺序逐个看其实现。
首先是createStartupValue()。
String[] ss = lines[i].split(",");
for (int j = 0; j < ss.length; j++) {
av.centroids.set(i, j, Double.valueOf(ss[j]));
}
我们在该方法中初始化一个KmeansAggrValue对象,然后从资源文件centers中读取初始中心点,并赋值给centroids。而sums和counts初始化为0。<br />接来下是createInitialValue()的实现:<br />```java
@Override
public void aggregate(KmeansAggrValue value, Object item)
throws IOException {
DenseVector sample = ((KmeansValue)item).sample;
// find the nearest centroid
int min = findNearestCentroid(value.centroids, sample);
// update sum and count
for (int i = 0; i < sample.size(); i ++) {
value.sums.add(min, i, sample.get(i));
}
value.counts.add(min, 1.0d);
}
throws IOException {
merge的实现逻辑很简单,就是把各个worker聚合出的sums和counts相加即可。<br />最后是terminate()的实现:<br />```java
@Override
public boolean terminate(WorkerContext context, KmeansAggrValue value)
throws IOException {
// Calculate the new means to be the centroids (original sums)
DenseMatrix newCentriods = calculateNewCentroids(value.sums, value.counts, value.centroids);
// print old centroids and new centroids for debugging
System.out.println("
superstep: " + context.getSuperstep() +
"
old centriod:
" + value.centroids + " new centriod:
" + newCentriods);
boolean converged = isConverged(newCentriods, value.centroids, 0.05d);
System.out.println("superstep: " + context.getSuperstep() + "/"
+ (context.getMaxIteration() - 1) + " converged: " + converged);
if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
// converged or reach max iteration, output centriods
for (int i = 0; i < newCentriods.numRows(); i++) {
Writable[] centriod = new Writable[newCentriods.numColumns()];
for (int j = 0; j < newCentriods.numColumns(); j++) {
centriod[j] = new DoubleWritable(newCentriods.get(i, j));
}
context.write(centriod);
}
// true means to terminate iteration
return true;
}
// update centriods
value.centroids.set(newCentriods);
// false means to continue iteration
return false;
}
- (System.currentTimeMillis() - start) / 1000.0 + " seconds");
public static void main(String[] args) throws IOException {
if (args.length < 2)
GraphJob job = new GraphJob();
job.setGraphLoaderClass(PageRankVertexReader.class);
job.setVertexClass(PageRankVertex.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// 将作业中使用的资源添加到cache resource,对应于jar命令中 -resources 和 -libjars 中指定的资源
job.addCacheResource("mapreduce-examples.jar");
// 将使用的jar及其他文件添加到class cache resource,对应于jar命令中 -libjars 中指定的资源
job.addCacheResourceToClassPath("mapreduce-examples.jar");
// 设置console中,odps_config.ini对应的配置项,使用时替换为自己的配置
OdpsConf.getInstance().setProjName("project_name");
OdpsConf.getInstance().setEndpoint("end_point");
OdpsConf.getInstance().setAccessId("access_id");
OdpsConf.getInstance().setAccessKey("access_key");
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
}
GraphJob job = new GraphJob();
job.addInput(TableInfo.builder().tableName(“tblname”).build()); //表作为输入
job.addInput(TableInfo.builder().tableName(“tblname”).partSpec("pt1=a/pt2=b").build()); //分区作为输入
//只读取输入表的 col2 和 col0 列,在 GraphLoader 的 load 方法中,record.get(0) 得到的是col2列,顺序一致
job.addInput(TableInfo.builder().tableName(“tblname”).partSpec("pt1=a/pt2=b").build(), new String[]{"col2", "col0"});
备注:
关于作业输入定义,更多的信息参见GraphJob的addInput相关方法说明,框架读取输入表的记录传给用户自定义的GraphLoader载入图数据;
限制: 暂时不支持分区过滤条件。更多应用限制请参考 应用限制;
定义作业输出,支持多路输出,通过label标识每路输出:
GraphJob job = new GraphJob();
//输出表为分区表时需要给到最末一级分区
job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").build());
// 下面的参数 true 表示覆盖tableinfo指定的分区,即INSERT OVERWRITE语义,false表示INSERT INTO语义
job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").lable("output1").build(), true);
void addCacheResources(String resourceNames)
void addCacheResourcesToClassPath(String resourceNames)
public byte[] readCacheFile(String resourceName) throws IOException;
public Iterable readCacheArchive(String resourceName) throws IOException;
public Iterable readCacheArchive(String resourceName, String relativePath)throws IOException;
public Iterable readResourceTable(String resourceName);
public BufferedInputStream readCacheFileAsStream(String resourceName) throws IOException;
public Iterable readCacheArchiveAsStream(String resourceName) throws IOException;
public Iterable readCacheArchiveAsStream(String resourceName, String relativePath) throws IOException;
com.aliyun.odps
odps-sdk-graph
0.20.7
sources
1,"2:2,3:1,4:4"
2,"1:2,3:2,4:1"
3,"1:1,2:2,5:1"
4,"1:4,2:1,5:1"
5,"3:1,4:1"
Counters: 3
graph task finish
import java.io.IOException;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.data.TableInfo;
public class SSSP {
public static final String START_VERTEX = "sssp.start.vertex.id";
public static class SSSPVertex extends
}
public static class MinLongCombiner extends
}
public static class SSSPVertexReader extends
}
public static void main(String[] args) throws IOException {
}
}
import java.io.IOException;
import org.apache.log4j.Logger;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
public class PageRank {
private final static Logger LOG = Logger.getLogger(PageRank.class);
public static class PageRankVertex extends
}
public static class PageRankVertexReader extends
}
private static void printUsage() {
}
public static void main(String[] args) throws IOException {
}
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.log4j.Logger;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.Aggregator;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Tuple;
import com.aliyun.odps.io.Writable;
public class Kmeans {
private final static Logger LOG = Logger.getLogger(Kmeans.class);
public static class KmeansVertex extends
}
public static class KmeansVertexReader extends
}
public static class KmeansAggrValue implements Writable {
}
public static class KmeansAggregator extends Aggregator {
}
private static void printUsage() {
}
public static void main(String[] args) throws IOException {
}
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Random;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.io.WritableRecord;
public class BipartiteMatching {
private static final Text UNMATCHED = new Text("UNMATCHED");
public static class TextPair implements Writable {
}
public static class BipartiteMatchingVertexReader extends
}
public static class BipartiteMatchingVertex extends
}
private static void printUsage() {
}
public static void main(String[] args) throws IOException {
}
}
本文作者:云花
本文为云栖社区原创内容,未经允许不得转载。