zoukankan      html  css  js  c++  java
  • Apache Kafka Producer For Beginners

    在我们上一篇Kafka教程中,我们讨论了Kafka Cluster今天,我们将通过示例讨论Kafka Producer。此外,我们将看到KafkaProducer API和Producer API。此外,我们将学习Kafka Producer中的配置设置。最后,我们将在Kafka Producer教程中讨论简单的生产者应用程序。为了将消息发布到Apache Kafka主题,我们使用Kafka Producer。 
    那么,让我们详细探讨Apache Kafka Producer。

    Apache Kafka制片人

    卡夫卡初学者制片人

    1.什么是卡夫卡制片人?

    基本上,作为数据流源的应用程序就是我们所说的生产者。为了生成令牌或消息并进一步将其发布到Kafka集群中的一个或多个主题,我们使用Apache Kafka Producer。此外,Kafka的Producer API有助于打包消息或令牌并将其传递给Kafka Server。
    了解如何创建Kafka客户端
    此外,下图显示了Apache Kafka Producer的工作情况。

    Kafka Producer  -  Apache Kafka Producer Working

    Kafka Producer - Apache Kafka Producer Working

    Kafka Producer Client中提供了一些API。

    2. KafkaProducer API

    但是,要将记录流发布到一个或多个Kafka主题,此Kafka Producer API允许应用程序。而且,它的核心部分是KafkaProducer类。基本上,通过以下方法,此类提供了一个在其构造函数中连接Kafka代理的选项:

    • 为了将消息异步发送到主题,KafkaProducer类提供了send方法。所以,send()签名是:
    • producer.send(new ProducerRecord<byte[],byte[]>(topic,
      partition, key1, value1) , callback);
    • ProducerRecord - 通常,生产者管理等待发送的记录的缓冲区。
    • 回调- 当服务器确认记录时,用户提供的回调执行。

    注意:这里,null表示没有回调。
    我们来讨论Kafka-序列化和反序列化

    • 此外,为确保所有先前发送的消息已实际完成,KafkaProducer类提供了flush方法。所以,flush方法语法是 -
    1. public void flush()
    •  此外,为了获取给定主题的分区元数据,KafkaProducer类为方法提供分区。而且,我们可以将它用于自定义分区。所以,这种方法签名是:
    1. public Map metrics()

    这样,此方法返回生产者维护的内部指标的映射。

    • public void close()- 它还提供了一个close方法块,直到完成所有先前发送的请求。

    3.生产者API

    Producer类是Kafka Producer API的核心部分。通过以下方法,它提供了一个在其构造函数中连接Kafka代理的选项。

    一个。卡夫卡 制片人班

    基本上,要将消息发送到单个或多个主题,生产者类提供了一种发送方法。以下是我们可以使用签名

    1. public void send(KeyedMessaget<k,v> message)

     -将数据发送到单个主题, 使用同步或异步生成器按键进行分区。

    1. public void send(List<KeyedMessage<k,v>>messages)

    - 将数据发送到多个主题。
    使用JMeter查看Apache Kafka-Load测试

    Properties prop = new Properties();
    prop.put(producer.type,”async”)
    ProducerConfig config = new ProducerConfig(prop);
     

    但是,有两种类型的生成器,例如SyncAsync
    虽然,对于Sync生产者,适用相同的API配置。两者之间只有一个区别:Sync生成器直接发送消息但在后台发送消息,而当我们想要更高的吞吐量时,我们更喜欢Async生成器。但是,Async生成器没有send()的回调函数来注册先前版本中的错误处理程序,如0.8。它仅在当前版本的0.9中可用。

    湾 Public Void Close()

    为了关闭与所有Kafka代理的生成器池连接,producer类提供了一个public void close()方法。
    阅读Kafka用例和应用程序

    4. Kafka Producer API的配置设置

    在这里,我们列出了Kafka Producer API的主要配置设置:
    a。 client.id
    它标识生产者应用程序。
    湾 producer.type
    同步或异步。
    C。 acks
    基本上,它控制被认为完成的生产者请求的标准。
    d。 重试
    “重试”意味着如果生产者请求以某种方式失败,则自动重试特定值。
    即 bootstrap.servers
    它引导代理列表。
    F。 linger.ms
    基本上,如果我们想减少请求的数量,我们可以将linger.ms设置为大于某个值的值。
    G。 key.serializer
    它是串行器接口的关键。
    H。 value.serializer
    序列化程序接口的值。
    一世。 batch.size
    简单地说,缓冲区大小。
    学家 buffer.memory
    “buffer.memory”控制生产者可用于缓冲的总内存量。

    5. ProducerRecord API

    通过使用以下签名,它是发送到Kafka群集的键/值对。ProducerRecord类构造函数用于创建包含分区,键和值对的记录。
    public ProducerRecord(string topic,int partition,k key,v value)

    1. 主题 - 将附加到记录的用户定义主题名称。
    2. 分区 - 分区计数。
    3. 密钥 - 将包含在记录中的密钥。
    4. 价值 - 记录内容。

    public ProducerRecord(string topic,k key,v value)
    要使用键,值对和没有分区创建记录,我们使用ProducerRecord类构造函数。

    1. 主题 - 创建分配记录的主题。
    2. 密钥 - 记录的关键。
    3. 价值 - 记录内容。

    public ProducerRecord(string topic,v value)
    此外,没有分区和键,ProducerRecord类会创建一个记录。

    1. 主题 - 创建主题。
    2. 价值 - 记录内容。

    现在,我们在这里列出ProducerRecord类方法 - 
    1. public string topic()
    主题将附加到记录中。
    2. public K key()
    将包含在记录中的密钥。如果没有这样的键,则返回null。
    3. public V value()
    记录内容。
    4. partition()
    记录的分区计数。

    6.简单的卡夫卡制片人应用程序

    但是,请确保首先启动ZooKeeper和Kafka代理然后使用create topic命令在Kafka代理中创建您自己的主题。然后创建一个名为Sim-pleProducer.java Java类,并继续进行以下编码

    //import util.properties packages
    import java.util.Properties;
    //import simple producer packages
    import org.apache.kafka.clients.producer.Producer;
    //import KafkaProducer packages
    import org.apache.kafka.clients.producer.KafkaProducer;
    //import ProducerRecord packages
    import org.apache.kafka.clients.producer.ProducerRecord;
    //Create java class named “SimpleProducer”
    public class SimpleProducer {
      public static void main(String[] args) throws Exception{
         // Check arguments length value
         if(args.length == 0){
            System.out.println("Enter topic name”);
            return;
         }
         //Assign topicName to string variable
         String topicName = args[0].toString();
         // create instance for properties to access producer configs
         Properties props = new Properties();
         //Assign localhost id
         props.put("bootstrap.servers", “localhost:9092");
         //Set acknowledgements for producer requests.
         props.put("acks", “all");
         //If the request fails, the producer can automatically retry,
         props.put("retries", 0);
         //Specify buffer size in config
         props.put("batch.size", 16384);
         //Reduce the no of requests less than 0
         props.put("linger.ms", 1);
         //The buffer.memory controls the total amount of memory available to the producer for buffering.
         props.put("buffer.memory", 33554432);
         props.put("key.serializer",
            "org.apache.kafka.common.serializa-tion.StringSerializer");
         props.put("value.serializer",
            "org.apache.kafka.common.serializa-tion.StringSerializer");
         Producer<String, String> producer = new KafkaProducer
            <String, String>(props);
         for(int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>(topicName,
               Integer.toString(i), Integer.toString(i)));
                  System.out.println(“Message sent successfully”);
                  producer.close();
      }
    }

    一个。汇编

    通过使用以下命令,我们可以编译应用程序。
    我们来修改Kafka Commands

    //import util.properties packages
    import java.util.Properties;
    //import simple producer packages
    import org.apache.kafka.clients.producer.Producer;
    //import KafkaProducer packages
    import org.apache.kafka.clients.producer.KafkaProducer;
    //import ProducerRecord packages
    import org.apache.kafka.clients.producer.ProducerRecord;
    //Create java class named “SimpleProducer”
    public class SimpleProducer {
      public static void main(String[] args) throws Exception{
         // Check arguments length value
         if(args.length == 0){
            System.out.println("Enter topic name”);
            return;
         }
         //Assign topicName to string variable
         String topicName = args[0].toString();
         // create instance for properties to access producer configs
         Properties props = new Properties();
         //Assign localhost id
         props.put("bootstrap.servers", “localhost:9092");
         //Set acknowledgements for producer requests.
         props.put("acks", “all");
         //If the request fails, the producer can automatically retry,
         props.put("retries", 0);
         //Specify buffer size in config
         props.put("batch.size", 16384);
         //Reduce the no of requests less than 0
         props.put("linger.ms", 1);
         //The buffer.memory controls the total amount of memory available to the producer for buffering.
         props.put("buffer.memory", 33554432);
         props.put("key.serializer",
            "org.apache.kafka.common.serializa-tion.StringSerializer");
         props.put("value.serializer",
            "org.apache.kafka.common.serializa-tion.StringSerializer");
         Producer<String, String> producer = new KafkaProducer
            <String, String>(props);
         for(int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>(topicName,
               Integer.toString(i), Integer.toString(i)));
                  System.out.println(“Message sent successfully”);
                  producer.close();
      }
    }

    湾 执行

    此外,使用以下命令,我们可以执行该应用程序。

    1. java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

    C。产量

    消息已成功发送
    要检查上述输出,请打开新终端并键入Consumer CLI命令以接收消息。

    1. >> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning

    所以,这就是Apache Kafka Producer。希望你喜欢我们的解释。
    阅读Apache Kafka Workflow | Kafka Pub-Sub Messaging

    7.总结:卡夫卡制片人

    因此,在这个Kafka教程中,我们已经看到了Kafka Producer 的概念以及示例。现在,在下一个教程中,我们将了解Kafka Consumer,以便使用来自Kafka集群的消息。此外,我们学习了Producer API,Producer类,public void close。此外,我们还讨论了Kafka Producer API和Producer Record API的配置设置。最后,我们在编译,执行和输出的帮助下看到了SimpleProducer Application。此外,如果您有任何疑问,请随时在评论部分询问。 

  • 相关阅读:
    net.sf.jsqlparser.statement.select.PlainSelect.getGroupByColumnReferences()Ljava/util/List(版本问题)
    Netty ByteBuf
    Vertx session 使用须知
    用Vert.x shiro jdbcRealm对restful api鉴权
    Vert.x发送 HTTP/HTTPS请求及重定向
    解决“hao123”劫持浏览器主页
    cannot find module bcrypt_lib.node
    nodejs运行项目报错TypeError: db.collection is not a function
    [Java] Stream flatMap
    [Spring Security] Authotization
  • 原文地址:https://www.cnblogs.com/a00ium/p/10849983.html
Copyright © 2011-2022 走看看