zoukankan      html  css  js  c++  java
  • Storm 中drpc调用

    package storm.starter;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.LocalDRPC;
    import backtype.storm.StormSubmitter;
    import backtype.storm.drpc.DRPCSpout;
    import backtype.storm.task.ShellBolt;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import storm.starter.spout.RandomSentenceSpout;
    
    import java.lang.management.ManagementFactory;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.log4j.Logger;
    import org.apache.log4j.PropertyConfigurator;
    
    /**
     * This topology demonstrates Storm's stream groupings and multilang
     * capabilities.
     */
    public class Drpctest {
    	public static final Logger logger = Logger.getLogger(Drpctest.class);
    	public static class WordCount extends BaseBasicBolt {
    		Map<String, Integer> counts = new HashMap<String, Integer>();
    
    		@Override
    		public void execute(Tuple tuple, BasicOutputCollector collector) {
    			String word = tuple.getString(0);
    			logger.error(this.toString() + "word = " + word);
    			Integer count = counts.get(word);
    			if (count == null)
    				count = 0;
    			count++;
    			counts.put(word, count);
    			logger.error(this.toString() + "count = " + count);
    			collector.emit(new Values(word, count));
    		}
    
    		String str = Thread.currentThread().getName();
    
    		@Override
    		public void declareOutputFields(OutputFieldsDeclarer declarer) {
    			logger.error("declareOutputFields :");
    			declarer.declare(new Fields("result", "count"));
    		}
    	}
    
    	public static class DrpcBolt extends BaseBasicBolt {
    		Map<String, Integer> counts = new HashMap<String, Integer>();
    
    		@Override
    		public void execute(Tuple tuple, BasicOutputCollector collector) {
    			String logString = tuple.getString(0);
    			logger.error("DrpcBolt recve :" + logString);
    		}
    
    		@Override
    		public void declareOutputFields(OutputFieldsDeclarer declarer) {
    			// 暂时没用
    			declarer.declare(new Fields("word1", "count1"));
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		TopologyBuilder builder = new TopologyBuilder();
    
    		// drpc
    		LocalDRPC drpc = new LocalDRPC();
    		DRPCSpout drpc_spout = new DRPCSpout("testdrpc", drpc);
    		builder.setSpout("drpcspout", drpc_spout, 3);
    
    		PropertyConfigurator
    				.configure("/home/hadoop/code1/Kafka/src/Log4j.properties");
    
    		// 接入drpc
    		builder.setBolt("DrpcBolt", new DrpcBolt(), 1).shuffleGrouping(
    				"drpcspout");
    
    		Config conf = new Config();
    		conf.setDebug(true);
    
    		if (args != null && args.length > 0) {
    			conf.setNumWorkers(3);
    
    			StormSubmitter.submitTopology(args[0], conf,
    					builder.createTopology());
    		} else {
    			conf.setMaxTaskParallelism(3);
    			conf.setDebug(true);
    
    			LocalCluster cluster = new LocalCluster();
    			cluster.submitTopology("word-count", conf, builder.createTopology());
    
    			String str = "send test drpc"; // 和 DRPCSpout 名字对应
    			drpc.execute("testdrpc", str);
    
    			Thread.sleep(10000);
    
    			cluster.shutdown();
    		}
    	}
    }
    
  • 相关阅读:
    03014_properties配置文件
    Python 文件I/O
    Python面向对象
    Python CGI编程
    Python正则表达式
    Python使用SMTP发送邮件
    python操作mysql数据库
    Python多线程
    python XML解析
    给傻瓜用的SP2010开发--第一部分--理解SP开发平台--第一章节--理解SP促销讨论(2)--追踪SP源头
  • 原文地址:https://www.cnblogs.com/chengxin1982/p/3989209.html
Copyright © 2011-2022 走看看