本测试使用本地文件系统代替 HDFS;如需测试对 HDFS 目录的监控,请取消代码中相应行的注释,并把地址换成自己的 HDFS 集群地址。
Spark Streaming 只会监控该文件夹下新增的文件,不会感知已有文件的删除或修改(包括向已有文件追加内容)。
SparkStreaming On HDFS
代码 1:监控文件夹下新增的文件,统计其中单词的出现次数并打印到控制台上
/**
 * Word count over files that appear in a monitored directory; each batch's counts
 * are printed to the console.
 *
 * <p>The StreamingContext is restored from the checkpoint directory when a checkpoint
 * exists there; otherwise a new one is built by {@link #createContext}.
 *
 * <p>Spark standalone or Mesos with cluster deploy mode only: pass the
 * {@code --supervise} option when submitting the application so that a Driver that
 * dies is restarted automatically.
 *
 * <p>NOTE(review): the function objects below are anonymous classes whose serialized
 * form ends up in the checkpoint. Adding, removing or reordering anonymous classes
 * changes their generated names (SparkStreamingOnHDFS$1, $2, ...), so after editing
 * this class the checkpoint directory presumably has to be cleared before restarting.
 */
public class SparkStreamingOnHDFS {

    /**
     * Entry point: obtains the streaming context (restored from the checkpoint or
     * freshly created), starts it and blocks until it terminates.
     */
    public static void main(String[] args) {
        final SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkStreamingOnHDFS");
        // To keep the checkpoint on HDFS instead of the local file system, use this line instead:
        // final String checkpointDirectory = "hdfs://node1:9000/spark/SparkStreaming/CheckPoint2017";
        final String checkpointDirectory = "./checkpoint";
        // Invoked by getOrCreate only when no checkpoint can be restored.
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                return createContext(checkpointDirectory,conf);
            }
        };
        /*
         * Obtain the JavaStreamingContext: first try to restore it from the given
         * checkpoint directory; if nothing can be restored, create it via the factory.
         */
        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }

    /**
     * Builds a new streaming context together with its DStream graph.
     *
     * @param checkpointDirectory directory in which the context checkpoints its state
     * @param conf                Spark configuration for the new context
     * @return the configured (not yet started) streaming context
     */
    // @SuppressWarnings("deprecation")
    private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf) {
        // If you do not see this printed, that means the StreamingContext has
        // been loaded from the checkpoint.
        System.out.println("Creating new context");
        SparkConf sparkConf = conf;
        // Create the context with a 5 second batch interval.
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        // ssc.sparkContext().setLogLevel("WARN");
        /*
         * The checkpoint stores:
         * 1. the configuration
         * 2. the DStream operations (the processing logic)
         * 3. the progress of the jobs
         * 4. offsets (NOTE(review): relevant for offset-based sources such as Kafka,
         *    presumably not for this file stream)
         */
        ssc.checkpoint(checkpointDirectory);
        /*
         * Monitors a directory (on HDFS in the commented-out line, ./data here) for new
         * files. Only newly added files are picked up: content appended to an existing
         * file, deleted files and modified files are not detected.
         */
        // JavaDStream<String> lines = ssc.textFileStream("hdfs://node1:9000/spark/sparkstreaming");
        JavaDStream<String> lines = ssc.textFileStream("./data");
        // Split every line on single spaces. NOTE: String.split keeps empty strings
        // produced by leading/consecutive spaces, so "" can show up as a counted word.
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            // Spark serializes this function, hence the explicit serialVersionUID.
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });
        // Pair each trimmed word with a count of 1.
        JavaPairDStream<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s.trim(), 1);
            }
        });
        // Sum the counts per word within each batch (not a running total across batches).
        JavaPairDStream<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        counts.print();
        // Disabled debugging variant that logs a marker line for every record:
        // counts.filter(new Function<Tuple2<String,Integer>, Boolean>() {
        //
        //     private static final long serialVersionUID = 1L;
        //
        //     @Override
        //     public Boolean call(Tuple2<String, Integer> v1) throws Exception {
        //         System.out.println("*************************");
        //         return true;
        //     }
        // }).print();
        return ssc;
    }
}
代码 2:每隔 5 秒向目标文件夹(./data)写入一个新文件,源源不断地为代码 1 模拟输入数据
/**
 * Simulates a continuously growing input for Spark Streaming's {@code textFileStream}:
 * every 5 seconds it copies {@code words.txt} into the {@code data} directory under a new,
 * unique file name, so every generated file has the same format.
 *
 * @author root
 */
