zoukankan      html  css  js  c++  java
  • Storm manual DRPC 的远程调用

    一. 创建 server 端(提交 DRPC 拓扑)

    /**
     * Server side of a manually wired DRPC topology.
     *
     * <p>The topology consists of three components: a {@code DRPCSpout} registered for the
     * function name {@code "add"}, an {@link AddBolt} that computes the sum, and a
     * {@code ReturnResults} bolt that consumes the {@code AddBolt} output.
     */
    public class ManualDRPC {

        private static final Logger LOG = LoggerFactory.getLogger(ManualDRPC.class);

        /**
         * Builds the topology and submits it to the cluster under the name {@code "ManualDRPC"}.
         *
         * @param args command-line arguments; not used
         * @throws AlreadyAliveException    propagated from {@code StormSubmitter.submitTopology}
         * @throws InvalidTopologyException propagated from {@code StormSubmitter.submitTopology}
         * @throws AuthorizationException   propagated from {@code StormSubmitter.submitTopology}
         */
        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {

            TopologyBuilder builder = new TopologyBuilder();

            // "add" is the DRPC function name; the client must call execute() with the same name.
            DRPCSpout spout = new DRPCSpout("add");
            builder.setSpout("drpc", spout);
            // Both bolts run with a parallelism hint of 3 and are fed by shuffle grouping.
            builder.setBolt("add", new AddBolt(), 3).shuffleGrouping("drpc");
            builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("add");

            Config conf = new Config();
            StormSubmitter.submitTopology("ManualDRPC", conf, builder.createTopology());
            LOG.warn("==================================================");
            LOG.warn("the topology {} is submitted.","ManualDRPC");
            LOG.warn("==================================================");

        }

        /**
         * Adds the first two comma-separated integers of a DRPC request and emits the sum.
         *
         * <p>Input tuple: position 0 is read as the argument string (for example {@code "1,2"})
         * and position 1 as an opaque return-info value that is forwarded unchanged.
         * NOTE(review): this presumes the upstream {@code DRPCSpout} emits (args, return-info)
         * in that order - confirm against the Storm version in use.
         *
         * <p>Output tuple: fields {@code "result"} (the sum as a decimal string) and
         * {@code "return-info"}.
         */
        public static class AddBolt extends BaseBasicBolt{

            /**
             * Parses the two operands, adds them and emits {@code (sum, returnInfo)}.
             *
             * <p>Whitespace around each operand is ignored. The addition is done with
             * arbitrary precision so that a sum outside the {@code int} range is returned
             * correctly instead of silently wrapping around. Operands beyond the first two
             * are ignored.
             *
             * @param input     request tuple; position 0 is the argument string, position 1 the return info
             * @param collector collector used to emit the result tuple
             * @throws IllegalArgumentException if fewer than two operands are supplied
             * @throws NumberFormatException    if an operand is not a valid decimal integer
             */
            @Override
            public void execute(Tuple input, BasicOutputCollector collector) {
                Object returnInfo = input.getValue(1);
                String params = input.getString(0);
                String[] numbers = params.split(",");
                if (numbers.length < 2) {
                    // Fail with the offending argument in the message rather than a bare index error.
                    throw new IllegalArgumentException(
                            "expected two comma-separated integers but got: " + params);
                }
                // Fully qualified because this file has no import for java.math.BigInteger.
                java.math.BigInteger left = new java.math.BigInteger(numbers[0].trim());
                java.math.BigInteger right = new java.math.BigInteger(numbers[1].trim());
                String conversValue = left.add(right).toString();
                collector.emit(new Values(conversValue,returnInfo));
            }

            /**
             * Declares the two output fields in the order they are emitted by {@code execute}.
             *
             * @param declarer declarer that receives the output schema
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("result","return-info"));
            }

        }
    }

    二. 创建 client 端(调用 DRPC 函数)

    /**
     * Client side of the manual DRPC example: connects to a DRPC server and invokes the
     * {@code "add"} function once with the argument {@code "1,2"}.
     */
    public class ManualClientDRPC {

        private static final Logger LOG = LoggerFactory.getLogger(ManualClientDRPC.class);

        /**
         * Configures the Thrift transport and retry settings, performs a single DRPC call
         * and logs the result. Any failure is logged together with its cause, and the
         * client connection is always closed before returning.
         *
         * @param args command-line arguments; not used
         */
        public static void main(String[] args) {
            Config conf = new Config();
            conf.put("storm.thrift.transport", "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin");
            conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
            conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
            conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);

            // Declared outside the try so the finally block can release the connection.
            DRPCClient client = null;
            try {
                client = new DRPCClient(conf, "master", 3777);
                String result = client.execute("add", "1,2");
                LOG.info("============== result:{}",result);
            } catch (Exception e) {
                // Pass the exception to the logger so the cause and stack trace are not lost.
                LOG.error("DRPC call to function 'add' on master:3777 failed", e);
            } finally {
                if (client != null) {
                    client.close();
                }
            }
        }
    }
  • 相关阅读:
    如何降低微服务测试成本?我的经验之谈
    Serverless 在 SaaS 领域的最佳实践
    技术干货 | “选图预览并上传”的场景如何解?全网最全方案汇总来了
    SRE技术保障平台-盯屏中心TAC: 混合云一站式告警运维平台
    DTCC 2020 | 阿里云王涛:阿里巴巴电商数据库上云实践
    中值滤波算法 C
    python logger.debug_python中的logger模块讲解
    唯一值
    接触jeecgBoot低代码开发
    php数字操作
  • 原文地址:https://www.cnblogs.com/MrRightZhao/p/11120211.html
Copyright © 2011-2022 走看看