  • Testing Storm's multi-anchoring (anchoring one emitted tuple to several input tuples)

    Procedure:

    The spout emits a sequence of tuples with msgids 1 through 9.

    First-level bolt: the msgid-1 tuple ("a") is kept as the base tuple; each of the other 8 tuples is combined with it and emitted to the second-level bolt, anchored to both input tuples. Every incoming tuple is acked once, so for the msgid-1 tuple the acker keeps tracking the 8 combined tuples sent on to the second-level bolt.

    Second-level bolt: fails every tuple it receives (simulating a processing failure).

    Result: the spout's fail() callback fires for msgids 1 through 9, because each failed tuple is anchored to both the msgid-1 tuple and one of the msgid-2 to 9 tuples, so the acker marks all of those root ids as failed.
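
    The multi-anchoring itself is a single emit call that takes a list of anchor tuples; when the emitted tuple is failed downstream, every anchored root msgid is failed. The core of the first-level bolt, extracted from the topology code below:

    // Anchor one output tuple to two input tuples (from SplitSentence.execute below).
    List<Tuple> anchors = new ArrayList<Tuple>();
    anchors.add(tuple1);                                   // base tuple "a" (msgid 1), saved from an earlier execute() call
    anchors.add(tuple);                                    // the input tuple currently being processed
    _collector.emit(anchors, new Values(sentence + "a"));  // combined tuple goes to the second-level bolt
    _collector.ack(tuple);                                 // ack the inputs; the emitted tuple is still pending with the acker
    _collector.ack(tuple1);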

    Topology code

    package storm.starter;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.LocalDRPC;
    import backtype.storm.StormSubmitter;
    import backtype.storm.drpc.DRPCSpout;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.ShellBolt;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import storm.starter.spout.RandomSentenceSpout;
    
    import java.lang.management.ManagementFactory;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.log4j.Logger;
    import org.apache.log4j.PropertyConfigurator;
    
    /**
     * Word-count topology from storm-starter, modified here to test multi-anchoring:
     * the first-level bolt anchors each emitted tuple to two input tuples, and the
     * second-level bolt fails every tuple it receives.
     */
    public class WordCountTopology {
    	public static String GetThreadName() {
    		Thread thread = Thread.currentThread();
    		return thread.getName();
    	}
    
    	public static final Logger logger = Logger
    			.getLogger(WordCountTopology.class);
    
    	// Word-splitting first-level bolt
    	/*
    	 * public static class SplitSentence extends ShellBolt implements IRichBolt
    	 * { public SplitSentence() { super("python", "splitsentence.py");
    	 * logger.error(GetThreadName() + "SplitSentence create"); }
    	 * 
    	 * // declare output fields
    	 * 
    	 * @Override public void declareOutputFields(OutputFieldsDeclarer declarer)
    	 * { declarer.declare(new Fields("word")); logger.error(GetThreadName() +
    	 * "declarer.declare(new Fields(word))"); }
    	 * 
    	 * @Override public Map<String, Object> getComponentConfiguration() {
    	 * logger.error("getComponentConfiguration"); return null; } }
    	 */
    	public static class SplitSentence implements IRichBolt {
    		private OutputCollector _collector;
    		
    		int num = 0;
    		@Override
    		public void prepare(Map stormConf, TopologyContext context,
    				OutputCollector collector) {
    			_collector = collector;
    		}
    		
    		private Tuple tuple1;
    		@Override
    		public void execute(Tuple tuple) {
    			String sentence = tuple.getString(0);
    			if (sentence.equals("a")) {
    				// "a" is the msgid-1 tuple: keep it as the base anchor for all later tuples.
    				tuple1 = tuple;
    			} else {
    				// Anchor the combined output tuple to BOTH the base tuple (msgid 1)
    				// and the current input tuple, then emit it to the second-level bolt.
    				List<Tuple> anchors = new ArrayList<Tuple>();
    				anchors.add(tuple1);
    				anchors.add(tuple);
    				_collector.emit(anchors, new Values(sentence + "a"));
    				// Ack the inputs themselves; the emitted tuple is still tracked by the acker.
    				_collector.ack(tuple);
    				_collector.ack(tuple1);
    			}

    //			for (String word : sentence.split(" ")){
    //				_collector.emit(tuple, new Values(word));
    //			}
    //			num++;

    			System.out.println("Bolt Thread " + Thread.currentThread().getName() + "recve : " + sentence);
    			System.out.println(num + " bolt recev:" + tuple.getMessageId().getAnchorsToIds());
    		}
    
    		@Override
    		public void cleanup() {
    		}
    
    		@Override
    		public void declareOutputFields(OutputFieldsDeclarer declarer) {
    			declarer.declare(new Fields("word"));
    		}
    
    		@Override
    		public Map<String, Object> getComponentConfiguration() {
    			// TODO Auto-generated method stub
    			return null;
    		}
    	}
    	
    	public static class CountCount1 implements IRichBolt {
    		Map<String, Integer> counts = new HashMap<String, Integer>();
    		private OutputCollector _collector;
    		int num = 0;
    
    		@Override
    		public void execute(Tuple tuple) {
    			String word = tuple.getString(0);
    			//logger.error(this.toString() + "word = " + word);
    			Integer count = counts.get(word);
    			if (count == null)
    				count = 0;
    			
    			count++;
    			counts.put(word, count); 
    			num++;
    			
    			// Simulate a processing failure: fail every tuple instead of acking it.
    			_collector.fail(tuple);
    			//_collector.ack(tuple);
    			
    		   //_collector.emit(tuple, new Values(word, count));
    		}
    
    		@Override
    		public void declareOutputFields(OutputFieldsDeclarer declarer) {
    			// logger.error("declareOutputFields :");
    			declarer.declare(new Fields("result", "count"));
    		}
    
    		@Override
    		public void prepare(Map stormConf, TopologyContext context,
    				OutputCollector collector) {
    			// TODO Auto-generated method stub
    			_collector = collector;
    		}
    
    		@Override
    		public void cleanup() {
    			// TODO Auto-generated method stub
    			
    		}
    
    		@Override
    		public Map<String, Object> getComponentConfiguration() {
    			// TODO Auto-generated method stub
    			return null;
    		}
    	}
    	
    	
    	public static class WordCount extends BaseBasicBolt {
    		private OutputCollector _collector;
    		Map<String, Integer> counts = new HashMap<String, Integer>();
    
    		@Override
    		public void execute(Tuple tuple, BasicOutputCollector collector) {
    			String word = tuple.getString(0);
    			//logger.error(this.toString() + "word = " + word);
    			Integer count = counts.get(word);
    			if (count == null)
    				count = 0;
    			count++;
    			counts.put(word, count); // <key, list<value, count> >
    			//logger.error(this.toString() + "count = " + count);
    			collector.emit(new Values(word, count));
    		}
    
    		@Override
    		public void declareOutputFields(OutputFieldsDeclarer declarer) {
    			// logger.error("declareOutputFields :");
    			declarer.declare(new Fields("result", "count"));
    		}
    	}
    
    	public static class WordCount1 extends BaseBasicBolt {
    		Map<String, Integer> counts = new HashMap<String, Integer>();
    
    		@Override
    		public void execute(Tuple tuple, BasicOutputCollector collector) {
    			// logger.error("WordCount1");
    			// tuple.getFields()[0];
    			if (tuple.getFields().contains("result")) {
    				String count = (String) tuple.getValueByField("result");
    				// tuple.getValueByField(field)
    				long countl = -0;// = Long.valueOf(count);
    				// logger.error(this.toString() + " key  = resultkey " + count);
    			}
    
    			if (tuple.getFields().contains("count")) {
    				Integer count = (Integer) tuple.getValueByField("count");
    				// tuple.getValueByField(field)
    				long countl = -0;// = Long.valueOf(count);
    				//logger.error(this.toString() + " key  = count " + count);
    			}
    
    			// String word = tuple.getString(0);
    			// logger.error(this.toString() +"word = " + word);
    			// Integer count = counts.get(word);
    			// if (count == null)
    			// count = 0;
    			// count++;
    			// counts.put(word, count);
    			// logger.error(this.toString() + "count = " + count);
    			// collector.emit(new Values(word, count));
    		}
    
    		@Override
    		public void declareOutputFields(OutputFieldsDeclarer declarer) {
    			// logger.error("declareOutputFields :");
    			declarer.declare(new Fields("word1", "count1"));
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		TopologyBuilder builder = new TopologyBuilder();
    
    		PropertyConfigurator
    				.configure("/home/hadoop/code1/Kafka/src/Log4j.properties");
    		
    		// parallelism_hint sets the number of executors; setNumTasks sets the number of tasks
    		builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(2);

    		builder.setBolt("split", new SplitSentence(), 8).setNumTasks(1).shuffleGrouping("spout");
    		builder.setBolt("count", new CountCount1(), 12).fieldsGrouping("split",
    				new Fields("word"));
    //		builder.setBolt("WordCount1", new WordCount1(), 1).fieldsGrouping(
    //				"count", new Fields("result", "count"));
    
    		Config conf = new Config();
    		conf.setDebug(true);
    		// Maximum number of pending (not yet acked/failed) tuples allowed on a single spout
    		// task; keeps the pending-tuple queue from growing too large. Only affects reliable
    		// (anchored) processing.
    		conf.setMaxSpoutPending(2);
    		conf.setMessageTimeoutSecs(5); // message processing timeout
    		conf.setNumAckers(2); // number of acker tasks tracking message processing

    		System.out.println("args.length = " + args.length);
    		if (args != null && args.length > 0) {
    			conf.setNumWorkers(5); // number of worker processes
    			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    		} else {
    			// maximum number of executors per component
    			conf.setMaxTaskParallelism(1);
    			conf.setDebug(true);

    			LocalCluster cluster = new LocalCluster();
    			cluster.submitTopology("word-count", conf, builder.createTopology());

    			String str = "testdrpc";
    			// drpc.execute("testdrpc", str);

    			Thread.sleep(1088000);
    			cluster.shutdown();
    		}
    	}
    }

    Spout code

    package storm.starter.spout;
    
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;
    
    import java.util.Map;
    import java.util.Random;
    
    import org.apache.log4j.Logger;
    
    import storm.starter.WordCountTopology;
    
    
    // IRichSpout 
    public class RandomSentenceSpout extends BaseRichSpout { 
    	SpoutOutputCollector _collector;
    	Random _rand;
    
    	public static final Logger logger = Logger
    			.getLogger(RandomSentenceSpout.class);
    
    	@Override
    	public void open(Map conf, TopologyContext context,
    			SpoutOutputCollector collector) {
    		_collector = collector;
    		_rand = new Random();
    
    		WordCountTopology.logger.error(this.toString()
    				+ "RandomSentenceSpout is create");
    	}
    
    	private int num = 0;
    
    	private String gettmstr() {
    		StringBuilder tmp = new StringBuilder();
    		for (int i = 0; i <= num; i++)
    			tmp.append("a");
    		num++;
    		return tmp.toString();
    	}
    
    	@Override
    	public void nextTuple() {
    		Utils.sleep(200);
    		// String[] sentences = new String[]{ "the cow jumped over the moon",
    		// "an apple a day keeps the doctor away",
    		// "four score and seven years ago", "snow white and the seven dwarfs",
    		// "i am at two with nature" };
    		String[] sentences = new String[] { "A" };
    
    		String sentence = gettmstr(); // sentences[_rand.nextInt(sentences.length)];
    		if (num < 10) {
    			_collector.emit(new Values(sentence), new Integer(num));
    			// logger.error(this.toString() + "send sentence = " + sentence);
    		   // System.out.println(Thread.currentThread().getName() + " Spout ");
    		}
    	}
    
    	@Override
    	public void ack(Object id) {
    		logger.error(this.toString() + "spout ack =" + (Integer)id);
    	}
    
    	@Override
    	public void fail(Object id) {
    		logger.error("spout fail =" + (Integer)id);
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("word"));
    	}
    
    }
    

    Run output

    2014-10-03 21:17:31,149 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =1
    2014-10-03 21:17:31,351 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =2
    Bolt Thread Thread-22recve : aaa
    0 bolt recev:{-3139141336114052337=7131499433188364504}
    2014-10-03 21:17:31,552 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =3
    Bolt Thread Thread-22recve : aaaa
    0 bolt recev:{-4497680640148241887=-615828348570847097}
    2014-10-03 21:17:31,754 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =4
    Bolt Thread Thread-22recve : aaaaa
    0 bolt recev:{-8878772617767839827=-7708082520013359311}
    2014-10-03 21:17:31,957 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =5
    Bolt Thread Thread-22recve : aaaaaa
    0 bolt recev:{-3995020874692495577=-5070846720162762196}
    2014-10-03 21:17:32,160 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =6
    Bolt Thread Thread-22recve : aaaaaaa
    0 bolt recev:{-5994700617723404155=-3738685841476816404}
    2014-10-03 21:17:32,362 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =7
    Bolt Thread Thread-22recve : aaaaaaaa
    0 bolt recev:{-2308734827213127682=-5719708045753233056}
    2014-10-03 21:17:32,563 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =8
    Bolt Thread Thread-22recve : aaaaaaaaa
    0 bolt recev:{-3718844156917119468=-6359724009048981605}
    2014-10-03 21:17:32,766 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =9
    