  • Deploying Kafka and ZooKeeper clusters, with Kafka Java code examples

    Source: http://doc.okbase.net/QING____/archive/19447.html

    See also:

    http://blog.csdn.net/21aspnet/article/details/19325373

    http://blog.csdn.net/unix21/article/details/18990123

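    Kafka works well as a distributed log collection or system monitoring service, and it is worth using where it fits. Deploying Kafka means setting up a ZooKeeper environment and a Kafka environment, plus some configuration. This post walks through that setup and then shows how to use Kafka from Java.

        We build the ZooKeeper cluster from 3 ZooKeeper instances and the Kafka cluster from 2 Kafka brokers.

        The versions used are Kafka 0.8 and ZooKeeper 3.4.5.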

    I. Building the ZooKeeper cluster

        We have 3 ZooKeeper instances: zk-0, zk-1, and zk-2. If you are only testing, a single ZooKeeper instance is enough.

        1) zk-0

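        Edit the configuration file (conf/zoo.cfg). Because server.N entries are present, each instance also needs a myid file in its dataDir containing its own N, and when all three instances run on one machine each needs its own dataDir:

    clientPort=2181
    server.0=127.0.0.1:2888:3888
    server.1=127.0.0.1:2889:3889
    server.2=127.0.0.1:2890:3890
    ## Apart from dataDir, only the settings above need to change; keep the defaults for the rest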

        Start ZooKeeper:

    ./zkServer.sh start

        2) zk-1

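        Edit the configuration file (everything else matches zk-0, apart from this instance's own dataDir and myid):

    clientPort=2182
    ## Only the setting above needs to change; everything else stays as in zk-0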

        Start ZooKeeper:

    ./zkServer.sh start

        3) zk-2

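        Edit the configuration file (everything else matches zk-0, apart from this instance's own dataDir and myid):

    clientPort=2183
    ## Only the setting above needs to change; everything else stays as in zk-0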

        Start ZooKeeper:

    ./zkServer.sh start
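
        Before moving on to Kafka, it can help to confirm that all three instances answer on their client ports. The sketch below is not part of the original post: it sends ZooKeeper's four-letter command ruok to each port and expects the reply imok. The class name ZkHealthCheck and the 2-second timeout are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of ZooKeeper or of the original post.
public class ZkHealthCheck {

    /** Sends "ruok" to host:port and returns true only if the reply is "imok". */
    public static boolean isOk(String host, int port, int timeoutMs) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs);
            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes("US-ASCII"));
            out.flush();

            // Read up to 4 bytes; the server closes the connection after replying.
            InputStream in = socket.getInputStream();
            byte[] buf = new byte[4];
            int read = 0;
            while (read < buf.length) {
                int n = in.read(buf, read, buf.length - read);
                if (n < 0) {
                    break;
                }
                read += n;
            }
            return read == 4 && "imok".equals(new String(buf, "US-ASCII"));
        } catch (IOException e) {
            // Connection refused, timeout, or reset: treat all as "not ok".
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing useful to do here
            }
        }
    }

    public static void main(String[] args) {
        // Client ports of zk-0, zk-1 and zk-2 from the configuration above.
        int[] clientPorts = {2181, 2182, 2183};
        for (int port : clientPorts) {
            boolean ok = isOk("127.0.0.1", port, 2000);
            System.out.println("127.0.0.1:" + port + " -> " + (ok ? "imok" : "no response"));
        }
    }
}
```

        A reply of imok only shows that the process is up and listening on its client port; as far as we know it does not by itself prove that the ensemble has elected a leader.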

      

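    II. Building the Kafka cluster

        The broker configuration file depends on the ZooKeeper settings chosen above, so we show the broker configuration first. We use 2 Kafka brokers to build the cluster: kafka-0 and kafka-1.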

        1) kafka-0

        Edit the configuration file under the config directory as follows:

    broker.id=0
    port=9092
    num.network.threads=2
    num.io.threads=2
    socket.send.buffer.bytes=1048576
    socket.receive.buffer.bytes=1048576
    socket.request.max.bytes=104857600
    log.dir=./logs
    num.partitions=2
    log.flush.interval.messages=10000
    log.flush.interval.ms=1000
    log.retention.hours=168
    #log.retention.bytes=1073741824
    log.segment.bytes=536870912
    
    log.cleanup.interval.mins=10
    zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
    zookeeper.connection.timeout.ms=1000000
    kafka.metrics.polling.interval.secs=5
    kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
    kafka.csv.metrics.dir=/tmp/kafka_metrics
    kafka.csv.metrics.reporter.enabled=false

        Kafka is written in Scala, so the Scala build environment has to be prepared before Kafka can run.

    > cd kafka-0
    > ./sbt update
    > ./sbt package
    > ./sbt assembly-package-dependency 

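        The last command may fail with an error; ignore it for now. Start the Kafka broker:

    > JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &

        ZooKeeper is already running, so there is no need to start the ZooKeeper instance bundled with Kafka. If you deploy several Kafka brokers on one machine, give each its own JMX_PORT.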

        2) kafka-1

    broker.id=1
    port=9093
    ## All other settings match kafka-0

        Then run the same packaging commands as for kafka-0 and start this broker.

    > JMX_PORT=9998 bin/kafka-server-start.sh config/server.properties &

        The environment is now ready, so let's move on to the programming examples.
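
        The two broker files differ only in broker.id and port. As a minimal sketch of that relationship (not something the original post does), the snippet below derives the kafka-1 settings from the kafka-0 settings with java.util.Properties. The class name BrokerConfigs and the method derive are hypothetical, and only a few of the keys from the file above are repeated here.

```java
import java.util.Properties;

// Hypothetical helper for illustration; Kafka itself only reads server.properties.
public class BrokerConfigs {

    /** Returns a copy of base with broker.id and port replaced; base is left untouched. */
    public static Properties derive(Properties base, int brokerId, int port) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty("broker.id", String.valueOf(brokerId));
        copy.setProperty("port", String.valueOf(port));
        return copy;
    }

    public static void main(String[] args) {
        // A subset of the kafka-0 settings shown earlier.
        Properties kafka0 = new Properties();
        kafka0.setProperty("broker.id", "0");
        kafka0.setProperty("port", "9092");
        kafka0.setProperty("log.dir", "./logs");
        kafka0.setProperty("zookeeper.connect",
                "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");

        // kafka-1 keeps everything except its id and listening port.
        Properties kafka1 = derive(kafka0, 1, 9093);
        System.out.println("broker.id=" + kafka1.getProperty("broker.id"));
        System.out.println("port=" + kafka1.getProperty("port"));
        System.out.println("zookeeper.connect=" + kafka1.getProperty("zookeeper.connect"));
    }
}
```

        Note that log.dir is a relative path here, so the two brokers only keep separate logs because each is started from its own installation directory.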

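    III. Project setup

        The project is built with Maven. It has to be said that the Kafka Java client is really awkward to work with, and setting up the build runs into plenty of trouble. The pom.xml below is a suggested starting point; the versions of the dependencies must be mutually compatible.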

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.test</groupId>
        <artifactId>test-kafka</artifactId>
        <packaging>jar</packaging>
    
        <name>test-kafka</name>
        <url>http://maven.apache.org</url>
        <version>1.0.0</version>
        <dependencies>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.14</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.8.0</artifactId>
                <version>0.8.0-beta1</version>
                <exclusions>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.8.1</version>
            </dependency>
            <dependency>
                <groupId>com.yammer.metrics</groupId>
                <artifactId>metrics-core</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
                <version>0.3</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>test-kafka-1.0</finalName>
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <filtering>true</filtering>
                </resource>
            </resources>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.3.2</version>
                    <configuration>
                        <source>1.5</source>
                        <target>1.5</target>
                        <encoding>gb2312</encoding>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>2.2</version>
                    <configuration>
                        <encoding>gbk</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

    IV. Producer code

        1) producer.properties: place this file under the /resources directory

    #partitioner.class=
    metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
    ##,127.0.0.1:9093
    producer.type=sync
    compression.codec=0
    serializer.class=kafka.serializer.StringEncoder
    ## Only takes effect when producer.type=async
    #batch.num.messages=100
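
        A typo in metadata.broker.list tends to surface only when the producer first tries to send. The sketch below is an optional pre-check and is not part of the original post: it splits the value into host:port entries and rejects malformed ones. BrokerList, Endpoint and parse are hypothetical names, and this is not how Kafka itself parses the setting.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validation helper; uses only the JDK.
public class BrokerList {

    /** One host:port entry from metadata.broker.list. */
    public static final class Endpoint {
        public final String host;
        public final int port;

        Endpoint(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Parses "host1:port1,host2:port2"; throws IllegalArgumentException on bad input. */
    public static List<Endpoint> parse(String brokerList) {
        List<Endpoint> result = new ArrayList<Endpoint>();
        for (String entry : brokerList.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.length() == 0) {
                continue; // tolerate a trailing comma
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            int port;
            try {
                port = Integer.parseInt(trimmed.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Port is not a number in '" + trimmed + "'");
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in '" + trimmed + "'");
            }
            result.add(new Endpoint(trimmed.substring(0, colon), port));
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("metadata.broker.list is empty");
        }
        return result;
    }

    public static void main(String[] args) {
        // The value used in producer.properties above.
        for (Endpoint e : parse("127.0.0.1:9092,127.0.0.1:9093")) {
            System.out.println(e);
        }
    }
}
```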

        2) LogProducer.java sample code

    package com.test.kafka;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    public class LogProducer {
    
        private Producer<String,String> inner;
        public LogProducer() throws Exception{
            Properties properties = new Properties();
            properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
            ProducerConfig config = new ProducerConfig(properties);
            inner = new Producer<String, String>(config);
        }
    
        
        public void send(String topicName,String message) {
            if(topicName == null || message == null){
                return;
            }
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
            inner.send(km);
        }
        
        public void send(String topicName,Collection<String> messages) {
            if(topicName == null || messages == null){
                return;
            }
            if(messages.isEmpty()){
                return;
            }
            List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
            for(String entry : messages){
                KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
                kms.add(km);
            }
            inner.send(kms);
        }
        
        public void close(){
            inner.close();
        }
        
        /**
         * @param args
         */
        public static void main(String[] args) {
            LogProducer producer = null;
            try{
                producer = new LogProducer();
                int i=0;
                // Send one message every 2 seconds until the process is stopped.
                while(true){
                    producer.send("test-topic", "this is a sample" + i);
                    i++;
                    Thread.sleep(2000);
                }
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                if(producer != null){
                    producer.close();
                }
            }
    
        }
    
    }
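
        The LogProducer constructor passes the result of ClassLoader.getSystemResourceAsStream straight to Properties.load. If producer.properties is missing from the classpath, that stream is null and the load fails with a NullPointerException that does not name the file. The sketch below shows one way to make that failure explicit and to close the stream afterwards; ClasspathProps is a hypothetical helper, not part of the original code.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper; could be shared by the producer and the consumer.
public class ClasspathProps {

    /** Loads a properties file from the classpath, failing with a clear message if it is absent. */
    public static Properties load(String resourceName) throws IOException {
        InputStream in = ClassLoader.getSystemResourceAsStream(resourceName);
        if (in == null) {
            throw new FileNotFoundException(resourceName + " not found on the classpath");
        }
        try {
            Properties props = new Properties();
            props.load(in);
            return props;
        } finally {
            in.close(); // always release the stream, even if load() fails
        }
    }

    public static void main(String[] args) {
        try {
            Properties props = load("producer.properties");
            System.out.println("metadata.broker.list=" + props.getProperty("metadata.broker.list"));
        } catch (IOException e) {
            System.out.println("Could not load configuration: " + e.getMessage());
        }
    }
}
```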

    V. Consumer code

         1) consumer.properties: place this file under the /resources directory

    zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
    ##,127.0.0.1:2182,127.0.0.1:2183
    # timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=1000000
    #consumer group id
    group.id=test-group
    #consumer timeout
    #consumer.timeout.ms=5000
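
        Consumers that share a group.id divide a topic's partitions among their streams. The sketch below illustrates a range-style split, which as far as we know is how the 0.8 high-level consumer hands out partitions. It is written purely for illustration and is not Kafka's own code; RangeAssignment and partitionsFor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: an assumed model of range-style partition assignment.
public class RangeAssignment {

    /**
     * Returns the partitions (numbered 0..numPartitions-1) owned by the consumer
     * thread at the given position in the sorted list of threads.
     */
    public static List<Integer> partitionsFor(int numPartitions, int numThreads, int position) {
        int perThread = numPartitions / numThreads;
        int threadsWithExtra = numPartitions % numThreads;
        // The first 'threadsWithExtra' threads each take one additional partition.
        int start = perThread * position + Math.min(position, threadsWithExtra);
        int count = perThread + (position < threadsWithExtra ? 1 : 0);

        List<Integer> owned = new ArrayList<Integer>();
        for (int p = start; p < start + count; p++) {
            owned.add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // 2 partitions (num.partitions=2) shared by 2 streams, then by 3 streams.
        for (int threads = 2; threads <= 3; threads++) {
            for (int pos = 0; pos < threads; pos++) {
                System.out.println(threads + " threads, position " + pos
                        + " -> " + partitionsFor(2, threads, pos));
            }
        }
    }
}
```

        Under this model, with num.partitions=2 on the brokers and 2 streams in the consumer, each stream ends up with one partition, and a third stream would have nothing to read.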

        2) LogConsumer.java sample code

    package com.test.kafka;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    public class LogConsumer {
    
        private ConsumerConfig config;
        private String topic;
        private int partitionsNum;
        private MessageExecutor executor;
        private ConsumerConnector connector;
        private ExecutorService threadPool;
        public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{
            Properties properties = new Properties();
            properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
            config = new ConsumerConfig(properties);
            this.topic = topic;
            this.partitionsNum = partitionsNum;
            this.executor = executor;
        }
        
        public void start() throws Exception{
            connector = Consumer.createJavaConsumerConnector(config);
            // Map each topic to the number of streams (consumer threads) to create for it.
            Map<String,Integer> topics = new HashMap<String,Integer>();
            topics.put(topic, partitionsNum);
            Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
            List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
            // One worker thread per stream; each worker blocks on its own stream's iterator.
            threadPool = Executors.newFixedThreadPool(partitionsNum);
            for(KafkaStream<byte[], byte[]> partition : partitions){
                threadPool.execute(new MessageRunner(partition));
            } 
        }
    
            
        public void close(){
            try{
                threadPool.shutdownNow();
            }catch(Exception e){
                //
            }finally{
                connector.shutdown();
            }
            
        }
        
        class MessageRunner implements Runnable{
            private KafkaStream<byte[], byte[]> partition;
            
            MessageRunner(KafkaStream<byte[], byte[]> partition) {
                this.partition = partition;
            }
            
            public void run(){
                ConsumerIterator<byte[], byte[]> it = partition.iterator();
                while(it.hasNext()){
                    MessageAndMetadata<byte[],byte[]> item = it.next();
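                    System.out.println("partition:" + item.partition());
                    System.out.println("offset:" + item.offset());
                    // Decode explicitly as UTF-8; new String(byte[]) alone uses the platform default charset.
                    executor.execute(new String(item.message(), java.nio.charset.Charset.forName("UTF-8")));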
                }
            }
        }
        
        interface MessageExecutor {
            
            public void execute(String message);
        }
        
        /**
         * @param args
         */
        public static void main(String[] args) {
            LogConsumer consumer = null;
            try{
                MessageExecutor executor = new MessageExecutor() {
                    
                    public void execute(String message) {
                        System.out.println(message);
                        
                    }
                };
                consumer = new LogConsumer("test-topic", 2, executor);
                consumer.start();
            }catch(Exception e){
                e.printStackTrace();
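            }finally{
                // close() stays commented out: start() returns at once while the worker threads
                // keep consuming, so calling close() here would shut the consumer down immediately.
    //            if(consumer != null){
    //                consumer.close();
    //            }
            }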
    
        }
    
    }
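
        LogConsumer runs one worker thread per stream and stops them with threadPool.shutdownNow() followed by connector.shutdown(). The sketch below isolates that thread-per-stream pattern so it can be run without a broker. It is a stand-in under stated assumptions: a BlockingQueue<String> plays the role of each KafkaStream, and StreamWorkers and Handler are hypothetical names that do not exist in Kafka.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in for the LogConsumer threading model; no Kafka classes involved.
public class StreamWorkers {

    /** Plays the role of MessageExecutor in LogConsumer. */
    public interface Handler {
        void handle(String message);
    }

    private final ExecutorService pool;

    /** Starts one worker per stream; the list of streams must not be empty. */
    public StreamWorkers(List<BlockingQueue<String>> streams, final Handler handler) {
        pool = Executors.newFixedThreadPool(streams.size());
        for (final BlockingQueue<String> stream : streams) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            // take() blocks until a message arrives, much as the
                            // stream iterator blocks in the real consumer.
                            handler.handle(stream.take());
                        }
                    } catch (InterruptedException e) {
                        // shutdownNow() interrupts take(); restore the flag and exit.
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }

    /** Interrupts the workers and waits for them to finish; true if all stopped in time. */
    public boolean close(long timeoutMs) throws InterruptedException {
        pool.shutdownNow();
        return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        List<BlockingQueue<String>> streams = new ArrayList<BlockingQueue<String>>();
        streams.add(new LinkedBlockingQueue<String>());
        streams.add(new LinkedBlockingQueue<String>());

        final List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        StreamWorkers workers = new StreamWorkers(streams, new Handler() {
            public void handle(String message) {
                seen.add(message);
            }
        });

        streams.get(0).put("message for stream 0");
        streams.get(1).put("message for stream 1");
        Thread.sleep(200); // give the workers a moment to drain the queues

        boolean stopped = workers.close(1000);
        System.out.println("stopped cleanly: " + stopped + ", messages seen: " + seen.size());
    }
}
```

        Waiting on awaitTermination after shutdownNow is a design choice of this sketch: it lets the caller know whether the workers actually exited before the process moves on.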
  • Original article: https://www.cnblogs.com/jasenin/p/4314837.html