zoukankan      html  css  js  c++  java
  • Kafka(一) 初识

     

    正文

    一,简介

      Kafka是由Apache软件基金会开发的一个开源流处理平台,由ScalaJava编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

    二,Kafka的角色  

      Broker : 安装Kafka服务的那台集群就是一个broker(broker的id要全局唯一)
      Producer :消息的生产者,负责将数据写入到broker中(push)
      Consumer:消息的消费者,负责从kafka中读取数据(pull),老版本的消费者需要依赖zk,新版本的不需要
      Topic: 主题,相当于是数据的一个分类,不同topic存放不同的数据
      Consumer Group: 消费者组,一个topic可以有多个消费者同时消费,多个消费者如果在一个消费者组中,那么他们不能重复消费数据

    三,Kafka的安装

      3.1 文件下载和解压

      我这里的spark是2.3.3所以需要kafka0.10.2.0版本:点击下载

      解压到相应的文件夹:如下图所示

      

      3.2 文件配置

      三个必要配置的地方:

    broker.id=1  ===> 全局唯一,三台都要配置我这里分别是1,2,3
    listeners=PLAINTEXT://hd1:9092   ===> 还有两台hd2,hd3
    # 这个目录自己创建,用来保存kafka的数据
    log.dirs=/usr/local/hadoop/kafka/data  
    zookeeper.connect=hd1:2181,hd2:2181,hd3:2181 ===> zookeeper的地址

      如下:

      

      3.3 服务启动

    ./bin/kafka-server-start.sh -daemon /usr/local/hadoop/kafka/kafka_2.10-0.10.2.0/config/server.properties

    四,Kafka的常用命令

    # 启动
    ./bin/kafka-server-start.sh -daemon /usr/local/hadoop/kafka/kafka_2.10-0.10.2.0/config/server.properties
    
    # 查看有那些topic
    ./bin/kafka-topics.sh --list --zookeeper hd1:2181,hd2:2181,hd3:2181
    
    # 创建topic
    ./bin/kafka-topics.sh --create --zookeeper hd1:2181,hd2:2181,hd3:2181 --replication-factor 3 --partitions 3 --topic test
    
    # 生产者数据
    ./bin/kafka-console-producer.sh --broker-list hd1:9092,hd2:9092,hd3:9092 --topic test
    
    # 消费者消费数据
    ./bin/kafka-console-consumer.sh --zookeeper hd1:2181,hd2:2181,hd3:2181 --topic test --from-beginning

    五,Kafka的JAVA编程

      5.1 Producer编程

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class ProduceDemo {
    
        public static void main(String[] args){
            Properties props = new Properties();//配置项
            props.put("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092");//使用新的API指定kafka集群位置
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            String messageStr = null;
            for (int i = 1;i<1000;i++){
                messageStr = "hello, this is "+i+"th message";
                producer.send(new ProducerRecord<String, String>("test","Message",messageStr));
            }
            producer.close();
        }
    }

      5.2 Consumer编程

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class ConsumerDemo implements Runnable{
        private final KafkaConsumer<String, String> consumer;
        private ConsumerRecords<String, String> msgList;
        private final String topic;
        private static final String GROUDID = "groupA";
    
        public ConsumerDemo(String topicName){
            Properties props = new Properties();
            props.put("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092");
            props.put("group.id", GROUDID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            this.consumer = new KafkaConsumer<String, String>(props);
            this.topic = topicName;
            this.consumer.subscribe(Arrays.asList(topic));
        }
    
        public void run(){
            int messageNum = 1;
            try{
                for (;;){
                    msgList = consumer.poll(500);
                    if (msgList!=null && msgList.count()>0){
                        for (ConsumerRecord<String, String> record : msgList){
                            if (messageNum % 50 ==0){
                                System.out.println(messageNum+"=receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
                            }
                            if (messageNum % 1000 == 0)
                                break;
                            messageNum++;
                        }
                    }
                    else{
                        Thread.sleep(1000);
                    }
                }
            }
            catch (InterruptedException e){
                e.printStackTrace();
            }
            finally{
                consumer.close();
            }
        }
    
        public static void main(String[] args){
            ConsumerDemo demo = new ConsumerDemo("test");
            Thread thread = new Thread(demo);
            thread.start();
        }
    }
  • 相关阅读:
    XHTML学习笔记 Part3:核心属性
    XHTML学习笔记 Part2:核心元素
    XHTML学习笔记 part1
    北航非全日制-软件学院考研攻略(经验仅来自于2019年,2020年招生简章有变动,需谨慎)
    为什么能抓到网站https传输的明文密码?------顺便说说“知乎”和“支付宝”的安全性对比
    JetBrain系列学生免费授权
    印象笔记模板推荐使用
    测试用例评审总结与规范
    Django入门
    Django在根据models生成数据库表时报 __init__() missing 1 required positional argument: 'on_delete'
  • 原文地址:https://www.cnblogs.com/tashanzhishi/p/11005514.html
Copyright © 2011-2022 走看看