zoukankan      html  css  js  c++  java
  • kafka配置及基本命令

    kafka:
        分布式消息系统
        p2p + ps = 消费者组
        
    
    
    JMS:
        java message service
        
        p2p:
            peer to peer
            point to point
    
        ps:
            publish && subscribe
    
    
    kafka: scala + java
    ===============
        用于实时流处理
        特点:1、持久化数据
              2、高吞吐量
              3、分布式
              4、多客户端支持
              5、实时性
    
        kafka:broker:代理
               生产者
               消费者
    
        
        kafka_2.11-1.1.0.tgz:
                    // 2.11 ===> scala版本
                    // 1.1.0 ==> kafka版本
    
    
    kafka安装:
    ========================
        1、解压
        2、符号链接
        3、环境变量
            # kafka环境变量
            export KAFKA_HOME=/soft/kafka
            export PATH=$PATH:$KAFKA_HOME/bin
        4、生效环境变量
        5、
    
    
    kafka本地模式:
    =======================
        1、修改配置文件:/soft/kafka/config/server.properties
            修改zookeeper.connect=s102:2181,s103:2181,s104:2181
            修改log.dirs=/home/centos/kafka/logs
            修改listeners=PLAINTEXT://s101:9092
    
        2、启动kafka
            kafka-server-start.sh [-daemon] /soft/kafka/config/server.properties
    
        3、jps查看kafka进程
    
    
        4、关闭kafka
            kafka-server-stop.sh
            
            
    
    
    kafka完全分布式:s102-s104
        
        1、同步kafka安装目录和符号链接
            xsync.sh /soft/kafka
            xsync.sh /soft/kafka_2.11-1.1.0
    
        2、root同步环境变量
            su root
            xsync.sh /etc/profile
            exit
        
        3、分别使s102-s104环境变量生效
            s102> source /etc/profile
            s103> source /etc/profile
            s104> source /etc/profile
    
        4、分别修改s102-s104的kafka配置文件/soft/kafka/config/server.properties
            s102:
                修改broker.id=102
                修改listeners=PLAINTEXT://s102:9092
    
            s103:
                修改broker.id=103
                修改listeners=PLAINTEXT://s103:9092
    
            s104:
                修改broker.id=104
                修改listeners=PLAINTEXT://s104:9092
    
        5、分别启动s102-s104的kafka
            
            s102> kafka-server-start.sh -daemon /soft/kafka/config/server.properties
            s103> kafka-server-start.sh -daemon /soft/kafka/config/server.properties
            s104> kafka-server-start.sh -daemon /soft/kafka/config/server.properties
    
    
        6、编写批量启动kafka
            #!/bin/bash
            if [ $# -ge 1 ] ; then echo param must be 0 ; exit ; fi
            for (( i=102 ; i<=104 ; i++ )) ; do
                tput setaf 2
                echo ================ s$i starting kafka  ================
                tput setaf 9
                ssh s$i "source /etc/profile ; kafka-server-start.sh -daemon /soft/kafka/config/server.properties"
            done
    
        
        7、编写批量关闭kafka
            #!/bin/bash
            if [ $# -ge 1 ] ; then echo param must be 0 ; exit ; fi
            for (( i=102 ; i<=104 ; i++ )) ; do
                tput setaf 2
                echo ================ s$i stoping kafka  ================
                tput setaf 9
                ssh s$i "source /etc/profile ; kafka-server-stop.sh"
            done
    
        8、结合:xkafka.sh
    
            #!/bin/bash
            cmd=$1
            if [ $# -gt 1 ] ; then echo param must be 1 ; exit ; fi
            for (( i=102 ; i<=104 ; i++ )) ; do
                tput setaf 2
                echo ================ s$i $cmd kafka  ================
                tput setaf 9
                case $cmd in 
                start ) ssh s$i "source /etc/profile ; kafka-server-start.sh -daemon /soft/kafka/config/server.properties" ;; 
                stop ) ssh s$i "source /etc/profile ; kafka-server-stop.sh" ;; 
                * ) echo illegal argument ; exit ;;
                esac
            done
    
    
    kafka基本命令:s102-s104操作
    ==============================
        开启kafka服务:kafka-server-start.sh -daemon /soft/kafka/config/server.properties
        关闭kafka    :kafka-server-stop.sh
    
        topic:  kafka生产消费的基本单位
        record:kafka生产消费的基本数据单位
            以K-V形式存在
             
        topic: kafka向指定topic生产数据,向指定topic消费数据
        
        创建topic: kafka-topics.sh --create --topic t1 --zookeeper s102:2181 --partitions 2 --replication-factor 2
                    
        列出topic: kafka-topics.sh --list --zookeeper s102:2181
                    
    
        创建生产者:kafka-console-producer.sh --broker-list s102:9092 --topic t1
                   
        创建消费者:kafka-console-consumer.sh --zookeeper s102:2181 --topic t1  //接着上一次位置消费
                    kafka-console-consumer.sh --zookeeper s102:2181 --topic t1 --from-beginning  //从头开始消费
                    
    
    kafka使用API生产者和消费者:
    ==============================
        1、pom文件
        <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>1.1.0</version>
            </dependency>
    
        2、编写生产者
            public class MyProducer {
    
    
                public static void main(String[] args) throws Exception {
                //初始化java配置文件
                Properties props = new Properties();
                props.put("metadata.broker.list", "s102:9092, s103:9093, s104:9094 ");
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                props.put("request.required.acks", "1");
    
                //将java配置文件封装成kafka配置文件
                ProducerConfig config = new ProducerConfig(props);
                Producer<String, String> producer = new Producer<String, String>(config);
    
                for (int i = 0; i < 100; i++) {
                    String msg = "tom" + i;
                    System.out.println(msg);
    
                    // 创建kafka消息实例
                    KeyedMessage<String, String> data = new KeyedMessage<String, String>("t1", msg);
                    // 发送数据
                    producer.send(data);
                    System.out.println("Message Count - " + i);
                    Thread.sleep(1000);
                }
                // 关闭生产者
                producer.close();
    
                }
    
            }
            
        3、编写消费者
            public class MyConsumer {
    
                public static void main(String[] args) {
    
                Properties props = new Properties();
                props.put("zookeeper.connect", "s102:2181,s103:2181,s104:2181");
                props.put("group.id", "g1");
                props.put("zookeeper.session.timeout.ms", "500");
                props.put("zookeeper.sync.time.ms", "250");
                props.put("auto.commit.interval.ms", "1000");
    
                //初始化consumer配置
                ConsumerConfig config = new ConsumerConfig(props);
    
                //初始化consumer
                ConsumerConnector consumer = kafka.consumer.Consumer
                    .createJavaConsumerConnector(config);
    
                Map<String, Integer> topicMap = new HashMap<String, Integer>();
    
                //指定消费线程数
                topicMap.put("t1", 1);
    
                //创建消息对象
                //Map<topic, List<k, message>>
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);
    
                //获取t1主题的数据(k-v)
                List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get("t1");
    
                for (final KafkaStream<byte[], byte[]> stream : streamList) {
                    ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
                    while (consumerIte.hasNext()) {
                    //迭代取出消息数据
                    System.out.println("value: " + new String(consumerIte.next().message()));
                    }
                }
                if (consumer != null) {
                    consumer.shutdown();
                }
                }
            }
  • 相关阅读:
    Python系列:四、Python函数--技术流ken
    Centos7破解密码的两种方法--技术流ken
    Python系列:三、流程控制循环语句--技术流ken
    Python系列:二、数据类型--技术流ken
    Python系列:一、Python概述与环境安装--技术流ken
    zabbix实现百台服务器的自动化监控--技术流ken
    学会这个删库再也不用跑路了~ --技术流ken
    五分钟彻底学会iptables防火墙--技术流ken
    Docker之使用Dockerfile创建定制化镜像(四)--技术流ken
    Docker数据卷Volume实现文件共享、数据迁移备份(三)--技术流ken
  • 原文地址:https://www.cnblogs.com/zyde/p/8946770.html
Copyright © 2011-2022 走看看