zoukankan      html  css  js  c++  java
  • 【赵强老师】Flink的DataSet算子

    Flink为了能够处理有边界的数据集和无边界的数据集,分别提供了对应的DataSet API和DataStream API。我们可以开发相应的Java程序或者Scala程序来完成所需的功能。下面列举了DataSet API中的一些基本算子。

    下面我们通过具体的代码来为大家演示每个算子的作用。

    1、Map、FlatMap与MapPartition

    // Obtain the batch execution environment.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Three sample sentences used as the bounded input data set.
    List<String> data = new ArrayList<String>();
    data.add("I love Beijing");
    data.add("I love China");
    data.add("Beijing is the capital of China");
    DataSource<String> text = env.fromCollection(data);

    // map: produces exactly one result per input element; here every sentence
    // becomes the list of its space-separated words.
    DataSet<List<String>> mapData = text.map(new MapFunction<String, List<String>>() {

        @Override
        public List<String> map(String line) throws Exception {
            List<String> result = new ArrayList<String>();
            for (String word : line.split(" ")) {
                result.add(word);
            }
            return result;
        }
    });
    mapData.print();
    System.out.println("*****************************************");

    // flatMap: may emit any number of results per input element through the
    // Collector; here every word of a sentence is emitted individually.
    DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {

        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            for (String word : line.split(" ")) {
                out.collect(word);
            }
        }
    });
    flatMapData.print();

    System.out.println("*****************************************");
    /* MapPartitionFunction<String, String>:
     * the first String is the element type inside a partition,
     * the second String is the element type after processing. */
    DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {

        @Override
        public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
            // Benefit of working per partition: when e.g. a database is involved,
            // a single Connection is enough for the whole partition.
            // `values` holds all elements of one partition.
            for (String line : values) {
                for (String word : line.split(" ")) {
                    out.collect(word);
                }
            }
            // A per-partition resource (such as that connection) would be closed here.
        }
    });
    mapPartitionData.print();
    

    2、Filter与Distinct

    // Obtain the batch execution environment.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Three sample sentences used as the bounded input data set.
    ArrayList<String> data = new ArrayList<String>();
    data.add("I love Beijing");
    data.add("I love China");
    data.add("Beijing is the capital of China");
    DataSource<String> text = env.fromCollection(data);

    // Split every sentence into its space-separated words.
    DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {

        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            for (String word : line.split(" ")) {
                out.collect(word);
            }
        }
    });

    // Remove duplicate words.
    flatMapData.distinct().print();
    System.out.println("*********************");

    // Keep only the words that are longer than three characters.
    flatMapData.filter(new FilterFunction<String>() {

        @Override
        public boolean filter(String word) throws Exception {
            return word.length() > 3;
        }
    }).print();
    

    3、Join操作

    // Obtain the batch execution environment.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // First table: (user ID, name).
    ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer, String>>();
    data1.add(new Tuple2<Integer, String>(1, "Tom"));
    data1.add(new Tuple2<Integer, String>(2, "Mike"));
    data1.add(new Tuple2<Integer, String>(3, "Mary"));
    data1.add(new Tuple2<Integer, String>(4, "Jone"));
    // Second table: (user ID, city).
    ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer, String>>();
    data2.add(new Tuple2<Integer, String>(1, "北京"));
    data2.add(new Tuple2<Integer, String>(2, "上海"));
    data2.add(new Tuple2<Integer, String>(3, "广州"));
    data2.add(new Tuple2<Integer, String>(4, "重庆"));

    // Join both tables to obtain (user ID, name, city).
    DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
    DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);

    // Match rows whose field 0 (the user ID) is equal on both sides.
    table1.join(table2).where(0).equalTo(0)
    /* JoinFunction type arguments:
     * first  Tuple2<Integer, String>         - row type of the first table
     * second Tuple2<Integer, String>         - row type of the second table
     * Tuple3<Integer, String, String>        - row type of the join result */
    .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {

        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> user,
                Tuple2<Integer, String> city) throws Exception {
            return new Tuple3<Integer, String, String>(user.f0, user.f1, city.f1);
        }
    }).print();
    

    4、笛卡尔积

    // Obtain the batch execution environment.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // First table: (user ID, name).
    ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer, String>>();
    data1.add(new Tuple2<Integer, String>(1, "Tom"));
    data1.add(new Tuple2<Integer, String>(2, "Mike"));
    data1.add(new Tuple2<Integer, String>(3, "Mary"));
    data1.add(new Tuple2<Integer, String>(4, "Jone"));

    // Second table: (user ID, city).
    ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer, String>>();
    data2.add(new Tuple2<Integer, String>(1, "北京"));
    data2.add(new Tuple2<Integer, String>(2, "上海"));
    data2.add(new Tuple2<Integer, String>(3, "广州"));
    data2.add(new Tuple2<Integer, String>(4, "重庆"));

    // Turn both collections into data sets.
    DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
    DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);

    // Build the Cartesian product of the two tables and print it.
    table1.cross(table2).print();
    
     

    5、First-N

    // Obtain the batch execution environment.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Each record is (employee name, salary, department number).
    DataSet<Tuple3<String, Integer, Integer>> grade =
            env.fromElements(new Tuple3<String, Integer, Integer>("Tom", 1000, 10),
                             new Tuple3<String, Integer, Integer>("Mary", 1500, 20),
                             new Tuple3<String, Integer, Integer>("Mike", 1200, 30),
                             new Tuple3<String, Integer, Integer>("Jerry", 2000, 10));

    // Take three records. The original author describes these as the first three
    // in insertion order.
    // NOTE(review): Flink documents first(n) as returning arbitrary elements - confirm if order matters.
    grade.first(3).print();
    System.out.println("**********************");

    // Sort by department number first, then by salary.
    // sortPartition only orders the elements inside each partition, so the sort is
    // pinned to a single partition to make the printed result globally ordered.
    grade.sortPartition(2, Order.ASCENDING)
         .sortPartition(1, Order.ASCENDING)
         .setParallelism(1)
         .print();
    System.out.println("**********************");

    // Group by department number and take one record from each group.
    grade.groupBy(2).first(1).print();
    
     

    6、外连接操作

    // Obtain the batch execution environment.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // First table: (user ID, name). ID 2 is intentionally absent.
    ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer, String>>();
    data1.add(new Tuple2<Integer, String>(1, "Tom"));
    data1.add(new Tuple2<Integer, String>(3, "Mary"));
    data1.add(new Tuple2<Integer, String>(4, "Jone"));

    // Second table: (user ID, city). ID 3 is intentionally absent.
    ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer, String>>();
    data2.add(new Tuple2<Integer, String>(1, "北京"));
    data2.add(new Tuple2<Integer, String>(2, "上海"));
    data2.add(new Tuple2<Integer, String>(4, "重庆"));

    // Join both tables to obtain (user ID, name, city).
    DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
    DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);

    // Left outer join: every row of the left table appears in the result.
    table1.leftOuterJoin(table2).where(0).equalTo(0)
          .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {

            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> left,
                    Tuple2<Integer, String> right) throws Exception {
                // The right side is null when the left row has no match.
                if (right == null) {
                    return new Tuple3<Integer, String, String>(left.f0, left.f1, null);
                }
                return new Tuple3<Integer, String, String>(left.f0, left.f1, right.f1);
            }
        }).print();

    System.out.println("***********************************");
    // Right outer join: every row of the right table appears in the result.
    table1.rightOuterJoin(table2).where(0).equalTo(0)
          .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {

            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> left,
                    Tuple2<Integer, String> right) throws Exception {
                // The left side is null when the right row has no match.
                if (left == null) {
                    return new Tuple3<Integer, String, String>(right.f0, null, right.f1);
                }
                return new Tuple3<Integer, String, String>(right.f0, left.f1, right.f1);
            }
        }).print();

    System.out.println("***********************************");

    // Full outer join: rows of both tables appear; either side may be null.
    table1.fullOuterJoin(table2).where(0).equalTo(0)
    .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {

        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> left, Tuple2<Integer, String> right)
                throws Exception {
            if (left == null) {
                return new Tuple3<Integer, String, String>(right.f0, null, right.f1);
            }
            if (right == null) {
                return new Tuple3<Integer, String, String>(left.f0, left.f1, null);
            }
            return new Tuple3<Integer, String, String>(left.f0, left.f1, right.f1);
        }

    }).print();
    

  • 相关阅读:
    Java 第一章 初识Java
    Tomcat基础教程(三)
    Tomcat基础教程(二)
    Web Service相关工具的配置
    分布式版本控制系统Git的安装与使用
    个人项目小学四则运算 “软件”之初版
    结对项目四则运算 “软件”之升级版
    第一篇作业准备
    Linux常用命令入门文件、网络、系统及其他操作命令
    MySql5.7默认生成的密码无法正常登陆
  • 原文地址:https://www.cnblogs.com/collen7788/p/13784743.html
Copyright © 2011-2022 走看看