转载自:http://blog.csdn.net/ch717828/article/details/50818261
1. 开启Kafka Consumer
首先选择集群的一台机器,打开kafka consumer,接收发送给kafka的消息。我选择的是 10.101.214.71这台机器。
通过以下命令打开 Consumer 。还不了解的可以看 kafka集群环境搭建 http://blog.csdn.net/ch717828/article/details/50748872
- /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-consumer.sh --zookeeper 10.101.214.71:2181,10.101.214.73:2181,10.101.214.74:2181/kafka --from-beginning --topic TestTopic
2. Kafka配置文件修改
因为将要写的 Java代码在本地,而 kafka安装在集群的71,73,74这3台机器上。因此需要对kafka的配置文件做修改,下面以 71的配置为例,73,74的类似。
vim /usr/local/kafka/config/server.properties
主要修改的内容为下面两行
- host.name=10.101.214.71
- advertised.host.name=10.101.214.71
3. Java实现Kafka Producer
使用maven 管理依赖包,pom.xml如下
log4j.properties 配置如下
- log4j.rootLogger=INFO,console
- #for package com.demo.kafka, log would be sent to kafka appender.
- log4j.logger.com.demo.kafka=DEBUG,kafka
- ## appender kafka
- #log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
- #log4j.appender.kafka.topic=my-replicated-topic5
- ## multiple brokers are separated by comma ",".
- #log4j.appender.kafka.brokerList=10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092
- #log4j.appender.kafka.compressionType=none
- #log4j.appender.kafka.syncSend=true
- #log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
- #log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
- # appender console
- log4j.appender.console=org.apache.log4j.ConsoleAppender
- log4j.appender.console.target=System.out
- log4j.appender.console.layout=org.apache.log4j.PatternLayout
- log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
Java实现代码如下,只做Demo使用,因此没有 close掉
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Properties;
- import kafka.javaapi.producer.Producer;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
- public class MyProducer {
- private static final String TOPIC = "TestTopic"; //kafka创建的topic
- private static final String CONTENT = "This is a single message"; //要发送的内容
- private static final String BROKER_LIST = "10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092"; //broker的地址和端口
- private static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder"; // 序列化类
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put("serializer.class", SERIALIZER_CLASS);
- props.put("metadata.broker.list", BROKER_LIST);
- ProducerConfig config = new ProducerConfig(props);
- Producer<String, String> producer = new Producer<String, String>(config);
- //Send one message.
- KeyedMessage<String, String> message =
- new KeyedMessage<String, String>(TOPIC, CONTENT);
- producer.send(message);
- //Send multiple messages.
- List<KeyedMessage<String,String>> messages =
- new ArrayList<KeyedMessage<String, String>>();
- for (int i = 0; i < 5; i++) {
- messages.add(new KeyedMessage<String, String>
- (TOPIC, "Multiple message at a time. " + i));
- }
- producer.send(messages);
- }
- }
此时完成了 java来向kafka产生消息。