zoukankan      html  css  js  c++  java
  • kafka环境安装及简单使用(单机版)

    一个分布式发布-订阅消息传递系统

    特点:

        高吞吐量、低延迟

    使用场景(举例):

        日志收集:用kafka收集各种服务产生的log,通过kafka以统一的接口服务的方式开放给各种consumer,如hadoop,hbase等

    下载安装:

        1.下载地址    选择一个版本的kafka进行下载

        2.解压

    tar -zxvf kafka_2.11-0.9.0.1.tgz
    mv kafka_2.11-0.9.0.1 /opt/

        3.配置环境变量(可选步骤)

    上手使用:

        1.config目录配置文件(zookeeper.properties,service.properties,producer.properties,consumer.properties)

      我们暂时先不管这些配置文件,遵守初始的配置

        2.先启动zookeeper - kafka依赖与zookeeper 实现分布式一致性

      我们下载的kafka安装包,就自带了zookeeepr,zookeeper.properties就是自带的zk的配置文件

    nohup bin/zookeeper-server-start.sh config/zookeeepr.properties&       nohup &是实现在后台启动

       

        3.再启动kafka服务

    bin/kafka-server-start.sh config/server.properties

       

        4.创建一个Topic

    bin/kafka-topics.sh --create --topic test1 --zookeeper localehost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1

        4.再启动kafka生产端

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1

        5.在新窗口再启动kafka消费端

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning

        6.在生产窗口输入任意字符,观察在消费端是否能够收到相应字符

      

    如果无法收到正确字符,或者报错,尝试从以下方面排查:

        1.服务是否都按顺序正常启动

        2.命令中开启的服务端口是否和相应的配置文件中的配置对应

            注:生产端访问的端口不是  zookeeper的localhost:2181, 而是producer.properties中配置的broker的端口,默认为9092

            注:这个broker的端口是需要在 server中有相应的配置才可以

    简单介绍一下上面提到了config目录下面的配置,以及kafka集群的搭建

    server.properties:一个server.properties文件代表了一个kafka服务,也就是一个Broker

    所以说,如果我们想搭建一个kafka集群,需要有不同的 server.properties文件,来启动多个broker,多个borker组成kafka cluster

        注:每个server.properties配置文件中的 broker.id(服务器唯一标识)不能一样

             port(服务器监听端口号)不能一样

             zookeeper.connect(zookeeper的连接ip及端口),需和zookeeper.properties保持一致

    kafka在Java程序的简单示例:

      生产:

    public class JavaKafkaProducer {
        private Logger logger = Logger.getLogger(JavaKafkaProducer.class);
        public static final String TOPIC_NAME = "test1";
        public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
        public static final int chartsLength = charts.length;
    
    
        public static void main(String[] args) {
            String brokerList = "127.0.0.1:9092";
            Properties props = new Properties();
            props.put("metadata.broker.list", brokerList);
            /**
             * 0表示不等待结果返回<br/>
             * 1表示等待至少有一个服务器返回数据接收标识<br/>
             * -1表示必须接收到所有的服务器返回标识,及同步写入<br/>
             * */
            props.put("request.required.acks", "0");
            /**
             * 内部发送数据是异步还是同步
             * sync:同步, 默认
             * async:异步
             */
            props.put("producer.type", "async");
            /**
             * 设置序列化的类
             * 可选:kafka.serializer.StringEncoder
             * 默认:kafka.serializer.DefaultEncoder
             */
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            /**
             * 设置分区类
             * 根据key进行数据分区
             * 默认是:kafka.producer.DefaultPartitioner ==> 安装key的hash进行分区
             * 可选:kafka.serializer.ByteArrayPartitioner ==> 转换为字节数组后进行hash分区
             */
            props.put("partitioner.class", "com.kafka.JavaKafkaProducerPartitioner");
    
            // 重试次数
            props.put("message.send.max.retries", "3");
    
            // 异步提交的时候(async),并发提交的记录数
            props.put("batch.num.messages", "200");
    
            // 设置缓冲区大小,默认10KB
            props.put("send.buffer.bytes", "102400");
    
            // 2. 构建Kafka Producer Configuration上下文
            ProducerConfig config = new ProducerConfig(props);
    
            // 3. 构建Producer对象
            final Producer<String, String> producer = new Producer<String, String>(config);
    
            // 4. 发送数据到服务器,并发线程发送
            final AtomicBoolean flag = new AtomicBoolean(true);
            int numThreads = 50;
            ExecutorService pool = Executors.newFixedThreadPool(numThreads);
            for (int i = 0; i < 5; i++) {
                pool.submit(new Thread(new Runnable() {
                    @Override
                    public void run() {
                        while (flag.get()) {
                            // 发送数据
                            KeyedMessage message = generateKeyedMessage();
                            producer.send(message);
                            System.out.println("发送数据:" + message);
    
                            // 休眠一下
                            try {
                                int least = 10;
                                int bound = 100;
                                Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                        System.out.println(Thread.currentThread().getName() + " shutdown....");
                    }
                }, "Thread-" + i));
    
            }
    
            // 5. 等待执行完成
            long sleepMillis = 600000;
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            flag.set(false);
    
            // 6. 关闭资源
    
            pool.shutdown();
            try {
                pool.awaitTermination(6, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
            } finally {
                producer.close(); // 最后之后调用
            }
        }
    
        /**
         * 产生一个消息
         *
         * @return
         */
        private static KeyedMessage<String, String> generateKeyedMessage() {
            String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);
            StringBuilder sb = new StringBuilder();
            int num = ThreadLocalRandom.current().nextInt(1, 5);
            for (int i = 0; i < num; i++) {
                sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" ");
            }
            String message = sb.toString().trim();
            return new KeyedMessage(TOPIC_NAME, key, message);
        }
    
        /**
         * 产生一个给定长度的字符串
         *
         * @param numItems
         * @return
         */
        private static String generateStringMessage(int numItems) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < numItems; i++) {
                sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]);
            }
            return sb.toString();
        }
    }

      消费:

    public class JavaKafkaConsumerHighAPITest {
        public static void main(String[] args) {
            String zookeeper = "127.0.0.1";
            String groupId = "test-consumer-group";
            String topic = "test1";
            int threads = 1;
    
            JavaKafkaConsumerHighAPI example = new JavaKafkaConsumerHighAPI(topic, threads, zookeeper, groupId);
            new Thread(example).start();
    
            // 执行10秒后结束
            int sleepMillis = 600000;
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 关闭
            example.shutdown();
        }
    }

    kafka各组件说明:

        1.Broker -- 每个kafka server称为一个Broker,多个borker组成kafka cluster。

        2.Topic  --  Topic 就是消息类别名,一个topic中通常放置一类消息。每个topic都有一个或者多个订阅者,也就是消息的消费者consumer。

            Producer将消息推送到topic,由订阅该topic的consumer从topic中拉取消息。

            一个Broker上可以创建一个或者多个Topic。同一个topic可以在同一集群下的多个Broker中分布。

        ....

    参考博文:http://www.cnblogs.com/liuming1992/tag/Kafka/

  • 相关阅读:
    2.Android之按钮Button和编辑框EditText学习
    《DSP using MATLAB》Problem 3.8
    《DSP using MATLAB》Problem 3.7
    《DSP using MATLAB》Problem 3.6
    《DSP using MATLAB》Problem 3.5
    《DSP using MATLAB》Problem 3.4
    《DSP using MATLAB》Problem 3.3
    《DSP using MATLAB》Problem 3.2
    《DSP using MATLAB》Problem 3.1
    《DSP using MATLAB》Problem 2.20
  • 原文地址:https://www.cnblogs.com/xuzekun/p/8986540.html
Copyright © 2011-2022 走看看