zoukankan      html  css  js  c++  java
  • storm manual drpc 的远程调用

    一.创建server端

    public class ManualDRPC {
        
        private static final Logger LOG = LoggerFactory.getLogger(ManualDRPC.class);
        
        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
            
            TopologyBuilder builder = new TopologyBuilder();
            
            DRPCSpout spout = new DRPCSpout("add");
            builder.setSpout("drpc", spout);
            builder.setBolt("add", new AddBolt(),3).shuffleGrouping("drpc");
            builder.setBolt("return", new ReturnResults(),3).shuffleGrouping("add");
            
            Config conf = new Config();
            StormSubmitter.submitTopology("ManualDRPC", conf, builder.createTopology());
            LOG.warn("==================================================");
            LOG.warn("the topology {} is submitted.","ManualDRPC");
            LOG.warn("==================================================");
            
        }
        public static class AddBolt extends BaseBasicBolt{
    
            @Override
            public void execute(Tuple input, BasicOutputCollector collector) {
                Object returnInfo = input.getValue(1);
                String params = input.getString(0);
                String[] numbers = params.split(",");
                String conversValue = String.valueOf(Integer.parseInt(numbers[0]) + Integer.parseInt(numbers[1]));
                collector.emit(new Values(conversValue,returnInfo));
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("result","return-info"));
            }
            
        }
    }

    二.client端

    public class ManualClientDRPC {
        
        private static final Logger LOG = LoggerFactory.getLogger(ManualClientDRPC.class);
        
        public static void main(String[] args) {
            Config conf = new Config();
            conf.put("storm.thrift.transport", "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin");
            conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
            conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
            conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);
            
            try {
                DRPCClient client = new DRPCClient(conf, "master", 3777);
                String result = client.execute("add", "1,2");
                LOG.info("============== result:{}",result);
            } catch (Exception e) {
                LOG.info("ERR");
            }
        }
    }
  • 相关阅读:
    无题
    赌对了
    赌:
    这次是真的再见了,oi退役回忆录
    线段树(lazy标记)
    《挑战》2.1 POJ POJ 1979 Red and Black (简单的DFS)
    《挑战》2.1 POJ 2386 Lake Counting (简单的dfs)
    TC安装全系列教程
    ProblemC 剪花布条(KMP基础)
    多校+CF简单题
  • 原文地址:https://www.cnblogs.com/MrRightZhao/p/11120211.html
Copyright © 2011-2022 走看看