zoukankan      html  css  js  c++  java
  • kafka学习笔记


    一,kafka概述

       Kafka是一个高吞吐量的、持久性的、分布式发布/订阅消息系统。

       它主要用于处理活跃的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据)。

      在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。
      Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。
      Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker--服务器节点。
      无论是kafka集群,还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性

       三大特点:

      高吞吐量:可以满足每秒百万级别消息的生产和消费——生产消费。需要硬件支撑
      持久性:有一套完善的消息存储机制,确保数据的高效安全的持久化——中间存储。
      分布式:基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体健壮性。

    二,kafka核心组件

      Topic: Kafka处理的消息的不同分类。  

         Broker:消息代理,Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据。存在硬盘中。每个topic都是有分区的。  

      Partition:Topic物理上的分组,一个topic在broker中被分为1个或者多个partition,分区在创建topic的时候指定。 

      Message:消息,是通信的基本单位,每个消息都属于一个partition> Kafka服务相关  

      Producer:消息和数据的生产者,向Kafka的一个topic发布消息。 

      Consumer:消息和数据的消费者,定于topic并处理其发布的消息。 

      Zookeeper:协调kafka的正常运行。

    三,kafka集群部署

      1,下载,解压安装包。关闭防火墙。

      2,修改配置文件。

    //全局唯一编号,不能重复
    broker.id=0
    //监听连接的端口,producer或consumer将在此端口建立连接
    port=9092
    
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    ocket.send.buffer.bytes=102400
    
    log.dirs=/home/kafka-logs
    num.partitions=2
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=6000
    

      3,分发安装包

    scp -r /export/servers/kafka_2.11-0.8.2.2 kafka02:/export/servers
    然后分别在各机器上创建软连
    cd /export/servers/	
    ln -s kafka_2.11-0.8.2.2 kafka
    

      4,再次修改配置文件

      依次修改各服务器上配置文件的的broker.id,分别是0,1,2不得重复。

      5,启动集群

      依次在各节点上启动kafka

      bin/kafka-server-start.sh  config/server.properties

    四,kafka常用操作的命令

    查看当前服务器中的所有topic
    bin/kafka-topics.sh --list --zookeeper  server1:2181
    创建topic
    bin/kafka-topics.sh --create --zookeeper server1:2181 --replication-factor 1 --partitions 1 --topic test
    replication-factor 备份个数
    删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除
    sh bin/kafka-topics.sh --delete --zookeeper server1:2181 --topic test
    需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
    通过shell命令发送消息
    kafka-console-producer.sh --broker-list server1:9092 --topic t
    通过shell消费消息
    sh bin/kafka-console-consumer.sh --zookeeper server1:2181 --from-beginning --topic t
    查看消费位置
    sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup
    查看某个Topic的详情
    sh kafka-topics.sh --topic test --describe --zookeeper zk01:2181
    

      

    五,通过java调用kafka

      1,priduce代码

    package com;
    
    
    import java.util.Properties;
    import java.util.Scanner;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    /**
     * 模拟生产者发送消息,通过循环去发送消息给kafka
     *
     *
     */
    public class producer {
    private static KafkaProducer<String, String> producer ;
    @SuppressWarnings("resource")
    public static void main(String[] args) {
    	Properties perties=new Properties();
    	//服務器地址
    	perties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:9092");
    	//客户端的名字  随便起 的
    	perties.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
    	perties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    	perties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    	producer=new KafkaProducer<String, String>(perties);
    	int num=0;
    	while(true){
    	
    		String data=new Scanner(System.in).nextLine();
    		producer.send(new ProducerRecord<String, String>("test1",num+"", data+num), new Callback() {
    			
    			@Override
    			public void onCompletion(RecordMetadata metadata, Exception exception) {
    				// TODO Auto-generated method stub
    				if(metadata!=null)
    				System.out.println(metadata.toString());
    			}
    		});
    		num++;
    	}
    }
    }
    

      2,consumer代码

    package com;
    
    import java.util.Collections;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    
    /**kafka-topics.sh --list --zookeeper  server1:2181
     *
     *
     */
    public class consumer {
    	
    	private static KafkaConsumer<String, String> consumer;
    public static void main(String[] args) {
    	Properties perties=new Properties();
    	//服務器地址
    	perties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"server1:9092");
    	perties.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
    	perties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    	perties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    	perties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    	perties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    	perties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    	consumer=new KafkaConsumer<>(perties);
    	
    	consumer.subscribe(Collections.singletonList("test1"));
    	while (true) {
    		ConsumerRecords<String, String> datas = consumer.poll(1000);
    		
    		for(ConsumerRecord<String, String> data:datas) {
    			System.out.println(data.key()+"----"+data.value());
    		}
    		
    	}
    }
    }
    

      注意:代码中的topic 要在linux虚拟机中的存在,若没有则创建。

      

  • 相关阅读:
    selenium---常用元素等待的三种方法
    selenium---浏览器操作方法
    selenium---xpath轴定位
    requests---通过file_data方法请求yaml数据
    pywinauto客户端自动化---模拟键盘操作
    pywinauto客户端自动化---模拟鼠标操作
    开发摆摊网心理路程
    解决MVC提示未能加载文件或程序集“System.Web.Mvc, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35”或它的某一个依赖项。
    ATH9K驱动支持2MHz,2.5Mhz,1Mhz等工作带宽
    javax.validation 参数验证
  • 原文地址:https://www.cnblogs.com/songweideboke/p/9904852.html
Copyright © 2011-2022 走看看