  • Example: connecting Kafka and Elasticsearch with Beam, running on the Flink platform

    The example implements a Beam pipeline in Java that listens to the Kafka topic testmsg and counts the received words once every 5 seconds. The results are written to the Kafka topic outputmessage and simultaneously synced to Elasticsearch.

    Kafka must be running.

    Start it:
    cd /root/kafuka/kafka_2.12-0.11.0.0
    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    nohup bin/kafka-server-start.sh config/server.properties &

    Create the topic:
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testmsg
    bin/kafka-topics.sh --list --zookeeper localhost:2181

    Console producer:
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testmsg

    Console consumer:
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testmsg --from-beginning

    Elasticsearch

    Create the index (PUT): http://192.168.11.100:9200/myindex?pretty
    List all indices: http://192.168.11.100:9200/_cat/indices?v

    Query documents (GET): http://192.168.11.100:9200/myindex/_search?q=*&pretty
    http://192.168.11.100:9200/myindex/_search?q=*&sort=_id:desc&pretty

    Generate the project skeleton with Maven:

    On Windows, run the following in PowerShell:
     mvn archetype:generate `
     -D archetypeGroupId=org.apache.beam `
     -D archetypeArtifactId=beam-sdks-java-maven-archetypes-examples `
     -D archetypeVersion=2.8.0 `
     -D groupId=org.example `
     -D artifactId=word-count-beam `
     -D version="0.1" `
     -D package=org.apache.beam.examples `
     -D interactiveMode=false
    
    For further details, see the official Beam quickstart: <https://beam.apache.org/get-started/quickstart-java/> 

    Replace the generated pom.xml with the following:

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
        Licensed to the Apache Software Foundation (ASF) under one or more
        contributor license agreements.  See the NOTICE file distributed with
        this work for additional information regarding copyright ownership.
        The ASF licenses this file to You under the Apache License, Version 2.0
        (the "License"); you may not use this file except in compliance with
        the License.  You may obtain a copy of the License at
    
           http://www.apache.org/licenses/LICENSE-2.0
    
        Unless required by applicable law or agreed to in writing, software
        distributed under the License is distributed on an "AS IS" BASIS,
        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        See the License for the specific language governing permissions and
        limitations under the License.
    -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>org.example</groupId>
      <artifactId>word-count-beam</artifactId>
      <version>0.1</version>
    
      <packaging>jar</packaging>
    
      <properties>
        <beam.version>2.8.0</beam.version>
    
        <bigquery.version>v2-rev402-1.24.1</bigquery.version>
        <google-clients.version>1.24.1</google-clients.version>
        <guava.version>20.0</guava.version>
        <hamcrest.version>1.3</hamcrest.version>
        <jackson.version>2.9.5</jackson.version>
        <joda.version>2.4</joda.version>
        <junit.version>4.12</junit.version>
        <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
        <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
        <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
        <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
        <mockito.version>1.10.19</mockito.version>
        <pubsub.version>v1-rev399-1.24.1</pubsub.version>
        <slf4j.version>1.7.25</slf4j.version>
        <spark.version>2.3.2</spark.version>
        <hadoop.version>2.7.3</hadoop.version>
        <maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version>
        
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
    
      <repositories>
        <repository>
          <id>apache.snapshots</id>
          <name>Apache Development Snapshot Repository</name>
          <url>https://repository.apache.org/content/repositories/snapshots/</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
      </repositories>
    
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>${maven-compiler-plugin.version}</version>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </plugin>
    
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>${maven-surefire-plugin.version}</version>
            <configuration>
              <parallel>all</parallel>
              <threadCount>4</threadCount>
              <redirectTestOutputToFile>true</redirectTestOutputToFile>
            </configuration>
            <dependencies>
              <dependency>
                <groupId>org.apache.maven.surefire</groupId>
                <artifactId>surefire-junit47</artifactId>
                <version>${maven-surefire-plugin.version}</version>
              </dependency>
            </dependencies>
          </plugin>
    
          <!-- Ensure that the Maven jar plugin runs before the Maven
            shade plugin by listing the plugin higher within the file. -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>${maven-jar-plugin.version}</version>
          </plugin>
    
          <!--
            Configures `mvn package` to produce a bundled jar ("fat jar") for runners
            that require this for job submission to a cluster.
          -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>${maven-shade-plugin.version}</version>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>shade</goal>
                </goals>
                <configuration>
                  <finalName>${project.artifactId}-bundled-${project.version}</finalName>
                  <filters>
                    <filter>
                      <artifact>*:*</artifact>
                      <excludes>
                        <exclude>META-INF/LICENSE</exclude>
                        <exclude>META-INF/*.SF</exclude>
                        <exclude>META-INF/*.DSA</exclude>
                        <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                    </filter>
                  </filters>
                  <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                  </transformers>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
    
        <pluginManagement>
          <plugins>
            <plugin>
              <groupId>org.codehaus.mojo</groupId>
              <artifactId>exec-maven-plugin</artifactId>
              <version>${maven-exec-plugin.version}</version>
              <configuration>
                <cleanupDaemonThreads>false</cleanupDaemonThreads>
              </configuration>
            </plugin>
          </plugins>
        </pluginManagement>
      </build>
    
      <profiles>
        <profile>
          <id>direct-runner</id>
          <activation>
            <activeByDefault>true</activeByDefault>
          </activation>
          <!-- Makes the DirectRunner available when running a pipeline. -->
          <dependencies>
            <dependency>
              <groupId>org.apache.beam</groupId>
              <artifactId>beam-runners-direct-java</artifactId>
              <version>${beam.version}</version>
              <scope>runtime</scope>
            </dependency>
          </dependencies>
        </profile>
    
        <profile>
          <id>apex-runner</id>
          <!-- Makes the ApexRunner available when running a pipeline. -->
          <dependencies>
            <dependency>
              <groupId>org.apache.beam</groupId>
              <artifactId>beam-runners-apex</artifactId>
              <version>${beam.version}</version>
              <scope>runtime</scope>
            </dependency>
            <!--
              Apex depends on httpclient version 4.3.6, project has a transitive dependency to httpclient 4.0.1 from
              google-http-client. Apex dependency version being specified explicitly so that it gets picked up. This
              can be removed when the project no longer has a dependency on a different httpclient version.
            -->
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpclient</artifactId>
              <version>4.3.6</version>
              <scope>runtime</scope>
              <exclusions>
                <exclusion>
                  <groupId>commons-codec</groupId>
                  <artifactId>commons-codec</artifactId>
                </exclusion>
              </exclusions>
            </dependency>
            <!--
              Apex 3.6 is built against YARN 2.6. Version in the fat jar has to match
              what's on the cluster, hence we need to repeat the Apex Hadoop dependencies here.
            -->
            <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-yarn-client</artifactId>
              <version>${hadoop.version}</version>
              <scope>runtime</scope>
            </dependency>
            <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-common</artifactId>
              <version>${hadoop.version}</version>
              <scope>runtime</scope>
            </dependency>
          </dependencies>
        </profile>
    
        <profile>
          <id>dataflow-runner</id>
          <!-- Makes the DataflowRunner available when running a pipeline. -->
          <dependencies>
            <dependency>
              <groupId>org.apache.beam</groupId>
              <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
              <version>${beam.version}</version>
              <scope>runtime</scope>
            </dependency>
          </dependencies>
        </profile>
    
        <profile>
          <id>flink-runner</id>
          <!-- Makes the FlinkRunner available when running a pipeline. -->
          <dependencies>
            <dependency>
              <groupId>org.apache.beam</groupId>
              <artifactId>beam-runners-flink_2.11</artifactId>
              <version>${beam.version}</version>
              <scope>runtime</scope>
            </dependency>
          </dependencies>
        </profile>
    
        <profile>
          <id>spark-runner</id>
          <!-- Makes the SparkRunner available when running a pipeline. Additionally,
               overrides some Spark dependencies to Beam-compatible versions. -->
          <properties>
            <netty.version>4.1.17.Final</netty.version>
          </properties>
          <dependencies>
            <dependency>
              <groupId>org.apache.beam</groupId>
              <artifactId>beam-runners-spark</artifactId>
              <version>${beam.version}</version>
              <scope>runtime</scope>
            </dependency>
            <dependency>
              <groupId>org.apache.beam</groupId>
              <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
              <version>${beam.version}</version>
              <scope>runtime</scope>
            </dependency>
            <dependency>
              <groupId>org.apache.spark</groupId>
              <artifactId>spark-streaming_2.11</artifactId>
              <version>${spark.version}</version>
              <scope>runtime</scope>
              <exclusions>
                <exclusion>
                  <groupId>org.slf4j</groupId>
                  <artifactId>jul-to-slf4j</artifactId>
                </exclusion>
              </exclusions>
            </dependency>
            <dependency>
              <groupId>com.fasterxml.jackson.module</groupId>
              <artifactId>jackson-module-scala_2.11</artifactId>
              <version>${jackson.version}</version>
              <scope>runtime</scope>
            </dependency>
            <!-- [BEAM-3519] GCP IO exposes netty on its API surface, causing conflicts with runners -->
            <dependency>
              <groupId>org.apache.beam</groupId>
              <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
              <version>${beam.version}</version>
              <exclusions>
                <exclusion>
                  <groupId>io.grpc</groupId>
                  <artifactId>grpc-netty</artifactId>
                </exclusion>
                <exclusion>
                  <groupId>io.netty</groupId>
                  <artifactId>netty-handler</artifactId>
                </exclusion>
              </exclusions>
            </dependency>
          </dependencies>
        </profile>
        <profile>
          <id>gearpump-runner</id>
          <dependencies>
            <dependency>
              <groupId>org.apache.beam</groupId>
              <artifactId>beam-runners-gearpump</artifactId>
              <version>${beam.version}</version>
              <scope>runtime</scope>
            </dependency>
          </dependencies>
        </profile>
      </profiles>
    
      <dependencies>
        <!-- Adds a dependency on the Beam SDK. -->
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-core</artifactId>
          <version>${beam.version}</version>
        </dependency>
    
        <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
          <version>${beam.version}</version>
        </dependency>
    
        <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
        <dependency>
          <groupId>com.google.api-client</groupId>
          <artifactId>google-api-client</artifactId>
          <version>${google-clients.version}</version>
          <exclusions>
            <!-- Exclude an old version of guava that is being pulled
                 in by a transitive dependency of google-api-client -->
            <exclusion>
              <groupId>com.google.guava</groupId>
              <artifactId>guava-jdk5</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
    
        <dependency>
          <groupId>com.google.apis</groupId>
          <artifactId>google-api-services-bigquery</artifactId>
          <version>${bigquery.version}</version>
          <exclusions>
            <!-- Exclude an old version of guava that is being pulled
                 in by a transitive dependency of google-api-client -->
            <exclusion>
              <groupId>com.google.guava</groupId>
              <artifactId>guava-jdk5</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
    
        <dependency>
          <groupId>com.google.http-client</groupId>
          <artifactId>google-http-client</artifactId>
          <version>${google-clients.version}</version>
          <exclusions>
            <!-- Exclude an old version of guava that is being pulled
                 in by a transitive dependency of google-api-client -->
            <exclusion>
              <groupId>com.google.guava</groupId>
              <artifactId>guava-jdk5</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
    
        <dependency>
          <groupId>com.google.apis</groupId>
          <artifactId>google-api-services-pubsub</artifactId>
          <version>${pubsub.version}</version>
          <exclusions>
            <!-- Exclude an old version of guava that is being pulled
                 in by a transitive dependency of google-api-client -->
            <exclusion>
              <groupId>com.google.guava</groupId>
              <artifactId>guava-jdk5</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
    
        <dependency>
          <groupId>joda-time</groupId>
          <artifactId>joda-time</artifactId>
          <version>${joda.version}</version>
        </dependency>
    
        <dependency>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
          <version>${guava.version}</version>
        </dependency>
    
        <!-- Add slf4j API frontend binding with JUL backend -->
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>${slf4j.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-jdk14</artifactId>
          <version>${slf4j.version}</version>
          <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
          <scope>runtime</scope>
        </dependency>
    
        <!-- Hamcrest and JUnit are required dependencies of PAssert,
             which is used in the main code of DebuggingWordCount example. -->
        <dependency>
          <groupId>org.hamcrest</groupId>
          <artifactId>hamcrest-core</artifactId>
          <version>${hamcrest.version}</version>
        </dependency>
        
        <dependency>
          <groupId>org.hamcrest</groupId>
          <artifactId>hamcrest-library</artifactId>
          <version>${hamcrest.version}</version>
        </dependency>
    
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>${junit.version}</version>
        </dependency>
    
        <!-- The DirectRunner is needed for unit tests. -->
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-direct-java</artifactId>
          <version>${beam.version}</version>
          <scope>test</scope>
        </dependency>
        
        <dependency>
          <groupId>org.mockito</groupId>
          <artifactId>mockito-core</artifactId>
          <version>${mockito.version}</version>
          <scope>test</scope>
        </dependency>
    
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-kafka</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
    
    <!-- elasticsearch -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
            <version>${beam.version}</version>
        </dependency>
    
        
        
      </dependencies>
    </project>

    Add the following class to the Java source directory src/main/java/org/apache/beam/examples:

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.beam.examples;
    
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.Validation.Required;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.joda.time.Duration;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.google.common.collect.ImmutableMap;
    //import org.apache.beam.runners.flink.FlinkRunner;
    
    public class KafkaSample {
    
        public static void main(String[] args) {
            String hosts = "211.100.75.227:9092";// 192.168.1.110:11092,192.168.1.119:11092,192.168.1.120:11092
    
            String sourceTopic = "testmsg";
        // Parse the pipeline options from the command-line arguments
            WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
    
        // Create the pipeline
            Pipeline pipeline = Pipeline.create(options);
    
        // The <String, String> type parameters mean that both the Kafka key and value are Strings
        PCollection<KafkaRecord<String, String>> lines = pipeline.apply(KafkaIO.<String, String>read()
                .withBootstrapServers(hosts)                     // required: Kafka broker address and port
                .withTopic(sourceTopic)                          // required: Kafka topic to read from
                .withKeyDeserializer(StringDeserializer.class)   // required: key deserializer
                .withValueDeserializer(StringDeserializer.class) // required: value deserializer
                // common Kafka consumer property; "earliest" is the other frequent choice
                .updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "latest")));

        // The next PCollection<String> is the message type after processing (Kafka metadata removed)
            PCollection<String> kafkadata = lines.apply("Remove Kafka Metadata", ParDo.of(new DoFn<KafkaRecord<String, String>, String>() {
                private static final long serialVersionUID = 1L;
    
                @ProcessElement
                public void processElement(ProcessContext ctx) {
                    System.out.println("get from topic:" + ctx.element().getKV());
                ctx.output(ctx.element().getKV().getValue()); // emit only the value of each Kafka message
                }
            }));
            PCollection<String> windowedEvents = kafkadata.apply(Window.<String> into(FixedWindows.of(Duration.standardSeconds(5))));
        // Count the occurrences of each element (each Kafka message / word) within the window
        PCollection<KV<String, Long>> wordcount = windowedEvents.apply(Count.<String>perElement());
        // Concatenate the final formatted output (key = word, value = count)
        PCollection<String> wordtj = wordcount.apply("ConcatResultKVs", MapElements.via(
                    new SimpleFunction<KV<String, Long>, String>() {
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public String apply(KV<String, Long> input) {
                            System.out.println("key=" + input.getKey());
                            System.out.println("value=" + input.getValue());
                        String ret = " {\"" + input.getKey() + "\":\"" + input.getValue() + "\"}";
                            System.out.println(ret);
                            return ret;
                        }
                    }));
    
        /* Sink to Kafka */
        wordtj.apply(KafkaIO.<Void, String>write()
                .withBootstrapServers(hosts)  // broker address(es) of the cluster to write back to
                .withTopic("outputmessage")   // Kafka topic for the results
                // .withKeySerializer(StringSerializer.class) // not needed: the key type above is Void
                .withValueSerializer(StringSerializer.class)
                // Exactly-once writes are compatible with the Dataflow and Spark runners; Flink supports
                // them only with Kafka 0.11, and my broker is 0.10, so the option stays commented out.
                // .withEOS(20, "eos-sink-group-id")
                .values() // write only the values; the key defaults to null
        ); // emit the results
    
            /* sink to elasticsearch */
            String[] addresses = { "http://192.168.11.100:9200" };
            wordtj.apply(ElasticsearchIO.write().withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration.create(addresses, "myindex", "testdoc")));
    
            pipeline.run().waitUntilFinish();
        }
    
        public interface WordCountOptions extends PipelineOptions {
    
            /**
             * By default, this example reads from a public dataset containing the
             * text of King Lear. Set this option to choose a different input file
             * or glob.
             */
            @Description("Path of the file to read from")
            @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
            String getInputFile();
    
            void setInputFile(String value);
    
            /** Set this required option to specify where to write the output. */
            @Description("Path of the file to write to")
            @Required
            String getOutput();
    
            void setOutput(String value);
        }
    
        private static final Logger logger = LoggerFactory.getLogger(KafkaSample.class);
    
        /**
         * Options supported by {@link KafkaSample}.
         *
         * <p>
         * Concept #4: Defining your own configuration options. Here, you can add
         * your own arguments to be processed by the command-line parser, and
         * specify default values for them. You can then access the options values
         * in your pipeline code.
         *
         * <p>
         * Inherits standard configuration options.
         */
        public interface KFOptions extends PipelineOptions {
    
            /**
             * Kafka bootstrap servers the pipeline connects to. Override this
             * option on the command line to point at a different broker.
             */
            @Description("Kafka bootstrap servers (host:port)")
            @Default.String("211.100.75.227:9092")
            String getBrokers();
    
            void setBrokers(String value);
    
        }
    }

    Change the Kafka and Elasticsearch addresses inside, and the job is ready to run. Alternatively, the addresses can be passed in as pipeline options, as sketched below.
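
    The sample already declares a KFOptions interface for the broker address, so instead of editing the literals the addresses could be supplied on the command line. A minimal sketch, assuming a hypothetical --esAddress option is added alongside --brokers and that the same imports as KafkaSample are in scope:

    // Sketch only: extend the existing KFOptions with a (hypothetical) Elasticsearch option
    public interface KFOptions extends PipelineOptions {
        @Description("Kafka bootstrap servers (host:port)")
        @Default.String("211.100.75.227:9092")
        String getBrokers();
        void setBrokers(String value);

        @Description("Elasticsearch HTTP address")
        @Default.String("http://192.168.11.100:9200")
        String getEsAddress();
        void setEsAddress(String value);
    }

    // In main(), read the addresses from the options instead of hard-coding them:
    KFOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(KFOptions.class);
    String hosts = options.getBrokers();
    String[] addresses = { options.getEsAddress() };

    The job could then be started with, for example, --brokers=myhost:9092 --esAddress=http://myhost:9200 added to the program arguments.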

    Run it directly with Beam:

    Direct-Local runner
    mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.KafkaSample `
     -D exec.args="--inputFile=pom.xml --output=counts" -P direct-runner

    Run it on a self-started local Flink:

     mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.KafkaSample `
      -D exec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" -P flink-runner 

    Package the job and run it on an already-running local Flink cluster:

    mvn package -Pflink-runner 
    After packaging, upload the jar to Flink and specify the entry class together with the program arguments:
    --runner=FlinkRunner --inputFile=C:\path\to\quickstart\pom.xml --output=C:\tmp\counts --filesToStage=.\target\word-count-beam-bundled-0.1.jar
    
    org.apache.beam.examples.KafkaSample

     


    Problems encountered:

    1. Ordinary JSON received from Kafka can be imported into Elasticsearch without trouble, for example:

    {
      "field": "value",
      "field1": "value"
    }

    But if the string contains characters such as colons, the insert fails; it turned out that writing the whole document on a single line gets it through, for example:

    {"mytime": "2018-12-13T06:44:41.460Z","carColor":"blue"}

    This is probably related to Elasticsearch's _bulk batch insert, which expects each document on a single line.
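
    One way to avoid the escaping problem entirely is to build the result string with a JSON library instead of manual concatenation. A small helper along these lines (a sketch, not part of the original sample; jackson-databind should already be available transitively through the Beam SDK, otherwise add it to the pom) could replace the string building inside the SimpleFunction:

    // Sketch: serialize the per-window result with Jackson so colons and quotes inside
    // the word are escaped correctly and the document always stays on a single line,
    // which is what the Elasticsearch _bulk endpoint expects.
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    class ResultJson {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        static String toJson(String word, long count) {
            try {
                ObjectNode node = MAPPER.createObjectNode();
                node.put(word, count);                  // {"<word>": <count>}
                return MAPPER.writeValueAsString(node); // single-line, properly escaped JSON
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }
    }

    Calling ResultJson.toJson(input.getKey(), input.getValue()) in the SimpleFunction would then produce documents that the _bulk insert accepts regardless of the characters in the word.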

  • Original article: https://www.cnblogs.com/bigben0123/p/10072427.html