zoukankan      html  css  js  c++  java
  • kafka急速入门与核心API解析

    kafka环境安装

    上一节课我们已经对kafka的基本概念、核心思想有了一定的了解和认知,并且掌握了kafka在实际工作中的一些主要的应用场景。那么接下来,我们就一起快速进入kafka的安装吧。

    • kafka下载地址:http://kafka.apache.org/downloads.html

    • kafka安装环境介绍:

      节点名称节点作用节点备注
      hostname:192.168.11.111 zookeeper节点 kafka注册、配置中心
      hostname:192.168.11.112 zookeeper节点 kafka注册、配置中心
      hostname:192.168.11.113 zookeeper节点 kafka注册、配置中心
      hostname:192.168.11.51 kafka节点 此节点为kafka broker
    • kafka安装步骤:首先kafka安装需要依赖与zookeeper,所以小伙伴们先准备好zookeeper环境(三个节点即可),然后我们来一起构建kafka broker。

      ## 解压命令:
      tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
      ## 改名命令:
      mv kafka_2.12-2.1.0/ kafka_2.12
      ## 进入解压后的目录,修改server.properties文件:
      vim /usr/local/kafka_2.12/config/server.properties
      ## 修改配置:
      broker.id=0
      port=9092
      host.name=192.168.11.51
      advertised.host.name=192.168.11.51
      log.dirs=/usr/local/kafka_2.12/kafka-logs
      num.partitions=2
      zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
      
      ## 建立日志文件夹:
      mkdir /usr/local/kafka_2.12/kafka-logs
      
      ##启动kafka:
      /usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
      

    kafka常用命令


    我们接下来一起了解几个非常重要的命令,通过这些命令我们对kafka topic partition 进行查看和操作。

    • 常用命令:

      ## 简单操作:
      #(1)创建topic主题命令:(创建名为test的topic, 1个分区分别存放数据,数据备份总共1份)
      kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic topic1 --partitions 1  --replication-factor 1  
      ## --zookeeper 为zookeeper服务列表地址配置项,这里任意指定zookeeper其中一个服务列表地址即可
      ## --create 命令后 --topic 为创建topic 并指定 topic name
      ## --partitions 为指定分区数量配置项
      ## --replication-factor 为指定副本集数量配置项
      
      #(2)查看topic列表命令:
      kafka-topics.sh --zookeeper 192.168.11.111:2181 --list
      
      #(3)kafka命令发送数据:(然后我们就可以编写数据发送出去了)
      kafka-console-producer.sh --broker-list 192.168.11.51:9092 --topic topic1
      ## --brokerlist kafka服务的broker节点列表
      
      #(4)kafka命令接受数据:(然后我们就可以看到消费的信息了)
      kafka-console-consumer.sh --bootstrap-server 192.168.11.51:9092 --topic topic1 --from-beginning
      
      
      #(5)删除topic命令:
      kafka-topics.sh --zookeeper 192.168.11.111:2181 --delete --topic topic1
      
      #(6)kafka查看消费进度:(当我们需要查看一个消费者组的消费进度时,则使用下面的命令)
      kafka-consumer-groups.sh --bootstrap-server 192.168.11.51:9092 --describe --group group1
      ## --describe --group 为订阅组, 后面指定 group name
      

    急速入门


    下面我们一起使用kafka最基本的API来对kafka进行操作!

    • kafka依赖包:

      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.12</artifactId>
      </dependency> 
      
    • kafka生产者:

      package com.bfxy.mix.kafka;
      
      import java.util.Properties;
      import org.apache.kafka.clients.producer.Callback;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import com.alibaba.fastjson.JSON;
      
      public class CollectKafkaProducer {
      
          // 创建一个kafka生产者
      	private final KafkaProducer<String, String> producer;
      	// 定义一个成员变量为topic
      	private final String topic;
          
           // 初始化kafka的配置文件和实例:Properties & KafkaProducer
      	public CollectKafkaProducer(String topic) { 
      		Properties props = new Properties(); 
               // 配置broker地址
      		props.put("bootstrap.servers", "192.168.11.51:9092"); 
               // 定义一个 client.id
      		props.put("client.id", "demo-producer-test"); 
      		
               // 其他配置项:
              
      //		props.put("batch.size", 16384);			//16KB -> 满足16KB发送批量消息
      //		props.put("linger.ms", 10); 			//10ms -> 满足10ms时间间隔发送批量消息
      //		props.put("buffer.memory", 33554432);	 //32M -> 缓存提性能
      		
               // kafka 序列化配置:
      		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
      		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
               
               // 创建 KafkaProducer 与 接收 topic
      		this.producer = new KafkaProducer<>(props);
      		this.topic = topic; 
      	}
      
          // 发送消息 (同步或者异步)
      	public void send(Object message, boolean syncSend) throws InterruptedException { 
      		try { 
                   // 同步发送
      			if(syncSend) {
      				producer.send(new ProducerRecord<>(topic, JSON.toJSONString(message)));		
      			} 
                   // 异步发送(callback实现回调监听)
                   else {
      				producer.send(new ProducerRecord<>(topic, 
                                    JSON.toJSONString(message)), 
                                    new Callback() {
      					@Override
      					public void onCompletion(RecordMetadata recordMetadata, Exception e) {
      	                    if (e != null) {
      	                        System.err.println("Unable to write to Kafka in CollectKafkaProducer [" + topic + "] exception: " + e);
      	                    }
      					}
      				});				
      			}
      		} catch (Exception e) {
      			e.printStackTrace(); 
      		} 
      	} 
      	
          // 关闭producer
      	public void close() {
      		producer.close();
      	}
      
          // 测试函数
      	public static void main(String[] args) throws InterruptedException {
      		String topic = "topic1";
      		CollectKafkaProducer collectKafkaProducer = new CollectKafkaProducer(topic);
      
      		for(int i = 0 ; i < 10; i ++) {
      			User user = new User();
      			user.setId(i+"");
      			user.setName("张三");
      			collectKafkaProducer.send(user, true);
      		}
              
      		Thread.sleep(Integer.MAX_VALUE);
      	
      	}
      	
      }
      
      
    • kafka消费者:

      package com.bfxy.mix.kafka;
      
      import java.util.ArrayList;
      import java.util.Collection;
      import java.util.Collections;
      import java.util.List;
      import java.util.Properties;
      
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.consumer.OffsetAndMetadata;
      import org.apache.kafka.common.TopicPartition;
      
      import kafka.consumer.Consumer;
      import lombok.extern.slf4j.Slf4j;
      
      @Slf4j
      public class CollectKafkaConsumer {
      	
          // 定义消费者实例
      	private final KafkaConsumer<String, String> consumer;
      	// 定义消费主题
      	private final String topic;
      
      
           // 消费者初始化
      	public CollectKafkaConsumer(String topic) { 
      		Properties props = new Properties();
               // 消费者的zookeeper 地址配置
      		props.put("zookeeper.connect", "192.168.11.111:2181"); 
               // 消费者的broker 地址配置
      		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.11.51:9092");
      		// 消费者组定义
               props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group-id"); 
               // 是否自动提交(auto commit,一般生产环境均设置为false,则为手工确认)
      		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
               // 自动提交配置项
      //		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
               // 消费进度(位置 offset)重要设置: latest,earliest 
      		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
               // 超时时间配置
      		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); 
      		// kafka序列化配置
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 
      		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 
              
               // 创建consumer对象 & 赋值topic
      		consumer = new KafkaConsumer<>(props); 
      		this.topic = topic; 
               // 订阅消费主题
      		consumer.subscribe(Collections.singletonList(topic));
      		
      	} 
      	
      	// 循环拉取消息并进行消费,手工ACK方式
          private void receive(KafkaConsumer<String, String> consumer) {
              while (true) {
                  // 	拉取结果集(拉取超时时间为1秒)
              	ConsumerRecords<String, String> records = consumer.poll(1000);
                  //  拉取结果集后获取具体消息的主题名称 分区位置 消息数量
                  for (TopicPartition partition : records.partitions()) {
                      List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                      String topic = partition.topic();
                      int size = partitionRecords.size();
                      log.info("获取topic:{},分区位置:{},消息数为:{}", topic, partition.partition(), size);
                  	// 分别对每个partition进行处理
                      for (int i = 0; i< size; i++) {
                      	System.err.println("-----> value: " + partitionRecords.get(i).value());
                          long offset = partitionRecords.get(i).offset() + 1;
                      	// consumer.commitSync(); // 这种提交会自动获取partition 和 offset 
                           // 这种是显示提交partition 和 offset 进度
      				   consumer.commitSync(Collections.singletonMap(partition, 
                                                                  new OffsetAndMetadata(offset)));
                          log.info("同步成功, topic: {}, 提交的 offset: {} ", topic, offset);
                      }
      
                  }
              }
          }
      	
          // 测试函数
      	public static void main(String[] args) {
      		String topic = "topic1";
      		CollectKafkaConsumer collectKafkaConsumer = new CollectKafkaConsumer(topic);
      		collectKafkaConsumer.receive(collectKafkaConsumer.consumer);
      	}
      }
      
      
     
  • 相关阅读:
    Dll 入口函数
    .net中Global.asax
    jquery图片翻转
    c#发送邮件
    jquery ajax里面的datetype设成json时,提交不了数据的问题
    今儿写前台tab效果时用的,(jquery)
    C# 的一些常用日期时间函数(老用不熟)
    下午的表单注册~~~
    CSS图片、文字垂直居中对齐
    asp.net 用继承方法实现页面判断session
  • 原文地址:https://www.cnblogs.com/wjx6270/p/13426080.html
Copyright © 2011-2022 走看看