zoukankan      html  css  js  c++  java
  • spark-streaming集成Kafka处理实时数据

    在这篇文章里,我们模拟了一个场景,实时分析订单数据,统计实时收益。

    场景模拟

    我试图覆盖工程上最为常用的一个场景:

    1)首先,向Kafka里实时的写入订单数据,JSON格式,包含订单ID-订单类型-订单收益

    2)然后,spark-streaming每十秒实时去消费kafka中的订单数据,并以订单类型分组统计收益

    3)最后,spark-streaming统计结果实时的存入本地MySQL。

    前提条件

    安装

    1)spark:我使用的yarn-client模式下的spark,环境中集群客户端已经搞定

    2)zookeeper:我使用的是这个集群:10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181

    3)kafka:我使用的是standalone模式:10.93.21.21:9093

    4)mysql:10.93.84.53:3306

    语言

    python:pykafka,pip install pykafka

    java:spark,spark-streaming

    下面开始

    1、数据写入kafka

    • kafka写入

    我们使用pykafka模拟数据实时写入,代码如下:

    kafka_producer.py

    # -* coding:utf8 *-  
    import time
    import json
    import uuid
    import random
    import threading
    from pykafka import KafkaClient
    
    # 创建kafka实例
    hosts = '10.93.21.21:9093'
    client = KafkaClient(hosts=hosts)
    
    # 打印一下有哪些topic
    print client.topics  
    
    # 创建kafka producer句柄
    topic = client.topics['kafka_spark']
    producer = topic.get_producer()
    
    
    # work
    def work():
        while 1:
            msg = json.dumps({
                "id": str(uuid.uuid4()).replace('-', ''),
                "type": random.randint(1, 5),
                "profit": random.randint(13, 100)})
            producer.produce(msg)
    
    # 多线程执行
    thread_list = [threading.Thread(target=work) for i in range(10)]
    for thread in thread_list:
        thread.setDaemon(True)
        thread.start()
    
    time.sleep(60)
    
    # 关闭句柄, 退出
    producer.stop()

    可以看到,我们写入的形式是一个json,订单id是一个uuid,订单类型type从1-5随机,订单收益profit从13-100随机,形如

    {"id": ${uid}, "type": 1, "profit": 30}

    注意:1)python对kafka的读写不需要借助zookeeper,2)使用多线程的形式写入,让数据量具有一定的规模。

    执行producer,会持续写入数据1分钟。

    python kafka_producer.py
    • 验证一下

    kafka_consumer.py

    # -* coding:utf8 *-
    from pykafka import KafkaClient
    
    hosts = '10.93.21.21:9093'
    client = KafkaClient(hosts=hosts)
    # 消费者
    topic = client.topics['kafka_spark']
    consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, auto_commit_interval_ms=1,
                                         consumer_id='test')
    for message in consumer:
        if message is not None:
            print message.offset, message.value

     执行,可以消费kafka刚才写入的数据

    python kafka_consumer.py

    2、spark-streaming

    1)先解决依赖

    其中比较核心的是spark-streaming和kafka集成包spark-streaming-kafka_2.10,还有spark引擎spark-core_2.10

    json和mysql看大家爱好。

    pom.xml

    <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka_2.10</artifactId>
                <version>1.6.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>1.6.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>1.6.0</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.19</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.38</version>
            </dependency>
            <dependency>
                <groupId>commons-dbcp</groupId>
                <artifactId>commons-dbcp</artifactId>
                <version>1.4</version>
            </dependency>
        </dependencies>

    2)MySQL准备

    • 建表

    我们的结果去向是MySQL,先建立一个结果表。

    id:主键,自增id

    type:订单类型

    profit:每个spark batch聚合出的订单收益结果

    time:时间戳

    CREATE TABLE `order` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `type` int(11) DEFAULT NULL,
      `profit` int(11) DEFAULT NULL,
      `time` mediumtext,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=56 DEFAULT CHARSET=utf8
    • Java客户端

    采用了单例线程池的模式简单写了一下。

    ConnectionPool.java

    package com.xiaoju.dqa.realtime_streaming;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.LinkedList;
    
    
    public class ConnectionPool {
        private static LinkedList<Connection> connectionQueue;
    
        static {
            try {
                Class.forName("com.mysql.jdbc.Driver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
    
        public synchronized static Connection getConnection() {
            try {
                if (connectionQueue == null) {
                    connectionQueue = new LinkedList<Connection>();
                    for (int i = 0; i < 5; i++) {
                        Connection conn = DriverManager.getConnection(
                                "jdbc:mysql://10.93.84.53:3306/big_data",
                                "root",
                                "1234");
                        connectionQueue.push(conn);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return connectionQueue.poll();
    
        }
        public  static void returnConnection(Connection conn){connectionQueue.push(conn);}
    }

    3)代码实现

    我用java写的,不会用scala很尴尬。

    即时用java整个的处理过程依然比较简单。跟常见的wordcount也没有多大的差别。

    SparkStreaming特点

    spark的特点就是RDD,通过对RDD的操作,来屏蔽分布式运算的复杂度。

    而spark-streaming的操作对象是RDD的时间序列DStream,这个序列的生成是跟batch的选取有关。例如我这里Batch是10s一个,那么每隔10s会产出一个RDD,对RDD的切割和序列的生成,spark-streaming对我们透明了。唯一暴露给我们的DStream和原生RDD的使用方式基本一致。

    这里需要讲解一下MySQL写入注意的事项。

    MySQL写入

    在处理mysql写入时使用了foreachPartition方法,即,在foreachPartition中使用borrow mysql句柄。

    这样做的原因是:

    1)你无法再Driver端创建mysql句柄,并通过序列化的形式发送到worker端

    2)如果你在处理rdd中创建mysql句柄,很容易对每一条数据创建一个句柄,在处理过程中很快内存就会溢出。

    OrderProfitAgg.java

    package com.xiaoju.dqa.realtime_streaming;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import scala.Tuple2;
    
    import java.sql.Connection;
    import java.sql.Statement;
    import java.util.*;
    
    
    /*
    *   生产者可以选用kafka自带的producer脚本
    *   bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test
    * */
    public class OrderProfitAgg {
    
        public static void main(String[] args) throws InterruptedException {
            /*
            *   kafka所注册的zk集群
            * */
            String zkQuorum = "10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181";
    
            /*
            *   spark-streaming消费kafka的topic名称, 多个以逗号分隔
            * */
            String topics = "kafka_spark,kafka_spark2";
    
            /*
            *   消费组 group
            * */
            String group = "bigdata_qa";
    
            /*
            *   消费进程数
            * */
            int numThreads = 2;
    
            /*
            *   选用yarn队列模式, spark-streaming程序的app名称是"order profit"
            * */
            SparkConf sparkConf = new SparkConf().setMaster("yarn-client").setAppName("order profit");
    
            /*
            *   创建sc, 全局唯一, 设置logLevel可以打印一些东西到控制台
            * */
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
            sc.setLogLevel("WARN");
    
            /*
            *   创建jssc, spark-streaming的batch是每10s划分一个
            * */
            JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));
    
            /*
            *   准备topicMap
            * */
            Map<String ,Integer> topicMap = new HashMap<String, Integer>();
            for (String topic : topics.split(",")) {
                topicMap.put(topic, numThreads);
            }
    
            /*
            *   kafka数据流
            * */
            List<JavaPairReceiverInputDStream<String, String>> streams = new ArrayList<JavaPairReceiverInputDStream<String, String>>();
            for (int i = 0; i < numThreads; i++) {
                streams.add(KafkaUtils.createStream(jssc, zkQuorum, group, topicMap));
            }
            /*
            *   从kafka消费数据的RDD
            * */
            JavaPairDStream<String, String> streamsRDD = streams.get(0);
            for (int i = 1; i < streams.size(); i++) {
                streamsRDD = streamsRDD.union(streams.get(i));
            }
    
            /*
            *   kafka消息形如: {"id": ${uuid}, "type": 1, "profit": 35}
            *   统计结果, 以type分组的总收益
            *   mapToPair, 将kafka消费的数据, 转化为type-profit key-value对
            *   reduceByKey, 以type分组, 聚合profit
            * */
            JavaPairDStream<Integer, Integer> profits = streamsRDD.mapToPair(new PairFunction<Tuple2<String, String>, Integer, Integer>() {
                @Override
                public Tuple2<Integer, Integer> call(Tuple2<String, String> s_tuple2) throws Exception {
                    JSONObject jsonObject = JSON.parseObject(s_tuple2._2);
                    return new Tuple2<Integer, Integer>(jsonObject.getInteger("type"), jsonObject.getInteger("profit"));
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) throws Exception {
                    return i1 + i2;
                }
            });
    
            /*
            *   输出结果到MySQL
            *   需要为每一个partition创建一个MySQL句柄, 使用foreachPartition
            * */
            profits.foreachRDD(new Function<JavaPairRDD<Integer, Integer>, Void>() {
                @Override
                public Void call(JavaPairRDD<Integer, Integer> integerIntegerJavaPairRDD) throws Exception {
    
                    integerIntegerJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<Integer, Integer>>>() {
                        @Override
                        public void call(Iterator<Tuple2<Integer, Integer>> tuple2Iterator) throws Exception {
                            Connection connection = ConnectionPool.getConnection();
                            Statement stmt = connection.createStatement();
                            long timestamp = System.currentTimeMillis();
                            while(tuple2Iterator.hasNext()) {
                                Tuple2<Integer, Integer> tuple = tuple2Iterator.next();
                                Integer type = tuple._1;
                                Integer profit = tuple._2;
                                String sql = String.format("insert into `order` (`type`, `profit`, `time`) values (%s, %s, %s)", type, profit, timestamp);
                                stmt.executeUpdate(sql);
                            }
                            ConnectionPool.returnConnection(connection);
                        }
                    });
                    return null;
                }
            });
    
            /*
            *   开始计算, 等待计算结束
            * */
            jssc.start();
            try {
                jssc.awaitTermination();
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                jssc.close();
            }
        }
    }

    4)打包方法

    编写pom.xml build tag。

    mvn clean package打包即可。

    pom.xml

    <build>
            <plugins>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <!--这里要替换成jar包main方法所在类 -->
                                <!--<mainClass>com.bigdata.qa.hotdog.driver.WordCount</mainClass>-->
                                <mainClass>com.xiaoju.dqa.realtime_streaming.OrderProfitAgg</mainClass>
    
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id> <!-- this is used for inheritance merges -->
                            <phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>

    3、执行与结果

    1)执行kafka_producer.py

    python kafka_producer.py

    2) 执行spark-streaming

    这里使用的是默认参数提交yarn队列。

    spark-submit --queue=root.XXXX realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar

    3)查看结果

    到MySQL中查看结果,每隔10秒会聚合出type=1-5的5条数据。

    例如第一条数据,就是type=4这种类型的业务,在10s内收益是555473元。业务量惊人啊。哈哈。

    完结。

  • 相关阅读:
    Neko's loop HDU-6444(网络赛1007)
    Parameters
    SETLOCAL
    RD / RMDIR Command
    devenv 命令用法
    Cannot determine the location of the VS Common Tools folder.
    'DEVENV' is not recognized as an internal or external command,
    How to change Visual Studio default environment setting
    error signing assembly unknown error
    What is the Xcopy Command?:
  • 原文地址:https://www.cnblogs.com/kangoroo/p/7754581.html
Copyright © 2011-2022 走看看