zoukankan      html  css  js  c++  java
  • kafka学习总结007 --- 生产者Java API实例

    事先说明,本文的所有实例均基于kafka2.5.0开发;依赖的jar包

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.5.0</version>
            </dependency>

    创建topic的方法

      private static final String BOOTSTRAP_SERVER = "192.168.1.8:9091,192.168.1.8:9092,192.168.1.8:9093";
      public static void createTopic(String topicName) {
            Properties properties = new Properties();
            properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
            AdminClient adminClient = KafkaAdminClient.create(properties);
            NewTopic newTopic = new NewTopic(topicName,2, (short) 3);
            adminClient.createTopics(Collections.singletonList(newTopic));
            adminClient.close();
        }

    执行后创建topic成功:

    kafka生产消息有同步和异步两种方式:

    1. 创建生产者的方法

        public static KafkaProducer<String, String> createProducer() {
            Properties properties = new Properties();
            properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<>(properties);
        }

    2. 同步生产消息

    public class MySyncProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    KafkaProducer<String, String> producer = KafkaTestUtil.createProducer();
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaTestContants.SECOND_TOPIC, "first msg");
    RecordMetadata record = producer.send(producerRecord).get();
    System.out.println("Producer msg: partition=" + record.partition() + ", offset=" + record.offset());
    producer.close();
    }
    }

    3. 异步生产消息

    public class MyAsyncProducer {
        public static void main(String[] args) {
            KafkaProducer<String, String> producer = KafkaTestUtil.createProducer();
            for (int i = 0; i < 4; i++) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaTestContants.SECOND_TOPIC, "0617 msg" + i);
                producer.send(producerRecord, (RecordMetadata metadata, Exception exception) -> {
                    if (null != exception) {
                        exception.printStackTrace();
                    }
    
                    if (null != metadata) {
                        System.out.println("Producer msg: partition=" + metadata.partition() + ", offset=" + metadata.offset());
                    }
                });
            }
            producer.close();
        }
    }

    相关命令:

    查看topic详细信息:
    bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic java-api-0617-topic --describe

    查看kafka log数据:
    bin/kafka-run-class.sh kafka.tools.DumpLogSegments -print-data-log -files kafka-logs/java-api-0617-topic-0/00000000000000000000.log

    查看某个消费组消费情况:
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.8:9091,192.168.1.8:9092,192.168.1.8:9093 --group group1 --describe

  • 相关阅读:
    xml传数据
    简单实用的GroupBox控件
    漂亮的NavMenu导航控件
    使用设计模式构建通用数据库访问类
    Windows路由表详解
    zz Linux Shell常用技巧(目录)
    Ubuntu Linux 环境变量PATH设置
    zz eclipse.ini内存设置
    find 用法
    zz【java规范】Java spi机制浅谈
  • 原文地址:https://www.cnblogs.com/sniffs/p/13149527.html
Copyright © 2011-2022 走看看