zoukankan      html  css  js  c++  java
  • Vertx使用EventBus发送接受自定义对象

    先看官方文档步骤:

    需要一个编解码器,看源码:

    可见内置了需要数据类型的实现,所以发送其他消息可以发送,但是如果发送自定义对象就需要自己实现编解码逻辑了

    一 自定义编解码器

    /**
     * 自定义对象编解码器,两个类型可用于消息转换,即发送对象转换为接受需要的对象
     */
    public class CustomizeMessageCodec implements MessageCodec<OrderMessage, OrderMessage> {
        /**
         * 将消息实体封装到Buffer用于传输
         * 实现方式:使用对象流从对象中获取Byte数组然后追加到Buffer
         */
        @Override
        public void encodeToWire(Buffer buffer, OrderMessage orderMessage) {
            final ByteArrayOutputStream b = new ByteArrayOutputStream();
            try (ObjectOutputStream o = new ObjectOutputStream(b)){
                o.writeObject(orderMessage);
                o.close();
                buffer.appendBytes(b.toByteArray());
            } catch (IOException e) { e.printStackTrace(); }
        }
        //从Buffer中获取消息对象
        @Override
        public OrderMessage decodeFromWire(int pos, Buffer buffer) {
            final ByteArrayInputStream b = new ByteArrayInputStream(buffer.getBytes());
            OrderMessage msg = null;
            try (ObjectInputStream o = new ObjectInputStream(b)){ msg = (OrderMessage) o.readObject();
            } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); }
            return msg;
        }
        //消息转换
        @Override
        public OrderMessage transform(OrderMessage orderMessage) {
            System.out.println("消息转换---");//可对接受消息进行转换,比如转换成另一个对象等
            orderMessage.setName("姚振");
            return orderMessage;
        }
        @Override
        public String name() { return "myCodec"; }
        //识别是否是用户自定义编解码器,通常为-1
        @Override
        public byte systemCodecID() { return -1; }
        public static MessageCodec create() {
            return new CustomizeMessageCodec();
        }
    }

     这里有一个点要注意,nam方法是必须的,且发送的时候一定要指明name

    二 发送消息编写

    public class ProducerVerticle extends AbstractVerticle {
        @Override
        public void start() throws Exception {
            EventBus eventBus = vertx.eventBus();
            //发布消息(群发)
            eventBus.publish("com.hou", "群发祝福!");
            //发送消息(单发),只会发送注册此地址的一个,采用不严格的轮询算法选择
            DeliveryOptions options = new DeliveryOptions();//设置消息头等
            options.addHeader("some-header", "some-value");
            eventBus.send("com.hou", "单发消息",options,ar->{
                if(ar.succeeded()) System.out.println("收到消费者确认信息:"+ar.result().body());
            });
            //发送自定义对象,需要编解码器
            eventBus.registerCodec(CustomizeMessageCodec.create());//注册编码器
            DeliveryOptions options1 = new DeliveryOptions().setCodecName("myCodec");//必须指定名字
            OrderMessage orderMessage = new OrderMessage();
            orderMessage.setName("侯征");
            eventBus.send("com.hou", orderMessage, options1);
        }
    }

    三 接受消息Verticle编写

    public class ConsumerVerticle extends AbstractVerticle {
        @Override
        public void start() throws Exception {
            //每个Vertx实例默认是单例
            EventBus eb = vertx.eventBus();
            //注册处理器,消费com.hou发送的消息
            MessageConsumer<Object> consumer = eb.consumer("com.hou");//订阅地址
            consumer.handler(message -> {//消息处理器
                if(message.body() instanceof OrderMessage){
                    System.out.println("接受到对象: " + ((OrderMessage) message.body()).getName());
                }
                System.out.println("我是普通消费者: " + message.body());
                message.reply("收到了!"); // 回复生产者,send才能接受
            }).completionHandler(res -> {//注册完成后通知事件,适用于集群中比较慢的情况下
                    System.out.println("注册处理器结果"+res.succeeded());
            });
            //撤销处理器
            //consumer.unregister();
        }
    }

    四 注册部署Verticcle

    vertx.deployVerticle(ConsumerVerticle.class.getName());
            TimeUnit.SECONDS.sleep(1);
            vertx.deployVerticle(ProducerVerticle.class.getName());

    五 测试

  • 相关阅读:
    绿色版 notepad++ 添加鼠标右键菜单
    Scala 安装与配置
    Scala 神奇的下划线 _
    Kafka 安装部署
    Pulsar 下一代消息平台
    Sqoop 安装部署
    Flume 常用配置项
    Android-selector
    android- 9patch
    有关内存的思考题
  • 原文地址:https://www.cnblogs.com/houzheng/p/11279654.html
Copyright © 2011-2022 走看看