zoukankan      html  css  js  c++  java
  • storm manual drpc 的远程调用

    一.创建server端

    public class ManualDRPC {
        
        private static final Logger LOG = LoggerFactory.getLogger(ManualDRPC.class);
        
        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
            
            TopologyBuilder builder = new TopologyBuilder();
            
            DRPCSpout spout = new DRPCSpout("add");
            builder.setSpout("drpc", spout);
            builder.setBolt("add", new AddBolt(),3).shuffleGrouping("drpc");
            builder.setBolt("return", new ReturnResults(),3).shuffleGrouping("add");
            
            Config conf = new Config();
            StormSubmitter.submitTopology("ManualDRPC", conf, builder.createTopology());
            LOG.warn("==================================================");
            LOG.warn("the topology {} is submitted.","ManualDRPC");
            LOG.warn("==================================================");
            
        }
        public static class AddBolt extends BaseBasicBolt{
    
            @Override
            public void execute(Tuple input, BasicOutputCollector collector) {
                Object returnInfo = input.getValue(1);
                String params = input.getString(0);
                String[] numbers = params.split(",");
                String conversValue = String.valueOf(Integer.parseInt(numbers[0]) + Integer.parseInt(numbers[1]));
                collector.emit(new Values(conversValue,returnInfo));
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("result","return-info"));
            }
            
        }
    }

    二.client端

    public class ManualClientDRPC {
        
        private static final Logger LOG = LoggerFactory.getLogger(ManualClientDRPC.class);
        
        public static void main(String[] args) {
            Config conf = new Config();
            conf.put("storm.thrift.transport", "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin");
            conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
            conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
            conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);
            
            try {
                DRPCClient client = new DRPCClient(conf, "master", 3777);
                String result = client.execute("add", "1,2");
                LOG.info("============== result:{}",result);
            } catch (Exception e) {
                LOG.info("ERR");
            }
        }
    }
  • 相关阅读:
    C/C++产生随机数
    BNUOJ34973Liserious战队
    oracle ebs 12.20 安装成功其过程失败日记及总结(1)
    HDU 2544 最短路 SPFA 邻接表 模板
    GridView编辑删除操作
    Hibernate_10_继承的例子_单表
    String不变性
    Mac在结构quick cocos2d-x编译环境
    Hash散列算法 Time33算法
    南京地图南京全套的卫星地图下载 百度高清卫星地图包括道路、标签信息叠加
  • 原文地址:https://www.cnblogs.com/MrRightZhao/p/11120211.html
Copyright © 2011-2022 走看看