zoukankan      html  css  js  c++  java
  • Kafka客户端Producer与Consumer

    一、pom.xml

     <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.8.2.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.8.2.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.3</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-resources-plugin</artifactId>
                    <configuration>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <finalName>${project.artifactId}-${project.version}</finalName>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                            <exclude>META-INF/LICENSE*</exclude>
                                            <exclude>META-INF/NOTICE*</exclude>
                                            <exclude>license/*</exclude>
                                            <exclude>LICENSE*</exclude>
                                            <exclude>NOTICE*</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
                                    </transformer>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>com.bigData.DataProducer</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    

    二、相关配置文件

    producer.properties

    #acks=1
    bootstrap.servers=alary001:9092,alary002:9092,alary003:9092
    retries=2
    batch.size=16384
    linger.ms=1
    buffer.memory=33554432
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    

    log4j.properties

    # Output pattern : date [thread] priority category - message   FATAL 0  ERROR 3  WARN 4  INFO 6  DEBUG 7
    log4j.rootLogger=INFO, Console
    
    #Console
    log4j.appender.Console=org.apache.log4j.ConsoleAppender
    log4j.appender.Console.layout=org.apache.log4j.PatternLayout
    log4j.appender.Console.layout.ConversionPattern=%d %-5p [%c{5}] - %m%n
    
    

    base.properties

    topic=Data_Server
    

    三、Producer客户端

    在集群上启动zookeeper

    zkServer.sh start

    查看zookeeper的状态

    zkServer.sh status

    启动kafka集群:

    kafka-server-start.sh config/server.properties &

    创建新的topic

    kafka-topics.sh --create --zookeeper alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0 --replication-factor 3 --partitions 3 --topic Data_Server

    查看topic副本信息

    kafka-topics.sh --describe alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0 --replication-factor 3 --partitions 3 --topic Data_Server

    查看已经创建的topic信息

    kafka-topics.sh --list --zookeeper alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0

    测试生产者发送消息

    bin/kafka-console-producer.sh --broker-list alary001:9092,alary002:9092,alary003:9092 --topic Data_Server

    测试消费者消费消息

    kafka-console-consumer.sh --bootstrap-server alary001:9092,alary002:9092,alary003:9092 --from-beginning --topic Data_Server

    删除topic

    bin/kafka-topics.sh --zookeeper alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0 --delete --topic Data_Server
    需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

    停止Kafka服务

    kafka-server-stop.sh stop

    停止zookeeper集群

    zkServer.sh stop

    package com.zlkj.producer;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;
    
    public class DataTransmission {
        
        private static final Logger logger = LoggerFactory.getLogger(com.zlkj.producer.DataTransmission.class);
    
        public static void main(String[] args) {
            Properties baseConfiguration = new Properties();
            Properties producerConfiguration = new Properties();
            try {
                baseConfiguration.load(com.zlkj.producer.DataTransmission.class.getResourceAsStream("/base.properties"));
                if (args != null && args.length > 0 && StringUtils.isNoneBlank(args[0])) {
                    producerConfiguration.load(new FileInputStream(args[0]));
                } else {
                    producerConfiguration.load(com.zlkj.producer.DataTransmission.class.getResourceAsStream("/producer.properties"));
                }
            } catch (IOException e) {
                logger.error("=================加载配置异常=================");
            }
    
            //发送消息
            Producer producer = new KafkaProducer<String, String>(producerConfiguration);
            for (int i = 1; i <= 10; i++) {
                String value = "value_" + i;
                logger.info("发送的消息: {}", value);
                ProducerRecord<String, String> msg = new ProducerRecord<String, String>(baseConfiguration.getProperty("topic"), value);
                producer.send(msg);
            }
            producer.close();
        }
    }
    
    
  • 相关阅读:
    使用django开发一个web项目初试
    关于github一些比较重要的命令
    删除github repository的方法
    Node、npm与Vue配置与问题记录
    [记忆]5月第二周
    git am patch冲突解决步骤
    git丢弃本地修改的几种方式
    Log4j 2.X 漏洞解决
    Maven依赖范围Scope及传递性依赖
    Mysql创建事件定时任务
  • 原文地址:https://www.cnblogs.com/aixing/p/13327395.html
Copyright © 2011-2022 走看看