zoukankan      html  css  js  c++  java
  • java使用flink集成mybatis每五分钟实时计算小时内用户行为数据

    java使用flink集成mybatis每五分钟实时计算小时内用户行为数据

    目前在学习flink,写了一个比较常见的需求:每五分钟统计前一小时用户点击最多的商品,并且把源数据存入mysql.

    实现思路:

    使用滑动窗口 size 1h,间隔5分钟,使用商品作为keyby的分组,过滤掉不是点击的数据,aggregate函数来增量计算每一个商品被点击的数量 使用ProcessWindowFunction方法组成二元组<商品id,点击数量>最后存入redis zset类型中,以商品id为key,点击次数为score.

    异步的将所有用户行为数据和迟到数据存入mysql

    下面是我的代码

    用户行为实体类:

    @Data
    public class UserBehavingInfo {
    
        private String userNo;
    
        /**
         * 用户行为
         */
        private String behavior;
    
        /**
         * 行为商品
         */
        private String operatedGoods;
    
        /**
         * 行为发生时间
         */
        private Long time;
    }
    

    main函数

    public static void main(String[] args) throws Exception {
    
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //事件时间
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.219.128:9092");
            properties.setProperty("group.id", "event");
            properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(("eventDetails"), new SimpleStringSchema(), properties);
            //flink会自动保存kafka 偏移量作为状态
            // start from the earliest record possible
            kafkaConsumer.setStartFromGroupOffsets();
    
            // 接收kafka数据,转为UserBehavingInfo 对象
            SingleOutputStreamOperator<UserBehavingInfo> input =
                    env.addSource(kafkaConsumer)
                            .map(string -> JSON.parseObject(string, UserBehavingInfo.class)).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBehavingInfo>(Time.seconds(5)) {
                        @Override
                        public long extractTimestamp(UserBehavingInfo UserBehavingInfo) {
                            System.out.println("mark:" + (UserBehavingInfo.getTime() - 5*1000L));
                            return UserBehavingInfo.getTime();
                        }
                    }).setParallelism(1);
            //将用户行为数据异步插入mysql
            // 异步IO 获取mysql数据, timeout 时间 1s,容量 10(超过10个请求,会反压上游节点) unorderedWait返回结果无顺序(如果是事件时间 实则会根据watermark排序)  orderedWait返回结果有序(fifo)
            //超过10个请求,会反压上游节点  反压机制来抑制上游数据的摄入
            AsyncDataStream.unorderedWait(input, new AsyncInsertUserBehaviorToMysql(), 1000, TimeUnit.MICROSECONDS, 10);
    
            SingleOutputStreamOperator<UserBehavingInfo> filterClick = input.filter(new FilterFunction<UserBehavingInfo>() {
                @Override
                public boolean filter(UserBehavingInfo userBehavingInfo) throws Exception {
                    return "click".equals(userBehavingInfo.getBehavior());
                }
            });
    
            //创建迟到数据侧输出流
            OutputTag<UserBehavingInfo> lateOutputUserBehavior = new OutputTag<UserBehavingInfo>("late-userBehavior-data"){};
            SingleOutputStreamOperator<Tuple2<String, Integer>> aggregateUserClick = filterClick
                    .keyBy(new KeySelector<UserBehavingInfo, String>() {
                        @Override
                        public String getKey(UserBehavingInfo userBehavingInfo) throws Exception {
                            return userBehavingInfo.getOperatedGoods();
                        }
                    })
                    .window(SlidingEventTimeWindows.of(
    //                        Time.hours(1), Time.minutes(5)
                            Time.seconds(10),Time.seconds(5)
                    ))
                    .allowedLateness(Time.hours(1))
                    .sideOutputLateData(lateOutputUserBehavior)
                    //增量计算用户点击数量
                    .aggregate(new UserBehavorCountAggregateUtils(), new UserBehavorCountWindowFunction());
    
            aggregateUserClick.print();
            //迟到数据   迟到数据不会触发窗口  存入数据库
            AsyncDataStream.unorderedWait(aggregateUserClick.getSideOutput(lateOutputUserBehavior), new AsyncInsertUserBehaviorToMysql(), 1000, TimeUnit.MICROSECONDS, 10);
    
        //输入到redis中   rank:click
            FlinkJedisPoolConfig redis = new FlinkJedisPoolConfig.Builder().setDatabase(1).setHost("192.168.219.128").setPort(6379).setPassword("redis").build();
            aggregateUserClick.addSink(new RedisSink<>(redis,new UserBehaviorRedisMapper()));
            env.execute("userClickBehaviorRank");
        }
    

    使用阿里巴巴提供的 异步io访问数据库,将用户行为数据存入数据库

    /**
     * 异步将用户行为数据插入mysql
     */
    public class AsyncInsertUserBehaviorToMysql extends RichAsyncFunction<UserBehavingInfo, Integer> {
    
        Logger logger = LoggerFactory.getLogger(AsyncInsertUserBehaviorToMysql.class);
    
        //创建mybatis 会话工厂
        private transient SqlSession sqlSession ;
        /**
         * open 方法中初始化链接
         *
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("async function for mysql java open ..."+Thread.currentThread().getName());
            super.open(parameters);
            sqlSession =  MybatisSessionFactory.getSqlSessionFactory().openSession();
        }
    
        /**
         * use asyncUser.getId async get asyncUser phone
         *
         * @param asyncUser
         * @param resultFuture
         * @throws Exception
         */
        @Override
        public void asyncInvoke(UserBehavingInfo asyncUser, ResultFuture<Integer> resultFuture) throws Exception {
            Integer insertNum = 0;
            try{
    
                UserBehaviorDetailsMapper mapper = sqlSession.getMapper(UserBehaviorDetailsMapper.class);
                 insertNum = mapper.insertUserBehavior(asyncUser);
                sqlSession.commit();
                System.out.println("插入数据库"+insertNum);
            }catch (Exception throwable){
                sqlSession.rollback();
                System.out.println("异常回滚"+ throwable);
            }finally {
                // 一定要记得放回 resultFuture,不然数据全部是timeout 的
                resultFuture.complete(Collections.singletonList(insertNum));
            }
        }
    
    
        /**
         * close function
         *
         * @throws Exception
         */
        @Override
        public void close() throws Exception {
            logger.info("async function for mysql java close ...");
            //关闭会话,释放资源
            sqlSession.close();
            super.close();
        }
    
    
    }
    
    

    增量计算用户点击行为数量

    public  class UserBehavorCountAggregateUtils implements AggregateFunction<UserBehavingInfo, Integer, Integer> {
    
        @Override
        public Integer createAccumulator() {
            return 0;
        }
    
        //一条数据执行一次
        @Override
        public Integer add(UserBehavingInfo UserBehavingInfo, Integer integer) {
            return integer + 1;
        }
    
        //窗口结束执行一次
        @Override
        public Integer getResult(Integer integer) {
            return integer;
        }
    
        @Override
        public Integer merge(Integer integer, Integer acc1) {
            return integer+acc1;
        }
    
    }
    
    

    窗口方法 组成二元组

    public class UserBehavorCountWindowFunction extends ProcessWindowFunction<Integer, Tuple2<String, Integer>, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<Integer> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
            collector.collect(new Tuple2<String, Integer>(key,iterable.iterator().next()));
        }
    
    }
    

    将商品点击信息二元组存入redis zset类型

    public  class UserBehaviorRedisMapper implements RedisMapper<Tuple2<String, Integer>> {
    
        //设置redis 命令
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.ZADD,"rank:click");
        }
    
        //从数据中获取key
        @Override
        public String getKeyFromData(Tuple2<String, Integer> stringEventDetailsTuple2) {
            return stringEventDetailsTuple2.f0;
        }
        //从数据中获取value
        @Override
        public String getValueFromData(Tuple2<String, Integer> stringEventDetailsTuple2) {
            return String.valueOf(stringEventDetailsTuple2.f1);
        }
    }
    

    这是我flink集成mybatis的配置

    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
            "http://mybatis.org/dtd/mybatis-3-config.dtd">
    <configuration>
        <settings>
            <setting name="defaultExecutorType" value="BATCH" />
        </settings>
        <environments default="development">
            <environment id="development">
                <transactionManager type="JDBC" />
                <dataSource type="config.DruidDataSourceFactory">
                    <property name="driver" value="com.mysql.jdbc.Driver" />
                    <property name="url" value="jdbc:mysql://127.0.0.1:3306/risk_control?useSSL=false&amp;characterEncoding=utf8&amp;serverTimezone=GMT%28&amp;allowPublicKeyRetrieval=true" />
                    <property name="username" value="root" />
                    <property name="password" value="root" />
                </dataSource>
            </environment>
        </environments>
        <mappers>
            <mapper resource="mapper/EventDetailsMapper.xml" />
            <mapper resource="mapper/UserBehaviorDetailsMapper.xml" />
        </mappers>
    </configuration>
    
  • 相关阅读:
    python的logging库
    python的os库
    python的setup和teardown
    CF339D Xenia and Bit Operations线段树
    poj3311Hie with the Pie状压dp
    poj3254Corn Fields状压Dp
    CF414BMashmokh and ACMDP
    母函数6连杀
    母函数hdu1085
    UVA 1401Remember the WordDp
  • 原文地址:https://www.cnblogs.com/cg14/p/13182900.html
Copyright © 2011-2022 走看看