Introduction
Kafka is an open-source stream-processing platform developed by the Apache Software Foundation, written in Scala and Java. It is a high-throughput, distributed publish-subscribe messaging system capable of handling all of the activity-stream data generated by visitors to a website. These actions (page views, searches, and other user activity) are a key ingredient in many social features of the modern web. Because of the throughput involved, this kind of data has traditionally been handled through log processing and log aggregation.
Key features:
- High throughput, low latency: Kafka can handle hundreds of thousands of messages per second, with latencies as low as a few milliseconds; each topic can be split into multiple partitions, and consumer groups consume the partitions in parallel
- Scalability: Kafka clusters support hot expansion
- Durability and reliability: messages are persisted to local disk
- Fault tolerance: the cluster tolerates node failures (with a replication factor of n, up to n-1 nodes may fail)
- High concurrency: thousands of clients can read and write simultaneously
Use cases
- Log collection: a company can use Kafka to collect logs from all kinds of services and expose them to consumers through one uniform interface
- Messaging: decoupling producers from consumers, buffering messages, and so on
- User-activity tracking: recording the various actions users take
- Operational metrics: Kafka is also frequently used to record monitoring data
- Stream processing: e.g. Spark Streaming and Storm
Concepts
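A quick glossary of the core terms used below:
- Broker: a single Kafka server; a cluster consists of one or more brokers
- Topic: a named category that messages are published to
- Partition: each topic is split into one or more ordered, append-only partitions
- Replica: a copy of a partition on another broker; replication is what provides the fault tolerance mentioned above
- Producer: a client that publishes messages to topics
- Consumer: a client that subscribes to topics and reads messages
- Consumer group: a set of consumers sharing a group.id; each partition is read by at most one consumer in the group
- ZooKeeper: used by this version of Kafka to store cluster metadata and coordinate brokers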
Installation
Install the JDK (steps omitted here)
Install ZooKeeper
cd /opt/apache-zookeeper-3.5.9-bin/conf
mv zoo_sample.cfg zoo.cfg
Contents of zoo.cfg:
# The number of milliseconds of each tick
# heartbeat interval of the ZooKeeper server, in milliseconds
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
# time (in ticks) allowed for followers to connect and sync with the leader at startup
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# data directory
dataDir=/tmp/zookeeper/data
# the port at which the clients will connect
# client connection port
clientPort=2181
# port used by the AdminServer (default 8080)
admin.serverPort=2000
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
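A bit of arithmetic on these values: with tickTime=2000, initLimit=10 gives followers 20 seconds to connect to and sync with the leader, and syncLimit=5 allows 10 seconds between a request and its acknowledgement before a follower is considered dead. Also note that dataDir above points under /tmp despite the warning in the sample file; for anything beyond a throwaway test, move it to a persistent directory.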
Start ZooKeeper:
../bin/zkServer.sh start
cat ../logs/zookeeper-root-server-server.nfs.com.out
ZooKeeper is now installed.
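You can also verify it with ../bin/zkServer.sh status, which should report the server mode (standalone here, since this is a single node).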
Install Kafka
I am using kafka_2.13-2.7.0 here.
Kafka configuration file:
vi /opt/kafka_2.13-2.7.0/config/server.properties
The following parameters deserve attention:
# broker id; every broker in a cluster must be given a unique value; default is -1
broker.id=0
# address the broker exposes to clients; separate multiple entries with commas
listeners=PLAINTEXT://192.168.1.51:9092
# directory where the message log files are stored
log.dirs=/opt/kafka_2.13-2.7.0/logs
# ZooKeeper cluster address used by Kafka; separate multiple nodes with commas
zookeeper.connect=192.168.1.51:2181
message.max.bytes: the maximum size of a single message the broker will accept (about 1 MB by default)
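The same limit can also be overridden per topic with the topic-level max.message.bytes setting, which is handy when only one topic needs to carry larger messages.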
Start Kafka:
cd /opt/kafka_2.13-2.7.0/
bin/kafka-server-start.sh config/server.properties
Note that this does not run Kafka in the background: close the terminal and Kafka shuts down with it.
To start it as a background daemon instead:
bin/kafka-server-start.sh -daemon config/server.properties
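Either way, you can confirm the broker is alive with jps (look for a Kafka process) or by tailing logs/server.log under the installation directory.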
Message production and consumption
Testing with the Kafka CLI
1. Create a topic
bin/kafka-topics.sh --zookeeper 192.168.1.51:2181 --create --topic wj --partitions 2 --replication-factor 1
--zookeeper specifies the address of the ZooKeeper service that Kafka is connected to
--topic specifies the name of the topic to create
--partitions specifies the number of partitions
--replication-factor specifies the replication factor
--create is the action flag that creates the topic
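For reference, the same topic can also be created from Java with the AdminClient API in kafka-clients. A minimal sketch, assuming the single broker above is reachable at 192.168.1.51:9092 (note that the admin client talks to the broker, not to ZooKeeper; the class name is just for this demo):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreateDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.51:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // same layout as the CLI example: 2 partitions, replication factor 1
            NewTopic topic = new NewTopic("wj", 2, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}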
List all topics:
bin/kafka-topics.sh --zookeeper 192.168.1.51:2181 --list
Show topic details:
bin/kafka-topics.sh --zookeeper 192.168.1.51:2181 --describe --topic wj
--describe is the action flag that shows the details
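The describe output shows the partition count and replication factor, plus the Leader, Replicas, and Isr (in-sync replicas) for each partition.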
2. Test sending and receiving
Start a consumer to receive messages:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic wj
--bootstrap-server specifies the address used to connect to the Kafka cluster
--topic specifies the topic the consumer subscribes to
After running this command, the consumer sits there in a listening state.
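(By default the console consumer only shows messages produced after it starts; add --from-beginning if you want it to replay what is already in the topic.)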
Open a new terminal and send messages from the producer:
bin/kafka-console-producer.sh --broker-list 192.168.1.51:9092 --topic wj
Sending a message:
The consumer receives it:
Testing with Java code
Sending messages from Java
Dependencies:
<properties>
    <scala.version>2.11</scala.version>
    <slf4j.version>1.7.21</slf4j.version>
    <kafka.version>2.0.0</kafka.version>
    <gson.version>2.2.4</gson.version>
    <protobuff.version>1.5.4</protobuff.version>
    <spark.version>2.3.1</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${scala.version}</artifactId>
        <version>${kafka.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>${gson.version}</version>
    </dependency>
    <dependency>
        <groupId>io.protostuff</groupId>
        <artifactId>protostuff-core</artifactId>
        <version>${protobuff.version}</version>
    </dependency>
    <dependency>
        <groupId>io.protostuff</groupId>
        <artifactId>protostuff-runtime</artifactId>
        <version>${protobuff.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>${kafka.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.9.4</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.11</artifactId>
        <version>2.9.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
</dependencies>
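A note on versions: the client libraries here (Kafka 2.0.0, built against Scala 2.11) are older than the 2.7.0 broker installed above. That generally works, since Kafka clients and brokers are compatible across versions in both directions, but you could just as well bump kafka.version to match the broker.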
Configuration file: kafka_product.properties
# key serializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# number of retries
retries=10
# value serializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# cluster address
bootstrap.servers=192.168.1.51:9092
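Two remarks on these settings: retries=10 lets the producer retry transient failures on its own, and since acks is left at its default (1 for this client version, as far as I know) the broker acknowledges as soon as the partition leader has written the record. Be aware that retries combined with the default of multiple in-flight requests per connection can reorder messages; set max.in.flight.requests.per.connection=1 if ordering matters.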
Test:
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.util.ResourceUtils;

public class ProducerFastStart {
    private static final String TOPIC = "wj";

    public static void main(String[] args) throws IOException {
        // load producer settings from the classpath
        Properties properties = new Properties();
        properties.load(new FileInputStream(ResourceUtils.getFile("classpath:kafka_product.properties")));
        System.out.println(properties);

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "KAFKA-DEMO", "hello world");
        // send the same record five times (fire-and-forget)
        for (int i = 0; i < 5; i++) {
            producer.send(record);
        }
        producer.close();
    }
}
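producer.send() is asynchronous and returns a Future, so the fire-and-forget loop above never finds out whether delivery actually succeeded. A minimal variant, using the same producer and record, that passes a callback to observe each result:

// the callback fires once the broker acknowledges (or the send finally fails)
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // still failed after the configured retries
    } else {
        System.out.printf("sent to %s-%d@%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});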
Receiving messages in Java
Configuration file: kafka_consumer.properties
# key deserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value deserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# cluster address
bootstrap.servers=192.168.1.51:9092
group.id=group.demo
Consumer code:
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.util.ResourceUtils;

public class ConsumerFastStart {
    private static final String TOPIC = "wj";

    public static void main(String[] args) throws IOException {
        // load consumer settings from the classpath
        Properties properties = new Properties();
        properties.load(new FileInputStream(ResourceUtils.getFile("classpath:kafka_consumer.properties")));

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(TOPIC));
        // poll in a loop and print every record that arrives
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key());
                System.out.println(record.value());
            }
        }
    }
}
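One caveat about this consumer: with the properties above, enable.auto.commit is left at its default of true, so offsets returned by poll() are committed in the background, possibly before the records have actually been processed. A minimal sketch of the manual alternative, assuming enable.auto.commit=false is added to kafka_consumer.properties:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + " = " + record.value());
    }
    // commit only after the whole batch has been handled
    consumer.commitSync();
}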
After starting the consumer, the console keeps printing client log output (you can cut this down by adjusting the log level). Once the producer sends its messages, the consumer picks them up: