  • 51-kafka - Installation and common commands

    Installing Kafka is very simple; only a few required parameters need to be configured.

    First, a ZooKeeper cluster must be up and running.
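    A quick health check, as a sketch (it assumes the stock ZooKeeper scripts and nc are available; these commands are not part of the original):

    zkServer.sh status              # prints "Mode: leader" or "Mode: follower" when the node is healthy
    echo ruok | nc node1 2181       # a healthy server answers "imok"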

    1, configure conf/server.properties

    broker.id=0                   # unique per broker; the Nth broker gets N, counting from 0
    port=9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=1048576
    socket.receive.buffer.bytes=1048576
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs      # data storage path
    num.partitions=2              # default number of partitions
    log.retention.hours=168       # message retention time, default 168 h (one week)
    log.segment.bytes=536870912   # segment size
    log.retention.check.interval.ms=60000
    log.cleaner.enable=false
    zookeeper.connect=node1:2181,node2:2181,node3:2181
    zookeeper.connection.timeout.ms=1000000

    2, create the startup script ./bin/startkafka.sh and grant it execute permission

    nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
                                  # > kafka.log: write output to a log file; 1 = stdout, 2 = stderr, 2>&1 = redirect stderr to stdout; the trailing & runs it in the background
    chmod +x startkafka.sh

    3, distribute to the other nodes

     Remember to change broker.id on each node.
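    A hypothetical distribution sketch (the install path /opt/kafka and hosts node1/node2 are assumptions, not from the original):

    for i in 1 2; do
      scp -r /opt/kafka node$i:/opt/
      ssh node$i "sed -i 's/^broker.id=.*/broker.id=$i/' /opt/kafka/config/server.properties"
    done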

    4, start

    bash startkafka.sh
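    To verify that the broker came up (illustrative commands):

    jps | grep -i kafka             # the broker runs as a "Kafka" JVM process
    tail -f kafka.log               # watch the startup log for errors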

    Common Kafka commands

    1, kafka-topics.sh

    1), list the topics

    kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --list

    2), describe topics in detail, showing the concrete partition assignment

    kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe
    [wenbronk@dock bin]$ ./kafka-topics.sh --zookeeper dock:2181 --describe
    Topic:20180311    PartitionCount:3    ReplicationFactor:2    Configs:
    Topic: 20180311    Partition: 0    Leader: 0    Replicas: 0,1    Isr: 0,1
    Topic: 20180311    Partition: 1    Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: 20180311    Partition: 2    Leader: 2    Replicas: 2,0    Isr: 2,0

    Field meanings

      partition: the partition number

      leader: the one replica that handles all reads and writes for the partition

      replicas: the list of replicas

      Isr: in-sync replicas, the subset of replicas currently caught up with the leader

    3), create a topic

    kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create  --topic 20170926 --partitions 3 --replication-factor 2

    Parameter explanation:

      replication-factor: number of replicas

      partitions: number of partitions

    After creation, you can verify with describe.

    The replication factor must not exceed the number of brokers.

    If a ZooKeeper chroot path was specified when configuring Kafka, operations such as create must include that path as well:

    /opt/install/kafka_2.13-2.4.1/bin/kafka-topics.sh --zookeeper 10.144.91.9:2181,10.144.91.10:2181,10.144.91.11:2181/cdn_kafka --create  --topic test1 --partitions 3 --replication-factor 2

    4), deleting a topic does not actually remove it; it only adds a deletion marker

    kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --delete --topic 20170926

    Viewing with --list afterwards shows the topic annotated with "- marked for deletion".
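    Note that brokers only physically delete marked topics when deletion is enabled in server.properties (it defaulted to false before Kafka 1.0):

    delete.topic.enable=true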

    2, kafka-console-consumer.sh

    1), listen on (consume) a topic

    ./kafka-console-consumer.sh --bootstrap-server 10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093 --topic letv_env

    Use --from-beginning to consume from the start of the topic.
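    For example, with the same broker list as above:

    ./kafka-console-consumer.sh --bootstrap-server 10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093 --topic letv_env --from-beginning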

    3, kafka-console-producer.sh

    2), start a producer in another terminal and send messages; each line you type is sent as one message

    ./kafka-console-producer.sh --broker-list 10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093 --topic letv_qy

    For more on Kafka's design and message storage, see the Meituan tech team's article: https://tech.meituan.com/kafka-fs-design-theory.html

    4, using Kafka from Java

    Old-version clients depend on ZooKeeper; new-version clients do not, needing only bootstrap servers.

    When connecting, a few seed nodes are enough to discover the whole cluster (a sketch with the new client API follows after the legacy producer example below).

    1), producer

    import java.util.Properties; 
       
    import kafka.javaapi.producer.Producer; 
    import kafka.producer.KeyedMessage; 
    import kafka.producer.ProducerConfig; 
       
    public class MyProducer {   
         
            public static void main(String[] args) {   
                Properties props = new Properties();   
                props.setProperty("metadata.broker.list","localhost:9092");   
                props.setProperty("serializer.class","kafka.serializer.StringEncoder");   
                props.put("request.required.acks","1");   
                ProducerConfig config = new ProducerConfig(props);   
                //create the producer object
                Producer<String, String> producer = new Producer<String, String>(config);
                try {
                    int i = 1;
                    while (i < 100) {
                        //build and send a message to the "mykafka" topic
                        KeyedMessage<String, String> data =
                                new KeyedMessage<String, String>("mykafka", "test-kafka-" + i);
                        producer.send(data);
                        i++;    //advance the counter so the loop terminates
                    }
                } catch (Exception e) {   
                    e.printStackTrace();   
                }   
                producer.close();   
            }   
    }
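
    The class above uses the legacy Scala client (kafka.javaapi). Here is a minimal sketch of the same producer with the new client API, which needs only bootstrap.servers as noted earlier (it assumes the org.apache.kafka:kafka-clients dependency; host names are placeholders):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NewApiProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            //only seed brokers are listed; the client discovers the rest of the cluster
            props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            producer.send(new ProducerRecord<String, String>("mykafka", "test-kafka-new-client"));
            producer.close();   //flushes any pending records before shutting down
        }
    }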

    2), consumer

    import java.util.HashMap; 
    import java.util.List;   
    import java.util.Map;   
    import java.util.Properties;   
         
    import kafka.consumer.ConsumerConfig;   
    import kafka.consumer.ConsumerIterator;   
    import kafka.consumer.KafkaStream;   
    import kafka.javaapi.consumer.ConsumerConnector;  
       
    public class MyConsumer extends Thread{
            //the consumer connection
            private final ConsumerConnector consumer;
            //the topic to consume
            private final String topic;
         
            public MyConsumer(String topic) {
                consumer = kafka.consumer.Consumer
                        .createJavaConsumerConnector(createConsumerConfig());
                this.topic = topic;
            }
         
        //build the consumer configuration
        private static ConsumerConfig createConsumerConfig() {   
            Properties props = new Properties();   
    //        props.put("zookeeper.connect","localhost:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");
            //配置要连接的zookeeper地址与端口
            //The ‘zookeeper.connect’ string identifies where to find once instance of Zookeeper in your cluster.
            //Kafka uses ZooKeeper to store offsets of messages consumed for a specific topic and partition by this Consumer Group
            props.put("zookeeper.connect","localhost:2181");
            
            //the consumer group id (the 'group.id' string defines the Consumer Group this process is consuming on behalf of)
            props.put("group.id", "0");
            
            //the ZooKeeper session timeout.
            //The ‘zookeeper.session.timeout.ms’ is how many milliseconds Kafka will wait for 
            //ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.
            props.put("zookeeper.session.timeout.ms","10000"); 
     
            //The ‘zookeeper.sync.time.ms’ is the number of milliseconds a ZooKeeper ‘follower’ can be behind the master before an error occurs.
            props.put("zookeeper.sync.time.ms", "200");
    
            //The ‘auto.commit.interval.ms’ setting is how often updates to the consumed offsets are written to ZooKeeper. 
            //Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages.
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);   
        }   
         
        public void run(){ 
            
            Map<String,Integer> topickMap = new HashMap<String, Integer>();   
            topickMap.put(topic, 1);   
            Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap =consumer.createMessageStreams(topickMap);   
            
            KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);   
            ConsumerIterator<byte[],byte[]> it =stream.iterator();   
            System.out.println("*********Results********");   
            while(true){   
                if(it.hasNext()){ 
                    //print the received message
                    System.err.println(Thread.currentThread()+" get data:" +new String(it.next().message()));   
                } 
                try {   
                    Thread.sleep(1000);   
                } catch (InterruptedException e) {   
                    e.printStackTrace();   
                }   
            }   
        }  
        
        
        public static void main(String[] args) {   
            MyConsumer consumerThread = new MyConsumer("mykafka");   
            consumerThread.start();   
        }   
    }

    3), invocation: a Runnable that drains one stream (a driver sketch follows after the class)

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    public class Consumer implements Runnable {

        //the stream this worker drains and a thread id used in log output
        private KafkaStream<byte[], byte[]> stream;
        private int threadNumber;

        public Consumer(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
            threadNumber = a_threadNumber;
            stream = a_stream;
        }
     
        public void run() {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext())
                System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
            System.out.println("Shutting down Thread: " + threadNumber);
        }
    }
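
    The original does not show how this Runnable gets its streams. Here is a hedged sketch of the usual driver: ask the connector for one stream per thread and submit each to a pool (ConsumerGroupRunner and numThreads are illustrative names, not from the original):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ConsumerGroupRunner {
        public static void run(ConsumerConnector consumer, String topic, int numThreads) {
            //request numThreads streams for the topic, one per worker thread
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, numThreads);
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    consumer.createMessageStreams(topicCountMap);

            //hand each stream to its own Consumer runnable
            ExecutorService executor = Executors.newFixedThreadPool(numThreads);
            int threadNumber = 0;
            for (KafkaStream<byte[], byte[]> stream : streams.get(topic)) {
                executor.submit(new Consumer(stream, threadNumber++));
            }
        }
    }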

    This series is based on courses from 尚学堂 (Shangxuetang).
