Spark Notes (16): SparkStreaming On HDFS and to MySQL

    This test uses the local file system in place of HDFS. To test monitoring an HDFS directory, uncomment the HDFS paths and substitute your own HDFS cluster.

    Spark Streaming only picks up files newly added to the monitored directory; it does not detect deletion or modification of existing files.

    SparkStreaming On HDFS

    Code 1: monitor the directory for newly added data and print it to the console

    /**
     * Spark standalone or Mesos with cluster deploy mode only:
     * add the --supervise option when submitting the application; if the Driver
     * dies, a new Driver is started automatically.
     */
    public class SparkStreamingOnHDFS {
        public static void main(String[] args) {
            final SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkStreamingOnHDFS");
            
    //        final String checkpointDirectory = "hdfs://node1:9000/spark/SparkStreaming/CheckPoint2017";
            final String checkpointDirectory = "./checkpoint";
            
            JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
                @Override
                public JavaStreamingContext create() {  
                    return createContext(checkpointDirectory,conf);
                }
            };
            /**
             * Get the JavaStreamingContext: first try to recover it from the
             * specified checkpoint directory; if nothing can be recovered,
             * create a new one through the factory.
             */
            JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
            jsc.start();
            jsc.awaitTermination();
            jsc.close();
        }
    
    //    @SuppressWarnings("deprecation")
        private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf) {
    
        // If you do not see this printed, the StreamingContext has been
        // loaded from the existing checkpoint.
        System.out.println("Creating new context");
        SparkConf sparkConf = conf;
        // Create the context with a 5 second batch interval
    
            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
    //        ssc.sparkContext().setLogLevel("WARN");
        /**
         * The checkpoint saves:
         *   1. configuration information
         *   2. the DStream operation logic
         *   3. job execution progress
         *   4. offsets
         */
            ssc.checkpoint(checkpointDirectory);
            
        /**
         * Monitors a directory (on HDFS or locally) for changes in its set of files.
         * Appends to an existing file are not detected; only newly added files are
         * picked up, while deleted files and modified file contents go unnoticed.
         */
    //        JavaDStream<String> lines = ssc.textFileStream("hdfs://node1:9000/spark/sparkstreaming");
            JavaDStream<String> lines = ssc.textFileStream("./data");
             
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String s) {
                    return Arrays.asList(s.split(" "));
                }
            });
            
    
            JavaPairDStream<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s.trim(), 1);
                }
            });
    
            JavaPairDStream<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });
            
            counts.print();
    //        counts.filter(new Function<Tuple2<String,Integer>, Boolean>() {
    //
    //            /**
    //             * 
    //             */
    //            private static final long serialVersionUID = 1L;
    //
    //            @Override
    //            public Boolean call(Tuple2<String, Integer> v1) throws Exception {
    //                System.out.println("*************************");
    //                return true;
    //            }
    //        }).print();
            return ssc;
        }
    }
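
    Note: JavaStreamingContextFactory is deprecated in later Spark 1.x releases (hence the commented-out @SuppressWarnings above). A minimal sketch of the same getOrCreate call using org.apache.spark.api.java.function.Function0 instead, assuming Spark 1.4+ and that the createContext helper above is made accessible rather than private:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function0;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class SparkStreamingOnHDFSWithFunction0 {
        public static void main(String[] args) {
            final SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkStreamingOnHDFS");
            final String checkpointDirectory = "./checkpoint";

            // Function0 replaces the deprecated JavaStreamingContextFactory.
            Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
                @Override
                public JavaStreamingContext call() throws Exception {
                    // createContext is the helper defined in the class above
                    // (it must be made non-private to be called from here).
                    return SparkStreamingOnHDFS.createContext(checkpointDirectory, conf);
                }
            };

            JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, creatingFunc);
            jsc.start();
            jsc.awaitTermination();
            jsc.close();
        }
    }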

    Code 2: this program continuously writes data into the target directory

    /**
     * This file-copying program simulates dynamically generating txt files of the same
     * format in the data directory, to provide input for textFileStream in Spark Streaming.
     * @author root
     */
    public class CopyFile_data {
        public static void main(String[] args) throws IOException, InterruptedException {
            while(true){
                Thread.sleep(5000);
                String uuid = UUID.randomUUID().toString();
                System.out.println(uuid);
            copyFile(new File("words.txt"), new File("./data/" + uuid + "----words.txt"));
            }
        }
    
        public static void copyFile(File fromFile, File toFile) throws IOException {
            FileInputStream ins = new FileInputStream(fromFile);
            FileOutputStream out = new FileOutputStream(toFile);
        byte[] b = new byte[1024 * 1024];
        int n;
        while ((n = ins.read(b)) != -1) {
            out.write(b, 0, n); // write only the bytes actually read
        }
    
            ins.close();
            out.close();
        }
    }
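
    textFileStream only reliably picks up files that appear in the monitored directory atomically. Copying directly into ./data while the stream runs, as above, risks a batch reading a half-written file. A common workaround is to write the file elsewhere on the same filesystem and then move it into the monitored directory in one step; the sketch below is one illustrative way to do that (the class and temp-file names are not from the original post):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    import java.util.UUID;

    public class CopyFile_data_atomic {
        public static void main(String[] args) throws IOException, InterruptedException {
            Path source = Paths.get("words.txt");
            Path targetDir = Paths.get("./data");   // directory watched by textFileStream
            Files.createDirectories(targetDir);
            while (true) {
                Thread.sleep(5000);
                String uuid = UUID.randomUUID().toString();
                // 1. Copy to a temp file outside the monitored directory,
                //    on the same filesystem so the move can be atomic.
                Path tmp = Files.createTempFile(Paths.get("."), "words-", ".tmp");
                Files.copy(source, tmp, StandardCopyOption.REPLACE_EXISTING);
                // 2. Move it into the monitored directory in a single step.
                Files.move(tmp, targetDir.resolve(uuid + "----words.txt"),
                        StandardCopyOption.ATOMIC_MOVE);
            }
        }
    }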

    SparkStreaming On HDFS AND TO MySQL

    Code 1: monitor the directory and write the results to MySQL

    public class SparkStreamingOnHDFSToMySQL {
        public static void main(String[] args) {
         
            final SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("SparkStreamingOnHDFS");
            // JavaStreamingContext jsc = new JavaStreamingContext(conf,
            // Durations.seconds(5));
            final String checkpointDirectory = "hdfs://node1:9000/spark/SparkStreaming/CheckPoint2017";
            JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
                @Override
                public JavaStreamingContext create() {
                    return createContext(checkpointDirectory,conf);
                }
            };
             
            JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
            jsc.start();
            jsc.awaitTermination();
            jsc.close();
        }
    
        private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf) {
    
        // If you do not see this printed, the StreamingContext has been
        // loaded from the existing checkpoint.
        System.out.println("Creating new context");
        SparkConf sparkConf = conf;
        // Create the context with a 5 second batch interval
            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
            ssc.checkpoint(checkpointDirectory);
            
            JavaDStream<String> lines = ssc.textFileStream("hdfs://node1:9000/output/");
             
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String s) {
                    return Arrays.asList(s.split(" "));
                }
            });
    
            JavaPairDStream<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
    
            JavaPairDStream<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });
            
            counts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
                
                private static final long serialVersionUID = 1L;
                
                @Override
                public void call(JavaPairRDD<String, Integer> pairRdd) throws Exception {
                    
                    pairRdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Integer>>>() {
                         
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public void call(Iterator<Tuple2<String, Integer>> vs) throws Exception {
                            
                            JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
                        
                            List<Object[]> insertParams = new ArrayList<Object[]>();
                            while(vs.hasNext()){
                                Tuple2<String, Integer> next = vs.next();
                                insertParams.add(new Object[]{next._1,next._2});
                            }
                            System.out.println(insertParams);
                            jdbcWrapper.doBatch("INSERT INTO wordcount VALUES(?,?)", insertParams);
                        }
                    });
                    
                }
            });
            counts.print();
            return ssc;
        }
    
    }
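
    The INSERT INTO wordcount VALUES(?,?) above assumes a two-column wordcount table already exists in the sparkstreaming database; the post does not show its schema. A minimal sketch of a table that would accept the (word, count) pairs, created over plain JDBC, with column names that are assumptions:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CreateWordcountTable {
        public static void main(String[] args) throws Exception {
            Class.forName("com.mysql.jdbc.Driver");
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://node2:3306/sparkstreaming", "root", "123456");
                 Statement stmt = conn.createStatement()) {
                // Only the column count matters for "INSERT INTO wordcount VALUES(?,?)";
                // the column names here are illustrative.
                stmt.executeUpdate("CREATE TABLE IF NOT EXISTS wordcount ("
                        + "word VARCHAR(255), "
                        + "`count` INT)");
            }
        }
    }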

    Code 2: write data into the target directory

    Same as above.

    Code 3: JDBC wrapper

    class JDBCWrapper implements Serializable { 
        
        /**
         * 
         */
        private static final long serialVersionUID = 1L;
    
        public static void main(String[] args) {
            JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
            Object[] obj = new Object[]{"1","1","1","1"};
            List<Object[]> list = new ArrayList<Object[]>();
            list.add(obj);
            jdbcWrapper.doBatch("INSERT INTO result VALUES(?,?,?,?)", list);
        }
    
        private static JDBCWrapper jdbcInstance = null;
        private static LinkedBlockingQueue<Connection> dbConnectionPool = new LinkedBlockingQueue<Connection>();
    
        static {
            try {
                Class.forName("com.mysql.jdbc.Driver");
            } catch (ClassNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        public static JDBCWrapper getJDBCInstance() {
            if (jdbcInstance == null) {
    
                synchronized (JDBCWrapper.class) {
                    if (jdbcInstance == null) {
                        jdbcInstance = new JDBCWrapper();
                    }
                }
            }
    
            return jdbcInstance;
        }
     
        private JDBCWrapper() {
    
            for (int i = 0; i < 10; i++) {
    
                try {
                    Connection conn = DriverManager.getConnection("jdbc:mysql://node2:3306/sparkstreaming", "root",
                            "123456");
                    dbConnectionPool.put(conn);
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
    
            }
    
        }
        
        public synchronized Connection getConnection() {
            while (0 == dbConnectionPool.size()) {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
     
            return dbConnectionPool.poll();
        }
    
        public int[] doBatch(String sqlText, List<Object[]> paramsList) {
            Connection conn = getConnection();
            PreparedStatement preparedStatement = null;
            int[] result = null;
            try {
                conn.setAutoCommit(false);
                preparedStatement = conn.prepareStatement(sqlText);
    
                for (Object[] parameters : paramsList) {
                    for (int i = 0; i < parameters.length; i++) {
                        preparedStatement.setObject(i + 1, parameters[i]);
                    }
    
                    preparedStatement.addBatch();
                }
    
                result = preparedStatement.executeBatch();
    
                conn.commit();
    
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
    
                if (conn != null) {
                    try {
                        dbConnectionPool.put(conn);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
    
            return result;
        }
    
        public void doQuery(String sqlText, Object[] paramsList, ExecuteCallBack callBack) {
    
            Connection conn = getConnection();
            PreparedStatement preparedStatement = null;
            ResultSet result = null;
            try {
    
                preparedStatement = conn.prepareStatement(sqlText);
    
                if(paramsList!=null){
                    for (int i = 0; i < paramsList.length; i++) {
                        preparedStatement.setObject(i + 1, paramsList[i]);
                    }
                }
    
                result = preparedStatement.executeQuery();
    
                callBack.resultCallBack(result);
    
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
    
                if (conn != null) {
                    try {
                        dbConnectionPool.put(conn);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
    
        }
    }
    
    interface ExecuteCallBack {
        void resultCallBack(ResultSet result) throws Exception;
    }
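
    doQuery and the ExecuteCallBack interface are defined but never used by the streaming job above. A minimal usage sketch that reads back rows from the wordcount table (the query text and the example word are assumptions):

    import java.sql.ResultSet;

    public class QueryWordcount {
        public static void main(String[] args) {
            JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
            // Look up one word and print every matching row through the
            // ExecuteCallBack defined above.
            jdbcWrapper.doQuery("SELECT * FROM wordcount WHERE word = ?",
                    new Object[]{"hello"},
                    new ExecuteCallBack() {
                        @Override
                        public void resultCallBack(ResultSet result) throws Exception {
                            while (result.next()) {
                                System.out.println(result.getString(1) + " -> " + result.getInt(2));
                            }
                        }
                    });
        }
    }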
    Original post: https://www.cnblogs.com/kpsmile/p/10484204.html