  • Kafka Cluster Installation (Part 2)

    Let's install Kafka in cluster mode on three machines:

    192.168.131.128
    192.168.131.130
    192.168.131.131
    

    A Kafka cluster depends on ZooKeeper, so install zk first.

    Download the Kafka package:
    kafka_2.11-1.1.0.tgz

    Extract it to /usr/local/.
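
    For reference, a minimal sketch of the download and extraction steps, assuming the Apache archive URL for this release and renaming the directory to kafka so it matches the paths used below:

    # download kafka_2.11-1.1.0.tgz and unpack it under /usr/local
    wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
    tar -xzf kafka_2.11-1.1.0.tgz -C /usr/local/
    mv /usr/local/kafka_2.11-1.1.0 /usr/local/kafka
    cd /usr/local/kafka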

    Go into Kafka's config directory:

    You will see a zk config file there: this is the ZooKeeper bundled with Kafka. If you have not installed zk separately, you can use the bundled one; it is configured the same way as a standalone installation.

    We assume zk is already installed, so edit server.properties. The main settings are:

    broker.id=0        # must be unique for each broker instance
    listeners=PLAINTEXT://192.168.131.128:9092    # change to the host's own ip
    advertised.host.name=192.168.131.128     # change to the host's own ip
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/usr/local/kafka/log                                         # create this directory by hand; Kafka will not create it from the config file
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.131.128:2181,192.168.131.130:2181,192.168.131.131:2181 # set to your zookeeper hosts, ip:port
    zookeeper.connection.timeout.ms=6000
    delete.topic.enable=true
    auto.create.topics.enable=false
    

    The settings you need to change are flagged in the comments above.
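
    Because Kafka will not create log.dirs for you, make the directory on each of the three machines before starting:

    mkdir -p /usr/local/kafka/log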

    Next, sync the Kafka directory to the other two machines (run from /usr/local):

    scp -r kafka hadoop@hadoopslaver1:/usr/local
    scp -r kafka hadoop@hadoopslaver2:/usr/local
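
    After syncing, remember that broker.id, listeners, and advertised.host.name must differ on each machine. A sketch of the edits for hadoopslaver1 (192.168.131.130), assuming the directory layout above; use broker.id=2 and 192.168.131.131 on hadoopslaver2:

    # run on 192.168.131.130
    cd /usr/local/kafka
    sed -i 's/^broker.id=0/broker.id=1/' config/server.properties
    sed -i 's#^listeners=.*#listeners=PLAINTEXT://192.168.131.130:9092#' config/server.properties
    sed -i 's/^advertised.host.name=.*/advertised.host.name=192.168.131.130/' config/server.properties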
    

    Now we are ready to start. First make sure zk is running; if you did not install it separately, you can start Kafka's bundled zk:

    bin/zookeeper-server-start.sh config/zookeeper.properties &
    

    Then start Kafka:

    bin/kafka-server-start.sh -daemon config/server.properties
    

    Run the start command on all three machines; if no errors are reported, the brokers are up.
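
    To confirm that all three brokers registered with ZooKeeper, you can list the broker ids with the zookeeper-shell script that ships with Kafka:

    bin/zookeeper-shell.sh 192.168.131.128:2181 ls /brokers/ids
    # a healthy three-node cluster prints: [0, 1, 2]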

    Next we can run some tests.
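
    One step the test clients depend on: because auto.create.topics.enable=false was set above, the test topic must be created by hand before the clients run. In Kafka 1.1.0 the topic tool still talks to ZooKeeper; the choice of 3 partitions here is arbitrary for the demo:

    bin/kafka-topics.sh --create --zookeeper 192.168.131.128:2181 \
        --replication-factor 3 --partitions 3 --topic page_visits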

    Consumer side:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Named ConsumerDemo so it does not clash with the imported Consumer interface.
    public class ConsumerDemo {

        public static void main(String[] args) {
            Properties props = new Properties();

            // List all three brokers so the client can bootstrap even if one is down.
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.131.128:9092,192.168.131.130:9092,192.168.131.131:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
            // Commit offsets automatically once per second.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            Consumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("page_visits"));

            // Poll forever, printing every record received.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
    

    Producer side:

    import java.util.Date;
    import java.util.Properties;
    import java.util.Random;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Named ProducerDemo so it does not clash with the imported Producer interface.
    public class ProducerDemo {
        public static void main(String[] args) {
            long events = 1;
            Random rnd = new Random();

            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.131.128:9092,192.168.131.130:9092,192.168.131.131:9092");
            // Wait for the full in-sync replica set to acknowledge each write.
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Optional: plug in the custom partitioner defined below.
            props.put("partitioner.class", "com.rickiyang.service.IpPartitioner");

            Producer<String, String> producer = new KafkaProducer<>(props);

            // Send `events` fake page-visit records keyed by a random ip.
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + ",www.example.com," + ip;
                ProducerRecord<String, String> data = new ProducerRecord<String, String>("page_visits", ip, msg);
                producer.send(data,
                        new Callback() {
                            public void onCompletion(RecordMetadata metadata, Exception e) {
                                if (e != null) {
                                    e.printStackTrace();
                                } else {
                                    System.out.println("The offset of the record we just sent is: " + metadata.offset());
                                }
                            }
                        });
            }
            producer.close();
        }
    }
    

    Custom partitioning strategy:

    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;

    // Named IpPartitioner so it does not clash with the Partitioner interface it implements.
    // Routes each record by the last octet of its ip key, modulo the partition count.
    public class IpPartitioner implements Partitioner {

        @Override
        public void configure(Map<String, ?> configs) {
            // No configuration needed.
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int partition = 0;
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            // Keys are ip strings such as "192.168.2.42"; partition on the last octet.
            String stringKey = (String) key;
            int offset = stringKey.lastIndexOf('.');
            if (offset > 0) {
                partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
            }

            return partition;
        }

        @Override
        public void close() {
            // Nothing to clean up.
        }

    }
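
    Note that com.rickiyang.service.IpPartitioner has to sit on the producer's classpath and match the partitioner.class value configured above; with the ip keys the demo generates, records are spread across partitions by the last octet of the address modulo the partition count.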
    

    Now run it: start ConsumerDemo first, then ProducerDemo.

    [Producer and Consumer console output omitted]

    The consumer receives the messages sent to the cluster.
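
    If you want a check that does not go through the Java client, the console consumer bundled with Kafka reads the same topic:

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.131.128:9092 \
        --topic page_visits --from-beginning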
