  • Integrating spark-streaming-kafka with Spring Boot

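    Main idea: the SparkContext is managed by the Spring container. Once the container has finished starting, the spark-streaming-kafka job is launched to fetch and process the data.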

    1. Initialize the SparkContext in the Spring container. Code snippet:

    // sparkAppName, sparkMaster and the other values are fields injected elsewhere (not shown in the original snippet)
    @Bean
    @ConditionalOnMissingBean(SparkConf.class)
    public SparkConf sparkConf() {
        SparkConf conf = new SparkConf()
                .setAppName(sparkAppName)
                .setMaster(sparkMaster)
                .set("spark.driver.memory", sparkDriverMemory)
                .set("spark.worker.memory", sparkWorkerMemory) // e.g. "26g"
                // .set("spark.shuffle.memoryFraction", "0") // default 0.2
                .set("spark.executor.memory", sparkExecutorMemory)
                .set("spark.rpc.message.maxSize", sparkRpcMessageMaxSize);
        // .setMaster("local[*]"); // for testing only
        return conf;
    }

    @Bean
    @ConditionalOnMissingBean(JavaSparkContext.class) // by default, only one SparkContext may exist per JVM
    public JavaSparkContext javaSparkContext(@Autowired SparkConf sparkConf) {
        return new JavaSparkContext(sparkConf);
    }
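
The memory settings above are passed to `SparkConf` as plain strings (the snippet hints at values such as `"26g"`), so a malformed value would only surface when Spark parses it. A minimal sketch of a fail-fast check, assuming the common form of a positive integer followed by `k`, `m`, `g` or `t`. Spark itself may accept other variants, and `SparkMemorySetting` is a hypothetical helper, not part of Spark or the original article:

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not from the original article): validates memory strings before use. */
public final class SparkMemorySetting {

    // Assumed format: positive integer plus one unit suffix, e.g. "512m" or "26g"
    private static final Pattern MEMORY = Pattern.compile("[1-9][0-9]*[kmgt]");

    private SparkMemorySetting() {
    }

    /** Returns true if the value, ignoring surrounding whitespace, matches the assumed format. */
    public static boolean isValid(String value) {
        return value != null && MEMORY.matcher(value.trim()).matches();
    }

    /** Returns the trimmed value, or throws with the offending key if the value is malformed. */
    public static String require(String key, String value) {
        if (!isValid(value)) {
            throw new IllegalArgumentException(
                    key + " must look like 512m or 26g, got: " + value);
        }
        return value.trim();
    }
}
```

In the bean method, a call such as `.set("spark.executor.memory", SparkMemorySetting.require("spark.executor.memory", sparkExecutorMemory))` would then fail at startup with a message naming the property.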

    2. The spark-streaming-kafka executor class:

    @Component
    public class SparkKafkaStreamExecutor implements Serializable, Runnable {
        private static final long serialVersionUID = 1L;
        private static final Logger log = LoggerFactory.getLogger(SparkKafkaStreamExecutor.class);

        @Value("${spark.stream.kafka.durations}")
        private String streamDurationTime;
        @Value("${kafka.broker.list}")
        private String metadatabrokerlist;
        @Value("${spark.kafka.topics}")
        private String topicsAll;

        // transient: these fields are skipped when the class is serialized
        @Autowired
        private transient Gson gson;

        private transient JavaStreamingContext jsc;
        @Autowired
        private transient JavaSparkContext javaSparkContext;

        @Override
        public void run() {
            startStreamTask();
        }

        public void startStreamTask() {
            // System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.5");
            Set<String> topics = new HashSet<>(Arrays.asList(topicsAll.split(",")));
            Map<String, String> kafkaParams = new HashMap<>();
            kafkaParams.put("metadata.broker.list", metadatabrokerlist);
            jsc = new JavaStreamingContext(javaSparkContext,
                    Durations.seconds(Integer.parseInt(streamDurationTime)));
            // Checkpoint directory for metadata recovery after a driver failure
            // (restoring from it on restart requires JavaStreamingContext.getOrCreate)
            jsc.checkpoint("checkpoint");

            // Obtain the data stream (direct stream API of spark-streaming-kafka-0-8,
            // an integration that was removed in Spark 3.0)
            final JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(jsc, String.class,
                    String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
            System.out.println("stream started!");
            stream.print();
            stream.foreachRDD(v -> {
                // Process the stream one article at a time
                List<String> topicDatas = v.values().collect();
                for (String topicData : topicDatas) {
                    List<Map<String, String>> list = gson
                            .fromJson(topicData, new TypeToken<List<Map<String, String>>>() {}.getType());
                    list.parallelStream().forEach(m -> {
                        // do something
                        System.out.println(m);
                    });
                }
                log.info("Finished processing one batch of the stream: {}", topicDatas);
            });
            jsc.start();
        }


        public void destroyStreamTask() {
            if (jsc != null) {
                jsc.stop();
            }
        }
    }
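
In the `foreachRDD` callback above, a message that is not valid JSON makes `gson.fromJson` throw, which leaves the `for` loop and skips the remaining messages of that batch. A minimal sketch of per-record error isolation, using only the JDK. `BatchProcessor` and `processAll` are hypothetical names, not from the original article, and the parser and handler are passed in as functions so the sketch does not depend on Gson or Spark:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper (not from the original article): isolates failures per record. */
public final class BatchProcessor {

    private BatchProcessor() {
    }

    /**
     * Parses and handles each record independently.
     *
     * @return the raw records whose parsing or handling threw, in input order
     */
    public static <T> List<String> processAll(List<String> records,
                                              Function<String, T> parser,
                                              Consumer<T> handler) {
        List<String> failed = new ArrayList<>();
        for (String record : records) {
            try {
                handler.accept(parser.apply(record));
            } catch (RuntimeException e) {
                // Keep going: one bad record should not abort the rest of the batch
                failed.add(record);
            }
        }
        return failed;
    }
}
```

Inside the callback, the parser argument would be the existing `gson.fromJson(...)` call and the handler the per-record logic; the returned list can then be logged for later inspection.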

    3. The listener class that runs once the container has finished loading:

    public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent> {

        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            ApplicationContext ac = event.getApplicationContext();
            SparkKafkaStreamExecutor sparkKafkaStreamExecutor = ac.getBean(SparkKafkaStreamExecutor.class);
            Thread thread = new Thread(sparkKafkaStreamExecutor);
            thread.start();
        }

    }
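
Spring publishes `ContextRefreshedEvent` each time an application context is initialized or refreshed, so in an application with more than one context (a parent/child hierarchy, for example) the listener above may run more than once and start a second streaming thread. A minimal sketch of a start-once guard, using only the JDK; `StartOnceGuard` is a hypothetical name, not from the original article:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper (not from the original article): runs a start action at most once. */
public final class StartOnceGuard {

    private final AtomicBoolean started = new AtomicBoolean(false);

    /**
     * Runs the action only on the first call; later calls are ignored.
     * The flag stays set even if the action throws, so a failed start is not retried.
     *
     * @return true if this call ran the action, false if a start was already attempted
     */
    public boolean startOnce(Runnable action) {
        // compareAndSet flips false to true atomically, so only one caller gets through
        if (!started.compareAndSet(false, true)) {
            return false;
        }
        action.run();
        return true;
    }

    /** Returns true once a start has been attempted. */
    public boolean isStarted() {
        return started.get();
    }
}
```

In `onApplicationEvent`, the thread creation could be wrapped as `guard.startOnce(() -> new Thread(sparkKafkaStreamExecutor).start())`, with `guard` held in a field of the listener.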

    4. The application entry class, which registers the listener:

    @SpringBootApplication
    public class SampleApplication {

        public static void main(String[] args) {
            SpringApplication springApplication = new SpringApplication(SampleApplication.class);
            springApplication.addListeners(new ApplicationStartup());
            springApplication.run(args);
        }

        // Register Gson as a Spring-managed bean
        @Bean
        public Gson gson() {
            return new Gson();
        }

    }


    ————————————————
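    Copyright notice: this is an original article by CSDN blogger "river_rock", licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.
    Original link: https://blog.csdn.net/swjtu_yhz/article/details/79361472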

  • Original address: https://www.cnblogs.com/javalinux/p/15061775.html