zoukankan      html  css  js  c++  java
  • RabbitMQ

    试着用RabbitMQ进行RPC。


    其实用RabbitMQ搞RPC也没什么特别的。
    只是我们需要在请求中再加入一个callback queue。
    比如这样:

    callbackQueueName = channel.queueDeclare().getQueue();
     
    BasicProperties props = new BasicProperties
                                .Builder()
                                .replyTo(callbackQueueName)
                                .build();

    channel.basicPublish("", "rpc_queue", props, message.getBytes());

    剩下的工作就是等待对方处理完成再从callback队列中读取响应消息。


    上面用到了BasicProperties。
    (注意:是com.rabbitmq.client.AMQP.BasicProperties 不是 com.rabbitmq.client.BasicProperties)
    关于Message properties,AMQP协议为消息预定义了14种属性。

            private String contentType;
            private String contentEncoding;
            private Map<String,Object> headers;
            private Integer deliveryMode;
            private Integer priority;
            private String correlationId;
            private String replyTo;
            private String expiration;
            private String messageId;
            private Date timestamp;
            private String type;
            private String userId;
            private String appId;
            private String clusterId;

    通常我们只需要使用其中一小部分:
    ·deliveryMode: 将消息设置为持久或者临时,2为持久,其余为临时。
    ·contentType: 指定mime-type,比如要使用JSON就是application/json
    ·replyTo: 指定callback queue的名字
    ·correlationId: 用来关联RPC请求和响应的标识。
    上面那段代码中就是用到了correlationId。

    另外需要说明这个correlationId。
    其实在上面的代码中我们为每一个RPC请求都创建了一个回调队列。
    但这样明显不效率,我们可以为每一个客户端只创建一个回调队列。

    但这样我们又需要考虑另一个问题:<当我们将收到的消息放到队列时,如何确定该消息是属于哪个请求?>

    这时我们可以使用correlationId解决这个问题。
    我们可以用它来为每一个请求加上标识,获取信息时对比这个标识,以对应请求和响应。
    如果我们收到了无法识别的correlationId,即该响应不与任何请求匹配,那么这个消息将会废除。

    好了,代码比较简单。

    class RPCServer{
        private static final String RPC_QUEUE_NAME = "rpc_queue";
        public static void main(String[] args) throws Exception {
     
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
     
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
     
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
     
            channel.basicQos(1);
     
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
     
            System.out.println(" [x] Awaiting RPC requests");
     
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
     
                BasicProperties props = delivery.getProperties();
                BasicProperties replyProps = new BasicProperties
                        .Builder()
                        .correlationId(props.getCorrelationId())
                        .build();
     
                String message = new String(delivery.getBody());
                int n = Integer.parseInt(message);
     
                System.out.println(" [.] fib(" + message + ")");
                String response = "" + fib(n);
     
                channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
     
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
     
        }
     
        private static int fib(int n) throws Exception {
            if (n == 0) return 0;
            if (n == 1) return 1;
            return fib(n-1) + fib(n-2);
        }
    }

    由于是共享队列,这里我们就不用exchange和routing了。
    另外,有时我们可能需要运行多个服务,为了让多个服务端负载均衡,我们可以使用prefetchCount。
    这个属性在之前任务队列的例子里也用过,也就是

    workerChannel.basicQos(1);

    即让多个worker一次获取一个任务。
    用basicConsume方法进入队列后循环等待请求,发现有请求到达时根据队列和CorrelationId对相应请求作出响应。

    另外需要注意的一点,server中basicConsume的第二个参数是false。
    其意义为是否自动作出回应,即:
    true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements
    于是循环时需要显示调用basicAck进行回应。

    class RPCClient{
     
        private Connection connection;
        private Channel channel;
        private String requestQueueName = "rpc_queue";
        private String replyQueueName;
        private QueueingConsumer consumer;
     
        public RPCClient() throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
     
            replyQueueName = channel.queueDeclare().getQueue();
            consumer = new QueueingConsumer(channel);
            channel.basicConsume(replyQueueName, true, consumer);
        }
     
        public String call(String message) throws Exception {
            String response = null;
            String corrId = java.util.UUID.randomUUID().toString();
     
            BasicProperties props = new BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();
     
            channel.basicPublish("", requestQueueName, props, message.getBytes());
     
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                    response = new String(delivery.getBody());
                    break;
                }
            }
     
            return response;
        }
     
        public void close() throws Exception {
            connection.close();
        }
    }

    callback队列只是一个匿名队列,但切记需要将其设置到BasicProperties中。
    corrId的生成方法有很多种,在这里使用UUID。
    call方法中通过调用basicPublish进行RPC请求,参数中带着BasicProperties。

  • 相关阅读:
    形象理解ERP(转)
    禁用windows server 2008 域密码复杂性要求策略
    How to adding find,filter,remove filter on display method Form
    Windows Server 2008 R2激活工具
    How to using bat command running VS development SSRS report
    Creating Your First Mac AppGetting Started
    Creating Your First Mac AppAdding a Track Object 添加一个 Track 对象
    Creating Your First Mac AppImplementing Action Methods 实现动作方法
    Creating Your First Mac AppReviewing the Code 审查代码
    Creating Your First Mac AppConfiguring the window 设置窗口
  • 原文地址:https://www.cnblogs.com/kavlez/p/4117291.html
Copyright © 2011-2022 走看看