zoukankan      html  css  js  c++  java
  • Flink接收RabbitMQ数据写入到Oracle

    文件内容

    FlinkMain.java

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
    
    /**
     * Entry point of a Flink streaming job that consumes string messages from the
     * RabbitMQ queue {@code "flink"} and passes each one to {@link SinkOracle}.
     */
    public class FlinkMain
    {
        /**
         * Wires the RabbitMQ source to the Oracle sink and runs the job.
         *
         * @param args command-line arguments; not used
         * @throws Exception if the job cannot be built or executed
         */
        public static void main(String[] args) throws Exception
        {
            // Execution environment the pipeline is registered on.
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

            // Source: messages of the "flink" queue, deserialized as plain strings.
            final RMQSource<String> queueSource = new RMQSource<String>(
                    rabbitConfig(),
                    "flink",
                    true,
                    new SimpleStringSchema());
            final DataStreamSource<String> messages = environment.addSource(queueSource);

            // Sink: every received message is handed to SinkOracle.
            messages.addSink(new SinkOracle());

            // Run the job under a fixed job name.
            environment.execute("rabbitmq flink oracle");
        }

        /**
         * Builds the RabbitMQ connection settings used by the source.
         *
         * @return the connection configuration for the broker
         */
        private static RMQConnectionConfig rabbitConfig()
        {
            final RMQConnectionConfig.Builder builder = new RMQConnectionConfig.Builder();
            builder.setHost("192.168.1.3");
            builder.setPort(5673);
            builder.setUserName("panfeng");
            builder.setPassword("panfeng");
            builder.setVirtualHost("/panfeng");
            return builder.build();
        }
    }
    

    SinkOracle.java

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    
    /**
     * Flink sink that inserts every incoming string into the Oracle table {@code FLINK},
     * using the sequence {@code SEQ_FLINK} for the first column.
     *
     * <p>The JDBC connection and prepared statement are created in {@link #open} and
     * released in {@link #close}.
     */
    public class SinkOracle extends RichSinkFunction<String>
    {
        // transient: JDBC handles are created in open() and must never be part of the
        // serialized function object.
        private transient Connection connection;
        private transient PreparedStatement statement;

        /**
         * Loads the Oracle driver, opens the connection and prepares the INSERT statement.
         *
         * @param parameters configuration passed through to the superclass
         * @throws Exception if the driver class is missing or the database cannot be reached
         */
        @Override
        public void open(Configuration parameters) throws Exception
        {
            super.open(parameters);
            Class.forName("oracle.jdbc.OracleDriver");
            connection = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:orcl", "scott", "123456");
            statement = connection.prepareStatement("INSERT INTO FLINK VALUES (SEQ_FLINK.NEXTVAL,?)");
        }

        /**
         * Prints the received value and inserts it as one row.
         *
         * @param value   the message to store
         * @param context sink context; not used
         * @throws Exception if the INSERT fails
         */
        @Override
        public void invoke(String value, Context context) throws Exception
        {
            // Concatenating the String directly prints the same text as value.toString()
            // without a redundant call (and without an NPE for a null value).
            System.out.println("value.toString()-------" + value);
            statement.setString(1, value);
            statement.executeUpdate();
        }

        /**
         * Releases the statement and the connection.
         *
         * <p>The nested try/finally blocks make sure the connection is still closed when
         * {@code super.close()} or {@code statement.close()} throws; previously such a
         * failure skipped the remaining cleanup and leaked the connection.
         *
         * @throws Exception if closing any resource fails
         */
        @Override
        public void close() throws Exception
        {
            try
            {
                super.close();
            }
            finally
            {
                try
                {
                    if (statement != null)
                    {
                        statement.close();
                    }
                }
                finally
                {
                    statement = null;
                    if (connection != null)
                    {
                        try
                        {
                            connection.close();
                        }
                        finally
                        {
                            connection = null;
                        }
                    }
                }
            }
        }
    }
    
    

    pom.xml

    <dependencies>
            <!-- flink-java: core Flink Java API (presumably the source of ParameterTool used by the properties-based variant; verify) -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.9.0</version>
            </dependency>
    
            <!-- flink-streaming-java_2.11: DataStream API (StreamExecutionEnvironment, DataStreamSource, RichSinkFunction); the _2.11 suffix is the Scala build version -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.9.0</version>
            </dependency>
    
            <!-- flink-connector-rabbitmq_2.11: RabbitMQ connector (RMQSource, RMQConnectionConfig); keep its version aligned with the other Flink artifacts -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-rabbitmq_2.11</artifactId>
                <version>1.9.0</version>
            </dependency>
    
            <!-- Oracle JDBC driver providing oracle.jdbc.OracleDriver. NOTE(review): these coordinates may need to be installed into a local/private repository; confirm for your build -->
            <dependency>
                <groupId>com.oracle</groupId>
                <artifactId>ojdbc8</artifactId>
                <version>12.2.0.1.0</version>
            </dependency>
        </dependencies>
    

    测试步骤

    执行 FlinkMain.java 中的主方法,往对应队列中发送数据,接收到的数据会输出到控制台,并写入 Oracle 的 FLINK 表

    • 如果想把配置信息写到配置文件中的话(下面的代码读取的是 country.properties)
    配置文件内容
    
    db.driver=oracle.jdbc.OracleDriver
    db.url=jdbc:oracle:thin:@10.18.20.180:1521:MUDATA
    db.username=MD_REF
    db.password=MD_REF_2018
    

    rmq.host=10.18.20.13
    rmq.port=5672
    rmq.username=camel
    rmq.password=camel123
    rmq.vhost=reference
    rmq.exchanges=ref.muservice.input
    rmq.queue.airport=two.airport.muservice.input
    rmq.queue.city=two.city.muservice.input
    rmq.queue.country=two.country.muservice.input

    读取RabbitMQ
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
    

    /**
     * Entry point of a Flink streaming job that reads its RabbitMQ settings from
     * {@code country.properties}, consumes the queue named by {@code rmq.queue.country}
     * and passes every message to {@link CountrySinkOracle}.
     */
    public class CountryFlinkMain
    {
        /**
         * Wires the RabbitMQ source to the Oracle sink and runs the job.
         *
         * @param args command-line arguments; not used
         * @throws Exception if the properties file cannot be read or the job fails
         */
        public static void main(String[] args) throws Exception
        {
            // Execution environment the pipeline is registered on.
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

            // Settings are loaded from a path relative to the working directory.
            final ParameterTool settings = ParameterTool.fromPropertiesFile("flink-crew-assign/src/main/resources/country.properties");

            // Broker connection built from the rmq.* keys.
            final RMQConnectionConfig brokerConfig = rabbitConfig(settings);

            // Source: the country queue, deserialized as plain strings.
            final String countryQueue = settings.get("rmq.queue.country");
            final RMQSource<String> queueSource = new RMQSource<String>(
                    brokerConfig,
                    countryQueue,
                    true,
                    new SimpleStringSchema());
            final DataStreamSource<String> messages = environment.addSource(queueSource);

            // Sink: every received message is handed to CountrySinkOracle.
            messages.addSink(new CountrySinkOracle());

            // Run the job under a fixed job name.
            environment.execute("rabbitmq flink oracle");
        }

        /**
         * Builds the RabbitMQ connection settings from the {@code rmq.host},
         * {@code rmq.port}, {@code rmq.username}, {@code rmq.password} and
         * {@code rmq.vhost} keys.
         *
         * @param settings the loaded properties
         * @return the connection configuration for the broker
         */
        private static RMQConnectionConfig rabbitConfig(ParameterTool settings)
        {
            final RMQConnectionConfig.Builder builder = new RMQConnectionConfig.Builder();
            builder.setHost(settings.get("rmq.host"));
            builder.setPort(Integer.parseInt(settings.get("rmq.port")));
            builder.setUserName(settings.get("rmq.username"));
            builder.setPassword(settings.get("rmq.password"));
            builder.setVirtualHost(settings.get("rmq.vhost"));
            return builder.build();
        }
    }

    写入数据库(Sink,连接 Oracle)
    
    public class CountrySinkOracle extends RichSinkFunction
    {
        private Connection conn;
        private PreparedStatement statement;
    
    // 1,初始化
    /**
     * Reads the {@code db.*} settings from {@code country.properties}, loads the
     * configured JDBC driver class and opens the database connection.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the properties file cannot be read, the driver class is
     *                   missing, or the connection cannot be established
     */
    @Override
    public void open(Configuration parameters) throws Exception
    {
        super.open(parameters);

        // Settings are loaded from a path relative to the working directory.
        final ParameterTool settings = ParameterTool.fromPropertiesFile("flink-crew-assign/src/main/resources/country.properties");

        // Load the driver class named by db.driver before requesting a connection.
        final String driverClassName = settings.get("db.driver");
        Class.forName(driverClassName);

        final String jdbcUrl = settings.get("db.url");
        final String dbUser = settings.get("db.username");
        final String dbPassword = settings.get("db.password");
        conn = DriverManager.getConnection(jdbcUrl, dbUser, dbPassword);
    }
    

  • 相关阅读:
    5个最佳WordPress通知栏插件
    最新lombok插件和IDEA2020.1不兼容,Plugin "Lombok" is incompatible (until build 193.SNAPSHOT < IU-201.6668....
    nuxt中localstorage的替代方案
    nuxt或者vue,axios中如何发送多个请求
    wordpress nginx详细环境配置安装命令和相关问题解决
    [no_perms] Private mode enable, only admin can publish this module
    vue bootstrap中modal对话框不显示遮挡打不开
    vue监听当前页面的地址变化/路由变化
    来看看JDK13的81个新特性和API
    Unable to find a @SpringBootConfiguration, you need to use @ContextConfiguration or @SpringBootTest(classes=...) with your test java.lang.IllegalStateException
  • 原文地址:https://www.cnblogs.com/taopanfeng/p/11684912.html
Copyright © 2011-2022 走看看