zoukankan      html  css  js  c++  java
  • Storm累计求和中使用各种分组Grouping

           Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证bolt中的每个任务接收到的tuple数目相同.(它能实现较好的负载均衡)

           Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到同一任务, 而不同的userid则会被分配到不同的任务

           All Grouping: 广播发送,对于每一个tuple,Bolts中的所有任务都会收到.

           Global Grouping: 全局分组,这个tuple被分配到storm中的一个bolt的其中一个task.再具体一点就是分配给id值最低的那个task.

           Non Grouping: 随机分派,意思是说stream不关心到底谁会收到它的tuple.目前他和Shuffle grouping是一样的效果,

           Direct Grouping: 直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者具体由消息接收者的哪个task处理这个消息.只有被声明为Direct Stream的消息流可以声明这种分组方法.而且这种消息tuple必须使用emitDirect方法来发射.消息处理者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)

     Fields Grouping 的代码

      1 /**
      2  * 数字累加求和
      3  * 先添加storm依赖
      4  */
      5 public class LocalTopologySumFieldsGrouping {
      6     /**
      7      * spout需要继承baserichspout,实现未实现的方法
      8      * @author Administrator
      9      *
     10      */
     11     public static class MySpout extends BaseRichSpout{
     12         private Map conf;
     13         private TopologyContext context;
     14         private SpoutOutputCollector collector;
     15         
     16         /**
     17          * 初始化方法,只会执行一次
     18          * 在这里面可以写一个初始化的代码
     19          * Map conf:其实里面保存的是topology的一些配置信息
     20          * TopologyContext context:topology的上下文,类似于servletcontext
     21          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     22          */
     23         @Override
     24         public void open(Map conf, TopologyContext context,
     25                 SpoutOutputCollector collector) {
     26             this.conf = conf;
     27             this.context = context;
     28             this.collector = collector;
     29         }
     30 
     31         int num = 1;
     32         /**
     33          * 这个方法是spout中最重要的方法,
     34          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     35          * 每调用一次,会向外发射一条数据
     36          */
     37         @Override
     38         public void nextTuple() {
     39             System.out.println("spout发射:"+num);
     40             //把数据封装到values中,称为一个tuple,发射出去
     41             this.collector.emit(new Values(num++,num%2));
     42             Utils.sleep(1000);
     43         }
     44         
     45         /**
     46          * 声明输出字段
     47          */
     48         @Override
     49         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     50             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     51             //fields中定义的参数和values中传递的数值是一一对应的
     52             declarer.declare(new Fields("num","flag"));
     53         }
     54     }
     55     
     56     /**
     57      * 自定义bolt需要实现baserichbolt
     58      * @author Administrator
     59      *
     60      */
     61     public static class MyBolt extends BaseRichBolt{
     62         private Map stormConf; 
     63         private TopologyContext context;
     64         private OutputCollector collector;
     65         
     66         /**
     67          * 和spout中的open方法意义一样
     68          */
     69         @Override
     70         public void prepare(Map stormConf, TopologyContext context,
     71                 OutputCollector collector) {
     72             this.stormConf = stormConf;
     73             this.context = context;
     74             this.collector = collector;
     75         }
     76 
     77         int sum = 0;
     78         /**
     79          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
     80          */
     81         @Override
     82         public void execute(Tuple input) {
     83             //input.getInteger(0);//也可以根据角标获取tuple中的数据
     84             Integer value = input.getIntegerByField("num");
     85             System.out.println("线程id:"+Thread.currentThread().getId()+",值:"+value);
     86             //sum+=value;
     87             //System.out.println("和:"+sum);
     88         }
     89         
     90         /**
     91          * 声明输出字段
     92          */
     93         @Override
     94         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     95             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
     96             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
     97         }
     98         
     99     }
    100     /**
    101      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    102      * @param args
    103      */
    104     public static void main(String[] args) {
    105         //组装topology
    106         TopologyBuilder topologyBuilder = new TopologyBuilder();
    107         topologyBuilder.setSpout("spout1", new MySpout());
    108         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    109         topologyBuilder.setBolt("bolt1", new MyBolt(),3).fieldsGrouping("spout1", new Fields("flag"));
    110         
    111         //创建本地storm集群
    112         LocalCluster localCluster = new LocalCluster();
    113         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
    114     }
    115     
    116 }

    ShuffleGrouping代码

      1 /**
      2  * 数字累加求和
      3  * 先添加storm依赖
      4  */
      5 public class LocalTopologySumShufferGrouping {
      6     /**
      7      * spout需要继承baserichspout,实现未实现的方法
      8      * @author Administrator
      9      *
     10      */
     11     public static class MySpout extends BaseRichSpout{
     12         private Map conf;
     13         private TopologyContext context;
     14         private SpoutOutputCollector collector;
     15         
     16         /**
     17          * 初始化方法,只会执行一次
     18          * 在这里面可以写一个初始化的代码
     19          * Map conf:其实里面保存的是topology的一些配置信息
     20          * TopologyContext context:topology的上下文,类似于servletcontext
     21          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     22          */
     23         @Override
     24         public void open(Map conf, TopologyContext context,
     25                 SpoutOutputCollector collector) {
     26             this.conf = conf;
     27             this.context = context;
     28             this.collector = collector;
     29         }
     30 
     31         int num = 1;
     32         /**
     33          * 这个方法是spout中最重要的方法,
     34          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     35          * 每调用一次,会向外发射一条数据
     36          */
     37         @Override
     38         public void nextTuple() {
     39             System.out.println("spout发射:"+num);
     40             //把数据封装到values中,称为一个tuple,发射出去
     41             this.collector.emit(new Values(num++));
     42             Utils.sleep(1000);
     43         }
     44         
     45         /**
     46          * 声明输出字段
     47          */
     48         @Override
     49         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     50             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     51             //fields中定义的参数和values中传递的数值是一一对应的
     52             declarer.declare(new Fields("num"));
     53         }
     54         
     55     }
     56     
     57     
     58     /**
     59      * 自定义bolt需要实现baserichbolt
     60      * @author Administrator
     61      *
     62      */
     63     public static class MyBolt extends BaseRichBolt{
     64         private Map stormConf; 
     65         private TopologyContext context;
     66         private OutputCollector collector;
     67         
     68         /**
     69          * 和spout中的open方法意义一样
     70          */
     71         @Override
     72         public void prepare(Map stormConf, TopologyContext context,
     73                 OutputCollector collector) {
     74             this.stormConf = stormConf;
     75             this.context = context;
     76             this.collector = collector;
     77         }
     78 
     79         int sum = 0;
     80         /**
     81          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
     82          */
     83         @Override
     84         public void execute(Tuple input) {
     85             //input.getInteger(0);//也可以根据角标获取tuple中的数据
     86             Integer value = input.getIntegerByField("num");
     87             System.out.println("线程id:"+Thread.currentThread().getId()+",值:"+value);//这样可以知道哪个线程接收到这个数据了.
     88             //sum+=value;
     89             //System.out.println("和:"+sum);
     90         }
     91         
     92         /**
     93          * 声明输出字段
     94          */
     95         @Override
     96         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     97             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
     98             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
     99         }
    100         
    101     }
    102     /**
    103      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    104      * @param args
    105      */
    106     public static void main(String[] args) {
    107         //组装topology
    108         TopologyBuilder topologyBuilder = new TopologyBuilder();
    109         topologyBuilder.setSpout("spout1", new MySpout());
    110         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    111         topologyBuilder.setBolt("bolt1", new MyBolt(),3).globalGrouping("spout1");
    112         
    113         //创建本地storm集群
    114         LocalCluster localCluster = new LocalCluster();
    115         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
    116     }
    117 }
  • 相关阅读:
    现实世界的Windows Azure:采访Gridsum的Sr.业务发展总监Yun Xu
    现在可用——Windows Azure SDK 1.6
    Rock Paper Azure Challenge回来啦
    这几天比较忙,自己的职业生涯规划好了吗?目标又是什么呢?生活在十字路口。。。。。。
    GDI+ 学习记录(24): 输出文本<3>
    GDI+ 学习记录(30): MetaFile 文件操作
    GDI+ 学习记录(29): 区域 Region
    GDI+ 学习记录(26): 显示图像 Image
    GDI+ 学习记录(25): 变换 Transform
    返回整数的四种情况
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/5793975.html
Copyright © 2011-2022 走看看