Kafka Introduction
The volume of high-traffic, active data cannot be predicted, because it can change at any moment: a merchant may run a promotion, offer holiday discounts, or suddenly announce a clearance sale, and the data volume can grow by orders of magnitude. Traditional log analysis is done offline and is cumbersome to operate, so it cannot support real-time analysis at all. Existing message queue systems, on the other hand, achieve only near-real-time analysis, because they cannot consume the large volume of messages persisted in the queue. Kafka's goal is to be an efficient queuing platform for both offline and online messages.
Installing pseudo-distributed Kafka
cd /usr/local
tar -zxvf kafka_2.10-0.8.2.0.tgz
mv kafka_2.10-0.8.2.0 kafka
cd /usr/local/kafka/
Start the ZooKeeper instance bundled with Kafka, running in the background:
bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &
Start the Kafka server, running in the background:
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
Create a Kafka topic named test, connecting to the local ZooKeeper, with a replication factor of 1 and 1 partition:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
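A topic can also be created from Java code. Below is a minimal sketch, assuming the Kafka 0.8.2 client jar is on the classpath: it uses kafka.admin.AdminUtils (removed in later Kafka releases) together with the org.I0Itec.zkclient.ZkClient bundled with Kafka. Note that the ZkClient must be constructed with Kafka's ZKStringSerializer, otherwise the metadata AdminUtils writes would be unreadable to the brokers. The class name CreateTopicTest is made up for this example.

package com.mengyao.kafka;

import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;

public class CreateTopicTest {

    public static void main(String[] args) {
        // Connect to the local ZooKeeper; Kafka expects its metadata to be
        // written with ZKStringSerializer, not the ZkClient default serializer
        ZkClient zkClient = new ZkClient("localhost:2181", 30000, 30000,
                ZKStringSerializer$.MODULE$);
        try {
            // topic "test", 1 partition, replication factor 1, default topic config
            AdminUtils.createTopic(zkClient, "test", 1, 1, new Properties());
        } finally {
            zkClient.close();
        }
    }
}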
List the Kafka topics registered in ZooKeeper:
bin/kafka-topics.sh --list --zookeeper localhost:2181
Show the details of a Kafka topic:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
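For each partition, the describe output shows the broker that is currently the leader, the full replica list, and the set of in-sync replicas (Isr).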
Start a console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Start a console consumer (messages typed into the producer terminal will appear here):
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Installing fully distributed Kafka on nodes h5, h6, and h7
Install Kafka on node h5. The ZooKeeper cluster must already be running.
cd /usr/local
tar -zxvf kafka_2.10-0.8.2.0.tgz
mv kafka_2.10-0.8.2.0 kafka
cd /usr/local/kafka/
vi config/server.properties
broker.id=5 ## must be an integer, unique for each broker in the cluster
host.name=h5 ## may be an IP address, hostname, or domain name
log.dirs=/usr/local/kafka/logs
scp -rq /usr/local/kafka/ h6:/usr/local
scp -rq /usr/local/kafka/ h7:/usr/local
After copying, edit config/server.properties on h6 and h7 so that each broker has a unique broker.id and its own host.name (e.g. broker.id=6/host.name=h6 on h6, broker.id=7/host.name=h7 on h7), then start the Kafka server on all three nodes:
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
Create a Kafka topic named test111, connecting to the ZooKeeper cluster, with a replication factor of 3 and 3 partitions (a replication factor of 3 requires all three brokers to be registered):
bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3
Show the topic details:
bin/kafka-topics.sh --describe --zookeeper h5:2181 --topic test111
You can also check the topic's registration in ZooKeeper directly with the ZooKeeper CLI:
zkCli.sh
ls /brokers/topics/test111
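The same registration data can be read programmatically. The following is a minimal sketch, assuming the zkclient jar bundled with Kafka 0.8.2: it lists the registered broker ids and topics, and prints the partition-assignment JSON stored at /brokers/topics/test111. The class name TopicMetadataDump is made up for this example.

package com.mengyao.kafka;

import java.util.List;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

public class TopicMetadataDump {

    public static void main(String[] args) throws Exception {
        // Connect to the ZooKeeper ensemble used by the Kafka cluster;
        // BytesPushThroughSerializer returns node data as raw byte[]
        ZkClient zkClient = new ZkClient("h5:2181,h6:2181,h7:2181", 30000, 30000,
                new BytesPushThroughSerializer());
        try {
            // Children of /brokers/ids are the live broker ids,
            // children of /brokers/topics are the registered topic names
            List<String> brokers = zkClient.getChildren("/brokers/ids");
            List<String> topics = zkClient.getChildren("/brokers/topics");
            System.out.println("brokers: " + brokers + ", topics: " + topics);
            // The node data is the partition-to-replica assignment as JSON
            byte[] assignment = zkClient.readData("/brokers/topics/test111");
            System.out.println(new String(assignment, "UTF-8"));
        } finally {
            zkClient.close();
        }
    }
}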
Implementing a Kafka producer and consumer in Java
1. Producer
package com.mengyao.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class KafkaProducerTest extends Thread {

    private String topic;

    public KafkaProducerTest() {
    }

    public KafkaProducerTest(String topic) {
        this.topic = topic;
    }

    private Producer<Integer, String> getProducer(Properties prop) {
        return new Producer<Integer, String>(new ProducerConfig(prop));
    }

    private Properties getProperties() {
        Properties prop = new Properties();
        // The 0.8 producer discovers the cluster from the broker list;
        // it does not need a ZooKeeper connection
        prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
        // Serialize message values as UTF-8 strings
        prop.put("serializer.class", StringEncoder.class.getName());
        return prop;
    }

    @Override
    public void run() {
        Properties prop = getProperties();
        Producer<Integer, String> producer = getProducer(prop);
        int i = 0;
        // Publish one message per second, forever
        while (true) {
            producer.send(new KeyedMessage<Integer, String>(topic, "msg:" + i++));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new KafkaProducerTest("test111").start();
    }

}
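With the cluster from the previous section running, starting KafkaProducerTest publishes one message per second to test111; you can watch the messages arrive with the console consumer or with the Java consumer below.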
2. Consumer
package com.mengyao.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumerTest extends Thread {

    private String topic;

    public KafkaConsumerTest() {
    }

    public KafkaConsumerTest(String topic) {
        this.topic = topic;
    }

    private ConsumerConnector getConsumer(Properties prop) {
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
    }

    private Properties getProperties() {
        Properties prop = new Properties();
        // The high-level consumer coordinates through ZooKeeper and
        // tracks offsets per consumer group
        prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
        prop.put("group.id", "group1");
        return prop;
    }

    @Override
    public void run() {
        Properties prop = getProperties();
        ConsumerConnector consumer = getConsumer(prop);
        // Request one stream (one consuming thread) for the topic
        HashMap<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        // hasNext() blocks until the next message arrives
        while (iterator.hasNext()) {
            final String msg = new String(iterator.next().message());
            System.out.println(msg);
        }
    }

    public static void main(String[] args) {
        new KafkaConsumerTest("test111").start();
    }

}
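Note that iterator.hasNext() blocks until the next message arrives, so the loop above runs until the process is killed. In a real application you would keep a reference to the ConsumerConnector and call its shutdown() method before exiting, which unblocks the iterator, releases the consumer's ZooKeeper registration, and, with auto-commit enabled (the default), commits its offsets.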