public class CopyFile_data {

    /** Directory watched by the streaming job ({@code textFileStream("./data")}). */
    private static final File DATA_DIR = new File("data");

    /** Template file whose contents are copied on every tick. */
    private static final File SOURCE_FILE = new File("words.txt");

    /** Size of the copy buffer in bytes. */
    private static final int BUFFER_SIZE = 8192;

    /**
     * Runs forever, producing one new file in {@code data/} every 5 seconds.
     *
     * @throws IOException          if the source cannot be read or a file cannot be written/moved
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        // Make sure the watched directory exists; otherwise every copy would fail.
        DATA_DIR.mkdirs();
        while (true) {
            Thread.sleep(5000);
            String uuid = UUID.randomUUID().toString();
            System.out.println(uuid);
            // Write the copy outside the watched directory first and then rename it in,
            // so the streaming job never picks up a half-written file.
            File staging = new File(uuid + "----words.txt.tmp");
            File target = new File(DATA_DIR, uuid + "----words.txt");
            copyFile(SOURCE_FILE, staging);
            if (!staging.renameTo(target)) {
                throw new IOException("Could not move " + staging + " to " + target);
            }
        }
    }

    /**
     * Copies the complete contents of {@code fromFile} into {@code toFile}, creating
     * {@code toFile} or truncating it if it already exists.
     *
     * @param fromFile file to read
     * @param toFile   file to (over)write
     * @throws IOException if either file cannot be opened or an I/O error occurs
     */
    public static void copyFile(File fromFile, File toFile) throws IOException {
        try (FileInputStream ins = new FileInputStream(fromFile);
             FileOutputStream out = new FileOutputStream(toFile)) {
            byte[] buffer = new byte[BUFFER_SIZE];
            int n;
            while ((n = ins.read(buffer)) != -1) {
                // Write only the bytes actually read, not the whole buffer.
                out.write(buffer, 0, n);
            }
        }
    }
}
SparkStreaming On HDFS AND TO MySQL
代码 1:监控 HDFS 文件夹中新增的文件,统计单词出现次数并写入 MySQL
/**
 * Word count over files that appear in an HDFS directory; each batch's counts are
 * printed and inserted into the MySQL table {@code wordcount} through {@link JDBCWrapper}.
 *
 * <p>The StreamingContext is restored from the HDFS checkpoint directory when a
 * checkpoint exists there; otherwise a new one is built by {@link #createContext}.
 *
 * <p>NOTE(review): the anonymous function classes below are serialized into the
 * checkpoint; adding, removing or reordering them changes their generated names, so
 * the checkpoint directory presumably has to be cleared after editing this class.
 */
public class SparkStreamingOnHDFSToMySQL {

