zoukankan      html  css  js  c++  java
  • Atitit Kafka 使用总结 内容 Kafka2.0 50M1 启动 要启动zookeeper 先,比ativemp麻烦很多啊1 Kafka生产者 1 Kafka消费者2 2

    Atitit Kafka 使用总结

     

    内容

    Kafka2.0   50M1

    启动 要启动zookeeper 先,比ativemp麻烦很多啊1

    Kafka生产者 1

     Kafka消费者2

      2

     

     

     

    1. Kafka2.0   50M

     

    1. 启动 要启动zookeeper 先,比ativemp麻烦很多啊

     

     

    zookeeper-server-start.bat D:\kafka_2.11-2.1.0\config\zookeeper.properties

    //  D:\kafka_2.11-2.1.0\bin\windows\kafka-server-start.bat D:\kafka_2.11-2.1.0\config\server.properties

     

    D:\kafka_2.11-2.1.0\bin\windows\kafka-server-start.bat D:\kafka_2.11-2.1.0\config\server.properties

    //\\  kafka-server-start.bat D:\kafka_2.11-2.1.0\config\server.properties

     

     

    1. Kafka生产者

     

    /**

     * Kafka生产者

     */

    public class KafkaProducerDemo extends Thread{

     

    public static void main(String[] args) throws InterruptedException, ExecutionException {

     

            

            Properties props = new Properties();

            props.put("bootstrap.servers", "127.0.0.1:9092");

            props.put("acks", "all");

            props.put("retries", 0);

            props.put("batch.size", 16384);

            props.put("linger.ms", 1);

            props.put("buffer.memory", 33554432);

            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            

            String message = "message_" ;

            String topic="hello_topic";

            Producer  producer =  new KafkaProducer(props);

            //Producer   producer =new KafkaProducer(props);

             //new Producer (new ProducerConfig(properties));

            Future<RecordMetadata> Future_RecordMetadata= producer.send(new ProducerRecord<String, String>(topic, "val332222"));

                System.out.println("Sent: " + message);

                System.out.println(Future_RecordMetadata.get());

     

    }

     

     

     Kafka消费者

      

    /**

     * Kafka消费者

     */

    public class KafkaConsumerCls  {

     

    public static void main(String[] args) {

    Properties props = new Properties();

     

      props.put("bootstrap.servers", "127.0.0.1:9092");

      props.put("group.id", "test");

        props.put("key.deserializer",  StringDeserializer.class);

        props.put("value.deserializer",  StringDeserializer.class);

     

            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

            consumer.subscribe(Arrays.asList("hello_topic"));

     

            while (true) {

                ConsumerRecords<String, String> records = consumer.poll(100);

                for (ConsumerRecord<String, String> record : records) {

                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

                }

            }

    }

     

    kafka_2.12-1.1.0 生产与消费java实现示例 - cctext - 博客园

     

     

     

  • 相关阅读:
    科技部:中国131家独角兽企业 名单文字版
    Application_Start事件中用Timer做一个循环任务
    HttpRuntime.Cache再学习
    USB 3.0规范中译本 第10章 集线器,主机下行口以及设备上行口规范
    Vue.js 入门教程
    用python爬了自己的微信,原来好友都是这样的!
    小白到大神,Python 密集知识点汇总
    如何处理JS,css与smarty标签的冲突
    全新 Kali Linux 系统安装指南
    xshell连接centos与ubuntu
  • 原文地址:https://www.cnblogs.com/attilax/p/15197428.html
Copyright © 2011-2022 走看看