zoukankan      html  css  js  c++  java
  • Storm的StreamID使用样例(版本1.0.2)

    随手尝试了一下StreamID的的用法。留个笔记。

    ==数据样例==

    {
        "Address": "小桥镇小桥中学对面",
        "CityCode": "511300",
        "CountyCode": "511322",
        "EnterpriseCode": "YUNDA",
        "MailNo": "667748320345",
        "Mobile": "183****5451",
        "Name": "王***",
        "ProvCode": "510000",
        "Weight": "39"
    }

    ==拓扑结构==

    ==程序源码==

    <Spout1>

    package test;
    
    import com.alibaba.fastjson.JSONObject;
    import common.constants.Constants;
    import common.simulate.DataRandom;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    public class Spout1 extends BaseRichSpout {
        private SpoutOutputCollector _collector = null;
        private DataRandom _dataRandom = null;
        private int _timeInterval = 1000;
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("Stream1", new Fields("json"));
            declarer.declareStream("Stream2", new Fields("json"));
        }
    
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _dataRandom = DataRandom.getInstance();
            if (conf.containsKey(Constants.SpoutInterval)) {
                _timeInterval = Integer.valueOf((String) conf.get(Constants.SpoutInterval));
            }
        }
    
        @Override
        public void nextTuple() {
            try {
                Thread.sleep(_timeInterval);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            JSONObject jsonObject = _dataRandom.getRandomExpressData();
            System.out.print("[---Spout1---]jsonObject=" + jsonObject + "
    ");
            _collector.emit("Stream1", new Values(jsonObject.toJSONString()));
            _collector.emit("Stream2", new Values(jsonObject.toJSONString()));
        }
    }

    <CountBolt1>

    package test;
    
    import com.alibaba.fastjson.JSONObject;
    import common.constants.Constants;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class CountBolt1 extends BaseRichBolt {
        private OutputCollector _collector = null;
        private int taskId = 0;
        private Map<String, Integer> _map = new HashMap<>();
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("Stream3", new Fields("company", "count"));
        }
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
            taskId = context.getThisTaskId();
        }
    
        @Override
        public void execute(Tuple input) {
            String str = input.getStringByField("json");
            JSONObject jsonObject = JSONObject.parseObject(str);
            String company = jsonObject.getString(Constants.EnterpriseCode);
    
            int count = 0;
            if (_map.containsKey(company)) {
                count = _map.get(company);
            }
            count++;
            _map.put(company, count);
    
            _collector.emit("Stream3", new Values(company, count));
            System.out.print("[---CountBolt1---]" +
                    "taskId=" + taskId + ", company=" + company + ", count=" + count + "
    ");
        }
    }

    <CountBolt2>

    package test;
    
    import com.alibaba.fastjson.JSONObject;
    import common.constants.Constants;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    public class CountBolt2 extends BaseRichBolt {
        private OutputCollector _collector = null;
        private int _taskId = 0;
        private Map<String, Integer> _map = new HashMap<>();
    
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            _collector = outputCollector;
            _taskId = topologyContext.getThisTaskId();
        }
    
        @Override
        public void execute(Tuple tuple) {
            String str = tuple.getStringByField("json");
            JSONObject jsonObject = JSONObject.parseObject(str);
            String prov = jsonObject.getString(Constants.ProvCode);
    
            int count = 0;
            if (_map.containsKey(prov)) {
                count = _map.get(prov);
            }
            count++;
            _map.put(prov, count);
    
            _collector.emit("Stream4", new Values(prov, count, UUID.randomUUID()));
            System.out.print("[---CountBolt2---]" +
                    "taskId=" + _taskId + ", prov=" + prov + ", count=" + count + "
    ");
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declareStream("Stream4", new Fields("prov", "count", "random"));
        }
    }

    <CountBolt3>

    package test;
    
    import com.alibaba.fastjson.JSONObject;
    import common.constants.Constants;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    public class CountBolt3 extends BaseRichBolt {
        private OutputCollector _collector = null;
        private int _taskId = 0;
        private Map<String, Integer> _map = new HashMap<>();
    
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            _collector = outputCollector;
            _taskId = topologyContext.getThisTaskId();
        }
    
        @Override
        public void execute(Tuple tuple) {
            String str = tuple.getStringByField("json");
    
            JSONObject jsonObject = JSONObject.parseObject(str);
            String city = jsonObject.getString(Constants.CityCode);
    
            int count = 0;
            if (_map.containsKey(city)) {
                count = _map.get(city);
            }
            count++;
            _map.put(city, count);
    
            _collector.emit("Stream4", new Values(city, count));
            System.out.print("[---CountBolt3---]" +
                    "taskId=" + _taskId + ", city=" + city + ", count=" + count + "
    ");
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declareStream("Stream4", new Fields("city", "count"));
        }
    }

    <TopBolt>

    package test;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.List;
    import java.util.Map;
    
    public class TopBolt extends BaseRichBolt {
    
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        }
    
        @Override
        public void execute(Tuple tuple) {
            System.out.print("[---TopBolt---]StreamID=" + tuple.getSourceStreamId() + "
    ");
            List<Object> values = tuple.getValues();
            for(Object value : values) {
                System.out.print("[---TopBolt---]value=" + value + "
    ");
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }

    <TestTopology>

    package test;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    public class TestTopology {
        public static void main(String[] args)
                throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("Spout1", new Spout1());
            builder.setBolt("Count1", new CountBolt1()).shuffleGrouping("Spout1", "Stream1");
            builder.setBolt("Count2", new CountBolt2()).shuffleGrouping("Spout1", "Stream2");
            builder.setBolt("Count3", new CountBolt3()).shuffleGrouping("Spout1", "Stream2");
            builder.setBolt("Top", new TopBolt())
                    .fieldsGrouping("Count1", "Stream3", new Fields("company"))
                    .fieldsGrouping("Count2", "Stream4", new Fields("prov"))
                    .fieldsGrouping("Count3", "Stream4", new Fields("city"));
    
            Config config = new Config();
            config.setNumWorkers(1);
            config.put(common.constants.Constants.SpoutInterval, args[1]);
    
            if (Boolean.valueOf(args[0])) {
                StormSubmitter.submitTopology("TestTopology1", config, builder.createTopology());
            } else {
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("TestTopology1", config, builder.createTopology());
            }
        }
    }

    ==结果日志==

    [---Spout1---]jsonObject={"CityCode":"511300","CountyCode":"511322","Address":"小桥镇小桥中学对面","MailNo":"667748320345","ProvCode":"510000","Mobile":"183****5451","EnterpriseCode":"YUNDA","Weight":"39","Name":"王***"}
    [---CountBolt1---]taskId=1, company=YUNDA, count=1
    [---CountBolt3---]taskId=3, city=511300, count=1
    [---CountBolt2---]taskId=2, prov=510000, count=1
    [---TopBolt---]StreamID=Stream4
    [---TopBolt---]value=510000
    [---TopBolt---]value=1
    [---TopBolt---]value=99bd1cdb-d5c1-4ac8-b1a1-a4cfffb5a616
    [---TopBolt---]StreamID=Stream4
    [---TopBolt---]value=511300
    [---TopBolt---]value=1
    [---TopBolt---]StreamID=Stream3
    [---TopBolt---]value=YUNDA
    [---TopBolt---]value=1
  • 相关阅读:
    Anagram
    HDU 1205 吃糖果(鸽巢原理)
    Codeforces 1243D 0-1 MST(补图的连通图数量)
    Codeforces 1243C Tile Painting(素数)
    Codeforces 1243B2 Character Swap (Hard Version)
    Codeforces 1243B1 Character Swap (Easy Version)
    Codeforces 1243A Maximum Square
    Codeforces 1272E Nearest Opposite Parity(BFS)
    Codeforces 1272D Remove One Element
    Codeforces 1272C Yet Another Broken Keyboard
  • 原文地址:https://www.cnblogs.com/quchunhui/p/8302192.html
Copyright © 2011-2022 走看看