zoukankan      html  css  js  c++  java
  • Storm系列(十七)DRPC介绍

    Storm版本0.9.5

    在storm中DRPC服务应用于远程分布式计算,根据客户端提交的请求参数,而返回Storm计算的结果。

    DRPC服务启动流程(远程模式)

    • 启动DRPC服务,启动命令:storm drpc
    • 修改storm.yaml配置文件,添加drpc.servers,如:

      drpc.servers:

      - "mast"

    • 向storm集群中的drpc服务节点mast提交DRPC拓扑

    DRPC工作流程

    1. 客户端发起请求到DRPC Service
    2. DRPC Service会为这次请求生成对应的requestId与输入的参数一起发送到DRPCSpout,再提交到DrpcTopology(输入参数的字段名称固定为args)
    3. 处理后的结果返回给ReturnResults
    4. 调用DRPC Service服务返回给客户端,返回的结果包括requestId和请求的结果数据.
    5. ReturnResults输出两个字段:第一个为结果数据,第二个为requestId

    工作流程图

    image

    本地调用DRPC服务方式

    TopologyBuilder topologyBuilder = new TopologyBuilder();
    LocalDRPC drpc = new LocalDRPC();
           
    DRPCSpout spout = new DRPCSpout("test", drpc);
    topologyBuilder.setSpout("drpc", spout);
    topologyBuilder.setBolt("bolt1", new TestBolt(),2).shuffleGrouping("drpc");
    topologyBuilder.setBolt("return", new ReturnResults(),2).shuffleGrouping("bolt1");
           
    LocalCluster cluster = new LocalCluster();
    10  Config conf = new Config();
    11  cluster.submitTopology("bolt1", conf, topologyBuilder.createTopology());

    分布式集群模式调用DRPC服务

    TopologyBuilder topologyBuilder = new TopologyBuilder();
           
    DRPCSpout spout = new DRPCSpout("test");
    topologyBuilder.setSpout("drpc", spout);
    topologyBuilder.setBolt("bolt1", new TestBolt(),2).shuffleGrouping("drpc");
    topologyBuilder.setBolt("return", new ReturnResults(),2).shuffleGrouping("bolt1");   
     
    Config conf = new Config();
    StormSubmitter.submitTopology("bolt1", conf, topologyBuilder.createTopology());

    调用

    public class DRPCTest {
     
        public static void main(String[] args) {
     
            DRPCClient client = new DRPCClient("远程服务IP地址", 3772);
            try {
                String result = client.execute("exclamation", "hello ");
               
                System.out.println(result);
    10          } catch (TException e) {
    11              e.printStackTrace();
    12          } catch (DRPCExecutionException e) {
    13              e.printStackTrace();
    14          } 
    15      }
    16  }
  • 相关阅读:
    MSSQL·阻止保存要求重新创建表的更改配置
    MSSQL·查询某数据库中所有表的记录数并排序
    异常处理·psftp·local unable to open
    MSSQL·Execution Timeout Expired. The timeout period elapsed prior to completion of the oper..
    MSSQL·ORDER BY 1 DESC是什么写法?
    MSSQL·大数据量历史数据清理的思路
    ubuntu清理wine卸载后的残余项目
    Learning the Vi Editor, 6th Edition O'Reilly Media
    做一粒不浮躁的好“种子”
    Qt Designer使用简易教程
  • 原文地址:https://www.cnblogs.com/jianyuan/p/4902958.html
Copyright © 2011-2022 走看看