Storm --- DirectGroup (Direct Grouping)

    Using word split-and-count (word count) as an example, here is how to implement Storm's DirectGroup (direct grouping):

    1. Spout Implementation

    The Spout is the data source of the topology. To send the Spout's tuples to specific Bolt tasks with direct grouping, note the following (a condensed sketch of these three steps is shown right after this list; the full Spout implementation follows it):

    1) The Spout must know the task IDs of the Bolt tasks that consume its stream (a task ID is the ID Storm assigns to each task running inside an executor); in the code below these consumer task IDs are resolved during initialization in Spout.open()

    2) Tuples must be emitted with SpoutOutputCollector.emitDirect()

    3) The stream must be declared as a direct stream, i.e. declared in Spout.declareOutputFields()
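
    A minimal, condensed sketch of these three steps (this class is illustrative and not part of the original post; it assumes the org.apache.storm package layout, while the listings below omit imports and may target the older backtype.storm packages):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class DirectSpoutSketch extends BaseRichSpout {

        private SpoutOutputCollector _collector;
        private List<Integer> _consumerTasks;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this._collector = collector;
            this._consumerTasks = new ArrayList<>();
            // 1) resolve the task IDs of every component that subscribed with directGrouping
            context.getThisTargets().forEach((streamId, targets) ->
                    targets.forEach((componentId, grouping) -> {
                        if (grouping.is_set_direct()) {
                            this._consumerTasks.addAll(context.getComponentTasks(componentId));
                        }
                    }));
        }

        @Override
        public void nextTuple() {
            // 2) emit directly to the chosen consumer task(s)
            for (Integer taskId : this._consumerTasks) {
                this._collector.emitDirect(taskId, "direct.stream", new Values("hello"), UUID.randomUUID().toString());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // 3) declare the stream as a *direct* stream (second argument = true)
            declarer.declareStream("direct.stream", true, new Fields("sentence"));
        }
    }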

    /**
     * Fixed Cycle Spout
     *
     * @author hanhan.zhang
     * */
    public class FixedCycleSpout implements IRichSpout {
    
        private String _fieldName;
    
        private boolean _direct;
    
        // stream mark
        private String _streamId;
    
        private int _index;
    
        // key = msgId, value = sending tuple
        private Map<String, List<Object>> _pendingTuple;
    
        // send tuple
        private List<Object> [] _sendTuple;
    
        private SpoutOutputCollector _collector;
        private CountMetric _sendMetric;
        private CountMetric _failMetric;
    
        // consume task set
        private List<Integer> _consumeTaskIdList;
    
        public FixedCycleSpout(String _streamId, String _fieldName, boolean _direct, List<Object> ... _sendTuple) {
            this._streamId = _streamId;
            this._fieldName = _fieldName;
            this._direct = _direct;
            this._sendTuple = _sendTuple;
        }
    
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this._index = 0;
            _pendingTuple = Maps.newHashMap();
    
            // register metric
            this._sendMetric = context.registerMetric("cycle.spout.send.tuple.metric", new CountMetric(), 60);
            this._failMetric = context.registerMetric("cycle.spout.fail.tuple.metric", new CountMetric(), 60);
            this._collector = collector;
    
            // get consume task id
            if (this._direct) {
                this._consumeTaskIdList = Lists.newLinkedList();
                Map<String, Map<String, Grouping>> consumeTargets = context.getThisTargets();
                if (consumeTargets != null && !consumeTargets.isEmpty()) {
                    // streamId = this._streamId
                    consumeTargets.forEach((streamId, target) -> {
                        if (target != null && !target.isEmpty()) {
                            // componentId = consume target component Id
                            target.forEach((componentId, group) -> {
                                if (group.is_set_direct()) {
                                    this._consumeTaskIdList.addAll(context.getComponentTasks(componentId));
                                }
                            });
                        }
                    });
                }
            }
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void activate() {
    
        }
    
        @Override
        public void deactivate() {
    
        }
    
        @Override
        public void nextTuple() {
            this._sendMetric.incr();
            if (this._index == _sendTuple.length) {
                this._index = 0;
            }
            String msgId = UUID.randomUUID().toString();
            List<Object> tuple = this._sendTuple[this._index++];
            sendTuple(msgId, tuple);
        }
    
        @Override
        public void ack(Object msgId) {
            String msgIdStr = (String) msgId;
            System.out.println("ack tuple with msgId " + msgIdStr);
            this._pendingTuple.remove(msgIdStr);
        }
    
        @Override
        public void fail(Object msgId) {
            this._failMetric.incr();
            String msgIdStr = (String) msgId;
            System.out.println("fail tuple with msgId " + msgIdStr);
            sendTuple(msgIdStr, this._pendingTuple.get(msgIdStr));
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream(this._streamId, this._direct, new Fields(_fieldName));
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
        protected void sendTuple(String msgId, List<Object> tuple) {
            // remember the tuple so fail() can replay it by msgId
            this._pendingTuple.put(msgId, tuple);
            if (this._direct) {
                if (this._consumeTaskIdList == null || this._consumeTaskIdList.isEmpty()) {
                    throw new IllegalStateException("direct task is empty !");
                }
                // emit the tuple directly to every task that subscribed with directGrouping
                this._consumeTaskIdList.forEach(taskId ->
                        this._collector.emitDirect(taskId, this._streamId, tuple, msgId));
            } else {
                this._collector.emit(tuple, msgId);
            }
        }
    }
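
    Note that the CountMetric instances registered in open() are only reported if the topology also registers a metrics consumer; the original post does not show this. A minimal sketch, assuming the built-in LoggingMetricsConsumer shipped with Storm, applied to the topology Config:

    // report the registered metrics (send/fail counters above) to the worker logs;
    // this call would go on the Config created in section 3 below
    config.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);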
    

    2. Bolt Implementation

    SentenceSplitBolt splits each sentence into words and emits (word, 1) pairs; WordSumBolt aggregates the counts per word.

    /**
     * Sentence Split Bolt
     *
     * @author hanhan.zhang
     * */
    public class SentenceSplitBolt implements IRichBolt {
    
        private OutputCollector _collector;
    
        private CountMetric _ackMetric;
    
        private CountMetric _failMetric;
    
        private String _separator;
    
        private int _taskId;
    
        private boolean _direct;
    
        private String _streamId;
    
        public SentenceSplitBolt(String _streamId, boolean _direct) {
            this._streamId = _streamId;
            this._direct = _direct;
        }
    
        /**
         * @param context
         *          1: Register Metric
         *          2: Next Bolt Message
         * @param collector (thread-safe)
         *          1: Emit Tuple
         *          2: Ack/Fail Tuple
         * */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this._collector = collector;
            // register metric for monitor
            this._ackMetric = context.registerMetric("sentence.split.ack.metric", new CountMetric(), 60);
            this._failMetric = context.registerMetric("sentence.split.fail.metric", new CountMetric(), 60);
            // this bolt's own task ID (note: used as the direct-emit target in execute())
            this._taskId = context.getThisTaskId();

            this._separator = (String) stormConf.get(Const.SEPARATOR);
        }
    
        @Override
        public void execute(Tuple input) {
            try {
                String sentence = input.getString(0);
                if (Strings.isNullOrEmpty(sentence)) {
                    return;
                }
                String[] fields = sentence.split(_separator);
                for (String field : fields) {
                    if (this._direct) {
                        // NOTE: _taskId is this bolt's own task ID; a true direct emit should
                        // target a downstream consumer's task ID (see the sketch after this class)
                        this._collector.emitDirect(this._taskId, _streamId, input, new Values(field, 1));
                    } else {
                        // anchored emit: downstream ack/fail is tracked against `input`
                        this._collector.emit(this._streamId, input, new Values(field, 1));
                    }
                }
                this._collector.ack(input);
                this._ackMetric.incr();
            } catch (Exception e) {
                this._collector.fail(input);
                this._failMetric.incr();
            }
        }
    
        @Override
        public void cleanup() {
    
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream(this._streamId, this._direct, new Fields("word", "count"));
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
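
    In the direct branch above, _taskId is this bolt's own task ID. If SentenceSplitBolt were actually wired to its consumer with direct grouping, prepare() would have to resolve the downstream consumers' task IDs the same way the Spout does; a sketch (the consumeTaskIds variable is illustrative, not part of the original code):

    // in prepare(): collect the task IDs of the components that subscribed with directGrouping
    List<Integer> consumeTaskIds = new ArrayList<>();
    context.getThisTargets().forEach((streamId, targets) ->
            targets.forEach((componentId, grouping) -> {
                if (grouping.is_set_direct()) {
                    consumeTaskIds.addAll(context.getComponentTasks(componentId));
                }
            }));

    // in execute(): emit the anchored tuple directly to each consumer task
    for (Integer consumerTaskId : consumeTaskIds) {
        _collector.emitDirect(consumerTaskId, _streamId, input, new Values(field, 1));
    }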
    
    
    
    /**
     * Word Sum Bolt
     *
     * @author hanhan.zhang
     * */
    public class WordSumBolt extends BaseRichBolt {
    
        private OutputCollector _collector;
    
        private int _taskId;
    
        private Cache<String, AtomicInteger> _wordCache;
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this._collector = collector;
            this._taskId = context.getThisTaskId();
            this._wordCache = CacheBuilder.newBuilder()
                                            .maximumSize(1024)
                                            .expireAfterWrite(3, TimeUnit.SECONDS)
                                            .removalListener((removalNotification) -> {
                                                String key = (String) removalNotification.getKey();
                                                AtomicInteger sum = (AtomicInteger) removalNotification.getValue();
                                                System.out.println("word sum result : [" + key + "," + sum.get() + "]");
                                            })
                                            .build();
        }
    
        @Override
        public void execute(Tuple input) {
            try {
                String word = input.getString(0);
                int count = input.getInteger(1);
                if (Strings.isNullOrEmpty(word)) {
                    return;
                }
                AtomicInteger counter = this._wordCache.getIfPresent(word);
                if (counter == null) {
                    this._wordCache.put(word, new AtomicInteger(count));
                } else {
                    counter.addAndGet(count);
                }
                this._collector.ack(input);
            } catch (Exception e) {
                this._collector.fail(input);
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
        }
    }
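
    One caveat with relying on expireAfterWrite to "flush" results: Guava caches evict expired entries (and fire the removal listener) only during subsequent cache operations, not from a background thread, so a word that stops arriving may never be printed. A sketch of a periodic cleanUp() call that could be added in prepare() (the cleaner executor is illustrative, not part of the original code):

    // force cache maintenance periodically so expired words are flushed even when the bolt is idle
    ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
    cleaner.scheduleAtFixedRate(() -> this._wordCache.cleanUp(), 3, 3, TimeUnit.SECONDS);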

    3. Running the Topology

    The topology wires the components together: the split bolt subscribes to the Spout's stream with directGrouping, and the sum bolt subscribes to the split bolt with fieldsGrouping on "word".

    /**
     * Tuple Split-Flow Topology
     *
     * @author hanhan.zhang
     * */
    public class FlowTopology {
    
        public static void main(String[] args) {
    
            // send tuple
            List<Object> []tuple = new List[] {new Values("the cow jumped over the moon"),
                                                new Values("the man went to the store and bought some candy"),
                                                new Values("four score and seven years ago"),
                                                new Values("how many apples can you eat")};
    
    
            //stream name
            String spoutStreamId = "topology.flow.cycle.spout.stream";
            String splitStreamId = "topology.flow.split.bolt.stream";
    
            // spout
            FixedCycleSpout cycleSpout = new FixedCycleSpout(spoutStreamId, "sentence", true, tuple);
    
            // bolt
            SentenceSplitBolt splitBolt = new SentenceSplitBolt(splitStreamId, false);
            WordSumBolt sumBolt = new WordSumBolt();
    
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("sentence.cycle.spout", cycleSpout, 1);
    
            topologyBuilder.setBolt("sentence.split.bolt", splitBolt, 1)
                            .directGrouping("sentence.cycle.spout", spoutStreamId);
    
            topologyBuilder.setBolt("word.sum.bolt", sumBolt, 3)
                            .fieldsGrouping("sentence.split.bolt", splitStreamId, new Fields("word"));
    
            Config config = new Config();
            config.put(Const.SEPARATOR, " ");
    
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("flowTopology", config, topologyBuilder.createTopology());
    
        }
    
    }
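
    As written, main() returns immediately after submitting to the LocalCluster. Local-mode examples usually keep the JVM alive long enough to observe the output and then tear the cluster down; a minimal sketch (the 60-second window is arbitrary, not from the original post):

    // keep the local cluster running for a while, then shut it down
    Utils.sleep(60 * 1000);
    localCluster.killTopology("flowTopology");
    localCluster.shutdown();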
Original post: https://www.cnblogs.com/hanfight/p/6011675.html