zoukankan      html  css  js  c++  java
  • KafkaProducer【java版】

    前置:

    启动zk进程和kafka进程
    zkServer.sh start
    nohup kafka-server-start.sh $KAFKA_HOME/config/server.properties &
    使用tail -F nohup.out查看启动的日志

    1.创建一个topic

    $KAFKA_HOME/bin/kafka-topics.sh -create -zookeeper spark001:2181 -replication-factor 1 -partitions 1 -topic lgoffset
    

    ps:topic 查看
    kafka-topics.sh -list -zookeeper spark001:2181

    2.先使用控制台测试,Kafka是否正常工作
    控制台生产者:

    kafka-console-producer.sh --broker-list spark001:9092 --topic lgoffset
    

    控制台消费者:

    kafka-console-consumer.sh --zookeeper spark001:2181 --topic lgoffset
    

    这个时候,生产者生产数据,消费者能够流出数据就正常。

    保证了这一步,你就可以做很多测试了。比如生产者可以是flume采集的日志,也可以是java 代码。

    KafkaProducerV2 【kafka-0-10_2.11版本】

      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
          <version>2.4.0</version>
      </dependency>
    

    java代码如下:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    /**
     * Description: kafka-0-10_2.11
     *
     * @Author: 留歌36
     * @Date: 2019/5/26 12:08
     */
    public class KafkaProducerV2 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("bootstrap.servers", "192.168.1.103:9092");
    
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            for (int i = 0; i < 100; i++){
                producer.send(
                        new ProducerRecord<String, String>(
                                "test", Integer.toString(i), "留歌------------"+Integer.toString(i)
                        )
                );
            }
            System.out.println("==================留歌生产数据完毕================");
    
        }
    }
    
    
    
    

    运行上面的代码,就能够在 kafka-console-consumer 看见生产的消息了。
    这里简单说明,不同版本的Kafka可能需要的参数是不一样的。

    参照官网:
    https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

    KafkaProducerV1 【kafka-0-8_2.11版本】

        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    

    java代码如下:

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    
    import java.io.Serializable;
    import java.util.Properties;
    import java.util.UUID;
    
    
    /**
     * Description:
     <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
     <version>${spark.version}</version>2.4.0
     </dependency> *
     * @Author: 留歌36
     * @Date: 2019/5/26 12:34
     */
    public class KafkaProducerV1 implements Serializable {
        public static void main(String[] args) {
    
            Properties properties = new Properties();
            properties.put("serializer.class","kafka.serializer.StringEncoder");
            properties.put("metadata.broker.list","192.168.1.103:9092");
            properties.put("request.required.acks","1");
    
    
            ProducerConfig producerConfig = new ProducerConfig(properties);
    
            Producer<String,String> producer = new Producer<String, String>(producerConfig);
    
            String topic = "test";
            for (int i = 0; i < 100; i++) {
                producer.send(new KeyedMessage<String, String>(topic, i+"", "测试数据:" + UUID.randomUUID()));
            }
            System.out.println("==================留歌生产数据完毕V0.8================");
    
        }
    }
    
    

    两个版本的都是ok的

  • 相关阅读:
    WPF 便签项目
    .NET下WPF学习之Socket通信
    DEV控件
    字符串位数补足
    VS2008设置断点不命中
    错误描述: 242000021
    关闭Win10自带的 Windows Defender
    启用与关闭 Ad Hoc Distributed Queries
    Date工具类
    数据字段脱敏
  • 原文地址:https://www.cnblogs.com/liuge36/p/12614743.html
Copyright © 2011-2022 走看看