zoukankan      html  css  js  c++  java
  • Atitit Kafka 使用总结 内容 Kafka2.0 50M1 启动 要启动zookeeper 先,比ativemp麻烦很多啊1 Kafka生产者 1 Kafka消费者2 2

    Atitit Kafka 使用总结

     

    内容

    Kafka2.0   50M1

    启动 要启动zookeeper 先,比ativemp麻烦很多啊1

    Kafka生产者 1

     Kafka消费者2

      2

     

     

     

    1. Kafka2.0   50M

     

    1. 启动 要启动zookeeper 先,比ativemp麻烦很多啊

     

     

    zookeeper-server-start.bat D:\kafka_2.11-2.1.0\config\zookeeper.properties

    //  D:\kafka_2.11-2.1.0\bin\windows\kafka-server-start.bat D:\kafka_2.11-2.1.0\config\server.properties

     

    D:\kafka_2.11-2.1.0\bin\windows\kafka-server-start.bat D:\kafka_2.11-2.1.0\config\server.properties

    //\\  kafka-server-start.bat D:\kafka_2.11-2.1.0\config\server.properties

     

     

    1. Kafka生产者

     

    /**

     * Kafka生产者

     */

    public class KafkaProducerDemo extends Thread{

     

    public static void main(String[] args) throws InterruptedException, ExecutionException {

     

            

            Properties props = new Properties();

            props.put("bootstrap.servers", "127.0.0.1:9092");

            props.put("acks", "all");

            props.put("retries", 0);

            props.put("batch.size", 16384);

            props.put("linger.ms", 1);

            props.put("buffer.memory", 33554432);

            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            

            String message = "message_" ;

            String topic="hello_topic";

            Producer  producer =  new KafkaProducer(props);

            //Producer   producer =new KafkaProducer(props);

             //new Producer (new ProducerConfig(properties));

            Future<RecordMetadata> Future_RecordMetadata= producer.send(new ProducerRecord<String, String>(topic, "val332222"));

                System.out.println("Sent: " + message);

                System.out.println(Future_RecordMetadata.get());

     

    }

     

     

     Kafka消费者

      

    /**

     * Kafka消费者

     */

    public class KafkaConsumerCls  {

     

    public static void main(String[] args) {

    Properties props = new Properties();

     

      props.put("bootstrap.servers", "127.0.0.1:9092");

      props.put("group.id", "test");

        props.put("key.deserializer",  StringDeserializer.class);

        props.put("value.deserializer",  StringDeserializer.class);

     

            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

            consumer.subscribe(Arrays.asList("hello_topic"));

     

            while (true) {

                ConsumerRecords<String, String> records = consumer.poll(100);

                for (ConsumerRecord<String, String> record : records) {

                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

                }

            }

    }

     

    kafka_2.12-1.1.0 生产与消费java实现示例 - cctext - 博客园

     

     

     

  • 相关阅读:
    vue打包传递参数配置域名
    相同域名nginx下部署两个vue项目
    vue项目改造服务端渲染
    vue项目使用less全局变量
    postMessage跨域实现localstorage跨域共享
    node_webkit打包成桌面应用程序
    vue项目本地服务器设置既能localhost访问又能手机ip访问
    GATT scan的流程
    Windows下面的常用的快捷键
    把驱动编译进内核和编译成模块
  • 原文地址:https://www.cnblogs.com/attilax/p/15197428.html
Copyright © 2011-2022 走看看