  • Spark Streaming: pull data from Kafka, do real-time word counting, and count URL occurrences

    1. Create the Maven project

    For the project-creation steps, see: http://blog.csdn.net/tototuzuoquan/article/details/74571374

    2. Start Kafka

    A. Install the Kafka cluster: http://blog.csdn.net/tototuzuoquan/article/details/73430874
    B. Create the topic, etc.: http://blog.csdn.net/tototuzuoquan/article/details/73430874

    3. Write the POM file

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>cn.toto.spark</groupId>
    <artifactId>bigdata</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
    
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
    
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
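                                <!-- Note: the main class below is left over from the Flume word-count example;
                                     the spark-submit command in step 6 names cn.toto.spark.KafkaWordCount
                                     explicitly on the command line, so this manifest entry only matters
                                     when running the jar directly with java -jar. -->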
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.toto.spark.FlumeStreamingWordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
    </project>

    4. Write the code

    package cn.toto.spark
    
    import cn.toto.spark.streams.LoggerLevels
    import org.apache.spark.{HashPartitioner, SparkConf}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Created by toto on 2017/7/13.
      * Reads data from Kafka and counts words.
      */
    object KafkaWordCount {
    
      /**
        * String         : the word
        * Seq[Int]       : the word's counts in the current batch
        * Option[Int]    : the accumulated historical count
        */
      val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
        //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
        iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
      }
    
      def main(args: Array[String]): Unit = {
        LoggerLevels.setStreamingLogLevels()
        //These args are passed from IDEA via "Program arguments", received as an array:
        //zkQuorum     : the ZooKeeper quorum address list
        //group        : the consumer group
        //topics       : the Kafka topic(s)
        //numThreads   : number of receiver threads
        //e.g. hadoop11:2181,hadoop12:2181,hadoop13:2181 g1 wordcount 1  (note: the topic must be created first)
        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        ssc.checkpoint("E:\\wordcount\\outcheckpoint")
        //"alog-2016-04-16,alog-2016-04-17,alog-2016-04-18"
        //"Array((alog-2016-04-16, 2), (alog-2016-04-17, 2), (alog-2016-04-18, 2))"
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        //Store to memory and disk, with serialization
        val data: ReceiverInputDStream[(String, String)] =
              KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
        //Kafka records are (key, value) pairs; _._2 extracts the value
        val words = data.map(_._2).flatMap(_.split(" "))
        val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc,
              new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
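
    The import cn.toto.spark.streams.LoggerLevels refers to a small logging helper that this post never shows. Below is a minimal sketch of such a utility, modeled on the StreamingExamples helper shipped with Spark 1.x; the package, object, and method names are taken from the import, but this is an assumed reconstruction, not necessarily the author's exact code:

    package cn.toto.spark.streams
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.Logging
    
    object LoggerLevels extends Logging {
      //Silence the noisy INFO logs so the per-batch output stays readable.
      def setStreamingLogLevels(): Unit = {
        val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
        if (!log4jInitialized) {
          logInfo("Setting log level to [WARN] for the streaming example.")
          Logger.getRootLogger.setLevel(Level.WARN)
        }
      }
    }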

    5. Configure the run parameters in IDEA:

    (screenshot: the IDEA Run/Debug configuration, with the program arguments shown below)

    hadoop11:2181,hadoop12:2181,hadoop13:2181 g1 wordcount 1
    hadoop11:2181,hadoop12:2181,hadoop13:2181       : ZooKeeper quorum addresses
    g1                                              : consumer group
    wordcount                                       : Kafka topic
    1                                               : number of threads (1)

    6. Start Kafka, create the topic, and send messages

    Start Kafka

    [root@hadoop1 kafka]# pwd
    /home/tuzq/software/kafka/servers/kafka
    [root@hadoop1 kafka]# bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

    Create the topic

    [root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 --partitions 1 --topic wordcount
    Created topic "wordcount".

    List the topics

    bin/kafka-topics.sh --list --zookeeper hadoop11:2181

    Start a producer and send some messages (my Kafka brokers run on hadoop1, hadoop2, and hadoop3)

    [root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordcount
    No safe wading in an unknown water
    Anger begins with folly,and ends in repentance
    No safe wading in an unknown water
    Anger begins with folly,and ends in repentance
    Anger begins with folly,and ends in repentance
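
    To verify that the messages are actually reaching the topic, you can optionally attach a console consumer from another shell. On Kafka versions of this vintage (0.8/0.9) the console consumer still reads offsets through ZooKeeper:

    [root@hadoop1 kafka]# bin/kafka-console-consumer.sh --zookeeper hadoop11:2181 --topic wordcount --from-beginning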

    Run the program with spark-submit

    # Start the Spark Streaming application
    bin/spark-submit --class cn.toto.spark.KafkaWordCount /root/streaming-1.0.jar hadoop11:2181 group1 wordcount 1
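
    Note that /root/streaming-1.0.jar is assumed to be the shaded jar produced by the maven-shade-plugin configured in step 3, copied (and here apparently renamed) onto the cluster node; with that POM, the build step on the development machine is simply:

    mvn clean package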


    7. Check the results

    (screenshot: the word counts printed to the console on each 5-second batch)


    8. Another example: counting URL occurrences

    package cn.toto.spark
    
    import org.apache.spark.{HashPartitioner, SparkConf}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Created by toto on 2017/7/14.
      */
    object UrlCount {
      val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
        iterator.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(n => (x, n)) }
      }
    
      def main(args: Array[String]) {
        //Receive the command-line arguments
        val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
        //Create SparkConf and set the app name
        val conf = new SparkConf().setAppName("UrlCount")
        //Create the StreamingContext
        val ssc = new StreamingContext(conf, Seconds(2))
        //Set the checkpoint directory
        ssc.checkpoint(hdfs)
        //Build the topic info
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        //Pull data from Kafka and create the DStream
        val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
        //Split each line and extract the URL the user clicked
        val urls = lines.map(x => (x.split(" ")(6), 1))
        //Count the hits per URL
        val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
        //Print the result to the console
        result.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
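
    The expression x.split(" ")(6) assumes space-delimited, web-log-style input in which the URL is the seventh field. A hypothetical input line matching that layout (invented here purely for illustration):

    183.12.1.10 - - [14/Jul/2017:10:00:01 +0800] GET http://example.com/page1 200

    Splitting this line on spaces puts http://example.com/page1 at index 6, so each such click increments that URL's running count.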