zoukankan      html  css  js  c++  java
  • Java连接kafka

    1、maven依赖:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <!--
            Build for a small Kafka client demo.
            * Pulls in the Kafka broker/core artifact and kafka-clients at ${kafka.version}.
            * Compiles sources under src/main/scala with maven-scala-plugin.
            * Packages a single runnable jar-with-dependencies during the package phase.
        -->
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.xxx.test</groupId>
        <artifactId>xxx</artifactId>
        <version>1.0-SNAPSHOT</version>
        <inceptionYear>2008</inceptionYear>
        <properties>
            <scala.version>2.11.12</scala.version>
            <kafka.version>0.10.2.0</kafka.version>
            <!--<kafka.version>1.1.0</kafka.version>-->
        </properties>

        <!--
            The former scala-tools.org <repositories>/<pluginRepositories> entries were removed:
            that host is no longer in service and was referenced over plain http, which recent
            Maven releases block by default. Everything this build needs resolves from Maven Central.
        -->

        <dependencies>

            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
            <!-- The artifact suffix is the Scala binary version and must agree with scala.version (2.11.x). -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>${kafka.version}</version>
            </dependency>


            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${kafka.version}</version>
            </dependency>


        </dependencies>

        <build>
            <!-- NOTE(review): only src/main/scala is declared as a source root; confirm that is where the sources live. -->
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <scalaVersion>${scala.version}</scalaVersion>
                        <args>
                            <arg>-target:jvm-1.8</arg>
                        </args>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-eclipse-plugin</artifactId>
                    <configuration>
                        <downloadSources>true</downloadSources>
                        <buildcommands>
                            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
                        </buildcommands>
                        <additionalProjectnatures>
                            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
                        </additionalProjectnatures>
                        <classpathContainers>
                            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
                        </classpathContainers>
                    </configuration>
                </plugin>


                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <appendAssemblyId>false</appendAssemblyId>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <!-- Fully qualified name of the class whose main method is the jar entry point. -->
                                <mainClass></mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <!-- "single" is the goal intended for lifecycle binding; the old "assembly" goal forks the build. -->
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>

        <reporting>
            <plugins>

                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <configuration>
                        <scalaVersion>${scala.version}</scalaVersion>
                    </configuration>
                </plugin>

            </plugins>
        </reporting>
    </project>
    

      

    2、Java代码:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Collections;
    import java.util.Properties;
    
    
    /**
     * Minimal Kafka consumer demo: subscribes to a single topic, polls it once and prints
     * every record that was returned.
     */
    public class MyConsumer {

        private final static String TOPIC = "test_topic";
        private final static String KAFKA_SERVER_URL = "10.31.7.200";
        private final static String KAFKA_SERVER_PORT = "9092";
        public static final String HOST_NAME = KAFKA_SERVER_URL;

        /**
         * Builds a consumer for the broker at {@code KAFKA_SERVER_URL:KAFKA_SERVER_PORT} in the
         * {@code DemoConsumer} group, with auto-commit enabled.
         *
         * <p>NOTE(review): the key type parameter is {@code Integer}, yet keys are decoded with
         * {@code StringDeserializer}. The signature is left unchanged for existing callers;
         * consider migrating to {@code KafkaConsumer<String, String>}.
         *
         * @return a new consumer; the caller is responsible for closing it
         */
        public static KafkaConsumer<Integer, String> getConsumer() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            props.put("host.name", HOST_NAME);

            // Both settings must name Deserializer implementations. Pointing them at a
            // Serializer or a Connect converter class makes the KafkaConsumer constructor fail.
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

            return new KafkaConsumer<Integer, String>(props);
        }

        /**
         * Subscribes to {@code TOPIC}, performs one poll of up to 1000 ms and prints the result.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // try-with-resources closes the consumer even if subscribe/poll throws.
            try (KafkaConsumer<Integer, String> consumer = getConsumer()) {
                consumer.subscribe(Collections.singletonList(TOPIC));

                ConsumerRecords<Integer, String> records = consumer.poll(1000);

                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
                }
            }
        }
    }
    

     3、可能的报错:

      出现下面的超时报错时,需要设置 props.put("host.name", ip);(ip 为 Kafka 服务器的地址)
      org.apache.kafka.common.errors.TimeoutException

    这个报错需要检查 k/v 的反序列化类:它必须是 org.apache.kafka.common.serialization.Deserializer 的实现类,并且要与消息实际写入时的格式一致(下面的报错说明数据不是 4 字节的整数,不能用 IntegerDeserializer 解析)
    Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition event_topic-0 at offset 161529729
    Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

    1、需要设置 value.deserializer 的值
    Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:436)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:63)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:426)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
    at com.fumi.test.MyConsumer2.get(MyConsumer2.java:46)
    at com.fumi.test.MyConsumer2.main(MyConsumer2.java:56)

    2、k/v 的反序列化类配置成了 org.apache.kafka.connect.json.JsonConverter,它不是 org.apache.kafka.common.serialization.Deserializer 的实现类,所以报错了。
    Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
    at com.fumi.test.MyConsumer2.get(MyConsumer2.java:46)
    at com.fumi.test.MyConsumer2.main(MyConsumer2.java:56)
    Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonConverter is not an instance of org.apache.kafka.common.serialization.Deserializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:645)
    ... 4 more


    这个报错的原因也类似:StringSerializer 是序列化类,不是 Deserializer 的实现类,消费者端应使用 StringDeserializer
    Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
    at com.fumi.test.MyConsumer2.get(MyConsumer2.java:46)
    at com.fumi.test.MyConsumer2.main(MyConsumer2.java:56)
    Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637)
    ... 4 more

  • 相关阅读:
    将指定文件夹下所有图片转换成base64并返回数组
    SQL技巧
    yii 进行事务操作是不可以在一条sql里边放多条sql
    yii 直接执行sql
    按照特定方法排序
    表名为变量时的语法
    如何添加 actions
    触发器原理
    codeCeption 调试方法
    最长不下降子序列(LIS)
  • 原文地址:https://www.cnblogs.com/fillPv/p/9258757.html
Copyright © 2011-2022 走看看