zoukankan      html  css  js  c++  java
  • kafka集群配置和java编写生产者消费者操作例子

    kafka

    kafka的操作相对来说简单很多

    安装

    下载kafka http://kafka.apache.org/downloads
    
    tar -zxvf kafka_2.12-2.1.0.tgz
    rm kafka_2.12-2.1.0.tgz
    mv kafka_2.12-2.1.0 kafka
    
    sudo vim /etc/profile
        export KAFKA_HOME=/usr/local/kafka
        export PATH=$PATH:$KAFKA_HOME/bin
    
    source /etc/profile
    
    准备 worker1 worker2 worker3 这四台机器
    
    首先确保你的zookeeper集群能够正常运行worker1 worker2 worker3为zk集群
    具体配置参照我的博客https://www.cnblogs.com/ye-hcj/p/9889585.html
    

    修改配置文件

    1. server.properties

      sudo vim server.properties
      添加如下属性
      broker.id=0 # 3台机器分别设置成0 1 2
      log.dirs=/usr/local/kafka/logs
      zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
      
    2. 运行

      运行 
          bin/kafka-server-start.sh config/server.properties
      创建topic 
          bin/kafka-topics.sh --create --zookeeper worker1:2181 --replication-factor 2 --partitions 2 --topic test
      查看topic
          bin/kafka-topics.sh --list --zookeeper worker1:2181
      订阅topic,利用worker2来订阅
          bin/kafka-console-consumer.sh --bootstrap-server worker1:9092 --topic test --from-beginning
      topic发送消息
          bin/kafka-console-producer.sh --broker-list worker1:9092 --topic test
          键入任何消息,worker2都能接收到
      查看topic详情
          bin/kafka-topics.sh --describe --zookeeper worker1:2181 --topic test
      

    java操作kafka

    1. 依赖

      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.12</artifactId>
          <version>2.1.0</version>
      </dependency>
      
    2. 生产者

      public class Producer 
      {
          public static void main( String[] args ){
              Properties props = new Properties();
              // 服务器ip
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092,worker2:9092,worker3:9092");
              // 属性键值对都序列化成字符串
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      
              // 创建一个生产者,向test主题发送数据
              KafkaProducer<String, String> producer = new KafkaProducer<>(props);
              producer.send(new ProducerRecord<String, String>("test", "生产者传递的消息"));
              producer.close();
          }
      }
      
    3. 消费者

      public class Consumer 
      {
          public static void main( String[] args ){
              Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092,worker2:9092,worker3:9092");
              props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
              props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
              props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      
              // 消费者对象
              KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
              kafkaConsumer.subscribe(Arrays.asList("test"));
              while (true) {
                  ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.between(
                  LocalDateTime.parse("2019-01-09T11:30:30"), LocalDateTime.now()));
                  for (ConsumerRecord<String, String> record : records) {
                      System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                      System.out.println();
                  }
              }
          }
      }
      
  • 相关阅读:
    Django在新浪SAE中使用storage服务实现文件上传保存
    安装mysql5.1.30时mysql_install_db出现FATAL ERROR: Could not find mysqld错误解决
    Ubuntu下的负载均衡Web集群配置
    ERROR 2002 (HY000): Can't connect to local MySQL server through so...
    Ubuntu下的C/C++环境搭建
    php linux sphinx 安装
    sphinx安装步骤
    Ubuntu下vmwaretools安装
    LAMP全新安装 Linux+Apache+MySQL+PHP+phpMyadmin+Zend
    ubuntu 彻底删除MySQL数据库实操
  • 原文地址:https://www.cnblogs.com/ye-hcj/p/10260954.html
Copyright © 2011-2022 走看看