zoukankan      html  css  js  c++  java
  • storm wordcount练手

    拓扑结构:spout ----拆分 bolt ----计数 bolt

    spout

    package cn.ljh.storm.wordcount;
    
    import java.util.Map;
    import java.util.Random;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    //采集数据: Spout组件
    //Spout component: simulates a data source by emitting sample sentences.
    public class WordCountSpout extends BaseRichSpout{
        //Sample sentences standing in for a real input source.
        private String[] data = {"I love Beijing","I love China","Beijing is the capital of China"};

        //Reused RNG. The original built a new Random() on every nextTuple() call,
        //which is wasteful and can produce correlated seeds under rapid invocation.
        private Random random = new Random();

        //collector: this spout's output collector, used to emit tuples to the next component.
        private SpoutOutputCollector collector;

        @Override
        public void nextTuple() {
            //Called repeatedly by the Storm engine; throttle to one tuple every 3 seconds.
            Utils.sleep(3000);

            //Pick a random sentence. Bounding by data.length (instead of a
            //hard-coded 3) lets the sample array grow without further edits.
            String value = data[random.nextInt(data.length)];

            //Trace what was "collected" (message text kept as in the original).
            System.out.println("采集的数据是:" + value);

            //Emit downstream; the field name is declared in declareOutputFields.
            this.collector.emit(new Values(value));
        }

        @Override
        public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
            //Lifecycle hook: save the collector for use in nextTuple().
            this.collector = collector;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            //Declare the schema of emitted tuples: a single "sentence" field.
            declare.declare(new Fields("sentence"));
        }
    }

    拆分bolt

    package cn.ljh.storm.wordcount;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    //第一个bolt组件:单词拆分
    //First bolt: splits each incoming sentence into words and emits (word, 1) pairs.
    public class WordCountSplitBolt extends BaseRichBolt{

        //collector: this bolt's output collector, used to emit tuples to the next component.
        private OutputCollector collector;

        @Override
        public void execute(Tuple tuple) {
            //Read the sentence by its declared field name rather than positional
            //index 0 — consistent with how WordCountTotalBolt reads its input,
            //and robust if the upstream schema is ever reordered.
            String value = tuple.getStringByField("sentence");

            //Split on single spaces; the sample sentences contain no other whitespace.
            String[] words = value.split(" ");

            //Emit one (word, 1) tuple per word.
            for(String w : words){
                collector.emit(new Values(w, 1));
            }
        }

        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
            //Lifecycle hook: save the collector for use in execute().
            this.collector = collector;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            //Declare the schema of emitted tuples: ("word", "count").
            declare.declare(new Fields("word","count"));
        }
    }

    计数bolt

    package cn.ljh.storm.wordcount;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    //第二个Bolt组件:单词的计数
    //Second bolt: accumulates a running count for each word.
    public class WordCountTotalBolt extends BaseRichBolt{
        //Running totals keyed by word. A bolt task's execute() is invoked by a
        //single thread, so a plain HashMap is sufficient here.
        private Map<String, Integer> result = new HashMap<>();

        //collector: this bolt's output collector, used to emit tuples to the next component.
        private OutputCollector collector;

        @Override
        public void execute(Tuple tuple) {
            //Extract the (word, count) pair by field name.
            String word = tuple.getStringByField("word");
            int count = tuple.getIntegerByField("count");

            //Map.merge replaces the containsKey/get/put dance with a single
            //lookup: inserts count for a new word, otherwise adds it to the
            //existing total, and returns the updated value.
            int total = result.merge(word, count, Integer::sum);

            //Trace the current totals (message text kept as in the original).
            System.out.println("统计的结果是:" + result);

            //Emit (word, running total) downstream.
            this.collector.emit(new Values(word, total));
        }

        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
            //Lifecycle hook: save the collector for use in execute().
            this.collector = collector;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            //Declare the schema of emitted tuples: ("word", "total").
            declare.declare(new Fields("word","total"));
        }
    }

    拓扑链接

    package cn.ljh.storm.wordcount;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    //Driver: wires the spout and bolts into a topology and runs it locally.
    public class WordCountTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();

            //Data source: emits sample sentences.
            builder.setSpout("mywordcountspout", new WordCountSpout());

            //First bolt: splits sentences into words. Shuffle grouping spreads
            //tuples randomly across the bolt's tasks.
            builder.setBolt("mywordcountsplit", new WordCountSplitBolt())
                    .shuffleGrouping("mywordcountspout");

            //Second bolt: counts words. Fields grouping on "word" routes every
            //occurrence of the same word to the same task, so counts are correct.
            builder.setBolt("mywordcounttotal", new WordCountTotalBolt())
                    .fieldsGrouping("mywordcountsplit", new Fields("word"));

            //Assemble the topology and its configuration.
            StormTopology topology = builder.createTopology();
            Config config = new Config();

            //Two ways to run: 1) local in-process cluster  2) real cluster.
            //1) Local mode (used here).
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("MyWordCount", config, topology);

            //2) Cluster mode: package as a jar and submit to a running Storm cluster.
    //        StormSubmitter.submitTopology(args[0], config, topology);
        }


    }
    RUSH B
  • 相关阅读:
    Springboot 之 自定义配置文件及读取配置文件
    SQLSERVER系统视图 sql server系统表详细说明
    MySQL Workbench建表时 PK NN UQ BIN UN ZF AI 的含义
    使用Ecplise git commit时出现"There are no stages files"
    maven添加sqlserver的jdbc驱动包
    java将XML文档转换成json格式数据
    java将XML文档转换成json格式数据
    cannot be resolved. It is indirectly referenced from required .class files
    org.codehaus.jackson.map.JsonMappingException: Can not construct instance of java.util.Date from String value '2012-12-12 12:01:01': not a valid representation (error: Can not parse date "2012-12-
    @Autowired注解和静态方法 NoClassDefFoundError could not initialize class 静态类
  • 原文地址:https://www.cnblogs.com/tangsonghuai/p/11131285.html
Copyright © 2011-2022 走看看