zoukankan      html  css  js  c++  java
  • Flink用户画像系统之用户年代标签

    注意,整个任务的流程如下:

     * Task执行过程:环境配置---> 获取输入数据 ---> map操作(数据打标签,存入hbase)---> groupby(分组)
               ---> reduce(根据标签聚合) --->sink(存入mongodb)

    1、Task任务

    package com.youfan.task;
    
    import com.youfan.entity.YearBase;
    import com.youfan.map.YearBaseMap;
    import com.youfan.reduce.YearBaseReduce;
    import com.youfan.util.MongoUtils;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.bson.Document;
    
    import java.util.List;
    
    /**
     * 
     * Task执行过程:环境配置---> 获取输入数据 ---> map操作(数据打标签,存入hbase)---> groupby(分组)
     *          ---> reduce(根据标签聚合) --->sink(存入mongodb)
     */
    public class YearBaseTask {
        public static void main(String[] args) {
            final ParameterTool params = ParameterTool.fromArgs(args);
    
            // set up the execution environment
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            // make parameters available in the web interface
            env.getConfig().setGlobalJobParameters(params);
    
            // get input data
            DataSet<String> text = env.readTextFile(params.get("input"));
    
            DataSet<YearBase> mapresult = text.map(new YearBaseMap());
            DataSet<YearBase> reduceresutl = mapresult.groupBy("groupfield").reduce(new YearBaseReduce());
            try {
                List<YearBase> reusltlist = reduceresutl.collect();//获取reduce中的所有对象
                for(YearBase yearBase:reusltlist){
                        String yeartype = yearBase.getYeartype();
                        Long count = yearBase.getCount();
    
                    Document doc = MongoUtils.findoneby("yearbasestatics","portrait",yeartype);
                    if(doc == null){
                        doc = new Document();
                        doc.put("info",yeartype);
                        doc.put("count",count);
                    }else{
                        Long countpre = doc.getLong("count");
                        Long total = countpre+count;
                        doc.put("count",total);
                    }
                    MongoUtils.saveorupdatemongo("yearbasestatics","portrait",doc);
                }
                env.execute("year base analy");
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }

    2、map任务

    package com.youfan.map;
    
    import com.yangwj.entity.YearBase;
    import com.yangwj.util.DateUtils;
    import com.yangwj.util.HbaseUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.common.functions.MapFunction;
    
    /**
     * 
     */
    public class YearBaseMap implements MapFunction<String, YearBase>{
        @Override
        public YearBase map(String s) throws Exception {
            if(StringUtils.isBlank(s)){
                return null;
            }
            String[] userinfos = s.split(",");
            String userid = userinfos[0];
            String username = userinfos[1];
            String sex = userinfos[2];
            String telphone = userinfos[3];
            String email = userinfos[4];
            String age = userinfos[5];
            String registerTime = userinfos[6];
            String usetype = userinfos[7];//'终端类型:0、pc端;1、移动端;2、小程序端'
    
            String yearbasetype = DateUtils.getYearbasebyAge(age);//打标签
            String tablename = "userflaginfo";
            String rowkey = userid;
            String famliyname = "baseinfo";
            String colum = "yearbase";//年代
            HbaseUtils.putdata(tablename,rowkey,famliyname,colum,yearbasetype);//打标签,存入Hbase
            HbaseUtils.putdata(tablename,rowkey,famliyname,"age",age);
            YearBase yearBase = new YearBase();
            String groupfield = "yearbase=="+yearbasetype;//用于reduce分组
            yearBase.setYeartype(yearbasetype);
            yearBase.setCount(1l);
            yearBase.setGroupfield(groupfield);
            return yearBase;
        }
    }

    3、reduce任务

    package com.yangwj.reduce;
    
    import com.yangwj.entity.YearBase;
    import org.apache.flink.api.common.functions.ReduceFunction;
    
    /**
     * 
     */
    public class YearBaseReduce implements ReduceFunction<YearBase>{
        @Override
        public YearBase reduce(YearBase yearBase, YearBase t1) throws Exception {
            String yeartype = yearBase.getYeartype();
            Long count1 = yearBase.getCount();
    
            Long count2 = t1.getCount();
    
            YearBase finalyearBase = new YearBase();
            finalyearBase.setYeartype(yeartype);
            finalyearBase.setCount(count1+count2);
            return finalyearBase;
        }
    }

    4、HbaseUtil

    package com.youfan.util;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.Set;
    
    /**
     * Created by li on 2019/1/5.
     */
    public class HbaseUtils {
            private static Admin admin = null;
            private static Connection conn = null;
            static{
                // 创建hbase配置对象
                Configuration conf = HBaseConfiguration.create();
                conf.set("hbase.rootdir","hdfs://192.168.80.134:9000/hbase");
                //使用eclipse时必须添加这个,否则无法定位
                conf.set("hbase.zookeeper.quorum","192.168.80.134");
                conf.set("hbase.client.scanner.timeout.period", "600000");
                conf.set("hbase.rpc.timeout", "600000");
                try {
                    conn = ConnectionFactory.createConnection(conf);
                    // 得到管理程序
                    admin = conn.getAdmin();
                } catch (IOException e) {
                    e.printStackTrace();
                }
    
            }
    
            /**
             * 插入数据,create "userflaginfo,"baseinfo"
             * create "tfidfdata,"baseinfo"
             */
            public static void put(String tablename, String rowkey, String famliyname, Map<String,String> datamap) throws Exception {
                Table table = conn.getTable(TableName.valueOf(tablename));
                // 将字符串转换成byte[]
                byte[] rowkeybyte = Bytes.toBytes(rowkey);
                Put put = new Put(rowkeybyte);
                if(datamap != null){
                    Set<Map.Entry<String,String>> set = datamap.entrySet();
                    for(Map.Entry<String,String> entry : set){
                        String key = entry.getKey();
                        Object value = entry.getValue();
                        put.addColumn(Bytes.toBytes(famliyname), Bytes.toBytes(key), Bytes.toBytes(value+""));
                    }
                }
                table.put(put);
                table.close();
                System.out.println("ok");
            }
    
            /**
             *
             */
            public static String getdata(String tablename, String rowkey, String famliyname,String colum) throws Exception {
                Table table = conn.getTable(TableName.valueOf(tablename));
                // 将字符串转换成byte[]
                byte[] rowkeybyte = Bytes.toBytes(rowkey);
                Get get = new Get(rowkeybyte);
                Result result =table.get(get);
                byte[] resultbytes = result.getValue(famliyname.getBytes(),colum.getBytes());
                if(resultbytes == null){
                    return null;
                }
    
                return new String(resultbytes);
            }
    
            /**
             *
             */
            public static void putdata(String tablename, String rowkey, String famliyname,String colum,String data) throws Exception {
                Table table = conn.getTable(TableName.valueOf(tablename));
                Put put = new Put(rowkey.getBytes());
                put.addColumn(famliyname.getBytes(),colum.getBytes(),data.getBytes());
                table.put(put);
            }
    
    
    }
    View Code

    5、MongodbUtil

    package com.youfan.util;
    
    import com.alibaba.fastjson.JSONObject;
    import com.mongodb.MongoClient;
    import com.mongodb.client.FindIterable;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.MongoCursor;
    import com.mongodb.client.MongoDatabase;
    import org.bson.Document;
    import org.bson.types.ObjectId;
    
    /**
     * 
     */
    public class MongoUtils {
    
        private static MongoClient mongoClient = new MongoClient("192.168.80.134",27017);
    
    
    
        public static Document findoneby(String tablename, String database,String yearbasetype){
            MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
            MongoCollection mongoCollection = mongoDatabase.getCollection(tablename);
            Document  doc = new Document();
            doc.put("info", yearbasetype);
            FindIterable<Document> itrer = mongoCollection.find(doc);
            MongoCursor<Document> mongocursor = itrer.iterator();
            if(mongocursor.hasNext()){
                return mongocursor.next();
            }else{
                return null;
            }
        }
    
    ` `
        public static void saveorupdatemongo(String tablename,String database,Document doc) {
            MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
            MongoCollection<Document> mongocollection = mongoDatabase.getCollection(tablename);
            if(!doc.containsKey("_id")){
                ObjectId objectid = new ObjectId();
                doc.put("_id", objectid);
                mongocollection.insertOne(doc);
                return;
            }
            Document matchDocument = new Document();
            String objectid = doc.get("_id").toString();
            matchDocument.put("_id", new ObjectId(objectid));
            FindIterable<Document> findIterable =  mongocollection.find(matchDocument);
            if(findIterable.iterator().hasNext()){
                mongocollection.updateOne(matchDocument, new Document("$set",doc));
                try {
                    System.out.println("come into saveorupdatemongo ---- update---"+ JSONObject.toJSONString(doc));
                } catch (Exception e) {
    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }else{
                mongocollection.insertOne(doc);
                try {
                    System.out.println("come into saveorupdatemongo ---- insert---"+JSONObject.toJSONString(doc));
                }catch (Exception e) {
    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
    View Code
  • 相关阅读:
    Eclipse中项目进行发布到Tomcat中的位置
    Tomcat中server.xml文件的配置
    Tomcat的安装跟配置
    实习第二天(查看项目源代码)
    mac、windows、linux版jdk1.8下载
    idea搭建简单ssm框架的最详细教程(新)
    ssm中mapper注入失败的传奇经历
    富文本编辑器handyeditor,上传和预览图片的host地址不一样
    nginx配置ssl证书
    java中pojo对象首字母大写导致无法赋值问题
  • 原文地址:https://www.cnblogs.com/ywjfx/p/12343468.html
Copyright © 2011-2022 走看看