package org.langtong.sparkdemo;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.Map.Entry;

/**
 * Reads source files with Spark and loads them into MySQL.
 * org.langtong.sparkdemo.SparkReadFile2MysqlFull
 *
 * @author Administrator
 */
public class SparkReadFile2MysqlFull implements Serializable {
    private static final long serialVersionUID = 1L;
    private static Properties connectionProperties;
    private static JavaSparkContext jsc;
    private static SparkSession lalala;
    private static SQLContext sqlContext;
    private static String url = "jdbc:mysql://192.168.2.258:3306/diagbot?Unicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false";
    private static String mysqldriver = "com.mysql.jdbc.Driver";
    private static String user = "root";
    private static String password = "diagbot@db";
    private static String[] serials = new String[]{"13", "14", "15", "16", "17", "18", "19", "20",
            "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", "31", "32"};
    private static String[] tables = new String[]{"emrdata_1", "emrdata_2", "emrdata_3", "emrdata_4",
            "emrdata_5", "emrdata_6", "emrdata_7", "emrdata_8", "emrdata_9"};
    // Shared scratch maps reused by the mapper functions below:
    // map_t holds the raw JSON fields; map_s, map_k and map_r hold stringified copies used to build Rows.
    private static Map<String, Object> map_t = new HashMap<String, Object>();
    private static Map<String, String> map_s = new HashMap<String, String>();
    private static Map<String, String> map_r = new HashMap<String, String>();
    private static Map<String, String> map_k = new HashMap<String, String>();

    public static void main(String[] args) throws IOException {
        for (int i = 0; i <= 57; i++) {
            StringBuffer stringBuffer = new StringBuffer();
            stringBuffer.append("hdfs://192.168.2.258:9000/datas/parquetFile/ord_detail_0718/part");
            stringBuffer.append(i);
            ordDetail_full(stringBuffer.toString(), i);
        }
        // ordDetail_full("/opt/diagbot/datas/srrsh/ord_detail/part0");
    }
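    /*
     * Editor's note: the ingest methods below all repeat the same pattern of parsing one JSON
     * line into a string map before building a Row. The helper below is an illustrative sketch
     * of that pattern only; it is not called anywhere in the original code and the name
     * parseJsonLine is hypothetical.
     */
    private static Map<String, String> parseJsonLine(String line, ObjectMapper mapper) throws IOException {
        // Parse the JSON object into a generic map, then stringify every value the same way
        // the anonymous Function/FlatMapFunction classes below do.
        @SuppressWarnings("unchecked")
        Map<String, Object> raw = mapper.readValue(line, Map.class);
        Map<String, String> result = new HashMap<String, String>();
        for (Entry<String, Object> entry : raw.entrySet()) {
            result.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
        return result;
    }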
    @SuppressWarnings("unused")
    private static void emrdataIndedx(String filePath, JavaSparkContext jsc, SQLContext sqlContext) {
        JavaRDD<String> javaRDD = jsc.textFile(filePath);
        String[] fields = {"pk_dcemr", "pk_dcpv", "empi", "code_pati", "code_pvtype", "code_ref_emr",
                "code_emr_type", "name_emr_type", "prlog_rdn", "code_dept_emr", "name_dept_emr",
                "code_psn_edit", "data_source", "source_pk", "create_time"};
        JavaRDD<Row> mapPartitions = javaRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Row>() {
            private static final long serialVersionUID = 1L;
            ObjectMapper mapper = new ObjectMapper();

            @SuppressWarnings("unchecked")
            public Iterator<Row> call(Iterator<String> iterator) throws Exception {
                ArrayList<Row> arrayList = new ArrayList<Row>();
                while (iterator.hasNext()) {
                    try {
                        String next = iterator.next();
                        map_t = mapper.readValue(next, Map.class);
                        for (Entry<String, Object> entry : map_t.entrySet()) {
                            map_s.put(entry.getKey(), String.valueOf(entry.getValue()));
                        }
                    } catch (Exception e) {
                        // Skip malformed records instead of returning null, which would fail the whole partition.
                        continue;
                    }
                    arrayList.add(createEmerIndexRow(map_s));
                }
                return arrayList.iterator();
            }
        });
        StructType schema = createStructType(fields);
        HashMap<String, JavaRDD<Row>> hashMap2 = new HashMap<String, JavaRDD<Row>>(); // unused
        Dataset<Row> createDataFrame1 = sqlContext.createDataFrame(mapPartitions, schema);
        createDataFrame1.write().mode(SaveMode.Append)
                .jdbc(url, "emrdataindex_1", connectionProperties);
    }

    public static JavaRDD<Row> emrdata(JavaRDD<String> javaRDD) {
        JavaRDD<Row> rowRDD = javaRDD.map(new Function<String, Row>() {
            private static final long serialVersionUID = 1L;

            public Row call(String line) throws Exception {
                try {
                    ObjectMapper mapper = new ObjectMapper();
                    map_t = mapper.readValue(line, Map.class);
                    for (Entry<String, Object> entry : map_t.entrySet()) {
                        map_s.put(entry.getKey(), String.valueOf(entry.getValue()));
                    }
                } catch (Exception e) {
                    // Fall back to a placeholder record that keeps the raw line in "remark".
                    map_s.put("pvcode", "99999999");
                    map_s.put("remark", line);
                    map_s.put("pcode", "3100|H02");
                    return createEmrdataRow(map_s);
                }
                return createEmrdataRow(map_s);
            }
        });
        return rowRDD;
    }

    private static StructType createStructType(String[] fields) {
        // Every column is declared as a nullable string.
        LinkedList<StructField> structFieldsList = new LinkedList<StructField>();
        for (String field : fields) {
            structFieldsList.add(DataTypes.createStructField(field, DataTypes.StringType, true));
        }
        StructType schema = DataTypes.createStructType(structFieldsList);
        return schema;
    }

    private static JavaSparkContext getContext(String master) {
        SparkConf conf = new SparkConf().setAppName("SparkReadkwz").setMaster(master);
        conf.set("spark.scheduler.mode", "FAIR");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        return jsc;
    }
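    /*
     * Editor's note: an illustrative sketch (not part of the original pipeline) of how the
     * all-string schema from createStructType lines up with RowFactory rows. The field names
     * and values here are hypothetical.
     */
    private static Dataset<Row> exampleStringFrame(SQLContext ctx, JavaSparkContext sc) {
        String[] demoFields = {"pk_demo", "value_demo"};
        List<Row> demoRows = Arrays.asList(
                RowFactory.create("1", "foo"),
                RowFactory.create("2", "bar"));
        // Column order in each Row must match the order of the field names.
        return ctx.createDataFrame(sc.parallelize(demoRows), createStructType(demoFields));
    }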
    private static void ordRec(String filePath) {
        String master = "spark://192.168.2.258:7077";
        jsc = getContext(master);
        sqlContext = new SQLContext(jsc);
        connectionProperties = new Properties();
        connectionProperties.put("user", user);
        connectionProperties.put("password", password);
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
        JavaRDD<String> javaRDD = jsc.textFile(filePath);
        String[] ordrecFields = {"pk_ord_record", "pk_dcord", "pk_dcpv", "code_pvtype", "name_pvtype", "pvcode",
                "code_ord", "empi", "code_pati", "code_sex", "name_sex", "age", "code_dept", "name_dept", "bed",
                "pk_dcordrisreq", "code_req", "code_rep", "code_rep_type", "name_rep_type", "code_eu_type",
                "name_eu_type", "code_eu_item", "name_eu_item", "create_time"};
        StructType schema = createStructType(ordrecFields);
        JavaRDD<Row> mapPartitions = javaRDD.map(new Function<String, Row>() {
            private static final long serialVersionUID = 1L;
            ObjectMapper mapper = new ObjectMapper();

            public Row call(String line) throws Exception {
                try {
                    map_t = mapper.readValue(line, Map.class);
                    for (Entry<String, Object> entry : map_t.entrySet()) {
                        map_s.put(entry.getKey(), String.valueOf(entry.getValue()));
                    }
                } catch (Exception e) {
                    // Malformed lines become null rows and are dropped by the filter below.
                    return null;
                }
                return createOrdRecRow(map_s);
            }
        });
        // Deduplicate.
        JavaRDD<Row> distinctRDD = mapPartitions.distinct();
        JavaRDD<Row> filterRDD = distinctRDD.filter(new Function<Row, Boolean>() {
            @Override
            public Boolean call(Row row) throws Exception {
                if (null == row) return false;
                if ("P".equals(row.getString(3)) || "null".equals(row.getString(3))) return false;
                if (row.getString(2).split("_").length == 1) return false;
                return true;
            }
        });
        // Cache before writing one shard per serial prefix.
        JavaRDD<Row> persistRDD = filterRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
        JavaRDD<Row> filterRDD1 = null;
        Dataset<Row> dataFrame1 = null;
        for (final String se : serials) {
            filterRDD1 = persistRDD.filter(new Function<Row, Boolean>() {
                @Override
                public Boolean call(Row row) throws Exception {
                    return row.getString(0).startsWith(se);
                }
            });
            if (!filterRDD1.isEmpty()) {
                dataFrame1 = sqlContext.createDataFrame(filterRDD1, schema);
                dataFrame1.write().mode(SaveMode.Append).jdbc(url, "ord_rec" + se, connectionProperties);
            }
        }
        jsc.close();
    }
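    /*
     * Editor's note: ord() below fans rows out to ord_1 .. ord_9 with nine hand-written
     * startsWith filters on pvcode. The table below is an illustrative restatement of that
     * routing only; it is not used by the original code and the field name is hypothetical.
     */
    private static final Map<String, String[]> ORD_TABLE_PREFIXES = new LinkedHashMap<String, String[]>();
    static {
        ORD_TABLE_PREFIXES.put("ord_1", new String[]{"1"});
        ORD_TABLE_PREFIXES.put("ord_2", new String[]{"2"});
        ORD_TABLE_PREFIXES.put("ord_3", new String[]{"31", "32", "33"});
        ORD_TABLE_PREFIXES.put("ord_4", new String[]{"34", "4"});
        ORD_TABLE_PREFIXES.put("ord_5", new String[]{"35", "5"});
        ORD_TABLE_PREFIXES.put("ord_6", new String[]{"36", "6"});
        ORD_TABLE_PREFIXES.put("ord_7", new String[]{"37", "7"});
        ORD_TABLE_PREFIXES.put("ord_8", new String[]{"8"});
        ORD_TABLE_PREFIXES.put("ord_9", new String[]{"38", "39", "30", "9"});
    }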
    private static void ord(String filePath) {
        String master = "spark://192.168.2.258:7077";
        jsc = getContext(master);
        sqlContext = new SQLContext(jsc);
        connectionProperties = new Properties();
        connectionProperties.put("user", user);
        connectionProperties.put("password", password);
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
        JavaRDD<String> javaRDD = jsc.textFile(filePath);
        final String[] ordPart2Fields = {"pvcode", "pk_dcpv", "empi", "code_pvtype", "name_pvtype", "code_sex",
                "name_sex", "birthday", "code_dept", "name_dept", "code_ord", "code_orditem_type",
                "name_orditem_type", "code_orditem", "name_orditem", "date_create", "date_end", "note_ord",
                "code_pres", "parent_code", "create_time"};
        StructType schema = createStructType(ordPart2Fields);
        JavaRDD<Row> mapPartitions = javaRDD.map(new Function<String, Row>() {
            private static final long serialVersionUID = 1L;
            ObjectMapper mapper = new ObjectMapper();

            public Row call(String line) throws Exception {
                try {
                    map_t = mapper.readValue(line, Map.class);
                    for (Map.Entry<String, Object> entry : map_t.entrySet()) {
                        map_s.put(entry.getKey(), String.valueOf(entry.getValue()));
                    }
                } catch (Exception e) {
                    return null;
                }
                return createOrdRow(map_s);
            }
        });
        JavaRDD<Row> distinctRDD1 = mapPartitions.distinct();
        // Trim code_orditem_type and rebuild the row; keep the original row if anything fails.
        JavaRDD<Row> mapRDD1 = distinctRDD1.map(new Function<Row, Row>() {
            private static final long serialVersionUID = 1L;

            public Row call(Row row) throws Exception {
                try {
                    String trimCodeType = row.getString(11).trim();
                    for (int i = 0; i < ordPart2Fields.length; i++) {
                        map_k.put(ordPart2Fields[i], row.getString(i));
                    }
                    map_k.put("code_orditem_type", trimCodeType);
                    return createOrdRow(map_k);
                } catch (Exception e) {
                    return row;
                }
            }
        });
        JavaRDD<Row> filterRDD = mapRDD1.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                if (row == null) return false;
                if ("P".equals(row.getString(3))) {
                    return false;
                }
                return true;
            }
        });
        JavaRDD<Row> persistRDD = filterRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
        // Route rows to ord_1 .. ord_9 by the leading characters of pvcode.
        JavaRDD<Row> filter1 = persistRDD.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                return row.getString(0).startsWith("1");
            }
        });
        JavaRDD<Row> filter2 = persistRDD.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                return row.getString(0).startsWith("2");
            }
        });
        JavaRDD<Row> filter3 = persistRDD.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                return row.getString(0).startsWith("31")
                        || row.getString(0).startsWith("32")
                        || row.getString(0).startsWith("33");
            }
        });
        JavaRDD<Row> filter4 = persistRDD.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                return row.getString(0).startsWith("34") || row.getString(0).startsWith("4");
            }
        });
        JavaRDD<Row> filter5 = persistRDD.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                return row.getString(0).startsWith("35") || row.getString(0).startsWith("5");
            }
        });
        JavaRDD<Row> filter6 = persistRDD.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                return row.getString(0).startsWith("36") || row.getString(0).startsWith("6");
            }
        });
        JavaRDD<Row> filter7 = persistRDD.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                return row.getString(0).startsWith("37") || row.getString(0).startsWith("7");
            }
        });
        JavaRDD<Row> filter8 = persistRDD.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                return row.getString(0).startsWith("8");
            }
        });
        JavaRDD<Row> filter9 = persistRDD.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                return row.getString(0).startsWith("38")
                        || row.getString(0).startsWith("39")
                        || row.getString(0).startsWith("30")
                        || row.getString(0).startsWith("9");
            }
        });
        Dataset<Row> createDataFrame1 = sqlContext.createDataFrame(filter1, schema);
        createDataFrame1.write().mode(SaveMode.Append).jdbc(url, "ord_1", connectionProperties);
        Dataset<Row> createDataFrame2 = sqlContext.createDataFrame(filter2, schema);
        createDataFrame2.write().mode(SaveMode.Append).jdbc(url, "ord_2", connectionProperties);
        Dataset<Row> createDataFrame3 = sqlContext.createDataFrame(filter3, schema);
        createDataFrame3.write().mode(SaveMode.Append).jdbc(url, "ord_3", connectionProperties);
        Dataset<Row> createDataFrame4 = sqlContext.createDataFrame(filter4, schema);
        createDataFrame4.write().mode(SaveMode.Append).jdbc(url, "ord_4", connectionProperties);
        Dataset<Row> createDataFrame5 = sqlContext.createDataFrame(filter5, schema);
        createDataFrame5.write().mode(SaveMode.Append).jdbc(url, "ord_5", connectionProperties);
        Dataset<Row> createDataFrame6 = sqlContext.createDataFrame(filter6, schema);
        createDataFrame6.write().mode(SaveMode.Append).jdbc(url, "ord_6", connectionProperties);
        Dataset<Row> createDataFrame7 = sqlContext.createDataFrame(filter7, schema);
        createDataFrame7.write().mode(SaveMode.Append).jdbc(url, "ord_7", connectionProperties);
        Dataset<Row> createDataFrame8 = sqlContext.createDataFrame(filter8, schema);
        createDataFrame8.write().mode(SaveMode.Append).jdbc(url, "ord_8", connectionProperties);
        Dataset<Row> createDataFrame9 = sqlContext.createDataFrame(filter9, schema);
        createDataFrame9.write().mode(SaveMode.Append).jdbc(url, "ord_9", connectionProperties);
        jsc.close();
    }
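    /*
     * Editor's note: despite the class name, ordDetail_full below does not write to MySQL.
     * It reads one parquet "part" directory per call, derives a lis_trend label for each lab
     * result, and writes the output back to HDFS as a single ORC file per part. The HDFS hosts
     * (192.168.2.258 for input, 192.168.2.232 for output) are environment-specific.
     */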
    private static void ordDetail_full(String filePath, int i) {
        String master = "spark://192.168.2.145:7077";
        lalala = SparkSession.builder().config(new SparkConf().setAppName("lalala")).getOrCreate();
        // String master = "local[*]";
        SparkContext sparkContext = lalala.sparkContext();
        SQLContext sqlContext = lalala.sqlContext();
        // SparkReadFile2MysqlFull.sqlContext = new SQLContext(jsc);
        connectionProperties = new Properties();
        connectionProperties.put("user", user);
        connectionProperties.put("password", password);
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
        // JavaRDD<String> javaRDD = jsc.textFile(filePath);
        final String[] ordDetailFields = {"pk_rep_lis", "name_index_lis", "value_lis", "name_quanti_unit",
                "limit_high", "limit_low", "desc_rrs", "value_flag_name", "create_time", "lis_trend"};
        StructType schema = createStructType(ordDetailFields);
        Dataset<Row> parquet = lalala.read().parquet(filePath);
        Dataset<Row> distinctRDD = parquet.distinct();
        JavaRDD<Row> rowJavaRDD = distinctRDD.toJavaRDD();
        // Map each record and derive the lis_trend label.
        JavaRDD<Row> mapRDD = rowJavaRDD.map(new Function<Row, Row>() {
            public Row call(Row row) throws Exception {
                for (int i = 0; i < ordDetailFields.length; i++) {
                    map_k.put(ordDetailFields[i], row.getString(i));
                }
                String value_lis = row.getString(2);       // test result
                String limit_high = row.getString(4);      // upper limit
                String limit_low = row.getString(5);       // lower limit
                String desc_rrs = row.getString(6);        // normal reference range
                String value_flag_name = row.getString(7); // arrow flag
                String lis_trend = row.getString(9);       // result trend
                try {
                    if (value_lis.replaceAll("[^\\d]+", "").isEmpty() || value_lis.equals("")) {
                        lis_trend = value_lis;
                    } else if (value_lis.replaceAll("[\\d]+", "").contains(desc_rrs)) {
                        lis_trend = "正常";
                    } else if (("null".equals(limit_high) || "null".equals(limit_low)) && "↑".equals(value_flag_name)) {
                        lis_trend = "偏高";
                    } else if (("null".equals(limit_high) || "null".equals(limit_low)) && "↓".equals(value_flag_name)) {
                        lis_trend = "偏低";
                    } else if (desc_rrs.contains(":")) {
                        lis_trend = desc_rrs;
                    } else if (!value_lis.replaceAll("[^.\\d]+", "").isEmpty()
                            && !limit_low.replaceAll("[^.\\d]+", "").isEmpty()
                            && !limit_high.replaceAll("[^.\\d]+", "").isEmpty()
                            && getCount(value_lis.replaceAll("[^.\\d]+", "")) <= 1
                            && getCount(limit_low.replaceAll("[^.\\d]+", "")) <= 1
                            && getCount(limit_high.replaceAll("[^.\\d]+", "")) <= 1
                            && !".".equals(value_lis.replaceAll("[^.\\d]+", ""))
                            && !".".equals(limit_low.replaceAll("[^.\\d]+", ""))
                            && !".".equals(limit_high.replaceAll("[^.\\d]+", ""))) {
                        // All three values are numeric: label the result by how far it sits
                        // outside the [low, high] reference interval.
                        float value = Float.parseFloat(value_lis.replaceAll("[^.\\d]+", ""));
                        float low = Float.parseFloat(limit_low.replaceAll("[^.\\d]+", ""));
                        float high = Float.parseFloat(limit_high.replaceAll("[^.\\d]+", ""));
                        if (value >= low && value <= high) {
                            lis_trend = "正常";   // normal
                        }
                        if (value >= low * 0.8 && value < low) {
                            lis_trend = "偏低";   // slightly low
                        }
                        if (value >= low * 0.5 && value < low * 0.8) {
                            lis_trend = "很低";   // very low
                        }
                        if (value < low * 0.5) {
                            lis_trend = "非常低"; // extremely low
                        }
                        if (value > high && value <= high * 1.2) {
                            lis_trend = "偏高";   // slightly high
                        }
                        if (value > high * 1.2 && value <= high * 1.5) {
                            lis_trend = "很高";   // very high
                        }
                        if (value > high * 1.5) {
                            lis_trend = "非常高"; // extremely high
                        }
                    } else {
                        lis_trend = "null";
                    }
                } catch (Exception e) {
                    lis_trend = "null";
                }
                map_k.put("lis_trend", lis_trend);
                return createOrdDetailFullRow(map_k);
            }
        });
        Dataset<Row> dataFrame = sqlContext.createDataFrame(mapRDD, schema);
        dataFrame.repartition(1).write().orc("hdfs://192.168.2.232:9000/datas/parquetFile/ord_detail_second/part" + i);
        sparkContext.stop();
        lalala.stop();
    }
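    /*
     * Editor's note: an illustrative sketch only. It restates the banding used in ordDetail_full
     * above (and repeated with different labels in ordDetail_split below) in one place, assuming
     * low <= high. It is not called by the original code and the method name is hypothetical.
     */
    private static String classifyTrend(float value, float low, float high) {
        if (value < low * 0.5) return "非常低";   // below half of the lower limit
        if (value < low * 0.8) return "很低";     // 50% .. 80% of the lower limit
        if (value < low) return "偏低";           // just under the lower limit
        if (value <= high) return "正常";         // inside [low, high]
        if (value <= high * 1.2) return "偏高";   // up to 20% above the upper limit
        if (value <= high * 1.5) return "很高";   // 20% .. 50% above the upper limit
        return "非常高";                          // more than 50% above the upper limit
    }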
    private static void ordDetail_split(String filePath) {
        String master = "spark://192.168.2.258:7077";
        // String master = "local[*]";
        jsc = getContext(master);
        sqlContext = new SQLContext(jsc);
        connectionProperties = new Properties();
        connectionProperties.put("user", user);
        connectionProperties.put("password", password);
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
        JavaRDD<String> javaRDD = jsc.textFile(filePath);
        final String[] ordDetailFields = {"pk_rep_lis", "name_index_lis", "value_lis", "name_quanti_unit",
                "limit_high", "limit_low", "desc_rrs", "value_flag_name", "create_time", "lis_all", "lis_result"};
        StructType schema = createStructType(ordDetailFields);
        JavaRDD<Row> mapPartitions = javaRDD.map(new Function<String, Row>() {
            private static final long serialVersionUID = 1L;
            ObjectMapper mapper = new ObjectMapper();

            public Row call(String line) throws Exception {
                try {
                    map_t = mapper.readValue(line, Map.class);
                    for (Entry<String, Object> entry : map_t.entrySet()) {
                        map_s.put(entry.getKey(), String.valueOf(entry.getValue()));
                    }
                } catch (Exception e) {
                    return null;
                }
                return createOrdDetailSpiltRow(map_s);
            }
        });
        JavaRDD<Row> filterRDD = mapPartitions.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                if (null == row) {
                    return false;
                }
                return true;
            }
        });
        JavaRDD<Row> distinctRDD = filterRDD.distinct();
        // Label each result and format it as a "[ name : value unit  range : ...  trend : ... ]" snippet.
        JavaRDD<Row> mapRDD = distinctRDD.map(new Function<Row, Row>() {
            public Row call(Row row) throws Exception {
                for (int i = 0; i < ordDetailFields.length; i++) {
                    map_k.put(ordDetailFields[i], row.getString(i));
                }
                String name_index_lis = row.getString(1);   // test item
                String value_lis = row.getString(2);        // test result
                String name_quanti_unit = row.getString(3); // result unit
                String limit_high = row.getString(4);       // upper limit
                String limit_low = row.getString(5);        // lower limit
                String desc_rrs = row.getString(6);         // normal reference range
                String value_flag_name = row.getString(7);  // arrow flag
                String result = row.getString(10);          // formatted result trend
                if (value_lis.replaceAll("[^\\d]+", "").isEmpty() && "null".equals(desc_rrs)) {
                    value_flag_name = value_lis;
                } else if (value_lis.replaceAll("[\\d]+", "").contains(desc_rrs)) {
                    value_flag_name = "正常";
                } else if (("null".equals(limit_high) || "null".equals(limit_low)) && "↑".equals(value_flag_name)) {
                    value_flag_name = "偏高";
                } else if (("null".equals(limit_high) || "null".equals(limit_low)) && "↓".equals(value_flag_name)) {
                    value_flag_name = "偏低";
                } else if (desc_rrs.contains(":")) {
                    value_flag_name = desc_rrs;
                } else if (!value_lis.replaceAll("[^.\\d]+", "").isEmpty()
                        && !limit_low.replaceAll("[^.\\d]+", "").isEmpty()
                        && !limit_high.replaceAll("[^.\\d]+", "").isEmpty()
                        && getCount(value_lis.replaceAll("[^.\\d]+", "")) <= 1
                        && getCount(limit_low.replaceAll("[^.\\d]+", "")) <= 1
                        && getCount(limit_high.replaceAll("[^.\\d]+", "")) <= 1
                        && !".".equals(value_lis.replaceAll("[^.\\d]+", ""))
                        && !".".equals(limit_low.replaceAll("[^.\\d]+", ""))
                        && !".".equals(limit_high.replaceAll("[^.\\d]+", ""))) {
                    // Same banding as ordDetail_full, written back into value_flag_name.
                    float value = Float.parseFloat(value_lis.replaceAll("[^.\\d]+", ""));
                    float low = Float.parseFloat(limit_low.replaceAll("[^.\\d]+", ""));
                    float high = Float.parseFloat(limit_high.replaceAll("[^.\\d]+", ""));
                    if (value >= low && value <= high) {
                        value_flag_name = "正常";
                    }
                    if (value >= low * 0.8 && value < low) {
                        value_flag_name = "偏低";
                    }
                    if (value >= low * 0.5 && value < low * 0.8) {
                        value_flag_name = "很低";
                    }
                    if (value < low * 0.5) {
                        value_flag_name = "太低了";
                    }
                    if (value > high && value <= high * 1.2) {
                        value_flag_name = "偏高";
                    }
                    if (value > high * 1.2 && value <= high * 1.5) {
                        value_flag_name = "很高";
                    }
                    if (value > high * 1.5) {
                        value_flag_name = "太高了";
                    }
                } else {
                    value_flag_name = "null";
                }
                StringBuffer bf = new StringBuffer();
                bf.append("[ ");
                bf.append(name_index_lis);
                bf.append(" :");
                bf.append(value_lis);
                bf.append(name_quanti_unit);
                bf.append(" ");
                bf.append("区间 :");
                bf.append(desc_rrs);
                bf.append(" ");
                bf.append("趋势 :");
                bf.append(value_flag_name);
                bf.append(" ]");
                result = bf.toString();
                // map_k.put("lis_all", name_index_lis);
                map_k.put("lis_result", result);
                return createOrdDetailSpiltRow(map_k);
            }
        });
        JavaPairRDD<String, Row> mapToPairRDD = mapRDD.mapToPair(new PairFunction<Row, String, Row>() {
            private static final long serialVersionUID = 1L;

            public Tuple2<String, Row> call(Row row) throws Exception {
                return new Tuple2<String, Row>(row.getString(0), row);
            }
        });
        // Merge all results of one report (pk_rep_lis) by concatenating their lis_result strings.
        JavaPairRDD<String, Row> reduceByKeyRDD = mapToPairRDD.reduceByKey(new Function2<Row, Row, Row>() {
            private static final long serialVersionUID = 1L;

            public Row call(Row v1, Row v2) throws Exception {
                String lis_all1 = v1.getString(9);
                String lis_all2 = v2.getString(9);
                String result1 = v1.getString(10);
                String result2 = v2.getString(10);
                for (int i = 0; i < ordDetailFields.length; i++) {
                    map_r.put(ordDetailFields[i], v1.getString(i));
                }
                // map_r.put("lis_all", lis_all1 + " " + lis_all2);
                map_r.put("lis_result", result1 + " " + result2);
                return createOrdDetailSpiltRow(map_r);
            }
        });
        JavaRDD<Row> map = reduceByKeyRDD.map(new Function<Tuple2<String, Row>, Row>() {
            private static final long serialVersionUID = 1L;

            public Row call(Tuple2<String, Row> v1) throws Exception {
                return v1._2();
            }
        });
        JavaRDD<Row> persistRDD = map.persist(StorageLevel.MEMORY_AND_DISK_SER());
        JavaRDD<Row> filter = null;
        Dataset<Row> createDataFrame = null;
        for (final String se : serials) {
            filter = persistRDD.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;

                public Boolean call(Row v1) throws Exception {
                    return v1.getString(0).startsWith(se);
                }
            });
            if (!filter.isEmpty()) {
                createDataFrame = sqlContext.createDataFrame(filter, schema);
                createDataFrame.write().mode(SaveMode.Append).jdbc(url, "ord_detail" + se, connectionProperties);
            }
        }
        jsc.close();
    }
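    /*
     * Editor's note: the factory methods below build Rows positionally. The argument order in
     * each RowFactory.create call must match the corresponding field-name array exactly
     * (ordDetailFields, ordPart2Fields, ordrecFields, ...), since createStructType declares the
     * schema columns in that same order.
     */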
    public static Row createOrdDetailSpiltRow(Map<String, String> map) {
        return RowFactory.create(map.get("pk_rep_lis"), map.get("name_index_lis"), map.get("value_lis"),
                map.get("name_quanti_unit"), map.get("limit_high"), map.get("limit_low"), map.get("desc_rrs"),
                map.get("value_flag_name"), map.get("create_time"), map.get("lis_all"), map.get("lis_result"));
    }

    public static Row createOrdDetailFullRow(Map<String, String> map) {
        return RowFactory.create(map.get("pk_rep_lis"), map.get("name_index_lis"), map.get("value_lis"),
                map.get("name_quanti_unit"), map.get("limit_high"), map.get("limit_low"), map.get("desc_rrs"),
                map.get("value_flag_name"), map.get("create_time"), map.get("lis_trend"));
    }

    public static Row createEmrdataRow(Map<String, String> map) {
        return RowFactory.create(map.get("code_group"), map.get("code_org"), map.get("code_pati"),
                map.get("create_time"), map.get("data_source"), map.get("degree"), map.get("edit_time"),
                map.get("empi"), map.get("flag_del"), map.get("inputfield"), map.get("pcode"), map.get("phrcd"),
                map.get("pk_dcemr"), map.get("pk_rec_data"), map.get("pname"), map.get("prtype"),
                map.get("punit"), map.get("pvcode"), map.get("rdn"), map.get("relardn"), map.get("remark"),
                map.get("setno"), map.get("source_pk"), map.get("stay"), map.get("valuetype"));
    }

    public static Row createEmerIndexRow(Map<String, String> map) {
        return RowFactory.create(map.get("pk_dcemr"), map.get("pk_dcpv"), map.get("empi"), map.get("code_pati"),
                map.get("code_pvtype"), map.get("code_ref_emr"), map.get("code_emr_type"), map.get("name_emr_type"),
                map.get("prlog_rdn"), map.get("code_dept_emr"), map.get("name_dept_emr"), map.get("code_psn_edit"),
                map.get("data_source"), map.get("source_pk"), map.get("create_time"));
    }
    public static Row createOrdRow(Map<String, String> map) {
        return RowFactory.create(map.get("pvcode"), map.get("pk_dcpv"), map.get("empi"), map.get("code_pvtype"),
                map.get("name_pvtype"), map.get("code_sex"), map.get("name_sex"), map.get("birthday"),
                map.get("code_dept"), map.get("name_dept"), map.get("code_ord"), map.get("code_orditem_type"),
                map.get("name_orditem_type"), map.get("code_orditem"), map.get("name_orditem"),
                map.get("date_create"), map.get("date_end"), map.get("note_ord"), map.get("code_pres"),
                map.get("parent_code"), map.get("create_time"));
    }

    public static Row createOrdRecRow(Map<String, String> map) {
        return RowFactory.create(map.get("pk_ord_record"), map.get("pk_dcord"), map.get("pk_dcpv"),
                map.get("code_pvtype"), map.get("name_pvtype"), map.get("pvcode"), map.get("code_ord"),
                map.get("empi"), map.get("code_pati"), map.get("code_sex"), map.get("name_sex"), map.get("age"),
                map.get("code_dept"), map.get("name_dept"), map.get("bed"), map.get("pk_dcordrisreq"),
                map.get("code_req"), map.get("code_rep"), map.get("code_rep_type"), map.get("name_rep_type"),
                map.get("code_eu_type"), map.get("name_eu_type"), map.get("code_eu_item"), map.get("name_eu_item"),
                map.get("create_time"));
    }

    // Number of '.' characters in the string (used above to reject values with more than one decimal point).
    private static Integer getCount(String source) {
        String replace = source.replace(".", "");
        return source.length() - replace.length();
    }
}