zoukankan      html  css  js  c++  java
  • demo1 spark streaming 接收 kafka 数据java代码WordCount示例

    1. 首先启动zookeeper

    windows上的安装见zk 02之 Windows安装和使用zookeeper

    启动后见:

    2. 启动kafka

    windows的安装kafka见Windows上搭建Kafka运行环境,启动后如下图:

    3. 核心代码

    生产者生产消息的java代码,生成要统计的单词

    package com.sf.omcstest;
    
    import java.util.Properties; 
       
    import kafka.javaapi.producer.Producer; 
    import kafka.producer.KeyedMessage; 
    import kafka.producer.ProducerConfig; 
       
    public class MyProducer {   
         
            public static void main(String[] args) {   
                Properties props = new Properties();   
                props.setProperty("metadata.broker.list","localhost:9092");   
                props.setProperty("serializer.class","kafka.serializer.StringEncoder");   
                props.put("request.required.acks","1");   
                ProducerConfig config = new ProducerConfig(props);   
                //创建生产这对象
                Producer<String, String> producer = new Producer<String, String>(config);
                //生成消息
                KeyedMessage<String, String> data1 = new KeyedMessage<String, String>("top1","test kafka");
                KeyedMessage<String, String> data2 = new KeyedMessage<String, String>("top2","hello world");
                try {   
                    int i =1; 
                    while(i < 1000){    
                        //发送消息
                        producer.send(data1);   
                        producer.send(data2);
                        System.out.println("put in kafka " + i);
                        i++;
                        Thread.sleep(1000);
                    } 
                } catch (Exception e) {   
                    e.printStackTrace();   
                }   
                producer.close();   
            }   
    }

    在SparkStreaming中接收指定话题的数据,对单词进行统计

    package com.sf;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.regex.Pattern;
    
    import org.apache.spark.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.*;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    
    import scala.Tuple2;
    
    import com.google.common.collect.Lists;
    public class KafkaStreamingWordCount {
    
        public static void main(String[] args) throws InterruptedException {
            //设置匹配模式,以空格分隔
            final Pattern SPACE = Pattern.compile(" ");
            //接收数据的地址和端口
            String zkQuorum = "localhost:2181";
            //话题所在的组
            String group = "1";
            //话题名称以“,”分隔
            String topics = "top1,top2";
            //每个话题的分片数
            int numThreads = 2;    
            
            //Spark Streaming程序以StreamingContext为起点,其内部维持了一个SparkContext的实例。
            // 这里我们创建一个带有两个本地线程(local[2])的StreamingContext,并设置批处理间隔为1秒
            SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
            
            // 在一个Spark应用中默认只允许有一个SparkContext,默认地spark-shell已经为我们创建好了
            // SparkContext,名为sc。因此在spark-shell中应该以下述方式创建StreamingContext,以
            // 避免创建再次创建SparkContext而引起错误:
            // val ssc = new StreamingContext(sc, Seconds(1))
            
            //jssc.checkpoint("checkpoint"); //设置检查点
            //存放话题跟分片的映射关系
            Map<String, Integer> topicmap = new HashMap<>();
            String[] topicsArr = topics.split(",");
            int n = topicsArr.length;
            for(int i=0;i<n;i++){
                topicmap.put(topicsArr[i], numThreads);
            }
            //从Kafka中获取数据转换成RDD
            JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);
            //从话题中过滤所需数据
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
    
                @Override
                public Iterator<String> call(Tuple2<String, String> arg0)
                        throws Exception {
                    return Lists.newArrayList(SPACE.split(arg0._2)).iterator();
                }
            });
            //对其中的单词进行统计
            JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                  new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                      return new Tuple2<String, Integer>(s, 1);
                    }
                  }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                      return i1 + i2;
                    }
                  });
            //打印结果
            wordCounts.print();
            
            // 执行完上面代码,Spark Streaming并没有真正开始处理数据,而只是记录需在数据上执行的操作。
            // 当我们设置好所有需要在数据上执行的操作以后,我们就可以开始真正地处理数据了。如下:
            jssc.start();                  // 开始计算
            jssc.awaitTermination();      // 等待计算终止
    
        }
    
    }

    结果:

    017-01-18 18:32:27,800 WARN  org.apache.spark.storage.BlockManager.logWarning(Logging.scala:66) - Block input-0-1484735547600 replicated to only 0 peer(s) instead of 1 peers
    2017-01-18 18:32:28,801 WARN  org.apache.spark.storage.BlockManager.logWarning(Logging.scala:66) - Block input-0-1484735548600 replicated to only 0 peer(s) instead of 1 peers
    2017-01-18 18:32:29,801 WARN  org.apache.spark.storage.BlockManager.logWarning(Logging.scala:66) - Block input-0-1484735549600 replicated to only 0 peer(s) instead of 1 peers
    -------------------------------------------
    Time: 1484735550000 ms
    -------------------------------------------
    (hello,10)
    (kafka,10)
    (test,10)
    (world,10)

    master URL

    配置conf/spark-env.sh 是配置spark的standalone环境,类似于hadoop配置hdfs环境一样。但是部署程序时仍然需要指定master的位置。
    如果选择的部署模式是standalone且部署到你配置的这个集群上,可以指定 MASTER=spark://ubuntu:7070

    下面解答spark在那里指定master URL的问题:
    1.通过spark shell,执行后进入交互界面
    MASTER=spark://IP:PORT ./bin/spark-shell

    2.程序内指定(可以通过参数传入)

    val conf = new SparkConf()
    .setMaster(...)
    val sc = new SparkContext(conf)

    传递给spark的master url可以有如下几种:

    local 本地单线程
    local[K] 本地多线程(指定K个内核)
    local[*] 本地多线程(指定所有可用内核)
    spark://HOST:PORT 连接到指定的 Spark standalone cluster master,需要指定端口。
    mesos://HOST:PORT 连接到指定的 Mesos 集群,需要指定端口。
    yarn-client客户端模式 连接到 YARN 集群。需要配置 HADOOP_CONF_DIR。
    yarn-cluster集群模式 连接到 YARN 集群 。需要配置 HADOOP_CONF_DIR。

    spark1.0起的版本在提交程序到集群有很大的不同,需要注意:

    ./bin/spark-submit 
    --class <main-class>
    --master <master-url> 
    --deploy-mode <deploy-mode> 
    ... # other options
    <application-jar> 
    [application-arguments]

    例如:

    # Run application locally on 8 cores
    ./bin/spark-submit 
    --class org.apache.spark.examples.SparkPi 
    --master local[8] 
    /path/to/examples.jar 
    100
    
    # Run on a Spark standalone cluster
    ./bin/spark-submit 
    --class org.apache.spark.examples.SparkPi 
    --master spark://207.184.161.138:7077 
    --executor-memory 20G 
    --total-executor-cores 100 
    /path/to/examples.jar 
    1000
    
    # Run on a YARN cluster
    export HADOOP_CONF_DIR=XXX
    ./bin/spark-submit 
    --class org.apache.spark.examples.SparkPi 
    --master yarn-cluster  # can also be `yarn-client` for client mode
    --executor-memory 20G 
    --num-executors 50 
    /path/to/examples.jar 
    1000
    
    # Run a Python application on a cluster
    ./bin/spark-submit 
    --master spark://207.184.161.138:7077 
    examples/src/main/python/pi.py 
    1000

    更多关于spark-submit见《spark提交模式

  • 相关阅读:
    Spring RestTemplate 之put、delete请求
    Too many connections解决方案
    各个平台的mysql重启命令
    MySQL出现too many connections(1040)错误解决方法
    EXCEL中,如何引用一个单元格中的数据,作为另一个单元格内容中的一部分?
    [翻译][Java]ExecutorService的正确关闭方法
    MySQL:日期函数、时间函数总结(MySQL 5.X)
    MySQL 获得当前日期时间 函数
    线程本地变量ThreadLocal
    split 分割 字符串(分隔符如:* ^ : | , . ?) 及注意点
  • 原文地址:https://www.cnblogs.com/duanxz/p/3580863.html
Copyright © 2011-2022 走看看