首先感谢 kafka 中国社区 王扬庭例子的帮助和指导~~~~~(kafka_2.9.2-0.8.1.1)
kafka常用的发送消息的方法如下:
Properties props = new Properties();
props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list","slaves2:9092,slaves3:9092,slaves4:9092");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
String str = "test";
producer.send(new KeyedMessage<String, String>("exhibition",str));
但是如果用kafka发送对象的话就需要重写serializer.class中byte[] toBytes方法:
Producer示例:其中MessageBean是自己定义的实体类:
Properties props = new Properties();
props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181");
props.put("serializer.class", "com.performanceTest.BeanSerializer"); // 需要修改
props.put("metadata.broker.list","slaves2:9092,slaves3:9092,slaves4:9092");
ProducerConfig config = new ProducerConfig(props);
Producer<MessageBean, MessageBean> producer = new Producer<MessageBean, MessageBean>(config);
MessageBean str = new MessageBean();
str.setFromJID("2"+i);
str.setToJID("3"+i);
str.setMessage("京"+i);
str.setSendtime(System.currentTimeMillis());
KeyedMessage<MessageBean, MessageBean> data = new KeyedMessage<MessageBean, MessageBean>("exhibition",str);
producer.send(data);
com.performanceTest.BeanSerializer代码:
package com.performanceTest;
import com.performanceTest.BeanUtils;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import com.performanceTest.MessageBean;
public class BeanSerializer implements Encoder<MessageBean>{
public BeanSerializer(VerifiableProperties props) {
}
@Override
public byte[] toBytes(MessageBean mb) {
System.out.println("encoder ---> " + mb);
return BeanUtils.object2Bytes(mb);
}
}
BeanUtils的代码:
public class BeanUtils {
public static Object bytes2Object(byte[] bytes) {
Object obj = null;
ByteArrayInputStream bais = null;
ObjectInputStream ois = null;
try {
bais = new ByteArrayInputStream(bytes);
ois = new ObjectInputStream(bais);
obj = (Object) ois.readObject();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
ois.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return obj;
}
public static byte[] object2Bytes(Object obj) {
byte[] bytes = null;
ByteArrayOutputStream baos = null;
ObjectOutputStream oos = null;
try {
baos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(baos);
oos.writeObject(obj);
bytes = baos.toByteArray();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
oos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return bytes;
}
}
Consumer示例:
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
byte[] bytes = it.next().message();
MessageBean mb = (MessageBean) BeanUtils.bytes2Object(bytes);
...
...
...
}
OK,至此基本的应用kafka传输接受对象的例子完毕,尝试看过高端的代码如SimpleConsumer,基础不够,实在费劲,接着努力吧~~~~