• Getting Started with Kafka

Kafka

Kafka is an open-source stream-processing platform developed by the Apache Software Foundation, written in Scala and Java. It is a high-throughput, distributed publish-subscribe messaging system that can handle all of a website's user action-stream data. (Baidu Baike)

     

1. Startup

1.1. Download the Kafka Archive

Download: kafka_2.12-2.2.0.tgz

Extract: tar -xzf kafka_2.12-2.2.0.tgz -C /usr/local/

    cd /usr/local/kafka_2.12-2.2.0

Kafka directory layout: (screenshot omitted)

1.2. Standalone (Single Broker)

Start the ZooKeeper service.

Edit the configuration file config/zookeeper.properties and create the data directory:

mkdir -p /var/zookeeper

# data directory
dataDir=/var/zookeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties&
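Before starting Kafka, you can check that ZooKeeper is listening by sending it the ruok four-letter command (it answers imok; this works out of the box with the 3.4.x ZooKeeper bundled with Kafka 2.2, though newer ZooKeeper versions may require whitelisting the command):

echo ruok | nc localhost 2181
# prints "imok" if the server is up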

     

Start the Kafka service.

Edit the configuration file config/server.properties and create the log directory:

mkdir -p /var/kafka/logs

# listener address
listeners=PLAINTEXT://192.168.229.13:9092

# log directory
log.dirs=/var/kafka/logs

    bin/kafka-server-start.sh config/server.properties&
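If both services came up, jps (bundled with the JDK) should show a Kafka process next to ZooKeeper's QuorumPeerMain:

jps
# e.g. (PIDs will differ):
# 3210 QuorumPeerMain
# 3468 Kafka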

     

Create a topic, then list all topics:

    bin/kafka-topics.sh --create --bootstrap-server 192.168.229.13:9092 --replication-factor 1 --partitions 1 --topic test

     bin/kafka-topics.sh --list --bootstrap-server 192.168.229.13:9092

     

Start a producer and send messages:

    bin/kafka-console-producer.sh --broker-list 192.168.229.13:9092 --topic test

     

Start a consumer and receive messages:

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.229.13:9092 --topic test --from-beginning

     

1.3. Cluster

1.3.1. Configuration

Each broker gets its own copy of server.properties with the following settings:

No. | Broker ID   | Log directory               | Listener address
----+-------------+-----------------------------+-------------------------------------------
 1  | broker.id=0 | log.dirs=/var/kafka/logs    | listeners=PLAINTEXT://192.168.229.13:9092
 2  | broker.id=1 | log.dirs=/var/kafka/logs-1  | listeners=PLAINTEXT://192.168.229.13:9093
 3  | broker.id=2 | log.dirs=/var/kafka/logs-2  | listeners=PLAINTEXT://192.168.229.13:9094

    mkdir -p /var/kafka/logs-1

    mkdir -p /var/kafka/logs-2

Copy the base configuration and edit each copy as per the table above:

    cp config/server.properties config/server-1.properties

    cp config/server.properties config/server-2.properties 
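For example, config/server-1.properties ends up with these three overrides (config/server-2.properties is analogous, with broker.id=2, logs-2, and port 9094):

broker.id=1
log.dirs=/var/kafka/logs-1
listeners=PLAINTEXT://192.168.229.13:9093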

1.3.2. Startup

Start ZooKeeper and the three brokers:

    bin/zookeeper-server-start.sh config/zookeeper.properties

    bin/kafka-server-start.sh config/server.properties&

    bin/kafka-server-start.sh config/server-1.properties&

    bin/kafka-server-start.sh config/server-2.properties&

Check the running services (original screenshot omitted); one quick check is sketched below:
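One way to verify that all three brokers registered, sketched here with the zookeeper-shell.sh utility that ships in Kafka's bin directory, is to list the broker ids stored in ZooKeeper:

bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
# the output should end with the registered ids, e.g. [0, 1, 2]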

      

Create and inspect a topic (3 replicas, 1 partition):

    bin/kafka-topics.sh --create --bootstrap-server 192.168.229.13:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic 

    bin/kafka-topics.sh --describe --bootstrap-server 192.168.229.13:9092 --topic my-replicated-topic
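Illustrative --describe output (the actual leader assignment depends on which broker wins the election; the notes below assume broker 0 leads):

Topic:my-replicated-topic  PartitionCount:1  ReplicationFactor:3  Configs:segment.bytes=1073741824
    Topic: my-replicated-topic  Partition: 0  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2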

     

Notes:

① Leader: 0 corresponds to broker.id=0 in server.properties.

② Replicas lists the broker ids eligible to act as leader for the partition (matching broker.id in each configuration file, one to one).

Start a producer and send messages:

    bin/kafka-console-producer.sh --broker-list 192.168.229.13:9092 --topic my-replicated-topic

     

Start a consumer and receive messages:

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.229.13:9092 --topic my-replicated-topic --from-beginning

     

1.3.3. Verifying Cluster Fault Tolerance

Kill one broker (here the one started from server-1.properties, i.e. broker.id=1; 3468 is that broker's JVM PID, found with jps or ps):

kill -9 3468

The producer and the consumer both keep working against the surviving brokers, and describing the topic again shows the updated Leader and Isr set (original screenshots omitted).

Restart the broker with broker.id=1:

    bin/kafka-server-start.sh config/server-1.properties&
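Once it has rejoined, describing the topic again should show broker 1 back in the Isr list:

bin/kafka-topics.sh --describe --bootstrap-server 192.168.229.13:9092 --topic my-replicated-topic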

2. Connecting to Kafka from Java

All of the following tests extend a shared Spring Boot test base class:

package ha;

import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class BaseTest {}

2.1. Kafka Producer

package ha.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Before;
import org.junit.Test;

import ha.BaseTest;

/**
 * Kafka producer
 * @author wangym
 */
public class KafkaProducerTest extends BaseTest {

    Properties props = new Properties();

    @Before
    public void initKafka() {
        props.put("bootstrap.servers", "192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094");
        props.put("acks", "all"); // wait for all in-sync replicas to acknowledge each record
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    // Send messages (fire-and-forget)
    @Test
    public void test1() {
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        long start = System.currentTimeMillis();

        for (int i = 0; i < 1000000; i++) {
            producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
        producer.close(); // flushes any buffered records before returning

        long end = System.currentTimeMillis();
        long time = end - start;
        System.out.println("Total time: " + time + "ms");
    }

    // Send messages inside a transaction
    @Test
    public void test2() {
        props.put("transactional.id", "my-transactional-id");
        Producer<String, String> producer = new KafkaProducer<String, String>(props, new StringSerializer(), new StringSerializer());
        producer.initTransactions(); // register the transactional producer with the broker

        long start = System.currentTimeMillis();

        try {
            producer.beginTransaction();
            for (int i = 0; i < 1000000; i++) {
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
            }
            producer.commitTransaction(); // commit the transaction
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the
            // producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
        producer.close();

        long end = System.currentTimeMillis();
        long time = end - start;
        System.out.println("Total time: " + time + "ms");
    }
}
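The send() calls above are fire-and-forget: a failed delivery is never reported. As a sketch reusing the props from initKafka() above, the callback overload of send() reports the result per record (testCallbackSend is a hypothetical extra test method for the class above, not part of the original post):

    // Sketch: send with a delivery callback
    @Test
    public void testCallbackSend() {
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"),
                (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace(); // delivery failed after retries were exhausted
                    } else {
                        System.out.println("stored at " + metadata.topic() + "-"
                                + metadata.partition() + "@" + metadata.offset());
                    }
                });
        producer.close(); // waits for in-flight sends, so the callback fires before exit
    }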

2.2. Kafka Consumer

package ha.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Before;
import org.junit.Test;

import ha.BaseTest;

/**
 * Kafka consumer
 * @author wangym
 */
public class KafkaConsumerTest extends BaseTest {

    Properties props = new Properties();
    KafkaConsumer<String, String> consumer = null;

    @Before
    public void initKafka() {
        props.put("bootstrap.servers", "192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    /**
     * Receive messages; offsets are committed automatically
     */
    @SuppressWarnings("resource")
    @Test
    public void test1() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }

    /**
     * Receive messages; commit offsets manually in batches
     */
    @Test
    public void test2() {
        props.setProperty("enable.auto.commit", "false");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                // insertIntoDb(buffer);
                consumer.commitSync(); // manual commit, only after the batch is processed
                buffer.clear();
            }
        }
    }

    /**
     * Iterate partition by partition; commit each partition's offset manually
     */
    @Test
    public void test3() {
        try {
            props.setProperty("enable.auto.commit", "false"); // manual offset control needs auto-commit off
            consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("my-topic")); // poll() requires a subscription (or assignment) first
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println(record.offset() + ":" + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    // the committed offset is the position of the *next* record to read, hence +1
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}
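Every loop above runs until the JVM is killed, which skips the consumer's clean shutdown. A sketch of the wakeup() idiom the consumer API provides for this (testCleanShutdown is a hypothetical extra method for the class above, not from the original post):

    // Sketch: clean shutdown via consumer.wakeup()
    @Test
    public void testCleanShutdown() {
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // a shutdown hook (any other thread works too) interrupts the blocking poll()
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        try {
            consumer.subscribe(Arrays.asList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records)
                    System.out.println(record.offset() + ":" + record.value());
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // expected: wakeup() makes the blocked poll() throw so the loop can exit
        } finally {
            consumer.close();
        }
    }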

2.3. Kafka Streams

The stream's producer (fills the input topic):

package ha.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
import org.junit.Test;

import ha.BaseTest;

/**
 * Kafka Streams producer: feeds my-stream-input-topic
 * @author wangym
 */
public class KafkaStreamsProducerTest extends BaseTest {

    Properties props = new Properties();

    @Before
    public void initKafka() {
        props.put("bootstrap.servers", "192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    /**
     * Streams test, producer side
     */
    @Test
    public void streamProducerTest() {
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        long start = System.currentTimeMillis();
        for (int i = 0; i < 1000000; i++) {
            producer.send(new ProducerRecord<String, String>("my-stream-input-topic", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();

        long end = System.currentTimeMillis();
        long time = end - start;
        System.out.println("Total time: " + time + "ms");
    }
}

The stream processor (forwards the input topic to the output topic, printing each value):

package ha.kafka;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.Before;
import org.junit.Test;

import ha.BaseTest;

/**
 * Kafka Streams test
 * @author wangym
 */
public class KafkaStreamsTest extends BaseTest {

    Properties props = new Properties();

    @Before
    public void initKafka() {
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    }

    /**
     * Streams test: print every value from the input topic, then forward it unchanged to the output topic
     */
    @SuppressWarnings("resource")
    @Test
    public void test1() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("my-stream-input-topic")
                .mapValues(value -> {
                    System.out.println(value);
                    return value;
                })
                .to("my-stream-output-topic");
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // streams.start() returns immediately; keep the test JVM alive so the topology keeps running
        while (true) {
        }
    }
}
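The empty while (true) loop keeps the test alive but spins a CPU core. A sketch of the latch-and-shutdown-hook pattern from the official Kafka Streams examples, as a hypothetical alternative tail for test1 (the builder and props are the ones defined above):

    // Sketch: park the thread and close the topology cleanly on JVM exit
    final KafkaStreams streams = new KafkaStreams(builder.build(), props);
    final java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        streams.close(); // stop stream threads and flush state
        latch.countDown();
    }));
    streams.start();
    try {
        latch.await(); // blocks without busy-waiting until the shutdown hook fires
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }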

The stream's consumer (reads the output topic):

package ha.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Before;
import org.junit.Test;

import ha.BaseTest;

/**
 * Kafka Streams consumer: reads my-stream-output-topic
 * @author wangym
 */
public class KafkaStreamsConsumerTest extends BaseTest {

    Properties props = new Properties();

    @Before
    public void initKafka() {
        props.put("bootstrap.servers", "192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    /**
     * Streams test, consumer side; offsets are committed automatically
     */
    @SuppressWarnings("resource")
    @Test
    public void streamConsumerTest() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-stream-output-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
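To watch the whole pipeline end to end, run the three tests together: start KafkaStreamsTest (the processor) first, then KafkaStreamsConsumerTest, then KafkaStreamsProducerTest. Each value flows my-stream-input-topic -> processor -> my-stream-output-topic -> console.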
