zoukankan      html  css  js  c++  java
  • Spark 的广播变量（Broadcast）

    直接上代码：
    示例中用到了 mapPartitions、filter、repartition、persist 等算子，以及广播变量（broadcast）。

     // Driver-side setup: Spark context, JDBC connection settings, input RDD and row schema.
            // NOTE(review): "279" is not a valid IPv4 octet, so this master URL cannot resolve as
            // written; presumably a redacted placeholder -- confirm the real cluster address.
            String master = "spark://192.168.2.279:7077";
            // For a local run, pass "local[2]" to getContext instead of the cluster master URL.
            jsc = getContext(master);
            sqlContext = new SQLContext(jsc);

            // JDBC credentials and driver class; user and password come from the enclosing scope.
            connectionProperties = new Properties();
            connectionProperties.setProperty("user", user);
            connectionProperties.setProperty("password", password);
            connectionProperties.setProperty("driver", "com.mysql.jdbc.Driver");

            // Optional step (currently disabled): emrdataIndedx(filePath, jsc, sqlContext) loads the
            // emrdataindex data into MySQL.
            JavaRDD<String> javaRDD = jsc.textFile(filePath);

            // Column names used to build the output StructType.
            String[] fields = new String[] {
                    "pk_dcpv", "code_pvtype", "name_pvtype", "code_ord", "empi", "code_sex",
                    "name_sex", "birthday", "age", "code_dept", "name_dept", "bed", "pk_dcordrisreq",
                    "code_req", "code_rep", "code_rep_type", "name_rep_type", "code_state", "name_state",
                    "code_eu_type", "name_eu_type", "code_eu_item", "name_eu_item", "code_part",
                    "name_part", "create_time", "code_pres", "parent_code"
            };

            // Accepted report-type codes. Several entries carry a trailing space ("X ", "MR ", ...);
            // they are kept verbatim because membership is tested with exact string equality.
            // NOTE(review): presumably padded values from a fixed-width source column -- confirm.
            String[] old_type = new String[] {
                    "D", "GYN", "X ", "MR ", "L05", "L04",
                    "L12", "B ", "OTHC", "DOS", "ECG", "CT ", "UIS", "L02",
                    "RIS", "SY ", "CB ", "L01", "ENT", "L03", "EYE", "NSC",
                    "L07", "EMG", "NEU", "PTH", "DC", "INF", "GC", "L08",
                    "L09", "BD", "L26", "ECT", "GM", "GP", "L10", "EDO",
                    "L11", "DER", "EEG", "URO", "PFT", "L25", "RF", "OTH",
                    "PIS", "PMR", "PSY", "MPL", "BM", "Z", "EIS", "BED", "BLD",
                    "L27", "FOD", "R", "GYP", "CTD", "BDT", "L99", "EUS", "HNS",
                    "L91", "SED", "L28", "F", "IED", "FOW", "L31", "OO", "P01", "L13"
            };

            // Broadcast variable: the code list is shipped to executors once as read-only data
            // instead of being serialized into every task closure.
            final Broadcast<String[]> broadcast = jsc.broadcast(old_type);
            StructType schema = createStructType(fields);
            // Parse every input line as a JSON object and convert it into a Row. mapPartitions lets a
            // single ObjectMapper instance serve all lines of a partition.
            JavaRDD<Row> mapPartitions1 = javaRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Row>() {
                private static final long serialVersionUID = 1L;
                ObjectMapper mapper = new ObjectMapper();

                /**
                 * Converts each parseable JSON line of the partition into a Row.
                 *
                 * <p>Lines that cannot be parsed are skipped. Returning null here would not be a
                 * valid Iterator, so it would fail the task and discard the whole partition.
                 *
                 * @param iterator the raw text lines of one partition
                 * @return an iterator over the rows built from the well-formed lines
                 */
                @Override
                @SuppressWarnings("unchecked") // readValue(..., Map.class) yields a raw Map
                public Iterator<Row> call(Iterator<String> iterator)
                        throws Exception {
                    ArrayList<Row> rows = new ArrayList<Row>();
                    while (iterator.hasNext()) {
                        String next = iterator.next();
                        // map_s is reused for every record; clear it so keys from an earlier line
                        // cannot leak into this line's row.
                        map_s.clear();
                        try {
                            map_t = mapper.readValue(next, Map.class);
                            for (Entry<String, Object> entry : map_t.entrySet()) {
                                map_s.put(entry.getKey(), String.valueOf(entry.getValue()));
                            }
                        } catch (Exception ignored) {
                            // Malformed (or non-object) JSON line: skip this record only.
                            continue;
                        }
                        rows.add(createOrdPart3Row(map_s));
                    }
                    return rows.iterator();
                }
            });
            // Keep a row only if:
            //   - its pk_dcpv splits into exactly two "_"-separated parts,
            //   - its visit type is present and not "P",
            //   - its report type is one of the broadcast codes.
            JavaRDD<Row> mapPartitions2 = mapPartitions1.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(Row row) throws Exception {
                    // Presumably columns follow the order of `fields`
                    // (0 = pk_dcpv, 1 = code_pvtype, 15 = code_rep_type); confirm against createOrdPart3Row.
                    String pk_dcpv1 = row.getString(0);
                    String code_pvtype1 = row.getString(1);
                    String code_rep_type1 = row.getString(15);
                    if (pk_dcpv1 == null || pk_dcpv1.split("_").length != 2) {
                        return false;
                    }
                    // All three conditions must hold (AND). OR-ing their negations, as before,
                    // is true for every input and filters nothing.
                    boolean validPvType = code_pvtype1 != null
                            && !code_pvtype1.isEmpty()
                            && !"P".equals(code_pvtype1);
                    return validPvType && Arrays.asList(broadcast.value()).contains(code_rep_type1);
                }
            });
            //broadcast不用就销毁
            broadcast.destroy();
            JavaRDD<Row> mapPartitions = mapPartitions2.repartition(100);
            JavaRDD<Row> persist = mapPartitions.persist(StorageLevel.MEMORY_AND_DISK_SER());
            // From the cached rows, keep those whose first column (presumably pk_dcpv) starts with "1".
            JavaRDD<Row> filter1 = persist.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(Row candidate) throws Exception {
                    final String key = candidate.getString(0);
                    return key.startsWith("1");
                }
            });
  • 相关阅读:
    Swing编程之helloworld
    spring boot集成redis缓存
    Redis安装与运行讲解
    使用IDEA创建Spring boot项目,集成mybaits。并进行简单的数据库查询操作
    SQL字符串拼接FOR XML PATH
    Webservice大文件断点续传
    SQL查询库、表,列等的一些操作
    SQL列转行,行转列实现
    写一个发布Windows服务服务的小工具
    使用DocX替代COM组件的实现
  • 原文地址:https://www.cnblogs.com/kwzblog/p/10180281.html
Copyright © 2011-2022 走看看