zoukankan      html  css  js  c++  java
  • flink 使用sql实现kafka生产者和消费者

    1.maven依赖

    <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <flink.version>1.11.2</flink.version>
            <logback.version>1.1.7</logback.version>
            <slf4j.version>1.7.25</slf4j.version>
        </properties>
    
        <dependencies>
            <dependency>
                <!-- JSON format support; needed for the 'format.type'='json' option used in the table DDL below -->
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-json</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-wikiedits_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-core</artifactId>
                <version>${logback.version}</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>${logback.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.18</version>
            </dependency>
        </dependencies>

    2.生产者

    import com.g2.flink.models.CustomerStatusChangedEvent;
    
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    /**
     * Hello world!
     */
    //@Slf4j
    public class KafkaTableStreamApiProducerTest {
    
        public static void main(String[] args) {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .inStreamingMode()
                    //.useOldPlanner() // flink
                    .useBlinkPlanner() // blink
                    .build();
            StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
    
            Long baseTimestamp = 1600855709000L;
            DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements(
                    new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
                    new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
                    new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
                    new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
            );
    
            String ddl = "CREATE TABLE CustomerStatusChangedEvent(
    " +
                    "customerId int,
    " +
                    "oldStatus int,
    " +
                    "newStatus int,
    " +
                    "eventTime bigint
    " +
                    ") WITH(
    " +
                    "'connector.type'='kafka',
    " +
                    "'connector.version'='universal',
    " +
    
                    "'connector.properties.bootstrap.servers'='192.168.1.85:9092,192.168.1.86:9092,192.168.12.87:9092',
    " +
                    "'connector.topic'='customer_statusChangedEvent',
    " +
                   
                    "'format.type'='json'
    " +
                    ")
    "
                    ;
            tableEnvironment.executeSql(ddl);
    
    
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(3);
                    int status = (int) (System.currentTimeMillis() % 3);
                    String insert = "insert into CustomerStatusChangedEvent(customerId,oldStatus,newStatus,eventTime)" +
                            "values(1001,1," + status + "," + System.currentTimeMillis() + ")";
                    tableEnvironment.executeSql(insert);
                } catch (Exception ex) {
    
                }
            }
    
        }
    }

    3.消费者

    import com.g2.flink.models.CustomerStatusChangedEvent;
    
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    /**
     * Hello world!
     */
    //@Slf4j
    public class KafkaTableStreamApiConsumerTest {
    
        public static void main(String[] args) {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .inStreamingMode()
                    //.useOldPlanner() // flink
                    .useBlinkPlanner() // blink
                    .build();
            StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
    
            Long baseTimestamp = 1600855709000L;
            DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements(
                    new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
                    new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
                    new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
                    new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
            );
    
            String ddl = "CREATE TABLE CustomerStatusChangedEvent(
    " +
                    "customerId int,
    " +
                    "oldStatus int,
    " +
                    "newStatus int,
    " +
                    "eventTime bigint
    " +
                    ") WITH(
    " +
                    "'connector.type'='kafka',
    " +
                    "'connector.version'='universal',
    " +
                    "'connector.properties.group.id'='g2_group',
    " +
                    "'connector.properties.bootstrap.servers'='192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092',
    " +
                    "'connector.topic'='customer_statusChangedEvent',
    " +
                    "'connector.startup-mode' = 'latest-offset',
    " +
                    "'format.type'='json'
    " +
                    ")
    ";
            tableEnvironment.executeSql(ddl);
    
            Table resultTb = tableEnvironment.sqlQuery("select customerId,newStatus as status " +
                    " from CustomerStatusChangedEvent" +
                    " where newStatus in(1,2)"
            );
    
    
        /*
        DataStream<Tuple2<Boolean, Tuple2<Integer, Integer>>> result = tableEnvironment.toRetractStream(resultTb,
                    Types.TUPLE(Types.INT, Types.INT));
    
      */
            DataStream<CustomerStatusLog> result = tableEnvironment.toAppendStream(resultTb, CustomerStatusLog.class);
            result.print();
    
            try {
                env.execute();
            } catch (Exception ex) {
    
            }
        }
    
        public static class CustomerStatusLog {
            private Long customerId;
    
            private Integer status;
    
            public Long getCustomerId() {
                return customerId;
            }
    
            public void setCustomerId(Long customerId) {
                this.customerId = customerId;
            }
    
            public Integer getStatus() {
                return status;
            }
    
            public void setStatus(Integer newStatus) {
                this.status = newStatus;
            }
    
            public CustomerStatusLog() {
    
            }
    
            @Override
            public String toString() {
                return "CustomerStatusLog{" +
                        "customerId=" + customerId +
                        ", status=" + status +
                        '}';
            }
        }
    }

    4.消费者打印

    4> CustomerStatusLog{customerId=1001, status=2}
    4> CustomerStatusLog{customerId=1001, status=1}
    4> CustomerStatusLog{customerId=1001, status=2}
    4> CustomerStatusLog{customerId=1001, status=2}

  • 相关阅读:
    [Intellij] 软件设置和常用快捷键
    [Intellij] Project Structure 配置说明
    [日志log] 常用log日志记录方式对比和详解
    [J2EE]web.xml各个版本模板
    [技术选型] CDH-Cloudera Distribution Hadoop
    [技术选型] dubbo
    [技术选型] spring boot
    [hbase] HBase内置过滤器的一些总结
    [zookeeper] Zookeeper伪分布式集群配置
    [maven] settings 文件 本地maven仓库
  • 原文地址:https://www.cnblogs.com/zhshlimi/p/13725081.html
Copyright © 2011-2022 走看看