  • How to integrate the MyBatis framework into Flink stream processing

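    Flink ships with a large number of connectors, including a JDBC connector that can operate on databases through JDBC. However, the flink-jdbc package works in terms of Row objects and is rigid in how it controls database transactions. When working with relational databases, it is easy to miss MyBatis, the excellent framework familiar from Java web development. Flink can in fact integrate MyBatis directly. This article uses Flink 1.9 as the example.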

    (Figure: the flink-jdbc module bundled with Flink.)

     

    Create a Flink streaming project and add the Flink and MyBatis Maven dependencies (note that this is the standalone MyBatis artifact, not the Spring integration):

    <properties>
    
    <flink.version>1.9.0</flink.version>
    </properties>
    <!-- https://mvnrepository.com/artifact/org.mybatis/mybatis -->
    <dependency>
        <groupId>org.mybatis</groupId>
        <artifactId>mybatis</artifactId>
        <version>3.5.2</version>
    </dependency>
    <!-- Flink streaming Java API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    With the Maven dependencies in place, define mybatis-config.xml under resources with the following configuration:

    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
            "http://mybatis.org/dtd/mybatis-3-config.dtd">
    <configuration>
        <typeAliases>
            <typeAlias alias="BankBillPublic" type="xxxx.xx.xx.BankBillPublic" />
        </typeAliases>
        <environments default="development">
            <environment id="development">
                <transactionManager type="JDBC" />
                <dataSource type="POOLED">
                    <property name="driver" value="com.mysql.jdbc.Driver" />
                    <property name="url" value="jdbc:mysql://xx.xx.xx.xx:3306/hue?characterEncoding=UTF-8&amp;zeroDateTimeBehavior=convertToNull&amp;allowMultiQueries=true&amp;autoReconnect=true" />
                    <property name="username" value="xxxx" />
                    <property name="password" value="xxxx*123%" />
                </dataSource>
            </environment>
        </environments>
        <mappers>
            <mapper resource="mapper/xxxxxMapper.xml" />
        </mappers>
    </configuration>
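
    Because mybatis-config.xml is XML, each `&` that separates JDBC URL parameters has to be written as `&amp;` inside the attribute value, otherwise the file is not well-formed and cannot be parsed. The following is a minimal sketch that checks this with the JDK's own XML parser. The class name JdbcUrlXmlCheck and the host and database names in the sample URL are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.xml.sax.helpers.DefaultHandler;

final class JdbcUrlXmlCheck {

    /** Returns the "value" attribute of a one-element XML snippet, or null if the XML is not well-formed. */
    static String readValueAttribute(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            // DefaultHandler rethrows fatal errors instead of printing them to stderr
            builder.setErrorHandler(new DefaultHandler());
            Document doc = builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            return doc.getDocumentElement().getAttribute("value");
        } catch (Exception e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Hypothetical URLs: the first uses a raw '&', the second the escaped form
        String raw = "<property name=\"url\" value=\"jdbc:mysql://host:3306/db?a=1&b=2\"/>";
        String escaped = "<property name=\"url\" value=\"jdbc:mysql://host:3306/db?a=1&amp;b=2\"/>";
        System.out.println(readValueAttribute(raw));     // null: the snippet is rejected
        System.out.println(readValueAttribute(escaped)); // jdbc:mysql://host:3306/db?a=1&b=2
    }
}
```

    The parser hands the unescaped `&` back to the application, so the JDBC driver still receives an ordinary URL.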

    The typeAlias element registers a custom data type, which can then be used directly as a parameterType or resultType in xxxxxMapper.xml.

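    dataSource type="POOLED" selects the pooled data source built into MyBatis, so connections are served from a connection pool. MyBatis supports three built-in types: UNPOOLED, POOLED, and JNDI.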

    Alibaba's open-source Druid connection pool can be used instead, which requires the corresponding Maven dependency:

            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.0.14</version>
            </dependency>  

    Then define a DataSourceFactory backed by Druid:

    import java.sql.SQLException;
    import java.util.Properties;
    import javax.sql.DataSource;
    import org.apache.ibatis.datasource.DataSourceFactory;
    import com.alibaba.druid.pool.DruidDataSource;
    
    public class DruidDataSourceFactory implements DataSourceFactory {
        private Properties props;
    
        @Override
        public DataSource getDataSource() {
            DruidDataSource dds = new DruidDataSource();
            dds.setDriverClassName(this.props.getProperty("driver"));
            dds.setUrl(this.props.getProperty("url"));
            dds.setUsername(this.props.getProperty("username"));
            dds.setPassword(this.props.getProperty("password"));
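            // Other pool settings can be read from the MyBatis configuration properties in the same way
            try {
                dds.init();
            } catch (SQLException e) {
                // Fail fast rather than hand back a pool that could not be initialized
                throw new IllegalStateException("Failed to initialize DruidDataSource", e);
            }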
            return dds;
        }
    
        @Override
        public void setProperties(Properties props) {
            this.props = props;
        }
    }

    It can then be used in the MyBatis configuration:

    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
            "http://mybatis.org/dtd/mybatis-3-config.dtd">
    <configuration>
        <typeAliases>
            <typeAlias alias="BankBillPublic" type="xxxx.xx.xx.BankBillPublic" />
            <typeAlias alias="DRUID" type="com.xx.mybatis.druid.utils.DruidDataSourceFactory" />
        </typeAliases>
        <environments default="development">
            <environment id="development">
                <transactionManager type="JDBC" />
                <dataSource type="DRUID">
                    <property name="driver" value="com.mysql.jdbc.Driver" />
                    <property name="url" value="jdbc:mysql://xx.xx.xx.xx:3306/hue?characterEncoding=UTF-8&amp;zeroDateTimeBehavior=convertToNull&amp;allowMultiQueries=true&amp;autoReconnect=true" />
                    <property name="username" value="xxxx" />
                    <property name="password" value="xxxx*123%" />
                </dataSource>
            </environment>
        </environments>
        <mappers>
            <mapper resource="mapper/xxxxxMapper.xml" />
        </mappers>
    </configuration>

    The <mappers> element lists the MyBatis xxxxxMapper files, which contain the SQL statements.

    Author: Zhang Yongqing (张永清). Please credit the source when reposting: How to integrate the MyBatis framework into Flink stream processing

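    Example SQL in xxxxxMapper.xml (resultType="UserRelaInfo" assumes that a matching typeAlias is registered in mybatis-config.xml):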

    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
            "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
    <mapper namespace="xx.xx.bigdata.flink.xx.xx.mapper.UserRelaInfoMapper">
        <!-- Query by the matching key fields -->
        <select id="queryUserRelaInfo" resultType="UserRelaInfo">
            SELECT id AS id,
            USER_NAME AS userName,
            APPL_IDCARD AS applIdCard,
            PEER_USER AS peerUser,
            RELA_TYPE AS relaType,
            CREATE_USER AS createUser,
            CREATE_TIME AS createTime
            FROM USER_RELA_INFO
            <where>
                <if test="applIdCard != null">
                    APPL_IDCARD=#{applIdCard}
                </if>
                <if test="peerUser != null">
                AND PEER_USER=#{peerUser}
                </if>
            </where>
        </select>
    </mapper>
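
    The `<where>` and `<if>` elements above build the filter dynamically: a condition is emitted only when its parameter is non-null, `WHERE` is added only when at least one condition remains, and a leading `AND` or `OR` is stripped. The sketch below imitates that behavior in plain Java to make the four possible outcomes visible. It is not how MyBatis implements it, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class WhereClauseSketch {

    /** Imitates the <where>/<if> block of queryUserRelaInfo; '?' stands for a bound parameter. */
    static String buildWhereClause(String applIdCard, String peerUser) {
        List<String> fragments = new ArrayList<>();
        if (applIdCard != null) {
            fragments.add("APPL_IDCARD=?");
        }
        if (peerUser != null) {
            fragments.add("AND PEER_USER=?");
        }
        String body = String.join(" ", fragments).trim();
        // A leading AND/OR is removed so the clause stays valid when the first condition is skipped
        if (body.regionMatches(true, 0, "AND ", 0, 4)) {
            body = body.substring(4);
        } else if (body.regionMatches(true, 0, "OR ", 0, 3)) {
            body = body.substring(3);
        }
        // WHERE is emitted only when at least one condition survived
        return body.isEmpty() ? "" : "WHERE " + body;
    }

    public static void main(String[] args) {
        System.out.println(buildWhereClause("id-1", "peer-1")); // WHERE APPL_IDCARD=? AND PEER_USER=?
        System.out.println(buildWhereClause(null, "peer-1"));   // WHERE PEER_USER=?
        System.out.println(buildWhereClause(null, null));       // empty line: no filter at all
    }
}
```

    Note the last case: when both arguments are null the query has no filter and returns every row of USER_RELA_INFO, which is worth guarding against in a streaming job.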

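    Define the mapper, typically as an interface whose fully qualified name matches the namespace in xxxxxMapper.xml.

    Parameters should generally carry the @Param annotation, with names matching the parameters referenced in xxxxxMapper.xml: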

    import java.util.List;
    import org.apache.ibatis.annotations.Param;

    public interface UserRelaInfoMapper {
        List<UserRelaInfo> queryUserRelaInfo(@Param("applIdCard") String applIdCard, @Param("peerUser") String peerUser);
    }

    Define the SqlSessionFactory holder (singleton pattern):

    import java.io.InputStream;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.apache.ibatis.session.SqlSessionFactoryBuilder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * SqlSessionFactory singleton. Sessions obtained through openSession()
     * have auto-commit disabled, so transactions are committed manually.
     */
    public class MybatisSessionFactory {
        private static final Logger LOG = LoggerFactory.getLogger(MybatisSessionFactory.class);
        private static SqlSessionFactory sqlSessionFactory;

        private MybatisSessionFactory() {
        }

        public static synchronized SqlSessionFactory getSqlSessionFactory() {
            if (null == sqlSessionFactory) {
                // try-with-resources closes the configuration stream once the factory is built
                try (InputStream inputStream = MybatisSessionFactory.class.getClassLoader()
                        .getResourceAsStream("mybatis-config.xml")) {
                    sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
                } catch (Exception e) {
                    LOG.error("create MybatisSessionFactory read mybatis-config.xml cause Exception", e);
                }
                if (null != sqlSessionFactory) {
                    LOG.info("get Mybatis SqlSessionFactory succeeded....");
                } else {
                    LOG.error("get Mybatis SqlSessionFactory failed....");
                }
            }
            return sqlSessionFactory;
        }
    }
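
    The synchronized getter above takes a lock on every call, including the call made for each record in a sink. One common alternative is the initialization-on-demand holder idiom, sketched below with JDK classes only. ExpensiveFactory is a hypothetical stand-in for SqlSessionFactory, and BUILD_COUNT exists purely to show that construction happens once.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyHolderSketch {

    /** Counts constructions, for demonstration only. */
    static final AtomicInteger BUILD_COUNT = new AtomicInteger();

    /** Hypothetical stand-in for an expensive, thread-safe object such as SqlSessionFactory. */
    static final class ExpensiveFactory {
        ExpensiveFactory() {
            BUILD_COUNT.incrementAndGet();
        }
    }

    private LazyHolderSketch() {
    }

    // The JVM initializes Holder on first access, exactly once, and does so thread-safely
    private static final class Holder {
        static final ExpensiveFactory INSTANCE = new ExpensiveFactory();
    }

    /** Returns the shared instance without taking a lock on each call. */
    static ExpensiveFactory getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance()); // true
        System.out.println(BUILD_COUNT.get());              // 1
    }
}
```

    The trade-off is in error handling: if construction throws, class initialization fails permanently for that JVM, whereas the synchronized getter simply retries on the next call. Either way, the singleton is per JVM, so each Flink TaskManager process builds its own instance.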
    

      

    Use MyBatis to operate on the database:

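        // openSession() returns a session with auto-commit disabled; try-with-resources closes it
        try (SqlSession sqlSession = MybatisSessionFactory.getSqlSessionFactory().openSession()) {
            UserRelaInfoMapper userRelaInfoMapper = sqlSession.getMapper(UserRelaInfoMapper.class);
            try {
                // Call the mapper method you need
                userRelaInfoMapper.xxxx();
                // Commit the transaction
                sqlSession.commit();
            } catch (Exception e) {
                // Roll back the transaction when an exception occurs
                sqlSession.rollback();
                throw e;
            }
        }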
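
    The commit-or-rollback sequence tends to be repeated at every call site, so it can be factored into a small helper. The sketch below shows the control flow with JDK classes only. Session is a hypothetical interface that mirrors only the commit, rollback, and close methods of SqlSession, and RecordingSession is a test double that records the order of calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

final class TransactionTemplateSketch {

    /** Hypothetical stand-in for the subset of SqlSession used here. */
    interface Session extends AutoCloseable {
        void commit();
        void rollback();
        @Override
        void close();
    }

    /** Runs the work, commits on success, rolls back on failure, and always closes the session. */
    static <S extends Session, T> T inTransaction(Supplier<S> opener, Function<S, T> work) {
        try (S session = opener.get()) {
            try {
                T result = work.apply(session);
                session.commit();
                return result;
            } catch (RuntimeException e) {
                session.rollback();
                throw e; // let the caller decide how to handle the failure
            }
        }
    }

    /** Records the calls it receives so the ordering can be inspected. */
    static final class RecordingSession implements Session {
        final List<String> calls = new ArrayList<>();
        @Override public void commit() { calls.add("commit"); }
        @Override public void rollback() { calls.add("rollback"); }
        @Override public void close() { calls.add("close"); }
    }

    public static void main(String[] args) {
        RecordingSession session = new RecordingSession();
        String result = inTransaction(() -> session, s -> "done");
        System.out.println(result + " " + session.calls); // done [commit, close]
    }
}
```

    With MyBatis, a thin adapter around SqlSession (or a version of the helper typed directly on SqlSession) would play the role of Session. That wiring is left out here because it depends on the project.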
    		
    		
    		
    

    Taking MySQL as the example, here is a Flink sink for MySQL in which transaction commits can be controlled as needed:

    public class MysqlSinkFunction<IN> extends RichSinkFunction<IN> {
        private static final Logger LOG = LoggerFactory.getLogger(MysqlSinkFunction.class);

        @Override
        public void invoke(IN value, Context context) throws Exception {
            // try-with-resources returns the connection to the pool after every record
            try (SqlSession sqlSession = MybatisSessionFactory.getSqlSessionFactory().openSession()) {
                try {
                    // Insert
                    LOG.info("MysqlSinkFunction start to do insert data...");
                    xxx.xxx();
                    // Update
                    LOG.info("MysqlSinkFunction start to do update data...");
                    xxx.xxx();
                    // Delete
                    LOG.info("MysqlSinkFunction start to do delete data...");
                    xxx.xxx();

                    sqlSession.commit();
                    LOG.info("MysqlSinkFunction commit transaction success...");
                } catch (Throwable e) {
                    // The failure is logged and swallowed; rethrow here if the job should fail instead
                    sqlSession.rollback();
                    LOG.error("MysqlSinkFunction cause Exception,sqlSession transaction rollback...", e);
                }
            }
        }
    }
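
    If you have used MyBatis with Spring before, none of the above will look unfamiliar. MyBatis integrates cleanly into big-data jobs as well, so its strengths for database access carry over, and it remains simple to use.
    Once MyBatis is integrated, Flink can work conveniently with a wide range of relational databases.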

  • Original article: https://www.cnblogs.com/laoqing/p/11890929.html