zoukankan      html  css  js  c++  java
  • kafka单机安装测试-原创-本机测试过

    安装环境:vmware 安装的centos6.8   安装IP地址:192.168.52.136

    kafka和zookeeper版本:kafka_2.13-2.4.0.tgz  和zookeeper-3.4.5.tar.gz  

    centos安装目录:kafka 安装在 /usr/local/kafka

    zookeeper 的配置不再赘述,在其 bin 目录下执行以下命令启动:

    ./zkServer.sh start
    

      

    启动kafka

     /usr/local/kafka/bin/kafka-server-start.sh  /usr/local/kafka/config/server.properties
    

     

    查看实时消息

    ./kafka-console-consumer.sh --bootstrap-server 192.168.52.136:9092 --topic test3 --from-beginning
    

      

    server.properties

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    # see kafka.server.KafkaConfig for additional details and defaults
    
    ############################# Server Basics #############################
    
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=1
    # Hostname and port the broker will advertise to producers and consumers. If not set, 
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
     
    # Address that clients (producers/consumers) are told to connect to.
    advertised.listeners=PLAINTEXT://192.168.52.136:9092
    # NOTE: "hostname" is not a broker setting (the legacy key was "host.name", which is
    # itself superseded by "listeners"), so it has no effect and is disabled here.
    #hostname=192.168.52.136
    # Address and port the broker socket server binds to.
    listeners=PLAINTEXT://192.168.52.136:9092
    
    
    ############################# Socket Server Settings #############################
    
    # The address the socket server listens on. It will get the value returned from 
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    # Hostname and port the broker will advertise to producers and consumers. If not set, 
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://192.168.52.36:9092
    #listeners=PLAINTEXT://localhost:9092
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    
    ############################# Log Basics #############################
    
    # A comma separated list of directories under which to store log files.
    # Note: these are Kafka's message data (commit log segments), not application log output.
    log.dirs=/tmp/kafka-logs
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    ############################# Internal Topic Settings  #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    ############################# Log Flush Policy #############################
    
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    
    ############################# Log Retention Policy #############################
    
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    # 1073741824 bytes = 1 GiB
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=192.168.52.136:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=60000
    
    
    ############################# Group Coordinator Settings #############################
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
    

      

    java测试项目 采用maven 组织方式

    <dependencies>
        <!-- Unit-test support; only on the test classpath. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <!-- Kafka producer/consumer client; version matches the 2.4.0 broker.
             Declared once: a repeated declaration of the same artifact triggers a Maven warning. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>
    </dependencies>

    生产消息代码

    /**
     * Minimal Kafka producer demo.
     *
     * <p>Connects to the broker at 192.168.52.136:9092 and synchronously sends a small,
     * fixed number of string messages to {@link #TOPIC}, printing the metadata returned
     * for each acknowledged record.
     */
    public class ProducerDemo {

        /** Topic the demo messages are published to. */
        public final static String TOPIC = "test3";

        /** Number of messages sent by {@link #produce()}. */
        private static final int COUNT = 5;

        private final KafkaProducer<String, String> producer;

        /** Builds the producer configuration and creates the underlying client. */
        private ProducerDemo() {
            Properties props = new Properties();
            // Address of the Kafka broker to bootstrap from.
            props.put("bootstrap.servers", "192.168.52.136:9092");
            // A send counts as committed only after all in-sync replicas acknowledged it.
            props.put("acks", "all");
            // 0 = never resend a failed record; raise this to let the client retry.
            props.put("retries", 0);
            // Upper bound, in bytes, of a per-partition batch. A full batch is sent
            // immediately, regardless of linger.ms below.
            props.put("batch.size", 16384);
            // Wait up to 1 ms before sending so that more records can join the batch.
            props.put("linger.ms", 1);
            // Total memory, in bytes, the producer may use to buffer unsent records.
            props.put("buffer.memory", 33554432);
            // Both serializers must match the KafkaProducer<String, String> type parameters.
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            producer = new KafkaProducer<String, String>(props);
        }

        /**
         * Sends {@code COUNT} messages, keyed by their sequence number, waiting for each
         * acknowledgement before sending the next one. A failed send is reported on stderr
         * and does not stop the remaining sends. The producer is always closed afterwards,
         * so this method can be called only once per instance.
         */
        public void produce() {
            try {
                for (int messageNo = 1; messageNo <= COUNT; messageNo++) {
                    String key = String.valueOf(messageNo);
                    String data = String.format("hello KafkaProducer message %s from hubo liuyahui ", key);

                    try {
                        // get() blocks until the broker acknowledges the record (or the send fails).
                        System.out.println(
                                producer.send(new ProducerRecord<String, String>(TOPIC, key, data)).get());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                // Flushes pending records and releases network resources.
                producer.close();
            }
        }

        public static void main(String[] args) {
            new ProducerDemo().produce();
        }
    }
    

      

    消费消息代码

    public class ConsumrTest {
    	 
        public static void main(String[] args) {
            
            String topicNmae="test3";
            String groupID="test-group";
            Properties props= new Properties();
            props.put("bootstrap.servers","192.168.52.136:9092");
            props.put("group.id",groupID);
            props.put("enable.auto.commit","true");
            props.put("auto.commit.interval.ms","1000");
            props.put("auto.offset.reset","earliest");
            props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
     
            KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList(topicNmae));
            try {
                while (true){
                    ConsumerRecords<String,String> records=consumer.poll(1000);
                    for (ConsumerRecord<String,String> record:records){
                        System.out.printf("offset = %d ,key =%s, value= %s%n" ,record.offset(),record.key(),record.value());
                    }
                }
            }finally {
                consumer.close();
            }
        }
    

      

  • 相关阅读:
    小波变换的引入,通俗易懂
    Leetcode 437. Path Sum III
    Leetcode 113. Path Sum II
    Leetcode 112 Path Sum
    Leetcode 520 Detect Capital
    Leetcode 443 String Compression
    Leetcode 38 Count and Say
    python中的生成器(generator)总结
    python的random模块及加权随机算法的python实现
    leetcode 24. Swap Nodes in Pairs(链表)
  • 原文地址:https://www.cnblogs.com/liuguiqian/p/12523947.html
Copyright © 2011-2022 走看看