zoukankan      html  css  js  c++  java
  • Flink实例(十九):FLINK 异步IO (四)实例 (二) MySQL

    业务如下:

    从 Kafka 接收数据并转换为 User 对象,然后通过异步 IO(Async I/O)使用 user.id 到 MySQL 中查询对应的 phone,将结果写回 User 对象后输出。

     主类:

    复制代码
    import com.alibaba.fastjson.JSON;
    import com.venn.common.Common;
    import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import java.util.concurrent.TimeUnit;
    
    
    /**
     * Demo job: reads JSON records from the Kafka topic {@code async}, converts each one to a
     * {@code User}, enriches it through {@code AsyncFunctionForMysqlJava} using Flink's async I/O
     * operator, and prints the enriched record as a JSON string.
     */
    public class AsyncMysqlRequest {

        /**
         * Builds and runs the streaming job.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the job cannot be built or fails during execution
         */
        public static void main(String[] args) throws Exception {

            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            FlinkKafkaConsumer<ObjectNode> source =
                    new FlinkKafkaConsumer<>("async", new JsonNodeDeserializationSchema(), Common.getProp());

            // Convert every Kafka JSON record into a User object.
            DataStream<User> input = env.addSource(source).map(value -> {
                String id = value.get("id").asText();
                String username = value.get("username").asText();
                String password = value.get("password").asText();

                return new User(id, username, password);
            });

            // Async I/O lookup against MySQL: timeout 1 s, capacity 10 (once more than 10 requests
            // are in flight, the upstream operator is back-pressured).
            // The unit must be MILLISECONDS: 1000 MICROSECONDS is only 1 ms, which would push
            // practically every lookup into the timeout path.
            // NOTE(review): this job builds User objects, while the thread-pool variant of
            // AsyncFunctionForMysqlJava is declared over AsyncUser - confirm which element type
            // is intended so the two line up.
            AsyncDataStream
                    .unorderedWait(input, new AsyncFunctionForMysqlJava(), 1000, TimeUnit.MILLISECONDS, 10)
                    .map(user -> JSON.toJSON(user).toString())
                    .print();

            env.execute("asyncForMysql");

        }
    }
    复制代码

    函数类:

    复制代码
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * Async I/O function that looks up a user's phone number through {@link MysqlClient}.
     *
     * <p>The lookup itself is a blocking call, so each request is handed to a thread pool and
     * the {@link ResultFuture} is completed from the worker thread once the query has returned.
     */
    public class AsyncFunctionForMysqlJava extends RichAsyncFunction<AsyncUser, AsyncUser> {

        // Static so the logger is not part of the serialized function instance.
        private static final Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);

        private transient MysqlClient client;
        private transient ExecutorService executorService;

        /**
         * Creates the MySQL client and the worker pool that runs the blocking queries.
         *
         * @param parameters configuration handed in by the runtime
         * @throws Exception if the parent {@code open} fails
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            logger.info("async function for mysql java open ...");
            super.open(parameters);

            client = new MysqlClient();
            executorService = Executors.newFixedThreadPool(30);
        }

        /**
         * Submits a phone lookup for {@code asyncUser.getId()} to the worker pool.
         *
         * @param asyncUser    record to enrich
         * @param resultFuture future that receives the enriched record, or the failure
         * @throws Exception never thrown directly; declared by the overridden method
         */
        @Override
        public void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture) throws Exception {

            executorService.submit(() -> {
                try {
                    // submit query
                    System.out.println("submit query : " + asyncUser.getId() + "-1-" + System.currentTimeMillis());
                    AsyncUser tmp = client.query1(asyncUser);
                    // The result must be handed back through resultFuture; otherwise every
                    // record ends up in the timeout path.
                    resultFuture.complete(Collections.singletonList(tmp));
                } catch (RuntimeException e) {
                    // submit() would otherwise swallow the exception inside the returned Future,
                    // leaving resultFuture uncompleted until the timeout fires.
                    logger.error("Async mysql query failed for id {}", asyncUser.getId(), e);
                    resultFuture.completeExceptionally(e);
                }
            });
        }

        /**
         * Emits the input record with its phone set to {@code "timeout"} when the lookup did not
         * complete in time.
         *
         * @param input        record whose lookup timed out
         * @param resultFuture future to complete with the fallback record
         * @throws Exception never thrown directly; declared by the overridden method
         */
        @Override
        public void timeout(AsyncUser input, ResultFuture<AsyncUser> resultFuture) throws Exception {
            logger.warn("Async function for mysql timeout");
            input.setPhone("timeout");
            resultFuture.complete(Collections.singletonList(input));
        }

        /**
         * Closes the function and stops the worker pool so its threads do not outlive the task.
         *
         * @throws Exception if the parent {@code close} fails
         */
        @Override
        public void close() throws Exception {
            logger.info("async function for mysql java close ...");
            try {
                super.close();
            } finally {
                if (executorService != null) {
                    executorService.shutdown();
                    try {
                        // Give in-flight queries a short grace period before forcing shutdown.
                        if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                            executorService.shutdownNow();
                        }
                    } catch (InterruptedException e) {
                        executorService.shutdownNow();
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    复制代码

    MysqlClient:

    复制代码
    import com.venn.flink.util.MathUtil;
    import org.apache.flink.shaded.netty4.io.netty.channel.DefaultEventLoop;
    import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
    import org.apache.flink.shaded.netty4.io.netty.util.concurrent.SucceededFuture;
    
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    /**
     * Minimal JDBC client used by the async function to look up a user's phone number.
     *
     * <p>One connection and one prepared statement are created when the class is loaded and
     * shared by every instance. Access to the statement is serialized with a lock because
     * {@link #query1(AsyncUser)} is called from multiple worker threads.
     */
    public class MysqlClient {

        // NOTE(review): connection settings and credentials are hard-coded for the demo; move
        // them to external configuration before using this outside a test environment.
        private static final String JDBC_URL = "jdbc:mysql://192.168.229.128:3306?useSSL=false&allowPublicKeyRetrieval=true";
        private static final String USERNAME = "root";
        private static final String PASSWORD = "123456";
        private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";

        /** Guards the shared prepared statement: bind, execute and read must not interleave. */
        private static final Object QUERY_LOCK = new Object();

        private static final java.sql.Connection conn;
        private static final PreparedStatement ps;

        static {
            try {
                Class.forName(DRIVER_NAME);
                conn = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD);
                ps = conn.prepareStatement("select phone from async.async_test where id = ?");
            } catch (ClassNotFoundException | SQLException e) {
                // Fail fast: continuing with a null statement would only surface later as a
                // NullPointerException inside query1.
                throw new ExceptionInInitializerError(e);
            }
        }

        /**
         * Looks up the phone number for {@code user.getId()} and stores it on the same object.
         *
         * <p>The phone is set to {@code "0000"} when no row matches or the query fails.
         *
         * @param user record whose id is used as the lookup key; it is modified in place
         * @return the same {@code user} instance with its phone set
         */
        public AsyncUser query1(AsyncUser user) {

            try {
                // Artificial delay kept from the original demo.
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can observe the interruption.
                Thread.currentThread().interrupt();
            }

            String phone = "0000";
            synchronized (QUERY_LOCK) {
                try {
                    ps.setString(1, user.getId());
                    try (ResultSet rs = ps.executeQuery()) {
                        if (rs.next()) {
                            phone = rs.getString(1);
                        }
                    }
                    System.out.println("execute query : " + user.getId() + "-2-" + "phone : " + phone + "-" + System.currentTimeMillis());
                } catch (SQLException e) {
                    // Best effort: report the failure and fall back to the default phone value.
                    e.printStackTrace();
                }
            }
            user.setPhone(phone);
            return user;

        }

        /**
         * Manual smoke test: runs a single lookup and prints the elapsed time and the result.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            MysqlClient mysqlClient = new MysqlClient();

            AsyncUser asyncUser = new AsyncUser();
            asyncUser.setId("526");
            long start = System.currentTimeMillis();
            asyncUser = mysqlClient.query1(asyncUser);

            System.out.println("end : " + (System.currentTimeMillis() - start));
            System.out.println(asyncUser.toString());
        }
    }
    复制代码

    函数类(错误示范:在 asyncInvoke 方法中直接阻塞查询数据库,实际上是同步执行的,起不到异步 IO 的效果):

    复制代码
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Counter-example: an async function whose {@code asyncInvoke} runs the JDBC query inline.
     *
     * <p>The query blocks the calling thread until MySQL answers, so the operator behaves
     * synchronously even though it is wired through the async I/O API. The blocking call is
     * kept on purpose to illustrate the mistake; only resource handling is tidied up.
     */
    public class AsyncFunctionForMysqlJava extends RichAsyncFunction<User, User> {

        // Connection settings.
        // NOTE(review): hard-coded for the demo; externalize before real use.
        private static final String JDBC_URL = "jdbc:mysql://192.168.229.128:3306?useSSL=false";
        private static final String USERNAME = "root";
        private static final String PASSWORD = "123456";
        private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";

        // Static so the logger is not part of the serialized function instance.
        private static final Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);

        private transient java.sql.Connection conn;
        private transient PreparedStatement ps;

        /**
         * Opens the JDBC connection and prepares the lookup statement.
         *
         * @param parameters configuration handed in by the runtime
         * @throws Exception if the driver cannot be loaded or the connection cannot be opened
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            logger.info("async function for mysql java open ...");
            super.open(parameters);

            Class.forName(DRIVER_NAME);
            conn = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD);
            ps = conn.prepareStatement("select phone from async.async_test where id = ?");
        }

        /**
         * Looks up the phone for {@code user.getId()} and completes the future with the result.
         *
         * <p>The query runs on the calling thread (this is the deliberate mistake being shown).
         * The phone is set to {@code null} when no row matches.
         *
         * @param user         record to enrich; modified in place
         * @param resultFuture future completed with the enriched record
         * @throws Exception if the query fails
         */
        @Override
        public void asyncInvoke(User user, ResultFuture<User> resultFuture) throws Exception {
            // Query by user id.
            ps.setString(1, user.getId());
            String phone = null;
            try (ResultSet rs = ps.executeQuery()) {
                if (rs.next()) {
                    phone = rs.getString(1);
                }
            }
            user.setPhone(phone);
            List<User> list = new ArrayList<>();
            list.add(user);
            // Hand the record back to the result queue.
            resultFuture.complete(list);
        }

        /**
         * Emits the input record unchanged when the lookup timed out.
         *
         * @param input        record whose lookup timed out
         * @param resultFuture future to complete with the unmodified record
         * @throws Exception never thrown directly; declared by the overridden method
         */
        @Override
        public void timeout(User input, ResultFuture<User> resultFuture) throws Exception {
            logger.info("Async function for mysql timeout");
            List<User> list = new ArrayList<>();
            list.add(input);
            resultFuture.complete(list);
        }

        /**
         * Closes the prepared statement and the connection.
         *
         * <p>Both are null-checked because {@code open} may have failed before creating them.
         *
         * @throws Exception if closing a JDBC resource fails
         */
        @Override
        public void close() throws Exception {
            logger.info("async function for mysql java close ...");
            super.close();
            try {
                if (ps != null) {
                    ps.close();
                }
            } finally {
                if (conn != null) {
                    conn.close();
                }
            }
        }
    }
    复制代码

    测试数据如下:

    复制代码
    {"id" : 1, "username" : "venn", "password" : 1561709530935}
    {"id" : 2, "username" : "venn", "password" : 1561709536029}
    {"id" : 3, "username" : "venn", "password" : 1561709541033}
    {"id" : 4, "username" : "venn", "password" : 1561709546037}
    {"id" : 5, "username" : "venn", "password" : 1561709551040}
    {"id" : 6, "username" : "venn", "password" : 1561709556044}
    {"id" : 7, "username" : "venn", "password" : 1561709561048}
    复制代码

    执行结果如下:

    复制代码
    submit query : 1-1-1562763486845
    submit query : 2-1-1562763486846
    submit query : 3-1-1562763486846
    submit query : 4-1-1562763486849
    submit query : 5-1-1562763486849
    submit query : 6-1-1562763486859
    submit query : 7-1-1562763486913
    submit query : 8-1-1562763486967
    submit query : 9-1-1562763487021
    execute query : 1-2-phone : 12345678910-1562763487316
    1> {"password":"1562763486506","phone":"12345678910","id":"1","username":"venn"}  
    submit query : 10-1-1562763487408
    submit query : 11-1-1562763487408
    execute query : 9-2-phone : 1562661110630-1562763487633
    1> {"password":"1562763487017","phone":"1562661110630","id":"9","username":"venn"}  # 这里可以看出是异步执行:已提交的查询到了 id 11,而实际执行完成并返回结果的只有 id 1 和 id 9(使用 unorderedWait,结果不保证与输入顺序一致)
    submit query : 12-1-1562763487634
    execute query : 8-2-phone : 1562661110627-1562763487932
    1> {"password":"1562763486963","phone":"1562661110627","id":"8","username":"venn"}
    submit query : 13-1-1562763487933
    execute query : 7-2-phone : 1562661110624-1562763488228
    1> {"password":"1562763486909","phone":"1562661110624","id":"7","username":"venn"}
    submit query : 14-1-1562763488230
    execute query : 6-2-phone : 1562661110622-1562763488526
    1> {"password":"1562763486855","phone":"1562661110622","id":"6","username":"venn"}
    submit query : 15-1-1562763488527
    execute query : 4-2-phone : 12345678913-1562763488832
    1> {"password":"1562763486748","phone":"12345678913","id":"4","username":"venn"}
    复制代码

    HBase、Redis 或其他外部存储的异步 IO 实现方式与此类似。

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13779645.html

  • 相关阅读:
    slf4j绑定log4j失败
    [转]activiti5用户任务分配
    关于ajax提交的公共接口的一大用处
    jQuery插件开发方式
    centos7安装mysql
    Centos7安装JDK
    奇葩问题:spring+mybaits项目突然出现其中一些Mapper类找不到
    JAVA多线程下,获取递增的序列号
    库存扣减的流水账记录问题
    My97DatePicker使用的问题
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13779645.html
Copyright © 2011-2022 走看看