  • Storm WordCount practice

    Data flow: spout → split bolt → count bolt

    Spout

    package cn.ljh.storm.wordcount;
    
    import java.util.Map;
    import java.util.Random;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    //Data source: the Spout component
    public class WordCountSpout extends BaseRichSpout{
        //Simulate some input data
        private String[] data = {"I love Beijing","I love China","Beijing is the capital of China"};

        //collector: this spout's output collector, used to send the collected data to the next component
        private SpoutOutputCollector collector;
    
        @Override
        public void nextTuple() {
            //nextTuple is called repeatedly by the Storm engine; collect one piece of data every 3 seconds
            Utils.sleep(3000);

            //Pick a random index in [0, 3) and select the corresponding sentence
            int random = (new Random()).nextInt(3);
            String value = data[random];
    
            //Print it
            System.out.println("Collected data: " + value);

            //Send it to the next component
            this.collector.emit(new Values(value));
        }
    
        @Override
        public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
            //collector: this spout's output collector, used to send the collected data to the next component
            //initialize the collector in the open() method
            this.collector = collector;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            // Declare the schema (structure) of the tuples sent to the next component
            declare.declare(new Fields("sentence"));
        }
    }

    Split bolt

    package cn.ljh.storm.wordcount;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    //First bolt: splits each sentence into words
    public class WordCountSplitBolt extends BaseRichBolt{
    
        //collector: this bolt's output collector, used to send the processed data to the next bolt
        private OutputCollector collector;
    
        @Override
        public void execute(Tuple tuple) {
            //Handle the data sent by the upstream component, e.g. "I love Beijing"
            //String value = tuple.getStringByField("sentence");
            String value = tuple.getString(0);
            //Split the sentence into words
            String[] words = value.split(" ");

            //Emit each word with an initial count of 1
            for(String w:words){
                collector.emit(new Values(w,1));
            }
        }
    
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
            // Initialization
            //collector: this bolt's output collector, used to send the processed data to the next bolt
            this.collector = collector;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            // Declare the schema (structure) of the tuples sent to the next component
            declare.declare(new Fields("word","count"));
        }
    }
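
    For one input sentence such as "I love Beijing", the loop above emits the tuples ("I", 1), ("love", 1), ("Beijing", 1). A standalone sketch of just this splitting step (illustrative only, no Storm dependency; the class name is made up):

    // Illustrative only: simulate what WordCountSplitBolt emits for one sentence.
    public class SplitBoltOutputDemo {
        public static void main(String[] args) {
            String sentence = "I love Beijing";
            for (String w : sentence.split(" ")) {
                // each word goes downstream as a (word, count) pair with count = 1
                System.out.println("(" + w + ", 1)");
            }
        }
    }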

    Count bolt

    package cn.ljh.storm.wordcount;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    //Second bolt: counts the words
    public class WordCountTotalBolt extends BaseRichBolt{
        //Use a Map to hold the running count for each word
        private Map<String, Integer> result = new HashMap<>();

        //collector: this bolt's output collector, used to send the processed data to the next bolt
        private OutputCollector collector;
    
        @Override
        public void execute(Tuple tuple) {
            //Extract the incoming data
            String word = tuple.getStringByField("word");
            int count = tuple.getIntegerByField("count");

            //Accumulate
            if(result.containsKey(word)){
                //Word already seen: add to its running total
                int total = result.get(word);
                result.put(word, total+count);
            }else{
                //New word: start its count
                result.put(word, count);
            }

            //Print the current counts
            System.out.println("Current counts: " + result);

            //Emit (word, total count for that word) to the next component
            this.collector.emit(new Values(word,result.get(word)));
        }
    
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            declare.declare(new Fields("word","total"));
        }
    }
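
    Because the topology below uses fieldsGrouping on "word", every occurrence of a given word is routed to the same bolt task, so the local result map holds the complete running count for each word it owns. The accumulation itself is just a per-word running sum; a standalone plain-Java sketch of the same logic (illustrative only, no Storm dependency; the class name is made up):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: the running-sum logic that WordCountTotalBolt applies tuple by tuple.
    public class WordCountDemo {
        public static void main(String[] args) {
            String[] sentences = {"I love Beijing", "I love China", "Beijing is the capital of China"};
            Map<String, Integer> result = new HashMap<>();
            for (String s : sentences) {
                for (String w : s.split(" ")) {
                    // equivalent to the containsKey/put branches in execute()
                    result.merge(w, 1, Integer::sum);
                }
            }
            //expected: I=2, love=2, Beijing=2, China=2, the remaining words 1 each
            System.out.println(result);
        }
    }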

    Topology wiring

    package cn.ljh.storm.wordcount;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    public class WordCountTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
    
        //Set the topology's spout component
            builder.setSpout("mywordcountspout", new WordCountSpout());
    
        //Set the topology's first bolt component
        builder.setBolt("mywordcountsplit", new WordCountSplitBolt())
                .shuffleGrouping("mywordcountspout");//shuffle grouping: tuples are distributed randomly

        //Set the topology's second bolt component, grouped by the "word" field so the same word always goes to the same task
            builder.setBolt("mywordcounttotal", new WordCountTotalBolt())
                    .fieldsGrouping("mywordcountsplit", new Fields("word"));
    
        //Create the topology
            StormTopology job = builder.createTopology();
    
            Config conf = new Config();
    
        //The topology can run in two modes: 1. local mode   2. cluster mode
        //1. Local mode
            LocalCluster localcluster = new LocalCluster();
            localcluster.submitTopology("MyWordCount", conf, job);
    
        //2. Cluster mode: package as a jar and submit it to a Storm cluster
    //        StormSubmitter.submitTopology(args[0], conf, job);
        }
    
    
    }
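
    Note that in local mode submitTopology returns immediately and the topology keeps running on background threads. If the program should stop on its own, a common pattern (a sketch only, assuming the Storm 1.x LocalCluster API used above plus org.apache.storm.utils.Utils for sleeping) is to let it run for a while and then shut it down:

    // Sketch only: run the topology locally for 30 seconds, then clean up and exit.
    LocalCluster localcluster = new LocalCluster();
    localcluster.submitTopology("MyWordCount", conf, job);
    Utils.sleep(30000);                        // let the topology process data for 30 s
    localcluster.killTopology("MyWordCount");  // stop the running topology
    localcluster.shutdown();                   // shut down the in-process cluster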
  • Original article: https://www.cnblogs.com/tangsonghuai/p/11131285.html