zoukankan      html  css  js  c++  java
  • 网站访问量实时统计

    一、需求:统计网站访问量(实时统计)

    技术选型:特点(数据量大、做计算、实时)
    
    实时流式计算框架:storm
    
    1)spout
    数据源,接入数据源
    本地文件
    
    2)splitbolt
    业务逻辑处理
    切分数据
    拿到session_id(第2个字段),并在本线程/任务内局部累加访问次数
    
    3)sumbolt
    汇总各splitbolt的局部计数,求全局总访问量

    1、PvCountSpout类

    package com.demo.pvcount;
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.Map;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    /**
     * Spout that reads a local web-log file line by line and emits each line as a one-field
     * tuple named {@code "logs"}.
     *
     * <p>The file stays open for the lifetime of the spout. After the end of the file is
     * reached, later {@link #nextTuple()} calls keep trying to read, so lines appended to the
     * file afterwards are still emitted.
     */
    public class PvCountSpout implements IRichSpout{
    
        /** Topology-config key that overrides the log file location. */
        public static final String LOG_PATH_KEY = "pvcount.log.path";
        /** Log file read when {@link #LOG_PATH_KEY} is not configured. */
        public static final String DEFAULT_LOG_PATH = "e:/weblog.log";
        /** Pause after each emitted line so the demo output advances at a readable pace. */
        private static final long EMIT_DELAY_MS = 500;
        /** Pause when no new line is available, so an idle file does not cause a busy loop. */
        private static final long IDLE_DELAY_MS = 100;
    
        private SpoutOutputCollector collector;
        private BufferedReader br;
    
        /**
         * Emits at most one log line per call.
         *
         * <p>Storm calls nextTuple, ack and fail on the same thread, so this method returns
         * after a single line instead of draining the whole file in a loop.
         */
        @Override
        public void nextTuple() {
            try {
                String line = br.readLine();
                if (line == null) {
                    // Nothing new yet; the next call retries the read and picks up appended lines.
                    Thread.sleep(IDLE_DELAY_MS);
                    return;
                }
                // Send the line to PvCountSplitBolt.
                collector.emit(new Values(line));
                Thread.sleep(EMIT_DELAY_MS);
            } catch (IOException e) {
                // Best effort: report the failure and retry on the next call.
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the executor can shut down cleanly.
                Thread.currentThread().interrupt();
            }
        }
    
        /**
         * Keeps the collector and opens the log file.
         *
         * @param arg0 topology configuration; {@link #LOG_PATH_KEY} may override the file path
         * @throws IllegalStateException if the log file cannot be opened (fail fast instead of
         *     leaving the reader null and failing later in nextTuple)
         */
        @Override
        public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
            this.collector = collector;
    
            Object configured = arg0 == null ? null : arg0.get(LOG_PATH_KEY);
            String path = configured == null ? DEFAULT_LOG_PATH : configured.toString();
            try {
                br = new BufferedReader(new InputStreamReader(
                        new FileInputStream(path), java.nio.charset.StandardCharsets.UTF_8));
            } catch (FileNotFoundException e) {
                throw new IllegalStateException("cannot open web log file: " + path, e);
            }
        }
    
        /** Declares the single output field {@code "logs"} holding one raw log line. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("logs"));
        }
    
        /**
         * Success callback for a tuple emitted with a message id. This spout emits without
         * message ids, so there is nothing to track.
         */
        @Override
        public void ack(Object arg0) {
        }
    
        /** Called when the spout is activated again after having been deactivated. */
        @Override
        public void activate() {
        }
    
        /**
         * Releases the log file reader. Storm does not guarantee this is called, for example
         * when the worker process is killed with kill -9.
         */
        @Override
        public void close() {
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /** While the spout is deactivated, nextTuple is not called. */
        @Override
        public void deactivate() {
        }
    
        /**
         * Failure callback for a tuple emitted with a message id. This spout emits without
         * message ids, so there is nothing to replay.
         */
        @Override
        public void fail(Object arg0) {
        }
    
        /** No component-specific configuration. */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    2、PvCountSplitBolt类

    package com.demo.pvcount;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    /**
     * Bolt that parses each raw log line and keeps a running page-view count for this task.
     *
     * <p>Every line that has a session id increments this task's counter. The bolt then emits
     * {@code (threadid, pvnum)}: an identifier of this counter and its current value. The
     * downstream sum bolt keeps the latest value per identifier and adds them up.
     */
    public class PvCountSplitBolt implements IRichBolt{
    
        /** Column separator in weblog.log (URL, session id, timestamp). */
        private static final String FIELD_SEPARATOR = "\t";
    
        private OutputCollector collector;
        /**
         * Value emitted in the "threadid" field. It is the Storm task id, which is unique per
         * counter instance; see prepare.
         */
        private long counterId;
        /** Number of valid log lines seen by this task so far. */
        private int pvnum = 0;
    
        /** Called when the bolt is about to shut down; not guaranteed to run. Nothing to release. */
        @Override
        public void cleanup() {
        }
    
        /**
         * Extracts the session id from a log line and, if present, bumps and emits the local count.
         */
        @Override
        public void execute(Tuple input) {
            // 1. Get the raw line.
            String line = input.getStringByField("logs");
    
            // 2. Split into columns; column 1 is the session id.
            String[] fields = line.split(FIELD_SEPARATOR);
    
            // 3. Local accumulation. A blank or malformed line has no session-id column. Skip it
            //    rather than throw ArrayIndexOutOfBoundsException, which would kill this bolt.
            if (fields.length > 1 && !fields[1].trim().isEmpty()) {
                pvnum++;
                collector.emit(new Values(counterId, pvnum));
            }
            collector.ack(input);
        }
    
        /** Keeps the collector and records this task's identifier. */
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
            this.collector = collector;
            // Key partial counts by task id rather than thread id. One executor thread may run
            // several tasks, each with its own pvnum, and their counts would overwrite each
            // other downstream. Widened to long to match the Long read by the sum bolt.
            this.counterId = arg1.getThisTaskId();
        }
    
        /** Declares the output {@code (threadid, pvnum)}. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("threadid","pvnum"));
        }
    
        /** No component-specific configuration. */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    3、PvCountSumBolt类

    package com.demo.pvcount;
    
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Tuple;
    
    /**
     * Bolt that combines per-counter partial page-view counts into a total and prints it.
     *
     * <p>Each incoming tuple carries the latest running count of one upstream counter. Only the
     * newest value per counter is kept, and the printed total is the sum of the stored values.
     * This is a true global total only if every partial count reaches the same task of this bolt.
     */
    public class PvCountSumBolt implements IRichBolt{
    
        /** Latest running count reported by each upstream counter, keyed by its "threadid" value. */
        private final Map<Long, Integer> hashMap = new HashMap<>();
        private OutputCollector collector;
    
        /** Nothing to release. */
        @Override
        public void cleanup() {
        }
    
        /** Records the latest partial count, then prints the sum over all counters. */
        @Override
        public void execute(Tuple input) {
            // 1. Get the partial count.
            Long threadid = input.getLongByField("threadid");
            Integer pvnum = input.getIntegerByField("pvnum");
    
            // 2. Replace the previous count from the same counter (e.g. 15 -> 20).
            hashMap.put(threadid, pvnum);
    
            // 3. Recompute the total from scratch. Accumulate in a long so a large total cannot
            //    overflow.
            long sumnum = 0;
            for (int count : hashMap.values()) {
                sumnum += count;
            }
    
            System.err.println(Thread.currentThread().getName() + "总访问量为->" + sumnum);
            collector.ack(input);
        }
    
        /** Keeps the collector so processed tuples can be acked. */
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
            this.collector = arg2;
        }
    
        /** Terminal bolt: declares no output fields. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer arg0) {
        }
    
        /** No component-specific configuration. */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    4、PvCountDriver类

    package com.demo.pvcount;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    /**
     * Runs the page-view counting topology in a local cluster:
     * PvCountSpout -> PvCountSplitBolt (per-task partial counts) -> PvCountSumBolt (global total).
     */
    public class PvCountDriver {
        public static void main(String[] args) {
            // 1. Create the topology (Storm's counterpart of a Hadoop Job).
            TopologyBuilder builder = new TopologyBuilder();
    
            // 2. Wire the components.
            builder.setSpout("PvCountSpout", new PvCountSpout(), 1);
            // Parallelism hint 6 but only 4 tasks: at most 4 executors can actually have work.
            builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 6).setNumTasks(4)
                    .fieldsGrouping("PvCountSpout", new Fields("logs"));
            // The global total must be computed in one place, so send every partial count to a
            // single task. Grouping on the count value itself would scatter the counts as soon
            // as this bolt's parallelism were raised above 1.
            builder.setBolt("PvCountSumBolt", new PvCountSumBolt(), 1)
                    .globalGrouping("PvCountSplitBolt");
    
            // 3. Configuration.
            Config conf = new Config();
            conf.setNumWorkers(2);
    
            // 4. Submit to an in-process cluster; it keeps running until the JVM is stopped.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("pvcounttopology", conf, builder.createTopology());
        }
    }

    5、PvCountDriver_Shuffle类

    package com.demo.pvcount;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;
    
    /**
     * Variant of the page-view topology that distributes log lines to the split bolts with
     * shuffle grouping instead of fields grouping.
     */
    public class PvCountDriver_Shuffle {
        public static void main(String[] args) {
            // 1. Create the topology (Storm's counterpart of a Hadoop Job).
            TopologyBuilder builder = new TopologyBuilder();
    
            // 2. Wire the components.
            builder.setSpout("PvCountSpout", new PvCountSpout(), 1);
            // Lines are spread randomly across the split tasks. That is fine, because each task
            // only counts its own share.
            builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 6).setNumTasks(4)
                    .shuffleGrouping("PvCountSpout");
            // The sum stage must see every partial count to print a correct total. With two
            // shuffled sum tasks, each would see only a random subset and print a wrong total,
            // so use a single task fed by global grouping.
            builder.setBolt("PvCountSumBolt", new PvCountSumBolt(), 1)
                    .globalGrouping("PvCountSplitBolt");
    
            // 3. Configuration.
            Config conf = new Config();
            conf.setNumWorkers(2);
    
            // 4. Submit to an in-process cluster; it keeps running until the JVM is stopped.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("pvcounttopology", conf, builder.createTopology());
        }
    }

    6、weblog.log文件(每行三个字段,以制表符Tab分隔:网址、session_id、访问时间)

    storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 10:40:49
    storm.apache.org    VVVYH6Y4V4SFXZWWEQRQWEQ    2018-08-07 08:40:50
    storm.apache.org    BBYH61456DEL89RG5VV9UYU7    2018-08-07 10:40:49
    storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 09:40:49
    storm.apache.org    CCYH6Y4V4SCVXTG6DPB4VH9U123    2018-08-07 10:40:49
    storm.apache.org    CCYH6Y4V4SCVXTG6DPB4VH9U123    2018-08-07 12:40:49
    storm.apache.org    VVVYH6Y4V4SFXZWWEQRQWEQ    2018-08-07 08:40:52
    storm.apache.org    CCYH6Y4V4SCVXTG6DPB4VH9U123    2018-08-07 08:40:50
    storm.apache.org    VVVYH6Y4V4SFXZWWEQRQWEQ    2018-08-07 09:40:49
    ...
    ...
    storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 08:40:53
    storm.apache.org    BBYH61456DEL89RG5VV9UYU7    2018-08-07 12:40:49
    storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 08:40:51
    storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 10:40:49
    storm.apache.org    HUNTERH6YCGFJYERTT834R52FDXV9U34    2018-08-07 08:40:53
    storm.apache.org    BBYH61456DEL89RG5VV9UYU7    2018-08-07 08:40:50
    storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 08:40:53
    storm.apache.org    VVVYH6Y4V4SFXZWWEQRQWEQ    2018-08-07 10:40:49

    7、运行(4)中PvCountDriver的main方法,控制台会持续打印"总访问量为->N"(原文此处为运行截图):

    此时向weblog.log文件末尾追加几条数据,总访问量会相应增加同样的条数。

    至此,简单实现了网站访问量实时统计。

  • 相关阅读:
    zbb20181207 springboot @ConfigurationProperties使用
    zbb20181206 logback,lombok 默认日志logback配置解析
    Spring Boot (8) 全局异常处理
    Spring Boot (7) JdbcTemplate访问数据库
    Spring Boot (6) Spring Data JPA
    Spring Boot (4) 静态页面和Thymeleaf模板
    Spring Boot (3) 热部署devtools
    Spring Boot (2) Restful风格接口
    Spring Boot (1) 构建第一个Spring Boot工程
    idea使用maven搭建ssm框架实现登陆商品增删改查
  • 原文地址:https://www.cnblogs.com/areyouready/p/10188300.html
Copyright © 2011-2022 走看看