    /**
     * Entry point: obtains the streaming context (restored or new), starts it and
     * blocks until it terminates.
     */
    public static void main(String[] args) {
        // NOTE(review): the app name equals the one used by SparkStreamingOnHDFS.
        final SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("SparkStreamingOnHDFS");
        // JavaStreamingContext jsc = new JavaStreamingContext(conf,
        // Durations.seconds(5));
        // NOTE(review): SparkStreamingOnHDFS has this same path in a commented-out line;
        // two different jobs must not share one checkpoint directory.
        final String checkpointDirectory = "hdfs://node1:9000/spark/SparkStreaming/CheckPoint2017";
        // Invoked by getOrCreate only when no checkpoint can be restored.
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                return createContext(checkpointDirectory,conf);
            }
        };
        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }

    /**
     * Builds a new streaming context together with its DStream graph.
     *
     * @param checkpointDirectory directory in which the context checkpoints its state
     * @param conf                Spark configuration for the new context
     * @return the configured (not yet started) streaming context
     */
    private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf) {
        // If you do not see this printed, that means the StreamingContext has
        // been loaded from the checkpoint.
        System.out.println("Creating new context");
        SparkConf sparkConf = conf;
        // Create the context with a 5 second batch interval.
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        ssc.checkpoint(checkpointDirectory);
        // Only files newly added to this HDFS directory are processed.
        JavaDStream<String> lines = ssc.textFileStream("hdfs://node1:9000/output/");
        // Split every line on single spaces (empty strings from repeated spaces are kept).
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            // Spark serializes this function, hence the explicit serialVersionUID.
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });
        // Pair each word (untrimmed, unlike SparkStreamingOnHDFS) with a count of 1.
        JavaPairDStream<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // Sum the counts per word within each batch.
        JavaPairDStream<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        // For every batch, write the (word, count) pairs to MySQL with one batch insert per partition.
        counts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaPairRDD<String, Integer> pairRdd) throws Exception {
                pairRdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Integer>>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Iterator<Tuple2<String, Integer>> vs) throws Exception {
                        // The wrapper is looked up inside the partition function (a per-JVM
                        // singleton) instead of being captured from the enclosing scope.
                        JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
                        List<Object[]> insertParams = new ArrayList<Object[]>();
                        while(vs.hasNext()){
                            Tuple2<String, Integer> next = vs.next();
                            insertParams.add(new Object[]{next._1,next._2});
                        }
                        // NOTE(review): this prints Object[] identities ([Ljava.lang.Object;@...),
                        // not the word/count values.
                        System.out.println(insertParams);
                        // Assumes wordcount has exactly two columns (word, count) — TODO confirm schema.
                        // Rows are appended on every batch, so the table accumulates per-batch counts;
                        // doBatch is also called for empty partitions, and its result is ignored.
                        jdbcWrapper.doBatch("INSERT INTO wordcount VALUES(?,?)", insertParams);
                    }
                });
            }
        });
        counts.print();
        return ssc;
    }
}
代码 2:向目标文件夹中写入数据
同上(与第一部分的代码 2 相同)。注意:那段代码写入的是本地 ./data 目录,而本部分的代码 1 监控的是 hdfs://node1:9000/output/,运行前需要把两者的目录改成一致。
代码 3:JDBC 工具类(简单的连接池,以及批量写入和查询方法)
/**
 * Minimal JDBC helper backed by a small pool of MySQL connections that is shared by
 * every caller in the same JVM (the pool is static and the wrapper is a lazily
 * created singleton).
 *
 * <p>Errors are handled best-effort: they are printed and the call returns
 * ({@link #doBatch} returns {@code null}); they are not rethrown to the caller.
 */
