zoukankan      html  css  js  c++  java
  • Spark Streaming DStream的output操作以及foreachRDD详解+与Spark SQL结合统计top3热门商品

    一.DStream的output操作以及foreachRDD详解

    1.output操作概览


    2.output操作

    DStream中的所有计算,都是由output操作触发的,比如print()。如果没有任何output操作,那么,压根儿就不会执行定义的计算逻辑。

    此外,即使你使用了foreachRDD output操作,也必须在里面对RDD执行action操作,才能触发对每一个batch的计算逻辑。否则,光有foreachRDD output操作,在里面没有对RDD执行action操作,也不会触发任何逻辑。


    3.foreachRDD详解
    通常在foreachRDD中,都会创建一个Connection,比如JDBC Connection,然后通过Connection将数据写入外部存储。

    误区一:

    在RDD的foreach操作外部,创建Connection

    这种方式是错误的,因为它会导致Connection对象被序列化后传输到每个Task中。而这种Connection对象,实际上一般是不支持序列化的,也就无法被传输。

    dstream.foreachRDD { rdd =>
    val connection = createNewConnection()
    rdd.foreach { record => connection.send(record)
    }
    }

    误区二:

    在RDD的foreach操作内部,创建Connection

    这种方式是可以的,但是效率低下。因为它会导致对于RDD中的每一条数据,都创建一个Connection对象。而通常来说,Connection的创建,是很消耗性能的。

    dstream.foreachRDD { rdd =>
    rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
    }
    }

    合理方式一:

    使用RDD的foreachPartition操作,并且在该操作内部,创建Connection对象,这样就相当于是,为RDD的每个partition创建一个Connection对象,节省资源的多了。

    dstream.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
    }
    }

    合理方式二:

    自己手动封装一个静态连接池,使用RDD的foreachPartition操作,并且在该操作内部,从静态连接池中,通过静态方法,获取到一个连接,使用之后再还回去。这样的话,甚至在多个RDD的partition之间,也可以复用连接了。而且可以让连接池采取懒创建的策略,并且空闲一段时间后,将其释放掉。

    dstream.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)
    }
    }


    4.foreachRDD实战

    案例:改写UpdateStateByKeyWordCount,将每次统计出来的全局的单词计数,写入一份,到MySQL数据库中。

    建表语句
    create table wordcount (
    id integer auto_increment primary key,
    updated_time timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
    word varchar(255),
    count integer
    );

    代码如下

    package com.hzk.sparkStreaming;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.LinkedList;
    
    /**
     * 简易版的连接池
     * @author Administrator
     *
     */
    public class ConnectionPool {
        // 静态的Connection队列
        private static LinkedList<Connection> connectionQueue;
    
        /**
         * 加载驱动
         */
        static {
            try {
                Class.forName("com.mysql.jdbc.Driver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 获取连接,多线程访问并发控制
         * @return
         */
        public synchronized static Connection getConnection() {
            try {
                if(connectionQueue == null) {
                    connectionQueue = new LinkedList<Connection>();
                    for(int i = 0; i < 10; i++) {
                        Connection conn = DriverManager.getConnection(
                                "jdbc:mysql://hadoop-001:3306/baidu",
                                "",
                                "");
                        connectionQueue.push(conn);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return connectionQueue.poll();
        }
    
        /**
         * 还回去一个连接
         */
        public static void returnConnection(Connection conn) {
            connectionQueue.push(conn);
        }
    }
    package com.hzk.sparkStreaming;
    
    import java.sql.Connection;
    import java.sql.Statement;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    
    
    import scala.Tuple2;
    
    /**
     * 基于持久化机制的实时wordcount程序
     * @author Administrator
     *
     */
    public class PersistWordCount {
        public static void main(String[] args) throws InterruptedException {
            SparkConf conf = new SparkConf()
                    .setMaster("local[2]")
                    .setAppName("PersistWordCount");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
    
            jssc.checkpoint("hdfs://hadoop-001:9000/wordcount_checkpoint");
    
            JavaReceiverInputDStream<String> lines = jssc.socketTextStream("hadoop-001", 9999);
    
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
    
            });
    
            JavaPairDStream<String, Integer> pairs = words.mapToPair(
    
                    new PairFunction<String, String, Integer>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Integer> call(String word)
                                throws Exception {
                            return new Tuple2<String, Integer>(word, 1);
                        }
    
                    });
    
            JavaPairDStream<String, Integer> wordCounts = pairs.updateStateByKey(
    
                    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Optional<Integer> call(List<Integer> values,
                                                      Optional<Integer> state) throws Exception {
                            Integer newValue = 0;
    
                            if(state.isPresent()) {
                                newValue = state.get();
                            }
    
                            for(Integer value : values) {
                                newValue += value;
                            }
    
                            return Optional.of(newValue);
                        }
    
                    });
    
            // 每次得到当前所有单词的统计次数之后,将其写入mysql存储,进行持久化,以便于后续的J2EE应用程序
            // 进行显示
            wordCounts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
                      @Override
                      public void call(JavaPairRDD<String, Integer> wordCountsRDD) throws Exception {
                          // 调用RDD的foreachPartition()方法
                          wordCountsRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
    
                              @Override
                              public void call(Iterator<Tuple2<String, Integer>> wordCounts) throws Exception {
                                  // 给每个partition,获取一个连接
                                  Connection conn = ConnectionPool.getConnection();
    
                                  // 遍历partition中的数据,使用一个连接,插入数据库
                                  Tuple2<String, Integer> wordCount = null;
                                  while (wordCounts.hasNext()) {
                                      wordCount = wordCounts.next();
    
                                      String sql = "insert into wordcount(word,count) "
                                              + "values('" + wordCount._1 + "'," + wordCount._2 + ")";
    
                                      Statement stmt = conn.createStatement();
                                      stmt.executeUpdate(sql);
                                  }
    
                                  // 用完以后,将连接还回去
                                  ConnectionPool.returnConnection(conn);
                              }
                          });
                      }
                  });
            jssc.start();
            jssc.awaitTermination();
            jssc.close();
        }
    }

    二.与Spark SQL结合使用之top3热门商品实时统计案例实战

    Spark Streaming最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。

    案例:每隔10秒,统计最近60秒的,每个种类的每个商品的点击次数,然后统计出每个种类top3热门的商品。

     代码如下

    package com.hzk.sparkStreaming;
    
    import org.apache.avro.generic.GenericData;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.hive.HiveContext;
    import org.apache.spark.sql.types.DataType;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.bouncycastle.util.Strings;
    import scala.Tuple2;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class Top3HotProduct {
        public static void main(String[] args) throws InterruptedException {
            SparkConf conf=new SparkConf().setMaster("local[2]").setAppName("Top3HotProduct");
            JavaStreamingContext jssc=new JavaStreamingContext(conf, Durations.seconds(5));
            // 首先看一下,输入日志的格式
            // leo iphone mobile_phone
    
            // 首先,获取输入数据流
            // 这里顺带提一句,之前没有讲过,就是说,我们的Spark Streaming的案例为什么都是基于socket的呢?
            // 因为方便啊。。。
            // 其实,企业里面,真正最常用的,都是基于Kafka这种数据源
            // 但是我觉得我们的练习,用socket也无妨,比较方便,而且一点也不影响学习
            // 因为不同的输入来源的,不同之处,只是在创建输入DStream的那一点点代码
            // 所以,核心是在于之后的Spark Streaming的实时计算
            // 所以只要我们掌握了各个案例和功能的使用
            // 在企业里,切换到Kafka,易如反掌把,因为我们之前都详细讲过,而且实验过,实战编码过,将Kafka作为
            // 数据源的两种方式了
            JavaReceiverInputDStream<String> productClickLogsDStream=jssc.socketTextStream("hadoop-001",9999);
    
            // 然后,应该是做一个映射,将每个种类的每个商品,映射为(category_product, 1)的这种格式
            // 从而在后面可以使用window操作,对窗口中的这种格式的数据,进行reduceByKey操作
            // 从而统计出来,一个窗口中的每个种类的每个商品的,点击次数
            JavaPairDStream<String,Integer> categoryProductPairsDStream=productClickLogsDStream.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    String[] productClickLogSplites=s.split(" ");
                    return new Tuple2<>(productClickLogSplites[2]+"_"+productClickLogSplites[1],1);
                }
            });
            // 然后执行window操作
            // 到这里,就可以做到,每隔10秒钟,对最近60秒的数据,执行reduceByKey操作
            // 计算出来这60秒内,每个种类的每个商品的点击次数
            JavaPairDStream<String,Integer> categoryProductCountsDStream= categoryProductPairsDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer+integer2;
                }
            },Durations.seconds(60),Durations.seconds(10));
            // 然后针对60秒内的每个种类的每个商品的点击次数
            // foreachRDD,在内部,使用Spark SQL执行top3热门商品的统计
            categoryProductCountsDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
    
                @Override
                public void call(JavaPairRDD<String, Integer> categoryProductCountsRDD) throws Exception {
    
                    // 将该RDD,转换为JavaRDD<Row>的格式
                    JavaRDD<Row> categoryProductCountRowRDD = categoryProductCountsRDD.map(
    
                            new Function<Tuple2<String,Integer>, Row>() {
    
                                private static final long serialVersionUID = 1L;
    
                                @Override
                                public Row call(Tuple2<String, Integer> categoryProductCount)
                                        throws Exception {
                                    String category = categoryProductCount._1.split("_")[0];
                                    String product = categoryProductCount._1.split("_")[1];
                                    Integer count = categoryProductCount._2;
                                    return RowFactory.create(category, product, count);
                                }
    
                            });
    
                    // 然后,执行DataFrame转换
                    List<StructField> structFields = new ArrayList<StructField>();
                    structFields.add(DataTypes.createStructField("category", DataTypes.StringType, true));
                    structFields.add(DataTypes.createStructField("product", DataTypes.StringType, true));
                    structFields.add(DataTypes.createStructField("click_count", DataTypes.IntegerType, true));
                    StructType structType = DataTypes.createStructType(structFields);
    
                    HiveContext hiveContext = new HiveContext(categoryProductCountsRDD.context());
    
                    Dataset categoryProductCountDF = hiveContext.createDataFrame(
                            categoryProductCountRowRDD, structType);
    
                    // 将60秒内的每个种类的每个商品的点击次数的数据,注册为一个临时表
                    categoryProductCountDF.registerTempTable("product_click_log");
    
                    // 执行SQL语句,针对临时表,统计出来每个种类下,点击次数排名前3的热门商品
                    Dataset top3ProductDF = hiveContext.sql(
                            "SELECT category,product,click_count "
                                    + "FROM ("
                                    + "SELECT "
                                    + "category,"
                                    + "product,"
                                    + "click_count,"
                                    + "row_number() OVER (PARTITION BY category ORDER BY click_count DESC) rank "
                                    + "FROM product_click_log"
                                    + ") tmp "
                                    + "WHERE rank<=3");
    
                    // 这里说明一下,其实在企业场景中,可以不是打印的
                    // 案例说,应该将数据保存到redis缓存、或者是mysql db中
                    // 然后,应该配合一个J2EE系统,进行数据的展示和查询、图形报表
    
                    top3ProductDF.show();
                }
            });
    
    
            jssc.start();
            jssc.awaitTermination();
            jssc.close();
        }
    }
  • 相关阅读:
    Java RunTime Environment (JRE) or Java Development Kit (JDK) must be available in order to run Eclipse. ......
    UVA 1597 Searching the Web
    UVA 1596 Bug Hunt
    UVA 230 Borrowers
    UVA 221 Urban Elevations
    UVA 814 The Letter Carrier's Rounds
    UVA 207 PGA Tour Prize Money
    UVA 1592 Database
    UVA 540 Team Queue
    UVA 12096 The SetStack Computer
  • 原文地址:https://www.cnblogs.com/Transkai/p/11385723.html
Copyright © 2011-2022 走看看