zoukankan      html  css  js  c++  java
  • kafka入门

    基本概念:

    1、什么是kafka?
    Kafka是一个高吞吐量、分布式的发布订阅消息系统。据kafka官方网站介绍,当前的kafka已经定位为一个分布式流式处理平台( a distributed streaming platform),它最初由LinkedIn公司开发,后来成为Apache项目的一部分。
    kafka核心模块使用scala语言开发,支持多语言(如java、c/c++、python、go、erlang、node.js等)客户端,它以可以水平扩展和具有高吞吐量等特性而被广泛使用。

    2、流式处理平台的3大关键特性。
    a、能够允许发布和订阅流数据;
    b、存储流数据时提供相应的容错机制;
    c、当流数据到达时能够被及时处理。

    3.体系结构

    4、kafka的名词
    a、producer:生产者
    b、consumer:消费者
    c、topic:消息以topic为类别记录,Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)
    d、broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

    e、分区和副本:kafka将一组消息归纳为一个主题,而每个主题又被分为一个或多个分区(partition)。每个分区由一系列有序、不可变的消息组成,是一个有序队列。
    每个分区在物理上对应为一个文件夹,分区命名规则为 主题名+“-”+分区编号,分区编号从0开始,到最大值减一。每个分区又有一到多个副本(replica),分区的副本分布在集群的不同代理上,一提高可用性。

    每个消息(也叫作record记录,也被称为消息)是由一个key,一个value和时间戳构成。

    安装部署:

    1、JDK
    a、最新的kafka要jdk1.7以上,官方推荐jdk1.8
    b、环境变量,JAVA_HOME = D:ProgramFilesJavajdk1.8.0_144
    c、jdk安装路径不能有空格,否则kafka启动报错

    2、zookeeper
    a、下载安装包: http://zookeeper.apache.org/releases.html#download
    下载后不需要安装,直接解压就好,我的jdk安装在D盘,我把zookeeper也解压在那里。
    b、环境变量,ZOOKEEPER_HOME = D:ProgramFileszookeeper-3.3.6
    记得添加到path系统变量下,%ZOOKEEPER_HOME%in;
    c、修改配置文件
    (1)进入目录 D:ProgramFileszookeeper-3.3.6conf;
    (2)将“zoo_sample.cfg”重命名为“zoo.cfg”;
    (3)文本编辑器打开zoo.cfg,找到并编辑 dataDir=D:\ProgramFiles\zookeeper-3.3.6\tmp\zookeeper_logs
    d、在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)

    e、启动zookeeper,两种方式
    (1)打开新的cmd,输入zkServer,运行Zookeeper;
    (2)进入目录D:ProgramFileszookeeper-3.3.6in,执行zkServer.cmd脚本。

    3、kafka
    a、下载安装包:
    http://kafka.apache.org/downloads.html
    下载后不需要安装,直接解压就好,我的jdk安装在D盘,我把kafka也解压在那里。

    b、修改配置文件
    (1)进入Kafka配置目录: D:ProgramFileskafka_2.12-1.1.0config
    (2)文本编辑器打开 server.properties 文件,找到并编辑日志路径
    log.dirs=D:\ProgramFiles\kafka_2.12-1.1.0\tmp\kafka-logs
    (3)找到并编辑zookeeper.connect=localhost:2181。表示本地运行
    (4)Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181。

    c、运行
    重要:请确保在启动Kafka服务器前,Zookeeper实例已经准备好并开始运行。
    (1)进入kafka安装目录, D:ProgramFileskafka_2.12-1.1.0
    (2)按下Shift键,同时单击鼠标右键,选择“打开命令窗口”选项,打开命令行;
    (3)输入
    .inwindowskafka-server-start.bat .configserver.properties
    然后回车

    上面的Zookeeper和kafka一直打开。
    1、创建主题
    a、进入kafka安装目录, D:ProgramFileskafka_2.12-1.1.0
    b、按下Shift键,同时单击鼠标右键,选择“打开命令窗口”选项,打开命令行;
    c、输入
    .inwindowskafka-topics.bat --create --zookeeper localhost:2181 –repartition-factor 1 --partitions 1 --topic gl_test_topic
    d、已创建的topic,不可以重复创建。

    上面所有的窗口要一直打开。
    2、创建生产者
    a、进入kafka安装目录, D:ProgramFileskafka_2.12-1.1.0
    b、按下Shift键,同时单击鼠标右键,选择“打开命令窗口”选项,打开命令行;
    c、输入
    .inwindowskafka-console-producer.bat --broker-list localhost:9092 --topic gl_test_topic_00

    3、创建消费者
    a、进入kafka安装目录, D:ProgramFileskafka_2.12-1.1.0
    b、按下Shift键,同时单击鼠标右键,选择“打开命令窗口”选项,打开命令行;
    c、输入
    .inwindowskafka-console-consumer.bat --zookeeper localhost:2181 --topic gl_test_topic_00

    一、安装kafka
    1.安装zookeeper
        相关配置
        log_Dir
    2.安装kafka
        相关配置
        log_Dir
    3.启动zookeeper服务
        ./bin/zookeeper-server-start.sh config/zookeeper.properties
    4.开启kafka服务
        .inwindowskafka-server-start.bat .configserver.properties 
        ./bin/kafka-server-start.sh config/server.properties
    5.创建topic
        .inwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_flink
        ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_flink
      查看topic:
        ./bin/kafka-topics.sh --list --zookeeper localhost:2181
    创建生产者
        .inwindowskafka-console-producer.bat --broker-list localhost:9092 --topic test_flink     
        ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_flink 
    创建消费者
        .inwindowskafka-console-consumer.bat --zookeeper localhost:2181 --topic test_flink     
        ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_flink --from-beginning
    
    二、搭建一个多个broker的集群
        首先为每个节点编写配置文件
        1.    config/server-1.properties:
            broker.id=1
            port=9093
            log.dir=/tmp/kafka-logs-1
        2.    config/server-2.properties:
            broker.id=2
            port=9094
            log.dir=/tmp/kafka-logs-2
        3.启动另外的节点
        ./bin/kafka-server-start.sh config/server-1.properties
        ./bin/kafka-server-start.sh config/server-2.properties
        4.创建一个拥有3个副本的topic
        ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
        ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
        ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
        ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
        
    三、搭建Kafka开发环境   
    1.添加依赖    
        <dependency>
            <groupId> org.apache.kafka</groupId >
            <artifactId> kafka_2.10</artifactId >
            <version> 0.8.0</ version>
        </dependency>
    
        
    2.  
    public interface KafkaProperties {
        final static String ZKCONNECT = "10.22.10.139:2181";
        final static String GROUPID = "group1";
        final static String TOPIC = "topic1";
        final static String KAFKASERVERURL = "10.22.10.139";
        final static int KAFKASERVERPORT = 9092;
        final static int KAFKAPRODUCERBUFFERSIZE = 64 * 1024;
        final static int CONNECTIONTIMEOUT = 20000;
        final static int RECONNECTINTERVAL = 10000;
        final static String TOPIC2 = "topic2";
        final static String TOPIC3 = "topic3";
        final static String CLIENTID = "SimpleConsumerDemoClient";
    }
    
    3.生产者 
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    import java.util.Properties;
    
    public class KafkaProducer extends Thread {
        private final Producer<Integer, String> producer;
        private final String topic;
        private final Properties props = new Properties();
    
        public KafkaProducer(String topic) {
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("metadata.broker.list", "10.22.10.139:9092");
            producer = new Producer<Integer, String>(new ProducerConfig(props));
            this.topic = topic;
        }
    
        @Override
        public void run() {
            int messageNo = 1;
            while (true) {
                String messageStr = new String("Message_" + messageNo);
                System.out.println("Send:" + messageStr);
                producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
                messageNo++;
                try {
                    sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }   
        
    4.消费者  
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    public class KafkaConsumer extends Thread {
        private final ConsumerConnector consumer;
        private final String topic;
    
        public KafkaConsumer(String topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    createConsumerConfig());
            this.topic = topic;
        }
    
        private static ConsumerConfig createConsumerConfig() {
            Properties props = new Properties();
            props.put("zookeeper.connect", KafkaProperties.ZKCONNECT);
            props.put("group.id", KafkaProperties.GROUPID);
            props.put("zookeeper.session.timeout.ms", "40000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }
    
        @Override
        public void run() {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream =consumerMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("receive:" + new String(it.next().message()));
                try {
                    sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    } 
    View Code
  • 相关阅读:
    Mac 修改hosts
    MTK平台系统稳定性分析
    由MTK平台 mtkfb 设备注册疑问引发的知识延伸--ARM Device Tree
    mtk display 架构
    【6572】关于mtk平台display模块的学习探讨
    如何用Bugzilla系统管理产品研发过中相关需求和bug
    Bugzilla 系统企业应用案例
    MT6753平台一项目不同手机最低亮度存偏差问题分析过程
    嵌入式驱动实习生试题(答案)
    嵌入式驱动实习生试题
  • 原文地址:https://www.cnblogs.com/yin-fei/p/11208317.html
Copyright © 2011-2022 走看看