  • Kafka + Spark (to be organized)

    
    
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
      * Spark Streaming integration with Kafka
      */
    object KafkaDirectStream {

      def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
        val streamingContext = new StreamingContext(sparkConf, Seconds(5)) // one batch every 5 seconds

        val kafkaParams = Map[String, Object](
          /*
           * The list of broker addresses. It does not need to contain every broker:
           * the client discovers the rest from the ones given. Providing at least
           * two brokers is recommended for fault tolerance.
           */
          "bootstrap.servers" -> "test:9091,test:9092,test:9093",
          /* key deserializer */
          "key.deserializer" -> classOf[StringDeserializer],
          /* value deserializer */
          "value.deserializer" -> classOf[StringDeserializer],
          /* consumer group ID */
          "group.id" -> "spark-streaming-group",
          /*
           * What the consumer should do when it reads a partition with no committed
           * offset, or when the offset is invalid:
           * latest: start from the newest records (those produced after the consumer started)
           * earliest: start from the beginning of the partition
           */
          "auto.offset.reset" -> "latest",
          /* whether to auto-commit offsets */
          "enable.auto.commit" -> (true: java.lang.Boolean)
        )

        /* multiple topics can be subscribed at once */
        val topics = Array("abc")
        val stream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          /* location strategy */
          PreferConsistent,
          /* topic subscription */
          Subscribe[String, String](topics, kafkaParams)
        )

        // stream.saveAsTextFiles("D:\\wps\\a")                               // write to local files
        // stream.saveAsTextFiles("hdfs://192.168.0.118:9000/root/eight9/")   // write to HDFS

        /* print the input stream */
        stream.map(record => (record.key, record.value)).print()

        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
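
    With enable.auto.commit left at true, the Kafka consumer commits offsets on its
    own schedule, which can mark records as consumed before a batch actually
    succeeds. A minimal sketch of manual commits (an assumption, not part of the
    original post): set "enable.auto.commit" -> (false: java.lang.Boolean) in
    kafkaParams and commit after each batch through the stream's CanCommitOffsets
    handle (both types come from the kafka010 package already imported above):

        stream.foreachRDD { rdd =>
          // collect the Kafka offset ranges covered by this batch
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // process the batch first ...
          rdd.map(record => (record.key, record.value)).collect().foreach(println)
          // ... then commit the offsets back to Kafka asynchronously
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }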
     
     

     Below is the proper pom.xml; keep its versions consistent with your own cluster.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.produce</groupId>
        <artifactId>produce</artifactId>
        <version>1.0-SNAPSHOT</version>

        <dependencies>

            <dependency>
                <groupId>com.thoughtworks.paranamer</groupId>
                <artifactId>paranamer</artifactId>
                <version>2.8</version>
            </dependency>
            <dependency> <!-- Spark -->
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.4.0</version>
            </dependency>
            <dependency> <!-- Spark Streaming -->
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.4.0</version>
            </dependency>

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>2.4.0</version>
            </dependency>

            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->

            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>

    Next, the Kafka part (build kafka_2.11-2.2.0). Kafka 2.2 is needed here: in 2.1, kafka-topics.sh does not yet support specifying brokers with --bootstrap-server.

    Start a console producer
    bin/kafka-console-producer.sh --broker-list test:9091,test:9092,test:9093 --topic abc
    
    
    
    
    List topics
     bin/kafka-topics.sh --list --bootstrap-server test:9091,test:9092,test:9093     
    
    
    
    Create a topic (the partition and replication counts below are examples)
    bin/kafka-topics.sh --create --bootstrap-server test:9091,test:9092,test:9093 --replication-factor 3 --partitions 3 --topic abc
    (Writing to a missing topic with the console producer also creates it implicitly
    when the broker has automatic topic creation enabled.)

    If the program hangs at the Kafka commit step, the client cannot resolve the
    broker hostnames. Add the virtual machine's IP and hostname to the Windows hosts file at
    C:\Windows\System32\drivers\etc\hosts
    and the Kafka client will then be able to find the brokers and their topics.
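
    For example, assuming the brokers all advertise the hostname test and run on the
    VM with IP 192.168.0.118 (the address used for HDFS in this post), the hosts
    entry would be:

        192.168.0.118    test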

    Cluster mode

    Code
    
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
      * Spark Streaming integration with Kafka
      */
    object KafkaDirectStream {

      def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("spark://test:7077")
        val streamingContext = new StreamingContext(sparkConf, Seconds(5)) // one batch every 5 seconds

        val kafkaParams = Map[String, Object](
          /*
           * The list of broker addresses. It does not need to contain every broker:
           * the client discovers the rest from the ones given. Providing at least
           * two brokers is recommended for fault tolerance.
           */
          "bootstrap.servers" -> "test:9091,test:9092,test:9093",
          /* key deserializer */
          "key.deserializer" -> classOf[StringDeserializer],
          /* value deserializer */
          "value.deserializer" -> classOf[StringDeserializer],
          /* consumer group ID */
          "group.id" -> "spark-streaming-group",
          /*
           * What the consumer should do when it reads a partition with no committed
           * offset, or when the offset is invalid:
           * latest: start from the newest records (those produced after the consumer started)
           * earliest: start from the beginning of the partition
           */
          "auto.offset.reset" -> "latest",
          /* whether to auto-commit offsets */
          "enable.auto.commit" -> (true: java.lang.Boolean)
        )

        /* multiple topics can be subscribed at once */
        val topics = Array("abc")
        val stream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          /* location strategy */
          PreferConsistent,
          /* topic subscription */
          Subscribe[String, String](topics, kafkaParams)
        )

        // stream.saveAsTextFiles("hdfs://192.168.0.118:9000/root/hello/")   // write to HDFS

        /* print the input stream */
        stream.map(record => (record.key, record.value)).print()

        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
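
    If the HDFS sink above is enabled, a minimal sketch (an assumption, not from
    the original) maps each ConsumerRecord to a plain string first, since the
    record object itself does not print usefully as text. saveAsTextFiles(prefix)
    then writes one directory per batch, named prefix-<batch timestamp>:

        // convert each record to text, then write one directory per 5-second
        // batch under /root/hello/, named out-<timestamp>
        stream.map(record => s"${record.key},${record.value}")
          .saveAsTextFiles("hdfs://192.168.0.118:9000/root/hello/out")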
    Maven dependencies: identical to the pom.xml shown above.
    Cluster submission
    
    ./bin/spark-submit --class KafkaDirectStream --num-executors 4 --driver-memory 1G --executor-memory 1g --executor-cores 1 --conf spark.default.parallelism=1000 produce.jar
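
    Note that the code above hardcodes the master with setMaster("spark://test:7077").
    Properties set explicitly in code take precedence over spark-submit flags, so a
    --master flag on the command line would be ignored. A more flexible sketch (an
    assumption, not from the original) leaves the master out of the code:

        // set only the app name here; pass --master spark://test:7077 to spark-submit
        val sparkConf = new SparkConf().setAppName("KafkaDirectStream")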

    Component versions

    JDK    8
    Scala  2.11.*
    Kafka  kafka_2.11-2.2.0
    Spark  spark-2.4.0-bin-hadoop2.7

    If these versions do not match, the job fails at runtime with NoSuchMethodError ("method not found").
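
    A quick sanity check (a sketch using standard APIs) prints the versions the job
    actually runs against before you dig into a NoSuchMethodError:

        // Scala and Spark versions visible at runtime
        println(scala.util.Properties.versionString) // e.g. "version 2.11.12"
        println(org.apache.spark.SPARK_VERSION)      // e.g. "2.4.0"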