zoukankan      html  css  js  c++  java
  • spark读文件写mysql(java版)

    package org.langtong.sparkdemo;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.storage.StorageLevel;
    import scala.Tuple2;
    
    import java.io.IOException;
    import java.io.Serializable;
    import java.util.*;
    import java.util.Map.Entry;
    
    /**
     * spark读文件 入mysql
     * org.langtong.sparkdemo.SparkReadFile2MysqlFull
     *
     * @author Administrator
     */
    public class SparkReadFile2MysqlFull implements Serializable {
        private static final long serialVersionUID = 1L;
        private static Properties connectionProperties;
        private static JavaSparkContext jsc;
        private static SparkSession lalala;
        private static SQLContext sqlContext;
        private static String url = "jdbc:mysql://192.168.2.258:3306/diagbot?Unicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false";
        private static String mysqldriver = "com.mysql.jdbc.Driver";
        private static String user = "root";
        private static String password = "diagbot@db";
        private static String[] serials = new String[]{"13", "14", "15", "16", "17",
                "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", "31", "32"};
        private static String[] tables = new String[]{"emrdata_1", "emrdata_2",
                "emrdata_3", "emrdata_4", "emrdata_5", "emrdata_6", "emrdata_7",
                "emrdata_8", "emrdata_9"};
        private static Map<String, Object> map_t = new HashMap<String, Object>();
        private static Map<String, String> map_s = new HashMap<String, String>();
        private static Map<String, String> map_r = new HashMap<String, String>();
        private static Map<String, String> map_k = new HashMap<String, String>();
    
        public static void main(String[] args) throws IOException {
            for (int i = 0; i <= 57; i++) {
                StringBuffer stringBuffer = new StringBuffer();
                stringBuffer.append("hdfs://192.168.2.258:9000/datas/parquetFile/ord_detail_0718/part");
                stringBuffer.append(i);
                ordDetail_full(stringBuffer.toString(), i);
            }
    //        ordDetail_full("/opt/diagbot/datas/srrsh/ord_detail/part0");
    
        }
    
    
        @SuppressWarnings("unused")
        private static void emrdataIndedx(String filePath, JavaSparkContext jsc,
                                          SQLContext sqlContext) {
            JavaRDD<String> javaRDD = jsc.textFile(filePath);
            String[] fields = {"pk_dcemr", "pk_dcpv", "empi", "code_pati",
                    "code_pvtype", "code_ref_emr", "code_emr_type",
                    "name_emr_type", "prlog_rdn", "code_dept_emr", "name_dept_emr",
                    "code_psn_edit", "data_source", "source_pk", "create_time"};
    
            JavaRDD<Row> mapPartitions = javaRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Row>() {
                private static final long serialVersionUID = 1L;
                ObjectMapper mapper = new ObjectMapper();
    
                @SuppressWarnings("unchecked")
                public Iterator<Row> call(Iterator<String> iterator)
                        throws Exception {
                    ArrayList<Row> arrayList = new ArrayList<Row>();
                    // TODO Auto-generated method stub
                    while (iterator.hasNext()) {
                        try {
                            String next = iterator.next();
                            map_t = mapper.readValue(next, Map.class);
                            for (Entry<String, Object> entry : map_t.entrySet()) {
                                map_s.put(entry.getKey(),
                                        String.valueOf(entry.getValue()));
                            }
                        } catch (Exception e) {
                            // TODO: handle exception
                            return null;
                        }
                        arrayList.add(createEmerIndexRow(map_s));
                    }
                    return arrayList.iterator();
                }
            });
            StructType schema = createStructType(fields);
            HashMap<String, JavaRDD<Row>> hashMap2 = new HashMap<String, JavaRDD<Row>>();
            Dataset<Row> createDataFrame1 = sqlContext.createDataFrame(mapPartitions,
                    schema);
            createDataFrame1.write().mode(SaveMode.Append)
                    .jdbc(url, "emrdataindex_1", connectionProperties);
        }
    
        public static JavaRDD<Row> emrdata(JavaRDD<String> javaRDD) {
            JavaRDD<Row> rowRDD = javaRDD.map(new Function<String, Row>() {
                private static final long serialVersionUID = 1L;
    
                public Row call(String line) throws Exception {
                    // TODO Auto-generated method stub
                    try {
                        ObjectMapper mapper = new ObjectMapper();
                        map_t = mapper.readValue(line, Map.class);
                        for (Entry<String, Object> entry : map_t.entrySet()) {
                            map_s.put(entry.getKey(),
                                    String.valueOf(entry.getValue()));
                        }
    
                    } catch (Exception e) {
                        // TODO: handle exception
                        map_s.put("pvcode", "99999999");
                        map_s.put("remark", line);
                        map_s.put("pcode", "3100|H02");
                        return createEmrdataRow(map_s);
                    }
                    return createEmrdataRow(map_s);
    
                }
            });
    
            return rowRDD;
        }
    
        private static StructType createStructType(String[] fields) {
            LinkedList<StructField> structFieldsList = new LinkedList<StructField>();
            for (String field : fields) {
                structFieldsList.add(DataTypes.createStructField(field,
                        DataTypes.StringType, true));
            }
            StructType schema = DataTypes.createStructType(structFieldsList);
            return schema;
    
        }
    
        private static JavaSparkContext getContext(String master) {
            SparkConf conf = new SparkConf().setAppName("SparkReadkwz").setMaster(
                    master);
            conf.set("spark.scheduler.mode", "FAIR");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            return jsc;
        }
    
        private static void ordRec(String filePath) {
            String master = "spark://192.168.2.258:7077";
            jsc = getContext(master);
            sqlContext = new SQLContext(jsc);
            connectionProperties = new Properties();
            connectionProperties.put("user", user);
            connectionProperties.put("password", password);
            connectionProperties.put("driver", "com.mysql.jdbc.Driver");
            JavaRDD<String> javaRDD = jsc.textFile(filePath);
            String[] ordrecFields = {"pk_ord_record", "pk_dcord", "pk_dcpv", "code_pvtype", "name_pvtype", "pvcode"
                    , "code_ord", "empi", "code_pati", "code_sex", "name_sex", "age", "code_dept", "name_dept", "bed"
                    , "pk_dcordrisreq", "code_req", "code_rep", "code_rep_type", "name_rep_type", "code_eu_type"
                    , "name_eu_type", "code_eu_item", "name_eu_item", "create_time"};
            StructType schema = createStructType(ordrecFields);
            JavaRDD<Row> mapPartitions = javaRDD.map(new Function<String, Row>() {
                private static final long serialVersionUID = 1L;
                ObjectMapper mapper = new ObjectMapper();
    
                public Row call(String line) throws Exception {
                    // TODO Auto-generated method stub
                    try {
                        map_t = mapper.readValue(line, Map.class);
                        for (Entry<String, Object> entry : map_t.entrySet()) {
                            map_s.put(entry.getKey(), String.valueOf(entry.getValue()));
                        }
                    } catch (Exception e) {
                        // TODO: handle exception
                        return null;
                    }
                    return createOrdRecRow(map_s);
                }
            });
    
            //去重
            JavaRDD<Row> distinctRDD = mapPartitions.distinct();
    
            JavaRDD<Row> filterRDD = distinctRDD.filter(new Function<Row, Boolean>() {
                @Override
                public Boolean call(Row row) throws Exception {
                    if (null == row) return false;
                    if ("P".equals(row.getString(3)) || "null".equals(row.getString(3))) return false;
    
                    if (row.getString(2).split("_").length == 1) return false;
                    return true;
                }
            });
    
            //缓存
            JavaRDD<Row> persistRDD = filterRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
            JavaRDD<Row> filterRDD1 = null;
            Dataset<Row> dataFrame1 = null;
            for (final String se : serials
                    ) {
                filterRDD1 = persistRDD.filter(new Function<Row, Boolean>() {
                    @Override
                    public Boolean call(Row row) throws Exception {
                        return row.getString(0).startsWith(se);
                    }
                });
                if (!filterRDD1.isEmpty()) {
                    dataFrame1 = sqlContext.createDataFrame(filterRDD1, schema);
                    dataFrame1.write().mode(SaveMode.Append).jdbc(url, "ord_rec" + se, connectionProperties);
                }
    
            }
            jsc.close();
    
        }
    
        private static void ord(String filePath) {
            String master = "spark://192.168.2.258:7077";
            jsc = getContext(master);
            sqlContext = new SQLContext(jsc);
            connectionProperties = new Properties();
            connectionProperties.put("user", user);
            connectionProperties.put("password", password);
            connectionProperties.put("driver", "com.mysql.jdbc.Driver");
            JavaRDD<String> javaRDD = jsc.textFile(filePath);
            final String[] ordPart2Fields = {"pvcode", "pk_dcpv", "empi",
                    "code_pvtype", "name_pvtype", "code_sex", "name_sex",
                    "birthday", "code_dept", "name_dept", "code_ord",
                    "code_orditem_type", "name_orditem_type", "code_orditem",
                    "name_orditem", "date_create", "date_end", "note_ord",
                    "code_pres", "parent_code", "create_time"};
            StructType schema = createStructType(ordPart2Fields);
            JavaRDD<Row> mapPartitions = javaRDD.map(new Function<String, Row>() {
                private static final long serialVersionUID = 1L;
                ObjectMapper mapper = new ObjectMapper();
    
                public Row call(String line) throws Exception {
                    // TODO Auto-generated method stub
                    try {
                        map_t = mapper.readValue(line, Map.class);
                        for (Map.Entry<String, Object> entry : map_t.entrySet()) {
                            map_s.put(entry.getKey(),
                                    String.valueOf(entry.getValue()));
                        }
                    } catch (Exception e) {
                        // TODO: handle exception
                        return null;
                    }
                    return createOrdRow(map_s);
                }
            });
            JavaRDD<Row> distinctRDD1 = mapPartitions.distinct();
            JavaRDD<Row> mapRDD1 = distinctRDD1.map(new Function<Row, Row>() {
                private static final long serialVersionUID = 1L;
    
                public Row call(Row row) throws Exception {
                    // TODO Auto-generated method stub
                    try {
                        String trimCodeType = row.getString(11).trim();
                        for (int i = 0; i < ordPart2Fields.length; i++) {
                            map_k.put(ordPart2Fields[i], row.getString(i));
                        }
                        map_k.put("code_orditem_type", trimCodeType);
                        return createOrdRow(map_k);
                    } catch (Exception e) {
                        return row;
                    }
                }
            });
            JavaRDD<Row> filterRDD = mapRDD1.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;
    
                public Boolean call(Row row) throws Exception {
                    // TODO Auto-generated method stub
                    if (row == null)
                        return false;
                    if ("P".equals(row.getString(3))) {
                        return false;
                    }
                    return true;
                }
            });
            JavaRDD<Row> persistRDD = filterRDD.persist(StorageLevel
                    .MEMORY_AND_DISK_SER());
            JavaRDD<Row> filter1 = persistRDD.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;
    
                public Boolean call(Row row) throws Exception {
                    return row.getString(0).startsWith("1");
                }
            });
            JavaRDD<Row> filter2 = persistRDD.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;
    
                public Boolean call(Row row) throws Exception {
                    return row.getString(0).startsWith("2");
                }
            });
            JavaRDD<Row> filter3 = persistRDD.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;
    
                public Boolean call(Row row) throws Exception {
                    return row.getString(0).startsWith("31")
                            || row.getString(0).startsWith("32")
                            || row.getString(0).startsWith("33");
                }
            });
            JavaRDD<Row> filter4 = persistRDD.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;
    
                public Boolean call(Row row) throws Exception {
                    return row.getString(0).startsWith("34")
                            || row.getString(0).startsWith("4");
                }
            });
            JavaRDD<Row> filter5 = persistRDD.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;
    
                public Boolean call(Row row) throws Exception {
                    return row.getString(0).startsWith("35")
                            || row.getString(0).startsWith("5");
                }
            });
            JavaRDD<Row> filter6 = persistRDD.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;
    
                public Boolean call(Row row) throws Exception {
                    return row.getString(0).startsWith("36")
                            || row.getString(0).startsWith("6");
                }
            });
            JavaRDD<Row> filter7 = persistRDD.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;
    
                public Boolean call(Row row) throws Exception {
                    return row.getString(0).startsWith("37")
                            || row.getString(0).startsWith("7");
                }
            });
            JavaRDD<Row> filter8 = persistRDD.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;
    
                public Boolean call(Row row) throws Exception {
                    return row.getString(0).startsWith("8");
                }
            });
            JavaRDD<Row> filter9 = persistRDD.filter(new Function<Row, Boolean>() {
                private static final long serialVersionUID = 1L;
    
                public Boolean call(Row row) throws Exception {
                    return row.getString(0).startsWith("38")
                            || row.getString(0).startsWith("39")
                            || row.getString(0).startsWith("30")
                            || row.getString(0).startsWith("9");
                }
            });
            Dataset<Row> createDataFrame1 = sqlContext.createDataFrame(filter1,
                    schema);
            createDataFrame1.write().mode(SaveMode.Append)
                    .jdbc(url, "ord_1", connectionProperties);
            Dataset<Row> createDataFrame2 = sqlContext.createDataFrame(filter2,
                    schema);
            createDataFrame2.write().mode(SaveMode.Append)
                    .jdbc(url, "ord_2", connectionProperties);
            Dataset<Row> createDataFrame3 = sqlContext.createDataFrame(filter3,
                    schema);
            createDataFrame3.write().mode(SaveMode.Append)
                    .jdbc(url, "ord_3", connectionProperties);
            Dataset<Row> createDataFrame4 = sqlContext.createDataFrame(filter4,
                    schema);
            createDataFrame4.write().mode(SaveMode.Append)
                    .jdbc(url, "ord_4", connectionProperties);
            Dataset<Row> createDataFrame5 = sqlContext.createDataFrame(filter5,
                    schema);
            createDataFrame5.write().mode(SaveMode.Append)
                    .jdbc(url, "ord_5", connectionProperties);
            Dataset<Row> createDataFrame6 = sqlContext.createDataFrame(filter6,
                    schema);
            createDataFrame6.write().mode(SaveMode.Append)
                    .jdbc(url, "ord_6", connectionProperties);
            Dataset<Row> createDataFrame7 = sqlContext.createDataFrame(filter7,
                    schema);
            createDataFrame7.write().mode(SaveMode.Append)
                    .jdbc(url, "ord_7", connectionProperties);
            Dataset<Row> createDataFrame8 = sqlContext.createDataFrame(filter8,
                    schema);
            createDataFrame8.write().mode(SaveMode.Append)
                    .jdbc(url, "ord_8", connectionProperties);
    
            Dataset<Row> createDataFrame9 = sqlContext.createDataFrame(filter9,
                    schema);
            createDataFrame9.write().mode(SaveMode.Append)
                    .jdbc(url, "ord_9", connectionProperties);
            jsc.close();
    
        }
    
    
        private static void ordDetail_full(String filePath, int i) {
            String master = "spark://192.168.2.145:7077";
            lalala = SparkSession.builder().config(new SparkConf().setAppName("lalala")).getOrCreate();
            // String master="local[*]";
            SparkContext sparkContext = lalala.sparkContext();
            SQLContext sqlContext = lalala.sqlContext();
    //        SparkReadFile2MysqlFull.sqlContext = new SQLContext(jsc);
            connectionProperties = new Properties();
            connectionProperties.put("user", user);
            connectionProperties.put("password", password);
            connectionProperties.put("driver", "com.mysql.jdbc.Driver");
    //        JavaRDD<String> javaRDD = jsc.textFile(filePath);
            final String[] ordDetailFields = {"pk_rep_lis", "name_index_lis", "value_lis", "name_quanti_unit",
                    "limit_high", "limit_low", "desc_rrs", "value_flag_name", "create_time", "lis_trend"};
            StructType schema = createStructType(ordDetailFields);
    
            Dataset<Row> parquet = lalala.read().parquet(filePath);
    
            Dataset<Row> distinctRDD = parquet.distinct();
            JavaRDD<Row> rowJavaRDD = distinctRDD.toJavaRDD();
    //
            // 先映射一下
            JavaRDD<Row> mapRDD = rowJavaRDD.map(new Function<Row, Row>() {
                public Row call(Row row) throws Exception {
                    for (int i = 0; i < ordDetailFields.length; i++) {
                        map_k.put(ordDetailFields[i], row.getString(i));
                    }
                    String value_lis = row.getString(2);//检验结果
                    String limit_high = row.getString(4);//最高值
                    String limit_low = row.getString(5);//最低值
                    String desc_rrs = row.getString(6);//正常结果区间
                    String value_flag_name = row.getString(7);//箭头
                    String lis_tread = row.getString(9);//检验结果趋势
                    try {
                        if (value_lis.replaceAll("[^\d]+", "").isEmpty() || value_lis.equals("")) {
                            lis_tread = value_lis;
                        } else if (value_lis.replaceAll("[\d]+", "").contains(desc_rrs)) {
                            lis_tread = "正常";
                        } else if (("null".equals(limit_high) || "null".equals(limit_low)) && "↑".equals(value_flag_name)) {
                            lis_tread = "偏高";
                        } else if (("null".equals(limit_high) || "null".equals(limit_low)) && "↓".equals(value_flag_name)) {
                            lis_tread = "偏低";
                        } else if (desc_rrs.contains(":")) {
                            lis_tread = desc_rrs;
                        } else if (!value_lis.replaceAll("[^.\d]+", "").isEmpty()
                                && !limit_low.replaceAll("[^.\d]+", "").isEmpty()
                                && !limit_high.replaceAll("[^.\d]+", "").isEmpty()
                                && getCount(value_lis.replaceAll("[^.\d]+", "")) <= 1
                                && getCount(limit_low.replaceAll("[^.\d]+", "")) <= 1
                                && getCount(limit_high.replaceAll("[^.\d]+", "")) <= 1
                                && !".".equals(value_lis.replaceAll("[^.\d]+", ""))
                                && !".".equals(limit_low.replaceAll("[^.\d]+", ""))
                                && !".".equals(limit_high.replaceAll("[^.\d]+", ""))) {
                            if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) >= Float
                                    .parseFloat(limit_low.replaceAll("[^.\d]+", ""))
                                    && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) <= Float
                                    .parseFloat(limit_high.replaceAll("[^.\d]+", ""))) {
                                lis_tread = "正常";
                            }
                            if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) >= Float
                                    .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.8
                                    && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) < Float
                                    .parseFloat(limit_low.replaceAll("[^.\d]+", ""))) {
                                lis_tread = "偏低";
                            }
                            if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) >= Float
                                    .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.5
                                    && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) < Float
                                    .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.8) {
                                lis_tread = "很低";
                            }
                            if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) < Float
                                    .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.5) {
                                lis_tread = "非常低";
                            }
                            if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) > Float
                                    .parseFloat(limit_high.replaceAll("[^.\d]+", ""))
                                    && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) <= Float
                                    .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.2) {
                                lis_tread = "偏高";
                            }
                            if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) > Float
                                    .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.2
                                    && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) <= Float
                                    .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.5) {
                                lis_tread = "很高";
                            }
                            if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) > Float
                                    .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.5) {
                                lis_tread = "非常高";
                            }
                        } else {
                            lis_tread = "null";
                        }
                    } catch (Exception e) {
                        lis_tread = "null";
                    }
    
                    map_k.put("lis_trend", lis_tread);
                    return createOrdDetailFullRow(map_k);
                }
            });
            Dataset<Row> dataFrame = sqlContext.createDataFrame(mapRDD, schema);
            dataFrame.repartition(1).write().orc("hdfs://192.168.2.232:9000/datas/parquetFile/ord_detail_second/part" + i);
            sparkContext.stop();
            lalala.stop();
        }
    
        private static void ordDetail_split(String filePath) {
            String master = "spark://192.168.2.258:7077";
            // String master="local[*]";
            jsc = getContext(master);
            sqlContext = new SQLContext(jsc);
            connectionProperties = new Properties();
            connectionProperties.put("user", user);
            connectionProperties.put("password", password);
            connectionProperties.put("driver", "com.mysql.jdbc.Driver");
            JavaRDD<String> javaRDD = jsc.textFile(filePath);
            final String[] ordDetailFields = {"pk_rep_lis", "name_index_lis", "value_lis", "name_quanti_unit",
                    "limit_high", "limit_low", "desc_rrs", "value_flag_name", "create_time", "lis_all", "lis_result"};
            StructType schema = createStructType(ordDetailFields);
            JavaRDD<Row> mapPartitions = javaRDD.map(new Function<String, Row>() {
                private static final long serialVersionUID = 1L;
                ObjectMapper mapper = new ObjectMapper();
    
                public Row call(String line) throws Exception {
                    try {
                        map_t = mapper.readValue(line, Map.class);
                        for (Entry<String, Object> entry : map_t.entrySet()) {
                            map_s.put(entry.getKey(),
                                    String.valueOf(entry.getValue()));
                        }
                    } catch (Exception e) {
                        return null;
                    }
                    return createOrdDetailSpiltRow(map_s);
                }
            });
    
            JavaRDD<Row> filterRDD = mapPartitions
                    .filter(new Function<Row, Boolean>() {
                        private static final long serialVersionUID = 1L;
    
                        public Boolean call(Row row) throws Exception {
                            // TODO Auto-generated method stub
                            if (null == row) {
                                return false;
                            }
                            return true;
                        }
                    });
            JavaRDD<Row> distinctRDD = filterRDD.distinct();
    
            // 先映射一下
            JavaRDD<Row> mapRDD = distinctRDD.map(new Function<Row, Row>() {
    
                public Row call(Row row) throws Exception {
                    for (int i = 0; i < ordDetailFields.length; i++) {
                        map_k.put(ordDetailFields[i], row.getString(i));
                    }
                    String name_index_lis = row.getString(1);//检验项目
                    String value_lis = row.getString(2);//检验结果
                    String name_quanti_unit = row.getString(3);//结果单位
                    String limit_high = row.getString(4);//最高值
                    String limit_low = row.getString(5);//最低值
                    String desc_rrs = row.getString(6);//正常结果区间
                    String value_flag_name = row.getString(7);//箭头
                    String result = row.getString(10);//检验结果趋势result
                    if (value_lis.replaceAll("[^\d]+", "").isEmpty() && "null".equals(desc_rrs)) {
                        value_flag_name = value_lis;
                    } else if (value_lis.replaceAll("[\d]+", "").contains(desc_rrs)) {
                        value_flag_name = "正常";
                    } else if (("null".equals(limit_high) || "null".equals(limit_low)) && "↑".equals(value_flag_name)) {
                        value_flag_name = "偏高";
                    } else if (("null".equals(limit_high) || "null".equals(limit_low)) && "↓".equals(value_flag_name)) {
                        value_flag_name = "偏低";
                    } else if (desc_rrs.contains(":")) {
                        value_flag_name = desc_rrs;
                    } else if (!value_lis.replaceAll("[^.\d]+", "").isEmpty()
                            && !limit_low.replaceAll("[^.\d]+", "").isEmpty()
                            && !limit_high.replaceAll("[^.\d]+", "").isEmpty()
                            && getCount(value_lis.replaceAll("[^.\d]+", "")) <= 1
                            && getCount(limit_low.replaceAll("[^.\d]+", "")) <= 1
                            && getCount(limit_high.replaceAll("[^.\d]+", "")) <= 1
                            && !".".equals(value_lis.replaceAll("[^.\d]+", ""))
                            && !".".equals(limit_low.replaceAll("[^.\d]+", ""))
                            && !".".equals(limit_high.replaceAll("[^.\d]+", ""))) {
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) >= Float
                                .parseFloat(limit_low.replaceAll("[^.\d]+", ""))
                                && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) <= Float
                                .parseFloat(limit_high.replaceAll("[^.\d]+", ""))) {
                            value_flag_name = "正常";
                        }
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) >= Float
                                .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.8
                                && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) < Float
                                .parseFloat(limit_low.replaceAll("[^.\d]+", ""))) {
                            value_flag_name = "偏低";
                        }
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) >= Float
                                .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.5
                                && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) < Float
                                .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.8) {
                            value_flag_name = "很低";
                        }
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) < Float
                                .parseFloat(limit_low.replaceAll("[^.\d]+", "")) * 0.5) {
                            value_flag_name = "太低了";
                        }
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) > Float
                                .parseFloat(limit_high.replaceAll("[^.\d]+", ""))
                                && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) <= Float
                                .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.2) {
                            value_flag_name = "偏高";
                        }
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) > Float
                                .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.2
                                && Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) <= Float
                                .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.5) {
                            value_flag_name = "很高";
                        }
                        if (Float.parseFloat(value_lis.replaceAll("[^.\d]+", "")) > Float
                                .parseFloat(limit_high.replaceAll("[^.\d]+", "")) * 1.5) {
                            value_flag_name = "太高了";
                        }
                    } else {
                        value_flag_name = "null";
                    }
                    StringBuffer bf = new StringBuffer();
                    bf.append("[ ");
                    bf.append(name_index_lis);
                    bf.append(" :");
                    bf.append(value_lis);
                    bf.append(name_quanti_unit);
                    bf.append("	");
                    bf.append("区间 :");
                    bf.append(desc_rrs);
                    bf.append("	");
                    bf.append("趋势 :");
                    bf.append(value_flag_name);
                    bf.append(" ]");
                    result = bf.toString();
    //                map_k.put("lis_all",name_index_lis);
                    map_k.put("lis_result", result);
                    return createOrdDetailSpiltRow(map_k);
                }
            });
    
            JavaPairRDD<String, Row> mapToPairRDD = mapRDD
                    .mapToPair(new PairFunction<Row, String, Row>() {
                        private static final long serialVersionUID = 1L;
    
                        public Tuple2<String, Row> call(Row row) throws Exception {
                            // TODO Auto-generated method stub
                            return new Tuple2<String, Row>(row.getString(0), row);
                        }
                    });
            // 分组进行计算
            JavaPairRDD<String, Row> reduceByKeyRDD = mapToPairRDD
                    .reduceByKey(new Function2<Row, Row, Row>() {
                        private static final long serialVersionUID = 1L;
    
                        public Row call(Row v1, Row v2) throws Exception {
                            // TODO Auto-generated method stub
                            String lis_all1 = v1.getString(9);
                            String lis_all2 = v1.getString(9);
                            String result1 = v1.getString(10);
                            String result2 = v2.getString(10);
                            for (int i = 0; i < ordDetailFields.length; i++) {
                                map_r.put(ordDetailFields[i], v1.getString(i));
                            }
    //                        map_r.put("lis_all",lis_all1+"
    "+lis_all2);
                            map_r.put("lis_result", result1 + "
    " + result2);
                            return createOrdDetailSpiltRow(map_r);
                        }
                    });
            JavaRDD<Row> map = reduceByKeyRDD.map(new Function<Tuple2<String, Row>, Row>() {
                        private static final long serialVersionUID = 1L;
                        public Row call(Tuple2<String, Row> v1) throws Exception {
                            // TODO Auto-generated method stub
                            return v1._2();
                        }
                    });
            JavaRDD<Row> persistRDD = map.persist(StorageLevel.MEMORY_AND_DISK_SER());
            JavaRDD<Row> filter = null;
            Dataset<Row> createDataFrame = null;
            for (final String se : serials) {
                filter = persistRDD.filter(new Function<Row, Boolean>() {
                    private static final long serialVersionUID = 1L;
    
                    public Boolean call(Row v1) throws Exception {
                        // TODO Auto-generated method stub
                        return v1.getString(0).startsWith(se);
                    }
                });
                if (!filter.isEmpty()) {
                    createDataFrame = sqlContext.createDataFrame(filter, schema);
                    createDataFrame.write().mode(SaveMode.Append)
                            .jdbc(url, "ord_detail" + se, connectionProperties);
                }
            }
            jsc.close();
    
        }
    
        public static Row createOrdDetailSpiltRow(Map<String, String> map) {
            return RowFactory
                    .create(map.get("pk_rep_lis"),
                            map.get("name_index_lis"), map.get("value_lis"),
                            map.get("name_quanti_unit"), map.get("limit_high"),
                            map.get("limit_low"), map.get("desc_rrs"),
                            map.get("value_flag_name"),
                            map.get("create_time"),
                            map.get("lis_all"),
                            map.get("lis_result"));
    
        }
    
        public static Row createOrdDetailFullRow(Map<String, String> map) {
            return RowFactory
                    .create(map.get("pk_rep_lis"),
                            map.get("name_index_lis"), map.get("value_lis"),
                            map.get("name_quanti_unit"), map.get("limit_high"),
                            map.get("limit_low"), map.get("desc_rrs"),
                            map.get("value_flag_name"),
                            map.get("create_time"), map.get("lis_trend"));
    
        }
    
        public static Row createEmrdataRow(Map<String, String> map) {
            return RowFactory.create(map.get("code_group"), map.get("code_org"),
                    map.get("code_pati"), map.get("create_time"),
                    map.get("data_source"), map.get("degree"),
                    map.get("edit_time"), map.get("empi"), map.get("flag_del"),
                    map.get("inputfield"), map.get("pcode"), map.get("phrcd"),
                    map.get("pk_dcemr"), map.get("pk_rec_data"), map.get("pname"),
                    map.get("prtype"), map.get("punit"), map.get("pvcode"),
                    map.get("rdn"), map.get("relardn"), map.get("remark"),
                    map.get("setno"), map.get("source_pk"), map.get("stay"),
                    map.get("valuetype"));
        }
    
        public static Row createEmerIndexRow(Map<String, String> map) {
            return RowFactory.create(map.get("pk_dcemr"), map.get("pk_dcpv"),
                    map.get("empi"), map.get("code_pati"), map.get("code_pvtype"),
                    map.get("code_ref_emr"), map.get("code_emr_type"),
                    map.get("name_emr_type"), map.get("prlog_rdn"),
                    map.get("code_dept_emr"), map.get("name_dept_emr"),
                    map.get("code_psn_edit"), map.get("data_source"),
                    map.get("source_pk"), map.get("create_time"));
        }
    
        public static Row createOrdRow(Map<String, String> map) {
            return RowFactory.create(
                    map.get("pvcode"),
                    map.get("pk_dcpv"),
                    map.get("empi"),
                    map.get("code_pvtype"),
                    map.get("name_pvtype"),
                    map.get("code_sex"),
                    map.get("name_sex"),
                    map.get("birthday"),
                    map.get("code_dept"),
                    map.get("name_dept"),
                    map.get("code_ord"),
                    map.get("code_orditem_type"),
                    map.get("name_orditem_type"),
                    map.get("code_orditem"),
                    map.get("name_orditem"),
                    map.get("date_create"),
                    map.get("date_end"),
                    map.get("note_ord"),
                    map.get("code_pres"),
                    map.get("parent_code"),
                    map.get("create_time"));
    
        }
    
        public static Row createOrdRecRow(Map<String, String> map) {
            return RowFactory.create(
                    map.get("pk_ord_record"),
                    map.get("pk_dcord"),
                    map.get("pk_dcpv"),
                    map.get("code_pvtype"),
                    map.get("name_pvtype"),
                    map.get("pvcode"),
                    map.get("code_ord"),
                    map.get("empi"),
                    map.get("code_pati"),
                    map.get("code_sex"),
                    map.get("name_sex"),
                    map.get("age"),
                    map.get("code_dept"),
                    map.get("name_dept"),
                    map.get("bed"),
                    map.get("pk_dcordrisreq"),
                    map.get("code_req"),
                    map.get("code_rep"),
                    map.get("code_rep_type"),
                    map.get("name_rep_type"),
                    map.get("code_eu_type"),
                    map.get("name_eu_type"),
                    map.get("code_eu_item"),
                    map.get("name_eu_item"),
                    map.get("create_time"));
    
        }
    
        private static Integer getCount(String source) {
            String replace = source.replace(".", "");
            return source.length() - replace.length();
        }
    
    
    }
  • 相关阅读:
    vue 组件的封装
    原生tab选项卡
    vue 登录验证码
    input type=”file“ change事件只执行一次的问题
    Java容器解析系列(13) WeakHashMap详解
    Java容器解析系列(12) LinkedHashMap 详解
    Java容器解析系列(11) HashMap 详解
    Java泛型之自限定类型
    java Reference
    Thread类源码解析
  • 原文地址:https://www.cnblogs.com/kwzblog/p/10180436.html
Copyright © 2011-2022 走看看