正文
一,简介
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
二,Kafka的角色
Broker : 安装Kafka服务的那台集群就是一个broker(broker的id要全局唯一)
Producer :消息的生产者,负责将数据写入到broker中(push)
Consumer:消息的消费者,负责从kafka中读取数据(pull),老版本的消费者需要依赖zk,新版本的不需要
Topic: 主题,相当于是数据的一个分类,不同topic存放不同的数据
Consumer Group: 消费者组,一个topic可以有多个消费者同时消费,多个消费者如果在一个消费者组中,那么他们不能重复消费数据
三,Kafka的安装
3.1 文件下载和解压
我这里的spark是2.3.3所以需要kafka0.10.2.0版本:点击下载
解压到相应的文件夹:如下图所示
3.2 文件配置
三个必要配置的地方:
broker.id=1 ===> 全局唯一,三台都要配置我这里分别是1,2,3
listeners=PLAINTEXT://hd1:9092 ===> 还有两台hd2,hd3
# 这个目录自己创建,用来保存kafka的数据
log.dirs=/usr/local/hadoop/kafka/data
zookeeper.connect=hd1:2181,hd2:2181,hd3:2181 ===> zookeeper的地址
如下:
3.3 服务启动
./bin/kafka-server-start.sh -daemon /usr/local/hadoop/kafka/kafka_2.10-0.10.2.0/config/server.properties
四,Kafka的常用命令
# 启动 ./bin/kafka-server-start.sh -daemon /usr/local/hadoop/kafka/kafka_2.10-0.10.2.0/config/server.properties # 查看有那些topic ./bin/kafka-topics.sh --list --zookeeper hd1:2181,hd2:2181,hd3:2181 # 创建topic ./bin/kafka-topics.sh --create --zookeeper hd1:2181,hd2:2181,hd3:2181 --replication-factor 3 --partitions 3 --topic test # 生产者数据 ./bin/kafka-console-producer.sh --broker-list hd1:9092,hd2:9092,hd3:9092 --topic test # 消费者消费数据 ./bin/kafka-console-consumer.sh --zookeeper hd1:2181,hd2:2181,hd3:2181 --topic test --from-beginning
五,Kafka的JAVA编程
5.1 Producer编程
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class ProduceDemo { public static void main(String[] args){ Properties props = new Properties();//配置项 props.put("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092");//使用新的API指定kafka集群位置 props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); String messageStr = null; for (int i = 1;i<1000;i++){ messageStr = "hello, this is "+i+"th message"; producer.send(new ProducerRecord<String, String>("test","Message",messageStr)); } producer.close(); } }
5.2 Consumer编程
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Properties; public class ConsumerDemo implements Runnable{ private final KafkaConsumer<String, String> consumer; private ConsumerRecords<String, String> msgList; private final String topic; private static final String GROUDID = "groupA"; public ConsumerDemo(String topicName){ Properties props = new Properties(); props.put("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092"); props.put("group.id", GROUDID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<String, String>(props); this.topic = topicName; this.consumer.subscribe(Arrays.asList(topic)); } public void run(){ int messageNum = 1; try{ for (;;){ msgList = consumer.poll(500); if (msgList!=null && msgList.count()>0){ for (ConsumerRecord<String, String> record : msgList){ if (messageNum % 50 ==0){ System.out.println(messageNum+"=receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset()); } if (messageNum % 1000 == 0) break; messageNum++; } } else{ Thread.sleep(1000); } } } catch (InterruptedException e){ e.printStackTrace(); } finally{ consumer.close(); } } public static void main(String[] args){ ConsumerDemo demo = new ConsumerDemo("test"); Thread thread = new Thread(demo); thread.start(); } }