直接上代码:
包含了 map、filter、persist、mapPartitions 等函数
// Driver fragment: reads a text file of JSON lines, turns each line into a Row,
// keeps only the rows that pass the key / visit-type / report-type checks, then
// repartitions and persists the result for the downstream filters.

// NOTE(review): 279 is not a valid IPv4 octet; confirm the real master address.
String master = "spark://192.168.2.279:7077";
// Local-mode alternative: jsc = getContext("local[2]");
jsc = getContext(master);
sqlContext = new SQLContext(jsc);

// JDBC connection settings.
connectionProperties = new Properties();
connectionProperties.put("user", user);
connectionProperties.put("password", password);
connectionProperties.put("driver", "com.mysql.jdbc.Driver");

// Optional step, disabled: emrdataIndedx(filePath, jsc, sqlContext); // load emrdataindex data into MySQL
JavaRDD<String> javaRDD = jsc.textFile(filePath);

String[] fields = {"pk_dcpv", "code_pvtype", "name_pvtype", "code_ord", "empi", "code_sex"
        , "name_sex", "birthday", "age", "code_dept", "name_dept", "bed", "pk_dcordrisreq"
        , "code_req", "code_rep", "code_rep_type", "name_rep_type", "code_state", "name_state"
        , "code_eu_type", "name_eu_type", "code_eu_item", "name_eu_item", "code_part"
        , "name_part", "create_time", "code_pres", "parent_code"};

// Accepted code_rep_type values. Some entries carry a trailing space on purpose
// (they are compared verbatim), so do not trim them.
String[] old_type = {"D", "GYN", "X ", "MR ", "L05", "L04", "L12", "B ", "OTHC", "DOS", "ECG",
        "CT ", "UIS", "L02", "RIS", "SY ", "CB ", "L01", "ENT", "L03", "EYE", "NSC", "L07", "EMG",
        "NEU", "PTH", "DC", "INF", "GC", "L08", "L09", "BD", "L26", "ECT", "GM", "GP", "L10", "EDO",
        "L11", "DER", "EEG", "URO", "PFT", "L25", "RF", "OTH", "PIS", "PMR", "PSY", "MPL", "BM", "Z",
        "EIS", "BED", "BLD", "L27", "FOD", "R", "GYP", "CTD", "BDT", "L99", "EUS", "HNS", "L91", "SED",
        "L28", "F", "IED", "FOW", "L31", "OO", "P01", "L13"};

// Broadcast variable: ships the accepted report types to the executors once.
final Broadcast<String[]> broadcast = jsc.broadcast(old_type);

StructType schema = createStructType(fields);

// Parse every JSON line of a partition into a Row.
JavaRDD<Row> mapPartitions1 = javaRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Row>() {
    private static final long serialVersionUID = 1L;

    @Override
    @SuppressWarnings("unchecked")
    public Iterator<Row> call(Iterator<String> iterator) throws Exception {
        // Built once per partition instead of being held as a field, so the mapper
        // is not serialized together with this function.
        final ObjectMapper mapper = new ObjectMapper();
        ArrayList<Row> rows = new ArrayList<Row>();
        while (iterator.hasNext()) {
            String line = iterator.next();
            try {
                // NOTE(review): map_t and map_s are declared outside this fragment, and
                // map_s is not reset between records, so a key missing from one line keeps
                // the value written by an earlier line. Confirm whether that is intended.
                map_t = mapper.readValue(line, Map.class);
            } catch (java.io.IOException e) {
                // Malformed JSON: skip only this record. Returning null here (as the code
                // previously did) hands Spark a null iterator and loses the whole partition.
                continue;
            }
            for (Entry<String, Object> entry : map_t.entrySet()) {
                map_s.put(entry.getKey(), String.valueOf(entry.getValue()));
            }
            rows.add(createOrdPart3Row(map_s));
        }
        return rows.iterator();
    }
});

// Keep a row only when:
//   - column 0 splits into exactly two parts on "_",
//   - column 1 is non-null, non-empty and not "P",
//   - column 15 is one of the broadcast report types.
// NOTE(review): columns 0 / 1 / 15 presumably are pk_dcpv / code_pvtype / code_rep_type
// (their positions in `fields`); verify against createOrdPart3Row.
JavaRDD<Row> mapPartitions2 = mapPartitions1.filter(new Function<Row, Boolean>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Boolean call(Row row) throws Exception {
        String pkDcpv = row.getString(0);
        if (pkDcpv == null || pkDcpv.split("_").length != 2) {
            return false;
        }
        // The three checks must all hold; joining them with || made the test always true.
        String codePvtype = row.getString(1);
        if (codePvtype == null || codePvtype.isEmpty() || "P".equals(codePvtype)) {
            return false;
        }
        String codeRepType = row.getString(15);
        return Arrays.asList(broadcast.value()).contains(codeRepType);
    }
});

// Do NOT destroy the broadcast here: the filter above is lazy and only reads
// broadcast.value() once an action runs, and a destroyed broadcast cannot be used
// again. Call broadcast.destroy() after the last action that depends on it.

JavaRDD<Row> mapPartitions = mapPartitions2.repartition(100);
JavaRDD<Row> persist = mapPartitions.persist(StorageLevel.MEMORY_AND_DISK_SER());

// Subset of the persisted rows whose column 0 starts with "1".
JavaRDD<Row> filter1 = persist.filter(new Function<Row, Boolean>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Boolean call(Row row) throws Exception {
        return row.getString(0).startsWith("1");
    }
});