zoukankan      html  css  js  c++  java
  • Spark 计算人员三度关系

    1、一度人脉:双方直接是好友
     
    2、二度人脉:双方有一个以上共同的好友,这时朋友网可以计算出你们有几个共同的好友并且呈现数字给你。你们的关系是: 你->朋友->陌生人
     
    3、三度人脉:即你朋友的朋友的朋友就是这个陌生人。你们的关系是 你->朋友->朋友->陌生人
     
    4、四度人脉:比三度增加一度,你们的关系是,你->朋友->朋友->朋友->陌生人
     
    5、五度人脉:你->朋友->朋友->朋友->朋友->陌生人 ,像上面这张图片表示的就是一个五度人脉关系。
     
    6、六度人脉:你->朋友->朋友->朋友->朋友->朋友->陌生人

    数据格式如下:

    A,B  
    A,C  
    A,E  
    B,D  
    E,D  
    C,F  
    F,G  

    业务逻辑如下:

    1、转操作flatMapToPair将行数据变为键值对,如A,B表示A和B认识,A可以通过B认识B的朋友,B通过A可以认识A的朋友,转化结果为{A:A,B,deg1friend,A->B}、{B:B,A,deg1friend,B->A};

    2、转操作groupByKey对键值对按Key进行分组,转化结果为:{A,【A,B,deg1friend,A->B、A,E,deg1friend,A->E、A,C,deg1friend,A->C】}...;

    3、转操作flatMapToPair生成包含可能存在(A->B,A->C两者走向B和C不相同,但都认识A,B和C即存在可能)二度关系的新的键值对,如A和B认识且A与C认识,那么B与C可以存在认识关系即二度关系,路线走向为:B->A->C或C->A->B;

    4、转操作filter在新的键值对中筛选出一度关系即两者已经是认识的,如A和B认识是一度关系;

    5、转操作subtractByKey对包含二度关系的键值对删除已存在一度关系的人员,即只剩二度关系;

    6、转操作flatMapToPair生成新的二度关系及走向(双向走向【B,C,deg2friend,C->A->B、B,C,deg2friend,B->A->C】);

    7、转操作union将新的二度关系与一度关系进行合并;

    8、转操作groupByKey对键值对按Key进行分组,转化结果为:{B,【B,A,deg1friend,B->A、B,D,deg1friend,B->D、B,C,deg2friend,C->A->B、B,E,deg2friend,B->A->E、B,E,deg2friend,B->D->E、B,E,deg2friend,E->A->B、B,E,deg2friend,E->D->B、B,C,deg2friend,B->A->C】}...;

    9、转操作flatMapToPair生成可能存在三度关系的新键值对(如:前者为 B,C,deg2friend,C->A->B,后者为 B,D,deg1friend,B->D,则生成 C->D,deg3friend,C->A->B->D)。判断条件:前者为deg2friend,后者为deg1friend;前者split【0】= 后者split【0】;前者的路径以其split【1】为起点;后者的终点(split【1】)不在前者的路径内;

    10、转操作subtractByKey对包含三度关系的键值对删除存在一度关系的人员;

    11、行为操作countByKey统计每对三度关系(如C->D)对应的路径条数;

    具体实现:

    package com.test;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.regex.Pattern;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.storage.StorageLevel;
    
    import scala.Tuple2;
    
    /**
     * Spark job that derives second- and third-degree relationships from a list
     * of direct friendships.
     *
     * <p>Each input line has the form {@code X,Y} and means that X and Y know each
     * other. Records travelling through the pipeline are plain strings:
     * <ul>
     *   <li>per-person records: {@code person,other,degNfriend,path}</li>
     *   <li>per-pair records: key {@code from->to}, value {@code degNfriend,path}</li>
     * </ul>
     * where {@code path} is a chain of names joined by {@code ->}.
     */
    public class Test2 {

        /** Input file used when no path is given on the command line. */
        private static final String DEFAULT_INPUT = "C:/rmgx.txt";

        /**
         * Runs the job and prints the grouped first/second-degree records, the
         * third-degree candidate paths and the number of paths per third-degree pair.
         *
         * @param args optional; {@code args[0]} overrides the default input path
         */
        public static void main(String[] args) {
            String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT;

            SparkConf conf = new SparkConf().setMaster("local").setAppName("My Test APP");

            // try-with-resources stops the context even when a stage fails.
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {

                JavaRDD<String> rdd = sc.textFile(inputPath);

                // Step 1: every edge "X,Y" yields one record for X and one for Y.
                JavaPairRDD<String, String> r1 = rdd.flatMapToPair(new PairFlatMapFunction<String, String, String>() {
                    @Override
                    public Iterator<Tuple2<String, String>> call(String t) throws Exception {
                        List<Tuple2<String, String>> list = new ArrayList<>();
                        String[] eachterm = t.split(",");
                        // Blank or malformed lines are skipped instead of failing the job.
                        if (eachterm.length < 2) {
                            return list.iterator();
                        }
                        // Trim so that "A,B  " and "A,B" describe the same edge.
                        String left = eachterm[0].trim();
                        String right = eachterm[1].trim();
                        if (left.isEmpty() || right.isEmpty()) {
                            return list.iterator();
                        }
                        list.add(new Tuple2<>(left, left + "," + right + "," + "deg1friend" + "," + left + "->" + right));
                        list.add(new Tuple2<>(right, right + "," + left + "," + "deg1friend" + "," + right + "->" + left));
                        return list.iterator();
                    }
                });

                // r1 feeds both the grouping below and the union in step 7.
                r1.persist(StorageLevel.DISK_ONLY());

                // Step 2: collect all first-degree records of each person.
                JavaPairRDD<String, Iterable<String>> r2 = r1.groupByKey();

                // Step 3: for a person P with friends F1 and F2, emit the known pair
                // P->F1 and the candidate second-degree pair F1->F2 routed through P.
                JavaPairRDD<String, String> r3 = r2.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<String>>, String, String>() {
                    @Override
                    public Iterator<Tuple2<String, String>> call(Tuple2<String, Iterable<String>> t) throws Exception {
                        // Split every record once instead of once per inner-loop pass.
                        List<String[]> records = new ArrayList<>();
                        for (String value : t._2) {
                            records.add(value.split(","));
                        }

                        List<Tuple2<String, String>> list = new ArrayList<>();
                        for (String[] first : records) {
                            String str1_0 = first[0];
                            String str1_1 = first[1];
                            list.add(new Tuple2<>(str1_0 + "->" + str1_1, "deg1friend," + str1_0 + "->" + str1_1));
                            for (String[] second : records) {
                                String str2_0 = second[0];
                                String str2_1 = second[1];
                                if (!str1_1.equals(str2_1)) {
                                    list.add(new Tuple2<>(str1_1 + "->" + str2_1,
                                            "deg2friend," + str1_1 + "->" + str2_0 + "->" + str2_1));
                                }
                            }
                        }
                        return list.iterator();
                    }
                });

                // r3 feeds both the filter (r4) and the subtraction (r5).
                r3.persist(StorageLevel.DISK_ONLY());

                // Step 4: keep only the pairs that are already direct friends.
                JavaPairRDD<String, String> r4 = r3.filter(new Function<Tuple2<String, String>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<String, String> v1) throws Exception {
                        // Test the tag position rather than searching the whole value,
                        // so a name containing "deg1friend" cannot match by accident.
                        return v1._2.startsWith("deg1friend,");
                    }
                });

                // r4 is subtracted twice: from r3 (step 5) and from r9 (step 10).
                r4.persist(StorageLevel.DISK_ONLY());

                // Step 5: drop candidate pairs that are in fact first-degree.
                JavaPairRDD<String, String> r5 = r3.subtractByKey(r4);

                // Step 6: turn each second-degree pair into per-person records for
                // both endpoints; both records carry the same path.
                JavaPairRDD<String, String> r6 = r5.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, String>() {
                    @Override
                    public Iterator<Tuple2<String, String>> call(Tuple2<String, String> t) throws Exception {
                        List<Tuple2<String, String>> list = new ArrayList<>();
                        String[] endpoints = t._1.split("->");
                        String t0 = endpoints[0];
                        String t1 = endpoints[1];
                        String t2_1 = t._2.split(",")[1];
                        list.add(new Tuple2<>(t0, t0 + "," + t1 + "," + "deg2friend" + "," + t2_1));
                        list.add(new Tuple2<>(t1, t1 + "," + t0 + "," + "deg2friend" + "," + t2_1));
                        return list.iterator();
                    }
                });

                // Step 7: first- and second-degree records side by side.
                JavaPairRDD<String, String> r7 = r1.union(r6);

                // Step 8: group them per person.
                JavaPairRDD<String, Iterable<String>> r8 = r7.groupByKey();

                // r8 is collected for printing and then transformed again.
                r8.persist(StorageLevel.DISK_ONLY());

                System.out.println("线路走向:" + StringUtils.join(r8.collect(), ","));

                // Step 9: extend a second-degree path that ends at person P by one of
                // P's direct friends to obtain a third-degree candidate.
                JavaPairRDD<String, String> r9 = r8.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<String>>, String, String>() {
                    @Override
                    public Iterator<Tuple2<String, String>> call(Tuple2<String, Iterable<String>> t) throws Exception {
                        List<String[]> records = new ArrayList<>();
                        for (String value : t._2) {
                            records.add(value.split(","));
                        }

                        List<Tuple2<String, String>> list = new ArrayList<>();
                        for (String[] first : records) {
                            String str1_0 = first[0];
                            String str1_1 = first[1];
                            String str1_2 = first[2];
                            String str1_3 = first[3];
                            if (!str1_2.equals("deg2friend")) {
                                continue;
                            }
                            // Only paths that start at the remote person are extended,
                            // so that the new hop is appended at this person's end.
                            List<String> pathNodes = Arrays.asList(str1_3.split("->"));
                            if (!pathNodes.get(0).equals(str1_1)) {
                                continue;
                            }
                            for (String[] second : records) {
                                String str2_0 = second[0];
                                String str2_1 = second[1];
                                String str2_2 = second[2];
                                // Compare whole node names: a substring search on the
                                // path would treat "A" as present in "AB->C->D".
                                if (!str1_1.equals(str2_1)
                                        && str2_2.equals("deg1friend")
                                        && !pathNodes.contains(str2_1)
                                        && str1_0.equals(str2_0)) {
                                    list.add(new Tuple2<>(str1_1 + "->" + str2_1,
                                            "deg3friend," + str1_3 + "->" + str2_1));
                                }
                            }
                        }
                        return list.iterator();
                    }
                });

                // Step 10: drop candidates whose endpoints are direct friends.
                // NOTE(review): pairs that are already second-degree are not removed
                // here; confirm whether they should be subtracted as well.
                JavaPairRDD<String, String> r10 = r9.subtractByKey(r4);

                // r10 is collected for printing and counted afterwards.
                r10.persist(StorageLevel.DISK_ONLY());

                System.out.println("线路走向:" + StringUtils.join(r10.collect(), ","));

                // Step 11: number of distinct three-hop paths per pair.
                Map<String, Long> r11 = r10.countByKey();

                System.out.println(r11);
            }
        }
    }

    运行结果如下:

    {C->D=2, B->F=1, G->A=1, F->E=1, F->B=1, E->F=1, D->C=2, A->G=1}

  • 相关阅读:
    SMB(Server Message Block) Protocal Research
    IPC$概念及入侵方式研究
    Linux中的pipe(管道)与named pipe(FIFO 命名管道)
    RSA Encrypting/Decrypting、RSA+AES Encrypting/Decrypting
    卷积思想理解、Convolutional Neural Network(CNN)卷积神经网络初探
    IMPLEMENTING A GRU/LSTM RNN WITH PYTHON AND THEANO
    Recurrent Neural Networks(RNN) 循环神经网络初探
    Neural Networks and Deep Learning(神经网络与深度学习)
    基于USB网卡适配器劫持DHCP Server嗅探Windows NTLM Hash密码
    Encryption and decryption、Steganography、Decryption Tools
  • 原文地址:https://www.cnblogs.com/gmhappy/p/9472429.html
Copyright © 2011-2022 走看看