  • [Original] Sending and Receiving Message Objects with Kafka (Basic Version)

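    First, many thanks to Wang Yangting (王扬庭) of the Kafka China community, whose example and guidance helped a great deal. (Kafka version: kafka_2.9.2-0.8.1.1)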

    The usual way to send a message with Kafka looks like this:

    Properties props = new Properties();
    props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181");  
    props.put("serializer.class", "kafka.serializer.StringEncoder");       
    props.put("metadata.broker.list","slaves2:9092,slaves3:9092,slaves4:9092");
    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);  
    String str = "test";
    producer.send(new KeyedMessage<String, String>("exhibition",str));

    To send an object with Kafka, however, you need to supply your own class for serializer.class and implement its byte[] toBytes method:

    Producer example, where MessageBean is a user-defined entity class:

    Properties props = new Properties();
    props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181");
    props.put("serializer.class", "com.performanceTest.BeanSerializer"); // must be changed to the custom encoder
    props.put("metadata.broker.list","slaves2:9092,slaves3:9092,slaves4:9092");
    ProducerConfig config = new ProducerConfig(props);
    Producer<MessageBean, MessageBean> producer = new Producer<MessageBean, MessageBean>(config);
    // i is defined elsewhere (presumably a loop counter; not shown in the original snippet)
    MessageBean bean = new MessageBean();
    bean.setFromJID("2"+i);
    bean.setToJID("3"+i);
    bean.setMessage("京"+i);
    bean.setSendtime(System.currentTimeMillis());
    KeyedMessage<MessageBean, MessageBean> data = new KeyedMessage<MessageBean, MessageBean>("exhibition", bean);
    producer.send(data);

    Code for com.performanceTest.BeanSerializer:

    package com.performanceTest;

    import kafka.serializer.Encoder;
    import kafka.utils.VerifiableProperties;

    public class BeanSerializer implements Encoder<MessageBean> {

        // The 0.8 producer creates the encoder with a VerifiableProperties argument,
        // so this constructor must exist even though it does nothing.
        public BeanSerializer(VerifiableProperties props) {
        }

        @Override
        public byte[] toBytes(MessageBean mb) {
            System.out.println("encoder ---> " + mb);
            return BeanUtils.object2Bytes(mb);
        }
    }

    Code for BeanUtils:

    package com.performanceTest;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    public class BeanUtils {
        // Deserializes a byte array back into an object; returns null on failure.
        public static Object bytes2Object(byte[] bytes) {
            // try-with-resources closes the stream and avoids the NullPointerException
            // that closing in a finally block hits when the ObjectInputStream constructor throws.
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return ois.readObject();
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }

        // Serializes an object (it must implement java.io.Serializable); returns null on failure.
        public static byte[] object2Bytes(Object obj) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeObject(obj);
                oos.flush(); // push any buffered data into baos before copying it out
                return baos.toByteArray();
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }
    }
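
    BeanUtils relies on Java's built-in serialization, so MessageBean has to implement java.io.Serializable; otherwise writeObject throws NotSerializableException. The post does not show MessageBean itself, so the class below is only a hypothetical sketch whose fields are inferred from the setters used in the producer example. RoundTripDemo and its two helper methods are likewise illustrative names, not part of the original code. The sketch uses only the JDK and checks that a bean survives a serialize/deserialize round trip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class; listed first so single-file launch finds main.
class RoundTripDemo {
    // Serialize with Java's built-in serialization, as object2Bytes does.
    static byte[] toBytes(Object obj) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(obj);
            oos.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            // NotSerializableException is an IOException and ends up here.
            throw new IllegalStateException("serialization failed", e);
        }
    }

    // Deserialize, as bytes2Object does.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }

    public static void main(String[] args) {
        MessageBean in = new MessageBean();
        in.setFromJID("21");
        in.setToJID("31");
        in.setMessage("hello");
        in.setSendtime(System.currentTimeMillis());

        MessageBean out = (MessageBean) fromBytes(toBytes(in));
        System.out.println(out.getFromJID() + " -> " + out.getToJID() + ": " + out.getMessage());
    }
}

// Assumed shape of the post's MessageBean (the real class is not shown).
class MessageBean implements Serializable {
    private static final long serialVersionUID = 1L;

    private String fromJID;
    private String toJID;
    private String message;
    private long sendtime;

    public String getFromJID() { return fromJID; }
    public void setFromJID(String fromJID) { this.fromJID = fromJID; }
    public String getToJID() { return toJID; }
    public void setToJID(String toJID) { this.toJID = toJID; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
    public long getSendtime() { return sendtime; }
    public void setSendtime(long sendtime) { this.sendtime = sendtime; }
}
```

    Declaring serialVersionUID explicitly is a design choice rather than a requirement: it keeps producer and consumer compatible when the class is recompiled, as long as the fields stay compatible.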

    Consumer example:

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1); // request one stream for this topic
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        byte[] bytes = it.next().message();
        MessageBean mb = (MessageBean) BeanUtils.bytes2Object(bytes);
        ...
        ...
        ...
    }
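
    The cast in the loop above throws ClassCastException if a message on the topic was not produced as a MessageBean, and bytes2Object returns null when deserialization fails. One possible way to guard against both is a small type-checked decode helper. The sketch below uses only the JDK; SafeDecode, decodeAs and encode are hypothetical names introduced for illustration and are not part of Kafka or of the original code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Optional;

// Hypothetical helper: decode a payload only if it really holds the expected type.
class SafeDecode {
    // Returns the decoded object, or Optional.empty() when the bytes are null,
    // corrupt, of an unknown class, or of a different type than requested.
    static <T> Optional<T> decodeAs(byte[] bytes, Class<T> type) {
        if (bytes == null) {
            return Optional.empty();
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            Object obj = ois.readObject();
            return type.isInstance(obj) ? Optional.of(type.cast(obj)) : Optional.empty();
        } catch (IOException | ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    // Counterpart used only to build a payload for the demo.
    static byte[] encode(Serializable obj) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(obj);
            oos.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = encode("hello");
        System.out.println(decodeAs(payload, String.class).isPresent());               // matching type
        System.out.println(decodeAs(payload, Integer.class).isPresent());              // wrong type
        System.out.println(decodeAs(new byte[] {1, 2, 3}, String.class).isPresent());  // corrupt bytes
    }
}
```

    In the consumer loop this would be used as SafeDecode.decodeAs(it.next().message(), MessageBean.class), handling the empty case instead of casting directly.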
    

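    OK, that completes a basic example of sending and receiving objects over Kafka. I tried reading more advanced code such as SimpleConsumer, but my fundamentals are not strong enough yet and it was a real struggle, so I will keep working at it.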

    PS: Please credit the source when reposting.

  • Original article: https://www.cnblogs.com/nanxin521/p/4514971.html