  • Learning Flink batch processing from 0 to 1

    一、DataSet API: Data Sources

    Introduction:

    Flink ships with a large number of ready-made source implementations. You can also write your own source: implement the SourceFunction interface for a source without parallelism (parallelism 1), or implement the ParallelSourceFunction interface or extend RichParallelSourceFunction for a parallel source. A minimal sketch of such a custom source follows.
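    For reference, here is a minimal sketch of a non-parallel custom source based on the SourceFunction interface mentioned above (the class name, the emitted counter values, and the one-second pacing are illustrative assumptions, not from the original article):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // hypothetical non-parallel source: emits an increasing counter once per second
    public class MyNoParallelSource implements SourceFunction<Long> {

        private long count = 0L;
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(count++);
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // a plain SourceFunction always runs with parallelism 1
            env.addSource(new MyNoParallelSource()).print();
            env.execute("custom source demo");
        }
    }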

    Types:
    File-based

    readTextFile(path): reads a text file following the TextInputFormat rules, line by line, and returns each line.

    Collection-based

    fromCollection(Collection): creates a data set from a Java collection; all elements of the collection must be of the same type.

    Code examples:
    1、fromCollection
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    object StreamingFromCollectionScala {
    
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //implicit conversions
        import org.apache.flink.api.scala._
    
        val data = List(10,15,20)
    
        val text = env.fromCollection(data)
    
        //add 1 to every element received by map
        val num = text.map(_+1)
    
        num.print().setParallelism(1)
    
        env.execute("StreamingFromCollectionScala")
    
      }
    
    }
    package xuwei.tech.batch;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    import java.util.Arrays;
    import java.util.List;
    
    /**
     * batch word count reading its input from an in-memory collection
     */
    public class BatchWordCountJava {
    
        public static void main(String[] args) throws Exception{
            List<String> data = Arrays.asList("hello you", "hello me");
            String outPath = "D:\\data\\result";
            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            //create a DataSet from the in-memory collection (the elements must be strings so the Tokenizer below can split them)
            DataSet<String> text = env.fromCollection(data);
            DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
            counts.writeAsCsv(outPath, "\n", " ").setParallelism(1);
            env.execute("batch word count");
    
        }
    
    
        public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split("\\W+");
                for (String token: tokens) {
                    if(token.length()>0){
                        out.collect(new Tuple2<String, Integer>(token,1));
                    }
                }
            }
        }
    }
    
    
    2、readTextFile
    package xuwei.tech.batch;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    /**
     * Created by xuwei.tech on 2018/10/8.
     */
    public class BatchWordCountJava {
    
        public static void main(String[] args) throws Exception{
            String inputPath = "D:\\data\\file";
            String outPath = "D:\\data\\result";

            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            //read the file contents
            DataSource<String> text = env.readTextFile(inputPath);
    
            DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
            counts.writeAsCsv(outPath, "\n", " ").setParallelism(1);
            env.execute("batch word count");
    
        }
    
    
        public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split("\\W+");
                for (String token: tokens) {
                    if(token.length()>0){
                        out.collect(new Tuple2<String, Integer>(token,1));
                    }
                }
            }
        }
    }
    

    二、DataSet API: Transformations

    Introduction:

    1. Map: takes one element and returns one element; useful for cleaning and converting data (Map, Filter and Reduce are shown in the sketch right after this list)
    2. FlatMap: takes one element and returns zero, one or more elements
    3. MapPartition: like Map, but processes one whole partition per call [recommended when the map logic needs an external resource such as a database connection]
    4. Filter: applies a predicate to each element and keeps only the elements for which it returns true
    5. Reduce: aggregates the data set by combining the current element with the previously reduced value and returning a new value
    6. Aggregate: built-in aggregations such as sum, max and min
    7. Distinct: returns the distinct elements of a data set, e.g. data.distinct()
    8. Join: inner join
    9. OuterJoin: outer join (left, right or full)
    10. Cross: builds the Cartesian product of two data sets
    11. Union: unions two data sets; the element types must be identical
    12. First-n: returns the first N elements of a data set
    13. Sort Partition: locally sorts all partitions of a data set; chain sortPartition() calls to sort on multiple fields
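    Map, Filter and Reduce have no dedicated example further down, so here is a minimal sketch (the class name BatchDemoBasicOperators and the sample numbers are made up for illustration; grouped aggregations such as groupBy(0).sum(1) already appear in the word-count examples above):

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class BatchDemoBasicOperators {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5);

            //Map: add 1 to every element
            DataSet<Integer> mapped = data.map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) throws Exception {
                    return value + 1;
                }
            });

            //Filter: keep only the even values
            DataSet<Integer> filtered = mapped.filter(new FilterFunction<Integer>() {
                @Override
                public boolean filter(Integer value) throws Exception {
                    return value % 2 == 0;
                }
            });

            //Reduce: sum all remaining values
            DataSet<Integer> summed = filtered.reduce(new ReduceFunction<Integer>() {
                @Override
                public Integer reduce(Integer a, Integer b) throws Exception {
                    return a + b;
                }
            });

            //print() triggers execution in the DataSet API, so no explicit env.execute() is needed here
            summed.print();
        }
    }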
    Code examples:
    1、broadcast (broadcast variables)
    package xuwei.tech.batch.batchAPI;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    
    /**
     * broadcast variable
     *
     * Requirement:
     * Flink reads user names from the data source,
     * and in the end each user's name has to be printed together with the user's age.
     *
     * Analysis:
     * the age has to be looked up inside the intermediate map operation,
     * so it is recommended to distribute the user reference data as a broadcast variable.
     *
     * Note: if several operators need the same data set, the broadcast variable must be registered separately on each of those operators.
     */
    public class BatchDemoBroadcast {
    
        public static void main(String[] args) throws Exception{
    
            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            //1: prepare the data to broadcast
            ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
            broadData.add(new Tuple2<>("zs",18));
            broadData.add(new Tuple2<>("ls",20));
            broadData.add(new Tuple2<>("ww",17));
            DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);
    
    
            //1.1: turn the data set to broadcast into a map, keyed by user name with the age as value
            DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
                @Override
                public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                    HashMap<String, Integer> res = new HashMap<>();
                    res.put(value.f0, value.f1);
                    return res;
                }
            });
    
            //source data
            DataSource<String> data = env.fromElements("zs", "ls", "ww");

            //note: a RichMapFunction is needed here in order to access the broadcast variable
            DataSet<String> result = data.map(new RichMapFunction<String, String>() {
    
                List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
                HashMap<String, Integer> allMap = new HashMap<String, Integer>();
    
                /**
                 * open() is executed only once,
                 * so it is a good place for initialization work,
                 * which is why the broadcast variable is fetched here.
                 */
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    //3: fetch the broadcast data
                    this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
                    for (HashMap map : broadCastMap) {
                        allMap.putAll(map);
                    }
    
                }
    
                @Override
                public String map(String value) throws Exception {
                    Integer age = allMap.get(value);
                    return value + "," + age;
                }
            }).withBroadcastSet(toBroadcast, "broadCastMapName");//2: register the data set as a broadcast variable
            result.print();
        }
    
    }
    
    
    2、IntCounter (accumulator)
    package xuwei.tech.batch.batchAPI;
    
    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.accumulators.IntCounter;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.MapOperator;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    
    /**
     * global accumulator (counter)
     *
     * Requirement:
     * count how many records the map function has processed
     *
     * Note: the accumulator value can only be read after the job has finished
     *
     * Created by xuwei.tech on 2018/10/8.
     */
    public class BatchDemoCounter {
    
        public static void main(String[] args) throws Exception{
    
            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            DataSource<String> data = env.fromElements("a", "b", "c", "d");
    
            DataSet<String> result = data.map(new RichMapFunction<String, String>() {
    
                //1: create the accumulator
                private IntCounter numLines = new IntCounter();
    
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    //2: register the accumulator
                    getRuntimeContext().addAccumulator("num-lines",this.numLines);
    
                }
    
                //int sum = 0;
                @Override
                public String map(String value) throws Exception {
                    //with parallelism 1 a plain local sum would work, but with several parallel instances a plain sum is no longer correct
                    //sum++;
                    //System.out.println("sum:"+sum);
                    this.numLines.add(1);
                    return value;
                }
            }).setParallelism(8);
    
            //result.print();
    
            result.writeAsText("d:\\data\\count10");
    
            JobExecutionResult jobResult = env.execute("counter");
            //3: read the accumulator result
            int num = jobResult.getAccumulatorResult("num-lines");
            System.out.println("num:"+num);
    
        }
    
    
    
    }
    
    
    
    3、cross (Cartesian product)
    package xuwei.tech.batch.batchAPI;
    
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.CrossOperator;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    
    import java.util.ArrayList;
    
    /**
     * Cartesian product of two data sets
     *
     * Created by xuwei.tech on 2018/10/8.
     */
    public class BatchDemoCross {
    
        public static void main(String[] args) throws Exception{
    
            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            //first data set: user names
            ArrayList<String> data1 = new ArrayList<>();
            data1.add("zs");
            data1.add("ww");
    
            //second data set: numeric ids
            ArrayList<Integer> data2 = new ArrayList<>();
            data2.add(1);
            data2.add(2);
    
            DataSource<String> text1 = env.fromCollection(data1);
            DataSource<Integer> text2 = env.fromCollection(data2);
    
            CrossOperator.DefaultCross<String, Integer> cross = text1.cross(text2);
    
            cross.print();
    
    
        }
    
    
    
    }
    
    
    
    
    4、registerCachedFile(Distributed Cache)
    package xuwei.tech.batch.batchAPI;
    
    import org.apache.commons.io.FileUtils;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.MapOperator;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    
    import java.io.File;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    
    /**
     * Distributed Cache
     */
    public class BatchDemoDisCache {
    
        public static void main(String[] args) throws Exception{
    
            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            //1: register a file; files on HDFS or S3 can be used as well
            env.registerCachedFile("d:\\data\\file\\a.txt","a.txt");
    
            DataSource<String> data = env.fromElements("a", "b", "c", "d");
    
            DataSet<String> result = data.map(new RichMapFunction<String, String>() {
                private ArrayList<String> dataList = new ArrayList<String>();
    
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    //2: use the cached file
                    File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                    List<String> lines = FileUtils.readLines(myFile);
                    for (String line : lines) {
                        this.dataList.add(line);
                        System.out.println("line:" + line);
                    }
                }
    
                @Override
                public String map(String value) throws Exception {
                    //dataList can now be used here
                    return value;
                }
            });
    
            result.print();
    
        }
    
    }
    
    
    5、distinct
    package xuwei.tech.batch.batchAPI;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.MapPartitionFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.FlatMapOperator;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    public class BatchDemoDistinct {
    
        public static void main(String[] args) throws Exception{
    
            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<String> data = new ArrayList<>();
            data.add("hello you");
            data.add("hello me");
    
            DataSource<String> text = env.fromCollection(data);
    
            FlatMapOperator<String, String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    String[] split = value.toLowerCase().split("\\W+");
                    for (String word : split) {
                        System.out.println("word: "+word);
                        out.collect(word);
                    }
                }
            });
    
            flatMapData.distinct()// deduplicate the records as whole values
                    .print();
    
    
        }
    
    
    
    }
    
    
    
    
    6、first-n / sorting
    package xuwei.tech.batch.batchAPI;
    
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    
    import java.util.ArrayList;
    
    /**
     * get the first N elements of a data set
     * Created by xuwei.tech on 2018/10/8.
     */
    public class BatchDemoFirstN {
    
        public static void main(String[] args) throws Exception{
    
            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
            data.add(new Tuple2<>(2,"zs"));
            data.add(new Tuple2<>(4,"ls"));
            data.add(new Tuple2<>(3,"ww"));
            data.add(new Tuple2<>(1,"xw"));
            data.add(new Tuple2<>(1,"aw"));
            data.add(new Tuple2<>(1,"mw"));
    
    
            DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);
    
    
            //take the first 3 records, in insertion order
            text.first(3).print();
            System.out.println("==============================");
    
            //group by the first field and take the first 2 elements of each group
            text.groupBy(0).first(2).print();
            System.out.println("==============================");
    
            //group by the first field, sort each group by the second field ascending, then take the first 2 elements of each group
            text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
            System.out.println("==============================");
    
            //no grouping: sort the partition by the first field ascending and the second field descending, then take the first 3 elements
            text.sortPartition(0,Order.ASCENDING).sortPartition(1,Order.DESCENDING).first(3).print();
    
        }
    
    
    
    }
    
    
    7、join
    package xuwei.tech.batch.batchAPI;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.FlatMapOperator;
    import org.apache.flink.api.java.tuple.Tuple1;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    public class BatchDemoJoin {
    
        public static void main(String[] args) throws Exception{
    
            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            //Tuple2<userId, userName>
            ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
            data1.add(new Tuple2<>(1,"zs"));
            data1.add(new Tuple2<>(2,"ls"));
            data1.add(new Tuple2<>(3,"ww"));
    
    
            //Tuple2<userId, city>
            ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
            data2.add(new Tuple2<>(1,"beijing"));
            data2.add(new Tuple2<>(2,"shanghai"));
            data2.add(new Tuple2<>(3,"guangzhou"));
    
    
            DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
            DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);
    
    
            text1.join(text2).where(0)//index of the key field in the first data set
                            .equalTo(0)//index of the key field in the second data set
                            .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
                                @Override
                                public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
                                        throws Exception {
                                    return new Tuple3<>(first.f0,first.f1,second.f1);
                                }
                            }).print();
    
            System.out.println("==================================");
    
            //note: using map here produces exactly the same result as the with() call above
            /*text1.join(text2).where(0)//index of the key field in the first data set
                    .equalTo(0)//index of the key field in the second data set
                    .map(new MapFunction<Tuple2<Tuple2<Integer,String>,Tuple2<Integer,String>>, Tuple3<Integer,String,String>>() {
                        @Override
                        public Tuple3<Integer, String, String> map(Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>> value) throws Exception {
                            return new Tuple3<>(value.f0.f0,value.f0.f1,value.f1.f1);
                        }
                    }).print();*/
        }
    }
    
    8、outerJoin
    package xuwei.tech.batch.batchAPI;
    
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    
    import java.util.ArrayList;
    
    /**
     * outer joins:
     *
     * left outer join
     * right outer join
     * full outer join
     */
    public class BatchDemoOuterJoin {
    
        public static void main(String[] args) throws Exception{
    
            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            //Tuple2<userId, userName>
            ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
            data1.add(new Tuple2<>(1,"zs"));
            data1.add(new Tuple2<>(2,"ls"));
            data1.add(new Tuple2<>(3,"ww"));
    
    
            //Tuple2<userId, city>
            ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
            data2.add(new Tuple2<>(1,"beijing"));
            data2.add(new Tuple2<>(2,"shanghai"));
            data2.add(new Tuple2<>(4,"guangzhou"));
    
    
            DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
            DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);
    
            /**
             * left outer join
             *
             * note: the fields of the second tuple may be null
             *
             */
            text1.leftOuterJoin(text2)
                    .where(0)
                    .equalTo(0)
                    .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
                        @Override
                        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                            if(second==null){
                                return new Tuple3<>(first.f0,first.f1,"null");
                            }else{
                                return new Tuple3<>(first.f0,first.f1,second.f1);
                            }
    
                        }
                    }).print();
    
            System.out.println("=============================================================================");
    
            /**
             * right outer join
             *
             * note: the fields of the first tuple may be null
             *
             */
            text1.rightOuterJoin(text2)
                    .where(0)
                    .equalTo(0)
                    .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
                        @Override
                        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                            if(first==null){
                                return new Tuple3<>(second.f0,"null",second.f1);
                            }
                            return new Tuple3<>(first.f0,first.f1,second.f1);
                        }
                    }).print();
    
    
    
            System.out.println("=============================================================================");
    
            /**
             * full outer join
             *
             * note: either first or second may be null
             *
             */
    
            text1.fullOuterJoin(text2)
                    .where(0)
                    .equalTo(0)
                    .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
                        @Override
                        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                            if(first==null){
                                return new Tuple3<>(second.f0,"null",second.f1);
                            }else if(second == null){
                                return new Tuple3<>(first.f0,first.f1,"null");
                            }else{
                                return new Tuple3<>(first.f0,first.f1,second.f1);
                            }
                        }
                    }).print();
    
    
        }
    
    }
    
    
    
    9、union
    package xuwei.tech.batch.batchAPI;
    
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.UnionOperator;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    
    import java.util.ArrayList;
    
    /**
     * Created by xuwei.tech on 2018/10/8.
     */
    public class BatchDemoUnion {
    
        public static void main(String[] args) throws Exception{
    
            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
            data1.add(new Tuple2<>(1,"zs"));
            data1.add(new Tuple2<>(2,"ls"));
            data1.add(new Tuple2<>(3,"ww"));
    
    
            ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
            data2.add(new Tuple2<>(1,"lili"));
            data2.add(new Tuple2<>(2,"jack"));
            data2.add(new Tuple2<>(3,"jessic"));
    
    
            DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
            DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);
    
            UnionOperator<Tuple2<Integer, String>> union = text1.union(text2);
    
            union.print();
    
    
    
        }
    
    
    
    }
    
    
    

    三、DataSet API: partition

    Introduction:
    1. Rebalance: rebalances (repartitions) the data set evenly to remove data skew
    2. Hash-Partition: partitions the data set by the hash of the given key, via partitionByHash()
    3. Range-Partition: partitions the data set into ranges of the given key, via partitionByRange()
    4. Custom Partitioning: apply your own partitioning rule by implementing the Partitioner interface, via partitionCustom(partitioner, "someKey") or partitionCustom(partitioner, 0) (see the sketch after this list)
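    The article has no example for a custom partitioner, so here is a minimal sketch (the class name, the parity-based rule and the sample tuples are illustrative assumptions, not from the original article):

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class BatchDemoCustomPartition {

        //hypothetical partitioner: routes records by the parity of the integer key
        public static class ParityPartitioner implements Partitioner<Integer> {
            @Override
            public int partition(Integer key, int numPartitions) {
                //even keys go to partition 0, odd keys to partition 1 (when at least 2 partitions exist)
                return numPartitions > 1 ? key % 2 : 0;
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Integer, String>> data = env.fromElements(
                    new Tuple2<>(1, "hello1"),
                    new Tuple2<>(2, "hello2"),
                    new Tuple2<>(3, "hello3"),
                    new Tuple2<>(4, "hello4"));

            //partitionCustom(partitioner, field index) applies the custom rule on field 0
            //for plain skew removal without a key, data.rebalance() redistributes records round-robin instead
            data.partitionCustom(new ParityPartitioner(), 0).print();
        }
    }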
    Code examples:
    1、partitionByRange / partitionByHash
    package xuwei.tech.batch.batchAPI;
    
    import org.apache.flink.api.common.functions.MapPartitionFunction;
    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    
    /**
     * Hash-Partition
     *
     * Range-Partition
     *
     *
     * Created by xuwei.tech on 2018/10/8.
     */
    public class BatchDemoHashRangePartition {
    
        public static void main(String[] args) throws Exception{
    
            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
            data.add(new Tuple2<>(1,"hello1"));
            data.add(new Tuple2<>(2,"hello2"));
            data.add(new Tuple2<>(2,"hello3"));
            data.add(new Tuple2<>(3,"hello4"));
            data.add(new Tuple2<>(3,"hello5"));
            data.add(new Tuple2<>(3,"hello6"));
            data.add(new Tuple2<>(4,"hello7"));
            data.add(new Tuple2<>(4,"hello8"));
            data.add(new Tuple2<>(4,"hello9"));
            data.add(new Tuple2<>(4,"hello10"));
            data.add(new Tuple2<>(5,"hello11"));
            data.add(new Tuple2<>(5,"hello12"));
            data.add(new Tuple2<>(5,"hello13"));
            data.add(new Tuple2<>(5,"hello14"));
            data.add(new Tuple2<>(5,"hello15"));
            data.add(new Tuple2<>(6,"hello16"));
            data.add(new Tuple2<>(6,"hello17"));
            data.add(new Tuple2<>(6,"hello18"));
            data.add(new Tuple2<>(6,"hello19"));
            data.add(new Tuple2<>(6,"hello20"));
            data.add(new Tuple2<>(6,"hello21"));
    
    
            DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);
    
            /*text.partitionByHash(0).mapPartition(new MapPartitionFunction<Tuple2<Integer,String>, Tuple2<Integer,String>>() {
                @Override
                public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                    Iterator<Tuple2<Integer, String>> it = values.iterator();
                    while (it.hasNext()){
                        Tuple2<Integer, String> next = it.next();
                        System.out.println("current thread id: "+Thread.currentThread().getId()+","+next);
                    }
    
                }
            }).print();*/
    
    
            text.partitionByRange(0).mapPartition(new MapPartitionFunction<Tuple2<Integer,String>, Tuple2<Integer,String>>() {
                @Override
                public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                    Iterator<Tuple2<Integer, String>> it = values.iterator();
                    while (it.hasNext()){
                        Tuple2<Integer, String> next = it.next();
                        System.out.println("current thread id: "+Thread.currentThread().getId()+","+next);
                    }
    
                }
            }).print();
        }
    
    }
    
    
    
    
    2、mapPartition
    package xuwei.tech.batch.batchAPI;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.MapPartitionFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.MapPartitionOperator;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    
    /**
     * Created by xuwei.tech on 2018/10/8.
     */
    public class BatchDemoMapPartition {
    
        public static void main(String[] args) throws Exception{
    
            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<String> data = new ArrayList<>();
            data.add("hello you");
            data.add("hello me");
    
            DataSource<String> text = env.fromCollection(data);
    
            /*text.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    //get a database connection -- note: a new connection is opened for every single record
                    //process the record
                    //close the connection
                    return value;
                }
            });*/
    
    
            DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {
                @Override
                public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
                    //get a database connection -- here one connection is opened per partition [advantage: one connection per partition instead of per record]
                    //values holds all the data of one partition
                    //process the data
                    Iterator<String> it = values.iterator();
                    while (it.hasNext()) {
                        String next = it.next();
                        String[] split = next.split("\\W+");
                        for (String word : split) {
                            out.collect(word);
                        }
                    }
                    //close the connection
                }
            });
    
            mapPartitionData.print();
    
    
        }
    
    
    
    }
    
    
    

    四、DataSet API: Data Sinks (writing results out)

    Introduction:
    1. writeAsText(): writes elements line by line as strings, obtained by calling toString() on each element (see the sketch after this list)
    2. writeAsCsv(): writes tuples as comma-separated value files; the row and field delimiters are configurable, and each field value comes from the object's toString() method
    3. print(): prints the toString() value of each element to standard output or standard error
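    writeAsText() has no dedicated example below, so here is a minimal sketch (the output path and job name are made up; the OVERWRITE mode is optional and only needed when the target already exists):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.core.fs.FileSystem;

    public class BatchDemoWriteAsText {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> data = env.fromElements("a", "b", "c", "d");

            //write each element's toString() value as one line; OVERWRITE replaces an existing file
            data.writeAsText("D:\\data\\text-result", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

            //write sinks are lazy, so the job still needs an explicit execute()
            env.execute("write as text");
        }
    }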
    Code:
    1、writeAsCsv
    package xuwei.tech.batch;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    /**
     * Created by xuwei.tech on 2018/10/8.
     */
    public class BatchWordCountJava {
    
        public static void main(String[] args) throws Exception{
            String inputPath = "D:\\data\\file";
            String outPath = "D:\\data\\result";

            //get the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            //read the file contents
            DataSource<String> text = env.readTextFile(inputPath);
    
            DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
            counts.writeAsCsv(outPath, "\n", " ").setParallelism(1);
            env.execute("batch word count");
    
        }
    
    
        public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split("\\W+");
                for (String token: tokens) {
                    if(token.length()>0){
                        out.collect(new Tuple2<String, Integer>(token,1));
                    }
                }
            }
        }
    }
    

     
