zoukankan      html  css  js  c++  java
  • 使用storm分别进行计数和词频统计

    计数

    直接上代码

    public class LocalStormSumTopology {
    
        public static void main(String[] agrs) {
    
            //Topology是通过build模式创建出来的
            //storm中的所有作业都是通过topology来指定的
            TopologyBuilder builder = new TopologyBuilder();
    
            //在设置bolt到topology时,需要设置该bolt的上游的spout或者bolt的id,这样topology才知道该bolt的执行顺序,有点类似于单向链表结构,
            //每一个环节持有上一个环节的引用,在bolt这里是持有上一个环节的id,这样同样可以定位到上一个环节
            builder.setSpout("DataSourceSpout", new DataSourceSpout());
            builder.setBolt("TotalBolt", new TotalBolt()).shuffleGrouping("DataSourceSpout");
    
    
            //启动一个本地的Storm集群,不需要搭真正的集群,本地集群使用LocalCluster来提交topology,如果是在生产环境上提交topology,那么使用
            //这个类StormSubmitter来代替LocalCluster来提交topology
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalStormSumTopology", new Config(), builder.createTopology());
        }
    
    
        private static final String NUM = "num";
    
        /**
         * 发送数据源的spout类,一般是继承BaseRichSpout这个类
         */
        public static class DataSourceSpout extends BaseRichSpout {
    
            private SpoutOutputCollector mCollector;
    
            int num;
    
            /**
             * 在storm开始的开始工作前回调一次,在这里做初始化
             *
             * @param conf      配置参数
             * @param context   上下文
             * @param collector 数据发射器,用来将数据发送到bolt中,类似于rxjava的数据发射器
             */
            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                this.mCollector = collector;
            }
    
    
            /**
             * 这是一个死循环方法,会自动循环调用,这个方法用来发送数据到下游
             */
            public void nextTuple() {
    
                //将数据发射到bolt中,一般使用Values这个类,传入的是可变参数,底层封装成ArrayList
                mCollector.emit(new Values(++num));
    
                System.out.println("从spout发射出的数据:" + num);
    
                Utils.sleep(1000);
            }
    
            /**
             * 声明从spout中发射的数据的字段名,在bolt阶段可以通过这里预设置的字段名进行取值,类似于安卓中的使用sp传输,
             * 字段名和发送出来的数据一一对应,这样如果下游需要接收多个数据发射源,那么可以通过该字段名来做区别
             *
             * @param declarer
             */
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                
                //一般使用Fields来进行封装字段名fields底层封装了ArrayList<String>
                declarer.declare(new Fields(NUM));
            }
    
            @Override
            public void close() {
                this.mCollector = null;
            }
        }
    
        public static class TotalBolt extends BaseRichBolt {
    
            private int sum = 0;
    
            /**
             * 初始化方法,跟spout中的open方法类似,只会调用一次,在这里做初始化
             *
             * @param stormConf
             * @param context
             * @param collector
             */
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    
            }
    
            /**
             * 每从上游接收到一个数据,就调用该方法回调过来
             *
             * @param input 用来提取上一个流程传过来的数据
             */
            public void execute(Tuple input) {
                
                //通过在上游设置的字段名来获取数据
                Integer integerByField = input.getIntegerByField(NUM);
                sum += integerByField;
                System.out.println("累加的结果是:" + sum);
            }
    
            /**
             * 为往下游发送的数据加上字段名,方面区别数据的来源
             * @param declarer
             */
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
            }
    
        }
    }
    
    

    词频统计

    直接上代码

    public class LocalWorldCountStormTopology {
    
    
        public static void main(String[] agrs) {
    
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("DataSourceSpout", new DataSourceSpout());
            builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("DataSourceSpout");
    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWorldCountStormTopology", new Config(), builder.createTopology());
        }
    
        /**
         * 输出每一行文本的spout
         */
    
        public static class DataSourceSpout extends BaseRichSpout {
    
            private SpoutOutputCollector mCollector;
    
            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                this.mCollector = collector;
            }
    
            public void nextTuple() {
    
                //通过这个方法,可以获取到某一个文件夹下所有符合规定后缀的文件,并且可以设置是否递归获取
                Collection<File> files = FileUtils.listFiles(new File("/Users/teng/Downloads"), new String[]{"txt"}, true);
    
                try {
                    for (File file : files) {
                        
                        //因为下一步还需要做切割,因此需要先将文件一行一行取出来,放在String集合中
                        List<String> lines = FileUtils.readLines(file);
                        for (String line : lines) {
                            //使用,进行分割
                            String[] split = line.split(",");
                            //发射单词出去
                            for (String s : split) {
                                mCollector.emit(new Values(s));
                            }
                        }
                    //执行完成一次之后,需要修改文件名,这样就不用一直执行
                    FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                //定义数据的字段名
                declarer.declare(new Fields("word"));
            }
        }
    
        /**
         * 统计词频的bolt
         */
        public static class CountBolt extends BaseRichBolt {
    
            private Map<String, Integer> map = new HashMap<String, Integer>();
    
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    
            }
    
            public void execute(Tuple input) {
                String word = input.getStringByField("word");
    
                Integer num = map.get(word);
                if (num == null) {
                    num = 1;
                } else {
                    num++;
                }
    
                map.put(word, num);
    
                System.out.println("~~~~~~~~~");
                Set<Map.Entry<String, Integer>> entries = map.entrySet();
                for (Map.Entry<String, Integer> entry : entries) {
                    System.out.println(entry.getKey() + "出现的次数为:" + entry.getValue());
                }
    
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
            }
        }
    
    }
    
    
  • 相关阅读:
    atom 安装插件列表
    django学习
    windows 安装 python3
    python3 监控代码变化 自动重启 提高开发效率
    git无法pull仓库refusing to merge unrelated histories
    python 项目部署virtualenv
    python 多线程并发threading & 任务队列Queue
    python logging 日志使用
    jupyter 教程
    mysql 替换数据库字段内容
  • 原文地址:https://www.cnblogs.com/flowyourheart/p/shi-yongstorm-fen-bie-jin-xing-ji-shu-he-ci-pin-to.html
Copyright © 2011-2022 走看看