zoukankan      html  css  js  c++  java
  • SparkStreaming对接rabbitMQ

    /**
    * SparkStreaming对接rabbitmq java代码
    */
    public class SparkConsumerRabbit {
    public static void main(String[] args) throws InterruptedException, AnalysisException {
    SparkConf sparkConf = new SparkConf()
    .setAppName("SparkConsumerRabbit")
    .setMaster("local[2]");
    //毫秒 Duration参数
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1000));
    Map<String, String> params = new HashMap<>();
    //map中参数设置
    params.put("hosts", "192.168.45.10");
    params.put("port", "5672");
    params.put("userName", "admin");
    params.put("password", "admin");
    params.put("queueName", "cj_ack");

    //如报错请添加下面的参数,原因是代码运行报错底层已经把durable置为true了;
    //params.put("durable", "false");
    Function<QueueingConsumer.Delivery, String> handler = message -> new String(message.getBody());
    JavaReceiverInputDStream<String> messages = RabbitMQUtils.createJavaStream(jsc,String.class,params,handler);
    messages.print();
    jsc.start();
    jsc.awaitTermination();
    }
    }

    本代码在1.5.0中运行无误,如使用2.3.0以上代码编写需要添加logging类,后续将补充上,如有不足之处请谅解。相互学习。

  • 相关阅读:
    小结
    五种常见的 PHP 设计模式
    php克隆 自动加载
    小知识点
    php抽象 与接口
    php静态
    iOS开发零碎笔记
    iOS开发错误日志
    Objective-C:Foundation框架-常用类-NSObject
    Objective-C:Foundation框架-常用类-NSDate
  • 原文地址:https://www.cnblogs.com/Mr--zhao/p/11278793.html
Copyright © 2011-2022 走看看