zoukankan      html  css  js  c++  java
  • Kafka入门教程

    最近准备看一下大数据相关的工具,hadoop, spark,zookeeper,kafka

    hadoop:大数据库处理框架,主要是map和reduce,通过map分解任务成一个个小问题,并行处理,reduce,将并行处理结果整合起来

    spark:也是一个大数据库处理框架,基于RDD,这个RDD在内存中进行操作,速度较快,spark是用scala实现的。scala是一种类似java的语言,编译的字节码可以在jvm上运行。spark同时支持python,java,Scala等,通过这几种语言感觉python确实抽象程度更高,相同问题,python可能几行就解决了,java需要更多代码

    zookeeper:可以在上面创建节点,存放值,同时,watch这些节点,节点改变会通知客户端

    kafka:消息中间件,生产者将消息放到kafka里面,消费者从kafka里面消费消息

    运行环境用的是centos,以前喜欢用ubuntu,但是ubuntu不是很稳定,总是提示内部错误。centos相对稳定一点,公司服务器装的也是centos。

    1. kafka环境

    linux : centos
    
    java:java8
    
    zookeeper:自带的

    2.安装

    2.1 安装jdk(谷歌 or 百度)

    2.2 安装kafka

    下载kafka:https://kafka.apache.org/downloads,这里选择编译好的,.tgz的。这里我下载的版本是

    放到centos的/opt/soft里面,解压,重命名为kafk211,路径/opt/soft/kafka211

    ok,到这里安装完成。其实,就是下载,解压就可以了

    3. 启动kafka

    kafka启动需要先启动zookeeper

    3.1 启动zookeeper,控制台执行

    bin/zookeeper-server-start.sh config/zookeeper.properties& 

    zookeeper启动成功后,可以使用zk客户端连接去检查是否启动成功。或者用ps命令,jps命令等

    3.2 启动kafka

    bin/kafka-server-start.sh config/server.properties &

    同样可以用ps命令或jps命令,查看是否启动成功

    这样kafak已经启动完成

    kafka作为一个消息中间件,把消息分为了不同的topic,生产者把不同的消息放到不同的topic里面,消费者从不同的topic里面取消息

    4. 控制台启动生产者和消费者

    4.1 创建topic test

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    可以通过命令查看创建好的topic

    bin/kafka-topics.sh --list --zookeeper localhost:2181

    4.2 启动生产者,往topic test里面写消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    kafka监听的9092端口,zookeeper监听的是2181端口

    4.3 启动消费者,从topic test里面消费消息

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    ok,到这儿,消费者和生产者已经启动完成

    5. 通过java客户端,写一个生产者和消费者的demo

    需要修改一下kafka的配置文件config/server.properties

    advertised.host.name=192.168.211.129
    advertised.port=9092

    这里改成自己网卡的ip,这里是个坑,不配置这个,会通过

    java.net.InetAddress.getCanonicalHostName()

    获取host.name,造成不能回复生成者消息

    5.1 kafka maven依赖

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.1.0</version>
    </dependency>

    工具类KafkaUtil.java

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    
    import java.util.Properties;
    
    /**
     * Created by gxf on 2017/6/23.
     */
    public class KafkaUtil {
        private static Producer<String, String> kp;
        private static KafkaConsumer<String, String> kc;
    
        public static Producer<String, String> getProducer() {
            if (kp == null) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "192.168.211.129:9092");
                props.put("acks", "1");
                props.put("retries", 0);
    //            props.put("batch.size", 16384);
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                kp = new KafkaProducer<String, String>(props);
            }
            return kp;
        }
    
        public static KafkaConsumer<String, String> getConsumer() {
            if(kc == null) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "192.168.211.129:9092");
                props.put("group.id", "1");
                props.put("enable.auto.commit", "true");
                props.put("auto.commit.interval.ms", "1000");
                props.put("session.timeout.ms", "30000");
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                kc = new KafkaConsumer<String, String>(props);
            }
            return kc;
        }
    }

    生产者 ProducerTest.java

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    /**
     * Created by gxf on 2017/6/23.
     */
    public class ProducerTest {
        public static void main(String[] args) throws Exception{
            Producer<String, String> producer = KafkaUtil.getProducer();
            int i = 0;
            while(true) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i);
                producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null)
                            e.printStackTrace();
                        System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
                    }
                });
                i++;
                Thread.sleep(1000);
            }
        }
    }

    每个1秒钟,向kafka发送消息,并注册回调函数,完成消息发送,调用回调函数。kafka控制台消费者也可以消费这里发送的消息

    消费者ConsumerTest.java

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    
    /**
     * Created by gxf on 2017/6/23.
     */
    public class ConsumerTest {
        public static void main(String[] args) throws Exception{
            KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
            consumer.subscribe(Arrays.asList("test"));
            while(true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for(ConsumerRecord<String, String> record : records) {
                    System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
                }
            }
        }
    }

    同样,这里的消费者,也可以消费控制台生产者放的消息

    ok至此,kafak安装以及java客户端的使用基本了解了

  • 相关阅读:
    Python——python读取html实战,作业7(python programming)
    Python——python读取html实战,作业7(python programming)
    Python——python读取xml实战,作业6(python programming)
    Python——python读取xml实战,作业6(python programming)
    二分查找(c &amp; c++)
    大型站点技术架构(八)--站点的安全架构
    Android MTP 文件浏览Demo
    HDU2037 事件排序问题
    折腾开源WRT的AC无线路由之路-3
    启动VIP报CRS-1028/CRS-0223致使VIP状态为UNKNOWN故障分析与解决
  • 原文地址:https://www.cnblogs.com/luckygxf/p/7072691.html
Copyright © 2011-2022 走看看