zoukankan      html  css  js  c++  java
  • Spark Streaming 接收 Kafka 数据的 Java 代码：WordCount 示例

    1. 首先启动 ZooKeeper

    2. 启动 Kafka

    3. 核心代码

    • 生产者（Producer）的 Java 代码：向 Kafka 发送待统计的单词
    package streaming;
    
    import java.util.Properties; 
       
    import kafka.javaapi.producer.Producer; 
    import kafka.producer.KeyedMessage; 
    import kafka.producer.ProducerConfig; 
       
    /**
     * Minimal Kafka producer that feeds the Spark Streaming word-count example.
     *
     * <p>Uses the legacy Scala producer API ({@code kafka.javaapi.producer.Producer}). It sends
     * "test kafka" to topic {@code top1} and "hello world" to topic {@code top2}. This repeats
     * once per second for {@link #ROUNDS} rounds, after which the producer is closed.
     */
    public class MyProducer {

        /** Number of send rounds (the original loop ran for i = 1..99). */
        private static final int ROUNDS = 99;

        /** Pause between two rounds, in milliseconds. */
        private static final long PAUSE_MS = 1000L;

        public static void main(String[] args) {
            Properties props = new Properties();
            // Brokers the legacy producer contacts to fetch topic metadata.
            props.setProperty("metadata.broker.list", "localhost:9092");
            // Encode message values (and, by default, keys) as strings.
            props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
            // Require an acknowledgement from the partition leader for each request.
            props.put("request.required.acks", "1");
            ProducerConfig config = new ProducerConfig(props);

            // Create the producer.
            Producer<String, String> producer = new Producer<String, String>(config);

            // Build the two messages that are sent every round.
            KeyedMessage<String, String> data1 = new KeyedMessage<String, String>("top1", "test kafka");
            KeyedMessage<String, String> data2 = new KeyedMessage<String, String>("top2", "hello world");
            try {
                for (int round = 0; round < ROUNDS; round++) {
                    producer.send(data1);
                    producer.send(data2);
                    Thread.sleep(PAUSE_MS);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so it stays observable, then stop sending.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Always release the producer, even if sending failed with an Error.
                producer.close();
            }
        }
    }
    • 在 Spark Streaming 中接收指定话题（topic）的数据，并对其中的单词进行统计
    package streaming;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.regex.Pattern;
    
    import org.apache.spark.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.*;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    
    import scala.Tuple2;
    
    import com.google.common.collect.Lists;
    /**
     * Spark Streaming word count over messages read from Kafka.
     *
     * <p>Subscribes to the topics in {@link #TOPICS} through the receiver-based
     * {@code KafkaUtils.createStream} API. It splits each message value on spaces, dropping
     * empty tokens, and prints the per-word counts of every batch.
     */
    public class KafkaStreamingWordCount {

        /** Word separator: a single space character. */
        private static final Pattern SPACE = Pattern.compile(" ");

        /** ZooKeeper quorum (host:port list) used by the Kafka receiver. */
        private static final String ZK_QUORUM = "localhost:2181";

        /** Kafka consumer group id for this receiver. */
        private static final String GROUP = "1";

        /** Comma-separated list of topics to consume. */
        private static final String TOPICS = "top1,top2";

        /** Number of consumer threads requested per topic (value of the topic map). */
        private static final int NUM_THREADS = 2;

        /** Batch interval in milliseconds. */
        private static final long BATCH_INTERVAL_MS = 10000L;

        public static void main(String[] args) {
            // local[2]: the receiver occupies one core, so at least one more is needed for processing.
            SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(BATCH_INTERVAL_MS));
            // To enable checkpointing (needed by stateful operations), uncomment:
            // jssc.checkpoint("checkpoint");

            // Map each topic name to the number of consumer threads used for it.
            Map<String, Integer> topicMap = new HashMap<>();
            for (String topic : TOPICS.split(",")) {
                String name = topic.trim();
                if (!name.isEmpty()) {
                    topicMap.put(name, NUM_THREADS);
                }
            }

            // Receive (key, value) message pairs from Kafka.
            JavaPairReceiverInputDStream<String, String> messages =
                    KafkaUtils.createStream(jssc, ZK_QUORUM, GROUP, topicMap);

            // Split every message value into words; empty tokens produced by leading or
            // repeated spaces are dropped so they are not counted as words.
            JavaDStream<String> words = messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                @Override
                public Iterable<String> call(Tuple2<String, String> message) {
                    String value = message._2();
                    if (value == null) {
                        return new ArrayList<String>(0);
                    }
                    String[] parts = SPACE.split(value);
                    List<String> tokens = new ArrayList<String>(parts.length);
                    for (String part : parts) {
                        if (!part.isEmpty()) {
                            tokens.add(part);
                        }
                    }
                    return tokens;
                }
            });

            // Pair each word with 1 and sum the counts per word within each batch.
            JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(String word) {
                            return new Tuple2<String, Integer>(word, 1);
                        }
                    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer left, Integer right) {
                            return left + right;
                        }
                    });

            // Print the counts of each batch, then run until the context is stopped.
            wordCounts.print();
            jssc.start();
            jssc.awaitTermination();
        }
    }
  • 相关阅读:
    MySQL约束笔记
    MySQL 存储过程入门
    数据库范式
    Hibernate 懒加载 错误----no session
    复选框 checkbox 选中事件
    Hibernate 三种状态变化 与 sql 语句的关系
    Spring 4 + Hibernate 4 下 getCurrentSession()的使用情况
    35个java代码性能优化总结
    为什么 Java中1000==1000为false而100==100为true?AND "2+2=5"?
    MyBatis对象关联关系----多对多的保存与查询
  • 原文地址:https://www.cnblogs.com/gaopeng527/p/4959633.html
Copyright © 2011-2022 走看看