zoukankan      html  css  js  c++  java
  • 大数据学习之storm-wordcount 实时版开发以及分组策略34

    五:storm-wordcount 实时版开发

    1:编写Spout

    package wc;
    
    import java.util.Map;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    /**
     * @author Dawn
     * @date 2019年6月5日15:59:04
     * @version 1.0
     * 需求:单词计数  hello dawn hello dawn indicate
     * 实现接口:IRichSpout	IRichBolt
     * 继承抽象类:BaseRichSpout BaseRichSpout	常用
     */
    public class WordCountSpout extends BaseRichSpout{
    
    	//定义收集器
    	private SpoutOutputCollector collector;
    	
    	//发送数据
    	@Override
    	public void nextTuple() {
    		//1:发送数据到blot
    		collector.emit(new Values("hello dawn hello dawn indicate"));
    		
    		//2.设置延迟
    		try {
    			Thread.sleep(500);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		
    	}
    
    	//创建收集器
    	@Override
    	public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
    
    		this.collector=collector;
    	}
    
    	//声明
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declare) {
    		//起别名
    		declare.declare(new Fields("dawn"));
    	}
    
    }
    

      

    2:编写分词bolt

    package wc;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    /**
     * @author Dawn
     * @date 2019年6月5日16:09:58
     * @version 1.0
     * 单词切分Bolt组件
     */
    public class WordCountSplitBolt extends BaseRichBolt{
    	
    	//数据继续发送到下一个bolt
    	private OutputCollector collector;
    
    	@Override
    	public void execute(Tuple in) {
    		//1.获取数据
    		String line = in.getStringByField("dawn");
    		
    		//2.切分数据
    		String[] fields = line.split(" ");
    
    		//3.<单词,1> 发送出去 下一个bolt(累加求和)
    		for(String w:fields) {
    			collector.emit(new Values(w,1));
    		}
    	}
    
    	//初始化
    	@Override
    	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
    		this.collector=collector;
    	}
    
    	//声明描述
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declare) {
    		declare.declare(new Fields("word","sum"));
    	}
    
    }
    

      

    3:编写计数bolt

    package wc;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    
    /**
     * @author Dawn
     * @date 2019年6月5日16:17:45
     * @version 1.0
     * 计数bolt
     */
    public class WordCount extends BaseRichBolt{
    
    	private Map<String, Integer> map=new HashMap<>();
    	
    	//累加求和
    	@Override
    	public void execute(Tuple in) {
    		//1.获取数据
    		String word = in.getStringByField("word");
    		Integer sum = in.getIntegerByField("sum");
    		
    		//2.业务处理
    		if(map.containsKey(word)) {
    			//之前出现几次
    			Integer count = map.get(word);
    			//已有的
    			map.put(word, count+sum);
    		}else {
    			map.put(word, sum);
    		}
    		
    		//3.打印控制台
    				System.err.println(Thread.currentThread().getName() + "	 单词为:" + word + "	 当前已出现次数为:" + map.get(word));
    	}
    
    	@Override
    	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
    	}
    
    	//没有下一个阶段就不用写
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declare) {
    	}
    
    }
    

      

    4:编写driver驱动类

    package wc;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    /**
     * @author Dawn
     * @date 2019年6月5日16:18:52
     * @version 1.0
     * 驱动类,以及使用不同的分组策略演示(字段,随机,全局)
     */
    public class WordCountDriver {
    
    	public static void main(String[] args) {
    		//1.hadoop->Job storm->topology 创建拓扑
    		TopologyBuilder builder = new TopologyBuilder();
    		
    		//2.指定设置
    		//设置任务的spout组件
    		builder.setSpout("WordCountSpout", new WordCountSpout(),2);//拓扑名,数据源,并行度
    		
    		//设置任务的单词拆分的bolt组件,是字段分组 并行度为2,总任务数 4
    		builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(),2).setNumTasks(4).fieldsGrouping("WordCountSpout", new Fields("dawn"));
    				
    		//设置任务的单词计数的bolt组件,是字段分组 ,并行度为2
    		builder.setBolt("WordCount", new WordCount(),4).fieldsGrouping("WordCountSplitBolt", new Fields("word","sum"));
    		
    //============================================================================================================》		
    //		//设置任务的单词拆分的bolt组件,是随机分组 并行度为2,总任务数 4
    //		builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(),2).setNumTasks(4).shuffleGrouping("WordCountSpout");
    //		
    //		//设置任务的单词计数的bolt组件,是随机分组 ,并行度为2
    //		builder.setBolt("WordCount", new WordCount(),4).shuffleGrouping("WordCountSplitBolt");
    		
    //============================================================================================================》			
    //		//设置任务的单词拆分的bolt组件,是全局分组 并行度为2,总任务数 4
    //		//分配给task id值最小的 根据线程id判断,只分噢诶给线程id最小的  
    //		builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(),2).setNumTasks(4).globalGrouping("WordCountSpout");
    //				
    //		//设置任务的单词计数的bolt组件,是全局分组 ,并行度为2
    //		builder.setBolt("WordCount", new WordCount(),4).globalGrouping("WordCountSplitBolt");
    //		
    		//3.创建配置信息
    		Config conf = new Config();
    		//conf.setNumWorkers(10);可以设置work数
    		
    //		//集群模式运行
    //		try {
    //			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    //		} catch (AlreadyAliveException e) {
    //			// TODO Auto-generated catch block
    //			e.printStackTrace();
    //		} catch (InvalidTopologyException e) {
    //			// TODO Auto-generated catch block
    //			e.printStackTrace();
    //		} catch (AuthorizationException e) {
    //			// TODO Auto-generated catch block
    //			e.printStackTrace();
    //		}
    		
    		//4.提交任务(本地模式)
    		LocalCluster cluster = new LocalCluster();
    		cluster.submitTopology("wordcounttopology", conf, builder.createTopology());
    		
    	}
    
    }
    

      

    六:storm-wordcount 提交到集群上运行

    1:打包程序到Linux

    2:提交任务

    3:在Storm UI上看任务执行情况

     

    七:分组策略

    使用上面word count程序来学习分组策略

    总图:参照这个来看

      

      一个executor就是一个线程数

      一个task就是一个任务数

    1) Fields Grouping

    按照字段分组。相同字段发送到一个task中。

    运行结果:

    可以看出都是进行字段进行分组的,为什么了?应为我这里字段(hello dawn hello dawn indicate)也就只有3个,而且我这里并行度设置的是4(就理解为线程数)。从结果中只有3个线程在使用!!!也就是相同的字段放入一个task中。。。

    2shuffle Grouping

    随机分组。轮询。平均分配。随机分发tuple,保证每个bolt中的tuple数量相同。

    运行结果:

    可以看出明显不是字段分组。因为这里并行度为4,并且这4个线程都用上了。平均分配。随机分发tuple,保证每个bolt中的tuple数量相同。而且这里次数出现的都有点问题。个人觉得有点像线程中的那个同步问题。(个人觉得哈!!只是助于理解,具体是不是我也不知道)

    3Non Grouping

    不分组

    采用这种策略每个bolt中接收的单词不同。

    4All Grouping

    广播发送

    5Global Grouping

    全局分组

    分配给task id值最小的

    根据线程id判断,只分噢诶给线程id最小的

    运行结果:

    可以看出虽然我们设置了4个线程。但是这个全局分组分配给task id值(线程id)最小的。根据线程id判断,只分给线程id最小的。只用到了一个线程id最小的线程

    总结:一般来说,就字段分组和随机分组用的多点。其他用的都很少

  • 相关阅读:
    使用Nginx实现反向代理
    nginx配置
    jsonp跨域实现单点登录,跨域传递用户信息以及保存cookie注意事项
    jsonp形式的ajax请求:
    面试题
    PHP设计模式_工厂模式
    Redis限制在规定时间范围内登陆错误次数限制
    HTTP 状态码简介(对照)
    Django 进阶(分页器&中间件)
    Django 之 权限系统(组件)
  • 原文地址:https://www.cnblogs.com/hidamowang/p/10981304.html
Copyright © 2011-2022 走看看