zoukankan      html  css  js  c++  java
  • Flink接收RabbitMQ数据写入到Oracle

    文件内容

    FlinkMain.java

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
    
    /**
     * Entry point of a Flink streaming job that consumes String messages from
     * the RabbitMQ queue "flink" and hands each one to {@link SinkOracle}.
     */
    public class FlinkMain
    {
        /**
         * Wires the RabbitMQ source to the Oracle sink and starts the job.
         *
         * @param args command-line arguments; not used
         * @throws Exception if the job cannot be built or its execution fails
         */
        public static void main(String[] args) throws Exception
        {
            // Step 1: obtain the streaming execution environment.
            final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

            // Step 2: describe how to reach the RabbitMQ broker.
            final RMQConnectionConfig.Builder rmqBuilder = new RMQConnectionConfig.Builder();
            rmqBuilder.setHost("192.168.1.3");
            rmqBuilder.setPort(5673);
            rmqBuilder.setUserName("panfeng");
            rmqBuilder.setPassword("panfeng");
            rmqBuilder.setVirtualHost("/panfeng");
            final RMQConnectionConfig rmqConfig = rmqBuilder.build();

            // Step 3: register the queue as a source. The third constructor argument is
            // the usesCorrelationId flag described in the Flink RabbitMQ connector docs.
            final RMQSource<String> queueSource =
                    new RMQSource<>(rmqConfig, "flink", true, new SimpleStringSchema());
            final DataStreamSource<String> messages = streamEnv.addSource(queueSource);

            // Step 4: every received message is passed to the Oracle sink.
            messages.addSink(new SinkOracle());

            // Step 5: submit the job under a readable name.
            streamEnv.execute("rabbitmq flink oracle");
        }
    }
    

    SinkOracle.java

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    
    /**
     * Flink sink that inserts every incoming String into the Oracle table
     * {@code FLINK}, using {@code SEQ_FLINK.NEXTVAL} for the first column.
     *
     * <p>The JDBC connection and prepared statement are created in
     * {@link #open(Configuration)} and released in {@link #close()}.
     */
    public class SinkOracle extends RichSinkFunction<String>
    {
        // JDBC handles are created in open() and are not serializable themselves,
        // so they are excluded from the serialized form of this function.
        private transient Connection connection;
        private transient PreparedStatement statement;

        /**
         * Loads the Oracle driver, opens the connection and prepares the INSERT statement.
         *
         * @param parameters configuration passed to the superclass
         * @throws Exception if the driver class is missing or the database is unreachable
         */
        @Override
        public void open(Configuration parameters) throws Exception
        {
            super.open(parameters);
            Class.forName("oracle.jdbc.OracleDriver");
            connection = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:orcl", "scott", "123456");
            statement = connection.prepareStatement("INSERT INTO FLINK VALUES (SEQ_FLINK.NEXTVAL,?)");
        }

        /**
         * Prints the received value to standard output and inserts it as one row.
         *
         * @param value   the message to store
         * @param context sink context; not used
         * @throws Exception if the INSERT fails
         */
        @Override
        public void invoke(String value, Context context) throws Exception
        {
            // String concatenation renders null as "null" instead of throwing,
            // so the explicit toString() call is not needed.
            System.out.println("value.toString()-------" + value);
            statement.setString(1, value);
            statement.execute();
        }

        /**
         * Releases the prepared statement and the connection.
         *
         * <p>try-with-resources closes the statement first and then the connection,
         * skips resources that are {@code null} (for example when {@code open} failed
         * early), and still closes the connection when an earlier step throws.
         *
         * @throws Exception if the superclass or a JDBC resource fails to close
         */
        @Override
        public void close() throws Exception
        {
            try (Connection openConnection = connection;
                 PreparedStatement openStatement = statement)
            {
                super.close();
            }
            finally
            {
                statement = null;
                connection = null;
            }
        }
    }
    
    

    pom.xml

    <dependencies>
            <!-- Flink Java API (flink-java) -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.9.0</version>
            </dependency>
    
            <!-- Flink streaming API, artifact with the _2.11 suffix; provides the org.apache.flink.streaming.api classes imported by the job -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.9.0</version>
            </dependency>
    
            <!-- Flink RabbitMQ connector, artifact with the _2.11 suffix; provides RMQSource and RMQConnectionConfig -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-rabbitmq_2.11</artifactId>
                <version>1.9.0</version>
            </dependency>
    
            <!-- Oracle JDBC driver (ojdbc8), loaded in the sink as oracle.jdbc.OracleDriver.
                 NOTE(review): confirm that the com.oracle coordinate resolves from your Maven repository; it may have to be installed manually. -->
            <dependency>
                <groupId>com.oracle</groupId>
                <artifactId>ojdbc8</artifactId>
                <version>12.2.0.1.0</version>
            </dependency>
        </dependencies>
    

    测试步骤

    执行 FlinkMain.java 中的主方法,往对应队列中传入数据,接收到的数据会输出到控制台

    • 如果想把配置信息写到配置文件(例如 application.properties;下文代码读取的是 country.properties)的话
    配置文件内容
    
    # Oracle JDBC settings: the driver class, URL, user name and password are read by CountrySinkOracle.open()
    db.driver=oracle.jdbc.OracleDriver
    db.url=jdbc:oracle:thin:@10.18.20.180:1521:MUDATA
    db.username=MD_REF
    db.password=MD_REF_2018
    

    # RabbitMQ connection settings: host, port, user name, password and virtual host are read by CountryFlinkMain
    rmq.host=10.18.20.13
    rmq.port=5672
    rmq.username=camel
    rmq.password=camel123
    rmq.vhost=reference
    # Exchange and queue names; of these, the code shown in this article only reads rmq.queue.country (in CountryFlinkMain)
    rmq.exchanges=ref.muservice.input
    rmq.queue.airport=two.airport.muservice.input
    rmq.queue.city=two.city.muservice.input
    rmq.queue.country=two.country.muservice.input

    读取RabbitMQ
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
    

    /**
     * Entry point of a Flink streaming job that consumes String messages from
     * the RabbitMQ queue named by the {@code rmq.queue.country} property and
     * hands each one to {@code CountrySinkOracle}.
     *
     * <p>All RabbitMQ settings come from {@code country.properties}. Every key
     * is treated as required, so a missing entry fails at start-up with an
     * error that names the key instead of passing {@code null} further on.
     */
    public class CountryFlinkMain
    {
        /**
         * Builds the job from the properties file and executes it.
         *
         * @param args command-line arguments; not used
         * @throws Exception if the properties file cannot be read, a required key
         *                   is missing, or the job execution fails
         */
        public static void main(String[] args) throws Exception
        {
            // 1. Execution environment.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 2. Load country.properties. The path is relative, so it is resolved
            //    against the working directory of the process.
            ParameterTool pt = ParameterTool.fromPropertiesFile("flink-crew-assign/src/main/resources/country.properties");

            // 3. RabbitMQ connection settings. getRequired/getInt throw when the key
            //    is absent, which reports the offending key name.
            RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                    .setHost(pt.getRequired("rmq.host"))
                    .setPort(pt.getInt("rmq.port"))
                    .setUserName(pt.getRequired("rmq.username"))
                    .setPassword(pt.getRequired("rmq.password"))
                    .setVirtualHost(pt.getRequired("rmq.vhost"))
                    .build();

            // 4. Register the country queue as the source of the stream.
            DataStreamSource<String> dataStreamSource = env.addSource(new RMQSource<String>(
                    connectionConfig,
                    pt.getRequired("rmq.queue.country"), // country queue
                    true,
                    new SimpleStringSchema()));

            // 5. Every received message is passed to the Oracle sink.
            dataStreamSource.addSink(new CountrySinkOracle());

            // 6. Submit the job under a readable name.
            env.execute("rabbitmq flink oracle");
        }
    }

    写入数据库
    
    public class CountrySinkOracle extends RichSinkFunction
    {
        private Connection conn;
        private PreparedStatement statement;
    
    /**
     * Initialisation step: reads the database settings from country.properties,
     * loads the configured JDBC driver class and opens the connection.
     *
     * @param parameters configuration passed to the superclass
     * @throws Exception if the properties file cannot be read, the driver class
     *                   cannot be loaded, or the connection cannot be opened
     */
    @Override
    public void open(Configuration parameters) throws Exception
    {
        super.open(parameters);

        final ParameterTool settings =
                ParameterTool.fromPropertiesFile("flink-crew-assign/src/main/resources/country.properties");

        // Load the driver class before asking DriverManager for a connection.
        final String driverClassName = settings.get("db.driver");
        Class.forName(driverClassName);

        final String jdbcUrl = settings.get("db.url");
        final String dbUser = settings.get("db.username");
        final String dbPassword = settings.get("db.password");
        conn = DriverManager.getConnection(jdbcUrl, dbUser, dbPassword);
    }
    

  • 相关阅读:
    同库 不同表更新
    js 随机时间
    转 vagrant package[打包命令]详解
    mysql 批量更新与批量更新多条记录的不同值实现方法 (转)
    vagrant up时提示 Authentication failure. Retrying
    vagrant 错误记录
    签名保存
    linux下svn命令大全
    linux 搭建SVN服务器,为多个项目分别建立版本库并单独配置权限
    log file sync
  • 原文地址:https://www.cnblogs.com/taopanfeng/p/11684912.html
Copyright © 2011-2022 走看看