zoukankan      html  css  js  c++  java
  • 二.Flink实时项目电商用户行为之实时流量统计

    1.1 模块创建和数据准备

    在Flink-project下新建一个 maven module作为子项目,命名为gmall-network-flow。在这个子模块中,我们同样并没有引入更多的依赖,所以也不需要改动pom文件。

    src/main/目录下,将apache服务器的日志文件apache.log复制到资源文件目录input下,我们将从这里读取数据。

    当然,我们也可以仍然用UserBehavior.csv作为数据源,这时我们分析的就不是每一次对服务器的访问请求了,而是具体的页面浏览(“pv”)操作。

    1.2 基于服务器log的热门页面浏览量统计

    我们现在要实现的模块是 “实时流量统计”。对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从web服务器的日志中提取出来。

    我们在这里先实现“热门页面浏览数”的统计,也就是读取服务器日志中的每一行log,统计在一段时间内用户访问每一个url的次数,然后排序输出显示。

    具体做法为:每隔5秒,输出最近10分钟内访问量最多的前NURL可以看出,这个需求与之前“实时热门商品统计”非常类似,所以我们完全可以借鉴此前的代码。

    src/main/app下创建HotUrlApp类。定义javaBean ApacheLog,这是输入的日志数据流;另外还有UrlViewCount,这是窗口操作统计的输出数据类型。在main函数中创建StreamExecutionEnvironment 并做配置,然后从apache.log文件中读取数据,并包装成ApacheLog类型。

    需要注意的是,原始日志中的时间是dd/MM/yyyy:HH:mm:ss”的形式,需要定义一个DateTimeFormat将其转换为我们需要的时间戳格式:

    .map(line -> {
    String[] split = line.split(",");
    SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
    long time = sdf.parse(split[3]).getTime();
    return new ApacheLog(split[0], split[1], time, split[5], split[6]);
    })

    完整代码如下:

    public class HotUrlApp {

        public static void main(String[] args) throws Exception {

    //1.创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    //2.读取文本文件创建流,转换为JavaBean并提取时间戳
    // SingleOutputStreamOperator<ApacheLog> apachLogDS = env.readTextFile("input/apache.log")
    SingleOutputStreamOperator<ApacheLog> apachLogDS = env.socketTextStream("hadoop102", 7777)
    .map(line -> {
    String[] fields = line.split(" ");
    SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
    long time = sdf.parse(fields[3]).getTime();
    return new ApacheLog(fields[0], fields[1], time, fields[5], fields[6]);
    })
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLog>(Time.seconds(1)) {
    @Override
    public long extractTimestamp(ApacheLog element) {
    return element.getEventTime();
    }
    });

    OutputTag<ApacheLog> outputTag = new OutputTag<ApacheLog>("sideOutPut") {
    };

    //3.过滤数据,按照url分组,开窗,累加计算
    SingleOutputStreamOperator<UrlViewCount> aggregate = apachLogDS
    .filter(data -> "GET".equals(data.getMethod()))
    .keyBy(data -> data.getUrl())
    .timeWindow(Time.minutes(10), Time.seconds(5))
    .allowedLateness(Time.seconds(60))
    .sideOutputLateData(outputTag)
    .aggregate(new UrlCountAggFunc(), new UrlCountWindowFunc());

    //4.按照窗口结束时间重新分组,计算组内排序
    SingleOutputStreamOperator<String> result = aggregate.keyBy(data -> data.getWindowEnd())
    .process(new UrlCountProcessFunc(5));

    //5.打印数据
    apachLogDS.print("apachLogDS");
    aggregate.print("aggregate");
    result.print("result");
    aggregate.getSideOutput(outputTag).print("side");

    //6.执行
    env.execute();

    }
    public static class UrlCountAggFunc implements AggregateFunction<ApacheLog, Long, Long> {

    @Override
    public Long createAccumulator() {
    return 0L;
    }

    @Override
    public Long add(ApacheLog value, Long accumulator) {
    return accumulator + 1L;
    }

    @Override
    public Long getResult(Long accumulator) {
    return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
    return a + b;
    }
    }

    public static class UrlCountWindowFunc implements WindowFunction<Long, UrlViewCount, String, TimeWindow> {

    @Override
    public void apply(String url, TimeWindow window, Iterable<Long> input, Collector<UrlViewCount> out) throws Exception {
    out.collect(new UrlViewCount(url, window.getEnd(), input.iterator().next()));
    }
    }

    public static class UrlCountProcessFunc extends KeyedProcessFunction<Long, UrlViewCount, String> {

    //定义TopSize属性
    private Integer topSize;

    public UrlCountProcessFunc() {
    }

    public UrlCountProcessFunc(Integer topSize) {
    this.topSize = topSize;
    }

    //定义集合状态用于存放同一个窗口中的数据
    private MapState<String,UrlViewCount> mapState; //不能用ListState,因为它会把相同url的数据都会保持而我们只需要后面那个状态的,例如<url,1>,<url,2>但我们只要最新来的那个更新后的数据

    @Override
    public void open(Configuration parameters) throws Exception {
    mapState=getRuntimeContext().getMapState(new MapStateDescriptor<String, UrlViewCount>("map-state",String.class,UrlViewCount.class));
    }

    @Override
    public void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception {

    //将数据放置集合状态
    mapState.put(value.getUrl(),value);
    //注册定时器,用于处理状态中的数据
    ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
    //注册定时器,用于触发清空状态的
    ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 60000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {

    if (timestamp == ctx.getCurrentKey() + 60000L) {
    //清空状态
    mapState.clear(); 超过watermark才清空
    return;
    }

    //1.取出状态中的数据
    Iterator<Map.Entry<String, UrlViewCount>> iterator = mapState.entries().iterator();
    List<Map.Entry<String, UrlViewCount>> entries = Lists.newArrayList(iterator);

    //2.排序
    entries.sort(new Comparator<Map.Entry<String, UrlViewCount>>() {
    @Override
    public int compare(Map.Entry<String, UrlViewCount> o1, Map.Entry<String, UrlViewCount> o2) {
    if (o1.getValue().getCount() > o2.getValue().getCount()) {
    return -1;
    } else if (o1.getValue().getCount() < o2.getValue().getCount()) {
    return 1;
    } else {
    return 0;
    }
    }
    });

    StringBuilder sb = new StringBuilder();
    sb.append("====================== ");
    sb.append("当前窗口结束时间为:").append(new Timestamp(timestamp - 1L)).append(" ");

    //取前topSize条数据输出
    for (int i = 0; i < Math.min(topSize, entries.size()); i++) {
    //取出数据
    Map.Entry<String, UrlViewCount> entry = entries.get(i);
    sb.append("TOP ").append(i + 1);
    sb.append(" URL=").append(entry.getValue().getUrl());
    sb.append(" 页面热度=").append(entry.getValue().getCount());
    sb.append(" ");
    }
    sb.append("====================== ");
    //清空状态
    // listState.clear(); //不在这里删除的原因是每来一条不同的数据它会把上一条数据给清空,我们需要保持上一条数据的状态
    Thread.sleep(1000);
    //输出数据
    out.collect(sb.toString());
    }
    }
    }
    
    

    1.3.1基于埋点日志数据的网络流量统计

    衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page ViewPV)。用户每次打开一个页面便记录1PV,多次打开同一页面则浏览量累计。一般来说,PV与来访者的数量成正比,但是PV并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的PV

    我们知道,用户浏览页面时,会从浏览器向网络服务器发出一个请求(Request),网络服务器接到这个请求后,会将该请求对应的一个网页(Page)发送给浏览器,从而产生了一个PV。所以我们的统计方法,可以是从web服务器的日志中去提取对应的页面访问然后统计,就向上一节中的做法一样;也可以直接从埋点日志中提取用户发来的页面请求,从而统计出总浏览量。

    所以,接下来我们用UserBehavior.csv作为数据源,实现一个网站总浏览量的统计。我们可以设置滚动时间窗口,实时统计每小时内的网站PV

    完整代码如下:

    1JavaBean--PVCount

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class PvCount {
        private String url;
        private Long windowEnd;
        private Long count;
    }

    2) 主程序

    public class PageViewApp {
    public static void main(String[] args) throws Exception {

    //1.创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(8);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    //2.从文件读取数据创建流并转换为JavaBean同时提取事件时间
    SingleOutputStreamOperator<UserBehavior> userDS = env.readTextFile("input/UserBehavior.csv")
    .map(line -> {
    String[] fileds = line.split(",");
    return new UserBehavior(Long.parseLong(fileds[0]),
    Long.parseLong(fileds[1]),
    Integer.parseInt(fileds[2]),
    fileds[3],
    Long.parseLong(fileds[4]));
    })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
    @Override
    public long extractAscendingTimestamp(UserBehavior element) {
    return element.getTimestamp() * 1000L;
    }
    });

    //3.按照"pv"过滤,按照itemID分组,开窗,计算数据
    SingleOutputStreamOperator<PvCount> aggregate = userDS.filter(data -> "pv".equals(data.getBehavior()))
    .map(new MapFunction<UserBehavior, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> map(UserBehavior value) throws Exception {
    Random random = new Random();
    return new Tuple2<>("pv_" + random.nextInt(8), 1);
    }
    })
    .keyBy(0)
    .timeWindow(Time.hours(1))
    .aggregate(new PvCountAggFunc(), new PvCountWindowFunc());
    //Aggregate是将同一个窗口里的数据聚合(sum(将所有窗口里的数据聚合))
    //Aggregate的作用 是可以给窗口放一个结束时间,拿到结束时间做重新分组然后再聚合
    //按照窗口结束时间重新分组(按定时器加1毫秒作为结束时间)
    SingleOutputStreamOperator<String> result = aggregate.keyBy(data -> data.getWindowEnd())
    .process(new PvCountProcessFunc());

    //4.打印输出
    result.print();

    //5.执行
    env.execute();

    }
    //做累加计算
    public static class PvCountAggFunc implements AggregateFunction<Tuple2<String, Integer>,Long,Long>{

    @Override
    public Long createAccumulator() {
    return 0L;
    }

    @Override
    public Long add(Tuple2<String, Integer> value, Long accumulator) {
    return accumulator + 1L;
    }

    @Override
    public Long getResult(Long accumulator) {
    return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
    return a + b ;
    }
    }
    //提取窗口时间
    public static class PvCountWindowFunc implements WindowFunction<Long, PvCount, Tuple, TimeWindow>{

    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<PvCount> out) throws Exception {
    String field = tuple.getField(0);
    out.collect(new PvCount(field,window.getEnd(),input.iterator().next()));
    }
    }
    public static class PvCountProcessFunc extends KeyedProcessFunction<Long,PvCount,String>{
    private ListState<PvCount> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
    listState=getRuntimeContext().getListState(new ListStateDescriptor<PvCount>("list-state",PvCount.class));
    }

    @Override
    public void processElement(PvCount value, Context ctx, Collector<String> out) throws Exception {
    listState.add(value);
    ctx.timerService().registerEventTimeTimer(value.getWindowEnd()+1L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    //取出状态信息
    Iterator<PvCount> iterator = listState.get().iterator();

    //定义最终一个小时的数据总和
    Long count=0L;
    //遍历集合数据累加结果
    while (iterator.hasNext()){
    count += iterator.next().getCount();
    }
    //输出结果数据
    out.collect("pv:"+count);
    //清空状态
    listState.clear();
    }
    }
    }
    1.3.2 网站独立访客数UV)的统计

    在上节的例子中,我们统计的是所有用户对页面的所有浏览行为,也就是说,同一用户的浏览行为会被重复统计。而在实际应用中,我们往往还会关注,在一段时间内到底有多少不同的用户访问了网站。

    另外一个统计流量的重要指标是网站的独立访客数(Unique VisitorUV)。UV指的是一段时间(比如一小时)内访问网站的总人数,1天内同一访客的多次访问只记录为一个访客。通过IPcookie一般是判断UV值的两种方式。当客户端第一次访问某个网站服务器的时候,网站服务器会给这个客户端的电脑发出一个Cookie,通常放在这个客户端电脑的C盘当中。在这个Cookie中会分配一个独一无二的编号,这其中会记录一些访问服务器的信息,如访问时间,访问了哪些页面等等。当你下次再访问这个服务器的时候,服务器就可以直接从你的电脑中找到上一次放进去的Cookie文件,并且对其进行一些更新,但那个独一无二的编号是不会变的。

    当然,对于UserBehavior数据源来说,我们直接可以根据userId来区分不同的用户。

    完整代码如下:

    1JavaBean--UvCount

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class UvCount {
        private Long windowEnd;
        private Long count;
    }

    2)主程序--UvCountApp

    /**
    * uv是独立访客数 需要去重
    * 去重方式:
    * 不用filter
    * 可以用本地集合直接开窗(timewindowall有触发器 ,windowall是没有触发器(要自己写触发器)) )后面用apply 生成环境不可取(不用分组)
    * 效率太低,所有数据放在一个集合一起处理,内存消耗高 生成环境不可取
    * 全量窗口关闭一起处理
    * 可以用外部框架redis
    *
    */
    public class UvCountApp {
    public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    SingleOutputStreamOperator<UserBehavior> userDS = env.readTextFile("input/UserBehavior.csv")
    .map(line -> {
    String[] fields = line.split(",");
    return new UserBehavior(Long.parseLong(fields[0]),
    Long.parseLong(fields[1]),
    Integer.parseInt(fields[2]),
    fields[3],
    Long.parseLong(fields[4]));
    })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
    @Override
    public long extractAscendingTimestamp(UserBehavior element) {
    return element.getTimestamp() * 1000L;
    }
    });
    //3.开窗一个小时
    SingleOutputStreamOperator<UvCount> result = userDS.timeWindowAll(Time.hours(1))
    .apply(new UvCountAllWindowFunc());
    //4.打印
    result.print();
    //5.启动任务
    env.execute();
    }

    //自定义实现AllWindowFunction
    public static class UvCountAllWindowFunc implements AllWindowFunction<UserBehavior, UvCount, TimeWindow> {

    @Override
    public void apply(TimeWindow window, Iterable<UserBehavior> values, Collector<UvCount> out) throws Exception {
    //创建hashSet用于存放userID
    HashSet<Long> userIds = new HashSet<Long>();
    //遍历values
    Iterator<UserBehavior> iterator = values.iterator();
    while (iterator.hasNext()) {
    userIds.add(iterator.next().getUserId());
    }
    //输出数据
    String uv = "uv";
    String windowEnd = new Timestamp(window.getEnd()).toString();
    Long count = (long) userIds.size();
    out.collect(new UvCount(uv, windowEnd, count));
    }
    }
    }

    1.3.3 使用布隆过滤器UV统计

    在上节的例子中,我们把所有数据的userId都存在了窗口计算的状态里,在窗口收集数据的过程中,状态会不断增大。一般情况下,只要不超出内存的承受范围,这种做法也没什么问题;但如果我们遇到的数据量很大呢?

    把所有数据暂存放到内存里,显然不是一个好注意。我们会想到,可以利用redis这种内存级k-v数据库,为我们做一个缓存。但如果我们遇到的情况非常极端,数据大到惊人呢?比如上亿级的用户,要去重计算UV

    如果放到redis中,亿级的用户id(每个20字节左右的话)可能需要几G甚至几十G的空间来存储。当然放到redis中,用集群进行扩展也不是不可以,但明显代价太大了。

    一个更好的想法是,其实我们不需要完整地存储用户ID的信息,只要知道他在不在就行了。所以其实我们可以进行压缩处理,用一位(bit)就可以表示一个用户的状态。这个思想的具体实现就是布隆过滤器(Bloom Filter)。

    本质上布隆过滤器是一种数据结构比较巧妙的概率型数据结构probabilistic data structure),特点是高效地插入和查询,可以用来告诉你 “某样东西一定不存在或者可能存在”。

    它本身是一个很长的二进制向量,既然是二进制的向量,那么显而易见的,存放的不是0,就是1。相比于传统的 ListSetMap 等数据结构,它更高效、占用空间更少,但是缺点是其返回的结果是概率性的,而不是确切的。

    我们的目标就是,利用某种方法(一般是Hash函数)把每个数据,对应到一个位图的某一位上去;如果数据存在,那一位就是1,不存在则为0

    接下来我们就来具体实现一下。

    注意这里我们用到了redis连接存取数据,所以需要加入redis客户端的依赖:

    <dependencies>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.1</version>
        </dependency>
    </dependencies>

    完整代码如下:

     

    public class UvCountWithBloomFilterApp {
    public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    SingleOutputStreamOperator<UserBehavior> userDS = env.readTextFile("input/UserBehavior.csv")
    .map(line -> {
    String[] fields = line.split(",");
    return new UserBehavior(Long.parseLong(fields[0]),
    Long.parseLong(fields[1]),
    Integer.parseInt(fields[2]),
    fields[3],
    Long.parseLong(fields[4]));
    })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
    @Override
    public long extractAscendingTimestamp(UserBehavior element) {
    return element.getTimestamp() * 1000L;
    }
    });
    //3.开窗一个小时
    SingleOutputStreamOperator<UvCount> result = userDS.timeWindowAll(Time.hours(1))
    .trigger(new MyTrigger())
    .process(new UvWithBloomFilterWindowFunc());
    result.print();
    env.execute();

    }
    //自定义触发器,每来一条数据,触发一次计算并输出结果
    public static class MyTrigger extends Trigger<UserBehavior, TimeWindow> {

    @Override
    public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.FIRE_AND_PURGE;//做计算和输出
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {

    }
    }
    //考虑redis
    //.存储数据本身 900-1000 count
    //1.如果用hash存考虑外部key和内部key
    //我们使用hash的时候,外层key可以在外面拿,而内存key(窗口的信息)要通过process方法里的
    // Context上下文来拿
    //外部key(随便给) UvCount 内部key 900-1000 value count
    //过期时间只能删除外部key不能删除内部key(数据一旦超时会将根据外部key将数据全部删除而我们希望可以根据内部key来删除)
    //UvCount900-1000count
    //2.如果用String(数据量极大的时候,我们考虑用String,如果定义超时时间,数据要删的化,也考虑String)
    //key(我们通过拼接的方式) UvCount-900-1000 value count
    //.BitMap 我们要操作SETBIT,GETBIT,我们只能用String来存
    //bitMap-900-1000
    public static class UvWithBloomFilterWindowFunc extends ProcessAllWindowFunction<UserBehavior, UvCount,TimeWindow>{
    //定义Redis连接
    Jedis jedis;
    //定义一个布隆过滤器
    MyBloomFilter myBloomFilter;

    //定义UvCountRedisKey Hash
    String uvCountRedisKey;

    @Override
    public void open(Configuration parameters) throws Exception {
    jedis = new Jedis("hadoop102", 6379);
    myBloomFilter=new MyBloomFilter(1L << 29);//64M 1亿数据*(3-10)这里我们取5
    //5亿 5*10^8 =(2^10)^3/2=2^29 5亿 是2^29bit=1024*1024*2^3*2^6=64M
    uvCountRedisKey="UvCount";
    }

    @Override
    public void process(Context context, Iterable<UserBehavior> elements, Collector<UvCount> out) throws Exception {
    //1.获取窗口信息并指定UvCountRedisKeyfield,同时指定BitMapRedisKey
    String windowEnd = new Timestamp(context.window().getEnd()).toString();
    String bitMapRedisKey="UvBitMap:"+windowEnd;
    //2.判断当前的userID是否已经存在
    Long offset = myBloomFilter.hash(elements.iterator().next().getUserId() + "");
    Boolean exist = jedis.getbit(bitMapRedisKey, offset);
    //3.如果不存在,redis中累加数据,并将BitMap中对应的位置改为true
    if(! exist){
    jedis.hincrBy(uvCountRedisKey,windowEnd,1L);
    jedis.setbit(bitMapRedisKey,offset,true);
    }
    //4.取出Redis中对应的Count,发送
    long count=Long.parseLong(jedis.hget(uvCountRedisKey,windowEnd));
    //5.发送数据
    out.collect(new UvCount("uv",windowEnd,count));

    }

    @Override
    public void close() throws Exception {
    jedis.close();
    }
    }
    //自定义一个布隆过滤器,定义一个位图的大小和hash函数
    public static class MyBloomFilter{
    //定义布隆过滤器的总容量,bit的个数,必须是2的整次幂
    private Long cap;

    public MyBloomFilter() {
    }

    public MyBloomFilter(Long cap) {
    this.cap = cap;
    }
        //hash函数
    public Long hash(String value){
    int result=0;
    for(char c:value.toCharArray()){
    result=result * 31 + c;//乘以质数31是为了abc ,bca 虽然元素相同,但位置不同,结果不同
    }
    //位与运算   // 返回hash值,不能超过cap
    return result & (cap - 1);//由于是2的整次幂,cap-1 由原来的10000000 变成 01111111
    //result大于cap-1 假如result11100001
    //01111111 上下做位与操作 去除了result 大于cap-1的可能
    //11100001 高位取的是cap -1 低位取的是 result
    // 01100001
    }
    }
    }

     

     

     

     

     

     









     

     

     

     

  • 相关阅读:
    大神总结的
    更改Xcode的缺省公司名
    iPhone分辨率
    iOS 的 APP 如何适应 iPhone 5s/6/6Plus 三种屏幕的尺寸?
    storyBoard(tableViewl)
    storyBoard
    XIB可视化编程
    UITableView(五)
    UITableView(四)
    UITableView(三)
  • 原文地址:https://www.cnblogs.com/whdd/p/14058363.html
Copyright © 2011-2022 走看看