zoukankan      html  css  js  c++  java
  • storm wordcount练手

    spout ----拆分 bolt ---合并 bolt

    spout

    package cn.ljh.storm.wordcount;
    
    import java.util.Map;
    import java.util.Random;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    //采集数据: Spout组件
    public class WordCountSpout extends BaseRichSpout{
        //模拟产生一些数据
        private String[] data = {"I love Beijing","I love China","Beijing is the capital of China"};
    
        //collector:该Spout组件的收集器,用于把采集的数据发给下一个组件
        private SpoutOutputCollector collector;
    
        @Override
        public void nextTuple() {
            //每隔3秒 采集一次数据
            Utils.sleep(3000);
    
            //由Storm的引擎调用,用于处理采集的每条数据
    
            //生成一个3以内的随机数
            int random = (new Random()).nextInt(3);
            String value = data[random];
    
            //打印
            System.out.println("采集的数据是:" + value);
    
            //发送给下一个组件
            this.collector.emit(new Values(value));
        }
    
        @Override
        public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
            //collector:该Spout组件的收集器,用于把采集的数据发给下一个组件
            //在open方法中对collector初始化
            this.collector = collector;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            // 申明发送给下一个组建的tuple的schema(结构)
            declare.declare(new Fields("sentence"));
        }
    }

    拆分bolt

    package cn.ljh.storm.wordcount;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    //第一个bolt组件:单词拆分
    public class WordCountSplitBolt extends BaseRichBolt{
    
        //collector:该bolt组件的收集器,用于把处理的数据发给下一个bolt组件
        private OutputCollector collector;
    
        @Override
        public void execute(Tuple tuple) {
            //如何处理上一级发来的数据: I love Beijing
           // String value = tuple.getStringByField("sentence");
            String value =tuple.getString(0);
            //分词
            String[] words = value.split(" ");
    
            //输出
            for(String w:words){
                collector.emit(new Values(w,1));
            }
        }
    
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
            // 初始化
            //collector:该bolt组件的收集器,用于把处理的数据发给下一个bolt组件
            this.collector = collector;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            // 申明发送给下一个组建的tuple的schema(结构)
            declare.declare(new Fields("word","count"));
        }
    }

    计数bolt

    package cn.ljh.storm.wordcount;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    //第二个Bolt组件:单词的计数
    public class WordCountTotalBolt extends BaseRichBolt{
        //使用Map集合存储结果
        private Map<String, Integer> result = new HashMap<>();
    
        //collector:该bolt组件的收集器,用于把处理的数据发给下一个bolt组件
        private OutputCollector collector;
    
        @Override
        public void execute(Tuple tuple) {
            //取出数据
            String word = tuple.getStringByField("word");
            int count = tuple.getIntegerByField("count");
    
            //求和
            if(result.containsKey(word)){
                //如果已经存在,累加
                int total = result.get(word);
                result.put(word, total+count);
            }else{
                //这是一个新单词
                result.put(word, count);
            }
    
            //输出到屏幕
            System.out.println("统计的结果是:" + result);
    
            //输出给下一个组件                                               单词           总频率
            this.collector.emit(new Values(word,result.get(word)));
        }
    
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            declare.declare(new Fields("word","total"));
        }
    }

    拓扑链接

    package cn.ljh.storm.wordcount;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    public class WordCountTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
    
            //指定任务的spout组件
            builder.setSpout("mywordcountspout", new WordCountSpout());
    
            //指定任务的第一个bolt组件
            builder.setBolt("mywordcountsplit", new WordCountSplitBolt())
                    .shuffleGrouping("mywordcountspout");//随机分组
    
            //指定任务的第二个bolt组件
            builder.setBolt("mywordcounttotal", new WordCountTotalBolt())
                    .fieldsGrouping("mywordcountsplit", new Fields("word"));
    
            //创建任务
            StormTopology job = builder.createTopology();
    
            Config conf = new Config();
    
            //任务有两种运行方式:1、本地模式   2、集群模式
            //1、本地模式
            LocalCluster localcluster = new LocalCluster();
            localcluster.submitTopology("MyWordCount", conf, job);
    
            //2、集群模式:用于打包jar,并放到storm运行
    //        StormSubmitter.submitTopology(args[0], conf, job);
        }
    
    
    }
    RUSH B
  • 相关阅读:
    mysql 主从复制原理
    java操作ldap
    ldap数据库--ldapsearch,ldapmodify
    ldap数据库--ODSEE--ACI
    ldap数据库--ODSEE--schema
    ldap数据库--ODSEE--复制协议
    ldap数据库--ODSEE--suffix
    ldap数据库--ODSEE--卸载
    ldap数据库--ODSEE--安装
    WebService--cxf
  • 原文地址:https://www.cnblogs.com/tangsonghuai/p/11131285.html
Copyright © 2011-2022 走看看