zoukankan      html  css  js  c++  java
  • Kafka中Producer端封装自定义消息

    我们知道KeywordMessage就是被kafka发送和存储的对象。所以只需要模拟出这个就可以发送自定义消息了。

    比如我需要将用户的id,user,age,address和访问ip和访问date记录为一个消息。我就自定义一个消息格式(id-user-age-address-ip-date)。

    我立马想到自己定义个javaBean.写一个UserInfo类伪代码。

    class UserInfo(){

        id;

       user;

       age;

       address;

       ip;

       date;

       toString(){

        return this.getId()+"-"+this.getUser()+"-"+"..."+this.getDate(); 

    }

    }

    你以为这样就可以了吗?当然不行啊!

    还要按照kafka的消息类型进行封装,在这里我们只需要实现Encoder类即可:继续看看代码就好;

    public class KeywordMessage implements kafka.serializer.Encoder<UserInfo>{
         
        public static final Logger LOG=LoggerFactory.getLogger(UserInfo.class);
         
        @Override
        public Message toMessage(Keyword words) {
            LOG.info("start in encoding...");
            return new Message(words.toString().getBytes());
        }
    }
    这样KeywordMessage就是一个可以被kafka发送和存储的对象了。
     
    我们再看看producer,producer数据的推送到broker的,所以发起者还是业务系统,下面的代码就能直接发送一次数据。
    /**配置producer必要的参数*/
    Properties props = new Properties();
    必要的一些配置省略。。。
    /**选择用哪个类来进行序列化,就是我们自定义的消息类*/
    props.put("serializer.class", "org.kafka.message.UserInfo");
    ProducerConfig config=new ProducerConfig(props);
    /**构造测试数据*/
    UserInfo userInfo = new UserInfo();
    userInfo.setId(1);
    userInfo.setUser("xiaoming");
     ...
    List<UserInfo> msg=new ArrayList<UserInfo>();
    msg.add(userInfo);
    /**构造数据发送对象*/
    Producer<String, UserInfo> producer=new Producer<String, UserInfo>(config);      
    ProducerData<String,UserInfo> data=new ProducerData<String, UserInfo>("test", msg);
    producer.send(data);
     
    以上就是自定义封装消息内容。
  • 相关阅读:
    jmeter获取上一个接口的返回值作为下一个接口的传入参数
    浅析Android 消息机制
    【性能测试】针对部分接口进行压力测试
    TPS及计算方法
    Python+selenium鼠标、键盘事件
    Python+selenium下拉菜单选项
    Webdriver元素定位3(CSS)
    Webdriver元素定位2(XPath)
    Webdriver元素定位1
    Webdriver测试脚本2(控制浏览器)
  • 原文地址:https://www.cnblogs.com/xubiao/p/5361710.html
Copyright © 2011-2022 走看看