zoukankan      html  css  js  c++  java
  • Flink用户画像系统之实时品牌爱好

    技术点:springcloud + kafka + hbase + mogodb

    1、建立实体对象

         浏览商品行为    ScanProductLog
            收藏商品行为    CollectProductLog    
            购物车行为        BuyCartProductLog
            关注商品行为    AttentionProductLog

    2、搭建kafka并创建topic ,搭建可参考:https://www.cnblogs.com/ywjfx/p/10305161.html ,整合springboot可参考:https://www.cnblogs.com/ywjfx/p/11197646.html

                bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic scanProductLog
                
                bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic collectProductLog
    
                bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic buyCartProductLog
    
                bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic attentionProductLog

    3、添加flink stream 依赖

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
                <version>${project.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>${project.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${project.version}</version>
            </dependency>

    4、BrandLikeTask.java

    package com.yangwj.task;
    
    import com.yangwj.entity.BrandLike;
    import com.yangwj.kafka.KafkaEvent;
    import com.yangwj.map.BrandLikeMap;
    import com.yangwj.reduce.BrandLikeReduce;
    import com.yangwj.reduce.BrandLikeSink;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import com.yangwj.kafka.KafkaEventSchema;
    import javax.annotation.Nullable;
    
    /**
     * Created by li on 2019/1/6.
     */
    public class BrandlikeTask {
        public static void main(String[] args) {
            // parse input arguments
            args = new String[]{"--input-topic","scanProductLog","--bootstrap.servers","192.168.80.134:9092","--zookeeper.connect","192.168.80.134:2181","--group.id","yangwj"};
            final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    
    //        if (parameterTool.getNumberOfParameters() < 5) {
    //            System.out.println("Missing parameters!
    " +
    //                    "Usage: Kafka --input-topic <topic> --output-topic <topic> " +
    //                    "--bootstrap.servers <kafka brokers> " +
    //                    "--zookeeper.connect <zk quorum> --group.id <some id>");
    //            return;
    //        }
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().disableSysoutLogging();
            env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
            env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
            env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            DataStream<KafkaEvent> input = env
                    .addSource(
                            new FlinkKafkaConsumer010<>(
                                    parameterTool.getRequired("input-topic"),
                                    new KafkaEventSchema(),
                                    parameterTool.getProperties())
                                    .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()));//订阅并读取kafka数据
            DataStream<BrandLike> brandLikeMap = input.flatMap(new BrandLikeMap());
    
            DataStream<BrandLike> brandLikeReduce = brandLikeMap.keyBy("groupbyfield").timeWindowAll(Time.seconds(2)).reduce(new BrandLikeReduce());
    
            brandLikeReduce.addSink(new BrandLikeSink());
    
            try {
                env.execute("brandLike analy");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<KafkaEvent> {
    
            private static final long serialVersionUID = -742759155861320823L;
    
            private long currentTimestamp = Long.MIN_VALUE;
    
            @Override
            public long extractTimestamp(KafkaEvent event, long previousElementTimestamp) {
                // the inputs are assumed to be of format (message,timestamp)
                this.currentTimestamp = event.getTimestamp();
                return event.getTimestamp();
            }
    
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
            }
        }
    }

    5、KafkaEvent.java

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.yangwj.kafka;
    
    /**
     * The event type used in the {@link Kafka010Example}.
     *
     * <p>This is a Java POJO, which Flink recognizes and will allow "by-name" field referencing
     * when keying a {@link org.apache.flink.streaming.api.datastream.DataStream} of such a type.
     * For a demonstration of this, see the code in {@link Kafka010Example}.
     */
    public class KafkaEvent {
        private final static String splitword = "##";
        private String word;
        private int frequency;
        private long timestamp;
    
        public KafkaEvent() {}
    
        public KafkaEvent(String word, int frequency, long timestamp) {
            this.word = word;
            this.frequency = frequency;
            this.timestamp = timestamp;
        }
    
        public String getWord() {
            return word;
        }
    
        public void setWord(String word) {
            this.word = word;
        }
    
        public int getFrequency() {
            return frequency;
        }
    
        public void setFrequency(int frequency) {
            this.frequency = frequency;
        }
    
        public long getTimestamp() {
            return timestamp;
        }
    
        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }
    
        public static KafkaEvent fromString(String eventStr) {
            String[] split = eventStr.split(splitword);
            return new KafkaEvent(split[0], Integer.valueOf(split[1]), Long.valueOf(split[2]));
        }
    
        @Override
        public String toString() {
            return word +splitword + frequency + splitword + timestamp;
        }
    }
    View Code

    6、KafkaEventSchema.java

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.yangwj.kafka;
    
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    
    import java.io.IOException;
    
    /**
     * The serialization schema for the {@link KafkaEvent} type. This class defines how to transform a
     * Kafka record's bytes to a {@link KafkaEvent}, and vice-versa.
     */
    public class KafkaEventSchema implements DeserializationSchema<KafkaEvent>, SerializationSchema<KafkaEvent> {
    
        private static final long serialVersionUID = 6154188370181669758L;
    
        @Override
        public byte[] serialize(KafkaEvent event) {
            return event.toString().getBytes();
        }
    
        @Override
        public KafkaEvent deserialize(byte[] message) throws IOException {
            return KafkaEvent.fromString(new String(message));
        }
    
        @Override
        public boolean isEndOfStream(KafkaEvent nextElement) {
            return false;
        }
    
        @Override
        public TypeInformation<KafkaEvent> getProducedType() {
            return TypeInformation.of(KafkaEvent.class);
        }
    }
    View Code

    7、BrandLikeMap.java 

    注意:使用 FlatMapFunction,是因为要有两个标签给到reduce,如果只有一个标签,可以使用MapFunction

    package com.yangwj.map;
    
    import com.alibaba.fastjson.JSONObject;
    import com.yangwj.entity.BrandLike;
    import com.yangwj.kafka.KafkaEvent;
    import com.yangwj.log.ScanProductLog;
    import com.yangwj.util.HbaseUtils;
    import com.yangwj.utils.MapUtils;
    import org.apache.commons.lang.StringUtils;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Created by li on 2019/1/6.
     */
    public class BrandLikeMap implements FlatMapFunction<KafkaEvent, BrandLike>  {
    
        @Override
        public void flatMap(KafkaEvent kafkaEvent, Collector<BrandLike> collector) throws Exception {
                String data = kafkaEvent.getWord();
                ScanProductLog scanProductLog = JSONObject.parseObject(data,ScanProductLog.class);
                int userid = scanProductLog.getUserid();
                String brand = scanProductLog.getBrand();
                String tablename = "userflaginfo";
                String rowkey = userid+"";
                String famliyname = "userbehavior";
                String colum = "brandlist";//运营
                String mapdata = HbaseUtils.getdata(tablename,rowkey,famliyname,colum);
                Map<String,Long> map = new HashMap<String,Long>();
                if(StringUtils.isNotBlank(mapdata)){
                    map = JSONObject.parseObject(mapdata,Map.class);
                }
                //获取之前的品牌偏好
                String maxprebrand = MapUtils.getmaxbyMap(map);
    
                long prebarnd = map.get(brand)==null?0l:map.get(brand);
                map.put(brand,prebarnd+1);
                String finalstring = JSONObject.toJSONString(map);
                HbaseUtils.putdata(tablename,rowkey,famliyname,colum,finalstring);
    
                String maxbrand = MapUtils.getmaxbyMap(map);
                if(StringUtils.isNotBlank(maxbrand)&&!maxprebrand.equals(maxbrand)){
                    BrandLike brandLike = new BrandLike();
                    brandLike.setBrand(maxprebrand);
                    brandLike.setCount(-1l);
                    brandLike.setGroupbyfield("==brandlik=="+maxprebrand);
                    collector.collect(brandLike);
                }
    
                BrandLike brandLike = new BrandLike();
                brandLike.setBrand(maxbrand);
                brandLike.setCount(1l);
                collector.collect(brandLike);
                brandLike.setGroupbyfield("==brandlik=="+maxbrand);
                colum = "brandlike";
                HbaseUtils.putdata(tablename,rowkey,famliyname,colum,maxbrand);
    
        }
    
    }

    8、BrandLikeReduce.java

    package com.yangwj.reduce;
    
    import com.yangwj.entity.BrandLike;
    import com.yangwj.entity.CarrierInfo;
    import org.apache.flink.api.common.functions.ReduceFunction;
    
    /**
     * Created by li on 2019/1/6.
     */
    public class BrandLikeReduce implements ReduceFunction<BrandLike> {
        @Override
        public BrandLike reduce(BrandLike brandLike, BrandLike t1) throws Exception {
            String brand = brandLike.getBrand();
            long count1 = brandLike.getCount();
            long count2 = t1.getCount();
            BrandLike brandLikefinal = new BrandLike();
            brandLikefinal.setBrand(brand);
            brandLikefinal.setCount(count1+count2);
            return brandLikefinal;
        }
    }

    9、BrandLikeSink.java

    package com.yangwj.reduce;
    
    import com.yangwj.entity.BrandLike;
    import com.yangj.util.MongoUtils;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.bson.Document;
    
    /**
     * Created by li on 2019/1/6.
     */
    public class BrandLikeSink implements SinkFunction<BrandLike> {
        @Override
        public void invoke(BrandLike value, Context context) throws Exception {
            String brand = value.getBrand();
            long count = value.getCount();
            Document doc = MongoUtils.findoneby("brandlikestatics","portrait",brand);
            if(doc == null){
                doc = new Document();
                doc.put("info",brand);
                doc.put("count",count);
            }else{
                Long countpre = doc.getLong("count");
                Long total = countpre+count;
                doc.put("count",total);
            }
            MongoUtils.saveorupdatemongo("brandlikestatics","portrait",doc);
        }
    }

    10、BrandLike.java

    package com.yangwj.entity;
    
    /**
     * Created by li on 2019/1/6.
     */
    public class BrandLike {
        private String brand;
        private long count;
        private String groupbyfield;
    
        public String getGroupbyfield() {
            return groupbyfield;
        }
    
        public void setGroupbyfield(String groupbyfield) {
            this.groupbyfield = groupbyfield;
        }
    
        public String getBrand() {
            return brand;
        }
    
        public void setBrand(String brand) {
            this.brand = brand;
        }
    
        public long getCount() {
            return count;
        }
    
        public void setCount(long count) {
            this.count = count;
        }
    }
    View Code
  • 相关阅读:
    BZOJ 3924 / Luogu P3345 [ZJOI2015]幻想乡战略游戏 (动态点分治/点分树)
    BZOJ 3065 带插入区间K小值 (替罪羊树套线段树)
    BZOJ 3217: ALOEXT (块状链表套trie)
    BZOJ 3514: Codechef MARCH14 GERALD07加强版 (LCT维护最大生成树+主席树)
    BZOJ 3932: [CQOI2015]任务查询系统 (主席树板题)
    BZOJ 3658: Jabberwocky (双向链表+BIT)
    BZOJ 1180 [CROATIAN 2009]OTOCI // BZOJ 2843 极地旅行社 // Luogu P4321 [COCI 2009] OTOCI / 极地旅行社 (LCA板题)
    BZOJ 2759 一个动态树好题 (LCT)
    BZOJ 2244: [SDOI2011]拦截导弹 (CDQ分治 三维偏序 DP)
    codefroces 612E Square Root of Permutation
  • 原文地址:https://www.cnblogs.com/ywjfx/p/12349496.html
Copyright © 2011-2022 走看看