zoukankan      html  css  js  c++  java
  • 【Storm篇】--Storm分组策略

    一、前述

    Storm由数源泉spout到bolt时,可以选择分组策略,实现对spout发出的数据的分发。对多个并行度的时候有用。

    二、具体原理

    1. Shuffle Grouping
    随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。
    轮询,平均分配

    2. Fields Grouping(相同fields去分发到同一个Bolt)
    按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task, 而不同的"user-id"则可能会被分配到不同的task。

    3. All Grouping
    广播发送,对于每一个tuple,所有的bolts都会收到

    4. Global Grouping
    全局分组,把tuple分配给task id最低的task 。

    5. None Grouping
    不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果 有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。

    6. Direct Grouping
    指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id) 

    7. Local or shuffle grouping
    本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致

    8.customGrouping
    自定义,相当于mapreduce那里自己去实现一个partition一样。

    总结:前4种用的多些,后面4种用的少些。

    三、具体案例

    Spout(产生数据):

    package com.sxt.storm.grouping;
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.InputStreamReader;
    import java.util.Map;
    
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    
    public class MySpout implements IRichSpout {
    
        private static final long serialVersionUID = 1L;
    
        FileInputStream fis;
        InputStreamReader isr;
        BufferedReader br;
    
        SpoutOutputCollector collector = null;
        String str = null;
    
        @Override
        public void nextTuple() {//真正发的逻辑
            try {
                while ((str = this.br.readLine()) != null) {
                    // 过滤动作
                    collector.emit(new Values(str, str.split("	")[1]));//发出数据,一行和一行切分完后第二个字段。
                }
            } catch (Exception e) {
            }
    
        }
    
        @Override
        public void close() {//释放资源
            try {
                br.close();
                isr.close();
                fis.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {//初始化(方法只调用一次)
            try {
                this.collector = collector;
                this.fis = new FileInputStream("track.log");
                this.isr = new InputStreamReader(fis, "UTF-8");
                this.br = new BufferedReader(isr);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {//声明发出去的字段
            declarer.declare(new Fields("log", "session_id"));
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
        @Override
        public void ack(Object msgId) {
            System.out.println("spout ack:" + msgId.toString());
        }
    
        @Override
        public void activate() {
        }
    
        @Override
        public void deactivate() {
        }
    
        @Override
        public void fail(Object msgId) {
            System.out.println("spout fail:" + msgId.toString());
        }
    
    }

    Bolt:(处理单元)

    package com.sxt.storm.grouping;
    
    import java.util.Map;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    
    public class MyBolt implements IRichBolt {
    
        private static final long serialVersionUID = 1L;
    
        OutputCollector collector = null;
        int num = 0;
        String valueString = null;
    
        @Override
        public void cleanup() {
    
        }
    
        @Override
        public void execute(Tuple input) {
            try {
                valueString = input.getStringByField("log");//通过fields接收数据
    
                if (valueString != null) {
                    num++;
                    System.err.println(input.getSourceStreamId() + " " + Thread.currentThread().getName() + "--id="//打印当前进程名字
                            + Thread.currentThread().getId() + "   lines  :" + num + "   session_id:"//打印当前进程id 
                            + valueString.split("	")[1]);//这行词的第二个字母
                }
                collector.ack(input);
                // Thread.sleep(2000);
            } catch (Exception e) {
                collector.fail(input);
                e.printStackTrace();
            }
    
        }
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields(""));//声明空即可
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
    }

    Main方法:

    package com.sxt.storm.grouping;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    
    public class Main {
    
        /**
         * @param args
         */
        public static void main(String[] args) {
    
            TopologyBuilder builder = new TopologyBuilder();
    
            builder.setSpout("spout", new MySpout(), 1);//拓扑名,数据源,并行度
    
            builder.setBolt("bolt", new MyBolt(), 2).allGrouping("spout");//两个spot并行 所有都分发

            //builder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout");// shuffleGrouping其实就是随机往下游去发,不自觉的做到了负载均衡
     
    //builder.setBolt("bolt", new MyBolt(), 2).fieldsGrouping("spout", new Fields("session_id")); // fieldsGrouping其实就是MapReduce里面理解的Shuffle,根据fields求hash来取模

           //builder.setBolt("bolt", new MyBolt(), 2).globalGrouping("spout"); // 只往一个里面发,往taskId小的那个里面去发送

    // builder.setBolt("bolt", new MyBolt(), 2).noneGrouping("spout");   // 等于shuffleGrouping
    // Map conf = new HashMap(); // conf.put(Config.TOPOLOGY_WORKERS, 4); Config conf = new Config(); conf.setDebug(false); conf.setMessageTimeoutSecs(30); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology());//集群方式 } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } } else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology());// 本地模拟参数分别为名称,配置,构建拓扑结构。 } } }

     结果:

    1. builder.setBolt("bolt", new MyBolt(), 2).allGrouping("spout");//两个spot并行 所有都分发

     2. builder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout")其实就是随机往下游去发,不自觉的做到了负载均衡

    3.builder.setBolt("bolt", new MyBolt(), 2).fieldsGrouping("spout", new Fields("session_id")); // fieldsGrouping其实就是MapReduce里面理解的Shuffle,根据fields求hash来取模,相同的名称的fields分发到一个bolt里面。

    4.builder.setBolt("bolt", new MyBolt(), 2).globalGrouping("spout"); // 只往一个里面发,往taskId小的那个里面去发送

     企业中常用的也就是这几个!!!

  • 相关阅读:
    通过出生日期获取年龄的方法--Java
    Hql没有limit,替换方案
    springMvc <form action="">提交跳转路径问题
    The user specified as a definer ('root'@'%') does not exist
    Eclipse中SVN设置文件为ignore后重新添加至版本控制
    (转)关于BigDecimal 转化字符串toPlainString()和toString()的区别
    禅道---Bug管理模块
    github pages部署静态网页
    mybatis-generator 代码自动生成工具(maven方式)
    警告: [SetContextPropertiesRule]{Context} Setting property 'source' to 'org.eclipse.jst.jee.server:JsonBlog' did not find a matching property.
  • 原文地址:https://www.cnblogs.com/LHWorldBlog/p/8352994.html
Copyright © 2011-2022 走看看