zoukankan      html  css  js  c++  java
  • Storm drpc学习

    示例代码:

    package com.lky.test;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.junit.Test;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.LocalDRPC;
    import backtype.storm.drpc.DRPCSpout;
    import backtype.storm.drpc.ReturnResults;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;
    
    /**
    * @ClassName: manualDRPC
    * @Description: storm 分布式RPC学习
    * @author Administrator
    * @date 2015年10月23日
    * @version 1.0
     */
    public class manualDRPC {
    
        private static Log log = LogFactory.getLog(manualDRPC.class);
    
        @SuppressWarnings("serial")
        public static class ExclamationBolt extends BaseBasicBolt {
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("result", "return-info"));
            }
    
            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                String arg = tuple.getString(0);
                Object retInfo = tuple.getValue(1);
                log.info("execute----------->"+arg+"--------"+retInfo);
                collector.emit(new Values(arg + "!!!", retInfo));
            }
        }
        
        @Test
        public void test(){
            TopologyBuilder builder=new TopologyBuilder();
            LocalDRPC drpc=new LocalDRPC();//本地RPC
            
            //构建topology
            DRPCSpout drpcSpout=new DRPCSpout("exclation",drpc);
            builder.setSpout("drpc", drpcSpout,2);
            builder.setBolt("exam", new ExclamationBolt(),4).shuffleGrouping("drpc");
            builder.setBolt("return", new ReturnResults(),4).shuffleGrouping("exam");
            
            //配置topology
            Config config=new Config();
            config.setDebug(true);
            config.setMaxSpoutPending(1000);
            config.setNumWorkers(2);
            
            //本地集群
            LocalCluster localCluster=new LocalCluster();
            localCluster.submitTopology("test", config, builder.createTopology());
            
            //本地和storm交互
            System.out.println("-------------"+drpc.execute("exclation", "luo")+"-------------");
            System.out.println("-------------"+drpc.execute("exclation", "lky")+"-------------");
            
            
            Utils.sleep(1000*10);
            drpc.shutdown();
            localCluster.killTopology("test");
            localCluster.shutdown();
        }
    
    }
    package com.lky.test;
    
    import java.util.Map;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.junit.Test;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.LocalDRPC;
    import backtype.storm.drpc.LinearDRPCTopologyBuilder;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;
    
    /**
     * @ClassName: drpcTopology
     * @Description:线性drpc学习
     * @author Administrator
     * @date 2015年10月23日
     *
     */
    public class drpcTopology {
    
        @SuppressWarnings("serial")
        public static class Exclation extends BaseRichBolt {
            private static Log log = LogFactory.getLog(Exclation.class);
            private OutputCollector _outputCollector;
    
            @Override
            public void execute(Tuple tuple) {
                String res = null;
                try {
                    res = tuple.getString(1);
                    if (null != res) {
                        _outputCollector.emit(new Values(tuple.getValue(0), res + "!!!!"));
                        log.info("execute 发射消息-------->" + res);
                    }
    
                } catch (Exception e) {
                    log.error("execute处理异常!!!");
                }
            }
    
            @Override
            public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector outputCollector) {
                this._outputCollector = outputCollector;
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("id", "result"));
            }
        }
    
        @SuppressWarnings("deprecation")
        @Test
        public void testDRPC() {
            // LinearDRPCTopologyBuilder帮助我们自动集成了DRPCSpout和returnResults(bolt)
            LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("text");
            builder.addBolt(new Exclation(), 3);
    
            Config config = new Config();
            LocalDRPC drpc = new LocalDRPC();
    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("drpc-test", config, builder.createLocalTopology(drpc));
    
            for (String word : new String[] { "hello", "word" }) {
                System.out.println("result "" + word + "": " + drpc.execute("text", word));
            }
    
            Utils.sleep(1000 * 5);
            drpc.shutdown();
            cluster.killTopology("drpc-test");
            cluster.shutdown();
        }
    
    }
  • 相关阅读:
    【CF536D】Tavas in Kansas(博弈+动态规划)
    【CF643F】Bears and Juice(信息与可区分情况数)
    【AT3981】[ARC093D] Dark Horse(容斥+状压DP)
    【CF708E】Student's Camp(动态规划)
    【洛谷6775】[NOI2020] 制作菜品(思维好题)
    【洛谷2282】[HNOI2003] 历史年份(线段树优化DP)
    【洛谷5068】[Ynoi2015] 我回来了(线段树)
    【洛谷4117】[Ynoi2018] 五彩斑斓的世界(第二分块)
    【洛谷3745】[六省联考2017] 期末考试(水题)
    【AtCoder】AtCoder Grand Contest 050 解题报告(A~D)
  • 原文地址:https://www.cnblogs.com/dmir/p/4905644.html
Copyright © 2011-2022 走看看