zoukankan      html  css  js  c++  java
  • Trident中 FixedBatchSpout分析

    FixedBatchSpout 继承自 IBatchSpout

    IBatchSpout 方法

    public interface IBatchSpout extends Serializable {
        void open(Map conf, TopologyContext context);
        void emitBatch(long batchId, TridentCollector collector);
        void ack(long batchId);
        void close();
        Map getComponentConfiguration();
        Fields getOutputFields();
    }
    
     FixedBatchSpout代码
    
    package storm.trident.testing;
    
    import backtype.storm.Config;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.tuple.Fields;
    import java.util.List;
    import java.util.Map;
    import storm.trident.operation.TridentCollector;
    import storm.trident.spout.IBatchSpout;
    
    
    public class FixedBatchSpout implements IBatchSpout {
    
        Fields fields;
        List<Object>[] outputs;
        int maxBatchSize;
        
        public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
            this.fields = fields; // 输出字段
            this.outputs = outputs;  // 保存至本地, 每个对象都是一个List<Object>
            this.maxBatchSize = maxBatchSize; //  该批次最大发射次数,但是不是唯一决定元素
        }
        
        int index = 0;
        boolean cycle = false;
        
        public void setCycle(boolean cycle) {
            this.cycle = cycle;
        }
        
        @Override
        public void open(Map conf, TopologyContext context) {
            index = 0;
        }
        
        // trident调用 @Override public void emitBatch(long batchId, TridentCollector collector) { //Utils.sleep(2000); if(index>=outputs.length && cycle) { index = 0; // 超过下标后,让index归零, 继续循环发送 } // 在不超过outputs大小的情况下,每次发射一个List<Object> for(int i=0; index < outputs.length && i < maxBatchSize; index++, i++) { collector.emit(outputs[index]); } } @Override public void ack(long batchId) { } @Override public void close() { } @Override public Map getComponentConfiguration() { Config conf = new Config(); conf.setMaxTaskParallelism(1); // 最大并行度,默认是1. 好像没提供接口来修改, 很奇怪。 return conf; } @Override public Fields getOutputFields() { return fields ; // 输出字段 } }

     外部使用

    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1,
                    new Values("ab ab ab ab ab ab ab ab ab ab"));  // 这里设置为1,表示每批只发送一个List<Value>,但是设置更大,也不会出错,参见上面的代码注释,它要同时满足不超过数组大小,所以不会越界。
             spout.setCycle(true);  // 设置则表示会一直发送,如果不用它一直发射, 可以注释掉。

    其他就是trident内部调用。

    如分析有误,请指出,谢谢。。

  • 相关阅读:
    您认为在测试人员同开发人员的沟通过程中,如何提高沟通的效率和改善沟通的效果?维持测试人员同开发团队中其他成员良好的人际关系的关键是什么?
    redis和jedis的用法,区别
    Jedis实现多种功能总结
    Druid简单介绍
    Svn与Git的区别
    SVN的一些基本概念(学前了解)
    Redis-cli 的功能
    postman的使用方法
    Spring Boot 有哪些优点?
    Redis中的常用命令哪些?
  • 原文地址:https://www.cnblogs.com/chengxin1982/p/3999641.html
Copyright © 2011-2022 走看看