zoukankan      html  css  js  c++  java
  • Storm Grouping —— 流分组策略

    Storm Grouping

    Shuffle Grouping :

    随机分组,尽量均匀分布到下游Bolt中将流分组定义为混排。
    这种混排分组意味着来自Spout的输入将混排,或随机分发给此Bolt中的任务。shuffle grouping对各个task的tuple分配的比较均匀。

    Fields Grouping :

    按字段分组,按数据中field值进行分组;相同field值的Tuple被发送到相同的Task这种grouping机制保证相同field值的tuple会去同一个task,
    这对于WordCount来说非常关键,如果同一个单词不去同一个task,那么统计出来的单词次数就不对了。“if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”. —— 小示例

    All grouping :广播

    广播发送, 对于每一个tuple将会复制到每一个bolt中处理。

    Global grouping :全局分组

    Tuple被分配到一个Bolt中的一个Task,实现事务性的Topology。
    Stream中的所有的tuple都会发送给同一个bolt任务处理,所有的tuple将会发送给拥有最小task_id的bolt任务处理。

    None grouping :不分组

    不关注并行处理负载均衡策略时使用该方式,目前等同于shuffle grouping,另外storm将会把bolt任务和他的上游提供数据的任务安排在同一个线程下。

    Direct grouping :直接分组 指定分组

    由tuple的发射单元直接决定tuple将发射给那个bolt,一般情况下是由接收tuple的bolt决定接收哪个bolt发射的Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。
    只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)。

    Fields Grouping 代码

    /**
     * 数字累加求和
     * 先添加storm依赖
     */
    public class LocalTopologySumFieldsGrouping {
        /**
         * spout需要继承baserichspout,实现未实现的方法
         * @author Administrator
         *
         */
        public static class MySpout extends BaseRichSpout{
            private Map conf;
            private TopologyContext context;
            private SpoutOutputCollector collector;
            
            /**
             * 初始化方法,只会执行一次
             * 在这里面可以写一个初始化的代码
             * Map conf:其实里面保存的是topology的一些配置信息
             * TopologyContext context:topology的上下文,类似于servletcontext
             * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
             */
            @Override
            public void open(Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
                this.conf = conf;
                this.context = context;
                this.collector = collector;
            }
    
            int num = 1;
            /**
             * 这个方法是spout中最重要的方法,
             * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
             * 每调用一次,会向外发射一条数据
             */
            @Override
            public void nextTuple() {
                System.out.println("spout发射:"+num);
                //把数据封装到values中,称为一个tuple,发射出去
                this.collector.emit(new Values(num++,num%2));
                Utils.sleep(1000);
            }
            
            /**
             * 声明输出字段
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                //给values中的数据起个名字,方便后面的bolt从这个values中取数据
                //fields中定义的参数和values中传递的数值是一一对应的
                declarer.declare(new Fields("num","flag"));
            }
        }
        
        /**
         * 自定义bolt需要实现baserichbolt
         * @author Administrator
         *
         */
        public static class MyBolt extends BaseRichBolt{
            private Map stormConf; 
            private TopologyContext context;
            private OutputCollector collector;
            
            /**
             * 和spout中的open方法意义一样
             */
            @Override
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.stormConf = stormConf;
                this.context = context;
                this.collector = collector;
            }
    
            int sum = 0;
            /**
             * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
             */
            @Override
            public void execute(Tuple input) {
                //input.getInteger(0);//也可以根据角标获取tuple中的数据
                Integer value = input.getIntegerByField("num");
                System.out.println("线程id:"+Thread.currentThread().getId()+",值:"+value);
                //sum+=value;
                //System.out.println("和:"+sum);
            }
            
            /**
             * 声明输出字段
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
                //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
            }
            
        }
        /**
         * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
         * @param args
         */
        public static void main(String[] args) {
            //组装topology
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("spout1", new MySpout());
            //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
            topologyBuilder.setBolt("bolt1", new MyBolt(),3).fieldsGrouping("spout1", new Fields("flag"));
            
            //创建本地storm集群
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
        }
        
    }
    

    ShuffleGrouping代码

    /**
     * 数字累加求和
     * 先添加storm依赖
     */
    public class LocalTopologySumShufferGrouping {
        /**
         * spout需要继承baserichspout,实现未实现的方法
         * @author Administrator
         *
         */
        public static class MySpout extends BaseRichSpout{
            private Map conf;
            private TopologyContext context;
            private SpoutOutputCollector collector;
            
            /**
             * 初始化方法,只会执行一次
             * 在这里面可以写一个初始化的代码
             * Map conf:其实里面保存的是topology的一些配置信息
             * TopologyContext context:topology的上下文,类似于servletcontext
             * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
             */
            @Override
            public void open(Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
                this.conf = conf;
                this.context = context;
                this.collector = collector;
            }
    
            int num = 1;
            /**
             * 这个方法是spout中最重要的方法,
             * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
             * 每调用一次,会向外发射一条数据
             */
            @Override
            public void nextTuple() {
                System.out.println("spout发射:"+num);
                //把数据封装到values中,称为一个tuple,发射出去
                this.collector.emit(new Values(num++));
                Utils.sleep(1000);
            }
            
            /**
             * 声明输出字段
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                //给values中的数据起个名字,方便后面的bolt从这个values中取数据
                //fields中定义的参数和values中传递的数值是一一对应的
                declarer.declare(new Fields("num"));
            }
            
        }
        
        
        /**
         * 自定义bolt需要实现baserichbolt
         * @author Administrator
         *
         */
        public static class MyBolt extends BaseRichBolt{
            private Map stormConf; 
            private TopologyContext context;
            private OutputCollector collector;
            
            /**
             * 和spout中的open方法意义一样
             */
            @Override
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.stormConf = stormConf;
                this.context = context;
                this.collector = collector;
            }
    
            int sum = 0;
            /**
             * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
             */
            @Override
            public void execute(Tuple input) {
                //input.getInteger(0);//也可以根据角标获取tuple中的数据
                Integer value = input.getIntegerByField("num");
                System.out.println("线程id:"+Thread.currentThread().getId()+",值:"+value);//这样可以知道哪个线程接收到这个数据了.
                //sum+=value;
                //System.out.println("和:"+sum);
            }
            
            /**
             * 声明输出字段
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
                //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
            }
            
        }
        /**
         * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
         * @param args
         */
        public static void main(String[] args) {
            //组装topology
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("spout1", new MySpout());
            //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
            topologyBuilder.setBolt("bolt1", new MyBolt(),3).globalGrouping("spout1");
            
            //创建本地storm集群
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
        }
    }
    
  • 相关阅读:
    WebAPI实现移动端上传头像接口
    WebAPI发布IIS报错问题
    EF 解除属性映射到数据库中 NotMappedAttribute无效解决办法
    更改MVC注册Areas的顺序,掌控Areas的运作
    js判断字符串是否为JSON格式
    docker commit命令
    强大的strace命令用法详解
    Openshift概念
    如何在生产环境使用Btrace进行调试
    kill-9 kill-15
  • 原文地址:https://www.cnblogs.com/0xcafedaddy/p/6880291.html
Copyright © 2011-2022 走看看