zoukankan      html  css  js  c++  java
  • 使用Storm进行词频统计

    词频统计

    1.需求:读取指定目录的数据,并且实现单词计数功能
    2.实现方案:
    Spout用于读取指定文件夹(目录),读取文件,将文件的每一行发射到Bolt
    SplitBolt用于接收Spout发射过来的数据,并拆分,发射到CountBolt
    CountBolt接收SplitBolt发送的每一个单词,进行单词计数操作
    3.拓扑设计:
    DataSourceSpout + SplitBolt + CountBolt

    代码如下:

    package com.csylh;
    
    import org.apache.commons.io.FileUtils;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.io.File;
    import java.io.IOException;
    import java.util.*;
    
    /**
     * Description:使用Storm完成词频统计功能
     *
     * @author: 留歌36
     * Date:2018/9/4 9:28
     */
    public class LocalWordCountStormTopology {
        /**
         * 读取数据并发送到Bolt上去
         */
        public static class DataSourceSpout extends BaseRichSpout{
            //定义一个发射器
            private SpoutOutputCollector collector;
    
            /**
             * 初始化方法 只是会被调用一次
             * @param conf  配置参数
             * @param context  上下文
             * @param collector  数据发射器
             */
            @Override
            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                //对上面定义的的发射器进行赋初值
                this.collector = collector;
            }
    
            /**
             * 用于数据的产生
             * 业务:
             * 1.读取指定目录的文件夹下的数据
             * 2.把每一行数据发射出去
             */
            @Override
            public void nextTuple() {
    //            获取所有文件,这里指定文件的后缀
                Collection<File> files = FileUtils.listFiles(new File("E:\StormText"),new String[]{"txt"},true);
    //            循环遍历每一个文件 ==>  由于这里指定的是文件夹下面的目录 所以就是需要进行循环遍历
                for( File file : files){
                    try {
    //                    获取每一个文件的每一行
                        List<String> lines =  FileUtils.readLines(file);
                        for(String line : lines){
    //                        把每一行数据发射出去
                            this.collector.emit(new Values(line));
                        }
                        //TODO 数据处理完毕之后 改名  否则的话 会一直执行的
                        FileUtils.moveFile(file,new File(file.getAbsolutePath()+System.currentTimeMillis()));
    
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
    
            }
    
            /**
             * 声明输出字段名称
             * @param declarer
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("line"));
            }
        }
        /**
         * 对Spout发送过来的数据进行分割
         */
        public static class SplitBolt extends BaseRichBolt{
            private OutputCollector collector;
            /**
             * 初始化方法  只是会被执行一次
             * @param stormConf
             * @param context
             * @param collector Bolt的发射器,指定下一个Bolt的地址
             */
            @Override
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
                    this.collector = collector;
            }
    
            /**
             * 用于获取Spout发送过来的数据
             * 业务逻辑
             *  spout发送过来的数据是一行一行的line
             *  这里是需要line进行分割
             *
             * @param input
             */
            @Override
            public void execute(Tuple input) {
                String line = input.getStringByField("line");
                String[] words = line.split(",");
    
                for(String word : words){
    //                这里把每一个单词发射出去
                    this.collector.emit(new Values(word));
                }
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                    declarer.declare(new Fields("word"));
            }
        }
        /**
         * 词频汇总的Bolt
         */
        public static class CountBolt extends BaseRichBolt{
            /**
             * 由于这里是不需要向外部发射  所以就不需要定义Collector
             * @param stormConf
             * @param context
             * @param collector
             */
            @Override
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            }
            Map<String,Integer> map = new HashMap<String, Integer>();
            /**
             * 业务逻辑
             * 1.获取每一个单词
             * 2.对每一个单词进行汇总
             * 3.输出结果
             * @param input
             */
            @Override
            public void execute(Tuple input) {
    //            获取每一个单词
               String word = input.getStringByField("word");
               Integer count =  map.get(word);
               if (count == null){
                   count = 0;
               }
                count++;
    //           对单词进行汇总
                map.put(word,count);
    //           输出
                System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
                Set<Map.Entry<String,Integer>> entrySet = map.entrySet();
                for(Map.Entry<String,Integer> entry :entrySet){
                    System.out.println(entry);
                }
            }
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            }
        }
        /**
         * 主函数
         * @param args
         */
        public static void main(String[] args) {
    //            使用TopologyBuilder根据Spout和Bolt构建Topology
            TopologyBuilder builder = new TopologyBuilder();
    //            设置Bolt和Spout  设置Spout和Bolt的关联关系
            builder.setSpout("DataSourceSpout",new DataSourceSpout());
            builder.setBolt("SplitBolt",new SplitBolt()).shuffleGrouping("DataSourceSpout");
            builder.setBolt("CountBolt",new CountBolt()).shuffleGrouping("SplitBolt");
    //            创建一个本地的集群
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWordCountStormTopology",new Config(),builder.createTopology());
        }
    }
    
    

    小结:开发Storm程序的步骤就是:
    根据需求 设计实现方案 规划拓扑

    一般是先写Spout数据产生器 发射数据到Bolt
    接着,就是Bolt进行数据处理,如果有多个Bolt,非最后一个Bolt也要写发射器Collector
    最后一个Bolt直接输出结果或者 输出到HDFS或者关系型数据库中
    最终需要将Spout和Bolt进行组装起来(借助TopologyBuilder)

  • 相关阅读:
    使用PHP Socket 编程模拟Http post和get请求
    php socket客户端及服务器端应用实例
    php五大运行模式CGI,FAST-CGI,CLI,ISAPI,APACHE模式浅谈
    php 连接 mssql sql2008
    开源内容管理系统Joomla正式发布3.5版本 基于PHP 7
    swift--使用 is 和 as 操作符来实现类型检查和转换 / AnyObject与Any的区别
    swift--获取window
    ios开发之--ios11适配:TableView的heightForHeaderInSection设置高度无效/UISearchBar消失
    swift--触摸(UITouch)事件(点击,移动,抬起)
    swift--添加新手引导页
  • 原文地址:https://www.cnblogs.com/liuge36/p/9882747.html
Copyright © 2011-2022 走看看