class JDBCWrapper implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Number of connections opened when the singleton is created. */
    private static final int POOL_SIZE = 10;

    // NOTE(review): connection URL and credentials are hard-coded; consider moving them
    // to configuration.
    private static final String JDBC_URL = "jdbc:mysql://node2:3306/sparkstreaming";
    private static final String JDBC_USER = "root";
    private static final String JDBC_PASSWORD = "123456";

    /** Manual smoke test: inserts one row into the {@code result} table. */
    public static void main(String[] args) {
        JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
        Object[] obj = new Object[]{"1", "1", "1", "1"};
        List<Object[]> list = new ArrayList<Object[]>();
        list.add(obj);
        jdbcWrapper.doBatch("INSERT INTO result VALUES(?,?,?,?)", list);
    }

    // volatile is required for the double-checked locking in getJDBCInstance to be safe.
    private static volatile JDBCWrapper jdbcInstance = null;

    /** Idle connections; borrowers block on it until one becomes available. */
    private static final LinkedBlockingQueue<Connection> dbConnectionPool = new LinkedBlockingQueue<Connection>();

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            // Best-effort: without the driver every getConnection call below fails.
            e.printStackTrace();
        }
    }

    /**
     * Returns the per-JVM singleton, creating it (and opening the pool) on first use.
     */
    public static JDBCWrapper getJDBCInstance() {
        JDBCWrapper instance = jdbcInstance;
        if (instance == null) {
            synchronized (JDBCWrapper.class) {
                instance = jdbcInstance;
                if (instance == null) {
                    instance = new JDBCWrapper();
                    jdbcInstance = instance;
                }
            }
        }
        return instance;
    }

    /** Opens up to {@link #POOL_SIZE} connections; failures only leave the pool smaller. */
    private JDBCWrapper() {
        for (int i = 0; i < POOL_SIZE; i++) {
            try {
                Connection conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
                dbConnectionPool.offer(conn);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Borrows a connection from the pool, blocking until one is available.
     * NOTE: blocks indefinitely if no connection could be opened at startup.
     *
     * @return a pooled connection (never {@code null})
     * @throws IllegalStateException if the calling thread is interrupted while waiting
     */
    public Connection getConnection() {
        try {
            return dbConnectionPool.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a pooled JDBC connection", e);
        }
    }

    /**
     * Executes {@code sqlText} once per parameter row as a single JDBC batch inside one
     * transaction.
     *
     * @param sqlText    SQL with {@code ?} placeholders
     * @param paramsList one array of placeholder values per row
     * @return the update counts from {@code executeBatch}; an empty array when there is
     *         nothing to write; {@code null} if the batch failed (it is then rolled back)
     */
    public int[] doBatch(String sqlText, List<Object[]> paramsList) {
        if (paramsList == null || paramsList.isEmpty()) {
            // Nothing to write: don't borrow a connection or open a transaction.
            return new int[0];
        }
        Connection conn = getConnection();
        try {
            conn.setAutoCommit(false);
            int[] result;
            try (PreparedStatement preparedStatement = conn.prepareStatement(sqlText)) {
                for (Object[] parameters : paramsList) {
                    for (int i = 0; i < parameters.length; i++) {
                        preparedStatement.setObject(i + 1, parameters[i]);
                    }
                    preparedStatement.addBatch();
                }
                result = preparedStatement.executeBatch();
            }
            conn.commit();
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            // Don't leave a half-applied batch on a connection that goes back to the pool.
            rollbackQuietly(conn);
            return null;
        } finally {
            // Hand the connection back in the default auto-commit state so the next
            // borrower (e.g. doQuery) does not run inside a leftover transaction.
            restoreAutoCommitQuietly(conn);
            releaseConnection(conn);
        }
    }

    /**
     * Runs a query and passes its result set to {@code callBack}; the result set and
     * statement are closed once the callback returns.
     *
     * @param sqlText    SQL with {@code ?} placeholders
     * @param paramsList placeholder values, or {@code null} for none
     * @param callBack   consumer of the result set; exceptions it throws are printed
     */
    public void doQuery(String sqlText, Object[] paramsList, ExecuteCallBack callBack) {
        Connection conn = getConnection();
        try (PreparedStatement preparedStatement = conn.prepareStatement(sqlText)) {
            if (paramsList != null) {
                for (int i = 0; i < paramsList.length; i++) {
                    preparedStatement.setObject(i + 1, paramsList[i]);
                }
            }
            try (ResultSet result = preparedStatement.executeQuery()) {
                callBack.resultCallBack(result);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            releaseConnection(conn);
        }
    }

    /** Rolls back the current transaction, printing (not throwing) any failure. */
    private static void rollbackQuietly(Connection conn) {
        try {
            conn.rollback();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Switches the connection back to auto-commit, printing (not throwing) any failure. */
    private static void restoreAutoCommitQuietly(Connection conn) {
        try {
            conn.setAutoCommit(true);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Returns a borrowed connection to the (unbounded) pool. */
    private static void releaseConnection(Connection conn) {
        if (conn != null) {
            dbConnectionPool.offer(conn);
        }
    }
}

/** Receives the result set produced by {@link JDBCWrapper#doQuery}. */
interface ExecuteCallBack {
    void resultCallBack(ResultSet result) throws Exception;
}