zoukankan      html  css  js  c++  java
  • Streaming+Sparksql使用sql实时分析 rabbitmq+mongodb+hive

    SparkConf sparkConf = new SparkConf()
    //此处使用一个链接切记使用一个链接否则汇报有多个sparkcontext错误
    .setAppName("SparkConsumerRabbit")
    .setMaster("local[2]")
    .set("hive.metastore.uris", thrift)
    .set("spark.sql.warehouse.dir", hdfs)
    .set("spark.mongodb.input.uri", "mongodb://" + rule.getMUName(jsonStr) + ":" + rule.getMpwd(jsonStr) + "@" + rule.getMIp(jsonStr) + ":" + rule.getMport(jsonStr) + "/" + rule.getMDBName(jsonStr) + "." + rule.getMtable(jsonStr))
    .set("spark.mongodb.output.uri", "mongodb://root:123456@192.168.4.51:27010/pachong.test");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    //Duration参数秒
    //Streaming 方式
    JavaStreamingContext jsc = new JavaStreamingContext(sc, Durations.seconds(5));
    //hivesql 方式
    HiveContext hiveContext = new HiveContext(sc);
    hiveContext.sql("show databases").show();
    hiveContext.sql("use" + " " + db);
    //mongodb 方式
    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
    Map<String, String> params = new HashMap<>();
    //map中参数设置,加载map连接rabbit
    params.put("hosts", "192.168.7.96");
    params.put("port", "5672");
    params.put("userName", "admin");
    params.put("password", "admin");
    params.put("queueName", "cj_ack");
    params.put("durable", "false");
    Function<QueueingConsumer.Delivery, String> handler = message -> new String(message.getBody());
    JavaReceiverInputDStream<String> messages = RabbitMQUtils.createJavaStream(jsc,String.class,params,handler);
    messages.print();
  • 相关阅读:
    对象形式传递
    解决DLNA方案的技术框架
    关于DLNA
    MAC配置Xcode的Cocos2d-x环境
    什么叫做双缓冲?
    Window7 Cocos2d-x配置开发环境
    Windows 8.1 Update 2更新了什么?
    微软发布Windows Phone 8.1 Update 和中文版Cortana“小娜”
    大开眼界 游览Facebook香港办公室
    小米的“假照片”危机
  • 原文地址:https://www.cnblogs.com/Mr--zhao/p/11344372.html
Copyright © 2011-2022 走看看