zoukankan      html  css  js  c++  java
  • Kafka中Producer端封装自定义消息

    我们知道KeywordMessage就是被kafka发送和存储的对象。所以只需要模拟出这个就可以发送自定义消息了。

    比如我需要将用户的id,user,age,address和访问ip和访问date记录为一个消息。我就自定义一个消息格式(id-user-age-address-ip-date)。

    我立马想到自己定义个javaBean.写一个UserInfo类伪代码。

    class UserInfo(){

        id;

       user;

       age;

       address;

       ip;

       date;

       toString(){

        return this.getId()+"-"+this.getUser()+"-"+"..."+this.getDate(); 

    }

    }

    你以为这样就可以了吗?当然不行啊!

    还要按照kafka的消息类型进行封装,在这里我们只需要实现Encoder类即可:继续看看代码就好;

    public class KeywordMessage implements kafka.serializer.Encoder<UserInfo>{
         
        public static final Logger LOG=LoggerFactory.getLogger(UserInfo.class);
         
        @Override
        public Message toMessage(Keyword words) {
            LOG.info("start in encoding...");
            return new Message(words.toString().getBytes());
        }
    }
    这样KeywordMessage就是一个可以被kafka发送和存储的对象了。
     
    我们再看看producer,producer数据的推送到broker的,所以发起者还是业务系统,下面的代码就能直接发送一次数据。
    /**配置producer必要的参数*/
    Properties props = new Properties();
    必要的一些配置省略。。。
    /**选择用哪个类来进行序列化,就是我们自定义的消息类*/
    props.put("serializer.class", "org.kafka.message.UserInfo");
    ProducerConfig config=new ProducerConfig(props);
    /**构造测试数据*/
    UserInfo userInfo = new UserInfo();
    userInfo.setId(1);
    userInfo.setUser("xiaoming");
     ...
    List<UserInfo> msg=new ArrayList<UserInfo>();
    msg.add(userInfo);
    /**构造数据发送对象*/
    Producer<String, UserInfo> producer=new Producer<String, UserInfo>(config);      
    ProducerData<String,UserInfo> data=new ProducerData<String, UserInfo>("test", msg);
    producer.send(data);
     
    以上就是自定义封装消息内容。
  • 相关阅读:
    设计模式 -- 桥接模式(Bridge)
    设计模式 -- 单例模式(Singleton)
    设计模式 -- 简单工厂模式
    Nginx服务器的启动控制
    【Vue.js】二、Vue的基础
    【Vue.js】一、Vue介绍和安装使用
    linux常用操作命令
    Redis的学习(一、Redis的一些常用技术)
    Spring的学习(四、Spring事务管理)
    Spring的学习(三、Spring中的AOP)
  • 原文地址:https://www.cnblogs.com/xubiao/p/5361710.html
Copyright © 2011-2022 走看看