  • Flink DataSet join notes

    1. A DataSet join combines two DataSets by matching keys. In the usual case a join is an inner join, just like INNER JOIN in SQL.

    The join key can be specified in any of the following ways:

    a key expression

    a key-selector function

    one or more field position keys (Tuple DataSet only).

    Case Class Fields

    2. Variants of the inner join

    2.1 Default join: each matching pair is returned as a Tuple2

    public static class User { public String name; public int zip; }
    public static class Store { public Manager mgr; public int zip; }
    DataSet<User> input1 = // [...]
    DataSet<Store> input2 = // [...]
    // result dataset is typed as Tuple2
    DataSet<Tuple2<User, Store>>
                result = input1.join(input2)
                               .where("zip")       // key of the first input (users)
                               .equalTo("zip");    // key of the second input (stores)
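    To make the semantics of the default join concrete, here is a minimal plain-Java sketch. It does not use Flink: the class InnerJoinSketch, the Pair stand-in for Tuple2 and the innerJoin helper are all hypothetical, and Store.mgr is simplified to a String. The key extractors passed in play the role of key-selector functions. Every combination of elements with equal keys becomes one pair, and elements without a partner are dropped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of a default (inner) equi-join; no Flink classes are used.
final class InnerJoinSketch {

    // Hypothetical stand-in for the Tuple2<User, Store> result element.
    static final class Pair<A, B> {
        final A f0;
        final B f1;
        Pair(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
        @Override public String toString() { return "(" + f0 + ", " + f1 + ")"; }
    }

    // Simplified User / Store classes (Store.mgr is reduced to a String).
    static final class User {
        final String name; final int zip;
        User(String name, int zip) { this.name = name; this.zip = zip; }
        @Override public String toString() { return name; }
    }

    static final class Store {
        final String mgr; final int zip;
        Store(String mgr, int zip) { this.mgr = mgr; this.zip = zip; }
        @Override public String toString() { return mgr; }
    }

    // One pair per (left, right) combination with equal keys; the key extractors
    // play the role of key-selector functions.
    static <L, R, K> List<Pair<L, R>> innerJoin(List<L> left, List<R> right,
                                                Function<L, K> leftKey, Function<R, K> rightKey) {
        Map<K, List<R>> rightByKey = new HashMap<>();
        for (R r : right) {
            rightByKey.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        List<Pair<L, R>> out = new ArrayList<>();
        for (L l : left) {
            // Left elements without a match produce nothing: that is what makes it an inner join.
            for (R r : rightByKey.getOrDefault(leftKey.apply(l), new ArrayList<R>())) {
                out.add(new Pair<>(l, r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(new User("alice", 10115), new User("bob", 80331));
        List<Store> stores = Arrays.asList(new Store("carol", 10115), new Store("dave", 10115),
                                           new Store("erin", 20095));
        // Roughly what input1.join(input2).where("zip").equalTo("zip") computes.
        System.out.println(innerJoin(users, stores, u -> u.zip, s -> s.zip));
        // prints [(alice, carol), (alice, dave)]
    }
}
```

    Note that a key that occurs several times on both sides yields every combination, so the result can be larger than either input.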

    2.2 A user-defined JoinFunction, applied with the with() clause

    // some POJO
    public class Rating {
      public String name;
      public String category;
      public int points;
    }
    
    // Join function that joins a custom POJO with a Tuple
    public class PointWeighter
             implements JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
    
      @Override
      public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) {
        // multiply the points and rating and construct a new output tuple
        return new Tuple2<String, Double>(rating.name, rating.points * weight.f1);
      }
    }
    
    DataSet<Rating> ratings = // [...]
    DataSet<Tuple2<String, Double>> weights = // [...]
    DataSet<Tuple2<String, Double>>
                weightedRatings =
                ratings.join(weights)
    
                       // key of the first input
                       .where("category")
    
                       // key of the second input
                       .equalTo("f0")
    
                       // applying the JoinFunction on joining pairs
                       .with(new PointWeighter());
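    The effect of with() can be sketched in plain Java: the join function is called once for every matching pair and returns exactly one result. This is a simplified model, not Flink code. JoinFunctionSketch, joinWith and weight are hypothetical names, a weight tuple is modelled as a Map.Entry, and a simple nested loop stands in for whatever strategy Flink actually picks.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of join(...).where(...).equalTo(...).with(joinFunction); not Flink code.
final class JoinFunctionSketch {

    // Same fields as the Rating POJO above.
    static final class Rating {
        final String name; final String category; final int points;
        Rating(String name, String category, int points) {
            this.name = name; this.category = category; this.points = points;
        }
    }

    // Models a Tuple2<String, Double> weight as a Map.Entry (hypothetical helper).
    static Map.Entry<String, Double> weight(String category, double w) {
        return new AbstractMap.SimpleEntry<>(category, w);
    }

    // Nested-loop equi-join: `fn` runs once per matching pair and returns exactly one result.
    static <L, R, K, O> List<O> joinWith(List<L> left, List<R> right,
                                         Function<L, K> leftKey, Function<R, K> rightKey,
                                         BiFunction<L, R, O> fn) {
        List<O> out = new ArrayList<>();
        for (L l : left) {
            for (R r : right) {
                if (Objects.equals(leftKey.apply(l), rightKey.apply(r))) {
                    out.add(fn.apply(l, r));
                }
            }
        }
        return out;
    }

    // The arithmetic of PointWeighter: rating points times the category weight.
    static Map.Entry<String, Double> pointWeighter(Rating rating, Map.Entry<String, Double> weight) {
        return new AbstractMap.SimpleEntry<>(rating.name, rating.points * weight.getValue());
    }

    public static void main(String[] args) {
        List<Rating> ratings = Arrays.asList(new Rating("alice", "books", 4),
                                             new Rating("bob", "music", 10));
        List<Map.Entry<String, Double>> weights = Arrays.asList(weight("books", 0.5), weight("music", 0.25));
        // where("category").equalTo("f0").with(new PointWeighter())
        System.out.println(joinWith(ratings, weights, r -> r.category, Map.Entry::getKey,
                                    JoinFunctionSketch::pointWeighter));
        // prints [alice=2.0, bob=2.5]
    }
}
```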

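    2.3 Using a FlatJoinFunction. JoinFunction and FlatJoinFunction relate to each other the same way MapFunction and FlatMapFunction do: a JoinFunction returns exactly one result per matching pair, while a FlatJoinFunction can emit any number of results (including none) through a Collector.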

    public class PointWeighter
             implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
      @Override
      public void join(Rating rating, Tuple2<String, Double> weight,
          Collector<Tuple2<String, Double>> out) {
        if (weight.f1 > 0.1) {
            out.collect(new Tuple2<String, Double>(rating.name, rating.points * weight.f1));
        }
      }
    }
    
    DataSet<Tuple2<String, Double>>
                weightedRatings =
                ratings.join(weights) // [...]
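    The difference from a plain JoinFunction is that the function receives a collector and decides how many results to emit. Below is a plain-Java sketch of that behaviour, assuming a java.util.function.Consumer in place of Flink's Collector. FlatJoinSketch, FlatJoiner, flatJoinWith and POINT_WEIGHTER are hypothetical names, not Flink API. With the filter from the flat PointWeighter, a pair whose weight is not above 0.1 contributes no output at all.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java model of a flat join; Consumer stands in for Flink's Collector.
final class FlatJoinSketch {

    // Hypothetical mirror of FlatJoinFunction's shape (not the Flink interface itself).
    interface FlatJoiner<L, R, O> {
        void join(L left, R right, Consumer<O> out);
    }

    static final class Rating {
        final String name; final String category; final int points;
        Rating(String name, String category, int points) {
            this.name = name; this.category = category; this.points = points;
        }
    }

    static Map.Entry<String, Double> weight(String category, double w) {
        return new AbstractMap.SimpleEntry<>(category, w);
    }

    // Same logic as the flat PointWeighter: pairs with a weight <= 0.1 emit nothing.
    static final FlatJoiner<Rating, Map.Entry<String, Double>, Map.Entry<String, Double>> POINT_WEIGHTER =
            (rating, w, out) -> {
                if (w.getValue() > 0.1) {
                    out.accept(new AbstractMap.SimpleEntry<>(rating.name, rating.points * w.getValue()));
                }
            };

    static <L, R, K, O> List<O> flatJoinWith(List<L> left, List<R> right,
                                             Function<L, K> leftKey, Function<R, K> rightKey,
                                             FlatJoiner<L, R, O> fn) {
        List<O> out = new ArrayList<>();
        for (L l : left) {
            for (R r : right) {
                if (Objects.equals(leftKey.apply(l), rightKey.apply(r))) {
                    fn.join(l, r, out::add); // the function decides how many results to add
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rating> ratings = Arrays.asList(new Rating("alice", "books", 4),
                                             new Rating("bob", "music", 10));
        List<Map.Entry<String, Double>> weights = Arrays.asList(weight("books", 0.5), weight("music", 0.05));
        System.out.println(flatJoinWith(ratings, weights, r -> r.category, Map.Entry::getKey, POINT_WEIGHTER));
        // prints [alice=2.0]
    }
}
```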

    2.4 Join projection: building a custom result tuple

    DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
    DataSet<Tuple2<Integer, Double>> input2 = // [...]
    DataSet<Tuple4<Integer, String, Double, Byte>>
                result =
                input1.join(input2)
                      // key definition on first DataSet using a field position key
                      .where(0)
                      // key definition of second DataSet using a field position key
                      .equalTo(0)
                      // select and reorder fields of matching tuples
                      .projectFirst(0,2).projectSecond(1).projectFirst(1);
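    projectFirst(int...) and projectSecond(int...) select the fields of the first and the second join input that are combined into the output tuple. The order of the indexes defines the order of the fields in the output tuple.
    Join projection also works for non-Tuple DataSets. In that case projectFirst() or projectSecond() must be called without arguments to add the whole joined element to the output tuple.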
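    The field ordering can be checked with a small plain-Java sketch. It assumes tuples are modelled as Object[] arrays rather than Flink's TupleN classes. ProjectionSketch, Field, first, second and project are hypothetical helpers. The call chain projectFirst(0,2).projectSecond(1).projectFirst(1) becomes a list of four field selectors, applied in order.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of join projection: tuples are modelled as Object[] (not Flink's TupleN classes).
final class ProjectionSketch {

    // One projected field: position `index` of the first input (fromFirst == true) or of the second.
    static final class Field {
        final boolean fromFirst;
        final int index;
        Field(boolean fromFirst, int index) { this.fromFirst = fromFirst; this.index = index; }
    }

    static Field first(int index) { return new Field(true, index); }   // like projectFirst(index)
    static Field second(int index) { return new Field(false, index); } // like projectSecond(index)

    // Builds the output tuple; the order of `fields` is the order of the output fields.
    static Object[] project(Object[] firstTuple, Object[] secondTuple, List<Field> fields) {
        Object[] out = new Object[fields.size()];
        for (int i = 0; i < out.length; i++) {
            Field f = fields.get(i);
            out[i] = (f.fromFirst ? firstTuple : secondTuple)[f.index];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] left = {1, (byte) 7, "x"}; // Tuple3<Integer, Byte, String>
        Object[] right = {1, 2.5};          // Tuple2<Integer, Double>, same key in field 0
        // projectFirst(0,2).projectSecond(1).projectFirst(1)
        Object[] joined = project(left, right, Arrays.asList(first(0), first(2), second(1), first(1)));
        System.out.println(Arrays.toString(joined)); // prints [1, x, 2.5, 7]
    }
}
```

    The resulting field types (Integer, String, Double, Byte) match the Tuple4<Integer, String, Double, Byte> declared in the Flink example.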

    2.5 Dataset size hints. They help the optimizer choose the right execution strategy and thereby make the join more efficient.

    DataSet<Tuple2<Integer, String>> input1 = // [...]
    DataSet<Tuple2<Integer, String>> input2 = // [...]
    
    DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
                result1 =
                // hint that the second DataSet is very small
                input1.joinWithTiny(input2)
                      .where(0)
                      .equalTo(0);
    
    DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
                result2 =
                // hint that the second DataSet is very large
                input1.joinWithHuge(input2)
                      .where(0)
                      .equalTo(0);
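    Why does a tiny input matter? Conceptually, a tiny input can be copied to every parallel instance and turned into a hash table, so the large input never has to be shuffled. The sketch below models this in plain Java. It is not Flink's implementation. BroadcastHashJoinSketch, t and joinWithTinySecond are hypothetical, and the parallel instances are simulated as a list of partitions of the first input.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a broadcast hash join: build on the tiny side, probe with the large side.
final class BroadcastHashJoinSketch {

    // Models a Tuple2<Integer, String> (hypothetical helper).
    static Map.Entry<Integer, String> t(int key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // `largePartitions`: the first input as it is already split across parallel instances (not shuffled).
    // `tiny`: the second input, which every instance receives in full (the "broadcast").
    static List<String> joinWithTinySecond(List<List<Map.Entry<Integer, String>>> largePartitions,
                                           List<Map.Entry<Integer, String>> tiny) {
        // Build phase: a hash table over the tiny input (conceptually replicated to every instance).
        Map<Integer, List<String>> table = new HashMap<>();
        for (Map.Entry<Integer, String> e : tiny) {
            table.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        // Probe phase: each partition of the large input streams through its copy of the table.
        List<String> out = new ArrayList<>();
        for (List<Map.Entry<Integer, String>> partition : largePartitions) {
            for (Map.Entry<Integer, String> e : partition) {
                for (String match : table.getOrDefault(e.getKey(), Collections.<String>emptyList())) {
                    out.add(e.getValue() + "-" + match);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<Integer, String>>> large = Arrays.asList(
                Arrays.asList(t(1, "a"), t(2, "b")),
                Arrays.asList(t(3, "c"), t(1, "d")));
        List<Map.Entry<Integer, String>> tiny = Arrays.asList(t(1, "x"), t(3, "y"));
        System.out.println(joinWithTinySecond(large, tiny)); // prints [a-x, c-y, d-x]
    }
}
```

    With joinWithHuge the roles flip: the first input is the small one worth replicating, and the huge second input is probed against it.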

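    2.6 Join algorithm hints. The Flink runtime can execute a join in several ways, and each of them outperforms the others in some situations. The system tries to pick a reasonable strategy automatically, but you can choose one manually if you want to force a specific way of executing the join.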

    DataSet<SomeType> input1 = // [...]
    DataSet<AnotherType> input2 = // [...]
    
    DataSet<Tuple2<SomeType, AnotherType>> result =
          input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
                .where("id").equalTo("key");
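        OPTIMIZER_CHOOSES: equivalent to giving no hint at all; the choice is left to the system.

        BROADCAST_HASH_FIRST: broadcasts the first input and builds a hash table from it, which is probed by the second input. A good strategy if the first input is very small.

        BROADCAST_HASH_SECOND: broadcasts the second input and builds a hash table from it, which is probed by the first input. A good strategy if the second input is very small.

        REPARTITION_HASH_FIRST: the system partitions (shuffles) each input (unless the input is already partitioned) and builds a hash table from the first input. A good strategy if the first input is smaller than the second one, but both inputs are still large.
        Note: this is the default fallback strategy the system uses if no size estimates can be made and no pre-existing partitioning and sort order can be reused.

        REPARTITION_HASH_SECOND: the system partitions (shuffles) each input (unless the input is already partitioned) and builds a hash table from the second input. A good strategy if the second input is smaller than the first one, but both inputs are still large.

        REPARTITION_SORT_MERGE: the system partitions (shuffles) each input (unless the input is already partitioned) and sorts each input (unless it is already sorted). The inputs are joined by a streamed merge of the sorted inputs. A good strategy if one or both inputs are already sorted.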
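    The hash-based strategies build a table from one input and probe it with the other. The sort-merge strategy works differently: it sorts both inputs on the key and walks through them in a single pass. The following plain-Java sketch models that merge for a single partition, with the repartitioning step left out. SortMergeJoinSketch, t and sortMergeJoin are hypothetical names. Runs of equal keys on both sides produce their cross product, so the result matches that of any other join strategy.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java model of a sort-merge join on Integer keys (single partition, no shuffle).
final class SortMergeJoinSketch {

    static Map.Entry<Integer, String> t(int key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    static List<String> sortMergeJoin(List<Map.Entry<Integer, String>> first,
                                      List<Map.Entry<Integer, String>> second) {
        // Sort phase (the runtime can skip it when an input is already sorted on the key).
        List<Map.Entry<Integer, String>> a = new ArrayList<>(first);
        List<Map.Entry<Integer, String>> b = new ArrayList<>(second);
        a.sort(Map.Entry.comparingByKey());
        b.sort(Map.Entry.comparingByKey());

        // Merge phase: advance whichever side currently has the smaller key.
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            int cmp = a.get(i).getKey().compareTo(b.get(j).getKey());
            if (cmp < 0) {
                i++;
            } else if (cmp > 0) {
                j++;
            } else {
                // Equal keys: find the run of this key on both sides and emit their cross product.
                int key = a.get(i).getKey();
                int iEnd = i;
                while (iEnd < a.size() && a.get(iEnd).getKey() == key) iEnd++;
                int jEnd = j;
                while (jEnd < b.size() && b.get(jEnd).getKey() == key) jEnd++;
                for (int x = i; x < iEnd; x++) {
                    for (int y = j; y < jEnd; y++) {
                        out.add(a.get(x).getValue() + "-" + b.get(y).getValue());
                    }
                }
                i = iEnd;
                j = jEnd;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> first = Arrays.asList(t(2, "a2"), t(1, "a1"), t(2, "a2b"));
        List<Map.Entry<Integer, String>> second = Arrays.asList(t(2, "b2"), t(3, "b3"), t(1, "b1"));
        System.out.println(sortMergeJoin(first, second)); // prints [a1-b1, a2-b2, a2b-b2]
    }
}
```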

    3. FlatJoinFunction vs. FlatMapFunction (the relationship between JoinFunction and MapFunction is analogous)

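    1. In practice, both can do the same job.
    2. The difference is in the inputs: a FlatJoinFunction receives two inputs (one element from each of the two joined DataSets) and produces the output,
        whereas a FlatMapFunction receives only one input. That single input, however, can be a composite value that holds both joined elements (for example, the Tuple2 produced by a default join),
        so the end result is effectively the same.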

    3.1 Example: applying a FlatMapFunction to the result of a join

          DataSet<Long> pagesInput = // [...]
          DataSet<Tuple2<Long, Long>> linksInput = // [...]
    
          DataSet<Tuple2<Long, Double>> pagesWithRanks = // [...]
    
          DataSet<Tuple2<Long, Long[]>> adjacencyListInput =// [...]
    
          IterativeDataSet<Tuple2<Long, Double>> iteration = // [...]
    
           DataSet<Tuple2<Long, Double>> newRanks = iteration.join(adjacencyListInput)
                    .where(0).equalTo(0)
                    .flatMap(new JoinVertexWithEdgesMatch())
                //the following steps are not relevant here
                    .groupBy(0)
                    .aggregate(Aggregations.SUM, 1)
                    
                    .map(new Dampener(PageRank.DAMPENING_FACTOR, numPages));
        public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> {
    
            @Override
            public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out) {
                Long[] neighbors = value.f1.f1;
                double rank = value.f0.f1;
                double rankToDistribute = rank / ((double) neighbors.length);
    
                for (Long neighbor : neighbors) {
                    out.collect(new Tuple2<Long, Double>(neighbor, rankToDistribute));
                }
            }
        }
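    As the example shows, a FlatMapFunction has only one input, but that input is a Tuple2 containing two Tuple2s, and these two inner Tuple2s are the elements from the two joined DataSets.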
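    The rank distribution done by JoinVertexWithEdgesMatch, followed by the groupBy(0).aggregate(SUM, 1) step, can be sketched in plain Java with maps in place of DataSets. This is only a model of one step: the Dampener is deliberately left out, and RankDistributionSketch and distribute are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of one PageRank step: join ranks with adjacency lists on the vertex id,
// spread each rank evenly over the neighbours, then sum per target. Damping is omitted.
final class RankDistributionSketch {

    static Map<Long, Double> distribute(Map<Long, Double> ranks, Map<Long, long[]> adjacency) {
        Map<Long, Double> sums = new TreeMap<>();
        for (Map.Entry<Long, Double> vertex : ranks.entrySet()) {
            long[] neighbors = adjacency.get(vertex.getKey());
            if (neighbors == null) continue; // inner join: no adjacency list, no output
            // Same arithmetic as JoinVertexWithEdgesMatch: rank / number of neighbours.
            double rankToDistribute = vertex.getValue() / neighbors.length;
            for (long neighbor : neighbors) {
                sums.merge(neighbor, rankToDistribute, Double::sum); // groupBy(0).aggregate(SUM, 1)
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        Map<Long, Double> ranks = new TreeMap<>();
        ranks.put(1L, 0.5);
        ranks.put(2L, 0.5);
        Map<Long, long[]> adjacency = new HashMap<>();
        adjacency.put(1L, new long[]{2L, 3L});
        adjacency.put(2L, new long[]{1L});
        System.out.println(distribute(ranks, adjacency)); // prints {1=0.5, 2=0.25, 3=0.25}
    }
}
```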

    3.2 Examples of FlatJoinFunction and JoinFunction, both applied with with()

    DataSet<Tuple2<Long, Long>> changes =   iteration.getWorkset().join(edges)
                    .where(0).equalTo(0)
                    .with(new NeighborWithComponentIDJoin())
                    .groupBy(0).aggregate(Aggregations.MIN, 1)
                    .join(iteration.getSolutionSet()).where(0).equalTo(0)
                    .with(new ComponentIdFilter());
    
    
    
        public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
    
            @Override
            public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
                return new Tuple2<>(edge.f1, vertexWithComponent.f1);
            }
        }
    
    
        public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
    
            @Override
            public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
                if (candidate.f1 < old.f1) {
                    out.collect(candidate);
                }
            }
        }
    As the example shows, a FlatJoinFunction or JoinFunction takes two input parameters, one element from each of the two joined DataSets.
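    The pipeline in 3.2 is one step of a delta iteration for connected components. It can be modelled in plain Java with maps from vertex id to component id. This is a sketch of the logic only, not Flink's delta-iteration runtime, and ConnectedComponentsSketch, step and run are hypothetical names. As in the original closeWith usage of the Flink example, the changes are merged into the solution set and also become the next workset.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the connected-components step: workset join edges, min per vertex,
// then join with the solution set and keep only improvements.
final class ConnectedComponentsSketch {

    static Map<Long, Long> step(Map<Long, Long> workset, long[][] edges, Map<Long, Long> solution) {
        // join(edges).where(0).equalTo(0).with(NeighborWithComponentIDJoin): (edge target, component)
        // followed by groupBy(0).aggregate(MIN, 1): keep the smallest candidate per vertex.
        Map<Long, Long> minCandidate = new TreeMap<>();
        for (long[] edge : edges) {
            Long component = workset.get(edge[0]);
            if (component == null) continue; // inner join: edge source is not in the workset
            minCandidate.merge(edge[1], component, Long::min);
        }
        // join(solutionSet).where(0).equalTo(0).with(ComponentIdFilter): emit only smaller ids.
        Map<Long, Long> changes = new TreeMap<>();
        for (Map.Entry<Long, Long> c : minCandidate.entrySet()) {
            Long old = solution.get(c.getKey());
            if (old != null && c.getValue() < old) {
                changes.put(c.getKey(), c.getValue());
            }
        }
        return changes;
    }

    static Map<Long, Long> run(Map<Long, Long> initial, long[][] edges) {
        Map<Long, Long> solution = new TreeMap<>(initial);
        Map<Long, Long> workset = new TreeMap<>(initial);
        while (!workset.isEmpty()) {
            Map<Long, Long> changes = step(workset, edges, solution);
            solution.putAll(changes); // changes update the solution set
            workset = changes;        // and are the next workset
        }
        return solution;
    }

    public static void main(String[] args) {
        Map<Long, Long> initial = new TreeMap<>();
        initial.put(1L, 1L);
        initial.put(2L, 2L);
        initial.put(3L, 3L);
        long[][] edges = {{1, 2}, {2, 1}, {2, 3}, {3, 2}}; // undirected edges stored in both directions
        System.out.println(step(initial, edges, initial)); // prints {2=1, 3=2}
        System.out.println(run(initial, edges));           // prints {1=1, 2=1, 3=1}
    }
}
```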

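    3.3 At the source-code level, FlatJoinFunction and FlatMapFunction are not very different: both are serializable functional interfaces whose single method emits zero or more results through a Collector, and they differ only in the number of input parameters.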

    @Public
    @FunctionalInterface
    public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {
    
        /**
         * The join method, called once per joined pair of elements.
         *
         * @param first The element from first input.
         * @param second The element from second input.
         * @param out The collector used to return zero, one, or more elements.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception;
    }
    @Public
    @FunctionalInterface
    public interface FlatMapFunction<T, O> extends Function, Serializable {
    
        /**
         * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
         * it into zero, one, or more elements.
         *
         * @param value The input value.
         * @param out The collector for returning result values.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        void flatMap(T value, Collector<O> out) throws Exception;
    }
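    Because the two interfaces differ only in the number of inputs, a flat-join function can be wrapped so that it runs as a flat-map over the pairs a default join produces. The plain-Java sketch below shows such an adapter. It uses simplified mirrors of the two interfaces, with Consumer standing in for Collector. FlatJoinFn, FlatMapFn, Pair and asFlatMap are hypothetical names, not Flink types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified mirrors of the two interface shapes, plus an adapter between them.
final class FlatJoinVsFlatMapSketch {

    interface FlatJoinFn<IN1, IN2, OUT> { void join(IN1 first, IN2 second, Consumer<OUT> out); }

    interface FlatMapFn<T, O> { void flatMap(T value, Consumer<O> out); }

    // Stand-in for the Tuple2 that a default join produces.
    static final class Pair<A, B> {
        final A f0;
        final B f1;
        Pair(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Unpacks the pair and forwards both halves plus the collector to the flat-join function.
    static <A, B, O> FlatMapFn<Pair<A, B>, O> asFlatMap(FlatJoinFn<A, B, O> fn) {
        return (pair, out) -> fn.join(pair.f0, pair.f1, out);
    }

    public static void main(String[] args) {
        // Emits the left value as many times as the right value says (0 times emits nothing).
        FlatJoinFn<String, Integer, String> repeat = (s, n, out) -> {
            for (int i = 0; i < n; i++) out.accept(s);
        };
        List<String> viaJoin = new ArrayList<>();
        repeat.join("x", 2, viaJoin::add);
        List<String> viaMap = new ArrayList<>();
        asFlatMap(repeat).flatMap(new Pair<>("x", 2), viaMap::add);
        System.out.println(viaJoin + " " + viaMap); // prints [x, x] [x, x]
    }
}
```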

    4. Outer join: like LEFT JOIN, RIGHT JOIN and FULL JOIN in SQL

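    OuterJoin performs a left, right, or full outer join on two DataSets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys.
    In addition, records of the "outer" side (left, right, or both in the case of a full join) are preserved if no matching key is found on the other side.
    Matching pairs of elements (or one element and a null value for the other input) are given to a JoinFunction, which turns the pair into a single element,
    or to a FlatJoinFunction, which turns the pair into arbitrarily many (including none) elements.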
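    The three outer-join variants differ only in which unmatched elements are kept and paired with null. A plain-Java sketch of these semantics follows. It uses nested loops on a single partition and is not Flink's implementation; OuterJoinSketch, Type and outerJoin are hypothetical names. The join function must therefore be ready to receive null on the side that may be missing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of left / right / full outer join semantics (nested loops, single partition).
final class OuterJoinSketch {

    enum Type { LEFT, RIGHT, FULL }

    static <L, R, K, O> List<O> outerJoin(List<L> left, List<R> right,
                                          Function<L, K> leftKey, Function<R, K> rightKey,
                                          Type type, BiFunction<L, R, O> fn) {
        boolean keepLeft = type != Type.RIGHT; // LEFT and FULL keep unmatched left elements
        boolean keepRight = type != Type.LEFT; // RIGHT and FULL keep unmatched right elements
        boolean[] rightMatched = new boolean[right.size()];
        List<O> out = new ArrayList<>();
        for (L l : left) {
            boolean matched = false;
            for (int j = 0; j < right.size(); j++) {
                if (Objects.equals(leftKey.apply(l), rightKey.apply(right.get(j)))) {
                    out.add(fn.apply(l, right.get(j))); // regular matching pair, as in an inner join
                    matched = true;
                    rightMatched[j] = true;
                }
            }
            if (!matched && keepLeft) {
                out.add(fn.apply(l, null)); // right side missing: the function sees null
            }
        }
        if (keepRight) {
            for (int j = 0; j < right.size(); j++) {
                if (!rightMatched[j]) {
                    out.add(fn.apply(null, right.get(j))); // left side missing: the function sees null
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a1", "b2");
        List<String> right = Arrays.asList("x1", "y3");
        Function<String, Character> key = s -> s.charAt(1);
        BiFunction<String, String, String> show = (l, r) -> l + "|" + r;
        for (Type type : Type.values()) {
            System.out.println(type + " " + outerJoin(left, right, key, key, type, show));
        }
        // prints:
        // LEFT [a1|x1, b2|null]
        // RIGHT [a1|x1, null|y3]
        // FULL [a1|x1, b2|null, null|y3]
    }
}
```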

    4.1 OuterJoin with a JoinFunction

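    OuterJoin calls a user-defined join function to process the joining tuples. The join function receives one element of the first input DataSet and one element of the second input DataSet and returns exactly one element. Depending on the type of outer join (left, right, full), one of the two input elements of the join function can be null.

    The following code performs a left outer join of a Tuple DataSet (movies) with a DataSet of custom Java objects (ratings), using key expressions, and shows how to use a user-defined join function: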
    // some POJO
    public class Rating {
      public String name;
      public String category;
      public int points;
    }
    
    // Join function that joins a custom POJO with a Tuple
    public class PointAssigner
             implements JoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {
    
      @Override
      public Tuple2<String, Integer> join(Tuple2<String, String> movie, Rating rating) {
        // Assigns the rating points to the movie.
        // NOTE: rating might be null
        return new Tuple2<String, Integer>(movie.f0, rating == null ? -1 : rating.points);
      }
    }
    
    DataSet<Tuple2<String, String>> movies = // [...]
    DataSet<Rating> ratings = // [...]
    DataSet<Tuple2<String, Integer>>
                moviesWithPoints =
                movies.leftOuterJoin(ratings)
    
                       // key of the first input
                       .where("f0")
    
                       // key of the second input
                       .equalTo("name")
    
                       // applying the JoinFunction on joining pairs
                       .with(new PointAssigner());

    4.2 Implementing an OuterJoin with a FlatJoinFunction

    public class PointAssigner
             implements FlatJoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {
      @Override
      public void join(Tuple2<String, String> movie, Rating rating,
          Collector<Tuple2<String, Integer>> out) {
        // rating is null when the movie has no matching rating (left outer join)
        if (rating == null) {
          out.collect(new Tuple2<String, Integer>(movie.f0, -1));
        } else if (rating.points < 10) {
          out.collect(new Tuple2<String, Integer>(movie.f0, rating.points));
        } else {
          // do not emit
        }
      }
    }
    
    DataSet<Tuple2<String, Integer>>
                moviesWithPoints =
                movies.leftOuterJoin(ratings) // [...]
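    The combined behaviour of leftOuterJoin and the flat PointAssigner can be sketched in plain Java. A movie without a rating is still emitted, with -1. A rating of 10 or more produces no output at all, which a plain JoinFunction could not express. In this model a movie is reduced to its title (the f0 field) and Rating to name and points. LeftOuterFlatJoinSketch, pointAssigner and leftOuterJoin are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of movies.leftOuterJoin(ratings).where("f0").equalTo("name") with a flat PointAssigner.
final class LeftOuterFlatJoinSketch {

    static final class Rating {
        final String name; final int points;
        Rating(String name, int points) { this.name = name; this.points = points; }
    }

    // Same branching as the FlatJoinFunction version of PointAssigner; `out` plays the Collector.
    static void pointAssigner(String movie, Rating rating, List<String> out) {
        if (rating == null) {
            out.add(movie + "=-1");               // no rating: the left element is kept with -1
        } else if (rating.points < 10) {
            out.add(movie + "=" + rating.points);
        }                                         // points >= 10: nothing is emitted
    }

    static List<String> leftOuterJoin(List<String> movies, List<Rating> ratings) {
        List<String> out = new ArrayList<>();
        for (String movie : movies) {
            boolean matched = false;
            for (Rating rating : ratings) {
                if (movie.equals(rating.name)) {
                    matched = true;
                    pointAssigner(movie, rating, out);
                }
            }
            if (!matched) {
                pointAssigner(movie, null, out);  // unmatched outer-side element is paired with null
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> movies = Arrays.asList("Heat", "Up", "Ran");
        List<Rating> ratings = Arrays.asList(new Rating("Heat", 7), new Rating("Up", 12));
        System.out.println(leftOuterJoin(movies, ratings)); // prints [Heat=7, Ran=-1]
    }
}
```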
     
  • Original article: https://www.cnblogs.com/asker009/p/11074417.html