Let's set up Kafka in cluster mode on three machines:
192.168.131.128
192.168.131.130
192.168.131.131
A Kafka cluster depends on ZooKeeper, so make sure ZooKeeper (zk) is installed first.
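A quick way to confirm zk is healthy on each node (a sketch, assuming a standalone ZooKeeper install with zkServer.sh on the PATH):
zkServer.sh status   # reports whether this node is running as leader, follower, or standalone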
Download the Kafka package:
kafka_2.11-1.1.0.tgz
Extract it under /usr/local/.
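For example, the archive can be fetched and unpacked like this (a sketch; the URL assumes the Apache archive mirror, and the extracted directory is renamed to /usr/local/kafka to match the paths used below):
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -zxvf kafka_2.11-1.1.0.tgz -C /usr/local/
mv /usr/local/kafka_2.11-1.1.0 /usr/local/kafka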
Go into Kafka's config directory:
There you will see a ZooKeeper config file; this is the zk bundled with Kafka. If you have not installed zk separately, you can use the bundled one, and it is configured exactly the same way as a standalone install.
Since we assume zk is already installed, edit server.properties. The main settings look like this:
broker.id=0 # must be unique for each broker
listeners=PLAINTEXT://192.168.131.128:9092 # change to this host's IP
advertised.host.name=192.168.131.128 # change to this host's IP
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/log # must be created by hand; Kafka will not create it from the config (see the command after this list)
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.131.128:2181,192.168.131.130:2181,192.168.131.131:2181 # set to your ZooKeeper hosts, ip:port
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
auto.create.topics.enable=false
The settings that need to be changed are marked above.
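Since Kafka does not create log.dirs automatically, create it on every broker before starting:
mkdir -p /usr/local/kafka/log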
Then sync the Kafka directory to the other two machines:
scp -r kafka hadoop@hadoopslaver1:/usr/local
scp -r kafka hadoop@hadoopslaver2:/usr/local
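After copying, adjust the per-host settings on each machine (broker.id, listeners, advertised.host.name). For example, on 192.168.131.130 (a sketch, assuming the same install path on every node):
sed -i 's/^broker.id=.*/broker.id=1/' /usr/local/kafka/config/server.properties
sed -i 's#^listeners=.*#listeners=PLAINTEXT://192.168.131.130:9092#' /usr/local/kafka/config/server.properties
sed -i 's/^advertised.host.name=.*/advertised.host.name=192.168.131.130/' /usr/local/kafka/config/server.properties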
Now we are ready to start. First make sure zk is running; if you did not install it separately, you can start the bundled one:
bin/zookeeper-server-start.sh config/zookeeper.properties &
Then start Kafka:
bin/kafka-server-start.sh -daemon config/server.properties
(The -daemon flag already runs the broker in the background, so no trailing & is needed.) Run the start command on all three machines; if no errors are reported, the brokers are up.
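To confirm that all three brokers registered themselves in ZooKeeper (assuming the default chroot), list their ids:
bin/zookeeper-shell.sh 192.168.131.128:2181 ls /brokers/ids
# should print [0, 1, 2]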
Now we can run a few tests.
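Note that we set auto.create.topics.enable=false, so the page_visits topic used by the examples below must be created by hand first, for example:
bin/kafka-topics.sh --zookeeper 192.168.131.128:2181 --create --topic page_visits --partitions 3 --replication-factor 3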
Consumer side:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Renamed from "Consumer" so the class does not clash with the imported Kafka Consumer interface.
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.131.128:9092,192.168.131.130:9092,192.168.131.131:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("page_visits"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
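Both the consumer above and the producer below only need the Kafka client library on the classpath; assuming a Maven or Gradle build, the dependency matching the broker version is:
org.apache.kafka:kafka-clients:1.1.0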
Producer:
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Renamed from "Producer" so the class does not clash with the imported Kafka Producer interface.
public class ProducerDemo {
    public static void main(String[] args) {
        long events = 1;
        Random rnd = new Random();
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.131.128:9092,192.168.131.130:9092,192.168.131.131:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Optional: plug in the custom partitioner shown below.
        props.put("partitioner.class", "com.rickiyang.service.Partitioner");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            ProducerRecord<String, String> data = new ProducerRecord<String, String>("page_visits", ip, msg);
            producer.send(data,
                new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            e.printStackTrace();
                        } else {
                            System.out.println("The offset of the record we just sent is: " + metadata.offset());
                        }
                    }
                });
        }
        producer.close();
    }
}
Custom partitioning strategy:
package com.rickiyang.service; // must match the partitioner.class configured on the producer

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// The Kafka Partitioner interface is referenced by its fully qualified name
// because this class itself is also named Partitioner.
public class Partitioner implements org.apache.kafka.clients.producer.Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int partition = 0;
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Route by the last octet of the IP address used as the record key.
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}
Let's run it: start ConsumerDemo first, then run ProducerDemo. The consumer prints the offset, key, and value of each record it receives, which confirms that the clients can exchange messages through the cluster.
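You can also do a quick end-to-end check with the console tools that ship with Kafka, without writing any code:
bin/kafka-console-producer.sh --broker-list 192.168.131.128:9092 --topic page_visits
bin/kafka-console-consumer.sh --bootstrap-server 192.168.131.128:9092 --topic page_visits --from-beginning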