  • An example of using Storm StreamIDs (version 1.0.2)

    I tried out how StreamIDs are used and am keeping a note of it here.

    ==Sample Data==

    {
        "Address": "小桥镇小桥中学对面",
        "CityCode": "511300",
        "CountyCode": "511322",
        "EnterpriseCode": "YUNDA",
        "MailNo": "667748320345",
        "Mobile": "183****5451",
        "Name": "王***",
        "ProvCode": "510000",
        "Weight": "39"
    }

    ==Topology Structure==

    Spout1 emits each record on two named streams: CountBolt1 subscribes to "Stream1" and counts records per courier company, while CountBolt2 and CountBolt3 both subscribe to "Stream2" and count per province and per city. Each counting bolt emits its running count on its own declared stream ("Stream3" or "Stream4"), and TopBolt consumes all three via fields groupings.

    ==Source Code==
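
    The code below imports two helper classes that are not shown in the post: common.constants.Constants and common.simulate.DataRandom. The following sketches are assumptions inferred from how the classes are used (the constant values, the company/region pools, and the random-generation logic are invented); they only serve to make the example self-contained.

    <Constants> (hypothetical sketch)

    package common.constants;

    // Hypothetical sketch: string keys matching the sample JSON fields, plus the
    // config key read by Spout1. The actual values in the original project are unknown.
    public class Constants {
        public static final String SpoutInterval = "spout.interval"; // assumed config key
        public static final String EnterpriseCode = "EnterpriseCode";
        public static final String ProvCode = "ProvCode";
        public static final String CityCode = "CityCode";
        public static final String CountyCode = "CountyCode";
    }

    <DataRandom> (hypothetical sketch)

    package common.simulate;

    import com.alibaba.fastjson.JSONObject;

    import java.util.Random;

    // Hypothetical sketch of the data simulator: a singleton that builds a random
    // express-delivery record shaped like the sample JSON above.
    public class DataRandom {
        private static final DataRandom INSTANCE = new DataRandom();
        private final Random _random = new Random();
        private final String[] _companies = {"YUNDA", "STO", "ZTO"};
        private final String[] _provs = {"510000", "330000", "440000"};
        private final String[] _cities = {"511300", "330100", "440300"};

        private DataRandom() {
        }

        public static DataRandom getInstance() {
            return INSTANCE;
        }

        public JSONObject getRandomExpressData() {
            JSONObject json = new JSONObject();
            json.put("EnterpriseCode", _companies[_random.nextInt(_companies.length)]);
            json.put("ProvCode", _provs[_random.nextInt(_provs.length)]);
            json.put("CityCode", _cities[_random.nextInt(_cities.length)]);
            json.put("Weight", String.valueOf(_random.nextInt(100)));
            return json;
        }
    }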

    <Spout1>

    package test;
    
    import com.alibaba.fastjson.JSONObject;
    import common.constants.Constants;
    import common.simulate.DataRandom;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    public class Spout1 extends BaseRichSpout {
        private SpoutOutputCollector _collector = null;
        private DataRandom _dataRandom = null;
        private int _timeInterval = 1000;
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("Stream1", new Fields("json"));
            declarer.declareStream("Stream2", new Fields("json"));
        }
    
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _dataRandom = DataRandom.getInstance();
            if (conf.containsKey(Constants.SpoutInterval)) {
                _timeInterval = Integer.valueOf((String) conf.get(Constants.SpoutInterval));
            }
        }
    
        @Override
        public void nextTuple() {
            try {
                Thread.sleep(_timeInterval);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            JSONObject jsonObject = _dataRandom.getRandomExpressData();
            System.out.print("[---Spout1---]jsonObject=" + jsonObject + "
    ");
            _collector.emit("Stream1", new Values(jsonObject.toJSONString()));
            _collector.emit("Stream2", new Values(jsonObject.toJSONString()));
        }
    }

    <CountBolt1>

    package test;
    
    import com.alibaba.fastjson.JSONObject;
    import common.constants.Constants;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class CountBolt1 extends BaseRichBolt {
        private OutputCollector _collector = null;
        private int taskId = 0;
        private Map<String, Integer> _map = new HashMap<>();
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("Stream3", new Fields("company", "count"));
        }
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
            taskId = context.getThisTaskId();
        }
    
        @Override
        public void execute(Tuple input) {
            String str = input.getStringByField("json");
            JSONObject jsonObject = JSONObject.parseObject(str);
            String company = jsonObject.getString(Constants.EnterpriseCode);
    
            int count = 0;
            if (_map.containsKey(company)) {
                count = _map.get(company);
            }
            count++;
            _map.put(company, count);
    
            _collector.emit("Stream3", new Values(company, count));
            System.out.print("[---CountBolt1---]" +
                    "taskId=" + taskId + ", company=" + company + ", count=" + count + "
    ");
        }
    }

    <CountBolt2>

    package test;
    
    import com.alibaba.fastjson.JSONObject;
    import common.constants.Constants;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    public class CountBolt2 extends BaseRichBolt {
        private OutputCollector _collector = null;
        private int _taskId = 0;
        private Map<String, Integer> _map = new HashMap<>();
    
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            _collector = outputCollector;
            _taskId = topologyContext.getThisTaskId();
        }
    
        @Override
        public void execute(Tuple tuple) {
            String str = tuple.getStringByField("json");
            JSONObject jsonObject = JSONObject.parseObject(str);
            String prov = jsonObject.getString(Constants.ProvCode);
    
            int count = 0;
            if (_map.containsKey(prov)) {
                count = _map.get(prov);
            }
            count++;
            _map.put(prov, count);
    
            _collector.emit("Stream4", new Values(prov, count, UUID.randomUUID()));
            System.out.print("[---CountBolt2---]" +
                    "taskId=" + _taskId + ", prov=" + prov + ", count=" + count + "
    ");
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declareStream("Stream4", new Fields("prov", "count", "random"));
        }
    }

    <CountBolt3>

    package test;
    
    import com.alibaba.fastjson.JSONObject;
    import common.constants.Constants;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    public class CountBolt3 extends BaseRichBolt {
        private OutputCollector _collector = null;
        private int _taskId = 0;
        private Map<String, Integer> _map = new HashMap<>();
    
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            _collector = outputCollector;
            _taskId = topologyContext.getThisTaskId();
        }
    
        @Override
        public void execute(Tuple tuple) {
            String str = tuple.getStringByField("json");
    
            JSONObject jsonObject = JSONObject.parseObject(str);
            String city = jsonObject.getString(Constants.CityCode);
    
            int count = 0;
            if (_map.containsKey(city)) {
                count = _map.get(city);
            }
            count++;
            _map.put(city, count);
    
            _collector.emit("Stream4", new Values(city, count));
            System.out.print("[---CountBolt3---]" +
                    "taskId=" + _taskId + ", city=" + city + ", count=" + count + "
    ");
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declareStream("Stream4", new Fields("city", "count"));
        }
    }

    <TopBolt>

    package test;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.List;
    import java.util.Map;
    
    public class TopBolt extends BaseRichBolt {
    
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        }
    
        @Override
        public void execute(Tuple tuple) {
            System.out.print("[---TopBolt---]StreamID=" + tuple.getSourceStreamId() + "
    ");
            List<Object> values = tuple.getValues();
            for(Object value : values) {
                System.out.print("[---TopBolt---]value=" + value + "
    ");
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }
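
    One detail worth noting: CountBolt2 and CountBolt3 both name their output stream "Stream4", so tuple.getSourceStreamId() in TopBolt prints "Stream4" for tuples from either bolt (visible in the log below). When the two sources must be told apart, the tuple also carries the emitting component's id. The variant below is a sketch of that idea; TopBolt2 is a hypothetical name, not part of the original example.

    package test;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    import java.util.Map;

    public class TopBolt2 extends BaseRichBolt {

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        }

        @Override
        public void execute(Tuple tuple) {
            // getSourceStreamId() is "Stream4" for tuples from both Count2 and Count3,
            // so also read getSourceComponent() to see which bolt the tuple came from.
            System.out.print("[---TopBolt2---]component=" + tuple.getSourceComponent()
                    + ", stream=" + tuple.getSourceStreamId()
                    + ", values=" + tuple.getValues() + "\n");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }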

    <TestTopology>

    package test;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    public class TestTopology {
        public static void main(String[] args)
                throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("Spout1", new Spout1());
            builder.setBolt("Count1", new CountBolt1()).shuffleGrouping("Spout1", "Stream1");
            builder.setBolt("Count2", new CountBolt2()).shuffleGrouping("Spout1", "Stream2");
            builder.setBolt("Count3", new CountBolt3()).shuffleGrouping("Spout1", "Stream2");
            builder.setBolt("Top", new TopBolt())
                    .fieldsGrouping("Count1", "Stream3", new Fields("company"))
                    .fieldsGrouping("Count2", "Stream4", new Fields("prov"))
                    .fieldsGrouping("Count3", "Stream4", new Fields("city"));
    
            Config config = new Config();
            config.setNumWorkers(1);
            config.put(common.constants.Constants.SpoutInterval, args[1]);

            // args[0]: "true" submits to a cluster, otherwise run in a LocalCluster; args[1]: spout emit interval in ms.
            if (Boolean.valueOf(args[0])) {
                StormSubmitter.submitTopology("TestTopology1", config, builder.createTopology());
            } else {
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("TestTopology1", config, builder.createTopology());
            }
        }
    }

    ==Result Log==

    [---Spout1---]jsonObject={"CityCode":"511300","CountyCode":"511322","Address":"小桥镇小桥中学对面","MailNo":"667748320345","ProvCode":"510000","Mobile":"183****5451","EnterpriseCode":"YUNDA","Weight":"39","Name":"王***"}
    [---CountBolt1---]taskId=1, company=YUNDA, count=1
    [---CountBolt3---]taskId=3, city=511300, count=1
    [---CountBolt2---]taskId=2, prov=510000, count=1
    [---TopBolt---]StreamID=Stream4
    [---TopBolt---]value=510000
    [---TopBolt---]value=1
    [---TopBolt---]value=99bd1cdb-d5c1-4ac8-b1a1-a4cfffb5a616
    [---TopBolt---]StreamID=Stream4
    [---TopBolt---]value=511300
    [---TopBolt---]value=1
    [---TopBolt---]StreamID=Stream3
    [---TopBolt---]value=YUNDA
    [---TopBolt---]value=1
  • Original article: https://www.cnblogs.com/quchunhui/p/8302192.html