zoukankan      html  css  js  c++  java
  • kafka环境安装及简单使用(单机版)

    一个分布式发布-订阅消息传递系统

    特点:

        高吞吐量、低延迟

    使用场景(举例):

        日志收集:用kafka收集各种服务产生的log,通过kafka以统一的接口服务的方式开放给各种consumer,如hadoop,hbase等

    下载安装:

        1.下载地址    选择一个版本的kafka进行下载

        2.解压

    tar -zxvf kafka_2.11-0.9.0.1.tgz
    mv kafka_2.11-0.9.0.1 /opt/

        3.配置环境变量(可选步骤)

    上手使用:

        1.config目录配置文件(zookeeper.properties,service.properties,producer.properties,consumer.properties)

      我们暂时先不管这些配置文件,遵守初始的配置

        2.先启动zookeeper - kafka依赖与zookeeper 实现分布式一致性

      我们下载的kafka安装包,就自带了zookeeepr,zookeeper.properties就是自带的zk的配置文件

    nohup bin/zookeeper-server-start.sh config/zookeeepr.properties&       nohup &是实现在后台启动

       

        3.再启动kafka服务

    bin/kafka-server-start.sh config/server.properties

       

        4.创建一个Topic

    bin/kafka-topics.sh --create --topic test1 --zookeeper localehost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1

        4.再启动kafka生产端

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1

        5.在新窗口再启动kafka消费端

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning

        6.在生产窗口输入任意字符,观察在消费端是否能够收到相应字符

      

    如果无法收到正确字符,或者报错,尝试从以下方面排查:

        1.服务是否都按顺序正常启动

        2.命令中开启的服务端口是否和相应的配置文件中的配置对应

            注:生产端访问的端口不是  zookeeper的localhost:2181, 而是producer.properties中配置的broker的端口,默认为9092

            注:这个broker的端口是需要在 server中有相应的配置才可以

    简单介绍一下上面提到了config目录下面的配置,以及kafka集群的搭建

    server.properties:一个server.properties文件代表了一个kafka服务,也就是一个Broker

    所以说,如果我们想搭建一个kafka集群,需要有不同的 server.properties文件,来启动多个broker,多个borker组成kafka cluster

        注:每个server.properties配置文件中的 broker.id(服务器唯一标识)不能一样

             port(服务器监听端口号)不能一样

             zookeeper.connect(zookeeper的连接ip及端口),需和zookeeper.properties保持一致

    kafka在Java程序的简单示例:

      生产:

    public class JavaKafkaProducer {
        private Logger logger = Logger.getLogger(JavaKafkaProducer.class);
        public static final String TOPIC_NAME = "test1";
        public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
        public static final int chartsLength = charts.length;
    
    
        public static void main(String[] args) {
            String brokerList = "127.0.0.1:9092";
            Properties props = new Properties();
            props.put("metadata.broker.list", brokerList);
            /**
             * 0表示不等待结果返回<br/>
             * 1表示等待至少有一个服务器返回数据接收标识<br/>
             * -1表示必须接收到所有的服务器返回标识,及同步写入<br/>
             * */
            props.put("request.required.acks", "0");
            /**
             * 内部发送数据是异步还是同步
             * sync:同步, 默认
             * async:异步
             */
            props.put("producer.type", "async");
            /**
             * 设置序列化的类
             * 可选:kafka.serializer.StringEncoder
             * 默认:kafka.serializer.DefaultEncoder
             */
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            /**
             * 设置分区类
             * 根据key进行数据分区
             * 默认是:kafka.producer.DefaultPartitioner ==> 安装key的hash进行分区
             * 可选:kafka.serializer.ByteArrayPartitioner ==> 转换为字节数组后进行hash分区
             */
            props.put("partitioner.class", "com.kafka.JavaKafkaProducerPartitioner");
    
            // 重试次数
            props.put("message.send.max.retries", "3");
    
            // 异步提交的时候(async),并发提交的记录数
            props.put("batch.num.messages", "200");
    
            // 设置缓冲区大小,默认10KB
            props.put("send.buffer.bytes", "102400");
    
            // 2. 构建Kafka Producer Configuration上下文
            ProducerConfig config = new ProducerConfig(props);
    
            // 3. 构建Producer对象
            final Producer<String, String> producer = new Producer<String, String>(config);
    
            // 4. 发送数据到服务器,并发线程发送
            final AtomicBoolean flag = new AtomicBoolean(true);
            int numThreads = 50;
            ExecutorService pool = Executors.newFixedThreadPool(numThreads);
            for (int i = 0; i < 5; i++) {
                pool.submit(new Thread(new Runnable() {
                    @Override
                    public void run() {
                        while (flag.get()) {
                            // 发送数据
                            KeyedMessage message = generateKeyedMessage();
                            producer.send(message);
                            System.out.println("发送数据:" + message);
    
                            // 休眠一下
                            try {
                                int least = 10;
                                int bound = 100;
                                Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                        System.out.println(Thread.currentThread().getName() + " shutdown....");
                    }
                }, "Thread-" + i));
    
            }
    
            // 5. 等待执行完成
            long sleepMillis = 600000;
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            flag.set(false);
    
            // 6. 关闭资源
    
            pool.shutdown();
            try {
                pool.awaitTermination(6, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
            } finally {
                producer.close(); // 最后之后调用
            }
        }
    
        /**
         * 产生一个消息
         *
         * @return
         */
        private static KeyedMessage<String, String> generateKeyedMessage() {
            String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);
            StringBuilder sb = new StringBuilder();
            int num = ThreadLocalRandom.current().nextInt(1, 5);
            for (int i = 0; i < num; i++) {
                sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" ");
            }
            String message = sb.toString().trim();
            return new KeyedMessage(TOPIC_NAME, key, message);
        }
    
        /**
         * 产生一个给定长度的字符串
         *
         * @param numItems
         * @return
         */
        private static String generateStringMessage(int numItems) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < numItems; i++) {
                sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]);
            }
            return sb.toString();
        }
    }

      消费:

    public class JavaKafkaConsumerHighAPITest {
        public static void main(String[] args) {
            String zookeeper = "127.0.0.1";
            String groupId = "test-consumer-group";
            String topic = "test1";
            int threads = 1;
    
            JavaKafkaConsumerHighAPI example = new JavaKafkaConsumerHighAPI(topic, threads, zookeeper, groupId);
            new Thread(example).start();
    
            // 执行10秒后结束
            int sleepMillis = 600000;
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 关闭
            example.shutdown();
        }
    }

    kafka各组件说明:

        1.Broker -- 每个kafka server称为一个Broker,多个borker组成kafka cluster。

        2.Topic  --  Topic 就是消息类别名,一个topic中通常放置一类消息。每个topic都有一个或者多个订阅者,也就是消息的消费者consumer。

            Producer将消息推送到topic,由订阅该topic的consumer从topic中拉取消息。

            一个Broker上可以创建一个或者多个Topic。同一个topic可以在同一集群下的多个Broker中分布。

        ....

    参考博文:http://www.cnblogs.com/liuming1992/tag/Kafka/

  • 相关阅读:
    docker下用keepalived+Haproxy实现高可用负载均衡集群
    Centos 7 搭建蓝鲸V4.1.16社区版
    IDEA中Thrift插件配置
    idea打包java可执行jar包
    CentOS7 docker.repo 用阿里云Docker Yum源
    linux 配置maven环境变量
    Linux中修改docker镜像源及安装docker
    Spring-boot和Spring-Cloud遇到的问题
    IntelliJ Idea 常用快捷键列表
    Invocation of destroy method failed on bean with name ‘XXXX’
  • 原文地址:https://www.cnblogs.com/xuzekun/p/8986540.html
Copyright © 2011-2022 走看看