  • Apache Kafka Producer For Beginners 2019 (repost)

    Original article: https://data-flair.training/blogs/kafka-producer/

    In our last Kafka tutorial, we discussed the Kafka cluster.

    Today, we will discuss the Kafka Producer with an example.

    We will look at the KafkaProducer API and the older Producer API, and then at the main configuration settings of the Kafka Producer.

    Finally, we will walk through a simple producer application.

    A Kafka Producer is what we use to publish messages to an Apache Kafka topic.
    So, let's explore the Apache Kafka Producer in detail.

    1. What is Kafka Producer?

    A producer is an application that is the source of a data stream.

    It generates tokens or messages and publishes them to one or more topics in the Kafka cluster.

    The Producer API from Kafka helps to pack each message or token and deliver it to the Kafka server.


    The figure below shows how an Apache Kafka Producer works.

    Figure: Kafka Producer – Apache Kafka Producer Working


    The Kafka producer client offers several APIs.

    2. KafkaProducer API

    The KafkaProducer API lets an application publish a stream of records to one or more Kafka topics.

    Its central part is the KafkaProducer class.

    The class connects to a Kafka broker through the configuration passed to its constructor, and offers the following methods:

    • To send messages asynchronously to a topic, the KafkaProducer class provides the send method. A typical call to send() looks like this:
    producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback);
    • ProducerRecord − The record to send. The producer manages a buffer of records waiting to be sent.
    • Callback − A user-supplied callback that is executed when the server has acknowledged the record.

    Note: Passing null as the callback means that no callback is used.

    • To ensure that all previously sent messages have actually completed, the KafkaProducer class provides a flush method. Its signature is:
    public void flush()
    • To get the partition metadata for a given topic, the KafkaProducer class provides the partitionsFor method, which we can use for custom partitioning. Its signature is:
    public List<PartitionInfo> partitionsFor(String topic)
    • To inspect the producer itself, the class provides a metrics method, which returns the map of internal metrics maintained by the producer. Its signature is:
    public Map<MetricName, ? extends Metric> metrics()

    • public void close() − Blocks until all previously sent requests are completed, then closes the producer.
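
The contract described above (send returns immediately, the callback fires on acknowledgement, flush and close block until outstanding records complete) can be sketched with the JDK alone. This is a toy model, not Kafka's implementation: `ToyProducer`, its single background thread, and the fake offsets are all assumptions made for illustration, and no broker is involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Toy stand-in for the send/flush/close contract; NOT Kafka's KafkaProducer.
public class ToyProducer implements AutoCloseable {
    // A single background thread plays the role of the sender.
    private final ExecutorService sender = Executors.newSingleThreadExecutor();
    private final List<Future<?>> inFlight = new ArrayList<>();
    private long nextOffset = 0; // only touched by the sender thread

    // Queues the value and returns immediately. The callback (may be null)
    // runs on the sender thread once the record is "acknowledged".
    public synchronized void send(String value, BiConsumer<Long, Exception> callback) {
        inFlight.add(sender.submit(() -> {
            long offset = nextOffset++; // fake offset assigned by the fake broker
            if (callback != null) {
                callback.accept(offset, null);
            }
        }));
    }

    // Blocks until every record queued so far has completed.
    public synchronized void flush() throws Exception {
        for (Future<?> pending : inFlight) {
            pending.get();
        }
        inFlight.clear();
    }

    // Flushes outstanding records, then stops the sender thread.
    @Override
    public void close() throws Exception {
        flush();
        sender.shutdown();
        sender.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        List<Long> acked = Collections.synchronizedList(new ArrayList<>());
        try (ToyProducer producer = new ToyProducer()) {
            for (int i = 0; i < 3; i++) {
                producer.send("message-" + i, (offset, error) -> acked.add(offset));
            }
            producer.send("no-callback", null); // null means no callback
            producer.flush();
            System.out.println("acknowledged offsets: " + acked); // [0, 1, 2]
        }
    }
}
```

With the real client the same shape applies: the callback passed to send() runs after the broker acknowledges the record, and close() is normally called once at the end of the application.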

    3. Producer API

    The Producer class is the central part of the older Producer API.

    Like KafkaProducer, it connects to the Kafka broker through the configuration passed to its constructor, and offers the following methods.

    a. Kafka Producer Class

    To send messages to a single topic or to multiple topics, the Producer class offers a send method. We can use either of the following signatures.

    public void send(KeyedMessage<K,V> message)

    – sends the data to a single topic, partitioned by key, using either the sync or the async producer.

    public void send(List<KeyedMessage<K,V>> messages)

    – sends data to multiple topics.
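
To make "partitioned by key" concrete, here is a minimal sketch of the idea: hash the key and take the result modulo the number of partitions, so that equal keys always land in the same partition. The class name, the hash function, and the partition count are assumptions for illustration. Kafka clients use their own hash functions (the Java KafkaProducer's default partitioner hashes the serialized key with murmur2), so the partition numbers computed here will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyPartitionSketch {
    // Simplified key-based partitioning: hash the key bytes, then map the
    // hash onto the range [0, numPartitions). Not Kafka's actual hash.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count for the demo
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Both "user-1" lines print the same partition, which is the property that keeps all records for one key in order within a single partition.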

    The producer type is set in the configuration passed to ProducerConfig:

    Properties prop = new Properties();
    prop.put("producer.type", "async");
    ProducerConfig config = new ProducerConfig(prop);

    There are two types of producers: sync and async.
    The same API configuration applies to both.

    The difference between them is that a sync producer sends each message directly, whereas an async producer sends messages in the background. When we want higher throughput, we prefer the async producer.

    However, in earlier releases such as 0.8, the async producer has no callback on send() for registering error handlers. That callback is available only from release 0.9.
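
The difference between the two styles can be sketched without a broker. In the sketch below, `fakeSend` is a hypothetical stand-in for a network send that completes on another thread after a short delay; it is not a Kafka call. The sync style blocks on the result, while the async style registers a handler (which also sees errors) and carries on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class SyncVsAsyncSketch {
    // Hypothetical stand-in for a send: completes on another thread with a
    // fake acknowledgement string after a short delay.
    static CompletableFuture<String> fakeSend(String message) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "ack:" + message;
        });
    }

    public static void main(String[] args) throws Exception {
        // Sync style: block until the acknowledgement arrives.
        String ack = fakeSend("sync-message").get();
        System.out.println(ack);

        // Async style: register a handler and continue without waiting.
        CompletableFuture<String> done = fakeSend("async-message")
            .whenComplete((result, error) ->
                System.out.println(error == null ? result : "failed: " + error));
        System.out.println("main thread keeps working");

        done.get(); // wait only so the demo does not exit before the handler runs
    }
}
```

The sync style pays one round trip per message, which is why the async style is preferred when throughput matters.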

    b. public void close()

    To close the producer pool connections to all Kafka brokers, the Producer class offers a public void close() method.

    4. Configuration Settings For Kafka Producer API

    Here are the main configuration settings of the Kafka Producer API:
    a. client.id
    Identifies the producer application.


    b. producer.type
    Either sync or async. This setting is used by the older Producer API.


    c. acks
    Controls the criteria under which a producer request is considered complete.


    d. retries
    If a producer request fails, the producer automatically retries it up to the specified number of times.


    e. bootstrap.servers
    The list of brokers used to bootstrap the connection to the cluster.


    f. linger.ms
    Setting linger.ms to a value greater than 0 makes the producer wait that long for more records before sending, which reduces the number of requests.


    g. key.serializer
    The class used to serialize record keys; it implements the Serializer interface.


    h. value.serializer
    The class used to serialize record values; it implements the Serializer interface.


    i. batch.size
    The batch size, in bytes.


    j. buffer.memory
    Controls the total amount of memory available to the producer for buffering.
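
As a minimal sketch, the settings above can be collected in a plain `java.util.Properties` object, which is what the producer constructor accepts. The values mirror the ones used in the sample application later in this tutorial; the class name, the helper method, and the `client.id` value are assumptions for illustration. `producer.type` is left out because the sample application does not set it.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Builds the producer configuration as plain string properties.
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("client.id", "demo-producer"); // hypothetical name
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");
        props.setProperty("retries", "0");
        props.setProperty("batch.size", "16384");        // bytes
        props.setProperty("linger.ms", "1");             // milliseconds
        props.setProperty("buffer.memory", "33554432");  // bytes
        props.setProperty("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("localhost:9092");
        // Print each setting so the configuration can be checked at a glance.
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

Keeping the configuration in one helper makes it easy to reuse the same settings across several producers or to override a single value in tests.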

    5. ProducerRecord API

    A ProducerRecord is a key/value pair that is sent to the Kafka cluster. The class has the following constructors.


    public ProducerRecord(String topic, Integer partition, K key, V value)
    Creates a record with a partition, key, and value.

    1. Topic − The name of the topic the record is appended to.
    2. Partition − The partition the record is sent to.
    3. Key − The key that is included in the record.
    4. Value − The record contents.

    public ProducerRecord(String topic, K key, V value)
    Creates a record with a key and value, but without a partition.

    1. Topic − The topic the record is appended to.
    2. Key − The key for the record.
    3. Value − The record contents.

    public ProducerRecord(String topic, V value)
    Creates a record without a partition or key.

    1. Topic − The topic the record is appended to.
    2. Value − The record contents.

    The ProducerRecord class has the following methods −
    1. public String topic()
    The topic the record is appended to.


    2. public K key()
    The key included in the record, or null if the record has no key.


    3. public V value()
    The record contents.


    4. public Integer partition()
    The partition the record is sent to, or null if none was specified.
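
The relationship between the three constructors and the accessor methods can be sketched with a small stand-in class. `SimpleRecord` below is hypothetical and only mirrors the shape described above; it is not Kafka's ProducerRecord. The shorter constructors simply pass null for the parts that are left out.

```java
public class RecordSketch {
    // Hypothetical stand-in mirroring the three constructor shapes.
    public static final class SimpleRecord<K, V> {
        private final String topic;
        private final Integer partition; // null: no partition specified
        private final K key;             // null: record has no key
        private final V value;

        public SimpleRecord(String topic, Integer partition, K key, V value) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        // Key and value, no partition.
        public SimpleRecord(String topic, K key, V value) {
            this(topic, null, key, value);
        }

        // Value only, no partition and no key.
        public SimpleRecord(String topic, V value) {
            this(topic, null, null, value);
        }

        public String topic() { return topic; }
        public Integer partition() { return partition; }
        public K key() { return key; }
        public V value() { return value; }
    }

    public static void main(String[] args) {
        SimpleRecord<String, String> full = new SimpleRecord<>("demo-topic", 0, "k", "v");
        SimpleRecord<String, String> keyed = new SimpleRecord<>("demo-topic", "k", "v");
        SimpleRecord<String, String> bare = new SimpleRecord<>("demo-topic", "v");
        System.out.println(full.partition());  // 0
        System.out.println(keyed.partition()); // null
        System.out.println(bare.key());        // null
    }
}
```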

    6. Simple Kafka Producer Application

    First, start ZooKeeper and the Kafka broker, then create your own topic in the Kafka broker using the create topic command. Then create a Java class named SimpleProducer.java and proceed with the following code:

    // Standard Java properties, used to hold the producer configuration
    import java.util.Properties;
    // Producer interface
    import org.apache.kafka.clients.producer.Producer;
    // KafkaProducer, the implementation used to publish records
    import org.apache.kafka.clients.producer.KafkaProducer;
    // ProducerRecord, the key/value record that is sent
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SimpleProducer {
       public static void main(String[] args) throws Exception {
          // The topic name is required as the first argument
          if (args.length == 0) {
             System.out.println("Enter topic name");
             return;
          }
          String topicName = args[0];
          // Properties object that holds the producer configs
          Properties props = new Properties();
          // Broker address
          props.put("bootstrap.servers", "localhost:9092");
          // Set acknowledgements for producer requests
          props.put("acks", "all");
          // Number of automatic retries if a request fails (0 means no retries)
          props.put("retries", 0);
          // Batch size in bytes
          props.put("batch.size", 16384);
          // Wait up to 1 ms for more records so that fewer requests are sent
          props.put("linger.ms", 1);
          // Total amount of memory available to the producer for buffering
          props.put("buffer.memory", 33554432);
          props.put("key.serializer",
             "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer",
             "org.apache.kafka.common.serialization.StringSerializer");
          Producer<String, String> producer =
             new KafkaProducer<String, String>(props);
          // Send the numbers 0 to 9, each as both key and value
          for (int i = 0; i < 10; i++) {
             producer.send(new ProducerRecord<String, String>(topicName,
                Integer.toString(i), Integer.toString(i)));
          }
          System.out.println("Message sent successfully");
          // close() blocks until previously sent requests are completed
          producer.close();
       }
    }
    

      

    a. Compilation

    We can compile the application with the following command.

    javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" SimpleProducer.java

     

    b. Execution

    We can then execute the application with the following command.

    java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>
     

    c. Output

    Message sent successfully
    To check the output, open a new terminal and run the consumer CLI command to receive the messages.

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic-name> --from-beginning

    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    So, this was all about Apache Kafka Producer. Hope you like our explanation.

    7. Summary: Kafka Producer

    Hence, in this Kafka tutorial, we have seen the concept of the Kafka Producer along with an example.

    We covered the KafkaProducer API, the Producer API with its Producer class, and the close method. We also discussed the configuration settings for the Kafka Producer API and the ProducerRecord API.

    Finally, we compiled and executed the SimpleProducer application and checked its output.

    In the next tutorial, we will learn about the Kafka Consumer, which consumes messages from the Kafka cluster.

    If you have any doubts, feel free to ask in the comment section.

  • Original address: https://www.cnblogs.com/panpanwelcome/p/13534349.html