  • Kafka installation

    For ZooKeeper installation, see:

    https://yq.aliyun.com/articles/662422

    Install Scala:

    wget https://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz

    Reference: https://www.cnblogs.com/shaosks/p/9242181.html

    Install Kafka:

    wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.1/kafka_2.11-2.2.1.tgz

    Download the prebuilt binary release, not the source release; otherwise startup fails with "Error: Could not find or load main class kafka.Kafka".
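
    The binary and source archives are easy to mix up by name. Binary releases follow the pattern kafka_<Scala version>-<Kafka version>.tgz, while source releases are named kafka-<Kafka version>-src.tgz. The following is a minimal sketch, not part of the original post, that tells the two apart; the class name KafkaArchiveName and its describe method are hypothetical helpers introduced only for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: classifies a Kafka download by its file name.
final class KafkaArchiveName {
    // Binary release: kafka_<scala>-<kafka>.tgz, e.g. kafka_2.11-2.2.1.tgz
    private static final Pattern BINARY =
            Pattern.compile("kafka_(\\d+\\.\\d+)-(\\d+(?:\\.\\d+)*)\\.tgz");
    // Source release: kafka-<kafka>-src.tgz, e.g. kafka-2.2.1-src.tgz
    private static final Pattern SOURCE =
            Pattern.compile("kafka-(\\d+(?:\\.\\d+)*)-src\\.tgz");

    static String describe(String fileName) {
        Matcher binary = BINARY.matcher(fileName);
        if (binary.matches()) {
            return "binary: Scala " + binary.group(1) + ", Kafka " + binary.group(2);
        }
        Matcher source = SOURCE.matcher(fileName);
        if (source.matches()) {
            return "source: Kafka " + source.group(1);
        }
        return "unrecognized";
    }

    public static void main(String[] args) {
        System.out.println(describe("kafka_2.11-2.2.1.tgz"));
        System.out.println(describe("kafka-2.2.1-src.tgz"));
    }
}
```

    In the binary file name, the first number is the Scala version the broker was built against and the second is the Kafka version itself.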

    Kafka code reference: https://www.cnblogs.com/xuwujing/p/8371127.html

    1. Download kafka_2.12-2.1.0.tgz (the paths below assume this version; adjust them if you downloaded a different one)

    2. Start ZooKeeper (cd /usr/local/kafka/kafka_2.12-2.1.0/bin)

          ./zookeeper-server-start.sh  -daemon /usr/local/kafka/kafka_2.12-2.1.0/config/zookeeper.properties

    3. Edit the Kafka configuration (config/server.properties), then start the broker

          log.dirs=/usr/local/kafka/kafka-logs

          zookeeper.connect=localhost:2181

          ./kafka-server-start.sh  -daemon /usr/local/kafka/kafka_2.12-2.1.0/config/server.properties
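
    Before starting the broker, it can help to read the edited settings back and confirm that the ZooKeeper port matches the one ZooKeeper actually listens on. The following is a minimal sketch using only java.util.Properties; the class ServerConfigCheck and its methods are hypothetical names, and the default path is simply the one used in the commands above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper: reads server.properties and reports the settings from step 3.
final class ServerConfigCheck {

    // Parse properties-format text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Port of the first host:port entry in zookeeper.connect, or -1 if missing or malformed.
    static int zookeeperPort(Properties props) {
        String connect = props.getProperty("zookeeper.connect");
        if (connect == null) {
            return -1;
        }
        String first = connect.split(",")[0].trim();
        int slash = first.indexOf('/');          // drop an optional chroot suffix
        if (slash >= 0) {
            first = first.substring(0, slash);
        }
        int colon = first.lastIndexOf(':');
        if (colon < 0) {
            return -1;
        }
        try {
            return Integer.parseInt(first.substring(colon + 1));
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0
                ? args[0]
                : "/usr/local/kafka/kafka_2.12-2.1.0/config/server.properties";
        String text = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
        Properties props = parse(text);
        System.out.println("log.dirs = " + props.getProperty("log.dirs"));
        System.out.println("ZooKeeper port = " + zookeeperPort(props));
    }
}
```

    If the printed port differs from the clientPort in zookeeper.properties, the broker will not be able to reach ZooKeeper.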

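    4. Create a topic

          bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    After creating it, you can verify it with --list:

    /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

    Open two more terminals, one to send messages and one to receive them. Here:

    producer: uses the socket localhost:9092, meaning the producer sends its messages to Kafka, i.e. the broker

    consumer: also uses the socket localhost:9092 (--bootstrap-server), meaning the consumer fetches its messages from the broker; in this Kafka version the console consumer does not connect to ZooKeeper (localhost:2181) directly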
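
    The commands in this post talk to two endpoints: the broker on localhost:9092 and ZooKeeper on localhost:2181. If a console tool hangs or times out, a plain TCP check shows whether anything is listening there. This is a minimal sketch using only the JDK; PortCheck and isReachable are hypothetical names, and a successful connect only shows that the port is open, not that the service behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: tests whether a TCP port accepts connections.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;   // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        System.out.println("broker    localhost:9092 reachable: "
                + isReachable("localhost", 9092, 1000));
        System.out.println("zookeeper localhost:2181 reachable: "
                + isReachable("localhost", 2181, 1000));
    }
}
```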

    In the first terminal, start the producer:

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

    In the second terminal, receive the messages:

     /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server  localhost:9092 --topic test --from-beginning 

     

    Code reference:

    https://www.cnblogs.com/xuwujing/p/8371127.html

    Java test code: the producer side uses the Kafka command-line tool and the consumer side uses Java. The code is as follows:

    package cc.gongchang.kafkaDemo;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class KafkaConsumerTest implements Runnable {
        private final KafkaConsumer<String, String> consumer;
        private ConsumerRecords<String, String> msgList;
        private final String topic;
        private static final String GROUPID = "groupA";
    
        public KafkaConsumerTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", GROUPID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            this.consumer = new KafkaConsumer<String, String>(props);
            this.topic = topicName;
            this.consumer.subscribe(Arrays.asList(topic));
        }
    
        public void run() {
            int messageNo = 1;
            boolean done = false;
            System.out.println("---------start consuming---------");
            try {
                while (!done) {
                    msgList = consumer.poll(10);
                    if(null!=msgList&&msgList.count()>0){
                        for (ConsumerRecord<String, String> record : msgList) {
                            // Print every 5th record; the printed records may not follow an obvious pattern
                            if(messageNo%5==0){
                                System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
                            }
                            // Stop after 10 records; the flag also ends the outer polling loop
                            if(messageNo%10==0){
                                done = true;
                                break;
                            }
                            messageNo++;
                        }
                    }else{
                        Thread.sleep(1000);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
        public static void main(String args[]) {
            KafkaConsumerTest test1 = new KafkaConsumerTest("test");
            Thread thread1 = new Thread(test1);
            thread1.start();
        }
    }
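
    One detail of the consumer loop is worth isolating: records arrive in batches, so a plain break inside the inner for loop only leaves the current batch, and the outer polling loop keeps running. The following is a minimal sketch of the counting logic with in-memory batches standing in for poll() results, so it runs without a broker; BatchLoopSketch and consume are hypothetical names, and a labeled break is just one way to leave both loops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the consumer loop: batches replace poll() results.
final class BatchLoopSketch {

    // Walks the batches in order, collects every printEvery-th record,
    // and stops completely once limit records have been seen.
    static List<String> consume(List<List<String>> batches, int printEvery, int limit) {
        List<String> printed = new ArrayList<>();
        int messageNo = 0;
        outer:
        for (List<String> batch : batches) {       // one iteration per simulated poll()
            for (String value : batch) {
                messageNo++;
                if (messageNo % printEvery == 0) {
                    printed.add(messageNo + ": " + value);
                }
                if (messageNo >= limit) {
                    break outer;                   // leaves both loops, not just the batch
                }
            }
        }
        return printed;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b", "c", "d"),
                Arrays.asList("e", "f", "g"),
                Arrays.asList("h", "i", "j", "k", "l"));
        for (String line : consume(batches, 5, 10)) {
            System.out.println(line);
        }
    }
}
```

    With these three batches, the 5th and 10th records ("e" and "j") are collected and the records after the 10th are never visited.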

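    Package the code as a jar: in IDEA, choose File -> Project Structure, select "Artifacts", and click the plus sign in the dialog that appears.

    Select the entry class that the jar should run by default, and set the location of MANIFEST.MF.

    Click Build -> Build Artifacts to build the jar, then upload it to the server.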
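
    If running the uploaded jar with java -jar reports "no main manifest attribute", the Main-Class entry did not make it into the packaged MANIFEST.MF. The following is a minimal sketch that reads the entry back using java.util.jar; ManifestCheck and mainClassOf are hypothetical names, and the jar path is whatever IDEA produced for your artifact.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper: prints the Main-Class entry of a jar's manifest.
final class ManifestCheck {

    // Returns the Main-Class attribute, or null if the jar has no manifest or no such entry.
    static String mainClassOf(String jarPath) {
        try (JarFile jar = new JarFile(jarPath)) {
            Manifest manifest = jar.getManifest();
            if (manifest == null) {
                return null;
            }
            return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: java ManifestCheck <path-to-jar>");
            return;
        }
        String mainClass = mainClassOf(args[0]);
        System.out.println(mainClass == null
                ? "No Main-Class entry found"
                : "Main-Class: " + mainClass);
    }
}
```

    For the consumer in this post, the expected value would be cc.gongchang.kafkaDemo.KafkaConsumerTest.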

    [Screenshot: code execution]

     The producer code is included as well:

    package cc.gongchang.kafkaDemo;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProductTest implements Runnable {
        private final KafkaProducer<String, String> producer;
        private final String topic;
        public KafkaProductTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            this.producer = new KafkaProducer<String, String>(props);
            this.topic = topicName;
        }
    
        public void run() {
            int messageNo = 1;
            try {
                for(;;) {
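                    String messageStr="Hello, this is message number "+messageNo;
                    producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
                    // Print every 100 messages (currently disabled, so every message is printed)
                    //if(messageNo%100==0){
                        System.out.println("Sent message: " + messageStr);
                    //}
                    // Exit after producing 1000 messages (currently disabled, so the loop never ends)
                    //if(messageNo%1000==0){
                    //    System.out.println("Successfully sent "+messageNo+" messages");
                    //    break;
                    //}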
                   messageNo++;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }
    
        public static void main(String args[]) {
            KafkaProductTest test = new KafkaProductTest("test");
            Thread thread = new Thread(test);
            thread.start();
        }
    }

     Done.

  • Original article: https://www.cnblogs.com/zl0372/p/11592036.html