zoukankan      html  css  js  c++  java
  • storm高级原语-Transactional topology

    参考:

    1. http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/
    2. http://xumingming.sinaapp.com/811/twitter-storm-code-analysis-coordinated-bolt/

    示例代码:

    package com.lky.topology;
    
    import java.math.BigInteger;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.junit.Test;
    
    
    import com.lky.util.FileUtil;
    import com.lky.util.RunStorm;
    
    import backtype.storm.Config;
    import backtype.storm.coordination.BatchOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.testing.MemoryTransactionalSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBatchBolt;
    import backtype.storm.topology.base.BaseTransactionalBolt;
    import backtype.storm.transactional.ICommitter;
    import backtype.storm.transactional.TransactionAttempt;
    import backtype.storm.transactional.TransactionalTopologyBuilder;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    @SuppressWarnings({ "deprecation", "serial", "rawtypes" })
    /**
     * @Title: TransactionalGlobalCount.java 
     * @Package com.lky.topology 
     * @Description: 事务topology(模拟实时统计消息数量)
     * @author lky 
     * @date 2015年10月25日 上午11:23:12 
     * @version V1.0
     */
    public class TransactionalGlobalCount {
        /** Logger shared by the bolts declared in this class. */
        private static final Log log = LogFactory.getLog(TransactionalGlobalCount.class);

        /**
         * Third constructor argument of the MemoryTransactionalSpout created in test().
         * NOTE(review): presumably the number of tuples taken from each partition per batch — confirm.
         */
        public static final int PARTITION_TAKE_PER_BATCH = 3;

        /**
         * In-memory input for the spout: partition index -> tuples of that partition, where
         * every tuple is a one-element {@link Values} holding a word.
         *
         * Built from plain HashMap/ArrayList instances (instead of double-brace
         * initialization) so no anonymous collection subclasses are created.
         */
        public static final Map<Integer, List<List<Object>>> DATA = buildData();

        /** Creates the three fixed partitions (0, 1 and 2) exposed through DATA. */
        private static Map<Integer, List<List<Object>>> buildData() {
            Map<Integer, List<List<Object>>> data = new HashMap<Integer, List<List<Object>>>();
            data.put(0, partition("cat", "dog", "chicken", "cat", "dog", "apple"));
            data.put(1, partition("cat", "dog", "apple", "banana"));
            data.put(2, partition("cat", "cat", "cat", "cat", "cat", "dog", "dog", "dog", "dog"));
            return data;
        }

        /** Wraps each word in its own single-field tuple, preserving the given order. */
        private static List<List<Object>> partition(String... words) {
            List<List<Object>> tuples = new ArrayList<List<Object>>(words.length);
            for (String word : words) {
                tuples.add(new Values(word));
            }
            return tuples;
        }
    
        /**
         * Mutable record stored in the DATABASE map: a running count together with the
         * id of the transaction that wrote it.
         */
        public static class Value {
            /** Running total; a fresh instance starts at zero. */
            int count;
            /** Transaction id saved alongside the count; null until assigned. */
            BigInteger txid;
        }
    
        /**
         * In-memory map used as the store for the running total:
         * UpdateGlobalCount.finishBatch() reads and writes it under GLOBAL_COUNT_KEY.
         */
        public static Map<String, Value> DATABASE = new HashMap<String, Value>();
        /** The key under which the running total is kept in DATABASE. */
        public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
    
        /**
         * Processing-phase bolt (this phase may run in parallel): counts the tuples it
         * receives for one batch and emits that partial count when the batch finishes.
         *
         * Created 2015-10-25 12:14:26.
         *
         * @author lky
         * @version V1.0
         */
        public static class BatchCount extends BaseBatchBolt {
            BatchOutputCollector collector;
            Object id;
            Integer _count = 0;

            /** Declares the two output fields: the batch id and the partial count. */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                Fields schema = new Fields("id", "count");
                declarer.declare(schema);
            }

            /** Remembers the collector and the batch id handed in for this batch. */
            @Override
            public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
                this.id = id;
                this.collector = collector;
            }

            /** Counts one more tuple and logs the running count; the tuple content is not read. */
            @Override
            public void execute(Tuple tuple) {
                _count = _count + 1;
                String progress = "-------------->" + _count;
                log.info(progress);
            }

            /** Logs the final count and emits (id, count) for this batch. */
            @Override
            public void finishBatch() {
                String summary = "--------" + _count + "----------";
                log.info(summary);
                Values partial = new Values(id, _count);
                collector.emit(partial);
            }

        }
        
        /**
         * Committer-phase bolt (strictly ordered stream): adds up the partial counts of one
         * batch and folds the result into the global total kept in DATABASE.
         *
         * The stored value carries the id of the transaction that produced it, so a batch
         * that is replayed with the same transaction id is not added a second time.
         *
         * Created 2015-10-25 12:14:54.
         *
         * @author lky
         * @version V1.0
         */
        public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {

            BatchOutputCollector collector;
            TransactionAttempt id;
            Integer _size = 0;

            /** Remembers the collector and the transaction attempt this instance works for. */
            @Override
            public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
                this.collector = collector;
                this.id = id;
            }

            /**
             * Accumulates one partial count. Field 1 of the incoming tuple is the "count"
             * field declared by BatchCount; null or non-positive values are ignored.
             */
            @Override
            public void execute(Tuple tuple) {
                Integer sum = tuple.getInteger(1);
                log.info("sum---------->"+sum);
                // Guard against null before unboxing so a missing value cannot throw.
                if (sum != null && sum > 0) {
                    _size += sum;
                }
            }

            /**
             * Applies this batch to the stored total (unless it was already applied) and
             * emits the current total.
             *
             * The emit happens for every batch, including the very first one and replays;
             * previously the first committed total was neither emitted nor written to
             * sum.txt because both calls sat inside the "old value exists" branch.
             */
            @Override
            public void finishBatch() {
                Value oldValue = DATABASE.get(GLOBAL_COUNT_KEY);
                Value newValue;
                BigInteger txid = id.getTransactionId();

                // Update when nothing is stored yet, or when a new transaction arrives.
                if (null == oldValue || !oldValue.txid.equals(txid)) {
                    newValue = new Value();
                    newValue.txid = txid;
                    newValue.count = (null == oldValue) ? _size : _size + oldValue.count;
                    DATABASE.put(GLOBAL_COUNT_KEY, newValue);
                    // Record each newly applied total exactly once.
                    // NOTE(review): assumes the third argument of strToFile means "append" — confirm.
                    FileUtil.strToFile(Integer.toString(newValue.count), "sum.txt", true);
                } else {
                    // Same transaction id as the stored value: already applied, reuse it.
                    newValue = oldValue;
                }

                collector.emit(new Values(id, newValue.count));
            }

            /** Declares the two output fields: the transaction attempt and the global total. */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("id", "size"));
            }

        }
    
        /**
         * Wires the transactional topology (memory spout -> BatchCount -> UpdateGlobalCount)
         * and hands it to RunStorm.runStormLocally.
         */
        @Test
        public void test() {
            Config config = new Config();
            config.setMaxSpoutPending(3);
            config.setDebug(true);

            Fields spoutFields = new Fields("word");
            MemoryTransactionalSpout spout =
                    new MemoryTransactionalSpout(DATA, spoutFields, PARTITION_TAKE_PER_BATCH);

            TransactionalTopologyBuilder builder =
                    new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
            // Partial counts come from five BatchCount tasks fed directly by the spout.
            builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
            // Every partial count is routed to the one summing bolt.
            builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");

            // NOTE(review): the trailing 5 is presumably a run duration — confirm against RunStorm.
            RunStorm.runStormLocally(builder.buildTopology(), "ss", config, 5);
        }
    }
  • 相关阅读:
    如何隐藏DLL中,导出函数的名称?
    排序算法之0-1、0-1-2排序
    在Vista以上版本运行WTL程序,有时候会提示“这个程序可能安装不正确...”的错误
    编码格式(未完待续......)
    WinDbg分析DUMP文件
    自己捣鼓了一个12306抢票软件,欢迎大家使用,并讨论改进方法!
    Cocos2D-X扫盲之坐标系、锚点
    Spring核心组件剖析
    走进Java中的持有对象(容器类)【二】Collection
    走进JVM【二】理解JVM内存区域
  • 原文地址:https://www.cnblogs.com/dmir/p/4908611.html
Copyright © 2011-2022 走看看